Map multiprocessing Issue

Thank you!!! I had the same problem with processor!

I’m having the same exact issue. Any solutions?

I got a solution from GitHub: Multiprocessing for pipeline · Issue #14919 · huggingface/transformers · GitHub.

Something like this works for me:

from datasets import Dataset
from transformers import pipeline

_pipe = None  # one pipeline per process, created lazily


def create_pipeline():
    # Build the pipeline inside each worker instead of pickling a pipeline
    # that was created in the main process.
    global _pipe
    if _pipe is None:
        _pipe = pipeline(tokenizer="/your/tokenizer", model="/your/model")
    return _pipe


def transform(x):
    return {"result": create_pipeline()(x["text"])}


if __name__ == "__main__":
    dataset = Dataset.from_list([{"text": "Some text"}] * 10)
    dataset = dataset.map(transform, batch_size=8, num_proc=4)

Hope it helps.


Is anybody still stuck with this error? I was not able to solve it with the above-mentioned steps.

Faced the same issue on Windows. The problem (for me) was that multiprocessing works differently on Linux and Windows.

On Linux, a new process is created with the fork system call, which copies the parent's memory, so any global variable created beforehand is still accessible in the child process.

On Windows there is no fork; instead, a fresh process is spawned and Python pickles only the relevant data and sends it to the new process, so global variables do not carry over by default.
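
A minimal, self-contained sketch (the names here are made up, not from the posts above) that makes the difference visible:

import multiprocessing as mp

GLOBAL_MODEL = None  # placeholder; assigned for real in the __main__ block


def worker(_):
    # Under fork the child inherits the parent's memory, so this returns the
    # value assigned in __main__. Under spawn (the Windows default) the module
    # is re-imported and the __main__ block is skipped, so this stays None.
    return GLOBAL_MODEL


if __name__ == "__main__":
    GLOBAL_MODEL = "loaded in __main__"
    ctx = mp.get_context("spawn")  # use "fork" on Linux to see the difference
    with ctx.Pool(2) as pool:
        print(pool.map(worker, range(2)))  # [None, None] under spawn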

I solved it by wrapping my function in a class (a decorator would work the same way).

I turned this code:

def prepare_dataset(batch):
    audio = batch["audio"]

    batch["input_features"] = processor.feature_extractor(audio["array"], sampling_rate=audio["sampling_rate"]).input_features[0]

    batch["labels"] = processor.tokenizer(batch["transcription"]).input_ids
    return batch

Into that:

from dataclasses import dataclass

from transformers import WhisperProcessor


@dataclass
class DatasetPrepper:
    processor: WhisperProcessor

    def __call__(self, batch):
        audio = batch["audio"]

        batch["input_features"] = self.processor.feature_extractor(audio["array"], sampling_rate=audio["sampling_rate"]).input_features[0]

        batch["labels"] = self.processor.tokenizer(batch["transcription"]).input_ids
        return batch

I then use it like this:

dataset = dataset.map(DatasetPrepper(processor), remove_columns=dataset.column_names["train"], num_proc=24)

Then, whatever external objects I rely on will be pickled correctly.
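
If you prefer not to define a class, a similar trick (just a sketch, reusing the processor and column names from the snippet above) is to bind the processor with functools.partial; a partial over a module-level function pickles the same way:

from functools import partial


def prepare_dataset(batch, processor):
    audio = batch["audio"]
    batch["input_features"] = processor.feature_extractor(
        audio["array"], sampling_rate=audio["sampling_rate"]
    ).input_features[0]
    batch["labels"] = processor.tokenizer(batch["transcription"]).input_ids
    return batch


dataset = dataset.map(
    partial(prepare_dataset, processor=processor),
    remove_columns=dataset.column_names["train"],
    num_proc=24,
)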

Hope it helps someone still struggling with this.


Hello, I have the same issue when mapping data. Can you help me please?

code:
common_voice = common_voice.map(prepare_dataset, remove_columns=common_voice.column_names["train"], num_proc=6)

issue:
RuntimeError: One of the subprocesses has abruptly died during map operation.To debug the error, disable multiprocessing.

I tried your solution but it didn't work:
RuntimeError: One of the subprocesses has abruptly died during map operation.To debug the error, disable multiprocessing.
Do you have another solution, please?

Hey, I am getting this error:
AttributeError: in user code:

File "<ipython-input-193-90f6a1831894>", line 24, in None  *
    lambda x: preprocess({'text': x})
File "<ipython-input-201-d754e3ba57bd>", line 12, in preprocess  *
    encoded_inputs = tokenizer.encode(input_text_str.numpy().decode('utf-8'))

AttributeError: 'SymbolicTensor' object has no attribute 'numpy'

for this code:
import tensorflow as tf

def preprocess(inputs):
    input_text = inputs['text']

    def process_text(input_text):
        input_text_str = tf.strings.as_string(input_text)
        print("Input text:", input_text_str.numpy().decode('utf-8'))
        return input_text_str

    input_text_str = tf.py_function(process_text, [input_text], tf.string)
    encoded_inputs = tokenizer.encode(input_text_str.numpy().decode('utf-8'))
    outputs = masker(encoded_inputs)
    features = {
        "token_ids": outputs["token_ids"],
        "mask_positions": outputs["mask_positions"],
    }
    labels = outputs["mask_ids"]
    weights = outputs["mask_weights"]
    return features, labels, weights

Use prefetch() to pre-compute preprocessed batches on the fly on the CPU.

pretrain_ds = wiki_train_ds.map(
    lambda x: preprocess({'text': x}),
    num_parallel_calls=tf.data.AUTOTUNE
).prefetch(tf.data.AUTOTUNE)

Preview a single input example.

The masks will change each time you run the cell.

print(pretrain_ds.take(1).as_numpy_iterator().next())

Anyone, please help.

I tried your solution but I am still getting the same error… Can you help me with it?

@rokayabn Hey, I am also facing the same issue on the latest releases of torch, transformers, and datasets. What seems to be the solution?

Hello! For all of you who are stuck: when you run a multiprocessing .map, you CANNOT rely on a globally defined variable inside your function. Each worker process runs its own copy of the function, and an object created globally in the main process (or in a notebook) is not automatically available in those workers. Instead, pass the object into the function using the fn_kwargs argument of the .map function.

This is why @simonschoe was getting the "NameError: name 'tokenizer' is not defined": the lambda function referenced the tokenizer variable, but that variable was never passed to the worker processes. To fix the error, they would need to use this instead:

> datasets = datasets.map(
>     lambda sequence, tokenizer: tokenizer(sequence['text'], return_special_tokens_mask=True),
>     batched=True,
>     batch_size=1000,
>     num_proc=2, #psutil.cpu_count()
>     remove_columns=['text'],
>     fn_kwargs={'tokenizer': tokenizer}
> )

(I personally would switch to a def function instead, but I believe this small change should make the lambda function work).

Here is the function I would use if I was writing this code:

def TokenizerFunction(text_col, custom_tokenizer):
  return custom_tokenizer(text_col, return_special_tokens_mask=True)

In the main process, I would then add:

> datasets = datasets.map(
>     function=TokenizerFunction, #pass the tokenizer function above
>     num_proc=2,
>     batched=True,
>     remove_columns=['text'],
>     input_columns=['text'], #I'm using the positional column instead, so in my function text_col will basically be ~= batch['text']
>     fn_kwargs={'custom_tokenizer': tokenizer}) #Here I'm passing a keyword argument (instead of a positional argument), for the argument 'custom_tokenizer'  in my function, I pass the tokenizer global variable.

The trick here is that each worker process created by the map function does not automatically see the variables declared in your main process, so the function cannot rely on them as globals. By passing them as fn_kwargs instead, the map function sends a pickled copy of each object to every worker. I've noticed that while you can use the fn_kwargs variables in multiprocessing, it seems you cannot change them in a way that the main process will see, since each worker only modifies its own copy.
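
As a small illustration (the column and argument names here are made up): an object passed through fn_kwargs can be read inside the workers, but mutating it there does not affect the copy held by the main process:

from datasets import Dataset


def tag_rows(batch, shared_state):
    # This only mutates the worker's own copy of shared_state.
    shared_state["calls"] = shared_state.get("calls", 0) + 1
    batch["tag"] = [shared_state["label"]] * len(batch["text"])
    return batch


if __name__ == "__main__":
    state = {"label": "processed"}
    ds = Dataset.from_list([{"text": f"row {i}"} for i in range(8)])
    ds = ds.map(tag_rows, batched=True, num_proc=2, fn_kwargs={"shared_state": state})
    print(state)  # still {'label': 'processed'}; the 'calls' key never appears here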

Edit: This also includes imports! If you import something at the global level (for example in a notebook cell), it may not automatically be usable inside the multiprocessed function. Instead, add whatever import statements the function needs inside the function itself. This is why @sauce16112002 was getting the "numpy" error: they need to import numpy inside their function, or pass it as an fn_kwargs entry (e.g. fn_kwargs={'np': np} where np comes from import numpy as np).
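
For example, a sketch of the import-inside-the-function approach (the helper name and the Common Voice-style audio column are assumptions, not from the posts above):

def add_audio_duration(batch):
    import numpy as np  # imported here so every worker process has it available

    batch["duration_sec"] = [
        float(np.asarray(a["array"]).shape[0]) / a["sampling_rate"] for a in batch["audio"]
    ]
    return batch


common_voice = common_voice.map(add_audio_duration, batched=True, num_proc=6)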
