Hey, I am trying to train a custom model (which inherits from PreTrainedModel) with IterableDataset using the HuggingFace Trainer in a DDP setup and I have a couple of questions on how to do it best as well as some of my observations.
- I know there is a `datasets.distributed.split_dataset_by_node()` function which can distribute the shards across the nodes, but should I call it myself inside the `get_train_dataloader()` function of the `Trainer`, or is it handled automatically?
- I noticed that the training fails with:
 
RuntimeError: You can't use batches of different size with `dispatch_batches=True` or when using an `IterableDataset`. either pass `dispatch_batches=False` and have each process fetch its own batch or pass `split_batches=True`. By doing so, the main process will fetch a full batch and slice it into `num_processes` batches for each process.
  if I do not add `accelerator_config=AcceleratorConfig(dispatch_batches=False, split_batches=False)` to the `TrainingArguments` (see the sketch after these bullet points for how I pass it). Should this not be handled automatically? Am I doing something wrong in how I specify it?
- How do I specify the `num_workers` to use in the `DataLoader` with DDP? When I specify `num_workers > n_gpus` (i.e. `world_size`), I get the error `RuntimeError: DataLoader worker (pid(s) 572878) exited unexpectedly`. This is an issue for me because the optimal `num_workers` in a single-GPU setup is `num_workers=16`, so if I can only use one worker per GPU/process, training ends up slower than on a single GPU. I also checked with `nvidia-smi` that, because of this, the GPUs are idle most of the time. (I added a shard-count check after the dataset snippet below, in case the number of parquet shards matters here.)
- I noticed that if I do:
 
def get_train_dataloader(self):
    [...]
    dataset = split_dataset_by_node(dataset, rank=self.args.local_rank, world_size=self.args.world_size)
    return DataLoader(dataset, **dataloader_params)
the training is faster than:
def get_train_dataloader(self):
    [...]
    dataset = split_dataset_by_node(dataset, rank=self.args.local_rank, world_size=self.args.world_size)
    return self.accelerator.prepare(DataLoader(dataset, **dataloader_params))
Any idea why this is?
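For reference, this is roughly how I pass the workaround from the second bullet together with the worker count; the concrete values here are illustrative placeholders, not my exact arguments:

from transformers import TrainingArguments
from transformers.trainer_pt_utils import AcceleratorConfig  # where the class seems to live in my version; the import path may differ

training_args = TrainingArguments(
    output_dir="out",                  # placeholder
    per_device_train_batch_size=8,     # placeholder
    max_steps=10_000,                  # the streaming dataset has no __len__, so max_steps must be set
    dataloader_num_workers=16,         # what I would like to use per process
    dataloader_pin_memory=True,
    # Without this, the run crashes with the "batches of different size" RuntimeError quoted above:
    accelerator_config=AcceleratorConfig(dispatch_batches=False, split_batches=False),
)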
Here is how my `get_train_dataloader()` function inside the `Trainer` currently looks:
def get_train_dataloader(self) -> DataLoader:
    """
    Returns the training [`~torch.utils.data.DataLoader`].
    Will use no sampler if `train_dataset` does not implement `__len__`, a random sampler (adapted to distributed
    training if necessary) otherwise.
    Subclass and override this method if you want to inject some custom behavior.
    """
    if self.train_dataset is None:
        raise ValueError("Trainer: training requires a train_dataset.")
    train_dataset = self.train_dataset
    data_collator = self.data_collator
    if is_datasets_available() and isinstance(train_dataset, datasets.Dataset):
        train_dataset = self._remove_unused_columns(train_dataset, description="training")
    else:
        data_collator = self._get_collator_with_removed_columns(data_collator, description="training")
    dataloader_params = {
        "batch_size": self._train_batch_size,
        "collate_fn": data_collator,
        "num_workers": self.args.dataloader_num_workers,
        "pin_memory": self.args.dataloader_pin_memory,
        "persistent_workers": self.args.dataloader_persistent_workers,
    }
    if not isinstance(train_dataset, torch.utils.data.IterableDataset):
        dataloader_params["sampler"] = self._get_train_sampler()
        dataloader_params["drop_last"] = self.args.dataloader_drop_last
        dataloader_params["worker_init_fn"] = seed_worker
        dataloader_params["prefetch_factor"] = self.args.dataloader_prefetch_factor
    else:
        # if using DDP, we need to split the dataset by node
        if self.args.world_size > 1:
            train_dataset = split_dataset_by_node(
                train_dataset, rank=self.args.local_rank, world_size=self.args.world_size
            )
    return DataLoader(train_dataset, **dataloader_params)
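For completeness, that override sits in a small `Trainer` subclass; the wiring is roughly the following sketch (class and variable names are illustrative, not my exact script):

from torch.utils.data import DataLoader
from transformers import Trainer

class StreamingTrainer(Trainer):
    """Trainer subclass whose only change is the get_train_dataloader() override above."""

    def get_train_dataloader(self) -> DataLoader:
        ...  # the implementation pasted above

trainer = StreamingTrainer(
    model=model,                  # my custom PreTrainedModel subclass
    args=training_args,           # the TrainingArguments sketched after the bullet points
    train_dataset=train_dataset,  # the streaming dataset created below
)
trainer.train()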
This is how my `IterableDataset` is created:
train_dataset = load_dataset("parquet", data_files=data_files, split="train", streaming=True).map(
    transform_fn, batched=False, with_indices=False
)
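In case it is relevant to the `num_workers` question: my understanding (which may be wrong) is that a streaming dataset hands out whole shards to DataLoader workers, so the number of parquet files may cap how many workers actually get data. This is how I checked the shard counts, assuming `n_shards` reflects the number of data files:

from datasets.distributed import split_dataset_by_node

# Shards of the full streaming dataset (my assumption: one per parquet file).
print(train_dataset.n_shards)

# Shards visible to a single rank after splitting across 4 processes; workers beyond
# this number would have nothing to read, if my understanding is correct.
per_rank_dataset = split_dataset_by_node(train_dataset, rank=0, world_size=4)
print(per_rank_dataset.n_shards)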
I kick off the training on 4 GPUs with:
torchrun --nproc_per_node 4 training_script.py <args>
I use:
torch==2.0.1
transformers==4.38.2
datasets==2.18.0
accelerate==0.26.1
Point number 3 (the `num_workers` question) is the biggest issue! Right now DDP training is slower than single-GPU training because it only uses one worker instead of 16, and there is quite significant on-the-fly processing, hence the importance of having multiple workers.