Loading webdatasets across multiple nodes

Hi all,

I'm working on releasing a dataset of around 1M images of galaxies labelled by volunteers (www.galaxyzoo.org). I'm trying to understand the pros and cons of:

  1. Storing the dataset on the hub as either webdataset .tar or HF 'standard' .parquet shards
  2. Loading the data with either the webdataset library or the HF 'standard' Datasets library

My current setup uses webdatasets for both storage and loading. But I think other users might like the friendly Datasets API.

If I upload webdataset .tar files and load them with the Datasets load_dataset(…, streaming=True), what happens under the hood? (See the sketch after the bullets.)

  • Does each node download the full dataset?
  • Are the webdataset files optionally cached as arrow/parquet, so that from there onwards they can be treated as a native HF dataset?
  • How are the sequential-read .tar shards compatible with the Datasets approach of skipping individual indices to rebalance across nodes?
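
For concreteness, the call I have in mind is roughly the following (the repo id is a placeholder for my dataset repo):

```python
from datasets import load_dataset

# placeholder repo id: a Hub repo containing webdataset .tar shards
ds = load_dataset("user/galaxy-zoo-webdataset", streaming=True, split="train")

for example in ds:
    ...  # what gets downloaded (and cached?) at this point, per node?
```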

If I instead upload HF native .parquet files:

  • Will I see reduced I/O performance from losing sequential read access?

Lastly, I wanted to bump another user's thread who asked if (in general, i.e. assuming the typical arrow/parquet setup) the Datasets library is compatible with PyTorch distributed training via Lightning's Trainer.

Thank you for your time and for building these cool tools; it's especially awesome that uploading WDS to the Hub is so easy.

In streaming mode, only the requested samples are downloaded on the fly when iterating over the dataset.

  • Does each node download the full dataset?

It depends on how you split your dataset across nodes. By default, the Hugging Face Trainer will make every node stream the full dataset but skip samples to avoid duplicates. There is split_dataset_by_node(), though, which can assign shards to each node so that each node streams only the samples it uses for training:
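
A minimal sketch (the repo id is a placeholder, and rank/world_size are read from the environment variables set by torchrun):

```python
import os

from datasets import load_dataset
from datasets.distributed import split_dataset_by_node

# RANK / WORLD_SIZE are set by torchrun; the defaults cover single-process runs
rank = int(os.environ.get("RANK", 0))
world_size = int(os.environ.get("WORLD_SIZE", 1))

ds = load_dataset("user/galaxy-zoo-webdataset", streaming=True, split="train")
ds = split_dataset_by_node(ds, rank=rank, world_size=world_size)

for example in ds:
    ...  # this node only streams the shards (or samples) assigned to it
```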

  • Are the webdataset files optionally cached as arrow/parquet, so that from there onwards they can be treated as a native HF dataset?

There is no cache in streaming mode.

Yes webdataset or any sharded format is compatible with this. See split_dataset_by_node() :wink:

If I instead upload HF native .parquet files:

  • Will I see reduced I/O performance from losing sequential read access?

It's a bit slower in a single process, but if you use multiple DataLoader workers you should be able to saturate your bandwidth.
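
For example, roughly (the repo id, batch size and worker count are placeholders to tune):

```python
from datasets import load_dataset
from torch.utils.data import DataLoader

# placeholder repo id pointing at the parquet version of the dataset
ds = load_dataset("user/galaxy-zoo-parquet", streaming=True, split="train")

# several workers download and decode shards in parallel
loader = DataLoader(ds.with_format("torch"), batch_size=32, num_workers=8)

for batch in loader:
    ...
```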

Lastly, I wanted to bump another user's thread who asked if (in general, i.e. assuming the typical arrow/parquet setup) the Datasets library is compatible with PyTorch distributed training via Lightning's Trainer.

I'm not too familiar with Lightning's Trainer, but calling split_dataset_by_node in a custom IterableDataset should indeed work. Maybe we should add a way to split by node without having to specify the rank and world_size manually in split_by_node; this could solve the issue.

e.g. calling split_dataset_by_node(dataset) without rank and world_size could default to using the values from torch.distributed. WDYT? cc @rbrthogan as well
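
Roughly along these lines (just a sketch, and the helper name is made up):

```python
import torch.distributed as dist

from datasets.distributed import split_dataset_by_node


def split_dataset_by_node_auto(dataset):
    # hypothetical wrapper: fall back to torch.distributed when the process
    # group is already initialized, otherwise leave the dataset untouched
    if dist.is_available() and dist.is_initialized():
        return split_dataset_by_node(
            dataset, rank=dist.get_rank(), world_size=dist.get_world_size()
        )
    return dataset
```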

Thanks for the detailed answer @lhoestq! Very helpful. I've uploaded my datasets in HF native format and I'll test out how they work in distributed training with the Lightning Trainer and let you know.

I think it would make a lot of sense to default rank/world_size to the torch.distributed values if not provided, as I imagine most users would just be passing the torch.distributed values as args to split_dataset_by_node anyway. Maybe add some logging/warning to make it clear what's happening ("rank/world_size not provided, using torch.distributed values of…")? I generally find distributed data setups a bit opaque in tracking what is going where.

On split_by_node for WDS:

Yes webdataset or any sharded format is compatible with this. See split_dataset_by_node() :wink:

I think the part I'm puzzled by is this: WDS is sharded to allow fast sequential reads. If the shards don't divide evenly across nodes, HF will have all workers read all shards but have each worker skip most indices. But unlike HF's indexed parquet files, WDS is not designed to be read while skipping indices, right? You could read through a WDS shard and do nothing with most of the data, but that seems quite inefficient without some care (see wids for the 'official' version of this).
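
To make that concrete, my understanding of the divisibility condition (possibly wrong, and assuming n_shards reports the shard count of the streaming dataset) is roughly:

```python
import os

from datasets import load_dataset

world_size = int(os.environ.get("WORLD_SIZE", 1))
ds = load_dataset("user/galaxy-zoo-webdataset", streaming=True, split="train")

if ds.n_shards % world_size == 0:
    # split_dataset_by_node can hand whole shards to each node: sequential reads
    print("each node streams only its own shards")
else:
    # every node iterates over every shard and keeps 1 example in world_size
    print("all nodes read all shards and skip most examples")
```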