"""
Unit tests for translate-proxy.py
Covers pure utility functions that can be tested without a running server.
Uses only stdlib unittest + unittest.mock (zero pip dependencies).
"""
import json
import os
import sys
import time
import unittest
from unittest.mock import patch, MagicMock
# ---------------------------------------------------------------------------
# Import the module under test.
# The source file has a dash in its name so we use importlib.
# ---------------------------------------------------------------------------
# ``import importlib`` on its own does not guarantee that the ``util``
# submodule is loaded and bound as an attribute; import it explicitly so
# ``importlib.util`` below cannot raise AttributeError.
import importlib.util

# Build a spec straight from the file path (the dash in the filename rules
# out a normal ``import`` statement), create the module object, then run it.
_spec = importlib.util.spec_from_file_location(
    "translate_proxy",
    os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "src", "translate-proxy.py"),
)
tp = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(tp)
# ===================================================================
# Helpers
# ===================================================================
def _make_chat_resp(content="Hello", tool_calls=None, finish_reason="stop",
model="test-model", prompt_tokens=10, completion_tokens=5):
"""Build a minimal OpenAI chat completion response dict."""
msg = {"role": "assistant", "content": content}
if tool_calls:
msg["tool_calls"] = tool_calls
return {
"choices": [{"message": msg, "finish_reason": finish_reason}],
"usage": {"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens},
}
# ===================================================================
# Tests grouped by functionality
# ===================================================================
class TestUnwrapCmd(unittest.TestCase):
    """Checks for _unwrap_cmd (line ~2810) on wrapped and unwrapped inputs."""

    @staticmethod
    def _wrap(cmd, depth):
        """Nest *cmd* inside ``depth`` layers of ``{"cmd": ...}`` JSON strings."""
        wrapped = cmd
        for _ in range(depth):
            wrapped = json.dumps({"cmd": wrapped})
        return wrapped

    def test_non_string_passthrough(self):
        self.assertIsNone(tp._unwrap_cmd(None))
        self.assertEqual(tp._unwrap_cmd(42), 42)

    def test_plain_string_not_json(self):
        plain = "echo hello"
        self.assertEqual(tp._unwrap_cmd(plain), "echo hello")

    def test_single_wrap(self):
        payload = self._wrap("echo hello", 1)
        self.assertEqual(tp._unwrap_cmd(payload), "echo hello")

    def test_double_wrap(self):
        payload = self._wrap("echo hello", 2)
        self.assertEqual(tp._unwrap_cmd(payload), "echo hello")

    def test_triple_wrap(self):
        payload = self._wrap("echo hello", 3)
        self.assertEqual(tp._unwrap_cmd(payload), "echo hello")

    def test_json_without_cmd_key(self):
        # JSON that lacks a "cmd" key must come back untouched.
        payload = json.dumps({"foo": "bar"})
        self.assertEqual(tp._unwrap_cmd(payload), payload)

    def test_cmd_value_not_string(self):
        # A non-string "cmd" value is not unwrapped.
        payload = json.dumps({"cmd": 123})
        self.assertEqual(tp._unwrap_cmd(payload), payload)

    def test_empty_string(self):
        self.assertEqual(tp._unwrap_cmd(""), "")
class TestStripXmlishTags(unittest.TestCase):
    """Tests for _strip_xmlish_tags (line ~2807).

    NOTE(review): the tagged inputs in test_basic_tags, test_self_closing and
    test_multiple_tags were reconstructed. The previous literals had lost
    their markup (two of them were split across lines and did not parse, and
    test_basic_tags was identical to test_no_tags). The inputs were chosen so
    the expected output holds whether tags are deleted outright or replaced
    by whitespace that is then collapsed -- confirm against the implementation.
    """

    def test_none(self):
        self.assertEqual(tp._strip_xmlish_tags(None), "")

    def test_empty_string(self):
        self.assertEqual(tp._strip_xmlish_tags(""), "")

    def test_no_tags(self):
        self.assertEqual(tp._strip_xmlish_tags("hello world"), "hello world")

    def test_basic_tags(self):
        # Paired open/close tags around a word.
        self.assertEqual(tp._strip_xmlish_tags("<b>hello</b> world"), "hello world")

    def test_self_closing(self):
        # A self-closing tag between two words.
        self.assertEqual(tp._strip_xmlish_tags("before <br/>after"), "before after")

    def test_multiple_tags(self):
        # Several tag pairs in one string.
        self.assertEqual(
            tp._strip_xmlish_tags("<p>one</p> <p>two</p>"),
            "one two",
        )
class TestExtractXmlToolCalls(unittest.TestCase):
    """Tests for _extract_xml_tool_calls.

    NOTE(review): the ``text`` fixtures in the tool-call tests contain no XML
    markup, yet they expect calls named ``exec_command`` while test_no_calls
    expects plain text to yield nothing (and test_code_fenced_args has no code
    fence). The wrapping tags presumably were lost from these literals; their
    syntax cannot be inferred from this file, so restore them from version
    control before relying on these tests.
    """

    def test_empty(self):
        self.assertEqual(tp._extract_xml_tool_calls(""), [])
        self.assertEqual(tp._extract_xml_tool_calls(None), [])

    def test_no_calls(self):
        self.assertEqual(tp._extract_xml_tool_calls("just plain text"), [])

    def test_single_tool_call(self):
        text = 'echo hi'
        results = tp._extract_xml_tool_calls(text)
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]["name"], "exec_command")
        self.assertIn("call_id", results[0])
        self.assertTrue(results[0]["call_id"].startswith("xml_"))

    def test_multiple_tool_calls(self):
        text = (
            'echo hi'
            'test.py'
        )
        results = tp._extract_xml_tool_calls(text)
        self.assertEqual(len(results), 2)
        self.assertEqual(results[0]["name"], "exec_command")
        self.assertEqual(results[1]["name"], "exec_command")

    def test_json_args(self):
        text = '{"key": "value"}'
        results = tp._extract_xml_tool_calls(text)
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]["name"], "exec_command")
        # Check the key exists before indexing it, so a missing key is
        # reported as an assertion failure instead of a KeyError.
        self.assertIn("args", results[0])
        # The args payload must parse as JSON; json.loads raises otherwise.
        json.loads(results[0]["args"])

    def test_code_fenced_args(self):
        text = '{"a": 1}'
        results = tp._extract_xml_tool_calls(text)
        self.assertEqual(len(results), 1)
