Problems Subclassing Trainer Class for Custom Evaluation Loop

Hello Everybody,
While training my model with DeepSpeed on 4 GPUs, I was trying to inject some custom behaviour into the evaluation loop.
The Trainer docs say the following under the `evaluate` function:

You can also subclass and override this method to inject custom behavior

However, when I tried doing this, I got the following error:

Traceback (most recent call last):
  File "GPT2nr.py", line 109, in <module>
Traceback (most recent call last):
  File "GPT2nr.py", line 109, in <module>
    values = trainer.evaluate()
  File "/home/dagbert-skunkworks/src/skunkworks/models/GPT2Trainer.py", line 42, in evaluate
    values = trainer.evaluate()
  File "/home/dagbert-skunkworks/src/skunkworks/models/GPT2Trainer.py", line 42, in evaluate
    eval_loop = self.prediction_loop if self.args.use_legacy_prediction_loop else self.evaluation_loop
AttributeError: 'TrainingArguments' object has no attribute 'use_legacy_prediction_loop'
    eval_loop = self.prediction_loop if self.args.use_legacy_prediction_loop else self.evaluation_loop
AttributeError: 'TrainingArguments' object has no attribute 'use_legacy_prediction_loop'
Traceback (most recent call last):
  File "GPT2nr.py", line 109, in <module>
    values = trainer.evaluate()
  File "/home/dagbert-skunkworks/src/skunkworks/models/GPT2Trainer.py", line 42, in evaluate
    eval_loop = self.prediction_loop if self.args.use_legacy_prediction_loop else self.evaluation_loop
AttributeError: 'TrainingArguments' object has no attribute 'use_legacy_prediction_loop'
Traceback (most recent call last):
  File "GPT2nr.py", line 109, in <module>
    values = trainer.evaluate()
  File "/home/dagbert-skunkworks/src/skunkworks/models/GPT2Trainer.py", line 42, in evaluate
    eval_loop = self.prediction_loop if self.args.use_legacy_prediction_loop else self.evaluation_loop
AttributeError: 'TrainingArguments' object has no attribute 'use_legacy_prediction_loop'
Killing subprocess 136
Killing subprocess 137
Killing subprocess 138
Killing subprocess 139
Traceback (most recent call last):
  File "/usr/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/usr/local/lib/python3.6/dist-packages/deepspeed/launcher/launch.py", line 171, in <module>
    main()
  File "/usr/local/lib/python3.6/dist-packages/deepspeed/launcher/launch.py", line 161, in main
    sigkill_handler(signal.SIGTERM, None)  # not coming back
  File "/usr/local/lib/python3.6/dist-packages/deepspeed/launcher/launch.py", line 139, in sigkill_handler
    raise subprocess.CalledProcessError(returncode=last_return_code, cmd=cmd)
subprocess.CalledProcessError: Command '['/usr/bin/python', '-u', 'GPT2nr.py', '--local_rank=3']' returned non-zero exit status 1.

GPT2Trainer.py

import math
import time
from typing import Dict, List, NamedTuple, Optional, Tuple, Union

import numpy as np
from datasets import Dataset
from torch.utils.data.dataloader import DataLoader
from transformers.trainer import Trainer

class EvalPrediction(NamedTuple):
    """Evaluation results handed to ``compute_metrics``.

    Unpacks as ``(predictions, label_ids, classes)``. ``classes`` defaults to
    ``None`` so the tuple can also be built from predictions and label ids only.
    """

    # Model predictions: a single array or a tuple of arrays.
    predictions: Union[np.ndarray, Tuple[np.ndarray]]
    # Target label ids.
    label_ids: np.ndarray
    # Class labels collected during evaluation, when available.
    classes: Optional[List] = None

class EvalLoopOutput(NamedTuple):
    """Result of one pass of the evaluation loop.

    ``classes`` defaults to ``None`` so the output can be built from the four
    leading fields alone.
    """

    # Model predictions: a single array or a tuple of arrays.
    predictions: Union[np.ndarray, Tuple[np.ndarray]]
    # Target label ids, if any were produced.
    label_ids: Optional[np.ndarray]
    # Metric name -> value.
    metrics: Optional[Dict[str, float]]
    # Number of evaluated samples.
    num_samples: Optional[int]
    # Class labels collected during evaluation, when available.
    classes: Optional[List] = None




class GPT2Trainer(Trainer):
    """Trainer subclass whose evaluation loop also forwards class labels.

    ``evaluate`` and ``evaluation_loop`` are overridden. The loop records the
    ``labels`` entry of every batch and passes the collected values as
    ``classes`` to ``compute_metrics`` (through ``EvalPrediction``) and to the
    returned ``EvalLoopOutput``.

    NOTE(review): the methods below use helpers such as ``speed_metrics``,
    ``DebugOption``, ``deepspeed_init``, ``find_batch_size``, ``nested_concat``,
    ``nested_numpify``, ``nested_truncate``, ``denumpify_detensorize``,
    ``logger``, ``collections`` and ``torch`` that the visible import block does
    not bring into scope -- confirm they are imported and that the installed
    ``transformers`` release provides them.
    """

    def __init__(
        self,
        model,
        args=None,
        data_collator=None,
        train_dataset=None,
        eval_dataset=None,
        tokenizer=None,
        model_init=None,
        compute_metrics=None,
        callbacks=None,
        optimizers=(None, None),
    ):
        """Forward every argument, positionally and unchanged, to ``Trainer.__init__``."""
        super().__init__(
            model,
            args,
            data_collator,
            train_dataset,
            eval_dataset,
            tokenizer,
            model_init,
            compute_metrics,
            callbacks,
            optimizers,
        )

    def evaluate(
        self,
        eval_dataset: Optional[Dataset] = None,
        ignore_keys: Optional[List[str]] = None,
        metric_key_prefix: str = "eval",
    ) -> Dict[str, float]:
        """Run evaluation and return the metrics dictionary.

        Args:
            eval_dataset: Passed to ``get_eval_dataloader`` to build the dataloader.
            ignore_keys: Forwarded to the evaluation loop.
            metric_key_prefix: Forwarded to the evaluation loop and to ``speed_metrics``.

        Returns:
            The loop's metrics, extended with speed metrics and memory-tracker metrics.
        """
        self._memory_tracker.start()

        eval_dataloader = self.get_eval_dataloader(eval_dataset)
        start_time = time.time()

        # Requires a ``TrainingArguments`` that defines ``use_legacy_prediction_loop``;
        # without that attribute this line raises ``AttributeError``.
        eval_loop = self.prediction_loop if self.args.use_legacy_prediction_loop else self.evaluation_loop
        output = eval_loop(
            eval_dataloader,
            description="Evaluation",
            # No point gathering the predictions if there are no metrics, otherwise we defer to
            # self.args.prediction_loss_only
            prediction_loss_only=True if self.compute_metrics is None else None,
            ignore_keys=ignore_keys,
            metric_key_prefix=metric_key_prefix,
        )

        total_batch_size = self.args.eval_batch_size * self.args.world_size

        output.metrics.update(
            speed_metrics(
                metric_key_prefix,
                start_time,
                num_samples=output.num_samples,
                num_steps=math.ceil(output.num_samples / total_batch_size),
            )
        )

        self.log(output.metrics)

        if DebugOption.TPU_METRICS_DEBUG in self.args.debug:
            # tpu-comment: Logging debug metrics for PyTorch/XLA (compile, execute times, ops, etc.)
            xm.master_print(met.metrics_report())

        self.control = self.callback_handler.on_evaluate(self.args, self.state, self.control, output.metrics)

        self._memory_tracker.stop_and_update_metrics(output.metrics)

        return output.metrics

    def evaluation_loop(
        self,
        dataloader: DataLoader,
        description: str,
        prediction_loss_only: Optional[bool] = None,
        ignore_keys: Optional[List[str]] = None,
        metric_key_prefix: str = "eval",
    ) -> EvalLoopOutput:
        """
        Prediction/evaluation loop, shared by :obj:`Trainer.evaluate()` and :obj:`Trainer.predict()`.

        Works both with or without labels. In addition to losses, predictions and labels, the
        ``labels`` entry of every batch is collected and exposed as ``classes`` on the
        ``EvalPrediction`` passed to ``compute_metrics`` and on the returned ``EvalLoopOutput``.

        Args:
            dataloader: Batches to evaluate; each batch must support ``.get``.
            description: Name used in the log header.
            prediction_loss_only: Overrides ``self.args.prediction_loss_only`` when not ``None``.
            ignore_keys: Forwarded to ``prediction_step``.
            metric_key_prefix: Prefix applied to every metric key.

        Returns:
            An ``EvalLoopOutput`` with predictions, label ids, metrics, sample count and classes.
        """
        prediction_loss_only = (
            prediction_loss_only if prediction_loss_only is not None else self.args.prediction_loss_only
        )

        # if eval is called w/o train init deepspeed here
        if self.args.deepspeed and not self.deepspeed:

            # XXX: eval doesn't have `resume_from_checkpoint` arg but we should be able to do eval
            # from the checkpoint eventually
            deepspeed_engine, _, _ = deepspeed_init(self, num_training_steps=0, resume_from_checkpoint=None)
            self.model = deepspeed_engine.module
            self.model_wrapped = deepspeed_engine
            self.deepspeed = deepspeed_engine
            # XXX: we don't need optim/sched for inference, but this needs to be sorted out, since
            # for example the Z3-optimizer is a must for zero3 to work even for inference - what we
            # don't need is the deepspeed basic optimizer which is self.optimizer.optimizer
            deepspeed_engine.optimizer.optimizer = None
            deepspeed_engine.lr_scheduler = None

        model = self._wrap_model(self.model, training=False)

        # if full fp16 is wanted on eval and this ``evaluation`` or ``predict`` isn't called while
        # ``train`` is running, halve it first and then put on device
        if not self.is_in_train and self.args.fp16_full_eval:
            model = model.half().to(self.args.device)

        batch_size = dataloader.batch_size

        logger.info(f"***** Running {description} *****")
        if isinstance(dataloader.dataset, collections.abc.Sized):
            logger.info(f"  Num examples = {self.num_examples(dataloader)}")
        else:
            logger.info("  Num examples: Unknown")
        logger.info(f"  Batch size = {batch_size}")

        model.eval()

        self.callback_handler.eval_dataloader = dataloader
        # Do this before wrapping.
        eval_dataset = dataloader.dataset

        if is_torch_tpu_available():
            dataloader = pl.ParallelLoader(dataloader, [self.args.device]).per_device_loader(self.args.device)

        if self.args.past_index >= 0:
            self._past = None

        # Initialize containers
        # losses/preds/labels on GPU/TPU (accumulated for eval_accumulation_steps)
        losses_host = None
        preds_host = None
        labels_host = None
        # losses/preds/labels on CPU (final containers)
        all_losses = None
        all_preds = None
        all_labels = None
        # Class labels of every batch, in iteration order. Initialised here so the name is
        # bound even when the dataloader yields nothing.
        # NOTE(review): only the batches iterated by this process are recorded, while
        # predictions go through ``_nested_gather`` -- confirm the lengths agree in
        # multi-process runs.
        class_labels = []
        # Will be useful when we have an iterable dataset so don't know its length.

        observed_num_examples = 0
        # Main evaluation loop
        for step, inputs in enumerate(dataloader):
            # Read the class labels without rebinding ``inputs``: ``find_batch_size`` and
            # ``prediction_step`` below must still receive the complete batch.
            batch_classes = inputs.get('labels')
            if batch_classes is not None:
                if hasattr(batch_classes, "tolist"):
                    batch_classes = batch_classes.tolist()
                class_labels.extend(batch_classes)

            # Update the observed num examples
            observed_batch_size = find_batch_size(inputs)
            if observed_batch_size is not None:
                observed_num_examples += observed_batch_size

            # Prediction step
            loss, logits, labels = self.prediction_step(model, inputs, prediction_loss_only, ignore_keys=ignore_keys)

            # Update containers on host
            if loss is not None:
                losses = self._nested_gather(loss.repeat(batch_size))
                losses_host = losses if losses_host is None else torch.cat((losses_host, losses), dim=0)
            if logits is not None:
                logits = self._pad_across_processes(logits)
                logits = self._nested_gather(logits)
                preds_host = logits if preds_host is None else nested_concat(preds_host, logits, padding_index=-100)
            if labels is not None:
                labels = self._pad_across_processes(labels)
                labels = self._nested_gather(labels)
                labels_host = labels if labels_host is None else nested_concat(labels_host, labels, padding_index=-100)
            self.control = self.callback_handler.on_prediction_step(self.args, self.state, self.control)

            # Gather all tensors and put them back on the CPU if we have done enough accumulation steps.
            if self.args.eval_accumulation_steps is not None and (step + 1) % self.args.eval_accumulation_steps == 0:
                if losses_host is not None:
                    losses = nested_numpify(losses_host)
                    all_losses = losses if all_losses is None else np.concatenate((all_losses, losses), axis=0)
                if preds_host is not None:
                    logits = nested_numpify(preds_host)
                    all_preds = logits if all_preds is None else nested_concat(all_preds, logits, padding_index=-100)
                if labels_host is not None:
                    labels = nested_numpify(labels_host)
                    all_labels = (
                        labels if all_labels is None else nested_concat(all_labels, labels, padding_index=-100)
                    )

                # Set back to None to begin a new accumulation
                losses_host, preds_host, labels_host = None, None, None

        if self.args.past_index and hasattr(self, "_past"):
            # Clean the state at the end of the evaluation loop
            delattr(self, "_past")

        # Gather all remaining tensors and put them back on the CPU
        if losses_host is not None:
            losses = nested_numpify(losses_host)
            all_losses = losses if all_losses is None else np.concatenate((all_losses, losses), axis=0)
        if preds_host is not None:
            logits = nested_numpify(preds_host)
            all_preds = logits if all_preds is None else nested_concat(all_preds, logits, padding_index=-100)
        if labels_host is not None:
            labels = nested_numpify(labels_host)
            all_labels = labels if all_labels is None else nested_concat(all_labels, labels, padding_index=-100)

        # Number of samples
        if not isinstance(eval_dataset, IterableDataset):
            num_samples = len(eval_dataset)
        # The instance check is weird and does not actually check for the type, but whether the dataset has the right
        # methods. Therefore we need to make sure it also has the attribute.
        elif isinstance(eval_dataset, IterableDatasetShard) and hasattr(eval_dataset, "num_examples"):
            num_samples = eval_dataset.num_examples
        else:
            num_samples = observed_num_examples

        # Number of losses has been rounded to a multiple of batch_size and in a distributed training, the number of
        # samplers has been rounded to a multiple of batch_size, so we truncate.
        if all_losses is not None:
            all_losses = all_losses[:num_samples]
        if all_preds is not None:
            all_preds = nested_truncate(all_preds, num_samples)
        if all_labels is not None:
            all_labels = nested_truncate(all_labels, num_samples)

        # Metrics!
        if self.compute_metrics is not None and all_preds is not None and all_labels is not None:
            metrics = self.compute_metrics(
                EvalPrediction(predictions=all_preds, label_ids=all_labels, classes=class_labels)
            )
        else:
            metrics = {}

        # To be JSON-serializable, we need to remove numpy types or zero-d tensors
        metrics = denumpify_detensorize(metrics)

        if all_losses is not None:
            metrics[f"{metric_key_prefix}_loss"] = all_losses.mean().item()

        # Prefix all keys with metric_key_prefix + '_'
        for key in list(metrics.keys()):
            if not key.startswith(f"{metric_key_prefix}_"):
                metrics[f"{metric_key_prefix}_{key}"] = metrics.pop(key)

        # ``classes`` is passed explicitly; it is ``None`` when no batch carried labels.
        return EvalLoopOutput(
            predictions=all_preds,
            label_ids=all_labels,
            metrics=metrics,
            num_samples=num_samples,
            classes=class_labels or None,
        )

GPT2nr.py

import json
from typing import Dict, List, Optional

import deepspeed
import numpy as np
import pandas as pd
import torch
from datasets import Dataset, load_dataset, load_metric
from skunkworks.models.GPT2Trainer import (EvalLoopOutput, EvalPrediction,
                                                GPT2Trainer)
from transformers import (GPT2LMHeadModel, GPT2TokenizerFast, Trainer,
                          TrainingArguments)
from transformers.trainer_utils import get_last_checkpoint, is_main_process

def compute_metrics(eval_pred: EvalPrediction) -> Dict:
    """Decode predictions and references and score them with the module-level ``metric``.

    Args:
        eval_pred: Tuple unpacking to ``(logits, label_ids, class_labels)``.

    Returns:
        The dictionary returned by ``metric.compute``.
    """
    logits, label_ids, class_labels = eval_pred
    # Keep the highest-scoring index along the last axis as the predicted token id.
    logits = torch.Tensor(logits)
    logits = torch.argmax(logits, dim=-1)
    predictions = tokenizer.batch_decode(logits)
    label_ids = tokenizer.batch_decode(label_ids)
    metric_values = metric.compute(predictions=predictions, references=label_ids)
    # Pairs each class label with its raw score. The frame is built but not
    # returned or otherwise used below.
    avg_divergence_df = pd.DataFrame({"labels":class_labels,"scores":metric_values['raw_scores']})
    return metric_values

# Load the DeepSpeed configuration; the context manager closes the file
# as soon as it has been parsed.
with open('ds_config_zero2manual.json', 'r') as ds_config_file:
    deepspeed_dict = json.load(ds_config_file)
# Metric loaded from a local metric script.
metric = load_metric("../metrics/hf_metric.py")
# Chunk length used by ``group_texts``.
block_size = 128

def tokenize_function(examples, field):
    """Tokenize the ``field`` column of ``examples`` with the module-level ``tokenizer``.

    The tokenizer is called with ``padding="max_length"`` and ``truncation=True``;
    its return value is handed back unchanged.
    """
    texts = examples[field]
    encoded = tokenizer(texts, padding="max_length", truncation=True)
    return encoded

def group_texts(examples):
    """Concatenate every column of a batch and cut it into ``block_size`` chunks.

    Args:
        examples: Mapping from column name to a list of sequences.

    Returns:
        A dict with the same keys, each holding consecutive chunks of length
        ``block_size``, plus a ``"labels"`` entry that is a copy of the
        ``"input_ids"`` chunk list. Trailing items that do not fill a whole
        chunk are dropped.
    """
    # Local import so the function carries its own dependency.
    from itertools import chain

    # Concatenate all texts. ``chain.from_iterable`` flattens in linear time,
    # whereas ``sum(lists, [])`` re-copies the growing list on every addition.
    concatenated_examples = {k: list(chain.from_iterable(examples[k])) for k in examples.keys()}
    total_length = len(concatenated_examples[list(examples.keys())[0]])
    # We drop the small remainder, we could add padding if the model supported it instead of this drop, you can
    # customize this part to your needs.
    total_length = (total_length // block_size) * block_size
    # Split by chunks of max_len.
    result = {
        k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
        for k, t in concatenated_examples.items()
    }
    result["labels"] = result["input_ids"].copy()
    return result


# Pretrained GPT-2 medium model and its fast tokenizer.
model = GPT2LMHeadModel.from_pretrained('gpt2-medium')
tokenizer = GPT2TokenizerFast.from_pretrained('gpt2-medium')
# Use the end-of-sequence token as the padding token.
tokenizer.pad_token = tokenizer.eos_token

# Each CSV file is loaded as its own single-split dataset.
train_dataset = load_dataset('csv', data_files={'train': 'train.csv'})
test_dataset = load_dataset('csv', data_files={'test': 'test.csv'})

# Training data: tokenize the "text" column, then regroup into fixed-size blocks.
tokenized_train_dataset = train_dataset.map(
    lambda x: tokenize_function(x, 'text'),
    batched=True,
    num_proc=4,
    remove_columns=["text"],
)
tokenized_train_dataset = tokenized_train_dataset.map(
    group_texts,
    batched=True,
    batch_size=1,
    num_proc=4,
)

# Test data: tokenize the "data" column only (no regrouping).
tokenized_test_dataset = test_dataset.map(
    lambda x: tokenize_function(x, 'data'),
    batched=True,
    num_proc=4,
    remove_columns=["data"],
)

training_args = TrainingArguments(
    "test-trainer",
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=1,
    learning_rate=2e-5,
    weight_decay=0.01,
    eval_accumulation_steps=2,
    fp16=True,
    deepspeed=deepspeed_dict,
)

trainer = GPT2Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train_dataset["train"],
    eval_dataset=tokenized_test_dataset["test"],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

# Train first, then run the customised evaluation and keep its metrics.
trainer.train()
values = trainer.evaluate()

Transformers Version: 4.5.2
Training GPUs: 4
Training GPU Model: Tesla T4

1 Like

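It looks like you are not using the latest version of Transformers: your overridden `evaluate` reads `args.use_legacy_prediction_loop`, which the `TrainingArguments` of your installed version (4.5.2) does not define. Upgrading Transformers should resolve this error.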