I’m trying to train a model on the MLCommons/unsupervised_peoples_speech dataset hosted on the Hugging Face Hub.
I’m using WebDataset to iterate over the tar files using a brace expansion. This is basically a wrapper on top of torch’s IterableDataset. The problem is that if I set up more than 1 worker in the loader I get the following error:
Traceback (most recent call last):
File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
self.run()
File "/usr/lib/python3.10/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/rafael/unsupervised_peoples_speech/data_wrangling/multi_VAD.py", line 57, in producer
for i in islice(dataset,0,128):
File "/home/rafael/unsupervised_peoples_speech/speech/lib/python3.10/site-packages/webdataset/pipeline.py", line 70, in iterator
yield from self.iterator1()
File "/home/rafael/unsupervised_peoples_speech/speech/lib/python3.10/site-packages/webdataset/filters.py", line 397, in _to_tuple
for sample in data:
File "/home/rafael/unsupervised_peoples_speech/speech/lib/python3.10/site-packages/webdataset/tariterators.py", line 219, in group_by_keys
for filesample in data:
File "/home/rafael/unsupervised_peoples_speech/speech/lib/python3.10/site-packages/webdataset/tariterators.py", line 190, in tar_file_expander
if handler(exn):
File "/home/rafael/unsupervised_peoples_speech/speech/lib/python3.10/site-packages/webdataset/filters.py", line 86, in reraise_exception
raise exn
File "/home/rafael/unsupervised_peoples_speech/speech/lib/python3.10/site-packages/webdataset/tariterators.py", line 177, in tar_file_expander
for sample in tar_file_iterator(
File "/home/rafael/unsupervised_peoples_speech/speech/lib/python3.10/site-packages/webdataset/tariterators.py", line 149, in tar_file_iterator
if handler(exn):
File "/home/rafael/unsupervised_peoples_speech/speech/lib/python3.10/site-packages/webdataset/filters.py", line 86, in reraise_exception
raise exn
File "/home/rafael/unsupervised_peoples_speech/speech/lib/python3.10/site-packages/webdataset/tariterators.py", line 142, in tar_file_iterator
data = stream.extractfile(tarinfo).read()
File "/usr/lib/python3.10/tarfile.py", line 689, in read
b = self.fileobj.read(length)
File "/usr/lib/python3.10/tarfile.py", line 526, in read
buf = self._read(size)
File "/usr/lib/python3.10/tarfile.py", line 534, in _read
return self.__read(size)
File "/usr/lib/python3.10/tarfile.py", line 564, in __read
buf = self.fileobj.read(self.bufsize)
File "/home/rafael/unsupervised_peoples_speech/speech/lib/python3.10/site-packages/webdataset/gopen.py", line 88, in read
self.check_status()
File "/home/rafael/unsupervised_peoples_speech/speech/lib/python3.10/site-packages/webdataset/gopen.py", line 68, in check_status
self.wait_for_child()
File "/home/rafael/unsupervised_peoples_speech/speech/lib/python3.10/site-packages/webdataset/gopen.py", line 83, in wait_for_child
raise IOError(f"{self.args}: exit {self.status} (read) {info}")
OSError: ("(('curl -s -L https://huggingface.co/datasets/MLCommons/unsupervised_peoples_speech/resolve/main/audio/000004.tar -H Authorization:Bearer hf_ASDASDSAD',), {'shell': True, 'bufsize': 8192}): exit 6 (read) {} @ <Pipe (('curl -s -L https://huggingface.co/datasets/MLCommons/unsupervised_peoples_speech/resolve/main/audio/000004.tar -H Authorization:Bearer hf_ASDASDASD',), {'shell': True, 'bufsize': 8192})>", <webdataset.gopen.Pipe object at 0x7f897a132080>, 'pipe:curl -s -L https://huggingface.co/datasets/MLCommons/unsupervised_peoples_speech/resolve/main/audio/000004.tar -H Authorization:Bearer hf_ASDASDASDP')
I’ve done some testing, and if I insert a time.sleep between requests the error doesn’t occur, which makes me think there’s a rate limiter in place. Is there a way to work around this? Any suggestions?
Edit: I’m including the code as reference:
import torch
import torchaudio
import braceexpand
import webdataset as wds
from itertools import islice
from utils import resample_squeeze, extract_tar_number
import multiprocessing
from io import BytesIO
import time
import json
import os
import uuid
from dotenv import load_dotenv
load_dotenv()
# Number of consumer worker processes (also the number of shutdown sentinels
# the producer must enqueue).
NUM_PROCESS = 128
# Hugging Face access token, loaded from .env via load_dotenv() above.
token = os.environ['HF_TOKEN']
url = "https://huggingface.co/datasets/MLCommons/unsupervised_peoples_speech/resolve/main/audio/{000001..000005}.tar"
token = f'Authorization: Bearer {token}'
# Expand {000001..000005} into one URL per tar shard.
urls = list(braceexpand.braceexpand(url))
# Each shard is streamed through curl (WebDataset "pipe:" protocol, run with
# shell=True). --retry/--retry-all-errors make curl re-attempt transient
# failures — e.g. exit 6 ("could not resolve host"), which shows up when many
# workers hit the Hub concurrently — instead of the dead pipe aborting the
# whole pipeline. The header is quoted because the command goes through the
# shell. (--retry-all-errors needs curl >= 7.71.)
urls = [f"pipe:curl -s -L --retry 10 --retry-all-errors {url} -H '{token}'" for url in urls]
# Download the Silero VAD model from torch hub, running it via ONNX Runtime.
# force_reload=True re-fetches the repo here; the consumer processes reuse the
# resulting cache with force_reload=False.
model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad',
model='silero_vad',
force_reload=True,
onnx=True)
# Unpack the helper callables bundled with the model; only
# get_speech_timestamps is actually used in this script.
(get_speech_timestamps,
save_audio,
read_audio,
VADIterator,
collect_chunks) = utils
def vad_process(sample: str, model):
    """Run Silero VAD on one waveform and return its speech timestamps."""
    # Inference only — no autograd bookkeeping needed.
    with torch.no_grad():
        timestamps = get_speech_timestamps(sample, model)
    return timestamps
