I want to build a free invoice extractor: when I upload an invoice PDF, I should get the desired details back in JSON format. I have tried many approaches — a hard-coded approach, pre-trained models on Hugging Face, Docling, Donut, and LayoutLMv3 too — but I am not getting accurate results; the structure and contents of my invoices are different.
If anybody can help me, it would be great.
Plan
-
Input & config
-
Accept one/many PDF paths, optional JSON output path.
-
Define target schema (header fields + line items).
-
-
Load text
-
Try `pdfplumber` text extraction per page.
-
If a page is “too empty”, try to make it searchable with `ocrmypdf` (if present); otherwise fall back to `pdf2image` + `pytesseract`.
-
-
Detect tables (line items)
-
First try `camelot` (vector PDFs) in `lattice` mode, then `stream`.
-
Normalize columns (qty, description, unit price, total) via fuzzy headers + regex numbers.
-
If no table found, fallback: regex-based line parser over the text block between “Description/Item” and “Total/Subtotal”.
-
-
Header fields
-
`invoice_number`: regex on common labels.
-
`date`: regex on date patterns → `dateparser` → normalize to `YYYY-MM-DD`.
-
Parties: capture blocks after “Bill To/Invoice To/Ship To/From/Supplier”.
-
Currency: infer from symbols/codes near totals.
-
Totals: scan lines for `Grand Total|Amount Due|Total` (prefer last), also `Subtotal` and `Tax/VAT`.
-
-
Post-process
-
Clean numbers, unify currency, dedupe whitespace.
-
Heuristic confidence score from presence/consistency checks (sum(line_totals)≈subtotal≈total−tax).
-
Validate with a JSONSchema-like check (lightweight).
-
-
Output
-
Emit JSON per file; optionally write a `.json` file next to the PDF.
-
Print brief extraction report (fields found, confidence, any warnings).
-
-
Extensibility
-
Pluggable field extractors (simple strategy registry).
-
Easy to add locale patterns (VAT/GST labels, currency).
-
"""Free, template-agnostic invoice extractor.
Dependencies (all free):
pip install pdfplumber dateparser rapidfuzz camelot-py[cv] pandas pdf2image pytesseract
OS packages often needed:
- poppler (pdf2image), ghostscript (camelot lattice), tesseract-ocr
Ubuntu: sudo apt-get install -y poppler-utils ghostscript tesseract-ocr
Optional but recommended:
- ocrmypdf (faster, better OCR for scanned PDFs)
Usage:
python tools/invoice_extractor.py input.pdf
python tools/invoice_extractor.py dir/*.pdf --out out_dir/
Why these choices:
- Starts simple (heuristics), degrades gracefully (OCR fallback), no paid APIs.
- Works across varied layouts; you can later swap in ML detectors without breaking I/O.
"""
from __future__ import annotations
import argparse
import dataclasses
import io
import json
import math
import os
import re
import shutil
import sys
import tempfile
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
# Optional libs (import lazily where used)
# pdfplumber is required
import pdfplumber # type: ignore
import dateparser # type: ignore
try:
from rapidfuzz import fuzz, process # type: ignore
except Exception:
fuzz = None
process = None
# ---- Data model ----
@dataclass
class Party:
    """One side of the invoice (seller or buyer) as captured from a text block.

    All fields default to ``None`` so that ``Party()`` can represent
    "nothing found".
    """

    # First line of the captured block.
    name: Optional[str] = None
    # Remaining lines of the captured block, newline-joined.
    address: Optional[str] = None
    # The captured block in full, kept for debugging/auditing.
    raw_block: Optional[str] = None
@dataclass
class LineItem:
    """A single row of the invoice's item table.

    Every field is optional; extractors fill in whichever values they
    managed to parse and leave the rest as ``None``.
    """

    # Free-text description of the item.
    description: Optional[str] = None
    # Parsed numeric columns; ``None`` when the value was absent or unparsable.
    quantity: Optional[float] = None
    unit_price: Optional[float] = None
    line_total: Optional[float] = None
