Here’s the code I am using:
training_args = Seq2SeqTrainingArguments(
output_dir,
per_device_train_batch_size=16,
num_train_epochs=1,
logging_dir=f"{output_dir}/logs",
report_to="tensorboard",
save_strategy="no",
eval_strategy="no"
)
trainer = Seq2SeqTrainer(
model,
args=training_args,
train_dataset=dataset,
data_collator=DataCollatorForSeq2Seq(tokenizer=tokenizer)
)
By default, does it use DDP for training?
And I am using an IterableDataset; will it handle sending different batches to different GPUs by default? It should, right?
Just to be sure, I searched for it, and it seems that there are some complicated cases.
Hey, I am trying to train a custom model (which inherits from PreTrainedModel) with IterableDataset using the HuggingFace Trainer in a DDP setup and I have a couple of questions on how to do it best as well as some of my observations.
I know there is a datasets.distributed.split_dataset_by_node() function which can distribute the shards across the nodes, but should I call it myself inside the get_train_dataloader() function inside the Trainer or is it handled automatically?
I noticed that the …
Issue opened 23 Jun 2023 (enhancement):
### Feature request
Minimal Example
```
import torch
from datasets import IterableDataset

d = IterableDataset.from_file(<file_name>)
dl = torch.utils.data.dataloader.DataLoader(d, num_workers=3)
for sample in dl:
    print(sample)
```
Warning:
Too many dataloader workers: 2 (max is dataset.n_shards=1). Stopping 1 dataloader workers.
To parallelize data loading, we give each process some shards (or data sources) to process. Therefore it's unnecessary to have a number of workers greater than dataset.n_shards=1. To enable more parallelism, please split the dataset in more files than 1.
Expected Behavior:
Dataset is sharded; each CPU worker uses a subset (contiguously, so you can do checkpoint loading/saving).
### Motivation
I have a lot of unused CPUs and would like to be able to shard iterable datasets with PyTorch's dataloader when num_workers > 1. This is for a very large single file. I am aware that we can use `split_dataset_by_node` to ensure that each node (for distributed training) gets different shards, but we should extend it so that this also applies to multiple dataloader workers.
### Your contribution
If someone points me to what needs to change, I can create a PR.
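For context on that warning, here is a rough, untested sketch (my own, with hypothetical file names) of how `n_shards` tracks the number of data files when streaming, which is what caps the number of useful dataloader workers:

```
from datasets import load_dataset

# Hypothetical layout: the large file split into several JSONL files.
# With streaming=True this yields an IterableDataset whose n_shards
# typically equals the number of files, so up to that many dataloader
# workers can receive distinct shards.
streamed = load_dataset(
    "json",
    data_files=[f"data/part-{i}.jsonl" for i in range(8)],
    split="train",
    streaming=True,
)
print(streamed.n_shards)  # expected: 8
```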
Issue opened 15 Jan 2024:
### Describe the bug
The sharding of IterableDatasets with respect to distributed and dataloader worker processes appears problematic, with significant performance traps and inconsistencies between distributed train processes and worker processes.
Splitting across num_workers (per train process loader processes) and world_size (distributed training processes) appears inconsistent.
* worker split: https://github.com/huggingface/datasets/blob/9d6d16117a30ba345b0236407975f701c5b288d4/src/datasets/iterable_dataset.py#L1266-L1283
* distributed split: https://github.com/huggingface/datasets/blob/9d6d16117a30ba345b0236407975f701c5b288d4/src/datasets/iterable_dataset.py#L1335-L1356
In the case of the distributed split, there is a modulus check that flips between two very different behaviours, why is this different than splitting across the data loader workers? For IterableDatasets the DataLoaders worker processes are independent, so whether it's workers within one train process or across a distributed world the shards should be distributed the same, across `world_size * num_worker` independent workers in either case...
Further, the fallback case when the `n_shards % world_size == 0` check fails is a rather extreme change. I argue it is not desirable to do that implicitly, it should be an explicit case for specific scenarios (ie reliable validation). A train scenario would likely be much better handled with improved wrapping / stopping behaviour to eg also fix #6437. Changing from stepping shards to stepping samples means that every single process reads ALL of the shards. This was never an intended default for sharded training, shards gain their performance advantage in large scale distributed training by explicitly avoiding the need to have every process overlapping in the data they read, by default, only the data allocated to each process via their assigned shards should be read in each pass of the dataset.
Using a large-scale CLIP example, some of the larger datasets have 10-20k shards across 100+ TB of data. Training with 1000 GPUs, we switch from reading 100 terabytes per epoch to 100 petabytes if, say, we go from 20k % 1000 to dropping one GPU node and ending up at 20k % 992.
The 'step over samples' case might be worth the overhead in specific validation scenarios where guarantees of at-least/at-most-once samples seen matter more, and which do not make up a significant portion of train time or are done at smaller world sizes outside of train.
### Steps to reproduce the bug
N/A
### Expected behavior
We have an iterable dataset with N shards, to split across workers
* shuffle shards (same seed across all train processes)
* step shard iterator across distributed processes
* step shard iterator across dataloader worker processes
* shuffle samples in every worker via shuffle buffer (different seed in each worker, but ideally controllable, based on base seed + worker id + epoch)
* end up with (possibly uneven) number of shards per worker but each shard only ever accessed by 1 worker per pass (epoch)
### Environment info
N/A
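As a rough illustration of the split being discussed (my own sketch, untested), this is how shards get assigned per distributed process with `datasets.distributed.split_dataset_by_node`; `RANK`/`WORLD_SIZE` are the environment variables a distributed launcher normally sets:

```
import os
from datasets import load_dataset
from datasets.distributed import split_dataset_by_node

rank = int(os.environ.get("RANK", 0))
world_size = int(os.environ.get("WORLD_SIZE", 1))

# Hypothetical streamed dataset; roughly one shard per matched file.
ds = load_dataset("json", data_files="data/*.jsonl", split="train", streaming=True)
ds = split_dataset_by_node(ds, rank=rank, world_size=world_size)
# If ds.n_shards % world_size == 0, each rank gets a disjoint subset of shards;
# otherwise each rank keeps 1 example out of world_size (the fallback criticized above).
```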
Issue opened 27 Jan 2024 (enhancement):
### Feature request
Let’s say I have a dataset with 5 samples with values [1, 2, 3, 4, 5], 2 GPUs (for DDP), and a batch size of 2. This dataset is an `IterableDataset` since I am streaming it.
Now I split the dataset using `split_dataset_by_node` to ensure it doesn’t get repeated. And since it’s already split, I don’t have to use `DistributedSampler` (they don’t work with iterable datasets anyway)?
But in this case I noticed the following:
First iteration:
first GPU will get → [1, 2]
second GPU will get → [3, 4]
Second iteration:
first GPU will get → [5]
second GPU will get → nothing
which actually creates an issue, since with `DistributedSampler` the samples are repeated internally to ensure none of the GPUs is missing data for gradient sync at any iteration.
So my questions are:
1. Here, since splitting happens beforehand, how do I make sure each GPU gets a batch at each iteration to avoid gradient sync issues?
2. Do we need to use `DistributedSampler`? If yes, how?
3. in the docstrings of `split_dataset_by_node`, this is mentioned: *"If the dataset has a number of shards that is a factor of `world_size` (i.e. if `dataset.n_shards % world_size == 0`), then the shards are evenly assigned across the nodes, which is the most optimized. Otherwise, each node keeps 1 example out of `world_size`, skipping the other examples."* Can you explain the last part here?
4. If `dataset.n_shards % world_size != 0`, is it possible to shard the streaming dataset on the fly to avoid the case where data is missing?
### Motivation
Streaming datasets should work with DDP: big LLMs require a lot of data, DDP/multi-node is what is mostly used to train such models, and streaming can actually help solve the data side of it.
### Your contribution
Yes, I can help in submitting the PR once we get mutual understanding on how it should behave.
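To make the docstring quoted in question 3 concrete, here is a small, untested toy sketch (my own) of the `n_shards % world_size != 0` fallback, built with `to_iterable_dataset` so it needs no real files:

```
from datasets import Dataset
from datasets.distributed import split_dataset_by_node

# A 5-example stream with a single shard, so n_shards % world_size != 0 for world_size=2.
stream = Dataset.from_dict({"x": [1, 2, 3, 4, 5]}).to_iterable_dataset(num_shards=1)

for rank in range(2):
    part = split_dataset_by_node(stream, rank=rank, world_size=2)
    print(rank, [ex["x"] for ex in part])
# Per the docstring, each rank keeps 1 example out of world_size,
# roughly: rank 0 -> [1, 3, 5], rank 1 -> [2, 4]
```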
The following answer was generated by HuggingChat; I’m not sure if it works.
The behavior of your code regarding distributed training and data handling depends on several factors. Here’s a detailed breakdown:
1. Does the code use DDP (Distributed Data Parallel) by default?
No, your code does not automatically use DDP unless it is run in a distributed environment. Here’s why:
Default behavior: the Hugging Face `Trainer` only enables DDP when the script is started with a distributed launcher (`torchrun` / the older `torch.distributed.launch`, `accelerate launch`, or `deepspeed`), which sets `local_rank` and the related environment variables for each process. Running the script with plain `python` on a multi-GPU machine makes the `Trainer` fall back to `DataParallel`, not DDP.
How to enable DDP: launch the same script with, e.g., `torchrun --nproc_per_node=<num_gpus> your_script.py` or `accelerate launch your_script.py`; the `TrainingArguments` themselves don’t need to change.
2. Does the IterableDataset handle distributing batches to different GPUs?
Not by itself: a plain `IterableDataset` streams data sequentially, so without explicit partitioning every process would iterate over the same samples.
Why? There is no sampler for iterable datasets; `torch.utils.data.distributed.DistributedSampler` only works with map-style datasets, so for an `IterableDataset` the split has to happen inside the dataset itself.
How to fix: use `DistributedSampler` for a map-style dataset, use `datasets.distributed.split_dataset_by_node` for a `datasets.IterableDataset` (as discussed in the issues above), or implement custom sharding for a generic `IterableDataset`. (Depending on your transformers/accelerate version, the `Trainer` may already split or dispatch batches for iterable datasets, which is exactly the kind of complication the issues above cover, so check what your version does before adding your own sharding.)
Example of sharding in an `IterableDataset`:

```
from torch.utils.data import IterableDataset

class ShardedIterableDataset(IterableDataset):
    def __init__(self, dataset, rank=0, world_size=1):
        self.dataset = dataset
        self.rank = rank
        self.world_size = world_size

    def __iter__(self):
        # Every process reads the full stream but keeps only every
        # world_size-th sample, offset by its rank.
        for idx, item in enumerate(self.dataset):
            if idx % self.world_size == self.rank:
                yield item
```

Pass the rank and world size to your dataset:

```
from torch.distributed import get_rank, get_world_size

# Only valid once the process group is initialized (i.e. inside a DDP run).
dataset = ShardedIterableDataset(raw_dataset, rank=get_rank(), world_size=get_world_size())
```
Key Takeaways
- DDP is enabled by running the script under a distributed launcher (which provides `local_rank` to each process).
- Shard your `IterableDataset` (via `split_dataset_by_node` or a wrapper like the one above) so the GPUs don’t all read the same data.
- Verify the setup at runtime, e.g. from the startup logs or by inspecting the training arguments, as sketched below.
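As a quick, hedged way to check that last point (my addition, not part of the generated answer): `TrainingArguments` exposes `parallel_mode` and `world_size`, which reflect what the launcher set up.

```
# Minimal check, reusing the training_args from the snippet above.
print(training_args.parallel_mode)  # ParallelMode.DISTRIBUTED under torchrun / accelerate launch
print(training_args.world_size)     # number of training processes
```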
Example Fix for Distributed Training:

```
# local_rank does not need to be hard-coded: the distributed launcher
# (torchrun / accelerate launch) provides it to each process.
training_args = Seq2SeqTrainingArguments(
    output_dir,
    per_device_train_batch_size=16,
    num_train_epochs=1,
    logging_dir=f"{output_dir}/logs",
    report_to="tensorboard",
    save_strategy="no",
    eval_strategy="no",
)

# Shard the dataset manually so each rank sees different samples
from torch.distributed import get_rank, get_world_size

def get_sharded_dataset(rank, world_size):
    return ShardedIterableDataset(dataset, rank, world_size)

trainer = Seq2SeqTrainer(
    model,
    args=training_args,
    train_dataset=get_sharded_dataset(get_rank(), get_world_size()),
    data_collator=DataCollatorForSeq2Seq(tokenizer=tokenizer),
)
```
By addressing these points, you’ll ensure efficient distributed training with proper data sharding. Let me know if you need further clarification!