How can I build an invoice data extractor tool for free?

I want to build a free invoice extractor: when I upload an invoice PDF, it should return the details I need as JSON. I have tried many things, a hard-coded approach, pre-trained models on Hugging Face, Docling, Donut, and LayoutLMv3, but I am not getting accurate results because the structure and contents of my invoices vary.
If anybody can help, it would be great.

Plan

  1. Input & config

    • Accept one/many PDF paths, optional JSON output path.

    • Define target schema (header fields + line items); a sample of the emitted JSON appears after this plan.

  2. Load text

    • Try pdfplumber text per page.

    • If a page is “too empty”, try to make it searchable with ocrmypdf (if present); otherwise fall back to pdf2image + pytesseract.

  3. Detect tables (line items)

    • First try camelot (for vector PDFs), lattice mode first, then stream.

    • Normalize columns (qty, description, unit price, total) via fuzzy headers + regex numbers.

    • If no table found, fallback: regex-based line parser over the text block between “Description/Item” and “Total/Subtotal”.

  4. Header fields

    • invoice_number: regex on common labels.

    • date: regex on date patterns → dateparser normalize to YYYY-MM-DD.

    • Parties: capture blocks after “Bill To/Invoice To/Ship To/From/Supplier”.

    • Currency: infer from symbols/codes near totals.

    • Totals: scan lines for Grand Total|Amount Due|Total (prefer last), also Subtotal and Tax/VAT.

  5. Post-process

    • Clean numbers, unify currency, dedupe whitespace.

    • Heuristic confidence score from presence/consistency checks (sum(line_totals)≈subtotal≈total−tax).

    • Validate with a JSONSchema-like check (lightweight).

  6. Output

    • Emit JSON per file; optionally write .json next to PDF.

    • Print brief extraction report (fields found, confidence, any warnings).

  7. Extensibility

    • Pluggable field extractors (simple strategy registry).

    • Easy to add locale patterns (VAT/GST labels, currency).
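
For orientation, the emitted JSON roughly follows the Invoice/Party/LineItem dataclasses defined in the script below (the values here are made-up examples):

  {
    "invoice_number": "INV-1042",
    "invoice_date": "2024-03-15",
    "currency": "USD",
    "seller": {"name": "Acme Supplies", "address": "...", "raw_block": "..."},
    "buyer": {"name": "Jane Doe", "address": "...", "raw_block": "..."},
    "subtotal": 100.0,
    "tax": 10.0,
    "total": 110.0,
    "line_items": [
      {"description": "Widget", "quantity": 2, "unit_price": 50.0, "line_total": 100.0}
    ],
    "confidence": 0.88,
    "warnings": [],
    "raw_text": "..."
  }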

"""
Free, template-agnostic invoice extractor.
Dependencies (all free):
  pip install pdfplumber dateparser rapidfuzz "camelot-py[cv]" pandas pdf2image pytesseract
OS packages often needed:
  - poppler (pdf2image), ghostscript (camelot lattice), tesseract-ocr
    Ubuntu: sudo apt-get install -y poppler-utils ghostscript tesseract-ocr
Optional but recommended:
  - ocrmypdf  (faster, better OCR for scanned PDFs)
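    Install: pip install ocrmypdf  (relies on tesseract-ocr and ghostscript being installed)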

Usage:
  python tools/invoice_extractor.py input.pdf
  python tools/invoice_extractor.py dir/*.pdf --out out_dir/
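
Library use (a sketch; assumes the script directory is importable):
  from invoice_extractor import extract_invoice
  inv = extract_invoice(Path("input.pdf"))
  print(inv.total, inv.confidence)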

Why these choices:
  - Starts simple (heuristics), degrades gracefully (OCR fallback), no paid APIs.
  - Works across varied layouts; you can later swap in ML detectors without breaking I/O.
"""
from __future__ import annotations

import argparse
import json
import re
import shutil
import subprocess
import sys
import tempfile
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple

# Optional libs (import lazily where used)
# pdfplumber is required
import pdfplumber  # type: ignore
import dateparser  # type: ignore

try:
    from rapidfuzz import fuzz, process  # type: ignore
except Exception:
    fuzz = None
    process = None

# ---- Data model ----

@dataclass
class Party:
    name: Optional[str] = None
    address: Optional[str] = None
    raw_block: Optional[str] = None


@dataclass
class LineItem:
    description: Optional[str] = None
    quantity: Optional[float] = None
    unit_price: Optional[float] = None
    line_total: Optional[float] = None


@dataclass
class Invoice:
    invoice_number: Optional[str]
    invoice_date: Optional[str]  # ISO date
    currency: Optional[str]
    seller: Party
    buyer: Party
    subtotal: Optional[float]
    tax: Optional[float]
    total: Optional[float]
    line_items: List[LineItem]
    confidence: float
    warnings: List[str]
    raw_text: str


CURRENCY_SYMBOLS = {
    "$": "USD", "€": "EUR", "£": "GBP", "¥": "JPY", "₹": "INR", "₩": "KRW", "₫": "VND", "₱": "PHP",
    "₪": "ILS", "₺": "TRY", "₽": "RUB", "R$": "BRL", "C$": "CAD", "A$": "AUD", "NZ$": "NZD",
}
CURRENCY_CODES = set(["USD","EUR","GBP","JPY","INR","CAD","AUD","NZD","CHF","CNY","HKD","SGD","SEK","NOK","DKK","PLN","CZK","HUF","TRY","ILS","RUB","ZAR","BRL","MXN","AED","SAR"])

AMOUNT_RE = re.compile(r"(?<!\w)(?:[$€£¥₹]|R\$|C\$|A\$|NZ\$)?\s*[-+]?\d{1,3}(?:,\d{3})*(?:\.\d+)?(?!\w)")
NUM_RE = re.compile(r"[-+]?\d{1,3}(?:,\d{3})*(?:\.\d+)?")
DATE_CAND_RE = re.compile(
    r"\b(?:\d{1,2}[/-]\d{1,2}[/-]\d{2,4}|"
    r"(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Sept|Oct|Nov|Dec)[a-z]*\s+\d{1,2},?\s+\d{2,4}|"
    r"\d{4}[/-]\d{1,2}[/-]\d{1,2})\b",
    re.IGNORECASE,
)
INVNO_RE = re.compile(
    r"(invoice\s*(?:no\.?|number|#)\s*[:\-]?\s*([A-Z0-9\-\/\.]+)|"
    r"inv\s*#\s*[:\-]?\s*([A-Z0-9\-\/\.]+))",
    re.IGNORECASE,
)

# ---- Utilities ----

def _to_float(s: str) -> Optional[float]:
    try:
        s = s.strip()
        s = re.sub(r"[^\d\.\-\,]", "", s)
        # Commas are treated as thousands separators (e.g. "1,234.56" -> 1234.56).
        s = s.replace(",", "")
        return float(s)
    except Exception:
        return None

def _norm_whitespace(s: str) -> str:
    return re.sub(r"[ \t]+", " ", s).strip()

def _which(cmd: str) -> bool:
    return shutil.which(cmd) is not None

def _best_last_amount(lines: List[str], keywords: Iterable[str]) -> Optional[float]:
    pat = re.compile("|".join([re.escape(k) for k in keywords]), re.IGNORECASE)
    winners: List[float] = []
    for line in lines:
        if pat.search(line):
            amts = AMOUNT_RE.findall(line)
            if amts:
                v = _to_float(amts[-1])
                if v is not None:
                    winners.append(v)
    return winners[-1] if winners else None

def _infer_currency(text: str) -> Optional[str]:
    # Check longer symbols first so "R$"/"C$"/"A$"/"NZ$" are not shadowed by "$".
    for sym, code in sorted(CURRENCY_SYMBOLS.items(), key=lambda kv: -len(kv[0])):
        if sym in text:
            return code
    for code in CURRENCY_CODES:
        if re.search(rf"\b{code}\b", text):
            return code
    return None

# ---- OCR / Text extraction ----

def extract_text_with_fallback(pdf_path: Path) -> Tuple[str, List[str], List[str]]:
    """
    Returns:
      full_text, page_texts, warnings
    """
    warnings: List[str] = []
    page_texts: List[str] = []
    try:
        with pdfplumber.open(str(pdf_path)) as pdf:
            for page in pdf.pages:
                t = page.extract_text() or ""
                page_texts.append(t)
    except Exception as e:
        warnings.append(f"pdfplumber failed: {e}")
        page_texts = []

    # Decide if OCR is needed
    need_ocr = any(len(t.strip()) < 20 for t in page_texts) or not page_texts
    if not need_ocr:
        return ("\n".join(page_texts), page_texts, warnings)

    # Try ocrmypdf first (why: preserves layout & embedded text)
    if _which("ocrmypdf"):
        try:
            with tempfile.TemporaryDirectory() as td:
                out_pdf = Path(td) / "ocr.pdf"
                # Run ocrmypdf via subprocess (avoids shell-quoting issues; raises on failure).
                subprocess.run(
                    ["ocrmypdf", "--quiet", "--skip-text", str(pdf_path), str(out_pdf)],
                    check=True,
                )
                with pdfplumber.open(str(out_pdf)) as pdf:
                    page_texts = [(p.extract_text() or "") for p in pdf.pages]
            warnings.append("Used ocrmypdf for OCR.")
            return ("\n".join(page_texts), page_texts, warnings)
        except Exception as e:
            warnings.append(f"ocrmypdf failed: {e}")

    # Fallback to pytesseract
    try:
        from pdf2image import convert_from_path  # type: ignore
        import pytesseract  # type: ignore
        images = convert_from_path(str(pdf_path))
        page_texts = [pytesseract.image_to_string(img) for img in images]
        warnings.append("Used pytesseract OCR fallback.")
        return ("\n".join(page_texts), page_texts, warnings)
    except Exception as e:
        warnings.append(f"OCR fallback failed: {e}")
        # Return whatever we had, even if empty
        return ("\n".join(page_texts), page_texts, warnings)

# ---- Field extractors ----

def extract_invoice_number(text: str) -> Optional[str]:
    m = INVNO_RE.search(text)
    if m:
        for g in m.groups()[1:]:
            if g:
                return g.strip().strip(":.#")
    # Weak fallback: first alnum chunk near "invoice"
    lines = text.splitlines()
    for i, line in enumerate(lines):
        if re.search(r"\binvoice\b", line, re.IGNORECASE):
            after = lines[i : i + 2]
            jtxt = " ".join(after)
            m2 = re.search(r"(#|no\.?|number)[:\s\-]*([A-Z0-9\-\/\.]{3,})", jtxt, re.IGNORECASE)
            if m2:
                return m2.group(2)
    return None

def extract_date(text: str) -> Optional[str]:
    cand = DATE_CAND_RE.findall(text)
    for c in cand:
        parsed = dateparser.parse(c, settings={"DATE_ORDER": "DMY"})
        if parsed:
            try:
                return parsed.date().isoformat()
            except Exception:
                continue
    return None

def _capture_block_after(text: str, label: str, stop_labels: Iterable[str]) -> Optional[str]:
    # Why: parties are often in blocks following a label; stop when we hit another known label.
    lines = [_norm_whitespace(l) for l in text.splitlines()]
    block: List[str] = []
    start = None
    for i, l in enumerate(lines):
        if re.search(rf"\b{label}\b", l, re.IGNORECASE):
            start = i + 1
            break
    if start is None:
        return None
    for l in lines[start:]:
        if any(re.search(rf"\b{sl}\b", l, re.IGNORECASE) for sl in stop_labels):
            break
        if l.strip():
            block.append(l)
        elif block:
            break
    return "\n".join(block).strip() if block else None

def extract_parties(text: str) -> Tuple[Party, Party]:
    stop = ["bill to", "ship to", "invoice to", "sold to", "from", "supplier", "customer", "buyer"]
    buyer_block = (
        _capture_block_after(text, "bill to", stop)
        or _capture_block_after(text, "invoice to", stop)
        or _capture_block_after(text, "sold to", stop)
        or _capture_block_after(text, "ship to", stop)
    )
    seller_block = (
        _capture_block_after(text, "from", stop)
        or _capture_block_after(text, "supplier", stop)
        or _capture_block_after(text, "seller", stop)
    )
    def to_party(block: Optional[str]) -> Party:
        if not block:
            return Party()
        lines = block.splitlines()
        name = lines[0] if lines else None
        addr = "\n".join(lines[1:]) if len(lines) > 1 else None
        return Party(name=name, address=addr, raw_block=block)
    return to_party(seller_block), to_party(buyer_block)

def extract_totals(text: str) -> Tuple[Optional[float], Optional[float], Optional[float]]:
    lines = [_norm_whitespace(l) for l in text.splitlines()]
    total = _best_last_amount(lines, ["grand total", "amount due", "total"])
    subtotal = _best_last_amount(lines, ["subtotal", "sub total"])
    tax = _best_last_amount(lines, ["tax", "vat", "gst"])
    if not total and subtotal and tax:
        total = round(subtotal + tax, 2)
    return subtotal, tax, total

# ---- Line items ----

def _normalize_header_name(h: str) -> str:
    h = h.lower()
    if "qty" in h or "quantity" in h or h.strip() == "q":
        return "quantity"
    if "unit price" in h or "price" in h or "rate" in h:
        return "unit_price"
    if "total" in h or "amount" in h or "line total" in h:
        return "line_total"
    if "description" in h or "item" in h or "details" in h:
        return "description"
    return h
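
# Optional fuzzy variant (a sketch, not wired into the pipeline): uses rapidfuzz,
# when it imported successfully above, to map noisy or abbreviated headers
# (e.g. "Qty.", "Amt") onto the canonical names before falling back to the
# substring rules in _normalize_header_name.
_CANONICAL_HEADERS = {
    "quantity": ["qty", "quantity", "units"],
    "unit_price": ["unit price", "price", "rate"],
    "line_total": ["total", "amount", "line total"],
    "description": ["description", "item", "details"],
}

def _fuzzy_header_name(h: str, min_score: int = 80) -> str:
    if process is None or fuzz is None:
        return _normalize_header_name(h)
    aliases = {alias: key for key, names in _CANONICAL_HEADERS.items() for alias in names}
    match = process.extractOne(h.lower(), list(aliases.keys()), scorer=fuzz.WRatio)
    if match and match[1] >= min_score:
        return aliases[match[0]]
    return _normalize_header_name(h)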

def _camelot_extract(pdf_path: Path) -> List[LineItem]:
    try:
        import camelot  # type: ignore
        tables = []
        # Try both modes; lattice handles ruled tables, stream handles whitespace tables.
        for flavor in ("lattice", "stream"):
            try:
                t = camelot.read_pdf(str(pdf_path), flavor=flavor, pages="all")
                tables.extend([tbl.df for tbl in t])  # each camelot Table exposes a pandas DataFrame via .df
            except Exception:
                continue
        items: List[LineItem] = []
        for df in tables:
            headers = [_normalize_header_name(str(h)) for h in df.iloc[0].tolist()]
            for _, row in df.iloc[1:].iterrows():
                rec: Dict[str, Optional[str]] = {}
                for idx, cell in enumerate(row.tolist()):
                    key = headers[idx] if idx < len(headers) else f"col{idx}"
                    rec[key] = str(cell)
                # Map to LineItem
                desc = rec.get("description")
                qty = _to_float(rec.get("quantity") or "") if rec.get("quantity") else None
                unit = _to_float(rec.get("unit_price") or "") if rec.get("unit_price") else None
                lt = _to_float(rec.get("line_total") or rec.get("amount") or "") if (rec.get("line_total") or rec.get("amount")) else None
                # weak filter: must have desc and at least one numeric
                if (desc and (qty is not None or unit is not None or lt is not None)):
                    items.append(LineItem(description=_norm_whitespace(desc), quantity=qty, unit_price=unit, line_total=lt))
        return items
    except Exception:
        return []

def _regex_lineitem_fallback(text: str) -> List[LineItem]:
    # Why: many invoices have "Description  Qty  Price  Total" lines separated by spaces.
    lines = [_norm_whitespace(l) for l in text.splitlines() if l.strip()]
    # Find likely header line
    header_idx = None
    for i, l in enumerate(lines):
        if re.search(r"description|item", l, re.IGNORECASE) and re.search(r"qty|quantity", l, re.IGNORECASE):
            header_idx = i
            break
    if header_idx is None:
        header_idx = 0
    body = lines[header_idx + 1 :]
    items: List[LineItem] = []
    for l in body:
        nums = NUM_RE.findall(l)
        if len(nums) >= 1 and any(k in l.lower() for k in ["total", "subtotal", "tax", "vat"]):
            break  # reached footer
        if len(nums) == 0:
            continue
        # Heuristic split: description before first number
        m = re.search(NUM_RE, l)
        if not m:
            continue
        desc = l[: m.start()].strip(" -:•")
        rest = l[m.start() :]
        nums = NUM_RE.findall(rest)
        q = _to_float(nums[0]) if len(nums) >= 1 else None
        up = _to_float(nums[1]) if len(nums) >= 2 else None
        lt = _to_float(nums[2]) if len(nums) >= 3 else None
        if desc and (q is not None or up is not None or lt is not None):
            items.append(LineItem(description=desc, quantity=q, unit_price=up, line_total=lt))
    return items

def extract_line_items(pdf_path: Path, text: str) -> List[LineItem]:
    items = _camelot_extract(pdf_path)
    if items:
        return items
    return _regex_lineitem_fallback(text)

# ---- Confidence & validation ----

def compute_confidence(inv: Invoice) -> Tuple[float, List[str]]:
    warnings: List[str] = list(inv.warnings)
    points = 0
    max_points = 8
    points += 1 if inv.invoice_number else 0
    points += 1 if inv.invoice_date else 0
    points += 1 if inv.total else 0
    points += 1 if inv.currency else 0
    points += 1 if inv.seller.name or inv.buyer.name else 0
    points += 1 if len(inv.line_items) > 0 else 0
    # Consistency check
    if inv.subtotal is not None and inv.tax is not None and inv.total is not None:
        if abs((inv.subtotal + inv.tax) - inv.total) < 1.01:
            points += 2
        else:
            warnings.append("Totals do not add up cleanly.")
    elif inv.total is not None:
        points += 1
    conf = round(points / max_points, 2)
    return conf, warnings
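
# Optional, lightweight JSONSchema-like check (a sketch; tune the required fields
# and tolerance to your own invoices). It verifies that key fields are present and
# that line totals roughly sum to the subtotal, per the plan's post-process step.
def validate_invoice(inv: Invoice) -> List[str]:
    errors: List[str] = []
    for field in ("invoice_number", "invoice_date", "total"):
        if getattr(inv, field) in (None, ""):
            errors.append(f"missing {field}")
    line_totals = [li.line_total for li in inv.line_items if li.line_total is not None]
    if inv.subtotal is not None and line_totals and abs(sum(line_totals) - inv.subtotal) > 1.01:
        errors.append("sum of line totals does not match subtotal")
    return errors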

# ---- Main pipeline ----

def extract_invoice(pdf_path: Path) -> Invoice:
    full_text, page_texts, warns = extract_text_with_fallback(pdf_path)
    currency = _infer_currency(full_text)
    inv_no = extract_invoice_number(full_text)
    inv_date = extract_date(full_text)
    seller, buyer = extract_parties(full_text)
    subtotal, tax, total = extract_totals(full_text)
    items = extract_line_items(pdf_path, full_text)
    invoice = Invoice(
        invoice_number=inv_no,
        invoice_date=inv_date,
        currency=currency,
        seller=seller,
        buyer=buyer,
        subtotal=subtotal,
        tax=tax,
        total=total,
        line_items=items,
        confidence=0.0,
        warnings=warns,
        raw_text=full_text,
    )
    c, w = compute_confidence(invoice)
    invoice.confidence = c
    invoice.warnings = w
    return invoice
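
# Extensibility hook (a sketch): a minimal strategy registry so extra header-field
# extractors (e.g. PO number, IBAN, locale-specific VAT labels) can be plugged in
# without touching the pipeline. Each extractor takes the full text and returns a
# value or None; you could merge run_extractors() output into the JSON in main().
FIELD_EXTRACTORS: Dict[str, Any] = {
    "invoice_number": extract_invoice_number,
    "invoice_date": extract_date,
}

def register_extractor(name: str, fn: Any) -> None:
    FIELD_EXTRACTORS[name] = fn

def run_extractors(text: str) -> Dict[str, Any]:
    return {name: fn(text) for name, fn in FIELD_EXTRACTORS.items()}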

# ---- CLI ----

def _default_out_path(in_path: Path, out_dir: Optional[Path]) -> Path:
    if out_dir:
        out_dir.mkdir(parents=True, exist_ok=True)
        return out_dir / (in_path.stem + ".json")
    return in_path.with_suffix(".json")

def main() -> None:
    parser = argparse.ArgumentParser(description="Free invoice extractor → JSON")
    parser.add_argument("pdfs", nargs="+", help="PDF file(s) or globs")
    parser.add_argument("--out", help="Output directory (optional)")
    parser.add_argument("--print", action="store_true", help="Print JSON to stdout")
    args = parser.parse_args()

    files: List[Path] = []
    for p in args.pdfs:
        if any(ch in p for ch in ["*", "?", "["]):
            files.extend([Path(x) for x in sorted(Path().glob(p))])
        else:
            files.append(Path(p))
    files = [f for f in files if f.exists() and f.is_file()]

    out_dir = Path(args.out) if args.out else None
    if not files:
        print("No input PDFs found.", file=sys.stderr)
        sys.exit(1)

    results: List[Dict[str, Any]] = []
    for f in files:
        try:
            inv = extract_invoice(f)
            data = asdict(inv)
            results.append({"file": str(f), "invoice": data})
            out_path = _default_out_path(f, out_dir)
            with open(out_path, "w", encoding="utf-8") as fo:
                json.dump(data, fo, ensure_ascii=False, indent=2)
            print(f"[OK] {f} → {out_path}  (confidence={inv.confidence})")
            if inv.warnings:
                print("  warnings:", "; ".join(inv.warnings))
            if getattr(args, "print", False):
                print(json.dumps(data, ensure_ascii=False, indent=2))
        except Exception as e:
            print(f"[FAIL] {f}: {e}", file=sys.stderr)

if __name__ == "__main__":
    main()
