Typical EncoderDecoderModel that works on a Pre-coded Dataset
The code snippet below is frequently used to train an EncoderDecoderModel from Hugging Face's transformers library:
from transformers import EncoderDecoderModel
from transformers import PreTrainedTokenizerFast
multibert = EncoderDecoderModel.from_encoder_decoder_pretrained(
"bert-base-multilingual-uncased", "bert-base-multilingual-uncased"
)
tokenizer = PreTrainedTokenizerFast.from_pretrained("bert-base-multilingual-uncased")
...
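The elided lines typically also wire up the special-token IDs that an EncoderDecoderModel needs for loss computation and generation. A minimal sketch, assuming a BERT-style tokenizer (the attributes below follow the Hugging Face bert2bert examples; adjust to your checkpoint):

# Sketch only: tie the decoder's special tokens to the tokenizer,
# as done in the Hugging Face bert2bert examples.
multibert.config.decoder_start_token_id = tokenizer.cls_token_id
multibert.config.eos_token_id = tokenizer.sep_token_id
multibert.config.pad_token_id = tokenizer.pad_token_id
multibert.config.vocab_size = multibert.config.encoder.vocab_size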
A pre-processed/pre-coded dataset can then be used to train the model, e.g. the wmt14 dataset:
import datasets
train_data = datasets.load_dataset("wmt14", "de-en", split="train")
val_data = datasets.load_dataset("wmt14", "de-en", split="validation[:10%]")
from functools import partial
def process_data_to_model_inputs(batch, encoder_max_length=512, decoder_max_length=512, batch_size=2):
    inputs = tokenizer([segment["en"] for segment in batch["translation"]],
                       padding="max_length", truncation=True, max_length=encoder_max_length)
    outputs = tokenizer([segment["de"] for segment in batch["translation"]],
                        padding="max_length", truncation=True, max_length=decoder_max_length)
    batch["input_ids"] = inputs.input_ids
    batch["attention_mask"] = inputs.attention_mask
    batch["decoder_input_ids"] = outputs.input_ids
    batch["decoder_attention_mask"] = outputs.attention_mask
    batch["labels"] = outputs.input_ids.copy()
    # Because BERT automatically shifts the labels, the labels correspond exactly to `decoder_input_ids`.
    # We have to make sure that the PAD token is ignored by the loss.
    batch["labels"] = [[-100 if token == tokenizer.pad_token_id else token for token in labels]
                       for labels in batch["labels"]]
    return batch
def munge_dataset_to_pacify_bert(dataset, encoder_max_length=512, decoder_max_length=512, batch_size=2):
    bert_wants_to_see = ["input_ids", "attention_mask", "decoder_input_ids",
                         "decoder_attention_mask", "labels"]
    _process_data_to_model_inputs = partial(process_data_to_model_inputs,
                                            encoder_max_length=encoder_max_length,
                                            decoder_max_length=decoder_max_length,
                                            batch_size=batch_size)
    dataset = dataset.map(_process_data_to_model_inputs,
                          batched=True,
                          batch_size=batch_size)
    dataset.set_format(type="torch", columns=bert_wants_to_see)
    return dataset
train_data = munge_dataset_to_pacify_bert(train_data)
val_data = munge_dataset_to_pacify_bert(val_data)
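As an optional sanity check (a sketch, not part of the original pipeline), one can inspect a single processed example to confirm the columns come back as fixed-length torch tensors with pad positions masked out of the labels:

sample = train_data[0]
print(sample["input_ids"].shape)   # expected: torch.Size([512])
print(sample["labels"][-10:])      # trailing pad positions should appear as -100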
Training can then be done as follows:
from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments
# set training arguments - these params are not really tuned, feel free to change
training_args = Seq2SeqTrainingArguments(
    output_dir="./",
    evaluation_strategy="steps",
    ...
)

# instantiate trainer
trainer = Seq2SeqTrainer(
    model=multibert,
    tokenizer=tokenizer,
    args=training_args,
    train_dataset=train_data,
    eval_dataset=val_data,
)
trainer.train()
A working example can be found in a notebook such as Neural Plasticity - Bert2Bert on WMT14 | Kaggle.
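Once trained, the model can translate with generate(); a minimal sketch, assuming the special-token IDs above have been set on the config (the beam size and length cap here are arbitrary):

inputs = tokenizer("hello world", return_tensors="pt")
output_ids = multibert.generate(
    inputs.input_ids,
    attention_mask=inputs.attention_mask,
    max_length=64,  # arbitrary cap for this sketch
    num_beams=4,    # arbitrary beam size
)
print(tokenizer.decode(output_ids[0], skip_special_tokens=True))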
However, the parallel data used to train an EncoderDecoderModel usually exists as .txt or .tsv files, not as a pre-coded dataset.
Given a large .tsv file (e.g. 1 billion lines), e.g.
hello world\tHallo Welt
how are you?\twie gehts?
...\t...
Step 1: we can convert it into the Parquet / PyArrow format with something like:
import vaex  # using vaex for out-of-core CSV reading

filename = "train.en-de.tsv"
df = vaex.from_csv(filename, sep="\t", header=None, names=["src", "trg"],
                   convert=True, chunk_size=50_000_000)
df.export(f"{filename}.parquet")
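If vaex is not available, the same chunked TSV-to-Parquet conversion can be sketched with pandas and pyarrow (the chunk size is an arbitrary choice, and quoting=3, i.e. csv.QUOTE_NONE, is assumed so that stray quote characters in the raw text do not break parsing):

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

filename = "train.en-de.tsv"
schema = pa.schema([("src", pa.string()), ("trg", pa.string())])
with pq.ParquetWriter(f"{filename}.parquet", schema) as writer:
    # stream the TSV in chunks so the full file never has to fit in memory
    for chunk in pd.read_csv(filename, sep="\t", header=None,
                             names=["src", "trg"], quoting=3,
                             chunksize=1_000_000):
        writer.write_table(pa.Table.from_pandas(chunk, schema=schema,
                                                preserve_index=False))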
Step 2: Then we can read it into a PyArrow table, fit it into a datasets.Dataset object, and apply the munge_dataset_to_pacify_bert() shown above, e.g.
from datasets import Dataset, load_from_disk
import pyarrow as pa
import pyarrow.compute
import pyarrow.parquet

_ds = Dataset(pa.compute.drop_null(pa.parquet.read_table('train.en-de.tsv.parquet')))
_ds.save_to_disk('train.en-de.tsv.parquet.hfdataset')

_ds = load_from_disk('train.en-de.tsv.parquet.hfdataset')
train_data = munge_dataset_to_pacify_bert(_ds)
# save to a separate path: a loaded dataset cannot overwrite its own directory
train_data.save_to_disk('train.en-de.tsv.parquet.munged.hfdataset')
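For what it's worth, datasets can also read the Parquet file directly, which may avoid the manual PyArrow round trip; a sketch (details such as memory-mapping behaviour depend on the datasets version):

from datasets import load_dataset

_ds = load_dataset("parquet", data_files="train.en-de.tsv.parquet", split="train")
train_data = munge_dataset_to_pacify_bert(_ds)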
While the process above works well for small-ish datasets, e.g. 1-5 million lines of data, when the scale goes to 500 million to 1 billion lines, the last .save_to_disk() call never seems to finish.
Breaking down the steps inside munge_dataset_to_pacify_bert(), there are two sub-calls:
dataset.map(_process_data_to_model_inputs, batched=True, batch_size=batch_size)
dataset.set_format(type="torch", columns=bert_wants_to_see)
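Of these two, only .map() actually rewrites data; .set_format() merely changes how rows are presented on access, so it is effectively free. Its non-in-place twin .with_format() behaves the same way:

# returns a new view; the underlying Arrow data is untouched
dataset = dataset.with_format(type="torch", columns=bert_wants_to_see)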
For the .map() step, it's possible to parallelize across worker processes by specifying num_proc:

dataset.map(_process_data_to_model_inputs,
            batched=True, batch_size=batch_size,
            num_proc=32  # number of parallel worker processes
            )
And when I tried to process with

num_proc=32
batch_size=100

the .map() function finished processing 500 million lines in 18 hours of compute time on an Intel Xeon E5-2686 @ 2.3 GHz with 32 processor cores, i.e. scaling close to optimally.
But somehow the .map() function created 32 temp .arrow files and 128 tmp... binary files, and the final save_to_disk() call has now been running for more than 10 hours without finishing combining those temp-file parts into the final HF Dataset on disk.
Given the above context, my multi-part questions are: