#!/usr/bin/env python3 """ PDF Processing Toolkit — All-in-One CLI Usage: python3 pdf.py [args...] Commands: env.check [--json] Check environment dependencies env.fix Auto-install missing dependencies extract.text [-p pages] extract.table [-p pages] extract.image -o pages.merge ... -o pages.split -o pages.rotate -o [-p pages] pages.crop -o [-p pages] meta.get meta.set -o -d meta.brand ... [-o ] [-t title] [-q] form.info form.fill -o -d form.detail form.fill-legacy form.annotate form.render [--max-dim N] form.validate form.check-bbox pages.clean -o Remove blank pages convert.office [-o ] convert.html [-o ] [--css ] convert.blueprint [-o ] convert.latex [--runs N] [--keep-logs] palette.generate [--title "..."] [--mode minimal] [--format python|json|css] palette.cascade [--title "..."] [--mode minimal] [--format summary|json|css|reportlab] code.sanitize content.sanitize [--apply] Fix content issues (CJK, encoding) font.check toc.check """ from __future__ import annotations import html import json import os import re import shutil import subprocess import sys from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Set, Tuple # ═══════════════════════════════════════════════════════════════ # Section 0: Framework — Output, @cmd registry, CLI parser # ═══════════════════════════════════════════════════════════════ class Output: """Structured JSON output for all subcommands.""" @staticmethod def success(data: dict): payload = {"status": "success", "data": data} sys.stdout.write(json.dumps(payload, ensure_ascii=False, indent=2) + "\n") raise SystemExit(0) @staticmethod def error(error: str, message: str, hint: Optional[str] = None, code: int = 1): payload = {"status": "error", "error": error, "message": message} if hint is not None: payload["hint"] = hint sys.stderr.write(json.dumps(payload, ensure_ascii=False, indent=2) + "\n") raise SystemExit(code) @staticmethod def check_file(filepath: str) -> Path: target = Path(filepath) if not target.exists(): Output.error("FileNotFound", f"File not found: {filepath}", code=2) return target # Command registry _COMMANDS: Dict[str, Callable] = {} def cmd(name: str): """Decorator to register a CLI command under a dotted namespace.""" def decorator(fn: Callable) -> Callable: _COMMANDS[name] = fn return fn return decorator def _pop_flag(argv: list, short: str, long: str, needs_value: bool = True): """Extract a flag (and optional value) from *argv* in-place.""" for idx, tok in enumerate(argv): if tok in (short, long): argv.pop(idx) if needs_value: if idx < len(argv): return argv.pop(idx) Output.error("MissingArg", f"Flag {long} requires a value") return True return None def _load_json_arg(argv: list) -> dict: """Read JSON from -d/--data string or -f/--file path.""" raw = _pop_flag(argv, "-d", "--data") if raw is not None: try: return json.loads(raw) except json.JSONDecodeError as exc: Output.error("InvalidJSON", f"JSON parse error: {exc}") fpath = _pop_flag(argv, "-f", "--file") if fpath is not None: try: with open(fpath) as fh: return json.load(fh) except Exception as exc: Output.error("FileError", f"Failed to read file: {exc}") Output.error("MissingData", "Requires --data or --file argument") def _resolve_page_indices(range_spec: Optional[str], page_count: int) -> List[int]: """Turn a human-friendly range string (1-indexed) into a sorted list of 0-based indices.""" if not range_spec: return list(range(page_count)) indices: Set[int] = set() for segment in range_spec.split(","): segment = segment.strip() if "-" in segment: lo, hi = segment.split("-", 1) for i in range(int(lo) - 1, min(int(hi), page_count)): indices.add(i) else: val = int(segment) - 1 if 0 <= val < page_count: indices.add(val) return sorted(indices) _SCRIPT_DIR = Path(__file__).resolve().parent # ═══════════════════════════════════════════════════════════════ # Section 1: env — environment diagnostics and auto-fix # ═══════════════════════════════════════════════════════════════ def _probe_cmd(name: str, version_args: Optional[List[str]] = None) -> Tuple[str, str]: """Check if a command exists and optionally get its version. Returns (status, detail).""" path = shutil.which(name) if path is None: return ("missing", "") if version_args is None: return ("ok", "") try: result = subprocess.run( [path] + version_args, capture_output=True, text=True, timeout=10 ) ver = result.stdout.strip() or result.stderr.strip() return ("ok", ver) except Exception: return ("ok", "") def _probe_python_module(mod_name: str) -> Tuple[str, str]: """Check if a Python module is importable and get its version.""" try: result = subprocess.run( [sys.executable, "-c", f"import {mod_name}; print(getattr({mod_name}, '__version__', 'installed'))"], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: return ("ok", result.stdout.strip()) return ("missing", "") except Exception: return ("missing", "") def _probe_node() -> Tuple[str, str]: s, d = _probe_cmd("node", ["--version"]) if s == "ok" and d: d = d.lstrip("v") return (s, d) def _probe_python() -> Tuple[str, str]: try: import platform return ("ok", platform.python_version()) except Exception: return ("ok", "") def _probe_libreoffice() -> Tuple[str, str]: candidates = [ "/Applications/LibreOffice.app/Contents/MacOS/soffice", os.path.expanduser("~/Applications/LibreOffice.app/Contents/MacOS/soffice"), "/usr/bin/soffice", "/usr/local/bin/soffice", "/usr/lib/libreoffice/program/soffice", "/opt/libreoffice/program/soffice", "/snap/bin/libreoffice.soffice", ] for c in candidates: if Path(c).is_file(): return ("ok", "") for alias in ("soffice", "libreoffice"): if shutil.which(alias): return ("ok", "") return ("missing", "") def _probe_tectonic() -> Tuple[str, str]: home_bin = Path.home() / "tectonic" if home_bin.exists() and os.access(home_bin, os.X_OK): return ("ok", "") tec_local = _SCRIPT_DIR / "tectonic" if tec_local.exists() and os.access(tec_local, os.X_OK): return ("ok", "") if shutil.which("tectonic"): return ("ok", "") return ("missing", "") def _probe_playwright_npm() -> Tuple[str, str]: """Check if playwright npm package is installed.""" try: result = subprocess.run( ["node", "-e", "console.log(require('playwright/package.json').version)"], capture_output=True, text=True, timeout=10 ) if result.returncode == 0 and result.stdout.strip(): return ("ok", result.stdout.strip()) except Exception: pass # Try global try: result = subprocess.run( ["npm", "list", "-g", "playwright", "--depth=0"], capture_output=True, text=True, timeout=15 ) import re as _re m = _re.search(r"playwright@(\S+)", result.stdout) if m: return ("ok", m.group(1)) except Exception: pass return ("missing", "") def _probe_chromium() -> Tuple[str, str]: """Check if Playwright Chromium browser is installed.""" import platform as _platform home = Path.home() if _platform.system() == "Darwin": cache_dir = home / "Library" / "Caches" / "ms-playwright" else: cache_dir = home / ".cache" / "ms-playwright" if cache_dir.is_dir(): for entry in sorted(cache_dir.iterdir(), reverse=True): if "chromium" in entry.name.lower(): return ("ok", entry.name) return ("missing", "") @cmd("env.check") def env_check(argv: list): """Check environment dependencies.""" use_json = _pop_flag(argv, "-j", "--json", needs_value=False) s_node = _probe_node() s_pw = _probe_playwright_npm() s_cr = _probe_chromium() s_py = _probe_python() s_pike = _probe_python_module("pikepdf") s_plumb = _probe_python_module("pdfplumber") s_lo = _probe_libreoffice() s_tec = _probe_tectonic() s_pw_py = _probe_python_module("playwright") if use_json: report = { "html_route": { "node": s_node[0], "node_version": s_node[1], "playwright": s_pw[0], "playwright_version": s_pw[1], "chromium": s_cr[0], "chromium_detail": s_cr[1], }, "process_route": { "python3": s_py[0], "python3_version": s_py[1], "pikepdf": s_pike[0], "pikepdf_version": s_pike[1], "pdfplumber": s_plumb[0], "pdfplumber_version": s_plumb[1], "playwright_python": s_pw_py[0], "playwright_python_version": s_pw_py[1], }, "optional": { "libreoffice": s_lo[0], "tectonic": s_tec[0], }, } sys.stdout.write(json.dumps(report, ensure_ascii=False, indent=2) + "\n") # Determine exit code rc = 0 for v in [s_node, s_pw, s_cr, s_py, s_pike, s_plumb]: if v[0] != "ok": rc = 2 break raise SystemExit(rc) # Human-readable output rc = 0 def show(name: str, status: Tuple[str, str], optional: bool = False): nonlocal rc s, d = status if s == "ok": detail = f" ({d})" if d else "" print(f" \u2713 {name}{detail}") elif optional: print(f" \u25cb {name} (optional, not installed)") else: print(f" \u2717 {name} (missing)") rc = 2 print("=== PDF Skill Environment ===\n") print("--- HTML Route ---") show("node", s_node) show("playwright", s_pw) show("chromium", s_cr) print("\n--- Process Route ---") show("python3", s_py) show("pikepdf", s_pike) show("pdfplumber", s_plumb) if s_pw_py[0] == "ok": print(f" (playwright-python: {s_pw_py[1]})") print("\n--- Optional ---") show("libreoffice", s_lo, optional=True) show("tectonic", s_tec, optional=True) print("\n=== Install Commands ===") print(" Node.js: brew install node (macOS) / apt install nodejs (Ubuntu)") print(" Playwright: npm install -g playwright && npx playwright install chromium") print(" Python: brew install python3 (macOS) / apt install python3 (Ubuntu)") print(" pikepdf: pip install pikepdf pdfplumber --user") print(" LibreOffice: brew install --cask libreoffice (macOS)") print(" Tectonic: curl -fsSL https://drop-sh.fullyjustified.net | sh") raise SystemExit(rc) @cmd("env.fix") def env_fix(argv: list): """Auto-install missing Python dependencies.""" modules = { "pikepdf": "pikepdf", "pdfplumber": "pdfplumber", "pypdf": "pypdf", "pdf2image": "pdf2image", "PIL": "Pillow", } installed = [] for mod, pkg in modules.items(): s, _ = _probe_python_module(mod) if s == "missing": print(f"Installing {pkg}...") for attempt in ( [sys.executable, "-m", "pip", "install", "-q", pkg], [sys.executable, "-m", "pip", "install", "-q", "--user", pkg], [sys.executable, "-m", "pip", "install", "-q", "--break-system-packages", pkg], ): result = subprocess.run(attempt, capture_output=True, text=True) if result.returncode == 0: installed.append(pkg) break else: print(f" Failed to install {pkg}") if installed: print(f"\nInstalled: {', '.join(installed)}") else: print("All Python dependencies are already installed.") raise SystemExit(0) # ═══════════════════════════════════════════════════════════════ # Section 2: extract — text, tables, and embedded images # ═══════════════════════════════════════════════════════════════ @cmd("extract.text") def extract_text(argv: list): """Pull plain text from selected pages.""" if not argv: Output.error("MissingArg", "pdf path required") pdf_path = argv.pop(0) page_range = _pop_flag(argv, "-p", "--pages") import pdfplumber src = Output.check_file(pdf_path) try: doc = pdfplumber.open(src) except Exception as exc: Output.error("PDFError", f"Cannot open PDF: {exc}", code=3) total_pages = len(doc.pages) target_pages = _resolve_page_indices(page_range, total_pages) char_total = 0 page_results = [] for pg_idx in target_pages: content = doc.pages[pg_idx].extract_text() or "" char_total += len(content) page_results.append({"page": pg_idx + 1, "chars": len(content), "text": content}) doc.close() Output.success({ "total_pages": total_pages, "extracted_pages": len(target_pages), "total_chars": char_total, "pages": page_results, }) @cmd("extract.table") def extract_table(argv: list): """Locate and return every table on selected pages.""" if not argv: Output.error("MissingArg", "pdf path required") pdf_path = argv.pop(0) page_range = _pop_flag(argv, "-p", "--pages") import pdfplumber src = Output.check_file(pdf_path) try: doc = pdfplumber.open(src) except Exception as exc: Output.error("PDFError", f"Cannot open PDF: {exc}", code=3) total_pages = len(doc.pages) target_pages = _resolve_page_indices(page_range, total_pages) collected = [] for pg_idx in target_pages: for tbl_num, raw_table in enumerate(doc.pages[pg_idx].extract_tables()): if not raw_table: continue sanitised = [ [(cell.strip() if cell else "") for cell in row] for row in raw_table ] collected.append({ "page": pg_idx + 1, "table_index": tbl_num, "rows": len(sanitised), "cols": len(sanitised[0]) if sanitised else 0, "data": sanitised, }) doc.close() Output.success({ "total_pages": total_pages, "extracted_pages": len(target_pages), "total_tables": len(collected), "tables": collected, }) @cmd("extract.image") def extract_image(argv: list): """Save every embedded raster image to output dir.""" if not argv: Output.error("MissingArg", "pdf path required") pdf_path = argv.pop(0) out_dir = _pop_flag(argv, "-o", "--output") or "." import pikepdf src = Output.check_file(pdf_path) dest = Path(out_dir) dest.mkdir(parents=True, exist_ok=True) try: doc = pikepdf.open(src) except Exception as exc: Output.error("PDFError", f"Cannot open PDF: {exc}", code=3) saved = [] seq = 0 _EXT_MAP = { "/DCTDecode": "jpg", "/FlateDecode": "png", "/JPXDecode": "jp2", } for page_no, pg in enumerate(doc.pages, 1): res = pg.get("/Resources") if res is None or "/XObject" not in res: continue for key, ref in res.XObject.items(): try: img_obj = doc.get_object(ref.objgen) if img_obj.get("/Subtype") != "/Image": continue seq += 1 w = int(img_obj.get("/Width", 0)) h = int(img_obj.get("/Height", 0)) filt = img_obj.get("/Filter") ext = _EXT_MAP.get(str(filt) if filt else None, "bin") fname = f"page{page_no}_img{seq}.{ext}" out_file = dest / fname out_file.write_bytes(img_obj.read_raw_bytes()) saved.append({ "page": page_no, "name": str(key), "file": str(out_file), "width": w, "height": h, "format": ext, }) except Exception: continue doc.close() Output.success({"output_dir": str(dest), "total_images": len(saved), "images": saved}) # ═══════════════════════════════════════════════════════════════ # Section 3: pages — merge, split, rotate, crop # ═══════════════════════════════════════════════════════════════ @cmd("pages.merge") def pages_merge(argv: list): """Concatenate several PDF files into one.""" out_path = _pop_flag(argv, "-o", "--output") if out_path is None: Output.error("MissingArg", "--output is required") if not argv: Output.error("MissingArg", "At least one PDF required") import pikepdf sources = [Output.check_file(p) for p in argv] handles = [] try: combined = pikepdf.new() descriptions = [] for src in sources: handle = pikepdf.open(src) handles.append(handle) n = len(handle.pages) descriptions.append(f"{src.name} ({n} pages)") for pg in handle.pages: combined.pages.append(pg) total = len(combined.pages) combined.save(out_path) combined.close() except Exception as exc: Output.error("MergeError", f"Merge failed: {exc}", code=4) finally: for h in handles: try: h.close() except Exception: pass Output.success({"output": out_path, "total_pages": total, "sources": descriptions}) @cmd("pages.split") def pages_split(argv: list): """Write each page as a separate single-page PDF.""" if not argv: Output.error("MissingArg", "pdf path required") pdf_path = argv.pop(0) out_dir = _pop_flag(argv, "-o", "--output") or "." import pikepdf src = Output.check_file(pdf_path) dest = Path(out_dir) dest.mkdir(parents=True, exist_ok=True) try: doc = pikepdf.open(src) except Exception as exc: Output.error("PDFError", f"Cannot open PDF: {exc}", code=3) generated = [] base = src.stem try: for idx, pg in enumerate(doc.pages, 1): fp = dest / f"{base}_page{idx:03d}.pdf" single_doc = pikepdf.new() single_doc.pages.append(pg) single_doc.save(fp) single_doc.close() generated.append(str(fp)) doc.close() except Exception as exc: Output.error("SplitError", f"Split failed: {exc}", code=4) Output.success({"output_dir": str(dest), "total_pages": len(generated), "files": generated}) @cmd("pages.rotate") def pages_rotate(argv: list): """Rotate selected pages by 90/180/270 degrees.""" if len(argv) < 2: Output.error("MissingArg", "pdf path and degrees required") pdf_path = argv.pop(0) degrees = int(argv.pop(0)) out_path = _pop_flag(argv, "-o", "--output") if out_path is None: Output.error("MissingArg", "--output is required") page_range = _pop_flag(argv, "-p", "--pages") if degrees not in (90, 180, 270): Output.error("InvalidDegrees", "Rotation angle must be 90, 180, or 270") import pikepdf src = Output.check_file(pdf_path) try: doc = pikepdf.open(src) except Exception as exc: Output.error("PDFError", f"Cannot open PDF: {exc}", code=3) targets = _resolve_page_indices(page_range, len(doc.pages)) try: for i in targets: existing = int(doc.pages[i].get("/Rotate", 0)) doc.pages[i]["/Rotate"] = (existing + degrees) % 360 doc.save(out_path) doc.close() except Exception as exc: Output.error("RotateError", f"Rotation failed: {exc}", code=4) Output.success({"output": out_path, "degrees": degrees, "pages_rotated": len(targets)}) @cmd("pages.crop") def pages_crop(argv: list): """Set the media/crop box on selected pages. box = 'left,bottom,right,top' in pt.""" if len(argv) < 2: Output.error("MissingArg", "pdf path and crop box required") pdf_path = argv.pop(0) box_str = argv.pop(0) out_path = _pop_flag(argv, "-o", "--output") if out_path is None: Output.error("MissingArg", "--output is required") page_range = _pop_flag(argv, "-p", "--pages") try: coords = [float(v.strip()) for v in box_str.split(",")] assert len(coords) == 4 left, bottom, right, top = coords except Exception: Output.error("InvalidBox", "Invalid crop box format, should be: left,bottom,right,top", hint="Example: 50,50,550,750") import pikepdf src = Output.check_file(pdf_path) try: doc = pikepdf.open(src) except Exception as exc: Output.error("PDFError", f"Cannot open PDF: {exc}", code=3) targets = _resolve_page_indices(page_range, len(doc.pages)) try: arr = pikepdf.Array([left, bottom, right, top]) for i in targets: doc.pages[i].mediabox = arr doc.pages[i].cropbox = arr doc.save(out_path) doc.close() except Exception as exc: Output.error("CropError", f"Crop failed: {exc}", code=4) Output.success({ "output": out_path, "box": {"left": left, "bottom": bottom, "right": right, "top": top}, "pages_cropped": len(targets), }) @cmd("pages.clean") def pages_clean(argv: list): """Remove truly blank pages from a PDF. A page is considered blank ONLY if it has exactly 0 text characters AND 0 images. Pages with even a single character or a tiny image are kept. Usage: pdf.py pages.clean input.pdf -o output.pdf """ if not argv: Output.error("MissingArg", "PDF path required") pdf_path = argv.pop(0) out_path = _pop_flag(argv, "-o", "--output") if out_path is None: Output.error("MissingArg", "--output is required") # --threshold is accepted but ignored (kept for backward compat) _pop_flag(argv, "-t", "--threshold") src = Output.check_file(pdf_path) # Phase 1: Detect blank pages using pdfplumber (text extraction) try: import pdfplumber except ImportError: Output.error("DependencyMissing", "pdfplumber required: pip install pdfplumber") blank_indices = [] # 0-indexed total_pages = 0 try: pdf = pdfplumber.open(str(src)) total_pages = len(pdf.pages) for i, page in enumerate(pdf.pages): text = (page.extract_text() or "").strip() # Check for ANY images on the page (no size filter) images = page.images if hasattr(page, 'images') else [] # A page is blank ONLY if it has exactly 0 characters AND 0 images if len(text) == 0 and len(images) == 0: blank_indices.append(i) pdf.close() except Exception as exc: Output.error("PDFError", f"Cannot analyze PDF: {exc}", code=3) if not blank_indices: Output.success({ "output": str(src), "total_pages": total_pages, "blank_pages_removed": 0, "message": "No blank pages found", }) return # Phase 2: Remove blank pages using pikepdf try: import pikepdf except ImportError: Output.error("DependencyMissing", "pikepdf required: pip install pikepdf") try: doc = pikepdf.open(str(src)) # Remove in reverse order to preserve indices for idx in sorted(blank_indices, reverse=True): if idx < len(doc.pages): del doc.pages[idx] doc.save(out_path) doc.close() except Exception as exc: Output.error("PDFError", f"Cannot remove blank pages: {exc}", code=4) remaining = total_pages - len(blank_indices) Output.success({ "output": out_path, "total_pages": total_pages, "blank_pages_removed": len(blank_indices), "blank_page_numbers": [i + 1 for i in blank_indices], # 1-indexed for humans "remaining_pages": remaining, }) # ═══════════════════════════════════════════════════════════════ # Section 4: meta — metadata reading, writing, and branding # ═══════════════════════════════════════════════════════════════ _XMP_MAPPING = { "Title": "dc:title", "Author": "dc:creator", "Subject": "dc:description", "Keywords": "pdf:Keywords", "Creator": "xmp:CreatorTool", "Producer": "pdf:Producer", } _ACCEPTED_KEYS = set(_XMP_MAPPING.keys()) @cmd("meta.get") def meta_get(argv: list): """Read document information and metadata.""" if not argv: Output.error("MissingArg", "pdf path required") import pikepdf src = Output.check_file(argv[0]) try: doc = pikepdf.open(src) except Exception as exc: Output.error("PDFError", f"Cannot open PDF: {exc}", code=3) record: dict = { "pages": len(doc.pages), "pdf_version": str(doc.pdf_version), } if doc.pages: mb = doc.pages[0].mediabox record["page_size"] = { "width": float(mb[2] - mb[0]), "height": float(mb[3] - mb[1]), "unit": "pt", } kv_pairs = {} if doc.docinfo: for k in doc.docinfo.keys(): try: kv_pairs[str(k).lstrip("/")] = str(doc.docinfo[k]) except Exception: pass record["metadata"] = kv_pairs record["encrypted"] = doc.is_encrypted record["has_form"] = "/AcroForm" in doc.Root record["has_outlines"] = "/Outlines" in doc.Root doc.close() Output.success(record) @cmd("meta.set") def meta_set(argv: list): """Update XMP + legacy docinfo metadata fields.""" if not argv: Output.error("MissingArg", "pdf path required") pdf_path = argv.pop(0) out_path = _pop_flag(argv, "-o", "--output") if out_path is None: Output.error("MissingArg", "--output is required") data = _load_json_arg(argv) import pikepdf src = Output.check_file(pdf_path) try: doc = pikepdf.open(src) except Exception as exc: Output.error("PDFError", f"Cannot open PDF: {exc}", code=3) # XMP layer with doc.open_metadata() as xmp: for raw_key, raw_val in data.items(): norm = raw_key.title() xmp_key = _XMP_MAPPING.get(norm) if xmp_key is None: continue try: xmp[xmp_key] = str(raw_val) except Exception: pass # Legacy docinfo layer if not doc.docinfo: doc.docinfo = pikepdf.Dictionary() for raw_key, raw_val in data.items(): norm = raw_key.title() if norm in _ACCEPTED_KEYS: doc.docinfo[pikepdf.Name(f"/{norm}")] = pikepdf.String(str(raw_val)) doc.docinfo[pikepdf.Name("/ModDate")] = pikepdf.String( datetime.now().strftime("D:%Y%m%d%H%M%S") ) try: doc.save(out_path) doc.close() except Exception as exc: Output.error("SaveError", f"Save failed: {exc}", code=4) Output.success({"output": out_path, "updated_fields": list(data.keys())}) @cmd("meta.brand") def meta_brand(argv: list): """Add Z.ai branding metadata to PDF documents.""" output_path = _pop_flag(argv, "-o", "--output") custom_title = _pop_flag(argv, "-t", "--title") quiet = _pop_flag(argv, "-q", "--quiet", needs_value=False) if not argv: Output.error("MissingArg", "At least one PDF file required") # Check if output is specified for multiple files if output_path and len(argv) > 1: Output.error("InvalidArg", "--output can only be used with a single input file") from pypdf import PdfReader, PdfWriter for input_path in argv: if not os.path.exists(input_path): print(f"Error: Input file not found: {input_path}", file=sys.stderr) continue try: reader = PdfReader(input_path) except Exception as e: print(f"Error: Cannot open PDF: {e}", file=sys.stderr) continue writer = PdfWriter() for page in reader.pages: writer.add_page(page) # Determine title if custom_title: title = custom_title else: original_meta = reader.metadata if original_meta and original_meta.title and original_meta.title not in ('(anonymous)', 'unspecified', None): title = original_meta.title else: title = os.path.splitext(os.path.basename(input_path))[0] writer.add_metadata({ '/Title': title, '/Author': 'Z.ai', '/Creator': 'Z.ai', '/Producer': 'http://z.ai', }) # Write output out = output_path if (len(argv) == 1 and output_path) else input_path try: with open(out, "wb") as f: writer.write(f) except Exception as e: print(f"Error: Cannot write output file: {e}", file=sys.stderr) continue if not quiet: print(f"\u2713 Updated metadata for: {os.path.basename(input_path)}") print(f" Title: {title}") print(f" Author: Z.ai") print(f" Creator: Z.ai") print(f" Producer: http://z.ai") if out != input_path: print(f" Output: {out}") raise SystemExit(0) # ═══════════════════════════════════════════════════════════════ # Section 5: form — inspection, filling, annotation, rendering # ═══════════════════════════════════════════════════════════════ # --- form.info (pikepdf-based) --- _FIELD_TYPE_MAP = { "/Tx": "text", "/Sig": "signature", } def _classify_field(node) -> str: """Map a PDF field type token to a human label.""" ft = str(node.get("/FT", "")) if ft in _FIELD_TYPE_MAP: return _FIELD_TYPE_MAP[ft] flags = int(node.get("/Ff", 0)) if ft == "/Btn": return "radio" if (flags & (1 << 15)) else "checkbox" if ft == "/Ch": return "dropdown" if (flags & (1 << 17)) else "listbox" return "unknown" def _extra_props(node, kind: str) -> dict: """Gather type-specific metadata (options, checked value, etc.).""" props: dict = {} if kind == "checkbox": ap = node.get("/AP") if ap and "/N" in ap: states = [str(s) for s in ap["/N"].keys()] props["states"] = states props["checked_value"] = next((s for s in states if s != "/Off"), states[0] if states else None) elif kind in ("dropdown", "listbox"): raw_opts = node.get("/Opt") if raw_opts: props["options"] = [ {"value": str(item[0]), "label": str(item[1])} if isinstance(item, list) and len(item) >= 2 else {"value": str(item), "label": str(item)} for item in raw_opts ] elif kind == "radio": kids = node.get("/Kids") if kids: radio_vals = [] for child in kids: ap = child.get("/AP") if ap and "/N" in ap: radio_vals.extend(str(k) for k in ap["/N"].keys() if str(k) != "/Off") if radio_vals: props["options"] = radio_vals return props def _current_value(node): v = node.get("/V") return str(v) if v is not None else None def _gather_fields(doc) -> list: """Walk the AcroForm field tree iteratively and return a flat list.""" if "/AcroForm" not in doc.Root: return [] acro = doc.Root.AcroForm if "/Fields" not in acro: return [] page_lookup = {pg.objgen: idx for idx, pg in enumerate(doc.pages)} results = [] stack = [(field, "") for field in reversed(list(acro.Fields))] while stack: node, parent_path = stack.pop() name = str(node.get("/T", "")) full = f"{parent_path}.{name}" if parent_path else name kids = node.get("/Kids") if kids and any("/T" in k for k in kids): for kid in reversed(list(kids)): stack.append((kid, full)) continue kind = _classify_field(node) if kind == "unknown": continue entry = {"id": full, "type": kind} val = _current_value(node) if val: entry["current_value"] = val entry.update(_extra_props(node, kind)) page_ref = node.get("/P") if page_ref and hasattr(page_ref, "objgen"): pg_num = page_lookup.get(page_ref.objgen) if pg_num is not None: entry["page"] = pg_num + 1 results.append(entry) return results @cmd("form.info") def form_info(argv: list): """Return structured JSON describing every form field (pikepdf + check_fillable).""" if not argv: Output.error("MissingArg", "pdf path required") import pikepdf src = Output.check_file(argv[0]) try: doc = pikepdf.open(src) except Exception as exc: Output.error("PDFError", f"Cannot open PDF: {exc}", code=3) fields = _gather_fields(doc) if not fields: Output.success({"has_fields": False, "count": 0, "fields": [], "hint": "This PDF has no fillable form fields"}) Output.success({"has_fields": True, "count": len(fields), "fields": fields}) @cmd("form.fill") def form_fill(argv: list): """Write values into a fillable PDF (pikepdf version).""" if not argv: Output.error("MissingArg", "pdf path required") pdf_path = argv.pop(0) out_path = _pop_flag(argv, "-o", "--output") if out_path is None: Output.error("MissingArg", "--output is required") data = _load_json_arg(argv) import pikepdf src = Output.check_file(pdf_path) try: doc = pikepdf.open(src) except Exception as exc: Output.error("PDFError", f"Cannot open PDF: {exc}", code=3) if "/AcroForm" not in doc.Root or "/Fields" not in doc.Root.AcroForm: Output.error("NoForm", "This PDF has no form fields") known = {f["id"]: f for f in _gather_fields(doc)} # Validation issues = [] for fid, fval in data.items(): if fid not in known: issues.append(f"Field not found: {fid}") continue fmeta = known[fid] ftype = fmeta["type"] if ftype == "checkbox" and "states" in fmeta: ok_vals = fmeta["states"] if fval not in ok_vals and f"/{fval}" not in ok_vals and fval not in ("true", "True", "false", "False", "1", "0"): issues.append(f"Invalid value for field {fid}, options: {ok_vals} or true/false") if ftype in ("dropdown", "listbox") and "options" in fmeta: ok_vals = [o["value"] for o in fmeta["options"]] if fval not in ok_vals: issues.append(f"Invalid value for field {fid}, options: {ok_vals}") if issues: Output.error("ValidationError", "Field validation failed", hint="; ".join(issues)) # Fill written = 0 def _apply(node, parent_path=""): nonlocal written name = str(node.get("/T", "")) full = f"{parent_path}.{name}" if parent_path else name kids = node.get("/Kids") if kids and any("/T" in k for k in kids): for kid in kids: _apply(kid, full) return if full not in data: return val = data[full] kind = _classify_field(node) if kind == "checkbox": if val in ("true", "True", "1", True): ap = node.get("/AP") if ap and "/N" in ap: checked_name = next((str(k) for k in ap["/N"].keys() if str(k) != "/Off"), "/Yes") if not checked_name.startswith("/"): checked_name = f"/{checked_name}" node["/V"] = pikepdf.Name(checked_name) node["/AS"] = pikepdf.Name(checked_name) else: node["/V"] = pikepdf.Name("/Off") node["/AS"] = pikepdf.Name("/Off") else: node["/V"] = pikepdf.String(str(val)) written += 1 for field in doc.Root.AcroForm.Fields: _apply(field) acro = doc.Root.AcroForm if "/NeedAppearances" not in acro: acro["/NeedAppearances"] = True try: doc.save(out_path) except Exception as exc: Output.error("SaveError", f"Save failed: {exc}", code=4) Output.success({"output": out_path, "fields_filled": written, "fields_requested": len(data)}) # --- form.detail (pypdf-based detailed field extraction) --- def _get_full_annotation_field_id(annotation): """Build dotted field ID by walking parent chain.""" components = [] while annotation: field_name = annotation.get('/T') if field_name: components.append(field_name) annotation = annotation.get('/Parent') return ".".join(reversed(components)) if components else None def _make_field_dict(field, field_id): field_dict = {"field_id": field_id} ft = field.get('/FT') if ft == "/Tx": field_dict["type"] = "text" elif ft == "/Btn": field_dict["type"] = "checkbox" states = field.get("/_States_", []) if len(states) == 2: if "/Off" in states: field_dict["checked_value"] = states[0] if states[0] != "/Off" else states[1] field_dict["unchecked_value"] = "/Off" else: print(f"Unexpected state values for checkbox `${field_id}`. Its checked and unchecked values may not be correct; if you're trying to check it, visually verify the results.") field_dict["checked_value"] = states[0] field_dict["unchecked_value"] = states[1] elif ft == "/Ch": field_dict["type"] = "choice" states = field.get("/_States_", []) field_dict["choice_options"] = [{ "value": state[0], "text": state[1], } for state in states] else: field_dict["type"] = f"unknown ({ft})" return field_dict def _get_field_info(reader) -> list: """Extract detailed field info from a PdfReader, including radio group aggregation.""" fields = reader.get_fields() field_info_by_id = {} possible_radio_names: Set[str] = set() for field_id, field in fields.items(): if field.get("/Kids"): if field.get("/FT") == "/Btn": possible_radio_names.add(field_id) continue field_info_by_id[field_id] = _make_field_dict(field, field_id) radio_fields_by_id: Dict[str, dict] = {} for page_index, page in enumerate(reader.pages): annotations = page.get('/Annots', []) for ann in annotations: field_id = _get_full_annotation_field_id(ann) if field_id in field_info_by_id: field_info_by_id[field_id]["page"] = page_index + 1 field_info_by_id[field_id]["rect"] = ann.get('/Rect') elif field_id in possible_radio_names: try: on_values = [v for v in ann["/AP"]["/N"] if v != "/Off"] except KeyError: continue if len(on_values) == 1: rect = ann.get("/Rect") if field_id not in radio_fields_by_id: radio_fields_by_id[field_id] = { "field_id": field_id, "type": "radio_group", "page": page_index + 1, "radio_options": [], } radio_fields_by_id[field_id]["radio_options"].append({ "value": on_values[0], "rect": rect, }) # Filter fields without location fields_with_location = [] for field_info in field_info_by_id.values(): if "page" in field_info: fields_with_location.append(field_info) else: print(f"Unable to determine location for field id: {field_info.get('field_id')}, ignoring") # Sort by page number, then Y position (flipped), then X def sort_key(f): if "radio_options" in f: rect = f["radio_options"][0]["rect"] or [0, 0, 0, 0] else: rect = f.get("rect") or [0, 0, 0, 0] adjusted_position = [-rect[1], rect[0]] return [f.get("page"), adjusted_position] sorted_fields = fields_with_location + list(radio_fields_by_id.values()) sorted_fields.sort(key=sort_key) return sorted_fields @cmd("form.detail") def form_detail(argv: list): """Extract detailed field info (pypdf version) to JSON.""" if len(argv) < 2: Output.error("MissingArg", "Usage: form.detail ") pdf_path = argv[0] json_output_path = argv[1] from pypdf import PdfReader reader = PdfReader(pdf_path) field_info = _get_field_info(reader) with open(json_output_path, "w") as f: json.dump(field_info, f, indent=2) print(f"Wrote {len(field_info)} fields to {json_output_path}") raise SystemExit(0) # --- form.fill-legacy (pypdf version with monkeypatch) --- def _validation_error_for_field_value(field_info, field_value): field_type = field_info["type"] field_id = field_info["field_id"] if field_type == "checkbox": checked_val = field_info["checked_value"] unchecked_val = field_info["unchecked_value"] if field_value != checked_val and field_value != unchecked_val: return f'ERROR: Invalid value "{field_value}" for checkbox field "{field_id}". The checked value is "{checked_val}" and the unchecked value is "{unchecked_val}"' elif field_type == "radio_group": option_values = [opt["value"] for opt in field_info["radio_options"]] if field_value not in option_values: return f'ERROR: Invalid value "{field_value}" for radio group field "{field_id}". Valid values are: {option_values}' elif field_type == "choice": choice_values = [opt["value"] for opt in field_info["choice_options"]] if field_value not in choice_values: return f'ERROR: Invalid value "{field_value}" for choice field "{field_id}". Valid values are: {choice_values}' return None def _monkeypatch_pypdf_method(): """ Workaround for pypdf bug with selection list fields. pypdf's get_inherited returns a list of two-element lists for /Opt fields in selection lists, causing join() to throw TypeError. We patch it to return just the value strings. """ from pypdf.generic import DictionaryObject from pypdf.constants import FieldDictionaryAttributes original_get_inherited = DictionaryObject.get_inherited def patched_get_inherited(self, key: str, default=None): result = original_get_inherited(self, key, default) if key == FieldDictionaryAttributes.Opt: if isinstance(result, list) and all(isinstance(v, list) and len(v) == 2 for v in result): result = [r[0] for r in result] return result DictionaryObject.get_inherited = patched_get_inherited @cmd("form.fill-legacy") def form_fill_legacy(argv: list): """Fill fillable form fields (pypdf version with monkeypatch).""" if len(argv) < 3: Output.error("MissingArg", "Usage: form.fill-legacy ") input_pdf = argv[0] fields_json = argv[1] output_pdf = argv[2] from pypdf import PdfReader, PdfWriter _monkeypatch_pypdf_method() with open(fields_json) as f: fields = json.load(f) # Group by page number fields_by_page: Dict[int, dict] = {} for field in fields: if "value" in field: field_id = field["field_id"] page = field["page"] if page not in fields_by_page: fields_by_page[page] = {} fields_by_page[page][field_id] = field["value"] reader = PdfReader(input_pdf) has_error = False field_info = _get_field_info(reader) fields_by_ids = {f["field_id"]: f for f in field_info} for field in fields: existing_field = fields_by_ids.get(field["field_id"]) if not existing_field: has_error = True print(f"ERROR: `{field['field_id']}` is not a valid field ID") elif field["page"] != existing_field["page"]: has_error = True print(f"ERROR: Incorrect page number for `{field['field_id']}` (got {field['page']}, expected {existing_field['page']})") else: if "value" in field: err = _validation_error_for_field_value(existing_field, field["value"]) if err: print(err) has_error = True if has_error: raise SystemExit(1) writer = PdfWriter(clone_from=reader) for page, field_values in fields_by_page.items(): writer.update_page_form_field_values(writer.pages[page - 1], field_values, auto_regenerate=False) writer.set_need_appearances_writer(True) with open(output_pdf, "wb") as f: writer.write(f) print(f"Filled {len(fields_by_page)} page(s) in {output_pdf}") raise SystemExit(0) # --- form.annotate (annotation-based filling with coordinate transform) --- def _transform_coordinates(bbox, image_width, image_height, pdf_width, pdf_height): """Transform bounding box from image coordinates to PDF coordinates.""" x_scale = pdf_width / image_width y_scale = pdf_height / image_height left = bbox[0] * x_scale right = bbox[2] * x_scale # Flip Y coordinates for PDF top = pdf_height - (bbox[1] * y_scale) bottom = pdf_height - (bbox[3] * y_scale) return left, bottom, right, top def _normalise_fields_json(raw: dict) -> dict: """Accept both the current sheet-based schema and the legacy flat schema. Current (v2) schema uses ``sheet[].pg/dims/regions[]`` with nested ``label.bbox``, ``target.bbox``, ``ink{}``. Legacy (v1) schema uses ``pages[]`` + ``form_fields[]`` with flat keys like ``entry_bounding_box``, ``label_bounding_box``, ``entry_text{}``. Returns a normalised dict in the **v2** internal format used by all downstream functions. """ # Already v2 if "sheet" in raw: return raw # Convert legacy → v2 pages_lut = {p["page_number"]: p for p in raw.get("pages", [])} sheets: dict = {} # pg -> sheet entry for f in raw.get("form_fields", []): pg = f["page_number"] if pg not in sheets: pi = pages_lut.get(pg, {}) sheets[pg] = { "pg": pg, "dims": [pi.get("image_width", 0), pi.get("image_height", 0)], "regions": [], } et = f.get("entry_text", {}) region = { "id": f.get("field_label", f.get("description", "")), "hint": f.get("description", ""), "label": {"tag": f.get("field_label", ""), "bbox": f.get("label_bounding_box", [0, 0, 0, 0])}, "target": {"bbox": f.get("entry_bounding_box", [0, 0, 0, 0])}, "ink": {}, } if isinstance(et, dict) and et.get("text"): region["ink"]["value"] = et["text"] if "font_size" in et: region["ink"]["size"] = et["font_size"] if "font_color" in et: region["ink"]["color"] = et["font_color"] if "font" in et: region["ink"]["font"] = et["font"] sheets[pg]["regions"].append(region) return {"sheet": list(sheets.values())} @cmd("form.annotate") def form_annotate(argv: list): """Fill a PDF by adding text annotations (FreeText) defined in fields.json.""" if len(argv) < 3: Output.error("MissingArg", "Usage: form.annotate ") input_pdf = argv[0] fields_json_path = argv[1] output_pdf = argv[2] from pypdf import PdfReader, PdfWriter from pypdf.annotations import FreeText with open(fields_json_path, "r") as f: fields_data = _normalise_fields_json(json.load(f)) reader = PdfReader(input_pdf) writer = PdfWriter() writer.append(reader) # Get PDF dimensions for each page pdf_dimensions = {} for i, page in enumerate(reader.pages): mediabox = page.mediabox pdf_dimensions[i + 1] = [mediabox.width, mediabox.height] annotations = [] for page_entry in fields_data["sheet"]: pg = page_entry["pg"] image_width, image_height = page_entry["dims"] pdf_width, pdf_height = pdf_dimensions[pg] for region in page_entry["regions"]: ink = region.get("ink", {}) text = ink.get("value", "") if not text: continue transformed_box = _transform_coordinates( region["target"]["bbox"], image_width, image_height, pdf_width, pdf_height ) font_name = ink.get("font", "Arial") font_size = str(ink.get("size", 14)) + "pt" font_color = ink.get("color", "000000") annotation = FreeText( text=text, rect=transformed_box, font=font_name, font_size=font_size, font_color=font_color, border_color=None, background_color=None, ) annotations.append(annotation) writer.add_annotation(page_number=pg - 1, annotation=annotation) with open(output_pdf, "wb") as output: writer.write(output) print(f"Successfully filled PDF form and saved to {output_pdf}") print(f"Added {len(annotations)} text annotations") raise SystemExit(0) # --- form.render (PDF to PNG images) --- @cmd("form.render") def form_render(argv: list): """Convert each page of a PDF to a PNG image.""" if len(argv) < 2: Output.error("MissingArg", "Usage: form.render [--max-dim N]") pdf_path = argv.pop(0) output_dir = argv.pop(0) max_dim_str = _pop_flag(argv, "-m", "--max-dim") max_dim = int(max_dim_str) if max_dim_str else 1000 from pdf2image import convert_from_path os.makedirs(output_dir, exist_ok=True) images = convert_from_path(pdf_path, dpi=200) for i, image in enumerate(images): width, height = image.size if width > max_dim or height > max_dim: scale_factor = min(max_dim / width, max_dim / height) new_width = int(width * scale_factor) new_height = int(height * scale_factor) image = image.resize((new_width, new_height)) image_path = os.path.join(output_dir, f"page_{i+1}.png") image.save(image_path) print(f"Saved page {i+1} as {image_path} (size: {image.size})") print(f"Converted {len(images)} pages to PNG images") raise SystemExit(0) # --- form.validate (bounding box validation image) --- @cmd("form.validate") def form_validate(argv: list): """Create validation images with bounding box rectangles.""" if len(argv) < 4: Output.error("MissingArg", "Usage: form.validate ") page_number = int(argv[0]) fields_json_path = argv[1] input_path = argv[2] output_path = argv[3] from PIL import Image, ImageDraw with open(fields_json_path, 'r') as f: data = _normalise_fields_json(json.load(f)) img = Image.open(input_path) draw = ImageDraw.Draw(img) num_boxes = 0 for page_entry in data["sheet"]: if page_entry["pg"] != page_number: continue for region in page_entry["regions"]: target_box = region["target"]["bbox"] label_box = region["label"]["bbox"] draw.rectangle(target_box, outline='red', width=2) draw.rectangle(label_box, outline='blue', width=2) num_boxes += 2 img.save(output_path) print(f"Created validation image at {output_path} with {num_boxes} bounding boxes") raise SystemExit(0) # --- form.check-bbox (bounding box overlap detection) --- @dataclass class _RectAndField: rect: list rect_type: str field: dict def get_bounding_box_messages(fields_json_stream) -> List[str]: """Check for overlapping bounding boxes. Returns list of messages (max 20).""" messages = [] raw = json.load(fields_json_stream) data = _normalise_fields_json(raw) total_regions = sum(len(pe["regions"]) for pe in data["sheet"]) messages.append(f"Read {total_regions} regions across {len(data['sheet'])} page(s)") def rects_intersect(r1, r2): disjoint_horizontal = r1[0] >= r2[2] or r1[2] <= r2[0] disjoint_vertical = r1[1] >= r2[3] or r1[3] <= r2[1] return not (disjoint_horizontal or disjoint_vertical) has_error = False for page_entry in data["sheet"]: pg = page_entry["pg"] # Collect all rects on this page rects_and_regions = [] for region in page_entry["regions"]: rects_and_regions.append(_RectAndField(region["label"]["bbox"], "label", region)) rects_and_regions.append(_RectAndField(region["target"]["bbox"], "target", region)) for i, ri in enumerate(rects_and_regions): for j in range(i + 1, len(rects_and_regions)): rj = rects_and_regions[j] if rects_intersect(ri.rect, rj.rect): has_error = True rid = ri.field.get("id", ri.field.get("hint", "?")) rjd = rj.field.get("id", rj.field.get("hint", "?")) if ri.field is rj.field: messages.append(f"FAILURE: pg {pg} — label/target overlap for `{rid}` ({ri.rect}, {rj.rect})") else: messages.append(f"FAILURE: pg {pg} — {ri.rect_type} of `{rid}` ({ri.rect}) overlaps {rj.rect_type} of `{rjd}` ({rj.rect})") if len(messages) >= 20: messages.append("Aborting further checks; fix bounding boxes and try again") return messages # Height check for target rects if ri.rect_type == "target": ink = ri.field.get("ink", {}) if ink.get("value"): font_size = ink.get("size", 14) entry_height = ri.rect[3] - ri.rect[1] if entry_height < font_size: has_error = True rid = ri.field.get("id", ri.field.get("hint", "?")) messages.append(f"FAILURE: pg {pg} — target box height ({entry_height}) for `{rid}` is shorter than font size ({font_size}). Increase box height or decrease ink.size.") if len(messages) >= 20: messages.append("Aborting further checks; fix bounding boxes and try again") return messages if not has_error: messages.append("SUCCESS: All bounding boxes are valid") return messages @cmd("form.check-bbox") def form_check_bbox(argv: list): """Check bounding boxes in fields.json for overlaps.""" if not argv: Output.error("MissingArg", "Usage: form.check-bbox ") with open(argv[0]) as f: messages = get_bounding_box_messages(f) for msg in messages: print(msg) raise SystemExit(0) # ═══════════════════════════════════════════════════════════════ # Section 6: convert — office, HTML, and LaTeX # ═══════════════════════════════════════════════════════════════ _CONVERTIBLE_EXTENSIONS = frozenset({ ".docx", ".doc", ".odt", ".rtf", ".pptx", ".ppt", ".odp", ".xlsx", ".xls", ".ods", ".csv", ".txt", ".html", ".htm", }) _SOFFICE_CANDIDATES = [ "/Applications/LibreOffice.app/Contents/MacOS/soffice", os.path.expanduser("~/Applications/LibreOffice.app/Contents/MacOS/soffice"), "/usr/bin/soffice", "/usr/local/bin/soffice", "/usr/lib/libreoffice/program/soffice", "/opt/libreoffice/program/soffice", "/snap/bin/libreoffice.soffice", ] def _locate_soffice() -> Optional[str]: """Search for a working soffice binary.""" for candidate in _SOFFICE_CANDIDATES: if Path(candidate).is_file(): return candidate for alias in ("soffice", "libreoffice"): found = shutil.which(alias) if found: return found return None @cmd("convert.office") def convert_office(argv: list): """Convert an office document to PDF via LibreOffice.""" if not argv: Output.error("MissingArg", "input file required") src_path = argv.pop(0) out_path = _pop_flag(argv, "-o", "--output") src = Output.check_file(src_path) ext = src.suffix.lower() if ext not in _CONVERTIBLE_EXTENSIONS: Output.error( "UnsupportedFormat", f"Unsupported format: {ext}", hint=f"Supported formats: {', '.join(sorted(_CONVERTIBLE_EXTENSIONS))}", ) binary = _locate_soffice() if binary is None: Output.error( "DependencyMissing", "LibreOffice not found", hint="Please install LibreOffice: https://www.libreoffice.org/download/", ) target_dir = Path(out_path).parent if out_path else src.parent target_dir.mkdir(parents=True, exist_ok=True) cmd_list = [binary, "--headless", "--convert-to", "pdf", "--outdir", str(target_dir), str(src)] try: proc = subprocess.run(cmd_list, capture_output=True, text=True, timeout=120) if proc.returncode != 0: Output.error("ConvertError", f"Conversion failed: {proc.stderr.strip() or 'Unknown error'}", code=4) except subprocess.TimeoutExpired: Output.error("Timeout", "Conversion timeout (>120s)", code=4) except Exception as exc: Output.error("ConvertError", f"Conversion failed: {exc}", code=4) auto_name = target_dir / f"{src.stem}.pdf" final = Path(out_path) if out_path else auto_name if out_path and final.name != auto_name.name and auto_name.exists(): auto_name.rename(final) if not final.exists(): Output.error("ConvertError", "Converted PDF file was not generated", code=4) Output.success({"input": str(src), "output": str(final), "format": ext}) @cmd("convert.html") def convert_html(argv: list): """Convert HTML to PDF via node html2pdf.js.""" if not argv: Output.error("MissingArg", "input file required") js_path = _SCRIPT_DIR / "html2pdf.js" if not js_path.exists(): Output.error("DependencyMissing", "html2pdf.js not found in scripts directory") node_path = shutil.which("node") if not node_path: Output.error("DependencyMissing", "node not found in PATH") cmd_list = [node_path, str(js_path)] + argv try: proc = subprocess.run(cmd_list, timeout=180) raise SystemExit(proc.returncode) except subprocess.TimeoutExpired: Output.error("Timeout", "HTML conversion timeout (>180s)", code=4) except SystemExit: raise except Exception as exc: Output.error("ConvertError", f"HTML conversion failed: {exc}", code=4) # --- convert.latex (tectonic wrapper with log filtering + PDF stats) --- _NOISE_RE = re.compile( r"^note: (?:" r'"version 2" Tectonic' r"|Running TeX" r"|Rerunning TeX because" r"|Running xdvipdfmx" r"|downloading " r"|Skipped writing .* intermediate files" r")" ) def _find_tectonic() -> Optional[str]: """Locate the tectonic binary. Search order: 1. scripts/ dir (bundled binary — macOS arm64 only) 2. ~/tectonic (user-placed binary) 3. System PATH (package-manager install) NOTE: The bundled binary at scripts/tectonic is a macOS arm64 (Apple Silicon) Mach-O executable. It will NOT run on Linux or Windows. On other platforms, install tectonic via the system package manager or download the correct binary — see the error message in convert_latex() for instructions. """ local_bin = _SCRIPT_DIR / "tectonic" if local_bin.exists() and os.access(local_bin, os.X_OK): return str(local_bin) home_bin = Path.home() / "tectonic" if home_bin.exists() and os.access(home_bin, os.X_OK): return str(home_bin) system_bin = shutil.which("tectonic") return system_bin def _human_size(nbytes: int) -> str: for unit in ("B", "KB", "MB", "GB"): if nbytes < 1024: return f"{nbytes:.2f} {unit}" nbytes /= 1024 return f"{nbytes:.2f} TB" def _pdf_stats(pdf_file: Path): """Return (pages, word_count, image_count) or Nones.""" try: from pypdf import PdfReader except ImportError: for attempt in ( [sys.executable, "-m", "pip", "install", "-q", "pypdf"], [sys.executable, "-m", "pip", "install", "-q", "--break-system-packages", "pypdf"], [sys.executable, "-m", "pip", "install", "-q", "--user", "pypdf"], ): if subprocess.run(attempt, check=False, capture_output=True).returncode == 0: break try: from pypdf import PdfReader except ImportError: return None, None, None try: reader = PdfReader(str(pdf_file)) n_pages = len(reader.pages) all_text = "".join(p.extract_text() or "" for p in reader.pages) n_words = len([w for w in all_text.split() if w.strip()]) n_images = 0 for pg in reader.pages: xobj = pg.get("/Resources", {}).get("/XObject") if xobj: obj = xobj.get_object() n_images += sum(1 for k in obj if obj[k].get("/Subtype") == "/Image") return n_pages, n_words, n_images except Exception as exc: print(f"Error extracting PDF info: {exc}", file=sys.stderr) return None, None, None def _classify_lines(lines): """Bucket raw output into errors / warnings / layout issues.""" errors, warnings, layout, pdf_note = [], [], [], None for raw in lines: ln = raw.rstrip() if not ln: continue if _NOISE_RE.match(ln): if ln.startswith("note: Writing"): pdf_note = ln continue if ln.startswith("error:"): errors.append(ln) elif ln.startswith("warning:"): warnings.append(ln) elif re.search(r"(Overfull|Underfull) \\[hv]box", ln) or re.search(r"(Font shape|Missing character)", ln): layout.append(ln) return errors, warnings, layout, pdf_note def _parse_writing_note(note: Optional[str]): m = re.search(r"Writing `(.+?)` \((.+?)\)", note or "") return (m.group(1), m.group(2)) if m else (None, None) @cmd("convert.blueprint") def convert_blueprint(argv: list): """ [Auto-Pipeline] Extract JSON blueprint from LLM markdown response, compile it to HTML via design_engine, and render to PDF. """ if not argv: Output.error("MissingArg", "Usage: convert.blueprint [-o ]") input_file = argv.pop(0) out_pdf = _pop_flag(argv, "-o", "--output") or "output.pdf" src = Output.check_file(input_file) content = src.read_text(encoding="utf-8") # 1. Smart JSON extraction (regardless of whether LLM wrapped in Markdown code blocks) match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', content, re.DOTALL) if match: json_str = match.group(1) else: # Fallback: Assume the file is pure JSON json_str = content try: # Validate JSON format parsed_json = json.loads(json_str) except json.JSONDecodeError as e: Output.error("InvalidJSON", f"Failed to parse JSON blueprint: {e}", hint="Ensure the LLM output is valid JSON without trailing commas.") # 1b. Encoding integrity check — detect corrupted characters before rendering corruption_found = False def _scan_for_corruption(obj, path=""): """Recursively scan JSON values for encoding corruption markers.""" nonlocal corruption_found if isinstance(obj, str): for i, ch in enumerate(obj): if ch == '\ufffd': # Unicode replacement character context = obj[max(0, i-10):i+10] print(f" \u26a0\ufe0f U+FFFD at {path}[{i}]: ...{context}...") corruption_found = True elif '\ud800' <= ch <= '\udfff': # Lone surrogate print(f" \u26a0\ufe0f Lone surrogate U+{ord(ch):04X} at {path}[{i}]") corruption_found = True elif isinstance(obj, dict): for k, v in obj.items(): _scan_for_corruption(v, f"{path}.{k}") elif isinstance(obj, list): for idx, v in enumerate(obj): _scan_for_corruption(v, f"{path}[{idx}]") _scan_for_corruption(parsed_json) if corruption_found: print("\n\u26a0\ufe0f WARNING: Corrupted characters detected in blueprint content!") print(" These will render as \ufffd (replacement character) in the PDF.") print(" Consider fixing the source text and re-running.\n") # Also sanitize: replace U+FFFD with empty string to avoid visible corruption json_str_clean = json_str.replace('\ufffd', '') if json_str_clean != json_str: json_str = json_str_clean print(" \u2139\ufe0f Auto-removed U+FFFD characters from blueprint.") # 2. Save clean JSON blueprint and temp HTML path blueprint_path = src.parent / f"{src.stem}_pure_blueprint.json" html_path = src.parent / f"{src.stem}_rendered.html" blueprint_path.write_text(json_str, encoding="utf-8") # 3. Call design_engine.py to compile HTML engine_script = _SCRIPT_DIR / "design_engine.py" print("🎨 [1/2] Compiling JSON Blueprint to Art-Directed HTML...", flush=True) try: subprocess.run([ sys.executable, str(engine_script), "compile", "--blueprint", str(blueprint_path), "--output", str(html_path) ], check=True) except subprocess.CalledProcessError: Output.error("CompileError", "design_engine.py failed to compile the blueprint.", code=4) # 4. Call html2pdf.js to render PDF print("📄 [2/2] Rendering HTML to High-Res PDF via Playwright...", flush=True) js_script = _SCRIPT_DIR / "html2pdf.js" node_path = shutil.which("node") if not node_path: Output.error("DependencyMissing", "node not found in PATH") try: subprocess.run([ node_path, str(js_script), str(html_path), "-o", out_pdf, "--width", "720px", "--height", "960px" ], check=True) except subprocess.CalledProcessError: Output.error("RenderError", "html2pdf.js failed to render the PDF.", code=4) print(f"\n🎉 Success! Masterpiece generated at: {out_pdf}") raise SystemExit(0) @cmd("convert.latex") def convert_latex(argv: list): """Compile LaTeX file via tectonic, filter logs, report PDF stats.""" if not argv: Output.error("MissingArg", "tex file required") tex_file = argv.pop(0) runs_str = _pop_flag(argv, "-r", "--runs") runs = int(runs_str) if runs_str else 1 keep_logs = _pop_flag(argv, "-k", "--keep-logs", needs_value=False) tex = Path(tex_file) if not tex.exists(): print(f"\u2717 Error: File not found {tex_file}") raise SystemExit(1) print(f"Compiling {tex.name}...", flush=True) if runs > 1: print(f"Running {runs} passes (for cross-references)", flush=True) tectonic = _find_tectonic() if tectonic is None: import platform print("\n\u2717 Error: tectonic command not found") print() print("The bundled scripts/tectonic binary is macOS arm64 only.") print("Install tectonic for your platform:\n") sys_name = platform.system() if sys_name == "Darwin": print(" macOS (Homebrew): brew install tectonic") print(" macOS (binary): curl -sSL https://drop-sh.fullyjustified.net | sh") print(" mv tectonic ~/tectonic && chmod +x ~/tectonic") elif sys_name == "Linux": print(" Debian/Ubuntu: apt install tectonic (if available)") print(" Arch Linux: pacman -S tectonic") print(" Conda: conda install -c conda-forge tectonic") print(" Binary download: curl -sSL https://drop-sh.fullyjustified.net | sh") print(" mv tectonic ~/tectonic && chmod +x ~/tectonic") elif sys_name == "Windows": print(" Windows (scoop): scoop install tectonic") print(" Windows (choco): choco install tectonic") print(" Conda: conda install -c conda-forge tectonic") else: print(" See: https://tectonic-typesetting.github.io/") print() print("After installing, verify with: tectonic --version") print() print("Quick check — is tectonic already installed somewhere?") print(f" which tectonic: {shutil.which('tectonic') or 'not found'}") print(f" ~/tectonic: {'exists' if (Path.home() / 'tectonic').exists() else 'not found'}") print(f" scripts/tectonic: {'exists' if (_SCRIPT_DIR / 'tectonic').exists() else 'not found'}") print(f" Platform: {sys_name} {platform.machine()}") raise SystemExit(1) all_lines = [] ok = False for _ in range(runs): try: proc = subprocess.run( [tectonic, "-X", "compile", str(tex)], capture_output=True, text=True, timeout=120, ) all_lines.extend((proc.stdout + proc.stderr).splitlines()) ok = proc.returncode == 0 if not ok: break except subprocess.TimeoutExpired: print("\n\u2717 Error: Compilation timeout (>2 minutes)") raise SystemExit(1) except Exception as exc: print(f"\n\u2717 Error: {exc}") raise SystemExit(1) if keep_logs: print("\n" + "=" * 50 + "\nFull logs:\n" + "=" * 50) for ln in all_lines: print(ln) print("=" * 50 + "\n") errors, warnings, layout, pdf_note = _classify_lines(all_lines) noted_name, noted_size = _parse_writing_note(pdf_note) pdf_name = noted_name or (tex.stem + ".pdf") pdf_path = tex.parent / pdf_name print() if ok: tag = "\u2713 Compilation successful" + (" (with warnings)" if warnings or layout else "") print(tag) else: print("\u2717 Compilation failed") if ok and pdf_path.exists(): print("\n========================\nPDF Information\n========================") print(f"File: {pdf_name}") print(f"Size: {noted_size or _human_size(pdf_path.stat().st_size)}") pages, words, images = _pdf_stats(pdf_path) if pages is not None: print(f"Pages: {pages}") if words is not None: print(f"Words: ~{words:,}") if images is not None: print(f"Images: {images}") if layout: print(f"\n========================\nLayout Issues ({len(layout)})\n========================") for ln in layout: print(ln) if warnings: print(f"\n========================\nWarnings ({len(warnings)})\n========================") for ln in warnings: print(ln.replace("warning: ", "", 1)) if errors: print("\n========================\nErrors\n========================") for ln in errors: print(ln.replace("error: ", "", 1)) if ok and (layout or warnings): print() print("") print(f"Detected {len(layout)} layout issues and {len(warnings)} warnings.") print("These issues affect PDF typesetting quality and must be fixed.") print("Do not dismiss with 'warnings don't affect output'. Fix all issues.") print("") raise SystemExit(0 if ok else 1) # ═══════════════════════════════════════════════════════════════ # Section 7b: Font Fallback Engine — automatic wrapping # ═══════════════════════════════════════════════════════════════ # -- Data structures -------------------------------------------------------- FONT_FALLBACK_CHAIN: Dict[str, List[str]] = { "Times New Roman": ["SimHei"], "Calibri": ["SimHei"], "DejaVuSans": ["SimHei"], "SimHei": ["Times New Roman"], "Microsoft YaHei": ["Times New Roman"], } FONT_PREFER_RANGES: Dict[str, List[Tuple[int, int, str]]] = { "SimHei": [ (0x0400, 0x04FF, "Times New Roman"), # Cyrillic (0x0500, 0x052F, "Times New Roman"), # Cyrillic Supplement ], "Microsoft YaHei": [ (0x0400, 0x04FF, "Times New Roman"), (0x0500, 0x052F, "Times New Roman"), ], } # -- Content sanitization (runtime, pre-font-fallback) ---------------------- # Control: global switches CONTENT_SANITIZE_ENABLED = True # master switch CONTENT_SANITIZE_STRIP_ZW = True # zero-width chars (disable for Thai/Burmese/Hindi) # Fullwidth ASCII → halfwidth (letters + digits only, NOT punctuation) _FULLWIDTH_OFFSET = 0xFEE0 # ord('A') - ord('A') == 0xFEE0 # Ligature decomposition (U+FB00–FB06) _LIGATURE_MAP: Dict[int, str] = { 0xFB00: "ff", 0xFB01: "fi", 0xFB02: "fl", 0xFB03: "ffi", 0xFB04: "ffl", 0xFB05: "st", # ſt long s t 0xFB06: "st", # st st } # Zero-width / invisible characters to strip _ZERO_WIDTH_CHARS: Set[int] = { 0x200B, # ZERO WIDTH SPACE 0x200C, # ZERO WIDTH NON-JOINER 0x200D, # ZERO WIDTH JOINER 0x2060, # WORD JOINER 0xFEFF, # BOM (when not at file start) 0x034F, # COMBINING GRAPHEME JOINER } # Bidirectional control characters to strip _BIDI_CONTROLS: Set[int] = { 0x200E, 0x200F, # LRM, RLM 0x202A, 0x202B, 0x202C, # LRE, RLE, PDF 0x202D, 0x202E, # LRO, RLO 0x2066, 0x2067, 0x2068, 0x2069, # LRI, RLI, FSI, PDI } # Variation selectors to strip (emoji style modifiers) _VARIATION_SELECTORS: Set[int] = set(range(0xFE00, 0xFE10)) # U+FE00–FE0F # Unicode noncharacters (should never appear in text interchange) _NONCHARACTERS: Set[int] = set(range(0xFDD0, 0xFDF0)) # U+FDD0–FDEF _NONCHARACTERS.add(0xFFFE) _NONCHARACTERS.add(0xFFFF) _content_sanitize_warnings: List[str] = [] def content_sanitize(text: str, dry_run: bool = False) -> str: """Sanitize Paragraph text content before font fallback. Removes/replaces characters that should not appear in final PDF output. Designed for CJK + Latin content. Does NOT touch XML/HTML tags in the text. Args: text: Raw Paragraph text (may contain ReportLab XML tags like , ). dry_run: If True, collect warnings but still return cleaned text. Returns: Cleaned text string. """ if not CONTENT_SANITIZE_ENABLED or not text: return text global _content_sanitize_warnings if dry_run: _content_sanitize_warnings = [] # Split text into tags and plain-text segments to avoid mangling XML tags # e.g. '你好' → ['', '', '你好', '', ''] parts = _TAG_RE.split(text) tags = _TAG_RE.findall(text) out_pieces: List[str] = [] for i, part in enumerate(parts): if part: # Process plain text segment character by character cleaned: List[str] = [] for ch in part: code = ord(ch) replacement = _sanitize_one_char(ch, code, dry_run) if replacement is not None: cleaned.append(replacement) # else: character was deleted out_pieces.append("".join(cleaned)) # Append the tag that follows this part (if any) if i < len(tags): out_pieces.append(tags[i]) # tags pass through untouched return "".join(out_pieces) def _sanitize_one_char(ch: str, code: int, dry_run: bool) -> Optional[str]: """Process a single character. Returns replacement string, or None to delete.""" # --- DELETE: ASCII control characters (except \t \n \r) --- if code <= 0x1F and ch not in '\t\n\r': if dry_run: _content_sanitize_warnings.append( f"DELETE control char U+{code:04X} ({repr(ch)})") return None # --- DELETE: U+007F DEL --- if code == 0x7F: return None # --- DELETE: zero-width characters --- if CONTENT_SANITIZE_STRIP_ZW and code in _ZERO_WIDTH_CHARS: if dry_run: _content_sanitize_warnings.append( f"DELETE zero-width U+{code:04X}") return None # --- DELETE: bidirectional controls --- if code in _BIDI_CONTROLS: if dry_run: _content_sanitize_warnings.append( f"DELETE bidi control U+{code:04X}") return None # --- DELETE: variation selectors --- if code in _VARIATION_SELECTORS: if dry_run: _content_sanitize_warnings.append( f"DELETE variation selector U+{code:04X}") return None # --- DELETE: Unicode noncharacters --- if code in _NONCHARACTERS: return None # --- REPLACE: U+FFFD replacement character → '?' --- if code == 0xFFFD: if dry_run: _content_sanitize_warnings.append( "REPLACE U+FFFD (upstream encoding error) → '?'") return '?' # --- REPLACE: fullwidth ASCII letters/digits → halfwidth --- # U+FF21–FF3A = A–Z, U+FF41–FF5A = a–z, U+FF10–FF19 = 0–9 if 0xFF21 <= code <= 0xFF3A or 0xFF41 <= code <= 0xFF5A or 0xFF10 <= code <= 0xFF19: halfwidth = chr(code - _FULLWIDTH_OFFSET) if dry_run: _content_sanitize_warnings.append( f"REPLACE fullwidth '{ch}' → halfwidth '{halfwidth}'") return halfwidth # --- REPLACE: ligatures → decomposed --- if code in _LIGATURE_MAP: decomposed = _LIGATURE_MAP[code] if dry_run: _content_sanitize_warnings.append( f"REPLACE ligature '{ch}' (U+{code:04X}) → '{decomposed}'") return decomposed # --- REPLACE: line/paragraph separators → newline --- if code == 0x2028 or code == 0x2029: if dry_run: _content_sanitize_warnings.append( f"REPLACE U+{code:04X} separator → '\\n'") return '\n' # --- REPLACE: soft hyphen → empty (invisible, ReportLab ignores it) --- if code == 0x00AD: return None # --- WARN (pass through): Private Use Area --- if 0xE000 <= code <= 0xF8FF: if dry_run: _content_sanitize_warnings.append( f"WARN Private Use Area U+{code:04X} (passed through)") return ch # --- PASS: everything else --- return ch # -- Glyph detection ------------------------------------------------------- _glyph_cache: Dict[Tuple[str, int], bool] = {} def _has_glyph(font_name: str, code: int) -> bool: """Check whether *font_name* has a real glyph outline for *code*.""" key = (font_name, code) if key in _glyph_cache: return _glyph_cache[key] try: from reportlab.pdfbase import pdfmetrics font = pdfmetrics.getFont(font_name) result = code in font.face.charToGlyph except Exception: result = False _glyph_cache[key] = result return result def _best_font_for_char(code: int, base_font: str) -> str: """Return the best registered font for a single character *code*.""" # ASCII fast path if code < 128: return base_font # Aesthetic preference ranges for rng_start, rng_end, preferred in FONT_PREFER_RANGES.get(base_font, []): if rng_start <= code <= rng_end: if _has_glyph(preferred, code): return preferred # Base font has the glyph — use it if _has_glyph(base_font, code): return base_font # Walk the fallback chain for fb in FONT_FALLBACK_CHAIN.get(base_font, []): if _has_glyph(fb, code): return fb # Nothing found — return base_font (will render □ but won't crash) return base_font # -- Text-level automatic wrapping ---------------------------------- _TAG_RE = re.compile(r"]*>") def font_fallback(text: str, base_font: str) -> str: """Wrap characters that *base_font* cannot render with ```` tags. Preserves existing XML tags (````, ````, ````, etc.). Returns the transformed markup string. """ if not text: return text # Split text into (plain, tag, plain, tag, ...) segments parts = _TAG_RE.split(text) tags = _TAG_RE.findall(text) # Track nesting so we don't re-wrap inside explicit font_stack: List[str] = [] out_pieces: List[str] = [] for i, part in enumerate(parts): # Determine effective font at this point effective = font_stack[-1] if font_stack else base_font if part: # Process plain text character by character runs: List[Tuple[str, List[str]]] = [] for ch in part: code = ord(ch) best = _best_font_for_char(code, effective) if runs and runs[-1][0] == best: runs[-1][1].append(ch) else: runs.append((best, [ch])) for font_name, chars in runs: segment = "".join(chars) if font_name == effective: out_pieces.append(segment) else: out_pieces.append(f'{segment}') # Append the tag that follows this part (if any) if i < len(tags): tag = tags[i] out_pieces.append(tag) # Track font stack tag_lower = tag.lower() if tag_lower.startswith("" and font_stack: font_stack.pop() return "".join(out_pieces) # -- Monkey-patch installer ------------------------------------------------- _fallback_installed = False def install_font_fallback(): """Monkey-patch ``Paragraph.__init__`` so every Paragraph automatically runs ``font_fallback()`` on its text. Idempotent — safe to call multiple times. """ global _fallback_installed if _fallback_installed: return try: from reportlab.platypus import Paragraph as _Para except ImportError: return _orig_init = _Para.__init__ def _patched_init(self, text, style=None, *args, **kwargs): if isinstance(text, str): text = content_sanitize(text) # Layer 1: clean dangerous chars if style is not None: base_font = getattr(style, "fontName", None) if base_font and base_font in FONT_FALLBACK_CHAIN: text = font_fallback(text, base_font) # Layer 2: font selection _orig_init(self, text, style, *args, **kwargs) _Para.__init__ = _patched_init _fallback_installed = True # -- Post-generation glyph check ------------------------------------------- def check_missing_glyphs(pdf_path: str) -> List[Dict[str, Any]]: """Scan a PDF for .notdef glyphs, control chars, and other problematic characters. Returns a list of dicts with keys: page, position, context, kind. Empty list means no issues found. """ try: import fitz # PyMuPDF except ImportError: print("Warning: PyMuPDF (fitz) not installed — cannot check missing glyphs", file=sys.stderr) return [] issues: List[Dict[str, Any]] = [] doc = fitz.open(pdf_path) for page_num in range(len(doc)): page = doc[page_num] text = page.get_text() for i, ch in enumerate(text): code = ord(ch) kind: Optional[str] = None if ch == "\x00": kind = "notdef" elif code <= 0x1F and ch not in '\t\n\r': kind = "control_char" elif code == 0x7F: kind = "control_char" elif code == 0xFFFD: kind = "replacement_char" elif code in _ZERO_WIDTH_CHARS: kind = "zero_width" elif code in _BIDI_CONTROLS: kind = "bidi_control" elif code in _VARIATION_SELECTORS: kind = "variation_selector" elif 0xE000 <= code <= 0xF8FF: kind = "private_use_area" if kind: start = max(0, i - 10) end = min(len(text), i + 11) context = text[start:end].replace("\x00", "□") issues.append({ "page": page_num + 1, "position": i, "char": f"U+{code:04X}", "kind": kind, "context": context, }) doc.close() # Deduplicate by page + context + kind seen: Set[str] = set() unique: List[Dict[str, Any]] = [] for item in issues: key = f"{item['page']}:{item['kind']}:{item['context']}" if key not in seen: seen.add(key) unique.append(item) return unique @cmd("font.check") def font_check(argv: list): """Scan a PDF for missing glyphs (□ boxes) after generation.""" if not argv: Output.error("MissingArg", "Usage: font.check ") pdf_path = argv[0] if not os.path.isfile(pdf_path): Output.error("FileNotFound", f"File not found: {pdf_path}") issues = check_missing_glyphs(pdf_path) # Group by kind for better reporting by_kind: Dict[str, List[Dict]] = {} for item in issues: k = item.get("kind", "unknown") by_kind.setdefault(k, []).append(item) result: Dict[str, Any] = { "status": "issues_found" if issues else "ok", "total_issues": len(issues), "by_kind": {k: len(v) for k, v in by_kind.items()}, } if issues: result["issues"] = issues[:30] # cap output result["hints"] = { "notdef": "Missing glyphs (□). Fix: call install_font_fallback() after font registration.", "control_char": "Control characters leaked into PDF. Fix: content_sanitize() should strip these.", "replacement_char": "U+FFFD found — upstream encoding error corrupted original character.", "zero_width": "Zero-width chars found. Usually harmless visually but affect text search/copy.", "bidi_control": "Bidirectional control chars found. May cause text direction issues.", "variation_selector": "Variation selectors found. No visual effect in ReportLab but shouldn't be here.", "private_use_area": "Private Use Area chars found. Will render as □ unless a custom font covers them.", } print(json.dumps(result, ensure_ascii=False, indent=2)) raise SystemExit(0 if not issues else 1) @cmd("toc.check") def toc_check(argv: list): """Validate TOC quality in a PDF file (page numbers, entries, links).""" if not argv: Output.error("MissingArg", "Usage: toc.check ") pdf_path = argv[0] if not os.path.isfile(pdf_path): Output.error("FileNotFound", f"File not found: {pdf_path}") # Locate toc_validate.py relative to this script script_dir = Path(__file__).resolve().parent toc_script = script_dir / "toc_validate.py" if not toc_script.exists(): # Try parent's scripts dir candidate = script_dir.parent / "scripts" / "toc_validate.py" if candidate.exists(): toc_script = candidate if toc_script.exists(): # Import and call directly import importlib.util spec = importlib.util.spec_from_file_location("toc_validate", str(toc_script)) toc_mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(toc_mod) result = toc_mod.check_pdf(pdf_path) else: Output.error("ScriptNotFound", "toc_validate.py not found. Expected at: " + str(script_dir / "toc_validate.py")) # Add remediation hints per error code remediation = { "TOC_ALL_SAME_PAGE": "ReportLab: use multiBuild() instead of build() with TocDocTemplate", "TOC_NO_ENTRIES": "Check afterFlowable() notifies TOC correctly; ensure headings have bookmark_name/bookmark_level attributes", "TOC_PAGES_INVALID": "TOC entry references a page beyond document total — regenerate TOC", "TOC_NOT_FOUND": "Document has many pages but no TOC detected in first 5 pages", "TOC_LINKS_MISSING": "TOC entries exist but no clickable links — check bookmark_key in afterFlowable()", "TOC_ON_FIRST_PAGE": "Add a cover page before the TOC, or insert PageBreak() between cover and TOC. Expected: Cover(p1) → TOC(p2) → Content(p3+)", } for err in result.get("errors", []): code = err.get("code", "") if code in remediation: err["fix"] = remediation[code] for warn in result.get("warnings", []): code = warn.get("code", "") if code in remediation: warn["fix"] = remediation[code] print(json.dumps(result, ensure_ascii=False, indent=2)) raise SystemExit(0 if result.get("pass", False) else 1) # ═══════════════════════════════════════════════════════════════ # Section 7: code — sanitization pipeline for PDF generation code # ═══════════════════════════════════════════════════════════════ # --- Step 0: restore literal unicode escapes/entities to real chars --- _RE_UNICODE_ESC = re.compile(r"(\\u[0-9a-fA-F]{4})|(\\U[0-9a-fA-F]{8})|(\\x[0-9a-fA-F]{2})") def _restore_escapes(s: str) -> str: # HTML entities: ³ ≤ α ... s = html.unescape(s) # Literal backslash escapes: "\\u00B3" -> "³" def _dec(m: re.Match) -> str: esc = m.group(0) try: if esc.startswith("\\u") or esc.startswith("\\U"): return chr(int(esc[2:], 16)) if esc.startswith("\\x"): return chr(int(esc[2:], 16)) except Exception: return esc return esc return _RE_UNICODE_ESC.sub(_dec, s) # --- Step 1: superscripts/subscripts -> / --- _SUPERSCRIPT_MAP: Dict[str, str] = { "\u2070": "0", "\u00b9": "1", "\u00b2": "2", "\u00b3": "3", "\u2074": "4", "\u2075": "5", "\u2076": "6", "\u2077": "7", "\u2078": "8", "\u2079": "9", "\u207a": "+", "\u207b": "-", "\u207c": "=", "\u207d": "(", "\u207e": ")", "\u207f": "n", "\u1da6": "i", } _SUBSCRIPT_MAP: Dict[str, str] = { "\u2080": "0", "\u2081": "1", "\u2082": "2", "\u2083": "3", "\u2084": "4", "\u2085": "5", "\u2086": "6", "\u2087": "7", "\u2088": "8", "\u2089": "9", "\u208a": "+", "\u208b": "-", "\u208c": "=", "\u208d": "(", "\u208e": ")", "\u2090": "a", "\u2091": "e", "\u2095": "h", "\u1d62": "i", "\u2c7c": "j", "\u2096": "k", "\u2097": "l", "\u2098": "m", "\u2099": "n", "\u2092": "o", "\u209a": "p", "\u1d63": "r", "\u209b": "s", "\u209c": "t", "\u1d64": "u", "\u1d65": "v", "\u2093": "x", } def _replace_super_sub(s: str) -> str: out = [] for ch in s: if ch in _SUPERSCRIPT_MAP: out.append(f"{_SUPERSCRIPT_MAP[ch]}") elif ch in _SUBSCRIPT_MAP: out.append(f"{_SUBSCRIPT_MAP[ch]}") else: out.append(ch) return "".join(out) # --- Step 2: symbol fallback for SimHei (protect tags, then replace) --- _SYMBOL_FALLBACK: Dict[str, str] = { # Currently empty - enable entries as needed for fonts missing specific glyphs } def _fallback_symbols(s: str) -> str: # Protect / tags from being modified placeholders: Dict[str, str] = {} def _protect_tag(m: re.Match) -> str: key = f"@@TAG{len(placeholders)}@@" placeholders[key] = m.group(0) return key protected = re.sub(r"|", _protect_tag, s) # Replace symbols protected = "".join(_SYMBOL_FALLBACK.get(ch, ch) for ch in protected) # Restore tags for k, v in placeholders.items(): protected = protected.replace(k, v) return protected def sanitize_code(text: str) -> str: """ Full sanitization pipeline for PDF generation code. - Restore unicode escapes/entities to real characters - Replace superscript/subscript unicode with / - Replace other risky symbols with ASCII/text fallbacks """ s = _restore_escapes(text) s = _replace_super_sub(s) s = _fallback_symbols(s) return s @cmd("palette.generate") def palette_generate(argv: list): """Generate a color palette via design_engine.py and output as Python-ready ReportLab code. Usage: pdf.py palette.generate [--title "document title"] [--mode minimal|dark|pastel|jewel|light] [--harmony auto|complementary|...] [--format python|json|css] If --title is provided, intent is auto-derived from the title. Output formats: python (default): Ready-to-paste ReportLab color variables json: Raw palette JSON css: CSS custom properties """ title = _pop_flag(argv, "--title", "-t") or "" mode = _pop_flag(argv, "--mode", "-m") or "minimal" harmony = _pop_flag(argv, "--harmony", "--harmony") or "auto" fmt = _pop_flag(argv, "--format", "-f") or "python" seed_str = _pop_flag(argv, "--seed", "--seed") seed = int(seed_str) if seed_str else None engine_script = _SCRIPT_DIR / "design_engine.py" if not engine_script.exists(): Output.error("DependencyMissing", f"design_engine.py not found at {engine_script}") # Import design_engine dynamically import importlib.util spec = importlib.util.spec_from_file_location("design_engine", str(engine_script)) de = importlib.util.module_from_spec(spec) spec.loader.exec_module(de) # Auto-derive intent from title if title: intent = de.derive_intent(title) else: intent = "neutral" palette = de.generate_color_palette(intent, mode, harmony=harmony, seed=seed) violations = de.audit_palette(palette) if fmt == "json": print(json.dumps(palette, indent=2, ensure_ascii=False)) elif fmt == "css": print(de.palette_to_css(palette)) else: # Python format: ready-to-paste ReportLab code print("# \u2501\u2501 Color Palette (auto-generated by pdf.py palette.generate) \u2501\u2501") print(f"# Intent: {intent} | Mode: {mode} | Harmony: {palette['meta']['harmony']}") print(f"# Contrast: text:bg={palette['meta']['contrast']['text_on_bg']} | accent:bg={palette['meta']['contrast']['accent_on_bg']}") print("from reportlab.lib import colors") print(f"ACCENT = colors.HexColor('{palette['accent']}')") print(f"TEXT_PRIMARY = colors.HexColor('{palette['text']}')") print(f"TEXT_MUTED = colors.HexColor('{palette['muted']}')") print(f"BG_SURFACE = colors.HexColor('{palette['mid']}')") print(f"BG_PAGE = colors.HexColor('{palette['bg']}')") print(f"SURFACE_RGBA = '{palette['surface']}' # For CSS/HTML elements") print("") print("# ReportLab table colors") print("TABLE_HEADER_COLOR = ACCENT") print("TABLE_HEADER_TEXT = colors.white") print("TABLE_ROW_EVEN = colors.white") print("TABLE_ROW_ODD = BG_SURFACE") if violations: print(f"\n# \u26a0\ufe0f Palette audit warnings:", file=sys.stderr) for v in violations: print(f"# - {v}", file=sys.stderr) raise SystemExit(0) @cmd("palette.cascade") def palette_cascade(argv: list): """Generate a role-based cascade palette (area ∝ 1/saturation). Usage: pdf.py palette.cascade [--title "document title"] [--mode minimal|dark|pastel|jewel|light] [--harmony auto|...] [--format summary|json|css|reportlab] The cascade palette produces 12 named roles + 4 semantic colors, each assigned to a size tier (XL/L/M/S/XS) with saturation caps enforced per tier. Cover, body, and chart colors all pull from the same palette — no color drift. """ title = _pop_flag(argv, "--title", "-t") or "" mode = _pop_flag(argv, "--mode", "-m") or "minimal" harmony = _pop_flag(argv, "--harmony", "--harmony") or "auto" fmt = _pop_flag(argv, "--format", "-f") or "summary" seed_str = _pop_flag(argv, "--seed", "--seed") seed = int(seed_str) if seed_str else None engine_script = _SCRIPT_DIR / "design_engine.py" if not engine_script.exists(): Output.error("DependencyMissing", f"design_engine.py not found at {engine_script}") import importlib.util spec = importlib.util.spec_from_file_location("design_engine", str(engine_script)) de = importlib.util.module_from_spec(spec) spec.loader.exec_module(de) if title: intent = de.derive_intent(title) else: intent = "neutral" cascade = de.generate_cascade_palette(intent, mode, harmony=harmony, seed=seed) if fmt == "json": print(json.dumps(cascade, indent=2, ensure_ascii=False, default=str)) elif fmt == "css": print(cascade["css"]) elif fmt == "reportlab": print(cascade["reportlab"]) else: # Summary format meta = cascade["meta"] print(f"🎨 Cascade Palette | Intent: {meta['intent']} | Mode: {meta['mode']} | Harmony: {meta['harmony']}") print(f" Base hue: {meta['base_hue']}° | Accent hue: {meta['accent_hue']}° | Secondary: {meta['secondary_hue']}°") print(f" Contrast: text:bg={meta['contrast']['text_on_bg']} | accent:bg={meta['contrast']['accent_on_bg']}") print() print(" TIER | ROLE | HEX | S | USAGE") print(" ────── | ────────────────── | ─────── | ────── | ────────────") for name, info in cascade["roles"].items(): tier = info['tier'].upper().ljust(6) nm = name.ljust(18) hx = info['hex'].ljust(7) s_val = f"{info['hsl'][1]:.3f}".ljust(6) print(f" {tier} | {nm} | {hx} | {s_val} | {info['usage']}") print() print(" Semantic:") for name, info in cascade["semantic"].items(): print(f" {name}: {info['hex']} (S={info['hsl'][1]:.3f})") if meta["audit"]: print(f"\n ⚠️ Violations:") for v in meta["audit"]: print(f" - {v}") else: print(f"\n ✅ All tier constraints pass") raise SystemExit(0) @cmd("code.sanitize") def code_sanitize(argv: list): """Sanitize Unicode in a Python script for PDF generation.""" if not argv: Output.error("MissingArg", "Usage: code.sanitize ") target = argv[0] with open(target, "r", encoding="utf-8") as f: code = f.read() sanitized = sanitize_code(code) with open(target, "w", encoding="utf-8") as f: f.write(sanitized) print(f"Sanitized: {target}") raise SystemExit(0) @cmd("content.sanitize") def content_sanitize_cli(argv: list): """Sanitize content text for PDF rendering (dry-run report). Reads a text file and reports what content_sanitize() would change. Useful for debugging before PDF generation. Usage: pdf.py content.sanitize pdf.py content.sanitize --apply """ if not argv: Output.error("MissingArg", "Usage: content.sanitize [--apply]") target = argv[0] apply_flag = "--apply" in argv with open(target, "r", encoding="utf-8") as f: raw = f.read() global _content_sanitize_warnings _content_sanitize_warnings = [] cleaned = content_sanitize(raw, dry_run=True) changes = len(_content_sanitize_warnings) result: Dict[str, Any] = { "file": target, "original_chars": len(raw), "cleaned_chars": len(cleaned), "changes": changes, "details": _content_sanitize_warnings[:50], } if apply_flag and changes > 0: with open(target, "w", encoding="utf-8") as f: f.write(cleaned) result["applied"] = True else: result["applied"] = False if changes > 0: result["hint"] = "Run with --apply to write cleaned text back to file." print(json.dumps(result, ensure_ascii=False, indent=2)) raise SystemExit(0) # ═══════════════════════════════════════════════════════════════ # Section 8: CLI dispatcher # ═══════════════════════════════════════════════════════════════ def _usage(): sys.stdout.write(__doc__.strip() + "\n") raise SystemExit(0) def main(): tokens = sys.argv[1:] if not tokens or tokens[0] in ("-h", "--help"): _usage() cmd_name = tokens.pop(0) # Direct match handler = _COMMANDS.get(cmd_name) if handler is not None: handler(tokens) return # Two-word match (e.g., "extract text" -> "extract.text") if tokens: compound = f"{cmd_name}.{tokens[0]}" handler = _COMMANDS.get(compound) if handler is not None: tokens.pop(0) handler(tokens) return # List commands in group group_cmds = [k for k in _COMMANDS if k.startswith(cmd_name + ".")] if group_cmds: print(f"Available commands in '{cmd_name}':") for c in sorted(group_cmds): print(f" {c}") raise SystemExit(0) print(f"Unknown command: {cmd_name}\n") _usage() if __name__ == "__main__": main()