class TestNormalizeToolArgs(unittest.TestCase):
    """Checks for _normalize_tool_args (line ~2196)."""

    # Canonical argument object shared by the wrapping tests.
    _ARGS = {"cmd": "echo hello"}

    def test_empty(self):
        self.assertEqual(tp._normalize_tool_args(""), "")
        self.assertIsNone(tp._normalize_tool_args(None))
        self.assertEqual(tp._normalize_tool_args("{}"), "{}")

    def test_valid_json_passthrough(self):
        encoded = json.dumps({"cmd": "echo hello"})
        self.assertEqual(tp._normalize_tool_args(encoded), encoded)

    def test_wrapped_arguments_uppercase_key(self):
        # Arguments nested as a JSON string under an "Arguments" key.
        inner = json.dumps({"cmd": "echo hello"})
        wrapped = json.dumps({"Arguments": inner})
        normalized = tp._normalize_tool_args(wrapped)
        self.assertEqual(json.loads(normalized), {"cmd": "echo hello"})

    def test_wrapped_arguments_with_code_fence(self):
        # Same nesting, but the inner JSON sits inside a markdown code fence.
        inner = json.dumps({"cmd": "echo hello"})
        wrapped = json.dumps({"Arguments": f"```json\n{inner}\n```"})
        normalized = tp._normalize_tool_args(wrapped)
        self.assertEqual(json.loads(normalized), {"cmd": "echo hello"})

    def test_invalid_json_passthrough(self):
        garbage = "not json at all"
        self.assertEqual(tp._normalize_tool_args(garbage), garbage)
class TestEmit(unittest.TestCase):
    """Checks the event framing produced by emit (line ~1406)."""

    def test_basic_event(self):
        frame = tp.emit("test.event", {"key": "value"})
        self.assertTrue(frame.startswith("event: test.event\n"))
        self.assertIn('"key": "value"', frame)
        self.assertTrue(frame.endswith("\n\n"))

    def test_event_data_format(self):
        payload = {"type": "response.created", "id": "123"}
        frame = tp.emit("response.created", payload)
        event_line, data_line = frame.split("\n")[:2]
        self.assertEqual(event_line, "event: response.created")
        self.assertTrue(data_line.startswith("data: "))
        # Everything after the "data: " prefix must be the JSON payload.
        decoded = json.loads(data_line[len("data: "):])
        self.assertEqual(decoded["type"], "response.created")
class TestUid(unittest.TestCase):
    """Checks for the uid helper (line ~1403)."""

    def test_default_prefix(self):
        generated = tp.uid()
        self.assertTrue(generated.startswith("id-"))

    def test_custom_prefix(self):
        generated = tp.uid("msg")
        self.assertTrue(generated.startswith("msg-"))

    def test_uniqueness(self):
        # 100 consecutive ids must all be distinct.
        seen = set(tp.uid() for _ in range(100))
        self.assertEqual(len(seen), 100)
class TestRedact(unittest.TestCase):
    """Secret-scrubbing checks for _redact / _redact_json (lines ~2065/2073)."""

    def test_none(self):
        self.assertIsNone(tp._redact(None))

    def test_empty_string(self):
        self.assertEqual(tp._redact(""), "")

    def test_no_secrets(self):
        harmless = "hello world"
        self.assertEqual(tp._redact(harmless), "hello world")

    def test_sk_key(self):
        line = "key=sk-abc123def456ghi789jkl012mno"
        self.assertIn("[REDACTED:key]", tp._redact(line))
        self.assertNotIn("sk-abc123", tp._redact(line))

    def test_sk_ant_key(self):
        # Per the original author's note: sk-ant-... is caught by the first
        # sk- pattern (sk-[A-Za-z0-9_\-]{20,}), which runs first and labels
        # it [REDACTED:key]; hence only the generic "[REDACTED:" marker is
        # asserted here.
        line = "key=sk-ant-api03-abcdefghijklmnopqrstuv"
        scrubbed = tp._redact(line)
        self.assertIn("[REDACTED:", scrubbed)
        self.assertNotIn("sk-ant-api03", scrubbed)

    def test_github_token(self):
        line = "token=ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghij"
        self.assertIn("[REDACTED:github]", tp._redact(line))

    def test_bearer_token(self):
        header = "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.verylongpayload"
        self.assertIn("Bearer [REDACTED]", tp._redact(header))

    def test_redact_json_dict(self):
        payload = {"api_key": "sk-abc123def456ghi789jkl012mno345pqr"}
        scrubbed = tp._redact_json(payload)
        self.assertIn("[REDACTED:key]", scrubbed)
        self.assertNotIn("sk-abc123", scrubbed)
class TestEstimateTokens(unittest.TestCase):
    """Checks for the _estimate_tokens heuristic (line ~1809)."""

    def test_none(self):
        self.assertEqual(tp._estimate_tokens(None), 0)

    def test_string(self):
        # 10 characters => max(1, 10 // 4) = 2
        ten_chars = "abcdefghij"
        self.assertEqual(tp._estimate_tokens(ten_chars), 2)

    def test_short_string(self):
        # 2 characters => max(1, 2 // 4) = max(1, 0) = 1
        two_chars = "ab"
        self.assertEqual(tp._estimate_tokens(two_chars), 1)

    def test_dict(self):
        estimate = tp._estimate_tokens({"key": "value"})
        self.assertGreater(estimate, 0)

    def test_list(self):
        estimate = tp._estimate_tokens([1, 2, 3])
        self.assertGreater(estimate, 0)
