feat: Add complete Agentic Compaction & Pipeline System
- Context Compaction System with token counting and summarization - Deterministic State Machine for flow control (no LLM decisions) - Parallel Execution Engine (up to 12 concurrent sessions) - Event-Driven Coordination via Event Bus - Agent Workspace Isolation (tools, memory, identity, files) - YAML Workflow Integration (OpenClaw/Lobster compatible) - Claude Code integration layer - Complete demo UI with real-time visualization - Comprehensive documentation and README Components: - agent-system/: Context management, token counting, subagent spawning - pipeline-system/: State machine, parallel executor, event bus, workflows - skills/: AI capabilities (LLM, ASR, TTS, VLM, image generation, etc.) - src/app/: Next.js demo application Total: ~100KB of production-ready TypeScript code
This commit is contained in:
172
skills/pdf/scripts/add_zai_metadata.py
Executable file
172
skills/pdf/scripts/add_zai_metadata.py
Executable file
@@ -0,0 +1,172 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Add Z.ai branding metadata to PDF documents.
|
||||
|
||||
This script adds Z.ai metadata (Author, Creator, Producer) to PDF files.
|
||||
It can process single files or batch process multiple PDFs.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
from pypdf import PdfReader, PdfWriter
|
||||
|
||||
|
||||
def add_zai_metadata(input_pdf_path, output_pdf_path=None, custom_title=None, verbose=True):
    """Copy a PDF's pages into a new document stamped with Z.ai metadata.

    Args:
        input_pdf_path: Path of the PDF to read.
        output_pdf_path: Destination path; when None the input file is overwritten.
        custom_title: Title to store. When falsy, the document's existing
            title is kept unless it is missing or a known placeholder, in
            which case the input file name without its extension is used.
        verbose: When true, print a summary of the written metadata.

    Returns:
        The path the PDF was written to.

    Exits the process with status 1 (after printing to stderr) when the input
    is missing, cannot be opened, or the output cannot be written.
    """

    def _abort(message):
        # Every failure path reports on stderr and terminates with status 1.
        print(message, file=sys.stderr)
        sys.exit(1)

    if not os.path.exists(input_pdf_path):
        _abort(f"Error: Input file not found: {input_pdf_path}")

    try:
        source = PdfReader(input_pdf_path)
    except Exception as exc:
        _abort(f"Error: Cannot open PDF: {exc}")

    # Only the pages are carried over; metadata is rebuilt below.
    target = PdfWriter()
    for page in source.pages:
        target.add_page(page)

    # Title priority: explicit argument, usable original title, file name.
    title = custom_title
    if not title:
        info = source.metadata
        placeholder_titles = ['(anonymous)', 'unspecified', None]
        if info and info.title and info.title not in placeholder_titles:
            title = info.title
        else:
            file_name = os.path.basename(input_pdf_path)
            title = os.path.splitext(file_name)[0]

    target.add_metadata({
        '/Title': title,
        '/Author': 'Z.ai',
        '/Creator': 'Z.ai',
        '/Producer': 'http://z.ai',
    })

    destination = input_pdf_path if output_pdf_path is None else output_pdf_path

    try:
        with open(destination, "wb") as handle:
            target.write(handle)
    except Exception as exc:
        _abort(f"Error: Cannot write output file: {exc}")

    if verbose:
        summary = [
            f"✓ Updated metadata for: {os.path.basename(input_pdf_path)}",
            f" Title: {title}",
            " Author: Z.ai",
            " Creator: Z.ai",
            " Producer: http://z.ai",
        ]
        if destination != input_pdf_path:
            summary.append(f" Output: {destination}")
        for line in summary:
            print(line)

    return destination
