Hello Team,
I have a use case where I need to compare the outputs of a fine-tuned LLM against a baseline model. Here the baseline is CodeLlama and the test model is a fine-tuned variant of it, and I need to run the comparison over hundreds of prompts. I have access to a GCP box with 4 GPUs.
Initially I loaded the evaluation dataset into a pandas DataFrame and used the apply method to call generate row by row, loading each model with device_map="auto" so that it gets sharded across the GPUs. A rough sketch of that first version is below.
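For context, that first version looked roughly like this (a simplified, from-memory sketch; the column name and the helper function are illustrative, not my exact code):

import pandas as pd
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("codellama/CodeLlama-7b-Instruct-hf")
model = AutoModelForCausalLM.from_pretrained(
    "codellama/CodeLlama-7b-Instruct-hf", torch_dtype=torch.float16, device_map="auto"
)

def generate_response(question):
    # one generate() call per row -- this is the sequential bottleneck
    inputs = tokenizer(question, return_tensors="pt").to(model.device)
    with torch.no_grad():
        output = model.generate(**inputs, max_new_tokens=256)
    return tokenizer.decode(output[0], skip_special_tokens=True)

df = pd.read_csv("eval_questions.csv")
df["baseline_response"] = df["question"].apply(generate_response)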
However, I believe we can scale better with Accelerate by spawning multiple processes instead of processing each row sequentially with pandas, so I read the Distributed Inference with 🤗 Accelerate guide.
Below is my implementation. It runs as desired, but it would be great if you could review whether the design is correct. I hope to publish this as a tutorial or blog post to help others.
import os
import pandas as pd
from accelerate import Accelerator
from datasets import load_dataset
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
def transform_user_question_to_chatml_format(user_question):
    user_part = [
        {"role": "user", "content": user_question},
    ]
    return user_part
def main():
    test_model_path = "asaha-cdcp/codellama-instruct-oasst-cdcp-tune-pack-false"
    reference_model_path = "codellama/CodeLlama-7b-Instruct-hf"
    eval_question_file = (
        "/home/anindya/training/chat/datasets/eval_questions.csv"
    )
    eval_response_file = (
        "/home/anindya/training/chat/datasets/test_responses.csv"
    )
    question_field = "question"

    accelerator = Accelerator()

    eval_qs_ds = load_dataset("csv", data_files=eval_question_file)

    test_tokenizer = AutoTokenizer.from_pretrained(test_model_path)
    test_model = AutoModelForCausalLM.from_pretrained(
        test_model_path, torch_dtype=torch.float16, device_map="auto"
    )
    reference_tokenizer = AutoTokenizer.from_pretrained(reference_model_path)
    reference_model = AutoModelForCausalLM.from_pretrained(
        reference_model_path, torch_dtype=torch.float16, device_map="auto"
    )

    # no need to prepare since it is inference only
    # test_model = accelerator.prepare(test_model)
    # reference_model = accelerator.prepare(reference_model)

    with accelerator.main_process_first():

        def preprocess(samples):
            batch = []
            for user_question in samples[question_field]:
                batch.append(
                    test_tokenizer.apply_chat_template(
                        transform_user_question_to_chatml_format(user_question),
                        tokenize=False,
                    )
                )
            return {"text": batch}

        eval_qs_ds = eval_qs_ds.map(
            preprocess,
            batched=True,
            remove_columns=eval_qs_ds["train"].column_names,
            desc="apply chat template to dataset",
        )
    with accelerator.split_between_processes(eval_qs_ds["train"]["text"]) as processed_prompts:
        print(f"Processed prompts in Process {accelerator.process_index}:")
        print(f"{processed_prompts}")

    accelerator.wait_for_everyone()

    # put models into eval mode
    test_model.eval()
    reference_model.eval()
    with accelerator.split_between_processes(eval_qs_ds["train"]["text"]) as processed_prompts:
        # print(processed_prompts)
        if not test_tokenizer.pad_token:
            test_tokenizer.pad_token = test_tokenizer.eos_token
        input_ids = test_tokenizer(
            processed_prompts, padding=True, truncation=True, return_tensors="pt"
        ).to(accelerator.device)

        # Generate text from the test model
        with torch.no_grad():  # disable gradient calculation for efficiency
            test_outputs = test_model.generate(
                **input_ids,
                max_new_tokens=256,
                num_return_sequences=1,
                no_repeat_ngram_size=2,
            )

        # Decode the output text
        test_model_generated_response = test_tokenizer.batch_decode(
            test_outputs.cpu(), skip_special_tokens=False
        )
        print(test_model_generated_response)
    with accelerator.split_between_processes(eval_qs_ds["train"]["text"]) as processed_prompts:
        # print(processed_prompts)
        if not reference_tokenizer.pad_token:
            reference_tokenizer.pad_token = reference_tokenizer.eos_token
        input_ids = reference_tokenizer(
            processed_prompts,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="pt",
        ).to(accelerator.device)

        # Generate text from the reference model
        with torch.no_grad():  # disable gradient calculation for efficiency
            reference_outputs = reference_model.generate(
                **input_ids,
                max_new_tokens=256,
                num_return_sequences=1,
                no_repeat_ngram_size=2,
            )

        # should I batch decode here? inside no_grad?
        # or should I batch decode here? outside of no_grad?
        reference_model_generated_response = reference_tokenizer.batch_decode(
            reference_outputs.cpu(), skip_special_tokens=False
        )

        # should I create a partial file with process_index, e.g. output_{accelerator.process_index}.csv?
        # or can I leverage accelerator.gather(reference_model_generated_response) here?
    accelerator.wait_for_everyone()

    # and then merge here
    if accelerator.is_main_process:
        # read individual files output_{i}.csv and merge
        pass

    accelerator.wait_for_everyone()
    accelerator.print("DONE!")
if __name__ == "__main__":
    hub_token = "<HF_TOKEN>"
    os.environ["HUGGING_FACE_HUB_TOKEN"] = hub_token
    main()
I have some doubts about how to collate the outputs of the two models into two new columns and add them to the incoming dataset:
- Should we use accelerator.gather(...) to collect reference_outputs and test_outputs? Does it even apply in this case? If yes, should it be called inside the torch.no_grad() block or outside? In the huggingface/transformers example transformers/examples/pytorch/translation/run_translation_no_trainer.py (at commit 976189a6df796a2ff442dd81b022626c840d8c27) it is done inside the no_grad block, while in transformers/examples/pytorch/language-modeling/run_clm_no_trainer.py (at commit 5c7e11e01012112498ece9ee4ab3894c2a702195) it is done outside the no_grad block. Which one is correct? (A sketch of what I mean is after this list.)
- Or should each process write an intermediate temporary file, and then, after accelerator.wait_for_everyone(), the main process merges all the partial files into one? I have added comments in my code to make this clear. (Also sketched after the list.)
- Here the two models run sequentially and I have to create two tokenizers, which tokenize the same prompts twice. I cannot reuse the tokenized output from test_tokenizer in both test_model.generate(...) and reference_model.generate(...); when I try, I get a CUDA error. Is there also a way to make the two model generations run in parallel?
- What happens when there are thousands of prompts to evaluate? Is it possible to split and stream the data instead of loading everything at once and delegating it to each process? Could you suggest a scalable strategy? (See the chunking sketch after the list.)
- I deliberately skipped accelerator.prepare() for the models because we are only doing inference and there is nothing to sync across devices. Is my understanding right?
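To make the gather option concrete, this is roughly what I have in mind inside the split_between_processes block of my script. It is only a sketch and assumes accelerate.utils.gather_object, which (if I read it correctly) the distributed inference guide uses for lists of Python objects; I decode to strings first so no tensor padding across processes is needed:

from accelerate.utils import gather_object

# inside the split_between_processes block, after generation on each process
local_responses = reference_tokenizer.batch_decode(
    reference_outputs.cpu(), skip_special_tokens=True
)

# every process ends up with the full concatenated list
all_responses = gather_object(local_responses)

if accelerator.is_main_process:
    pd.DataFrame({"reference_response": all_responses}).to_csv(
        "reference_responses.csv", index=False
    )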
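And the partial-file option would look roughly like this (file names are illustrative):

# each process writes its own shard
pd.DataFrame({"reference_response": reference_model_generated_response}).to_csv(
    f"output_{accelerator.process_index}.csv", index=False
)

accelerator.wait_for_everyone()

# the main process merges the shards back into one file
if accelerator.is_main_process:
    parts = [
        pd.read_csv(f"output_{i}.csv") for i in range(accelerator.num_processes)
    ]
    pd.concat(parts, ignore_index=True).to_csv("reference_responses.csv", index=False)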
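For the scaling question, is chunking the per-process prompt list like this an acceptable pattern (the batch size here is arbitrary), or is there a more idiomatic streaming approach?

with accelerator.split_between_processes(eval_qs_ds["train"]["text"]) as processed_prompts:
    responses = []
    batch_size = 8  # arbitrary; tune for GPU memory
    for start in range(0, len(processed_prompts), batch_size):
        chunk = processed_prompts[start : start + batch_size]
        inputs = test_tokenizer(
            chunk, padding=True, truncation=True, return_tensors="pt"
        ).to(accelerator.device)
        with torch.no_grad():
            outputs = test_model.generate(**inputs, max_new_tokens=256)
        responses.extend(
            test_tokenizer.batch_decode(outputs.cpu(), skip_special_tokens=True)
        )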