Hi, I want to apply a company-internal transformer-based NLP model to the rows of a Hugging Face dataset. It works fine when I use map with num_proc <= 4. For values greater than that, it errors out with the following stack trace:
Internal error
Traceback (most recent call last):
  File "/flow/metaflow/metaflow/cli.py", line 1172, in main
    start(auto_envvar_prefix="METAFLOW", obj=state)
  File "/flow/metaflow/metaflow/_vendor/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/flow/metaflow/metaflow/_vendor/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/flow/metaflow/metaflow/_vendor/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/flow/metaflow/metaflow/_vendor/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/flow/metaflow/metaflow/_vendor/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/flow/metaflow/metaflow/_vendor/click/decorators.py", line 21, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/flow/metaflow/metaflow/cli.py", line 581, in step
    task.run_step(
  File "/flow/metaflow/metaflow/task.py", line 583, in run_step
    self._exec_step_function(step_func)
  File "/flow/metaflow/metaflow/task.py", line 57, in _exec_step_function
    step_function()
  File "flow.py", line 196, in hit_model
    result_dataset = dataset.map(process_row, num_proc=16, batch_size=30)
  File "/usr/local/lib/python3.8/site-packages/datasets/arrow_dataset.py", line 592, in wrapper
    out: Union["Dataset", "DatasetDict"] = func(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/datasets/arrow_dataset.py", line 557, in wrapper
    out: Union["Dataset", "DatasetDict"] = func(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/datasets/arrow_dataset.py", line 3189, in map
    for rank, done, content in iflatmap_unordered(
  File "/usr/local/lib/python3.8/site-packages/datasets/utils/py_utils.py", line 1387, in iflatmap_unordered
    raise RuntimeError(
RuntimeError: One of the subprocesses has abruptly died during map operation.To debug the error, disable multiprocessing.
Below is the code, which is a step in a Metaflow pipeline. We have 32 CPUs and 22 GB of memory.
dataset = load_dataset('/root/dir/', num_proc=32)
MODEL = Model()

def process_row(row):
    try:
        result = MODEL.run(row)
    except:
        result = {}
    return result

result_dataset = dataset.map(process_row, num_proc=16, batch_size=30)