Plan
- CLI: accept OCR .txt or block .json ([{text, bbox}]), optional gazetteer CSV.
- Normalize text: Unicode NFKC; fix ligatures/quotes; remove headers/footers; dehyphenate line wraps; collapse whitespace.
- (Optional) Spell-correct with SymSpell if a dictionary is provided; keep the original text for alignment if correction proves too destructive.
- Sentence split (spaCy lightweight, or a regex fallback).
- NER ensemble: run each sentence through several Hugging Face token-classification models.
- Merge spans: union overlapping spans with the same label; on label conflicts prefer non-MISC, higher score, longer span.
- Gazetteer pass (optional): fuzzy-match text windows against the gazetteer; add or upgrade labels (ORG/GPE/PER).
- Zone filtering (if JSON blocks): skip blocks that look like ads/boilerplate based on digit/uppercase/price density; concatenate editorial zones only.
- Emit JSON: entities with text, label, score, and offsets, plus a short report (size, entity count, models and options used); see the sketch after this list.
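For reference, a minimal sketch of the output JSON (illustrative values only; the report fields mirror what main() assembles below):

    {
      "entities": [
        {"text": "Acme Holdings", "label": "ORG", "start": 102, "end": 115, "score": 0.97}
      ],
      "report": {
        "chars": 5341,
        "num_entities": 42,
        "models": ["dslim/bert-base-NER", "Davlan/xlm-roberta-base-ner-hrl"],
        "spell_corrected": false,
        "used_blocks": true,
        "gazetteer_count": 120
      }
    }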
"""
OCR-robust NER (free, no paid APIs).

Install:
    pip install transformers torch spacy dateparser rapidfuzz symspellpy pandas
    python -m spacy download en_core_web_sm

Usage:
    python nlp/ocr_ner_pipeline.py input.txt --out out.json
    python nlp/ocr_ner_pipeline.py blocks.json --blocks --gazetteer names_places.csv --symspell dict_en.txt

blocks.json format:
    [ {"text": "Block text ...", "bbox": [x, y, w, h]}, ... ]
Gazetteer CSV columns (any order detected by header match): name,label
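Example gazetteer CSV (hypothetical rows; header case is matched loosely):
    name,label
    Acme Holdings,ORG
    Springfield,GPE
    Jane Doe,PER

SymSpell dictionary format (assumed from build_symspell below): one "term count"
pair per whitespace-separated line, e.g. "the 23135851162", as in the standard
symspellpy English frequency dictionary.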
Why these choices:
    - An ensemble of taggers reduces OCR brittleness; a multilingual subword model (xlm-roberta) degrades more gracefully on noisy tokens.
    - Pre/post steps target common OCR artifacts (hyphenation, ligatures, headers/footers).
"""
from __future__ import annotations
import argparse
import csv
import json
import math
import re
import sys
import unicodedata
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple
import dateparser
# Lazy imports for optional deps
try:
import spacy # type: ignore
except Exception:
spacy = None # type: ignore
try:
from transformers import pipeline # type: ignore
except Exception:
pipeline = None # type: ignore
try:
from rapidfuzz import fuzz, process # type: ignore
except Exception:
fuzz = None # type: ignore
process = None # type: ignore
try:
from symspellpy import SymSpell, Verbosity # type: ignore
except Exception:
SymSpell = None # type: ignore
# ---------------- util ----------------
def _nfkc(s: str) -> str:
    return unicodedata.normalize("NFKC", s)
def _replace_ligatures(s: str) -> str:
    # map common OCR ligatures and curly quotes back to ASCII
    return (
        s.replace("\ufb01", "fi")    # ligature fi
        .replace("\ufb02", "fl")     # ligature fl
        .replace("\u2019", "'")      # right single quote
        .replace("\u201c", '"')      # left double quote
        .replace("\u201d", '"')      # right double quote
    )
def _strip_headers_footers(text: str) -> str:
    kept: List[str] = []
    for l in text.splitlines():
        # drop page numbers, bare numbers, URLs, and copyright lines
        if re.search(r"^\s*(page\s*\d+|\d+\s*$|www\.[^\s]+|©|copyright)", l, re.IGNORECASE):
            continue
        kept.append(l)
    return "\n".join(kept)
def _dehyphenate(text: str) -> str:
    # join hyphenated line breaks: "exam-\nple" -> "example"
    text = re.sub(r"(\w)-\n(\w)", r"\1\2", text)
    # unwrap single line breaks inside paragraphs
    text = re.sub(r"(?<!\n)\n(?!\n)", " ", text)
    # collapse multiple blank lines
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text
def normalize_ocr_text(text: str) -> str:
    text = _nfkc(text)
    text = _replace_ligatures(text)
    text = _strip_headers_footers(text)
    text = _dehyphenate(text)
    # normalize whitespace
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r" +\n", "\n", text)
    return text.strip()
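# Illustrative example (hypothetical OCR input): "Infor-\nmation Ltd.\nPage 3\nreported profits."
# -> "Information Ltd. reported profits."  (header line dropped, hyphen wrap joined, single newlines unwrapped)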
def simple_sentences(text: str) -> List[str]:
    if spacy is not None:
        try:
            nlp = spacy.load("en_core_web_sm", disable=["ner", "tagger", "lemmatizer"])
            nlp.add_pipe("sentencizer")
            return [s.text.strip() for s in nlp(text).sents if s.text.strip()]
        except Exception:
            pass
    # regex fallback
    return [s.strip() for s in re.split(r"(?<=[.!?])\s+(?=[A-Z0-9])", text) if s.strip()]
# ---------------- blocks / zoning ----------------
def load_text_or_blocks(path: Path, is_blocks: bool) -> str:
    if not is_blocks:
        return path.read_text(encoding="utf-8", errors="ignore")
    blocks = json.loads(path.read_text(encoding="utf-8"))
    # filter likely ads/boilerplate: too many prices/digits/uppercase
    editorial: List[str] = []
    for b in blocks:
        t: str = b.get("text", "")
        if not t.strip():
            continue
        price_hits = len(re.findall(r"\$\s?\d|\b\d+(?:,\d{3})+(?:\.\d+)?\b", t))
        upper_ratio = sum(1 for c in t if c.isupper()) / max(1, sum(1 for c in t if c.isalpha()))
        digit_ratio = sum(1 for c in t if c.isdigit()) / max(1, len(t))
        if price_hits >= 5 or upper_ratio > 0.7 or digit_ratio > 0.4:
            continue  # likely ad/table/noise
        editorial.append(t.strip())
    return "\n\n".join(editorial) if editorial else "\n\n".join(b.get("text", "") for b in blocks)
# ---------------- spell correction ----------------
def build_symspell(dict_path: Optional[Path]) -> Optional[SymSpell]:
    if dict_path is None or SymSpell is None:
        return None
    sym = SymSpell(max_dictionary_edit_distance=2, prefix_length=7)
    # dictionary file expects "term count" per line
    if not sym.load_dictionary(str(dict_path), term_index=0, count_index=1, separator=" "):
        return None
    return sym
def symspell_correct(text: str, sym: SymSpell) -> str:
    # Only correct tokens that look broken; keep caps, punctuation, and whitespace.
    out_tokens: List[str] = []
    for tok in re.findall(r"\w+|[^\w\s]|\s+", text, re.UNICODE):
        if re.fullmatch(r"[A-Za-z]{3,}", tok):
            sug = sym.lookup(tok.lower(), Verbosity.CLOSEST, max_edit_distance=2, include_unknown=True)
            cand = sug[0].term if sug else tok
            if tok[0].isupper():
                cand = cand.capitalize()
            out_tokens.append(cand)
        else:
            out_tokens.append(tok)
    return _nfkc("".join(out_tokens))
# ---------------- gazetteer ----------------
@dataclass
class GazetteerItem:
name: str
label: str # PER/ORG/GPE/LOC
def load_gazetteer(path: Optional[Path]) -> List[GazetteerItem]:
    if not path or not path.exists():
        return []
    items: List[GazetteerItem] = []
    with open(path, "r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            name = (row.get("name") or row.get("Name") or "").strip()
            label = (row.get("label") or row.get("Label") or "ORG").strip().upper()
            if name:
                items.append(GazetteerItem(name=name, label=label))
    return items
def gazetteer_match(text: str, items: List[GazetteerItem]) -> List[Tuple[int, int, str, float]]:
    if not items or process is None:
        return []
    spans: List[Tuple[int, int, str, float]] = []
    for g in items:
        # sliding window over text to find close matches
        for m in re.finditer(r"\b[\w\s]{3,}\b", text):
            window = m.group(0)
            score = fuzz.token_set_ratio(window, g.name)
            if score >= 92 and len(window.split()) >= max(1, len(g.name.split()) - 1):
                spans.append((m.start(), m.end(), g.label, score / 100.0))
    return spans
# ---------------- NER ensemble ----------------
@dataclass
class Span:
start: int
end: int
label: str
score: float
def run_ner_ensemble(text: str, models: List[str]) -> List[Span]:
    if pipeline is None:
        raise RuntimeError("transformers not installed.")
    pipes = [pipeline("ner", model=m, aggregation_strategy="simple") for m in models]
    spans: List[Span] = []
    # chunk by sentence to avoid extremely long inputs; track a cursor so
    # repeated sentences map to the correct offsets in the full text
    sentences = simple_sentences(text)
    cursor = 0
    for sent in sentences:
        offset = text.find(sent, cursor)
        if offset == -1:
            offset = cursor
        cursor = offset + len(sent)
        for p in pipes:
            for ent in p(sent):
                spans.append(Span(offset + ent["start"], offset + ent["end"], ent["entity_group"], float(ent.get("score", 0.0))))
    return spans
def _overlap(a: Span, b: Span) -> bool:
    return not (a.end <= b.start or b.end <= a.start)
def merge_spans(spans: List[Span]) -> List[Span]:
    if not spans:
        return []
    spans = sorted(spans, key=lambda s: (s.start, -s.end))
    merged: List[Span] = []
    for s in spans:
        if not merged or not _overlap(merged[-1], s):
            merged.append(s)
            continue
        # overlap with same label: take the union and the max score
        prev = merged[-1]
        if prev.label == s.label:
            merged[-1] = Span(prev.start, max(prev.end, s.end), prev.label, max(prev.score, s.score))
        else:
            # conflicting labels: prefer non-MISC, then higher score, then longer span
            candidates = sorted([prev, s], key=lambda x: (x.label == "MISC", -x.score, -(x.end - x.start)))
            merged[-1] = Span(min(prev.start, s.start), max(prev.end, s.end), candidates[0].label, candidates[0].score)
    # normalize label variants (e.g. LOC -> GPE)
    return [Span(s.start, s.end, _fix_label(s.label), s.score) for s in merged]
def _fix_label(label: str) -> str:
    # normalize common variants
    label = label.upper()
    return {"LOC": "GPE"}.get(label, label)
# matches numeric dates ("12/31/2021", "3-4-99") and month-name dates ("Jan 5, 2021")
DATE_RE = re.compile(
    r"\b(?:\d{1,2}[/.\-]\d{1,2}[/.\-]\d{2,4}"
    r"|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Sept|Oct|Nov|Dec)[a-z]*\s+\d{1,2},?\s+\d{2,4})\b",
    re.IGNORECASE,
)
def post_rules(text: str, spans: List[Span]) -> List[Span]:
    out: List[Span] = []
    for s in spans:
        frag = text[s.start:s.end]
        if s.label == "PER" and re.search(r"\d", frag):
            continue  # drop obvious OCR junk in names
        if s.label in {"ORG", "GPE"} and len(frag.strip()) < 3:
            continue
        out.append(s)
    # add high-confidence DATEs from regex when dateparser can parse them
    for m in DATE_RE.finditer(text):
        if dateparser.parse(m.group(0)):
            out.append(Span(m.start(), m.end(), "DATE", 0.99))
    # merge overlaps again
    out = merge_spans(sorted(out, key=lambda x: (x.start, x.end)))
    return out
# ---------------- main ----------------
def main() -> None:
    ap = argparse.ArgumentParser(description="OCR-robust NER with cleaning + ensemble + gazetteer.")
    ap.add_argument("input", type=str, help="Path to OCR text (.txt) or JSON blocks.")
    ap.add_argument("--blocks", action="store_true", help="Treat input as JSON blocks with {text,bbox}.")
    ap.add_argument("--gazetteer", type=str, default=None, help="CSV with columns: name,label")
    ap.add_argument("--symspell", type=str, default=None, help="SymSpell dictionary txt (term count). Optional.")
    ap.add_argument("--out", type=str, default=None, help="Output JSON path.")
    ap.add_argument("--models", nargs="+", default=["dslim/bert-base-NER", "Davlan/xlm-roberta-base-ner-hrl"], help="HF models for the ensemble.")
args = ap.parse_args()
raw = load_text_or_blocks(Path(args.input), is_blocks=args.blocks)
normalized = normalize_ocr_text(raw)
# optional spell correction (why: OCR often makes near-miss tokens)
corrected = normalized
if args.symspell and SymSpell is not None:
sym = build_symspell(Path(args.symspell))
if sym:
corrected = symspell_correct(normalized, sym)
# run ensemble
spans = run_ner_ensemble(corrected, args.models)
# gazetteer augmentation
gspans: List[Span] = []
gaz = load_gazetteer(Path(args.gazetteer)) if args.gazetteer else []
for (s, e, lab, score) in gazetteer_match(corrected, gaz):
gspans.append(Span(s, e, lab, float(score)))
all_spans = spans + gspans
merged = merge_spans(all_spans)
final_spans = post_rules(corrected, merged)
# package entities
ents = []
for s in final_spans:
frag = corrected[s.start:s.end]
ents.append({
"text": frag,
"label": s.label,
"start": s.start,
"end": s.end,
"score": round(s.score, 4),
})
report = {
"chars": len(corrected),
"num_entities": len(ents),
"models": args.models,
"spell_corrected": bool(args.symspell),
"used_blocks": args.blocks,
"gazetteer_count": len(gaz),
}
out = {"entities": ents, "report": report}
out_path = Path(args.out) if args.out else Path(args.input).with_suffix(".ner.json")
out_path.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"Wrote {out_path}")
if __name__ == "__main__":
    main()