diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3a366a9..a8385b0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,23 @@
# Changelog
+## v3.11.7 (2026-05-26)
+
+**Vision Auto-Detect, Proactive Non-Vision Model Detection, Unit Tests, Bug Fixes**
+
+### New Features
+
+- **Vision auto-detect fallback**: When no explicit vision fallback is configured, automatically uses the current provider's own vision model (e.g., `0G-Qwen-VL` for OpenAdapter) as the image description API — no separate API key needed
+- **Proactive non-vision model detection**: Models matching name patterns (`glm`, `deepseek`, `llama`, `qwen` without `vl`, etc.) are detected as non-vision on first request without waiting for an error from the provider
+- **Vision preprocessing is now the primary image handling solution**: Replaces old `_strip_images_from_input()` (which just removed images with a placeholder). Images are now described via API and sent as rich text descriptions to text-only models
+- **Merge PR #6**: Vision/OCR preprocessing for text-only models (cobra91)
+- **Merge PR #7**: 177 unit tests for translate-proxy.py (cobra91)
+
+### Bug Fixes
+
+- **AttributeError fix**: `image_url` field can be a string (bare URL) not always a dict — fixed in both `_preprocess_vision_input()` and old strip function
+- **Auth os error 2 fix**: GUI shows "Config missing" message instead of raw OSError when `~/.codex/` directory doesn't exist
+- **Removed duplicate vision functions**: Cleaned up duplicate `_vision_describe_image()`, `_preprocess_vision()`, `_preprocess_vision_input()` from merge
+
## v3.11.6 (2026-05-26)
**Antigravity Loop Breakers, Vision/OCR Preprocessing, has_content Fix, Auth Error Fix**
diff --git a/codex-launcher_3.11.6_all.deb b/codex-launcher_3.11.6_all.deb
deleted file mode 100644
index 2e943d8..0000000
Binary files a/codex-launcher_3.11.6_all.deb and /dev/null differ
diff --git a/codex-launcher_3.11.7_all.deb b/codex-launcher_3.11.7_all.deb
new file mode 100644
index 0000000..8bf923e
Binary files /dev/null and b/codex-launcher_3.11.7_all.deb differ
diff --git a/src/codex-launcher-gui b/src/codex-launcher-gui
index 4a623b9..fa8dbef 100755
--- a/src/codex-launcher-gui
+++ b/src/codex-launcher-gui
@@ -27,6 +27,13 @@ model_catalog_json = ""
"""
CHANGELOG = [
+ ("3.11.7", "2026-05-26", [
+ "Vision auto-detect: uses provider's vision model for image description",
+ "Vision preprocessing replaces image stripping",
+ "Fix AttributeError in image_url string handling",
+ "Merge PR #6: vision/OCR preprocessing, PR #7: 177 unit tests",
+ "Auth os error 2 fix: proper config-missing message in GUI",
+ ]),
("3.11.6", "2026-05-26", [
"Antigravity loop breakers: per-session tracking, repeated tool detection",
"has_content fix: function_call counts as valid output",
@@ -1303,6 +1310,9 @@ def _check_codex_auth():
if out.returncode == 0 and text:
return ("logged_in", text)
if text:
+ _tl = text.lower()
+ if "no such file" in _tl or "os error 2" in _tl or "not found" in _tl:
+ return ("not_configured", "Config missing — launch once to create")
return ("error", text)
return ("unknown", "No output from codex login status")
except FileNotFoundError:
diff --git a/src/codex_launcher_lib.py b/src/codex_launcher_lib.py
index 5e20b20..c7b6e05 100644
--- a/src/codex_launcher_lib.py
+++ b/src/codex_launcher_lib.py
@@ -83,6 +83,14 @@ model_catalog_json = ""
"""
CHANGELOG = [
+ ("3.11.7", "2026-05-26", [
+ "Vision auto-detect: uses provider's own vision model (e.g. 0G-Qwen-VL) as fallback for image description",
+ "Vision preprocessing replaces image stripping: images described via API instead of just removed",
+ "Fix AttributeError in image_url handling when value is string not dict",
+ "Merge PR #6: vision/OCR preprocessing for text-only models",
+ "Merge PR #7: 177 unit tests for translate-proxy.py",
+ "Auth os error 2 fix: GUI shows config-missing message instead of raw error",
+ ]),
("3.11.6", "2026-05-26", [
"Antigravity loop breakers: per-session tracking, edit-intent nudge (first turn only)",
"Loop breaker: same tool+args repeated 5+ times triggers force finalization",
diff --git a/src/translate-proxy.py b/src/translate-proxy.py
index eb01a2e..a097fdf 100755
--- a/src/translate-proxy.py
+++ b/src/translate-proxy.py
@@ -857,6 +857,25 @@ def _ensure_antigravity_client_version():
_antigravity_client_version_checked = time.time()
return _antigravity_client_version
+_VISION_MODEL_KEYWORDS = ("vl", "vision", "gpt-4o", "gpt-5", "claude-3", "claude-4", "gemini", "qwen-vl", "kimi-vl", "pixtral", "llava")
+
+def _auto_detect_vision_fallback(target_url, api_key, models):
+ """Auto-detect a vision-capable model from the current provider for image description."""
+ base = target_url.rstrip("/")
+ if "/v1" in base:
+ chat_url = base.split("/v1")[0] + "/v1/chat/completions"
+ else:
+ chat_url = base + "/v1/chat/completions"
+ vision_model = ""
+ for m in (models or []):
+ ml = m.lower()
+ if any(kw in ml for kw in _VISION_MODEL_KEYWORDS):
+ vision_model = m
+ break
+ if not vision_model:
+ return "", "", ""
+ return chat_url, vision_model, api_key
+
def _init_runtime():
global CONFIG, PORT, BACKEND, TARGET_URL, API_KEY, OAUTH_PROVIDER, _antigravity_version
global MODELS, CC_VERSION, REASONING_ENABLED, REASONING_EFFORT, BGP_ROUTES
@@ -879,9 +898,17 @@ def _init_runtime():
PROMPT_ENHANCER_MODEL = CONFIG.get("prompt_enhancer_model", "")
PROMPT_ENHANCER_URL = CONFIG.get("prompt_enhancer_url", "")
PROMPT_ENHANCER_KEY = CONFIG.get("prompt_enhancer_key", "")
- VISION_FALLBACK_URL = CONFIG.get("vision_fallback_url") or "https://api.kilo.ai/api/gateway/chat/completions"
- VISION_FALLBACK_MODEL = CONFIG.get("vision_fallback_model") or "kilo-auto/small"
+ VISION_FALLBACK_URL = CONFIG.get("vision_fallback_url") or ""
+ VISION_FALLBACK_MODEL = CONFIG.get("vision_fallback_model") or ""
VISION_FALLBACK_KEY = CONFIG.get("vision_fallback_key") or ""
+ if not VISION_FALLBACK_URL or not VISION_FALLBACK_MODEL:
+ _vision_url, _vision_model, _vision_key = _auto_detect_vision_fallback(TARGET_URL, API_KEY, MODELS)
+ if not VISION_FALLBACK_URL:
+ VISION_FALLBACK_URL = _vision_url
+ if not VISION_FALLBACK_MODEL:
+ VISION_FALLBACK_MODEL = _vision_model
+ if not VISION_FALLBACK_KEY:
+ VISION_FALLBACK_KEY = _vision_key
BGP_ROUTES = CONFIG.get("bgp_routes", [])
_api_key_pool = None
if API_KEY and "," in API_KEY and not OAUTH_PROVIDER.startswith("google") and BACKEND not in ("codebuff", "freebuff"):
@@ -2467,10 +2494,15 @@ def _preprocess_vision_input(input_data, schema):
if isinstance(part, dict) and part.get("type") in ("input_image", "image_url"):
changed = True
img_url = ""
- if part.get("type") == "input_image":
- img_url = part.get("image_url", {}).get("url", "")
+ iu = part.get("image_url")
+ if isinstance(iu, dict):
+ img_url = iu.get("url", "")
+ elif isinstance(iu, str):
+ img_url = iu
+ elif part.get("type") == "input_image":
+ img_url = part.get("url", "")
else:
- img_url = part.get("image_url", {}).get("url", part.get("url", ""))
+ img_url = part.get("url", "")
desc = _vision_describe_image({"url": img_url}, cache)
if desc:
new_parts.append({"type": "input_text", "text": f"[Image: {desc}]"})
@@ -2483,45 +2515,6 @@ def _preprocess_vision_input(input_data, schema):
changed_any = True
return input_data
-def _strip_images_from_input(input_data, model):
- if not isinstance(input_data, list) or _model_supports_vision(model):
- return input_data
- modified = False
- result = []
- for item in input_data:
- if item.get("type") != "message":
- result.append(item)
- continue
- content = item.get("content", [])
- if isinstance(content, str):
- result.append(item)
- continue
- new_content = []
- has_img = False
- for part in content:
- if isinstance(part, str):
- new_content.append(part)
- continue
- pt = part.get("type", "")
- if pt in ("input_image", "image_url"):
- if not has_img:
- fname = part.get("image_url", {}).get("url", part.get("url", "image.png"))
- if fname.startswith("data:"):
- fname = "screenshot.png"
- new_content.append({"type": "output_text", "text": f"[User attached image: {fname} — this model does not support vision]"})
- has_img = True
- modified = True
- else:
- new_content.append(part)
- if modified:
- result.append({**item, "content": new_content})
- else:
- result.append(item)
- if modified:
- print(f"[vision-filter] stripped {sum(1 for i in input_data if i.get('type')=='message' and any(c.get('type') in ('input_image','image_url') for c in (i.get('content') or []) if isinstance(c,dict)))} images for model={model}", file=sys.stderr)
- return result
- return input_data
-
def oa_input_to_messages(input_data):
msgs = []
tool_name_by_id = {}
@@ -4588,139 +4581,6 @@ def _extract_text(content):
return "".join(parts)
-def _vision_describe_image(img_data, cache):
- """Call vision fallback API to describe a single image.
-
- Args:
- img_data: dict with image_url field, or raw image_url dict
- cache: dict mapping image hash -> description (request-scoped)
-
- Returns:
- description string or None on failure
- """
- if not VISION_FALLBACK_URL:
- return None
-
- # Normalize image URL from various formats
- if isinstance(img_data, dict):
- img_url = img_data.get("url", "")
- if not img_url:
- inner = img_data.get("image_url", img_data)
- img_url = inner.get("url", "") if isinstance(inner, dict) else str(inner)
- else:
- img_url = str(img_data)
-
- if not img_url:
- return None
-
- # Check cache
- img_hash = hashlib.md5(img_url.encode("utf-8", errors="replace")).hexdigest()
- if img_hash in cache:
- return cache[img_hash]
-
- try:
- payload = json.dumps({
- "model": VISION_FALLBACK_MODEL,
- "messages": [{"role": "user", "content": [
- {"type": "text", "text": "Describe the content of this image in detail. If it contains text, transcribe it fully."},
- {"type": "image_url", "image_url": {"url": img_url}},
- ]}],
- "max_tokens": 1024,
- "stream": False,
- }).encode()
-
- headers = {"Content-Type": "application/json"}
- if VISION_FALLBACK_KEY:
- headers["Authorization"] = f"Bearer {VISION_FALLBACK_KEY}"
-
- req = urllib.request.Request(VISION_FALLBACK_URL, data=payload, headers=headers)
- resp = urllib.request.urlopen(req, timeout=30)
- body = json.loads(resp.read().decode())
-
- choices = body.get("choices", [])
- if choices:
- msg = choices[0].get("message", {})
- desc = msg.get("content", "")
- if desc:
- cache[img_hash] = desc
- return desc
- except Exception as e:
- print(f"[vision-fallback] error describing image: {e}", file=sys.stderr)
-
- return None
-
-
-def _preprocess_vision(messages, schema):
- """Replace image blocks with text descriptions when provider lacks vision support.
-
- Works on OpenAI Chat Completions message format (post-conversion).
- """
- if schema.supports_vision:
- return messages
-
- cache = {}
-
- for msg in messages:
- content = msg.get("content")
- if not isinstance(content, list):
- continue
- new_parts = []
- changed = False
- for part in content:
- if isinstance(part, dict) and part.get("type") in ("image_url", "input_image"):
- changed = True
- img_data = part.get("image_url", part)
- description = _vision_describe_image(img_data, cache)
- if description:
- new_parts.append({"type": "text", "text": f"[Image: {description}]"})
- else:
- new_parts.append({"type": "text", "text": "[Image: description non disponible - modele text-only]"})
- else:
- new_parts.append(part)
- if changed:
- msg["content"] = new_parts
-
- return messages
-
-
-def _preprocess_vision_input(input_data, schema):
- """Replace input_image blocks in Responses API input format with text descriptions.
-
- This runs BEFORE adapter.convert() so images are replaced before any
- conversion function can silently drop them.
- """
- if schema.supports_vision:
- return input_data
- if not isinstance(input_data, list):
- return input_data
-
- cache = {}
- changed_any = False
-
- for item in input_data:
- if item.get("type") != "message":
- continue
- content = item.get("content")
- if not isinstance(content, list):
- continue
- new_parts = []
- changed = False
- for part in content:
- if isinstance(part, dict) and part.get("type") == "input_image":
- changed = True
- changed_any = True
- img_data = part.get("image_url", part)
- description = _vision_describe_image(img_data, cache)
- if description:
- new_parts.append({"type": "input_text", "text": f"[Image: {description}]"})
- else:
- new_parts.append({"type": "input_text", "text": "[Image: description non disponible - modele text-only]"})
- else:
- new_parts.append(part)
- if changed:
- item["content"] = new_parts
-
- return input_data
# ═══════════════════════════════════════════════════════════════════
@@ -5322,14 +5182,22 @@ class Handler(http.server.BaseHTTPRequestHandler):
body = dict(body)
body["input"] = input_data
- # Strip images for non-vision models
- input_data = _strip_images_from_input(input_data, model)
- body["input"] = input_data
+ # Vision preprocessing for non-vision models
+ _schema = _load_schema(model=model)
+ _needs_vision_preprocess = False
+ if _schema and not _schema.supports_vision:
+ _needs_vision_preprocess = True
+ elif not _model_supports_vision(model):
+ print(f"[vision] model {model} detected as non-vision via name pattern, preprocessing images", file=sys.stderr)
+ if _schema:
+ _schema.supports_vision = False
+ _save_schema(_schema, model=model)
+ _needs_vision_preprocess = True
+ if _needs_vision_preprocess:
+ input_data = _preprocess_vision_input(input_data, _schema)
+ body["input"] = input_data
messages = oa_input_to_messages(input_data)
- _schema = _load_schema(model=model)
- if _schema and not _schema.supports_vision:
- messages = _preprocess_vision(messages, _schema)
messages = _inject_stored_reasoning(messages)
instructions = body.get("instructions", "").strip()
if instructions:
@@ -5384,7 +5252,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
input_data = _crof_compact_for_retry(input_data, model, aggression=attempt)
body = dict(body)
body["input"] = input_data
- messages = oa_input_to_messages(_strip_images_from_input(input_data, model))
+ messages = oa_input_to_messages(_preprocess_vision_input(input_data, _schema) if _schema and not _schema.supports_vision else input_data)
messages = _inject_stored_reasoning(messages)
instructions = body.get("instructions", "").strip()
if instructions:
@@ -6517,7 +6385,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
print(f"[{self._session_id}] [smart-continue] XML injection retry failed: {e}", file=sys.stderr)
break
_nudge_msg = {"role": "user", "content": nudge_text}
- nudge_messages = oa_input_to_messages(_strip_images_from_input(input_data, model)) + [_nudge_msg]
+ nudge_messages = oa_input_to_messages(_preprocess_vision_input(input_data, _schema) if _schema and not _schema.supports_vision else input_data) + [_nudge_msg]
instructions = body.get("instructions", "").strip()
if instructions:
nudge_messages.insert(0, {"role": "system", "content": instructions})
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_translate_proxy.py b/tests/test_translate_proxy.py
new file mode 100644
index 0000000..b4fa828
--- /dev/null
+++ b/tests/test_translate_proxy.py
@@ -0,0 +1,1168 @@
+"""
+Unit tests for translate-proxy.py
+
+Covers pure utility functions that can be tested without a running server.
+Uses only stdlib unittest + unittest.mock (zero pip dependencies).
+"""
+
+import json
+import sys
+import time
+import unittest
+from unittest.mock import patch, MagicMock
+
+# ---------------------------------------------------------------------------
+# Import the module under test.
+# The source file has a dash in its name so we use importlib.
+# ---------------------------------------------------------------------------
+import importlib
+
+_spec = importlib.util.spec_from_file_location(
+ "translate_proxy",
+ r"C:\dev\Codex-Launcher---Any-AI-Porovider\src\translate-proxy.py",
+)
+tp = importlib.util.module_from_spec(_spec)
+_spec.loader.exec_module(tp)
+
+
+# ===================================================================
+# Helpers
+# ===================================================================
+
+def _make_chat_resp(content="Hello", tool_calls=None, finish_reason="stop",
+ model="test-model", prompt_tokens=10, completion_tokens=5):
+ """Build a minimal OpenAI chat completion response dict."""
+ msg = {"role": "assistant", "content": content}
+ if tool_calls:
+ msg["tool_calls"] = tool_calls
+ return {
+ "choices": [{"message": msg, "finish_reason": finish_reason}],
+ "usage": {"prompt_tokens": prompt_tokens,
+ "completion_tokens": completion_tokens,
+ "total_tokens": prompt_tokens + completion_tokens},
+ }
+
+
+# ===================================================================
+# Tests grouped by functionality
+# ===================================================================
+
+
+class TestUnwrapCmd(unittest.TestCase):
+ """Tests for _unwrap_cmd (line ~2810)."""
+
+ def test_non_string_passthrough(self):
+ self.assertIsNone(tp._unwrap_cmd(None))
+ self.assertEqual(tp._unwrap_cmd(42), 42)
+
+ def test_plain_string_not_json(self):
+ self.assertEqual(tp._unwrap_cmd("echo hello"), "echo hello")
+
+ def test_single_wrap(self):
+ inner = json.dumps({"cmd": "echo hello"})
+ self.assertEqual(tp._unwrap_cmd(inner), "echo hello")
+
+ def test_double_wrap(self):
+ inner = json.dumps({"cmd": "echo hello"})
+ outer = json.dumps({"cmd": inner})
+ self.assertEqual(tp._unwrap_cmd(outer), "echo hello")
+
+ def test_triple_wrap(self):
+ inner = json.dumps({"cmd": "echo hello"})
+ mid = json.dumps({"cmd": inner})
+ outer = json.dumps({"cmd": mid})
+ self.assertEqual(tp._unwrap_cmd(outer), "echo hello")
+
+ def test_json_without_cmd_key(self):
+ val = json.dumps({"foo": "bar"})
+ self.assertEqual(tp._unwrap_cmd(val), val)
+
+ def test_cmd_value_not_string(self):
+ val = json.dumps({"cmd": 123})
+ self.assertEqual(tp._unwrap_cmd(val), val)
+
+ def test_empty_string(self):
+ self.assertEqual(tp._unwrap_cmd(""), "")
+
+
+class TestStripXmlishTags(unittest.TestCase):
+ """Tests for _strip_xmlish_tags (line ~2807)."""
+
+ def test_none(self):
+ self.assertEqual(tp._strip_xmlish_tags(None), "")
+
+ def test_empty_string(self):
+ self.assertEqual(tp._strip_xmlish_tags(""), "")
+
+ def test_no_tags(self):
+ self.assertEqual(tp._strip_xmlish_tags("hello world"), "hello world")
+
+ def test_basic_tags(self):
+ self.assertEqual(tp._strip_xmlish_tags("hello world"), "hello world")
+
+ def test_self_closing(self):
+ self.assertEqual(tp._strip_xmlish_tags("before
after"), "before after")
+
+ def test_multiple_tags(self):
+ self.assertEqual(
+ tp._strip_xmlish_tags("
one
two"),
+ "one two",
+ )
+
+
+class TestExtractXmlToolCalls(unittest.TestCase):
+ """Tests for _extract_xml_tool_calls."""
+
+ def test_empty(self):
+ self.assertEqual(tp._extract_xml_tool_calls(""), [])
+ self.assertEqual(tp._extract_xml_tool_calls(None), [])
+
+ def test_no_calls(self):
+ self.assertEqual(tp._extract_xml_tool_calls("just plain text"), [])
+
+ def test_single_tool_call(self):
+ # Regex: (\w+)(.*?)
+ # Format: NAME>CONTENT
+ text = 'bash>echo hi'
+ results = tp._extract_xml_tool_calls(text)
+ self.assertEqual(len(results), 1)
+ self.assertEqual(results[0]["name"], "bash")
+ self.assertIn("call_id", results[0])
+ self.assertTrue(results[0]["call_id"].startswith("xml_"))
+
+ def test_multiple_tool_calls(self):
+ text = (
+ 'bash>echo hi'
+ 'edit>test.py'
+ )
+ results = tp._extract_xml_tool_calls(text)
+ self.assertEqual(len(results), 2)
+ self.assertEqual(results[0]["name"], "bash")
+ self.assertEqual(results[1]["name"], "edit")
+
+ def test_json_args(self):
+ text = 'tool>{"key": "value"}'
+ results = tp._extract_xml_tool_calls(text)
+ self.assertEqual(len(results), 1)
+ self.assertEqual(results[0]["name"], "tool")
+ args = json.loads(results[0]["args"])
+ # JSON parsing of XML content may vary - just check result exists
+ self.assertIn("args", results[0])
+
+ def test_code_fenced_args(self):
+ text = 'tool>{"a": 1}'
+ results = tp._extract_xml_tool_calls(text)
+ self.assertEqual(len(results), 1)
+
+
+class TestNormalizeToolArgs(unittest.TestCase):
+ """Tests for _normalize_tool_args (line ~2196)."""
+
+ def test_empty(self):
+ self.assertEqual(tp._normalize_tool_args(""), "")
+ self.assertIsNone(tp._normalize_tool_args(None))
+ self.assertEqual(tp._normalize_tool_args("{}"), "{}")
+
+ def test_valid_json_passthrough(self):
+ raw = json.dumps({"cmd": "echo hello"})
+ self.assertEqual(tp._normalize_tool_args(raw), raw)
+
+ def test_wrapped_arguments_uppercase_key(self):
+ inner = json.dumps({"cmd": "echo hello"})
+ raw = json.dumps({"Arguments": inner})
+ result = tp._normalize_tool_args(raw)
+ self.assertEqual(json.loads(result), {"cmd": "echo hello"})
+
+ def test_wrapped_arguments_with_code_fence(self):
+ inner = json.dumps({"cmd": "echo hello"})
+ raw = json.dumps({"Arguments": f"```json\n{inner}\n```"})
+ result = tp._normalize_tool_args(raw)
+ self.assertEqual(json.loads(result), {"cmd": "echo hello"})
+
+ def test_invalid_json_passthrough(self):
+ raw = "not json at all"
+ self.assertEqual(tp._normalize_tool_args(raw), raw)
+
+
+class TestEmit(unittest.TestCase):
+ """Tests for emit (line ~1406)."""
+
+ def test_basic_event(self):
+ result = tp.emit("test.event", {"key": "value"})
+ self.assertTrue(result.startswith("event: test.event\n"))
+ self.assertIn('"key": "value"', result)
+ self.assertTrue(result.endswith("\n\n"))
+
+ def test_event_data_format(self):
+ data = {"type": "response.created", "id": "123"}
+ result = tp.emit("response.created", data)
+ lines = result.split("\n")
+ self.assertEqual(lines[0], "event: response.created")
+ self.assertTrue(lines[1].startswith("data: "))
+ parsed = json.loads(lines[1][6:])
+ self.assertEqual(parsed["type"], "response.created")
+
+
+class TestUid(unittest.TestCase):
+ """Tests for uid (line ~1403)."""
+
+ def test_default_prefix(self):
+ result = tp.uid()
+ self.assertTrue(result.startswith("id-"))
+
+ def test_custom_prefix(self):
+ result = tp.uid("msg")
+ self.assertTrue(result.startswith("msg-"))
+
+ def test_uniqueness(self):
+ ids = {tp.uid() for _ in range(100)}
+ self.assertEqual(len(ids), 100)
+
+
+class TestRedact(unittest.TestCase):
+ """Tests for _redact / _redact_json (line ~2065/2073)."""
+
+ def test_none(self):
+ self.assertIsNone(tp._redact(None))
+
+ def test_empty_string(self):
+ self.assertEqual(tp._redact(""), "")
+
+ def test_no_secrets(self):
+ self.assertEqual(tp._redact("hello world"), "hello world")
+
+ def test_sk_key(self):
+ text = "key=sk-abc123def456ghi789jkl012mno"
+ self.assertIn("[REDACTED:key]", tp._redact(text))
+ self.assertNotIn("sk-abc123", tp._redact(text))
+
+ def test_sk_ant_key(self):
+ # Note: sk-ant-... is matched by the first sk- pattern (sk-[A-Za-z0-9_\-]{20,})
+ # which redacts it as [REDACTED:key] since it runs first.
+ text = "key=sk-ant-api03-abcdefghijklmnopqrstuv"
+ redacted = tp._redact(text)
+ self.assertIn("[REDACTED:", redacted)
+ self.assertNotIn("sk-ant-api03", redacted)
+
+ def test_github_token(self):
+ text = "token=ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghij"
+ self.assertIn("[REDACTED:github]", tp._redact(text))
+
+ def test_bearer_token(self):
+ text = "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.verylongpayload"
+ self.assertIn("Bearer [REDACTED]", tp._redact(text))
+
+ def test_redact_json_dict(self):
+ obj = {"api_key": "sk-abc123def456ghi789jkl012mno345pqr"}
+ result = tp._redact_json(obj)
+ self.assertIn("[REDACTED:key]", result)
+ self.assertNotIn("sk-abc123", result)
+
+
+class TestEstimateTokens(unittest.TestCase):
+ """Tests for _estimate_tokens (line ~1809)."""
+
+ def test_none(self):
+ self.assertEqual(tp._estimate_tokens(None), 0)
+
+ def test_string(self):
+ # "abcdefghij" is 10 chars => max(1, 10//4) = 2
+ self.assertEqual(tp._estimate_tokens("abcdefghij"), 2)
+
+ def test_short_string(self):
+ # "ab" is 2 chars => max(1, 2//4) = max(1, 0) = 1
+ self.assertEqual(tp._estimate_tokens("ab"), 1)
+
+ def test_dict(self):
+ obj = {"key": "value"}
+ result = tp._estimate_tokens(obj)
+ self.assertGreater(result, 0)
+
+ def test_list(self):
+ result = tp._estimate_tokens([1, 2, 3])
+ self.assertGreater(result, 0)
+
+
+class TestItemSummary(unittest.TestCase):
+ """Tests for _item_summary (line ~1614)."""
+
+ def test_message(self):
+ item = {"type": "message", "role": "user",
+ "content": [{"type": "input_text", "text": "hello"}]}
+ result = tp._item_summary(item)
+ self.assertIn("[user]", result)
+ self.assertIn("hello", result)
+
+ def test_function_call(self):
+ item = {"type": "function_call", "name": "exec_command",
+ "arguments": json.dumps({"cmd": "ls -la"})}
+ result = tp._item_summary(item)
+ self.assertIn("[tool call]", result)
+ self.assertIn("exec_command", result)
+ self.assertIn("ls -la", result)
+
+ def test_function_call_output(self):
+ item = {"type": "function_call_output", "output": "file1.txt\nfile2.txt"}
+ result = tp._item_summary(item)
+ self.assertIn("[tool result]", result)
+ self.assertIn("file1.txt", result)
+
+ def test_function_call_output_truncated(self):
+ long_output = "x" * 300
+ item = {"type": "function_call_output", "output": long_output}
+ result = tp._item_summary(item, max_len=100)
+ self.assertIn("...", result)
+ self.assertTrue(len(result) < 300)
+
+ def test_unknown_type(self):
+ item = {"type": "custom_type"}
+ result = tp._item_summary(item)
+ self.assertEqual(result, "[custom_type]")
+
+
+class TestContextLimitForModel(unittest.TestCase):
+ """Tests for _context_limit_for_model (line ~1800)."""
+
+ def test_none_model(self):
+ self.assertEqual(tp._context_limit_for_model(None), 32768)
+
+ def test_empty_model(self):
+ self.assertEqual(tp._context_limit_for_model(""), 32768)
+
+ def test_gpt4o(self):
+ self.assertEqual(tp._context_limit_for_model("gpt-4o"), 128000)
+
+ def test_claude_sonnet(self):
+ self.assertEqual(tp._context_limit_for_model("claude-sonnet-4-6"), 200000)
+
+ def test_gemini_flash(self):
+ self.assertEqual(tp._context_limit_for_model("gemini-2.5-flash"), 1000000)
+
+ def test_case_insensitive(self):
+ self.assertEqual(tp._context_limit_for_model("GPT-4O"), 128000)
+
+ def test_unknown_model(self):
+ self.assertEqual(tp._context_limit_for_model("future-model-x"), 32768)
+
+ def test_deepseek(self):
+ self.assertEqual(tp._context_limit_for_model("deepseek-chat"), 64000)
+
+
+class TestClassifyAntigravityError(unittest.TestCase):
+ """Tests for _classify_antigravity_error (line ~632)."""
+
+ def test_400(self):
+ self.assertEqual(tp._classify_antigravity_error(400, "bad input"), "bad_request")
+
+ def test_401_transient(self):
+ self.assertEqual(tp._classify_antigravity_error(401, "access denied"), "auth_transient")
+
+ def test_401_permanent_invalid_grant(self):
+ self.assertEqual(tp._classify_antigravity_error(401, "invalid_grant token"), "auth_permanent")
+
+ def test_401_permanent_revoked(self):
+ self.assertEqual(tp._classify_antigravity_error(401, "token revoked"), "auth_permanent")
+
+ def test_403_validation_required(self):
+ self.assertEqual(tp._classify_antigravity_error(403, "validation_required"), "validation_required")
+
+ def test_403_account_banned(self):
+ self.assertEqual(
+ tp._classify_antigravity_error(403, "has been disabled for violation of terms of service"),
+ "account_banned",
+ )
+
+ def test_403_service_disabled(self):
+ self.assertEqual(tp._classify_antigravity_error(403, "service_disabled"), "service_disabled")
+
+ def test_403_generic(self):
+ self.assertEqual(tp._classify_antigravity_error(403, "some other error"), "forbidden")
+
+ def test_429_capacity(self):
+ self.assertEqual(
+ tp._classify_antigravity_error(429, "model_capacity_exhausted"),
+ "capacity_exhausted",
+ )
+
+ def test_429_quota(self):
+ self.assertEqual(
+ tp._classify_antigravity_error(429, "quota_exhausted"),
+ "quota_exhausted",
+ )
+
+ def test_429_generic(self):
+ self.assertEqual(tp._classify_antigravity_error(429, "slow down"), "rate_limited")
+
+ def test_503_capacity(self):
+ self.assertEqual(
+ tp._classify_antigravity_error(503, "service temporarily unavailable"),
+ "capacity_exhausted",
+ )
+
+ def test_500(self):
+ self.assertEqual(tp._classify_antigravity_error(500, "internal error"), "server_error")
+
+ def test_418(self):
+ self.assertEqual(tp._classify_antigravity_error(418, "I'm a teapot"), "unknown")
+
+
+class TestParseRateLimitReset(unittest.TestCase):
+ """Tests for _parse_rate_limit_reset (line ~658)."""
+
+ def test_quota_reset_delay_ms(self):
+ result = tp._parse_rate_limit_reset('quotaResetDelay: 5000ms')
+ self.assertEqual(result, 5.0)
+
+ def test_quota_reset_delay_seconds(self):
+ result = tp._parse_rate_limit_reset('quotaResetDelay: 30s')
+ self.assertEqual(result, 30.0)
+
+ def test_hms_format(self):
+ result = tp._parse_rate_limit_reset('1h30m0s')
+ self.assertEqual(result, 5400)
+
+ def test_resets_in_format(self):
+ result = tp._parse_rate_limit_reset('Resets in ~2h0m')
+ self.assertEqual(result, 7200)
+
+ def test_retry_after_seconds(self):
+ result = tp._parse_rate_limit_reset('retry-after: 60 seconds')
+ self.assertEqual(result, 60)
+
+ def test_no_match(self):
+ self.assertIsNone(tp._parse_rate_limit_reset('no timing info here'))
+
+ def test_empty_string(self):
+ self.assertIsNone(tp._parse_rate_limit_reset(''))
+
+
+class TestExtractText(unittest.TestCase):
+ """Tests for _extract_text (line ~4295)."""
+
+ def test_string(self):
+ self.assertEqual(tp._extract_text("hello"), "hello")
+
+ def test_none(self):
+ self.assertEqual(tp._extract_text(None), "")
+
+ def test_number(self):
+ self.assertEqual(tp._extract_text(42), "")
+
+ def test_content_blocks(self):
+ content = [
+ {"type": "input_text", "text": "hello "},
+ {"type": "output_text", "text": "world"},
+ ]
+ self.assertEqual(tp._extract_text(content), "hello world")
+
+ def test_mixed_content(self):
+ content = [
+ "plain text ",
+ {"type": "text", "text": "and dict"},
+ ]
+ self.assertEqual(tp._extract_text(content), "plain text and dict")
+
+ def test_empty_list(self):
+ self.assertEqual(tp._extract_text([]), "")
+
+
+class TestValidateToolPairs(unittest.TestCase):
+ """Tests for validate_tool_pairs (line ~1984)."""
+
+ def test_not_list(self):
+ self.assertEqual(tp.validate_tool_pairs("not a list"), [])
+ self.assertEqual(tp.validate_tool_pairs(None), [])
+
+ def test_empty_list(self):
+ self.assertEqual(tp.validate_tool_pairs([]), [])
+
+ def test_valid_pairs(self):
+ items = [
+ {"type": "function_call", "call_id": "c1", "name": "tool1"},
+ {"type": "function_call_output", "call_id": "c1", "output": "ok"},
+ ]
+ self.assertEqual(tp.validate_tool_pairs(items), [])
+
+ def test_orphan_output(self):
+ items = [
+ {"type": "function_call_output", "call_id": "missing", "output": "ok"},
+ ]
+ errors = tp.validate_tool_pairs(items)
+ self.assertEqual(len(errors), 1)
+ self.assertEqual(errors[0]["error"], "orphan_function_call_output")
+ self.assertEqual(errors[0]["call_id"], "missing")
+
+ def test_orphan_no_call_id(self):
+ items = [
+ {"type": "function_call_output", "output": "ok"},
+ ]
+ errors = tp.validate_tool_pairs(items)
+ self.assertEqual(len(errors), 1)
+
+ def test_mixed_valid_and_orphan(self):
+ items = [
+ {"type": "function_call", "call_id": "c1"},
+ {"type": "function_call_output", "call_id": "c1"},
+ {"type": "function_call_output", "call_id": "c_orphan"},
+ ]
+ errors = tp.validate_tool_pairs(items)
+ self.assertEqual(len(errors), 1)
+ self.assertEqual(errors[0]["call_id"], "c_orphan")
+
+
+class TestRepairOrphanToolOutputs(unittest.TestCase):
+ """Tests for repair_orphan_tool_outputs (line ~2001)."""
+
+ def test_repair(self):
+ items = [
+ {"type": "message", "role": "user", "content": "hi"},
+ {"type": "function_call_output", "output": "orphan result"},
+ ]
+ errors = [{"index": 1, "call_id": None, "error": "orphan_function_call_output"}]
+ repaired = tp.repair_orphan_tool_outputs(items, errors)
+ self.assertEqual(len(repaired), 2)
+ self.assertEqual(repaired[0]["type"], "message") # kept
+ self.assertEqual(repaired[1]["type"], "message") # converted
+ self.assertIn("unmatched tool output", repaired[1]["content"][0]["text"])
+
+ def test_no_errors(self):
+ items = [{"type": "message", "role": "user", "content": "hi"}]
+ repaired = tp.repair_orphan_tool_outputs(items, [])
+ self.assertEqual(repaired, items)
+
+
+class TestHasFunctionCallOutput(unittest.TestCase):
+ """Tests for has_function_call_output (line ~2051)."""
+
+ def test_not_list(self):
+ self.assertFalse(tp.has_function_call_output("string"))
+ self.assertFalse(tp.has_function_call_output(None))
+
+ def test_has_output(self):
+ items = [{"type": "function_call_output", "output": "ok"}]
+ self.assertTrue(tp.has_function_call_output(items))
+
+ def test_no_output(self):
+ items = [{"type": "message", "role": "user", "content": "hi"}]
+ self.assertFalse(tp.has_function_call_output(items))
+
+
+class TestSynthesizeToolResults(unittest.TestCase):
+ """Tests for synthesize_tool_results_for_chat (line ~2014)."""
+
+ def test_not_list(self):
+ result, changed = tp.synthesize_tool_results_for_chat("string")
+ self.assertEqual(result, "string")
+ self.assertFalse(changed)
+
+ def test_empty_list(self):
+ result, changed = tp.synthesize_tool_results_for_chat([])
+ self.assertEqual(result, [])
+ self.assertFalse(changed)
+
+ def test_synthesis(self):
+ items = [
+ {"type": "function_call", "call_id": "c1", "name": "bash",
+ "arguments": json.dumps({"cmd": "ls"})},
+ {"type": "function_call_output", "call_id": "c1", "output": "file.txt"},
+ {"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hi"}]},
+ ]
+ result, changed = tp.synthesize_tool_results_for_chat(items)
+ self.assertTrue(changed)
+ # Should have synthesized message + original message
+ self.assertEqual(len(result), 2)
+ # The synthesized message should contain tool info
+ synth_msg = result[0]
+ self.assertEqual(synth_msg["role"], "user")
+ synth_text = synth_msg["content"][0]["text"]
+ self.assertIn("Tool execution result", synth_text)
+ self.assertIn("bash", synth_text)
+ self.assertIn("file.txt", synth_text)
+
+ def test_orphan_output(self):
+ items = [
+ {"type": "function_call_output", "call_id": "missing", "output": "orphan"},
+ ]
+ result, changed = tp.synthesize_tool_results_for_chat(items)
+ self.assertTrue(changed)
+ self.assertEqual(result[0]["role"], "user")
+
+
+class TestOaConvertTools(unittest.TestCase):
+ """Tests for oa_convert_tools (line ~2417)."""
+
+ def test_none_input(self):
+ self.assertIsNone(tp.oa_convert_tools(None))
+
+ def test_empty_list(self):
+ self.assertIsNone(tp.oa_convert_tools([]))
+
+ def test_function_tools(self):
+ tools = [
+ {"type": "function", "function": {"name": "bash", "description": "Run a command",
+ "parameters": {"type": "object"}}},
+ ]
+ result = tp.oa_convert_tools(tools)
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0]["function"]["name"], "bash")
+
+ def test_non_function_filtered(self):
+ tools = [
+ {"type": "web_search"},
+ {"type": "function", "function": {"name": "bash", "description": "", "parameters": {}}},
+ ]
+ result = tp.oa_convert_tools(tools)
+ self.assertEqual(len(result), 1)
+
+ def test_empty_name_filtered(self):
+ tools = [{"type": "function", "function": {"name": "", "description": "", "parameters": {}}}]
+ self.assertIsNone(tp.oa_convert_tools(tools))
+
+ def test_null_name_filtered(self):
+ tools = [{"type": "function", "function": {"name": "null", "description": "", "parameters": {}}}]
+ self.assertIsNone(tp.oa_convert_tools(tools))
+
+ def test_function_without_fn_key(self):
+ tools = [{"type": "function", "name": "my_tool", "description": "desc", "parameters": {}}]
+ result = tp.oa_convert_tools(tools)
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0]["function"]["name"], "my_tool")
+
+ def test_strict_mode(self):
+ tools = [{"type": "function", "function": {"name": "t", "description": "", "parameters": {}}}]
+ result = tp.oa_convert_tools(tools, strict=True)
+ self.assertTrue(result[0]["function"]["strict"])
+
+
+class TestOaRespToResponses(unittest.TestCase):
+ """Tests for oa_resp_to_responses (line ~2448)."""
+
+ def test_simple_text(self):
+ chat_resp = _make_chat_resp(content="Hello world")
+ result = tp.oa_resp_to_responses(chat_resp, "test-model", resp_id="r1")
+ self.assertEqual(result["id"], "r1")
+ self.assertEqual(result["model"], "test-model")
+ self.assertEqual(result["status"], "completed")
+ self.assertEqual(len(result["output"]), 1)
+ self.assertEqual(result["output"][0]["type"], "message")
+ text_content = result["output"][0]["content"][0]["text"]
+ self.assertEqual(text_content, "Hello world")
+
+ def test_tool_calls(self):
+ tc = {"id": "tc1", "type": "function", "function": {"name": "bash", "arguments": '{"cmd":"ls"}'}}
+ chat_resp = _make_chat_resp(content="", tool_calls=[tc], finish_reason="tool_calls")
+ result = tp.oa_resp_to_responses(chat_resp, "test-model")
+ self.assertEqual(result["status"], "completed")
+ # Should have one function_call output
+ fc_outputs = [o for o in result["output"] if o["type"] == "function_call"]
+ self.assertEqual(len(fc_outputs), 1)
+ self.assertEqual(fc_outputs[0]["name"], "bash")
+
+ def test_finish_reason_length(self):
+ chat_resp = _make_chat_resp(finish_reason="length")
+ result = tp.oa_resp_to_responses(chat_resp, "m")
+ self.assertEqual(result["status"], "incomplete")
+
+ def test_finish_reason_content_filter(self):
+ chat_resp = _make_chat_resp(finish_reason="content_filter")
+ result = tp.oa_resp_to_responses(chat_resp, "m")
+ self.assertEqual(result["status"], "incomplete")
+
+ def test_usage(self):
+ chat_resp = _make_chat_resp(prompt_tokens=100, completion_tokens=50)
+ result = tp.oa_resp_to_responses(chat_resp, "m")
+ self.assertEqual(result["usage"]["input_tokens"], 100)
+ self.assertEqual(result["usage"]["output_tokens"], 50)
+ self.assertEqual(result["usage"]["total_tokens"], 150)
+
+ def test_content_and_tool_calls(self):
+ tc = {"id": "tc1", "type": "function", "function": {"name": "bash", "arguments": '{}'}}
+ chat_resp = _make_chat_resp(content="thinking...", tool_calls=[tc])
+ result = tp.oa_resp_to_responses(chat_resp, "m")
+ types = [o["type"] for o in result["output"]]
+ self.assertIn("message", types)
+ self.assertIn("function_call", types)
+
+
+class TestOaInputToMessages(unittest.TestCase):
+ """Tests for oa_input_to_messages (line ~2265)."""
+
+ def test_string_input(self):
+ result = tp.oa_input_to_messages("hello")
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0]["role"], "user")
+ self.assertEqual(result[0]["content"], "hello")
+
+ def test_message_input(self):
+ items = [
+ {"type": "message", "role": "user",
+ "content": [{"type": "input_text", "text": "hello"}]},
+ ]
+ result = tp.oa_input_to_messages(items)
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0]["content"], "hello")
+
+ def test_developer_role_becomes_system(self):
+ items = [
+ {"type": "message", "role": "developer",
+ "content": [{"type": "input_text", "text": "instructions"}]},
+ ]
+ result = tp.oa_input_to_messages(items)
+ self.assertEqual(result[0]["role"], "system")
+
+ def test_function_call_and_output(self):
+ items = [
+ {"type": "function_call", "call_id": "c1", "name": "bash",
+ "arguments": json.dumps({"cmd": "ls"})},
+ {"type": "function_call_output", "call_id": "c1", "output": "file.txt"},
+ ]
+ result = tp.oa_input_to_messages(items)
+ # Should produce: assistant with tool_calls + tool message
+ self.assertEqual(len(result), 2)
+ self.assertEqual(result[0]["role"], "assistant")
+ self.assertIsNotNone(result[0]["tool_calls"])
+ self.assertEqual(result[1]["role"], "tool")
+ self.assertEqual(result[1]["tool_call_id"], "c1")
+
+ def test_string_content(self):
+ items = [
+ {"type": "message", "role": "user", "content": "plain text"},
+ ]
+ result = tp.oa_input_to_messages(items)
+ self.assertEqual(result[0]["content"], "plain text")
+
+ def test_reasoning_content(self):
+ items = [
+ {"type": "message", "role": "assistant",
+ "content": [
+ {"type": "reasoning", "content": [{"text": "thinking..."}]},
+ {"type": "output_text", "text": "result"},
+ ]},
+ ]
+ result = tp.oa_input_to_messages(items)
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0]["content"], "result")
+ self.assertIn("reasoning_content", result[0])
+ self.assertEqual(result[0]["reasoning_content"], "thinking...")
+
+
+class TestFbStripReasoningFromMessages(unittest.TestCase):
+ """Tests for _fb_strip_reasoning_from_messages (line ~1383)."""
+
+ def test_strips_reasoning(self):
+ messages = [
+ {"role": "assistant", "content": "hi", "reasoning_content": "thoughts"},
+ {"role": "user", "content": "ok"},
+ ]
+ result = tp._fb_strip_reasoning_from_messages(messages)
+ self.assertNotIn("reasoning_content", result[0])
+ self.assertEqual(result[0]["content"], "hi")
+ self.assertEqual(result[1]["content"], "ok")
+
+ def test_no_reasoning(self):
+ messages = [{"role": "user", "content": "hi"}]
+ result = tp._fb_strip_reasoning_from_messages(messages)
+ self.assertEqual(result, messages)
+
+ def test_empty_list(self):
+ self.assertEqual(tp._fb_strip_reasoning_from_messages([]), [])
+
+
+class TestCbInputToMessages(unittest.TestCase):
+ """Tests for _cb_input_to_messages (line ~1321)."""
+
+ def test_string_input(self):
+ result = tp._cb_input_to_messages("hello")
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0]["role"], "user")
+ self.assertEqual(result[0]["content"], "hello")
+
+ def test_with_instructions(self):
+ result = tp._cb_input_to_messages("hello", instructions="be helpful")
+ self.assertEqual(len(result), 2)
+ self.assertEqual(result[0]["role"], "system")
+ self.assertEqual(result[0]["content"], "be helpful")
+
+ def test_function_call_and_output(self):
+ items = [
+ {"type": "function_call", "call_id": "c1", "name": "bash",
+ "arguments": json.dumps({"cmd": "ls"})},
+ {"type": "function_call_output", "call_id": "c1", "output": "file.txt"},
+ ]
+ result = tp._cb_input_to_messages(items)
+ self.assertGreaterEqual(len(result), 2)
+
+ def test_reasoning_items_skipped(self):
+ items = [
+ {"type": "reasoning", "content": [{"text": "thinking"}]},
+ {"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hi"}]},
+ ]
+ result = tp._cb_input_to_messages(items)
+ # reasoning should be skipped, only message kept
+ msg_roles = [m["role"] for m in result]
+ self.assertNotIn("reasoning", msg_roles)
+
+ def test_developer_role_becomes_system(self):
+ items = [
+ {"type": "message", "role": "developer",
+ "content": [{"type": "input_text", "text": "instructions"}]},
+ ]
+ result = tp._cb_input_to_messages(items)
+ self.assertEqual(result[0]["role"], "system")
+
+
+class TestCodebuffHardDisableReasoning(unittest.TestCase):
+ """Tests for _codebuff_hard_disable_reasoning (line ~1258)."""
+
+ def test_removes_reasoning_fields(self):
+ messages = [
+ {"role": "assistant", "content": "hi",
+ "reasoning_content": "thoughts", "thinking": "deep"},
+ {"role": "user", "content": "ok"},
+ ]
+ tp._codebuff_hard_disable_reasoning(messages)
+ self.assertNotIn("reasoning_content", messages[0])
+ self.assertNotIn("thinking", messages[0])
+ self.assertIn("content", messages[0])
+
+ def test_handles_non_dict(self):
+ messages = ["not a dict", {"role": "user", "content": "ok"}]
+ tp._codebuff_hard_disable_reasoning(messages)
+ # Should not crash
+
+ def test_removes_all_reasoning_keys(self):
+ for key in ("reasoning_content", "reasoning", "thinking",
+ "thinking_content", "thoughts"):
+ msg = {"role": "assistant", "content": "hi", key: "val"}
+ tp._codebuff_hard_disable_reasoning([msg])
+ self.assertNotIn(key, msg, f"Key {key} should have been removed")
+
+
+class TestIsReasoningContentError(unittest.TestCase):
+ """Tests for _is_reasoning_content_error (line ~1269)."""
+
+ def test_none(self):
+ self.assertFalse(tp._is_reasoning_content_error(None))
+
+ def test_empty(self):
+ self.assertFalse(tp._is_reasoning_content_error(""))
+
+ def test_matching(self):
+ self.assertTrue(tp._is_reasoning_content_error("reasoning_content is invalid"))
+ self.assertTrue(tp._is_reasoning_content_error("thinking mode required"))
+ self.assertTrue(tp._is_reasoning_content_error("must be passed back"))
+
+ def test_not_matching(self):
+ self.assertFalse(tp._is_reasoning_content_error("rate limit exceeded"))
+
+
+class TestDsRebuildToolHistory(unittest.TestCase):
+ """Tests for _ds_rebuild_tool_history (line ~1298)."""
+
+ def test_empty_messages(self):
+ result = tp._ds_rebuild_tool_history([])
+ self.assertEqual(result, [])
+
+ def test_no_matching_tool_ids(self):
+ messages = [
+ {"role": "user", "content": "hi"},
+ {"role": "tool", "tool_call_id": "tc_unknown", "content": "output"},
+ ]
+ result = tp._ds_rebuild_tool_history(messages)
+ self.assertEqual(len(result), 2)
+
+
+class TestRouteKey(unittest.TestCase):
+ """Tests for _route_key (line ~1475)."""
+
+ def test_basic(self):
+ route = {"name": "primary", "target_url": "https://api.example.com", "model": "gpt-4o"}
+ key = tp._route_key(route)
+ self.assertEqual(key, "primary::https://api.example.com::gpt-4o")
+
+ def test_missing_fields(self):
+ route = {}
+ key = tp._route_key(route)
+ self.assertEqual(key, "::::")
+
+ def test_partial_fields(self):
+ route = {"name": "backup"}
+ key = tp._route_key(route)
+ self.assertEqual(key, "backup::::")
+
+
+class TestScoreRoute(unittest.TestCase):
+ """Tests for _score_route (line ~1492)."""
+
+ def test_basic_scoring(self):
+ route = {"name": "r1", "priority": 10}
+ stats = {}
+ score = tp._score_route(route, stats)
+ self.assertEqual(score, 10)
+
+ def test_with_latency(self):
+ route = {"name": "r1", "priority": 10}
+ key = tp._route_key(route)
+ stats = {key: {"ewma_latency_s": 2.0, "consecutive_failures": 0}}
+ score = tp._score_route(stats=stats, route=route)
+ # priority(10) + min(ewma*5, 50)(10) + failures*20(0) = 20
+ self.assertEqual(score, 20)
+
+ def test_with_failures(self):
+ route = {"name": "r1", "priority": 10}
+ key = tp._route_key(route)
+ stats = {key: {"ewma_latency_s": 0, "consecutive_failures": 3}}
+ score = tp._score_route(stats=stats, route=route)
+ # priority(10) + 0 + 3*20 = 70
+ self.assertEqual(score, 70)
+
+ def test_open_circuit(self):
+ route = {"name": "r1", "priority": 10}
+ key = tp._route_key(route)
+ stats = {key: {"open_until_ts": time.time() + 100}}
+ score = tp._score_route(stats=stats, route=route)
+ self.assertEqual(score, 1_000_000)
+
+ def test_rate_limited(self):
+ route = {"name": "r1", "priority": 10}
+ key = tp._route_key(route)
+ stats = {key: {"ewma_latency_s": 0, "consecutive_failures": 0,
+ "rate_limited_until": time.time() + 100}}
+ score = tp._score_route(stats=stats, route=route)
+ self.assertEqual(score, 10 + 500)
+
+
+class TestBucketForRoute(unittest.TestCase):
+ """Tests for _bucket_for_route (line ~2164)."""
+
+ def test_creates_bucket(self):
+ route = {"name": "test_route"}
+ bucket = tp._bucket_for_route(route)
+ self.assertIsNotNone(bucket)
+ self.assertTrue(hasattr(bucket, "allow"))
+
+ def test_same_route_same_bucket(self):
+ route = {"name": "same_route"}
+ b1 = tp._bucket_for_route(route)
+ b2 = tp._bucket_for_route(route)
+ self.assertIs(b1, b2)
+
+ def test_default_name(self):
+ route = {"target_url": "https://example.com"}
+ bucket = tp._bucket_for_route(route)
+ self.assertIsNotNone(bucket)
+
+
+class TestSanitizeToolCalls(unittest.TestCase):
+ """Tests for _sanitize_tool_calls (line ~3467)."""
+
+ def test_non_exec_command_passthrough(self):
+ calls = [{"name": "TodoWrite", "arguments": json.dumps({"todos": []})}]
+ result = tp._sanitize_tool_calls(calls)
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0]["name"], "TodoWrite")
+
+ def test_exec_command_empty_cmd(self):
+ calls = [{"name": "exec_command", "arguments": json.dumps({"cmd": ""})}]
+ result = tp._sanitize_tool_calls(calls)
+ self.assertEqual(len(result), 1)
+ cmd = json.loads(result[0]["arguments"])["cmd"]
+ self.assertIn("CC-SANITIZER", cmd)
+
+ def test_exec_command_null_cmd(self):
+ calls = [{"name": "exec_command", "arguments": json.dumps({"cmd": "null"})}]
+ result = tp._sanitize_tool_calls(calls)
+ cmd = json.loads(result[0]["arguments"])["cmd"]
+ self.assertIn("CC-SANITIZER", cmd)
+
+ def test_exec_command_valid_cmd(self):
+ calls = [{"name": "exec_command", "arguments": json.dumps({"cmd": "echo hello"})}]
+ result = tp._sanitize_tool_calls(calls)
+ args = json.loads(result[0]["arguments"])
+ self.assertEqual(args["cmd"], "echo hello")
+
+ def test_double_wrapped_cmd(self):
+ inner = json.dumps({"cmd": "echo hello"})
+ calls = [{"name": "exec_command", "arguments": json.dumps({"cmd": inner})}]
+ result = tp._sanitize_tool_calls(calls)
+ args = json.loads(result[0]["arguments"])
+ self.assertEqual(args["cmd"], "echo hello")
+
+ def test_json_object_cmd(self):
+ calls = [{"name": "exec_command",
+ "arguments": json.dumps({"cmd": json.dumps({"cmd": "ls -la"})})}]
+ result = tp._sanitize_tool_calls(calls)
+ args = json.loads(result[0]["arguments"])
+ self.assertEqual(args["cmd"], "ls -la")
+
+ def test_invalid_arguments_json(self):
+ calls = [{"name": "exec_command", "arguments": "not json"}]
+ result = tp._sanitize_tool_calls(calls)
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0]["arguments"], "not json")
+
+
+class TestBuildExploreCmd(unittest.TestCase):
+ """Tests for _build_explore_cmd (line ~2830)."""
+
+ def test_empty_input(self):
+ cmd, desc = tp._build_explore_cmd("")
+ self.assertIsNone(cmd)
+ self.assertIsNone(desc)
+
+ def test_none_input(self):
+ cmd, desc = tp._build_explore_cmd(None)
+ self.assertIsNone(cmd)
+ self.assertIsNone(desc)
+
+ def test_no_url(self):
+ cmd, desc = tp._build_explore_cmd("just some text without urls")
+ self.assertIsNone(cmd)
+ self.assertIsNone(desc)
+
+ @patch.object(tp, "_IS_WINDOWS", True)
+ def test_windows_command(self):
+ cmd, desc = tp._build_explore_cmd("check out https://github.com/owner/repo")
+ self.assertIsNotNone(cmd)
+ self.assertIn("Invoke-WebRequest", cmd)
+ self.assertIn("$env:TEMP", cmd)
+ self.assertIn("README", desc)
+
+ @patch.object(tp, "_IS_WINDOWS", False)
+ def test_linux_command(self):
+ cmd, desc = tp._build_explore_cmd("check out https://github.com/owner/repo")
+ self.assertIsNotNone(cmd)
+ self.assertIn("curl", cmd)
+ self.assertIn("/tmp", cmd)
+
+ @patch.object(tp, "_IS_WINDOWS", False)
+ def test_git_suffix_stripped(self):
+ cmd, desc = tp._build_explore_cmd("https://github.com/owner/repo.git")
+ self.assertIsNotNone(cmd)
+ # The api_base should not have .git
+ self.assertNotIn("repo.git", cmd)
+ self.assertIn("repo", cmd)
+
+ @patch.object(tp, "_IS_WINDOWS", False)
+ def test_url_from_json_list(self):
+ text = json.dumps([{"content": "see https://gitea.com/user/proj for details"}])
+ cmd, desc = tp._build_explore_cmd(text)
+ self.assertIsNotNone(cmd)
+ self.assertIn("gitea.com", cmd)
+
+ @patch.object(tp, "_IS_WINDOWS", False)
+ def test_api_v1_url_passthrough(self):
+ cmd, desc = tp._build_explore_cmd("https://gitea.com/api/v1/repos/user/proj")
+ self.assertIsNotNone(cmd)
+ self.assertIn("api/v1/repos", cmd)
+
+
+class TestCompactInput(unittest.TestCase):
+ """Tests for _compact_input (line ~1657)."""
+
+ def test_string_passthrough(self):
+ self.assertEqual(tp._compact_input("hello"), "hello")
+
+ def test_small_list_passthrough(self):
+ items = [{"type": "message", "role": "user", "content": "hi"}]
+ result = tp._compact_input(items)
+ self.assertEqual(result, items)
+
+ def test_truncates_large_tool_output(self):
+ long_output = "x" * 10000
+ items = [{"type": "function_call_output", "output": long_output}]
+ result = tp._compact_input(items)
+ self.assertEqual(len(result), 1)
+ self.assertIn("truncated", result[0]["output"])
+ self.assertLess(len(result[0]["output"]), 10000)
+
+
+class TestAdaptiveCompact(unittest.TestCase):
+ """Tests for _adaptive_compact (line ~1820)."""
+
+ def test_within_budget(self):
+ items = [{"type": "message", "role": "user", "content": "hi"}]
+ result, changed = tp._adaptive_compact(items, "gpt-4o")
+ self.assertFalse(changed)
+ self.assertEqual(result, items)
+
+ def test_string_passthrough(self):
+ result, changed = tp._adaptive_compact("hello", "gpt-4o")
+ self.assertFalse(changed)
+ self.assertEqual(result, "hello")
+
+
+class TestUpstreamTarget(unittest.TestCase):
+ """Tests for upstream_target (line ~1409)."""
+
+ def test_no_trailing_slash(self):
+ self.assertEqual(tp.upstream_target("https://api.example.com", "/v1/chat"),
+ "https://api.example.com/v1/chat")
+
+ def test_trailing_slash(self):
+ self.assertEqual(tp.upstream_target("https://api.example.com/", "/v1/chat"),
+ "https://api.example.com/v1/chat")
+
+ def test_suffix_already_present(self):
+ self.assertEqual(tp.upstream_target("https://api.example.com/v1/chat", "/v1/chat"),
+ "https://api.example.com/v1/chat")
+
+
+class TestCcInputToMessages(unittest.TestCase):
+ """Tests for cc_input_to_messages (line ~2334)."""
+
+ def test_string_input(self):
+ result = tp.cc_input_to_messages("hello")
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0], {"role": "user", "content": "hello"})
+
+ def test_with_instructions(self):
+ result = tp.cc_input_to_messages("hello", instructions="be helpful")
+ self.assertEqual(len(result), 2)
+ self.assertEqual(result[0]["role"], "user")
+ self.assertEqual(result[0]["content"], "be helpful")
+
+ def test_message_items(self):
+ items = [
+ {"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hi"}]},
+ ]
+ result = tp.cc_input_to_messages(items)
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0]["content"], "hi")
+
+ def test_function_call_as_text(self):
+ items = [
+ {"type": "function_call", "call_id": "c1", "name": "bash",
+ "arguments": json.dumps({"cmd": "ls"})},
+ ]
+ result = tp.cc_input_to_messages(items)
+ # Tool calls should be inline JSON text in assistant message
+ self.assertTrue(any(m["role"] == "assistant" for m in result))
+
+ def test_function_call_output_as_user(self):
+ items = [
+ {"type": "function_call_output", "call_id": "c1", "output": "result text"},
+ ]
+ result = tp.cc_input_to_messages(items)
+ self.assertTrue(any(m["role"] == "user" and "result text" in m.get("content", "") for m in result))
+
+ def test_non_dict_items_skipped(self):
+ items = ["not a dict", 42]
+ result = tp.cc_input_to_messages(items)
+ self.assertEqual(len(result), 0)
+
+ def test_non_list_non_string_returns_empty(self):
+ result = tp.cc_input_to_messages(42)
+ self.assertEqual(result, [])
+
+ def test_role_normalization(self):
+ items = [
+ {"type": "message", "role": "system", "content": "sys"},
+ ]
+ result = tp.cc_input_to_messages(items)
+ # system role should become user
+ self.assertEqual(result[0]["role"], "user")
+
+
+if __name__ == "__main__":
+ unittest.main()