@dataclass
class Invoice:
    """Everything extracted from one invoice PDF.

    All fields are required at construction time; header values that could
    not be found are passed as ``None``.
    """

    # --- header fields ---
    invoice_number: Optional[str]
    invoice_date: Optional[str]  # ISO date
    currency: Optional[str]

    # --- parties ---
    seller: Party
    buyer: Party

    # --- monetary summary ---
    subtotal: Optional[float]
    tax: Optional[float]
    total: Optional[float]

    # --- table rows ---
    line_items: List[LineItem]

    # --- extraction diagnostics ---
    confidence: float
    warnings: List[str]
    raw_text: str
# Currency symbol -> ISO 4217 code.
# Multi-character symbols come first so that a first-match scan in insertion
# order sees "R$" / "C$" / "A$" / "NZ$" before the bare "$" they contain.
CURRENCY_SYMBOLS = {
    "NZ$": "NZD", "R$": "BRL", "C$": "CAD", "A$": "AUD",
    "$": "USD", "€": "EUR", "£": "GBP", "¥": "JPY", "₹": "INR", "₩": "KRW", "₫": "VND", "₱": "PHP",
    "₪": "ILS", "₺": "TRY", "₽": "RUB",
}

# ISO 4217 codes recognised when they appear literally in the text.
# Includes every code that CURRENCY_SYMBOLS can produce.
CURRENCY_CODES = {
    "USD", "EUR", "GBP", "JPY", "INR", "CAD", "AUD", "NZD", "CHF", "CNY", "HKD", "SGD",
    "SEK", "NOK", "DKK", "PLN", "CZK", "HUF", "TRY", "ILS", "RUB", "ZAR", "BRL", "MXN",
    "AED", "SAR", "KRW", "VND", "PHP",
}

# Integer part of a number: either comma-grouped thousands ("1,234,567") or a
# plain digit run of any length ("1234567"). The plain-run alternative matters:
# with only `\d{1,3}(?:,\d{3})*`, an amount such as "1234.56" is either not
# matched at all (AMOUNT_RE) or split into "123" and "4.56" (NUM_RE).
_INT_PART = r"(?:\d{1,3}(?:,\d{3})+|\d+)"

# A monetary amount with an optional leading currency symbol and sign, not
# embedded in a larger word. No capturing groups, so findall() yields the
# whole match (possibly with leading whitespace).
AMOUNT_RE = re.compile(
    r"(?<!\w)(?:[$€£¥₹]|R\$|C\$|A\$|NZ\$)?\s*[-+]?" + _INT_PART + r"(?:\.\d+)?(?!\w)"
)

# A bare number (optional sign, optional decimals); no word-boundary guards.
NUM_RE = re.compile(r"[-+]?" + _INT_PART + r"(?:\.\d+)?")

_MONTH = r"(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Sept|Oct|Nov|Dec)[a-z]*\.?"

# Date candidates, in the forms:
#   1/2/2024, 1-2-24, 1.2.2024      (numeric, day/month order decided later)
#   Jan 5, 2024 / January 5 2024    (month name first)
#   5 Jan 2024 / 5 January, 2024    (day first)
#   2024-01-05, 2024/1/5            (year first)
# Only non-capturing groups are used so findall() returns full matches.
DATE_CAND_RE = re.compile(
    r"\b(?:\d{1,2}[/.-]\d{1,2}[/.-]\d{2,4}|"
    + _MONTH + r"\s+\d{1,2},?\s+\d{2,4}|"
    + r"\d{1,2}\s+" + _MONTH + r",?\s+\d{2,4}|"
    + r"\d{4}[/-]\d{1,2}[/-]\d{1,2})\b",
    re.IGNORECASE,
)

