Map with num_proc > 1 fails with PermissionError

Hi,
I have tested this with simple custom text data.
`map` with `num_proc` of 1 (or None) works fine, but setting `num_proc` greater than 1 raises a PermissionError.

from datasets import load_dataset

def preprocess(examples):
    """Replace 'design' with 'text' in every entry of the "name" column (in place)."""
    replaced = []
    for name in examples["name"]:
        replaced.append(name.replace('design', 'text'))
    examples["name"] = replaced
    return examples

# Reproduction: load a tab-separated file as the "train" split, then map with num_proc=2.
# num_proc=1 (or None) works; num_proc=2 raises PermissionError (traceback below).
train_file = {"train":"test.csv"}
ds_raw = load_dataset("csv", data_files=train_file, sep="\t")
ds = ds_raw.map(preprocess, batched=True, num_proc=2)

PermissionError Traceback (most recent call last)
Cell In[66], line 1
----> 1 ds = ds_raw.map(preprocess, batched=True, num_proc=2)

File ~/.venv/lib/python3.10/site-packages/datasets/dataset_dict.py:868, in DatasetDict.map(self, function, with_indices, with_rank, input_columns, batched, batch_size, drop_last_batch, remove_columns, keep_in_memory, load_from_cache_file, cache_file_names, writer_batch_size, features, disable_nullable, fn_kwargs, num_proc, desc)
865 if cache_file_names is None:
866 cache_file_names = {k: None for k in self}
867 return DatasetDict(
→ 868 {
869 k: dataset.map(
870 function=function,
871 with_indices=with_indices,
872 with_rank=with_rank,
873 input_columns=input_columns,
874 batched=batched,
875 batch_size=batch_size,
876 drop_last_batch=drop_last_batch,
877 remove_columns=remove_columns,
878 keep_in_memory=keep_in_memory,
879 load_from_cache_file=load_from_cache_file,
880 cache_file_name=cache_file_names[k],
881 writer_batch_size=writer_batch_size,
882 features=features,
883 disable_nullable=disable_nullable,
884 fn_kwargs=fn_kwargs,
885 num_proc=num_proc,
886 desc=desc,
887 )
888 for k, dataset in self.items()
889 }
890 )

File ~/.venv/lib/python3.10/site-packages/datasets/dataset_dict.py:869, in (.0)
865 if cache_file_names is None:
866 cache_file_names = {k: None for k in self}
867 return DatasetDict(
868 {
→ 869 k: dataset.map(
870 function=function,
871 with_indices=with_indices,
872 with_rank=with_rank,
873 input_columns=input_columns,
874 batched=batched,
875 batch_size=batch_size,
876 drop_last_batch=drop_last_batch,
877 remove_columns=remove_columns,
878 keep_in_memory=keep_in_memory,
879 load_from_cache_file=load_from_cache_file,
880 cache_file_name=cache_file_names[k],
881 writer_batch_size=writer_batch_size,
882 features=features,
883 disable_nullable=disable_nullable,
884 fn_kwargs=fn_kwargs,
885 num_proc=num_proc,
886 desc=desc,
887 )
888 for k, dataset in self.items()
889 }
890 )

File ~/.venv/lib/python3.10/site-packages/datasets/arrow_dataset.py:592, in transmit_tasks.<locals>.wrapper(*args, **kwargs)
590 self: "Dataset" = kwargs.pop("self")
591 # apply actual function
→ 592 out: Union["Dataset", "DatasetDict"] = func(self, *args, **kwargs)
593 datasets: List["Dataset"] = list(out.values()) if isinstance(out, dict) else [out]
594 for dataset in datasets:
595 # Remove task templates if a column mapping of the template is no longer valid

