I’m overriding the evaluation_loop
method for the Trainer
class, and trying to run model.generate()
in a distributed setting (sharded model with torchrun --nproc_per_node=4
), but get
RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cuda:2 and cpu! (when checking argument for argument index in method wrapper_CUDA__index_select)
My evaluation loop looks like this:
def evaluation_loop(
    self,
    dataloader: DataLoader,
    description: str,
    prediction_loss_only: Optional[bool] = None,
    ignore_keys: Optional[List[str]] = None,
    metric_key_prefix: str = "eval",
) -> EvalLoopOutput:
    """
    Prediction/evaluation loop, shared by `Trainer.evaluate()` and `Trainer.predict()`.

    Runs `model.generate()` on every batch and prints the decoded outputs.
    Works both with or without labels.

    Args:
        dataloader: eval dataloader; batches are dicts containing at least
            `input_ids` (and usually `attention_mask`).
        description: label used in the log header (e.g. "Evaluation").
        prediction_loss_only: overrides `args.prediction_loss_only` when given.
        ignore_keys: unused here; kept for interface compatibility with `Trainer`.
        metric_key_prefix: unused here; kept for interface compatibility.

    Returns:
        An `EvalLoopOutput` with `num_samples` set to the number of examples
        actually seen; predictions/labels/metrics are not collected here.
    """
    args = self.args
    prediction_loss_only = (
        prediction_loss_only
        if prediction_loss_only is not None
        else args.prediction_loss_only
    )
    # if eval is called w/o train, init deepspeed here
    if args.deepspeed and not self.deepspeed:
        # XXX: eval doesn't have `resume_from_checkpoint` arg but we should be able to do eval
        # from the checkpoint eventually
        deepspeed_engine, _, _ = deepspeed_init(
            self, num_training_steps=0, resume_from_checkpoint=None, inference=True
        )
        self.model = deepspeed_engine.module
        self.model_wrapped = deepspeed_engine
        self.deepspeed = deepspeed_engine
    # NOTE(review): with `--fsdp "full_shard auto_wrap"` this returns the
    # FSDP-wrapped model; generation must go through the wrapped model so the
    # sharded parameters are gathered — do not call `self.model.generate` here.
    model = self._wrap_model(self.model, training=False, dataloader=dataloader)
    if not self.is_in_train:
        if args.fp16_full_eval:
            model = model.to(dtype=torch.float16, device=args.device)
        elif args.bf16_full_eval:
            model = model.to(dtype=torch.bfloat16, device=args.device)
    batch_size = self.args.eval_batch_size
    logger.info(f"***** Running {description} *****")
    if has_length(dataloader):
        logger.info(f"  Num examples = {self.num_examples(dataloader)}")
    else:
        logger.info("  Num examples: Unknown")
    logger.info(f"  Batch size = {batch_size}")
    model.eval()
    self.callback_handler.eval_dataloader = dataloader
    # Do this before wrapping.
    eval_dataset = getattr(dataloader, "dataset", None)
    if args.past_index >= 0:
        self._past = None
    observed_num_examples = 0
    # Main evaluation loop
    for step, inputs in enumerate(dataloader):
        # BUG FIX: the dataloader yields CPU tensors; under torchrun each rank
        # holds its model (shard) on its own GPU, so feeding CPU `input_ids`
        # to `generate` raises "Expected all tensors to be on the same device,
        # ... cuda:2 and cpu". `_prepare_inputs` moves every tensor in the
        # batch to `args.device` (the correct per-rank device).
        inputs = self._prepare_inputs(inputs)
        # Update the observed num examples
        observed_batch_size = find_batch_size(inputs)
        if observed_batch_size is not None:
            observed_num_examples += observed_batch_size
            # For batch samplers, batch_size is not known by the dataloader in advance.
            if batch_size is None:
                batch_size = observed_batch_size
        # Update containers on host
        self.control = self.callback_handler.on_prediction_step(
            args, self.state, self.control
        )
        # No gradients are needed for generation; saves memory during eval.
        with torch.no_grad():
            output = model.generate(
                inputs["input_ids"],
                # Pass the mask when present so padded positions are ignored;
                # `generate` treats attention_mask=None as "no padding".
                attention_mask=inputs.get("attention_mask"),
                max_length=2048,
                do_sample=True,
                top_k=50,
                top_p=0.95,
                temperature=0.8,
                num_return_sequences=1,
            )
        decoded = self.tokenizer.batch_decode(output, skip_special_tokens=True)
        for i, d in enumerate(decoded):
            print(f"Input: {inputs['input_ids'][i]}")
            print(f"Output: {d}")
    return EvalLoopOutput(
        predictions=None,
        label_ids=None,
        metrics=None,
        # Report how many examples were actually processed instead of None.
        num_samples=observed_num_examples,
    )
and my torchrun launch command looks like this:
torchrun --nproc_per_node=4 --master_port=1111 train.py \
--model_name_or_path $model_name_or_path \
--data_path $train_path \
--eval_path $val_path \
--bf16 True \
--output_dir $output_dir \
--num_train_epochs 3 \
--per_device_train_batch_size 2 \
--per_device_eval_batch_size 2 \
--gradient_accumulation_steps 8 \
--evaluation_strategy "steps" \
--eval_steps 1 \
--save_strategy "steps" \
--save_steps 20 \
--save_total_limit 15 \
--learning_rate 2e-5 \
--weight_decay 0. \
--warmup_ratio 0.03 \
--lr_scheduler_type "cosine" \
--logging_steps 1 \
--fsdp "full_shard auto_wrap" \
--fsdp_transformer_layer_cls_to_wrap 'LLaMADecoderLayer' \
--tf32 True
How do I implement this properly?