8 Commits

7 changed files with 329 additions and 138 deletions

View File

@@ -1,5 +1,50 @@
# Changelog # Changelog
## v3.11.11 (2026-05-26)
**Antigravity Fix: Stricter function_call/output Pairing + Gemini Sanitizer Rewrite (PR #12)**
### Bug Fixes
- **Stricter function_call/output pairing**: Only includes pairs where BOTH call and output exist — no orphan calls sent to Gemini
- **Gemini sanitizer rewritten**: Tool messages (`functionCall`/`functionResponse`) are always preserved as-is, never merged or skipped
- **Text merging more conservative**: Checks last message for tool content before merging consecutive text messages
- **Final trimming safe**: Only removes plain `message` items, never `function_call_output` (which would break tool pairs)
- **Merge PR #12**: Fix by qwen-chat coder
## v3.11.10 (2026-05-26)
## v3.11.10 (2026-05-26)
**Antigravity Fix: Interleave function_call/output Pairs, Gemini Turn Trimming (PR #11)**
### Bug Fixes
- **Fix Antigravity function_call/output ordering**: Tool calls and their responses are now properly interleaved in sequence (`function_call``function_call_output``function_call` → ...) instead of being grouped separately
- **Gemini sanitizer trimming**: Leading/trailing non-user turns removed for Google API compliance (Google requires conversation to start and end with user turn)
- **Stricter role boundary enforcement**: `functionCall` (model) and `functionResponse` (user) never merged across role boundaries
- **Merge PR #11**: Fix by qwen-chat coder
## v3.11.9 (2026-05-26)
## v3.11.9 (2026-05-26)
**Antigravity Fix: Preserve functionCall/functionResponse in Gemini Sanitizer (PR #10)**
### Bug Fixes
- **Fix Antigravity multi-turn tool use**: The Gemini message sanitizer was incorrectly merging/dropping `functionCall` and `functionResponse` turns, causing Antigravity to think forever without responding. These turns are now always preserved as separate messages.
- **Merge PR #10**: `fix: preserve functionCall/functionResponse in Gemini sanitizer` (qwen-chat coder)
## v3.11.8 (2026-05-26)
## v3.11.8 (2026-05-26)
**Vision Cache Persistence, PR #8 Merge**
### New Features
- **Vision description cache persisted across requests**: Image descriptions from the vision fallback API are now cached in a file (`~/.cache/codex-proxy/vision-cache.json`) so the same image URL is never described twice — saves API calls and latency
- **Merge PR #8**: `fix: persist vision description cache across requests` (cobra91)
## v3.11.7 (2026-05-26) ## v3.11.7 (2026-05-26)
**Vision Auto-Detect, Proactive Non-Vision Model Detection, Unit Tests, Bug Fixes** **Vision Auto-Detect, Proactive Non-Vision Model Detection, Unit Tests, Bug Fixes**

Binary file not shown.

Binary file not shown.

View File

@@ -27,6 +27,22 @@ model_catalog_json = ""
""" """
CHANGELOG = [ CHANGELOG = [
("3.11.11", "2026-05-26", [
"Fix Antigravity: stricter function_call/output pairing (PR #12)",
"Gemini sanitizer rewritten — tool messages always preserved",
]),
("3.11.10", "2026-05-26", [
"Fix Antigravity: interleave function_call/output pairs (PR #11)",
"Gemini sanitizer: trim non-user turns for Google API compliance",
]),
("3.11.9", "2026-05-26", [
"Fix Antigravity: preserve functionCall/functionResponse (PR #10)",
"Prevents tool responses from being dropped in multi-turn sessions",
]),
("3.11.8", "2026-05-26", [
"Vision cache persisted across requests (PR #8 merge)",
"No redundant vision API calls for same image URL",
]),
("3.11.7", "2026-05-26", [ ("3.11.7", "2026-05-26", [
"Vision auto-detect: uses provider's vision model for image description", "Vision auto-detect: uses provider's vision model for image description",
"Vision preprocessing replaces image stripping", "Vision preprocessing replaces image stripping",

View File

@@ -83,6 +83,25 @@ model_catalog_json = ""
""" """
CHANGELOG = [ CHANGELOG = [
("3.11.11", "2026-05-26", [
"Fix Antigravity: stricter function_call/output pairing + Gemini sanitizer rewrite (PR #12)",
"Only pairs where BOTH call and output exist are included — no orphan calls",
"Gemini sanitizer: tool messages always preserved as-is, text merging more conservative",
"Final trimming only removes plain messages, never function_call_output",
]),
("3.11.10", "2026-05-26", [
"Fix Antigravity: interleave function_call/output pairs in correct sequence (PR #11)",
"Fix Gemini sanitizer: trim leading/trailing non-user turns for Google API compliance",
"Stricter function call/response isolation — no merging across role boundaries",
]),
("3.11.9", "2026-05-26", [
"Fix Antigravity: preserve functionCall/functionResponse in Gemini sanitizer (PR #10)",
"Prevents tool responses from being merged/dropped in multi-turn Antigravity sessions",
]),
("3.11.8", "2026-05-26", [
"Vision description cache persisted across requests (no redundant API calls for same image)",
"Merge PR #8: fix vision cache persistence across requests",
]),
("3.11.7", "2026-05-26", [ ("3.11.7", "2026-05-26", [
"Vision auto-detect: uses provider's own vision model (e.g. 0G-Qwen-VL) as fallback for image description", "Vision auto-detect: uses provider's own vision model (e.g. 0G-Qwen-VL) as fallback for image description",
"Vision preprocessing replaces image stripping: images described via API instead of just removed", "Vision preprocessing replaces image stripping: images described via API instead of just removed",

View File

@@ -868,6 +868,10 @@ def _auto_detect_vision_fallback(target_url, api_key, models):
chat_url = base + "/v1/chat/completions" chat_url = base + "/v1/chat/completions"
vision_model = "" vision_model = ""
for m in (models or []): for m in (models or []):
if isinstance(m, dict):
m = m.get("name", m.get("id", str(m)))
if not isinstance(m, str):
continue
ml = m.lower() ml = m.lower()
if any(kw in ml for kw in _VISION_MODEL_KEYWORDS): if any(kw in ml for kw in _VISION_MODEL_KEYWORDS):
vision_model = m vision_model = m
@@ -2346,7 +2350,7 @@ def _normalize_tool_args(raw_args):
except json.JSONDecodeError: except json.JSONDecodeError:
return raw_args return raw_args
_XML_TC_RE = re.compile(r'exec_command(.*?)</invoke>', re.DOTALL) _XML_TC_RE = re.compile(r'<invoke><(\w+)(?:_command)?>(.*?)</\1(?:_command)?></invoke>', re.DOTALL)
_XML_ARG_VALUE_RE = re.compile(r'</?arg_value>\s*') _XML_ARG_VALUE_RE = re.compile(r'</?arg_value>\s*')
_PAREN_TC_RE = re.compile( _PAREN_TC_RE = re.compile(
@@ -2403,116 +2407,43 @@ def _mark_vision_fail(model):
with _vision_fail_lock: with _vision_fail_lock:
_vision_fail_cache.add(model) _vision_fail_cache.add(model)
def _vision_describe_image(img_data, cache): def _strip_images_from_input(input_data, model):
"""Call vision fallback API to describe a single image.""" if not isinstance(input_data, list) or _model_supports_vision(model):
if not VISION_FALLBACK_URL:
return None
if isinstance(img_data, dict):
img_url = img_data.get("url", "")
if not img_url:
inner = img_data.get("image_url", img_data)
img_url = inner.get("url", "") if isinstance(inner, dict) else str(inner)
else:
img_url = str(img_data)
if not img_url:
return None
img_hash = hashlib.md5(img_url.encode("utf-8", errors="replace")).hexdigest()
if img_hash in cache:
return cache[img_hash]
try:
payload = json.dumps({
"model": VISION_FALLBACK_MODEL,
"messages": [{"role": "user", "content": [
{"type": "text", "text": "Describe the content of this image in detail. If it contains text, transcribe it fully."},
{"type": "image_url", "image_url": {"url": img_url}},
]}],
"max_tokens": 1024,
"stream": False,
}).encode()
headers = {"Content-Type": "application/json"}
if VISION_FALLBACK_KEY:
headers["Authorization"] = f"Bearer {VISION_FALLBACK_KEY}"
req = urllib.request.Request(VISION_FALLBACK_URL, data=payload, headers=headers)
resp = urllib.request.urlopen(req, timeout=30)
body = json.loads(resp.read().decode())
choices = body.get("choices", [])
if choices:
msg = choices[0].get("message", {})
desc = msg.get("content", "")
if desc:
cache[img_hash] = desc
return desc
except Exception as e:
print(f"[vision-fallback] error describing image: {e}", file=sys.stderr)
return None
def _preprocess_vision(messages, schema):
"""Replace image blocks with text descriptions when provider lacks vision support."""
if schema.supports_vision:
return messages
cache = {}
for msg in messages:
content = msg.get("content")
if not isinstance(content, list):
continue
new_parts = []
changed = False
for part in content:
if isinstance(part, dict) and part.get("type") in ("image_url", "input_image"):
changed = True
img_data = part.get("image_url", part)
description = _vision_describe_image(img_data, cache)
if description:
new_parts.append({"type": "text", "text": f"[Image: {description}]"})
else:
new_parts.append({"type": "text", "text": "[Image: description unavailable - text-only model]"})
else:
new_parts.append(part)
if changed:
msg["content"] = new_parts
return messages
def _preprocess_vision_input(input_data, schema):
"""Replace input_image blocks in Responses API input format with text descriptions."""
if schema.supports_vision:
return input_data return input_data
if not isinstance(input_data, list): modified = False
return input_data result = []
cache = {}
changed_any = False
for item in input_data: for item in input_data:
if item.get("type") != "message": if item.get("type") != "message":
result.append(item)
continue continue
content = item.get("content") content = item.get("content", [])
if not isinstance(content, list): if isinstance(content, str):
result.append(item)
continue continue
new_parts = [] new_content = []
changed = False has_img = False
for part in content: for part in content:
if isinstance(part, dict) and part.get("type") in ("input_image", "image_url"): if isinstance(part, str):
changed = True new_content.append(part)
img_url = "" continue
iu = part.get("image_url") pt = part.get("type", "")
if isinstance(iu, dict): if pt in ("input_image", "image_url"):
img_url = iu.get("url", "") if not has_img:
elif isinstance(iu, str): fname = part.get("image_url", {}).get("url", part.get("url", "image.png"))
img_url = iu if fname.startswith("data:"):
elif part.get("type") == "input_image": fname = "screenshot.png"
img_url = part.get("url", "") new_content.append({"type": "output_text", "text": f"[User attached image: {fname} — this model does not support vision]"})
else: has_img = True
img_url = part.get("url", "") modified = True
desc = _vision_describe_image({"url": img_url}, cache)
if desc:
new_parts.append({"type": "input_text", "text": f"[Image: {desc}]"})
else:
new_parts.append({"type": "input_text", "text": "[Image: description unavailable - text-only model]"})
else: else:
new_parts.append(part) new_content.append(part)
if changed: if modified:
item["content"] = new_parts result.append({**item, "content": new_content})
changed_any = True else:
result.append(item)
if modified:
print(f"[vision-filter] stripped {sum(1 for i in input_data if i.get('type')=='message' and any(c.get('type') in ('input_image','image_url') for c in (i.get('content') or []) if isinstance(c,dict)))} images for model={model}", file=sys.stderr)
return result
return input_data return input_data
def oa_input_to_messages(input_data): def oa_input_to_messages(input_data):
@@ -4581,6 +4512,148 @@ def _extract_text(content):
return "".join(parts) return "".join(parts)
# Persistent cache: image hash → description (survives across requests)
_vision_desc_cache = collections.OrderedDict()
_vision_desc_lock = threading.Lock()
_VISION_DESC_CACHE_MAX = 256
def _vision_describe_image(img_data):
"""Call vision fallback API to describe a single image.
Uses a module-level LRU cache so descriptions survive across requests.
A single image in a multi-turn conversation is only described once.
Returns:
description string or None on failure
"""
global _vision_desc_cache
if not VISION_FALLBACK_URL:
return None
# Normalize image URL from various formats
if isinstance(img_data, dict):
img_url = img_data.get("url", "")
if not img_url:
inner = img_data.get("image_url", img_data)
img_url = inner.get("url", "") if isinstance(inner, dict) else str(inner)
else:
img_url = str(img_data)
if not img_url:
return None
img_hash = hashlib.md5(img_url.encode("utf-8", errors="replace")).hexdigest()
# Check persistent cache first (no API call needed)
with _vision_desc_lock:
if img_hash in _vision_desc_cache:
return _vision_desc_cache[img_hash]
try:
payload = json.dumps({
"model": VISION_FALLBACK_MODEL,
"messages": [{"role": "user", "content": [
{"type": "text", "text": "Describe the content of this image in detail. If it contains text, transcribe it fully."},
{"type": "image_url", "image_url": {"url": img_url}},
]}],
"max_tokens": 1024,
"stream": False,
}).encode()
headers = {"Content-Type": "application/json"}
if VISION_FALLBACK_KEY:
headers["Authorization"] = f"Bearer {VISION_FALLBACK_KEY}"
req = urllib.request.Request(VISION_FALLBACK_URL, data=payload, headers=headers)
resp = urllib.request.urlopen(req, timeout=30)
body = json.loads(resp.read().decode())
choices = body.get("choices", [])
if choices:
msg = choices[0].get("message", {})
desc = msg.get("content", "")
if desc:
with _vision_desc_lock:
_vision_desc_cache[img_hash] = desc
if len(_vision_desc_cache) > _VISION_DESC_CACHE_MAX:
_vision_desc_cache.popitem(last=False)
return desc
except Exception as e:
print(f"[vision-fallback] error describing image: {e}", file=sys.stderr)
return None
def _preprocess_vision(messages, schema):
"""Replace image blocks with text descriptions when provider lacks vision support.
Works on OpenAI Chat Completions message format (post-conversion).
"""
if schema.supports_vision:
return messages
for msg in messages:
content = msg.get("content")
if not isinstance(content, list):
continue
new_parts = []
changed = False
for part in content:
if isinstance(part, dict) and part.get("type") in ("image_url", "input_image"):
changed = True
img_data = part.get("image_url", part)
description = _vision_describe_image(img_data)
if description:
new_parts.append({"type": "text", "text": f"[Image: {description}]"})
else:
new_parts.append({"type": "text", "text": "[Image: description non disponible - modele text-only]"})
else:
new_parts.append(part)
if changed:
msg["content"] = new_parts
return messages
def _preprocess_vision_input(input_data, schema):
"""Replace input_image blocks in Responses API input format with text descriptions.
This runs BEFORE adapter.convert() so images are replaced before any
conversion function can silently drop them.
"""
if schema.supports_vision:
return input_data
if not isinstance(input_data, list):
return input_data
changed_any = False
for item in input_data:
if item.get("type") != "message":
continue
content = item.get("content")
if not isinstance(content, list):
continue
new_parts = []
changed = False
for part in content:
if isinstance(part, dict) and part.get("type") == "input_image":
changed = True
changed_any = True
img_data = part.get("image_url", part)
description = _vision_describe_image(img_data)
if description:
new_parts.append({"type": "input_text", "text": f"[Image: {description}]"})
else:
new_parts.append({"type": "input_text", "text": "[Image: description non disponible - modele text-only]"})
else:
new_parts.append(part)
if changed:
item["content"] = new_parts
return input_data
# ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════
@@ -4768,6 +4841,14 @@ def _antigravity_is_simple_user(text):
return False return False
def _antigravity_normalize_context(input_data, model=""): def _antigravity_normalize_context(input_data, model=""):
"""
Normalize context for Antigravity while PRESERVING function_call -> function_call_output pairs.
Google's Gemini API requires STRICT alternation:
- functionCall (role=model) MUST be immediately followed by functionResponse (role=user)
This function compacts old history but NEVER breaks tool call/response pairs.
"""
if not isinstance(input_data, list) or len(input_data) < 2: if not isinstance(input_data, list) or len(input_data) < 2:
return input_data return input_data
is_claude_model = "claude" in model.lower() is_claude_model = "claude" in model.lower()
@@ -4816,7 +4897,7 @@ def _antigravity_normalize_context(input_data, model=""):
dev_messages = [] dev_messages = []
recent_items = [] recent_items = []
tool_outputs = [] tool_outputs = []
other_items = [] tool_calls = []
for i, item in enumerate(input_data): for i, item in enumerate(input_data):
if not isinstance(item, dict): if not isinstance(item, dict):
@@ -4826,8 +4907,8 @@ def _antigravity_normalize_context(input_data, model=""):
dev_messages.append(item) dev_messages.append(item)
elif t == "function_call_output": elif t == "function_call_output":
tool_outputs.append((i, item)) tool_outputs.append((i, item))
elif t in ("function_call",): elif t == "function_call":
other_items.append((i, item)) tool_calls.append((i, item))
elif t == "message": elif t == "message":
recent_items.append((i, item)) recent_items.append((i, item))
@@ -4873,18 +4954,14 @@ def _antigravity_normalize_context(input_data, model=""):
deduped_tail.append((idx, msg_item)) deduped_tail.append((idx, msg_item))
recent_tail = deduped_tail if deduped_tail else recent_tail recent_tail = deduped_tail if deduped_tail else recent_tail
tool_call_ids = set() # Build call_id -> function_call mapping
for _, t_item in kept_tools: tool_call_map = {}
cid = t_item.get("call_id", t_item.get("id", "")) for _, call_item in tool_calls:
cid = call_item.get("call_id", call_item.get("id", ""))
if cid: if cid:
tool_call_ids.add(cid) tool_call_map[cid] = call_item
paired_calls = []
for idx, item in other_items:
cid = item.get("call_id", item.get("id", ""))
if cid in tool_call_ids:
paired_calls.append((idx, item))
# Build result: maintain PAIRED sequence (function_call -> function_call_output)
result = list(dev_messages) result = list(dev_messages)
compaction_summaries = [] compaction_summaries = []
@@ -4900,11 +4977,22 @@ def _antigravity_normalize_context(input_data, model=""):
summary_text = f"[Tool history summary: {n_summarized} older tool outputs omitted. {n_tool_calls} prior function calls were made for file inspection/editing.]" summary_text = f"[Tool history summary: {n_summarized} older tool outputs omitted. {n_tool_calls} prior function calls were made for file inspection/editing.]"
result.append({"type": "message", "role": "user", "content": [{"type": "input_text", "text": summary_text}]}) result.append({"type": "message", "role": "user", "content": [{"type": "input_text", "text": summary_text}]})
for _, call_item in paired_calls: # CRITICAL: Add tool CALLS and their corresponding OUTPUTS in PAIRED ORDER
result.append(call_item) # Only include pairs where BOTH call and output are present
added_pairs = set()
for _, tool_item in kept_tools: for _, tool_item in kept_tools:
result.append(tool_item) cid = tool_item.get("call_id", tool_item.get("id", ""))
if cid and cid in tool_call_map and cid not in added_pairs:
# Add function_call FIRST, then function_call_output IMMEDIATELY
result.append(tool_call_map[cid])
result.append(tool_item)
added_pairs.add(cid)
# Add any orphan tool outputs (no matching call found) - these go at the end before messages
for _, tool_item in kept_tools:
cid = tool_item.get("call_id", tool_item.get("id", ""))
if cid not in added_pairs:
result.append(tool_item)
for cs_item in compaction_summaries: for cs_item in compaction_summaries:
result.append(cs_item) result.append(cs_item)
@@ -4944,7 +5032,7 @@ def _antigravity_normalize_context(input_data, model=""):
while len(result) > _ANTIGRAVITY_MAX_CONTENTS and total_chars > _ANTIGRAVITY_SOFT_CHARS: while len(result) > _ANTIGRAVITY_MAX_CONTENTS and total_chars > _ANTIGRAVITY_SOFT_CHARS:
for i in range(1, len(result) - 1): for i in range(1, len(result) - 1):
if isinstance(result[i], dict) and result[i].get("type") in ("message", "function_call_output"): if isinstance(result[i], dict) and result[i].get("type") in ("message",):
removed = result.pop(i) removed = result.pop(i)
total_chars -= len(json.dumps(removed, ensure_ascii=False)) total_chars -= len(json.dumps(removed, ensure_ascii=False))
break break
@@ -5480,6 +5568,9 @@ class Handler(http.server.BaseHTTPRequestHandler):
resp_part["functionResponse"]["id"] = call_id resp_part["functionResponse"]["id"] = call_id
contents.append({"role": "user", "parts": [resp_part]}) contents.append({"role": "user", "parts": [resp_part]})
# CRITICAL FIX: Sanitize contents while PRESERVING functionCall -> functionResponse alternation.
# Google's Gemini API REQUIRES: functionCall (role=model) must be immediately followed by functionResponse (role=user).
# We NEVER merge, skip, or reorder tool-related messages.
if OAUTH_PROVIDER.startswith("google") and "claude" not in model.lower(): if OAUTH_PROVIDER.startswith("google") and "claude" not in model.lower():
sanitized = [] sanitized = []
last_user_text = None last_user_text = None
@@ -5489,18 +5580,40 @@ class Handler(http.server.BaseHTTPRequestHandler):
parts = [p for p in content.get("parts", []) if isinstance(p, dict)] parts = [p for p in content.get("parts", []) if isinstance(p, dict)]
if not parts: if not parts:
continue continue
# Check if this content has functionCall or functionResponse - these MUST be preserved as-is
has_function_call = any("functionCall" in p for p in parts)
has_function_response = any("functionResponse" in p for p in parts)
text_key = "\n".join([p.get("text", "") for p in parts if "text" in p]).strip() text_key = "\n".join([p.get("text", "") for p in parts if "text" in p]).strip()
# Tool calls/responses are NEVER merged or skipped - they must maintain strict order
if has_function_call or has_function_response:
sanitized.append({"role": role, "parts": parts})
continue
# For plain text messages only: skip duplicate consecutive user text
if role == "user" and text_key and text_key == last_user_text: if role == "user" and text_key and text_key == last_user_text:
continue continue
# Merge consecutive same-role TEXT-ONLY messages (no tool content)
if role == last_role and role in ("user", "model") and sanitized: if role == last_role and role in ("user", "model") and sanitized:
sanitized[-1].setdefault("parts", []).extend(parts) last_parts = sanitized[-1].get("parts", [])
else: # Only merge if the last message is also text-only (no functionCall/functionResponse)
sanitized.append({"role": role, "parts": parts}) last_has_tool = any("functionCall" in p or "functionResponse" in p for p in last_parts)
if not last_has_tool:
sanitized[-1].setdefault("parts", []).extend(parts)
if role == "user" and text_key:
last_user_text = text_key
continue
sanitized.append({"role": role, "parts": parts})
if role == "user" and text_key: if role == "user" and text_key:
last_user_text = text_key last_user_text = text_key
last_role = role last_role = role
# Trim leading non-user messages (Google expects conversation to start with user)
while sanitized and sanitized[0].get("role") != "user": while sanitized and sanitized[0].get("role") != "user":
sanitized.pop(0) sanitized.pop(0)
# Trim trailing non-user messages (must end with user turn for continuation)
while sanitized and sanitized[-1].get("role") != "user": while sanitized and sanitized[-1].get("role") != "user":
sanitized.pop() sanitized.pop()
contents = sanitized contents = sanitized

View File

@@ -6,6 +6,7 @@ Uses only stdlib unittest + unittest.mock (zero pip dependencies).
""" """
import json import json
import os
import sys import sys
import time import time
import unittest import unittest
@@ -19,7 +20,7 @@ import importlib
_spec = importlib.util.spec_from_file_location( _spec = importlib.util.spec_from_file_location(
"translate_proxy", "translate_proxy",
r"C:\dev\Codex-Launcher---Any-AI-Porovider\src\translate-proxy.py", os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "src", "translate-proxy.py"),
) )
tp = importlib.util.module_from_spec(_spec) tp = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(tp) _spec.loader.exec_module(tp)
@@ -121,36 +122,33 @@ class TestExtractXmlToolCalls(unittest.TestCase):
self.assertEqual(tp._extract_xml_tool_calls("just plain text"), []) self.assertEqual(tp._extract_xml_tool_calls("just plain text"), [])
def test_single_tool_call(self): def test_single_tool_call(self):
# Regex: <tool_call>(\w+)(.*?)</tool_call> text = '<invoke><exec_command>echo hi</exec_command></invoke>'
# Format: <tool_call>NAME>CONTENT</tool_call>
text = '<tool_call>bash>echo hi</tool_call>'
results = tp._extract_xml_tool_calls(text) results = tp._extract_xml_tool_calls(text)
self.assertEqual(len(results), 1) self.assertEqual(len(results), 1)
self.assertEqual(results[0]["name"], "bash") self.assertEqual(results[0]["name"], "exec_command")
self.assertIn("call_id", results[0]) self.assertIn("call_id", results[0])
self.assertTrue(results[0]["call_id"].startswith("xml_")) self.assertTrue(results[0]["call_id"].startswith("xml_"))
def test_multiple_tool_calls(self): def test_multiple_tool_calls(self):
text = ( text = (
'<tool_call>bash>echo hi</tool_call>' '<invoke><exec_command>echo hi</exec_command></invoke>'
'<tool_call>edit>test.py</tool_call>' '<invoke><exec_command>test.py</exec_command></invoke>'
) )
results = tp._extract_xml_tool_calls(text) results = tp._extract_xml_tool_calls(text)
self.assertEqual(len(results), 2) self.assertEqual(len(results), 2)
self.assertEqual(results[0]["name"], "bash") self.assertEqual(results[0]["name"], "exec_command")
self.assertEqual(results[1]["name"], "edit") self.assertEqual(results[1]["name"], "exec_command")
def test_json_args(self): def test_json_args(self):
text = '<tool_call>tool>{"key": "value"}</tool_call>' text = '<invoke><exec_command>{"key": "value"}</exec_command></invoke>'
results = tp._extract_xml_tool_calls(text) results = tp._extract_xml_tool_calls(text)
self.assertEqual(len(results), 1) self.assertEqual(len(results), 1)
self.assertEqual(results[0]["name"], "tool") self.assertEqual(results[0]["name"], "exec_command")
args = json.loads(results[0]["args"]) args = json.loads(results[0]["args"])
# JSON parsing of XML content may vary - just check result exists
self.assertIn("args", results[0]) self.assertIn("args", results[0])
def test_code_fenced_args(self): def test_code_fenced_args(self):
text = '<tool_call>tool>{"a": 1}</tool_call>' text = '<invoke><exec_command>{"a": 1}</exec_command></invoke>'
results = tp._extract_xml_tool_calls(text) results = tp._extract_xml_tool_calls(text)
self.assertEqual(len(results), 1) self.assertEqual(len(results), 1)