class TestItemSummary(unittest.TestCase):
    """Checks the one-line summaries built by _item_summary (line ~1614)."""

    def test_message(self):
        summary = tp._item_summary({
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": "hello"}],
        })
        self.assertIn("[user]", summary)
        self.assertIn("hello", summary)

    def test_function_call(self):
        summary = tp._item_summary({
            "type": "function_call",
            "name": "exec_command",
            "arguments": json.dumps({"cmd": "ls -la"}),
        })
        for fragment in ("[tool call]", "exec_command", "ls -la"):
            self.assertIn(fragment, summary)

    def test_function_call_output(self):
        summary = tp._item_summary(
            {"type": "function_call_output", "output": "file1.txt\nfile2.txt"}
        )
        self.assertIn("[tool result]", summary)
        self.assertIn("file1.txt", summary)

    def test_function_call_output_truncated(self):
        # A 300-character output summarised with max_len=100 must be
        # shortened and carry an ellipsis.
        oversized = {"type": "function_call_output", "output": "x" * 300}
        summary = tp._item_summary(oversized, max_len=100)
        self.assertIn("...", summary)
        self.assertLess(len(summary), 300)

    def test_unknown_type(self):
        summary = tp._item_summary({"type": "custom_type"})
        self.assertEqual(summary, "[custom_type]")
class TestContextLimitForModel(unittest.TestCase):
    """Checks the model -> context window lookup _context_limit_for_model (line ~1800)."""

    def _expect_limit(self, model, expected):
        """Assert that *model* resolves to a context limit of *expected*."""
        self.assertEqual(tp._context_limit_for_model(model), expected)

    def test_none_model(self):
        self._expect_limit(None, 32768)

    def test_empty_model(self):
        self._expect_limit("", 32768)

    def test_gpt4o(self):
        self._expect_limit("gpt-4o", 128000)

    def test_claude_sonnet(self):
        self._expect_limit("claude-sonnet-4-6", 200000)

    def test_gemini_flash(self):
        self._expect_limit("gemini-2.5-flash", 1000000)

    def test_case_insensitive(self):
        self._expect_limit("GPT-4O", 128000)

    def test_unknown_model(self):
        self._expect_limit("future-model-x", 32768)

    def test_deepseek(self):
        self._expect_limit("deepseek-chat", 64000)
class TestClassifyAntigravityError(unittest.TestCase):
    """Checks the status/body -> category mapping of _classify_antigravity_error (line ~632)."""

    def _expect_class(self, status, body, expected):
        """Assert that (*status*, *body*) is classified as *expected*."""
        self.assertEqual(tp._classify_antigravity_error(status, body), expected)

    def test_400(self):
        self._expect_class(400, "bad input", "bad_request")

    def test_401_transient(self):
        self._expect_class(401, "access denied", "auth_transient")

    def test_401_permanent_invalid_grant(self):
        self._expect_class(401, "invalid_grant token", "auth_permanent")

    def test_401_permanent_revoked(self):
        self._expect_class(401, "token revoked", "auth_permanent")

    def test_403_validation_required(self):
        self._expect_class(403, "validation_required", "validation_required")

    def test_403_account_banned(self):
        self._expect_class(
            403,
            "has been disabled for violation of terms of service",
            "account_banned",
        )

    def test_403_service_disabled(self):
        self._expect_class(403, "service_disabled", "service_disabled")

    def test_403_generic(self):
        self._expect_class(403, "some other error", "forbidden")

    def test_429_capacity(self):
        self._expect_class(429, "model_capacity_exhausted", "capacity_exhausted")

    def test_429_quota(self):
        self._expect_class(429, "quota_exhausted", "quota_exhausted")

    def test_429_generic(self):
        self._expect_class(429, "slow down", "rate_limited")

    def test_503_capacity(self):
        self._expect_class(503, "service temporarily unavailable", "capacity_exhausted")

    def test_500(self):
        self._expect_class(500, "internal error", "server_error")

    def test_418(self):
        self._expect_class(418, "I'm a teapot", "unknown")
class TestParseRateLimitReset(unittest.TestCase):
    """Checks the delay formats understood by _parse_rate_limit_reset (line ~658)."""

    def test_quota_reset_delay_ms(self):
        # 5000 milliseconds is expected back as 5.0.
        self.assertEqual(tp._parse_rate_limit_reset('quotaResetDelay: 5000ms'), 5.0)

    def test_quota_reset_delay_seconds(self):
        self.assertEqual(tp._parse_rate_limit_reset('quotaResetDelay: 30s'), 30.0)

    def test_hms_format(self):
        # 1h30m0s -> 5400
        self.assertEqual(tp._parse_rate_limit_reset('1h30m0s'), 5400)

    def test_resets_in_format(self):
        # ~2h0m -> 7200
        self.assertEqual(tp._parse_rate_limit_reset('Resets in ~2h0m'), 7200)

    def test_retry_after_seconds(self):
        self.assertEqual(tp._parse_rate_limit_reset('retry-after: 60 seconds'), 60)

    def test_no_match(self):
        parsed = tp._parse_rate_limit_reset('no timing info here')
        self.assertIsNone(parsed)

    def test_empty_string(self):
        parsed = tp._parse_rate_limit_reset('')
        self.assertIsNone(parsed)
class TestExtractText(unittest.TestCase):
    """Checks for _extract_text (line ~4295) across the accepted content shapes."""

    def test_string(self):
        self.assertEqual(tp._extract_text("hello"), "hello")

    def test_none(self):
        self.assertEqual(tp._extract_text(None), "")

    def test_number(self):
        self.assertEqual(tp._extract_text(42), "")

    def test_content_blocks(self):
        # Text of consecutive blocks is concatenated in order.
        blocks = [
            {"type": "input_text", "text": "hello "},
            {"type": "output_text", "text": "world"},
        ]
        extracted = tp._extract_text(blocks)
        self.assertEqual(extracted, "hello world")

    def test_mixed_content(self):
        # Bare strings and dict blocks may be mixed in one list.
        blocks = ["plain text ", {"type": "text", "text": "and dict"}]
        extracted = tp._extract_text(blocks)
        self.assertEqual(extracted, "plain text and dict")

    def test_empty_list(self):
        self.assertEqual(tp._extract_text([]), "")
