Debugging parallel Datasets transformations

I’ve recently spent some time scratching my head over some tricky bugs in parallel dataset processing (with .map()).

In particular, I currently have a use case with multiple chained .map() calls where everything works fine (but very slowly) as long as num_proc=None… but when I scale up to multiple workers, the system freezes part-way through.

Specifically:

  • The progress indicators always seem to suggest it gets to the end of one of the map()s at the point where the script becomes unresponsive: the last things to print are full progress bars from the final busy workers.
  • Multiple cores (but not all of them, and not as many as I have workers) show as “busy”, but nothing else happens even after leaving the system for minutes or hours.

One reason I’m finding this tough to debug is that there seem to be so many things that could go wrong in .map() multiprocessing. I’ve already seen, for example, that with multiple processes, uncaught errors can sometimes silently crash workers without the error ever being printed… which is fine for things that can be debugged with num_proc=None, but tricky after that!

Can anybody help narrow down the list? Which of the following do we know could, or couldn’t, cause these kinds of hangs in situations where a dataset is prepared through multiple .map() stages?

  1. Are there particular thread-safety restrictions on fn_kwargs keyword args, assuming these are only read and not written by the mapper? E.g. is it OK to use kwargs with types like dict, set, maybe numpy.ndarray?
  2. Is it true that a TokenizerFast should be usable in .map() (passed as a kwarg and then called on each batch) without any problems? I think the TOKENIZERS_PARALLELISM=false env var may be required for this…
  3. How about a Processor? Are there processors that might have a problem being used in this way?
  4. In the actual result columns of the dataset returned by the map function, is it OK to return ndarrays, or must they be plain Python lists?
    A). How about if multiple returned features are views that might map to shared underlying ndarray data? (E.g. the map fn preparing one big ndarray and then returning slices of it without .copy().)
    B). How about if multiple examples in one feature of a returned batch are slices/views?
  5. What about variables in the closure scope of the map function? E.g. if some variables are in scope at the point the mapper function is defined, and the function accesses them; again assuming it does so in a relatively sensible read-only way.
    • For example, using a pre-defined logger from standard logging library?

So at the moment I have no end to my list of things that could possibly-maybe-perhaps cause deadlock/multiprocessing issues when chaining together a couple of .map() calls.

Any help narrowing down which things are unlikely/impossible to cause these problems, and which are probable causes, would be much appreciated!

For anybody keen to take the plunge, the code I’m struggling with lately is actually public here: specifically the data subfolder and dataproc_num_workers parameter… But it’s part of a long workflow designed to run on Amazon SageMaker + AWS, so it’s not exactly a minimal, locally-reproducible example.


Hi !

  1. The kwargs are copied (using dill, which is an extension of pickle) and passed to each process. In particular, we use multiprocess.Pool to spawn the subprocesses.
  2. We set TOKENIZERS_PARALLELISM=false for subprocesses already, and we show a warning to say that we set it explicitly. If you set TOKENIZERS_PARALLELISM=false yourself, the warning is hidden. This shouldn’t cause the job to hang.
  3. It should be fine unless the processor already does some parallelism, in which case you have to account for that.
  4. You can return NumPy arrays, and they are converted to Arrow arrays automatically without copying the data. Then, every writer_batch_size examples, they are written to disk in an Arrow file. However, note that each process is independent and has its own copies of the fn_kwargs values (from dill).
  5. Same as for fn_kwargs: the global variables are copied to each process thanks to multiprocess.Pool.

Let me know if you have other questions or if I can help

Hi @Thewz Did you find any solution to this? I am experiencing the same issue. I can only use the main process, but because my dataset is very large, it takes a stupid amount of time on a single process. Thank you!