|
||||
|
||||
|
||||
def main():
    """Parse command-line arguments and stamp each listed PDF.

    Exits with status 1 when --output is combined with more than one input.
    The same --title, when given, is applied to every input file.
    """
    usage_examples = """
Examples:
# Add metadata to a single PDF (in-place)
%(prog)s document.pdf

# Add metadata to a single PDF (create new file)
%(prog)s input.pdf -o output.pdf

# Add metadata with custom title
%(prog)s report.pdf -t "Q4 Financial Analysis"

# Batch process all PDFs in current directory
%(prog)s *.pdf

# Quiet mode (no output)
%(prog)s document.pdf -q
"""
    parser = argparse.ArgumentParser(
        description='Add Z.ai branding metadata to PDF documents',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    parser.add_argument('input', nargs='+', help='Input PDF file(s) to process')
    parser.add_argument('-o', '--output', help='Output PDF path (only for single input file)')
    parser.add_argument('-t', '--title', help='Custom title for the PDF')
    parser.add_argument('-q', '--quiet', action='store_true', help='Quiet mode (no status messages)')

    args = parser.parse_args()
    input_paths = args.input

    # A single output path cannot receive several documents.
    if args.output and len(input_paths) > 1:
        print("Error: --output can only be used with a single input file", file=sys.stderr)
        sys.exit(1)

    # Both values are the same for every file, so resolve them once.
    # A None output path means "overwrite in place".
    output_path = args.output if (len(input_paths) == 1 and args.output) else None
    custom_title = args.title if args.title else None
    show_status = not args.quiet

    for input_path in input_paths:
        add_zai_metadata(
            input_path,
            output_pdf_path=output_path,
            custom_title=custom_title,
            verbose=show_status,
        )
|
||||
|
||||
|
||||
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
|
||||
70
skills/pdf/scripts/check_bounding_boxes.py
Executable file
70
skills/pdf/scripts/check_bounding_boxes.py
Executable file
@@ -0,0 +1,70 @@
|
||||
from dataclasses import dataclass
|
||||
import json
|
||||
import sys
|
||||
|
||||
|
||||
# Script to check that the `fields.json` file that GLM creates when analyzing PDFs
|
||||
# does not have overlapping bounding boxes. See forms.md.
|
||||
|
||||
|
||||
@dataclass
class RectAndField:
    """One bounding box together with the form-field entry it came from."""

    # Four box coordinates; presumably [left, top, right, bottom] — confirm against forms.md.
    rect: list[float]
    # Which box of the field this is; callers in this file pass "label" or "entry".
    rect_type: str
    # The field's dict from the `form_fields` list in fields.json.
    field: dict
|
||||
|
||||
|
||||
# Returns a list of messages that are printed to stdout for GLM to read.
|
||||
def get_bounding_box_messages(fields_json_stream) -> list[str]:
    """Validate the bounding boxes in a `fields.json` stream.

    Args:
        fields_json_stream: Readable stream of JSON with a `form_fields` list.
            Each field needs `description`, `page_number`,
            `label_bounding_box` and `entry_bounding_box`; `entry_text` is
            optional.

    Returns:
        Messages in the order produced: a "Read N fields" line, one FAILURE
        line per overlapping pair of boxes on the same page or per entry box
        shorter than its font size (default 14), and a SUCCESS line when no
        failure was found. Checking stops with an "Aborting" line once the
        list holds 20 or more messages.
    """
    abort_note = "Aborting further checks; fix bounding boxes and try again"

    form_fields = json.load(fields_json_stream)["form_fields"]
    messages = [f"Read {len(form_fields)} fields"]

    def overlaps(a, b):
        # Boxes that merely share an edge are treated as not overlapping.
        apart_in_x = a[0] >= b[2] or a[2] <= b[0]
        apart_in_y = a[1] >= b[3] or a[3] <= b[1]
        return not apart_in_x and not apart_in_y

    # Each field contributes its label box first, then its entry box.
    boxes = [
        RectAndField(form_field[f"{kind}_bounding_box"], kind, form_field)
        for form_field in form_fields
        for kind in ("label", "entry")
    ]

    found_problem = False
    for index, current in enumerate(boxes):
        # This is O(N^2); we can optimize if it becomes a problem.
        for other in boxes[index + 1:]:
            same_page = current.field["page_number"] == other.field["page_number"]
            if not (same_page and overlaps(current.rect, other.rect)):
                continue
            found_problem = True
            if current.field is other.field:
                messages.append(f"FAILURE: intersection between label and entry bounding boxes for `{current.field['description']}` ({current.rect}, {other.rect})")
            else:
                messages.append(f"FAILURE: intersection between {current.rect_type} bounding box for `{current.field['description']}` ({current.rect}) and {other.rect_type} bounding box for `{other.field['description']}` ({other.rect})")
            if len(messages) >= 20:
                messages.append(abort_note)
                return messages

        # Height is only checked for entry boxes that declare text to be drawn.
        if current.rect_type != "entry" or "entry_text" not in current.field:
            continue
        font_size = current.field["entry_text"].get("font_size", 14)
        entry_height = current.rect[3] - current.rect[1]
        if entry_height < font_size:
            found_problem = True
            messages.append(f"FAILURE: entry bounding box height ({entry_height}) for `{current.field['description']}` is too short for the text content (font size: {font_size}). Increase the box height or decrease the font size.")
            if len(messages) >= 20:
                messages.append(abort_note)
                return messages

    if not found_problem:
        messages.append("SUCCESS: All bounding boxes are valid")
    return messages
|
||||
|
||||
if __name__ == "__main__":
    # Exactly one argument is expected: the path to fields.json.
    cli_args = sys.argv[1:]
    if len(cli_args) != 1:
        print("Usage: check_bounding_boxes.py [fields.json]")
        sys.exit(1)
    # Input file should be in the `fields.json` format described in forms.md.
    with open(cli_args[0]) as fields_file:
        report = get_bounding_box_messages(fields_file)
    for line in report:
        print(line)
|
||||
226
skills/pdf/scripts/check_bounding_boxes_test.py
Executable file
226
skills/pdf/scripts/check_bounding_boxes_test.py
Executable file
@@ -0,0 +1,226 @@
|
||||
import unittest
|
||||
import json
|
||||
import io
|
||||
from check_bounding_boxes import get_bounding_box_messages
|
||||
|
||||
|
||||
# Currently this is not run automatically in CI; it's just for documentation and manual checking.
|
||||
class TestGetBoundingBoxMessages(unittest.TestCase):
    """Checks the messages produced by `get_bounding_box_messages`."""

    def create_json_stream(self, data):
        """Serialize *data* to JSON and wrap it in a readable text stream."""
        return io.StringIO(json.dumps(data))

    # --- shared helpers -------------------------------------------------

    @staticmethod
    def _field(description, page_number, label_box, entry_box, **extra):
        """Build one `form_fields` entry; *extra* adds optional keys."""
        field = {
            "description": description,
            "page_number": page_number,
            "label_bounding_box": label_box,
            "entry_bounding_box": entry_box,
        }
        field.update(extra)
        return field

    def _messages_for(self, form_fields):
        """Run the checker on a document holding *form_fields*."""
        stream = self.create_json_stream({"form_fields": form_fields})
        return get_bounding_box_messages(stream)

    def _assert_passes(self, messages):
        """Expect a SUCCESS message and no FAILURE message."""
        self.assertTrue(any("SUCCESS" in msg for msg in messages))
        self.assertFalse(any("FAILURE" in msg for msg in messages))

    def _assert_fails_with(self, messages, keyword):
        """Expect a FAILURE message mentioning *keyword* and no SUCCESS."""
        self.assertTrue(any("FAILURE" in msg and keyword in msg for msg in messages))
        self.assertFalse(any("SUCCESS" in msg for msg in messages))

    # --- intersection checks --------------------------------------------

    def test_no_intersections(self):
        """Separate boxes on one page are accepted."""
        messages = self._messages_for([
            self._field("Name", 1, [10, 10, 50, 30], [60, 10, 150, 30]),
            self._field("Email", 1, [10, 40, 50, 60], [60, 40, 150, 60]),
        ])
        self._assert_passes(messages)

    def test_label_entry_intersection_same_field(self):
        """A field whose label and entry overlap is reported."""
        messages = self._messages_for([
            # Entry starts at x=50, inside the label that ends at x=60.
            self._field("Name", 1, [10, 10, 60, 30], [50, 10, 150, 30]),
        ])
        self._assert_fails_with(messages, "intersection")

    def test_intersection_between_different_fields(self):
        """Overlap between boxes of two different fields is reported."""
        messages = self._messages_for([
            self._field("Name", 1, [10, 10, 50, 30], [60, 10, 150, 30]),
            # Email's label overlaps Name's boxes.
            self._field("Email", 1, [40, 20, 80, 40], [160, 10, 250, 30]),
        ])
        self._assert_fails_with(messages, "intersection")

    def test_different_pages_no_intersection(self):
        """Identical coordinates on different pages do not collide."""
        messages = self._messages_for([
            self._field("Name", 1, [10, 10, 50, 30], [60, 10, 150, 30]),
            self._field("Email", 2, [10, 10, 50, 30], [60, 10, 150, 30]),
        ])
        self._assert_passes(messages)

    # --- entry height checks --------------------------------------------

    def test_entry_height_too_small(self):
        """An entry box shorter than the font size is reported."""
        messages = self._messages_for([
            # Entry height is 10, font size is 14.
            self._field("Name", 1, [10, 10, 50, 30], [60, 10, 150, 20],
                        entry_text={"font_size": 14}),
        ])
        self._assert_fails_with(messages, "height")

    def test_entry_height_adequate(self):
        """An entry box taller than the font size is accepted."""
        messages = self._messages_for([
            # Entry height is 20, font size is 14.
            self._field("Name", 1, [10, 10, 50, 30], [60, 10, 150, 30],
                        entry_text={"font_size": 14}),
        ])
        self._assert_passes(messages)

    def test_default_font_size(self):
        """Without `font_size`, the default of 14 is applied."""
        messages = self._messages_for([
            # Entry height is 10; empty entry_text falls back to font size 14.
            self._field("Name", 1, [10, 10, 50, 30], [60, 10, 150, 20],
                        entry_text={}),
        ])
        self._assert_fails_with(messages, "height")

    def test_no_entry_text(self):
        """Without `entry_text`, the height check is skipped."""
        messages = self._messages_for([
            # Small height, but there is no text to fit.
            self._field("Name", 1, [10, 10, 50, 30], [60, 10, 150, 20]),
        ])
        self._assert_passes(messages)

    # --- output limiting and edge cases ---------------------------------

    def test_multiple_errors_limit(self):
        """Output is cut off instead of listing every overlap."""
        # 25 fields whose boxes all overlap each other.
        fields = [
            self._field(f"Field{i}", 1, [10, 10, 50, 30], [20, 15, 60, 35])
            for i in range(25)
        ]
        messages = self._messages_for(fields)
        # Should abort after ~20 messages
        self.assertTrue(any("Aborting" in msg for msg in messages))
        # Should have some FAILURE messages but not hundreds
        failure_count = sum(1 for msg in messages if "FAILURE" in msg)
        self.assertGreater(failure_count, 0)
        self.assertLess(len(messages), 30)

    def test_edge_touching_boxes(self):
        """Boxes that only share an edge are not an intersection."""
        messages = self._messages_for([
            # Label ends at x=50 exactly where the entry begins.
            self._field("Name", 1, [10, 10, 50, 30], [50, 10, 150, 30]),
        ])
        self._assert_passes(messages)
|
||||
|
||||
|
||||
# Allow running this test module directly.
if __name__ == "__main__":
    unittest.main()
|
||||
12
skills/pdf/scripts/check_fillable_fields.py
Executable file
12
skills/pdf/scripts/check_fillable_fields.py
Executable file
@@ -0,0 +1,12 @@
|
||||
import sys
|
||||
from pypdf import PdfReader
|
||||
|
||||
|
||||
# Script for GLM to run to determine whether a PDF has fillable form fields. See forms.md.
|
||||
|
||||
|
||||
# Require exactly one argument so a missing path produces a usage hint
# (like the sibling scripts) instead of an IndexError traceback.
if len(sys.argv) != 2:
    print("Usage: check_fillable_fields.py [input pdf]")
    sys.exit(1)

reader = PdfReader(sys.argv[1])
# A falsy result from get_fields() is reported as "no fillable fields".
if reader.get_fields():
    print("This PDF has fillable form fields")
else:
    print("This PDF does not have fillable form fields; you will need to visually determine where to enter data")
|
||||
35
skills/pdf/scripts/convert_pdf_to_images.py
Executable file
35
skills/pdf/scripts/convert_pdf_to_images.py
Executable file
@@ -0,0 +1,35 @@
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pdf2image import convert_from_path
|
||||
|
||||
|
||||
# Converts each page of a PDF to a PNG image.
|
||||
|
||||
|
||||
def convert(pdf_path, output_dir, max_dim=1000):
    """Render every page of a PDF to `page_<n>.png` inside *output_dir*.

    Pages are rendered at 200 dpi and scaled down, preserving aspect ratio,
    when their width or height exceeds *max_dim*.

    Args:
        pdf_path: Path of the PDF to render.
        output_dir: Directory that receives the images; created if missing.
        max_dim: Largest allowed width/height of a saved image, in pixels.
    """
    # Create the target directory up front so saving cannot fail just because
    # it does not exist yet. An empty string means the current directory and
    # must not be passed to makedirs.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    images = convert_from_path(pdf_path, dpi=200)

    for page_number, image in enumerate(images, start=1):
        # Scale image if needed to keep width/height under `max_dim`
        width, height = image.size
        if width > max_dim or height > max_dim:
            scale_factor = min(max_dim / width, max_dim / height)
            # Clamp to 1 pixel: int() truncation could otherwise yield a
            # zero-sized dimension for extremely elongated pages.
            new_width = max(1, int(width * scale_factor))
            new_height = max(1, int(height * scale_factor))
            image = image.resize((new_width, new_height))

        image_path = os.path.join(output_dir, f"page_{page_number}.png")
        image.save(image_path)
        print(f"Saved page {page_number} as {image_path} (size: {image.size})")

    print(f"Converted {len(images)} pages to PNG images")
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Two positional arguments are required: the PDF and the output directory.
    if len(sys.argv) != 3:
        print("Usage: convert_pdf_to_images.py [input pdf] [output directory]")
        sys.exit(1)
    _, source_pdf, target_directory = sys.argv
    convert(source_pdf, target_directory)
|
||||
41
skills/pdf/scripts/create_validation_image.py
Executable file
41
skills/pdf/scripts/create_validation_image.py
Executable file
@@ -0,0 +1,41 @@
|
||||
import json
|
||||
import sys
|
||||
|
||||
from PIL import Image, ImageDraw
|
||||
|
||||
|
||||
# Creates "validation" images with rectangles for the bounding box information that
|
||||
# GLM creates when determining where to add text annotations in PDFs. See forms.md.
|
||||
|
||||
|
||||
def create_validation_image(page_number, fields_json_path, input_path, output_path):
    """Draw the bounding boxes of one page's fields onto a copy of its image.

    Args:
        page_number: Page whose fields are drawn; compared with each field's
            `page_number` value.
        fields_json_path: Path to a `fields.json` file with a `form_fields` list.
        input_path: Path of the page image to draw on.
        output_path: Path the annotated image is saved to.

    Entry boxes are outlined in red and label boxes in blue.
    """
    # Input file should be in the `fields.json` format described in forms.md.
    with open(fields_json_path, 'r') as f:
        data = json.load(f)

    num_boxes = 0
    # Open the image as a context manager so its file handle is released
    # even when drawing or saving raises.
    with Image.open(input_path) as img:
        draw = ImageDraw.Draw(img)

        for field in data["form_fields"]:
            if field["page_number"] == page_number:
                entry_box = field['entry_bounding_box']
                label_box = field['label_bounding_box']
                # Draw red rectangle over entry bounding box and blue rectangle over the label.
                draw.rectangle(entry_box, outline='red', width=2)
                draw.rectangle(label_box, outline='blue', width=2)
                num_boxes += 2

        img.save(output_path)

    print(f"Created validation image at {output_path} with {num_boxes} bounding boxes")
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Four positional arguments are required; see the usage line below.
    if len(sys.argv) != 5:
        print("Usage: create_validation_image.py [page number] [fields.json file] [input image path] [output image path]")
        sys.exit(1)
    _, page_arg, fields_path, image_in, image_out = sys.argv
    # The page number arrives as text and is compared with integers from JSON.
    create_validation_image(int(page_arg), fields_path, image_in, image_out)
|
||||
152
skills/pdf/scripts/extract_form_field_info.py
Executable file
152
skills/pdf/scripts/extract_form_field_info.py
Executable file
@@ -0,0 +1,152 @@
|
||||
import json
|
||||
import sys
|
||||
|
||||
from pypdf import PdfReader
|
||||
|
||||
|
||||
# Extracts data for the fillable form fields in a PDF and outputs JSON that
|
||||
# GLM uses to fill the fields. See forms.md.
|
||||
|
||||
|
||||
# This matches the format used by PdfReader `get_fields` and `update_page_form_field_values` methods.
|
||||
def get_full_annotation_field_id(annotation):
    """Return the dotted, fully qualified field name of an annotation.

    Walks from *annotation* up through its `/Parent` chain, collecting every
    non-empty `/T` name, and joins them outermost-first with ".".

    Returns:
        The joined name, or None when no `/T` name is found.
    """
    names = []
    node = annotation
    while node:
        name = node.get('/T')
        if name:
            # Ancestors are visited last, so they go to the front.
            names.insert(0, name)
        node = node.get('/Parent')
    if not names:
        return None
    return ".".join(names)
|
||||
|
||||
|
||||
def make_field_dict(field, field_id):
    """Build the JSON-serializable description of one form field.

    Args:
        field: Mapping for the field; its `/FT` and `/_States_` entries are
            read with `.get`.
        field_id: Field name stored under "field_id".

    Returns:
        A dict with "field_id" and "type". Depending on `/FT` it also holds:
        - "/Tx": type "text", nothing more.
        - "/Btn": type "checkbox", plus "checked_value" and
          "unchecked_value" when exactly two states are listed.
        - "/Ch": type "choice", plus "choice_options", a list of
          {"value", "text"} dicts.
        - anything else: type "unknown (<ft>)".
    """
    field_dict = {"field_id": field_id}
    ft = field.get('/FT')
    if ft == "/Tx":
        field_dict["type"] = "text"
    elif ft == "/Btn":
        field_dict["type"] = "checkbox"  # radio groups handled separately
        states = field.get("/_States_", [])
        if len(states) == 2:
            # "/Off" seems to always be the unchecked value, as suggested by
            # https://opensource.adobe.com/dc-acrobat-sdk-docs/standards/pdfstandards/pdf/PDF32000_2008.pdf#page=448
            # It can be either first or second in the "/_States_" list.
            if "/Off" in states:
                field_dict["checked_value"] = states[0] if states[0] != "/Off" else states[1]
                field_dict["unchecked_value"] = "/Off"
            else:
                # Fixed: the message used JS-style `${field_id}`, which printed a stray "$".
                print(f"Unexpected state values for checkbox `{field_id}`. Its checked and unchecked values may not be correct; if you're trying to check it, visually verify the results.")
                field_dict["checked_value"] = states[0]
                field_dict["unchecked_value"] = states[1]
    elif ft == "/Ch":
        field_dict["type"] = "choice"
        options = []
        for state in field.get("/_States_", []):
            if isinstance(state, str):
                # A plain-string option serves as both value and text;
                # indexing it would pick out single characters.
                value = text = state
            else:
                value, text = state[0], state[1]
            options.append({"value": value, "text": text})
        field_dict["choice_options"] = options
    else:
        field_dict["type"] = f"unknown ({ft})"
    return field_dict
|
||||
|
||||
|
||||
# Returns a list of fillable PDF fields:
|
||||
# [
|
||||
# {
|
||||
# "field_id": "name",
|
||||
# "page": 1,
|
||||
# "type": ("text", "checkbox", "radio_group", or "choice")
|
||||
# // Per-type additional fields described in forms.md
|
||||
# },
|
||||
# ]
|
||||
def get_field_info(reader: "PdfReader"):
    """Collect fillable fields, with their pages and rectangles, from a PDF.

    Args:
        reader: Object providing `get_fields()` and `pages`.

    Returns:
        A list of field dicts sorted by page, then descending Y, then X.
        Plain fields come from `make_field_dict` with "page" and "rect"
        added; radio groups hold "field_id", "type", "page" and
        "radio_options" (each option has "value" and "rect"). Fields without
        a matching page annotation are reported on stdout and left out.
    """
    # get_fields() can return None when the document has no form data;
    # treat that as "no fields" instead of crashing on `.items()`.
    fields = reader.get_fields() or {}

    field_info_by_id = {}
    possible_radio_names = set()

    for field_id, field in fields.items():
        # Skip if this is a container field with children, except that it might be
        # a parent group for radio button options.
        if field.get("/Kids"):
            if field.get("/FT") == "/Btn":
                possible_radio_names.add(field_id)
            continue
        field_info_by_id[field_id] = make_field_dict(field, field_id)

    # Bounding rects are stored in annotations in page objects.

    # Radio button options have a separate annotation for each choice;
    # all choices have the same field name.
    # See https://westhealth.github.io/exploring-fillable-forms-with-pdfrw.html
    radio_fields_by_id = {}

    for page_index, page in enumerate(reader.pages):
        annotations = page.get('/Annots', [])
        for ann in annotations:
            field_id = get_full_annotation_field_id(ann)
            if field_id in field_info_by_id:
                field_info_by_id[field_id]["page"] = page_index + 1
                field_info_by_id[field_id]["rect"] = ann.get('/Rect')
            elif field_id in possible_radio_names:
                try:
                    # ann['/AP']['/N'] should have two items. One of them is '/Off',
                    # the other is the active value.
                    on_values = [v for v in ann["/AP"]["/N"] if v != "/Off"]
                except KeyError:
                    continue
                if len(on_values) == 1:
                    rect = ann.get("/Rect")
                    if field_id not in radio_fields_by_id:
                        radio_fields_by_id[field_id] = {
                            "field_id": field_id,
                            "type": "radio_group",
                            "page": page_index + 1,
                            "radio_options": [],
                        }
                    # Note: at least on macOS 15.7, Preview.app doesn't show selected
                    # radio buttons correctly. (It does if you remove the leading slash
                    # from the value, but that causes them not to appear correctly in
                    # Chrome/Firefox/Acrobat/etc).
                    radio_fields_by_id[field_id]["radio_options"].append({
                        "value": on_values[0],
                        "rect": rect,
                    })

    # Some PDFs have form field definitions without corresponding annotations,
    # so we can't tell where they are. Ignore these fields for now.
    fields_with_location = []
    for field_info in field_info_by_id.values():
        if "page" in field_info:
            fields_with_location.append(field_info)
        else:
            print(f"Unable to determine location for field id: {field_info.get('field_id')}, ignoring")

    # Sort by page number, then Y position (flipped in PDF coordinate system), then X.
    def sort_key(f):
        if "radio_options" in f:
            rect = f["radio_options"][0]["rect"] or [0, 0, 0, 0]
        else:
            rect = f.get("rect") or [0, 0, 0, 0]
        adjusted_position = [-rect[1], rect[0]]
        return [f.get("page"), adjusted_position]

    sorted_fields = fields_with_location + list(radio_fields_by_id.values())
    sorted_fields.sort(key=sort_key)

    return sorted_fields
|
||||
|
||||
|
||||
def write_field_info(pdf_path: str, json_output_path: str):
    """Extract the form-field info of *pdf_path* and save it as JSON.

    The list returned by `get_field_info` is written to *json_output_path*
    with two-space indentation, and the number of fields is printed.
    """
    extracted = get_field_info(PdfReader(pdf_path))
    with open(json_output_path, "w") as out_file:
        json.dump(extracted, out_file, indent=2)
    print(f"Wrote {len(extracted)} fields to {json_output_path}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Two positional arguments are required: the PDF and the JSON destination.
    if len(sys.argv) != 3:
        print("Usage: extract_form_field_info.py [input pdf] [output json]")
        sys.exit(1)
    _, source_pdf, json_target = sys.argv
    write_field_info(source_pdf, json_target)
|
||||
114
skills/pdf/scripts/fill_fillable_fields.py
Executable file
114
skills/pdf/scripts/fill_fillable_fields.py
Executable file
@@ -0,0 +1,114 @@
|
||||
import json
|
||||
import sys
|
||||
|
||||
from pypdf import PdfReader, PdfWriter
|
||||
|
||||
from extract_form_field_info import get_field_info
|
||||
|
||||
|
||||
# Fills fillable form fields in a PDF. See forms.md.
|
||||
|
||||
|
||||
def fill_pdf_fields(input_pdf_path: str, fields_json_path: str, output_pdf_path: str):
    """Fill the form fields of a PDF with values listed in a JSON file.

    Args:
        input_pdf_path: PDF whose form fields are filled.
        fields_json_path: JSON list of field dicts; each has "field_id" and
            "page", and optionally "value".
        output_pdf_path: Path the filled PDF is written to.

    Every listed field is checked against the PDF (known id, matching page,
    acceptable value). All problems are printed; if there is any, the process
    exits with status 1 and nothing is written.
    """
    with open(fields_json_path) as json_file:
        requested = json.load(json_file)

    # Collect {field_id: value} per page for the entries that carry a value.
    values_by_page = {}
    for entry in requested:
        if "value" not in entry:
            continue
        name = entry["field_id"]
        page_no = entry["page"]
        values_by_page.setdefault(page_no, {})[name] = entry["value"]

    reader = PdfReader(input_pdf_path)

    known_fields = {info["field_id"]: info for info in get_field_info(reader)}
    failed = False
    for entry in requested:
        name = entry["field_id"]
        actual = known_fields.get(name)
        if not actual:
            failed = True
            print(f"ERROR: `{name}` is not a valid field ID")
            continue
        if entry["page"] != actual["page"]:
            failed = True
            print(f"ERROR: Incorrect page number for `{name}` (got {entry['page']}, expected {actual['page']})")
            continue
        if "value" not in entry:
            continue
        problem = validation_error_for_field_value(actual, entry["value"])
        if problem:
            print(problem)
            failed = True
    if failed:
        sys.exit(1)

    writer = PdfWriter(clone_from=reader)
    for page_no, page_values in values_by_page.items():
        # Page numbers in the JSON are 1-based.
        target_page = writer.pages[page_no - 1]
        writer.update_page_form_field_values(target_page, page_values, auto_regenerate=False)

    # This seems to be necessary for many PDF viewers to format the form values correctly.
    # It may cause the viewer to show a "save changes" dialog even if the user doesn't make any changes.
    writer.set_need_appearances_writer(True)

    with open(output_pdf_path, "wb") as out_file:
        writer.write(out_file)
|
||||
|
||||
|
||||
def validation_error_for_field_value(field_info, field_value):
    """Check a requested value against what a field accepts.

    Args:
        field_info: Field description with "type" and "field_id", plus
            "checked_value"/"unchecked_value" for checkboxes,
            "radio_options" for radio groups or "choice_options" for choices.
        field_value: The value that is about to be written to the field.

    Returns:
        An "ERROR: ..." message when the value is not acceptable, otherwise
        None. Field types other than checkbox, radio_group and choice are not
        validated.
    """
    field_type = field_info["type"]
    field_id = field_info["field_id"]
    if field_type == "checkbox":
        # The checked/unchecked values may be absent when they could not be
        # determined for the field; report that instead of raising KeyError.
        checked_val = field_info.get("checked_value")
        unchecked_val = field_info.get("unchecked_value")
        if checked_val is None or unchecked_val is None:
            return f'ERROR: Cannot validate value "{field_value}" for checkbox field "{field_id}": its checked and unchecked values could not be determined'
        if field_value != checked_val and field_value != unchecked_val:
            return f'ERROR: Invalid value "{field_value}" for checkbox field "{field_id}". The checked value is "{checked_val}" and the unchecked value is "{unchecked_val}"'
    elif field_type == "radio_group":
        option_values = [opt["value"] for opt in field_info["radio_options"]]
        if field_value not in option_values:
            return f'ERROR: Invalid value "{field_value}" for radio group field "{field_id}". Valid values are: {option_values}'
    elif field_type == "choice":
        choice_values = [opt["value"] for opt in field_info["choice_options"]]
        if field_value not in choice_values:
            return f'ERROR: Invalid value "{field_value}" for choice field "{field_id}". Valid values are: {choice_values}'
    return None
|
||||
|
||||
|
||||
# pypdf (at least version 5.7.0) has a bug when setting the value for a selection list field.
|
||||
# In _writer.py around line 966:
|
||||
#
|
||||
# if field.get(FA.FT, "/Tx") == "/Ch" and field_flags & FA.FfBits.Combo == 0:
|
||||
# txt = "\n".join(annotation.get_inherited(FA.Opt, []))
|
||||
#
|
||||
# The problem is that for selection lists, `get_inherited` returns a list of two-element lists like
|
||||
# [["value1", "Text 1"], ["value2", "Text 2"], ...]
|
||||
# This causes `join` to throw a TypeError because it expects an iterable of strings.
|
||||
# The horrible workaround is to patch `get_inherited` to return a list of the value strings.
|
||||
# We call the original method and adjust the return value only if the argument to `get_inherited`
|
||||
# is `FA.Opt` and if the return value is a list of two-element lists.
|
||||
def monkeypatch_pydpf_method():
    """Replace `DictionaryObject.get_inherited` with a wrapper for `/Opt`.

    The wrapper calls the original method. Only when the requested key is
    `FieldDictionaryAttributes.Opt` and the result is a list made up entirely
    of two-element lists does it return the first element of each pair;
    every other result is passed through unchanged.
    """
    from pypdf.constants import FieldDictionaryAttributes
    from pypdf.generic import DictionaryObject

    wrapped = DictionaryObject.get_inherited

    def _is_list_of_pairs(value):
        # True only for a list whose every item is itself a 2-item list.
        return isinstance(value, list) and all(
            isinstance(item, list) and len(item) == 2 for item in value
        )

    def patched_get_inherited(self, key: str, default = None):
        inherited = wrapped(self, key, default)
        if key != FieldDictionaryAttributes.Opt:
            return inherited
        if _is_list_of_pairs(inherited):
            return [pair[0] for pair in inherited]
        return inherited

    DictionaryObject.get_inherited = patched_get_inherited
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Three positional arguments are required; see the usage line below.
    if len(sys.argv) != 4:
        print("Usage: fill_fillable_fields.py [input pdf] [field_values.json] [output pdf]")
        sys.exit(1)
    # Install the pypdf workaround before any field values are written.
    monkeypatch_pydpf_method()
    _, source_pdf, values_json, target_pdf = sys.argv
    fill_pdf_fields(source_pdf, values_json, target_pdf)
|
||||
108
skills/pdf/scripts/fill_pdf_form_with_annotations.py
Executable file
108
skills/pdf/scripts/fill_pdf_form_with_annotations.py
Executable file
@@ -0,0 +1,108 @@
|
||||
import json
|
||||
import sys
|
||||
|
||||
from pypdf import PdfReader, PdfWriter
|
||||
from pypdf.annotations import FreeText
|
||||
|
||||
|
||||
# Fills a PDF by adding text annotations defined in `fields.json`. See forms.md.
|
||||
|
||||
|
||||
def transform_coordinates(bbox, image_width, image_height, pdf_width, pdf_height):
    """Convert a bounding box from image space to PDF space.

    Image coordinates have their origin at the top-left with y growing downward;
    PDF coordinates have their origin at the bottom-left with y growing upward.

    Args:
        bbox: Indexable box; items 0..3 are read as (left, top, right, bottom)
            in image coordinates.
        image_width: Width of the image the box was measured on.
        image_height: Height of the image the box was measured on.
        pdf_width: Width of the PDF page.
        pdf_height: Height of the PDF page.

    Returns:
        A ``(left, bottom, right, top)`` tuple in PDF coordinates.
    """
    horizontal_scale = pdf_width / image_width
    vertical_scale = pdf_height / image_height

    # The vertical axis is mirrored, so the image's bottom edge (bbox[3])
    # becomes the lower PDF y value and its top edge (bbox[1]) the upper one.
    return (
        bbox[0] * horizontal_scale,
        pdf_height - (bbox[3] * vertical_scale),
        bbox[2] * horizontal_scale,
        pdf_height - (bbox[1] * vertical_scale),
    )
|
||||
|
||||
|
||||
def fill_pdf_form(input_pdf_path, fields_json_path, output_pdf_path):
    """Fill the PDF form with data from fields.json.

    Every entry of ``form_fields`` that carries non-empty ``entry_text.text`` is
    added to the output PDF as a FreeText annotation placed at the field's
    ``entry_bounding_box`` (converted from image to PDF coordinates).

    Args:
        input_pdf_path: Path of the PDF to read.
        fields_json_path: Path of the `fields.json` file (format described in
            forms.md); it is read as UTF-8.
        output_pdf_path: Path the annotated PDF is written to.

    Raises:
        ValueError: If a field with text refers to a page number that is missing
            from the JSON ``pages`` list or does not exist in the PDF.
    """
    # JSON is UTF-8 by specification; do not depend on the locale's encoding.
    with open(fields_json_path, "r", encoding="utf-8") as f:
        fields_data = json.load(f)

    # Open the PDF and copy all of its pages to the writer.
    reader = PdfReader(input_pdf_path)
    writer = PdfWriter()
    writer.append(reader)

    # 1-based page number -> (width, height) of that page's media box.
    pdf_dimensions = {
        number: (page.mediabox.width, page.mediabox.height)
        for number, page in enumerate(reader.pages, start=1)
    }

    # Index the image metadata once instead of scanning it for every field.
    # setdefault keeps the first entry when a page number is repeated.
    image_pages = {}
    for page_info in fields_data["pages"]:
        image_pages.setdefault(page_info["page_number"], page_info)

    annotation_count = 0
    for field in fields_data["form_fields"]:
        # Skip empty fields before doing any lookups or coordinate work.
        entry_text = field.get("entry_text") or {}
        text = entry_text.get("text")
        if not text:
            continue

        page_num = field["page_number"]
        page_info = image_pages.get(page_num)
        if page_info is None:
            raise ValueError(f"No entry in 'pages' for page_number {page_num}")
        if page_num not in pdf_dimensions:
            raise ValueError(
                f"page_number {page_num} is outside the PDF "
                f"({len(pdf_dimensions)} pages)"
            )
        pdf_width, pdf_height = pdf_dimensions[page_num]

        transformed_entry_box = transform_coordinates(
            field["entry_bounding_box"],
            page_info["image_width"], page_info["image_height"],
            pdf_width, pdf_height
        )

        font_name = entry_text.get("font", "Arial")
        font_size = f"{entry_text.get('font_size', 14)}pt"
        font_color = entry_text.get("font_color", "000000")

        # Font size/color seems to not work reliably across viewers:
        # https://github.com/py-pdf/pypdf/issues/2084
        annotation = FreeText(
            text=text,
            rect=transformed_entry_box,
            font=font_name,
            font_size=font_size,
            font_color=font_color,
            border_color=None,
            background_color=None,
        )
        # page_number is 0-based for pypdf
        writer.add_annotation(page_number=page_num - 1, annotation=annotation)
        annotation_count += 1

    # Save the filled PDF
    with open(output_pdf_path, "wb") as output:
        writer.write(output)

    print(f"Successfully filled PDF form and saved to {output_pdf_path}")
    print(f"Added {annotation_count} text annotations")
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Exactly three positional arguments are required after the script name.
    if len(sys.argv) != 4:
        print("Usage: fill_pdf_form_with_annotations.py [input pdf] [fields.json] [output pdf]")
        sys.exit(1)
    input_pdf, fields_json, output_pdf = sys.argv[1:4]

    fill_pdf_form(input_pdf, fields_json, output_pdf)
|
||||
110
skills/pdf/scripts/sanitize_code.py
Executable file
110
skills/pdf/scripts/sanitize_code.py
Executable file
@@ -0,0 +1,110 @@
|
||||
import re
|
||||
import html
|
||||
import sys
|
||||
from typing import Dict
|
||||
|
||||
# ---------- Step 0: restore literal unicode escapes/entities to real chars ----------
# The first alternative matches a UTF-16 surrogate pair spelled as two \uXXXX
# escapes, so the pair can be decoded into the single code point it encodes.
_RE_UNICODE_ESC = re.compile(
    r"(?P<pair>\\u[dD][89abAB][0-9a-fA-F]{2}\\u[dD][c-fC-F][0-9a-fA-F]{2})"
    r"|(\\u[0-9a-fA-F]{4})|(\\U[0-9a-fA-F]{8})|(\\x[0-9a-fA-F]{2})"
)


def _restore_escapes(s: str) -> str:
    """Turn HTML entities and literal backslash escapes into real characters.

    Handles HTML entities (``&sup3;``, ``&le;``, ``&alpha;`` ...) and the literal
    text forms ``\\uXXXX``, ``\\UXXXXXXXX`` and ``\\xXX``. A high/low surrogate
    pair written as two ``\\uXXXX`` escapes is combined into one character.

    Escapes that do not denote an encodable character (a lone surrogate, or a
    value above U+10FFFF) are left in the text unchanged, so the result can
    always be encoded as UTF-8.

    Args:
        s: Text to process.

    Returns:
        The text with entities and decodable escapes replaced.
    """
    # HTML entities: &sup3; &le; &alpha; ...
    s = html.unescape(s)

    # Literal backslash escapes: "\\u00B3" -> "³"
    def _dec(m: re.Match) -> str:
        esc = m.group(0)
        if m.group("pair"):
            # Layout is \uHHHH\uLLLL: hex digits sit at [2:6] and [8:12].
            high = int(esc[2:6], 16)
            low = int(esc[8:12], 16)
            return chr(0x10000 + ((high - 0xD800) << 10) + (low - 0xDC00))
        code = int(esc[2:], 16)
        if 0xD800 <= code <= 0xDFFF or code > 0x10FFFF:
            # Not a valid scalar value; keep the escape text as it was.
            return esc
        return chr(code)

    return _RE_UNICODE_ESC.sub(_dec, s)
|
||||
|
||||
# ---------- Step 1: superscripts/subscripts -> <super>/<sub> ----------
_SUPERSCRIPT_MAP: Dict[str, str] = {
    "⁰": "0", "¹": "1", "²": "2", "³": "3", "⁴": "4",
    "⁵": "5", "⁶": "6", "⁷": "7", "⁸": "8", "⁹": "9",
    "⁺": "+", "⁻": "-", "⁼": "=", "⁽": "(", "⁾": ")",
    "ⁿ": "n", "ᶦ": "i",
}

_SUBSCRIPT_MAP: Dict[str, str] = {
    "₀": "0", "₁": "1", "₂": "2", "₃": "3", "₄": "4",
    "₅": "5", "₆": "6", "₇": "7", "₈": "8", "₉": "9",
    "₊": "+", "₋": "-", "₌": "=", "₍": "(", "₎": ")",
    "ₐ": "a", "ₑ": "e", "ₕ": "h", "ᵢ": "i", "ⱼ": "j",
    "ₖ": "k", "ₗ": "l", "ₘ": "m", "ₙ": "n", "ₒ": "o",
    "ₚ": "p", "ᵣ": "r", "ₛ": "s", "ₜ": "t", "ᵤ": "u",
    "ᵥ": "v", "ₓ": "x",
}


def _replace_super_sub(s: str) -> str:
    """Rewrite superscript/subscript characters as <super>/<sub> markup.

    Each character listed in ``_SUPERSCRIPT_MAP`` becomes
    ``<super>X</super>`` and each one in ``_SUBSCRIPT_MAP`` becomes
    ``<sub>X</sub>``, where ``X`` is the mapped plain character. All other
    characters are copied through. Every character is wrapped on its own, so
    adjacent superscripts produce adjacent tag pairs.
    """
    def _markup(ch: str) -> str:
        plain = _SUPERSCRIPT_MAP.get(ch)
        if plain is not None:
            return f"<super>{plain}</super>"
        plain = _SUBSCRIPT_MAP.get(ch)
        if plain is not None:
            return f"<sub>{plain}</sub>"
        return ch

    return "".join(map(_markup, s))
|
||||
|
||||
# ---------- Step 2: symbol fallback for SimHei (protect tags, then replace) ----------
_SYMBOL_FALLBACK: Dict[str, str] = {
    # Currently empty - enable entries as needed for fonts missing specific glyphs
    # "±": "+/-",
    # "×": "*",
    # "÷": "/",
    # "≤": "<=",
    # "≥": ">=",
    # "≠": "!=",
    # "≈": "~=",
    # "∞": "inf",
}

# Matches the markup tags that must never be altered by symbol replacement.
# The capturing group makes re.split keep the tags in its result.
_RE_MARKUP_TAG = re.compile(r"(</?super>|</?sub>)")


def _fallback_symbols(s: str) -> str:
    """Replace characters listed in ``_SYMBOL_FALLBACK`` with their fallbacks.

    ``<super>``, ``</super>``, ``<sub>`` and ``</sub>`` tags are left untouched;
    only the text between them is mapped, one character at a time.

    The text is split around the tags instead of swapping them for placeholder
    strings. That way input which already contains placeholder-looking text
    (such as ``@@TAG0@@``) cannot be mistaken for a tag, and restoring needs no
    per-tag ``str.replace`` pass.

    Args:
        s: Text that may contain <super>/<sub> markup.

    Returns:
        The text with fallback symbols substituted outside the tags.
    """
    if not _SYMBOL_FALLBACK:
        # Nothing to substitute; avoid the split/join round trip.
        return s

    # With one capturing group, even indices hold plain text and odd indices
    # hold the tags themselves.
    parts = _RE_MARKUP_TAG.split(s)
    parts[::2] = [
        "".join(_SYMBOL_FALLBACK.get(ch, ch) for ch in text)
        for text in parts[::2]
    ]
    return "".join(parts)
|
||||
|
||||
def sanitize_code(text: str) -> str:
    """
    Run the complete sanitization pipeline for PDF generation code.

    The three stages are applied in this order:
    1. Restore unicode escapes/entities to real characters
    2. Replace superscript/subscript unicode with <super>/<sub>
    3. Replace other risky symbols with ASCII/text fallbacks
    """
    restored = _restore_escapes(text)
    tagged = _replace_super_sub(restored)
    return _fallback_symbols(tagged)
|
||||
|
||||
if __name__ == "__main__":
    # The first positional argument is the script to sanitize in place.
    if len(sys.argv) < 2:
        print("Usage: python sanitize_code.py <target_script.py>")
        sys.exit(1)
    target = sys.argv[1]

    with open(target, "r", encoding="utf-8") as source_file:
        sanitized = sanitize_code(source_file.read())

    # Overwrite the same file with the sanitized text.
    with open(target, "w", encoding="utf-8") as destination_file:
        destination_file.write(sanitized)

    print(f"Sanitized: {target}")
|
||||
Reference in New Issue
Block a user