Hello everybody,
While training my model with DeepSpeed on 4 GPUs, I tried to inject some custom behaviour into the evaluation loop. The Trainer docs for the evaluate function say:
You can also subclass and override this method to inject custom behavior
However, when I tried doing this, I got the following error:
Traceback (most recent call last):
  File "GPT2nr.py", line 109, in <module>
    values = trainer.evaluate()
  File "/home/dagbert-skunkworks/src/skunkworks/models/GPT2Trainer.py", line 42, in evaluate
    eval_loop = self.prediction_loop if self.args.use_legacy_prediction_loop else self.evaluation_loop
AttributeError: 'TrainingArguments' object has no attribute 'use_legacy_prediction_loop'
(the same traceback is printed once by each of the 4 ranks)
Killing subprocess 136
Killing subprocess 137
Killing subprocess 138
Killing subprocess 139
Traceback (most recent call last):
  File "/usr/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/usr/local/lib/python3.6/dist-packages/deepspeed/launcher/launch.py", line 171, in <module>
    main()
  File "/usr/local/lib/python3.6/dist-packages/deepspeed/launcher/launch.py", line 161, in main
    sigkill_handler(signal.SIGTERM, None)  # not coming back
  File "/usr/local/lib/python3.6/dist-packages/deepspeed/launcher/launch.py", line 139, in sigkill_handler
    raise subprocess.CalledProcessError(returncode=last_return_code, cmd=cmd)
subprocess.CalledProcessError: Command '['/usr/bin/python', '-u', 'GPT2nr.py', '--local_rank=3']' returned non-zero exit status 1.
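From the failing line this looks like a version mismatch: my evaluate() override is copied from a newer trainer.py, and use_legacy_prediction_loop does not seem to exist on TrainingArguments in my installed 4.5.2. As a stopgap I'm considering a defensive lookup (my own sketch, not something from the docs; note the copied loop may still rely on other newer Trainer internals):

# Hypothetical guard: fall back to the overridden evaluation_loop when the
# installed TrainingArguments does not define the flag.
use_legacy = getattr(self.args, "use_legacy_prediction_loop", False)
eval_loop = self.prediction_loop if use_legacy else self.evaluation_loop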
GPT2Trainer.py
import collections
import math
import time
from typing import Dict, List, NamedTuple, Optional, Tuple, Union

import numpy as np
import torch
from datasets import Dataset
from torch.utils.data.dataloader import DataLoader
from torch.utils.data.dataset import IterableDataset
# Helpers used below; these live in transformers >= 4.6 (my installed version is 4.5.2).
from transformers.debug_utils import DebugOption
from transformers.deepspeed import deepspeed_init
from transformers.file_utils import is_torch_tpu_available
from transformers.trainer import Trainer
from transformers.trainer_pt_utils import (IterableDatasetShard, find_batch_size,
                                           nested_concat, nested_numpify,
                                           nested_truncate)
from transformers.trainer_utils import denumpify_detensorize, speed_metrics
from transformers.utils import logging

if is_torch_tpu_available():
    import torch_xla.core.xla_model as xm
    import torch_xla.debug.metrics as met
    import torch_xla.distributed.parallel_loader as pl

logger = logging.get_logger(__name__)
class EvalPrediction(NamedTuple):
predictions: Union[np.ndarray, Tuple[np.ndarray]]
label_ids: np.ndarray
classes: List
class EvalLoopOutput(NamedTuple):
predictions: Union[np.ndarray, Tuple[np.ndarray]]
label_ids: Optional[np.ndarray]
metrics: Optional[Dict[str, float]]
num_samples: Optional[int]
classes: Optional[List]
class GPT2Trainer(Trainer):
    def __init__(self, model, args=None, data_collator=None, train_dataset=None, eval_dataset=None,
                 tokenizer=None, model_init=None, compute_metrics=None, callbacks=None, optimizers=(None, None)):
        super().__init__(model, args, data_collator, train_dataset, eval_dataset, tokenizer,
                         model_init, compute_metrics, callbacks, optimizers)
    def evaluate(
        self,
        eval_dataset: Optional[Dataset] = None,
        ignore_keys: Optional[List[str]] = None,
        metric_key_prefix: str = "eval",
    ) -> Dict[str, float]:
self._memory_tracker.start()
eval_dataloader = self.get_eval_dataloader(eval_dataset)
start_time = time.time()
eval_loop = self.prediction_loop if self.args.use_legacy_prediction_loop else self.evaluation_loop
output = eval_loop(
eval_dataloader,
description="Evaluation",
# No point gathering the predictions if there are no metrics, otherwise we defer to
# self.args.prediction_loss_only
prediction_loss_only=True if self.compute_metrics is None else None,
ignore_keys=ignore_keys,
metric_key_prefix=metric_key_prefix,
)
total_batch_size = self.args.eval_batch_size * self.args.world_size
output.metrics.update(
speed_metrics(
metric_key_prefix,
start_time,
num_samples=output.num_samples,
num_steps=math.ceil(output.num_samples / total_batch_size),
)
)
self.log(output.metrics)
if DebugOption.TPU_METRICS_DEBUG in self.args.debug:
# tpu-comment: Logging debug metrics for PyTorch/XLA (compile, execute times, ops, etc.)
xm.master_print(met.metrics_report())
self.control = self.callback_handler.on_evaluate(self.args, self.state, self.control, output.metrics)
self._memory_tracker.stop_and_update_metrics(output.metrics)
return output.metrics
def evaluation_loop(
self,
dataloader: DataLoader,
description: str,
prediction_loss_only: Optional[bool] = None,
ignore_keys: Optional[List[str]] = None,
metric_key_prefix: str = "eval",
) -> EvalLoopOutput:
"""
Prediction/evaluation loop, shared by :obj:`Trainer.evaluate()` and :obj:`Trainer.predict()`.
Works both with or without labels.
"""
prediction_loss_only = (
prediction_loss_only if prediction_loss_only is not None else self.args.prediction_loss_only
)
# if eval is called w/o train init deepspeed here
if self.args.deepspeed and not self.deepspeed:
# XXX: eval doesn't have `resume_from_checkpoint` arg but we should be able to do eval
# from the checkpoint eventually
deepspeed_engine, _, _ = deepspeed_init(self, num_training_steps=0, resume_from_checkpoint=None)
self.model = deepspeed_engine.module
self.model_wrapped = deepspeed_engine
self.deepspeed = deepspeed_engine
# XXX: we don't need optim/sched for inference, but this needs to be sorted out, since
# for example the Z3-optimizer is a must for zero3 to work even for inference - what we
# don't need is the deepspeed basic optimizer which is self.optimizer.optimizer
deepspeed_engine.optimizer.optimizer = None
deepspeed_engine.lr_scheduler = None
model = self._wrap_model(self.model, training=False)
# if full fp16 is wanted on eval and this ``evaluation`` or ``predict`` isn't called while
# ``train`` is running, halve it first and then put on device
if not self.is_in_train and self.args.fp16_full_eval:
model = model.half().to(self.args.device)
batch_size = dataloader.batch_size
logger.info(f"***** Running {description} *****")
if isinstance(dataloader.dataset, collections.abc.Sized):
logger.info(f" Num examples = {self.num_examples(dataloader)}")
else:
logger.info(" Num examples: Unknown")
logger.info(f" Batch size = {batch_size}")
model.eval()
self.callback_handler.eval_dataloader = dataloader
# Do this before wrapping.
eval_dataset = dataloader.dataset
if is_torch_tpu_available():
dataloader = pl.ParallelLoader(dataloader, [self.args.device]).per_device_loader(self.args.device)
if self.args.past_index >= 0:
self._past = None
# Initialize containers
# losses/preds/labels on GPU/TPU (accumulated for eval_accumulation_steps)
losses_host = None
preds_host = None
labels_host = None
# losses/preds/labels on CPU (final containers)
all_losses = None
all_preds = None
all_labels = None
# Will be useful when we have an iterable dataset so don't know its length.
observed_num_examples = 0
# Main evaluation loop
for step, inputs in enumerate(dataloader):
            # Pull out the class labels for compute_metrics, but keep the full
            # inputs dict intact for prediction_step.
            class_labels = inputs.get('labels')
            # Update the observed num examples
observed_batch_size = find_batch_size(inputs)
if observed_batch_size is not None:
observed_num_examples += observed_batch_size
# Prediction step
loss, logits, labels = self.prediction_step(model, inputs, prediction_loss_only, ignore_keys=ignore_keys)
# Update containers on host
if loss is not None:
losses = self._nested_gather(loss.repeat(batch_size))
losses_host = losses if losses_host is None else torch.cat((losses_host, losses), dim=0)
if logits is not None:
logits = self._pad_across_processes(logits)
logits = self._nested_gather(logits)
preds_host = logits if preds_host is None else nested_concat(preds_host, logits, padding_index=-100)
if labels is not None:
labels = self._pad_across_processes(labels)
labels = self._nested_gather(labels)
labels_host = labels if labels_host is None else nested_concat(labels_host, labels, padding_index=-100)
self.control = self.callback_handler.on_prediction_step(self.args, self.state, self.control)
# Gather all tensors and put them back on the CPU if we have done enough accumulation steps.
if self.args.eval_accumulation_steps is not None and (step + 1) % self.args.eval_accumulation_steps == 0:
if losses_host is not None:
losses = nested_numpify(losses_host)
all_losses = losses if all_losses is None else np.concatenate((all_losses, losses), axis=0)
if preds_host is not None:
logits = nested_numpify(preds_host)
all_preds = logits if all_preds is None else nested_concat(all_preds, logits, padding_index=-100)
if labels_host is not None:
labels = nested_numpify(labels_host)
all_labels = (
labels if all_labels is None else nested_concat(all_labels, labels, padding_index=-100)
)
# Set back to None to begin a new accumulation
losses_host, preds_host, labels_host = None, None, None
if self.args.past_index and hasattr(self, "_past"):
# Clean the state at the end of the evaluation loop
delattr(self, "_past")
# Gather all remaining tensors and put them back on the CPU
if losses_host is not None:
losses = nested_numpify(losses_host)
all_losses = losses if all_losses is None else np.concatenate((all_losses, losses), axis=0)
if preds_host is not None:
logits = nested_numpify(preds_host)
all_preds = logits if all_preds is None else nested_concat(all_preds, logits, padding_index=-100)
if labels_host is not None:
labels = nested_numpify(labels_host)
all_labels = labels if all_labels is None else nested_concat(all_labels, labels, padding_index=-100)
# Number of samples
if not isinstance(eval_dataset, IterableDataset):
num_samples = len(eval_dataset)
# The instance check is weird and does not actually check for the type, but whether the dataset has the right
# methods. Therefore we need to make sure it also has the attribute.
elif isinstance(eval_dataset, IterableDatasetShard) and hasattr(eval_dataset, "num_examples"):
num_samples = eval_dataset.num_examples
else:
num_samples = observed_num_examples
        # The number of losses has been rounded to a multiple of batch_size, and in distributed
        # training the number of samples has been rounded to a multiple of batch_size as well, so we truncate.
if all_losses is not None:
all_losses = all_losses[:num_samples]
if all_preds is not None:
all_preds = nested_truncate(all_preds, num_samples)
if all_labels is not None:
all_labels = nested_truncate(all_labels, num_samples)
# Metrics!
        if self.compute_metrics is not None and all_preds is not None and all_labels is not None:
            # NOTE: class_labels only holds the labels from the last eval batch seen on this process.
            metrics = self.compute_metrics(
                EvalPrediction(predictions=all_preds, label_ids=all_labels, classes=class_labels)
            )
else:
metrics = {}
# To be JSON-serializable, we need to remove numpy types or zero-d tensors
metrics = denumpify_detensorize(metrics)
if all_losses is not None:
metrics[f"{metric_key_prefix}_loss"] = all_losses.mean().item()
# Prefix all keys with metric_key_prefix + '_'
for key in list(metrics.keys()):
if not key.startswith(f"{metric_key_prefix}_"):
metrics[f"{metric_key_prefix}_{key}"] = metrics.pop(key)
        # EvalLoopOutput declares a `classes` field with no default, so it has to be passed here.
        return EvalLoopOutput(predictions=all_preds, label_ids=all_labels, metrics=metrics,
                              num_samples=num_samples, classes=class_labels)
GPT2nr.py
import json
from typing import Dict, List, Optional
import deepspeed
import numpy as np
import pandas as pd
import torch
from datasets import Dataset, load_dataset, load_metric
from skunkworks.models.GPT2Trainer import (EvalLoopOutput, EvalPrediction,
GPT2Trainer)
from transformers import (GPT2LMHeadModel, GPT2TokenizerFast, Trainer,
TrainingArguments)
from transformers.trainer_utils import get_last_checkpoint, is_main_process
def compute_metrics(eval_pred: EvalPrediction) -> Dict:
    logits, label_ids, class_labels = eval_pred
    logits = torch.Tensor(logits)
    logits = torch.argmax(logits, dim=-1)
    predictions = tokenizer.batch_decode(logits)
    label_ids = tokenizer.batch_decode(label_ids)
    metric_values = metric.compute(predictions=predictions, references=label_ids)
    avg_divergence_df = pd.DataFrame({"labels": class_labels, "scores": metric_values['raw_scores']})
    return metric_values
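# Toy sanity check of the decode step above (hypothetical values; run separately
# once the tokenizer is loaded):
#   toy_logits = torch.rand(2, 5, len(tokenizer))   # (batch, seq_len, vocab)
#   toy_ids = torch.argmax(toy_logits, dim=-1)      # greedy token id per position
#   tokenizer.batch_decode(toy_ids)                 # -> two short decoded strings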
deepspeed_dict = json.load(open('ds_config_zero2manual.json','r'))
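# For reference, ds_config_zero2manual.json is roughly of this shape
# (a sketch, not my exact file):
# {
#   "fp16": { "enabled": true },
#   "zero_optimization": { "stage": 2 },
#   "train_micro_batch_size_per_gpu": 8,
#   "gradient_accumulation_steps": 1
# }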
metric = load_metric("../metrics/hf_metric.py")
block_size = 128
def tokenize_function(examples,field):
return tokenizer(examples[field], padding="max_length", truncation=True)
def group_texts(examples):
# Concatenate all texts.
concatenated_examples = {k: sum(examples[k], []) for k in examples.keys()}
total_length = len(concatenated_examples[list(examples.keys())[0]])
    # Drop the small remainder; we could pad instead if the model supported it.
    # You can customize this part to your needs.
total_length = (total_length // block_size) * block_size
# Split by chunks of max_len.
result = {
k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
for k, t in concatenated_examples.items()
}
result["labels"] = result["input_ids"].copy()
return result
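# Toy walk-through of group_texts (illustrative numbers only): with block_size = 4 and
# a batch {"input_ids": [[1, 2, 3], [4, 5, 6, 7, 8]]}, concatenation yields
# [1, 2, 3, 4, 5, 6, 7, 8] (total_length 8), which is split into the chunks
# [[1, 2, 3, 4], [5, 6, 7, 8]]; "labels" then becomes a copy of "input_ids".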
model = GPT2LMHeadModel.from_pretrained('gpt2-medium')
tokenizer = GPT2TokenizerFast.from_pretrained('gpt2-medium')
tokenizer.pad_token = tokenizer.eos_token
train_dataset = load_dataset('csv', data_files={'train': 'train.csv'})
test_dataset = load_dataset('csv', data_files={'test': 'test.csv'})
tokenized_train_dataset = train_dataset.map(lambda x: tokenize_function(x, 'text'), batched=True, num_proc=4, remove_columns=["text"])
tokenized_train_dataset = tokenized_train_dataset.map(group_texts, batched=True, batch_size=1, num_proc=4)
tokenized_test_dataset = test_dataset.map(lambda x: tokenize_function(x, 'data'), batched=True, num_proc=4, remove_columns=["data"])
training_args = TrainingArguments(
    "test-trainer",
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=1,
    learning_rate=2e-5,
    weight_decay=0.01,
    eval_accumulation_steps=2,
    fp16=True,
    deepspeed=deepspeed_dict,
)
trainer = GPT2Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train_dataset["train"],
    eval_dataset=tokenized_test_dataset["test"],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)
trainer.train()
values = trainer.evaluate()
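For completeness, I launch the script with the DeepSpeed launcher (exact invocation from memory, but along these lines):

deepspeed --num_gpus 4 GPT2nr.py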
Transformers Version: 4.5.2
Training GPUs: 4
Training GPU Model: Tesla T4