# "Invoice No./Number/# <id>" or "Inv # <id>".
# Group 1 is the whole match; the identifier is in group 2 or group 3.
INVNO_RE = re.compile(
    r"(invoice\s*(?:no\.?|number|#)\s*[:\-]?\s*([A-Z0-9\-\/\.]+)|"
    r"inv\s*#\s*[:\-]?\s*([A-Z0-9\-\/\.]+))",
    re.IGNORECASE,
)
# ---- Utilities ----
def _to_float(s: str) -> Optional[float]:
try:
s = s.strip()
s = re.sub(r"[^\d\.\-\,]", "", s)
if s.count(",") > 0 and s.count(".") == 0:
s = s.replace(",", "")
else:
s = s.replace(",", "")
return float(s)
except Exception:
return None
def _norm_whitespace(s: str) -> str:
return re.sub(r"[ \t]+", " ", s).strip()
def _which(cmd: str) -> bool:
return shutil.which(cmd) is not None
def _best_last_amount(lines: List[str], keywords: Iterable[str]) -> Optional[float]:
    """Return the amount from the last line that mentions one of ``keywords``.

    A keyword must appear as its own word (optionally pluralised with
    "s"/"es"), case-insensitively. Plain substring matching would let
    "total" hit "Subtotal" and "tax" hit "Taxable". On a matching line the
    last amount found by ``AMOUNT_RE`` is used; lines whose last amount
    cannot be parsed are ignored.

    Args:
        lines: Text lines to scan, in document order.
        keywords: Labels to look for (e.g. "grand total", "amount due").

    Returns:
        The amount from the last qualifying line, or ``None`` when there are
        no keywords or no line qualifies.
    """
    escaped = [re.escape(k) for k in keywords if k]
    if not escaped:
        # An empty alternation would match every line; there is nothing to find.
        return None

    pat = re.compile(
        r"(?<![A-Za-z])(?:" + "|".join(escaped) + r")(?:e?s)?(?![A-Za-z])",
        re.IGNORECASE,
    )

    best: Optional[float] = None
    for line in lines:
        if not pat.search(line):
            continue
        amts = AMOUNT_RE.findall(line)
        if not amts:
            continue
        v = _to_float(amts[-1])
        if v is not None:
            best = v  # later lines override earlier ones ("prefer last")
    return best
def _infer_currency(text: str) -> Optional[str]:
    """Guess the invoice currency from symbols or ISO codes present in ``text``.

    Symbols are tried before codes. Longer symbols are tried before shorter
    ones so that "R$", "C$", "A$" and "NZ$" are not shadowed by the "$" they
    contain; symbols of equal length keep their ``CURRENCY_SYMBOLS`` order.
    If no symbol is present, the ISO code that occurs earliest in the text
    wins, which makes the result independent of set iteration order.

    Args:
        text: Full invoice text.

    Returns:
        An ISO 4217 code, or ``None`` if nothing recognisable is found.
    """
    # sorted() is stable (also with reverse=True), so ties keep dict order.
    for sym in sorted(CURRENCY_SYMBOLS, key=len, reverse=True):
        if sym in text:
            return CURRENCY_SYMBOLS[sym]

    if CURRENCY_CODES:
        alternation = "|".join(re.escape(code) for code in sorted(CURRENCY_CODES))
        # A single search returns the leftmost match in the text.
        m = re.search(rf"\b(?:{alternation})\b", text)
        if m:
            return m.group(0)
    return None