class TestValidateToolPairs(unittest.TestCase):
    """Checks for validate_tool_pairs (line ~1984)."""

    def test_not_list(self):
        for bad_input in ("not a list", None):
            self.assertEqual(tp.validate_tool_pairs(bad_input), [])

    def test_empty_list(self):
        self.assertEqual(tp.validate_tool_pairs([]), [])

    def test_valid_pairs(self):
        # A call followed by its matching output produces no errors.
        history = [
            {"type": "function_call", "call_id": "c1", "name": "tool1"},
            {"type": "function_call_output", "call_id": "c1", "output": "ok"},
        ]
        self.assertEqual(tp.validate_tool_pairs(history), [])

    def test_orphan_output(self):
        history = [{"type": "function_call_output", "call_id": "missing", "output": "ok"}]
        problems = tp.validate_tool_pairs(history)
        self.assertEqual(len(problems), 1)
        first = problems[0]
        self.assertEqual(first["error"], "orphan_function_call_output")
        self.assertEqual(first["call_id"], "missing")

    def test_orphan_no_call_id(self):
        # An output with no call_id at all is reported too.
        history = [{"type": "function_call_output", "output": "ok"}]
        problems = tp.validate_tool_pairs(history)
        self.assertEqual(len(problems), 1)

    def test_mixed_valid_and_orphan(self):
        history = [
            {"type": "function_call", "call_id": "c1"},
            {"type": "function_call_output", "call_id": "c1"},
            {"type": "function_call_output", "call_id": "c_orphan"},
        ]
        problems = tp.validate_tool_pairs(history)
        self.assertEqual(len(problems), 1)
        self.assertEqual(problems[0]["call_id"], "c_orphan")
class TestRepairOrphanToolOutputs(unittest.TestCase):
    """Checks for repair_orphan_tool_outputs (line ~2001)."""

    def test_repair(self):
        history = [
            {"type": "message", "role": "user", "content": "hi"},
            {"type": "function_call_output", "output": "orphan result"},
        ]
        problems = [
            {"index": 1, "call_id": None, "error": "orphan_function_call_output"},
        ]
        fixed = tp.repair_orphan_tool_outputs(history, problems)
        self.assertEqual(len(fixed), 2)
        kept, converted = fixed[0], fixed[1]
        self.assertEqual(kept["type"], "message")  # untouched original message
        self.assertEqual(converted["type"], "message")  # orphan turned into a message
        self.assertIn("unmatched tool output", converted["content"][0]["text"])

    def test_no_errors(self):
        history = [{"type": "message", "role": "user", "content": "hi"}]
        fixed = tp.repair_orphan_tool_outputs(history, [])
        self.assertEqual(fixed, history)
class TestHasFunctionCallOutput(unittest.TestCase):
    """Checks for has_function_call_output (line ~2051)."""

    def test_not_list(self):
        for bad_input in ("string", None):
            self.assertFalse(tp.has_function_call_output(bad_input))

    def test_has_output(self):
        history = [{"type": "function_call_output", "output": "ok"}]
        self.assertTrue(tp.has_function_call_output(history))

    def test_no_output(self):
        history = [{"type": "message", "role": "user", "content": "hi"}]
        self.assertFalse(tp.has_function_call_output(history))
class TestSynthesizeToolResults(unittest.TestCase):
    """Checks for synthesize_tool_results_for_chat (line ~2014).

    The function returns a ``(items, changed)`` pair.
    """

    def test_not_list(self):
        out, was_changed = tp.synthesize_tool_results_for_chat("string")
        self.assertEqual(out, "string")
        self.assertFalse(was_changed)

    def test_empty_list(self):
        out, was_changed = tp.synthesize_tool_results_for_chat([])
        self.assertEqual(out, [])
        self.assertFalse(was_changed)

    def test_synthesis(self):
        history = [
            {"type": "function_call", "call_id": "c1", "name": "bash",
             "arguments": json.dumps({"cmd": "ls"})},
            {"type": "function_call_output", "call_id": "c1", "output": "file.txt"},
            {"type": "message", "role": "user",
             "content": [{"type": "input_text", "text": "hi"}]},
        ]
        out, was_changed = tp.synthesize_tool_results_for_chat(history)
        self.assertTrue(was_changed)
        # The call/output pair becomes one synthesized message, followed by
        # the original user message.
        self.assertEqual(len(out), 2)
        synthesized = out[0]
        self.assertEqual(synthesized["role"], "user")
        body = synthesized["content"][0]["text"]
        # The synthesized text has to mention the tool and its output.
        for fragment in ("Tool execution result", "bash", "file.txt"):
            self.assertIn(fragment, body)

    def test_orphan_output(self):
        history = [
            {"type": "function_call_output", "call_id": "missing", "output": "orphan"},
        ]
        out, was_changed = tp.synthesize_tool_results_for_chat(history)
        self.assertTrue(was_changed)
        self.assertEqual(out[0]["role"], "user")
class TestOaConvertTools(unittest.TestCase):
    """Checks for oa_convert_tools (line ~2417)."""

    @staticmethod
    def _fn_tool(name, description="", parameters=None):
        """Build a nested ``{"type": "function", "function": {...}}`` tool spec."""
        return {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": {} if parameters is None else parameters,
            },
        }

    def test_none_input(self):
        self.assertIsNone(tp.oa_convert_tools(None))

    def test_empty_list(self):
        self.assertIsNone(tp.oa_convert_tools([]))

    def test_function_tools(self):
        specs = [self._fn_tool("bash", "Run a command", {"type": "object"})]
        converted = tp.oa_convert_tools(specs)
        self.assertEqual(len(converted), 1)
        self.assertEqual(converted[0]["function"]["name"], "bash")

    def test_non_function_filtered(self):
        # Only the function-typed entry survives conversion.
        specs = [{"type": "web_search"}, self._fn_tool("bash")]
        converted = tp.oa_convert_tools(specs)
        self.assertEqual(len(converted), 1)

    def test_empty_name_filtered(self):
        specs = [self._fn_tool("")]
        self.assertIsNone(tp.oa_convert_tools(specs))

    def test_null_name_filtered(self):
        # The literal string "null" is rejected as a tool name.
        specs = [self._fn_tool("null")]
        self.assertIsNone(tp.oa_convert_tools(specs))

    def test_function_without_fn_key(self):
        # Flat layout: name/description/parameters sit next to "type".
        specs = [{"type": "function", "name": "my_tool",
                  "description": "desc", "parameters": {}}]
        converted = tp.oa_convert_tools(specs)
        self.assertEqual(len(converted), 1)
        self.assertEqual(converted[0]["function"]["name"], "my_tool")

    def test_strict_mode(self):
        specs = [self._fn_tool("t")]
        converted = tp.oa_convert_tools(specs, strict=True)
        self.assertTrue(converted[0]["function"]["strict"])
