How to efficiently convert a large parallel corpus to a Huggingface dataset to train an EncoderDecoderModel?

Typical EncoderDecoderModel that works on a Pre-coded Dataset

The code snippet below is frequently used to train an EncoderDecoderModel from Huggingface's Transformers library:

from transformers import EncoderDecoderModel
from transformers import PreTrainedTokenizerFast

multibert = EncoderDecoderModel.from_encoder_decoder_pretrained(
    "bert-base-multilingual-uncased", "bert-base-multilingual-uncased"
)


tokenizer = PreTrainedTokenizerFast.from_pretrained("bert-base-multilingual-uncased")

...

A pre-processed/coded dataset can then be used to train the model, for example with the wmt14 dataset:

import datasets

train_data = datasets.load_dataset("wmt14", "de-en", split="train")
val_data = datasets.load_dataset("wmt14", "de-en", split="validation[:10%]")


from functools import partial

def process_data_to_model_inputs(batch, encoder_max_length=512, decoder_max_length=512, batch_size=2): 
    inputs = tokenizer([segment["en"] for segment in batch['translation']], 
                       padding="max_length", truncation=True, max_length=encoder_max_length)
    outputs = tokenizer([segment["de"] for segment in batch['translation']], 
                       padding="max_length", truncation=True, max_length=encoder_max_length)


    batch["input_ids"] = inputs.input_ids
    batch["attention_mask"] = inputs.attention_mask
    batch["decoder_input_ids"] = outputs.input_ids
    batch["decoder_attention_mask"] = outputs.attention_mask
    batch["labels"] = outputs.input_ids.copy()

    # because BERT automatically shifts the labels, the labels correspond exactly to `decoder_input_ids`. 
    # We have to make sure that the PAD token is ignored
    batch["labels"] = [[-100 if token == tokenizer.pad_token_id else token for token in labels] for labels in batch["labels"]]
    return batch


def munge_dataset_to_pacify_bert(dataset, encoder_max_length=512, decoder_max_length=512, batch_size=2):
    bert_wants_to_see = ["input_ids", "attention_mask", "decoder_input_ids", 
                         "decoder_attention_mask", "labels"]
    
    _process_data_to_model_inputs = partial(process_data_to_model_inputs, 
                                                encoder_max_length=encoder_max_length, 
                                                decoder_max_length=decoder_max_length, 
                                                batch_size=batch_size
                                           )
    dataset = dataset.map(_process_data_to_model_inputs, 
                           batched=True, 
                           batch_size=batch_size
                          )
    dataset.set_format(type="torch", columns=bert_wants_to_see)
    return dataset

train_data = munge_dataset_to_pacify_bert(train_data)
val_data = munge_dataset_to_pacify_bert(val_data)

Then the training can be done easily as such:

from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments


# set training arguments - these params are not really tuned, feel free to change
training_args = Seq2SeqTrainingArguments(
    output_dir="./",
    evaluation_strategy="steps",
    ...
)


# instantiate trainer
trainer = Seq2SeqTrainer(
    model=multibert,
    tokenizer=tokenizer,
    args=training_args,
    train_dataset=train_data,
    eval_dataset=val_data,
)

trainer.train()

A working example can be found in a notebook like Neural Plasticity - Bert2Bert on WMT14 | Kaggle

However, parallel data used to train an EncoderDecoderModel usually exists as .txt or .tsv files, not as a pre-coded dataset

Given a large .tsv file (e.g. 1 billion lines), e.g.

hello world\tHallo Welt
how are you?\twie gehts?
...\t...

Step 1: To convert the TSV into the parquet / pyarrow format, one can do something like:

import vaex  # Using vaex to convert the TSV to parquet in chunks

filename = "train.en-de.tsv"

df = vaex.from_csv(filename, sep="\t", header=None, names=["src", "trg"], convert=True, chunk_size=50_000_000)

df.export(f"{filename}.parquet")

Step 2: Then we can read it into a PyArrow table, wrap it in a datasets.Dataset object and use the munge_dataset_to_pacify_bert() function shown above, e.g.

from datasets import Dataset, load_from_disk
import pyarrow.compute as pc
import pyarrow.parquet as pq

_ds = Dataset(pc.drop_null(pq.read_table('train.en-de.tsv.parquet')))
_ds.save_to_disk('train.en-de.tsv.parquet.hfdataset')

_ds = load_from_disk('train.en-de.tsv.parquet.hfdataset')

train_data = munge_dataset_to_pacify_bert(_ds)

# save the mapped dataset to a new directory (it cannot overwrite the one it was loaded from)
train_data.save_to_disk('train.en-de.tsv.parquet.hfdataset.mapped')
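
Side note: depending on the datasets version, the explicit pyarrow step may be avoidable with Dataset.from_parquet; the .filter() call below is just my assumption of how the empty rows would be dropped in that case:

from datasets import Dataset

# read the parquet file directly into a Dataset
_ds = Dataset.from_parquet('train.en-de.tsv.parquet')

# drop rows where either side is missing, analogous to drop_null above
_ds = _ds.filter(lambda ex: ex["src"] is not None and ex["trg"] is not None)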

While the process above works well for small-ish datasets, e.g. 1-5 million lines of data, when the scale of the data goes to 500 million to 1 billion lines, the end of the last .save_to_disk() step seems to be nowhere in sight.

Breaking down the steps in the munge_dataset_to_pacify_bert(), there are 2 sub-functions:

  • dataset.map(_process_data_to_model_inputs, batched=True, batch_size=batch_size)
  • dataset.set_format(type="torch", columns=bert_wants_to_see)

For the .map() step, it is possible to scale across parallel processes by specifying num_proc:

dataset.map(_process_data_to_model_inputs, 
    batched=True, batch_size=batch_size, 
    num_proc=32  # number of parallel processes
    )
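
(For completeness, map also exposes writer_batch_size, i.e. how many processed rows are buffered before being flushed to the on-disk cache file; I have not tuned it, the value below is a guess.)

dataset.map(_process_data_to_model_inputs, 
    batched=True, batch_size=batch_size, 
    num_proc=32,
    writer_batch_size=10_000  # rows buffered per write to the .arrow cache file
    )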

And when I tried to process with

  • num_proc=32
  • batch_size=100

The .map() function finished processing the 500 million lines in 18 hours of compute time on an Intel Xeon E5-2686 @ 2.3GHz with 32 processor cores, which seems close to optimal.

But somehow the .map() function created 32 temp .arrow files and 128 tmp... binary files, and the final save_to_disk() call has now been running for more than 10 hours without finishing combining those temp parts into the final HF Dataset on disk.


Given the above context, my questions in parts are:

Question (Part 1): When the mapping function ends and has created the temp .arrow and tmp... files, is there a way to read these individually instead of trying to save them into a final directory using the save_to_disk() function?
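
For example, what I mean by reading them individually is something like the following, where the cache-file names are hypothetical and I am assuming Dataset.from_file() can open them:

from datasets import Dataset, concatenate_datasets

# the actual temp file names produced by .map() would go here
shard_files = ["cache-aaaa.arrow", "cache-bbbb.arrow"]

shards = [Dataset.from_file(f) for f in shard_files]   # memory-maps each arrow file
train_data = concatenate_datasets(shards)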


Question (Part 2): Why is the save_to_disk() function so slow after the mapping and how can the mapped processed data be saved in a faster manner?


Question (Part 3): Is there a way to avoid the .set_format() function after the .map() and make it part of the _process_data_to_model_inputs function?
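
Concretely, what I am imagining is having the mapping function itself return only the columns BERT wants to see, roughly like this (untested sketch, assuming bert_wants_to_see is defined at module level):

def process_data_to_model_inputs(batch, encoder_max_length=512, decoder_max_length=512, batch_size=2):
    ...  # tokenize as before
    # keep only the columns the model needs
    return {k: v for k, v in batch.items() if k in bert_wants_to_see}

dataset = dataset.map(_process_data_to_model_inputs, 
                      batched=True, 
                      batch_size=batch_size,
                      remove_columns=dataset.column_names  # drop the raw "translation" column from the output
                     )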


Also asked on python - How to efficiently convert a large parallel corpus to a Huggingface dataset to train an EncoderDecoderModel? - Stack Overflow

Bumping up the number of hours for save_to_disk: 42 hours later, it's still trying to save…

Killed the process after it ran for >60 hours; now I'm re-running all the steps with some time profiling, on a 500M-line parquet file.

# Reading a parquet file and dropping some empty lines, if any.
# Took ~415 secs to load the parquet, drop nulls and re-save it.
_ds = Dataset(pc.drop_null(pq.read_table('train.en-de.tsv.parquet')))
_ds.save_to_disk('train.en-de.tsv.parquet.hfdataset-1')
# Reloading the dataset
# Took ~524 secs
_ds = load_from_disk('train.en-de.tsv.parquet.hfdataset-1')

Now, I’m re-running the munge_dataset_to_pacify_bert with .map() without the set_format() and timing it:

# Running the .map() without .set_format()
def munge_dataset_to_pacify_bert(dataset, encoder_max_length=512, decoder_max_length=512, batch_size=2):
    _process_data_to_model_inputs = partial(process_data_to_model_inputs, 
                                                encoder_max_length=encoder_max_length, 
                                                decoder_max_length=decoder_max_length, 
                                                batch_size=batch_size
                                           )
    dataset = dataset.map(_process_data_to_model_inputs, 
                           batched=True, 
                           batch_size=batch_size
                          )
    return dataset

# Took ??? secs, will update the number after it's finished.
train_data = munge_dataset_to_pacify_bert(_ds)

# Saving the processed data. 
# Took ??? secs.
train_data.save_to_disk("train.en-de.tsv.parquet.hfdataset-2")

# Doing the set_format
# Took ??? secs.
bert_wants_to_see = ["input_ids", "attention_mask", "decoder_input_ids", 
                     "decoder_attention_mask", "labels"]
train_data.set_format(type="torch", columns=bert_wants_to_see)  # set_format works in-place and returns None


# Saving the final dataset.
# Took ??? secs
train_data.save_to_disk("train.en-de.tsv.parquet.hfdataset-3")

Hi! datasets can read TSV files, so if you create a directory data/ containing your TSV file, you can do

from datasets import load_dataset
ds = load_dataset("path/to/data")
ds = munge_dataset_to_pacify_bert(ds)

and the next time you re-run this code, it will reload from the cache.

Also note that running set_format is instantaneous - it simply changes the output format of the dataset, not the data itself.
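
For example, only what indexing returns changes; the Arrow data on disk is untouched:

ds.set_format(type="torch", columns=["input_ids", "attention_mask", "labels"])
ds[0]["input_ids"]   # now returned as a torch.Tensor instead of a Python list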

Thanks for the note on the tsv file!

I’ll try to re-profile the timings again, to compare:

  • Read tsv → munge_dataset_to_pacify_bert
  • Vaex → read parquet → munge_dataset_to_pacify_bert
  • Read tsv → munge_dataset_to_pacify_bert → set_format → save
  • Vaex → read parquet → munge_dataset_to_pacify_bert → set_format → save
  • Read tsv → munge_dataset_to_pacify_bert → save
  • Vaex → read parquet → munge_dataset_to_pacify_bert → save

After

ds = munge_dataset_to_pacify_bert(ds)

should there be another .save_to_disk() operation?

ds = munge_dataset_to_pacify_bert(ds)
ds.save("path/to/data")

The save_to_disk operation is single-process, so yeah, it can be much slower than map, which works in parallel.
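
(Side note for future readers: newer releases of the datasets library added multiprocessing support to save_to_disk; this is an assumption about versions released after this thread, so check the docs for the release you have.)

# shard the dataset and write the shards in parallel (requires a recent datasets release)
train_data.save_to_disk("train.en-de.tsv.parquet.hfdataset", num_proc=8)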

After more experiments and profiling, I am giving up on the datasets route and am directly using a PyTorch DataPipe to feed the Huggingface Trainer, e.g. PyTorch DataPipe + HuggingFace Trainer | Kaggle

If anyone knows a better way to handle large datasets, please do let me know too. Thank you in advance!


Note that you can also load your dataset in streaming mode if you pass streaming=True to load_dataset. You can use the same map functions you used already, but everything will be computed on-the-fly like a torch DataPipe.

This will save you a lot of time and disk space :wink:


Thank you for the streaming=True tip!!

That really saved TBs of data that would otherwise be mapped and temporarily saved, given my dataset =)

I’m not sure why all my posts were “flagged as spam by community”, so instead of sharing the kaggle link, here’s a working example for other future readers in markdown:

from collections import OrderedDict

import torch
from torchdata.datapipes.iter import IterDataPipe, IterableWrapper

from transformers import EncoderDecoderModel
from transformers import PreTrainedTokenizerFast

multibert = EncoderDecoderModel.from_encoder_decoder_pretrained(
    "bert-base-multilingual-uncased", "bert-base-multilingual-uncased"
)

tokenizer = PreTrainedTokenizerFast.from_pretrained("bert-base-multilingual-uncased")

tokenizer.bos_token = tokenizer.cls_token
tokenizer.eos_token = tokenizer.sep_token
tokenizer.add_special_tokens({'pad_token': '[PAD]'})

# set special tokens
multibert.config.decoder_start_token_id = tokenizer.bos_token_id
multibert.config.eos_token_id = tokenizer.eos_token_id
multibert.config.pad_token_id = tokenizer.pad_token_id

# sensible parameters for beam search
multibert.config.vocab_size = multibert.config.decoder.vocab_size
multibert.config.max_length = 142
multibert.config.min_length = 56
multibert.config.no_repeat_ngram_size = 3
multibert.config.early_stopping = True
multibert.config.length_penalty = 2.0
multibert.config.num_beams = 4

from functools import partial

bert_wants_to_see = ["input_ids", "attention_mask", "decoder_input_ids", 
                     "decoder_attention_mask", "labels"]

def process_data_to_model_inputs(batch, encoder_max_length=512, decoder_max_length=512): 
    inputs = tokenizer(batch["SRC"], padding="max_length",
                       truncation=True, max_length=encoder_max_length)
    outputs = tokenizer(batch["TRG"], padding="max_length", 
                        truncation=True, max_length=decoder_max_length)


    batch["input_ids"] = inputs.input_ids
    batch["attention_mask"] = inputs.attention_mask
    batch["decoder_input_ids"] = outputs.input_ids
    batch["decoder_attention_mask"] = outputs.attention_mask
    batch["labels"] = outputs.input_ids.copy()

    # because BERT automatically shifts the labels, the labels correspond exactly to `decoder_input_ids`. 
    # We have to make sure that the PAD token is ignored
    batch["labels"] = [[-100 if token == tokenizer.pad_token_id else token for token in labels] for labels in batch["labels"]]
    
    return {k:v for k,v in batch.items() if k in bert_wants_to_see}


def munge_dataset_to_pacify_bert(dataset, encoder_max_length=512, decoder_max_length=512):
    _process_data_to_model_inputs = partial(process_data_to_model_inputs, 
                                                encoder_max_length=encoder_max_length, 
                                                decoder_max_length=decoder_max_length, 
                                           )
    dataset = dataset.map(_process_data_to_model_inputs, 
                           batched=True)
    # set_format is not needed here: process_data_to_model_inputs already returns only the columns in bert_wants_to_see
    return dataset

from datasets import load_dataset

# tatoeba-sentpairs.tsv is a pretty large file.
ds = load_dataset("csv", data_files="../input/tatoeba/tatoeba-sentpairs.tsv", 
                  streaming=True, delimiter="\t", split="train")

train_data = munge_dataset_to_pacify_bert(ds)

flores_ende = load_dataset("facebook/flores", "eng_Latn-deu_Latn", streaming=True, 
                          split="dev")

flores_ende = flores_ende.rename_column('sentence_eng_Latn', 'SRC')
flores_ende = flores_ende.rename_column('sentence_deu_Latn', 'TRG')

valid_data = munge_dataset_to_pacify_bert(flores_ende)

from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments

import os
os.environ["WANDB_DISABLED"] = "true"

batch_size = 1

# set training arguments - these params are not really tuned, feel free to change
training_args = Seq2SeqTrainingArguments(
    output_dir="./",
    evaluation_strategy="steps",
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    predict_with_generate=True,
    logging_steps=2,  # set to 1000 for full training
    save_steps=16,    # set to 500 for full training
    eval_steps=4,     # set to 8000 for full training
    warmup_steps=1,   # set to 2000 for full training
    max_steps=16,     # delete for full training
    # overwrite_output_dir=True,
    save_total_limit=1,
    #fp16=True, 
)


# instantiate trainer
trainer = Seq2SeqTrainer(
    model=multibert,
    tokenizer=tokenizer,
    args=training_args,
    train_dataset=IterableWrapper(train_data),
    eval_dataset=IterableWrapper(valid_data),
)

trainer.train()