File ~/.venv/lib/python3.10/site-packages/datasets/arrow_dataset.py:557, in transmit_format.<locals>.wrapper(*args, **kwargs)
550 self_format = {
551 "type": self._format_type,
552 "format_kwargs": self._format_kwargs,
553 "columns": self._format_columns,
554 "output_all_columns": self._output_all_columns,
555 }
556 # apply actual function
→ 557 out: Union["Dataset", "DatasetDict"] = func(self, *args, **kwargs)
558 datasets: List["Dataset"] = list(out.values()) if isinstance(out, dict) else [out]
559 # re-apply format to the output

File ~/.venv/lib/python3.10/site-packages/datasets/arrow_dataset.py:3177, in Dataset.map(self, function, with_indices, with_rank, input_columns, batched, batch_size, drop_last_batch, remove_columns, keep_in_memory, load_from_cache_file, cache_file_name, writer_batch_size, features, disable_nullable, fn_kwargs, num_proc, suffix_template, new_fingerprint, desc)
3173 if len(kwargs_per_job) < num_shards:
3174 logger.info(
3175 f"Reprocessing {len(kwargs_per_job)}/{num_shards} shards because some of them were missing from the cache."
3176 )
→ 3177 with Pool(len(kwargs_per_job)) as pool:
3178 os.environ = prev_env
3179 logger.info(f"Spawning {num_proc} processes")

File ~/.venv/lib/python3.10/site-packages/multiprocess/context.py:119, in BaseContext.Pool(self, processes, initializer, initargs, maxtasksperchild)
117 '''Returns a process pool object'''
118 from .pool import Pool
→ 119 return Pool(processes, initializer, initargs, maxtasksperchild,
120 context=self.get_context())

File ~/.venv/lib/python3.10/site-packages/multiprocess/pool.py:191, in Pool.__init__(self, processes, initializer, initargs, maxtasksperchild, context)
188 self._state = INIT
190 self._ctx = context or get_context()
→ 191 self._setup_queues()
192 self._taskqueue = queue.SimpleQueue()
193 # The _change_notifier queue exist to wake up self._handle_workers()
194 # when the cache (self._cache) is empty or when there is a change in
195 # the _state variable of the thread that runs _handle_workers.

File ~/.venv/lib/python3.10/site-packages/multiprocess/pool.py:346, in Pool._setup_queues(self)
345 def _setup_queues(self):
→ 346 self._inqueue = self._ctx.SimpleQueue()
347 self._outqueue = self._ctx.SimpleQueue()
348 self._quick_put = self._inqueue._writer.send

File ~/.venv/lib/python3.10/site-packages/multiprocess/context.py:113, in BaseContext.SimpleQueue(self)
111 '''Returns a queue object'''
112 from .queues import SimpleQueue
→ 113 return SimpleQueue(ctx=self.get_context())

File ~/.venv/lib/python3.10/site-packages/multiprocess/queues.py:344, in SimpleQueue.__init__(self, ctx)
342 def __init__(self, *, ctx):
343 self._reader, self._writer = connection.Pipe(duplex=False)
→ 344 self._rlock = ctx.Lock()
345 self._poll = self._reader.poll
346 if sys.platform == 'win32':

File ~/.venv/lib/python3.10/site-packages/multiprocess/context.py:68, in BaseContext.Lock(self)
66 '''Returns a non-recursive lock object'''
67 from .synchronize import Lock
→ 68 return Lock(ctx=self.get_context())

File ~/.venv/lib/python3.10/site-packages/multiprocess/synchronize.py:168, in Lock.__init__(self, ctx)
167 def __init__(self, *, ctx):
→ 168 SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)

File ~/.venv/lib/python3.10/site-packages/multiprocess/synchronize.py:63, in SemLock.__init__(self, kind, value, maxvalue, ctx)
61 for i in range(100):
62 try:
→ 63 sl = self._semlock = _multiprocessing.SemLock(
64 kind, value, maxvalue, self._make_name(),
65 unlink_now)
66 except FileExistsError:
67 pass

PermissionError: [Errno 13] Permission denied

I solved it by following this Stack Overflow thread: "linux - Python multiprocessing: Permission denied".