Speeding up Streaming of Large Datasets (FineWeb)?

I’m currently trying to filter down FineWeb to only keep a specific domain (type of text) using a classification model (see code below for skeleton).

import json
import pickle

from datasets import load_dataset
from tqdm import tqdm

if __name__ == "__main__":
    SEED = 0
    SAMPLE_BUFFER_SIZE = 5_000
    RECORDS_TO_KEEP = 100_000
    TAKE_SIZE = 10_000_000  # 23,355,019,906 is max size

    fw = load_dataset("HuggingFaceFW/fineweb", split="train", streaming=True)
    fw = fw.shuffle(seed=SEED, buffer_size=SAMPLE_BUFFER_SIZE)

    with open('dataset_differentiator.pkl', 'rb') as f:
        clf = pickle.load(f)

    # custom bounded priority queue keeping the top RECORDS_TO_KEEP records by score
    priority_queue = PriorityQueue(RECORDS_TO_KEEP, key=lambda x: x['prob_control'])

    for sample in tqdm(fw.take(TAKE_SIZE)):
        # this is the domain prediction model, I can share more code if it seems relevant
        prediction = do_prediction_here(sample)
        priority_queue.add_record(prediction)

    with open('sampled_features_100k.json', 'w') as f:
        json.dump(priority_queue.get_records(), f)

The classification model itself runs at ~330 records per second, while the overall pipeline runs closer to ~100 records per second. Based on profiling and program behavior, I suspect the bottleneck is data loading: tqdm shows progress for ~8-10k records, stalls for a bit, then starts again.
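To illustrate how I'm splitting the time between iterating the stream and running the classifier (a rough sketch, not my exact profiling code):

import time

load_time, predict_time = 0.0, 0.0
t0 = time.perf_counter()
for sample in fw.take(10_000):
    t1 = time.perf_counter()
    load_time += t1 - t0           # time spent waiting on the streaming iterator (network + decode)
    prediction = do_prediction_here(sample)
    t0 = time.perf_counter()
    predict_time += t0 - t1        # time spent in the classifier
print(f"loading: {load_time:.1f}s, prediction: {predict_time:.1f}s")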

Are there any tricks to improve retrieval speed when streaming a dataset (e.g. is it possible to pre-fetch the next records or fetch them concurrently, or are there settings like cache size that I could tweak to reduce the downtime)? Or is there a better way of filtering down a large dataset that doesn't involve downloading the whole thing at once? I'm open to other methods as well.

You can use a DataLoader to enable prefetching and concurrency:

from torch.utils.data import DataLoader
dl = DataLoader(fw.take(TAKE_SIZE), num_workers=1, prefetch_factor=10, batch_size=None)

for sample in tqdm(dl):
    ...

You can also increase num_workers (e.g. num_workers=4) for more concurrency. Just note that the samples will not come in the same order: the workers read in parallel from different shards instead of exhausting the first shard first.
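For example, a sketch of the same call with more workers (everything else unchanged):

# 4 workers read shards in parallel, each prefetching 10 samples ahead
dl = DataLoader(fw.take(TAKE_SIZE), num_workers=4, prefetch_factor=10, batch_size=None)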


Merci, that worked!

On a tangent, I got the following error ~a day into streaming FineWeb:

requests.exceptions.ChunkedEncodingError: ('Connection broken: IncompleteRead(1398776 bytes read, 3882472 more expected)', IncompleteRead(1398776 bytes read, 3882472 more expected))

I looked at the FineWeb repo and it doesn't look like it uses a custom loader the way RedPajama does (Error Handling in IterableDataset?). Could I use the DataLoader to handle errors when loading data, or is there a better way of doing that? I tried looking through the source code for Datasets, but I don't understand it well enough atm.
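Something along these lines is what I have in mind (just a sketch; I realize recreating the iterator would restart the stream from the beginning):

from requests.exceptions import ChunkedEncodingError, ConnectionError

MAX_ATTEMPTS = 3
for attempt in range(MAX_ATTEMPTS):
    try:
        for sample in tqdm(dl):
            prediction = do_prediction_here(sample)
            priority_queue.add_record(prediction)
        break  # finished cleanly
    except (ChunkedEncodingError, ConnectionError) as err:
        # re-entering the loop creates a new DataLoader iterator, which starts the
        # stream over from the first shards, so this only makes sense if partial
        # results already sitting in the priority queue are acceptable
        print(f"streaming failed on attempt {attempt + 1}: {err!r}")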

Do you happen to have the full stack trace?

We have retry mechanisms in place in case there is any issue when streaming; maybe this case is not covered well enough.

No, I didn’t save the full trace

I'm rerunning the code over the weekend; if it fails again, I'll post the full trace on Monday.

If it doesn't fail, some gateways for my work laptop's VPN throw a similar-looking error consistently (even before the streaming starts). I'll use one of the "bad" gateways and share that trace, although I'm not sure if it's exactly the same error…

@lhoestq

Here you go. It's a bit long because it goes through the tqdm loop and the DataLoader, so if it would make things easier I can simplify the code.

Traceback (most recent call last):
  File "C:\Users\flabelle002\Downloads\parquet_files\fineweb_curation.py", line 85, in <module>
    for sample in tqdm(dl):
  File "C:\Users\flabelle002\AppData\Roaming\Python\Python311\site-packages\tqdm\std.py", line 1182, in __iter__
    for obj in iterable:
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\torch\utils\data\dataloader.py", line 630, in __next__
    data = self._next_data()
           ^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\torch\utils\data\dataloader.py", line 1325, in _next_data
    return self._process_data(data)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\torch\utils\data\dataloader.py", line 1371, in _process_data
    data.reraise()
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\torch\_utils.py", line 694, in reraise
    raise exception
requests.exceptions.ConnectionError: Caught ConnectionError in DataLoader worker process 6.
Original Traceback (most recent call last):
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\urllib3\connectionpool.py", line 791, in urlopen
    response = self._make_request(
               ^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\urllib3\connectionpool.py", line 537, in _make_request
    response = conn.getresponse()
               ^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\urllib3\connection.py", line 461, in getresponse
    httplib_response = super().getresponse()
                       ^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\http\client.py", line 1378, in getresponse
    response.begin()
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\http\client.py", line 318, in begin
    version, status, reason = self._read_status()
                              ^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\http\client.py", line 287, in _read_status
    raise RemoteDisconnected("Remote end closed connection without"
http.client.RemoteDisconnected: Remote end closed connection without response

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\requests\adapters.py", line 486, in send
    resp = conn.urlopen(
           ^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\urllib3\connectionpool.py", line 845, in urlopen
    retries = retries.increment(
              ^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\urllib3\util\retry.py", line 470, in increment
    raise reraise(type(error), error, _stacktrace)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\urllib3\util\util.py", line 38, in reraise
    raise value.with_traceback(tb)
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\urllib3\connectionpool.py", line 791, in urlopen
    response = self._make_request(
               ^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\urllib3\connectionpool.py", line 537, in _make_request
    response = conn.getresponse()
               ^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\urllib3\connection.py", line 461, in getresponse
    httplib_response = super().getresponse()
                       ^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\http\client.py", line 1378, in getresponse
    response.begin()
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\http\client.py", line 318, in begin
    version, status, reason = self._read_status()
                              ^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\http\client.py", line 287, in _read_status
    raise RemoteDisconnected("Remote end closed connection without"
urllib3.exceptions.ProtocolError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\torch\utils\data\_utils\worker.py", line 308, in _worker_loop
    data = fetcher.fetch(index)
           ^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\torch\utils\data\_utils\fetch.py", line 41, in fetch
    data = next(self.dataset_iter)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\datasets\iterable_dataset.py", line 1358, in __iter__
    yield from self._iter_pytorch()
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\datasets\iterable_dataset.py", line 1293, in _iter_pytorch
    for key, example in ex_iterable:
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\datasets\iterable_dataset.py", line 1039, in __iter__
    yield from islice(self.ex_iterable, self.n)
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\datasets\iterable_dataset.py", line 982, in __iter__
    for x in self.ex_iterable:
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\datasets\iterable_dataset.py", line 281, in __iter__
    for key, pa_table in self.generate_tables_fn(**self.kwargs):
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\datasets\packaged_modules\parquet\parquet.py", line 79, in _generate_tables
    for batch_idx, record_batch in enumerate(
  File "pyarrow\_parquet.pyx", line 1323, in iter_batches
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\datasets\download\streaming_download_manager.py", line 333, in read_with_retries
    out = read(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\fsspec\spec.py", line 1858, in read
    out = self.cache._fetch(self.loc, self.loc + length)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\fsspec\caching.py", line 156, in _fetch
    self.cache = self.fetcher(start, end)  # new block replaces old
                 ^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\huggingface_hub-0.19.0-py3.8.egg\huggingface_hub\hf_file_system.py", line 444, in _fetch_range
    r = http_backoff("GET", url, headers=headers)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\huggingface_hub-0.19.0-py3.8.egg\huggingface_hub\utils\_http.py", line 267, in http_backoff
    response = session.request(method=method, url=url, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\requests\sessions.py", line 589, in request
    resp = self.send(prep, **send_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\requests\sessions.py", line 725, in send
    history = [resp for resp in gen]
              ^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\requests\sessions.py", line 725, in <listcomp>
    history = [resp for resp in gen]
              ^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\requests\sessions.py", line 266, in resolve_redirects
    resp = self.send(
           ^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\requests\sessions.py", line 703, in send
    r = adapter.send(request, **kwargs)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\huggingface_hub-0.19.0-py3.8.egg\huggingface_hub\utils\_http.py", line 63, in send
    return super().send(request, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\flabelle002\AppData\Local\anaconda3\envs\SRL_Model\Lib\site-packages\requests\adapters.py", line 501, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: (ProtocolError('Connection aborted.', RemoteDisconnected('Remote end closed connection without response')), '(Request ID: 0e22e81e-79d7-44d9-9845-7515f81c82ac)') 

Thanks! I opened [Streaming] retry on requests errors by lhoestq · Pull Request #6963 · huggingface/datasets · GitHub to suggest a fix that would let you increase the number of network connection retries when streaming in your DataLoader.


Perfect, thanks Quentin!

I'll go modify the config to increase the maximum number of retries and add requests errors to the try/except, like in your PR.
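For anyone who lands here later, this is roughly what I mean (a sketch; the setting names are the streaming retry options in datasets.config and may differ between versions):

import datasets

# bump the retry settings used when a streaming read fails mid-download
datasets.config.STREAMING_READ_MAX_RETRIES = 20    # more attempts before giving up
datasets.config.STREAMING_READ_RETRY_INTERVAL = 5  # seconds to wait between attempts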

This topic was automatically closed 12 hours after the last reply. New replies are no longer allowed.