2047 lines
70 KiB
Python
Executable File
2047 lines
70 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
PDF Processing Toolkit — All-in-One CLI
|
|
|
|
Usage:
|
|
python3 pdf.py <command> [args...]
|
|
|
|
Commands:
|
|
env.check [--json] Check environment dependencies
|
|
env.fix Auto-install missing dependencies
|
|
|
|
extract.text <pdf> [-p pages]
|
|
extract.table <pdf> [-p pages]
|
|
extract.image <pdf> -o <dir>
|
|
|
|
pages.merge <pdf>... -o <out>
|
|
pages.split <pdf> -o <dir>
|
|
pages.rotate <pdf> <deg> -o <out> [-p pages]
|
|
pages.crop <pdf> <box> -o <out> [-p pages]
|
|
|
|
meta.get <pdf>
|
|
meta.set <pdf> -o <out> -d <json>
|
|
meta.brand <pdf>... [-o <out>] [-t title] [-q]
|
|
|
|
form.info <pdf>
|
|
form.fill <pdf> -o <out> -d <json>
|
|
form.detail <pdf> <output.json>
|
|
form.fill-legacy <pdf> <fields.json> <output.pdf>
|
|
form.annotate <pdf> <fields.json> <output.pdf>
|
|
form.render <pdf> <output_dir> [--max-dim N]
|
|
form.validate <page> <fields.json> <input_img> <output_img>
|
|
form.check-bbox <fields.json>
|
|
|
|
convert.office <file> [-o <out>]
|
|
convert.html <file> [-o <out>] [--css <file>]
|
|
convert.latex <file> [--runs N] [--keep-logs]
|
|
|
|
code.sanitize <file>
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import html
|
|
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# Section 0: Framework — Output, @cmd registry, CLI parser
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
class Output:
|
|
"""Structured JSON output for all subcommands."""
|
|
|
|
@staticmethod
|
|
def success(data: dict):
|
|
payload = {"status": "success", "data": data}
|
|
sys.stdout.write(json.dumps(payload, ensure_ascii=False, indent=2) + "\n")
|
|
raise SystemExit(0)
|
|
|
|
@staticmethod
|
|
def error(error: str, message: str, hint: Optional[str] = None, code: int = 1):
|
|
payload = {"status": "error", "error": error, "message": message}
|
|
if hint is not None:
|
|
payload["hint"] = hint
|
|
sys.stderr.write(json.dumps(payload, ensure_ascii=False, indent=2) + "\n")
|
|
raise SystemExit(code)
|
|
|
|
@staticmethod
|
|
def check_file(filepath: str) -> Path:
|
|
target = Path(filepath)
|
|
if not target.exists():
|
|
Output.error("FileNotFound", f"File not found: {filepath}", code=2)
|
|
return target
|
|
|
|
|
|
# Command registry
|
|
_COMMANDS: Dict[str, Callable] = {}
|
|
|
|
|
|
def cmd(name: str):
|
|
"""Decorator to register a CLI command under a dotted namespace."""
|
|
def decorator(fn: Callable) -> Callable:
|
|
_COMMANDS[name] = fn
|
|
return fn
|
|
return decorator
|
|
|
|
|
|
def _pop_flag(argv: list, short: str, long: str, needs_value: bool = True):
|
|
"""Extract a flag (and optional value) from *argv* in-place."""
|
|
for idx, tok in enumerate(argv):
|
|
if tok in (short, long):
|
|
argv.pop(idx)
|
|
if needs_value:
|
|
if idx < len(argv):
|
|
return argv.pop(idx)
|
|
Output.error("MissingArg", f"Flag {long} requires a value")
|
|
return True
|
|
return None
|
|
|
|
|
|
def _load_json_arg(argv: list) -> dict:
|
|
"""Read JSON from -d/--data string or -f/--file path."""
|
|
raw = _pop_flag(argv, "-d", "--data")
|
|
if raw is not None:
|
|
try:
|
|
return json.loads(raw)
|
|
except json.JSONDecodeError as exc:
|
|
Output.error("InvalidJSON", f"JSON parse error: {exc}")
|
|
|
|
fpath = _pop_flag(argv, "-f", "--file")
|
|
if fpath is not None:
|
|
try:
|
|
with open(fpath) as fh:
|
|
return json.load(fh)
|
|
except Exception as exc:
|
|
Output.error("FileError", f"Failed to read file: {exc}")
|
|
|
|
Output.error("MissingData", "Requires --data or --file argument")
|
|
|
|
|
|
def _resolve_page_indices(range_spec: Optional[str], page_count: int) -> List[int]:
|
|
"""Turn a human-friendly range string (1-indexed) into a sorted list of 0-based indices."""
|
|
if not range_spec:
|
|
return list(range(page_count))
|
|
indices: Set[int] = set()
|
|
for segment in range_spec.split(","):
|
|
segment = segment.strip()
|
|
if "-" in segment:
|
|
lo, hi = segment.split("-", 1)
|
|
for i in range(int(lo) - 1, min(int(hi), page_count)):
|
|
indices.add(i)
|
|
else:
|
|
val = int(segment) - 1
|
|
if 0 <= val < page_count:
|
|
indices.add(val)
|
|
return sorted(indices)
|
|
|
|
|
|
_SCRIPT_DIR = Path(__file__).resolve().parent
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# Section 1: env — environment diagnostics and auto-fix
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
def _probe_cmd(name: str, version_args: Optional[List[str]] = None) -> Tuple[str, str]:
|
|
"""Check if a command exists and optionally get its version. Returns (status, detail)."""
|
|
path = shutil.which(name)
|
|
if path is None:
|
|
return ("missing", "")
|
|
if version_args is None:
|
|
return ("ok", "")
|
|
try:
|
|
result = subprocess.run(
|
|
[path] + version_args,
|
|
capture_output=True, text=True, timeout=10
|
|
)
|
|
ver = result.stdout.strip() or result.stderr.strip()
|
|
return ("ok", ver)
|
|
except Exception:
|
|
return ("ok", "")
|
|
|
|
|
|
def _probe_python_module(mod_name: str) -> Tuple[str, str]:
|
|
"""Check if a Python module is importable and get its version."""
|
|
try:
|
|
result = subprocess.run(
|
|
[sys.executable, "-c", f"import {mod_name}; print(getattr({mod_name}, '__version__', 'installed'))"],
|
|
capture_output=True, text=True, timeout=10
|
|
)
|
|
if result.returncode == 0:
|
|
return ("ok", result.stdout.strip())
|
|
return ("missing", "")
|
|
except Exception:
|
|
return ("missing", "")
|
|
|
|
|
|
def _probe_node() -> Tuple[str, str]:
|
|
s, d = _probe_cmd("node", ["--version"])
|
|
if s == "ok" and d:
|
|
d = d.lstrip("v")
|
|
return (s, d)
|
|
|
|
|
|
def _probe_python() -> Tuple[str, str]:
|
|
try:
|
|
import platform
|
|
return ("ok", platform.python_version())
|
|
except Exception:
|
|
return ("ok", "")
|
|
|
|
|
|
def _probe_libreoffice() -> Tuple[str, str]:
|
|
candidates = [
|
|
"/Applications/LibreOffice.app/Contents/MacOS/soffice",
|
|
os.path.expanduser("~/Applications/LibreOffice.app/Contents/MacOS/soffice"),
|
|
"/usr/bin/soffice",
|
|
"/usr/local/bin/soffice",
|
|
"/usr/lib/libreoffice/program/soffice",
|
|
"/opt/libreoffice/program/soffice",
|
|
"/snap/bin/libreoffice.soffice",
|
|
]
|
|
for c in candidates:
|
|
if Path(c).is_file():
|
|
return ("ok", "")
|
|
for alias in ("soffice", "libreoffice"):
|
|
if shutil.which(alias):
|
|
return ("ok", "")
|
|
return ("missing", "")
|
|
|
|
|
|
def _probe_tectonic() -> Tuple[str, str]:
|
|
home_bin = Path.home() / "tectonic"
|
|
if home_bin.exists() and os.access(home_bin, os.X_OK):
|
|
return ("ok", "")
|
|
tec_local = _SCRIPT_DIR / "tectonic"
|
|
if tec_local.exists() and os.access(tec_local, os.X_OK):
|
|
return ("ok", "")
|
|
if shutil.which("tectonic"):
|
|
return ("ok", "")
|
|
return ("missing", "")
|
|
|
|
|
|
def _probe_playwright_npm() -> Tuple[str, str]:
|
|
"""Check if playwright npm package is installed."""
|
|
try:
|
|
result = subprocess.run(
|
|
["node", "-e", "console.log(require('playwright/package.json').version)"],
|
|
capture_output=True, text=True, timeout=10
|
|
)
|
|
if result.returncode == 0 and result.stdout.strip():
|
|
return ("ok", result.stdout.strip())
|
|
except Exception:
|
|
pass
|
|
# Try global
|
|
try:
|
|
result = subprocess.run(
|
|
["npm", "list", "-g", "playwright", "--depth=0"],
|
|
capture_output=True, text=True, timeout=15
|
|
)
|
|
import re as _re
|
|
m = _re.search(r"playwright@(\S+)", result.stdout)
|
|
if m:
|
|
return ("ok", m.group(1))
|
|
except Exception:
|
|
pass
|
|
return ("missing", "")
|
|
|
|
|
|
def _probe_chromium() -> Tuple[str, str]:
|
|
"""Check if Playwright Chromium browser is installed."""
|
|
import platform as _platform
|
|
home = Path.home()
|
|
if _platform.system() == "Darwin":
|
|
cache_dir = home / "Library" / "Caches" / "ms-playwright"
|
|
else:
|
|
cache_dir = home / ".cache" / "ms-playwright"
|
|
if cache_dir.is_dir():
|
|
for entry in sorted(cache_dir.iterdir(), reverse=True):
|
|
if "chromium" in entry.name.lower():
|
|
return ("ok", entry.name)
|
|
return ("missing", "")
|
|
|
|
|
|
@cmd("env.check")
|
|
def env_check(argv: list):
|
|
"""Check environment dependencies."""
|
|
use_json = _pop_flag(argv, "-j", "--json", needs_value=False)
|
|
|
|
s_node = _probe_node()
|
|
s_pw = _probe_playwright_npm()
|
|
s_cr = _probe_chromium()
|
|
s_py = _probe_python()
|
|
s_pike = _probe_python_module("pikepdf")
|
|
s_plumb = _probe_python_module("pdfplumber")
|
|
s_lo = _probe_libreoffice()
|
|
s_tec = _probe_tectonic()
|
|
s_pw_py = _probe_python_module("playwright")
|
|
|
|
if use_json:
|
|
report = {
|
|
"html_route": {
|
|
"node": s_node[0], "node_version": s_node[1],
|
|
"playwright": s_pw[0], "playwright_version": s_pw[1],
|
|
"chromium": s_cr[0], "chromium_detail": s_cr[1],
|
|
},
|
|
"process_route": {
|
|
"python3": s_py[0], "python3_version": s_py[1],
|
|
"pikepdf": s_pike[0], "pikepdf_version": s_pike[1],
|
|
"pdfplumber": s_plumb[0], "pdfplumber_version": s_plumb[1],
|
|
"playwright_python": s_pw_py[0], "playwright_python_version": s_pw_py[1],
|
|
},
|
|
"optional": {
|
|
"libreoffice": s_lo[0],
|
|
"tectonic": s_tec[0],
|
|
},
|
|
}
|
|
sys.stdout.write(json.dumps(report, ensure_ascii=False, indent=2) + "\n")
|
|
# Determine exit code
|
|
rc = 0
|
|
for v in [s_node, s_pw, s_cr, s_py, s_pike, s_plumb]:
|
|
if v[0] != "ok":
|
|
rc = 2
|
|
break
|
|
raise SystemExit(rc)
|
|
|
|
# Human-readable output
|
|
rc = 0
|
|
|
|
def show(name: str, status: Tuple[str, str], optional: bool = False):
|
|
nonlocal rc
|
|
s, d = status
|
|
if s == "ok":
|
|
detail = f" ({d})" if d else ""
|
|
print(f" \u2713 {name}{detail}")
|
|
elif optional:
|
|
print(f" \u25cb {name} (optional, not installed)")
|
|
else:
|
|
print(f" \u2717 {name} (missing)")
|
|
rc = 2
|
|
|
|
print("=== PDF Skill Environment ===\n")
|
|
print("--- HTML Route ---")
|
|
show("node", s_node)
|
|
show("playwright", s_pw)
|
|
show("chromium", s_cr)
|
|
|
|
print("\n--- Process Route ---")
|
|
show("python3", s_py)
|
|
show("pikepdf", s_pike)
|
|
show("pdfplumber", s_plumb)
|
|
if s_pw_py[0] == "ok":
|
|
print(f" (playwright-python: {s_pw_py[1]})")
|
|
|
|
print("\n--- Optional ---")
|
|
show("libreoffice", s_lo, optional=True)
|
|
show("tectonic", s_tec, optional=True)
|
|
|
|
print("\n=== Install Commands ===")
|
|
print(" Node.js: brew install node (macOS) / apt install nodejs (Ubuntu)")
|
|
print(" Playwright: npm install -g playwright && npx playwright install chromium")
|
|
print(" Python: brew install python3 (macOS) / apt install python3 (Ubuntu)")
|
|
print(" pikepdf: pip install pikepdf pdfplumber --user")
|
|
print(" LibreOffice: brew install --cask libreoffice (macOS)")
|
|
print(" Tectonic: curl -fsSL https://drop-sh.fullyjustified.net | sh")
|
|
raise SystemExit(rc)
|
|
|
|
|
|
@cmd("env.fix")
|
|
def env_fix(argv: list):
|
|
"""Auto-install missing Python dependencies."""
|
|
modules = {
|
|
"pikepdf": "pikepdf",
|
|
"pdfplumber": "pdfplumber",
|
|
"pypdf": "pypdf",
|
|
"pdf2image": "pdf2image",
|
|
"PIL": "Pillow",
|
|
}
|
|
installed = []
|
|
for mod, pkg in modules.items():
|
|
s, _ = _probe_python_module(mod)
|
|
if s == "missing":
|
|
print(f"Installing {pkg}...")
|
|
for attempt in (
|
|
[sys.executable, "-m", "pip", "install", "-q", pkg],
|
|
[sys.executable, "-m", "pip", "install", "-q", "--user", pkg],
|
|
[sys.executable, "-m", "pip", "install", "-q", "--break-system-packages", pkg],
|
|
):
|
|
result = subprocess.run(attempt, capture_output=True, text=True)
|
|
if result.returncode == 0:
|
|
installed.append(pkg)
|
|
break
|
|
else:
|
|
print(f" Failed to install {pkg}")
|
|
|
|
if installed:
|
|
print(f"\nInstalled: {', '.join(installed)}")
|
|
else:
|
|
print("All Python dependencies are already installed.")
|
|
raise SystemExit(0)
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# Section 2: extract — text, tables, and embedded images
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
@cmd("extract.text")
|
|
def extract_text(argv: list):
|
|
"""Pull plain text from selected pages."""
|
|
if not argv:
|
|
Output.error("MissingArg", "pdf path required")
|
|
pdf_path = argv.pop(0)
|
|
page_range = _pop_flag(argv, "-p", "--pages")
|
|
|
|
import pdfplumber
|
|
src = Output.check_file(pdf_path)
|
|
try:
|
|
doc = pdfplumber.open(src)
|
|
except Exception as exc:
|
|
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
|
|
|
|
total_pages = len(doc.pages)
|
|
target_pages = _resolve_page_indices(page_range, total_pages)
|
|
char_total = 0
|
|
page_results = []
|
|
|
|
for pg_idx in target_pages:
|
|
content = doc.pages[pg_idx].extract_text() or ""
|
|
char_total += len(content)
|
|
page_results.append({"page": pg_idx + 1, "chars": len(content), "text": content})
|
|
|
|
doc.close()
|
|
Output.success({
|
|
"total_pages": total_pages,
|
|
"extracted_pages": len(target_pages),
|
|
"total_chars": char_total,
|
|
"pages": page_results,
|
|
})
|
|
|
|
|
|
@cmd("extract.table")
|
|
def extract_table(argv: list):
|
|
"""Locate and return every table on selected pages."""
|
|
if not argv:
|
|
Output.error("MissingArg", "pdf path required")
|
|
pdf_path = argv.pop(0)
|
|
page_range = _pop_flag(argv, "-p", "--pages")
|
|
|
|
import pdfplumber
|
|
src = Output.check_file(pdf_path)
|
|
try:
|
|
doc = pdfplumber.open(src)
|
|
except Exception as exc:
|
|
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
|
|
|
|
total_pages = len(doc.pages)
|
|
target_pages = _resolve_page_indices(page_range, total_pages)
|
|
collected = []
|
|
|
|
for pg_idx in target_pages:
|
|
for tbl_num, raw_table in enumerate(doc.pages[pg_idx].extract_tables()):
|
|
if not raw_table:
|
|
continue
|
|
sanitised = [
|
|
[(cell.strip() if cell else "") for cell in row]
|
|
for row in raw_table
|
|
]
|
|
collected.append({
|
|
"page": pg_idx + 1,
|
|
"table_index": tbl_num,
|
|
"rows": len(sanitised),
|
|
"cols": len(sanitised[0]) if sanitised else 0,
|
|
"data": sanitised,
|
|
})
|
|
|
|
doc.close()
|
|
Output.success({
|
|
"total_pages": total_pages,
|
|
"extracted_pages": len(target_pages),
|
|
"total_tables": len(collected),
|
|
"tables": collected,
|
|
})
|
|
|
|
|
|
@cmd("extract.image")
|
|
def extract_image(argv: list):
|
|
"""Save every embedded raster image to output dir."""
|
|
if not argv:
|
|
Output.error("MissingArg", "pdf path required")
|
|
pdf_path = argv.pop(0)
|
|
out_dir = _pop_flag(argv, "-o", "--output") or "."
|
|
|
|
import pikepdf
|
|
src = Output.check_file(pdf_path)
|
|
dest = Path(out_dir)
|
|
dest.mkdir(parents=True, exist_ok=True)
|
|
|
|
try:
|
|
doc = pikepdf.open(src)
|
|
except Exception as exc:
|
|
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
|
|
|
|
saved = []
|
|
seq = 0
|
|
_EXT_MAP = {
|
|
"/DCTDecode": "jpg",
|
|
"/FlateDecode": "png",
|
|
"/JPXDecode": "jp2",
|
|
}
|
|
|
|
for page_no, pg in enumerate(doc.pages, 1):
|
|
res = pg.get("/Resources")
|
|
if res is None or "/XObject" not in res:
|
|
continue
|
|
for key, ref in res.XObject.items():
|
|
try:
|
|
img_obj = doc.get_object(ref.objgen)
|
|
if img_obj.get("/Subtype") != "/Image":
|
|
continue
|
|
seq += 1
|
|
w = int(img_obj.get("/Width", 0))
|
|
h = int(img_obj.get("/Height", 0))
|
|
filt = img_obj.get("/Filter")
|
|
ext = _EXT_MAP.get(str(filt) if filt else None, "bin")
|
|
fname = f"page{page_no}_img{seq}.{ext}"
|
|
out_file = dest / fname
|
|
out_file.write_bytes(img_obj.read_raw_bytes())
|
|
saved.append({
|
|
"page": page_no, "name": str(key), "file": str(out_file),
|
|
"width": w, "height": h, "format": ext,
|
|
})
|
|
except Exception:
|
|
continue
|
|
|
|
doc.close()
|
|
Output.success({"output_dir": str(dest), "total_images": len(saved), "images": saved})
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# Section 3: pages — merge, split, rotate, crop
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
@cmd("pages.merge")
|
|
def pages_merge(argv: list):
|
|
"""Concatenate several PDF files into one."""
|
|
out_path = _pop_flag(argv, "-o", "--output")
|
|
if out_path is None:
|
|
Output.error("MissingArg", "--output is required")
|
|
if not argv:
|
|
Output.error("MissingArg", "At least one PDF required")
|
|
|
|
import pikepdf
|
|
sources = [Output.check_file(p) for p in argv]
|
|
handles = []
|
|
try:
|
|
combined = pikepdf.new()
|
|
descriptions = []
|
|
for src in sources:
|
|
handle = pikepdf.open(src)
|
|
handles.append(handle)
|
|
n = len(handle.pages)
|
|
descriptions.append(f"{src.name} ({n} pages)")
|
|
for pg in handle.pages:
|
|
combined.pages.append(pg)
|
|
total = len(combined.pages)
|
|
combined.save(out_path)
|
|
combined.close()
|
|
except Exception as exc:
|
|
Output.error("MergeError", f"Merge failed: {exc}", code=4)
|
|
finally:
|
|
for h in handles:
|
|
try:
|
|
h.close()
|
|
except Exception:
|
|
pass
|
|
|
|
Output.success({"output": out_path, "total_pages": total, "sources": descriptions})
|
|
|
|
|
|
@cmd("pages.split")
|
|
def pages_split(argv: list):
|
|
"""Write each page as a separate single-page PDF."""
|
|
if not argv:
|
|
Output.error("MissingArg", "pdf path required")
|
|
pdf_path = argv.pop(0)
|
|
out_dir = _pop_flag(argv, "-o", "--output") or "."
|
|
|
|
import pikepdf
|
|
src = Output.check_file(pdf_path)
|
|
dest = Path(out_dir)
|
|
dest.mkdir(parents=True, exist_ok=True)
|
|
|
|
try:
|
|
doc = pikepdf.open(src)
|
|
except Exception as exc:
|
|
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
|
|
|
|
generated = []
|
|
base = src.stem
|
|
try:
|
|
for idx, pg in enumerate(doc.pages, 1):
|
|
fp = dest / f"{base}_page{idx:03d}.pdf"
|
|
single_doc = pikepdf.new()
|
|
single_doc.pages.append(pg)
|
|
single_doc.save(fp)
|
|
single_doc.close()
|
|
generated.append(str(fp))
|
|
doc.close()
|
|
except Exception as exc:
|
|
Output.error("SplitError", f"Split failed: {exc}", code=4)
|
|
|
|
Output.success({"output_dir": str(dest), "total_pages": len(generated), "files": generated})
|
|
|
|
|
|
@cmd("pages.rotate")
|
|
def pages_rotate(argv: list):
|
|
"""Rotate selected pages by 90/180/270 degrees."""
|
|
if len(argv) < 2:
|
|
Output.error("MissingArg", "pdf path and degrees required")
|
|
pdf_path = argv.pop(0)
|
|
degrees = int(argv.pop(0))
|
|
out_path = _pop_flag(argv, "-o", "--output")
|
|
if out_path is None:
|
|
Output.error("MissingArg", "--output is required")
|
|
page_range = _pop_flag(argv, "-p", "--pages")
|
|
|
|
if degrees not in (90, 180, 270):
|
|
Output.error("InvalidDegrees", "Rotation angle must be 90, 180, or 270")
|
|
|
|
import pikepdf
|
|
src = Output.check_file(pdf_path)
|
|
try:
|
|
doc = pikepdf.open(src)
|
|
except Exception as exc:
|
|
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
|
|
|
|
targets = _resolve_page_indices(page_range, len(doc.pages))
|
|
try:
|
|
for i in targets:
|
|
existing = int(doc.pages[i].get("/Rotate", 0))
|
|
doc.pages[i]["/Rotate"] = (existing + degrees) % 360
|
|
doc.save(out_path)
|
|
doc.close()
|
|
except Exception as exc:
|
|
Output.error("RotateError", f"Rotation failed: {exc}", code=4)
|
|
|
|
Output.success({"output": out_path, "degrees": degrees, "pages_rotated": len(targets)})
|
|
|
|
|
|
@cmd("pages.crop")
|
|
def pages_crop(argv: list):
|
|
"""Set the media/crop box on selected pages. box = 'left,bottom,right,top' in pt."""
|
|
if len(argv) < 2:
|
|
Output.error("MissingArg", "pdf path and crop box required")
|
|
pdf_path = argv.pop(0)
|
|
box_str = argv.pop(0)
|
|
out_path = _pop_flag(argv, "-o", "--output")
|
|
if out_path is None:
|
|
Output.error("MissingArg", "--output is required")
|
|
page_range = _pop_flag(argv, "-p", "--pages")
|
|
|
|
try:
|
|
coords = [float(v.strip()) for v in box_str.split(",")]
|
|
assert len(coords) == 4
|
|
left, bottom, right, top = coords
|
|
except Exception:
|
|
Output.error("InvalidBox", "Invalid crop box format, should be: left,bottom,right,top",
|
|
hint="Example: 50,50,550,750")
|
|
|
|
import pikepdf
|
|
src = Output.check_file(pdf_path)
|
|
try:
|
|
doc = pikepdf.open(src)
|
|
except Exception as exc:
|
|
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
|
|
|
|
targets = _resolve_page_indices(page_range, len(doc.pages))
|
|
try:
|
|
arr = pikepdf.Array([left, bottom, right, top])
|
|
for i in targets:
|
|
doc.pages[i].mediabox = arr
|
|
doc.pages[i].cropbox = arr
|
|
doc.save(out_path)
|
|
doc.close()
|
|
except Exception as exc:
|
|
Output.error("CropError", f"Crop failed: {exc}", code=4)
|
|
|
|
Output.success({
|
|
"output": out_path,
|
|
"box": {"left": left, "bottom": bottom, "right": right, "top": top},
|
|
"pages_cropped": len(targets),
|
|
})
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# Section 4: meta — metadata reading, writing, and branding
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
_XMP_MAPPING = {
|
|
"Title": "dc:title",
|
|
"Author": "dc:creator",
|
|
"Subject": "dc:description",
|
|
"Keywords": "pdf:Keywords",
|
|
"Creator": "xmp:CreatorTool",
|
|
"Producer": "pdf:Producer",
|
|
}
|
|
_ACCEPTED_KEYS = set(_XMP_MAPPING.keys())
|
|
|
|
|
|
@cmd("meta.get")
|
|
def meta_get(argv: list):
|
|
"""Read document information and metadata."""
|
|
if not argv:
|
|
Output.error("MissingArg", "pdf path required")
|
|
|
|
import pikepdf
|
|
src = Output.check_file(argv[0])
|
|
try:
|
|
doc = pikepdf.open(src)
|
|
except Exception as exc:
|
|
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
|
|
|
|
record: dict = {
|
|
"pages": len(doc.pages),
|
|
"pdf_version": str(doc.pdf_version),
|
|
}
|
|
|
|
if doc.pages:
|
|
mb = doc.pages[0].mediabox
|
|
record["page_size"] = {
|
|
"width": float(mb[2] - mb[0]),
|
|
"height": float(mb[3] - mb[1]),
|
|
"unit": "pt",
|
|
}
|
|
|
|
kv_pairs = {}
|
|
if doc.docinfo:
|
|
for k in doc.docinfo.keys():
|
|
try:
|
|
kv_pairs[str(k).lstrip("/")] = str(doc.docinfo[k])
|
|
except Exception:
|
|
pass
|
|
record["metadata"] = kv_pairs
|
|
record["encrypted"] = doc.is_encrypted
|
|
record["has_form"] = "/AcroForm" in doc.Root
|
|
record["has_outlines"] = "/Outlines" in doc.Root
|
|
|
|
doc.close()
|
|
Output.success(record)
|
|
|
|
|
|
@cmd("meta.set")
|
|
def meta_set(argv: list):
|
|
"""Update XMP + legacy docinfo metadata fields."""
|
|
if not argv:
|
|
Output.error("MissingArg", "pdf path required")
|
|
pdf_path = argv.pop(0)
|
|
out_path = _pop_flag(argv, "-o", "--output")
|
|
if out_path is None:
|
|
Output.error("MissingArg", "--output is required")
|
|
data = _load_json_arg(argv)
|
|
|
|
import pikepdf
|
|
src = Output.check_file(pdf_path)
|
|
try:
|
|
doc = pikepdf.open(src)
|
|
except Exception as exc:
|
|
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
|
|
|
|
# XMP layer
|
|
with doc.open_metadata() as xmp:
|
|
for raw_key, raw_val in data.items():
|
|
norm = raw_key.title()
|
|
xmp_key = _XMP_MAPPING.get(norm)
|
|
if xmp_key is None:
|
|
continue
|
|
try:
|
|
xmp[xmp_key] = str(raw_val)
|
|
except Exception:
|
|
pass
|
|
|
|
# Legacy docinfo layer
|
|
if not doc.docinfo:
|
|
doc.docinfo = pikepdf.Dictionary()
|
|
for raw_key, raw_val in data.items():
|
|
norm = raw_key.title()
|
|
if norm in _ACCEPTED_KEYS:
|
|
doc.docinfo[pikepdf.Name(f"/{norm}")] = pikepdf.String(str(raw_val))
|
|
|
|
doc.docinfo[pikepdf.Name("/ModDate")] = pikepdf.String(
|
|
datetime.now().strftime("D:%Y%m%d%H%M%S")
|
|
)
|
|
|
|
try:
|
|
doc.save(out_path)
|
|
doc.close()
|
|
except Exception as exc:
|
|
Output.error("SaveError", f"Save failed: {exc}", code=4)
|
|
|
|
Output.success({"output": out_path, "updated_fields": list(data.keys())})
|
|
|
|
|
|
@cmd("meta.brand")
|
|
def meta_brand(argv: list):
|
|
"""Add Z.ai branding metadata to PDF documents."""
|
|
output_path = _pop_flag(argv, "-o", "--output")
|
|
custom_title = _pop_flag(argv, "-t", "--title")
|
|
quiet = _pop_flag(argv, "-q", "--quiet", needs_value=False)
|
|
|
|
if not argv:
|
|
Output.error("MissingArg", "At least one PDF file required")
|
|
|
|
# Check if output is specified for multiple files
|
|
if output_path and len(argv) > 1:
|
|
Output.error("InvalidArg", "--output can only be used with a single input file")
|
|
|
|
from pypdf import PdfReader, PdfWriter
|
|
|
|
for input_path in argv:
|
|
if not os.path.exists(input_path):
|
|
print(f"Error: Input file not found: {input_path}", file=sys.stderr)
|
|
continue
|
|
|
|
try:
|
|
reader = PdfReader(input_path)
|
|
except Exception as e:
|
|
print(f"Error: Cannot open PDF: {e}", file=sys.stderr)
|
|
continue
|
|
|
|
writer = PdfWriter()
|
|
for page in reader.pages:
|
|
writer.add_page(page)
|
|
|
|
# Determine title
|
|
if custom_title:
|
|
title = custom_title
|
|
else:
|
|
original_meta = reader.metadata
|
|
if original_meta and original_meta.title and original_meta.title not in ('(anonymous)', 'unspecified', None):
|
|
title = original_meta.title
|
|
else:
|
|
title = os.path.splitext(os.path.basename(input_path))[0]
|
|
|
|
writer.add_metadata({
|
|
'/Title': title,
|
|
'/Author': 'Z.ai',
|
|
'/Creator': 'Z.ai',
|
|
'/Producer': 'http://z.ai',
|
|
})
|
|
|
|
# Write output
|
|
out = output_path if (len(argv) == 1 and output_path) else input_path
|
|
try:
|
|
with open(out, "wb") as f:
|
|
writer.write(f)
|
|
except Exception as e:
|
|
print(f"Error: Cannot write output file: {e}", file=sys.stderr)
|
|
continue
|
|
|
|
if not quiet:
|
|
print(f"\u2713 Updated metadata for: {os.path.basename(input_path)}")
|
|
print(f" Title: {title}")
|
|
print(f" Author: Z.ai")
|
|
print(f" Creator: Z.ai")
|
|
print(f" Producer: http://z.ai")
|
|
if out != input_path:
|
|
print(f" Output: {out}")
|
|
|
|
raise SystemExit(0)
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# Section 5: form — inspection, filling, annotation, rendering
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
# --- form.info (pikepdf-based) ---
|
|
|
|
_FIELD_TYPE_MAP = {
|
|
"/Tx": "text",
|
|
"/Sig": "signature",
|
|
}
|
|
|
|
|
|
def _classify_field(node) -> str:
|
|
"""Map a PDF field type token to a human label."""
|
|
ft = str(node.get("/FT", ""))
|
|
if ft in _FIELD_TYPE_MAP:
|
|
return _FIELD_TYPE_MAP[ft]
|
|
flags = int(node.get("/Ff", 0))
|
|
if ft == "/Btn":
|
|
return "radio" if (flags & (1 << 15)) else "checkbox"
|
|
if ft == "/Ch":
|
|
return "dropdown" if (flags & (1 << 17)) else "listbox"
|
|
return "unknown"
|
|
|
|
|
|
def _extra_props(node, kind: str) -> dict:
|
|
"""Gather type-specific metadata (options, checked value, etc.)."""
|
|
props: dict = {}
|
|
if kind == "checkbox":
|
|
ap = node.get("/AP")
|
|
if ap and "/N" in ap:
|
|
states = [str(s) for s in ap["/N"].keys()]
|
|
props["states"] = states
|
|
props["checked_value"] = next((s for s in states if s != "/Off"), states[0] if states else None)
|
|
elif kind in ("dropdown", "listbox"):
|
|
raw_opts = node.get("/Opt")
|
|
if raw_opts:
|
|
props["options"] = [
|
|
{"value": str(item[0]), "label": str(item[1])} if isinstance(item, list) and len(item) >= 2
|
|
else {"value": str(item), "label": str(item)}
|
|
for item in raw_opts
|
|
]
|
|
elif kind == "radio":
|
|
kids = node.get("/Kids")
|
|
if kids:
|
|
radio_vals = []
|
|
for child in kids:
|
|
ap = child.get("/AP")
|
|
if ap and "/N" in ap:
|
|
radio_vals.extend(str(k) for k in ap["/N"].keys() if str(k) != "/Off")
|
|
if radio_vals:
|
|
props["options"] = radio_vals
|
|
return props
|
|
|
|
|
|
def _current_value(node):
|
|
v = node.get("/V")
|
|
return str(v) if v is not None else None
|
|
|
|
|
|
def _gather_fields(doc) -> list:
|
|
"""Walk the AcroForm field tree iteratively and return a flat list."""
|
|
if "/AcroForm" not in doc.Root:
|
|
return []
|
|
acro = doc.Root.AcroForm
|
|
if "/Fields" not in acro:
|
|
return []
|
|
|
|
page_lookup = {pg.objgen: idx for idx, pg in enumerate(doc.pages)}
|
|
results = []
|
|
stack = [(field, "") for field in reversed(list(acro.Fields))]
|
|
|
|
while stack:
|
|
node, parent_path = stack.pop()
|
|
name = str(node.get("/T", ""))
|
|
full = f"{parent_path}.{name}" if parent_path else name
|
|
|
|
kids = node.get("/Kids")
|
|
if kids and any("/T" in k for k in kids):
|
|
for kid in reversed(list(kids)):
|
|
stack.append((kid, full))
|
|
continue
|
|
|
|
kind = _classify_field(node)
|
|
if kind == "unknown":
|
|
continue
|
|
|
|
entry = {"id": full, "type": kind}
|
|
val = _current_value(node)
|
|
if val:
|
|
entry["current_value"] = val
|
|
entry.update(_extra_props(node, kind))
|
|
|
|
page_ref = node.get("/P")
|
|
if page_ref and hasattr(page_ref, "objgen"):
|
|
pg_num = page_lookup.get(page_ref.objgen)
|
|
if pg_num is not None:
|
|
entry["page"] = pg_num + 1
|
|
|
|
results.append(entry)
|
|
|
|
return results
|
|
|
|
|
|
@cmd("form.info")
|
|
def form_info(argv: list):
|
|
"""Return structured JSON describing every form field (pikepdf + check_fillable)."""
|
|
if not argv:
|
|
Output.error("MissingArg", "pdf path required")
|
|
|
|
import pikepdf
|
|
src = Output.check_file(argv[0])
|
|
try:
|
|
doc = pikepdf.open(src)
|
|
except Exception as exc:
|
|
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
|
|
|
|
fields = _gather_fields(doc)
|
|
if not fields:
|
|
Output.success({"has_fields": False, "count": 0, "fields": [], "hint": "This PDF has no fillable form fields"})
|
|
Output.success({"has_fields": True, "count": len(fields), "fields": fields})
|
|
|
|
|
|
@cmd("form.fill")
|
|
def form_fill(argv: list):
|
|
"""Write values into a fillable PDF (pikepdf version)."""
|
|
if not argv:
|
|
Output.error("MissingArg", "pdf path required")
|
|
pdf_path = argv.pop(0)
|
|
out_path = _pop_flag(argv, "-o", "--output")
|
|
if out_path is None:
|
|
Output.error("MissingArg", "--output is required")
|
|
data = _load_json_arg(argv)
|
|
|
|
import pikepdf
|
|
src = Output.check_file(pdf_path)
|
|
try:
|
|
doc = pikepdf.open(src)
|
|
except Exception as exc:
|
|
Output.error("PDFError", f"Cannot open PDF: {exc}", code=3)
|
|
|
|
if "/AcroForm" not in doc.Root or "/Fields" not in doc.Root.AcroForm:
|
|
Output.error("NoForm", "This PDF has no form fields")
|
|
|
|
known = {f["id"]: f for f in _gather_fields(doc)}
|
|
|
|
# Validation
|
|
issues = []
|
|
for fid, fval in data.items():
|
|
if fid not in known:
|
|
issues.append(f"Field not found: {fid}")
|
|
continue
|
|
fmeta = known[fid]
|
|
ftype = fmeta["type"]
|
|
if ftype == "checkbox" and "states" in fmeta:
|
|
ok_vals = fmeta["states"]
|
|
if fval not in ok_vals and f"/{fval}" not in ok_vals and fval not in ("true", "True", "false", "False", "1", "0"):
|
|
issues.append(f"Invalid value for field {fid}, options: {ok_vals} or true/false")
|
|
if ftype in ("dropdown", "listbox") and "options" in fmeta:
|
|
ok_vals = [o["value"] for o in fmeta["options"]]
|
|
if fval not in ok_vals:
|
|
issues.append(f"Invalid value for field {fid}, options: {ok_vals}")
|
|
if issues:
|
|
Output.error("ValidationError", "Field validation failed", hint="; ".join(issues))
|
|
|
|
# Fill
|
|
written = 0
|
|
|
|
def _apply(node, parent_path=""):
|
|
nonlocal written
|
|
name = str(node.get("/T", ""))
|
|
full = f"{parent_path}.{name}" if parent_path else name
|
|
kids = node.get("/Kids")
|
|
if kids and any("/T" in k for k in kids):
|
|
for kid in kids:
|
|
_apply(kid, full)
|
|
return
|
|
if full not in data:
|
|
return
|
|
val = data[full]
|
|
kind = _classify_field(node)
|
|
if kind == "checkbox":
|
|
if val in ("true", "True", "1", True):
|
|
ap = node.get("/AP")
|
|
if ap and "/N" in ap:
|
|
checked_name = next((str(k) for k in ap["/N"].keys() if str(k) != "/Off"), "/Yes")
|
|
if not checked_name.startswith("/"):
|
|
checked_name = f"/{checked_name}"
|
|
node["/V"] = pikepdf.Name(checked_name)
|
|
node["/AS"] = pikepdf.Name(checked_name)
|
|
else:
|
|
node["/V"] = pikepdf.Name("/Off")
|
|
node["/AS"] = pikepdf.Name("/Off")
|
|
else:
|
|
node["/V"] = pikepdf.String(str(val))
|
|
written += 1
|
|
|
|
for field in doc.Root.AcroForm.Fields:
|
|
_apply(field)
|
|
|
|
acro = doc.Root.AcroForm
|
|
if "/NeedAppearances" not in acro:
|
|
acro["/NeedAppearances"] = True
|
|
|
|
try:
|
|
doc.save(out_path)
|
|
except Exception as exc:
|
|
Output.error("SaveError", f"Save failed: {exc}", code=4)
|
|
|
|
Output.success({"output": out_path, "fields_filled": written, "fields_requested": len(data)})
|
|
|
|
|
|
# --- form.detail (pypdf-based detailed field extraction) ---
|
|
|
|
def _get_full_annotation_field_id(annotation):
|
|
"""Build dotted field ID by walking parent chain."""
|
|
components = []
|
|
while annotation:
|
|
field_name = annotation.get('/T')
|
|
if field_name:
|
|
components.append(field_name)
|
|
annotation = annotation.get('/Parent')
|
|
return ".".join(reversed(components)) if components else None
|
|
|
|
|
|
def _make_field_dict(field, field_id):
|
|
field_dict = {"field_id": field_id}
|
|
ft = field.get('/FT')
|
|
if ft == "/Tx":
|
|
field_dict["type"] = "text"
|
|
elif ft == "/Btn":
|
|
field_dict["type"] = "checkbox"
|
|
states = field.get("/_States_", [])
|
|
if len(states) == 2:
|
|
if "/Off" in states:
|
|
field_dict["checked_value"] = states[0] if states[0] != "/Off" else states[1]
|
|
field_dict["unchecked_value"] = "/Off"
|
|
else:
|
|
print(f"Unexpected state values for checkbox `${field_id}`. Its checked and unchecked values may not be correct; if you're trying to check it, visually verify the results.")
|
|
field_dict["checked_value"] = states[0]
|
|
field_dict["unchecked_value"] = states[1]
|
|
elif ft == "/Ch":
|
|
field_dict["type"] = "choice"
|
|
states = field.get("/_States_", [])
|
|
field_dict["choice_options"] = [{
|
|
"value": state[0],
|
|
"text": state[1],
|
|
} for state in states]
|
|
else:
|
|
field_dict["type"] = f"unknown ({ft})"
|
|
return field_dict
|
|
|
|
|
|
def _get_field_info(reader) -> list:
|
|
"""Extract detailed field info from a PdfReader, including radio group aggregation."""
|
|
fields = reader.get_fields()
|
|
|
|
field_info_by_id = {}
|
|
possible_radio_names: Set[str] = set()
|
|
|
|
for field_id, field in fields.items():
|
|
if field.get("/Kids"):
|
|
if field.get("/FT") == "/Btn":
|
|
possible_radio_names.add(field_id)
|
|
continue
|
|
field_info_by_id[field_id] = _make_field_dict(field, field_id)
|
|
|
|
radio_fields_by_id: Dict[str, dict] = {}
|
|
|
|
for page_index, page in enumerate(reader.pages):
|
|
annotations = page.get('/Annots', [])
|
|
for ann in annotations:
|
|
field_id = _get_full_annotation_field_id(ann)
|
|
if field_id in field_info_by_id:
|
|
field_info_by_id[field_id]["page"] = page_index + 1
|
|
field_info_by_id[field_id]["rect"] = ann.get('/Rect')
|
|
elif field_id in possible_radio_names:
|
|
try:
|
|
on_values = [v for v in ann["/AP"]["/N"] if v != "/Off"]
|
|
except KeyError:
|
|
continue
|
|
if len(on_values) == 1:
|
|
rect = ann.get("/Rect")
|
|
if field_id not in radio_fields_by_id:
|
|
radio_fields_by_id[field_id] = {
|
|
"field_id": field_id,
|
|
"type": "radio_group",
|
|
"page": page_index + 1,
|
|
"radio_options": [],
|
|
}
|
|
radio_fields_by_id[field_id]["radio_options"].append({
|
|
"value": on_values[0],
|
|
"rect": rect,
|
|
})
|
|
|
|
# Filter fields without location
|
|
fields_with_location = []
|
|
for field_info in field_info_by_id.values():
|
|
if "page" in field_info:
|
|
fields_with_location.append(field_info)
|
|
else:
|
|
print(f"Unable to determine location for field id: {field_info.get('field_id')}, ignoring")
|
|
|
|
# Sort by page number, then Y position (flipped), then X
|
|
def sort_key(f):
|
|
if "radio_options" in f:
|
|
rect = f["radio_options"][0]["rect"] or [0, 0, 0, 0]
|
|
else:
|
|
rect = f.get("rect") or [0, 0, 0, 0]
|
|
adjusted_position = [-rect[1], rect[0]]
|
|
return [f.get("page"), adjusted_position]
|
|
|
|
sorted_fields = fields_with_location + list(radio_fields_by_id.values())
|
|
sorted_fields.sort(key=sort_key)
|
|
|
|
return sorted_fields
|
|
|
|
|
|
@cmd("form.detail")
|
|
def form_detail(argv: list):
|
|
"""Extract detailed field info (pypdf version) to JSON."""
|
|
if len(argv) < 2:
|
|
Output.error("MissingArg", "Usage: form.detail <pdf> <output.json>")
|
|
pdf_path = argv[0]
|
|
json_output_path = argv[1]
|
|
|
|
from pypdf import PdfReader
|
|
reader = PdfReader(pdf_path)
|
|
field_info = _get_field_info(reader)
|
|
with open(json_output_path, "w") as f:
|
|
json.dump(field_info, f, indent=2)
|
|
print(f"Wrote {len(field_info)} fields to {json_output_path}")
|
|
raise SystemExit(0)
|
|
|
|
|
|
# --- form.fill-legacy (pypdf version with monkeypatch) ---
|
|
|
|
def _validation_error_for_field_value(field_info, field_value):
|
|
field_type = field_info["type"]
|
|
field_id = field_info["field_id"]
|
|
if field_type == "checkbox":
|
|
checked_val = field_info["checked_value"]
|
|
unchecked_val = field_info["unchecked_value"]
|
|
if field_value != checked_val and field_value != unchecked_val:
|
|
return f'ERROR: Invalid value "{field_value}" for checkbox field "{field_id}". The checked value is "{checked_val}" and the unchecked value is "{unchecked_val}"'
|
|
elif field_type == "radio_group":
|
|
option_values = [opt["value"] for opt in field_info["radio_options"]]
|
|
if field_value not in option_values:
|
|
return f'ERROR: Invalid value "{field_value}" for radio group field "{field_id}". Valid values are: {option_values}'
|
|
elif field_type == "choice":
|
|
choice_values = [opt["value"] for opt in field_info["choice_options"]]
|
|
if field_value not in choice_values:
|
|
return f'ERROR: Invalid value "{field_value}" for choice field "{field_id}". Valid values are: {choice_values}'
|
|
return None
|
|
|
|
|
|
def _monkeypatch_pypdf_method():
|
|
"""
|
|
Workaround for pypdf bug with selection list fields.
|
|
pypdf's get_inherited returns a list of two-element lists for /Opt fields
|
|
in selection lists, causing join() to throw TypeError. We patch it to
|
|
return just the value strings.
|
|
"""
|
|
from pypdf.generic import DictionaryObject
|
|
from pypdf.constants import FieldDictionaryAttributes
|
|
|
|
original_get_inherited = DictionaryObject.get_inherited
|
|
|
|
def patched_get_inherited(self, key: str, default=None):
|
|
result = original_get_inherited(self, key, default)
|
|
if key == FieldDictionaryAttributes.Opt:
|
|
if isinstance(result, list) and all(isinstance(v, list) and len(v) == 2 for v in result):
|
|
result = [r[0] for r in result]
|
|
return result
|
|
|
|
DictionaryObject.get_inherited = patched_get_inherited
|
|
|
|
|
|
@cmd("form.fill-legacy")
|
|
def form_fill_legacy(argv: list):
|
|
"""Fill fillable form fields (pypdf version with monkeypatch)."""
|
|
if len(argv) < 3:
|
|
Output.error("MissingArg", "Usage: form.fill-legacy <pdf> <fields.json> <output.pdf>")
|
|
input_pdf = argv[0]
|
|
fields_json = argv[1]
|
|
output_pdf = argv[2]
|
|
|
|
from pypdf import PdfReader, PdfWriter
|
|
|
|
_monkeypatch_pypdf_method()
|
|
|
|
with open(fields_json) as f:
|
|
fields = json.load(f)
|
|
|
|
# Group by page number
|
|
fields_by_page: Dict[int, dict] = {}
|
|
for field in fields:
|
|
if "value" in field:
|
|
field_id = field["field_id"]
|
|
page = field["page"]
|
|
if page not in fields_by_page:
|
|
fields_by_page[page] = {}
|
|
fields_by_page[page][field_id] = field["value"]
|
|
|
|
reader = PdfReader(input_pdf)
|
|
|
|
has_error = False
|
|
field_info = _get_field_info(reader)
|
|
fields_by_ids = {f["field_id"]: f for f in field_info}
|
|
for field in fields:
|
|
existing_field = fields_by_ids.get(field["field_id"])
|
|
if not existing_field:
|
|
has_error = True
|
|
print(f"ERROR: `{field['field_id']}` is not a valid field ID")
|
|
elif field["page"] != existing_field["page"]:
|
|
has_error = True
|
|
print(f"ERROR: Incorrect page number for `{field['field_id']}` (got {field['page']}, expected {existing_field['page']})")
|
|
else:
|
|
if "value" in field:
|
|
err = _validation_error_for_field_value(existing_field, field["value"])
|
|
if err:
|
|
print(err)
|
|
has_error = True
|
|
if has_error:
|
|
raise SystemExit(1)
|
|
|
|
writer = PdfWriter(clone_from=reader)
|
|
for page, field_values in fields_by_page.items():
|
|
writer.update_page_form_field_values(writer.pages[page - 1], field_values, auto_regenerate=False)
|
|
|
|
writer.set_need_appearances_writer(True)
|
|
|
|
with open(output_pdf, "wb") as f:
|
|
writer.write(f)
|
|
|
|
print(f"Filled {len(fields_by_page)} page(s) in {output_pdf}")
|
|
raise SystemExit(0)
|
|
|
|
|
|
# --- form.annotate (annotation-based filling with coordinate transform) ---
|
|
|
|
def _transform_coordinates(bbox, image_width, image_height, pdf_width, pdf_height):
|
|
"""Transform bounding box from image coordinates to PDF coordinates."""
|
|
x_scale = pdf_width / image_width
|
|
y_scale = pdf_height / image_height
|
|
|
|
left = bbox[0] * x_scale
|
|
right = bbox[2] * x_scale
|
|
|
|
# Flip Y coordinates for PDF
|
|
top = pdf_height - (bbox[1] * y_scale)
|
|
bottom = pdf_height - (bbox[3] * y_scale)
|
|
|
|
return left, bottom, right, top
|
|
|
|
|
|
def _normalise_fields_json(raw: dict) -> dict:
|
|
"""Accept both the current sheet-based schema and the legacy flat schema.
|
|
|
|
Current (v2) schema uses ``sheet[].pg/dims/regions[]`` with nested
|
|
``label.bbox``, ``target.bbox``, ``ink{}``.
|
|
|
|
Legacy (v1) schema uses ``pages[]`` + ``form_fields[]`` with flat keys
|
|
like ``entry_bounding_box``, ``label_bounding_box``, ``entry_text{}``.
|
|
|
|
Returns a normalised dict in the **v2** internal format used by all
|
|
downstream functions.
|
|
"""
|
|
# Already v2
|
|
if "sheet" in raw:
|
|
return raw
|
|
|
|
# Convert legacy → v2
|
|
pages_lut = {p["page_number"]: p for p in raw.get("pages", [])}
|
|
sheets: dict = {} # pg -> sheet entry
|
|
|
|
for f in raw.get("form_fields", []):
|
|
pg = f["page_number"]
|
|
if pg not in sheets:
|
|
pi = pages_lut.get(pg, {})
|
|
sheets[pg] = {
|
|
"pg": pg,
|
|
"dims": [pi.get("image_width", 0), pi.get("image_height", 0)],
|
|
"regions": [],
|
|
}
|
|
et = f.get("entry_text", {})
|
|
region = {
|
|
"id": f.get("field_label", f.get("description", "")),
|
|
"hint": f.get("description", ""),
|
|
"label": {"tag": f.get("field_label", ""), "bbox": f.get("label_bounding_box", [0, 0, 0, 0])},
|
|
"target": {"bbox": f.get("entry_bounding_box", [0, 0, 0, 0])},
|
|
"ink": {},
|
|
}
|
|
if isinstance(et, dict) and et.get("text"):
|
|
region["ink"]["value"] = et["text"]
|
|
if "font_size" in et:
|
|
region["ink"]["size"] = et["font_size"]
|
|
if "font_color" in et:
|
|
region["ink"]["color"] = et["font_color"]
|
|
if "font" in et:
|
|
region["ink"]["font"] = et["font"]
|
|
sheets[pg]["regions"].append(region)
|
|
|
|
return {"sheet": list(sheets.values())}
|
|
|
|
|
|
@cmd("form.annotate")
|
|
def form_annotate(argv: list):
|
|
"""Fill a PDF by adding text annotations (FreeText) defined in fields.json."""
|
|
if len(argv) < 3:
|
|
Output.error("MissingArg", "Usage: form.annotate <pdf> <fields.json> <output.pdf>")
|
|
input_pdf = argv[0]
|
|
fields_json_path = argv[1]
|
|
output_pdf = argv[2]
|
|
|
|
from pypdf import PdfReader, PdfWriter
|
|
from pypdf.annotations import FreeText
|
|
|
|
with open(fields_json_path, "r") as f:
|
|
fields_data = _normalise_fields_json(json.load(f))
|
|
|
|
reader = PdfReader(input_pdf)
|
|
writer = PdfWriter()
|
|
writer.append(reader)
|
|
|
|
# Get PDF dimensions for each page
|
|
pdf_dimensions = {}
|
|
for i, page in enumerate(reader.pages):
|
|
mediabox = page.mediabox
|
|
pdf_dimensions[i + 1] = [mediabox.width, mediabox.height]
|
|
|
|
annotations = []
|
|
for page_entry in fields_data["sheet"]:
|
|
pg = page_entry["pg"]
|
|
image_width, image_height = page_entry["dims"]
|
|
pdf_width, pdf_height = pdf_dimensions[pg]
|
|
|
|
for region in page_entry["regions"]:
|
|
ink = region.get("ink", {})
|
|
text = ink.get("value", "")
|
|
if not text:
|
|
continue
|
|
|
|
transformed_box = _transform_coordinates(
|
|
region["target"]["bbox"],
|
|
image_width, image_height,
|
|
pdf_width, pdf_height
|
|
)
|
|
|
|
font_name = ink.get("font", "Arial")
|
|
font_size = str(ink.get("size", 14)) + "pt"
|
|
font_color = ink.get("color", "000000")
|
|
|
|
annotation = FreeText(
|
|
text=text,
|
|
rect=transformed_box,
|
|
font=font_name,
|
|
font_size=font_size,
|
|
font_color=font_color,
|
|
border_color=None,
|
|
background_color=None,
|
|
)
|
|
annotations.append(annotation)
|
|
writer.add_annotation(page_number=pg - 1, annotation=annotation)
|
|
|
|
with open(output_pdf, "wb") as output:
|
|
writer.write(output)
|
|
|
|
print(f"Successfully filled PDF form and saved to {output_pdf}")
|
|
print(f"Added {len(annotations)} text annotations")
|
|
raise SystemExit(0)
|
|
|
|
|
|
# --- form.render (PDF to PNG images) ---
|
|
|
|
@cmd("form.render")
|
|
def form_render(argv: list):
|
|
"""Convert each page of a PDF to a PNG image."""
|
|
if len(argv) < 2:
|
|
Output.error("MissingArg", "Usage: form.render <pdf> <output_dir> [--max-dim N]")
|
|
pdf_path = argv.pop(0)
|
|
output_dir = argv.pop(0)
|
|
max_dim_str = _pop_flag(argv, "-m", "--max-dim")
|
|
max_dim = int(max_dim_str) if max_dim_str else 1000
|
|
|
|
from pdf2image import convert_from_path
|
|
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
images = convert_from_path(pdf_path, dpi=200)
|
|
|
|
for i, image in enumerate(images):
|
|
width, height = image.size
|
|
if width > max_dim or height > max_dim:
|
|
scale_factor = min(max_dim / width, max_dim / height)
|
|
new_width = int(width * scale_factor)
|
|
new_height = int(height * scale_factor)
|
|
image = image.resize((new_width, new_height))
|
|
|
|
image_path = os.path.join(output_dir, f"page_{i+1}.png")
|
|
image.save(image_path)
|
|
print(f"Saved page {i+1} as {image_path} (size: {image.size})")
|
|
|
|
print(f"Converted {len(images)} pages to PNG images")
|
|
raise SystemExit(0)
|
|
|
|
|
|
# --- form.validate (bounding box validation image) ---
|
|
|
|
@cmd("form.validate")
|
|
def form_validate(argv: list):
|
|
"""Create validation images with bounding box rectangles."""
|
|
if len(argv) < 4:
|
|
Output.error("MissingArg", "Usage: form.validate <page> <fields.json> <input_img> <output_img>")
|
|
page_number = int(argv[0])
|
|
fields_json_path = argv[1]
|
|
input_path = argv[2]
|
|
output_path = argv[3]
|
|
|
|
from PIL import Image, ImageDraw
|
|
|
|
with open(fields_json_path, 'r') as f:
|
|
data = _normalise_fields_json(json.load(f))
|
|
|
|
img = Image.open(input_path)
|
|
draw = ImageDraw.Draw(img)
|
|
num_boxes = 0
|
|
|
|
for page_entry in data["sheet"]:
|
|
if page_entry["pg"] != page_number:
|
|
continue
|
|
for region in page_entry["regions"]:
|
|
target_box = region["target"]["bbox"]
|
|
label_box = region["label"]["bbox"]
|
|
draw.rectangle(target_box, outline='red', width=2)
|
|
draw.rectangle(label_box, outline='blue', width=2)
|
|
num_boxes += 2
|
|
|
|
img.save(output_path)
|
|
print(f"Created validation image at {output_path} with {num_boxes} bounding boxes")
|
|
raise SystemExit(0)
|
|
|
|
|
|
# --- form.check-bbox (bounding box overlap detection) ---
|
|
|
|
@dataclass
|
|
class _RectAndField:
|
|
rect: list
|
|
rect_type: str
|
|
field: dict
|
|
|
|
|
|
def get_bounding_box_messages(fields_json_stream) -> List[str]:
|
|
"""Check for overlapping bounding boxes. Returns list of messages (max 20)."""
|
|
messages = []
|
|
raw = json.load(fields_json_stream)
|
|
data = _normalise_fields_json(raw)
|
|
|
|
total_regions = sum(len(pe["regions"]) for pe in data["sheet"])
|
|
messages.append(f"Read {total_regions} regions across {len(data['sheet'])} page(s)")
|
|
|
|
def rects_intersect(r1, r2):
|
|
disjoint_horizontal = r1[0] >= r2[2] or r1[2] <= r2[0]
|
|
disjoint_vertical = r1[1] >= r2[3] or r1[3] <= r2[1]
|
|
return not (disjoint_horizontal or disjoint_vertical)
|
|
|
|
has_error = False
|
|
|
|
for page_entry in data["sheet"]:
|
|
pg = page_entry["pg"]
|
|
# Collect all rects on this page
|
|
rects_and_regions = []
|
|
for region in page_entry["regions"]:
|
|
rects_and_regions.append(_RectAndField(region["label"]["bbox"], "label", region))
|
|
rects_and_regions.append(_RectAndField(region["target"]["bbox"], "target", region))
|
|
|
|
for i, ri in enumerate(rects_and_regions):
|
|
for j in range(i + 1, len(rects_and_regions)):
|
|
rj = rects_and_regions[j]
|
|
if rects_intersect(ri.rect, rj.rect):
|
|
has_error = True
|
|
rid = ri.field.get("id", ri.field.get("hint", "?"))
|
|
rjd = rj.field.get("id", rj.field.get("hint", "?"))
|
|
if ri.field is rj.field:
|
|
messages.append(f"FAILURE: pg {pg} — label/target overlap for `{rid}` ({ri.rect}, {rj.rect})")
|
|
else:
|
|
messages.append(f"FAILURE: pg {pg} — {ri.rect_type} of `{rid}` ({ri.rect}) overlaps {rj.rect_type} of `{rjd}` ({rj.rect})")
|
|
if len(messages) >= 20:
|
|
messages.append("Aborting further checks; fix bounding boxes and try again")
|
|
return messages
|
|
|
|
# Height check for target rects
|
|
if ri.rect_type == "target":
|
|
ink = ri.field.get("ink", {})
|
|
if ink.get("value"):
|
|
font_size = ink.get("size", 14)
|
|
entry_height = ri.rect[3] - ri.rect[1]
|
|
if entry_height < font_size:
|
|
has_error = True
|
|
rid = ri.field.get("id", ri.field.get("hint", "?"))
|
|
messages.append(f"FAILURE: pg {pg} — target box height ({entry_height}) for `{rid}` is shorter than font size ({font_size}). Increase box height or decrease ink.size.")
|
|
if len(messages) >= 20:
|
|
messages.append("Aborting further checks; fix bounding boxes and try again")
|
|
return messages
|
|
|
|
if not has_error:
|
|
messages.append("SUCCESS: All bounding boxes are valid")
|
|
return messages
|
|
|
|
|
|
@cmd("form.check-bbox")
|
|
def form_check_bbox(argv: list):
|
|
"""Check bounding boxes in fields.json for overlaps."""
|
|
if not argv:
|
|
Output.error("MissingArg", "Usage: form.check-bbox <fields.json>")
|
|
with open(argv[0]) as f:
|
|
messages = get_bounding_box_messages(f)
|
|
for msg in messages:
|
|
print(msg)
|
|
raise SystemExit(0)
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# Section 6: convert — office, HTML, and LaTeX
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
_CONVERTIBLE_EXTENSIONS = frozenset({
|
|
".docx", ".doc", ".odt", ".rtf",
|
|
".pptx", ".ppt", ".odp",
|
|
".xlsx", ".xls", ".ods", ".csv",
|
|
".txt", ".html", ".htm",
|
|
})
|
|
|
|
_SOFFICE_CANDIDATES = [
|
|
"/Applications/LibreOffice.app/Contents/MacOS/soffice",
|
|
os.path.expanduser("~/Applications/LibreOffice.app/Contents/MacOS/soffice"),
|
|
"/usr/bin/soffice",
|
|
"/usr/local/bin/soffice",
|
|
"/usr/lib/libreoffice/program/soffice",
|
|
"/opt/libreoffice/program/soffice",
|
|
"/snap/bin/libreoffice.soffice",
|
|
]
|
|
|
|
|
|
def _locate_soffice() -> Optional[str]:
|
|
"""Search for a working soffice binary."""
|
|
for candidate in _SOFFICE_CANDIDATES:
|
|
if Path(candidate).is_file():
|
|
return candidate
|
|
for alias in ("soffice", "libreoffice"):
|
|
found = shutil.which(alias)
|
|
if found:
|
|
return found
|
|
return None
|
|
|
|
|
|
@cmd("convert.office")
|
|
def convert_office(argv: list):
|
|
"""Convert an office document to PDF via LibreOffice."""
|
|
if not argv:
|
|
Output.error("MissingArg", "input file required")
|
|
src_path = argv.pop(0)
|
|
out_path = _pop_flag(argv, "-o", "--output")
|
|
|
|
src = Output.check_file(src_path)
|
|
ext = src.suffix.lower()
|
|
|
|
if ext not in _CONVERTIBLE_EXTENSIONS:
|
|
Output.error(
|
|
"UnsupportedFormat",
|
|
f"Unsupported format: {ext}",
|
|
hint=f"Supported formats: {', '.join(sorted(_CONVERTIBLE_EXTENSIONS))}",
|
|
)
|
|
|
|
binary = _locate_soffice()
|
|
if binary is None:
|
|
Output.error(
|
|
"DependencyMissing",
|
|
"LibreOffice not found",
|
|
hint="Please install LibreOffice: https://www.libreoffice.org/download/",
|
|
)
|
|
|
|
target_dir = Path(out_path).parent if out_path else src.parent
|
|
target_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
cmd_list = [binary, "--headless", "--convert-to", "pdf", "--outdir", str(target_dir), str(src)]
|
|
|
|
try:
|
|
proc = subprocess.run(cmd_list, capture_output=True, text=True, timeout=120)
|
|
if proc.returncode != 0:
|
|
Output.error("ConvertError", f"Conversion failed: {proc.stderr.strip() or 'Unknown error'}", code=4)
|
|
except subprocess.TimeoutExpired:
|
|
Output.error("Timeout", "Conversion timeout (>120s)", code=4)
|
|
except Exception as exc:
|
|
Output.error("ConvertError", f"Conversion failed: {exc}", code=4)
|
|
|
|
auto_name = target_dir / f"{src.stem}.pdf"
|
|
final = Path(out_path) if out_path else auto_name
|
|
|
|
if out_path and final.name != auto_name.name and auto_name.exists():
|
|
auto_name.rename(final)
|
|
|
|
if not final.exists():
|
|
Output.error("ConvertError", "Converted PDF file was not generated", code=4)
|
|
|
|
Output.success({"input": str(src), "output": str(final), "format": ext})
|
|
|
|
|
|
@cmd("convert.html")
|
|
def convert_html(argv: list):
|
|
"""Convert HTML to PDF via node html2pdf.js."""
|
|
if not argv:
|
|
Output.error("MissingArg", "input file required")
|
|
|
|
js_path = _SCRIPT_DIR / "html2pdf.js"
|
|
if not js_path.exists():
|
|
Output.error("DependencyMissing", "html2pdf.js not found in scripts directory")
|
|
|
|
node_path = shutil.which("node")
|
|
if not node_path:
|
|
Output.error("DependencyMissing", "node not found in PATH")
|
|
|
|
cmd_list = [node_path, str(js_path)] + argv
|
|
try:
|
|
proc = subprocess.run(cmd_list, timeout=180)
|
|
raise SystemExit(proc.returncode)
|
|
except subprocess.TimeoutExpired:
|
|
Output.error("Timeout", "HTML conversion timeout (>180s)", code=4)
|
|
except SystemExit:
|
|
raise
|
|
except Exception as exc:
|
|
Output.error("ConvertError", f"HTML conversion failed: {exc}", code=4)
|
|
|
|
|
|
# --- convert.latex (tectonic wrapper with log filtering + PDF stats) ---
|
|
|
|
_NOISE_RE = re.compile(
|
|
r"^note: (?:"
|
|
r'"version 2" Tectonic'
|
|
r"|Running TeX"
|
|
r"|Rerunning TeX because"
|
|
r"|Running xdvipdfmx"
|
|
r"|downloading "
|
|
r"|Skipped writing .* intermediate files"
|
|
r")"
|
|
)
|
|
|
|
|
|
def _find_tectonic() -> Optional[str]:
|
|
"""Locate the tectonic binary: script_dir first, ~/tectonic, then PATH."""
|
|
local_bin = _SCRIPT_DIR / "tectonic"
|
|
if local_bin.exists() and os.access(local_bin, os.X_OK):
|
|
return str(local_bin)
|
|
home_bin = Path.home() / "tectonic"
|
|
if home_bin.exists() and os.access(home_bin, os.X_OK):
|
|
return str(home_bin)
|
|
system_bin = shutil.which("tectonic")
|
|
return system_bin
|
|
|
|
|
|
def _human_size(nbytes: int) -> str:
|
|
for unit in ("B", "KB", "MB", "GB"):
|
|
if nbytes < 1024:
|
|
return f"{nbytes:.2f} {unit}"
|
|
nbytes /= 1024
|
|
return f"{nbytes:.2f} TB"
|
|
|
|
|
|
def _pdf_stats(pdf_file: Path):
|
|
"""Return (pages, word_count, image_count) or Nones."""
|
|
try:
|
|
from pypdf import PdfReader
|
|
except ImportError:
|
|
for attempt in (
|
|
[sys.executable, "-m", "pip", "install", "-q", "pypdf"],
|
|
[sys.executable, "-m", "pip", "install", "-q", "--break-system-packages", "pypdf"],
|
|
[sys.executable, "-m", "pip", "install", "-q", "--user", "pypdf"],
|
|
):
|
|
if subprocess.run(attempt, check=False, capture_output=True).returncode == 0:
|
|
break
|
|
try:
|
|
from pypdf import PdfReader
|
|
except ImportError:
|
|
return None, None, None
|
|
|
|
try:
|
|
reader = PdfReader(str(pdf_file))
|
|
n_pages = len(reader.pages)
|
|
all_text = "".join(p.extract_text() or "" for p in reader.pages)
|
|
n_words = len([w for w in all_text.split() if w.strip()])
|
|
n_images = 0
|
|
for pg in reader.pages:
|
|
xobj = pg.get("/Resources", {}).get("/XObject")
|
|
if xobj:
|
|
obj = xobj.get_object()
|
|
n_images += sum(1 for k in obj if obj[k].get("/Subtype") == "/Image")
|
|
return n_pages, n_words, n_images
|
|
except Exception as exc:
|
|
print(f"Error extracting PDF info: {exc}", file=sys.stderr)
|
|
return None, None, None
|
|
|
|
|
|
def _classify_lines(lines):
|
|
"""Bucket raw output into errors / warnings / layout issues."""
|
|
errors, warnings, layout, pdf_note = [], [], [], None
|
|
for raw in lines:
|
|
ln = raw.rstrip()
|
|
if not ln:
|
|
continue
|
|
if _NOISE_RE.match(ln):
|
|
if ln.startswith("note: Writing"):
|
|
pdf_note = ln
|
|
continue
|
|
if ln.startswith("error:"):
|
|
errors.append(ln)
|
|
elif ln.startswith("warning:"):
|
|
warnings.append(ln)
|
|
elif re.search(r"(Overfull|Underfull) \\[hv]box", ln) or re.search(r"(Font shape|Missing character)", ln):
|
|
layout.append(ln)
|
|
return errors, warnings, layout, pdf_note
|
|
|
|
|
|
def _parse_writing_note(note: Optional[str]):
|
|
m = re.search(r"Writing `(.+?)` \((.+?)\)", note or "")
|
|
return (m.group(1), m.group(2)) if m else (None, None)
|
|
|
|
|
|
@cmd("convert.latex")
|
|
def convert_latex(argv: list):
|
|
"""Compile LaTeX file via tectonic, filter logs, report PDF stats."""
|
|
if not argv:
|
|
Output.error("MissingArg", "tex file required")
|
|
tex_file = argv.pop(0)
|
|
runs_str = _pop_flag(argv, "-r", "--runs")
|
|
runs = int(runs_str) if runs_str else 1
|
|
keep_logs = _pop_flag(argv, "-k", "--keep-logs", needs_value=False)
|
|
|
|
tex = Path(tex_file)
|
|
if not tex.exists():
|
|
print(f"\u2717 Error: File not found {tex_file}")
|
|
raise SystemExit(1)
|
|
|
|
print(f"Compiling {tex.name}...", flush=True)
|
|
if runs > 1:
|
|
print(f"Running {runs} passes (for cross-references)", flush=True)
|
|
|
|
tectonic = _find_tectonic()
|
|
if tectonic is None:
|
|
print("\n\u2717 Error: tectonic command not found")
|
|
print("Please install tectonic: https://tectonic-typesetting.github.io/")
|
|
print("\nHint: If installed at ~/tectonic, ensure it has execute permission:")
|
|
print(" chmod +x ~/tectonic")
|
|
raise SystemExit(1)
|
|
|
|
all_lines = []
|
|
ok = False
|
|
for _ in range(runs):
|
|
try:
|
|
proc = subprocess.run(
|
|
[tectonic, "-X", "compile", str(tex)],
|
|
capture_output=True, text=True, timeout=120,
|
|
)
|
|
all_lines.extend((proc.stdout + proc.stderr).splitlines())
|
|
ok = proc.returncode == 0
|
|
if not ok:
|
|
break
|
|
except subprocess.TimeoutExpired:
|
|
print("\n\u2717 Error: Compilation timeout (>2 minutes)")
|
|
raise SystemExit(1)
|
|
except Exception as exc:
|
|
print(f"\n\u2717 Error: {exc}")
|
|
raise SystemExit(1)
|
|
|
|
if keep_logs:
|
|
print("\n" + "=" * 50 + "\nFull logs:\n" + "=" * 50)
|
|
for ln in all_lines:
|
|
print(ln)
|
|
print("=" * 50 + "\n")
|
|
|
|
errors, warnings, layout, pdf_note = _classify_lines(all_lines)
|
|
noted_name, noted_size = _parse_writing_note(pdf_note)
|
|
pdf_name = noted_name or (tex.stem + ".pdf")
|
|
pdf_path = tex.parent / pdf_name
|
|
|
|
print()
|
|
if ok:
|
|
tag = "\u2713 Compilation successful" + (" (with warnings)" if warnings or layout else "")
|
|
print(tag)
|
|
else:
|
|
print("\u2717 Compilation failed")
|
|
|
|
if ok and pdf_path.exists():
|
|
print("\n========================\nPDF Information\n========================")
|
|
print(f"File: {pdf_name}")
|
|
print(f"Size: {noted_size or _human_size(pdf_path.stat().st_size)}")
|
|
pages, words, images = _pdf_stats(pdf_path)
|
|
if pages is not None:
|
|
print(f"Pages: {pages}")
|
|
if words is not None:
|
|
print(f"Words: ~{words:,}")
|
|
if images is not None:
|
|
print(f"Images: {images}")
|
|
|
|
if layout:
|
|
print(f"\n========================\nLayout Issues ({len(layout)})\n========================")
|
|
for ln in layout:
|
|
print(ln)
|
|
|
|
if warnings:
|
|
print(f"\n========================\nWarnings ({len(warnings)})\n========================")
|
|
for ln in warnings:
|
|
print(ln.replace("warning: ", "", 1))
|
|
|
|
if errors:
|
|
print("\n========================\nErrors\n========================")
|
|
for ln in errors:
|
|
print(ln.replace("error: ", "", 1))
|
|
|
|
if ok and (layout or warnings):
|
|
print()
|
|
print("<system-reminder>")
|
|
print(f"Detected {len(layout)} layout issues and {len(warnings)} warnings.")
|
|
print("These issues affect PDF typesetting quality and must be fixed.")
|
|
print("Do not dismiss with 'warnings don't affect output'. Fix all issues.")
|
|
print("</system-reminder>")
|
|
|
|
raise SystemExit(0 if ok else 1)
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# Section 7: code — sanitization pipeline for PDF generation code
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
# --- Step 0: restore literal unicode escapes/entities to real chars ---
|
|
_RE_UNICODE_ESC = re.compile(r"(\\u[0-9a-fA-F]{4})|(\\U[0-9a-fA-F]{8})|(\\x[0-9a-fA-F]{2})")
|
|
|
|
|
|
def _restore_escapes(s: str) -> str:
|
|
# HTML entities: ³ ≤ α ...
|
|
s = html.unescape(s)
|
|
|
|
# Literal backslash escapes: "\\u00B3" -> "³"
|
|
def _dec(m: re.Match) -> str:
|
|
esc = m.group(0)
|
|
try:
|
|
if esc.startswith("\\u") or esc.startswith("\\U"):
|
|
return chr(int(esc[2:], 16))
|
|
if esc.startswith("\\x"):
|
|
return chr(int(esc[2:], 16))
|
|
except Exception:
|
|
return esc
|
|
return esc
|
|
|
|
return _RE_UNICODE_ESC.sub(_dec, s)
|
|
|
|
|
|
# --- Step 1: superscripts/subscripts -> <super>/<sub> ---
|
|
_SUPERSCRIPT_MAP: Dict[str, str] = {
|
|
"\u2070": "0", "\u00b9": "1", "\u00b2": "2", "\u00b3": "3", "\u2074": "4",
|
|
"\u2075": "5", "\u2076": "6", "\u2077": "7", "\u2078": "8", "\u2079": "9",
|
|
"\u207a": "+", "\u207b": "-", "\u207c": "=", "\u207d": "(", "\u207e": ")",
|
|
"\u207f": "n", "\u1da6": "i",
|
|
}
|
|
|
|
_SUBSCRIPT_MAP: Dict[str, str] = {
|
|
"\u2080": "0", "\u2081": "1", "\u2082": "2", "\u2083": "3", "\u2084": "4",
|
|
"\u2085": "5", "\u2086": "6", "\u2087": "7", "\u2088": "8", "\u2089": "9",
|
|
"\u208a": "+", "\u208b": "-", "\u208c": "=", "\u208d": "(", "\u208e": ")",
|
|
"\u2090": "a", "\u2091": "e", "\u2095": "h", "\u1d62": "i", "\u2c7c": "j",
|
|
"\u2096": "k", "\u2097": "l", "\u2098": "m", "\u2099": "n", "\u2092": "o",
|
|
"\u209a": "p", "\u1d63": "r", "\u209b": "s", "\u209c": "t", "\u1d64": "u",
|
|
"\u1d65": "v", "\u2093": "x",
|
|
}
|
|
|
|
|
|
def _replace_super_sub(s: str) -> str:
|
|
out = []
|
|
for ch in s:
|
|
if ch in _SUPERSCRIPT_MAP:
|
|
out.append(f"<super>{_SUPERSCRIPT_MAP[ch]}</super>")
|
|
elif ch in _SUBSCRIPT_MAP:
|
|
out.append(f"<sub>{_SUBSCRIPT_MAP[ch]}</sub>")
|
|
else:
|
|
out.append(ch)
|
|
return "".join(out)
|
|
|
|
|
|
# --- Step 2: symbol fallback for SimHei (protect tags, then replace) ---
|
|
_SYMBOL_FALLBACK: Dict[str, str] = {
|
|
# Currently empty - enable entries as needed for fonts missing specific glyphs
|
|
}
|
|
|
|
|
|
def _fallback_symbols(s: str) -> str:
|
|
# Protect <super>/<sub> tags from being modified
|
|
placeholders: Dict[str, str] = {}
|
|
|
|
def _protect_tag(m: re.Match) -> str:
|
|
key = f"@@TAG{len(placeholders)}@@"
|
|
placeholders[key] = m.group(0)
|
|
return key
|
|
|
|
protected = re.sub(r"</?super>|</?sub>", _protect_tag, s)
|
|
|
|
# Replace symbols
|
|
protected = "".join(_SYMBOL_FALLBACK.get(ch, ch) for ch in protected)
|
|
|
|
# Restore tags
|
|
for k, v in placeholders.items():
|
|
protected = protected.replace(k, v)
|
|
|
|
return protected
|
|
|
|
|
|
def sanitize_code(text: str) -> str:
|
|
"""
|
|
Full sanitization pipeline for PDF generation code.
|
|
- Restore unicode escapes/entities to real characters
|
|
- Replace superscript/subscript unicode with <super>/<sub>
|
|
- Replace other risky symbols with ASCII/text fallbacks
|
|
"""
|
|
s = _restore_escapes(text)
|
|
s = _replace_super_sub(s)
|
|
s = _fallback_symbols(s)
|
|
return s
|
|
|
|
|
|
@cmd("code.sanitize")
|
|
def code_sanitize(argv: list):
|
|
"""Sanitize Unicode in a Python script for PDF generation."""
|
|
if not argv:
|
|
Output.error("MissingArg", "Usage: code.sanitize <target_script.py>")
|
|
target = argv[0]
|
|
with open(target, "r", encoding="utf-8") as f:
|
|
code = f.read()
|
|
sanitized = sanitize_code(code)
|
|
with open(target, "w", encoding="utf-8") as f:
|
|
f.write(sanitized)
|
|
print(f"Sanitized: {target}")
|
|
raise SystemExit(0)
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# Section 8: CLI dispatcher
|
|
# ═══════════════════════════════════════════════════════════════
|
|
|
|
def _usage():
|
|
sys.stdout.write(__doc__.strip() + "\n")
|
|
raise SystemExit(0)
|
|
|
|
|
|
def main():
|
|
tokens = sys.argv[1:]
|
|
if not tokens or tokens[0] in ("-h", "--help"):
|
|
_usage()
|
|
|
|
cmd_name = tokens.pop(0)
|
|
|
|
# Direct match
|
|
handler = _COMMANDS.get(cmd_name)
|
|
if handler is not None:
|
|
handler(tokens)
|
|
return
|
|
|
|
# Two-word match (e.g., "extract text" -> "extract.text")
|
|
if tokens:
|
|
compound = f"{cmd_name}.{tokens[0]}"
|
|
handler = _COMMANDS.get(compound)
|
|
if handler is not None:
|
|
tokens.pop(0)
|
|
handler(tokens)
|
|
return
|
|
|
|
# List commands in group
|
|
group_cmds = [k for k in _COMMANDS if k.startswith(cmd_name + ".")]
|
|
if group_cmds:
|
|
print(f"Available commands in '{cmd_name}':")
|
|
for c in sorted(group_cmds):
|
|
print(f" {c}")
|
|
raise SystemExit(0)
|
|
|
|
print(f"Unknown command: {cmd_name}\n")
|
|
_usage()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|