Multiprocessing map taking up too much memory

Hi,

I use map to preprocess my very large dataset (about 450 GB). I ran the map function on 1 node / 1 GPU to finish tokenization, but I got stuck when moving to multi-node training (10 nodes / 8 GPUs, num_proc = 18). I checked my system status, and it seems that my memory overflows, which didn't happen in the 1 node / 1 GPU setup. How can I fix it? Thanks!

Here's my 1 node / 1 GPU training system status; the overflow happens when moving to 8 GPUs.


Total memory: 950 GB

I am wondering whether this memory footprint is normal or not. Why does map take so much memory (about 400 GB)? I'm happy to provide more details about my training.

Hi! What do you get when you access the cache_files attribute of your dataset? map with multiprocessing can be an issue for in-memory datasets due to data being copied to the subprocesses (more info).
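For reference, checking this is a one-liner (a minimal sketch, assuming the dataset variable is called train_dataset):

print(train_dataset.cache_files)
# []                               -> the dataset lives entirely in memory
# [{'filename': '... .arrow'}, …]  -> the dataset is backed by Arrow files on disk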

Thank you for the reply! @mariosasko
I'm not sure about cache_files, but the dataset should be cached to disk, I guess? There are messages like "found cached files from…" before map runs. I use map like this:

        with training_args.main_process_first(desc="train dataset map pre-processing"):
            train_dataset = train_dataset.map(
                preprocess_function,
                batched=True,
                num_proc=data_args.preprocessing_num_workers,
                batch_size=10000,
                remove_columns=column_names,
                load_from_cache_file=not data_args.overwrite_cache,
                desc="running tokenizer on train dataset",
            )

The main process starts taking up a lot of memory after it creates the num_proc workers to execute map, even though I have already run map before, so there is nothing to do but load the cache files.
I found that each subprocess in the multi-GPU setup executes map too, and memory usage jumps to 100%.

I saved the dataset after map, loaded it directly (which means I don't need map), and removed map from my scripts, and everything was fine, so I think there is some problem with the workers in map.
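For reference, that workaround looks roughly like this (a sketch; the path is illustrative):

# One-off preprocessing run: tokenize once, then persist the result to disk.
tokenized = train_dataset.map(
    preprocess_function,
    batched=True,
    num_proc=data_args.preprocessing_num_workers,
    remove_columns=column_names,
)
tokenized.save_to_disk("/path/to/tokenized_train")  # illustrative path

# Training runs: load the already-tokenized dataset and skip map entirely.
from datasets import load_from_disk
train_dataset = load_from_disk("/path/to/tokenized_train")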

I have two questions:

  1. Do all workers share or copy the data?
  2. How can I control the memory use of each worker?
  1. Do all workers share or copy the data?

When you load a dataset, it actually memory maps the data from your disk. All workers memory map the same files, so this is shared memory.

  2. How can I control the memory use of each worker?

It's the sum of the physical memory used by your job and the physical memory occupied by pages from memory-mapped files. Memory mapping brings pages into physical memory when you access the dataset and removes them from physical memory after a while if they're not used. Pages are also automatically removed if your job requires a lot of memory.
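For what it's worth, a rough sketch of the map() knobs mentioned elsewhere in this thread that bound how much data each worker holds at once (values are illustrative, not recommendations):

train_dataset = train_dataset.map(
    preprocess_function,
    batched=True,
    batch_size=1000,         # rows passed to preprocess_function per call
    writer_batch_size=1000,  # rows buffered in RAM before flushing to the Arrow cache
    num_proc=4,              # fewer workers -> fewer per-worker buffers
)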

I too am having similar issues. I find that while map is processing my dataset, memory gradually fills up until I get an OOM error after a few hours. I've tried playing around with batch_size, num_proc, and writer_batch_size extensively, to no avail (including with each value set to 1!). I then tried sharding the dataset into 64 shards and applying map to each one individually in a for-loop, as shown below:

import os
import psutil

def mem():
    # Resident set size (RSS) of the current process, in GiB
    m = psutil.Process(os.getpid()).memory_info().rss / (1024**3)
    return f"{round(m, 2)}G"

N = 64

print("INITIAL:", mem())
print(f"DATASET: {round(dataset.dataset_size / (1024 ** 3), 2)}G")
shards = [dataset.shard(N, i) for i in range(N)]
for i, d in enumerate(shards):
    print(f"{i}:", mem())
    d.map(tokenize_fn, batched=True)

This yields an unexpected log, namely

INITIAL: 0.5G
DATASET: 24.0G
0: 0.5G
1: 17.2G
2: 23.0G
3: 29.5G
4: 34.5G
...

The steady increase in memory utilization is also reflected when watching with top. Maybe I'm misunderstanding how the memory-mapping works, but I would have expected the amount of memory to remain relatively constant, as each shard is processed in memory and then relinquished. When I add del d or even gc.collect() (where gc is the Python garbage-collection library), there is no difference in memory usage.

One additional unexpected behavior occurs when re-running the code sample. Suppose I ran the first code sample until 5 shards had been processed and the log was printed as depicted above, and then I re-ran the exact same code. This time, datasets finds cache files in my ~/.cache directory because the map hash matches. In this case, the following log is printed:

INITIAL: 0.5G
DATASET: 24.0G
0: 0.5G
1: 0.5G
2: 0.5G
3: 0.5G
4: 0.5G
5: 39.8G
6: 45.7G
7: 51.2G
...

From this output, it appears to me that memory-mapping works as expected until the sixth iteration, at which point it just "gives up" and loads everything into memory? What is weird is that, at this point, if I processed each shard individually by executing my script once per shard, I could feasibly merge them all together at the end with the concatenate function.

Basically, what I would like is to be able to preprocess my dataset without having the entire thing in RAM at once, which is not feasible with my current hardware setup. I can provide more specific details about my code if it's helpful, but at the moment I'm assuming this is just a misunderstanding on my part of how memory mapping works. Thanks!

Can you also share the tokenize_fn definition and the code that generates the dataset so we can reproduce the behavior ourselves?

My data processing pipeline is rather extensive. I produced a minimal example along with some test data and posted them to GitHub with some detailed documentation.

Let me know if there is anything else I can provide. Thanks!

EDIT: it took me a few minutes to actually get some big files onto GitHub, but it's all there now!

Thanks for the reproducer!

Merely iterating over a dataset in map will cause memory usage to increase (see With dataloader RSS memory consumed by HF datasets monotonically increases · Issue #4883 · huggingface/datasets · GitHub), but not nearly as much as with this tokenizer transform, so I suspect this has something to do with tokenizers. I'm not familiar enough with the tokenizers' internals to explain this behavior, so maybe @Narsil can help.

Yeah, it could be that I'm doing something a little unusual that the tokenizers may not have been designed for. I do binary analysis, so I'm trying to tokenize raw binaries. Most of the utilities in tokenizers work smoothest with UTF-8 encoding, so I map each byte value to a unique UTF-8 symbol (this is why my data values look like absolute mumbo-jumbo). Essentially, the issue could be caused by the fact that I am trying to tokenize documents that can be tens (or hundreds) of thousands of tokens long. I haven't studied the tokenization algorithms or their complexities directly, so I might be trying to fit a square peg into a round hole, as they say.

Datasets are memory-mapped from disk, so accessing slices of data counts toward RSS memory. However, it will not fill your physical memory, since the slices of data are paged out as soon as any other process requires memory. Therefore your RSS keeps increasing as you iterate over the dataset, but without OOM, because slices of data that are no longer used are paged out whenever your system demands memory for something else.

But it does OOM! Could it be a memory leak with the tokenizers?

I also face the same OOM issue when trying to map and tokenize sentences on a large dataset :face_holding_back_tears:.

It seems as if the memory is not released after processing finishes. The only workaround for me now is to use with_transform to apply the mapping lazily on the fly instead, which cannot be cached to disk and causes a bottleneck for my GPU :sob:.
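For reference, the with_transform workaround looks roughly like this (a sketch; the tokenizer, the dataset variable, and the "text" column are illustrative):

def tokenize_batch(batch):
    # Runs only when rows are accessed; the result is not cached to disk.
    return tokenizer(batch["text"], truncation=True)

lazy_dataset = raw_dataset.with_transform(tokenize_batch)
# Each indexing / DataLoader access now tokenizes just that slice on the fly.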


Oh, I was able to solve it now. It turns out that even if we set keep_in_memory=False, we still need to set cache_file_name so that datasets can move the data from memory onto disk.

Hey, could you elaborate on the cache_file_name argument? What is it for, and how does it help keep memory from blowing up?

I am facing similar issues, and setting keep_in_memory=False isn't helping.

A dataset that comes from memory (e.g. using .from_dict()) doesn't have a cache file yet, so if you want your map() to write to disk instead of filling up your memory, you should pass a cache_file_name to map().

Note that at some point we might allocate a cache automatically for such in-memory datasets to align with the general behavior.
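A minimal sketch of this (the column and function are illustrative):

from datasets import Dataset

ds = Dataset.from_dict({"text": ["hello", "world"]})
print(ds.cache_files)  # [] -> in-memory dataset, no cache file yet

# With cache_file_name set, map() writes its result to this Arrow file on disk
# instead of keeping it in memory.
ds = ds.map(
    lambda batch: {"n_chars": [len(t) for t in batch["text"]]},
    batched=True,
    cache_file_name="./my_map_cache.arrow",
)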

@lhoestq I did try this, but I get the following error. This is my map call:

vectorized_datasets = raw_datasets.map(
    prepare_dataset,
    num_proc=data_args.preprocessing_num_workers,
    remove_columns=next(iter(raw_datasets.values())).column_names,
    keep_in_memory=False,
    cache_file_name="./dataset_cache",
    desc="preprocess train dataset",
)

the following is the error:

TypeError: DatasetDict.map() got an unexpected keyword argument 'cache_file_name'
  File "/media/user/drive_2/maithili_asr/whisper_ft.py", line 791, in <module>
    main()
  File "/media/user/drive_2/maithili_asr/whisper_ft.py", line 643, in main
    vectorized_datasets = raw_datasets.map(
                          ^^^^^^^^^^^^^^^^^
TypeError: DatasetDict.map() got an unexpected keyword argument 'cache_file_name'

Your object is a DatasetDict so it may contain multiple Dataset objects (e.g. train and test splits).

cache_file_name is available in Dataset.map(), so you can do:

vectorized_datasets_train = raw_datasets["train"].map(..., cache_file_name=train_cache_file_name)
vectorized_datasets_test = raw_datasets["test"].map(..., cache_file_name=test_cache_file_name)
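Alternatively, a sketch reusing the names from the snippet above: map each split of the DatasetDict in a loop and give every split its own on-disk cache file:

from datasets import DatasetDict

vectorized_datasets = DatasetDict({
    split: ds.map(
        prepare_dataset,
        num_proc=data_args.preprocessing_num_workers,
        remove_columns=ds.column_names,
        keep_in_memory=False,
        cache_file_name=f"./dataset_cache_{split}.arrow",  # one cache file per split
        desc=f"preprocess {split} dataset",
    )
    for split, ds in raw_datasets.items()
})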