import random
import pandas as pd
import numpy as np
import torch
from sklearn.metrics import accuracy_score
from torch.utils.data import Dataset, DataLoader
from transformers import BartTokenizer, BartForConditionalGeneration, Trainer, TrainingArguments
from transformers import EarlyStoppingCallback, AutoTokenizer, AutoModelForSeq2SeqLM
from transformers import AutoModelForCausalLM, BitsAndBytesConfig
from peft import LoraConfig, get_peft_model
import torch.nn as nn
import torch.nn.functional as F
import re
import logging
import os
from collections import Counter
# Disable unnecessary logging
logging.basicConfig(level=logging.ERROR)
# Set seed for reproducibility
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

# Custom Dataset class for PyTorch
class CustomDataset(Dataset):
    def __init__(self, input_ids, attention_mask, labels):
        self.input_ids = input_ids
        self.attention_mask = attention_mask
        self.labels = labels

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return {
            'input_ids': self.input_ids[idx],
            'attention_mask': self.attention_mask[idx],
            'labels': self.labels[idx]
        }

# Class for Model Training and Evaluation
class ModelTrainer:
    def __init__(self, model_name):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        # Configure 4-bit quantization
        quantization_config = BitsAndBytesConfig(load_in_4bit=True)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            quantization_config=quantization_config,
            torch_dtype=torch.float32,
            device_map="auto"
        )
        print(self.model)
        if self.tokenizer.pad_token is None:
            self.tokenizer.add_special_tokens({'pad_token': '[PAD]'})
            self.model.resize_token_embeddings(len(self.tokenizer))
        # Apply LoRA adapter
        lora_config = LoraConfig(
            r=8,  # Low rank dimension
            lora_alpha=16,  # Scaling parameter
            target_modules=["q_proj", "v_proj"],  # Target LoRA modules
            lora_dropout=0.1,  # Dropout for LoRA layers
            bias="none"  # No bias for LoRA
        )
        self.model = get_peft_model(self.model, lora_config)
        print("LoRA adapter has been added to the model.")
    def prepare_data(self, df):
        # Tokenize the main dataset
        tokenized_data = self.tokenizer(
            df['question'].tolist(),
            padding=True,
            truncation=True,
            return_tensors="pt"
        )
        # Tokenize the labels
        ans_list = [str(element) for element in df['answer'].tolist()]
        labels = self.tokenizer(
            ans_list,
            padding=True,
            truncation=True,
            return_tensors="pt"
        )['input_ids']
        # Create a dataset from the tokenized data
        dataset = CustomDataset(
            input_ids=tokenized_data['input_ids'],
            attention_mask=tokenized_data['attention_mask'],
            labels=labels
        )
        return dataset
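
    # Note: the questions and the answers are tokenized (and padded) separately,
    # so 'input_ids' and 'labels' can end up with different sequence lengths.
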
    def train(self, train_dataset, eval_dataset, output_dir="./results"):
        print("Starting training...")
        training_args = TrainingArguments(
            output_dir=output_dir,
            per_device_train_batch_size=4,
            per_device_eval_batch_size=4,
            num_train_epochs=4,
            save_steps=20000,
            logging_dir="./logs",
            eval_strategy="steps",
            eval_steps=2000,
            logging_steps=5000,
            load_best_model_at_end=True,
            logging_first_step=True,
            report_to="none"
        )
        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=eval_dataset,
            callbacks=[EarlyStoppingCallback(early_stopping_patience=3, early_stopping_threshold=0.02)]
        )
        trainer.train()
        print("Training complete!")

    def evaluate(self, eval_dataset):
        print("Starting evaluation...")
        eval_loader = DataLoader(eval_dataset, batch_size=2, shuffle=False)
        self.model.eval()
        predicted_labels = []
        true_labels = []
        for batch in eval_loader:
            input_ids = batch['input_ids'].to(self.device)
            attention_mask = batch['attention_mask'].to(self.device)
            with torch.no_grad():
                outputs = self.model.generate(input_ids=input_ids, attention_mask=attention_mask, max_new_tokens=1024)
            predicted_texts = [self.tokenizer.decode(output, skip_special_tokens=True) for output in outputs]
            predicted_labels.extend(predicted_texts)
            true_labels.extend([self.tokenizer.decode(label, skip_special_tokens=True) for label in batch['labels']])
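        # Exact-match scoring: a prediction only counts as correct if the full
        # generated string equals the decoded label string.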
        correct = sum(np.array_equal(t, p) for t, p in zip(true_labels, predicted_labels))
        accuracy = (correct / len(true_labels)) * 100
        print(f"Evaluation complete with accuracy: {accuracy:.2f}%")
        return accuracy

    def save_model(self, output_path):
        print(f"Saving model to {output_path}...")
        self.model.save_pretrained(output_path)
        self.tokenizer.save_pretrained(output_path)
        print("Model saved successfully!")

# Main function
def main(input_data_path):
    model_name = "microsoft/phi-2"
    seed = 123
    output_dir = "result"
    set_seed(seed)
    # Ensure output directory exists
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    # Initialize Model Trainer with the specified model name
    model_trainer = ModelTrainer(model_name=model_name)
    # Load the input data
    input_data = pd.read_csv(input_data_path)
    print(input_data.shape[0])
    eval_data = input_data.sample(frac=0.2, random_state=seed)
    train_data = input_data.drop(eval_data.index)
    print(train_data.shape[0])
    # Prepare datasets for model training
    train_dataset = model_trainer.prepare_data(train_data)
    eval_dataset = model_trainer.prepare_data(eval_data)
    # Train the model
    model_trainer.train(train_dataset, eval_dataset, output_dir=output_dir)
    # Evaluate the model on the evaluation dataset
    accuracy = model_trainer.evaluate(eval_dataset)
    print(f"Final Evaluation Accuracy: {accuracy:.2f}")
    # Save model to Google Drive
    google_drive_path = "/content/drive/MyDrive/LAT_Trained/Phi_2_LAT"
    model_trainer.save_model(google_drive_path)

if __name__ == "__main__":
    main("/content/drive/MyDrive/LAT dataset/LAT_Training_dataset.csv")

This is my code. My input dataset has 400,000 samples, and during training I get the following error:

ValueError: Expected input batch_size (132) to match target batch_size (0).

Can anyone help me understand why this happens and point me toward a solution?
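For context, here is a tiny standalone snippet I wrote just to see where a message like this can come from. The shapes and the vocabulary size are made up to mirror the numbers in my traceback; they are not my actual tensors:

import torch
import torch.nn.functional as F

# Hypothetical shapes: 132 flattened logit rows vs. an empty flattened label tensor
logits = torch.randn(132, 1000)             # (batch * seq_len, vocab_size), arbitrary vocab size
targets = torch.empty(0, dtype=torch.long)  # no label positions at all
loss = F.cross_entropy(logits, targets)
# should fail with: ValueError: Expected input batch_size (132) to match target batch_size (0).

So it looks like the flattened logits and the flattened labels end up with different lengths somewhere in the loss computation, but I do not see where that happens in my code.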