1169 lines
43 KiB
Python
1169 lines
43 KiB
Python
"""
|
|
Unit tests for translate-proxy.py
|
|
|
|
Covers pure utility functions that can be tested without a running server.
|
|
Uses only stdlib unittest + unittest.mock (zero pip dependencies).
|
|
"""
|
|
|
|
import json
|
|
import sys
|
|
import time
|
|
import unittest
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Import the module under test.
|
|
# The source file has a dash in its name so we use importlib.
|
|
# ---------------------------------------------------------------------------
|
|
import importlib
|
|
|
|
_spec = importlib.util.spec_from_file_location(
|
|
"translate_proxy",
|
|
r"C:\dev\Codex-Launcher---Any-AI-Porovider\src\translate-proxy.py",
|
|
)
|
|
tp = importlib.util.module_from_spec(_spec)
|
|
_spec.loader.exec_module(tp)
|
|
|
|
|
|
# ===================================================================
|
|
# Helpers
|
|
# ===================================================================
|
|
|
|
def _make_chat_resp(content="Hello", tool_calls=None, finish_reason="stop",
|
|
model="test-model", prompt_tokens=10, completion_tokens=5):
|
|
"""Build a minimal OpenAI chat completion response dict."""
|
|
msg = {"role": "assistant", "content": content}
|
|
if tool_calls:
|
|
msg["tool_calls"] = tool_calls
|
|
return {
|
|
"choices": [{"message": msg, "finish_reason": finish_reason}],
|
|
"usage": {"prompt_tokens": prompt_tokens,
|
|
"completion_tokens": completion_tokens,
|
|
"total_tokens": prompt_tokens + completion_tokens},
|
|
}
|
|
|
|
|
|
# ===================================================================
|
|
# Tests grouped by functionality
|
|
# ===================================================================
|
|
|
|
|
|
class TestUnwrapCmd(unittest.TestCase):
|
|
"""Tests for _unwrap_cmd (line ~2810)."""
|
|
|
|
def test_non_string_passthrough(self):
|
|
self.assertIsNone(tp._unwrap_cmd(None))
|
|
self.assertEqual(tp._unwrap_cmd(42), 42)
|
|
|
|
def test_plain_string_not_json(self):
|
|
self.assertEqual(tp._unwrap_cmd("echo hello"), "echo hello")
|
|
|
|
def test_single_wrap(self):
|
|
inner = json.dumps({"cmd": "echo hello"})
|
|
self.assertEqual(tp._unwrap_cmd(inner), "echo hello")
|
|
|
|
def test_double_wrap(self):
|
|
inner = json.dumps({"cmd": "echo hello"})
|
|
outer = json.dumps({"cmd": inner})
|
|
self.assertEqual(tp._unwrap_cmd(outer), "echo hello")
|
|
|
|
def test_triple_wrap(self):
|
|
inner = json.dumps({"cmd": "echo hello"})
|
|
mid = json.dumps({"cmd": inner})
|
|
outer = json.dumps({"cmd": mid})
|
|
self.assertEqual(tp._unwrap_cmd(outer), "echo hello")
|
|
|
|
def test_json_without_cmd_key(self):
|
|
val = json.dumps({"foo": "bar"})
|
|
self.assertEqual(tp._unwrap_cmd(val), val)
|
|
|
|
def test_cmd_value_not_string(self):
|
|
val = json.dumps({"cmd": 123})
|
|
self.assertEqual(tp._unwrap_cmd(val), val)
|
|
|
|
def test_empty_string(self):
|
|
self.assertEqual(tp._unwrap_cmd(""), "")
|
|
|
|
|
|
class TestStripXmlishTags(unittest.TestCase):
|
|
"""Tests for _strip_xmlish_tags (line ~2807)."""
|
|
|
|
def test_none(self):
|
|
self.assertEqual(tp._strip_xmlish_tags(None), "")
|
|
|
|
def test_empty_string(self):
|
|
self.assertEqual(tp._strip_xmlish_tags(""), "")
|
|
|
|
def test_no_tags(self):
|
|
self.assertEqual(tp._strip_xmlish_tags("hello world"), "hello world")
|
|
|
|
def test_basic_tags(self):
|
|
self.assertEqual(tp._strip_xmlish_tags("<b>hello</b> world"), "hello world")
|
|
|
|
def test_self_closing(self):
|
|
self.assertEqual(tp._strip_xmlish_tags("before <br/> after"), "before after")
|
|
|
|
def test_multiple_tags(self):
|
|
self.assertEqual(
|
|
tp._strip_xmlish_tags("<p>one</p> <b>two</b>"),
|
|
"one two",
|
|
)
|
|
|
|
|
|
class TestExtractXmlToolCalls(unittest.TestCase):
|
|
"""Tests for _extract_xml_tool_calls."""
|
|
|
|
def test_empty(self):
|
|
self.assertEqual(tp._extract_xml_tool_calls(""), [])
|
|
self.assertEqual(tp._extract_xml_tool_calls(None), [])
|
|
|
|
def test_no_calls(self):
|
|
self.assertEqual(tp._extract_xml_tool_calls("just plain text"), [])
|
|
|
|
def test_single_tool_call(self):
|
|
# Regex: <tool_call>(\w+)(.*?)</tool_call>
|
|
# Format: <tool_call>NAME>CONTENT</tool_call>
|
|
text = '<tool_call>bash>echo hi</tool_call>'
|
|
results = tp._extract_xml_tool_calls(text)
|
|
self.assertEqual(len(results), 1)
|
|
self.assertEqual(results[0]["name"], "bash")
|
|
self.assertIn("call_id", results[0])
|
|
self.assertTrue(results[0]["call_id"].startswith("xml_"))
|
|
|
|
def test_multiple_tool_calls(self):
|
|
text = (
|
|
'<tool_call>bash>echo hi</tool_call>'
|
|
'<tool_call>edit>test.py</tool_call>'
|
|
)
|
|
results = tp._extract_xml_tool_calls(text)
|
|
self.assertEqual(len(results), 2)
|
|
self.assertEqual(results[0]["name"], "bash")
|
|
self.assertEqual(results[1]["name"], "edit")
|
|
|
|
def test_json_args(self):
|
|
text = '<tool_call>tool>{"key": "value"}</tool_call>'
|
|
results = tp._extract_xml_tool_calls(text)
|
|
self.assertEqual(len(results), 1)
|
|
self.assertEqual(results[0]["name"], "tool")
|
|
args = json.loads(results[0]["args"])
|
|
# JSON parsing of XML content may vary - just check result exists
|
|
self.assertIn("args", results[0])
|
|
|
|
def test_code_fenced_args(self):
|
|
text = '<tool_call>tool>{"a": 1}</tool_call>'
|
|
results = tp._extract_xml_tool_calls(text)
|
|
self.assertEqual(len(results), 1)
|
|
|
|
|
|
class TestNormalizeToolArgs(unittest.TestCase):
|
|
"""Tests for _normalize_tool_args (line ~2196)."""
|
|
|
|
def test_empty(self):
|
|
self.assertEqual(tp._normalize_tool_args(""), "")
|
|
self.assertIsNone(tp._normalize_tool_args(None))
|
|
self.assertEqual(tp._normalize_tool_args("{}"), "{}")
|
|
|
|
def test_valid_json_passthrough(self):
|
|
raw = json.dumps({"cmd": "echo hello"})
|
|
self.assertEqual(tp._normalize_tool_args(raw), raw)
|
|
|
|
def test_wrapped_arguments_uppercase_key(self):
|
|
inner = json.dumps({"cmd": "echo hello"})
|
|
raw = json.dumps({"Arguments": inner})
|
|
result = tp._normalize_tool_args(raw)
|
|
self.assertEqual(json.loads(result), {"cmd": "echo hello"})
|
|
|
|
def test_wrapped_arguments_with_code_fence(self):
|
|
inner = json.dumps({"cmd": "echo hello"})
|
|
raw = json.dumps({"Arguments": f"```json\n{inner}\n```"})
|
|
result = tp._normalize_tool_args(raw)
|
|
self.assertEqual(json.loads(result), {"cmd": "echo hello"})
|
|
|
|
def test_invalid_json_passthrough(self):
|
|
raw = "not json at all"
|
|
self.assertEqual(tp._normalize_tool_args(raw), raw)
|
|
|
|
|
|
class TestEmit(unittest.TestCase):
|
|
"""Tests for emit (line ~1406)."""
|
|
|
|
def test_basic_event(self):
|
|
result = tp.emit("test.event", {"key": "value"})
|
|
self.assertTrue(result.startswith("event: test.event\n"))
|
|
self.assertIn('"key": "value"', result)
|
|
self.assertTrue(result.endswith("\n\n"))
|
|
|
|
def test_event_data_format(self):
|
|
data = {"type": "response.created", "id": "123"}
|
|
result = tp.emit("response.created", data)
|
|
lines = result.split("\n")
|
|
self.assertEqual(lines[0], "event: response.created")
|
|
self.assertTrue(lines[1].startswith("data: "))
|
|
parsed = json.loads(lines[1][6:])
|
|
self.assertEqual(parsed["type"], "response.created")
|
|
|
|
|
|
class TestUid(unittest.TestCase):
|
|
"""Tests for uid (line ~1403)."""
|
|
|
|
def test_default_prefix(self):
|
|
result = tp.uid()
|
|
self.assertTrue(result.startswith("id-"))
|
|
|
|
def test_custom_prefix(self):
|
|
result = tp.uid("msg")
|
|
self.assertTrue(result.startswith("msg-"))
|
|
|
|
def test_uniqueness(self):
|
|
ids = {tp.uid() for _ in range(100)}
|
|
self.assertEqual(len(ids), 100)
|
|
|
|
|
|
class TestRedact(unittest.TestCase):
|
|
"""Tests for _redact / _redact_json (line ~2065/2073)."""
|
|
|
|
def test_none(self):
|
|
self.assertIsNone(tp._redact(None))
|
|
|
|
def test_empty_string(self):
|
|
self.assertEqual(tp._redact(""), "")
|
|
|
|
def test_no_secrets(self):
|
|
self.assertEqual(tp._redact("hello world"), "hello world")
|
|
|
|
def test_sk_key(self):
|
|
text = "key=sk-abc123def456ghi789jkl012mno"
|
|
self.assertIn("[REDACTED:key]", tp._redact(text))
|
|
self.assertNotIn("sk-abc123", tp._redact(text))
|
|
|
|
def test_sk_ant_key(self):
|
|
# Note: sk-ant-... is matched by the first sk- pattern (sk-[A-Za-z0-9_\-]{20,})
|
|
# which redacts it as [REDACTED:key] since it runs first.
|
|
text = "key=sk-ant-api03-abcdefghijklmnopqrstuv"
|
|
redacted = tp._redact(text)
|
|
self.assertIn("[REDACTED:", redacted)
|
|
self.assertNotIn("sk-ant-api03", redacted)
|
|
|
|
def test_github_token(self):
|
|
text = "token=ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghij"
|
|
self.assertIn("[REDACTED:github]", tp._redact(text))
|
|
|
|
def test_bearer_token(self):
|
|
text = "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.verylongpayload"
|
|
self.assertIn("Bearer [REDACTED]", tp._redact(text))
|
|
|
|
def test_redact_json_dict(self):
|
|
obj = {"api_key": "sk-abc123def456ghi789jkl012mno345pqr"}
|
|
result = tp._redact_json(obj)
|
|
self.assertIn("[REDACTED:key]", result)
|
|
self.assertNotIn("sk-abc123", result)
|
|
|
|
|
|
class TestEstimateTokens(unittest.TestCase):
|
|
"""Tests for _estimate_tokens (line ~1809)."""
|
|
|
|
def test_none(self):
|
|
self.assertEqual(tp._estimate_tokens(None), 0)
|
|
|
|
def test_string(self):
|
|
# "abcdefghij" is 10 chars => max(1, 10//4) = 2
|
|
self.assertEqual(tp._estimate_tokens("abcdefghij"), 2)
|
|
|
|
def test_short_string(self):
|
|
# "ab" is 2 chars => max(1, 2//4) = max(1, 0) = 1
|
|
self.assertEqual(tp._estimate_tokens("ab"), 1)
|
|
|
|
def test_dict(self):
|
|
obj = {"key": "value"}
|
|
result = tp._estimate_tokens(obj)
|
|
self.assertGreater(result, 0)
|
|
|
|
def test_list(self):
|
|
result = tp._estimate_tokens([1, 2, 3])
|
|
self.assertGreater(result, 0)
|
|
|
|
|
|
class TestItemSummary(unittest.TestCase):
|
|
"""Tests for _item_summary (line ~1614)."""
|
|
|
|
def test_message(self):
|
|
item = {"type": "message", "role": "user",
|
|
"content": [{"type": "input_text", "text": "hello"}]}
|
|
result = tp._item_summary(item)
|
|
self.assertIn("[user]", result)
|
|
self.assertIn("hello", result)
|
|
|
|
def test_function_call(self):
|
|
item = {"type": "function_call", "name": "exec_command",
|
|
"arguments": json.dumps({"cmd": "ls -la"})}
|
|
result = tp._item_summary(item)
|
|
self.assertIn("[tool call]", result)
|
|
self.assertIn("exec_command", result)
|
|
self.assertIn("ls -la", result)
|
|
|
|
def test_function_call_output(self):
|
|
item = {"type": "function_call_output", "output": "file1.txt\nfile2.txt"}
|
|
result = tp._item_summary(item)
|
|
self.assertIn("[tool result]", result)
|
|
self.assertIn("file1.txt", result)
|
|
|
|
def test_function_call_output_truncated(self):
|
|
long_output = "x" * 300
|
|
item = {"type": "function_call_output", "output": long_output}
|
|
result = tp._item_summary(item, max_len=100)
|
|
self.assertIn("...", result)
|
|
self.assertTrue(len(result) < 300)
|
|
|
|
def test_unknown_type(self):
|
|
item = {"type": "custom_type"}
|
|
result = tp._item_summary(item)
|
|
self.assertEqual(result, "[custom_type]")
|
|
|
|
|
|
class TestContextLimitForModel(unittest.TestCase):
|
|
"""Tests for _context_limit_for_model (line ~1800)."""
|
|
|
|
def test_none_model(self):
|
|
self.assertEqual(tp._context_limit_for_model(None), 32768)
|
|
|
|
def test_empty_model(self):
|
|
self.assertEqual(tp._context_limit_for_model(""), 32768)
|
|
|
|
def test_gpt4o(self):
|
|
self.assertEqual(tp._context_limit_for_model("gpt-4o"), 128000)
|
|
|
|
def test_claude_sonnet(self):
|
|
self.assertEqual(tp._context_limit_for_model("claude-sonnet-4-6"), 200000)
|
|
|
|
def test_gemini_flash(self):
|
|
self.assertEqual(tp._context_limit_for_model("gemini-2.5-flash"), 1000000)
|
|
|
|
def test_case_insensitive(self):
|
|
self.assertEqual(tp._context_limit_for_model("GPT-4O"), 128000)
|
|
|
|
def test_unknown_model(self):
|
|
self.assertEqual(tp._context_limit_for_model("future-model-x"), 32768)
|
|
|
|
def test_deepseek(self):
|
|
self.assertEqual(tp._context_limit_for_model("deepseek-chat"), 64000)
|
|
|
|
|
|
class TestClassifyAntigravityError(unittest.TestCase):
|
|
"""Tests for _classify_antigravity_error (line ~632)."""
|
|
|
|
def test_400(self):
|
|
self.assertEqual(tp._classify_antigravity_error(400, "bad input"), "bad_request")
|
|
|
|
def test_401_transient(self):
|
|
self.assertEqual(tp._classify_antigravity_error(401, "access denied"), "auth_transient")
|
|
|
|
def test_401_permanent_invalid_grant(self):
|
|
self.assertEqual(tp._classify_antigravity_error(401, "invalid_grant token"), "auth_permanent")
|
|
|
|
def test_401_permanent_revoked(self):
|
|
self.assertEqual(tp._classify_antigravity_error(401, "token revoked"), "auth_permanent")
|
|
|
|
def test_403_validation_required(self):
|
|
self.assertEqual(tp._classify_antigravity_error(403, "validation_required"), "validation_required")
|
|
|
|
def test_403_account_banned(self):
|
|
self.assertEqual(
|
|
tp._classify_antigravity_error(403, "has been disabled for violation of terms of service"),
|
|
"account_banned",
|
|
)
|
|
|
|
def test_403_service_disabled(self):
|
|
self.assertEqual(tp._classify_antigravity_error(403, "service_disabled"), "service_disabled")
|
|
|
|
def test_403_generic(self):
|
|
self.assertEqual(tp._classify_antigravity_error(403, "some other error"), "forbidden")
|
|
|
|
def test_429_capacity(self):
|
|
self.assertEqual(
|
|
tp._classify_antigravity_error(429, "model_capacity_exhausted"),
|
|
"capacity_exhausted",
|
|
)
|
|
|
|
def test_429_quota(self):
|
|
self.assertEqual(
|
|
tp._classify_antigravity_error(429, "quota_exhausted"),
|
|
"quota_exhausted",
|
|
)
|
|
|
|
def test_429_generic(self):
|
|
self.assertEqual(tp._classify_antigravity_error(429, "slow down"), "rate_limited")
|
|
|
|
def test_503_capacity(self):
|
|
self.assertEqual(
|
|
tp._classify_antigravity_error(503, "service temporarily unavailable"),
|
|
"capacity_exhausted",
|
|
)
|
|
|
|
def test_500(self):
|
|
self.assertEqual(tp._classify_antigravity_error(500, "internal error"), "server_error")
|
|
|
|
def test_418(self):
|
|
self.assertEqual(tp._classify_antigravity_error(418, "I'm a teapot"), "unknown")
|
|
|
|
|
|
class TestParseRateLimitReset(unittest.TestCase):
|
|
"""Tests for _parse_rate_limit_reset (line ~658)."""
|
|
|
|
def test_quota_reset_delay_ms(self):
|
|
result = tp._parse_rate_limit_reset('quotaResetDelay: 5000ms')
|
|
self.assertEqual(result, 5.0)
|
|
|
|
def test_quota_reset_delay_seconds(self):
|
|
result = tp._parse_rate_limit_reset('quotaResetDelay: 30s')
|
|
self.assertEqual(result, 30.0)
|
|
|
|
def test_hms_format(self):
|
|
result = tp._parse_rate_limit_reset('1h30m0s')
|
|
self.assertEqual(result, 5400)
|
|
|
|
def test_resets_in_format(self):
|
|
result = tp._parse_rate_limit_reset('Resets in ~2h0m')
|
|
self.assertEqual(result, 7200)
|
|
|
|
def test_retry_after_seconds(self):
|
|
result = tp._parse_rate_limit_reset('retry-after: 60 seconds')
|
|
self.assertEqual(result, 60)
|
|
|
|
def test_no_match(self):
|
|
self.assertIsNone(tp._parse_rate_limit_reset('no timing info here'))
|
|
|
|
def test_empty_string(self):
|
|
self.assertIsNone(tp._parse_rate_limit_reset(''))
|
|
|
|
|
|
class TestExtractText(unittest.TestCase):
|
|
"""Tests for _extract_text (line ~4295)."""
|
|
|
|
def test_string(self):
|
|
self.assertEqual(tp._extract_text("hello"), "hello")
|
|
|
|
def test_none(self):
|
|
self.assertEqual(tp._extract_text(None), "")
|
|
|
|
def test_number(self):
|
|
self.assertEqual(tp._extract_text(42), "")
|
|
|
|
def test_content_blocks(self):
|
|
content = [
|
|
{"type": "input_text", "text": "hello "},
|
|
{"type": "output_text", "text": "world"},
|
|
]
|
|
self.assertEqual(tp._extract_text(content), "hello world")
|
|
|
|
def test_mixed_content(self):
|
|
content = [
|
|
"plain text ",
|
|
{"type": "text", "text": "and dict"},
|
|
]
|
|
self.assertEqual(tp._extract_text(content), "plain text and dict")
|
|
|
|
def test_empty_list(self):
|
|
self.assertEqual(tp._extract_text([]), "")
|
|
|
|
|
|
class TestValidateToolPairs(unittest.TestCase):
|
|
"""Tests for validate_tool_pairs (line ~1984)."""
|
|
|
|
def test_not_list(self):
|
|
self.assertEqual(tp.validate_tool_pairs("not a list"), [])
|
|
self.assertEqual(tp.validate_tool_pairs(None), [])
|
|
|
|
def test_empty_list(self):
|
|
self.assertEqual(tp.validate_tool_pairs([]), [])
|
|
|
|
def test_valid_pairs(self):
|
|
items = [
|
|
{"type": "function_call", "call_id": "c1", "name": "tool1"},
|
|
{"type": "function_call_output", "call_id": "c1", "output": "ok"},
|
|
]
|
|
self.assertEqual(tp.validate_tool_pairs(items), [])
|
|
|
|
def test_orphan_output(self):
|
|
items = [
|
|
{"type": "function_call_output", "call_id": "missing", "output": "ok"},
|
|
]
|
|
errors = tp.validate_tool_pairs(items)
|
|
self.assertEqual(len(errors), 1)
|
|
self.assertEqual(errors[0]["error"], "orphan_function_call_output")
|
|
self.assertEqual(errors[0]["call_id"], "missing")
|
|
|
|
def test_orphan_no_call_id(self):
|
|
items = [
|
|
{"type": "function_call_output", "output": "ok"},
|
|
]
|
|
errors = tp.validate_tool_pairs(items)
|
|
self.assertEqual(len(errors), 1)
|
|
|
|
def test_mixed_valid_and_orphan(self):
|
|
items = [
|
|
{"type": "function_call", "call_id": "c1"},
|
|
{"type": "function_call_output", "call_id": "c1"},
|
|
{"type": "function_call_output", "call_id": "c_orphan"},
|
|
]
|
|
errors = tp.validate_tool_pairs(items)
|
|
self.assertEqual(len(errors), 1)
|
|
self.assertEqual(errors[0]["call_id"], "c_orphan")
|
|
|
|
|
|
class TestRepairOrphanToolOutputs(unittest.TestCase):
|
|
"""Tests for repair_orphan_tool_outputs (line ~2001)."""
|
|
|
|
def test_repair(self):
|
|
items = [
|
|
{"type": "message", "role": "user", "content": "hi"},
|
|
{"type": "function_call_output", "output": "orphan result"},
|
|
]
|
|
errors = [{"index": 1, "call_id": None, "error": "orphan_function_call_output"}]
|
|
repaired = tp.repair_orphan_tool_outputs(items, errors)
|
|
self.assertEqual(len(repaired), 2)
|
|
self.assertEqual(repaired[0]["type"], "message") # kept
|
|
self.assertEqual(repaired[1]["type"], "message") # converted
|
|
self.assertIn("unmatched tool output", repaired[1]["content"][0]["text"])
|
|
|
|
def test_no_errors(self):
|
|
items = [{"type": "message", "role": "user", "content": "hi"}]
|
|
repaired = tp.repair_orphan_tool_outputs(items, [])
|
|
self.assertEqual(repaired, items)
|
|
|
|
|
|
class TestHasFunctionCallOutput(unittest.TestCase):
|
|
"""Tests for has_function_call_output (line ~2051)."""
|
|
|
|
def test_not_list(self):
|
|
self.assertFalse(tp.has_function_call_output("string"))
|
|
self.assertFalse(tp.has_function_call_output(None))
|
|
|
|
def test_has_output(self):
|
|
items = [{"type": "function_call_output", "output": "ok"}]
|
|
self.assertTrue(tp.has_function_call_output(items))
|
|
|
|
def test_no_output(self):
|
|
items = [{"type": "message", "role": "user", "content": "hi"}]
|
|
self.assertFalse(tp.has_function_call_output(items))
|
|
|
|
|
|
class TestSynthesizeToolResults(unittest.TestCase):
|
|
"""Tests for synthesize_tool_results_for_chat (line ~2014)."""
|
|
|
|
def test_not_list(self):
|
|
result, changed = tp.synthesize_tool_results_for_chat("string")
|
|
self.assertEqual(result, "string")
|
|
self.assertFalse(changed)
|
|
|
|
def test_empty_list(self):
|
|
result, changed = tp.synthesize_tool_results_for_chat([])
|
|
self.assertEqual(result, [])
|
|
self.assertFalse(changed)
|
|
|
|
def test_synthesis(self):
|
|
items = [
|
|
{"type": "function_call", "call_id": "c1", "name": "bash",
|
|
"arguments": json.dumps({"cmd": "ls"})},
|
|
{"type": "function_call_output", "call_id": "c1", "output": "file.txt"},
|
|
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hi"}]},
|
|
]
|
|
result, changed = tp.synthesize_tool_results_for_chat(items)
|
|
self.assertTrue(changed)
|
|
# Should have synthesized message + original message
|
|
self.assertEqual(len(result), 2)
|
|
# The synthesized message should contain tool info
|
|
synth_msg = result[0]
|
|
self.assertEqual(synth_msg["role"], "user")
|
|
synth_text = synth_msg["content"][0]["text"]
|
|
self.assertIn("Tool execution result", synth_text)
|
|
self.assertIn("bash", synth_text)
|
|
self.assertIn("file.txt", synth_text)
|
|
|
|
def test_orphan_output(self):
|
|
items = [
|
|
{"type": "function_call_output", "call_id": "missing", "output": "orphan"},
|
|
]
|
|
result, changed = tp.synthesize_tool_results_for_chat(items)
|
|
self.assertTrue(changed)
|
|
self.assertEqual(result[0]["role"], "user")
|
|
|
|
|
|
class TestOaConvertTools(unittest.TestCase):
|
|
"""Tests for oa_convert_tools (line ~2417)."""
|
|
|
|
def test_none_input(self):
|
|
self.assertIsNone(tp.oa_convert_tools(None))
|
|
|
|
def test_empty_list(self):
|
|
self.assertIsNone(tp.oa_convert_tools([]))
|
|
|
|
def test_function_tools(self):
|
|
tools = [
|
|
{"type": "function", "function": {"name": "bash", "description": "Run a command",
|
|
"parameters": {"type": "object"}}},
|
|
]
|
|
result = tp.oa_convert_tools(tools)
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(result[0]["function"]["name"], "bash")
|
|
|
|
def test_non_function_filtered(self):
|
|
tools = [
|
|
{"type": "web_search"},
|
|
{"type": "function", "function": {"name": "bash", "description": "", "parameters": {}}},
|
|
]
|
|
result = tp.oa_convert_tools(tools)
|
|
self.assertEqual(len(result), 1)
|
|
|
|
def test_empty_name_filtered(self):
|
|
tools = [{"type": "function", "function": {"name": "", "description": "", "parameters": {}}}]
|
|
self.assertIsNone(tp.oa_convert_tools(tools))
|
|
|
|
def test_null_name_filtered(self):
|
|
tools = [{"type": "function", "function": {"name": "null", "description": "", "parameters": {}}}]
|
|
self.assertIsNone(tp.oa_convert_tools(tools))
|
|
|
|
def test_function_without_fn_key(self):
|
|
tools = [{"type": "function", "name": "my_tool", "description": "desc", "parameters": {}}]
|
|
result = tp.oa_convert_tools(tools)
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(result[0]["function"]["name"], "my_tool")
|
|
|
|
def test_strict_mode(self):
|
|
tools = [{"type": "function", "function": {"name": "t", "description": "", "parameters": {}}}]
|
|
result = tp.oa_convert_tools(tools, strict=True)
|
|
self.assertTrue(result[0]["function"]["strict"])
|
|
|
|
|
|
class TestOaRespToResponses(unittest.TestCase):
|
|
"""Tests for oa_resp_to_responses (line ~2448)."""
|
|
|
|
def test_simple_text(self):
|
|
chat_resp = _make_chat_resp(content="Hello world")
|
|
result = tp.oa_resp_to_responses(chat_resp, "test-model", resp_id="r1")
|
|
self.assertEqual(result["id"], "r1")
|
|
self.assertEqual(result["model"], "test-model")
|
|
self.assertEqual(result["status"], "completed")
|
|
self.assertEqual(len(result["output"]), 1)
|
|
self.assertEqual(result["output"][0]["type"], "message")
|
|
text_content = result["output"][0]["content"][0]["text"]
|
|
self.assertEqual(text_content, "Hello world")
|
|
|
|
def test_tool_calls(self):
|
|
tc = {"id": "tc1", "type": "function", "function": {"name": "bash", "arguments": '{"cmd":"ls"}'}}
|
|
chat_resp = _make_chat_resp(content="", tool_calls=[tc], finish_reason="tool_calls")
|
|
result = tp.oa_resp_to_responses(chat_resp, "test-model")
|
|
self.assertEqual(result["status"], "completed")
|
|
# Should have one function_call output
|
|
fc_outputs = [o for o in result["output"] if o["type"] == "function_call"]
|
|
self.assertEqual(len(fc_outputs), 1)
|
|
self.assertEqual(fc_outputs[0]["name"], "bash")
|
|
|
|
def test_finish_reason_length(self):
|
|
chat_resp = _make_chat_resp(finish_reason="length")
|
|
result = tp.oa_resp_to_responses(chat_resp, "m")
|
|
self.assertEqual(result["status"], "incomplete")
|
|
|
|
def test_finish_reason_content_filter(self):
|
|
chat_resp = _make_chat_resp(finish_reason="content_filter")
|
|
result = tp.oa_resp_to_responses(chat_resp, "m")
|
|
self.assertEqual(result["status"], "incomplete")
|
|
|
|
def test_usage(self):
|
|
chat_resp = _make_chat_resp(prompt_tokens=100, completion_tokens=50)
|
|
result = tp.oa_resp_to_responses(chat_resp, "m")
|
|
self.assertEqual(result["usage"]["input_tokens"], 100)
|
|
self.assertEqual(result["usage"]["output_tokens"], 50)
|
|
self.assertEqual(result["usage"]["total_tokens"], 150)
|
|
|
|
def test_content_and_tool_calls(self):
|
|
tc = {"id": "tc1", "type": "function", "function": {"name": "bash", "arguments": '{}'}}
|
|
chat_resp = _make_chat_resp(content="thinking...", tool_calls=[tc])
|
|
result = tp.oa_resp_to_responses(chat_resp, "m")
|
|
types = [o["type"] for o in result["output"]]
|
|
self.assertIn("message", types)
|
|
self.assertIn("function_call", types)
|
|
|
|
|
|
class TestOaInputToMessages(unittest.TestCase):
|
|
"""Tests for oa_input_to_messages (line ~2265)."""
|
|
|
|
def test_string_input(self):
|
|
result = tp.oa_input_to_messages("hello")
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(result[0]["role"], "user")
|
|
self.assertEqual(result[0]["content"], "hello")
|
|
|
|
def test_message_input(self):
|
|
items = [
|
|
{"type": "message", "role": "user",
|
|
"content": [{"type": "input_text", "text": "hello"}]},
|
|
]
|
|
result = tp.oa_input_to_messages(items)
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(result[0]["content"], "hello")
|
|
|
|
def test_developer_role_becomes_system(self):
|
|
items = [
|
|
{"type": "message", "role": "developer",
|
|
"content": [{"type": "input_text", "text": "instructions"}]},
|
|
]
|
|
result = tp.oa_input_to_messages(items)
|
|
self.assertEqual(result[0]["role"], "system")
|
|
|
|
def test_function_call_and_output(self):
|
|
items = [
|
|
{"type": "function_call", "call_id": "c1", "name": "bash",
|
|
"arguments": json.dumps({"cmd": "ls"})},
|
|
{"type": "function_call_output", "call_id": "c1", "output": "file.txt"},
|
|
]
|
|
result = tp.oa_input_to_messages(items)
|
|
# Should produce: assistant with tool_calls + tool message
|
|
self.assertEqual(len(result), 2)
|
|
self.assertEqual(result[0]["role"], "assistant")
|
|
self.assertIsNotNone(result[0]["tool_calls"])
|
|
self.assertEqual(result[1]["role"], "tool")
|
|
self.assertEqual(result[1]["tool_call_id"], "c1")
|
|
|
|
def test_string_content(self):
|
|
items = [
|
|
{"type": "message", "role": "user", "content": "plain text"},
|
|
]
|
|
result = tp.oa_input_to_messages(items)
|
|
self.assertEqual(result[0]["content"], "plain text")
|
|
|
|
def test_reasoning_content(self):
|
|
items = [
|
|
{"type": "message", "role": "assistant",
|
|
"content": [
|
|
{"type": "reasoning", "content": [{"text": "thinking..."}]},
|
|
{"type": "output_text", "text": "result"},
|
|
]},
|
|
]
|
|
result = tp.oa_input_to_messages(items)
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(result[0]["content"], "result")
|
|
self.assertIn("reasoning_content", result[0])
|
|
self.assertEqual(result[0]["reasoning_content"], "thinking...")
|
|
|
|
|
|
class TestFbStripReasoningFromMessages(unittest.TestCase):
|
|
"""Tests for _fb_strip_reasoning_from_messages (line ~1383)."""
|
|
|
|
def test_strips_reasoning(self):
|
|
messages = [
|
|
{"role": "assistant", "content": "hi", "reasoning_content": "thoughts"},
|
|
{"role": "user", "content": "ok"},
|
|
]
|
|
result = tp._fb_strip_reasoning_from_messages(messages)
|
|
self.assertNotIn("reasoning_content", result[0])
|
|
self.assertEqual(result[0]["content"], "hi")
|
|
self.assertEqual(result[1]["content"], "ok")
|
|
|
|
def test_no_reasoning(self):
|
|
messages = [{"role": "user", "content": "hi"}]
|
|
result = tp._fb_strip_reasoning_from_messages(messages)
|
|
self.assertEqual(result, messages)
|
|
|
|
def test_empty_list(self):
|
|
self.assertEqual(tp._fb_strip_reasoning_from_messages([]), [])
|
|
|
|
|
|
class TestCbInputToMessages(unittest.TestCase):
|
|
"""Tests for _cb_input_to_messages (line ~1321)."""
|
|
|
|
def test_string_input(self):
|
|
result = tp._cb_input_to_messages("hello")
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(result[0]["role"], "user")
|
|
self.assertEqual(result[0]["content"], "hello")
|
|
|
|
def test_with_instructions(self):
|
|
result = tp._cb_input_to_messages("hello", instructions="be helpful")
|
|
self.assertEqual(len(result), 2)
|
|
self.assertEqual(result[0]["role"], "system")
|
|
self.assertEqual(result[0]["content"], "be helpful")
|
|
|
|
def test_function_call_and_output(self):
|
|
items = [
|
|
{"type": "function_call", "call_id": "c1", "name": "bash",
|
|
"arguments": json.dumps({"cmd": "ls"})},
|
|
{"type": "function_call_output", "call_id": "c1", "output": "file.txt"},
|
|
]
|
|
result = tp._cb_input_to_messages(items)
|
|
self.assertGreaterEqual(len(result), 2)
|
|
|
|
def test_reasoning_items_skipped(self):
|
|
items = [
|
|
{"type": "reasoning", "content": [{"text": "thinking"}]},
|
|
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hi"}]},
|
|
]
|
|
result = tp._cb_input_to_messages(items)
|
|
# reasoning should be skipped, only message kept
|
|
msg_roles = [m["role"] for m in result]
|
|
self.assertNotIn("reasoning", msg_roles)
|
|
|
|
def test_developer_role_becomes_system(self):
|
|
items = [
|
|
{"type": "message", "role": "developer",
|
|
"content": [{"type": "input_text", "text": "instructions"}]},
|
|
]
|
|
result = tp._cb_input_to_messages(items)
|
|
self.assertEqual(result[0]["role"], "system")
|
|
|
|
|
|
class TestCodebuffHardDisableReasoning(unittest.TestCase):
|
|
"""Tests for _codebuff_hard_disable_reasoning (line ~1258)."""
|
|
|
|
def test_removes_reasoning_fields(self):
|
|
messages = [
|
|
{"role": "assistant", "content": "hi",
|
|
"reasoning_content": "thoughts", "thinking": "deep"},
|
|
{"role": "user", "content": "ok"},
|
|
]
|
|
tp._codebuff_hard_disable_reasoning(messages)
|
|
self.assertNotIn("reasoning_content", messages[0])
|
|
self.assertNotIn("thinking", messages[0])
|
|
self.assertIn("content", messages[0])
|
|
|
|
def test_handles_non_dict(self):
|
|
messages = ["not a dict", {"role": "user", "content": "ok"}]
|
|
tp._codebuff_hard_disable_reasoning(messages)
|
|
# Should not crash
|
|
|
|
def test_removes_all_reasoning_keys(self):
|
|
for key in ("reasoning_content", "reasoning", "thinking",
|
|
"thinking_content", "thoughts"):
|
|
msg = {"role": "assistant", "content": "hi", key: "val"}
|
|
tp._codebuff_hard_disable_reasoning([msg])
|
|
self.assertNotIn(key, msg, f"Key {key} should have been removed")
|
|
|
|
|
|
class TestIsReasoningContentError(unittest.TestCase):
|
|
"""Tests for _is_reasoning_content_error (line ~1269)."""
|
|
|
|
def test_none(self):
|
|
self.assertFalse(tp._is_reasoning_content_error(None))
|
|
|
|
def test_empty(self):
|
|
self.assertFalse(tp._is_reasoning_content_error(""))
|
|
|
|
def test_matching(self):
|
|
self.assertTrue(tp._is_reasoning_content_error("reasoning_content is invalid"))
|
|
self.assertTrue(tp._is_reasoning_content_error("thinking mode required"))
|
|
self.assertTrue(tp._is_reasoning_content_error("must be passed back"))
|
|
|
|
def test_not_matching(self):
|
|
self.assertFalse(tp._is_reasoning_content_error("rate limit exceeded"))
|
|
|
|
|
|
class TestDsRebuildToolHistory(unittest.TestCase):
|
|
"""Tests for _ds_rebuild_tool_history (line ~1298)."""
|
|
|
|
def test_empty_messages(self):
|
|
result = tp._ds_rebuild_tool_history([])
|
|
self.assertEqual(result, [])
|
|
|
|
def test_no_matching_tool_ids(self):
|
|
messages = [
|
|
{"role": "user", "content": "hi"},
|
|
{"role": "tool", "tool_call_id": "tc_unknown", "content": "output"},
|
|
]
|
|
result = tp._ds_rebuild_tool_history(messages)
|
|
self.assertEqual(len(result), 2)
|
|
|
|
|
|
class TestRouteKey(unittest.TestCase):
|
|
"""Tests for _route_key (line ~1475)."""
|
|
|
|
def test_basic(self):
|
|
route = {"name": "primary", "target_url": "https://api.example.com", "model": "gpt-4o"}
|
|
key = tp._route_key(route)
|
|
self.assertEqual(key, "primary::https://api.example.com::gpt-4o")
|
|
|
|
def test_missing_fields(self):
|
|
route = {}
|
|
key = tp._route_key(route)
|
|
self.assertEqual(key, "::::")
|
|
|
|
def test_partial_fields(self):
|
|
route = {"name": "backup"}
|
|
key = tp._route_key(route)
|
|
self.assertEqual(key, "backup::::")
|
|
|
|
|
|
class TestScoreRoute(unittest.TestCase):
|
|
"""Tests for _score_route (line ~1492)."""
|
|
|
|
def test_basic_scoring(self):
|
|
route = {"name": "r1", "priority": 10}
|
|
stats = {}
|
|
score = tp._score_route(route, stats)
|
|
self.assertEqual(score, 10)
|
|
|
|
def test_with_latency(self):
|
|
route = {"name": "r1", "priority": 10}
|
|
key = tp._route_key(route)
|
|
stats = {key: {"ewma_latency_s": 2.0, "consecutive_failures": 0}}
|
|
score = tp._score_route(stats=stats, route=route)
|
|
# priority(10) + min(ewma*5, 50)(10) + failures*20(0) = 20
|
|
self.assertEqual(score, 20)
|
|
|
|
def test_with_failures(self):
|
|
route = {"name": "r1", "priority": 10}
|
|
key = tp._route_key(route)
|
|
stats = {key: {"ewma_latency_s": 0, "consecutive_failures": 3}}
|
|
score = tp._score_route(stats=stats, route=route)
|
|
# priority(10) + 0 + 3*20 = 70
|
|
self.assertEqual(score, 70)
|
|
|
|
def test_open_circuit(self):
|
|
route = {"name": "r1", "priority": 10}
|
|
key = tp._route_key(route)
|
|
stats = {key: {"open_until_ts": time.time() + 100}}
|
|
score = tp._score_route(stats=stats, route=route)
|
|
self.assertEqual(score, 1_000_000)
|
|
|
|
def test_rate_limited(self):
|
|
route = {"name": "r1", "priority": 10}
|
|
key = tp._route_key(route)
|
|
stats = {key: {"ewma_latency_s": 0, "consecutive_failures": 0,
|
|
"rate_limited_until": time.time() + 100}}
|
|
score = tp._score_route(stats=stats, route=route)
|
|
self.assertEqual(score, 10 + 500)
|
|
|
|
|
|
class TestBucketForRoute(unittest.TestCase):
|
|
"""Tests for _bucket_for_route (line ~2164)."""
|
|
|
|
def test_creates_bucket(self):
|
|
route = {"name": "test_route"}
|
|
bucket = tp._bucket_for_route(route)
|
|
self.assertIsNotNone(bucket)
|
|
self.assertTrue(hasattr(bucket, "allow"))
|
|
|
|
def test_same_route_same_bucket(self):
|
|
route = {"name": "same_route"}
|
|
b1 = tp._bucket_for_route(route)
|
|
b2 = tp._bucket_for_route(route)
|
|
self.assertIs(b1, b2)
|
|
|
|
def test_default_name(self):
|
|
route = {"target_url": "https://example.com"}
|
|
bucket = tp._bucket_for_route(route)
|
|
self.assertIsNotNone(bucket)
|
|
|
|
|
|
class TestSanitizeToolCalls(unittest.TestCase):
|
|
"""Tests for _sanitize_tool_calls (line ~3467)."""
|
|
|
|
def test_non_exec_command_passthrough(self):
|
|
calls = [{"name": "TodoWrite", "arguments": json.dumps({"todos": []})}]
|
|
result = tp._sanitize_tool_calls(calls)
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(result[0]["name"], "TodoWrite")
|
|
|
|
def test_exec_command_empty_cmd(self):
|
|
calls = [{"name": "exec_command", "arguments": json.dumps({"cmd": ""})}]
|
|
result = tp._sanitize_tool_calls(calls)
|
|
self.assertEqual(len(result), 1)
|
|
cmd = json.loads(result[0]["arguments"])["cmd"]
|
|
self.assertIn("CC-SANITIZER", cmd)
|
|
|
|
def test_exec_command_null_cmd(self):
|
|
calls = [{"name": "exec_command", "arguments": json.dumps({"cmd": "null"})}]
|
|
result = tp._sanitize_tool_calls(calls)
|
|
cmd = json.loads(result[0]["arguments"])["cmd"]
|
|
self.assertIn("CC-SANITIZER", cmd)
|
|
|
|
def test_exec_command_valid_cmd(self):
|
|
calls = [{"name": "exec_command", "arguments": json.dumps({"cmd": "echo hello"})}]
|
|
result = tp._sanitize_tool_calls(calls)
|
|
args = json.loads(result[0]["arguments"])
|
|
self.assertEqual(args["cmd"], "echo hello")
|
|
|
|
def test_double_wrapped_cmd(self):
|
|
inner = json.dumps({"cmd": "echo hello"})
|
|
calls = [{"name": "exec_command", "arguments": json.dumps({"cmd": inner})}]
|
|
result = tp._sanitize_tool_calls(calls)
|
|
args = json.loads(result[0]["arguments"])
|
|
self.assertEqual(args["cmd"], "echo hello")
|
|
|
|
def test_json_object_cmd(self):
|
|
calls = [{"name": "exec_command",
|
|
"arguments": json.dumps({"cmd": json.dumps({"cmd": "ls -la"})})}]
|
|
result = tp._sanitize_tool_calls(calls)
|
|
args = json.loads(result[0]["arguments"])
|
|
self.assertEqual(args["cmd"], "ls -la")
|
|
|
|
def test_invalid_arguments_json(self):
|
|
calls = [{"name": "exec_command", "arguments": "not json"}]
|
|
result = tp._sanitize_tool_calls(calls)
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(result[0]["arguments"], "not json")
|
|
|
|
|
|
class TestBuildExploreCmd(unittest.TestCase):
|
|
"""Tests for _build_explore_cmd (line ~2830)."""
|
|
|
|
def test_empty_input(self):
|
|
cmd, desc = tp._build_explore_cmd("")
|
|
self.assertIsNone(cmd)
|
|
self.assertIsNone(desc)
|
|
|
|
def test_none_input(self):
|
|
cmd, desc = tp._build_explore_cmd(None)
|
|
self.assertIsNone(cmd)
|
|
self.assertIsNone(desc)
|
|
|
|
def test_no_url(self):
|
|
cmd, desc = tp._build_explore_cmd("just some text without urls")
|
|
self.assertIsNone(cmd)
|
|
self.assertIsNone(desc)
|
|
|
|
@patch.object(tp, "_IS_WINDOWS", True)
|
|
def test_windows_command(self):
|
|
cmd, desc = tp._build_explore_cmd("check out https://github.com/owner/repo")
|
|
self.assertIsNotNone(cmd)
|
|
self.assertIn("Invoke-WebRequest", cmd)
|
|
self.assertIn("$env:TEMP", cmd)
|
|
self.assertIn("README", desc)
|
|
|
|
@patch.object(tp, "_IS_WINDOWS", False)
|
|
def test_linux_command(self):
|
|
cmd, desc = tp._build_explore_cmd("check out https://github.com/owner/repo")
|
|
self.assertIsNotNone(cmd)
|
|
self.assertIn("curl", cmd)
|
|
self.assertIn("/tmp", cmd)
|
|
|
|
@patch.object(tp, "_IS_WINDOWS", False)
|
|
def test_git_suffix_stripped(self):
|
|
cmd, desc = tp._build_explore_cmd("https://github.com/owner/repo.git")
|
|
self.assertIsNotNone(cmd)
|
|
# The api_base should not have .git
|
|
self.assertNotIn("repo.git", cmd)
|
|
self.assertIn("repo", cmd)
|
|
|
|
@patch.object(tp, "_IS_WINDOWS", False)
|
|
def test_url_from_json_list(self):
|
|
text = json.dumps([{"content": "see https://gitea.com/user/proj for details"}])
|
|
cmd, desc = tp._build_explore_cmd(text)
|
|
self.assertIsNotNone(cmd)
|
|
self.assertIn("gitea.com", cmd)
|
|
|
|
@patch.object(tp, "_IS_WINDOWS", False)
|
|
def test_api_v1_url_passthrough(self):
|
|
cmd, desc = tp._build_explore_cmd("https://gitea.com/api/v1/repos/user/proj")
|
|
self.assertIsNotNone(cmd)
|
|
self.assertIn("api/v1/repos", cmd)
|
|
|
|
|
|
class TestCompactInput(unittest.TestCase):
|
|
"""Tests for _compact_input (line ~1657)."""
|
|
|
|
def test_string_passthrough(self):
|
|
self.assertEqual(tp._compact_input("hello"), "hello")
|
|
|
|
def test_small_list_passthrough(self):
|
|
items = [{"type": "message", "role": "user", "content": "hi"}]
|
|
result = tp._compact_input(items)
|
|
self.assertEqual(result, items)
|
|
|
|
def test_truncates_large_tool_output(self):
|
|
long_output = "x" * 10000
|
|
items = [{"type": "function_call_output", "output": long_output}]
|
|
result = tp._compact_input(items)
|
|
self.assertEqual(len(result), 1)
|
|
self.assertIn("truncated", result[0]["output"])
|
|
self.assertLess(len(result[0]["output"]), 10000)
|
|
|
|
|
|
class TestAdaptiveCompact(unittest.TestCase):
|
|
"""Tests for _adaptive_compact (line ~1820)."""
|
|
|
|
def test_within_budget(self):
|
|
items = [{"type": "message", "role": "user", "content": "hi"}]
|
|
result, changed = tp._adaptive_compact(items, "gpt-4o")
|
|
self.assertFalse(changed)
|
|
self.assertEqual(result, items)
|
|
|
|
def test_string_passthrough(self):
|
|
result, changed = tp._adaptive_compact("hello", "gpt-4o")
|
|
self.assertFalse(changed)
|
|
self.assertEqual(result, "hello")
|
|
|
|
|
|
class TestUpstreamTarget(unittest.TestCase):
|
|
"""Tests for upstream_target (line ~1409)."""
|
|
|
|
def test_no_trailing_slash(self):
|
|
self.assertEqual(tp.upstream_target("https://api.example.com", "/v1/chat"),
|
|
"https://api.example.com/v1/chat")
|
|
|
|
def test_trailing_slash(self):
|
|
self.assertEqual(tp.upstream_target("https://api.example.com/", "/v1/chat"),
|
|
"https://api.example.com/v1/chat")
|
|
|
|
def test_suffix_already_present(self):
|
|
self.assertEqual(tp.upstream_target("https://api.example.com/v1/chat", "/v1/chat"),
|
|
"https://api.example.com/v1/chat")
|
|
|
|
|
|
class TestCcInputToMessages(unittest.TestCase):
|
|
"""Tests for cc_input_to_messages (line ~2334)."""
|
|
|
|
def test_string_input(self):
|
|
result = tp.cc_input_to_messages("hello")
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(result[0], {"role": "user", "content": "hello"})
|
|
|
|
def test_with_instructions(self):
|
|
result = tp.cc_input_to_messages("hello", instructions="be helpful")
|
|
self.assertEqual(len(result), 2)
|
|
self.assertEqual(result[0]["role"], "user")
|
|
self.assertEqual(result[0]["content"], "be helpful")
|
|
|
|
def test_message_items(self):
|
|
items = [
|
|
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hi"}]},
|
|
]
|
|
result = tp.cc_input_to_messages(items)
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(result[0]["content"], "hi")
|
|
|
|
def test_function_call_as_text(self):
|
|
items = [
|
|
{"type": "function_call", "call_id": "c1", "name": "bash",
|
|
"arguments": json.dumps({"cmd": "ls"})},
|
|
]
|
|
result = tp.cc_input_to_messages(items)
|
|
# Tool calls should be inline JSON text in assistant message
|
|
self.assertTrue(any(m["role"] == "assistant" for m in result))
|
|
|
|
def test_function_call_output_as_user(self):
|
|
items = [
|
|
{"type": "function_call_output", "call_id": "c1", "output": "result text"},
|
|
]
|
|
result = tp.cc_input_to_messages(items)
|
|
self.assertTrue(any(m["role"] == "user" and "result text" in m.get("content", "") for m in result))
|
|
|
|
def test_non_dict_items_skipped(self):
|
|
items = ["not a dict", 42]
|
|
result = tp.cc_input_to_messages(items)
|
|
self.assertEqual(len(result), 0)
|
|
|
|
def test_non_list_non_string_returns_empty(self):
|
|
result = tp.cc_input_to_messages(42)
|
|
self.assertEqual(result, [])
|
|
|
|
def test_role_normalization(self):
|
|
items = [
|
|
{"type": "message", "role": "system", "content": "sys"},
|
|
]
|
|
result = tp.cc_input_to_messages(items)
|
|
# system role should become user
|
|
self.assertEqual(result[0]["role"], "user")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|