# ---- OCR / Text extraction ----
def extract_text_with_fallback(pdf_path: Path) -> Tuple[str, List[str], List[str]]:
    """Extract per-page text from a PDF, using OCR when pages look empty.

    Strategy:
      1. Read embedded text with pdfplumber.
      2. If any page has fewer than 20 non-blank characters (or nothing could
         be read at all), run ``ocrmypdf --skip-text`` when the executable is
         available and read the OCRed copy.
      3. Otherwise, or if that fails, rasterise with pdf2image and OCR with
         pytesseract. When the page counts agree, only the sparse pages are
         replaced, so pages with good embedded text keep it.

    Every failure is recorded as a warning rather than raised.

    Args:
        pdf_path: Path to the input PDF.

    Returns:
        ``(full_text, page_texts, warnings)`` where ``full_text`` is the
        newline-joined ``page_texts``.
    """
    # Function-scope import, like the other optional helpers in this function.
    import subprocess

    min_chars = 20  # a page with fewer stripped characters counts as "too empty"

    def _is_sparse(page_text: str) -> bool:
        return len(page_text.strip()) < min_chars

    warnings: List[str] = []
    page_texts: List[str] = []
    try:
        with pdfplumber.open(str(pdf_path)) as pdf:
            for page in pdf.pages:
                page_texts.append(page.extract_text() or "")
    except Exception as e:
        warnings.append(f"pdfplumber failed: {e}")
        page_texts = []

    # Decide if OCR is needed
    if page_texts and not any(_is_sparse(t) for t in page_texts):
        return ("\n".join(page_texts), page_texts, warnings)

    # Try ocrmypdf first (why: preserves layout & embedded text)
    if _which("ocrmypdf"):
        try:
            with tempfile.TemporaryDirectory() as td:
                out_pdf = Path(td) / "ocr.pdf"
                # Argument list, no shell: quotes or metacharacters in the
                # file name cannot be interpreted as shell syntax.
                proc = subprocess.run(
                    ["ocrmypdf", "--quiet", "--skip-text", str(pdf_path), str(out_pdf)],
                    capture_output=True,
                    text=True,
                    check=False,
                )
                if not out_pdf.exists():
                    detail = (proc.stderr or "").strip() or "no output produced"
                    raise RuntimeError(f"exit code {proc.returncode}: {detail}")
                with pdfplumber.open(str(out_pdf)) as pdf:
                    ocr_texts = [(p.extract_text() or "") for p in pdf.pages]
            if proc.returncode != 0:
                # An output file was produced, so it is still used.
                warnings.append(f"ocrmypdf exited with code {proc.returncode}.")
            warnings.append("Used ocrmypdf for OCR.")
            return ("\n".join(ocr_texts), ocr_texts, warnings)
        except Exception as e:
            warnings.append(f"ocrmypdf failed: {e}")

    # Fallback to pytesseract
    try:
        from pdf2image import convert_from_path  # type: ignore
        import pytesseract  # type: ignore

        images = convert_from_path(str(pdf_path))
        ocr_texts = [pytesseract.image_to_string(img) for img in images]
        if len(ocr_texts) == len(page_texts):
            # Keep embedded text where it was sufficient; OCR only fills gaps.
            page_texts = [
                ocr if _is_sparse(embedded) else embedded
                for embedded, ocr in zip(page_texts, ocr_texts)
            ]
        else:
            page_texts = ocr_texts
        warnings.append("Used pytesseract OCR fallback.")
        return ("\n".join(page_texts), page_texts, warnings)
    except Exception as e:
        warnings.append(f"OCR fallback failed: {e}")

    # Return whatever we had, even if empty
    return ("\n".join(page_texts), page_texts, warnings)
# ---- Field extractors ----
def extract_invoice_number(text: str) -> Optional[str]:
    """Find the invoice number in ``text``.

    Two passes:
      1. Every ``INVNO_RE`` match ("Invoice No/Number/#" or "Inv #") in
         document order.
      2. For each line containing the word "invoice", that line plus the
         next one are searched for "#", "no" or "number" followed by a token.

    A candidate is accepted only if it contains at least one digit. This
    rejects fragments of ordinary words such as "Invoice Notes" -> "tes".
    The pass-2 label must start at a word boundary, so "no" inside
    "Technology" does not count.

    Args:
        text: Full invoice text.

    Returns:
        The identifier with surrounding ``:``, ``.`` and ``#`` stripped, or
        ``None`` if nothing plausible is found.
    """

    def _clean(token: str) -> Optional[str]:
        token = token.strip().strip(":.#")
        return token if any(ch.isdigit() for ch in token) else None

    for m in INVNO_RE.finditer(text):
        # groups()[0] is the whole labelled match; the identifier follows.
        for g in m.groups()[1:]:
            if g:
                candidate = _clean(g)
                if candidate:
                    return candidate

    # Weak fallback: first plausible token near "invoice"
    near_label = re.compile(
        r"(?:#|\b(?:no\.?|number))[:\s\-]*([A-Z0-9\-\/\.]{3,})", re.IGNORECASE
    )
    lines = text.splitlines()
    for i, line in enumerate(lines):
        if not re.search(r"\binvoice\b", line, re.IGNORECASE):
            continue
        window = " ".join(lines[i : i + 2])
        for m2 in near_label.finditer(window):
            candidate = _clean(m2.group(1))
            if candidate:
                return candidate
    return None
