I want to run CustomSFTTrainer (which inherits from SFTTrainer, which in turn inherits from Trainer) on a multi-GPU setup using accelerate. I understand that the Trainer class already uses accelerate and therefore creates a dataloader and calls accelerator.prepare(dataloader) in its train method.
However, I fail to understand whether it uses a DistributedSampler. I noticed that it only uses a RandomSampler, and accelerate in turn uses a SeedableRandomSampler rather than a DistributedSampler. I want to run the model on different GPUs with mutually exclusive chunks of data so that training is faster.
How do I use DistributedSampler with accelerate and the built-in Trainer class?
There may be no advantage to explicitly using DistributedSampler…
As far as I know, with PyTorch a RandomSampler cannot be used directly for distributed data parallel training, since a DistributedSampler is expected (this link discusses the problem). I am wondering whether accelerator.prepare(dataloader) handles the data split across multiple GPUs when I use a RandomSampler, so that the sub-datasets on the devices are mutually exclusive.
You don't have to worry about using a distributed sampler with Accelerate. Whatever your sampler is, Accelerate will automatically shard it for all processes.
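For example, here is a minimal sketch (my own toy example, not from the docs; the TensorDataset and the sizes are made up) that you can run with `accelerate launch --num_processes 2` to see the effect:

```python
import torch
from torch.utils.data import DataLoader, RandomSampler, TensorDataset
from accelerate import Accelerator

# Toy dataset of 16 samples, batch size 4 (hypothetical numbers).
dataset = TensorDataset(torch.arange(16))
loader = DataLoader(dataset, batch_size=4, sampler=RandomSampler(dataset))

accelerator = Accelerator()
loader = accelerator.prepare(loader)  # wraps the dataloader so batches are sharded

for batch in loader:
    # With the default settings each process receives different batches,
    # so the samples seen by the processes do not overlap.
    print(f"rank {accelerator.process_index}/{accelerator.num_processes}: {batch[0].tolist()}")
```

Launched with two processes, each rank should print different sample indices even though the sampler is a plain RandomSampler.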
I see. So, just to be clear, Accelerate will ensure that, given any sampler, the data will be split exclusively for each GPU? Interesting, because I wasn't able to find this functionality in Accelerate's prepare_data_loader method. Is it wrapped in any other Accelerate method?
It's hard to tell where in the library's code this optimization happens…
There's no example that directly mentions the mechanism.
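One option is simply to inspect what `prepare` returns. In the versions I have looked at, the original sampler ends up wrapped by shard-aware classes from `accelerate.data_loader` (the exact class names may differ between releases); a rough sketch:

```python
import torch
from torch.utils.data import DataLoader, RandomSampler, TensorDataset
from accelerate import Accelerator

dataset = TensorDataset(torch.arange(128))
loader = DataLoader(dataset, batch_size=8, sampler=RandomSampler(dataset))

accelerator = Accelerator()
prepared = accelerator.prepare(loader)

# In a multi-process run, the prepared object is typically a DataLoaderShard
# whose batch_sampler is a BatchSamplerShard: it takes the batches produced by
# the original sampler and hands each process its own subset of them.
print(type(prepared))
print(type(prepared.batch_sampler))
```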
GitHub issue (feature request), opened 04 Sep 2022, closed 12 Sep 2022:
### System Info
```Shell
accelerate: 0.12.0
OS: Linux 5.4.188+ (Colab)
Python: 3.7.13
numpy: 1.21.6
torch: 1.12.1+cu113
config: 1 CPU
```
### Information
- [ ] The official example scripts
- [X] My own modified scripts
### Tasks
- [ ] One of the scripts in the examples/ folder of Accelerate or an officially supported `no_trainer` script in the `examples` folder of the `transformers` repo (such as `run_no_trainer_glue.py`)
- [X] My own task or dataset (give details below)
### Reproduction
MRE : https://colab.research.google.com/drive/17krCJCF_nWtNFSiMBo3oz12l7eX1bBZ6
First of all, thanks for this library and the great docs and examples that come with it!
I am using a custom torch Dataset that contains a Hugging Face Dataset (pyarrow) instance. Therefore, as indicated in the Datasets docs (https://huggingface.co/docs/datasets/v2.4.0/en/use_with_pytorch#use-a-batchsampler), I tried to use a BatchSampler to reduce the number of queries. However, I have not yet been able to make it work with accelerate.
I tried many different possibilities, one of which works on one CPU or one GPU, but gets stuck when using distributed training.
Thanks for your help!
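For reference, the pattern from the Datasets docs that the issue is trying to combine with accelerate looks roughly like this (a sketch; the column name and sizes are made up):

```python
from datasets import Dataset
from torch.utils.data import BatchSampler, DataLoader, RandomSampler

# Hypothetical Arrow-backed Hugging Face dataset.
ds = Dataset.from_dict({"x": list(range(1000))}).with_format("torch")

# Passing the BatchSampler as `sampler` with batch_size=None means the dataset
# receives a whole list of indices per __getitem__ call, i.e. one Arrow query
# per batch instead of one query per example.
loader = DataLoader(
    ds,
    sampler=BatchSampler(RandomSampler(ds), batch_size=32, drop_last=False),
    batch_size=None,
)
```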
GitHub issue (enhancement, feature request), opened 17 Jun 2024:
### System Info
```Shell
accelerate 0.31.0
Ubuntu 22.04 (WSL)
python=3.10.14
```
### Information
- [ ] The official example scripts
- [X] My own modified scripts
### Tasks
- [ ] One of the scripts in the examples/ folder of Accelerate or an officially supported `no_trainer` script in the `examples` folder of the `transformers` repo (such as `run_no_trainer_glue.py`)
- [X] My own task or dataset (give details below)
### Reproduction
I would like to combine distributed training and a weighted random sampler. In order to do that, I:
1. Create my Dataset inheriting from torch.utils.data.Dataset
2. Compute weights specific to my classes and data
3. Create my DataLoader with the random sampler
4. Prepare my dataloader with accelerate
But it seems that this is not working because we have data leaks between processes.

I would like to make sure that each process uses different data, like this:
<img src='https://github.com/huggingface/accelerate/assets/26071804/93ce0cc7-4646-4e34-9b71-90c896f06f2a' width='400px' />
I developed an example script in order to understand the process:
```python
from accelerate import Accelerator
import argparse
import os
import torch.distributed as dist
import torch
from tqdm.auto import tqdm
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import WeightedRandomSampler, BatchSampler

WORLD_SIZE = int(os.getenv('WORLD_SIZE', 1))
MAIN_PROCESS = not int(os.getenv('RANK', 0))

parser = argparse.ArgumentParser()
parser.add_argument('--dataset_count', default=12800)
parser.add_argument('--epochs', default=20)
parser.add_argument('--batch_size', default=64)
parser.add_argument('--balance', action='store_true', default=False)


def is_even(number):
    return not number % 2  # example 10 => 10%2 == 0


class DummyDataset(Dataset):
    def __init__(self, dataset_count: int):
        self.data = range(dataset_count)

    def __len__(self):
        return len(self.data)

    def dataloader(self, batch_size, balance: bool = False, seed=42, batch_sampler=False, drop_last: bool = False):
        generator = torch.Generator().manual_seed(seed)

        def get_weight(num):
            if is_even(num):
                # even
                return 1.0
            else:
                # odd (impair)
                return 0.1

        if balance:
            weights = [get_weight(i) for i in self.data]
            sampler = WeightedRandomSampler(weights, len(self), replacement=True, generator=generator)
        else:
            sampler = None
        if batch_sampler:
            return DataLoader(self, batch_sampler=BatchSampler(sampler, batch_size, drop_last))
        else:
            return DataLoader(self, batch_size, sampler=sampler, drop_last=drop_last)

    def __getitem__(self, idx):
        row_index = self.data[idx]
        return row_index


def main(
        dataset_count: int,
        epochs: int,
        batch_size: int,
        balance: bool = True):
    if int(os.environ.get('WORLD_SIZE', 1)) > 1:
        dist.init_process_group(backend='gloo')
    accelerator = Accelerator(cpu=True)
    # We mount the right storage...
    # We get the path
    dataset = DummyDataset(dataset_count)

    # Dataloader without Accelerate...
    dataloader = dataset.dataloader(batch_size, balance)
    batched_data = []
    if MAIN_PROCESS:
        print(f'Running {epochs*len(dataloader)} iterations')
    for epoch in range(epochs):
        for batch in dataloader:
            batch: torch.Tensor
            batched_data.extend(batch.tolist())
    count_even = len([v for v in batched_data if is_even(v)])
    count_odd = len([v for v in batched_data if not is_even(v)])
    ratio_odd = count_odd / (count_even + count_odd)
    if MAIN_PROCESS:
        print('Get proportion of Odd data without accelerate')
        print(f'Ratio Odd = {ratio_odd}')

    # Dataloader with Accelerate...
    dataloader = accelerator.prepare(dataloader)
    # We increase learning rate when multiGPU
    batched_data = []
    if MAIN_PROCESS:
        print(f'Running {epochs*len(dataloader)} iterations')
    for epoch in range(epochs):
        for batch in dataloader:
            batch: torch.Tensor
            batched_data.extend(batch.tolist())
    count_even = len([v for v in batched_data if is_even(v)])
    count_odd = len([v for v in batched_data if not is_even(v)])
    ratio_odd = count_odd / (count_even + count_odd)
    if MAIN_PROCESS:
        print('Get proportion of Odd data with accelerate')
        print(f'Ratio Odd = {ratio_odd}')

    # We save to a file for further processing...
    suffix = '_balanced' if balance else '_unbalanced'
    rank = str(os.environ.get('RANK', 0))
    with open(f'test_{rank}{suffix}.json', 'w') as jsf:
        import json
        json.dump(sorted(batched_data), jsf, indent=4)
    accelerator.wait_for_everyone()
    seen_data = set(batched_data)
    if WORLD_SIZE > 1:
        # Now every one will open the other...
        other_rank = str(int(not int(os.environ.get('RANK', 0))))
        with open(f'test_{other_rank}{suffix}.json', 'r') as jsf:
            import json
            other_data = json.load(jsf)
        # We get unique ids in order to check that we don't have leaks...
        other_data = set(other_data)
        batched_data = set(batched_data)
        unique_in_rank = batched_data.difference(other_data)
        if MAIN_PROCESS:
            print('Verify the unicity of the data on each rank...\n')
        print(f'{len(unique_in_rank)}/{len(batched_data)} data only are not leaking from rank {rank} to rank {other_rank}')
        seen_data = unique_in_rank.union(other_data)
    # Unseen data
    unseen_data = set(dataset.data).difference(seen_data)
    if MAIN_PROCESS:
        print("Unseen Data")
        print(f'{len(unseen_data)}/{len(dataset)} have not been seen...')


if __name__ == '__main__':
    params = vars(parser.parse_args())
    print('----------------------------------------')
    [print(f'{k}: {v}') for k, v in params.items()]
    print('----------------------------------------')
    main(**params)
```
You can try running this script in different ways:
## Single node without "balance"
```
----------------------------------------
dataset_count: 12800
epochs: 20
batch_size: 64
balance: False
----------------------------------------
Running 4000 iterations
Get proportion of Odd data without accelerate
Ratio Odd = 0.5
Running 4000 iterations
Get proportion of Odd data with accelerate
Ratio Odd = 0.5
Unseen Data
0/12800 have not been seen...
```
## Multiple node (2) without "balance"
```
----------------------------------------
dataset_count: 12800
epochs: 20
batch_size: 64
balance: False
----------------------------------------
Running 4000 iterations
Get proportion of Odd data without accelerate
Ratio Odd = 0.5
Running 2000 iterations
Get proportion of Odd data with accelerate
Ratio Odd = 0.5
Verify the unicity of the data on each rank...
Verify the unicity of the data on each rank...
6400/6400 data only are not leaking from rank 0 to rank 1
6400/6400 data only are not leaking from rank 1 to rank 0
Unseen Data
0/12800 have not been seen...
```
We see that there is no leak: all data are seen.
## Single node with "balance"
```
----------------------------------------
dataset_count: 12800
epochs: 20
batch_size: 64
balance: True
----------------------------------------
Running 4000 iterations
Get proportion of Odd data without accelerate
Ratio Odd = 0.09179296875
Running 4000 iterations
Get proportion of Odd data with accelerate
Ratio Odd = 0.09139453125
Unseen Data
167/12800 have not been seen...
```
We see that a few samples have not been seen. This is expected because odd samples are drawn at a very low rate.
## Multiple node with "balance"
```
----------------------------------------
dataset_count: 12800
epochs: 20
batch_size: 64
balance: True
----------------------------------------
Running 4000 iterations
Get proportion of Odd data without accelerate
Ratio Odd = 0.09179296875
Running 2000 iterations
Get proportion of Odd data with accelerate
Ratio Odd = 0.0917890625
Verify the unicity of the data on each rank...
895/11760 data only are not leaking from rank 0 to rank 1
873/11738 data only are not leaking from rank 1 to rank 0
Unseen Data
167/12800 have not been seen...
```
We see that data are leaking from one process to the other, as if there were an issue with the distributed sampler.
How can this be fixed?
### Expected behavior
I would like the weighted sampler to be used, with nothing leaking from one rank to the other, just as in the case without the weighted sampler.
**Do you have any idea how to achieve this?**
Thanks!
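One possible workaround (my own sketch, not something the issue proposes; the helper name is made up) is to shard the indices manually before building the weighted sampler, so that each rank can only ever draw from its own subset, and to leave that dataloader out of accelerator.prepare:

```python
import torch
from torch.utils.data import DataLoader, Subset, WeightedRandomSampler

def make_rank_local_loader(dataset, weights, batch_size, rank, world_size, seed=42):
    # Round-robin shard of the indices: rank, rank + world_size, rank + 2*world_size, ...
    local_indices = list(range(rank, len(dataset), world_size))
    local_weights = [weights[i] for i in local_indices]
    # Per-rank generator so the ranks do not draw identical sequences.
    generator = torch.Generator().manual_seed(seed + rank)
    sampler = WeightedRandomSampler(
        local_weights, num_samples=len(local_indices), replacement=True, generator=generator
    )
    # The Subset only contains this rank's shard, so nothing outside it can be sampled.
    return DataLoader(Subset(dataset, local_indices), batch_size=batch_size, sampler=sampler)
```

The model and optimizer still go through accelerator.prepare; only the dataloader is built per rank. The trade-off is that the class balance is enforced only within each shard, and since sampling is with replacement some samples will still never be seen.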
Excerpt from Accelerate's `data_loader.py` (the embed is truncated):
```python
    @property
    def batch_sampler(self):
        return self._loader.batch_sampler

    @property
    def dataloader(self):
        return self._loader


class DataLoaderDispatcher(DataLoaderAdapter, DataLoaderStateMixin):
    """
    Subclass of `DataLoaderAdapter` that will iterate and preprocess on process 0 only, then dispatch on each process
    their part of the batch.

    Args:
        split_batches (`bool`, *optional*, defaults to `False`):
            Whether the resulting `DataLoader` should split the batches of the original data loader across devices or
            yield full batches (in which case it will yield batches starting at the `process_index`-th and advancing of
            `num_processes` batches at each iteration). Another way to see this is that the observed batch size will be
            the same as the initial `dataloader` if this option is set to `True`, the batch size of the initial
```
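If you want this dispatch/split behaviour explicitly, newer Accelerate releases expose it through DataLoaderConfiguration; a sketch (check the option names against your installed version):

```python
from accelerate import Accelerator
from accelerate.utils import DataLoaderConfiguration

# split_batches=True: each prepared batch is split across processes, so the
# observed per-process batch size is batch_size / num_processes.
# dispatch_batches=True: batches are produced on process 0 only and their
# slices are dispatched to the other processes (the DataLoaderDispatcher path above).
dataloader_config = DataLoaderConfiguration(split_batches=True, dispatch_batches=True)
accelerator = Accelerator(dataloader_config=dataloader_config)
```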
This topic was automatically closed 12 hours after the last reply. New replies are no longer allowed.