# /scripts/evaluate_qwen_with_metrics.py
"""
Evaluate a causal LM (Qwen or similar) with:
  - DPO loss on pairwise data (prompt, chosen, rejected)
  - Cross-entropy loss & perplexity on single-reference data (prompt, response)
  - Optional text-generation metrics: ROUGE-1/2/L and BLEU-4 (no external deps)

Usage (SFT/perplexity + metrics):
    python evaluate_qwen_with_metrics.py \
        --mode perplexity --model your/model --eval_jsonl valid.jsonl \
        --metrics all --batch_size 4 --max_new_tokens 256

Usage (DPO eval + metrics vs. chosen):
    python evaluate_qwen_with_metrics.py \
        --mode dpo --model your/model --eval_jsonl pairwise.jsonl \
        --metrics rouge --dpo_target_field chosen --batch_size 4
"""
from __future__ import annotations
import argparse
import json
import math
import os
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Sequence, Tuple
import torch
from torch.utils.data import Dataset
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    Trainer,
    TrainingArguments,
    default_data_collator,
)
# Optional import for DPO evaluation (DPOTrainer / DPOConfig).
try:
from trl import DPOTrainer, DPOConfig
_HAS_TRL = True
except Exception:
_HAS_TRL = False
# ----------------------------- Data -----------------------------
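# Illustrative JSONL rows (placeholder values, not from any real dataset):
#   {"prompt": "...", "chosen": "...", "rejected": "..."}    # --mode dpo
#   {"prompt": "...", "response": "..."}                      # --mode perplexity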
class PairwiseDPODataset(Dataset):
    """JSONL rows with keys: prompt, chosen, rejected."""
def __init__(self, rows: List[Dict[str, str]]):
for k in ("prompt", "chosen", "rejected"):
if any(k not in r for r in rows):
raise ValueError(f"Missing key '{k}' in some DPO rows.")
self.rows = rows
def __len__(self) -> int:
return len(self.rows)
def __getitem__(self, idx: int) -> Dict[str, str]:
r = self.rows[idx]
return {"prompt": r["prompt"], "chosen": r["chosen"], "rejected": r["rejected"]}
class SFTSingleRefDataset(Dataset):
    """JSONL rows with keys: prompt, response. Used for CE loss / perplexity."""
def __init__(
self, rows: List[Dict[str, str]], tokenizer, max_len: int = 2048
) -> None:
if any(("prompt" not in r or "response" not in r) for r in rows):
raise ValueError("Each row must have 'prompt' and 'response'.")
self.rows = rows
self.tok = tokenizer
self.max_len = max_len
self.eos = tokenizer.eos_token or tokenizer.pad_token
def __len__(self) -> int:
return len(self.rows)
def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
r = self.rows[idx]
text = f"{r['prompt']}\n{r['response']}{self.eos}"
enc = self.tok(
text,
max_length=self.max_len,
truncation=True,
padding="max_length",
return_tensors="pt",
)
input_ids = enc["input_ids"][0]
attn_mask = enc["attention_mask"][0]
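        # Note: labels cover both prompt and response tokens; only padding is masked.
        # Mask the prompt span as well if response-only perplexity is desired.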
labels = input_ids.clone()
labels[attn_mask == 0] = -100
return {"input_ids": input_ids, "attention_mask": attn_mask, "labels": labels}
def read_jsonl(path: str) -> List[Dict[str, Any]]:
    with open(path, "r", encoding="utf-8") as f:
return [json.loads(line) for line in f if line.strip()]
# ----------------------- Simple Text Metrics -----------------------
def _split_tokens(s: str) -> List[str]:
    # Lowercase + whitespace tokenization keeps the metrics deterministic.
    return s.lower().strip().split()
def _ngram_counts(tokens: Sequence[str], n: int) -> Dict[Tuple[str, ...], int]:
    out: Dict[Tuple[str, ...], int] = {}
for i in range(len(tokens) - n + 1):
ngram = tuple(tokens[i : i + n])
out[ngram] = out.get(ngram, 0) + 1
return out
def _lcs_len(a: Sequence[str], b: Sequence[str]) -> int:
    # O(n*m) DP with a rolling 1-D array; eval sequences are short, so this is cheap.
n, m = len(a), len(b)
dp = [0] * (m + 1)
for i in range(1, n + 1):
prev = 0
for j in range(1, m + 1):
temp = dp[j]
if a[i - 1] == b[j - 1]:
dp[j] = prev + 1
else:
dp[j] = max(dp[j], dp[j - 1])
prev = temp
return dp[m]
def _safe_div(a: float, b: float) -> float:
return a / b if b != 0 else 0.0
def rouge_scores(pred: str, ref: str) -> Dict[str, float]:
    """Return sample-level F1 for ROUGE-1, ROUGE-2, and ROUGE-L."""
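    # Illustrative worked example: pred="the cat sat", ref="the cat sat on the mat"
    # ROUGE-1: overlap 3, P=3/3, R=3/6 -> F1 ~= 0.667; ROUGE-L: LCS=3 gives the same F1 here.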
pt, rt = _split_tokens(pred), _split_tokens(ref)
def rouge_n(n: int) -> float:
pc, rc = _ngram_counts(pt, n), _ngram_counts(rt, n)
overlap = 0
for k, v in pc.items():
if k in rc:
overlap += min(v, rc[k])
p = _safe_div(overlap, max(1, sum(pc.values())))
r = _safe_div(overlap, max(1, sum(rc.values())))
f1 = _safe_div(2 * p * r, (p + r)) if p + r else 0.0
return f1
# ROUGE-L F1 via LCS
lcs = _lcs_len(pt, rt)
p_l = _safe_div(lcs, max(1, len(pt)))
r_l = _safe_div(lcs, max(1, len(rt)))
f1_l = _safe_div(2 * p_l * r_l, (p_l + r_l)) if p_l + r_l else 0.0
return {"rouge1": rouge_n(1), "rouge2": rouge_n(2), "rougeL": f1_l}
def bleu4_score(pred: str, ref: str) -> float:
    """
    Sentence-level BLEU-4 with simple +1 Laplace smoothing on the n-gram precisions.
    This is a light, dependency-free approximation; sacrebleu will differ slightly.
    """
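    # Illustrative sanity check: identical strings score 1.0, e.g.
    # bleu4_score("the cat sat", "the cat sat") -> 1.0 (all smoothed precisions and BP equal 1).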
pt, rt = _split_tokens(pred), _split_tokens(ref)
if not pt:
return 0.0
def clipped_precision(n: int) -> float:
pc, rc = _ngram_counts(pt, n), _ngram_counts(rt, n)
overlap = 0
total = 0
for k, v in pc.items():
total += v
if k in rc:
overlap += min(v, rc[k])
# +1 smoothing keeps BLEU informative for short strings
return (overlap + 1.0) / (total + 1.0)
precisions = [clipped_precision(n) for n in range(1, 5)]
geo_mean = math.exp(sum(math.log(p) for p in precisions) / 4.0)
# Brevity penalty
ref_len, pred_len = len(rt), len(pt)
if pred_len > ref_len:
bp = 1.0
else:
bp = math.exp(1.0 - _safe_div(ref_len, max(1, pred_len)))
return bp * geo_mean
def compute_metrics_bulk(
preds: List[str], refs: List[str], want_rouge: bool, want_bleu: bool
) -> Dict[str, float]:
assert len(preds) == len(refs)
n = max(1, len(preds))
out: Dict[str, float] = {}
if want_rouge:
r1 = r2 = rl = 0.0
for p, r in zip(preds, refs):
sc = rouge_scores(p, r)
            r1 += sc["rouge1"]
            r2 += sc["rouge2"]
            rl += sc["rougeL"]
        out["gen_rouge1"] = r1 / n
        out["gen_rouge2"] = r2 / n
        out["gen_rougeL"] = rl / n
if want_bleu:
b = 0.0
for p, r in zip(preds, refs):
b += bleu4_score(p, r)
        out["gen_bleu4"] = b / n
return out
# ------------------------- Generation Eval -------------------------
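# Decoding is greedy/beam by default; temperature and top_p only take effect when do_sample=True.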
@dataclass
class GenArgs:
max_new_tokens: int = 256
do_sample: bool = False
temperature: float = 0.7
top_p: float = 0.95
num_beams: int = 1
prompt_max_length: int = 1024
@torch.no_grad()
def generate_texts(
model: AutoModelForCausalLM,
tokenizer,
prompts: List[str],
gen_args: GenArgs,
batch_size: int,
device: str,
) -> List[str]:
    # Decoder-only models should be left-padded for batched generation;
    # otherwise shorter prompts in a batch would continue after pad tokens.
    tokenizer.padding_side = "left"
    outs: List[str] = []
    for i in range(0, len(prompts), batch_size):
        batch = prompts[i : i + batch_size]
enc = tokenizer(
batch,
            return_tensors="pt",
padding=True,
truncation=True,
max_length=gen_args.prompt_max_length,
).to(device)
gen = model.generate(
**enc,
max_new_tokens=gen_args.max_new_tokens,
do_sample=gen_args.do_sample,
temperature=gen_args.temperature,
top_p=gen_args.top_p,
num_beams=gen_args.num_beams,
pad_token_id=tokenizer.eos_token_id,
)
        # Keep only the generated continuation: decode the full sequence, then
        # heuristically strip the original prompt text from the front.
        dec = tokenizer.batch_decode(gen, skip_special_tokens=True)
for prompt, full in zip(batch, dec):
if full.startswith(prompt):
outs.append(full[len(prompt) :].strip())
else:
outs.append(full.strip())
return outs
def eval_dpo(
model: AutoModelForCausalLM,
tokenizer,
eval_path: str,
batch_size: int,
) -> Dict[str, float]:
    if not _HAS_TRL:
        raise ImportError("trl is required for DPO evaluation. Install it with: pip install trl")
rows = read_jsonl(eval_path)
eval_ds = PairwiseDPODataset(rows)
dpo_args = DPOConfig(
output_dir=os.path.join("runs", "dpo_eval"),
per_device_eval_batch_size=batch_size,
        loss_type="sigmoid",  # standard DPO (sigmoid) loss
        logging_strategy="no",
save_strategy="no",
eval_strategy="no",
report_to=[],
)
trainer = DPOTrainer(
model=model,
tokenizer=tokenizer,
args=dpo_args,
train_dataset=None,
eval_dataset=eval_ds,
)
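    # evaluate() typically returns eval_loss plus reward diagnostics
    # (e.g. eval_rewards/accuracies, eval_rewards/margins); exact keys vary by trl version.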
metrics = trainer.evaluate()
return metrics
def eval_perplexity(
model: AutoModelForCausalLM,
tokenizer,
eval_path: str,
batch_size: int,
max_length: int,
) -> Dict[str, float]:
rows = read_jsonl(eval_path)
eval_ds = SFTSingleRefDataset(rows, tokenizer, max_len=max_length)
args = TrainingArguments(
output_dir=os.path.join("runs", "ppl_eval"),
per_device_eval_batch_size=batch_size,
dataloader_drop_last=False,
report_to=[],
)
    # Pass-through collator: the dataset already pads to max_length and masks padding in labels.
    collator = default_data_collator
trainer = Trainer(
model=model,
args=args,
eval_dataset=eval_ds,
data_collator=collator,
tokenizer=tokenizer,
)
metrics = trainer.evaluate()
loss = float(metrics["eval_loss"])
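    # Guard against overflow: exp(50) is already ~5e21, so report inf beyond that.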
metrics["perplexity"] = math.exp(loss) if loss < 50 else float("inf")
return metrics
def maybe_gen_metrics(
model: AutoModelForCausalLM,
tokenizer,
rows: List[Dict[str, Any]],
mode: str,
metrics_flag: str,
dpo_target_field: str,
gen_args: GenArgs,
batch_size: int,
device: str,
) -> Dict[str, float]:
    if metrics_flag == "none":
        return {}
want_rouge = metrics_flag in ("rouge", "all")
want_bleu = metrics_flag in ("bleu", "all")
prompts: List[str] = []
refs: List[str] = []
if mode == "perplexity":
for r in rows:
prompts.append(str(r["prompt"]))
refs.append(str(r["response"]))
else: # dpo
tgt_field = dpo_target_field or "chosen"
for r in rows:
if tgt_field not in r:
raise ValueError(
f"--dpo_target_field '{tgt_field}' not in row keys {list(r.keys())}"
)
prompts.append(str(r["prompt"]))
refs.append(str(r[tgt_field]))
preds = generate_texts(model, tokenizer, prompts, gen_args, batch_size, device)
return compute_metrics_bulk(preds, refs, want_rouge, want_bleu)
# ------------------------------- CLI -------------------------------
def main():
parser = argparse.ArgumentParser()
    parser.add_argument("--mode", choices=["dpo", "perplexity"], required=True)
    parser.add_argument("--model", required=True)
    parser.add_argument("--tokenizer", default=None)
    parser.add_argument("--eval_jsonl", required=True)
    parser.add_argument("--batch_size", type=int, default=2)
    parser.add_argument("--max_length", type=int, default=2048)
# Text metrics / generation
parser.add_argument(
"--metrics", choices=["none", "rouge", "bleu", "all"], default="none"
)
parser.add_argument("--max_new_tokens", type=int, default=256)
parser.add_argument("--do_sample", action="store_true")
parser.add_argument("--temperature", type=float, default=0.7)
parser.add_argument("--top_p", type=float, default=0.95)
parser.add_argument("--num_beams", type=int, default=1)
parser.add_argument("--prompt_max_length", type=int, default=1024)
parser.add_argument(
"--dpo_target_field",
type=str,
default="chosen",
help="For DPO metrics, compare generations to this field (e.g., chosen).",
)
args = parser.parse_args()
tok_name = args.tokenizer or args.model
tokenizer = AutoTokenizer.from_pretrained(tok_name, use_fast=True)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token # avoid pad warnings
device = "cuda" if torch.cuda.is_available() else "cpu"
model = AutoModelForCausalLM.from_pretrained(args.model).to(device)
model.eval()
all_metrics: Dict[str, float] = {}
if args.mode == "dpo":
dpo_metrics = eval_dpo(model, tokenizer, args.eval_jsonl, args.batch_size)
all_metrics.update(dpo_metrics)
if args.metrics != "none":
rows = read_jsonl(args.eval_jsonl)
gm = maybe_gen_metrics(
model,
tokenizer,
rows,
mode="dpo",
metrics_flag=args.metrics,
dpo_target_field=args.dpo_target_field,
gen_args=GenArgs(
max_new_tokens=args.max_new_tokens,
do_sample=args.do_sample,
temperature=args.temperature,
top_p=args.top_p,
num_beams=args.num_beams,
prompt_max_length=args.prompt_max_length,
),
batch_size=args.batch_size,
device=device,
)
all_metrics.update(gm)
else:
ppl_metrics = eval_perplexity(
model, tokenizer, args.eval_jsonl, args.batch_size, args.max_length
)
all_metrics.update(ppl_metrics)
if args.metrics != "none":
rows = read_jsonl(args.eval_jsonl)
gm = maybe_gen_metrics(
model,
tokenizer,
rows,
mode="perplexity",
metrics_flag=args.metrics,
dpo_target_field="",
gen_args=GenArgs(
max_new_tokens=args.max_new_tokens,
do_sample=args.do_sample,
temperature=args.temperature,
top_p=args.top_p,
num_beams=args.num_beams,
prompt_max_length=args.prompt_max_length,
),
batch_size=args.batch_size,
device=device,
)
all_metrics.update(gm)
# Final print for scripts/CI
print({k: (float(v) if isinstance(v, (int, float)) else v) for k, v in all_metrics.items()})
if __name__ == "__main__":
main()