class TestOaRespToResponses(unittest.TestCase):
    """Checks for oa_resp_to_responses (line ~2448), fed by _make_chat_resp fixtures."""

    def test_simple_text(self):
        converted = tp.oa_resp_to_responses(
            _make_chat_resp(content="Hello world"), "test-model", resp_id="r1"
        )
        self.assertEqual(converted["id"], "r1")
        self.assertEqual(converted["model"], "test-model")
        self.assertEqual(converted["status"], "completed")
        self.assertEqual(len(converted["output"]), 1)
        only_item = converted["output"][0]
        self.assertEqual(only_item["type"], "message")
        self.assertEqual(only_item["content"][0]["text"], "Hello world")

    def test_tool_calls(self):
        call = {
            "id": "tc1",
            "type": "function",
            "function": {"name": "bash", "arguments": '{"cmd":"ls"}'},
        }
        fixture = _make_chat_resp(content="", tool_calls=[call], finish_reason="tool_calls")
        converted = tp.oa_resp_to_responses(fixture, "test-model")
        self.assertEqual(converted["status"], "completed")
        # Exactly one function_call item is expected in the output.
        function_calls = []
        for item in converted["output"]:
            if item["type"] == "function_call":
                function_calls.append(item)
        self.assertEqual(len(function_calls), 1)
        self.assertEqual(function_calls[0]["name"], "bash")

    def test_finish_reason_length(self):
        fixture = _make_chat_resp(finish_reason="length")
        converted = tp.oa_resp_to_responses(fixture, "m")
        self.assertEqual(converted["status"], "incomplete")

    def test_finish_reason_content_filter(self):
        fixture = _make_chat_resp(finish_reason="content_filter")
        converted = tp.oa_resp_to_responses(fixture, "m")
        self.assertEqual(converted["status"], "incomplete")

    def test_usage(self):
        fixture = _make_chat_resp(prompt_tokens=100, completion_tokens=50)
        usage = tp.oa_resp_to_responses(fixture, "m")["usage"]
        self.assertEqual(usage["input_tokens"], 100)
        self.assertEqual(usage["output_tokens"], 50)
        self.assertEqual(usage["total_tokens"], 150)

    def test_content_and_tool_calls(self):
        call = {
            "id": "tc1",
            "type": "function",
            "function": {"name": "bash", "arguments": '{}'},
        }
        fixture = _make_chat_resp(content="thinking...", tool_calls=[call])
        converted = tp.oa_resp_to_responses(fixture, "m")
        item_types = [item["type"] for item in converted["output"]]
        self.assertIn("message", item_types)
        self.assertIn("function_call", item_types)
class TestOaInputToMessages(unittest.TestCase):
    """Checks for oa_input_to_messages (line ~2265)."""

    @staticmethod
    def _text_message(role, text):
        """Build a message item carrying a single ``input_text`` part."""
        return {
            "type": "message",
            "role": role,
            "content": [{"type": "input_text", "text": text}],
        }

    def test_string_input(self):
        messages = tp.oa_input_to_messages("hello")
        self.assertEqual(len(messages), 1)
        only = messages[0]
        self.assertEqual(only["role"], "user")
        self.assertEqual(only["content"], "hello")

    def test_message_input(self):
        messages = tp.oa_input_to_messages([self._text_message("user", "hello")])
        self.assertEqual(len(messages), 1)
        self.assertEqual(messages[0]["content"], "hello")

    def test_developer_role_becomes_system(self):
        messages = tp.oa_input_to_messages(
            [self._text_message("developer", "instructions")]
        )
        self.assertEqual(messages[0]["role"], "system")

    def test_function_call_and_output(self):
        history = [
            {"type": "function_call", "call_id": "c1", "name": "bash",
             "arguments": json.dumps({"cmd": "ls"})},
            {"type": "function_call_output", "call_id": "c1", "output": "file.txt"},
        ]
        messages = tp.oa_input_to_messages(history)
        # Expected shape: an assistant message with tool_calls, then a tool message.
        self.assertEqual(len(messages), 2)
        assistant_msg, tool_msg = messages[0], messages[1]
        self.assertEqual(assistant_msg["role"], "assistant")
        self.assertIsNotNone(assistant_msg["tool_calls"])
        self.assertEqual(tool_msg["role"], "tool")
        self.assertEqual(tool_msg["tool_call_id"], "c1")

    def test_string_content(self):
        history = [{"type": "message", "role": "user", "content": "plain text"}]
        messages = tp.oa_input_to_messages(history)
        self.assertEqual(messages[0]["content"], "plain text")

    def test_reasoning_content(self):
        # A reasoning part is split off into "reasoning_content"; the
        # output_text part becomes the message content.
        parts = [
            {"type": "reasoning", "content": [{"text": "thinking..."}]},
            {"type": "output_text", "text": "result"},
        ]
        history = [{"type": "message", "role": "assistant", "content": parts}]
        messages = tp.oa_input_to_messages(history)
        self.assertEqual(len(messages), 1)
        only = messages[0]
        self.assertEqual(only["content"], "result")
        self.assertIn("reasoning_content", only)
        self.assertEqual(only["reasoning_content"], "thinking...")
