Interleaving Iterable Dataset with num_workers > 0

Hello,

I have a couple of iterable datasets I’d like to interleave, but the dataloader raises this error:

NotImplementedError: Sharding a CyclingMultiSourcesExamplesIterable is not implemented

It looks like multiprocessing with interleaved iterable datasets is not yet supported, but having a dataloader with num_workers > 0 is important for my use case.

What would be the easiest way to achieve my goal?

If it’s not possible to create batches that combine data from multiple interleaved datasets with num_workers > 0, I thought of another way to achieve similar results:

I could build a DataLoader that yields batches alternating between the datasets, and then use buffer shuffling to mix those batches together, roughly as in the sketch below.
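
To illustrate, here’s a rough, untested sketch of the idea (the alternating_batches helper and the buffer_size value are placeholders I made up):

import random

from torch.utils.data import DataLoader

from datasets import load_dataset

def alternating_batches(loaders, buffer_size=16):
    # round-robin over the per-dataset DataLoaders, filling a buffer
    # of batches and yielding a random one once the buffer is full
    iterators = [iter(loader) for loader in loaders]
    active = list(range(len(iterators)))
    buffer = []
    while active:
        for i in list(active):
            try:
                buffer.append(next(iterators[i]))
            except StopIteration:
                active.remove(i)
            if len(buffer) >= buffer_size:
                yield buffer.pop(random.randrange(len(buffer)))
    # drain whatever is left once all loaders are exhausted
    while buffer:
        yield buffer.pop(random.randrange(len(buffer)))

dataset = load_dataset("mozilla-foundation/common_voice_11_0", "br", streaming=True)
loader1 = DataLoader(dataset["train"], num_workers=1, batch_size=1)
loader2 = DataLoader(dataset["test"], num_workers=1, batch_size=1)

for batch in alternating_batches([loader1, loader2]):
    print(batch)
    break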

Does the second option seem doable? Is it of interest for this repo? If so, I could open a PR to add it.

Cheers

Simple code to reproduce this issue:

from torch.utils.data import DataLoader

from datasets import load_dataset, interleave_datasets

if __name__ == "__main__":
    dataset = load_dataset("mozilla-foundation/common_voice_11_0", "br", streaming=True)
    dataset1 = dataset["train"]
    dataset2 = dataset["test"]
    
    d3 = interleave_datasets([dataset1, dataset2])
    # iterating over d3 with num_workers > 0 raises the NotImplementedError below
    dataloader = DataLoader(d3, num_workers=1, batch_size=1)
    
    for element in dataloader:
        print(element)

Running it raises:

NotImplementedError: Sharding a CyclingMultiSourcesExamplesIterable is not implemented

Hi !

It’s not implemented because there’s currently no communication between the workers to tell which shards of each subset are being read by which worker. This is needed to stream the whole dataset without duplicates.

Though passing num_workers=1 should actually be allowed - it’s probably worth opening an issue on GitHub about it.

Anyway, you should at least be able to use num_workers=1 with a dataset defined via from_generator, since it has only one shard/data source: its generator.

from datasets import IterableDataset, interleave_datasets, load_dataset

d0 = load_dataset("mozilla-foundation/common_voice_11_0", "br", streaming=True)
d1 = d0["train"]
d2 = d0["test"]
d3 = interleave_datasets([d1, d2])

def generate_data():
    # use .iter() because __iter__ would raise an error in a DataLoader:
    # NotImplementedError: Sharding a CyclingMultiSourcesExamplesIterable is not implemented
    for batch in d3.iter(batch_size=1):
        # unbatch: each batch is a dict of single-element lists
        example = {key: value[0] for key, value in batch.items()}
        yield example

dataset = IterableDataset.from_generator(generate_data)

If you want to use more workers to decode the audio data, you’d need to allow generate_data to take a list of data sources as input, but it requires a bit more work to make sure the audio decoding is distributed correctly across workers. Something like the sketch below might work.
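
A rough, untested sketch of that idea (it assumes that from_generator shards list-valued gen_kwargs across DataLoader workers, and that the streaming splits can be passed to the workers):

from torch.utils.data import DataLoader

from datasets import IterableDataset, load_dataset

d0 = load_dataset("mozilla-foundation/common_voice_11_0", "br", streaming=True)

def generate_data(sources):
    # `sources` is a list, so from_generator can shard it: with
    # num_workers=2, each worker streams and decodes one split
    for source in sources:
        for example in source:
            yield example

dataset = IterableDataset.from_generator(
    generate_data, gen_kwargs={"sources": [d0["train"], d0["test"]]}
)
dataloader = DataLoader(dataset, num_workers=2, batch_size=1)

Note that with this approach the examples are no longer interleaved within a worker: each worker streams a single source, and the DataLoader alternates between workers when it fetches batches.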

Thanks for your answer!

Before reading your answer, I had tried implementing sharding for the merged iterable datasets myself.
I tested my changes with the interleave_datasets and take commands and they seem to work fine (roughly as in the snippet below), but it’s possible I missed something major with the sharding.
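
Simplified, the test looks like this (the take(8) and num_workers=2 values are arbitrary, and it only passes on my patched branch):

from torch.utils.data import DataLoader

from datasets import interleave_datasets, load_dataset

dataset = load_dataset("mozilla-foundation/common_voice_11_0", "br", streaming=True)
d3 = interleave_datasets([dataset["train"], dataset["test"]]).take(8)

# with sharding implemented, num_workers > 1 no longer raises
dataloader = DataLoader(d3, num_workers=2, batch_size=1)
for element in dataloader:
    print(element)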

Can you have a look at my PR when you’ve got the time? I’d like to know whether my solution makes sense before I make all the improvements necessary for it to be merged 🙂

I’ll take a look, thanks!
