This is more of a discussion question, since I already have a workaround that suits my use case.
My understanding was that if I train my model on multiple GPUs (say n GPUs) with DistributedDataParallel (DDP), each GPU receives a different batch, so the effective batch size is n × batch_size_per_gpu.
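To make this expectation concrete: for a map-style dataset, PyTorch's `DistributedSampler` with `shuffle=False` gives each rank every `world_size`-th index, so the ranks see disjoint samples and one optimizer step covers n × batch_size_per_gpu examples. The snippet below is only a simplified pure-Python sketch of that striding. `strided_indices` and `effective_batch_size` are hypothetical helper names, and the sketch assumes the dataset length is divisible by the world size, so it skips the padding and shuffling the real sampler does.

```python
from typing import List


def strided_indices(dataset_len: int, rank: int, world_size: int) -> List[int]:
    """Indices rank `rank` would see: every world_size-th index starting at rank.

    Simplified sketch of DistributedSampler(shuffle=False); assumes dataset_len
    is divisible by world_size, so no padding is needed.
    """
    return list(range(dataset_len))[rank::world_size]


def effective_batch_size(n_gpus: int, batch_size_per_gpu: int) -> int:
    # One DDP optimizer step averages gradients over every rank's batch
    # (gradient accumulation is ignored here)
    return n_gpus * batch_size_per_gpu


world_size = 2
for rank in range(world_size):
    print(f"rank={rank} indices={strided_indices(8, rank, world_size)}")

print("effective batch size:", effective_batch_size(n_gpus=2, batch_size_per_gpu=8))
```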
With this understanding, I wrote the simple script below to see how I can control the randomness of the batches. To my surprise, workers with the same id on different GPUs received the same `seed` and the same shard. This would mean that, in effect, all the GPUs see the same data at the same time, so the batch size would not increase.
IMO this behavior defeats the purpose of parallel training, because it does not increase the batch size. Please correct me if there are any gaps in my understanding or if this is not how DDP is supposed to be used.
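For contrast, a hypothetical assignment in which no two (rank, worker) pairs share a shard could index shards by a global worker id, `rank * num_workers + worker_id`. The sketch below is illustrative only: `shards_for` is not a library function, and it assumes there are at least as many shards as (rank, worker) pairs. For splitting an `IterableDataset` across ranks, the `datasets` library does provide `datasets.distributed.split_dataset_by_node(dataset, rank, world_size)`.

```python
from typing import List


def shards_for(shards: List[int], rank: int, world_size: int,
               worker_id: int, num_workers: int) -> List[int]:
    """Hypothetical helper: give each (rank, worker) pair a disjoint subset of shards."""
    global_worker = rank * num_workers + worker_id
    total_workers = world_size * num_workers
    # Every total_workers-th shard, starting at this pair's global index
    return shards[global_worker::total_workers]


shards = list(range(8))  # 8 shards, so each of the 2 ranks x 4 workers gets exactly one
for rank in range(2):
    for worker_id in range(4):
        print(f"rank={rank} worker={worker_id} shards={shards_for(shards, rank, 2, worker_id, 4)}")
```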
The print statement in the `SeedTest.seedTest` function prints the `seed` received by each worker on each GPU (see the output; I reordered the lines to highlight the point).
Output:
```
rank=0 worker=0 seed=6909045637428952499 shards=[0]
rank=1 worker=0 seed=6909045637428952499 shards=[0]
rank=0 worker=1 seed=6909045637428952500 shards=[1]
rank=1 worker=1 seed=6909045637428952500 shards=[1]
rank=0 worker=2 seed=6909045637428952501 shards=[2]
rank=1 worker=2 seed=6909045637428952501 shards=[2]
rank=0 worker=3 seed=6909045637428952502 shards=[3]
rank=1 worker=3 seed=6909045637428952502 shards=[3]
```
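The seeds in this output follow the pattern the PyTorch DataLoader documentation describes: each worker's seed is `base_seed + worker_id`, where `base_seed` is drawn by the main process from its RNG. If every rank's main process starts from the same RNG state (for example because the Trainer seeds every process with the same `TrainingArguments.seed`), every rank would draw the same `base_seed`, which would explain identical seeds per worker id. A minimal sketch reproducing the numbers above; `worker_seeds` is a hypothetical helper, and `BASE_SEED` is copied from the output rather than computed.

```python
from typing import List


def worker_seeds(base_seed: int, num_workers: int) -> List[int]:
    # Per the PyTorch DataLoader docs: worker seed = base_seed + worker_id
    return [base_seed + worker_id for worker_id in range(num_workers)]


# Copied from the output above (seed of worker 0); assumed identical on both ranks
BASE_SEED = 6909045637428952499

for rank in range(2):
    for worker_id, seed in enumerate(worker_seeds(BASE_SEED, num_workers=4)):
        print(f"rank={rank} worker={worker_id} seed={seed}")
```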
Code:
```python
import typing as tp
import os

import torch
from datasets import IterableDataset  # Hugging Face datasets
from transformers import (
    AutoTokenizer,
    AutoModelForMaskedLM,
    DataCollatorForLanguageModeling,
    TrainingArguments,
    Trainer,
)


class SeedTest:
    def __init__(self) -> None:
        pass

    def seedTest(self, shards: tp.List[tp.Any]) -> tp.Any:
        # LOCAL_RANK is set by torchrun for each process it launches
        rank = int(os.environ.get("LOCAL_RANK", 0))
        # Inside a DataLoader worker this holds the worker's id and seed
        worker_info: tp.Any = torch.utils.data.get_worker_info()
        print(f"rank={rank} worker={worker_info.id} seed={worker_info.seed} shards={shards}")
        yield {
            "seq": "ATGCATGC"
        }


if __name__ == "__main__":
    model_name = "InstaDeepAI/nucleotide-transformer-v2-50m-multi-species"
    model = AutoModelForMaskedLM.from_pretrained(model_name, trust_remote_code=True)
    tokeniser = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    collator = DataCollatorForLanguageModeling(tokenizer=tokeniser, mlm=False)

    # A list in gen_kwargs defines a sharded dataset; shards are split among DataLoader workers
    _d_shard = IterableDataset.from_generator(SeedTest().seedTest, gen_kwargs={"shards": list(range(4))})
    _d_shard = _d_shard.map(lambda x: tokeniser(x["seq"]))
    _d_shard = _d_shard.select_columns(["input_ids", "attention_mask"])
    _d_shard = _d_shard.with_format("torch")

    trargs = TrainingArguments(output_dir="test", max_steps=1, dataloader_num_workers=4)
    trainer = Trainer(
        model,
        args=trargs,
        train_dataset=_d_shard,
        data_collator=collator,
    )
    trainer.train()
```
Run command:
```shell
torchrun --nproc_per_node 2 seed_test.py
```
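The script reads `LOCAL_RANK`, which `torchrun` sets for each process. `torchrun` also exports the global `RANK` and `WORLD_SIZE`, which are what a per-rank data split would key on once more than one node is involved. A small sketch of reading them with single-process defaults; `dist_env` is a hypothetical helper, and the environment passed in below is simulated rather than captured from a real run.

```python
import os
from typing import Mapping, Tuple


def dist_env(env: Mapping[str, str] = os.environ) -> Tuple[int, int, int]:
    """Return (global rank, local rank, world size) from torchrun's environment variables.

    Falls back to a single-process setup when the variables are absent.
    """
    rank = int(env.get("RANK", 0))
    local_rank = int(env.get("LOCAL_RANK", 0))
    world_size = int(env.get("WORLD_SIZE", 1))
    return rank, local_rank, world_size


# Simulated environment of the second process launched by
# `torchrun --nproc_per_node 2` on a single node
print(dist_env({"RANK": "1", "LOCAL_RANK": "1", "WORLD_SIZE": "2"}))
```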
P.S.: I was unsure whether to ask this in the PyTorch community or the Hugging Face community. Please let me know if this is the right place for the question.