def extract_date(text: str, date_order: str = "DMY") -> Optional[str]:
    """Return the first parsable date in ``text`` as ``YYYY-MM-DD``.

    Candidates come from ``DATE_CAND_RE`` in document order. A year-first
    candidate (``YYYY-M-D`` or ``YYYY/M/D``) is unambiguous, so it is built
    directly rather than passed through the ``date_order`` preference. All
    other candidates go to ``dateparser``.

    Args:
        text: Full invoice text.
        date_order: Value for dateparser's ``DATE_ORDER`` setting, used for
            ambiguous numeric dates such as 01/02/2024. Defaults to "DMY".

    Returns:
        ISO-formatted date string, or ``None`` if no candidate parses.
    """
    from datetime import date

    year_first = re.compile(r"(\d{4})[/-](\d{1,2})[/-](\d{1,2})")

    for cand in DATE_CAND_RE.findall(text):
        m = year_first.fullmatch(cand)
        if m:
            try:
                return date(int(m.group(1)), int(m.group(2)), int(m.group(3))).isoformat()
            except ValueError:
                # Not a valid Y-M-D calendar date; let dateparser have a go.
                pass
        parsed = dateparser.parse(cand, settings={"DATE_ORDER": date_order})
        if parsed:
            return parsed.date().isoformat()
    return None
def _capture_block_after(text: str, label: str, stop_labels: Iterable[str]) -> Optional[str]:
    """Capture the block of lines that follows the first occurrence of ``label``.

    Why: parties are often in blocks following a label; stop when we hit
    another known label.

    The block starts on the line after the label. If the label line itself
    carries a value after a colon ("Bill To: ACME Corp"), that value becomes
    the first line of the block. Collection stops at the first line that
    contains a stop label, or at the first blank line once something has
    been collected. Labels are matched as whole words, case-insensitively,
    and are escaped before being placed in a regex.

    Args:
        text: Full invoice text.
        label: Label introducing the block (e.g. "bill to").
        stop_labels: Labels that terminate the block.

    Returns:
        Newline-joined block, or ``None`` if the label is absent or nothing
        follows it.
    """
    lines = [_norm_whitespace(l) for l in text.splitlines()]
    label_pat = re.compile(rf"\b{re.escape(label)}\b", re.IGNORECASE)

    # One combined stop pattern, compiled once instead of per line.
    stops = [re.escape(sl) for sl in stop_labels if sl]
    stop_pat = (
        re.compile(r"\b(?:" + "|".join(stops) + r")\b", re.IGNORECASE) if stops else None
    )

    block: List[str] = []
    start = None
    for i, line in enumerate(lines):
        m = label_pat.search(line)
        if m is None:
            continue
        tail = line[m.end():].lstrip()
        # Only an explicit "Label: value" counts as an inline value; text
        # that merely shares the row with the label is not trusted.
        if tail.startswith(":"):
            inline = tail[1:].strip()
            if inline and not (stop_pat and stop_pat.search(inline)):
                block.append(inline)
        start = i + 1
        break
    if start is None:
        return None

    for line in lines[start:]:
        if stop_pat and stop_pat.search(line):
            break
        if line.strip():
            block.append(line)
        elif block:
            break
    return "\n".join(block).strip() if block else None
def extract_parties(text: str) -> Tuple[Party, Party]:
    """Extract the seller and buyer blocks from invoice text.

    The buyer comes from the first of "bill to", "invoice to", "sold to",
    "ship to" that yields a block. The seller comes from the first of
    "from", "supplier", "seller". In each block the first line becomes the
    name and the remaining lines the address.

    Args:
        text: Full invoice text.

    Returns:
        ``(seller, buyer)``; a side that was not found is an empty ``Party()``.
    """
    # "seller" is included because it is also used as a label below; without
    # it, a buyer block would run straight into a following "Seller" block.
    stop = [
        "bill to", "ship to", "invoice to", "sold to",
        "from", "supplier", "seller", "customer", "buyer",
    ]

    def first_block(labels: Iterable[str]) -> Optional[str]:
        for label in labels:
            block = _capture_block_after(text, label, stop)
            if block:
                return block
        return None

    def to_party(block: Optional[str]) -> Party:
        if not block:
            return Party()
        lines = block.splitlines()
        name = lines[0] if lines else None
        addr = "\n".join(lines[1:]) if len(lines) > 1 else None
        return Party(name=name, address=addr, raw_block=block)

    buyer_block = first_block(("bill to", "invoice to", "sold to", "ship to"))
    seller_block = first_block(("from", "supplier", "seller"))
    return to_party(seller_block), to_party(buyer_block)
