Files
2026-06-06 05:21:10 +00:00

2960 lines
104 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
PDF Processing Toolkit — All-in-One CLI
Usage:
python3 pdf.py <command> [args...]
Commands:
env.check [--json] Check environment dependencies
env.fix Auto-install missing dependencies
extract.text <pdf> [-p pages]
extract.table <pdf> [-p pages]
extract.image <pdf> -o <dir>
pages.merge <pdf>... -o <out>
pages.split <pdf> -o <dir>
pages.rotate <pdf> <deg> -o <out> [-p pages]
pages.crop <pdf> <box> -o <out> [-p pages]
meta.get <pdf>
meta.set <pdf> -o <out> -d <json>
meta.brand <pdf>... [-o <out>] [-t title] [-q]
form.info <pdf>
form.fill <pdf> -o <out> -d <json>
form.detail <pdf> <output.json>
form.fill-legacy <pdf> <fields.json> <output.pdf>
form.annotate <pdf> <fields.json> <output.pdf>
form.render <pdf> <output_dir> [--max-dim N]
form.validate <page> <fields.json> <input_img> <output_img>
form.check-bbox <fields.json>
pages.clean <pdf> -o <out> Remove blank pages
convert.office <file> [-o <out>]
convert.html <file> [-o <out>] [--css <file>]
convert.blueprint <llm_response.txt> [-o <out>]
convert.latex <file> [--runs N] [--keep-logs]
palette.generate [--title "..."] [--mode minimal] [--format python|json|css]
palette.cascade [--title "..."] [--mode minimal] [--format summary|json|css|reportlab]
code.sanitize <file>
content.sanitize <file> [--apply] Fix content issues (CJK, encoding)
font.check <pdf>
toc.check <pdf>
"""
from __future__ import annotations
import html
import json
import os
import re
import shutil
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
# ═══════════════════════════════════════════════════════════════
# Section 0: Framework — Output, @cmd registry, CLI parser
# ═══════════════════════════════════════════════════════════════
class Output:
"""Structured JSON output for all subcommands."""
@staticmethod
def success(data: dict):
payload = {"status": "success", "data": data}
sys.stdout.write(json.dumps(payload, ensure_ascii=False, indent=2) + "\n")
raise SystemExit(0)
@staticmethod
def error(error: str, message: str, hint: Optional[str] = None, code: int = 1):
payload = {"status": "error", "error": error, "message": message}
if hint is not None:
payload["hint"] = hint
sys.stderr.write(json.dumps(payload, ensure_ascii=False, indent=2) + "\n")
raise SystemExit(code)
@staticmethod
def check_file(filepath: str) -> Path:
target = Path(filepath)
if not target.exists():
Output.error("FileNotFound", f"File not found: {filepath}", code=2)
return target
# Command registry
_COMMANDS: Dict[str, Callable] = {}
def cmd(name: str):
"""Decorator to register a CLI command under a dotted namespace."""
def decorator(fn: Callable) -> Callable:
_COMMANDS[name] = fn
return fn
return decorator
def _pop_flag(argv: list, short: str, long: str, needs_value: bool = True):
"""Extract a flag (and optional value) from *argv* in-place."""
for idx, tok in enumerate(argv):
if tok in (short, long):
argv.pop(idx)
if needs_value:
if idx < len(argv):
return argv.pop(idx)
Output.error("MissingArg", f"Flag {long} requires a value")
return True
return None
def _load_json_arg(argv: list) -> dict:
"""Read JSON from -d/--data string or -f/--file path."""
raw = _pop_flag(argv, "-d", "--data")
if raw is not None:
try:
return json.loads(raw)
except json.JSONDecodeError as exc:
Output.error("InvalidJSON", f"JSON parse error: {exc}")
fpath = _pop_flag(argv, "-f", "--file")
if fpath is not None:
try:
with open(fpath) as fh:
return json.load(fh)
except Exception as exc:
Output.error("FileError", f"Failed to read file: {exc}")
Output.error("MissingData", "Requires --data or --file argument")
def _resolve_page_indices(range_spec: Optional[str], page_count: int) -> List[int]:
"""Turn a human-friendly range string (1-indexed) into a sorted list of 0-based indices."""
if not range_spec:
return list(range(page_count))
indices: Set[int] = set()
for segment in range_spec.split(","):
segment = segment.strip()
if "-" in segment:
lo, hi = segment.split("-", 1)
for i in range(int(lo) - 1, min(int(hi), page_count)):
indices.add(i)
else:
val = int(segment) - 1
if 0 <= val < page_count:
indices.add(val)
return sorted(indices)
_SCRIPT_DIR = Path(__file__).resolve().parent
# ═══════════════════════════════════════════════════════════════
# Section 1: env — environment diagnostics and auto-fix
# ═══════════════════════════════════════════════════════════════
def _probe_cmd(name: str, version_args: Optional[List[str]] = None) -> Tuple[str, str]:
"""Check if a command exists and optionally get its version. Returns (status, detail)."""
path = shutil.which(name)
if path is None:
return ("missing", "")
if version_args is None:
return ("ok", "")
try:
result = subprocess.run(
[path] + version_args,
capture_output=True, text=True, timeout=10
)
ver = result.stdout.strip() or result.stderr.strip()
return ("ok", ver)
except Exception:
return ("ok", "")
def _probe_python_module(mod_name: str) -> Tuple[str, str]:
"""Check if a Python module is importable and get its version."""
try:
result = subprocess.run(
[sys.executable, "-c", f"import {mod_name}; print(getattr({mod_name}, '__version__', 'installed'))"],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0:
return ("ok", result.stdout.strip())
return ("missing", "")
except Exception:
return ("missing", "")
def _probe_node() -> Tuple[str, str]:
s, d = _probe_cmd("node", ["--version"])
if s == "ok" and d:
d = d.lstrip("v")
return (s, d)
def _probe_python() -> Tuple[str, str]:
try:
import platform
return ("ok", platform.python_version())
except Exception:
return ("ok", "")
def _probe_libreoffice() -> Tuple[str, str]:
candidates = [
"/Applications/LibreOffice.app/Contents/MacOS/soffice",
os.path.expanduser("~/Applications/LibreOffice.app/Contents/MacOS/soffice"),
"/usr/bin/soffice",
"/usr/local/bin/soffice",
"/usr/lib/libreoffice/program/soffice",
"/opt/libreoffice/program/soffice",
"/snap/bin/libreoffice.soffice",
]
for c in candidates:
if Path(c).is_file():
return ("ok", "")
for alias in ("soffice", "libreoffice"):
if shutil.which(alias):
return ("ok", "")
return ("missing", "")
def _probe_tectonic() -> Tuple[str, str]:
home_bin = Path.home() / "tectonic"
if home_bin.exists() and os.access(home_bin, os.X_OK):
return ("ok", "")
tec_local = _SCRIPT_DIR / "tectonic"
if tec_local.exists() and os.access(tec_local, os.X_OK):
return ("ok", "")
if shutil.which("tectonic"):
return ("ok", "")
return ("missing", "")
def _probe_playwright_npm() -> Tuple[str, str]:
"""Check if playwright npm package is installed."""
try:
result = subprocess.run(
["node", "-e", "console.log(require('playwright/package.json').version)"],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0 and result.stdout.strip():
return ("ok", result.stdout.strip())
except Exception:
pass
# Try global
try:
result = subprocess.run(
["npm", "list", "-g", "playwright", "--depth=0"],
capture_output=True, text=True, timeout=15
)
import re as _re
m = _re.search(r"playwright@(\S+)", result.stdout)
if m:
return ("ok", m.group(1))
except Exception:
pass
return ("missing", "")
def _probe_chromium() -> Tuple[str, str]:
"""Check if Playwright Chromium browser is installed."""
import platform as _platform
home = Path.home()
if _platform.system() == "Darwin":
cache_dir = home / "Library" / "Caches" / "ms-playwright"
else:
cache_dir = home / ".cache" / "ms-playwright"
if cache_dir.is_dir():
for entry in sorted(cache_dir.iterdir(), reverse=True):
if "chromium" in entry.name.lower():
return ("ok", entry.name)
return ("missing", "")
@cmd("env.check")
def env_check(argv: list):
"""Check environment dependencies."""
use_json = _pop_flag(argv, "-j", "--json", needs_value=False)
s_node = _probe_node()
s_pw = _probe_playwright_npm()
s_cr = _probe_chromium()
s_py = _probe_python()
s_pike = _probe_python_module("pikepdf")
s_plumb = _probe_python_module("pdfplumber")
s_lo = _probe_libreoffice()
s_tec = _probe_tectonic()
s_pw_py = _probe_python_module("playwright")
if use_json:
report = {
"html_route": {
"node": s_node[0], "node_version": s_node[1],
"playwright": s_pw[0], "playwright_version": s_pw[1],
"chromium": s_cr[0], "chromium_detail": s_cr[1],
},
"process_route": {
"python3": s_py[0], "python3_version": s_py[1],
"pikepdf": s_pike[0], "pikepdf_version": s_pike[1],
"pdfplumber": s_plumb[0], "pdfplumber_version": s_plumb[1],
"playwright_python": s_pw_py[0], "playwright_python_version": s_pw_py[1],
},
"optional": {
"libreoffice": s_lo[0],
"tectonic": s_tec[0],
},
}
sys.stdout.write(json.dumps(report, ensure_ascii=False, indent=2) + "\n")
# Determine exit code
rc = 0
for v in [s_node, s_pw, s_cr, s_py, s_pike, s_plumb]:
if v[0] != "ok":
rc = 2
break
raise SystemExit(rc)
# Human-readable output
rc = 0
def show(name: str, status: Tuple[str, str], optional: bool = False):
nonlocal rc
s, d = status
if s == "ok":
detail = f" ({d})" if d else ""
print(f" \u2713 {name}{detail}")
elif optional:
print(f" \u25cb {name} (optional, not installed)")
else:
print(f" \u2717 {name} (missing)")
rc = 2
print("=== PDF Skill Environment ===\n")
print("--- HTML Route ---")
show("node", s_node)
show("playwright", s_pw)
show("chromium", s_cr)
print("\n--- Process Route ---")
show("python3", s_py)
show("pikepdf", s_pike)
show("pdfplumber", s_plumb)
if s_pw_py[0] == "ok":
print(f" (playwright-python: {s_pw_py[1]})")
print("\n--- Optional ---")
show("libreoffice", s_lo, optional=True)
show("tectonic", s_tec, optional=True)
print("\n=== Install Commands ===")
print(" Node.js: brew install node (macOS) / apt install nodejs (Ubuntu)")
print(" Playwright: npm install -g playwright && npx playwright install chromium")
print(" Python: brew install python3 (macOS) / apt install python3 (Ubuntu)")
print(" pikepdf: pip install pikepdf pdfplumber --user")
print(" LibreOffice: brew install --cask libreoffice (macOS)")
print(" Tectonic: curl -fsSL https://drop-sh.fullyjustified.net | sh")
raise SystemExit(rc)
@cmd("env.fix")
def env_fix(argv: list):
"""Auto-install missing Python dependencies."""
modules = {
"pikepdf": "pikepdf",
"pdfplumber": "pdfplumber",
"pypdf": "pypdf",
"pdf2image": "pdf2image",
"PIL": "Pillow",
}
installed = []
for mod, pkg in modules.items():
s, _ = _probe_python_module(mod)
if s == "missing":
print(f"Installing {pkg}...")
for attempt in (
[sys.executable, "-m", "pip", "install", "-q", pkg],
[sys.executable, "-m", "pip", "install", "-q", "--user", pkg],
[sys.executable, "-m", "pip", "install", "-q", "--break-system-packages", pkg],
):
result = subprocess.run(attempt, capture_output=True, text=True)
if result.returncode == 0:
installed.append(pkg)
break
else:
print(f" Failed to install {pkg}")
if installed:
print(f"\nInstalled: {', '.join(installed)}")
else:
print("All Python dependencies are already installed.")
raise SystemExit(0)
# ═══════════════════════════════════════════════════════════════
# Section 2: extract — text, tables, and embedded images
# ═══════════════════════════════════════════════════════════════
@cmd("extract.text")
def extract_text(argv: list):
"""Pull plain text from selected pages."""
if not argv:
Output.error("MissingArg", "pdf path required")
pdf_path = argv.pop(0)
page_range = _pop_flag(argv, "-p", "--pages")
import pdfplumber
src = Output.check_file(pdf_path)
try:
doc = pdfplumber.open(src)
except Exception as exc:
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
total_pages = len(doc.pages)
target_pages = _resolve_page_indices(page_range, total_pages)
char_total = 0
page_results = []
for pg_idx in target_pages:
content = doc.pages[pg_idx].extract_text() or ""
char_total += len(content)
page_results.append({"page": pg_idx + 1, "chars": len(content), "text": content})
doc.close()
Output.success({
"total_pages": total_pages,
"extracted_pages": len(target_pages),
"total_chars": char_total,
"pages": page_results,
})
@cmd("extract.table")
def extract_table(argv: list):
"""Locate and return every table on selected pages."""
if not argv:
Output.error("MissingArg", "pdf path required")
pdf_path = argv.pop(0)
page_range = _pop_flag(argv, "-p", "--pages")
import pdfplumber
src = Output.check_file(pdf_path)
try:
doc = pdfplumber.open(src)
except Exception as exc:
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
total_pages = len(doc.pages)
target_pages = _resolve_page_indices(page_range, total_pages)
collected = []
for pg_idx in target_pages:
for tbl_num, raw_table in enumerate(doc.pages[pg_idx].extract_tables()):
if not raw_table:
continue
sanitised = [
[(cell.strip() if cell else "") for cell in row]
for row in raw_table
]
collected.append({
"page": pg_idx + 1,
"table_index": tbl_num,
"rows": len(sanitised),
"cols": len(sanitised[0]) if sanitised else 0,
"data": sanitised,
})
doc.close()
Output.success({
"total_pages": total_pages,
"extracted_pages": len(target_pages),
"total_tables": len(collected),
"tables": collected,
})
@cmd("extract.image")
def extract_image(argv: list):
"""Save every embedded raster image to output dir."""
if not argv:
Output.error("MissingArg", "pdf path required")
pdf_path = argv.pop(0)
out_dir = _pop_flag(argv, "-o", "--output") or "."
import pikepdf
src = Output.check_file(pdf_path)
dest = Path(out_dir)
dest.mkdir(parents=True, exist_ok=True)
try:
doc = pikepdf.open(src)
except Exception as exc:
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
saved = []
seq = 0
_EXT_MAP = {
"/DCTDecode": "jpg",
"/FlateDecode": "png",
"/JPXDecode": "jp2",
}
for page_no, pg in enumerate(doc.pages, 1):
res = pg.get("/Resources")
if res is None or "/XObject" not in res:
continue
for key, ref in res.XObject.items():
try:
img_obj = doc.get_object(ref.objgen)
if img_obj.get("/Subtype") != "/Image":
continue
seq += 1
w = int(img_obj.get("/Width", 0))
h = int(img_obj.get("/Height", 0))
filt = img_obj.get("/Filter")
ext = _EXT_MAP.get(str(filt) if filt else None, "bin")
fname = f"page{page_no}_img{seq}.{ext}"
out_file = dest / fname
out_file.write_bytes(img_obj.read_raw_bytes())
saved.append({
"page": page_no, "name": str(key), "file": str(out_file),
"width": w, "height": h, "format": ext,
})
except Exception:
continue
doc.close()
Output.success({"output_dir": str(dest), "total_images": len(saved), "images": saved})
# ═══════════════════════════════════════════════════════════════
# Section 3: pages — merge, split, rotate, crop
# ═══════════════════════════════════════════════════════════════
@cmd("pages.merge")
def pages_merge(argv: list):
"""Concatenate several PDF files into one."""
out_path = _pop_flag(argv, "-o", "--output")
if out_path is None:
Output.error("MissingArg", "--output is required")
if not argv:
Output.error("MissingArg", "At least one PDF required")
import pikepdf
sources = [Output.check_file(p) for p in argv]
handles = []
try:
combined = pikepdf.new()
descriptions = []
for src in sources:
handle = pikepdf.open(src)
handles.append(handle)
n = len(handle.pages)
descriptions.append(f"{src.name} ({n} pages)")
for pg in handle.pages:
combined.pages.append(pg)
total = len(combined.pages)
combined.save(out_path)
combined.close()
except Exception as exc:
Output.error("MergeError", f"Merge failed: {exc}", code=4)
finally:
for h in handles:
try:
h.close()
except Exception:
pass
Output.success({"output": out_path, "total_pages": total, "sources": descriptions})
@cmd("pages.split")
def pages_split(argv: list):
"""Write each page as a separate single-page PDF."""
if not argv:
Output.error("MissingArg", "pdf path required")
pdf_path = argv.pop(0)
out_dir = _pop_flag(argv, "-o", "--output") or "."
import pikepdf
src = Output.check_file(pdf_path)
dest = Path(out_dir)
dest.mkdir(parents=True, exist_ok=True)
try:
doc = pikepdf.open(src)
except Exception as exc:
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
generated = []
base = src.stem
try:
for idx, pg in enumerate(doc.pages, 1):
fp = dest / f"{base}_page{idx:03d}.pdf"
single_doc = pikepdf.new()
single_doc.pages.append(pg)
single_doc.save(fp)
single_doc.close()
generated.append(str(fp))
doc.close()
except Exception as exc:
Output.error("SplitError", f"Split failed: {exc}", code=4)
Output.success({"output_dir": str(dest), "total_pages": len(generated), "files": generated})
@cmd("pages.rotate")
def pages_rotate(argv: list):
"""Rotate selected pages by 90/180/270 degrees."""
if len(argv) < 2:
Output.error("MissingArg", "pdf path and degrees required")
pdf_path = argv.pop(0)
degrees = int(argv.pop(0))
out_path = _pop_flag(argv, "-o", "--output")
if out_path is None:
Output.error("MissingArg", "--output is required")
page_range = _pop_flag(argv, "-p", "--pages")
if degrees not in (90, 180, 270):
Output.error("InvalidDegrees", "Rotation angle must be 90, 180, or 270")
import pikepdf
src = Output.check_file(pdf_path)
try:
doc = pikepdf.open(src)
except Exception as exc:
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
targets = _resolve_page_indices(page_range, len(doc.pages))
try:
for i in targets:
existing = int(doc.pages[i].get("/Rotate", 0))
doc.pages[i]["/Rotate"] = (existing + degrees) % 360
doc.save(out_path)
doc.close()
except Exception as exc:
Output.error("RotateError", f"Rotation failed: {exc}", code=4)
Output.success({"output": out_path, "degrees": degrees, "pages_rotated": len(targets)})
@cmd("pages.crop")
def pages_crop(argv: list):
"""Set the media/crop box on selected pages. box = 'left,bottom,right,top' in pt."""
if len(argv) < 2:
Output.error("MissingArg", "pdf path and crop box required")
pdf_path = argv.pop(0)
box_str = argv.pop(0)
out_path = _pop_flag(argv, "-o", "--output")
if out_path is None:
Output.error("MissingArg", "--output is required")
page_range = _pop_flag(argv, "-p", "--pages")
try:
coords = [float(v.strip()) for v in box_str.split(",")]
assert len(coords) == 4
left, bottom, right, top = coords
except Exception:
Output.error("InvalidBox", "Invalid crop box format, should be: left,bottom,right,top",
hint="Example: 50,50,550,750")
import pikepdf
src = Output.check_file(pdf_path)
try:
doc = pikepdf.open(src)
except Exception as exc:
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
targets = _resolve_page_indices(page_range, len(doc.pages))
try:
arr = pikepdf.Array([left, bottom, right, top])
for i in targets:
doc.pages[i].mediabox = arr
doc.pages[i].cropbox = arr
doc.save(out_path)
doc.close()
except Exception as exc:
Output.error("CropError", f"Crop failed: {exc}", code=4)
Output.success({
"output": out_path,
"box": {"left": left, "bottom": bottom, "right": right, "top": top},
"pages_cropped": len(targets),
})
@cmd("pages.clean")
def pages_clean(argv: list):
"""Remove truly blank pages from a PDF.
A page is considered blank ONLY if it has exactly 0 text characters AND
0 images. Pages with even a single character or a tiny image are kept.
Usage:
pdf.py pages.clean input.pdf -o output.pdf
"""
if not argv:
Output.error("MissingArg", "PDF path required")
pdf_path = argv.pop(0)
out_path = _pop_flag(argv, "-o", "--output")
if out_path is None:
Output.error("MissingArg", "--output is required")
# --threshold is accepted but ignored (kept for backward compat)
_pop_flag(argv, "-t", "--threshold")
src = Output.check_file(pdf_path)
# Phase 1: Detect blank pages using pdfplumber (text extraction)
try:
import pdfplumber
except ImportError:
Output.error("DependencyMissing", "pdfplumber required: pip install pdfplumber")
blank_indices = [] # 0-indexed
total_pages = 0
try:
pdf = pdfplumber.open(str(src))
total_pages = len(pdf.pages)
for i, page in enumerate(pdf.pages):
text = (page.extract_text() or "").strip()
# Check for ANY images on the page (no size filter)
images = page.images if hasattr(page, 'images') else []
# A page is blank ONLY if it has exactly 0 characters AND 0 images
if len(text) == 0 and len(images) == 0:
blank_indices.append(i)
pdf.close()
except Exception as exc:
Output.error("PDFError", f"Cannot analyze PDF: {exc}", code=3)
if not blank_indices:
Output.success({
"output": str(src),
"total_pages": total_pages,
"blank_pages_removed": 0,
"message": "No blank pages found",
})
return
# Phase 2: Remove blank pages using pikepdf
try:
import pikepdf
except ImportError:
Output.error("DependencyMissing", "pikepdf required: pip install pikepdf")
try:
doc = pikepdf.open(str(src))
# Remove in reverse order to preserve indices
for idx in sorted(blank_indices, reverse=True):
if idx < len(doc.pages):
del doc.pages[idx]
doc.save(out_path)
doc.close()
except Exception as exc:
Output.error("PDFError", f"Cannot remove blank pages: {exc}", code=4)
remaining = total_pages - len(blank_indices)
Output.success({
"output": out_path,
"total_pages": total_pages,
"blank_pages_removed": len(blank_indices),
"blank_page_numbers": [i + 1 for i in blank_indices], # 1-indexed for humans
"remaining_pages": remaining,
})
# ═══════════════════════════════════════════════════════════════
# Section 4: meta — metadata reading, writing, and branding
# ═══════════════════════════════════════════════════════════════
_XMP_MAPPING = {
"Title": "dc:title",
"Author": "dc:creator",
"Subject": "dc:description",
"Keywords": "pdf:Keywords",
"Creator": "xmp:CreatorTool",
"Producer": "pdf:Producer",
}
_ACCEPTED_KEYS = set(_XMP_MAPPING.keys())
@cmd("meta.get")
def meta_get(argv: list):
"""Read document information and metadata."""
if not argv:
Output.error("MissingArg", "pdf path required")
import pikepdf
src = Output.check_file(argv[0])
try:
doc = pikepdf.open(src)
except Exception as exc:
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
record: dict = {
"pages": len(doc.pages),
"pdf_version": str(doc.pdf_version),
}
if doc.pages:
mb = doc.pages[0].mediabox
record["page_size"] = {
"width": float(mb[2] - mb[0]),
"height": float(mb[3] - mb[1]),
"unit": "pt",
}
kv_pairs = {}
if doc.docinfo:
for k in doc.docinfo.keys():
try:
kv_pairs[str(k).lstrip("/")] = str(doc.docinfo[k])
except Exception:
pass
record["metadata"] = kv_pairs
record["encrypted"] = doc.is_encrypted
record["has_form"] = "/AcroForm" in doc.Root
record["has_outlines"] = "/Outlines" in doc.Root
doc.close()
Output.success(record)
@cmd("meta.set")
def meta_set(argv: list):
"""Update XMP + legacy docinfo metadata fields."""
if not argv:
Output.error("MissingArg", "pdf path required")
pdf_path = argv.pop(0)
out_path = _pop_flag(argv, "-o", "--output")
if out_path is None:
Output.error("MissingArg", "--output is required")
data = _load_json_arg(argv)
import pikepdf
src = Output.check_file(pdf_path)
try:
doc = pikepdf.open(src)
except Exception as exc:
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
# XMP layer
with doc.open_metadata() as xmp:
for raw_key, raw_val in data.items():
norm = raw_key.title()
xmp_key = _XMP_MAPPING.get(norm)
if xmp_key is None:
continue
try:
xmp[xmp_key] = str(raw_val)
except Exception:
pass
# Legacy docinfo layer
if not doc.docinfo:
doc.docinfo = pikepdf.Dictionary()
for raw_key, raw_val in data.items():
norm = raw_key.title()
if norm in _ACCEPTED_KEYS:
doc.docinfo[pikepdf.Name(f"/{norm}")] = pikepdf.String(str(raw_val))
doc.docinfo[pikepdf.Name("/ModDate")] = pikepdf.String(
datetime.now().strftime("D:%Y%m%d%H%M%S")
)
try:
doc.save(out_path)
doc.close()
except Exception as exc:
Output.error("SaveError", f"Save failed: {exc}", code=4)
Output.success({"output": out_path, "updated_fields": list(data.keys())})
@cmd("meta.brand")
def meta_brand(argv: list):
"""Add Z.ai branding metadata to PDF documents."""
output_path = _pop_flag(argv, "-o", "--output")
custom_title = _pop_flag(argv, "-t", "--title")
quiet = _pop_flag(argv, "-q", "--quiet", needs_value=False)
if not argv:
Output.error("MissingArg", "At least one PDF file required")
# Check if output is specified for multiple files
if output_path and len(argv) > 1:
Output.error("InvalidArg", "--output can only be used with a single input file")
from pypdf import PdfReader, PdfWriter
for input_path in argv:
if not os.path.exists(input_path):
print(f"Error: Input file not found: {input_path}", file=sys.stderr)
continue
try:
reader = PdfReader(input_path)
except Exception as e:
print(f"Error: Cannot open PDF: {e}", file=sys.stderr)
continue
writer = PdfWriter()
for page in reader.pages:
writer.add_page(page)
# Determine title
if custom_title:
title = custom_title
else:
original_meta = reader.metadata
if original_meta and original_meta.title and original_meta.title not in ('(anonymous)', 'unspecified', None):
title = original_meta.title
else:
title = os.path.splitext(os.path.basename(input_path))[0]
writer.add_metadata({
'/Title': title,
'/Author': 'Z.ai',
'/Creator': 'Z.ai',
'/Producer': 'http://z.ai',
})
# Write output
out = output_path if (len(argv) == 1 and output_path) else input_path
try:
with open(out, "wb") as f:
writer.write(f)
except Exception as e:
print(f"Error: Cannot write output file: {e}", file=sys.stderr)
continue
if not quiet:
print(f"\u2713 Updated metadata for: {os.path.basename(input_path)}")
print(f" Title: {title}")
print(f" Author: Z.ai")
print(f" Creator: Z.ai")
print(f" Producer: http://z.ai")
if out != input_path:
print(f" Output: {out}")
raise SystemExit(0)
# ═══════════════════════════════════════════════════════════════
# Section 5: form — inspection, filling, annotation, rendering
# ═══════════════════════════════════════════════════════════════
# --- form.info (pikepdf-based) ---
_FIELD_TYPE_MAP = {
"/Tx": "text",
"/Sig": "signature",
}
def _classify_field(node) -> str:
"""Map a PDF field type token to a human label."""
ft = str(node.get("/FT", ""))
if ft in _FIELD_TYPE_MAP:
return _FIELD_TYPE_MAP[ft]
flags = int(node.get("/Ff", 0))
if ft == "/Btn":
return "radio" if (flags & (1 << 15)) else "checkbox"
if ft == "/Ch":
return "dropdown" if (flags & (1 << 17)) else "listbox"
return "unknown"
def _extra_props(node, kind: str) -> dict:
"""Gather type-specific metadata (options, checked value, etc.)."""
props: dict = {}
if kind == "checkbox":
ap = node.get("/AP")
if ap and "/N" in ap:
states = [str(s) for s in ap["/N"].keys()]
props["states"] = states
props["checked_value"] = next((s for s in states if s != "/Off"), states[0] if states else None)
elif kind in ("dropdown", "listbox"):
raw_opts = node.get("/Opt")
if raw_opts:
props["options"] = [
{"value": str(item[0]), "label": str(item[1])} if isinstance(item, list) and len(item) >= 2
else {"value": str(item), "label": str(item)}
for item in raw_opts
]
elif kind == "radio":
kids = node.get("/Kids")
if kids:
radio_vals = []
for child in kids:
ap = child.get("/AP")
if ap and "/N" in ap:
radio_vals.extend(str(k) for k in ap["/N"].keys() if str(k) != "/Off")
if radio_vals:
props["options"] = radio_vals
return props
def _current_value(node):
v = node.get("/V")
return str(v) if v is not None else None
def _gather_fields(doc) -> list:
"""Walk the AcroForm field tree iteratively and return a flat list."""
if "/AcroForm" not in doc.Root:
return []
acro = doc.Root.AcroForm
if "/Fields" not in acro:
return []
page_lookup = {pg.objgen: idx for idx, pg in enumerate(doc.pages)}
results = []
stack = [(field, "") for field in reversed(list(acro.Fields))]
while stack:
node, parent_path = stack.pop()
name = str(node.get("/T", ""))
full = f"{parent_path}.{name}" if parent_path else name
kids = node.get("/Kids")
if kids and any("/T" in k for k in kids):
for kid in reversed(list(kids)):
stack.append((kid, full))
continue
kind = _classify_field(node)
if kind == "unknown":
continue
entry = {"id": full, "type": kind}
val = _current_value(node)
if val:
entry["current_value"] = val
entry.update(_extra_props(node, kind))
page_ref = node.get("/P")
if page_ref and hasattr(page_ref, "objgen"):
pg_num = page_lookup.get(page_ref.objgen)
if pg_num is not None:
entry["page"] = pg_num + 1
results.append(entry)
return results
@cmd("form.info")
def form_info(argv: list):
"""Return structured JSON describing every form field (pikepdf + check_fillable)."""
if not argv:
Output.error("MissingArg", "pdf path required")
import pikepdf
src = Output.check_file(argv[0])
try:
doc = pikepdf.open(src)
except Exception as exc:
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
fields = _gather_fields(doc)
if not fields:
Output.success({"has_fields": False, "count": 0, "fields": [], "hint": "This PDF has no fillable form fields"})
Output.success({"has_fields": True, "count": len(fields), "fields": fields})
@cmd("form.fill")
def form_fill(argv: list):
"""Write values into a fillable PDF (pikepdf version)."""
if not argv:
Output.error("MissingArg", "pdf path required")
pdf_path = argv.pop(0)
out_path = _pop_flag(argv, "-o", "--output")
if out_path is None:
Output.error("MissingArg", "--output is required")
data = _load_json_arg(argv)
import pikepdf
src = Output.check_file(pdf_path)
try:
doc = pikepdf.open(src)
except Exception as exc:
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
if "/AcroForm" not in doc.Root or "/Fields" not in doc.Root.AcroForm:
Output.error("NoForm", "This PDF has no form fields")
known = {f["id"]: f for f in _gather_fields(doc)}
# Validation
issues = []
for fid, fval in data.items():
if fid not in known:
issues.append(f"Field not found: {fid}")
continue
fmeta = known[fid]
ftype = fmeta["type"]
if ftype == "checkbox" and "states" in fmeta:
ok_vals = fmeta["states"]
if fval not in ok_vals and f"/{fval}" not in ok_vals and fval not in ("true", "True", "false", "False", "1", "0"):
issues.append(f"Invalid value for field {fid}, options: {ok_vals} or true/false")
if ftype in ("dropdown", "listbox") and "options" in fmeta:
ok_vals = [o["value"] for o in fmeta["options"]]
if fval not in ok_vals:
issues.append(f"Invalid value for field {fid}, options: {ok_vals}")
if issues:
Output.error("ValidationError", "Field validation failed", hint="; ".join(issues))
# Fill
written = 0
def _apply(node, parent_path=""):
nonlocal written
name = str(node.get("/T", ""))
full = f"{parent_path}.{name}" if parent_path else name
kids = node.get("/Kids")
if kids and any("/T" in k for k in kids):
for kid in kids:
_apply(kid, full)
return
if full not in data:
return
val = data[full]
kind = _classify_field(node)
if kind == "checkbox":
if val in ("true", "True", "1", True):
ap = node.get("/AP")
if ap and "/N" in ap:
checked_name = next((str(k) for k in ap["/N"].keys() if str(k) != "/Off"), "/Yes")
if not checked_name.startswith("/"):
checked_name = f"/{checked_name}"
node["/V"] = pikepdf.Name(checked_name)
node["/AS"] = pikepdf.Name(checked_name)
else:
node["/V"] = pikepdf.Name("/Off")
node["/AS"] = pikepdf.Name("/Off")
else:
node["/V"] = pikepdf.String(str(val))
written += 1
for field in doc.Root.AcroForm.Fields:
_apply(field)
acro = doc.Root.AcroForm
if "/NeedAppearances" not in acro:
acro["/NeedAppearances"] = True
try:
doc.save(out_path)
except Exception as exc:
Output.error("SaveError", f"Save failed: {exc}", code=4)
Output.success({"output": out_path, "fields_filled": written, "fields_requested": len(data)})
# --- form.detail (pypdf-based detailed field extraction) ---
def _get_full_annotation_field_id(annotation):
"""Build dotted field ID by walking parent chain."""
components = []
while annotation:
field_name = annotation.get('/T')
if field_name:
components.append(field_name)
annotation = annotation.get('/Parent')
return ".".join(reversed(components)) if components else None
def _make_field_dict(field, field_id):
field_dict = {"field_id": field_id}
ft = field.get('/FT')
if ft == "/Tx":
field_dict["type"] = "text"
elif ft == "/Btn":
field_dict["type"] = "checkbox"
states = field.get("/_States_", [])
if len(states) == 2:
if "/Off" in states:
field_dict["checked_value"] = states[0] if states[0] != "/Off" else states[1]
field_dict["unchecked_value"] = "/Off"
else:
print(f"Unexpected state values for checkbox `${field_id}`. Its checked and unchecked values may not be correct; if you're trying to check it, visually verify the results.")
field_dict["checked_value"] = states[0]
field_dict["unchecked_value"] = states[1]
elif ft == "/Ch":
field_dict["type"] = "choice"
states = field.get("/_States_", [])
field_dict["choice_options"] = [{
"value": state[0],
"text": state[1],
} for state in states]
else:
field_dict["type"] = f"unknown ({ft})"
return field_dict
def _get_field_info(reader) -> list:
"""Extract detailed field info from a PdfReader, including radio group aggregation."""
fields = reader.get_fields()
field_info_by_id = {}
possible_radio_names: Set[str] = set()
for field_id, field in fields.items():
if field.get("/Kids"):
if field.get("/FT") == "/Btn":
possible_radio_names.add(field_id)
continue
field_info_by_id[field_id] = _make_field_dict(field, field_id)
radio_fields_by_id: Dict[str, dict] = {}
for page_index, page in enumerate(reader.pages):
annotations = page.get('/Annots', [])
for ann in annotations:
field_id = _get_full_annotation_field_id(ann)
if field_id in field_info_by_id:
field_info_by_id[field_id]["page"] = page_index + 1
field_info_by_id[field_id]["rect"] = ann.get('/Rect')
elif field_id in possible_radio_names:
try:
on_values = [v for v in ann["/AP"]["/N"] if v != "/Off"]
except KeyError:
continue
if len(on_values) == 1:
rect = ann.get("/Rect")
if field_id not in radio_fields_by_id:
radio_fields_by_id[field_id] = {
"field_id": field_id,
"type": "radio_group",
"page": page_index + 1,
"radio_options": [],
}
radio_fields_by_id[field_id]["radio_options"].append({
"value": on_values[0],
"rect": rect,
})
# Filter fields without location
fields_with_location = []
for field_info in field_info_by_id.values():
if "page" in field_info:
fields_with_location.append(field_info)
else:
print(f"Unable to determine location for field id: {field_info.get('field_id')}, ignoring")
# Sort by page number, then Y position (flipped), then X
def sort_key(f):
if "radio_options" in f:
rect = f["radio_options"][0]["rect"] or [0, 0, 0, 0]
else:
rect = f.get("rect") or [0, 0, 0, 0]
adjusted_position = [-rect[1], rect[0]]
return [f.get("page"), adjusted_position]
sorted_fields = fields_with_location + list(radio_fields_by_id.values())
sorted_fields.sort(key=sort_key)
return sorted_fields
@cmd("form.detail")
def form_detail(argv: list):
"""Extract detailed field info (pypdf version) to JSON."""
if len(argv) < 2:
Output.error("MissingArg", "Usage: form.detail <pdf> <output.json>")
pdf_path = argv[0]
json_output_path = argv[1]
from pypdf import PdfReader
reader = PdfReader(pdf_path)
field_info = _get_field_info(reader)
with open(json_output_path, "w") as f:
json.dump(field_info, f, indent=2)
print(f"Wrote {len(field_info)} fields to {json_output_path}")
raise SystemExit(0)
# --- form.fill-legacy (pypdf version with monkeypatch) ---
def _validation_error_for_field_value(field_info, field_value):
field_type = field_info["type"]
field_id = field_info["field_id"]
if field_type == "checkbox":
checked_val = field_info["checked_value"]
unchecked_val = field_info["unchecked_value"]
if field_value != checked_val and field_value != unchecked_val:
return f'ERROR: Invalid value "{field_value}" for checkbox field "{field_id}". The checked value is "{checked_val}" and the unchecked value is "{unchecked_val}"'
elif field_type == "radio_group":
option_values = [opt["value"] for opt in field_info["radio_options"]]
if field_value not in option_values:
return f'ERROR: Invalid value "{field_value}" for radio group field "{field_id}". Valid values are: {option_values}'
elif field_type == "choice":
choice_values = [opt["value"] for opt in field_info["choice_options"]]
if field_value not in choice_values:
return f'ERROR: Invalid value "{field_value}" for choice field "{field_id}". Valid values are: {choice_values}'
return None
def _monkeypatch_pypdf_method():
"""
Workaround for pypdf bug with selection list fields.
pypdf's get_inherited returns a list of two-element lists for /Opt fields
in selection lists, causing join() to throw TypeError. We patch it to
return just the value strings.
"""
from pypdf.generic import DictionaryObject
from pypdf.constants import FieldDictionaryAttributes
original_get_inherited = DictionaryObject.get_inherited
def patched_get_inherited(self, key: str, default=None):
result = original_get_inherited(self, key, default)
if key == FieldDictionaryAttributes.Opt:
if isinstance(result, list) and all(isinstance(v, list) and len(v) == 2 for v in result):
result = [r[0] for r in result]
return result
DictionaryObject.get_inherited = patched_get_inherited
@cmd("form.fill-legacy")
def form_fill_legacy(argv: list):
"""Fill fillable form fields (pypdf version with monkeypatch)."""
if len(argv) < 3:
Output.error("MissingArg", "Usage: form.fill-legacy <pdf> <fields.json> <output.pdf>")
input_pdf = argv[0]
fields_json = argv[1]
output_pdf = argv[2]
from pypdf import PdfReader, PdfWriter
_monkeypatch_pypdf_method()
with open(fields_json) as f:
fields = json.load(f)
# Group by page number
fields_by_page: Dict[int, dict] = {}
for field in fields:
if "value" in field:
field_id = field["field_id"]
page = field["page"]
if page not in fields_by_page:
fields_by_page[page] = {}
fields_by_page[page][field_id] = field["value"]
reader = PdfReader(input_pdf)
has_error = False
field_info = _get_field_info(reader)
fields_by_ids = {f["field_id"]: f for f in field_info}
for field in fields:
existing_field = fields_by_ids.get(field["field_id"])
if not existing_field:
has_error = True
print(f"ERROR: `{field['field_id']}` is not a valid field ID")
elif field["page"] != existing_field["page"]:
has_error = True
print(f"ERROR: Incorrect page number for `{field['field_id']}` (got {field['page']}, expected {existing_field['page']})")
else:
if "value" in field:
err = _validation_error_for_field_value(existing_field, field["value"])
if err:
print(err)
has_error = True
if has_error:
raise SystemExit(1)
writer = PdfWriter(clone_from=reader)
for page, field_values in fields_by_page.items():
writer.update_page_form_field_values(writer.pages[page - 1], field_values, auto_regenerate=False)
writer.set_need_appearances_writer(True)
with open(output_pdf, "wb") as f:
writer.write(f)
print(f"Filled {len(fields_by_page)} page(s) in {output_pdf}")
raise SystemExit(0)
# --- form.annotate (annotation-based filling with coordinate transform) ---
def _transform_coordinates(bbox, image_width, image_height, pdf_width, pdf_height):
"""Transform bounding box from image coordinates to PDF coordinates."""
x_scale = pdf_width / image_width
y_scale = pdf_height / image_height
left = bbox[0] * x_scale
right = bbox[2] * x_scale
# Flip Y coordinates for PDF
top = pdf_height - (bbox[1] * y_scale)
bottom = pdf_height - (bbox[3] * y_scale)
return left, bottom, right, top
def _normalise_fields_json(raw: dict) -> dict:
"""Accept both the current sheet-based schema and the legacy flat schema.
Current (v2) schema uses ``sheet[].pg/dims/regions[]`` with nested
``label.bbox``, ``target.bbox``, ``ink{}``.
Legacy (v1) schema uses ``pages[]`` + ``form_fields[]`` with flat keys
like ``entry_bounding_box``, ``label_bounding_box``, ``entry_text{}``.
Returns a normalised dict in the **v2** internal format used by all
downstream functions.
"""
# Already v2
if "sheet" in raw:
return raw
# Convert legacy → v2
pages_lut = {p["page_number"]: p for p in raw.get("pages", [])}
sheets: dict = {} # pg -> sheet entry
for f in raw.get("form_fields", []):
pg = f["page_number"]
if pg not in sheets:
pi = pages_lut.get(pg, {})
sheets[pg] = {
"pg": pg,
"dims": [pi.get("image_width", 0), pi.get("image_height", 0)],
"regions": [],
}
et = f.get("entry_text", {})
region = {
"id": f.get("field_label", f.get("description", "")),
"hint": f.get("description", ""),
"label": {"tag": f.get("field_label", ""), "bbox": f.get("label_bounding_box", [0, 0, 0, 0])},
"target": {"bbox": f.get("entry_bounding_box", [0, 0, 0, 0])},
"ink": {},
}
if isinstance(et, dict) and et.get("text"):
region["ink"]["value"] = et["text"]
if "font_size" in et:
region["ink"]["size"] = et["font_size"]
if "font_color" in et:
region["ink"]["color"] = et["font_color"]
if "font" in et:
region["ink"]["font"] = et["font"]
sheets[pg]["regions"].append(region)
return {"sheet": list(sheets.values())}
@cmd("form.annotate")
def form_annotate(argv: list):
"""Fill a PDF by adding text annotations (FreeText) defined in fields.json."""
if len(argv) < 3:
Output.error("MissingArg", "Usage: form.annotate <pdf> <fields.json> <output.pdf>")
input_pdf = argv[0]
fields_json_path = argv[1]
output_pdf = argv[2]
from pypdf import PdfReader, PdfWriter
from pypdf.annotations import FreeText
with open(fields_json_path, "r") as f:
fields_data = _normalise_fields_json(json.load(f))
reader = PdfReader(input_pdf)
writer = PdfWriter()
writer.append(reader)
# Get PDF dimensions for each page
pdf_dimensions = {}
for i, page in enumerate(reader.pages):
mediabox = page.mediabox
pdf_dimensions[i + 1] = [mediabox.width, mediabox.height]
annotations = []
for page_entry in fields_data["sheet"]:
pg = page_entry["pg"]
image_width, image_height = page_entry["dims"]
pdf_width, pdf_height = pdf_dimensions[pg]
for region in page_entry["regions"]:
ink = region.get("ink", {})
text = ink.get("value", "")
if not text:
continue
transformed_box = _transform_coordinates(
region["target"]["bbox"],
image_width, image_height,
pdf_width, pdf_height
)
font_name = ink.get("font", "Arial")
font_size = str(ink.get("size", 14)) + "pt"
font_color = ink.get("color", "000000")
annotation = FreeText(
text=text,
rect=transformed_box,
font=font_name,
font_size=font_size,
font_color=font_color,
border_color=None,
background_color=None,
)
annotations.append(annotation)
writer.add_annotation(page_number=pg - 1, annotation=annotation)
with open(output_pdf, "wb") as output:
writer.write(output)
print(f"Successfully filled PDF form and saved to {output_pdf}")
print(f"Added {len(annotations)} text annotations")
raise SystemExit(0)
# --- form.render (PDF to PNG images) ---
@cmd("form.render")
def form_render(argv: list):
"""Convert each page of a PDF to a PNG image."""
if len(argv) < 2:
Output.error("MissingArg", "Usage: form.render <pdf> <output_dir> [--max-dim N]")
pdf_path = argv.pop(0)
output_dir = argv.pop(0)
max_dim_str = _pop_flag(argv, "-m", "--max-dim")
max_dim = int(max_dim_str) if max_dim_str else 1000
from pdf2image import convert_from_path
os.makedirs(output_dir, exist_ok=True)
images = convert_from_path(pdf_path, dpi=200)
for i, image in enumerate(images):
width, height = image.size
if width > max_dim or height > max_dim:
scale_factor = min(max_dim / width, max_dim / height)
new_width = int(width * scale_factor)
new_height = int(height * scale_factor)
image = image.resize((new_width, new_height))
image_path = os.path.join(output_dir, f"page_{i+1}.png")
image.save(image_path)
print(f"Saved page {i+1} as {image_path} (size: {image.size})")
print(f"Converted {len(images)} pages to PNG images")
raise SystemExit(0)
# --- form.validate (bounding box validation image) ---
@cmd("form.validate")
def form_validate(argv: list):
"""Create validation images with bounding box rectangles."""
if len(argv) < 4:
Output.error("MissingArg", "Usage: form.validate <page> <fields.json> <input_img> <output_img>")
page_number = int(argv[0])
fields_json_path = argv[1]
input_path = argv[2]
output_path = argv[3]
from PIL import Image, ImageDraw
with open(fields_json_path, 'r') as f:
data = _normalise_fields_json(json.load(f))
img = Image.open(input_path)
draw = ImageDraw.Draw(img)
num_boxes = 0
for page_entry in data["sheet"]:
if page_entry["pg"] != page_number:
continue
for region in page_entry["regions"]:
target_box = region["target"]["bbox"]
label_box = region["label"]["bbox"]
draw.rectangle(target_box, outline='red', width=2)
draw.rectangle(label_box, outline='blue', width=2)
num_boxes += 2
img.save(output_path)
print(f"Created validation image at {output_path} with {num_boxes} bounding boxes")
raise SystemExit(0)
# --- form.check-bbox (bounding box overlap detection) ---
@dataclass
class _RectAndField:
rect: list
rect_type: str
field: dict
def get_bounding_box_messages(fields_json_stream) -> List[str]:
"""Check for overlapping bounding boxes. Returns list of messages (max 20)."""
messages = []
raw = json.load(fields_json_stream)
data = _normalise_fields_json(raw)
total_regions = sum(len(pe["regions"]) for pe in data["sheet"])
messages.append(f"Read {total_regions} regions across {len(data['sheet'])} page(s)")
def rects_intersect(r1, r2):
disjoint_horizontal = r1[0] >= r2[2] or r1[2] <= r2[0]
disjoint_vertical = r1[1] >= r2[3] or r1[3] <= r2[1]
return not (disjoint_horizontal or disjoint_vertical)
has_error = False
for page_entry in data["sheet"]:
pg = page_entry["pg"]
# Collect all rects on this page
rects_and_regions = []
for region in page_entry["regions"]:
rects_and_regions.append(_RectAndField(region["label"]["bbox"], "label", region))
rects_and_regions.append(_RectAndField(region["target"]["bbox"], "target", region))
for i, ri in enumerate(rects_and_regions):
for j in range(i + 1, len(rects_and_regions)):
rj = rects_and_regions[j]
if rects_intersect(ri.rect, rj.rect):
has_error = True
rid = ri.field.get("id", ri.field.get("hint", "?"))
rjd = rj.field.get("id", rj.field.get("hint", "?"))
if ri.field is rj.field:
messages.append(f"FAILURE: pg {pg} — label/target overlap for `{rid}` ({ri.rect}, {rj.rect})")
else:
messages.append(f"FAILURE: pg {pg}{ri.rect_type} of `{rid}` ({ri.rect}) overlaps {rj.rect_type} of `{rjd}` ({rj.rect})")
if len(messages) >= 20:
messages.append("Aborting further checks; fix bounding boxes and try again")
return messages
# Height check for target rects
if ri.rect_type == "target":
ink = ri.field.get("ink", {})
if ink.get("value"):
font_size = ink.get("size", 14)
entry_height = ri.rect[3] - ri.rect[1]
if entry_height < font_size:
has_error = True
rid = ri.field.get("id", ri.field.get("hint", "?"))
messages.append(f"FAILURE: pg {pg} — target box height ({entry_height}) for `{rid}` is shorter than font size ({font_size}). Increase box height or decrease ink.size.")
if len(messages) >= 20:
messages.append("Aborting further checks; fix bounding boxes and try again")
return messages
if not has_error:
messages.append("SUCCESS: All bounding boxes are valid")
return messages
@cmd("form.check-bbox")
def form_check_bbox(argv: list):
"""Check bounding boxes in fields.json for overlaps."""
if not argv:
Output.error("MissingArg", "Usage: form.check-bbox <fields.json>")
with open(argv[0]) as f:
messages = get_bounding_box_messages(f)
for msg in messages:
print(msg)
raise SystemExit(0)
# ═══════════════════════════════════════════════════════════════
# Section 6: convert — office, HTML, and LaTeX
# ═══════════════════════════════════════════════════════════════
_CONVERTIBLE_EXTENSIONS = frozenset({
".docx", ".doc", ".odt", ".rtf",
".pptx", ".ppt", ".odp",
".xlsx", ".xls", ".ods", ".csv",
".txt", ".html", ".htm",
})
_SOFFICE_CANDIDATES = [
"/Applications/LibreOffice.app/Contents/MacOS/soffice",
os.path.expanduser("~/Applications/LibreOffice.app/Contents/MacOS/soffice"),
"/usr/bin/soffice",
"/usr/local/bin/soffice",
"/usr/lib/libreoffice/program/soffice",
"/opt/libreoffice/program/soffice",
"/snap/bin/libreoffice.soffice",
]
def _locate_soffice() -> Optional[str]:
"""Search for a working soffice binary."""
for candidate in _SOFFICE_CANDIDATES:
if Path(candidate).is_file():
return candidate
for alias in ("soffice", "libreoffice"):
found = shutil.which(alias)
if found:
return found
return None
@cmd("convert.office")
def convert_office(argv: list):
"""Convert an office document to PDF via LibreOffice."""
if not argv:
Output.error("MissingArg", "input file required")
src_path = argv.pop(0)
out_path = _pop_flag(argv, "-o", "--output")
src = Output.check_file(src_path)
ext = src.suffix.lower()
if ext not in _CONVERTIBLE_EXTENSIONS:
Output.error(
"UnsupportedFormat",
f"Unsupported format: {ext}",
hint=f"Supported formats: {', '.join(sorted(_CONVERTIBLE_EXTENSIONS))}",
)
binary = _locate_soffice()
if binary is None:
Output.error(
"DependencyMissing",
"LibreOffice not found",
hint="Please install LibreOffice: https://www.libreoffice.org/download/",
)
target_dir = Path(out_path).parent if out_path else src.parent
target_dir.mkdir(parents=True, exist_ok=True)
cmd_list = [binary, "--headless", "--convert-to", "pdf", "--outdir", str(target_dir), str(src)]
try:
proc = subprocess.run(cmd_list, capture_output=True, text=True, timeout=120)
if proc.returncode != 0:
Output.error("ConvertError", f"Conversion failed: {proc.stderr.strip() or 'Unknown error'}", code=4)
except subprocess.TimeoutExpired:
Output.error("Timeout", "Conversion timeout (>120s)", code=4)
except Exception as exc:
Output.error("ConvertError", f"Conversion failed: {exc}", code=4)
auto_name = target_dir / f"{src.stem}.pdf"
final = Path(out_path) if out_path else auto_name
if out_path and final.name != auto_name.name and auto_name.exists():
auto_name.rename(final)
if not final.exists():
Output.error("ConvertError", "Converted PDF file was not generated", code=4)
Output.success({"input": str(src), "output": str(final), "format": ext})
@cmd("convert.html")
def convert_html(argv: list):
"""Convert HTML to PDF via node html2pdf.js."""
if not argv:
Output.error("MissingArg", "input file required")
js_path = _SCRIPT_DIR / "html2pdf.js"
if not js_path.exists():
Output.error("DependencyMissing", "html2pdf.js not found in scripts directory")
node_path = shutil.which("node")
if not node_path:
Output.error("DependencyMissing", "node not found in PATH")
cmd_list = [node_path, str(js_path)] + argv
try:
proc = subprocess.run(cmd_list, timeout=180)
raise SystemExit(proc.returncode)
except subprocess.TimeoutExpired:
Output.error("Timeout", "HTML conversion timeout (>180s)", code=4)
except SystemExit:
raise
except Exception as exc:
Output.error("ConvertError", f"HTML conversion failed: {exc}", code=4)
# --- convert.latex (tectonic wrapper with log filtering + PDF stats) ---
_NOISE_RE = re.compile(
r"^note: (?:"
r'"version 2" Tectonic'
r"|Running TeX"
r"|Rerunning TeX because"
r"|Running xdvipdfmx"
r"|downloading "
r"|Skipped writing .* intermediate files"
r")"
)
def _find_tectonic() -> Optional[str]:
"""Locate the tectonic binary.
Search order:
1. scripts/ dir (bundled binary — macOS arm64 only)
2. ~/tectonic (user-placed binary)
3. System PATH (package-manager install)
NOTE: The bundled binary at scripts/tectonic is a macOS arm64 (Apple
Silicon) Mach-O executable. It will NOT run on Linux or Windows.
On other platforms, install tectonic via the system package manager
or download the correct binary — see the error message in
convert_latex() for instructions.
"""
local_bin = _SCRIPT_DIR / "tectonic"
if local_bin.exists() and os.access(local_bin, os.X_OK):
return str(local_bin)
home_bin = Path.home() / "tectonic"
if home_bin.exists() and os.access(home_bin, os.X_OK):
return str(home_bin)
system_bin = shutil.which("tectonic")
return system_bin
def _human_size(nbytes: int) -> str:
for unit in ("B", "KB", "MB", "GB"):
if nbytes < 1024:
return f"{nbytes:.2f} {unit}"
nbytes /= 1024
return f"{nbytes:.2f} TB"
def _pdf_stats(pdf_file: Path):
"""Return (pages, word_count, image_count) or Nones."""
try:
from pypdf import PdfReader
except ImportError:
for attempt in (
[sys.executable, "-m", "pip", "install", "-q", "pypdf"],
[sys.executable, "-m", "pip", "install", "-q", "--break-system-packages", "pypdf"],
[sys.executable, "-m", "pip", "install", "-q", "--user", "pypdf"],
):
if subprocess.run(attempt, check=False, capture_output=True).returncode == 0:
break
try:
from pypdf import PdfReader
except ImportError:
return None, None, None
try:
reader = PdfReader(str(pdf_file))
n_pages = len(reader.pages)
all_text = "".join(p.extract_text() or "" for p in reader.pages)
n_words = len([w for w in all_text.split() if w.strip()])
n_images = 0
for pg in reader.pages:
xobj = pg.get("/Resources", {}).get("/XObject")
if xobj:
obj = xobj.get_object()
n_images += sum(1 for k in obj if obj[k].get("/Subtype") == "/Image")
return n_pages, n_words, n_images
except Exception as exc:
print(f"Error extracting PDF info: {exc}", file=sys.stderr)
return None, None, None
def _classify_lines(lines):
"""Bucket raw output into errors / warnings / layout issues."""
errors, warnings, layout, pdf_note = [], [], [], None
for raw in lines:
ln = raw.rstrip()
if not ln:
continue
if _NOISE_RE.match(ln):
if ln.startswith("note: Writing"):
pdf_note = ln
continue
if ln.startswith("error:"):
errors.append(ln)
elif ln.startswith("warning:"):
warnings.append(ln)
elif re.search(r"(Overfull|Underfull) \\[hv]box", ln) or re.search(r"(Font shape|Missing character)", ln):
layout.append(ln)
return errors, warnings, layout, pdf_note
def _parse_writing_note(note: Optional[str]):
m = re.search(r"Writing `(.+?)` \((.+?)\)", note or "")
return (m.group(1), m.group(2)) if m else (None, None)
@cmd("convert.blueprint")
def convert_blueprint(argv: list):
"""
[Auto-Pipeline] Extract JSON blueprint from LLM markdown response,
compile it to HTML via design_engine, and render to PDF.
"""
if not argv:
Output.error("MissingArg", "Usage: convert.blueprint <llm_response_file.md> [-o <out.pdf>]")
input_file = argv.pop(0)
out_pdf = _pop_flag(argv, "-o", "--output") or "output.pdf"
src = Output.check_file(input_file)
content = src.read_text(encoding="utf-8")
# 1. Smart JSON extraction (regardless of whether LLM wrapped in Markdown code blocks)
match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', content, re.DOTALL)
if match:
json_str = match.group(1)
else:
# Fallback: Assume the file is pure JSON
json_str = content
try:
# Validate JSON format
parsed_json = json.loads(json_str)
except json.JSONDecodeError as e:
Output.error("InvalidJSON", f"Failed to parse JSON blueprint: {e}", hint="Ensure the LLM output is valid JSON without trailing commas.")
# 1b. Encoding integrity check — detect corrupted characters before rendering
corruption_found = False
def _scan_for_corruption(obj, path=""):
"""Recursively scan JSON values for encoding corruption markers."""
nonlocal corruption_found
if isinstance(obj, str):
for i, ch in enumerate(obj):
if ch == '\ufffd': # Unicode replacement character
context = obj[max(0, i-10):i+10]
print(f" \u26a0\ufe0f U+FFFD at {path}[{i}]: ...{context}...")
corruption_found = True
elif '\ud800' <= ch <= '\udfff': # Lone surrogate
print(f" \u26a0\ufe0f Lone surrogate U+{ord(ch):04X} at {path}[{i}]")
corruption_found = True
elif isinstance(obj, dict):
for k, v in obj.items():
_scan_for_corruption(v, f"{path}.{k}")
elif isinstance(obj, list):
for idx, v in enumerate(obj):
_scan_for_corruption(v, f"{path}[{idx}]")
_scan_for_corruption(parsed_json)
if corruption_found:
print("\n\u26a0\ufe0f WARNING: Corrupted characters detected in blueprint content!")
print(" These will render as \ufffd (replacement character) in the PDF.")
print(" Consider fixing the source text and re-running.\n")
# Also sanitize: replace U+FFFD with empty string to avoid visible corruption
json_str_clean = json_str.replace('\ufffd', '')
if json_str_clean != json_str:
json_str = json_str_clean
print(" \u2139\ufe0f Auto-removed U+FFFD characters from blueprint.")
# 2. Save clean JSON blueprint and temp HTML path
blueprint_path = src.parent / f"{src.stem}_pure_blueprint.json"
html_path = src.parent / f"{src.stem}_rendered.html"
blueprint_path.write_text(json_str, encoding="utf-8")
# 3. Call design_engine.py to compile HTML
engine_script = _SCRIPT_DIR / "design_engine.py"
print("🎨 [1/2] Compiling JSON Blueprint to Art-Directed HTML...", flush=True)
try:
subprocess.run([
sys.executable, str(engine_script), "compile",
"--blueprint", str(blueprint_path),
"--output", str(html_path)
], check=True)
except subprocess.CalledProcessError:
Output.error("CompileError", "design_engine.py failed to compile the blueprint.", code=4)
# 4. Call html2pdf.js to render PDF
print("📄 [2/2] Rendering HTML to High-Res PDF via Playwright...", flush=True)
js_script = _SCRIPT_DIR / "html2pdf.js"
node_path = shutil.which("node")
if not node_path:
Output.error("DependencyMissing", "node not found in PATH")
try:
subprocess.run([
node_path, str(js_script), str(html_path), "-o", out_pdf,
"--width", "720px", "--height", "960px"
], check=True)
except subprocess.CalledProcessError:
Output.error("RenderError", "html2pdf.js failed to render the PDF.", code=4)
print(f"\n🎉 Success! Masterpiece generated at: {out_pdf}")
raise SystemExit(0)
@cmd("convert.latex")
def convert_latex(argv: list):
"""Compile LaTeX file via tectonic, filter logs, report PDF stats."""
if not argv:
Output.error("MissingArg", "tex file required")
tex_file = argv.pop(0)
runs_str = _pop_flag(argv, "-r", "--runs")
runs = int(runs_str) if runs_str else 1
keep_logs = _pop_flag(argv, "-k", "--keep-logs", needs_value=False)
tex = Path(tex_file)
if not tex.exists():
print(f"\u2717 Error: File not found {tex_file}")
raise SystemExit(1)
print(f"Compiling {tex.name}...", flush=True)
if runs > 1:
print(f"Running {runs} passes (for cross-references)", flush=True)
tectonic = _find_tectonic()
if tectonic is None:
import platform
print("\n\u2717 Error: tectonic command not found")
print()
print("The bundled scripts/tectonic binary is macOS arm64 only.")
print("Install tectonic for your platform:\n")
sys_name = platform.system()
if sys_name == "Darwin":
print(" macOS (Homebrew): brew install tectonic")
print(" macOS (binary): curl -sSL https://drop-sh.fullyjustified.net | sh")
print(" mv tectonic ~/tectonic && chmod +x ~/tectonic")
elif sys_name == "Linux":
print(" Debian/Ubuntu: apt install tectonic (if available)")
print(" Arch Linux: pacman -S tectonic")
print(" Conda: conda install -c conda-forge tectonic")
print(" Binary download: curl -sSL https://drop-sh.fullyjustified.net | sh")
print(" mv tectonic ~/tectonic && chmod +x ~/tectonic")
elif sys_name == "Windows":
print(" Windows (scoop): scoop install tectonic")
print(" Windows (choco): choco install tectonic")
print(" Conda: conda install -c conda-forge tectonic")
else:
print(" See: https://tectonic-typesetting.github.io/")
print()
print("After installing, verify with: tectonic --version")
print()
print("Quick check — is tectonic already installed somewhere?")
print(f" which tectonic: {shutil.which('tectonic') or 'not found'}")
print(f" ~/tectonic: {'exists' if (Path.home() / 'tectonic').exists() else 'not found'}")
print(f" scripts/tectonic: {'exists' if (_SCRIPT_DIR / 'tectonic').exists() else 'not found'}")
print(f" Platform: {sys_name} {platform.machine()}")
raise SystemExit(1)
all_lines = []
ok = False
for _ in range(runs):
try:
proc = subprocess.run(
[tectonic, "-X", "compile", str(tex)],
capture_output=True, text=True, timeout=120,
)
all_lines.extend((proc.stdout + proc.stderr).splitlines())
ok = proc.returncode == 0
if not ok:
break
except subprocess.TimeoutExpired:
print("\n\u2717 Error: Compilation timeout (>2 minutes)")
raise SystemExit(1)
except Exception as exc:
print(f"\n\u2717 Error: {exc}")
raise SystemExit(1)
if keep_logs:
print("\n" + "=" * 50 + "\nFull logs:\n" + "=" * 50)
for ln in all_lines:
print(ln)
print("=" * 50 + "\n")
errors, warnings, layout, pdf_note = _classify_lines(all_lines)
noted_name, noted_size = _parse_writing_note(pdf_note)
pdf_name = noted_name or (tex.stem + ".pdf")
pdf_path = tex.parent / pdf_name
print()
if ok:
tag = "\u2713 Compilation successful" + (" (with warnings)" if warnings or layout else "")
print(tag)
else:
print("\u2717 Compilation failed")
if ok and pdf_path.exists():
print("\n========================\nPDF Information\n========================")
print(f"File: {pdf_name}")
print(f"Size: {noted_size or _human_size(pdf_path.stat().st_size)}")
pages, words, images = _pdf_stats(pdf_path)
if pages is not None:
print(f"Pages: {pages}")
if words is not None:
print(f"Words: ~{words:,}")
if images is not None:
print(f"Images: {images}")
if layout:
print(f"\n========================\nLayout Issues ({len(layout)})\n========================")
for ln in layout:
print(ln)
if warnings:
print(f"\n========================\nWarnings ({len(warnings)})\n========================")
for ln in warnings:
print(ln.replace("warning: ", "", 1))
if errors:
print("\n========================\nErrors\n========================")
for ln in errors:
print(ln.replace("error: ", "", 1))
if ok and (layout or warnings):
print()
print("<system-reminder>")
print(f"Detected {len(layout)} layout issues and {len(warnings)} warnings.")
print("These issues affect PDF typesetting quality and must be fixed.")
print("Do not dismiss with 'warnings don't affect output'. Fix all issues.")
print("</system-reminder>")
raise SystemExit(0 if ok else 1)
# ═══════════════════════════════════════════════════════════════
# Section 7b: Font Fallback Engine — automatic <font> wrapping
# ═══════════════════════════════════════════════════════════════
# -- Data structures --------------------------------------------------------
FONT_FALLBACK_CHAIN: Dict[str, List[str]] = {
"Times New Roman": ["SimHei"],
"Calibri": ["SimHei"],
"DejaVuSans": ["SimHei"],
"SimHei": ["Times New Roman"],
"Microsoft YaHei": ["Times New Roman"],
}
FONT_PREFER_RANGES: Dict[str, List[Tuple[int, int, str]]] = {
"SimHei": [
(0x0400, 0x04FF, "Times New Roman"), # Cyrillic
(0x0500, 0x052F, "Times New Roman"), # Cyrillic Supplement
],
"Microsoft YaHei": [
(0x0400, 0x04FF, "Times New Roman"),
(0x0500, 0x052F, "Times New Roman"),
],
}
# -- Content sanitization (runtime, pre-font-fallback) ----------------------
# Control: global switches
CONTENT_SANITIZE_ENABLED = True # master switch
CONTENT_SANITIZE_STRIP_ZW = True # zero-width chars (disable for Thai/Burmese/Hindi)
# Fullwidth ASCII → halfwidth (letters + digits only, NOT punctuation)
_FULLWIDTH_OFFSET = 0xFEE0 # ord('') - ord('A') == 0xFEE0
# Ligature decomposition (U+FB00FB06)
_LIGATURE_MAP: Dict[int, str] = {
0xFB00: "ff",
0xFB01: "fi",
0xFB02: "fl",
0xFB03: "ffi",
0xFB04: "ffl",
0xFB05: "st", # ſt long s t
0xFB06: "st", # st st
}
# Zero-width / invisible characters to strip
_ZERO_WIDTH_CHARS: Set[int] = {
0x200B, # ZERO WIDTH SPACE
0x200C, # ZERO WIDTH NON-JOINER
0x200D, # ZERO WIDTH JOINER
0x2060, # WORD JOINER
0xFEFF, # BOM (when not at file start)
0x034F, # COMBINING GRAPHEME JOINER
}
# Bidirectional control characters to strip
_BIDI_CONTROLS: Set[int] = {
0x200E, 0x200F, # LRM, RLM
0x202A, 0x202B, 0x202C, # LRE, RLE, PDF
0x202D, 0x202E, # LRO, RLO
0x2066, 0x2067, 0x2068, 0x2069, # LRI, RLI, FSI, PDI
}
# Variation selectors to strip (emoji style modifiers)
_VARIATION_SELECTORS: Set[int] = set(range(0xFE00, 0xFE10)) # U+FE00FE0F
# Unicode noncharacters (should never appear in text interchange)
_NONCHARACTERS: Set[int] = set(range(0xFDD0, 0xFDF0)) # U+FDD0FDEF
_NONCHARACTERS.add(0xFFFE)
_NONCHARACTERS.add(0xFFFF)
_content_sanitize_warnings: List[str] = []
def content_sanitize(text: str, dry_run: bool = False) -> str:
"""Sanitize Paragraph text content before font fallback.
Removes/replaces characters that should not appear in final PDF output.
Designed for CJK + Latin content. Does NOT touch XML/HTML tags in the text.
Args:
text: Raw Paragraph text (may contain ReportLab XML tags like <b>, <font>).
dry_run: If True, collect warnings but still return cleaned text.
Returns:
Cleaned text string.
"""
if not CONTENT_SANITIZE_ENABLED or not text:
return text
global _content_sanitize_warnings
if dry_run:
_content_sanitize_warnings = []
# Split text into tags and plain-text segments to avoid mangling XML tags
# e.g. '<font name="SimHei">你好</font>' → ['', '<font name="SimHei">', '你好', '</font>', '']
parts = _TAG_RE.split(text)
tags = _TAG_RE.findall(text)
out_pieces: List[str] = []
for i, part in enumerate(parts):
if part:
# Process plain text segment character by character
cleaned: List[str] = []
for ch in part:
code = ord(ch)
replacement = _sanitize_one_char(ch, code, dry_run)
if replacement is not None:
cleaned.append(replacement)
# else: character was deleted
out_pieces.append("".join(cleaned))
# Append the tag that follows this part (if any)
if i < len(tags):
out_pieces.append(tags[i]) # tags pass through untouched
return "".join(out_pieces)
def _sanitize_one_char(ch: str, code: int, dry_run: bool) -> Optional[str]:
"""Process a single character. Returns replacement string, or None to delete."""
# --- DELETE: ASCII control characters (except \t \n \r) ---
if code <= 0x1F and ch not in '\t\n\r':
if dry_run:
_content_sanitize_warnings.append(
f"DELETE control char U+{code:04X} ({repr(ch)})")
return None
# --- DELETE: U+007F DEL ---
if code == 0x7F:
return None
# --- DELETE: zero-width characters ---
if CONTENT_SANITIZE_STRIP_ZW and code in _ZERO_WIDTH_CHARS:
if dry_run:
_content_sanitize_warnings.append(
f"DELETE zero-width U+{code:04X}")
return None
# --- DELETE: bidirectional controls ---
if code in _BIDI_CONTROLS:
if dry_run:
_content_sanitize_warnings.append(
f"DELETE bidi control U+{code:04X}")
return None
# --- DELETE: variation selectors ---
if code in _VARIATION_SELECTORS:
if dry_run:
_content_sanitize_warnings.append(
f"DELETE variation selector U+{code:04X}")
return None
# --- DELETE: Unicode noncharacters ---
if code in _NONCHARACTERS:
return None
# --- REPLACE: U+FFFD replacement character → '?' ---
if code == 0xFFFD:
if dry_run:
_content_sanitize_warnings.append(
"REPLACE U+FFFD (upstream encoding error) → '?'")
return '?'
# --- REPLACE: fullwidth ASCII letters/digits → halfwidth ---
# U+FF21FF3A = , U+FF41FF5A = , U+FF10FF19 =
if 0xFF21 <= code <= 0xFF3A or 0xFF41 <= code <= 0xFF5A or 0xFF10 <= code <= 0xFF19:
halfwidth = chr(code - _FULLWIDTH_OFFSET)
if dry_run:
_content_sanitize_warnings.append(
f"REPLACE fullwidth '{ch}' → halfwidth '{halfwidth}'")
return halfwidth
# --- REPLACE: ligatures → decomposed ---
if code in _LIGATURE_MAP:
decomposed = _LIGATURE_MAP[code]
if dry_run:
_content_sanitize_warnings.append(
f"REPLACE ligature '{ch}' (U+{code:04X}) → '{decomposed}'")
return decomposed
# --- REPLACE: line/paragraph separators → newline ---
if code == 0x2028 or code == 0x2029:
if dry_run:
_content_sanitize_warnings.append(
f"REPLACE U+{code:04X} separator → '\\n'")
return '\n'
# --- REPLACE: soft hyphen → empty (invisible, ReportLab ignores it) ---
if code == 0x00AD:
return None
# --- WARN (pass through): Private Use Area ---
if 0xE000 <= code <= 0xF8FF:
if dry_run:
_content_sanitize_warnings.append(
f"WARN Private Use Area U+{code:04X} (passed through)")
return ch
# --- PASS: everything else ---
return ch
# -- Glyph detection -------------------------------------------------------
_glyph_cache: Dict[Tuple[str, int], bool] = {}
def _has_glyph(font_name: str, code: int) -> bool:
"""Check whether *font_name* has a real glyph outline for *code*."""
key = (font_name, code)
if key in _glyph_cache:
return _glyph_cache[key]
try:
from reportlab.pdfbase import pdfmetrics
font = pdfmetrics.getFont(font_name)
result = code in font.face.charToGlyph
except Exception:
result = False
_glyph_cache[key] = result
return result
def _best_font_for_char(code: int, base_font: str) -> str:
"""Return the best registered font for a single character *code*."""
# ASCII fast path
if code < 128:
return base_font
# Aesthetic preference ranges
for rng_start, rng_end, preferred in FONT_PREFER_RANGES.get(base_font, []):
if rng_start <= code <= rng_end:
if _has_glyph(preferred, code):
return preferred
# Base font has the glyph — use it
if _has_glyph(base_font, code):
return base_font
# Walk the fallback chain
for fb in FONT_FALLBACK_CHAIN.get(base_font, []):
if _has_glyph(fb, code):
return fb
# Nothing found — return base_font (will render □ but won't crash)
return base_font
# -- Text-level automatic <font> wrapping ----------------------------------
_TAG_RE = re.compile(r"</?[a-zA-Z][^>]*>")
def font_fallback(text: str, base_font: str) -> str:
"""Wrap characters that *base_font* cannot render with ``<font>`` tags.
Preserves existing XML tags (``<b>``, ``<font>``, ``<super>``, etc.).
Returns the transformed markup string.
"""
if not text:
return text
# Split text into (plain, tag, plain, tag, ...) segments
parts = _TAG_RE.split(text)
tags = _TAG_RE.findall(text)
# Track <font> nesting so we don't re-wrap inside explicit <font>
font_stack: List[str] = []
out_pieces: List[str] = []
for i, part in enumerate(parts):
# Determine effective font at this point
effective = font_stack[-1] if font_stack else base_font
if part:
# Process plain text character by character
runs: List[Tuple[str, List[str]]] = []
for ch in part:
code = ord(ch)
best = _best_font_for_char(code, effective)
if runs and runs[-1][0] == best:
runs[-1][1].append(ch)
else:
runs.append((best, [ch]))
for font_name, chars in runs:
segment = "".join(chars)
if font_name == effective:
out_pieces.append(segment)
else:
out_pieces.append(f'<font name="{font_name}">{segment}</font>')
# Append the tag that follows this part (if any)
if i < len(tags):
tag = tags[i]
out_pieces.append(tag)
# Track font stack
tag_lower = tag.lower()
if tag_lower.startswith("<font "):
# Extract font name from tag
m = re.search(r'name\s*=\s*["\']([^"\']+)["\']', tag, re.IGNORECASE)
if m:
font_stack.append(m.group(1))
else:
font_stack.append(effective)
elif tag_lower == "</font>" and font_stack:
font_stack.pop()
return "".join(out_pieces)
# -- Monkey-patch installer -------------------------------------------------
_fallback_installed = False
def install_font_fallback():
"""Monkey-patch ``Paragraph.__init__`` so every Paragraph automatically
runs ``font_fallback()`` on its text. Idempotent — safe to call
multiple times.
"""
global _fallback_installed
if _fallback_installed:
return
try:
from reportlab.platypus import Paragraph as _Para
except ImportError:
return
_orig_init = _Para.__init__
def _patched_init(self, text, style=None, *args, **kwargs):
if isinstance(text, str):
text = content_sanitize(text) # Layer 1: clean dangerous chars
if style is not None:
base_font = getattr(style, "fontName", None)
if base_font and base_font in FONT_FALLBACK_CHAIN:
text = font_fallback(text, base_font) # Layer 2: font selection
_orig_init(self, text, style, *args, **kwargs)
_Para.__init__ = _patched_init
_fallback_installed = True
# -- Post-generation glyph check -------------------------------------------
def check_missing_glyphs(pdf_path: str) -> List[Dict[str, Any]]:
"""Scan a PDF for .notdef glyphs, control chars, and other problematic characters.
Returns a list of dicts with keys: page, position, context, kind.
Empty list means no issues found.
"""
try:
import fitz # PyMuPDF
except ImportError:
print("Warning: PyMuPDF (fitz) not installed — cannot check missing glyphs", file=sys.stderr)
return []
issues: List[Dict[str, Any]] = []
doc = fitz.open(pdf_path)
for page_num in range(len(doc)):
page = doc[page_num]
text = page.get_text()
for i, ch in enumerate(text):
code = ord(ch)
kind: Optional[str] = None
if ch == "\x00":
kind = "notdef"
elif code <= 0x1F and ch not in '\t\n\r':
kind = "control_char"
elif code == 0x7F:
kind = "control_char"
elif code == 0xFFFD:
kind = "replacement_char"
elif code in _ZERO_WIDTH_CHARS:
kind = "zero_width"
elif code in _BIDI_CONTROLS:
kind = "bidi_control"
elif code in _VARIATION_SELECTORS:
kind = "variation_selector"
elif 0xE000 <= code <= 0xF8FF:
kind = "private_use_area"
if kind:
start = max(0, i - 10)
end = min(len(text), i + 11)
context = text[start:end].replace("\x00", "")
issues.append({
"page": page_num + 1,
"position": i,
"char": f"U+{code:04X}",
"kind": kind,
"context": context,
})
doc.close()
# Deduplicate by page + context + kind
seen: Set[str] = set()
unique: List[Dict[str, Any]] = []
for item in issues:
key = f"{item['page']}:{item['kind']}:{item['context']}"
if key not in seen:
seen.add(key)
unique.append(item)
return unique
@cmd("font.check")
def font_check(argv: list):
"""Scan a PDF for missing glyphs (□ boxes) after generation."""
if not argv:
Output.error("MissingArg", "Usage: font.check <pdf_file>")
pdf_path = argv[0]
if not os.path.isfile(pdf_path):
Output.error("FileNotFound", f"File not found: {pdf_path}")
issues = check_missing_glyphs(pdf_path)
# Group by kind for better reporting
by_kind: Dict[str, List[Dict]] = {}
for item in issues:
k = item.get("kind", "unknown")
by_kind.setdefault(k, []).append(item)
result: Dict[str, Any] = {
"status": "issues_found" if issues else "ok",
"total_issues": len(issues),
"by_kind": {k: len(v) for k, v in by_kind.items()},
}
if issues:
result["issues"] = issues[:30] # cap output
result["hints"] = {
"notdef": "Missing glyphs (□). Fix: call install_font_fallback() after font registration.",
"control_char": "Control characters leaked into PDF. Fix: content_sanitize() should strip these.",
"replacement_char": "U+FFFD found — upstream encoding error corrupted original character.",
"zero_width": "Zero-width chars found. Usually harmless visually but affect text search/copy.",
"bidi_control": "Bidirectional control chars found. May cause text direction issues.",
"variation_selector": "Variation selectors found. No visual effect in ReportLab but shouldn't be here.",
"private_use_area": "Private Use Area chars found. Will render as □ unless a custom font covers them.",
}
print(json.dumps(result, ensure_ascii=False, indent=2))
raise SystemExit(0 if not issues else 1)
@cmd("toc.check")
def toc_check(argv: list):
"""Validate TOC quality in a PDF file (page numbers, entries, links)."""
if not argv:
Output.error("MissingArg", "Usage: toc.check <pdf_file>")
pdf_path = argv[0]
if not os.path.isfile(pdf_path):
Output.error("FileNotFound", f"File not found: {pdf_path}")
# Locate toc_validate.py relative to this script
script_dir = Path(__file__).resolve().parent
toc_script = script_dir / "toc_validate.py"
if not toc_script.exists():
# Try parent's scripts dir
candidate = script_dir.parent / "scripts" / "toc_validate.py"
if candidate.exists():
toc_script = candidate
if toc_script.exists():
# Import and call directly
import importlib.util
spec = importlib.util.spec_from_file_location("toc_validate", str(toc_script))
toc_mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(toc_mod)
result = toc_mod.check_pdf(pdf_path)
else:
Output.error("ScriptNotFound",
"toc_validate.py not found. Expected at: " + str(script_dir / "toc_validate.py"))
# Add remediation hints per error code
remediation = {
"TOC_ALL_SAME_PAGE": "ReportLab: use multiBuild() instead of build() with TocDocTemplate",
"TOC_NO_ENTRIES": "Check afterFlowable() notifies TOC correctly; ensure headings have bookmark_name/bookmark_level attributes",
"TOC_PAGES_INVALID": "TOC entry references a page beyond document total — regenerate TOC",
"TOC_NOT_FOUND": "Document has many pages but no TOC detected in first 5 pages",
"TOC_LINKS_MISSING": "TOC entries exist but no clickable links — check bookmark_key in afterFlowable()",
"TOC_ON_FIRST_PAGE": "Add a cover page before the TOC, or insert PageBreak() between cover and TOC. Expected: Cover(p1) → TOC(p2) → Content(p3+)",
}
for err in result.get("errors", []):
code = err.get("code", "")
if code in remediation:
err["fix"] = remediation[code]
for warn in result.get("warnings", []):
code = warn.get("code", "")
if code in remediation:
warn["fix"] = remediation[code]
print(json.dumps(result, ensure_ascii=False, indent=2))
raise SystemExit(0 if result.get("pass", False) else 1)
# ═══════════════════════════════════════════════════════════════
# Section 7: code — sanitization pipeline for PDF generation code
# ═══════════════════════════════════════════════════════════════
# --- Step 0: restore literal unicode escapes/entities to real chars ---
_RE_UNICODE_ESC = re.compile(r"(\\u[0-9a-fA-F]{4})|(\\U[0-9a-fA-F]{8})|(\\x[0-9a-fA-F]{2})")
def _restore_escapes(s: str) -> str:
# HTML entities: &#179; &#x2264; &alpha; ...
s = html.unescape(s)
# Literal backslash escapes: "\\u00B3" -> "³"
def _dec(m: re.Match) -> str:
esc = m.group(0)
try:
if esc.startswith("\\u") or esc.startswith("\\U"):
return chr(int(esc[2:], 16))
if esc.startswith("\\x"):
return chr(int(esc[2:], 16))
except Exception:
return esc
return esc
return _RE_UNICODE_ESC.sub(_dec, s)
# --- Step 1: superscripts/subscripts -> <super>/<sub> ---
_SUPERSCRIPT_MAP: Dict[str, str] = {
"\u2070": "0", "\u00b9": "1", "\u00b2": "2", "\u00b3": "3", "\u2074": "4",
"\u2075": "5", "\u2076": "6", "\u2077": "7", "\u2078": "8", "\u2079": "9",
"\u207a": "+", "\u207b": "-", "\u207c": "=", "\u207d": "(", "\u207e": ")",
"\u207f": "n", "\u1da6": "i",
}
_SUBSCRIPT_MAP: Dict[str, str] = {
"\u2080": "0", "\u2081": "1", "\u2082": "2", "\u2083": "3", "\u2084": "4",
"\u2085": "5", "\u2086": "6", "\u2087": "7", "\u2088": "8", "\u2089": "9",
"\u208a": "+", "\u208b": "-", "\u208c": "=", "\u208d": "(", "\u208e": ")",
"\u2090": "a", "\u2091": "e", "\u2095": "h", "\u1d62": "i", "\u2c7c": "j",
"\u2096": "k", "\u2097": "l", "\u2098": "m", "\u2099": "n", "\u2092": "o",
"\u209a": "p", "\u1d63": "r", "\u209b": "s", "\u209c": "t", "\u1d64": "u",
"\u1d65": "v", "\u2093": "x",
}
def _replace_super_sub(s: str) -> str:
out = []
for ch in s:
if ch in _SUPERSCRIPT_MAP:
out.append(f"<super>{_SUPERSCRIPT_MAP[ch]}</super>")
elif ch in _SUBSCRIPT_MAP:
out.append(f"<sub>{_SUBSCRIPT_MAP[ch]}</sub>")
else:
out.append(ch)
return "".join(out)
# --- Step 2: symbol fallback for SimHei (protect tags, then replace) ---
_SYMBOL_FALLBACK: Dict[str, str] = {
# Currently empty - enable entries as needed for fonts missing specific glyphs
}
def _fallback_symbols(s: str) -> str:
# Protect <super>/<sub> tags from being modified
placeholders: Dict[str, str] = {}
def _protect_tag(m: re.Match) -> str:
key = f"@@TAG{len(placeholders)}@@"
placeholders[key] = m.group(0)
return key
protected = re.sub(r"</?super>|</?sub>", _protect_tag, s)
# Replace symbols
protected = "".join(_SYMBOL_FALLBACK.get(ch, ch) for ch in protected)
# Restore tags
for k, v in placeholders.items():
protected = protected.replace(k, v)
return protected
def sanitize_code(text: str) -> str:
"""
Full sanitization pipeline for PDF generation code.
- Restore unicode escapes/entities to real characters
- Replace superscript/subscript unicode with <super>/<sub>
- Replace other risky symbols with ASCII/text fallbacks
"""
s = _restore_escapes(text)
s = _replace_super_sub(s)
s = _fallback_symbols(s)
return s
@cmd("palette.generate")
def palette_generate(argv: list):
"""Generate a color palette via design_engine.py and output as Python-ready ReportLab code.
Usage:
pdf.py palette.generate [--title "document title"] [--mode minimal|dark|pastel|jewel|light] [--harmony auto|complementary|...] [--format python|json|css]
If --title is provided, intent is auto-derived from the title.
Output formats:
python (default): Ready-to-paste ReportLab color variables
json: Raw palette JSON
css: CSS custom properties
"""
title = _pop_flag(argv, "--title", "-t") or ""
mode = _pop_flag(argv, "--mode", "-m") or "minimal"
harmony = _pop_flag(argv, "--harmony", "--harmony") or "auto"
fmt = _pop_flag(argv, "--format", "-f") or "python"
seed_str = _pop_flag(argv, "--seed", "--seed")
seed = int(seed_str) if seed_str else None
engine_script = _SCRIPT_DIR / "design_engine.py"
if not engine_script.exists():
Output.error("DependencyMissing", f"design_engine.py not found at {engine_script}")
# Import design_engine dynamically
import importlib.util
spec = importlib.util.spec_from_file_location("design_engine", str(engine_script))
de = importlib.util.module_from_spec(spec)
spec.loader.exec_module(de)
# Auto-derive intent from title
if title:
intent = de.derive_intent(title)
else:
intent = "neutral"
palette = de.generate_color_palette(intent, mode, harmony=harmony, seed=seed)
violations = de.audit_palette(palette)
if fmt == "json":
print(json.dumps(palette, indent=2, ensure_ascii=False))
elif fmt == "css":
print(de.palette_to_css(palette))
else:
# Python format: ready-to-paste ReportLab code
print("# \u2501\u2501 Color Palette (auto-generated by pdf.py palette.generate) \u2501\u2501")
print(f"# Intent: {intent} | Mode: {mode} | Harmony: {palette['meta']['harmony']}")
print(f"# Contrast: text:bg={palette['meta']['contrast']['text_on_bg']} | accent:bg={palette['meta']['contrast']['accent_on_bg']}")
print("from reportlab.lib import colors")
print(f"ACCENT = colors.HexColor('{palette['accent']}')")
print(f"TEXT_PRIMARY = colors.HexColor('{palette['text']}')")
print(f"TEXT_MUTED = colors.HexColor('{palette['muted']}')")
print(f"BG_SURFACE = colors.HexColor('{palette['mid']}')")
print(f"BG_PAGE = colors.HexColor('{palette['bg']}')")
print(f"SURFACE_RGBA = '{palette['surface']}' # For CSS/HTML elements")
print("")
print("# ReportLab table colors")
print("TABLE_HEADER_COLOR = ACCENT")
print("TABLE_HEADER_TEXT = colors.white")
print("TABLE_ROW_EVEN = colors.white")
print("TABLE_ROW_ODD = BG_SURFACE")
if violations:
print(f"\n# \u26a0\ufe0f Palette audit warnings:", file=sys.stderr)
for v in violations:
print(f"# - {v}", file=sys.stderr)
raise SystemExit(0)
@cmd("palette.cascade")
def palette_cascade(argv: list):
"""Generate a role-based cascade palette (area ∝ 1/saturation).
Usage:
pdf.py palette.cascade [--title "document title"] [--mode minimal|dark|pastel|jewel|light] [--harmony auto|...] [--format summary|json|css|reportlab]
The cascade palette produces 12 named roles + 4 semantic colors, each assigned
to a size tier (XL/L/M/S/XS) with saturation caps enforced per tier.
Cover, body, and chart colors all pull from the same palette — no color drift.
"""
title = _pop_flag(argv, "--title", "-t") or ""
mode = _pop_flag(argv, "--mode", "-m") or "minimal"
harmony = _pop_flag(argv, "--harmony", "--harmony") or "auto"
fmt = _pop_flag(argv, "--format", "-f") or "summary"
seed_str = _pop_flag(argv, "--seed", "--seed")
seed = int(seed_str) if seed_str else None
engine_script = _SCRIPT_DIR / "design_engine.py"
if not engine_script.exists():
Output.error("DependencyMissing", f"design_engine.py not found at {engine_script}")
import importlib.util
spec = importlib.util.spec_from_file_location("design_engine", str(engine_script))
de = importlib.util.module_from_spec(spec)
spec.loader.exec_module(de)
if title:
intent = de.derive_intent(title)
else:
intent = "neutral"
cascade = de.generate_cascade_palette(intent, mode, harmony=harmony, seed=seed)
if fmt == "json":
print(json.dumps(cascade, indent=2, ensure_ascii=False, default=str))
elif fmt == "css":
print(cascade["css"])
elif fmt == "reportlab":
print(cascade["reportlab"])
else:
# Summary format
meta = cascade["meta"]
print(f"🎨 Cascade Palette | Intent: {meta['intent']} | Mode: {meta['mode']} | Harmony: {meta['harmony']}")
print(f" Base hue: {meta['base_hue']}° | Accent hue: {meta['accent_hue']}° | Secondary: {meta['secondary_hue']}°")
print(f" Contrast: text:bg={meta['contrast']['text_on_bg']} | accent:bg={meta['contrast']['accent_on_bg']}")
print()
print(" TIER | ROLE | HEX | S | USAGE")
print(" ────── | ────────────────── | ─────── | ────── | ────────────")
for name, info in cascade["roles"].items():
tier = info['tier'].upper().ljust(6)
nm = name.ljust(18)
hx = info['hex'].ljust(7)
s_val = f"{info['hsl'][1]:.3f}".ljust(6)
print(f" {tier} | {nm} | {hx} | {s_val} | {info['usage']}")
print()
print(" Semantic:")
for name, info in cascade["semantic"].items():
print(f" {name}: {info['hex']} (S={info['hsl'][1]:.3f})")
if meta["audit"]:
print(f"\n ⚠️ Violations:")
for v in meta["audit"]:
print(f" - {v}")
else:
print(f"\n ✅ All tier constraints pass")
raise SystemExit(0)
@cmd("code.sanitize")
def code_sanitize(argv: list):
"""Sanitize Unicode in a Python script for PDF generation."""
if not argv:
Output.error("MissingArg", "Usage: code.sanitize <target_script.py>")
target = argv[0]
with open(target, "r", encoding="utf-8") as f:
code = f.read()
sanitized = sanitize_code(code)
with open(target, "w", encoding="utf-8") as f:
f.write(sanitized)
print(f"Sanitized: {target}")
raise SystemExit(0)
@cmd("content.sanitize")
def content_sanitize_cli(argv: list):
"""Sanitize content text for PDF rendering (dry-run report).
Reads a text file and reports what content_sanitize() would change.
Useful for debugging before PDF generation.
Usage:
pdf.py content.sanitize <text_file>
pdf.py content.sanitize <text_file> --apply
"""
if not argv:
Output.error("MissingArg", "Usage: content.sanitize <text_file> [--apply]")
target = argv[0]
apply_flag = "--apply" in argv
with open(target, "r", encoding="utf-8") as f:
raw = f.read()
global _content_sanitize_warnings
_content_sanitize_warnings = []
cleaned = content_sanitize(raw, dry_run=True)
changes = len(_content_sanitize_warnings)
result: Dict[str, Any] = {
"file": target,
"original_chars": len(raw),
"cleaned_chars": len(cleaned),
"changes": changes,
"details": _content_sanitize_warnings[:50],
}
if apply_flag and changes > 0:
with open(target, "w", encoding="utf-8") as f:
f.write(cleaned)
result["applied"] = True
else:
result["applied"] = False
if changes > 0:
result["hint"] = "Run with --apply to write cleaned text back to file."
print(json.dumps(result, ensure_ascii=False, indent=2))
raise SystemExit(0)
# ═══════════════════════════════════════════════════════════════
# Section 8: CLI dispatcher
# ═══════════════════════════════════════════════════════════════
def _usage():
sys.stdout.write(__doc__.strip() + "\n")
raise SystemExit(0)
def main():
tokens = sys.argv[1:]
if not tokens or tokens[0] in ("-h", "--help"):
_usage()
cmd_name = tokens.pop(0)
# Direct match
handler = _COMMANDS.get(cmd_name)
if handler is not None:
handler(tokens)
return
# Two-word match (e.g., "extract text" -> "extract.text")
if tokens:
compound = f"{cmd_name}.{tokens[0]}"
handler = _COMMANDS.get(compound)
if handler is not None:
tokens.pop(0)
handler(tokens)
return
# List commands in group
group_cmds = [k for k in _COMMANDS if k.startswith(cmd_name + ".")]
if group_cmds:
print(f"Available commands in '{cmd_name}':")
for c in sorted(group_cmds):
print(f" {c}")
raise SystemExit(0)
print(f"Unknown command: {cmd_name}\n")
_usage()
if __name__ == "__main__":
main()