Slow processing with map when using deepspeed or fairscale

I apply the tokenizer to my custom dataset using the datasets.Dataset.map() method, as done in the run_mlm.py example. FYI, I am using multiprocessing by setting the num_proc parameter of map().
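For reference, the preprocessing looks roughly like the snippet below (the model name, file paths, and the num_proc value are illustrative placeholders, not my exact script):

    from datasets import load_dataset
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

    def tokenize_function(examples):
        # Tokenize a batch of raw text examples.
        return tokenizer(examples["text"], truncation=True)

    raw_datasets = load_dataset("text", data_files={"train": "train.txt"})
    tokenized_datasets = raw_datasets.map(
        tokenize_function,
        batched=True,
        num_proc=8,  # multiprocessing for the CPU-bound tokenization
        remove_columns=["text"],
    )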

I have a multi-GPU system, and the above usually takes about 10 minutes. But once I use DeepSpeed (deepspeed --include localhost:0,1,2), the process takes much longer (~20 minutes). I’m aware the preprocessing is done 100% on the CPU, so I don’t understand why I am seeing this slowdown in the first place. If I limit the number of visible devices to just one (deepspeed --include localhost:0), the preprocessing with map() is fast again.

Maybe @stas will have an idea?

What about DDP with the same number of processes?

python -m torch.distributed.launch --nproc_per_node=3 

You will most likely see the same slow-down, since now you have more than one process competing over your limited resources. If the problem is the same there, then it’s neither deepspeed nor fairscale but the number of processes you use.

With deepspeed or fairscale, when debugging such problems, always first try to remove them from the equation and reproduce the same setup in straight pytorch.

Correct. Neither deepspeed nor fairscale was the problem. Can you elaborate a little more on the following?

… more than 1 process compete over your limited resources

So if I understand correctly: if I want to take advantage of fast training with deepspeed and multiple GPUs, is this ‘fighting over limited resources’ inevitable for the preprocessing with map()?

It’s inevitable for any parallel processing. For example, here you’re most likely hitting an IO bottleneck rather than a CPU one, since your disk now needs to read/write 3 times as much. If it’s an SSD/NVMe drive it won’t be that bad, but with a regular HDD the disk head will have to seek back and forth madly to serve the concurrent requests.

And of course your CPU will be 3 times as busy as when handling only one process (ignoring other processes on the system).

In such scenarios you ideally want to do all the pre-processing separately before you do the training, so at training time you only need to read data.

Otherwise you need a fast SSD/NVMe to solve the IO bottleneck, and many CPU cores to solve the CPU bottleneck.
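For the pre-processing-ahead-of-time option, a minimal sketch (the paths and model name are placeholders): tokenize once in a standalone script, save the result with save_to_disk(), and have each training process only call load_from_disk() on the pre-built Arrow files.

    # preprocess.py - run once, on its own, before any distributed training
    from datasets import load_dataset
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

    def tokenize_function(examples):
        return tokenizer(examples["text"], truncation=True)

    raw = load_dataset("text", data_files={"train": "train.txt"})
    tokenized = raw.map(tokenize_function, batched=True, num_proc=8)
    tokenized.save_to_disk("/path/to/tokenized_dataset")

    # train.py - each training process just memory-maps the saved dataset
    from datasets import load_from_disk
    train_dataset = load_from_disk("/path/to/tokenized_dataset")["train"]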


Instead of pre-processing, you can also do this:

Mapping in a distributed setting

In a distributed setting, you may use caching and a torch.distributed.barrier() to make sure that only the main process performs the mapping, while the other ones load its results. This avoids duplicating work between all the processes, or worse, requesting more CPUs than your system can handle.

continued at:
https://huggingface.co/docs/datasets/processing.html?highlight=map#mapping-in-a-distributed-setting


In such scenarios you ideally want to do all the pre-processing separately before you do the training, so at training time you only need to read data.

This is what I tried initially. I tried to save the preprocessed data to disk using save_to_disk(). But my dataset is extremely large, and even saving it in a reasonable amount of time was not possible, given that both my disk space and my RAM are limited.

So I tried using the torch.distributed.barrier() as you pointed me to.

import torch
from datasets import Dataset

test_dataset = Dataset.from_dict(test_dataset)

# Non-main processes wait here until the main process has finished the
# mapping and written the cache.
if training_args.local_rank > 0:
    logger.info('Waiting for main process to perform the mapping')
    torch.distributed.barrier()

test_dataset = test_dataset.map(
    tokenize_function,
    batched=True,
    num_proc=data_args.preprocessing_num_workers,
    cache_file_name='/data/Jason/.cache/test_dataset.cache',
    load_from_cache_file=True if training_args.local_rank > 0 else False,
)

# The main process hits its barrier only after the mapping, releasing the others.
if training_args.local_rank == 0:
    logger.info('Loading results from the main process')
    torch.distributed.barrier()

I can see from the logs that the main process does perform the mapping and saves the cache.

However, once the main process reaches the barrier and the non-main processes resume, they do not load the cache saved by the main process. Is there something I’m doing wrong in my code?

Thanks.

Honestly, I haven’t tried this code myself; I just found it while researching an answer to your question. Perhaps there is a bug in datasets?
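One thing that might be worth checking (just a guess on my part): whether the cache file your non-main processes look for is the one the main process actually wrote. With num_proc > 1, datasets may split the cache into several suffixed files rather than writing a single file with the exact cache_file_name you passed. A hypothetical sanity check, reusing the names from your script:

    # Hypothetical debugging snippet: print what each rank sees on disk.
    import glob
    import os

    cache_file = '/data/Jason/.cache/test_dataset.cache'
    base, ext = os.path.splitext(cache_file)
    rank = training_args.local_rank
    print(f"rank {rank}: exact cache file exists: {os.path.exists(cache_file)}")
    print(f"rank {rank}: related cache files: {glob.glob(base + '*')}")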

We are going to add this feature to all HF Trainer-based examples ([examples] [distributed] process datasets.map only on main process in · Issue #12345 · huggingface/transformers · GitHub), so watch that issue; I will try to work on it shortly and see what needs to be done to make it work.

Thank you. :hugs: I will also update once I find anything on my side.


It seems to do the right thing using the barrier code - same as yours (I added some debug prints to it):

        import torch
        if training_args.local_rank > 0:
            logger.info('Waiting for main process to perform the mapping')
            torch.distributed.barrier()

        print(f"START PREPROCESSING on RANK {training_args.local_rank}")
        train_dataset = train_dataset.map(
            preprocess_function,
            batched=True,
            num_proc=data_args.preprocessing_num_workers,
            remove_columns=column_names,
            load_from_cache_file=not data_args.overwrite_cache,
            desc="Running tokenizer on train dataset",
        )
        print(f"END PREPROCESSING on RANK {training_args.local_rank}")

        if training_args.local_rank == 0:
            logger.info('Loading results from the main process')
            torch.distributed.barrier()

Here is the log with some new lines added to make it easier to read:

START PREPROCESSING on RANK 0
Running tokenizer on train dataset:   0%|                                                                                                  | 0/1 [00:00<?, ?ba/s]
06/24/2021 19:21:09 - INFO - datasets.arrow_dataset -   Caching processed dataset at /home/stas/.cache/huggingface/datasets/wmt16/ro-en/1.0.0/0d9fb3e814712c785176ad8cdb9f465fbe6479000ee6546725db30ad8a8b5f8a/cache-1bb178ab5058b59a.arrow
Running tokenizer on train dataset: 100%|██████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 48.21ba/s]
06/24/2021 19:21:09 - INFO - datasets.arrow_writer -   Done writing 500 examples in 113764 bytes /home/stas/.cache/huggingface/datasets/wmt16/ro-en/1.0.0/0d9fb3e814712c785176ad8cdb9f465fbe6479000ee6546725db30ad8a8b5f8a/tmp34tbvrkc.
END PREPROCESSING on RANK 0

06/24/2021 19:21:09 - INFO - __main__ -   Loading results from the main process


START PREPROCESSING on RANK 1
06/24/2021 19:21:12 - WARNING - datasets.arrow_dataset -   Loading cached processed dataset at /home/stas/.cache/huggingface/datasets/wmt16/ro-en/1.0.0/0d9fb3e814712c785176ad8cdb9f465fbe6479000ee6546725db30ad8a8b5f8a/cache-1bb178ab5058b59a.arrow
END PREPROCESSING on RANK 1

This is with torch distributed on 2 GPUs.

So the logs look correct to me. The 2nd process loads the cache.

Note: I deleted the cache before running the script, i.e.:

rm /home/stas/.cache/huggingface/datasets/wmt16/ro-en/1.0.0/0d9fb3e814712c785176ad8cdb9f465fbe6479000ee6546725db30ad8a8b5f8a/cache-1bb178ab5058b59a.arrow

so that the log shows that the cache was created by the first process. Otherwise it too would just load the cache.

We are just going to make a context manager to make it easier.
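Just to illustrate the idea, such a context manager could look roughly like this (a hypothetical sketch, not the final API):

    from contextlib import contextmanager

    import torch

    @contextmanager
    def main_process_first(local_rank):
        # Non-main processes wait at the barrier until the main process
        # has finished its work (e.g. building the map() cache).
        if local_rank > 0:
            torch.distributed.barrier()
        yield
        # The main process reaches its barrier only after the block above,
        # which releases the waiting processes.
        if local_rank == 0:
            torch.distributed.barrier()

    # Usage (hypothetical):
    # with main_process_first(training_args.local_rank):
    #     train_dataset = train_dataset.map(preprocess_function, batched=True)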

Also make sure you’re on the latest datasets.
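For example, a quick way to check which version is installed:

    import datasets
    print(datasets.__version__)  # compare against the latest release on PyPI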

I was also able to reproduce the result. Thanks for the prompt support @stas and @sgugger.
