How to run an end to end example of distributed data parallel with hugging face's trainer api (ideally on a single node multiple gpus)?

Cross-posted: python - How to run an end to end example of distributed data parallel with hugging face's trainer api (ideally on a single node multiple gpus)? - Stack Overflow

I’ve looked extensively over the internet, Hugging Face’s (HF’s) discussion forum & repo, but found no end-to-end example of how to properly do DDP (distributed data parallel) with HF (links at the end).

This is what I need to know to be able to run it end to end:

  1. do we wrap the HF model in DDP? (the script needs to synchronize gradients at some point somehow somewhere; or is just launching it with torch.distributed from the command line enough?)
  2. do we change the args to Trainer or TrainingArguments in any way?
     do we wrap the optimizer in any distributed wrapper (like cherry? cherry is a PyTorch lib for things like this)?
  3. do we do the usual init_process_group that is usually needed for DDP?
  4. what is the role of local_rank?
  5. what is the terminal launch command? e.g. python -m torch.distributed.launch --nproc_per_node=2 distributed_maml.py
  6. how do we use the world size to shard the data at each loop? e.g. see learn2learn/distributed_maml.py at master · learnables/learn2learn · GitHub
Given answers to those, I think I could write my own notebook and share it widely.
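
For reference, this is my rough mental model of the plain-PyTorch version of those pieces — a sketch only, assuming the script is launched with torchrun so that RANK/LOCAL_RANK/WORLD_SIZE are set in the environment, and with a toy model and dataset as placeholders:

import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler, TensorDataset

# torchrun sets these env vars for every process it spawns
local_rank = int(os.environ["LOCAL_RANK"])
world_size = int(os.environ["WORLD_SIZE"])

torch.cuda.set_device(local_rank)
dist.init_process_group(backend="nccl")  # nccl for GPUs, gloo for CPU-only

# toy model/data just to show the wiring
model = torch.nn.Linear(10, 1).to(local_rank)
ddp_model = DDP(model, device_ids=[local_rank])

dataset = TensorDataset(torch.randn(256, 10), torch.randn(256, 1))
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=dist.get_rank())
loader = DataLoader(dataset, batch_size=8, sampler=sampler)

optimizer = torch.optim.SGD(ddp_model.parameters(), lr=1e-2)
for epoch in range(2):
    sampler.set_epoch(epoch)  # reshuffle the shards each epoch
    for x, y in loader:
        x, y = x.to(local_rank), y.to(local_rank)
        optimizer.zero_grad()
        loss = torch.nn.functional.mse_loss(ddp_model(x), y)
        loss.backward()  # DDP averages gradients across processes here
        optimizer.step()

dist.destroy_process_group()

What I can't tell is how much of this the HF Trainer already does for me vs. what I still need to write myself.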

This is my starter code that I want to complete, but I am unsure if I am doing it right (especially since I don’t know which args to Trainer to change):

"""

- training on multiple gpus: https://huggingface.co/docs/transformers/perf_train_gpu_many#efficient-training-on-multiple-gpus
- data parallelism, dp vs ddp: https://huggingface.co/docs/transformers/perf_train_gpu_many#data-parallelism
- github example: https://github.com/huggingface/transformers/tree/main/examples/pytorch#distributed-training-and-mixed-precision
    - above came from hf discuss: https://discuss.huggingface.co/t/using-transformers-with-distributeddataparallel-any-examples/10775/7

⇨ Single Node / Multi-GPU

Model fits onto a single GPU:

DDP - Distributed DP
ZeRO - may or may not be faster depending on the situation and configuration used.

...https://huggingface.co/docs/transformers/perf_train_gpu_many#scalability-strategy

python -m torch.distributed.launch \
    --nproc_per_node number_of_gpu_you_have path_to_script.py \
    --all_arguments_of_the_script

python -m torch.distributed.launch --nproc_per_node 2 main_data_parallel_ddp_pg.py
python -m torch.distributed.launch --nproc_per_node 2 ~/ultimate-utils/tutorials_for_myself/my_hf_hugging_face_pg/main_data_parallel_ddp_pg.py

e.g.
python -m torch.distributed.launch \
    --nproc_per_node 8 pytorch/text-classification/run_glue.py \
    --model_name_or_path bert-large-uncased-whole-word-masking \
    --task_name mnli \
    --do_train \
    --do_eval \
    --max_seq_length 128 \
    --per_device_train_batch_size 8 \
    --learning_rate 2e-5 \
    --num_train_epochs 3.0 \
    --output_dir /tmp/mnli_output/
"""
# %%

# - init group
# - set up processes a la l2l
# local_rank: int = int(os.environ["LOCAL_RANK"])  # get_local_rank()
# print(f'{local_rank=}')
## init_process_group_l2l(args, local_rank=local_rank, world_size=args.world_size, init_method=args.init_method)
# init_process_group_l2l below:
# if is_running_parallel(rank):
#     print(f'----> setting up rank={rank} (with world_size={world_size})')
#     # MASTER_ADDR = 'localhost'
#     MASTER_ADDR = '127.0.0.1'
#     MASTER_PORT = master_port
#     # set up the master's ip address so this child process can coordinate
#     os.environ['MASTER_ADDR'] = MASTER_ADDR
#     print(f"---> {MASTER_ADDR=}")
#     os.environ['MASTER_PORT'] = MASTER_PORT
#     print(f"---> {MASTER_PORT=}")
#
#     # - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
#     if torch.cuda.is_available():
#         backend = 'nccl'
#         # You need to call torch.cuda.set_device(rank) before init_process_group is called: https://github.com/pytorch/pytorch/issues/54550
#         torch.cuda.set_device(args.device)  # is this right if we do parallel cpu?
#     print(f'---> {backend=}')
# rank: int = torch.distributed.get_rank() if is_running_parallel(local_rank) else -1
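# Note (added, my understanding): when the script is launched with torchrun /
# torch.distributed.launch, HF's TrainingArguments picks up LOCAL_RANK and the Trainer
# initializes the process group itself, so the manual setup above should not be needed.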

# https://huggingface.co/docs/transformers/tasks/translation
import datasets
from datasets import load_dataset, DatasetDict

books: DatasetDict = load_dataset("opus_books", "en-fr")
print(f'{books=}')

books: DatasetDict = books["train"].train_test_split(test_size=0.2)
print(f'{books=}')
print(f'{books["train"]=}')

print(books["train"][0])
"""
{'id': '90560',
 'translation': {'en': 'But this lofty plateau measured only a few fathoms, and soon we reentered Our Element.',
  'fr': 'Mais ce plateau élevé ne mesurait que quelques toises, et bientôt nous fûmes rentrés dans notre élément.'}}
"""

# - t5 tokenizer

from transformers import AutoTokenizer, PreTrainedTokenizerFast, PreTrainedTokenizer

tokenizer: PreTrainedTokenizerFast = AutoTokenizer.from_pretrained("t5-small")
print(f'{isinstance(tokenizer, PreTrainedTokenizer)=}')
print(f'{isinstance(tokenizer, PreTrainedTokenizerFast)=}')

source_lang = "en"
target_lang = "fr"
prefix = "translate English to French: "


def preprocess_function(examples):
    inputs = [prefix + example[source_lang] for example in examples["translation"]]
    targets = [example[target_lang] for example in examples["translation"]]
    model_inputs = tokenizer(inputs, max_length=128, truncation=True)

    with tokenizer.as_target_tokenizer():
        labels = tokenizer(targets, max_length=128, truncation=True)

    model_inputs["labels"] = labels["input_ids"]
    return model_inputs
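

# Quick sanity check (added, not part of the original tutorial): tokenize the first two
# raw examples and inspect the output of the preprocessing function.
print(preprocess_function(books["train"][:2]))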


# Then create a smaller subset of the dataset, as previously shown, to speed up the fine-tuning (hack to speed up the tutorial)
books['train'] = books["train"].shuffle(seed=42).select(range(100))
books['test'] = books["test"].shuffle(seed=42).select(range(100))

# # use 🤗 Datasets map method to apply a preprocessing function over the entire dataset:
# tokenized_datasets = dataset.map(tokenize_function, batched=True, batch_size=2)

# todo: would be nice to remove this, since at gpt-2/3 scale you can't preprocess the entire data set... or can you?
# tokenized_books = books.map(preprocess_function, batched=True, batch_size=2)
from uutils.torch_uu.data_uu.hf_uu_data_preprocessing import preprocess_function_translation_tutorial

preprocessor = lambda examples: preprocess_function_translation_tutorial(examples, tokenizer)
tokenized_books = books.map(preprocessor, batched=True, batch_size=2)
print(f'{tokenized_books=}')

# - load model
from transformers import AutoModelForSeq2SeqLM

model = AutoModelForSeq2SeqLM.from_pretrained("t5-small")

# - to DDP
# model = model().to(rank)
# from torch.nn.parallel import DistributedDataParallel as DDP
# ddp_model = DDP(model, device_ids=[rank])
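
# Note (added, my understanding): when launched with torchrun / torch.distributed.launch
# (i.e. local_rank != -1), the HF Trainer wraps the model in DDP internally, so the
# manual wrap above should not be needed when training through Trainer.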

# Use DataCollatorForSeq2Seq to create a batch of examples. It will also dynamically pad your text and labels to the
# length of the longest element in its batch, so they are a uniform length.
# While it is possible to pad your text in the tokenizer function by setting padding=True, dynamic padding is more efficient.

from transformers import DataCollatorForSeq2Seq

# Data collator that will dynamically pad the inputs received, as well as the labels.
data_collator: DataCollatorForSeq2Seq = DataCollatorForSeq2Seq(tokenizer=tokenizer, model=model)
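
# Optional sanity check (added, not part of the original tutorial): collate two tokenized
# examples and look at the padded tensor shapes; only the tokenizer-produced columns are
# passed in, since the collator can't pad the raw text columns.
features = [{k: tokenized_books["train"][i][k] for k in ("input_ids", "attention_mask", "labels")}
            for i in range(2)]
print({k: v.shape for k, v in data_collator(features).items()})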

"""
At this point, only three steps remain:

- Define your training hyperparameters in Seq2SeqTrainingArguments.
- Pass the training arguments to Seq2SeqTrainer along with the model, dataset, tokenizer, and data collator.
- Call train() to fine-tune your model.
"""
report_to = "none"
if report_to != 'none':
    import wandb
    wandb.init(project="playground", entity="brando", name='run_name', group='expt_name')

from transformers import Seq2SeqTrainingArguments, Seq2SeqTrainer

# fp16 = True # cuda
# fp16 = False # cpu
import torch

fp16 = torch.cuda.is_available()  # True for cuda, false for cpu
training_args = Seq2SeqTrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    weight_decay=0.01,
    save_total_limit=3,
    num_train_epochs=1,
    fp16=fp16,
    report_to=report_to,
)
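# Note (added): under DDP the effective (global) batch size is
# per_device_train_batch_size * num_processes * gradient_accumulation_steps,
# so 16 per device on 2 GPUs means a global batch of 32 per optimizer step.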

trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_books["train"],
    eval_dataset=tokenized_books["test"],
    tokenizer=tokenizer,
    data_collator=data_collator,
)

trainer.train()

print('\n ----- Success\a')

All references I consulted when writing this question:

It’s actually very simple and straightforward to do this. The Trainer will automatically pick up the number of devices you want to use. Any and all of the examples in transformers/examples/pytorch can be run on multiple GPUs automatically. Hugging Face fully supports DDP.

In my example I’ll use the text classification one.

To check if it’s using two GPUs the whole time, I’ll start with watch -n0.1 nvidia-smi in a separate terminal.

Now I’m assuming we’re running this through a clone of the repo, so the args will be set up similarly to how the tests are done:

torchrun --nproc_per_node 2 examples/pytorch/text-classification/run_glue.py --model_name_or_path distilbert-base-uncased --output_dir outputs --train_file ./tests/fixtures/tests_samples/MRPC/train.csv --validation_file ./tests/fixtures/tests_samples/MRPC/dev.csv --do_train --do_eval --per_device_train_batch_size=2 --per_device_eval_batch_size=1

And that’s all it takes, just launch it like normal via torchrun! You’ll see that both GPUs get utilized immediately.

If you want to avoid dealing with torchrun yourself, mix in our Accelerate library: run accelerate config once, and then all you have to do is launch the script via:

accelerate launch examples/pytorch/text-classification/run_glue.py --model_name_or_path distilbert-base-uncased --output_dir outputs --train_file ./tests/fixtures/tests_samples/MRPC/train.csv --validation_file ./tests/fixtures/tests_samples/MRPC/dev.csv --do_train --do_eval --per_device_train_batch_size=2 --per_device_eval_batch_size=1

The config you set will wrap around all the complicated torchrun bits, so you don’t need to do all of that yourself, even if you don’t use Accelerate for any of the actual training code.

Not entirely sure what Cherry is, but Accelerate is a lower-level library capable of doing DDP across single- and multi-node GPU and TPU setups, so any and all forms of DDP. But again, the Trainer handles all that magic for you in that regard; you just need to launch it the right way.


OK, I must admit it didn’t occur to me to just run my normal script by prefixing it with torchrun --nnodes 2 ... or python -m torch.distributed.launch --nproc_per_node 2 main_data_parallel_ddp_pg.py (the older launch style that’s more familiar to me).

I will launch it as you suggested and track the nvidia-smi usage.

I assume the processes are somehow in communication and coordinate to know when an epoch has truly ended.

Thanks that was useful.
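
In case it helps anyone reading later, my understanding of that coordination: each process gets a disjoint shard of roughly len(dataset)/world_size samples via a distributed sampler, so every rank exhausts its dataloader after the same number of steps, and DDP averages gradients with an all-reduce inside backward(). A tiny, self-contained illustration of the sharding arithmetic (toy dataset, hard-coded world size of 2):

import torch
from torch.utils.data import DistributedSampler, TensorDataset

dataset = TensorDataset(torch.randn(100, 4))
# with 2 processes, each rank gets ~half of the indices, so every rank's dataloader
# runs out after the same number of steps and the epoch ends in lockstep
sampler_rank0 = DistributedSampler(dataset, num_replicas=2, rank=0)
print(len(sampler_rank0))  # 50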


FYI, in case torchrun doesn’t work for you:

#pip install torch==1.9.1+cu111 torchvision==0.10.1+cu111 torchaudio==0.9.1 -f https://download.pytorch.org/whl/torch_stable.html
pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu113

or if you want to upgrade:

pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu113
pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu113 --upgrade

@muellerzr thank you so much, I appreciate your help! I needed to update a bunch of my libraries since my torch was old and it wasn’t running, but now that it does, I did:

python -m torch.distributed.launch --nproc_per_node 2 ~/src/main_debug.py

and it worked! See the nvidia-smi & script running side by side (tmux is annoying for copying stuff, so here’s a screenshot):


however, torchrun didn’t work:

torchrun --nnodes 2 ...my_script.py

seems to deadlock despite my pytorch libs being up to date. Do you know what might be happening there?

torch                   1.12.1+cu113
torchaudio              0.12.1+cu113
torchtext               0.13.1
torchvision             0.13.1+cu113

I did:

pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu113
pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu113 --upgrade

Needs to be --nproc_per_node, not --nnodes.

Node = computer in this case. I updated the example above; I think I forgot to fix that when I made the same mistake myself!

OK, this is the command:

torchrun --nproc_per_node 2 my_script.py

@muellerzr sorry for dragging out this conversation… but now I have 8 wandb runs being created when I only need 1. Do you think it’s this line:

# report_to = "none"
report_to = "wandb"
if report_to != 'none':
    wandb.init(project="proj", entity="me", name='run_name', group='expt_name')

?

How have you stopped this?

In Accelerate we report only on the main process, as shown here: transformers/run_glue_no_trainer.py at main · huggingface/transformers · GitHub

For non-accelerate, I think it’d look something like:

if int(os.environ.get("LOCAL_RANK", -1)) == 0:
  # do any wandb things on the main process only

and do that check every single time you touch wandb.

Note this is torch.distributed.run/launch specific, so it won’t work for TPUs in this way. But either way, you need to check that you are on the local/main process.
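
Spelled out a bit more, a sketch of the guarded init might look like this (placeholder project/entity names, and assuming the launcher — torchrun does — sets LOCAL_RANK):

import os
import wandb

# only the main process (LOCAL_RANK 0, or -1 when not running distributed) starts a run
if int(os.environ.get("LOCAL_RANK", -1)) in (-1, 0):
    wandb.init(project="proj", entity="me", name="run_name", group="expt_name")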

Since you’re very interested in the inner workings like this, I’d recommend giving Accelerate a peek if possible. Sounds like you may like what we’ve done there 🙂

(Though transformers does not use Accelerate internally right now for this type of stuff)


Oh cool. Does Accelerate work even without Hugging Face (HF) models? I have a bunch of other code & models I’ve developed in PyTorch over the years, and it seems nearly trivial to plug Accelerate in.

Btw, I want to ask this explicitly too: is Accelerate compatible with HF’s Trainer?

e.g.

+ from accelerate import Accelerator
+ accelerator = Accelerator()

+ model, optimizer, training_dataloader, scheduler = accelerator.prepare(
+     model, optimizer, training_dataloader, scheduler
+ )

trainer = Trainer(...model, optimizer, training_dataloader, scheduler...)
trainer.train()

(inspired from Accelerate)

Btw, thanks in advance for being so generous with your advice 🙂

Yes, quite so. E.g. it’s currently what’s used for fastai’s entire distributed module. If it’s PyTorch, it can be done with Accelerate.

The repo has a CV example using timm.
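
To give a flavour, here’s a minimal sketch of an Accelerate loop around a plain PyTorch model (toy model and data, nothing transformers-specific):

import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from accelerate import Accelerator

accelerator = Accelerator()

# any plain PyTorch objects work here
model = nn.Linear(10, 1)
optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)
dataloader = DataLoader(TensorDataset(torch.randn(64, 10), torch.randn(64, 1)), batch_size=8)

# prepare() handles device placement and DDP wrapping when launched distributed
model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)

for x, y in dataloader:
    optimizer.zero_grad()
    loss = nn.functional.mse_loss(model(x), y)
    accelerator.backward(loss)  # replaces loss.backward() so gradients sync across processes
    optimizer.step()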

Not right now, no, since the Trainer already handles internally all the DDP that Accelerate can do. But as mentioned earlier, you can still use Accelerate to launch those scripts 🙂