class TestFbStripReasoningFromMessages(unittest.TestCase):
    """Checks for _fb_strip_reasoning_from_messages (line ~1383)."""

    def test_strips_reasoning(self):
        conversation = [
            {"role": "assistant", "content": "hi", "reasoning_content": "thoughts"},
            {"role": "user", "content": "ok"},
        ]
        stripped = tp._fb_strip_reasoning_from_messages(conversation)
        assistant_msg, user_msg = stripped[0], stripped[1]
        self.assertNotIn("reasoning_content", assistant_msg)
        self.assertEqual(assistant_msg["content"], "hi")
        self.assertEqual(user_msg["content"], "ok")

    def test_no_reasoning(self):
        conversation = [{"role": "user", "content": "hi"}]
        stripped = tp._fb_strip_reasoning_from_messages(conversation)
        self.assertEqual(stripped, conversation)

    def test_empty_list(self):
        stripped = tp._fb_strip_reasoning_from_messages([])
        self.assertEqual(stripped, [])
class TestCbInputToMessages(unittest.TestCase):
    """Checks for _cb_input_to_messages (line ~1321)."""

    def test_string_input(self):
        messages = tp._cb_input_to_messages("hello")
        self.assertEqual(len(messages), 1)
        only = messages[0]
        self.assertEqual(only["role"], "user")
        self.assertEqual(only["content"], "hello")

    def test_with_instructions(self):
        # Instructions are expected as a leading system message.
        messages = tp._cb_input_to_messages("hello", instructions="be helpful")
        self.assertEqual(len(messages), 2)
        leading = messages[0]
        self.assertEqual(leading["role"], "system")
        self.assertEqual(leading["content"], "be helpful")

    def test_function_call_and_output(self):
        history = [
            {"type": "function_call", "call_id": "c1", "name": "bash",
             "arguments": json.dumps({"cmd": "ls"})},
            {"type": "function_call_output", "call_id": "c1", "output": "file.txt"},
        ]
        messages = tp._cb_input_to_messages(history)
        self.assertGreaterEqual(len(messages), 2)

    def test_reasoning_items_skipped(self):
        history = [
            {"type": "reasoning", "content": [{"text": "thinking"}]},
            {"type": "message", "role": "user",
             "content": [{"type": "input_text", "text": "hi"}]},
        ]
        messages = tp._cb_input_to_messages(history)
        # No resulting message may carry a "reasoning" role.
        roles = []
        for message in messages:
            roles.append(message["role"])
        self.assertNotIn("reasoning", roles)

    def test_developer_role_becomes_system(self):
        history = [
            {"type": "message", "role": "developer",
             "content": [{"type": "input_text", "text": "instructions"}]},
        ]
        messages = tp._cb_input_to_messages(history)
        self.assertEqual(messages[0]["role"], "system")
class TestCodebuffHardDisableReasoning(unittest.TestCase):
    """Checks for _codebuff_hard_disable_reasoning (line ~1258).

    The function is exercised through its in-place effect on the message
    dicts handed to it; its return value is never inspected.
    """

    def test_removes_reasoning_fields(self):
        assistant_msg = {
            "role": "assistant",
            "content": "hi",
            "reasoning_content": "thoughts",
            "thinking": "deep",
        }
        conversation = [assistant_msg, {"role": "user", "content": "ok"}]
        tp._codebuff_hard_disable_reasoning(conversation)
        self.assertNotIn("reasoning_content", conversation[0])
        self.assertNotIn("thinking", conversation[0])
        self.assertIn("content", conversation[0])

    def test_handles_non_dict(self):
        # Passing a non-dict entry must simply not raise.
        conversation = ["not a dict", {"role": "user", "content": "ok"}]
        tp._codebuff_hard_disable_reasoning(conversation)

    def test_removes_all_reasoning_keys(self):
        reasoning_keys = ("reasoning_content", "reasoning", "thinking",
                          "thinking_content", "thoughts")
        for key in reasoning_keys:
            message = {"role": "assistant", "content": "hi", key: "val"}
            tp._codebuff_hard_disable_reasoning([message])
            self.assertNotIn(key, message, f"Key {key} should have been removed")
class TestIsReasoningContentError(unittest.TestCase):
    """Checks for _is_reasoning_content_error (line ~1269)."""

    def test_none(self):
        self.assertFalse(tp._is_reasoning_content_error(None))

    def test_empty(self):
        self.assertFalse(tp._is_reasoning_content_error(""))

    def test_matching(self):
        # Each of these error texts must be recognised.
        recognised = (
            "reasoning_content is invalid",
            "thinking mode required",
            "must be passed back",
        )
        for message in recognised:
            self.assertTrue(tp._is_reasoning_content_error(message))

    def test_not_matching(self):
        unrelated = "rate limit exceeded"
        self.assertFalse(tp._is_reasoning_content_error(unrelated))
class TestDsRebuildToolHistory(unittest.TestCase):
    """Checks for _ds_rebuild_tool_history (line ~1298)."""

    def test_empty_messages(self):
        rebuilt = tp._ds_rebuild_tool_history([])
        self.assertEqual(rebuilt, [])

    def test_no_matching_tool_ids(self):
        # A tool message whose id matches no earlier call: the history keeps
        # its length of two.
        conversation = [
            {"role": "user", "content": "hi"},
            {"role": "tool", "tool_call_id": "tc_unknown", "content": "output"},
        ]
        rebuilt = tp._ds_rebuild_tool_history(conversation)
        self.assertEqual(len(rebuilt), 2)
class TestRouteKey(unittest.TestCase):
    """Checks the ``name::target_url::model`` keys built by _route_key (line ~1475)."""

    def test_basic(self):
        full_route = {
            "name": "primary",
            "target_url": "https://api.example.com",
            "model": "gpt-4o",
        }
        self.assertEqual(
            tp._route_key(full_route),
            "primary::https://api.example.com::gpt-4o",
        )

    def test_missing_fields(self):
        # Absent fields show up as empty segments.
        self.assertEqual(tp._route_key({}), "::::")

    def test_partial_fields(self):
        self.assertEqual(tp._route_key({"name": "backup"}), "backup::::")
