""" Unit tests for translate-proxy.py Covers pure utility functions that can be tested without a running server. Uses only stdlib unittest + unittest.mock (zero pip dependencies). """ import json import sys import time import unittest from unittest.mock import patch, MagicMock # --------------------------------------------------------------------------- # Import the module under test. # The source file has a dash in its name so we use importlib. # --------------------------------------------------------------------------- import importlib _spec = importlib.util.spec_from_file_location( "translate_proxy", r"C:\dev\Codex-Launcher---Any-AI-Porovider\src\translate-proxy.py", ) tp = importlib.util.module_from_spec(_spec) _spec.loader.exec_module(tp) # =================================================================== # Helpers # =================================================================== def _make_chat_resp(content="Hello", tool_calls=None, finish_reason="stop", model="test-model", prompt_tokens=10, completion_tokens=5): """Build a minimal OpenAI chat completion response dict.""" msg = {"role": "assistant", "content": content} if tool_calls: msg["tool_calls"] = tool_calls return { "choices": [{"message": msg, "finish_reason": finish_reason}], "usage": {"prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": prompt_tokens + completion_tokens}, } # =================================================================== # Tests grouped by functionality # =================================================================== class TestUnwrapCmd(unittest.TestCase): """Tests for _unwrap_cmd (line ~2810).""" def test_non_string_passthrough(self): self.assertIsNone(tp._unwrap_cmd(None)) self.assertEqual(tp._unwrap_cmd(42), 42) def test_plain_string_not_json(self): self.assertEqual(tp._unwrap_cmd("echo hello"), "echo hello") def test_single_wrap(self): inner = json.dumps({"cmd": "echo hello"}) self.assertEqual(tp._unwrap_cmd(inner), "echo hello") def test_double_wrap(self): inner = json.dumps({"cmd": "echo hello"}) outer = json.dumps({"cmd": inner}) self.assertEqual(tp._unwrap_cmd(outer), "echo hello") def test_triple_wrap(self): inner = json.dumps({"cmd": "echo hello"}) mid = json.dumps({"cmd": inner}) outer = json.dumps({"cmd": mid}) self.assertEqual(tp._unwrap_cmd(outer), "echo hello") def test_json_without_cmd_key(self): val = json.dumps({"foo": "bar"}) self.assertEqual(tp._unwrap_cmd(val), val) def test_cmd_value_not_string(self): val = json.dumps({"cmd": 123}) self.assertEqual(tp._unwrap_cmd(val), val) def test_empty_string(self): self.assertEqual(tp._unwrap_cmd(""), "") class TestStripXmlishTags(unittest.TestCase): """Tests for _strip_xmlish_tags (line ~2807).""" def test_none(self): self.assertEqual(tp._strip_xmlish_tags(None), "") def test_empty_string(self): self.assertEqual(tp._strip_xmlish_tags(""), "") def test_no_tags(self): self.assertEqual(tp._strip_xmlish_tags("hello world"), "hello world") def test_basic_tags(self): self.assertEqual(tp._strip_xmlish_tags("hello world"), "hello world") def test_self_closing(self): self.assertEqual(tp._strip_xmlish_tags("before
after"), "before after") def test_multiple_tags(self): self.assertEqual( tp._strip_xmlish_tags("

one