def extract_totals(text: str) -> Tuple[Optional[float], Optional[float], Optional[float]]:
    """Extract subtotal, tax and total amounts from invoice text.

    Each value is taken from the last line mentioning the relevant label.
    Lines mentioning a subtotal ("Subtotal", "Sub total", "Sub-total") are
    excluded from the total search, so a subtotal is never reported as the
    total. If no total line exists but both subtotal and tax were found, the
    total is derived as their sum.

    Args:
        text: Full invoice text.

    Returns:
        ``(subtotal, tax, total)``; any element may be ``None``.
    """
    lines = [_norm_whitespace(l) for l in text.splitlines()]

    subtotal_pat = re.compile(r"sub[\s\-]*total", re.IGNORECASE)
    total_lines = [l for l in lines if not subtotal_pat.search(l)]

    total = _best_last_amount(total_lines, ["grand total", "amount due", "total"])
    subtotal = _best_last_amount(lines, ["subtotal", "sub total", "sub-total"])
    tax = _best_last_amount(lines, ["tax", "vat", "gst"])

    # `is None` rather than truthiness: an extracted 0.0 is a real value.
    if total is None and subtotal is not None and tax is not None:
        total = round(subtotal + tax, 2)
    return subtotal, tax, total
# ---- Line items ----
def _normalize_header_name(h: str) -> str:
h = h.lower()
if "qty" in h or "quantity" in h or h.strip() == "q":
return "quantity"
if "unit price" in h or "price" in h or "rate" in h:
return "unit_price"
if "total" in h or "amount" in h or "line total" in h:
return "line_total"
if "description" in h or "item" in h or "details" in h:
return "description"
return h
def _camelot_extract(pdf_path: Path) -> List[LineItem]:
    """Extract line items from PDF tables using camelot.

    The "lattice" flavor (ruled tables) is tried first; "stream"
    (whitespace-separated tables) is used only if lattice yields no items.
    Running both and concatenating would report the same table twice.

    Within a table, the header is the first row that has a description
    column and at least one numeric column (quantity, unit price or line
    total). Rows above it are ignored, and tables with no such row
    contribute nothing. A failure in one table or flavor does not discard
    results from the others.

    Args:
        pdf_path: Path to the PDF.

    Returns:
        Extracted line items; empty if camelot is unavailable or nothing
        usable was found.
    """
    try:
        import camelot  # type: ignore
    except Exception:
        # Optional dependency: any import-time failure means "no table support".
        return []

    numeric_keys = ("quantity", "unit_price", "line_total")

    def items_from_table(df: Any) -> List[LineItem]:
        rows = [[str(cell) for cell in row] for row in df.values.tolist()]

        header_idx = None
        headers: List[str] = []
        for i, row in enumerate(rows):
            normalized = [_normalize_header_name(cell) for cell in row]
            if "description" in normalized and any(k in normalized for k in numeric_keys):
                header_idx, headers = i, normalized
                break
        if header_idx is None:
            return []

        found: List[LineItem] = []
        for row in rows[header_idx + 1:]:
            rec: Dict[str, str] = {}
            for idx, cell in enumerate(row):
                key = headers[idx] if idx < len(headers) else f"col{idx}"
                rec[key] = cell

            desc = _norm_whitespace(rec.get("description") or "")
            qty = _to_float(rec["quantity"]) if rec.get("quantity") else None
            unit = _to_float(rec["unit_price"]) if rec.get("unit_price") else None
            lt = _to_float(rec["line_total"]) if rec.get("line_total") else None

            # weak filter: must have desc and at least one numeric
            if desc and (qty is not None or unit is not None or lt is not None):
                found.append(LineItem(description=desc, quantity=qty, unit_price=unit, line_total=lt))
        return found

    for flavor in ("lattice", "stream"):
        try:
            tables = camelot.read_pdf(str(pdf_path), flavor=flavor, pages="all")
        except Exception:
            continue  # e.g. missing system dependency for this flavor

        items: List[LineItem] = []
        for table in tables:
            try:
                items.extend(items_from_table(table.df))
            except Exception:
                continue  # one malformed table must not sink the rest
        if items:
            return items
    return []