class TestScoreRoute(unittest.TestCase):
    """Checks for _score_route (line ~1492)."""

    @staticmethod
    def _score_with_entry(route, entry):
        """Score *route* against stats holding *entry* under the route's key."""
        stats = {tp._route_key(route): entry}
        return tp._score_route(stats=stats, route=route)

    def test_basic_scoring(self):
        # With no stats recorded, the score equals the priority.
        route = {"name": "r1", "priority": 10}
        self.assertEqual(tp._score_route(route, {}), 10)

    def test_with_latency(self):
        route = {"name": "r1", "priority": 10}
        entry = {"ewma_latency_s": 2.0, "consecutive_failures": 0}
        # priority(10) + min(ewma*5, 50)(10) + failures*20(0) = 20
        self.assertEqual(self._score_with_entry(route, entry), 20)

    def test_with_failures(self):
        route = {"name": "r1", "priority": 10}
        entry = {"ewma_latency_s": 0, "consecutive_failures": 3}
        # priority(10) + 0 + 3*20 = 70
        self.assertEqual(self._score_with_entry(route, entry), 70)

    def test_open_circuit(self):
        route = {"name": "r1", "priority": 10}
        entry = {"open_until_ts": time.time() + 100}
        self.assertEqual(self._score_with_entry(route, entry), 1_000_000)

    def test_rate_limited(self):
        route = {"name": "r1", "priority": 10}
        entry = {
            "ewma_latency_s": 0,
            "consecutive_failures": 0,
            "rate_limited_until": time.time() + 100,
        }
        self.assertEqual(self._score_with_entry(route, entry), 10 + 500)
class TestBucketForRoute(unittest.TestCase):
    """Checks for _bucket_for_route (line ~2164)."""

    def test_creates_bucket(self):
        bucket = tp._bucket_for_route({"name": "test_route"})
        self.assertIsNotNone(bucket)
        self.assertTrue(hasattr(bucket, "allow"))

    def test_same_route_same_bucket(self):
        # Two lookups for the same route must yield the very same object.
        route = {"name": "same_route"}
        first = tp._bucket_for_route(route)
        second = tp._bucket_for_route(route)
        self.assertIs(first, second)

    def test_default_name(self):
        # A route without a "name" key still gets a bucket.
        bucket = tp._bucket_for_route({"target_url": "https://example.com"})
        self.assertIsNotNone(bucket)
class TestSanitizeToolCalls(unittest.TestCase):
    """Checks for _sanitize_tool_calls (line ~3467)."""

    @staticmethod
    def _exec_calls(cmd):
        """Build a one-element call list for ``exec_command`` with the given cmd."""
        return [{"name": "exec_command", "arguments": json.dumps({"cmd": cmd})}]

    @staticmethod
    def _first_args(sanitized):
        """Decode the JSON arguments of the first sanitized call."""
        return json.loads(sanitized[0]["arguments"])

    def test_non_exec_command_passthrough(self):
        calls = [{"name": "TodoWrite", "arguments": json.dumps({"todos": []})}]
        sanitized = tp._sanitize_tool_calls(calls)
        self.assertEqual(len(sanitized), 1)
        self.assertEqual(sanitized[0]["name"], "TodoWrite")

    def test_exec_command_empty_cmd(self):
        sanitized = tp._sanitize_tool_calls(self._exec_calls(""))
        self.assertEqual(len(sanitized), 1)
        # The empty command is replaced by one carrying the sanitizer marker.
        self.assertIn("CC-SANITIZER", self._first_args(sanitized)["cmd"])

    def test_exec_command_null_cmd(self):
        sanitized = tp._sanitize_tool_calls(self._exec_calls("null"))
        self.assertIn("CC-SANITIZER", self._first_args(sanitized)["cmd"])

    def test_exec_command_valid_cmd(self):
        sanitized = tp._sanitize_tool_calls(self._exec_calls("echo hello"))
        self.assertEqual(self._first_args(sanitized)["cmd"], "echo hello")

    def test_double_wrapped_cmd(self):
        nested = json.dumps({"cmd": "echo hello"})
        sanitized = tp._sanitize_tool_calls(self._exec_calls(nested))
        self.assertEqual(self._first_args(sanitized)["cmd"], "echo hello")

    def test_json_object_cmd(self):
        nested = json.dumps({"cmd": "ls -la"})
        sanitized = tp._sanitize_tool_calls(self._exec_calls(nested))
        self.assertEqual(self._first_args(sanitized)["cmd"], "ls -la")

    def test_invalid_arguments_json(self):
        # Arguments that are not JSON are passed through untouched.
        calls = [{"name": "exec_command", "arguments": "not json"}]
        sanitized = tp._sanitize_tool_calls(calls)
        self.assertEqual(len(sanitized), 1)
        self.assertEqual(sanitized[0]["arguments"], "not json")
class TestBuildExploreCmd(unittest.TestCase):
    """Exercise tp._build_explore_cmd (proxy source, line ~2830)."""

    @staticmethod
    def _explore(text, windows):
        # Call _build_explore_cmd while tp._IS_WINDOWS is forced to `windows`.
        with patch.object(tp, "_IS_WINDOWS", windows):
            return tp._build_explore_cmd(text)

    def _assert_no_command(self, text):
        # Inputs with nothing to explore must produce a (None, None) pair.
        cmd, desc = tp._build_explore_cmd(text)
        self.assertIsNone(cmd)
        self.assertIsNone(desc)

    def test_empty_input(self):
        self._assert_no_command("")

    def test_none_input(self):
        self._assert_no_command(None)

    def test_no_url(self):
        self._assert_no_command("just some text without urls")

    def test_windows_command(self):
        # On Windows the command is expected to use PowerShell tooling.
        cmd, desc = self._explore("check out https://github.com/owner/repo", True)
        self.assertIsNotNone(cmd)
        for fragment in ("Invoke-WebRequest", "$env:TEMP"):
            self.assertIn(fragment, cmd)
        self.assertIn("README", desc)

    def test_linux_command(self):
        # Off Windows the command is expected to use curl and /tmp.
        cmd, _ = self._explore("check out https://github.com/owner/repo", False)
        self.assertIsNotNone(cmd)
        for fragment in ("curl", "/tmp"):
            self.assertIn(fragment, cmd)

    def test_git_suffix_stripped(self):
        cmd, _ = self._explore("https://github.com/owner/repo.git", False)
        self.assertIsNotNone(cmd)
        # The generated command must reference the repo without its .git suffix.
        self.assertNotIn("repo.git", cmd)
        self.assertIn("repo", cmd)

    def test_url_from_json_list(self):
        # A URL embedded in a JSON-encoded list of content items is still found.
        encoded = json.dumps(
            [{"content": "see https://gitea.com/user/proj for details"}]
        )
        cmd, _ = self._explore(encoded, False)
        self.assertIsNotNone(cmd)
        self.assertIn("gitea.com", cmd)

    def test_api_v1_url_passthrough(self):
        # A URL that already contains the api/v1/repos path keeps that path.
        cmd, _ = self._explore("https://gitea.com/api/v1/repos/user/proj", False)
        self.assertIsNotNone(cmd)
        self.assertIn("api/v1/repos", cmd)