two"), "one two", ) class TestExtractXmlToolCalls(unittest.TestCase): """Tests for _extract_xml_tool_calls.""" def test_empty(self): self.assertEqual(tp._extract_xml_tool_calls(""), []) self.assertEqual(tp._extract_xml_tool_calls(None), []) def test_no_calls(self): self.assertEqual(tp._extract_xml_tool_calls("just plain text"), []) def test_single_tool_call(self): # Regex: (\w+)(.*?) # Format: NAME>CONTENT text = 'bash>echo hi' results = tp._extract_xml_tool_calls(text) self.assertEqual(len(results), 1) self.assertEqual(results[0]["name"], "bash") self.assertIn("call_id", results[0]) self.assertTrue(results[0]["call_id"].startswith("xml_")) def test_multiple_tool_calls(self): text = ( 'bash>echo hi' 'edit>test.py' ) results = tp._extract_xml_tool_calls(text) self.assertEqual(len(results), 2) self.assertEqual(results[0]["name"], "bash") self.assertEqual(results[1]["name"], "edit") def test_json_args(self): text = 'tool>{"key": "value"}' results = tp._extract_xml_tool_calls(text) self.assertEqual(len(results), 1) self.assertEqual(results[0]["name"], "tool") args = json.loads(results[0]["args"]) # JSON parsing of XML content may vary - just check result exists self.assertIn("args", results[0]) def test_code_fenced_args(self): text = 'tool>{"a": 1}' results = tp._extract_xml_tool_calls(text) self.assertEqual(len(results), 1) class TestNormalizeToolArgs(unittest.TestCase): """Tests for _normalize_tool_args (line ~2196).""" def test_empty(self): self.assertEqual(tp._normalize_tool_args(""), "") self.assertIsNone(tp._normalize_tool_args(None)) self.assertEqual(tp._normalize_tool_args("{}"), "{}") def test_valid_json_passthrough(self): raw = json.dumps({"cmd": "echo hello"}) self.assertEqual(tp._normalize_tool_args(raw), raw) def test_wrapped_arguments_uppercase_key(self): inner = json.dumps({"cmd": "echo hello"}) raw = json.dumps({"Arguments": inner}) result = tp._normalize_tool_args(raw) self.assertEqual(json.loads(result), {"cmd": "echo hello"}) def test_wrapped_arguments_with_code_fence(self): inner = json.dumps({"cmd": "echo hello"}) raw = json.dumps({"Arguments": f"```json\n{inner}\n```"}) result = tp._normalize_tool_args(raw) self.assertEqual(json.loads(result), {"cmd": "echo hello"}) def test_invalid_json_passthrough(self): raw = "not json at all" self.assertEqual(tp._normalize_tool_args(raw), raw) class TestEmit(unittest.TestCase): """Tests for emit (line ~1406).""" def test_basic_event(self): result = tp.emit("test.event", {"key": "value"}) self.assertTrue(result.startswith("event: test.event\n")) self.assertIn('"key": "value"', result) self.assertTrue(result.endswith("\n\n")) def test_event_data_format(self): data = {"type": "response.created", "id": "123"} result = tp.emit("response.created", data) lines = result.split("\n") self.assertEqual(lines[0], "event: response.created") self.assertTrue(lines[1].startswith("data: ")) parsed = json.loads(lines[1][6:]) self.assertEqual(parsed["type"], "response.created") class TestUid(unittest.TestCase): """Tests for uid (line ~1403).""" def test_default_prefix(self): result = tp.uid() self.assertTrue(result.startswith("id-")) def test_custom_prefix(self): result = tp.uid("msg") self.assertTrue(result.startswith("msg-")) def test_uniqueness(self): ids = {tp.uid() for _ in range(100)} self.assertEqual(len(ids), 100) class TestRedact(unittest.TestCase): """Tests for _redact / _redact_json (line ~2065/2073).""" def test_none(self): self.assertIsNone(tp._redact(None)) def test_empty_string(self): self.assertEqual(tp._redact(""), "") def test_no_secrets(self): self.assertEqual(tp._redact("hello world"), "hello world") def test_sk_key(self): text = "key=sk-abc123def456ghi789jkl012mno" self.assertIn("[REDACTED:key]", tp._redact(text)) self.assertNotIn("sk-abc123", tp._redact(text)) def test_sk_ant_key(self): # Note: sk-ant-... is matched by the first sk- pattern (sk-[A-Za-z0-9_\-]{20,}) # which redacts it as [REDACTED:key] since it runs first. text = "key=sk-ant-api03-abcdefghijklmnopqrstuv" redacted = tp._redact(text) self.assertIn("[REDACTED:", redacted) self.assertNotIn("sk-ant-api03", redacted) def test_github_token(self): text = "token=ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghij" self.assertIn("[REDACTED:github]", tp._redact(text)) def test_bearer_token(self): text = "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.verylongpayload" self.assertIn("Bearer [REDACTED]", tp._redact(text)) def test_redact_json_dict(self): obj = {"api_key": "sk-abc123def456ghi789jkl012mno345pqr"} result = tp._redact_json(obj) self.assertIn("[REDACTED:key]", result) self.assertNotIn("sk-abc123", result) class TestEstimateTokens(unittest.TestCase): """Tests for _estimate_tokens (line ~1809).""" def test_none(self): self.assertEqual(tp._estimate_tokens(None), 0) def test_string(self): # "abcdefghij" is 10 chars => max(1, 10//4) = 2 self.assertEqual(tp._estimate_tokens("abcdefghij"), 2) def test_short_string(self): # "ab" is 2 chars => max(1, 2//4) = max(1, 0) = 1 self.assertEqual(tp._estimate_tokens("ab"), 1) def test_dict(self): obj = {"key": "value"} result = tp._estimate_tokens(obj) self.assertGreater(result, 0) def test_list(self): result = tp._estimate_tokens([1, 2, 3]) self.assertGreater(result, 0) class TestItemSummary(unittest.TestCase): """Tests for _item_summary (line ~1614).""" def test_message(self): item = {"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hello"}]} result = tp._item_summary(item) self.assertIn("[user]", result) self.assertIn("hello", result) def test_function_call(self): item = {"type": "function_call", "name": "exec_command", "arguments": json.dumps({"cmd": "ls -la"})} result = tp._item_summary(item) self.assertIn("[tool call]", result) self.assertIn("exec_command", result) self.assertIn("ls -la", result) def test_function_call_output(self): item = {"type": "function_call_output", "output": "file1.txt\nfile2.txt"} result = tp._item_summary(item) self.assertIn("[tool result]", result) self.assertIn("file1.txt", result) def test_function_call_output_truncated(self): long_output = "x" * 300 item = {"type": "function_call_output", "output": long_output} result = tp._item_summary(item, max_len=100) self.assertIn("...", result) self.assertTrue(len(result) < 300) def test_unknown_type(self): item = {"type": "custom_type"} result = tp._item_summary(item) self.assertEqual(result, "[custom_type]") class TestContextLimitForModel(unittest.TestCase): """Tests for _context_limit_for_model (line ~1800).""" def test_none_model(self): self.assertEqual(tp._context_limit_for_model(None), 32768) def test_empty_model(self): self.assertEqual(tp._context_limit_for_model(""), 32768) def test_gpt4o(self): self.assertEqual(tp._context_limit_for_model("gpt-4o"), 128000) def test_claude_sonnet(self): self.assertEqual(tp._context_limit_for_model("claude-sonnet-4-6"), 200000) def test_gemini_flash(self): self.assertEqual(tp._context_limit_for_model("gemini-2.5-flash"), 1000000) def test_case_insensitive(self): self.assertEqual(tp._context_limit_for_model("GPT-4O"), 128000) def test_unknown_model(self): self.assertEqual(tp._context_limit_for_model("future-model-x"), 32768) def test_deepseek(self): self.assertEqual(tp._context_limit_for_model("deepseek-chat"), 64000) class TestClassifyAntigravityError(unittest.TestCase): """Tests for _classify_antigravity_error (line ~632).""" def test_400(self): self.assertEqual(tp._classify_antigravity_error(400, "bad input"), "bad_request") def test_401_transient(self): self.assertEqual(tp._classify_antigravity_error(401, "access denied"), "auth_transient") def test_401_permanent_invalid_grant(self): self.assertEqual(tp._classify_antigravity_error(401, "invalid_grant token"), "auth_permanent") def test_401_permanent_revoked(self): self.assertEqual(tp._classify_antigravity_error(401, "token revoked"), "auth_permanent") def test_403_validation_required(self): self.assertEqual(tp._classify_antigravity_error(403, "validation_required"), "validation_required") def test_403_account_banned(self): self.assertEqual( tp._classify_antigravity_error(403, "has been disabled for violation of terms of service"), "account_banned", ) def test_403_service_disabled(self): self.assertEqual(tp._classify_antigravity_error(403, "service_disabled"), "service_disabled") def test_403_generic(self): self.assertEqual(tp._classify_antigravity_error(403, "some other error"), "forbidden") def test_429_capacity(self): self.assertEqual( tp._classify_antigravity_error(429, "model_capacity_exhausted"), "capacity_exhausted", ) def test_429_quota(self): self.assertEqual( tp._classify_antigravity_error(429, "quota_exhausted"), "quota_exhausted", ) def test_429_generic(self): self.assertEqual(tp._classify_antigravity_error(429, "slow down"), "rate_limited") def test_503_capacity(self): self.assertEqual( tp._classify_antigravity_error(503, "service temporarily unavailable"), "capacity_exhausted", ) def test_500(self): self.assertEqual(tp._classify_antigravity_error(500, "internal error"), "server_error") def test_418(self): self.assertEqual(tp._classify_antigravity_error(418, "I'm a teapot"), "unknown") class TestParseRateLimitReset(unittest.TestCase): """Tests for _parse_rate_limit_reset (line ~658).""" def test_quota_reset_delay_ms(self): result = tp._parse_rate_limit_reset('quotaResetDelay: 5000ms') self.assertEqual(result, 5.0) def test_quota_reset_delay_seconds(self): result = tp._parse_rate_limit_reset('quotaResetDelay: 30s') self.assertEqual(result, 30.0) def test_hms_format(self): result = tp._parse_rate_limit_reset('1h30m0s') self.assertEqual(result, 5400) def test_resets_in_format(self): result = tp._parse_rate_limit_reset('Resets in ~2h0m') self.assertEqual(result, 7200) def test_retry_after_seconds(self): result = tp._parse_rate_limit_reset('retry-after: 60 seconds') self.assertEqual(result, 60) def test_no_match(self): self.assertIsNone(tp._parse_rate_limit_reset('no timing info here')) def test_empty_string(self): self.assertIsNone(tp._parse_rate_limit_reset('')) class TestExtractText(unittest.TestCase): """Tests for _extract_text (line ~4295).""" def test_string(self): self.assertEqual(tp._extract_text("hello"), "hello") def test_none(self): self.assertEqual(tp._extract_text(None), "") def test_number(self): self.assertEqual(tp._extract_text(42), "") def test_content_blocks(self): content = [ {"type": "input_text", "text": "hello "}, {"type": "output_text", "text": "world"}, ] self.assertEqual(tp._extract_text(content), "hello world") def test_mixed_content(self): content = [ "plain text ", {"type": "text", "text": "and dict"}, ] self.assertEqual(tp._extract_text(content), "plain text and dict") def test_empty_list(self): self.assertEqual(tp._extract_text([]), "") class TestValidateToolPairs(unittest.TestCase): """Tests for validate_tool_pairs (line ~1984).""" def test_not_list(self): self.assertEqual(tp.validate_tool_pairs("not a list"), []) self.assertEqual(tp.validate_tool_pairs(None), []) def test_empty_list(self): self.assertEqual(tp.validate_tool_pairs([]), []) def test_valid_pairs(self): items = [ {"type": "function_call", "call_id": "c1", "name": "tool1"}, {"type": "function_call_output", "call_id": "c1", "output": "ok"}, ] self.assertEqual(tp.validate_tool_pairs(items), []) def test_orphan_output(self): items = [ {"type": "function_call_output", "call_id": "missing", "output": "ok"}, ] errors = tp.validate_tool_pairs(items) self.assertEqual(len(errors), 1) self.assertEqual(errors[0]["error"], "orphan_function_call_output") self.assertEqual(errors[0]["call_id"], "missing") def test_orphan_no_call_id(self): items = [ {"type": "function_call_output", "output": "ok"}, ] errors = tp.validate_tool_pairs(items) self.assertEqual(len(errors), 1) def test_mixed_valid_and_orphan(self): items = [ {"type": "function_call", "call_id": "c1"}, {"type": "function_call_output", "call_id": "c1"}, {"type": "function_call_output", "call_id": "c_orphan"}, ] errors = tp.validate_tool_pairs(items) self.assertEqual(len(errors), 1) self.assertEqual(errors[0]["call_id"], "c_orphan") class TestRepairOrphanToolOutputs(unittest.TestCase): """Tests for repair_orphan_tool_outputs (line ~2001).""" def test_repair(self): items = [ {"type": "message", "role": "user", "content": "hi"}, {"type": "function_call_output", "output": "orphan result"}, ] errors = [{"index": 1, "call_id": None, "error": "orphan_function_call_output"}] repaired = tp.repair_orphan_tool_outputs(items, errors) self.assertEqual(len(repaired), 2) self.assertEqual(repaired[0]["type"], "message") # kept self.assertEqual(repaired[1]["type"], "message") # converted self.assertIn("unmatched tool output", repaired[1]["content"][0]["text"]) def test_no_errors(self): items = [{"type": "message", "role": "user", "content": "hi"}] repaired = tp.repair_orphan_tool_outputs(items, []) self.assertEqual(repaired, items) class TestHasFunctionCallOutput(unittest.TestCase): """Tests for has_function_call_output (line ~2051).""" def test_not_list(self): self.assertFalse(tp.has_function_call_output("string")) self.assertFalse(tp.has_function_call_output(None)) def test_has_output(self): items = [{"type": "function_call_output", "output": "ok"}] self.assertTrue(tp.has_function_call_output(items)) def test_no_output(self): items = [{"type": "message", "role": "user", "content": "hi"}] self.assertFalse(tp.has_function_call_output(items)) class TestSynthesizeToolResults(unittest.TestCase): """Tests for synthesize_tool_results_for_chat (line ~2014).""" def test_not_list(self): result, changed = tp.synthesize_tool_results_for_chat("string") self.assertEqual(result, "string") self.assertFalse(changed) def test_empty_list(self): result, changed = tp.synthesize_tool_results_for_chat([]) self.assertEqual(result, []) self.assertFalse(changed) def test_synthesis(self): items = [ {"type": "function_call", "call_id": "c1", "name": "bash", "arguments": json.dumps({"cmd": "ls"})}, {"type": "function_call_output", "call_id": "c1", "output": "file.txt"}, {"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hi"}]}, ] result, changed = tp.synthesize_tool_results_for_chat(items) self.assertTrue(changed) # Should have synthesized message + original message self.assertEqual(len(result), 2) # The synthesized message should contain tool info synth_msg = result[0] self.assertEqual(synth_msg["role"], "user") synth_text = synth_msg["content"][0]["text"] self.assertIn("Tool execution result", synth_text) self.assertIn("bash", synth_text) self.assertIn("file.txt", synth_text) def test_orphan_output(self): items = [ {"type": "function_call_output", "call_id": "missing", "output": "orphan"}, ] result, changed = tp.synthesize_tool_results_for_chat(items) self.assertTrue(changed) self.assertEqual(result[0]["role"], "user") class TestOaConvertTools(unittest.TestCase): """Tests for oa_convert_tools (line ~2417).""" def test_none_input(self): self.assertIsNone(tp.oa_convert_tools(None)) def test_empty_list(self): self.assertIsNone(tp.oa_convert_tools([])) def test_function_tools(self): tools = [ {"type": "function", "function": {"name": "bash", "description": "Run a command", "parameters": {"type": "object"}}}, ] result = tp.oa_convert_tools(tools) self.assertEqual(len(result), 1) self.assertEqual(result[0]["function"]["name"], "bash") def test_non_function_filtered(self): tools = [ {"type": "web_search"}, {"type": "function", "function": {"name": "bash", "description": "", "parameters": {}}}, ] result = tp.oa_convert_tools(tools) self.assertEqual(len(result), 1) def test_empty_name_filtered(self): tools = [{"type": "function", "function": {"name": "", "description": "", "parameters": {}}}] self.assertIsNone(tp.oa_convert_tools(tools)) def test_null_name_filtered(self): tools = [{"type": "function", "function": {"name": "null", "description": "", "parameters": {}}}] self.assertIsNone(tp.oa_convert_tools(tools)) def test_function_without_fn_key(self): tools = [{"type": "function", "name": "my_tool", "description": "desc", "parameters": {}}] result = tp.oa_convert_tools(tools) self.assertEqual(len(result), 1) self.assertEqual(result[0]["function"]["name"], "my_tool") def test_strict_mode(self): tools = [{"type": "function", "function": {"name": "t", "description": "", "parameters": {}}}] result = tp.oa_convert_tools(tools, strict=True) self.assertTrue(result[0]["function"]["strict"]) class TestOaRespToResponses(unittest.TestCase): """Tests for oa_resp_to_responses (line ~2448).""" def test_simple_text(self): chat_resp = _make_chat_resp(content="Hello world") result = tp.oa_resp_to_responses(chat_resp, "test-model", resp_id="r1") self.assertEqual(result["id"], "r1") self.assertEqual(result["model"], "test-model") self.assertEqual(result["status"], "completed") self.assertEqual(len(result["output"]), 1) self.assertEqual(result["output"][0]["type"], "message") text_content = result["output"][0]["content"][0]["text"] self.assertEqual(text_content, "Hello world") def test_tool_calls(self): tc = {"id": "tc1", "type": "function", "function": {"name": "bash", "arguments": '{"cmd":"ls"}'}} chat_resp = _make_chat_resp(content="", tool_calls=[tc], finish_reason="tool_calls") result = tp.oa_resp_to_responses(chat_resp, "test-model") self.assertEqual(result["status"], "completed") # Should have one function_call output fc_outputs = [o for o in result["output"] if o["type"] == "function_call"] self.assertEqual(len(fc_outputs), 1) self.assertEqual(fc_outputs[0]["name"], "bash") def test_finish_reason_length(self): chat_resp = _make_chat_resp(finish_reason="length") result = tp.oa_resp_to_responses(chat_resp, "m") self.assertEqual(result["status"], "incomplete") def test_finish_reason_content_filter(self): chat_resp = _make_chat_resp(finish_reason="content_filter") result = tp.oa_resp_to_responses(chat_resp, "m") self.assertEqual(result["status"], "incomplete") def test_usage(self): chat_resp = _make_chat_resp(prompt_tokens=100, completion_tokens=50) result = tp.oa_resp_to_responses(chat_resp, "m") self.assertEqual(result["usage"]["input_tokens"], 100) self.assertEqual(result["usage"]["output_tokens"], 50) self.assertEqual(result["usage"]["total_tokens"], 150) def test_content_and_tool_calls(self): tc = {"id": "tc1", "type": "function", "function": {"name": "bash", "arguments": '{}'}} chat_resp = _make_chat_resp(content="thinking...", tool_calls=[tc]) result = tp.oa_resp_to_responses(chat_resp, "m") types = [o["type"] for o in result["output"]] self.assertIn("message", types) self.assertIn("function_call", types) class TestOaInputToMessages(unittest.TestCase): """Tests for oa_input_to_messages (line ~2265).""" def test_string_input(self): result = tp.oa_input_to_messages("hello") self.assertEqual(len(result), 1) self.assertEqual(result[0]["role"], "user") self.assertEqual(result[0]["content"], "hello") def test_message_input(self): items = [ {"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hello"}]}, ] result = tp.oa_input_to_messages(items) self.assertEqual(len(result), 1) self.assertEqual(result[0]["content"], "hello") def test_developer_role_becomes_system(self): items = [ {"type": "message", "role": "developer", "content": [{"type": "input_text", "text": "instructions"}]}, ] result = tp.oa_input_to_messages(items) self.assertEqual(result[0]["role"], "system") def test_function_call_and_output(self): items = [ {"type": "function_call", "call_id": "c1", "name": "bash", "arguments": json.dumps({"cmd": "ls"})}, {"type": "function_call_output", "call_id": "c1", "output": "file.txt"}, ] result = tp.oa_input_to_messages(items) # Should produce: assistant with tool_calls + tool message self.assertEqual(len(result), 2) self.assertEqual(result[0]["role"], "assistant") self.assertIsNotNone(result[0]["tool_calls"]) self.assertEqual(result[1]["role"], "tool") self.assertEqual(result[1]["tool_call_id"], "c1") def test_string_content(self): items = [ {"type": "message", "role": "user", "content": "plain text"}, ] result = tp.oa_input_to_messages(items) self.assertEqual(result[0]["content"], "plain text") def test_reasoning_content(self): items = [ {"type": "message", "role": "assistant", "content": [ {"type": "reasoning", "content": [{"text": "thinking..."}]}, {"type": "output_text", "text": "result"}, ]}, ] result = tp.oa_input_to_messages(items) self.assertEqual(len(result), 1) self.assertEqual(result[0]["content"], "result") self.assertIn("reasoning_content", result[0]) self.assertEqual(result[0]["reasoning_content"], "thinking...") class TestFbStripReasoningFromMessages(unittest.TestCase): """Tests for _fb_strip_reasoning_from_messages (line ~1383).""" def test_strips_reasoning(self): messages = [ {"role": "assistant", "content": "hi", "reasoning_content": "thoughts"}, {"role": "user", "content": "ok"}, ] result = tp._fb_strip_reasoning_from_messages(messages) self.assertNotIn("reasoning_content", result[0]) self.assertEqual(result[0]["content"], "hi") self.assertEqual(result[1]["content"], "ok") def test_no_reasoning(self): messages = [{"role": "user", "content": "hi"}] result = tp._fb_strip_reasoning_from_messages(messages) self.assertEqual(result, messages) def test_empty_list(self): self.assertEqual(tp._fb_strip_reasoning_from_messages([]), []) class TestCbInputToMessages(unittest.TestCase): """Tests for _cb_input_to_messages (line ~1321).""" def test_string_input(self): result = tp._cb_input_to_messages("hello") self.assertEqual(len(result), 1) self.assertEqual(result[0]["role"], "user") self.assertEqual(result[0]["content"], "hello") def test_with_instructions(self): result = tp._cb_input_to_messages("hello", instructions="be helpful") self.assertEqual(len(result), 2) self.assertEqual(result[0]["role"], "system") self.assertEqual(result[0]["content"], "be helpful") def test_function_call_and_output(self): items = [ {"type": "function_call", "call_id": "c1", "name": "bash", "arguments": json.dumps({"cmd": "ls"})}, {"type": "function_call_output", "call_id": "c1", "output": "file.txt"}, ] result = tp._cb_input_to_messages(items) self.assertGreaterEqual(len(result), 2) def test_reasoning_items_skipped(self): items = [ {"type": "reasoning", "content": [{"text": "thinking"}]}, {"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hi"}]}, ] result = tp._cb_input_to_messages(items) # reasoning should be skipped, only message kept msg_roles = [m["role"] for m in result] self.assertNotIn("reasoning", msg_roles) def test_developer_role_becomes_system(self): items = [ {"type": "message", "role": "developer", "content": [{"type": "input_text", "text": "instructions"}]}, ] result = tp._cb_input_to_messages(items) self.assertEqual(result[0]["role"], "system") class TestCodebuffHardDisableReasoning(unittest.TestCase): """Tests for _codebuff_hard_disable_reasoning (line ~1258).""" def test_removes_reasoning_fields(self): messages = [ {"role": "assistant", "content": "hi", "reasoning_content": "thoughts", "thinking": "deep"}, {"role": "user", "content": "ok"}, ] tp._codebuff_hard_disable_reasoning(messages) self.assertNotIn("reasoning_content", messages[0]) self.assertNotIn("thinking", messages[0]) self.assertIn("content", messages[0]) def test_handles_non_dict(self): messages = ["not a dict", {"role": "user", "content": "ok"}] tp._codebuff_hard_disable_reasoning(messages) # Should not crash def test_removes_all_reasoning_keys(self): for key in ("reasoning_content", "reasoning", "thinking", "thinking_content", "thoughts"): msg = {"role": "assistant", "content": "hi", key: "val"} tp._codebuff_hard_disable_reasoning([msg]) self.assertNotIn(key, msg, f"Key {key} should have been removed") class TestIsReasoningContentError(unittest.TestCase): """Tests for _is_reasoning_content_error (line ~1269).""" def test_none(self): self.assertFalse(tp._is_reasoning_content_error(None)) def test_empty(self): self.assertFalse(tp._is_reasoning_content_error("")) def test_matching(self): self.assertTrue(tp._is_reasoning_content_error("reasoning_content is invalid")) self.assertTrue(tp._is_reasoning_content_error("thinking mode required")) self.assertTrue(tp._is_reasoning_content_error("must be passed back")) def test_not_matching(self): self.assertFalse(tp._is_reasoning_content_error("rate limit exceeded")) class TestDsRebuildToolHistory(unittest.TestCase): """Tests for _ds_rebuild_tool_history (line ~1298).""" def test_empty_messages(self): result = tp._ds_rebuild_tool_history([]) self.assertEqual(result, []) def test_no_matching_tool_ids(self): messages = [ {"role": "user", "content": "hi"}, {"role": "tool", "tool_call_id": "tc_unknown", "content": "output"}, ] result = tp._ds_rebuild_tool_history(messages) self.assertEqual(len(result), 2) class TestRouteKey(unittest.TestCase): """Tests for _route_key (line ~1475).""" def test_basic(self): route = {"name": "primary", "target_url": "https://api.example.com", "model": "gpt-4o"} key = tp._route_key(route) self.assertEqual(key, "primary::https://api.example.com::gpt-4o") def test_missing_fields(self): route = {} key = tp._route_key(route) self.assertEqual(key, "::::") def test_partial_fields(self): route = {"name": "backup"} key = tp._route_key(route) self.assertEqual(key, "backup::::") class TestScoreRoute(unittest.TestCase): """Tests for _score_route (line ~1492).""" def test_basic_scoring(self): route = {"name": "r1", "priority": 10} stats = {} score = tp._score_route(route, stats) self.assertEqual(score, 10) def test_with_latency(self): route = {"name": "r1", "priority": 10} key = tp._route_key(route) stats = {key: {"ewma_latency_s": 2.0, "consecutive_failures": 0}} score = tp._score_route(stats=stats, route=route) # priority(10) + min(ewma*5, 50)(10) + failures*20(0) = 20 self.assertEqual(score, 20) def test_with_failures(self): route = {"name": "r1", "priority": 10} key = tp._route_key(route) stats = {key: {"ewma_latency_s": 0, "consecutive_failures": 3}} score = tp._score_route(stats=stats, route=route) # priority(10) + 0 + 3*20 = 70 self.assertEqual(score, 70) def test_open_circuit(self): route = {"name": "r1", "priority": 10} key = tp._route_key(route) stats = {key: {"open_until_ts": time.time() + 100}} score = tp._score_route(stats=stats, route=route) self.assertEqual(score, 1_000_000) def test_rate_limited(self): route = {"name": "r1", "priority": 10} key = tp._route_key(route) stats = {key: {"ewma_latency_s": 0, "consecutive_failures": 0, "rate_limited_until": time.time() + 100}} score = tp._score_route(stats=stats, route=route) self.assertEqual(score, 10 + 500) class TestBucketForRoute(unittest.TestCase): """Tests for _bucket_for_route (line ~2164).""" def test_creates_bucket(self): route = {"name": "test_route"} bucket = tp._bucket_for_route(route) self.assertIsNotNone(bucket) self.assertTrue(hasattr(bucket, "allow")) def test_same_route_same_bucket(self): route = {"name": "same_route"} b1 = tp._bucket_for_route(route) b2 = tp._bucket_for_route(route) self.assertIs(b1, b2) def test_default_name(self): route = {"target_url": "https://example.com"} bucket = tp._bucket_for_route(route) self.assertIsNotNone(bucket) class TestSanitizeToolCalls(unittest.TestCase): """Tests for _sanitize_tool_calls (line ~3467).""" def test_non_exec_command_passthrough(self): calls = [{"name": "TodoWrite", "arguments": json.dumps({"todos": []})}] result = tp._sanitize_tool_calls(calls) self.assertEqual(len(result), 1) self.assertEqual(result[0]["name"], "TodoWrite") def test_exec_command_empty_cmd(self): calls = [{"name": "exec_command", "arguments": json.dumps({"cmd": ""})}] result = tp._sanitize_tool_calls(calls) self.assertEqual(len(result), 1) cmd = json.loads(result[0]["arguments"])["cmd"] self.assertIn("CC-SANITIZER", cmd) def test_exec_command_null_cmd(self): calls = [{"name": "exec_command", "arguments": json.dumps({"cmd": "null"})}] result = tp._sanitize_tool_calls(calls) cmd = json.loads(result[0]["arguments"])["cmd"] self.assertIn("CC-SANITIZER", cmd) def test_exec_command_valid_cmd(self): calls = [{"name": "exec_command", "arguments": json.dumps({"cmd": "echo hello"})}] result = tp._sanitize_tool_calls(calls) args = json.loads(result[0]["arguments"]) self.assertEqual(args["cmd"], "echo hello") def test_double_wrapped_cmd(self): inner = json.dumps({"cmd": "echo hello"}) calls = [{"name": "exec_command", "arguments": json.dumps({"cmd": inner})}] result = tp._sanitize_tool_calls(calls) args = json.loads(result[0]["arguments"]) self.assertEqual(args["cmd"], "echo hello") def test_json_object_cmd(self): calls = [{"name": "exec_command", "arguments": json.dumps({"cmd": json.dumps({"cmd": "ls -la"})})}] result = tp._sanitize_tool_calls(calls) args = json.loads(result[0]["arguments"]) self.assertEqual(args["cmd"], "ls -la") def test_invalid_arguments_json(self): calls = [{"name": "exec_command", "arguments": "not json"}] result = tp._sanitize_tool_calls(calls) self.assertEqual(len(result), 1) self.assertEqual(result[0]["arguments"], "not json") class TestBuildExploreCmd(unittest.TestCase): """Tests for _build_explore_cmd (line ~2830).""" def test_empty_input(self): cmd, desc = tp._build_explore_cmd("") self.assertIsNone(cmd) self.assertIsNone(desc) def test_none_input(self): cmd, desc = tp._build_explore_cmd(None) self.assertIsNone(cmd) self.assertIsNone(desc) def test_no_url(self): cmd, desc = tp._build_explore_cmd("just some text without urls") self.assertIsNone(cmd) self.assertIsNone(desc) @patch.object(tp, "_IS_WINDOWS", True) def test_windows_command(self): cmd, desc = tp._build_explore_cmd("check out https://github.com/owner/repo") self.assertIsNotNone(cmd) self.assertIn("Invoke-WebRequest", cmd) self.assertIn("$env:TEMP", cmd) self.assertIn("README", desc) @patch.object(tp, "_IS_WINDOWS", False) def test_linux_command(self): cmd, desc = tp._build_explore_cmd("check out https://github.com/owner/repo") self.assertIsNotNone(cmd) self.assertIn("curl", cmd) self.assertIn("/tmp", cmd) @patch.object(tp, "_IS_WINDOWS", False) def test_git_suffix_stripped(self): cmd, desc = tp._build_explore_cmd("https://github.com/owner/repo.git") self.assertIsNotNone(cmd) # The api_base should not have .git self.assertNotIn("repo.git", cmd) self.assertIn("repo", cmd) @patch.object(tp, "_IS_WINDOWS", False) def test_url_from_json_list(self): text = json.dumps([{"content": "see https://gitea.com/user/proj for details"}]) cmd, desc = tp._build_explore_cmd(text) self.assertIsNotNone(cmd) self.assertIn("gitea.com", cmd) @patch.object(tp, "_IS_WINDOWS", False) def test_api_v1_url_passthrough(self): cmd, desc = tp._build_explore_cmd("https://gitea.com/api/v1/repos/user/proj") self.assertIsNotNone(cmd) self.assertIn("api/v1/repos", cmd) class TestCompactInput(unittest.TestCase): """Tests for _compact_input (line ~1657).""" def test_string_passthrough(self): self.assertEqual(tp._compact_input("hello"), "hello") def test_small_list_passthrough(self): items = [{"type": "message", "role": "user", "content": "hi"}] result = tp._compact_input(items) self.assertEqual(result, items) def test_truncates_large_tool_output(self): long_output = "x" * 10000 items = [{"type": "function_call_output", "output": long_output}] result = tp._compact_input(items) self.assertEqual(len(result), 1) self.assertIn("truncated", result[0]["output"]) self.assertLess(len(result[0]["output"]), 10000) class TestAdaptiveCompact(unittest.TestCase): """Tests for _adaptive_compact (line ~1820).""" def test_within_budget(self): items = [{"type": "message", "role": "user", "content": "hi"}] result, changed = tp._adaptive_compact(items, "gpt-4o") self.assertFalse(changed) self.assertEqual(result, items) def test_string_passthrough(self): result, changed = tp._adaptive_compact("hello", "gpt-4o") self.assertFalse(changed) self.assertEqual(result, "hello") class TestUpstreamTarget(unittest.TestCase): """Tests for upstream_target (line ~1409).""" def test_no_trailing_slash(self): self.assertEqual(tp.upstream_target("https://api.example.com", "/v1/chat"), "https://api.example.com/v1/chat") def test_trailing_slash(self): self.assertEqual(tp.upstream_target("https://api.example.com/", "/v1/chat"), "https://api.example.com/v1/chat") def test_suffix_already_present(self): self.assertEqual(tp.upstream_target("https://api.example.com/v1/chat", "/v1/chat"), "https://api.example.com/v1/chat") class TestCcInputToMessages(unittest.TestCase): """Tests for cc_input_to_messages (line ~2334).""" def test_string_input(self): result = tp.cc_input_to_messages("hello") self.assertEqual(len(result), 1) self.assertEqual(result[0], {"role": "user", "content": "hello"}) def test_with_instructions(self): result = tp.cc_input_to_messages("hello", instructions="be helpful") self.assertEqual(len(result), 2) self.assertEqual(result[0]["role"], "user") self.assertEqual(result[0]["content"], "be helpful") def test_message_items(self): items = [ {"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hi"}]}, ] result = tp.cc_input_to_messages(items) self.assertEqual(len(result), 1) self.assertEqual(result[0]["content"], "hi") def test_function_call_as_text(self): items = [ {"type": "function_call", "call_id": "c1", "name": "bash", "arguments": json.dumps({"cmd": "ls"})}, ] result = tp.cc_input_to_messages(items) # Tool calls should be inline JSON text in assistant message self.assertTrue(any(m["role"] == "assistant" for m in result)) def test_function_call_output_as_user(self): items = [ {"type": "function_call_output", "call_id": "c1", "output": "result text"}, ] result = tp.cc_input_to_messages(items) self.assertTrue(any(m["role"] == "user" and "result text" in m.get("content", "") for m in result)) def test_non_dict_items_skipped(self): items = ["not a dict", 42] result = tp.cc_input_to_messages(items) self.assertEqual(len(result), 0) def test_non_list_non_string_returns_empty(self): result = tp.cc_input_to_messages(42) self.assertEqual(result, []) def test_role_normalization(self): items = [ {"type": "message", "role": "system", "content": "sys"}, ] result = tp.cc_input_to_messages(items) # system role should become user self.assertEqual(result[0]["role"], "user") if __name__ == "__main__": unittest.main()