def _regex_lineitem_fallback(text: str) -> List[LineItem]:
    """Parse line items from plain text when no table could be extracted.

    Why: many invoices have "Description Qty Price Total" lines separated
    by spaces.

    Scanning starts after the first line that mentions both a description
    ("description"/"item") and a quantity ("qty"/"quantity"). If there is no
    such line, every line is scanned. Scanning stops at the first line that
    contains a digit and a footer word (total, subtotal, tax, vat) as a
    whole word.

    Each candidate line is split into whitespace-separated tokens. The
    description is everything before the first standalone numeric token
    that comes after at least one non-numeric token. A leading item code
    ("001 Widget ...") therefore stays in the description, and digits
    embedded in words ("A4", "2m") are not mistaken for quantities. The
    standalone numeric tokens from that point on are assigned, in order, to
    quantity, unit price and line total.

    Args:
        text: Full invoice text.

    Returns:
        Parsed line items (possibly empty).
    """
    lines = [_norm_whitespace(l) for l in text.splitlines() if l.strip()]

    # Find likely header line
    header_idx = None
    for i, l in enumerate(lines):
        if re.search(r"description|item", l, re.IGNORECASE) and re.search(r"qty|quantity", l, re.IGNORECASE):
            header_idx = i
            break
    # Without a header there is no header line to skip, so keep line 0.
    body = lines if header_idx is None else lines[header_idx + 1:]

    footer_pat = re.compile(
        r"(?<![A-Za-z])(?:sub[\s\-]*total|total|tax|vat)(?:e?s)?(?![A-Za-z])",
        re.IGNORECASE,
    )
    # A whole token that is a number, optionally wrapped in a currency symbol.
    number_token = re.compile(
        r"[$€£¥₹]?[-+]?(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?[$€£¥₹]?"
    )

    items: List[LineItem] = []
    for l in body:
        if not re.search(r"\d", l):
            continue
        if footer_pat.search(l):
            break  # reached footer

        tokens = l.split()
        is_num = [bool(number_token.fullmatch(t)) for t in tokens]

        split_at = None
        seen_text = False
        for idx, flag in enumerate(is_num):
            if flag and seen_text:
                split_at = idx
                break
            if not flag:
                seen_text = True
        if split_at is None:
            continue

        desc = " ".join(tokens[:split_at]).strip(" -:•")
        values = [
            _to_float(tok)
            for tok, flag in zip(tokens[split_at:], is_num[split_at:])
            if flag
        ]
        q, up, lt = (values + [None, None, None])[:3]
        if desc and (q is not None or up is not None or lt is not None):
            items.append(LineItem(description=desc, quantity=q, unit_price=up, line_total=lt))
    return items
def extract_line_items(pdf_path: Path, text: str) -> List[LineItem]:
    """Return line items from table extraction, else from the text parser.

    The text-based parser is consulted only when table extraction returns
    nothing.
    """
    table_items = _camelot_extract(pdf_path)
    return table_items if table_items else _regex_lineitem_fallback(text)
# ---- Confidence & validation ----
def compute_confidence(inv: Invoice) -> Tuple[float, List[str]]:
    """Score an extracted invoice and collect consistency warnings.

    Scoring (maximum 8 points):
      * 1 point each for: invoice number, invoice date, total, currency,
        a seller or buyer name, at least one line item;
      * 2 points if subtotal + tax is within 1.01 of total, or 1 point if a
        total exists but subtotal and tax are not both available. Nothing
        is added when all three exist but disagree.

    Warnings start as a copy of ``inv.warnings``. One is added when the
    totals disagree. Another is added when every line item has a line total
    and their sum differs from the subtotal by 1.01 or more. The line-item
    check affects warnings only, never the score.

    Args:
        inv: The invoice to evaluate (not modified).

    Returns:
        ``(confidence, warnings)`` with confidence in [0, 1] rounded to two
        decimals.
    """
    tolerance = 1.01
    max_points = 8
    warnings: List[str] = list(inv.warnings)

    presence = (
        inv.invoice_number,
        inv.invoice_date,
        inv.total,
        inv.currency,
        inv.seller.name or inv.buyer.name,
        inv.line_items,
    )
    points = sum(1 for value in presence if value)

    # Consistency check
    if inv.subtotal is not None and inv.tax is not None and inv.total is not None:
        if abs((inv.subtotal + inv.tax) - inv.total) < tolerance:
            points += 2
        else:
            warnings.append("Totals do not add up cleanly.")
    elif inv.total is not None:
        points += 1

    # Line items vs. subtotal: only meaningful when every row has a total.
    line_totals = [item.line_total for item in inv.line_items]
    if inv.subtotal is not None and line_totals and all(v is not None for v in line_totals):
        if abs(sum(line_totals) - inv.subtotal) >= tolerance:
            warnings.append("Line item totals do not match subtotal.")

    conf = round(points / max_points, 2)
    return conf, warnings
