Map fails for more than 4 processes

Hi, I want to apply a company-internal transformer-based NLP model to the rows of a HF dataset. It works fine when I use map with num_proc <= 4. For values greater than that, it errors out with the following stack trace:

Internal error

Traceback (most recent call last):
File "/flow/metaflow/metaflow/", line 1172, in main
start(auto_envvar_prefix="METAFLOW", obj=state)
File "/flow/metaflow/metaflow/_vendor/click/", line 829, in __call__
return self.main(*args, **kwargs)
File "/flow/metaflow/metaflow/_vendor/click/", line 782, in main
rv = self.invoke(ctx)
File "/flow/metaflow/metaflow/_vendor/click/", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/flow/metaflow/metaflow/_vendor/click/", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/flow/metaflow/metaflow/_vendor/click/", line 610, in invoke
return callback(*args, **kwargs)
File "/flow/metaflow/metaflow/_vendor/click/", line 21, in new_func
return f(get_current_context(), *args, **kwargs)
File "/flow/metaflow/metaflow/", line 581, in step
File "/flow/metaflow/metaflow/", line 583, in run_step
File "/flow/metaflow/metaflow/", line 57, in _exec_step_function
File "", line 196, in hit_model
result_dataset =, num_proc=16, batch_size=30)
File "/usr/local/lib/python3.8/site-packages/datasets/", line 592, in wrapper
out: Union["Dataset", "DatasetDict"] = func(self, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/datasets/", line 557, in wrapper
out: Union["Dataset", "DatasetDict"] = func(self, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/datasets/", line 3189, in map
for rank, done, content in iflatmap_unordered(
File "/usr/local/lib/python3.8/site-packages/datasets/utils/", line 1387, in iflatmap_unordered
raise RuntimeError(
RuntimeError: One of the subprocesses has abruptly died during map operation. To debug the error, disable multiprocessing.
Below is the code, which is a step in a Metaflow pipeline. We have 32 CPUs and 22 GB of memory.

        dataset = load_dataset('/root/dir/', num_proc=32)

        MODEL = Model()

        def process_row(row):
            # Placeholder: the actual model invocation was elided in the original post.
            result = MODEL(row)
            return result

        result_dataset =, num_proc=16, batch_size=30)

It's impossible to recover the exception from a subprocess that dies abruptly, so set num_proc=None and run the map on a smaller subset to surface the error that is causing one (or more) of the processes to die.

concurrent.futures.ProcessPoolExecutor can recover such errors, but it is limited in what it can serialize, so we cannot make the switch.
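For comparison, this is the kind of error propagation ProcessPoolExecutor offers (a standalone sketch, unrelated to the datasets API): an ordinary exception raised in a worker is pickled and re-raised in the parent, where it can be caught.

```python
from concurrent.futures import ProcessPoolExecutor

def will_fail(x):
    # Simulate a worker failing with an ordinary exception.
    raise ValueError(f"bad input: {x}")

def run():
    with ProcessPoolExecutor(max_workers=2) as pool:
        future = pool.submit(will_fail, 42)
        try:
            future.result()  # the child's exception is re-raised here
        except ValueError as exc:
            return str(exc)

if __name__ == "__main__":
    print(run())
```

Note that a hard crash (segfault, OOM kill) is still not recoverable this way; it surfaces as a BrokenProcessPool instead of the original error.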

The code runs fine with num_proc=1, or if I spin up a really big machine and don't use all the processes.
I think my problem is similar to the issue discussed where memory keeps increasing. Was that ever resolved? Is there a newer suggested solution?