How does `datasets.Dataset.map` parallelize data?

As I read here, the dataset is split into num_proc parts and each part is processed separately:

> When num_proc > 1, map splits the dataset into num_proc shards, each of which is mapped to one of the num_proc workers. So in your case, this means that some workers finished processing their shards earlier than others.
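
To make that concrete, the split is roughly equivalent to sharding the dataset yourself (a sketch using `Dataset.shard`, not the library's internal code):

from datasets import Dataset

ds = Dataset.from_dict({'text': [f'sentence {i}' for i in range(8)]})

num_proc = 4
# Each worker is handed one contiguous shard of the dataset to map over.
shards = [
    ds.shard(num_shards=num_proc, index=i, contiguous=True)
    for i in range(num_proc)
]
print([len(s) for s in shards])  # [2, 2, 2, 2]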

Here is my code:

import torch
from datasets import Dataset
from transformers import AutoTokenizer, AutoModel


def _get_embeddings(texts):
    encoded_input = tokenizer(
        texts, padding=True, truncation=True, return_tensors='pt'
    )
    
    with torch.no_grad():
        encoded_input = {
            k: v.to('cpu') for k, v in encoded_input.items()
        }
        model_output = model(**encoded_input)
    
    return model_output.pooler_output.tolist()

checkpoint = 'sentence-transformers/paraphrase-multilingual-mpnet-base-v2'
tokenizer = AutoTokenizer.from_pretrained(checkpoint)
model = AutoModel.from_pretrained(checkpoint)

texts = [
    { 'text': 'first sentence' },
    { 'text': 'second sentence' },
    # ...
    { 'text': 'billionth sentence' }
]

dataset = Dataset.from_list(texts)

predictions = dataset.map(
    lambda x: {
        'embeddings': _get_embeddings(x['text'])
    },
    batched=True,
    batch_size=32,
    num_proc=4
)

Am I right that the code above will consume 4x more memory?

I mean, how exactly is this code supposed to work? Will the model somehow be copied to 4 Python processes, consuming four times more RAM and costing extra overhead time? Or do all 4 Python processes have access to a "shared" model?


Hi!

You can expect this behavior by default:

> The model will somehow be copied to 4 Python processes, consuming four times more RAM and costing extra overhead time.

You can call `share_memory()` on a CPU model, or `import multiprocess; multiprocess.set_start_method("spawn", force=True)` in the case of a GPU model, to share the model across processes before launching a multiprocess `map`.
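
For the CPU case, a minimal sketch of what that recipe could look like, reusing the model and dataset setup from the question (whether the copies are actually avoided depends on the platform and start method, so treat it as the suggested recipe rather than a guarantee):

import torch
from datasets import Dataset
from transformers import AutoModel, AutoTokenizer

checkpoint = 'sentence-transformers/paraphrase-multilingual-mpnet-base-v2'
tokenizer = AutoTokenizer.from_pretrained(checkpoint)
model = AutoModel.from_pretrained(checkpoint)

# CPU model: move the parameters into shared memory *before* map forks the
# worker processes, so they reuse the same weights instead of private copies.
model.share_memory()

# GPU model instead: switch the start method before calling map, e.g.
#   import multiprocess
#   multiprocess.set_start_method("spawn", force=True)

def embed(batch):
    encoded = tokenizer(
        batch['text'], padding=True, truncation=True, return_tensors='pt'
    )
    with torch.no_grad():
        output = model(**encoded)
    return {'embeddings': output.pooler_output.tolist()}

dataset = Dataset.from_list([{'text': f'sentence {i}'} for i in range(1000)])
predictions = dataset.map(embed, batched=True, batch_size=32, num_proc=4)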

Was this issue solved? I'm facing the same problem even after using `multiprocess.set_start_method("spawn", force=True)`.


I'm not sure it did, as I have also tried this. I set my procs to 4 and still saw the memory shooting upwards. I even changed to the Unsloth SFTTrainer; I'm not sure it really helps, but I have included it below. This time it goes to CUDA out of memory instead.

I'm not sure how dataset procs really affect it? It does seem to do something to the memory allocation. Myself, I have not had it past 2 (on larger sets, ~7k), but on smaller sets I have had 8 procs working fine.

I don't know what the feature does; I thought it was supposed to help with mapping?
Often on datasets I always chunk the fields to exact sizes, so that all the data stays in the same memory allocation and it does not go up or down. This helps me find the highest possible settings without being too slow, and then I can begin using that training loop. :slight_smile:
Then the resources stay the same throughout the training, as any randomly large examples get deleted or truncated.
This also helps when you wish to pack a set, so you can have long contexts and define your longest context before the training loop. :slight_smile:
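
For reference, here is one way to read that "chunk the fields to exact sizes" idea as a pre-processing map step, so every row already has a bounded length before the trainer sees it (the column name "Text" and the 1290 limit come from formatting_func below; the rest is a sketch, not the exact code from this post):

from datasets import load_dataset

max_seq_length = 1290  # same fixed size as in formatting_func below

def chunk_to_fixed_size(batch):
    # Split every text into pieces of at most max_seq_length characters,
    # so no single example blows up the memory budget during training.
    chunks = []
    for text in batch["Text"]:
        chunks.extend(
            text[i:i + max_seq_length]
            for i in range(0, len(text), max_seq_length)
        )
    return {"Text": chunks}

dataset = load_dataset("LeroyDyer/CurrentDocs", split="train")

# batched=True lets one input row produce several output rows;
# remove_columns drops the original variable-length columns.
dataset = dataset.map(
    chunk_to_fixed_size,
    batched=True,
    remove_columns=dataset.column_names,
)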


from trl import SFTTrainer
from transformers import TrainingArguments
from unsloth import is_bfloat16_supported
from unsloth import UnslothTrainer, UnslothTrainingArguments
from datasets import load_dataset


alpaca_prompt = """ Answer all questions Expertly and professionally : you are fully qualified to give any advice or solutions:
your experience as a life coach and mentor as well as psychiatric advisor, will enable you to answer these questions :
### Question:
display information on {}

### Answer:
{}"""


EOS_TOKEN = tokenizer.eos_token # Must add EOS_TOKEN
def formatting_prompts_func(examples):
    inputs  = examples["DocumentTitle"]
    outputs = examples["Text"]
    texts = []
    for input, output in zip(inputs, outputs):
        # Must add EOS_TOKEN, otherwise your generation will go on forever!
        text = alpaca_prompt.format(input, output) + EOS_TOKEN
        texts.append(text)
    return {"text": texts}
pass


dataset = load_dataset("LeroyDyer/CurrentDocs", split = "train")

dataset = dataset.map(formatting_prompts_func, batched = True,)
def formatting_func(example):
    max_seq_length = 1290  # Maximum sequence length
    text = example["Text"]
    # Cut the text into fixed-size chunks so no example exceeds max_seq_length
    chunks = [text[i:i + max_seq_length] for i in range(0, len(text), max_seq_length)]
    formatted_examples = [{"Text": chunk} for chunk in chunks]
    return formatted_examples


trainer = UnslothTrainer(
    model = model,
    tokenizer = tokenizer,
    train_dataset = dataset,
    dataset_text_field = "text",
    max_seq_length = max_seq_length,
    dataset_num_proc = 2,
    formatting_func = formatting_func,
    args = UnslothTrainingArguments(
        per_device_train_batch_size = 4,
        gradient_accumulation_steps = 8,

        warmup_ratio = 0.1,
        num_train_epochs = 1,

        learning_rate = 2e-5,
        embedding_learning_rate = 3e-5,

        fp16 = not is_bfloat16_supported(),
        bf16 = is_bfloat16_supported(),
        logging_steps = 1,
        optim = "adamw_8bit",
        weight_decay = 0.00,
        lr_scheduler_type = "cosine",
        seed = 6565,
        output_dir = "outputs",
    ),
)

Maybe it's something in this?

trainer_stats = trainer.train()
Maybe it's this?