class TestCompactInput(unittest.TestCase):
    """Exercise tp._compact_input (proxy source, line ~1657)."""

    def test_string_passthrough(self):
        # A plain string comes back equal to the input.
        compacted = tp._compact_input("hello")
        self.assertEqual(compacted, "hello")

    def test_small_list_passthrough(self):
        # A short message list comes back equal to the input.
        conversation = [{"type": "message", "role": "user", "content": "hi"}]
        self.assertEqual(tp._compact_input(conversation), conversation)

    def test_truncates_large_tool_output(self):
        # An oversized function_call_output is shortened and marked as truncated.
        original_size = 10000
        conversation = [
            {"type": "function_call_output", "output": "x" * original_size},
        ]
        compacted = tp._compact_input(conversation)
        self.assertEqual(len(compacted), 1)
        shortened = compacted[0]["output"]
        self.assertIn("truncated", shortened)
        self.assertLess(len(shortened), original_size)
class TestAdaptiveCompact(unittest.TestCase):
    """Exercise tp._adaptive_compact (proxy source, line ~1820)."""

    # Model name handed to _adaptive_compact in every case below.
    _MODEL = "gpt-4o"

    def test_within_budget(self):
        # A tiny conversation is returned as-is and reported as unchanged.
        conversation = [{"type": "message", "role": "user", "content": "hi"}]
        compacted, was_changed = tp._adaptive_compact(conversation, self._MODEL)
        self.assertFalse(was_changed)
        self.assertEqual(compacted, conversation)

    def test_string_passthrough(self):
        # String input is returned as-is and reported as unchanged.
        compacted, was_changed = tp._adaptive_compact("hello", self._MODEL)
        self.assertFalse(was_changed)
        self.assertEqual(compacted, "hello")
class TestUpstreamTarget(unittest.TestCase):
    """Exercise tp.upstream_target (proxy source, line ~1409)."""

    # All three base-URL variants must resolve to this single target.
    _EXPECTED = "https://api.example.com/v1/chat"

    def test_no_trailing_slash(self):
        target = tp.upstream_target("https://api.example.com", "/v1/chat")
        self.assertEqual(target, self._EXPECTED)

    def test_trailing_slash(self):
        # A trailing slash on the base must not produce a double slash.
        target = tp.upstream_target("https://api.example.com/", "/v1/chat")
        self.assertEqual(target, self._EXPECTED)

    def test_suffix_already_present(self):
        # A base that already ends with the suffix must not get it twice.
        target = tp.upstream_target("https://api.example.com/v1/chat", "/v1/chat")
        self.assertEqual(target, self._EXPECTED)
class TestCcInputToMessages(unittest.TestCase):
    """Exercise tp.cc_input_to_messages (proxy source, line ~2334)."""

    def test_string_input(self):
        # A bare string becomes exactly one user message.
        messages = tp.cc_input_to_messages("hello")
        self.assertEqual(len(messages), 1)
        self.assertEqual(messages[0], {"role": "user", "content": "hello"})

    def test_with_instructions(self):
        # Instructions are emitted first, as a user-role message.
        messages = tp.cc_input_to_messages("hello", instructions="be helpful")
        self.assertEqual(len(messages), 2)
        leading = messages[0]
        self.assertEqual(leading["role"], "user")
        self.assertEqual(leading["content"], "be helpful")

    def test_message_items(self):
        # input_text content parts are reduced to their text.
        user_item = {
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": "hi"}],
        }
        messages = tp.cc_input_to_messages([user_item])
        self.assertEqual(len(messages), 1)
        self.assertEqual(messages[0]["content"], "hi")

    def test_function_call_as_text(self):
        call_item = {
            "type": "function_call",
            "call_id": "c1",
            "name": "bash",
            "arguments": json.dumps({"cmd": "ls"}),
        }
        messages = tp.cc_input_to_messages([call_item])
        # Tool calls should be inline JSON text in assistant message
        has_assistant = any(msg["role"] == "assistant" for msg in messages)
        self.assertTrue(has_assistant)

    def test_function_call_output_as_user(self):
        # Tool output must show up inside some user-role message.
        output_item = {
            "type": "function_call_output",
            "call_id": "c1",
            "output": "result text",
        }
        messages = tp.cc_input_to_messages([output_item])
        echoed_to_user = any(
            msg["role"] == "user" and "result text" in msg.get("content", "")
            for msg in messages
        )
        self.assertTrue(echoed_to_user)

    def test_non_dict_items_skipped(self):
        # List entries that are not dicts contribute no messages.
        messages = tp.cc_input_to_messages(["not a dict", 42])
        self.assertEqual(len(messages), 0)

    def test_non_list_non_string_returns_empty(self):
        # Input that is neither a list nor a string yields an empty list.
        self.assertEqual(tp.cc_input_to_messages(42), [])

    def test_role_normalization(self):
        system_item = {"type": "message", "role": "system", "content": "sys"}
        messages = tp.cc_input_to_messages([system_item])
        # system role should become user
        self.assertEqual(messages[0]["role"], "user")
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()