# Build the streaming pipeline. The tar-reading stage's default error handler
# is reraise_exception, which is why a single curl failure (the OSError in the
# traceback) kills the whole worker — warn_and_continue logs the bad shard and
# moves on instead. to_tuple keeps silently skipping samples that lack an
# 'mp3' entry, as before.
dataset = (
    wds.WebDataset(
        urls,
        nodesplitter=wds.split_by_node,
        handler=wds.handlers.warn_and_continue,
    )
    .to_tuple('mp3', '__key__', '__url__', handler=wds.handlers.ignore_and_continue)
)
def producer(queue, dataset, limit=1024, sentinels=None):
    """Stream up to *limit* samples from *dataset* into *queue*, then signal shutdown.

    Parameters
    ----------
    queue : multiprocessing.Queue
        Shared queue the consumer processes read from.
    dataset : iterable
        Any iterable of samples (here: the WebDataset pipeline).
    limit : int, optional
        Maximum number of samples to enqueue (default 1024, as before).
    sentinels : int, optional
        Number of ``None`` shutdown markers to enqueue — one per consumer so
        every worker sees its own signal. Defaults to the module-level
        ``NUM_PROCESS``.
    """
    if sentinels is None:
        sentinels = NUM_PROCESS
    for sample in islice(dataset, limit):
        queue.put(sample)
    # One sentinel per consumer; each worker consumes exactly one None.
    for _ in range(sentinels):
        queue.put(None)
    print('Producer: Done', flush=True)
def consumer(queue):
    """Pull samples off *queue*, run VAD, and append results as JSON lines.

    Runs until it receives a ``None`` sentinel from the producer. Each worker
    writes to its own uniquely-named file, so there is no write contention.
    """
    # Every consumer process loads its own model copy; force_reload=False
    # reuses the hub cache populated by the parent process.
    model, _ = torch.hub.load(repo_or_dir='snakers4/silero-vad',
                              model='silero_vad',
                              force_reload=False,
                              onnx=True)
    cur_id = uuid.uuid4()  # unique per-worker output file name
    # Open the output once for the worker's lifetime instead of re-opening it
    # for every sample (note: this creates the file even if no results arrive).
    with open(f'results/vad_results_{cur_id}.jsonl', 'a+') as f:
        while True:
            item = queue.get()
            if item is None:  # producer's shutdown sentinel
                break
            try:
                audio = torchaudio.load(BytesIO(item[0]))
                waveform, duration = resample_squeeze(audio)
            except Exception:
                # Narrowed from a bare ``except:`` so KeyboardInterrupt /
                # SystemExit can still stop the worker; undecodable samples
                # are skipped best-effort, as before.
                continue
            result = {item[1]: {'timestamps': vad_process(waveform, model),
                                'duration': duration,
                                'tar_number': extract_tar_number(item[-1])}}
            f.write(json.dumps(result) + '\n')
    print('Consumer: Done', flush=True)
if __name__ == "__main__":
    # Guard the driver so child processes that re-import this module (e.g.
    # under the 'spawn' start method) do not recursively launch more workers.
    start = time.perf_counter()
    # A small maxsize applies backpressure: the producer only streams from the
    # network as fast as the consumers drain the queue.
    queue = multiprocessing.Queue(maxsize=64)
    consumer_processes = [
        multiprocessing.Process(target=consumer, args=(queue,))
        for _ in range(NUM_PROCESS)
    ]
    for process in consumer_processes:
        process.start()
    producer_process = multiprocessing.Process(target=producer, args=(queue, dataset))
    producer_process.start()
    # Wait for the producer first (it enqueues the shutdown sentinels), then
    # for every consumer to drain its sentinel and exit.
    producer_process.join()
    for process in consumer_processes:
        process.join()
    end = time.perf_counter()
    print(f"Time: {end - start}")