# ---- Main pipeline ----
def extract_invoice(pdf_path: Path) -> Invoice:
    """Run the full extraction pipeline on one PDF.

    Text is extracted first (with OCR fallback). Each field extractor then
    runs on the full text, and finally the confidence score and warnings
    are computed and stored on the result.
    """
    text, _page_texts, text_warnings = extract_text_with_fallback(pdf_path)

    detected_currency = _infer_currency(text)
    number = extract_invoice_number(text)
    issued_on = extract_date(text)
    seller_party, buyer_party = extract_parties(text)
    sub, tax_amount, grand = extract_totals(text)
    rows = extract_line_items(pdf_path, text)

    result = Invoice(
        invoice_number=number,
        invoice_date=issued_on,
        currency=detected_currency,
        seller=seller_party,
        buyer=buyer_party,
        subtotal=sub,
        tax=tax_amount,
        total=grand,
        line_items=rows,
        confidence=0.0,  # placeholder until scored below
        warnings=text_warnings,
        raw_text=text,
    )
    result.confidence, result.warnings = compute_confidence(result)
    return result
# ---- CLI ----
def _default_out_path(in_path: Path, out_dir: Optional[Path]) -> Path:
if out_dir:
out_dir.mkdir(parents=True, exist_ok=True)
return out_dir / (in_path.stem + ".json")
return in_path.with_suffix(".json")
def main() -> None:
    """Command-line entry point: extract each input PDF to a JSON file.

    Positional arguments are PDF paths or glob patterns. Absolute patterns
    are supported by globbing from the pattern's anchor. Inputs that are
    not existing files are reported on stderr and skipped. Each result is
    written as JSON (next to the PDF, or into ``--out``) and optionally
    echoed to stdout with ``--print``.

    Exit status is 1 when no input file is found or when any file fails to
    process; otherwise the function returns normally.
    """
    parser = argparse.ArgumentParser(description="Free invoice extractor → JSON")
    parser.add_argument("pdfs", nargs="+", help="PDF file(s) or globs")
    parser.add_argument("--out", help="Output directory (optional)")
    parser.add_argument("--print", dest="print_json", action="store_true", help="Print JSON to stdout")
    args = parser.parse_args()

    def expand(pattern: str) -> List[Path]:
        pat = Path(pattern)
        if pat.is_absolute():
            # Path.glob() rejects non-relative patterns, so glob from the
            # anchor ("/" or a drive root) with the remainder of the pattern.
            root = Path(pat.anchor)
            return sorted(root.glob(str(pat.relative_to(root))))
        return sorted(Path().glob(pattern))

    candidates: List[Path] = []
    for p in args.pdfs:
        if any(ch in p for ch in "*?["):
            candidates.extend(expand(p))
        else:
            candidates.append(Path(p))

    files: List[Path] = []
    for f in candidates:
        if f.is_file():
            files.append(f)
        else:
            print(f"[SKIP] {f}: not a file", file=sys.stderr)

    out_dir = Path(args.out) if args.out else None
    if not files:
        print("No input PDFs found.", file=sys.stderr)
        sys.exit(1)

    failures = 0
    for f in files:
        # Per-file boundary: one bad PDF must not stop the batch.
        try:
            inv = extract_invoice(f)
            data = asdict(inv)
            out_path = _default_out_path(f, out_dir)
            with open(out_path, "w", encoding="utf-8") as fo:
                json.dump(data, fo, ensure_ascii=False, indent=2)
            print(f"[OK] {f} → {out_path} (confidence={inv.confidence})")
            if inv.warnings:
                print(" warnings:", "; ".join(inv.warnings))
            if args.print_json:
                print(json.dumps(data, ensure_ascii=False, indent=2))
        except Exception as e:
            failures += 1
            print(f"[FAIL] {f}: {e}", file=sys.stderr)

    if failures:
        sys.exit(1)


if __name__ == "__main__":
    main()
# Script generated by TD Ai