How do I run an end-to-end example of distributed data parallel (DDP) with Hugging Face's Trainer API (ideally on a single node with multiple GPUs)?

Cross-posted on Stack Overflow: "How to run an end-to-end example of distributed data parallel with Hugging Face's Trainer API (ideally on a single node, multiple GPUs)?"

I’ve looked extensively over the internet and Hugging Face’s (HF’s) discussion forum & repo, but found no end-to-end example of how to properly do DDP/distributed data parallel with HF (links at the end).

This is what I need to know to be able to run it end to end:

  1. do we wrap the HF model in DDP? (the script needs to know how to synchronize things at some point somehow somewhere, otherwise just launching torch.distributed from the command line isn't enough; see the sketch right after this list for the pieces I mean)
  2. do we change the args to Trainer or the trainer args in any way?
    - do we wrap the optimizer in any distributed trainer (like cherry? cherry is a PyTorch lib for things like this)?
  3. do we do the usual init process group that is usually needed for DDP?
  4. what is the role of local rank?
  5. what is the terminal launch script? e.g. python -m torch.distributed.launch --nproc_per_node=2 distributed_maml.py
  6. how do we use the world size to shard the data at each loop? e.g. see learn2learn/distributed_maml.py at master · learnables/learn2learn · GitHub
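To make these concrete, here is roughly the manual boilerplate those questions refer to. This is only my own sketch of plain-PyTorch DDP (assuming the script is launched with torchrun or python -m torch.distributed.launch so that RANK, WORLD_SIZE and LOCAL_RANK are set); it is not HF-specific, and it may be exactly the part the Trainer already does for me:

# sketch of the manual DDP pieces the questions above refer to (my own understanding, not HF code)
import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler

# (3) the usual init group; the launcher sets RANK, WORLD_SIZE, LOCAL_RANK for each process
dist.init_process_group(backend="nccl" if torch.cuda.is_available() else "gloo")
local_rank = int(os.environ["LOCAL_RANK"])  # (4) local rank = which GPU of this node this process owns
if torch.cuda.is_available():
    torch.cuda.set_device(local_rank)
device = torch.device(f"cuda:{local_rank}" if torch.cuda.is_available() else "cpu")

# (1) wrap the model in DDP so gradients get all-reduced across processes every step
model = torch.nn.Linear(10, 10).to(device)
model = DDP(model, device_ids=[local_rank] if torch.cuda.is_available() else None)

# (6) shard the data with the world size: each process gets a disjoint 1/world_size slice
dataset = TensorDataset(torch.randn(256, 10), torch.randn(256, 10))
sampler = DistributedSampler(dataset, num_replicas=dist.get_world_size(), rank=dist.get_rank())
loader = DataLoader(dataset, batch_size=8, sampler=sampler)

# (5) launch: torchrun --nproc_per_node=2 this_script.py   (or python -m torch.distributed.launch)

Question (2) is then basically whether any of this is still needed once the Trainer is in the picture.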

Given answers to those questions, I think I could write my own notebook and share it widely.

This is my starter code that I want to complete, but I'm unsure if I'm doing it right (especially since I don't know which args to Trainer to change):

"""

- training on multiple gpus: https://huggingface.co/docs/transformers/perf_train_gpu_many#efficient-training-on-multiple-gpus
- data parallelism, DP vs DDP: https://huggingface.co/docs/transformers/perf_train_gpu_many#data-parallelism
- github example: https://github.com/huggingface/transformers/tree/main/examples/pytorch#distributed-training-and-mixed-precision
    - above came from hf discuss: https://discuss.huggingface.co/t/using-transformers-with-distributeddataparallel-any-examples/10775/7

⇨ Single Node / Multi-GPU

Model fits onto a single GPU:

DDP - Distributed DP
ZeRO - may or may not be faster depending on the situation and configuration used.

...https://huggingface.co/docs/transformers/perf_train_gpu_many#scalability-strategy

python -m torch.distributed.launch \
    --nproc_per_node number_of_gpu_you_have path_to_script.py \
    --all_arguments_of_the_script

python -m torch.distributed.launch --nproc_per_node 2 main_data_parallel_ddp_pg.py
python -m torch.distributed.launch --nproc_per_node 2 ~/ultimate-utils/tutorials_for_myself/my_hf_hugging_face_pg/main_data_parallel_ddp_pg.py

e.g.
python -m torch.distributed.launch \
    --nproc_per_node 8 pytorch/text-classification/run_glue.py \
    --model_name_or_path bert-large-uncased-whole-word-masking \
    --task_name mnli \
    --do_train \
    --do_eval \
    --max_seq_length 128 \
    --per_device_train_batch_size 8 \
    --learning_rate 2e-5 \
    --num_train_epochs 3.0 \
    --output_dir /tmp/mnli_output/
"""
# %%

# - init group
# - set up processes a la l2l
# local_rank: int = int(os.environ["LOCAL_RANK"])  # get_local_rank()
# print(f'{local_rank=}')
## init_process_group_l2l(args, local_rank=local_rank, world_size=args.world_size, init_method=args.init_method)
# init_process_group_l2l below
# if is_running_parallel(rank):
#     print(f'----> setting up rank={rank} (with world_size={world_size})')
#     # MASTER_ADDR = 'localhost'
#     MASTER_ADDR = '127.0.0.1'
#     MASTER_PORT = master_port
#     # set up the master's ip address so this child process can coordinate
#     os.environ['MASTER_ADDR'] = MASTER_ADDR
#     print(f"---> {MASTER_ADDR=}")
#     os.environ['MASTER_PORT'] = MASTER_PORT
#     print(f"---> {MASTER_PORT=}")
#
#     # - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
#     if torch.cuda.is_available():
#         backend = 'nccl'
#         # You need to call torch_uu.cuda.set_device(rank) before init_process_group is called. https://github.com/pytorch/pytorch/issues/54550
#         torch.cuda.set_device(args.device)  # is this right if we do parallel cpu?
#     print(f'---> {backend=}')
# rank: int = torch.distributed.get_rank() if is_running_parallel(local_rank) else -1

# https://huggingface.co/docs/transformers/tasks/translation
import datasets
from datasets import load_dataset, DatasetDict

books: DatasetDict = load_dataset("opus_books", "en-fr")
print(f'{books=}')

books: DatasetDict = books["train"].train_test_split(test_size=0.2)
print(f'{books=}')
print(f'{books["train"]=}')

print(books["train"][0])
"""
{'id': '90560',
 'translation': {'en': 'But this lofty plateau measured only a few fathoms, and soon we reentered Our Element.',
  'fr': 'Mais ce plateau élevé ne mesurait que quelques toises, et bientôt nous fûmes rentrés dans notre élément.'}}
"""

# - t5 tokenizer

from transformers import AutoTokenizer, PreTrainedTokenizerFast, PreTrainedTokenizer

tokenizer: PreTrainedTokenizerFast = AutoTokenizer.from_pretrained("t5-small")
print(f'{isinstance(tokenizer, PreTrainedTokenizer)=}')
print(f'{isinstance(tokenizer, PreTrainedTokenizerFast)=}')

source_lang = "en"
target_lang = "fr"
prefix = "translate English to French: "


def preprocess_function(examples):
    inputs = [prefix + example[source_lang] for example in examples["translation"]]
    targets = [example[target_lang] for example in examples["translation"]]
    model_inputs = tokenizer(inputs, max_length=128, truncation=True)

    with tokenizer.as_target_tokenizer():
        labels = tokenizer(targets, max_length=128, truncation=True)

    model_inputs["labels"] = labels["input_ids"]
    return model_inputs


# Then create a smaller subset of the dataset as previously shown to speed up the fine-tuning (hack to speed up the tutorial):
books['train'] = books["train"].shuffle(seed=42).select(range(100))
books['test'] = books["test"].shuffle(seed=42).select(range(100))

# # use 🤗 Datasets map method to apply a preprocessing function over the entire dataset:
# tokenized_datasets = dataset.map(tokenize_function, batched=True, batch_size=2)

# todo - would be nice to remove this since at GPT-2/3 scale you can't preprocess the entire dataset... or can you?
# tokenized_books = books.map(preprocess_function, batched=True, batch_size=2)
from uutils.torch_uu.data_uu.hf_uu_data_preprocessing import preprocess_function_translation_tutorial

preprocessor = lambda examples: preprocess_function_translation_tutorial(examples, tokenizer)
tokenized_books = books.map(preprocessor, batched=True, batch_size=2)
print(f'{tokenized_books=}')

# - load model
from transformers import AutoModelForSeq2SeqLM

model = AutoModelForSeq2SeqLM.from_pretrained("t5-small")

# - to DDP
# model = model().to(rank)
# from torch.nn.parallel import DistributedDataParallel as DDP
# ddp_model = DDP(model, device_ids=[rank])

# Use DataCollatorForSeq2Seq to create a batch of examples. It will also dynamically pad your text and labels to the
# length of the longest element in its batch, so they are a uniform length.
# While it is possible to pad your text in the tokenizer function by setting padding=True, dynamic padding is more efficient.

from transformers import DataCollatorForSeq2Seq

# Data collator that will dynamically pad the inputs received, as well as the labels.
data_collator: DataCollatorForSeq2Seq = DataCollatorForSeq2Seq(tokenizer=tokenizer, model=model)

"""
At this point, only three steps remain:

- Define your training hyperparameters in Seq2SeqTrainingArguments.
- Pass the training arguments to Seq2SeqTrainer along with the model, dataset, tokenizer, and data collator.
- Call train() to fine-tune your model.
"""
report_to = "none"
if report_to != 'none':
    import wandb
    wandb.init(project="playground", entity="brando", name='run_name', group='expt_name')

from transformers import Seq2SeqTrainingArguments, Seq2SeqTrainer

# fp16 = True # cuda
# fp16 = False # cpu
import torch

fp16 = torch.cuda.is_available()  # True for cuda, false for cpu
training_args = Seq2SeqTrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    weight_decay=0.01,
    save_total_limit=3,
    num_train_epochs=1,
    fp16=fp16,
    report_to=report_to,
)

trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_books["train"],
    eval_dataset=tokenized_books["test"],
    tokenizer=tokenizer,
    data_collator=data_collator,
)

trainer.train()

print('\n ----- Success\a')

All references I consulted when writing this question:


It’s actually very simple and straightforward to do this. The Trainer will automatically pick up the number of devices you want to use. Any and all of the examples in transformers/examples/pytorch can be run on multi-GPU automatically. Hugging Face fully supports DDP.
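If you want to sanity-check what the Trainer will see, the TrainingArguments object exposes the device and distributed info; something like this quick sketch (run it both with plain python and under torchrun to compare) should show it:

# quick sanity check of the distributed setup Trainer will pick up (just a sketch)
from transformers import TrainingArguments

args = TrainingArguments(output_dir="outputs")
print(f"{args.device=} {args.n_gpu=} {args.world_size=} {args.local_rank=}")

Launched with plain python you should see n_gpu equal to all visible GPUs (DataParallel); launched with torchrun, each process should report n_gpu=1 and a world_size equal to the number of processes (DDP).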

In my example I’ll use the text classification one.

To check that it's using two GPUs the whole time, I'll start with watch -n0.1 nvidia-smi in a separate terminal.

Now I'm assuming we're running this through a clone of the repo, so the args will be set up similarly to how the tests are done:

torchrun --nproc_per_node 2 examples/pytorch/text-classification/run_glue.py \
    --model_name_or_path distilbert-base-uncased \
    --output_dir outputs \
    --train_file ./tests/fixtures/tests_samples/MRPC/train.csv \
    --validation_file ./tests/fixtures/tests_samples/MRPC/dev.csv \
    --do_train \
    --do_eval \
    --per_device_train_batch_size=2 \
    --per_device_eval_batch_size=1

And that's all it takes, just launch it like normal via torchrun! You'll see that both GPUs get utilized immediately.

If you want to avoid dealing with torchrun directly, mix it in with our Accelerate library: run accelerate config once, and then all you have to do is launch the script via:

accelerate launch examples/pytorch/text-classification/run_glue.py \
    --model_name_or_path distilbert-base-uncased \
    --output_dir outputs \
    --train_file ./tests/fixtures/tests_samples/MRPC/train.csv \
    --validation_file ./tests/fixtures/tests_samples/MRPC/dev.csv \
    --do_train \
    --do_eval \
    --per_device_train_batch_size=2 \
    --per_device_eval_batch_size=1

The config you set will wrap around all the complicated torchrun bits, so you don't need to do any of that yourself, even if you don't use Accelerate for any actual training.

Not entirely sure what cherry is, but Accelerate is a lower-level library capable of doing DDP across single-node and multi-node GPU and TPU setups, so any and all forms of DDP. But again, Trainer handles all the magic for you in that regard; you just need to launch it the right way.
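If you ever do want to use Accelerate directly on your own PyTorch loop, the core pattern is roughly this (a minimal sketch with a dummy model and data, nothing HF-specific):

# minimal Accelerate training-loop sketch; launch with `accelerate launch script.py` or torchrun
import torch
from accelerate import Accelerator
from torch.utils.data import DataLoader, TensorDataset

accelerator = Accelerator()  # picks up the distributed config / env set by the launcher
model = torch.nn.Linear(10, 1)
optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)
dataset = TensorDataset(torch.randn(256, 10), torch.randn(256, 1))
loader = DataLoader(dataset, batch_size=8, shuffle=True)

# prepare() moves everything to the right device, wraps the model for DDP and shards the dataloader
model, optimizer, loader = accelerator.prepare(model, optimizer, loader)

for x, y in loader:
    optimizer.zero_grad()
    loss = torch.nn.functional.mse_loss(model(x), y)
    accelerator.backward(loss)  # use this instead of loss.backward() so DDP/scaling are handled
    optimizer.step()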


Ok, I must admit it didn't occur to me to just run my normal script by prepending torchrun --nnodes 2 ... or python -m torch.distributed.launch --nproc_per_node 2 main_data_parallel_ddp_pg.py (the latter being the more familiar/older way to launch for me).

I will launch it as you suggested and track the nvidia-smi usage.

I assume that somehow the processes are in communication and know by coordination when an epoch has truly ended.

Thanks that was useful.


FYI, in case torchrun doesn't work for you:

#pip install torch==1.9.1+cu111 torchvision==0.10.1+cu111 torchaudio==0.9.1 -f https://download.pytorch.org/whl/torch_stable.html
pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu113

or if you want to upgrade:

pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu113
pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu113 --upgrade

@muellerzr thank you so much, I appreciate your help! I needed to update a bunch of my libraries since my torch was old and it wasn't running, but now that it does, I ran:

python -m torch.distributed.launch --nproc_per_node 2 ~/src/main_debug.py

and it worked! See the nvidia-smi & script running side by side (tmux is annoying for copying stuff, so I'm doing a screenshot):


However, torchrun didn't work:

torchrun --nnodes 2 ...my_script.py

It seems to deadlock despite my PyTorch libs being up to date. Do you know what might be happening there?

torch                   1.12.1+cu113
torchaudio              0.12.1+cu113
torchtext               0.13.1
torchvision             0.13.1+cu113

I did:

pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu113
pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu113 --upgrade

It needs to be --nproc_per_node, not --nnodes (node = computer in this case).

I updated the example above; I think I forgot to fix that when I hit the same bug myself!

ok this is the command:

torchrun --nproc_per_node 2 my_script.py

@muellerzr sorry for dragging out this conversation… but now I have 8 wandb runs being created and I only need 1. Do you think it is this line:

# report_to = "none"
report_to = "wandb"
if report_to != 'none':
    import wandb
    wandb.init(project="proj", entity="me", name='run_name', group='expt_name')

?

How have you stopped this?

In Accelerate we report only on the main process, as shown here: https://github.com/huggingface/transformers/blob/main/examples/pytorch/text-classification/run_glue_no_trainer.py#L461-L469

For non-Accelerate code, I think it'd look something like:

import os

if int(os.environ.get("LOCAL_RANK", -1)) == 0:
    ...  # do any wandb things only on this main process

and do that every single time.

Note this is torch.distributed.run/launch specific, so it won’t work for TPUs in this way. But you need to check that you are on the local node/main node.
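With Accelerate, the same gating looks roughly like this (a short sketch of the same idea using Accelerate's main-process flag):

# rough Accelerate equivalent of the LOCAL_RANK check above (a sketch)
import wandb
from accelerate import Accelerator

accelerator = Accelerator()
if accelerator.is_main_process:  # true on exactly one process of the run
    wandb.init(project="proj", entity="me", name="run_name", group="expt_name")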

Since you’re very interested in the inner workings like this, I’d recommend giving Accelerate a peek if possible. Sounds like you may like what we’ve done there 🙂

(Though transformers does not use Accelerate internally right now for this type of stuff)


Oh cool. Does Accelerate work even without Hugging Face (HF) models? I have a bunch of other code & models I've developed in PyTorch over the years, and it seems it's nearly trivial to plug Accelerate in.

Btw, I want to ask this explicitly too: is Accelerate compatible with HF's Trainer?

e.g.

+ from accelerate import Accelerator
+ accelerator = Accelerator()

+ model, optimizer, training_dataloader, scheduler = accelerator.prepare(
+     model, optimizer, training_dataloader, scheduler
+ )

trainer = Trainer(...model, optimizer, training_dataloader, scheduler...)
trainer.train()

(inspired by Accelerate)

Btw, thanks in advance for being so generous with your advice 🙂

Yes, quite so. E.g. it's currently what's used for fastai's entire distributed module. If it's PyTorch, it can be done with Accelerate.

The repo has a CV example using timm.

Not right now, no, since Trainer already handles internally all the DDP that Accelerate can do. But as mentioned earlier, you can still use Accelerate to launch those scripts 🙂


Hi @muellerzr ,

thanks for providing these useful statements. I am using this scenario:

torchrun --nproc_per_node 2 train_xxx.py

which is basically derived from the nlp_example.py

All I actually changed is the tokenize function and the dataset. After starting the script,
the model gets downloaded and everything starts properly. nvidia-smi shows that both GPUs are at approx. 80% usage - so far so good.

What worries me now is the fact that the log prints the things I have been logging so far, e.g. the size of the dataset etc., twice:

2023-03-31 08:14:23.354 | DEBUG    | __main__:get_dataloaders:79 - DatasetDict({
    train: Dataset({
        features: ['input_ids', 'attention_mask', 'labels'],
        num_rows: 7500
    })
    test: Dataset({
        features: ['input_ids', 'attention_mask', 'labels'],
        num_rows: 2500
    })
})
You're using a GPT2TokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.
You're using a GPT2TokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.

My question is: how can I make sure that the training is not simply running once on each GPU independently, but is actually distributed?
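For completeness, this is the kind of check I am thinking of adding to convince myself the run really is distributed (just my own sketch, not part of the example):

# if this really is DDP, each process should print a different rank but the same world_size (> 1),
# and the duplicated log lines are simply every rank logging once
import os
import torch.distributed as dist

if dist.is_available() and dist.is_initialized():
    print(f"rank={dist.get_rank()} local_rank={os.environ.get('LOCAL_RANK')} world_size={dist.get_world_size()}")
else:
    print("torch.distributed is not initialized -> not running DDP")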

Kind regards
Julian

Hi @muellerzr, I'm curious about how Trainer works. Looking at the script, I found that when saving the model at a checkpoint, it doesn't use the local_rank argument to make sure the model is only saved on the first worker. But the example from PyTorch here shows the checkpoint save gated on the local_rank parameter. Is it okay to do what the Trainer does?

Yes, on the backend the Trainer does the same thing (and in Accelerate, save_state): it only writes/saves on the main worker.
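In raw PyTorch terms the idea is roughly this (a sketch of the pattern, not the actual Trainer code):

# sketch of "only rank 0 writes the checkpoint"; the other ranks wait at the barrier
import torch
import torch.distributed as dist

def save_checkpoint(model, path="checkpoint.pt"):
    if not dist.is_initialized() or dist.get_rank() == 0:
        # unwrap DDP so the state-dict keys have no "module." prefix
        state = model.module.state_dict() if hasattr(model, "module") else model.state_dict()
        torch.save(state, path)
    if dist.is_initialized():
        dist.barrier()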

What if I'm using torchrun? Is it still okay? Because if I'm using torchrun, I have to explicitly add a local_rank == 0 condition to be able to save the model.

Do you mean raw PyTorch? torchrun is just the PyTorch version of calling accelerate launch. It only handles spinning up the multi-GPU session; it doesn't change anything about the code.

Hey @muellerzr, I have a question related to this. I am trying to finetune a model that is loaded in 8-bit using the PEFT/LoRA library in huggingface. I share the code I'm using for this below. My problem is: I have an 8-GPU machine (each GPU has 40GB of memory), but the code below uses only one of them to process batches. To clarify, I have 3200 examples and I set per_device_train_batch_size=4. Because I have 8 GPUs, the number of steps I see in the progress bar during training should be 100 (= 3200/(4*8)), but instead it shows 800 steps, which tells me that only one of the GPUs is being used. (I see that the model is loaded on more than one GPU, because when I check nvidia-smi I see that the first GPU is 80% full and the others are around 10-20% full.) Do you know what the problem is and how I can fix it?

Edit: I run my code as follows: python script_name.py

import torch
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM,BitsAndBytesConfig
from datasets import concatenate_datasets
import numpy as np
from peft import LoraConfig, get_peft_model, prepare_model_for_int8_training, TaskType
from peft import prepare_model_for_kbit_training
from transformers import DataCollatorForSeq2Seq
from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments
from peft import PeftConfig,PeftModel
from random import randrange
import os
from postprocess_finetuning_data import create_huggingface_dataset


def preprocess_function(sample,padding="max_length"):
    # add prefix to the input for t5
    inputs = [item for item in sample["input"]]

    # tokenize inputs
    model_inputs = tokenizer(inputs, max_length=max_source_length, padding=padding, truncation=True)

    # Tokenize targets with the `text_target` keyword argument
    labels = tokenizer(text_target=sample["output"], max_length=max_target_length, padding=padding, truncation=True)

    # If we are padding here, replace all tokenizer.pad_token_id in the labels by -100 when we want to ignore
    # padding in the loss.
    if padding == "max_length":
        labels["input_ids"] = [
            [(l if l != tokenizer.pad_token_id else -100) for l in label] for label in labels["input_ids"]
        ]

    model_inputs["labels"] = labels["input_ids"]
    return model_inputs

model_id="google/flan-ul2"
dataset = 'finetuning_data/training_data.csv'
per_attribute_max_size=2000
output_dir="lora-flanul2-context5_maxsize2000"


tokenizer = AutoTokenizer.from_pretrained(model_id)
max_source_length=1000
max_target_length = 20
dataset = create_huggingface_dataset(dataset,N=per_attribute_max_size)

# We preprocess our dataset before training and save it to disk
tokenized_dataset = dataset.map(preprocess_function, batched=True, remove_columns=["input", "output"])
print(f"Keys of tokenized dataset: {list(tokenized_dataset['train'].features)}")


# Fine tuning
bnb_config = BitsAndBytesConfig(
    load_in_8bit=True,
)

# load model from the hub
model = AutoModelForSeq2SeqLM.from_pretrained(model_id, quantization_config=bnb_config, device_map="auto")
model.gradient_checkpointing_enable()
model = prepare_model_for_kbit_training(model)

# Define LoRA Config
lora_config = LoraConfig(
 r=16,
 lora_alpha=32,
 target_modules=["q", "v"],
 lora_dropout=0.05,
 bias="none",
 task_type=TaskType.SEQ_2_SEQ_LM
)

model = get_peft_model(model, lora_config)
model.print_trainable_parameters()

# To create a DataCollator that will take care of padding our inputs and labels.
# Use the DataCollatorForSeq2Seq from the Transformers library.
label_pad_token_id = -100
# Data collator
data_collator = DataCollatorForSeq2Seq(
    tokenizer,
    model=model,
    label_pad_token_id=label_pad_token_id,
    pad_to_multiple_of=8
)

# Define training args
training_args = Seq2SeqTrainingArguments(
    output_dir=output_dir,
    learning_rate=3e-4,
    num_train_epochs=1,
    logging_dir=f"{output_dir}/logs",
    logging_strategy="steps",
    logging_steps=400,
    save_strategy="steps",
    save_steps=400,
    report_to="tensorboard",
    evaluation_strategy='steps',
    eval_steps=400,
    per_device_train_batch_size=4,
    load_best_model_at_end=True,
)

# Create Trainer instance
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["valid"]
)
model.config.use_cache = False

trainer.train()

@cyt79 Have you solved this? I am encountering the same phenomenon as well.