#!/usr/bin/env python3 """ translate-proxy.py — Responses API → backend API translation proxy. Backends: openai-compat — any OpenAI-compatible Chat Completions API anthropic — Anthropic Messages API command-code — CommandCode /alpha/generate (Z.AI GLM Coding Plan) Usage: python3 translate-proxy.py --config proxy-config.json python3 translate-proxy.py --backend command-code --target-url https://... --api-key sk-... ═══════════════════════════════════════════════════════════════════ COMMANDCODE ADAPTER — FIX HISTORY (2026-05-22) ═══════════════════════════════════════════════════════════════════ This file contains multiple rounds of fixes for the CommandCode adapter. Each fix addresses a specific failure mode observed in production. They are documented here for future maintainability. FIX 1: Content blocks rejected by CC API (root cause of initial 400 errors) Symptom: {"error":{"message":"params.messages[i].content expected string, received array"}} Cause: cc_input_to_messages emitted tool results as content blocks [{"type":"tool_result",...}] Fix: All messages now use string content. Tool results as role="user" with plain text. Location: cc_input_to_messages() ~line 1085 FIX 2: x-command-code-version header dropped during rewrite Symptom: HTTP 403 upgrade_required from CommandCode API Cause: _handle_command_code rewrite removed the header line Fix: Always send x-command-code-version header with fallback "0.26.8" Location: _handle_command_code() header setup block FIX 3: Stale schema cache with wrong content_type=array Symptom: SchemaAdapter used content_type="array" causing content blocks in auto path Cause: ErrorAnalyzer learned incorrect schema from error message text Fix: Cleared provider-caps.json; added 24h staleness TTL to _load_schema() Location: _load_schema(), provider-caps.json FIX 4: Stream disconnect before completion (client-side "stream disconnected") Symptom: Client sees partial SSE then connection close, no response.completed event Cause: No try/except around streaming path; exceptions crashed handler mid-stream Fix: Wrapped stream_buffered_events in try/except; sends response.completed(status:"failed") on crash Location: _handle_command_code() streaming section FIX 5: Tool calls echoed as text instead of being parsed (THE BIG ONE) Symptom: Model generates inline JSON tool calls like {"type":"tool-call","id":"...","name":"exec_command","arguments":"{...}"} These appear as raw text in the conversation. The tool is never executed. Root cause chain: a) cc_input_to_messages sends tool calls as inline JSON text in assistant messages b) The CC model echoes back similar JSON in its text-delta response c) _parse_commandcode_text_tool_calls only handled XML format (``` ``) d) Raw JSON tool calls passed through as plain text → client shows them unparsed Fix: Added _extract_raw_json_tool_calls() with field-level regex extraction. Handles BOTH malformed (unescaped inner quotes) AND properly escaped JSON. Three-tier parse: direct json.loads → unescape \"→\" → unicode_escape decode. Location: _extract_args(), _extract_field(), _extract_raw_json_tool_calls() FIX 6: Double-wrapped arguments (nested {"cmd": "{\"cmd\": \"curl...\"}"}") Symptom: args={"cmd": "{\\\"cmd\\\": \\\"curl...\\\"}"} Tool executor receives cmd = the literal string '{"cmd": "curl..."', not the actual curl command. Root cause: When model generates properly escaped JSON ("arguments": "{\\"cmd\\": \\"...\\"}"), _extract_args naive brace-counting returns raw text with escaped quotes. json.loads(raw) fails on \\ at structural level. Fallback sets args["cmd"] = raw_string → double-wrapped. Fix: _extract_args now tries 3 parse strategies before returning. Also normalizes sandbox_permissions from parsed args dict (not raw snippet). Location: _extract_args() three-tier parser, sandbox_permissions normalization FIX 7: _extract_field can't read values starting with \" Symptom: sandbox_permissions="allow_all" passes through unnormalized because _extract_field sees val_start=\ (backslash) which != " or { → returns None Fix: Skip leading backslash before checking for " or { value type. Location: _extract_field() leading-\ skip FIX 8: Adaptive probing caused format mismatch (REVERTED) Symptom: Probe system discovered OpenAI tool_calls+role=tool format but CC API couldn't process multi-turn tool loops correctly with it. Fix: Removed probe system entirely. Use conservative format only: - Inline JSON text for tool calls (cc_input_to_messages default) - role="user" for all tool results - ErrorAnalyzer learning on retries (not proactive probes) Location: Reverted to cc_input_to_messages(), removed _build_cc_messages + _probe_cc_format ═══════════════════════════════════════════════════════════════════ """ import json, http.server, socketserver, urllib.request, urllib.parse, urllib.error, re import time, uuid, os, sys, argparse, threading, socket, collections, contextlib, signal import dataclasses # ═══════════════════════════════════════════════════════════════════ # Config # ═══════════════════════════════════════════════════════════════════ DEFAULT_MODELS = { "openai-compat": [ {"id": "gpt-4o-mini", "object": "model", "created": 1700000000, "owned_by": "custom"}, ], "anthropic": [ {"id": "claude-sonnet-4-20250514", "object": "model", "created": 1700000000, "owned_by": "anthropic"}, ], "auto": [ {"id": "default-model", "object": "model", "created": 1700000000, "owned_by": "auto"}, ], } def load_config(): p = argparse.ArgumentParser(description="Responses API translation proxy") p.add_argument("--config", help="JSON config file path") p.add_argument("--port", type=int, default=None) p.add_argument("--backend", default=None, choices=["openai-compat", "anthropic", "command-code", "auto"]) p.add_argument("--target-url", default=None) p.add_argument("--api-key", default=None) p.add_argument("--models-file", default=None, help="JSON file with model list array") args = p.parse_args() cfg = {} if args.config: with open(args.config) as f: cfg = json.load(f) for ck, ak in [("port", "port"), ("backend_type", "backend"), ("target_url", "target_url"), ("api_key", "api_key")]: v = getattr(args, ak, None) if v is not None: cfg[ck] = v env_map = { "port": ("PROXY_PORT", "ZAI_PROXY_PORT", int), "backend_type": ("PROXY_BACKEND", None, str), "target_url": ("PROXY_TARGET_URL", "ZAI_BASE_URL", str), "api_key": ("PROXY_API_KEY", "ZAI_API_KEY", str), } for ck, (ev1, ev2, conv) in env_map.items(): if ck not in cfg: v = os.environ.get(ev1) or (os.environ.get(ev2) if ev2 else None) if v: cfg[ck] = conv(v) if conv == int else v cfg.setdefault("port", 8080) cfg.setdefault("backend_type", "openai-compat") cfg.setdefault("target_url", "http://localhost:11434/v1") cfg.setdefault("api_key", "") models = cfg.get("models", []) if not models and args.models_file: with open(args.models_file) as f: models = json.load(f) if not models: models = DEFAULT_MODELS.get(cfg["backend_type"], []) cfg["models"] = models return cfg CONFIG = None PORT = 8080 BACKEND = "openai-compat" TARGET_URL = "" API_KEY = "" OAUTH_PROVIDER = "" MODELS = [] CC_VERSION = "" REASONING_ENABLED = True REASONING_EFFORT = "medium" BGP_ROUTES = [] SERVER = None _LOG_DIR = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy") os.makedirs(_LOG_DIR, exist_ok=True) _REQUESTS_DIR = os.path.join(_LOG_DIR, "requests") os.makedirs(_REQUESTS_DIR, exist_ok=True) _stats_path = os.path.join(_LOG_DIR, "usage-stats.json") _provider_caps_path = os.path.join(_LOG_DIR, "provider-caps.json") _stats_lock = threading.Lock() _stats_pending = [] _stats_flush_timer = None _STATS_FLUSH_INTERVAL = 5.0 _response_store = collections.OrderedDict() _response_store_lock = threading.Lock() _MAX_STORED = 50 _crof_lock = threading.Lock() _provider_caps_lock = threading.Lock() _provider_caps = None _shutdown_requested = False _active_connections = 0 _active_connections_lock = threading.Lock() _active_requests = {} _active_requests_lock = threading.Lock() _pool = uuid.uuid4().hex[:8] _antigravity_version = "1.18.3" _antigravity_version_checked = 0 _antigravity_version_lock = threading.Lock() def _fetch_antigravity_version(): cache_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "antigravity-version.json") try: with open(cache_path) as f: cached = json.load(f) if cached.get("version") and cached.get("checked_at", 0) > time.time() - 6 * 3600: return cached["version"] except Exception: pass urls = [ ("https://antigravity-auto-updater-974169037036.us-central1.run.app", None), ("https://antigravity.google/changelog", 5000), ] for url, limit in urls: try: req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) resp = urllib.request.urlopen(req, timeout=5) text = resp.read().decode(errors="replace") if limit: text = text[:limit] m = re.search(r"\d+\.\d+\.\d+", text) if m: version = m.group(0) try: os.makedirs(os.path.dirname(cache_path), exist_ok=True) with open(cache_path, "w") as f: json.dump({"version": version, "checked_at": time.time()}, f) except Exception: pass return version except Exception: pass return _antigravity_version def _ensure_antigravity_version(): global _antigravity_version, _antigravity_version_checked if time.time() - _antigravity_version_checked < 6 * 3600: return _antigravity_version with _antigravity_version_lock: if time.time() - _antigravity_version_checked < 6 * 3600: return _antigravity_version _antigravity_version = _fetch_antigravity_version() _antigravity_version_checked = time.time() return _antigravity_version def _init_runtime(): global CONFIG, PORT, BACKEND, TARGET_URL, API_KEY, OAUTH_PROVIDER, _antigravity_version global MODELS, CC_VERSION, REASONING_ENABLED, REASONING_EFFORT, BGP_ROUTES CONFIG = load_config() PORT = CONFIG["port"] BACKEND = CONFIG["backend_type"] TARGET_URL = CONFIG["target_url"].rstrip("/") API_KEY = CONFIG["api_key"] OAUTH_PROVIDER = CONFIG.get("oauth_provider") or "" MODELS = CONFIG["models"] CC_VERSION = CONFIG.get("cc_version", "") REASONING_ENABLED = CONFIG.get("reasoning_enabled", True) REASONING_EFFORT = CONFIG.get("reasoning_effort", "medium") BGP_ROUTES = CONFIG.get("bgp_routes", []) if OAUTH_PROVIDER == "google-antigravity": _antigravity_version = _ensure_antigravity_version() print(f"[antigravity] version={_antigravity_version}", file=sys.stderr) bgp_models = [] for _r in BGP_ROUTES: for _m in _r.get("models", [{"id": _r.get("model", "unknown")}]): mid = _m.get("id", _m) if isinstance(_m, dict) else _m if mid not in bgp_models: bgp_models.append(mid) if BGP_ROUTES and not MODELS: MODELS = [{"id": m, "object": "model", "created": 1700000000, "owned_by": "bgp"} for m in bgp_models] CONFIG["models"] = MODELS if (BACKEND or "").startswith("gemini-oauth") and (OAUTH_PROVIDER or "").startswith("google"): token_name = "google-antigravity-oauth-token.json" if OAUTH_PROVIDER == "google-antigravity" else "google-cli-oauth-token.json" token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", token_name) try: with open(token_path) as _tf: _td = json.load(_tf) _discovered = [] if OAUTH_PROVIDER == "google-antigravity" else _td.get("available_models", []) if _discovered: _seen = [] for _m in _discovered: if _m not in _seen: _seen.append(_m) MODELS = [{"id": m, "object": "model", "created": 1700000000, "owned_by": "gemini-oauth"} for m in _seen] CONFIG["models"] = MODELS print(f"[gemini-oauth] loaded {len(_seen)} discovered models: {_seen}", file=sys.stderr) except Exception: pass def _provider_cap_key(target_url=None, backend=None, model=None): host = urllib.parse.urlparse(target_url or TARGET_URL).netloc.lower() return f"{backend or BACKEND}|{host}|{model or '*'}" def _load_provider_caps(): global _provider_caps with _provider_caps_lock: if _provider_caps is not None: return _provider_caps try: with open(_provider_caps_path) as f: _provider_caps = json.load(f) except Exception: _provider_caps = {} return _provider_caps def _save_provider_caps(): try: os.makedirs(os.path.dirname(_provider_caps_path), exist_ok=True) with open(_provider_caps_path, "w") as f: json.dump(_provider_caps or {}, f, indent=2) except Exception as e: print(f"[provider-sensor] failed to save caps: {e}", file=sys.stderr) def _provider_cap(model, key, default=None): caps = _load_provider_caps() specific = caps.get(_provider_cap_key(model=model), {}) generic = caps.get(_provider_cap_key(model="*"), {}) return specific.get(key, generic.get(key, default)) def _set_provider_cap(model, key, value, reason=""): caps = _load_provider_caps() cap_key = _provider_cap_key(model=model) caps.setdefault(cap_key, {})[key] = value caps[cap_key]["reason"] = reason caps[cap_key]["updated_at"] = time.time() _save_provider_caps() print(f"[provider-sensor] learned {cap_key}: {key}={value} reason={reason}", file=sys.stderr) def _refresh_oauth_token(): return _refresh_oauth_token_for(API_KEY, OAUTH_PROVIDER) def _refresh_oauth_token_for(api_key, oauth_provider): oauth_provider = oauth_provider or "" if not oauth_provider.startswith("google"): return api_key token_name = "google-antigravity-oauth-token.json" if oauth_provider == "google-antigravity" else "google-cli-oauth-token.json" token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", token_name) if not os.path.exists(token_path): return api_key try: with open(token_path) as f: tokens = json.load(f) if tokens.get("expires_at", 0) > time.time() + 60: return tokens.get("access_token", api_key) client_id = tokens.get("client_id", "") client_secret = tokens.get("client_secret", "") refresh_token = tokens.get("refresh_token", "") if not all([client_id, client_secret, refresh_token]): return tokens.get("access_token", api_key) print("[oauth] refreshing Google access token...", file=sys.stderr) data = urllib.parse.urlencode({ "client_id": client_id, "client_secret": client_secret, "refresh_token": refresh_token, "grant_type": "refresh_token", }).encode() req = urllib.request.Request("https://oauth2.googleapis.com/token", data=data, headers={"Content-Type": "application/x-www-form-urlencoded"}) resp = urllib.request.urlopen(req, timeout=30) new_tokens = json.loads(resp.read()) tokens["access_token"] = new_tokens.get("access_token", tokens.get("access_token")) tokens["expires_at"] = time.time() + new_tokens.get("expires_in", 3600) with open(token_path, "w") as f: json.dump(tokens, f, indent=2) print("[oauth] token refreshed OK", file=sys.stderr) return tokens["access_token"] except Exception as e: print(f"[oauth] refresh failed: {e}", file=sys.stderr) return API_KEY # ═══════════════════════════════════════════════════════════════════ # Shared helpers # ═══════════════════════════════════════════════════════════════════ _pool = uuid.uuid4().hex[:8] def _load_stats(): try: if os.path.exists(_stats_path): return json.load(open(_stats_path)) except Exception: pass return {"providers": {}, "updated": None} def _atomic_write_json(path, obj): tmp = path + ".tmp" with open(tmp, "w") as f: json.dump(obj, f, indent=2, ensure_ascii=False) os.replace(tmp, path) def _flush_stats(): global _stats_flush_timer with _stats_lock: batch = list(_stats_pending) _stats_pending.clear() _stats_flush_timer = None if not batch: return stats = _load_stats() for entry in batch: provider = entry["provider"] model = entry["model"] p = stats["providers"].setdefault(provider, { "total_requests": 0, "successes": 0, "failures": 0, "total_tokens_in": 0, "total_tokens_out": 0, "total_duration_s": 0.0, "models": {}, "last_used": None, "last_error": None, }) p["total_requests"] += 1 p["total_tokens_in"] += entry["tokens_in"] p["total_tokens_out"] += entry["tokens_out"] p["total_duration_s"] += entry["duration_s"] p["last_used"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(entry["ts"])) if entry["success"]: p["successes"] += 1 else: p["failures"] += 1 p["last_error"] = entry.get("error_type") or "unknown" m = p["models"].setdefault(model, {"requests": 0, "tokens_in": 0, "tokens_out": 0}) m["requests"] += 1 m["tokens_in"] += entry["tokens_in"] m["tokens_out"] += entry["tokens_out"] stats["updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) _atomic_write_json(_stats_path, stats) def _record_usage(provider, model, success, duration_s, tokens_in=0, tokens_out=0, error_type=None): global _stats_flush_timer entry = { "provider": provider or "unknown", "model": model or "unknown", "success": bool(success), "duration_s": float(duration_s or 0), "tokens_in": int(tokens_in or 0), "tokens_out": int(tokens_out or 0), "error_type": error_type, "ts": time.time(), } with _stats_lock: _stats_pending.append(entry) if _stats_flush_timer is None: _stats_flush_timer = threading.Timer(_STATS_FLUSH_INTERVAL, _flush_stats) _stats_flush_timer.daemon = True _stats_flush_timer.start() def store_response(resp_id, input_data, output_items): if not resp_id: return with _response_store_lock: _response_store[resp_id] = {"input": input_data, "output": output_items, "ts": time.time()} while len(_response_store) > _MAX_STORED: _response_store.popitem(last=False) def resolve_previous_response(body): prev_id = body.get("previous_response_id") input_data = body.get("input", "") if not prev_id: return input_data with _response_store_lock: stored = _response_store.get(prev_id) if not stored: return input_data prev_input = stored["input"] prev_output = stored["output"] new_input = input_data if isinstance(input_data, list) else [] if isinstance(prev_input, list): combined = list(prev_input) + list(prev_output) + new_input else: combined = [{"type": "message", "role": "user", "content": [{"type": "input_text", "text": str(prev_input)}]}] + list(prev_output) + new_input return combined _HOP_BY_HOP_HEADERS = { "connection", "keep-alive", "proxy-authenticate", "proxy-authorization", "te", "trailers", "transfer-encoding", "upgrade", "host", "content-length", } def uid(prefix="id"): return f"{prefix}-{_pool}-{uuid.uuid4().hex[:12]}" def emit(event, data): return f"event: {event}\ndata: {json.dumps(data)}\n\n" def upstream_target(base_url, suffix): base = base_url.rstrip("/") if base.endswith(suffix): return base return f"{base}{suffix}" _BROWSER_HEADERS = { "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36", "Accept": "application/json, text/event-stream, */*", "Accept-Language": "en-US,en;q=0.9", "Sec-Ch-Ua": '"Chromium";v="137", "Not/A)Brand";v="99"', "Sec-Ch-Ua-Mobile": "?0", "Sec-Ch-Ua-Platform": '"Linux"', "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", } def forwarded_headers(request_headers, extra=None, browser_ua=False): headers = {} if browser_ua: headers.update(_BROWSER_HEADERS) for key, value in request_headers.items(): if key.lower() in _HOP_BY_HOP_HEADERS: continue if browser_ua and key.lower() == "user-agent": continue headers[key] = value if extra: headers.update(extra) return headers _MAX_INPUT_ITEMS = 30 _MAX_TOOL_OUTPUT_CHARS = 8000 _COMPACT_KEEP_RECENT = 10 _CROF_ADAPTIVE = { "fail_history": [], "model_limits": {}, "global_item_limit": 30, "min_keep_recent": 4, } _BGP_STATS_PATH = os.path.join(_LOG_DIR, "bgp-route-stats.json") _bgp_stats_lock = threading.Lock() def _route_key(route): return f"{route.get('name', '')}::{route.get('target_url', '')}::{route.get('model', '')}" def _load_bgp_stats(): try: if os.path.exists(_BGP_STATS_PATH): return json.load(open(_BGP_STATS_PATH)) except Exception: pass return {} def _save_bgp_stats(stats): tmp = _BGP_STATS_PATH + ".tmp" with open(tmp, "w") as f: json.dump(stats, f, indent=2) os.replace(tmp, _BGP_STATS_PATH) def _score_route(route, stats): key = _route_key(route) rs = stats.get(key, {}) now = time.time() if float(rs.get("open_until_ts", 0)) > now: return 1_000_000 priority = int(route.get("priority", 99)) ewma = float(rs.get("ewma_latency_s", 0)) failures = int(rs.get("consecutive_failures", 0)) score = priority + min(ewma * 5, 50) + failures * 20 if float(rs.get("rate_limited_until", 0)) > now: score += 500 return score def _update_route_stats(route, success, duration_s, http_code=None, error_type=None): with _bgp_stats_lock: stats = _load_bgp_stats() key = _route_key(route) rs = stats.setdefault(key, { "ewma_latency_s": duration_s, "consecutive_failures": 0, "last_success": None, "last_failure": None, "open_until_ts": 0, "rate_limited_until": 0, "last_error": None, }) alpha = 0.25 rs["ewma_latency_s"] = alpha * duration_s + (1 - alpha) * float(rs.get("ewma_latency_s", duration_s)) if success: rs["consecutive_failures"] = 0 rs["last_success"] = time.time() else: rs["consecutive_failures"] = int(rs.get("consecutive_failures", 0)) + 1 rs["last_failure"] = time.time() rs["last_error"] = error_type or (f"http_{http_code}" if http_code else "unknown") if http_code == 429: rs["rate_limited_until"] = time.time() + 120 if rs["consecutive_failures"] >= 3: rs["open_until_ts"] = time.time() + 60 rs["consecutive_failures"] = 0 _save_bgp_stats(stats) def _sorted_bgp_routes(): with _bgp_stats_lock: stats = _load_bgp_stats() return sorted(BGP_ROUTES, key=lambda r: _score_route(r, stats)) def _crof_record(model, n_items, success): if not isinstance(n_items, int) or n_items < 1: return entry = {"model": model, "items": n_items, "ok": success} hist = _CROF_ADAPTIVE["fail_history"] hist.append(entry) if len(hist) > 200: _CROF_ADAPTIVE["fail_history"] = hist[-100:] ml = _CROF_ADAPTIVE["model_limits"].setdefault(model, {"ok_max": 30, "fail_min": 0, "limit": 30}) if success and n_items > ml["ok_max"]: ml["ok_max"] = n_items if not success and (ml["fail_min"] == 0 or n_items < ml["fail_min"]): ml["fail_min"] = n_items if ml["fail_min"] > 0 and ml["ok_max"] >= ml["fail_min"]: ml["limit"] = ml["fail_min"] - 1 elif ml["fail_min"] > 0: ml["limit"] = max(ml["fail_min"] - 2, _CROF_ADAPTIVE["min_keep_recent"] + 2) global_limit = 30 for m, v in _CROF_ADAPTIVE["model_limits"].items(): if v.get("limit", 30) < global_limit: global_limit = v["limit"] _CROF_ADAPTIVE["global_item_limit"] = global_limit print(f"[crof-adaptive] model={model} items={n_items} {'OK' if success else 'FAIL'} -> limit={ml.get('limit',30)} global={global_limit}", file=sys.stderr) def _crof_item_limit(model): ml = _CROF_ADAPTIVE["model_limits"].get(model, {}) per_model = ml.get("limit", 30) return min(per_model, _CROF_ADAPTIVE["global_item_limit"]) def _crof_compact_for_retry(input_data, model): limit = _crof_item_limit(model) if not isinstance(input_data, list) or len(input_data) <= limit: return input_data keep = max(_CROF_ADAPTIVE["min_keep_recent"], limit // 3) head_end = 0 for i, item in enumerate(input_data): t = item.get("type") if t == "message" and item.get("role") in ("developer", "system"): head_end = i + 1 elif t == "message" and item.get("role") == "user" and head_end == i: head_end = i + 1 else: break head = input_data[:head_end] tail_start = max(head_end, len(input_data) - keep) while tail_start > head_end: t = input_data[tail_start].get("type") r = input_data[tail_start].get("role", "") if t in ("function_call_output", "function_call"): tail_start -= 1 elif t == "message" and r == "assistant": tail_start -= 1 else: break tail = input_data[tail_start:] body = input_data[head_end:tail_start] if not body: return head + tail summary_lines = [f"[Auto-compacted: {len(body)} turns removed (adaptive limit={limit})]"] for item in body[-5:]: summary_lines.append(_item_summary(item, max_len=120)) summary_msg = {"type": "message", "role": "user", "content": [{"type": "input_text", "text": "\n".join(summary_lines)}]} print(f"[crof-adaptive] RETRY compact: {len(input_data)} -> {len(head)+1+len(tail)} (limit={limit}, keep={len(tail)})", file=sys.stderr) return head + [summary_msg] + tail def _item_summary(item, max_len=200): t = item.get("type") if t == "message": role = item.get("role", "?") text = "" for p in item.get("content", []): if p.get("type") in ("input_text", "output_text"): text += p.get("text", "") return f"[{role}] {text[:max_len]}" elif t == "function_call": name = item.get("name", "?") args = item.get("arguments", "{}") try: a = json.loads(args) cmd = a.get("cmd", a.get("command", "")) if cmd: return f"[tool call] {name}: {cmd[:max_len]}" except Exception: pass return f"[tool call] {name}({args[:max_len]})" elif t == "function_call_output": output = item.get("output", "") if len(output) > max_len: return f"[tool result] {output[:max_len]}..." return f"[tool result] {output}" return f"[{t}]" def _extract_files(items): files = set() for item in items: if item.get("type") == "function_call": try: a = json.loads(item.get("arguments", "{}")) cmd = a.get("cmd", a.get("command", "")) for prefix in (">", ">>", " > ", " >> "): for part in cmd.split(prefix)[1:]: f = part.strip().split()[0].strip("'\"") if f and not f.startswith("-") and "/" in f: files.add(f) except Exception: pass return files def _compact_input(input_data): if isinstance(input_data, str): return input_data if not isinstance(input_data, list) or len(input_data) <= _MAX_INPUT_ITEMS: out = [] for item in input_data: if isinstance(item, dict) and item.get("type") == "function_call_output": o = item.get("output", "") if len(o) > _MAX_TOOL_OUTPUT_CHARS: item = dict(item) item["output"] = o[:_MAX_TOOL_OUTPUT_CHARS] + f"\n... [truncated {len(o) - _MAX_TOOL_OUTPUT_CHARS} chars]" print(f"[compact] tool output truncated {len(o)} -> {_MAX_TOOL_OUTPUT_CHARS}", file=sys.stderr) out.append(item) return out head_end = 0 for i, item in enumerate(input_data): t = item.get("type") if t == "message" and item.get("role") in ("developer", "system"): head_end = i + 1 elif t == "message" and item.get("role") == "user" and head_end == i: head_end = i + 1 else: break head = input_data[:head_end] tail_start = len(input_data) - _COMPACT_KEEP_RECENT while tail_start > head_end: t = input_data[tail_start].get("type") r = input_data[tail_start].get("role", "") if t == "function_call_output": tail_start -= 1 elif t == "function_call": tail_start -= 1 elif t == "message" and r == "assistant": tail_start -= 1 else: break tail = input_data[tail_start:] body = input_data[head_end:tail_start] if not body: return head + tail for item in tail: if isinstance(item, dict) and item.get("type") == "function_call_output": o = item.get("output", "") if len(o) > _MAX_TOOL_OUTPUT_CHARS: item["output"] = o[:_MAX_TOOL_OUTPUT_CHARS] + f"\n... [truncated {len(o) - _MAX_TOOL_OUTPUT_CHARS} chars]" user_queries = [] for item in body: if item.get("type") == "message" and item.get("role") == "user": for p in item.get("content", []): if p.get("type") == "input_text": user_queries.append(p.get("text", "")[:300]) assistant_msgs = [] for item in body: if item.get("type") == "message" and item.get("role") == "assistant": for p in item.get("content", []): if p.get("type") == "output_text": assistant_msgs.append(p.get("text", "")[:300]) tool_summaries = [] for item in body: if item.get("type") in ("function_call", "function_call_output"): tool_summaries.append(_item_summary(item, max_len=150)) files = _extract_files(body) summary_lines = [f"[Auto-compacted: {len(body)} earlier turns summarized to preserve context]"] if user_queries: summary_lines.append(f"User requests: {'; '.join(user_queries[-3:])}") if assistant_msgs: summary_lines.append(f"Assistant responses: {'; '.join(assistant_msgs[-3:])}") if tool_summaries: summary_lines.append(f"Actions taken ({len(tool_summaries)} steps):") for ts in tool_summaries[-15:]: summary_lines.append(f" {ts}") if files: summary_lines.append(f"Files touched: {', '.join(sorted(files)[-10:])}") summary_text = "\n".join(summary_lines) summary_msg = { "type": "message", "role": "user", "content": [{"type": "input_text", "text": summary_text}] } print(f"[compact] {len(input_data)} items -> {len(head) + 1 + len(tail)} (compacted {len(body)} old items into summary)", file=sys.stderr) return head + [summary_msg] + tail # ═══════════════════════════════════════════════════════════════════ # Provider policies # ═══════════════════════════════════════════════════════════════════ _PROVIDER_POLICIES = { "crof": {"reasoning_mode": "off", "max_tokens": 32768, "strip_reasoning": True, "tool_output_limit": 4000, "max_input_items": 18, "compaction": "aggressive", "synthetic_tool_results": True}, "chats-llm": {"reasoning_mode": "off", "max_tokens": 32768, "strip_reasoning": True, "tool_output_limit": 4000, "max_input_items": 20, "compaction": "aggressive"}, "z.ai": {"reasoning_mode": "medium", "max_tokens": 65536, "strip_reasoning": True, "tool_output_limit": 8000, "max_input_items": 40, "compaction": "balanced"}, "openrouter": {"reasoning_mode": "provider_default", "max_tokens": 32768, "strip_reasoning": True, "tool_output_limit": 6000, "max_input_items": 35, "compaction": "balanced"}, "openadapter": {"reasoning_mode": "off", "max_tokens": 32768, "strip_reasoning": True, "tool_output_limit": 6000, "max_input_items": 30, "compaction": "balanced"}, } def provider_policy(target_url=None, backend=None): host = urllib.parse.urlparse(target_url or TARGET_URL).netloc.lower() for key, policy in _PROVIDER_POLICIES.items(): if key in host: return policy return {} # ═══════════════════════════════════════════════════════════════════ # Adaptive context compaction (model-aware) # ═══════════════════════════════════════════════════════════════════ _MODEL_CONTEXT = { "gpt-4o": 128000, "gpt-4o-mini": 128000, "gpt-5": 128000, "claude-sonnet": 200000, "claude-haiku": 200000, "glm-5.1": 128000, "glm-5": 128000, "glm-4": 128000, "deepseek": 64000, "gemini-2.5-flash": 1000000, "gemini-2.5-pro": 2000000, "mimo": 32768, "minimax": 32768, "kimi": 128000, "_default": 32768, } def _context_limit_for_model(model): if not model: return _MODEL_CONTEXT["_default"] ml = model.lower() for key, limit in _MODEL_CONTEXT.items(): if key != "_default" and key in ml: return limit return _MODEL_CONTEXT["_default"] def _estimate_tokens(obj): if obj is None: return 0 if isinstance(obj, str): return max(1, len(obj) // 4) try: raw = json.dumps(obj, ensure_ascii=False) except Exception: raw = str(obj) return max(1, len(raw) // 4) def _adaptive_compact(input_data, model, policy=None): policy = policy or {} context_size = int(policy.get("context_size", _context_limit_for_model(model))) input_budget = int(context_size * 0.60) estimated = _estimate_tokens(input_data) if estimated <= input_budget: return input_data, False if not isinstance(input_data, list): return input_data, False reduction = max(0.15, input_budget / max(estimated, 1)) target_items = max(int(len(input_data) * reduction), 6) if target_items >= len(input_data): return input_data, False head_end = 0 for i, item in enumerate(input_data): t = item.get("type") if t == "message" and item.get("role") in ("developer", "system"): head_end = i + 1 elif t == "message" and item.get("role") == "user" and head_end == i: head_end = i + 1 else: break head = input_data[:head_end] keep = max(4, target_items // 3) tail_start = max(head_end, len(input_data) - keep) while tail_start > head_end: t = input_data[tail_start].get("type") if t in ("function_call_output", "function_call"): tail_start -= 1 elif t == "message" and input_data[tail_start].get("role") == "assistant": tail_start -= 1 else: break tail = input_data[tail_start:] body = input_data[head_end:tail_start] if not body: return head + tail, True summary_lines = [f"[Auto-compacted: {len(body)} turns removed (budget={input_budget}tok, model={model})]"] for item in body[-5:]: summary_lines.append(_item_summary(item, max_len=120)) summary_msg = {"type": "message", "role": "user", "content": [{"type": "input_text", "text": "\n".join(summary_lines)}]} print(f"[adaptive-compact] model={model} est={estimated}tok budget={input_budget}tok " f"items {len(input_data)}->{len(head)+1+len(tail)}", file=sys.stderr) return head + [summary_msg] + tail, True # ═══════════════════════════════════════════════════════════════════ # Tool-call pairing validator # ═══════════════════════════════════════════════════════════════════ def validate_tool_pairs(input_items): if not isinstance(input_items, list): return [] calls = {} errors = [] for idx, item in enumerate(input_items): t = item.get("type") if t == "function_call": cid = item.get("call_id") or item.get("id") if cid: calls[cid] = idx elif t == "function_call_output": cid = item.get("call_id") or item.get("id") if not cid or cid not in calls: errors.append({"index": idx, "call_id": cid, "error": "orphan_function_call_output"}) return errors def repair_orphan_tool_outputs(input_items, errors): bad = {e["index"] for e in errors} repaired = [] for idx, item in enumerate(input_items): if idx in bad: output = item.get("output", "") repaired.append({"type": "message", "role": "user", "content": [{"type": "input_text", "text": f"[Proxy: unmatched tool output]\n{str(output)[:4000]}"}]}) else: repaired.append(item) return repaired def synthesize_tool_results_for_chat(input_items): """Convert Responses function_call/function_call_output pairs into plain text. Some OpenAI-compatible providers accept tool calls on the first turn but fail on the next request when role=tool messages are present. For those providers, encode tool outputs as normal user text so the model can continue. """ if not isinstance(input_items, list): return input_items, False calls = {} changed = False out = [] for item in input_items: t = item.get("type") if t == "function_call": cid = item.get("call_id") or item.get("id") or "" calls[cid] = item changed = True continue if t == "function_call_output": cid = item.get("call_id") or item.get("id") or "" call = calls.get(cid, {}) name = call.get("name", "tool") args = call.get("arguments", "{}") output = item.get("output", "") text = ( "Tool execution result. Continue the task using this result. " "Do not repeat the same tool call unless more information is required.\n\n" f"Tool: {name}\nArguments:\n```json\n{str(args)[:2000]}\n```\n" f"Output:\n```\n{str(output)[:8000]}\n```" ) out.append({"type": "message", "role": "user", "content": [{"type": "input_text", "text": text}]}) changed = True continue out.append(item) return out, changed def has_function_call_output(input_items): return isinstance(input_items, list) and any(i.get("type") == "function_call_output" for i in input_items) # ═══════════════════════════════════════════════════════════════════ # Log redaction # ═══════════════════════════════════════════════════════════════════ _SECRET_PATTERNS = [ (r"sk-[A-Za-z0-9_\-]{20,}", "[REDACTED:key]"), (r"sk-ant-[A-Za-z0-9_\-]{20,}", "[REDACTED:anthropic]"), (r"gh[pousr]_[A-Za-z0-9_]{20,}", "[REDACTED:github]"), (r"Bearer\s+[A-Za-z0-9._\-]{20,}", "Bearer [REDACTED]"), ] def _redact(text): if not text: return text import re for pattern, replacement in _SECRET_PATTERNS: text = re.sub(pattern, replacement, text) return text def _redact_json(obj): try: raw = json.dumps(obj, ensure_ascii=False) except Exception: raw = str(obj) return _redact(raw) _MAX_SNAPSHOTS = 200 def save_request_snapshot(request_id, body): if not request_id: return request_id snapshot = { "_meta": { "request_id": request_id, "model": body.get("model", ""), "stream": body.get("stream", False), "ts": time.time(), "ts_iso": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "status": "pending", "duration_s": None, "error": None, }, "request": json.loads(_redact_json(body)), } path = os.path.join(_REQUESTS_DIR, f"{request_id}.json") tmp = path + ".tmp" with open(tmp, "w") as f: json.dump(snapshot, f, ensure_ascii=False, indent=2) os.replace(tmp, path) _rotate_snapshots() return request_id def update_snapshot_response(request_id, status, duration_s=None, error=None): if not request_id: return path = os.path.join(_REQUESTS_DIR, f"{request_id}.json") if not os.path.exists(path): return try: with open(path) as f: snapshot = json.load(f) meta = snapshot.get("_meta", {}) meta["status"] = status if duration_s is not None: meta["duration_s"] = round(duration_s, 3) if error is not None: meta["error"] = str(error)[:200] snapshot["_meta"] = meta tmp = path + ".tmp" with open(tmp, "w") as f: json.dump(snapshot, f, ensure_ascii=False, indent=2) os.replace(tmp, path) except Exception: pass def _rotate_snapshots(): try: files = sorted( [os.path.join(_REQUESTS_DIR, f) for f in os.listdir(_REQUESTS_DIR) if f.endswith(".json")], key=os.path.getmtime, ) while len(files) > _MAX_SNAPSHOTS: os.remove(files.pop(0)) except Exception: pass # ═══════════════════════════════════════════════════════════════════ # Rate-limit token buckets # ═══════════════════════════════════════════════════════════════════ class TokenBucket: def __init__(self, capacity=10, refill=1.0): self.capacity = float(capacity) self.tokens = float(capacity) self.refill = float(refill) self.updated = time.monotonic() self.lock = threading.Lock() def allow(self, cost=1): with self.lock: now = time.monotonic() self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.refill) self.updated = now if self.tokens >= cost: self.tokens -= cost return True return False _rate_buckets = {} _rate_buckets_lock = threading.Lock() def _bucket_for_route(route): name = route.get("name") or route.get("target_url") or "default" with _rate_buckets_lock: if name not in _rate_buckets: _rate_buckets[name] = TokenBucket(capacity=10, refill=1.0) return _rate_buckets[name] # ═══════════════════════════════════════════════════════════════════ # OpenAI-compat backend # ═══════════════════════════════════════════════════════════════════ def oa_input_to_messages(input_data): msgs = [] tool_name_by_id = {} if isinstance(input_data, str): msgs.append({"role": "user", "content": input_data}) elif isinstance(input_data, list): pending_tool_calls = [] last_flushed_ids = [] for item in input_data: t = item.get("type") if t == "function_call": tcid = item.get("call_id") or item.get("id") or uid("tc") pending_tool_calls.append( {"id": tcid, "type": "function", "function": {"name": item.get("name", ""), "arguments": item.get("arguments", "{}")}}) tool_name_by_id[tcid] = item.get("name", "") continue if pending_tool_calls: last_flushed_ids = [tc["id"] for tc in pending_tool_calls] msgs.append({"role": "assistant", "content": None, "tool_calls": pending_tool_calls}) pending_tool_calls = [] if t == "message": role = item.get("role", "user") if role == "developer": role = "system" text = "" content = item.get("content", []) if isinstance(content, str): text = content else: for part in content: if isinstance(part, str): text += part continue pt = part.get("type", "") if pt in ("input_text", "output_text"): text += part.get("text", "") elif pt == "input_image": img = part.get("image_url", part) msgs.append({"role": role, "content": [{"type": "text", "text": text}, {"type": "image_url", "image_url": img}]}) text = None break if text is not None: msgs.append({"role": role, "content": text}) elif t == "function_call_output": tcid = item.get("call_id") or item.get("id") or "" if not tcid and last_flushed_ids: idx = len([m for m in msgs if m.get("role") == "tool"]) if idx < len(last_flushed_ids): tcid = last_flushed_ids[idx] msgs.append({"role": "tool", "tool_call_id": tcid, "tool_name": tool_name_by_id.get(tcid, ""), "content": item.get("output", "")}) if pending_tool_calls: msgs.append({"role": "assistant", "content": None, "tool_calls": pending_tool_calls}) return msgs def cc_input_to_messages(input_data, instructions="", schema=None): """Convert Responses API input into CommandCode /alpha/generate messages. [FIX 1] All messages use STRING content (not content blocks). CC API rejects params.messages[i].content when it's an array. Tool results are role="user" with plain text content. Tool calls: inline JSON text in assistant messages (e.g. {"type":"tool-call","id":"..."}). The model echoes this format back in its response text-delta events. _parse_commandcode_text_tool_calls extracts them via _extract_raw_json_tool_calls. Schema parameter is accepted but not used for format decisions — the conservative string-content format is always used regardless of schema hints. """ msgs = [] pending_tool_calls = [] last_flushed_ids = [] def text_from_content(content): if isinstance(content, str): return content text = "" for part in content or []: if isinstance(part, str): text += part continue if not isinstance(part, dict): continue if part.get("type") in ("input_text", "output_text", "text"): text += part.get("text", "") return text def flush_tool_calls(): nonlocal pending_tool_calls, last_flushed_ids if not pending_tool_calls: return last_flushed_ids = [tc["id"] for tc in pending_tool_calls] # Tool calls as plain text in assistant message tc_text = "\n".join( json.dumps(tc, ensure_ascii=False) for tc in pending_tool_calls ) msgs.append({"role": "assistant", "content": tc_text}) pending_tool_calls = [] if instructions: msgs.append({"role": "user", "content": instructions}) if isinstance(input_data, str): msgs.append({"role": "user", "content": input_data}) return msgs if not isinstance(input_data, list): return msgs for item in input_data: if not isinstance(item, dict): continue t = item.get("type") if t == "function_call": tcid = item.get("call_id") or item.get("id") or uid("call") name = item.get("name") or "exec_command" pending_tool_calls.append({ "type": "tool-call", "id": tcid, "name": name, "arguments": item.get("arguments") or "{}", }) continue flush_tool_calls() if t == "message": role = item.get("role", "user") if role not in ("user", "assistant"): role = "user" text = text_from_content(item.get("content", [])) msgs.append({"role": role, "content": text}) elif t == "function_call_output": output = item.get("output", "") if not isinstance(output, str): output = json.dumps(output, ensure_ascii=False) # /alpha/generate expects string content for ALL messages msgs.append({"role": "user", "content": output[:8000]}) flush_tool_calls() return msgs def oa_convert_tools(tools): if not tools: return None out = [] for t in tools: if t.get("type") != "function": continue fn = t.get("function", {}) if fn: out.append(t) else: out.append({ "type": "function", "function": {"name": t.get("name", ""), "description": t.get("description", ""), "parameters": t.get("parameters", {})} }) return out or None def oa_resp_to_responses(chat_resp, model, resp_id=None): choice = chat_resp["choices"][0] msg = choice["message"] content = msg.get("content") or "" finish = choice.get("finish_reason", "stop") fm = {"stop": "completed", "length": "incomplete", "tool_calls": "completed", "content_filter": "incomplete"} status = fm.get(finish, "incomplete") outputs = [] if content: outputs.append({"type": "message", "id": uid("msg"), "role": "assistant", "status": "completed", "content": [{"type": "output_text", "text": content, "annotations": []}]}) for tc in msg.get("tool_calls") or []: fn = tc.get("function", {}) outputs.append({"type": "function_call", "id": uid("fc"), "call_id": tc.get("id"), "name": fn.get("name"), "arguments": fn.get("arguments", "{}"), "status": "completed"}) usage = chat_resp.get("usage", {}) return {"id": resp_id or uid("resp"), "object": "response", "created": int(time.time()), "model": model, "status": status, "output": outputs, "usage": {"input_tokens": usage.get("prompt_tokens", 0), "output_tokens": usage.get("completion_tokens", 0), "total_tokens": usage.get("total_tokens", 0), "input_tokens_details": {"cached_tokens": usage.get("prompt_tokens_details", {}).get("cached_tokens", 0)}}} def oa_stream_to_sse(chat_stream, model, req_id): resp_id = req_id or uid("resp") msg_id = uid("msg") text_buf = "" tc_buf = {} fr = None msg_opened = False yield emit("response.created", {"type": "response.created", "response": {"id": resp_id, "object": "response", "model": model, "status": "in_progress", "created": int(time.time()), "output": []}}) yield emit("response.in_progress", {"type": "response.in_progress", "response": {"id": resp_id}}) for line in chat_stream: line = line.decode("utf-8", errors="replace").strip() if not line or line.startswith(":") or line == "data: [DONE]": continue if not line.startswith("data: "): continue try: chunk = json.loads(line[6:]) except json.JSONDecodeError: continue choices = chunk.get("choices", []) if not choices: continue delta = choices[0].get("delta", {}) fr = choices[0].get("finish_reason") content = delta.get("content") if content: if not msg_opened: msg_id = uid("msg") yield emit("response.output_item.added", {"type": "response.output_item.added", "item": {"type": "message", "id": msg_id, "role": "assistant", "status": "in_progress", "content": []}}) yield emit("response.content_part.added", {"type": "response.content_part.added", "part": {"type": "output_text", "text": "", "annotations": []}, "item_id": msg_id}) msg_opened = True text_buf += content yield emit("response.output_text.delta", {"type": "response.output_text.delta", "delta": content, "item_id": msg_id, "content_index": 0}) for tc in delta.get("tool_calls") or []: idx = tc.get("index", 0) if idx not in tc_buf: fid = uid("fc") tc_buf[idx] = {"id": fid, "call_id": tc.get("id", fid), "name": "", "args": ""} yield emit("response.output_item.added", {"type": "response.output_item.added", "item": {"type": "function_call", "id": fid, "call_id": tc_buf[idx]["call_id"], "name": "", "arguments": "", "status": "in_progress"}}) fn = tc.get("function", {}) if "name" in fn and fn["name"]: tc_buf[idx]["name"] = fn["name"] if "arguments" in fn and fn["arguments"]: tc_buf[idx]["args"] += fn["arguments"] yield emit("response.output_text.delta", {"type": "response.function_call_arguments.delta", "delta": fn["arguments"], "item_id": tc_buf[idx]["id"]}) if msg_opened: yield emit("response.output_text.done", {"type": "response.output_text.done", "text": text_buf, "item_id": msg_id, "content_index": 0}) yield emit("response.content_part.done", {"type": "response.content_part.done", "part": {"type": "output_text", "text": text_buf, "annotations": []}, "item_id": msg_id}) yield emit("response.output_item.done", {"type": "response.output_item.done", "item": {"type": "message", "id": msg_id, "role": "assistant", "status": "completed", "content": [{"type": "output_text", "text": text_buf, "annotations": []}]}}) for idx in sorted(tc_buf): t = tc_buf[idx] yield emit("response.function_call_arguments.done", {"type": "response.function_call_arguments.done", "item_id": t["id"], "name": t["name"], "arguments": t["args"]}) yield emit("response.output_item.done", {"type": "response.output_item.done", "item": {"type": "function_call", "id": t["id"], "call_id": t["call_id"], "name": t["name"], "arguments": t["args"], "status": "completed"}}) fm = {"stop": "completed", "length": "incomplete", "tool_calls": "completed", "content_filter": "incomplete"} status = fm.get(fr, "incomplete") final_out = [] if msg_opened: final_out.append({"type": "message", "id": msg_id, "role": "assistant", "status": "completed", "content": [{"type": "output_text", "text": text_buf, "annotations": []}]}) for idx in sorted(tc_buf): t = tc_buf[idx] final_out.append({"type": "function_call", "id": t["id"], "call_id": t["call_id"], "name": t["name"], "arguments": t["args"], "status": "completed"}) yield emit("response.completed", {"type": "response.completed", "response": {"id": resp_id, "object": "response", "model": model, "status": status, "created": int(time.time()), "output": final_out}}) # ═══════════════════════════════════════════════════════════════════ # Anthropic backend # ═══════════════════════════════════════════════════════════════════ def an_input_to_messages(input_data): msgs = [] if isinstance(input_data, str): msgs.append({"role": "user", "content": input_data}) elif isinstance(input_data, list): for item in input_data: t = item.get("type") if t == "message": role = item.get("role", "user") if role == "developer": role = "user" text = "" for part in item.get("content", []): pt = part.get("type", "") if pt in ("input_text", "output_text"): text += part.get("text", "") if role == "assistant": msgs.append({"role": "assistant", "content": text}) else: msgs.append({"role": "user", "content": text}) elif t == "function_call": msgs.append({"role": "assistant", "content": [ {"type": "tool_use", "id": item.get("call_id", item.get("id", uid("tu"))), "name": item.get("name", ""), "input": json.loads(item.get("arguments", "{}"))} ]}) elif t == "function_call_output": msgs.append({"role": "user", "content": [ {"type": "tool_result", "tool_use_id": item.get("id", ""), "content": item.get("output", "")} ]}) return msgs def an_convert_tools(tools): if not tools: return None out = [] for t in tools: if t.get("type") != "function": continue fn = t.get("function", {}) if fn: out.append({"name": fn.get("name"), "description": fn.get("description", ""), "input_schema": fn.get("parameters", {"type": "object", "properties": {}})}) else: out.append({"name": t.get("name"), "description": t.get("description", ""), "input_schema": t.get("parameters", {"type": "object", "properties": {}})}) return out or None def an_resp_to_responses(anthro_resp, model, resp_id=None): blocks = anthro_resp.get("content", []) sr = anthro_resp.get("stop_reason", "end_turn") sm = {"end_turn": "completed", "max_tokens": "incomplete", "stop_sequence": "completed", "tool_use": "completed"} status = sm.get(sr, "incomplete") outputs = [] for b in blocks: bt = b.get("type", "") if bt == "text": outputs.append({"type": "message", "id": uid("msg"), "role": "assistant", "status": "completed", "content": [{"type": "output_text", "text": b.get("text", ""), "annotations": []}]}) elif bt == "tool_use": outputs.append({"type": "function_call", "id": uid("fc"), "call_id": b.get("id", ""), "name": b.get("name", ""), "arguments": json.dumps(b.get("input", {})), "status": "completed"}) elif bt == "thinking": outputs.append({"type": "reasoning", "id": uid("rsn"), "status": "completed", "content": [{"type": "text", "text": b.get("thinking", "")}]}) usage = anthro_resp.get("usage", {}) return {"id": resp_id or uid("resp"), "object": "response", "created": int(time.time()), "model": model, "status": status, "output": outputs, "usage": {"input_tokens": usage.get("input_tokens", 0), "output_tokens": usage.get("output_tokens", 0), "total_tokens": usage.get("input_tokens", 0) + usage.get("output_tokens", 0), "input_tokens_details": {"cached_tokens": 0}}} def an_stream_to_sse(stream, model, req_id): resp_id = req_id or uid("resp") completed = [] msg_id = uid("msg") text_buf = "" tc_id = None tc_call_id = None tc_name = "" tc_args = "" block_type = None stop_reason = "end_turn" yield emit("response.created", {"type": "response.created", "response": {"id": resp_id, "object": "response", "model": model, "status": "in_progress", "created": int(time.time()), "output": []}}) yield emit("response.in_progress", {"type": "response.in_progress", "response": {"id": resp_id}}) for raw in stream: line = raw.decode("utf-8", errors="replace").strip() if not line: continue if line.startswith("event: "): evt_type = line[7:] continue if not line.startswith("data: "): continue try: data = json.loads(line[6:]) except json.JSONDecodeError: continue et = data.get("type", "") if et == "message_start": pass elif et == "content_block_start": cb_type = data.get("content_block", {}).get("type", "") block_type = cb_type if cb_type == "text": msg_id = uid("msg") yield emit("response.output_item.added", {"type": "response.output_item.added", "item": {"type": "message", "id": msg_id, "role": "assistant", "status": "in_progress", "content": []}}) yield emit("response.content_part.added", {"type": "response.content_part.added", "part": {"type": "output_text", "text": "", "annotations": []}, "item_id": msg_id}) elif cb_type == "tool_use": cb = data.get("content_block", {}) tc_id = uid("fc") tc_call_id = cb.get("id", tc_id) tc_name = cb.get("name", "") yield emit("response.output_item.added", {"type": "response.output_item.added", "item": {"type": "function_call", "id": tc_id, "call_id": tc_call_id, "name": tc_name, "arguments": "", "status": "in_progress"}}) elif cb_type == "thinking": pass elif et == "content_block_delta": dd = data.get("delta", {}) dt = dd.get("type", "") if dt == "text_delta": txt = dd.get("text", "") text_buf += txt yield emit("response.output_text.delta", {"type": "response.output_text.delta", "delta": txt, "item_id": msg_id, "content_index": 0}) elif dt == "input_json_delta": pj = dd.get("partial_json", "") tc_args += pj yield emit("response.output_text.delta", {"type": "response.function_call_arguments.delta", "delta": pj, "item_id": tc_id}) elif dt == "thinking_delta": tk = dd.get("thinking", "") yield emit("response.reasoning.delta", {"type": "response.reasoning.delta", "delta": tk}) elif et == "content_block_stop": if block_type == "text": yield emit("response.output_text.done", {"type": "response.output_text.done", "text": text_buf, "item_id": msg_id, "content_index": 0}) yield emit("response.content_part.done", {"type": "response.content_part.done", "part": {"type": "output_text", "text": text_buf, "annotations": []}, "item_id": msg_id}) yield emit("response.output_item.done", {"type": "response.output_item.done", "item": {"type": "message", "id": msg_id, "role": "assistant", "status": "completed", "content": [{"type": "output_text", "text": text_buf, "annotations": []}]}}) completed.append({"type": "message", "id": msg_id, "role": "assistant", "status": "completed", "content": [{"type": "output_text", "text": text_buf, "annotations": []}]}) text_buf = "" elif block_type == "tool_use": yield emit("response.function_call_arguments.done", {"type": "response.function_call_arguments.done", "item_id": tc_id, "name": tc_name, "arguments": tc_args}) yield emit("response.output_item.done", {"type": "response.output_item.done", "item": {"type": "function_call", "id": tc_id, "call_id": tc_call_id, "name": tc_name, "arguments": tc_args, "status": "completed"}}) completed.append({"type": "function_call", "id": tc_id, "call_id": tc_call_id, "name": tc_name, "arguments": tc_args, "status": "completed"}) tc_id = None tc_args = "" block_type = None elif et == "message_delta": stop_reason = data.get("delta", {}).get("stop_reason", "end_turn") elif et == "message_stop": sm = {"end_turn": "completed", "max_tokens": "incomplete", "stop_sequence": "completed", "tool_use": "completed"} status = sm.get(stop_reason, "incomplete") yield emit("response.completed", {"type": "response.completed", "response": {"id": resp_id, "object": "response", "model": model, "status": status, "created": int(time.time()), "output": completed}}) _DEFAULT_CC_CONFIG = { "workingDir": "/tmp", "date": "", "environment": "linux", "shell": "bash", "files": [], "structure": [], "isGitRepo": False, "currentBranch": "", "mainBranch": "", "gitStatus": "", "recentCommits": [], } def _cc_config(): cfg = dict(_DEFAULT_CC_CONFIG) cfg["date"] = time.strftime("%Y-%m-%d") return cfg def cc_convert_tools(tools): return oa_convert_tools(tools) def _strip_xmlish_tags(text): return re.sub(r"<[^>]+>", "", text or "") def _unwrap_cmd(cmd_val): """[FIX 11] Self-healing: unwrap double-wrapped cmd values. Model sometimes generates: {"cmd": "{\"cmd\": \"actual_command\"}"} Detect when cmd value is itself a JSON object with a nested "cmd" key, and extract the real command string. Recursively unwraps up to 3 levels. """ if not isinstance(cmd_val, str) or not cmd_val.startswith("{"): return cmd_val for _ in range(3): try: inner = json.loads(cmd_val) if isinstance(inner, dict) and "cmd" in inner and isinstance(inner["cmd"], str): cmd_val = inner["cmd"] else: break except Exception: break return cmd_val def _parse_commandcode_text_tool_calls(text): """Parse CommandCode's text-form tool calls into Responses function calls. Handles THREE formats: 1. XML: ``...`` (original) 2. Function: ``...`` (original) 3. [FIX 5] Raw JSON inline: {"type":"tool-call","id":"...","name":"exec_command","arguments":"{...}"} Format 3 exists because cc_input_to_messages sends tool calls as inline JSON text. The CC model echoes this format back in its response. Extraction is done by _extract_raw_json_tool_calls() which is appended after the XML pattern loop. See that function for details on malformed-JSON handling. Tolerant of: unescaped inner quotes, unbalanced braces, missing type/id fields, sandbox_permissions at top level vs nested inside arguments, etc. """ calls = [] if not text: return calls # [FIX 17] DSML tool_call blocks used by the model now. # Example: # <||DSML||tool_calls> # <||DSML||invoke name="exec"> # <||DSML||parameter name="command" string="true">curl ... # <||DSML||parameter name="sandbox_permissions" string="true">require_escalated # <||DSML||parameter name="justification" string="true">... # <||DSML||parameter name="prefix_rule" string="true">["/bin/bash", "-lc", "curl ..."] # # for m in re.finditer(r"<[^>]*tool_calls[^>]*>(.*?)]*tool_calls[^>]*>", text, re.DOTALL | re.IGNORECASE): block = m.group(1) or "" for im in re.finditer(r"<[^>]*invoke[^>]*name=\"([^\"]+)\"[^>]*>(.*?)]*invoke>", block, re.DOTALL | re.IGNORECASE): raw_name = (im.group(1) or "").strip() body = (im.group(2) or "").strip() if not body: continue cmd = None sandbox_permissions = None justification = None # Parameter tags are the canonical source. for pm in re.finditer(r"<[^>]*parameter[^>]*name=\"([^\"]+)\"[^>]*>(.*?)]*parameter>", body, re.DOTALL | re.IGNORECASE): key = (pm.group(1) or "").strip().lower() val = _strip_xmlish_tags(pm.group(2)).strip() if key == "command": cmd = val elif key == "prefix_rule" and not cmd: try: pr_obj = json.loads(val) except Exception: pr_obj = None if isinstance(pr_obj, list) and pr_obj and isinstance(pr_obj[-1], str): cmd = pr_obj[-1] elif key == "sandbox_permissions": sandbox_permissions = val elif key == "justification": justification = val # Fallback: if the body contains a raw JSON command. if not cmd: jm = re.search(r'"(?:command|cmd)"\s*:\s*"((?:[^"\\]|\\.)*)"', body, re.DOTALL) if jm: cmd = jm.group(1).replace('\\n', '\n').replace('\\"', '"').strip() if not cmd: continue tool_name = "exec_command" if raw_name.lower() in ("exec", "bash", "shell", "terminal", "run_command") else raw_name args = {"cmd": _unwrap_cmd(cmd)} if sandbox_permissions: args["sandbox_permissions"] = sandbox_permissions if sandbox_permissions in ("use_default", "require_escalated", "with_user_approval") else "require_escalated" if justification: args["justification"] = justification calls.append({ "full_match": m.group(0), "name": tool_name, "arguments": json.dumps(args, ensure_ascii=False), }) # [FIX 16] Native blocks from CommandCode. # Example: # # sandbox_permissions: require_escalated # justification: ... # prefix_rule: ["/bin/bash", "-lc", "curl ..."] # # Convert into exec_command calls by extracting the command from prefix_rule. for m in re.finditer(r"(.*?)", text, re.DOTALL | re.IGNORECASE): body = (m.group(1) or "").strip() if not body: continue sandbox_permissions = None justification = None cmd = None # Try line-oriented parsing first. for line in body.splitlines(): s = line.strip() if s.lower().startswith("sandbox_permissions:"): sandbox_permissions = s.split(":", 1)[1].strip() elif s.lower().startswith("justification:"): justification = s.split(":", 1)[1].strip() elif s.lower().startswith("prefix_rule:"): pr = s.split(":", 1)[1].strip() try: pr_obj = json.loads(pr) except Exception: pr_obj = None if isinstance(pr_obj, list) and pr_obj: # If the last arg exists, it is typically the shell command. cmd = pr_obj[-1] if isinstance(pr_obj[-1], str) else None elif pr.startswith("[") and pr.endswith("]"): parts = re.findall(r'"((?:[^"\\]|\\.)*)"', pr) if parts: cmd = parts[-1].encode().decode("unicode_escape") # Fallback: grab a shell-looking line if prefix_rule wasn't parseable. if not cmd: for line in body.splitlines(): s = line.strip() if re.match(r"^(curl|wget|python3?|node|npm|pnpm|yarn|cat|ls|find|grep|rg|sed|awk|git|mkdir|touch|printf|echo)\b", s): cmd = s break if not cmd: continue args = {"cmd": cmd} if sandbox_permissions: args["sandbox_permissions"] = sandbox_permissions if sandbox_permissions in ("use_default", "require_escalated", "with_user_approval") else "require_escalated" if justification: args["justification"] = justification calls.append({ "full_match": m.group(0), "name": "exec_command", "arguments": json.dumps(args, ensure_ascii=False), }) # [FIX 15] Native blocks from CommandCode. # Format seen in logs: # \nmessages: [{...}]\n # Treat as an assistant-requested agent call so the loop can continue. for m in re.finditer(r"(.*?)|\s*messages:\s*(\[.*?\])", text, re.DOTALL | re.IGNORECASE): body = m.group(1) or m.group(2) or "" body = body.strip() msgs = None if body: # Prefer explicit JSON array after `messages:`; fall back to raw body. try: msgs = json.loads(body) if body.startswith("[") else None except Exception: msgs = None if msgs is None and body: # Try to extract a JSON array from the body. mm = re.search(r"(\[.*\])", body, re.DOTALL) if mm: try: msgs = json.loads(mm.group(1)) except Exception: msgs = None if msgs is None: msgs = body # Convert explore_agent into a real exec_command so downstream clients can execute it. text_for_url = body if isinstance(body, str) else json.dumps(body, ensure_ascii=False) url_m = re.search(r"https?://[^\s\]'>\"]+", text_for_url) repo_url = url_m.group(0).rstrip(")].,;'") if url_m else "" if repo_url: api_base = repo_url.replace("/admin/", "/api/v1/repos/") # Build a safe, generic exploration command: README + root contents + releases. cmd = ( f"cd /tmp && " f"curl -sL --max-time 15 '{api_base}/contents/README.md' 2>/dev/null | " f"python3 -c \"import sys,json,base64; d=json.load(sys.stdin); print(base64.b64decode(d['content']).decode())\" 2>/dev/null | head -600 && " f"curl -sL --max-time 15 '{api_base}/contents' 2>/dev/null | python3 -c \"import sys,json; d=json.load(sys.stdin); print('\\n'.join(f'{{x.get(\'path\')}} {{x.get(\'type\')}}' for x in d[:50]))\" 2>/dev/null && " f"curl -sL --max-time 15 '{api_base}/releases' 2>/dev/null | python3 -c \"import sys,json; d=json.load(sys.stdin); print(json.dumps(d[:3], indent=2)[:2000])\" 2>/dev/null" ) args = {"cmd": cmd, "justification": "Explore repository to understand the app and gather README, root contents, and releases for the landing page."} else: args = {"cmd": "echo 'explore_agent: unable to extract repository URL'", "justification": "Fallback for explore_agent block without URL."} calls.append({ "full_match": m.group(0), "name": "exec_command", "arguments": json.dumps(args, ensure_ascii=False), }) patterns = [ r"\s]+)['\"]?)?>(.*?)", r"(.*?)", # [FIX 14] CC model actual output: \n{"command":"...", "description":"..."} # No \s*(\{.*?\})(?:\s*= len(text) or text[start] != '{': return -1 depth = 0 i = start in_str = False escape = False while i < len(text): ch = text[i] if escape: escape = False elif ch == '\\': escape = True elif ch == '"': in_str = not in_str elif not in_str: if ch == '{': depth += 1 elif ch == '}': depth -= 1 if depth == 0: return i i += 1 return -1 def _extract_field(text, key, end_chars=',}'): """Extract a field value after "key": in rough JSON text. [FIX 7] Handles values starting with \" (backslash-quote) which occurs when the model generates properly-escaped JSON inside a string value. Without this fix, _extract_field returns None for escaped values, causing sandbox_permissions/justification to not be extracted from the parsed args dict (falling through to raw snippet extraction). Also tolerant of unescaped quotes inside string values. Returns None if key not found or value is empty. """ pat = re.compile(r'"' + re.escape(key) + r'"\s*:\s*', re.DOTALL) m = pat.search(text) if not m: return None val_start = m.end() # Skip leading backslash-escape if the value starts with \" (nested JSON string) if val_start < len(text) and text[val_start] == '\\': val_start += 1 # Check if value is a string if val_start < len(text) and text[val_start] == '"': s = val_start + 1 buf = [] while s < len(text): ch = text[s] if ch == '\\' and s + 1 < len(text): buf.append(text[s+1]) s += 2 elif ch == '"': return ''.join(buf) elif ch in end_chars and not buf: return None else: buf.append(ch) s += 1 return ''.join(buf) # Object value: find balanced brace if val_start < len(text) and text[val_start] == '{': end = _find_balanced_brace(text, val_start) if end > val_start: return text[val_start:end+1] return None def _extract_args(text): """Extract arguments value from tool-call JSON, handling multiple malformed formats. [FIX 6] THREE-TIER PARSER — solves double-wrapped arguments bug: Model generates arguments in TWO different escaped forms: A) Unescaped: "arguments": "{"cmd": "curl ...", "sp": "allow_all"}" → naive brace-counting finds boundaries correctly B) Escaped: "arguments": "{\\"cmd\\": \\"curl...\\"}" → json.loads fails on \\ at structural level → unescape \\" → " and retry → unicode_escape decode and retry Returns the raw JSON string (after best-effort unescaping). Caller does json.loads() on the result. If all 3 tiers fail, returns raw text (caller handles as fallback). """ m = re.search(r'"(?:arguments|input)"\s*:\s*"?', text) if not m: return None start = m.end() if start < len(text) and text[start] == '"': start += 1 if start >= len(text) or text[start] != '{': return None depth = 0 i = start while i < len(text): ch = text[i] if ch == '{': depth += 1 elif ch == '}': depth -= 1 if depth == 0: raw = text[start:i+1] # Try JSON.parse as-is try: json.loads(raw) return raw except json.JSONDecodeError: pass # Try after unescaping inner \" -> " unescaped = raw.replace('\\"', '"') try: json.loads(unescaped) return unescaped except json.JSONDecodeError: pass # Try after also unescaping \\n -> \n etc try: fixed = raw.encode().decode('unicode_escape') json.loads(fixed) return fixed except Exception: pass # Give up — return raw text return raw i += 1 return None def _extract_raw_json_tool_calls(t): """[FIX 5] Extract raw JSON tool-call objects from free text. Finds "type":"tool-call" (or tool_call/function_call) in text, then extracts name/id/arguments/sandbox_permissions/justification via field-level regex. Delegates to _extract_args() for the arguments field (handles unescaped + escaped JSON). Delegates to _extract_field() for name/id/sandbox_permissions/justification (with FIX 7 for leading-\ handling). Normalizes sandbox_permissions to valid values (use_default|require_escalated|with_user_approval) [FIX 6] Prevents double-wrapped args: {"cmd": "{\"cmd\": \"curl...\"}"} """ results = [] idx = 0 while True: m = re.search(r'"type"\s*:\s*"(tool-call|tool_call|function_call)"', t[idx:]) if not m: break tc_pos = idx + m.start() snippet = t[tc_pos:] idx = tc_pos + 1 tc_type = m.group(1) tc_name = _extract_field(snippet, "name") if not tc_name: continue tc_id = _extract_field(snippet, "id") tool_name = "exec_command" if tc_name.lower() in ("bash", "shell", "terminal", "run_command") else tc_name args_raw = _extract_args(snippet) or _extract_field(snippet, "arguments") or _extract_field(snippet, "input") or "{}" try: args = json.loads(args_raw) if args_raw.startswith('{') else {"cmd": args_raw} except Exception: args = {"cmd": args_raw} if "cmd" not in args or not args["cmd"]: args["cmd"] = str(args) # [FIX 11] Self-healing: unwrap double-wrapped cmd values args["cmd"] = _unwrap_cmd(args.get("cmd", "")) # Normalize sandbox_permissions to valid values _VALID_SP = frozenset({"use_default", "require_escalated", "with_user_approval"}) if "sandbox_permissions" in args: spv = args["sandbox_permissions"] if isinstance(spv, dict): args["sandbox_permissions"] = "require_escalated" if spv.get("require_escalated") else "use_default" elif isinstance(spv, str) and spv not in _VALID_SP: args["sandbox_permissions"] = "require_escalated" else: # Fallback: extract from raw snippet (model puts it at top level) sp_raw = _extract_field(snippet, "sandbox_permissions") if sp_raw: try: sp_obj = json.loads(sp_raw) if sp_raw.startswith('{') else {"require_escalated": bool(sp_raw)} if isinstance(sp_obj, dict) and sp_obj.get("require_escalated"): args["sandbox_permissions"] = "require_escalated" except Exception: pass if "justification" not in args: just_raw = _extract_field(snippet, "justification") if just_raw: args["justification"] = just_raw results.append({ "full_match": snippet, "name": tool_name, "arguments": json.dumps(args, ensure_ascii=False), }) return results for pat in patterns: for m in re.finditer(pat, text, re.DOTALL | re.IGNORECASE): if pat.startswith("\s]+)", body, re.IGNORECASE) raw_name = raw_name or (nm.group(1) if nm else "bash") params = {} body_stripped = body.strip() if body_stripped.startswith("{"): try: obj = json.loads(body_stripped) cmd = obj.get("command") or obj.get("cmd") or "" cmd = _unwrap_cmd(cmd) # [FIX 11] if cmd: tool_name = "exec_command" if raw_name.lower() in ("bash", "shell", "terminal", "run_command") else raw_name args = {"cmd": cmd} sp = obj.get("sandbox_permissions") if isinstance(sp, dict) and sp.get("require_escalated"): args["sandbox_permissions"] = "require_escalated" elif isinstance(sp, str): args["sandbox_permissions"] = sp if obj.get("justification"): args["justification"] = obj.get("justification") calls.append({"full_match": m.group(0), "name": tool_name, "arguments": json.dumps(args)}) continue except Exception: pass for pm in re.finditer(r"(.*?)", body, re.DOTALL | re.IGNORECASE): key = pm.group(1) or pm.group(2) or "text" params[key] = _strip_xmlish_tags(pm.group(3)).strip() cmd = params.get("command") or params.get("cmd") or "" if not cmd and body_stripped.startswith("{"): cm = re.search(r'"(?:command|cmd)"\s*:\s*"(.*?)"\s*,\s*"(?:sandbox_permissions|justification|prefix_rule)"', body, re.DOTALL) if not cm: cm = re.search(r'"(?:command|cmd)"\s*:\s*"(.*?)"\s*}', body, re.DOTALL) if cm: cmd = cm.group(1) cmd = cmd.replace('\\n', '\n').replace('\\"', '"').strip() cmd = _unwrap_cmd(cmd) # [FIX 11] if re.search(r'"sandbox_permissions"\s*:\s*\{\s*"require_escalated"\s*:\s*true\s*\}', body, re.DOTALL): params["sandbox_permissions"] = "require_escalated" jm = re.search(r'"justification"\s*:\s*"(.*?)"\s*(?:,|})', body, re.DOTALL) if jm: params["justification"] = jm.group(1).replace('\\n', '\n').replace('\\"', '"').strip() if not cmd: stripped = _strip_xmlish_tags(body) lines = [ln.strip() for ln in stripped.splitlines() if ln.strip()] for i, ln in enumerate(lines): if re.match(r"^(curl|wget|python3?|node|npm|pnpm|yarn|cat|ls|find|grep|rg|sed|awk|git|mkdir|touch|printf|echo)\b", ln): cmd = "\n".join(lines[i:]) break if not cmd and lines: cmd = "\n".join(lines) if not cmd: continue tool_name = "exec_command" if raw_name.lower() in ("bash", "shell", "terminal", "run_command") else raw_name args = {"cmd": _unwrap_cmd(cmd)} # [FIX 11] all paths must unwrap if params.get("sandbox_permissions"): args["sandbox_permissions"] = params["sandbox_permissions"] if params.get("justification"): args["justification"] = params["justification"] calls.append({"full_match": m.group(0), "name": tool_name, "arguments": json.dumps(args)}) # Also extract raw JSON tool-call objects embedded in free text calls.extend(_extract_raw_json_tool_calls(text)) # [FIX 11] Self-healing: last-chance sanitization pass on ALL extracted calls calls = _sanitize_tool_calls(calls) return calls def _sanitize_tool_calls(calls): """[FIX 11/T3] Post-extraction self-healing validation layer. Runs AFTER all extraction paths (XML, raw JSON, regex) have produced their tool calls. This is the final safety net before calls are returned to the streaming/response builder. Validates and repairs: - Double/triple-wrapped cmd values (recursive unwrap) - cmd that looks like JSON object/string instead of shell command - cmd containing escaped newlines or quotes that would break bash - Empty or whitespace-only cmd → replaced with diagnostic string Logs warnings for any repair made (visible in stderr/proxy logs). Returns sanitized list (may be shorter if irreparable calls are dropped). """ cleaned = [] for i, call in enumerate(calls): try: args_raw = call.get("arguments", "{}") if isinstance(args_raw, str): args = json.loads(args_raw) else: args = dict(args_raw) except Exception: cleaned.append(call) continue cmd = args.get("cmd", "") repaired = False # Detect and unwrap nested JSON cmd values (up to 4 levels deep) unwrapped = _unwrap_cmd(cmd) if unwrapped != cmd: cmd = unwrapped args["cmd"] = cmd repaired = True # Detect cmd that is still a JSON object (unwrap missed it or deeper nesting) if isinstance(cmd, str) and cmd.strip().startswith("{"): try: inner = json.loads(cmd) if isinstance(inner, dict): for key in ("cmd", "command", "c"): if key in inner and isinstance(inner[key], str): args["cmd"] = inner[key] repaired = True break except Exception: pass # Detect cmd that looks like a JSON-encoded string with backslash escapes _cmd = args.get("cmd", "") if _cmd and ('\\"' in _cmd or "\\n" in _cmd or _cmd.count("{") > _cmd.count("}")): try: decoded = _cmd.encode().decode("unicode_escape") if decoded != _cmd and not decoded.startswith("{"): args["cmd"] = decoded repaired = True except Exception: pass # Final guard: if cmd is empty or just JSON garbage, make it obvious _final_cmd = args.get("cmd", "") if not _final_cmd or _final_cmd.strip() in ("{}", "null", "None", ""): _safe_preview = args_raw[:200].replace('"', "'").replace('\\', '/') args["cmd"] = f"# [CC-SANITIZER] empty cmd recovered from: {_safe_preview}" repaired = True elif _final_cmd.startswith("{") and len(_final_cmd) < 500: # Still looks like JSON — likely unrecoverable, flag it _safe_preview = _final_cmd.replace('"', "'").replace('\\', '/') args["cmd"] = f"# [CC-SANITIZER] suspicious cmd (still JSON): {_safe_preview}" repaired = True if repaired: print(f"[translate-proxy] [CC-SANITIZER] repaired tool call #{i}: " f"name={call.get('name')} cmd_preview={str(args.get('cmd',''))[:120]}", file=sys.stderr) call["arguments"] = json.dumps(args, ensure_ascii=False) cleaned.append(call) return cleaned def _parse_cc_line(line): """Parse a raw line from CommandCode /alpha/generate, stripping SSE data: prefix.""" stripped = line.strip() if not stripped: return None if stripped.startswith("data: "): stripped = stripped[6:] elif stripped.startswith("data:"): stripped = stripped[5:] if not stripped or stripped == "[DONE]": return None try: return json.loads(stripped) except json.JSONDecodeError: return None def _iter_cc_events(stream): """Yield parsed JSON events from a CommandCode /alpha/generate stream. Handles raw JSON lines, SSE data: events, and multi-event chunks. """ buf = "" for chunk in stream: buf += chunk.decode("utf-8", errors="replace") while "\n" in buf: line, buf = buf.split("\n", 1) d = _parse_cc_line(line) if d is not None: yield d # Process remaining buffer (non-streaming single-JSON response) if buf.strip(): if buf.strip().startswith("{"): d = _parse_cc_line(buf) if d is not None: yield d else: for line in buf.strip().split("\n"): d = _parse_cc_line(line) if d is not None: yield d def cc_resp_to_responses(cc_lines, model, resp_id=None): text = "" usage = {} if isinstance(cc_lines, str): cc_lines = [cc_lines] for line in cc_lines: d = _parse_cc_line(line) if d is None: continue t = d.get("type", "") if t == "text-delta": text += d.get("text", "") elif t == "finish-step": u = d.get("usage", {}) usage = { "input_tokens": u.get("inputTokens", 0), "output_tokens": u.get("outputTokens", 0), "total_tokens": u.get("inputTokens", 0) + u.get("outputTokens", 0), } outputs = [] if text: outputs.append({"type": "message", "id": uid("msg"), "role": "assistant", "status": "completed", "content": [{"type": "output_text", "text": text, "annotations": []}]}) return {"id": resp_id or uid("resp"), "object": "response", "created": int(time.time()), "model": model, "status": "completed", "output": outputs, "usage": {"input_tokens": usage.get("input_tokens", 0), "output_tokens": usage.get("output_tokens", 0), "total_tokens": usage.get("total_tokens", 0), "input_tokens_details": {"cached_tokens": 0}}} def cc_stream_to_sse(cc_stream, model, req_id): resp_id = req_id or uid("resp") msg_id = uid("msg") text_buf = "" yield emit("response.created", {"type": "response.created", "response": {"id": resp_id, "object": "response", "model": model, "status": "in_progress", "created": int(time.time()), "output": []}}) yield emit("response.in_progress", {"type": "response.in_progress", "response": {"id": resp_id}}) total_usage = {} _event_types_seen = set() _debug_log_path = os.path.expanduser("~/.cache/codex-proxy/cc-debug.log") _debug_fh = open(_debug_log_path, "a") # [FIX 14] always write debug to FILE (not just stderr which may be piped) _deflog = lambda *a, **kw: print(*a, file=_debug_fh, flush=True, **kw) for d in _iter_cc_events(cc_stream): t = d.get("type", "") _event_types_seen.add(t) if t == "text-delta": txt = d.get("text", "") if txt: text_buf += txt elif t == "finish-step": u = d.get("usage", {}) total_usage = { "input_tokens": u.get("inputTokens", 0), "output_tokens": u.get("outputTokens", 0), "total_tokens": u.get("inputTokens", 0) + u.get("outputTokens", 0), } elif t not in ("text-delta", "finish-step"): _deflog(f"[CC-DEBUG] unexpected event type: {t} keys={list(d.keys())[:5]} data={str(d)[:200]}") _deflog(f"[CC-DEBUG] stream ended. event_types={_event_types_seen} text_buf_len={len(text_buf)}") parsed_tool_calls = _parse_commandcode_text_tool_calls(text_buf) _deflog(f"[CC-DEBUG] text_buf len={len(text_buf)} parsed_tool_calls={len(parsed_tool_calls)} " f"text_preview={text_buf[:500]!r}") if parsed_tool_calls: for ti, tc in enumerate(parsed_tool_calls): _deflog(f"[CC-DEBUG] tool_call[{ti}] name={tc.get('name')} args_preview={tc.get('arguments','')[:150]!r}") # [FIX 13] FALLBACK: if parser returned empty but text contains tool-call patterns, # force-extract using regex. This catches cases where model output format # doesn't match any of our named patterns (XML/raw JSON/function=). if not parsed_tool_calls and len(text_buf) > 20: _has_tc_signals = ( '"type"' in text_buf and ('tool-call' in text_buf or 'tool_call' in text_buf or 'function_call' in text_buf) ) or ( ' dict: """Return a dict for storing in provider-caps.json.""" d = {} for k, v in dataclasses.asdict(self).items(): if isinstance(v, (list, tuple)) and not v: continue if isinstance(v, dict) and not v: continue if v is False: continue if v == "": continue if v == "auto": continue d[k] = v return d class ErrorAnalyzer: """Parse upstream error responses to infer provider schema. Analyzes 400, 401, 422 errors for hints about auth, roles, content format, parameter names, field names, tool format, and response format. """ @staticmethod def analyze(error_text: str, current: ProviderSchema = None) -> dict: hints = {} if not error_text: return hints err = error_text.lower() # ── Auth detection (401 errors) ── if re.search(r"unauthorized|invalid.*api.?key|missing.*api.?key|x-api-key", err): hints["auth_type"] = "x-api-key" hints["auth_header"] = "x-api-key" hints["auth_scheme"] = "" elif re.search(r"invalid.*bearer|bearer.*token|authorization.*header|invalid.*token", err): hints["auth_type"] = "bearer" hints["auth_header"] = "Authorization" hints["auth_scheme"] = "Bearer " # ── Role validation ── if re.search(r"role.*expected.*(?:user|assistant)", err): hints["accepts_tool_role"] = False hints["accepts_function_role"] = False if re.search(r"role.*(?:tool|function).*(?:invalid|not.*(?:support|allow))", err): hints["accepts_tool_role"] = False hints["accepts_function_role"] = False if re.search(r"role.*system.*(?:invalid|not.*(?:support|allow))", err): hints["accepts_system_role"] = False # ── Content format (top-level only, not content[i].xxx) ── if re.search(r'params\.messages\[\d+\]\.content', err): # Explicit path to content field in a messages array (e.g. /alpha/generate) if re.search(r"expected string.*received array", err): hints["content_type"] = "string" hints["tool_result_style"] = "inline" # no tool_result blocks allowed elif re.search(r"expected array.*received string", err): hints["content_type"] = "array" elif re.search(r"(? ProviderSchema: for k, v in hints.items(): if k == "field_names" and isinstance(v, dict): schema.field_names.update(v) elif k == "param_names" and isinstance(v, dict): schema.param_names.update(v) elif hasattr(schema, k): setattr(schema, k, v) return schema def _schema_cache_key(target_url=None, backend=None, model=None): host = urllib.parse.urlparse(target_url or TARGET_URL).netloc.lower() return f"auto-schema|{backend or BACKEND}|{host}|{model or '*'}" def _load_schema(target_url=None, backend=None, model=None): caps = _load_provider_caps() key = _schema_cache_key(target_url, backend, model) raw = caps.get(key) generic = caps.get(_schema_cache_key(target_url, backend, model="*")) data = raw or generic or {} if not data: return ProviderSchema() # Staleness check: re-learn after 24h (86400s) updated = data.get("_updated", 0) if isinstance(updated, (int, float)) and time.time() - updated > 86400: print(f"[auto-sense] cached schema stale ({int(time.time()-updated)}s old), re-learning", file=sys.stderr) return ProviderSchema() return ProviderSchema( supported_roles=tuple(data.get("supported_roles", ("user", "assistant"))), content_type=data.get("content_type", "string"), content_block_types=tuple(data.get("content_block_types", ())), tool_result_style=data.get("tool_result_style", "inline"), tool_call_style=data.get("tool_call_style", "openai_function"), accepts_tool_role=data.get("accepts_tool_role", False), accepts_system_role=data.get("accepts_system_role", True), cc_body_wrap=data.get("cc_body_wrap", False), field_names=dict(data.get("field_names", {})), auth_type=data.get("auth_type", ""), auth_header=data.get("auth_header", "Authorization"), auth_scheme=data.get("auth_scheme", "Bearer "), tool_decl_format=data.get("tool_decl_format", "openai"), param_names=dict(data.get("param_names", { "max_tokens": "max_tokens", "temperature": "temperature", "top_p": "top_p", })), response_format=data.get("response_format", "auto"), stream_format=data.get("stream_format", "auto"), ) def _save_schema(schema: ProviderSchema, target_url=None, backend=None, model=None): caps = _load_provider_caps() key = _schema_cache_key(target_url, backend, model) caps[key] = schema.hints() caps[key]["_updated"] = time.time() caps[key]["_backend"] = backend or BACKEND _save_provider_caps() print(f"[auto-sense] cached schema {key}", file=sys.stderr) class SchemaAdapter: """Convert Responses API messages based on a detected ProviderSchema.""" def __init__(self, schema: ProviderSchema): self.s = schema def convert(self, input_data, instructions=""): if self.s.content_type == "string" and not self.s.content_block_types: return self._to_plain_string(input_data, instructions) return self._to_content_blocks(input_data, instructions) def _to_plain_string(self, input_data, instructions=""): """Fallback: user/assistant string content — no tool roles.""" msgs = [] if instructions and self.s.accepts_system_role: msgs.append({"role": "system", "content": instructions}) elif instructions: msgs.append({"role": "user", "content": instructions}) if isinstance(input_data, str): msgs.append({"role": "user", "content": input_data}) return msgs if not isinstance(input_data, list): return msgs last_flushed = [] pending = [] for item in input_data: t = item.get("type") if t == "function_call": cid = item.get("call_id") or item.get("id") or uid("fc") pending.append({"id": cid, "name": item.get("name", ""), "arguments": item.get("arguments", "{}")}) continue if pending: last_flushed = [p["id"] for p in pending] msgs.append({"role": "assistant", "content": None, "tool_calls": [{"id": p["id"], "type": "function", "function": {"name": p["name"], "arguments": p["arguments"]}} for p in pending]}) pending = [] if t == "message": role = "user" if item.get("role") in ("user", "developer") else "assistant" text = _extract_text(item.get("content", [])) if text: msgs.append({"role": role, "content": text}) elif t == "function_call_output": out = item.get("output", "") if not isinstance(out, str): out = json.dumps(out, ensure_ascii=False) msgs.append({"role": "user", "content": out[:8000]}) if pending: last_flushed = [p["id"] for p in pending] msgs.append({"role": "assistant", "content": None, "tool_calls": [{"id": p["id"], "type": "function", "function": {"name": p["name"], "arguments": p["arguments"]}} for p in pending]}) return msgs def _to_content_blocks(self, input_data, instructions=""): msgs = [] pending_tc = [] tool_name_by_id = {} last_ids = [] def flush(): nonlocal last_ids if not pending_tc: return last_ids = [t["id"] for t in pending_tc] msgs.append({"role": "assistant", "content": pending_tc}) pending_tc.clear() _str = self.s.content_type == "string" if instructions: msgs.append({"role": "user", "content": instructions if _str else [{"type": "text", "text": instructions}]}) if isinstance(input_data, str): msgs.append({"role": "user", "content": input_data if _str else [{"type": "text", "text": input_data}]}) return msgs if not isinstance(input_data, list): return msgs for item in input_data: t = item.get("type") if t == "function_call": cid = item.get("call_id") or item.get("id") or uid("call") nm = item.get("name") or "exec_command" tool_name_by_id[cid] = nm tc_block = self._tool_call_block(cid, nm, item.get("arguments", "{}")) if tc_block: pending_tc.append(tc_block) continue flush() if t == "message": role = "user" if item.get("role") in ("user", "developer") else "assistant" text = _extract_text(item.get("content", [])) if text: msgs.append({"role": role, "content": text if _str else [{"type": "text", "text": text}]}) elif t == "function_call_output": cid = item.get("call_id") or item.get("id") or "" if not cid and last_ids: idx = sum(1 for m in msgs for c in (m.get("content") or []) if isinstance(c, dict) and c.get("type") in ("tool_result", "tool-result")) if idx < len(last_ids): cid = last_ids[idx] out = item.get("output", "") if not isinstance(out, str): out = json.dumps(out, ensure_ascii=False) tr = self._tool_result_block(cid, out) if tr: msgs.append({"role": "user", "content": [tr]}) flush() return msgs def _tool_call_block(self, cid, name, args): style = self.s.tool_call_style fn = self.s.field_names if style == "tool-call": return { "type": fn.get("tool_call_type", "tool-call"), fn.get("tool_call_id_field", "id"): cid, fn.get("tool_call_name_field", "name"): name, fn.get("tool_call_args_field", "arguments"): args, } elif style == "anthropic_tool_use": try: parsed = json.loads(args) except Exception: parsed = {} return { "type": fn.get("tool_use_type", "tool_use"), fn.get("tool_call_id_field", "id"): cid, fn.get("tool_call_name_field", "name"): name, fn.get("tool_call_args_field", "input"): parsed, } else: return None # handled as OpenAI function call def _tool_result_block(self, cid, output): style = self.s.tool_result_style fn = self.s.field_names if style == "tool_result_block": return { "type": fn.get("tool_result_type", "tool_result"), fn.get("tool_use_id", "tool_use_id"): cid or "", "content": [{"type": "text", "text": output[:8000]}], } elif style == "anthropic": return { "type": fn.get("tool_result_type", "tool_result"), fn.get("tool_use_id", "tool_use_id"): cid or "", "content": output[:8000], } return None # inline — handled by _to_plain_string def _sanitize_err_body(body): """Sanitize upstream error body: strip HTML, truncate, remove control chars.""" if not body: return "" s = re.sub(r'<[^>]+>', '', body) s = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', s) s = s.strip()[:1000] return s def _extract_text(content): if isinstance(content, str): return content if not isinstance(content, list): return "" parts = [] for p in content: if isinstance(p, str): parts.append(p) elif isinstance(p, dict) and p.get("type") in ("input_text", "output_text", "text"): parts.append(p.get("text", "")) return "".join(parts) # ═══════════════════════════════════════════════════════════════════ # HTTP Server # ═══════════════════════════════════════════════════════════════════ def _log_resp(resp_id, status, output): try: import datetime as _dt _lp = os.path.join(_LOG_DIR, "requests.log") with open(_lp, "a") as _f: _f.write(f" RESPONSE id={resp_id} status={status}\n") if output: for o in output: ot = o.get("type") if ot == "message": _f.write(f" -> message: {o.get('content',[{}])[0].get('text','')[:200]}\n") elif ot == "function_call": _f.write(f" -> function_call: {o.get('name')}({o.get('arguments','')[:120]})\n") else: _f.write(f" -> {ot}\n") _f.write(f"{'='*60}\n") _f.flush() except Exception: pass class ConnectionTracker: def __enter__(self): global _active_connections with _active_connections_lock: _active_connections += 1 def __exit__(self, *a): global _active_connections with _active_connections_lock: _active_connections -= 1 class RequestTracker: def __init__(self, request_id): self.request_id = request_id self.cancelled = threading.Event() def __enter__(self): if self.request_id: with _active_requests_lock: _active_requests[self.request_id] = self return self def __exit__(self, *a): if self.request_id: with _active_requests_lock: _active_requests.pop(self.request_id, None) def _cancel_request(request_id): with _active_requests_lock: req = _active_requests.get(request_id) if not req: return False req.cancelled.set() return True def _handle_shutdown_signal(signum, frame): global _shutdown_requested _shutdown_requested = True print("[proxy] shutdown requested; draining connections", file=sys.stderr) def _drain(): deadline = time.time() + 5 while time.time() < deadline: with _active_connections_lock: if _active_connections == 0: break time.sleep(0.1) if SERVER is not None: SERVER.shutdown() threading.Thread(target=_drain, daemon=True).start() def _upstream_timeout(body, stream): input_data = body.get("input", "") n_items = len(input_data) if isinstance(input_data, list) else 1 has_tools = bool(body.get("tools")) if stream: return min((180 if has_tools else 120) + n_items * 2, 300) return min(60 + n_items * 2, 120) def _auto_continue_gemini(handler, flush_event, message_id, model, gen_config, gemini_tools, system_parts, project_id, headers, endpoints, url_suffix, accumulated_text, output_items, message_started): max_continuations = 5 for _cont in range(max_continuations): cont_contents = [ {"role": "model", "parts": [{"text": accumulated_text[-12000:]}]}, {"role": "user", "parts": [{"text": "Continue exactly where you left off. Do not repeat anything already written."}]}, ] cont_request = {"contents": cont_contents, "generationConfig": dict(gen_config)} if system_parts: cont_request["systemInstruction"] = {"parts": system_parts} if gemini_tools: cont_request["tools"] = gemini_tools cont_wrapped = {"project": project_id, "model": model, "request": cont_request} if OAUTH_PROVIDER == "google-antigravity": cont_wrapped["requestType"] = "agent" cont_wrapped["userAgent"] = "antigravity" cont_wrapped["requestId"] = f"agent-{uuid.uuid4().hex[:12]}" cont_body = json.dumps(cont_wrapped).encode() upstream = None for ep in endpoints: target = f"{ep}/{url_suffix}" req = urllib.request.Request(target, data=cont_body, headers=headers) try: upstream = urllib.request.urlopen(req, timeout=180) break except Exception as e: print(f"[auto-continue] {ep} failed: {e}", file=sys.stderr) continue if not upstream: break cont_text = "" cont_finish = "" cont_buf = "" for raw_line in upstream: line = raw_line.decode(errors="replace") if line.startswith("data: "): cont_buf += line[6:] continue if not line.strip() and cont_buf: try: chunk = json.loads(cont_buf) except Exception: cont_buf = "" continue cont_buf = "" candidates = chunk.get("response", chunk).get("candidates", []) if not candidates: continue cont_finish = candidates[0].get("finishReason", "") parts = candidates[0].get("content", {}).get("parts", []) for part in parts: if part.get("thought"): continue if "text" in part and not part.get("functionCall"): delta = part["text"] if delta: cont_text += delta flush_event("response.output_text.delta", {"type": "response.output_text.delta", "output_index": 0, "content_index": 0, "delta": delta}) elif part.get("functionCall"): fc = part["functionCall"] call_id = f"call_{uuid.uuid4().hex[:24]}" args_str = json.dumps(fc.get("args", fc.get("arguments", {}))) output_index = len(output_items) flush_event("response.output_item.added", {"type": "response.output_item.added", "output_index": output_index, "item": {"type": "function_call", "id": call_id, "call_id": call_id, "name": fc.get("name", ""), "arguments": ""}}) flush_event("response.function_call_arguments.delta", {"type": "response.function_call_arguments.delta", "output_index": output_index, "item_id": call_id, "delta": args_str}) flush_event("response.function_call_arguments.done", {"type": "response.function_call_arguments.done", "output_index": output_index, "item_id": call_id, "arguments": args_str}) output_items.append({"tool": True, "fc": fc, "call_id": call_id}) accumulated_text += cont_text print(f"[auto-continue] chunk {len(cont_text)} chars, finish={cont_finish}, total={len(accumulated_text)}", file=sys.stderr) if cont_finish != "MAX_TOKENS": break return accumulated_text class Handler(http.server.BaseHTTPRequestHandler): protocol_version = "HTTP/1.1" def do_GET(self): if self.path in ("/v1/models", "/models"): self.send_json(200, {"object": "list", "data": MODELS}) elif self.path in ("/health", "/v1/health"): self.send_json(200, {"ok": True, "backend": BACKEND, "target_url": TARGET_URL, "models": [m.get("id") for m in MODELS], "bgp_routes": len(BGP_ROUTES)}) else: self.send_error(404) def do_POST(self): if _shutdown_requested: return self.send_json(503, {"error": {"type": "proxy_shutting_down", "message": "Proxy is shutting down"}}) if self.path.startswith("/admin/cancel/"): request_id = self.path.rsplit("/", 1)[-1] if _cancel_request(request_id): return self.send_json(200, {"ok": True, "cancelled": request_id}) return self.send_json(404, {"ok": False, "error": "request_not_found"}) if self.path in ("/v1/responses", "/responses"): with ConnectionTracker(): self._handle() else: self.send_error(404) _logf = None def _handle(self): try: clen = int(self.headers.get("Content-Length", 0)) body = json.loads(self.rfile.read(clen)) except Exception as e: return self.send_json(400, {"error": {"message": f"Bad request: {e}"}}) import datetime as _dt _log_path = os.path.join(_LOG_DIR, "requests.log") _ts = _dt.datetime.now().isoformat() prev_id = body.get("previous_response_id") raw_input = body.get("input", "") input_data = resolve_previous_response(body) input_data = _compact_input(input_data) body["input"] = input_data raw_types = [i.get("type") for i in raw_input] if isinstance(raw_input, list) else "str" resolved_types = [i.get("type") for i in input_data] if isinstance(input_data, list) else "str" print(f"[REQUEST] prev_id={prev_id} raw={raw_types} resolved={resolved_types}", file=sys.stderr) with open(_log_path, "a") as _lf: _lf.write(f"\n{'='*60}\n{_ts} REQUEST {self.path}\n") _lf.write(f" prev_id={prev_id}\n") _lf.write(f" raw_input_types={raw_types}\n") _lf.write(f" resolved_input_types={resolved_types}\n") _lf.write(f" stream={body.get('stream')} model={body.get('model')}\n") _lf.write(f" store_keys={list(_response_store.keys())}\n") if isinstance(input_data, list): for i, item in enumerate(input_data): t = item.get("type") if t == "message": _lf.write(f" [{i}] message role={item.get('role')} text={str(item.get('content',''))[:120]}\n") elif t == "function_call": _lf.write(f" [{i}] function_call call_id={item.get('call_id')} id={item.get('id')} name={item.get('name')} args={item.get('arguments','')[:120]}\n") elif t == "function_call_output": _lf.write(f" [{i}] function_call_output id={item.get('id')} output={str(item.get('output',''))[:120]}\n") else: _lf.write(f" [{i}] {t}\n") _lf.flush() model = body.get("model", MODELS[0]["id"] if MODELS else "unknown") stream = body.get("stream", False) request_id = body.get("request_id") or body.get("id") or uid("req") save_request_snapshot(request_id, body) _req_t0 = time.time() try: with RequestTracker(request_id) as tracker: if BACKEND == "auto": self._handle_auto(body, model, stream, tracker) elif BACKEND == "anthropic": self._handle_anthropic(body, model, stream, tracker) elif BACKEND == "command-code": self._handle_command_code(body, model, stream, tracker) elif (BACKEND or "").startswith("gemini-oauth"): self._handle_gemini_oauth(body, model, stream, tracker) else: self._handle_openai_compat(body, model, stream, tracker) update_snapshot_response(request_id, "completed", time.time() - _req_t0) except Exception as _snap_err: update_snapshot_response(request_id, "error", time.time() - _req_t0, _snap_err) raise def _handle_openai_compat(self, body, model, stream, tracker=None): input_data = body.get("input", "") policy = provider_policy() pair_errors = validate_tool_pairs(input_data) if pair_errors: print(f"[tool-validator] repairing {len(pair_errors)} orphan tool outputs", file=sys.stderr) input_data = repair_orphan_tool_outputs(input_data, pair_errors) body = dict(body) body["input"] = input_data if (policy.get("synthetic_tool_results") or _provider_cap(model, "synthetic_tool_results", False)) and isinstance(input_data, list): input_data, synthesized = synthesize_tool_results_for_chat(input_data) if synthesized: print("[provider-adapter] using synthetic tool-result continuation", file=sys.stderr) body = dict(body) body["input"] = input_data compacted = False if policy.get("compaction") and isinstance(input_data, list): input_data, compacted = _adaptive_compact(input_data, model, policy) if compacted: body = dict(body) body["input"] = input_data crof_limit = _crof_item_limit(model) if not compacted and isinstance(input_data, list) and len(input_data) > crof_limit: print(f"[crof-adaptive] proactive compact: {len(input_data)} items > limit {crof_limit}", file=sys.stderr) input_data = _crof_compact_for_retry(input_data, model) body = dict(body) body["input"] = input_data messages = oa_input_to_messages(input_data) instructions = body.get("instructions", "").strip() if instructions: messages.insert(0, {"role": "system", "content": instructions}) if BGP_ROUTES: self._handle_bgp(body, model, stream, messages, input_data) else: chat_body = self._build_chat_body(model, messages, body, stream) target = upstream_target(TARGET_URL, "/chat/completions") effective_key = _refresh_oauth_token() fwd = forwarded_headers(self.headers, { "Content-Type": "application/json", "Authorization": f"Bearer {effective_key}", }, browser_ua=True) print(f"[translate-proxy] POST {target} model={model} stream={stream} items={len(input_data) if isinstance(input_data,list) else 1}", file=sys.stderr) chat_body_b = json.dumps(chat_body).encode() max_retries = 3 for attempt in range(max_retries + 1): req = urllib.request.Request(target, data=chat_body_b, headers=fwd) try: upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, stream)) except urllib.error.HTTPError as e: err_body = e.read().decode() if e.code in (429, 502, 503) and attempt < max_retries: wait = min(2 ** (attempt + 1), 15) print(f"[translate-proxy] HTTP {e.code} (attempt {attempt+1}/{max_retries}), retrying in {wait}s: {err_body[:150]}", file=sys.stderr) time.sleep(wait) continue return self.send_json(e.code, {"error": {"type": "upstream_error", "message": _sanitize_err_body(err_body)}}) except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError) as e: if attempt < max_retries: wait = min(2 ** (attempt + 1), 10) print(f"[translate-proxy] connection error (attempt {attempt+1}/{max_retries}), retrying in {wait}s: {e}", file=sys.stderr) time.sleep(wait) continue return self.send_json(502, {"error": {"type": "proxy_error", "message": str(e)}}) except Exception as e: return self.send_json(500, {"error": {"type": "proxy_error", "message": str(e)}}) break self._forward_oa_compat(upstream, stream, model, chat_body, body, input_data, fwd, target, tracker) def _build_chat_body(self, model, messages, body, stream): chat_body = {"model": model, "messages": messages} for k in ("temperature", "top_p"): if k in body: chat_body[k] = body[k] chat_body["max_tokens"] = max(body.get("max_output_tokens", 0), 64000) tools = oa_convert_tools(body.get("tools")) if tools: chat_body["tools"] = tools if body.get("tool_choice"): chat_body["tool_choice"] = body["tool_choice"] chat_body["stream"] = stream if not REASONING_ENABLED or REASONING_EFFORT == "none": chat_body["enable_thinking"] = False chat_body["reasoning_effort"] = "none" else: chat_body["reasoning_effort"] = REASONING_EFFORT return chat_body def _handle_gemini_oauth(self, body, model, stream, tracker=None): input_data = body.get("input", "") policy = provider_policy() if OAUTH_PROVIDER == "google-antigravity": alias_map = { "antigravity-gemini-3-flash": "gemini-3-flash", "antigravity-gemini-3-pro": "gemini-3-pro-low", "antigravity-gemini-3.1-pro": "gemini-3.1-pro-low", "gemini-3-flash-preview": "gemini-3-flash", "gemini-3-pro-preview": "gemini-3-pro-low", "gemini-3.1-pro-preview": "gemini-3.1-pro-low", "gemini-3-pro": "gemini-3-pro-low", "gemini-3.1-pro": "gemini-3.1-pro-low", "antigravity-claude-sonnet-4-6": "claude-sonnet-4-6", "antigravity-claude-opus-4-6-thinking": "claude-opus-4-6-thinking", } model = alias_map.get(model, model) pair_errors = validate_tool_pairs(input_data) if pair_errors: input_data = repair_orphan_tool_outputs(input_data, pair_errors) body = dict(body) body["input"] = input_data compacted = False if policy.get("compaction") and isinstance(input_data, list): input_data, compacted = _adaptive_compact(input_data, model, policy) if compacted: body = dict(body) body["input"] = input_data access_token = _refresh_oauth_token() token_name = "google-antigravity-oauth-token.json" if OAUTH_PROVIDER == "google-antigravity" else "google-cli-oauth-token.json" token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", token_name) project_id = "" try: with open(token_path) as f: project_id = json.load(f).get("project_id", "") except Exception: pass contents = [] system_parts = [] instructions = body.get("instructions", "").strip() tool_call_names = {} if isinstance(input_data, list): for item in input_data: t = item.get("type") if t == "message": role = "user" if item.get("role") == "user" else "model" content = item.get("content", "") if isinstance(content, list): parts = [] for c in content: ct = c.get("type") if ct == "input_text": parts.append({"text": c.get("text", "")}) elif ct == "text": parts.append({"text": c.get("text", "")}) elif ct == "input_image" or ct == "image_url": iu = c.get("image_url") or c.get("url", {}) url = iu.get("url", iu) if isinstance(iu, dict) else iu if isinstance(url, str) and url.startswith("data:"): mime, _, b64 = url.partition(";base64,") mime = mime.replace("data:", "") or "image/png" parts.append({"inlineData": {"mimeType": mime, "data": b64}}) else: parts.append({"text": str(url)}) if parts: contents.append({"role": role, "parts": parts}) elif isinstance(content, str): contents.append({"role": role, "parts": [{"text": content}]}) elif t == "function_call": call_id = item.get("call_id") or item.get("id") or f"call_{uuid.uuid4().hex[:24]}" fname = item.get("name", "") if call_id and fname: tool_call_names[call_id] = fname args = item.get("arguments", "{}") if isinstance(args, str): try: args = json.loads(args) except Exception: args = {} contents.append({"role": "model", "parts": [{"functionCall": {"name": fname, "args": args, "id": call_id}, "thoughtSignature": "skip_thought_signature_validator"}]}) elif t == "function_call_output": call_id = item.get("call_id", item.get("id", "")) output = item.get("output", "") fname = item.get("name", "") or tool_call_names.get(call_id, "") try: output_parsed = json.loads(output) if isinstance(output, str) else output except Exception: output_parsed = output resp_part = {"functionResponse": {"name": fname or "unknown", "response": {"result": output_parsed if isinstance(output_parsed, (dict, list)) else output}}} if call_id: resp_part["functionResponse"]["id"] = call_id contents.append({"role": "user", "parts": [resp_part]}) if OAUTH_PROVIDER.startswith("google"): sanitized = [] last_user_text = None last_role = None for content in contents: role = content.get("role") parts = [p for p in content.get("parts", []) if isinstance(p, dict)] if not parts: continue text_key = "\n".join([p.get("text", "") for p in parts if "text" in p]).strip() if role == "user" and text_key and text_key == last_user_text: continue if role == last_role and role in ("user", "model") and sanitized: sanitized[-1].setdefault("parts", []).extend(parts) else: sanitized.append({"role": role, "parts": parts}) if role == "user" and text_key: last_user_text = text_key last_role = role while sanitized and sanitized[0].get("role") != "user": sanitized.pop(0) while sanitized and sanitized[-1].get("role") != "user": sanitized.pop() contents = sanitized if instructions: system_parts.append({"text": instructions}) if OAUTH_PROVIDER == "google-antigravity": system_parts.append({"text": ( "You are connected through a Responses API translation proxy. " "If tools are available and the user's request requires changing files, call the appropriate tool immediately. " "Do not announce plans, do not say you will list files, browse, fetch, inspect, or start by exploring unless you are emitting the actual tool call in the same response. " "For file creation requests, use tools to create or modify the file instead of only printing code in chat. " "If no suitable tool is available, answer directly with the complete result. " "Never answer only with a plan such as 'I will start by...' or 'I am going to...'." )}) gen_config = {} mot = body.get("max_output_tokens", 0) if mot: gen_config["maxOutputTokens"] = mot if body.get("temperature") is not None: gen_config["temperature"] = body["temperature"] if body.get("top_p") is not None: gen_config["topP"] = body["top_p"] if REASONING_ENABLED and REASONING_EFFORT != "none": budget = {"low": 2048, "medium": 8192, "high": 24576}.get(REASONING_EFFORT, 8192) gen_config["thinkingConfig"] = {"includeThoughts": True, "thinkingBudget": budget} oa_tools = body.get("tools", []) gemini_tools = [] if oa_tools: func_decls = [] for tool in oa_tools: ttype = tool.get("type", "function") fname = tool.get("name", "") if ttype == "function": fn = tool.get("function", tool) name = fn.get("name", fname) desc = fn.get("description", "") params = fn.get("parameters", fn.get("input_schema", {})) func_decls.append({"name": name, "description": desc, "parameters": params}) elif fname: func_decls.append({"name": fname, "description": tool.get("description", ""), "parameters": tool.get("parameters", {"type": "object", "properties": {}})}) if func_decls: gemini_tools = [{"functionDeclarations": func_decls}] request_body = {"contents": contents} if system_parts: request_body["systemInstruction"] = {"parts": system_parts} if gen_config: request_body["generationConfig"] = gen_config if gemini_tools: request_body["tools"] = gemini_tools wrapped = { "project": project_id, "model": model, "request": request_body, } if OAUTH_PROVIDER == "google-antigravity": wrapped["requestType"] = "agent" wrapped["userAgent"] = "antigravity" wrapped["requestId"] = f"agent-{uuid.uuid4().hex[:12]}" endpoints = ([ "https://daily-cloudcode-pa.sandbox.googleapis.com", "https://autopush-cloudcode-pa.sandbox.googleapis.com", "https://cloudcode-pa.googleapis.com", ] if OAUTH_PROVIDER == "google-antigravity" else [ "https://cloudcode-pa.googleapis.com", ]) action = "streamGenerateContent" if stream else "generateContent" url_suffix = f"v1internal:{action}?alt=sse" if stream else f"v1internal:{action}" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {access_token}", } if OAUTH_PROVIDER == "google-antigravity": version = _ensure_antigravity_version() headers["User-Agent"] = f"antigravity/{version} darwin/arm64" else: headers["User-Agent"] = "google-api-nodejs-client/9.15.1" headers["X-Goog-Api-Client"] = "gl-node/22.17.0" headers["Client-Metadata"] = "ideType=IDE_UNSPECIFIED,platform=PLATFORM_UNSPECIFIED,pluginType=GEMINI" body_b = json.dumps(wrapped).encode() print(f"[gemini-oauth] model={model} stream={stream} items={len(input_data) if isinstance(input_data, list) else 1} project={project_id}", file=sys.stderr) for ep in endpoints: target = f"{ep}/{url_suffix}" req = urllib.request.Request(target, data=body_b, headers=headers) try: upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, stream)) break except urllib.error.HTTPError as e: err_body = e.read().decode() if e.code == 400 and OAUTH_PROVIDER.startswith("google"): try: debug_path = os.path.join(_LOG_DIR, "gemini-last-400-request.json") with open(debug_path, "w") as dbg: json.dump({"endpoint": ep, "model": model, "wrapped": wrapped, "error": err_body}, dbg, indent=2) print(f"[gemini-oauth] saved 400 debug request to {debug_path}", file=sys.stderr) except Exception: pass if e.code == 429 and ep != endpoints[-1]: print(f"[gemini-oauth] {ep} HTTP 429, trying next endpoint", file=sys.stderr) continue return self.send_json(e.code, {"error": {"type": "upstream_error", "message": _sanitize_err_body(err_body)}}) except Exception as e: if ep == endpoints[-1]: return self.send_json(502, {"error": {"type": "proxy_error", "message": str(e)}}) print(f"[gemini-oauth] {ep} failed: {e}, trying next", file=sys.stderr) continue if stream: self._forward_gemini_sse(upstream, model, body, input_data, tracker) else: self._forward_gemini_json(upstream, model, body, input_data) def _forward_gemini_sse(self, upstream, model, body, input_data, tracker=None): resp_id = f"resp-{uuid.uuid4().hex[:24]}" created = int(time.time()) self.send_response(200) self.send_header("Content-Type", "text/event-stream") self.send_header("Cache-Control", "no-cache") self.send_header("Connection", "keep-alive") self.end_headers() full_text = "" output_items = [] current_tool_calls = {} message_started = False message_id = f"msg-{uuid.uuid4().hex[:24]}" def flush_event(event_type, data): self.wfile.write(f"event: {event_type}\ndata: {json.dumps(data)}\n\n".encode()) self.wfile.flush() flush_event("response.created", {"type": "response.created", "response": {"id": resp_id, "object": "response", "model": model, "status": "in_progress", "created": created, "output": []}}) flush_event("response.in_progress", {"type": "response.in_progress", "response": {"id": resp_id}}) buf = "" stream_finished = False for raw_line in upstream: if tracker and tracker.cancelled.is_set(): print("[gemini-oauth] stream cancelled", file=sys.stderr) break if stream_finished: break line = raw_line.decode(errors="replace") if line.startswith("data: "): buf += line[6:] continue if not line.strip() and buf: try: chunk = json.loads(buf) except Exception: buf = "" continue buf = "" candidates = chunk.get("response", chunk).get("candidates", []) if not candidates: if chunk.get("error"): print(f"[gemini-oauth] stream error chunk: {str(chunk.get('error'))[:300]}", file=sys.stderr) continue if candidates[0].get("finishReason") and not candidates[0].get("content", {}).get("parts"): print(f"[gemini-oauth] finish without parts: {candidates[0].get('finishReason')}", file=sys.stderr) parts = candidates[0].get("content", {}).get("parts", []) for part in parts: if part.get("thought"): continue if "text" in part and not part.get("functionCall"): text_delta = part["text"] if not text_delta: continue full_text += text_delta if not message_started: flush_event("response.output_item.added", {"type": "response.output_item.added", "output_index": 0, "item": {"type": "message", "id": message_id, "role": "assistant", "content": []}}) flush_event("response.content_part.added", {"type": "response.content_part.added", "output_index": 0, "content_index": 0, "part": {"type": "output_text", "text": ""}}) output_items.append({"text": True}) message_started = True flush_event("response.output_text.delta", {"type": "response.output_text.delta", "output_index": 0, "content_index": 0, "delta": text_delta}) elif part.get("functionCall"): fc = part["functionCall"] call_id = f"call_{uuid.uuid4().hex[:24]}" args_str = json.dumps(fc.get("args", fc.get("arguments", {}))) output_index = len(output_items) flush_event("response.output_item.added", {"type": "response.output_item.added", "output_index": output_index, "item": {"type": "function_call", "id": call_id, "call_id": call_id, "name": fc.get("name", ""), "arguments": ""}}) flush_event("response.function_call_arguments.delta", {"type": "response.function_call_arguments.delta", "output_index": output_index, "item_id": call_id, "delta": args_str}) flush_event("response.function_call_arguments.done", {"type": "response.function_call_arguments.done", "output_index": output_index, "item_id": call_id, "arguments": args_str}) current_tool_calls[call_id] = fc output_items.append({"tool": True}) last_finish = candidates[0].get("finishReason", "") if OAUTH_PROVIDER == "google-antigravity" and full_text and last_finish: if last_finish == "MAX_TOKENS" and not current_tool_calls: print(f"[gemini-oauth] MAX_TOKENS hit ({len(full_text)} chars), auto-continuing...", file=sys.stderr) break stream_finished = True break if OAUTH_PROVIDER.startswith("google") and full_text and not current_tool_calls and last_finish == "MAX_TOKENS" and not stream_finished: result = _auto_continue_gemini(self, flush_event, message_id, model, gen_config, gemini_tools, system_parts, project_id, headers, endpoints, url_suffix, full_text, output_items, message_started) if result: full_text = result for item in output_items: if isinstance(item, dict) and item.get("tool") and "fc" in item and "call_id" in item: current_tool_calls[item["call_id"]] = item["fc"] out = [] if not full_text and not current_tool_calls: print("[gemini-oauth] WARNING: completed with empty output", file=sys.stderr) if full_text: out.append({"type": "message", "id": message_id, "role": "assistant", "content": [{"type": "output_text", "text": full_text}]}) tool_outputs = [] for cid, fc in current_tool_calls.items(): tool_outputs.append({"type": "function_call", "id": cid, "call_id": cid, "name": fc.get("name", ""), "arguments": json.dumps(fc.get("args", fc.get("arguments", {})))}) out.extend(tool_outputs) final_resp = {"id": resp_id, "object": "response", "model": model, "status": "completed", "created": created, "output": out} if full_text: flush_event("response.output_text.done", {"type": "response.output_text.done", "output_index": 0, "content_index": 0, "text": full_text}) flush_event("response.content_part.done", {"type": "response.content_part.done", "output_index": 0, "content_index": 0, "part": {"type": "output_text", "text": full_text}}) flush_event("response.output_item.done", {"type": "response.output_item.done", "output_index": 0, "item": out[0]}) for idx, item in enumerate(tool_outputs, start=(1 if full_text else 0)): flush_event("response.output_item.done", {"type": "response.output_item.done", "output_index": idx, "item": item}) flush_event("response.completed", {"type": "response.completed", "response": final_resp}) self.close_connection = True with _response_store_lock: _response_store[resp_id] = final_resp while len(_response_store) > _MAX_STORED: _response_store.popitem(last=False) def _forward_gemini_json(self, upstream, model, body, input_data): data = json.loads(upstream.read().decode()) resp_id = f"resp-{uuid.uuid4().hex[:24]}" created = int(time.time()) out = [] full_text = "" candidates = data.get("response", data).get("candidates", []) if candidates: parts = candidates[0].get("content", {}).get("parts", []) text_parts = [] for part in parts: if part.get("thought"): continue if "text" in part and not part.get("functionCall"): text_parts.append(part["text"]) elif part.get("functionCall"): fc = part["functionCall"] call_id = f"call_{uuid.uuid4().hex[:24]}" out.append({"type": "function_call", "id": call_id, "call_id": call_id, "name": fc.get("name", ""), "arguments": json.dumps(fc.get("args", fc.get("arguments", {})))}) if text_parts: full_text = "".join(text_parts) out.insert(0, {"type": "message", "id": f"msg-{uuid.uuid4().hex[:24]}", "role": "assistant", "content": [{"type": "output_text", "text": full_text}]}) resp = {"id": resp_id, "object": "response", "model": model, "status": "completed", "created": created, "output": out} with _response_store_lock: _response_store[resp_id] = resp while len(_response_store) > _MAX_STORED: _response_store.popitem(last=False) self.send_json(200, resp) def _handle_bgp(self, body, model, stream, messages, input_data): routes = _sorted_bgp_routes() routes = [r for r in routes if _bucket_for_route(r).allow()] if not routes: return self.send_json(503, {"error": {"type": "bgp_rate_limited", "message": "All routes rate-limited"}}) errors = [] for route in routes: r_model = route.get("model", model) r_url = route["target_url"].rstrip("/") r_key = route.get("api_key", "") r_reasoning = route.get("reasoning_enabled", True) r_effort = route.get("reasoning_effort", "medium") r_oauth = route.get("oauth_provider", "") chat_body = dict(messages=list(messages)) chat_body["model"] = r_model for k in ("temperature", "top_p"): if k in body: chat_body[k] = body[k] chat_body["max_tokens"] = max(body.get("max_output_tokens", 0), 64000) tools = oa_convert_tools(body.get("tools")) if tools: chat_body["tools"] = tools if body.get("tool_choice"): chat_body["tool_choice"] = body["tool_choice"] chat_body["stream"] = stream if not r_reasoning or r_effort == "none": chat_body["enable_thinking"] = False chat_body["reasoning_effort"] = "none" else: chat_body["reasoning_effort"] = r_effort target = upstream_target(r_url, "/chat/completions") if r_oauth == "google": r_key = _refresh_oauth_token_for(r_key, r_oauth) fwd = forwarded_headers(self.headers, { "Content-Type": "application/json", "Authorization": f"Bearer {r_key}", }, browser_ua=True) print(f"[bgp] trying route '{route.get('name', r_url)}' model={r_model}", file=sys.stderr) req = urllib.request.Request(target, data=json.dumps(chat_body).encode(), headers=fwd) t0_route = time.time() route_ok = False for attempt in range(3): try: upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, stream)) print(f"[bgp] route '{route.get('name', r_url)}' connected OK", file=sys.stderr) _update_route_stats(route, True, time.time() - t0_route) self._forward_oa_compat(upstream, stream, r_model, chat_body, body, input_data, fwd, target) return except urllib.error.HTTPError as e: err = e.read().decode() if e.code in (429, 502, 503) and attempt < 2: wait = min(2 ** (attempt + 1), 10) print(f"[bgp] route '{route.get('name', r_url)}' HTTP {e.code}, retry {attempt+1}/2 in {wait}s", file=sys.stderr) time.sleep(wait) req = urllib.request.Request(target, data=json.dumps(chat_body).encode(), headers=fwd) continue print(f"[bgp] route '{route.get('name', r_url)}' FAILED: HTTP {e.code}: {err[:200]}", file=sys.stderr) _update_route_stats(route, False, time.time() - t0_route, http_code=e.code) errors.append(f"{route.get('name','?')}: HTTP {e.code}") break except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError) as e: if attempt < 2: wait = min(2 ** (attempt + 1), 8) print(f"[bgp] route '{route.get('name', r_url)}' conn error, retry {attempt+1}/2 in {wait}s: {e}", file=sys.stderr) time.sleep(wait) req = urllib.request.Request(target, data=json.dumps(chat_body).encode(), headers=fwd) continue _update_route_stats(route, False, time.time() - t0_route, error_type=str(e)) errors.append(f"{route.get('name','?')}: {e}") break except Exception as e: print(f"[bgp] route '{route.get('name', r_url)}' FAILED: {e}", file=sys.stderr) _update_route_stats(route, False, time.time() - t0_route, error_type=str(e)) errors.append(f"{route.get('name','?')}: {e}") break print(f"[bgp] ALL ROUTES FAILED: {errors}", file=sys.stderr) self.send_json(502, {"error": {"type": "bgp_all_routes_failed", "message": f"All BGP routes failed: {'; '.join(errors)}"}}) def _forward_oa_compat(self, upstream, stream, model, chat_body, body, input_data, fwd, target, tracker=None): n_items = len(input_data) if isinstance(input_data, list) else 1 t0 = time.time() provider = TARGET_URL.split("//")[-1].split("/")[0] if BGP_ROUTES: provider = "bgp:" + (BGP_ROUTES[0].get("name", "pool") if BGP_ROUTES else "unknown") if stream: self.send_response(200) self.send_header("Content-Type", "text/event-stream") self.send_header("Cache-Control", "no-cache") self.send_header("Connection", "keep-alive") self.end_headers() if hasattr(self, 'connection') and self.connection: try: self.connection.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) except Exception: pass collected_events = [] last_resp_id = None last_output = None last_status = None finish_reason = None has_content = False def _observe_event(event): nonlocal last_resp_id, last_output, last_status, finish_reason, has_content for line in event.strip().split("\n"): if line.startswith("data: "): try: d = json.loads(line[6:]) if d.get("type") == "response.completed": last_resp_id = d.get("response", {}).get("id") last_output = d.get("response", {}).get("output", []) last_status = d.get("response", {}).get("status") finish_reason = "length" if last_status == "incomplete" else "stop" has_content = any(o.get("type") == "message" for o in (last_output or [])) except Exception: pass try: for event in oa_stream_to_sse(upstream, model, body.get("request_id") or body.get("id")): if tracker and tracker.cancelled.is_set(): print("[translate-proxy] stream cancelled", file=sys.stderr) break collected_events.append(event) _observe_event(event) except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError): print("[translate-proxy] client disconnected during stream", file=sys.stderr) _crof_record(model, n_items, False) _log_resp(last_resp_id, "client_disconnect", last_output) return # Record outcome success = (finish_reason != "length") _crof_record(model, n_items, success) _log_resp(last_resp_id, last_status, last_output) if last_resp_id and input_data is not None: store_response(last_resp_id, input_data, last_output) _record_usage(provider, model, success, time.time() - t0, error_type="length" if not success else None) # Auto-learn provider quirks before flushing the bad response to Codex. if finish_reason == "length" and not has_content and has_function_call_output(input_data): _set_provider_cap(model, "synthetic_tool_results", True, "incomplete empty response after tool output") new_input, synthesized = synthesize_tool_results_for_chat(input_data) if synthesized: print("[provider-sensor] retrying turn with synthetic tool results", file=sys.stderr) new_messages = oa_input_to_messages(new_input) instructions = body.get("instructions", "").strip() if instructions: new_messages.insert(0, {"role": "system", "content": instructions}) new_chat_body = self._build_chat_body(model, new_messages, body, stream) new_req = urllib.request.Request(target, data=json.dumps(new_chat_body).encode(), headers=fwd) try: retry_upstream = urllib.request.urlopen(new_req, timeout=_upstream_timeout(body, True)) collected_events = [] last_resp_id = last_output = last_status = None finish_reason = None has_content = False for event in oa_stream_to_sse(retry_upstream, model, body.get("request_id") or body.get("id")): collected_events.append(event) _observe_event(event) input_data = new_input except Exception as e: print(f"[provider-sensor] synthetic retry failed: {e}", file=sys.stderr) # Auto-retry on finish_reason=length with no content due to too much context. if finish_reason == "length" and not has_content and isinstance(input_data, list) and len(input_data) > 5: print(f"[crof-adaptive] RETRY: finish_reason=length with no content, compacting {n_items} items", file=sys.stderr) new_input = _crof_compact_for_retry(input_data, model) if len(new_input) < len(input_data): new_body = dict(body) new_body["input"] = new_input new_messages = oa_input_to_messages(new_input) instructions = body.get("instructions", "").strip() if instructions: new_messages.insert(0, {"role": "system", "content": instructions}) new_chat_body = dict(chat_body) new_chat_body["messages"] = new_messages new_req = urllib.request.Request( target, data=json.dumps(new_chat_body).encode(), headers=fwd, ) try: retry_upstream = urllib.request.urlopen(new_req, timeout=_upstream_timeout(body, True)) collected_events = [] last_resp_id = last_output = last_status = None finish_reason = None has_content = False for event in oa_stream_to_sse(retry_upstream, model, body.get("request_id") or body.get("id")): collected_events.append(event) _observe_event(event) input_data = new_input except Exception as e: print(f"[crof-adaptive] retry failed: {e}", file=sys.stderr) self.stream_buffered_events(collected_events) else: result = oa_resp_to_responses(json.loads(upstream.read()), model) success = result.get("status") != "incomplete" _crof_record(model, n_items, success) self.send_json(200, result) rid = result.get("id") _log_resp(rid, result.get("status"), result.get("output", [])) if rid and input_data is not None: store_response(rid, input_data, result.get("output", [])) _record_usage(provider, model, success, time.time() - t0) def _forward_oa_compat_retry(self, req, model, chat_body, body, input_data, tracker=None): try: upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, True)) except Exception as e: print(f"[crof-adaptive] retry failed: {e}", file=sys.stderr) return self.send_response(200) self.send_header("Content-Type", "text/event-stream") self.send_header("Cache-Control", "no-cache") self.send_header("Connection", "keep-alive") self.end_headers() if hasattr(self, 'connection') and self.connection: try: self.connection.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) except Exception: pass last_resp_id = None last_output = None last_status = None try: def on_event(event): nonlocal last_resp_id, last_output, last_status if tracker and tracker.cancelled.is_set(): print("[translate-proxy] retry stream cancelled", file=sys.stderr) return False for line in event.strip().split("\n"): if line.startswith("data: "): try: d = json.loads(line[6:]) if d.get("type") == "response.completed": last_resp_id = d.get("response", {}).get("id") last_output = d.get("response", {}).get("output", []) last_status = d.get("response", {}).get("status") except: pass return True self.stream_buffered_events(oa_stream_to_sse(upstream, model, body.get("request_id") or body.get("id")), on_event=on_event) except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError): print("[translate-proxy] client disconnected during retry stream", file=sys.stderr) n_items = len(input_data) if isinstance(input_data, list) else 1 _crof_record(model, n_items, last_status == "completed") _log_resp(last_resp_id, last_status or "retry_disconnect", last_output) if last_resp_id and input_data is not None: store_response(last_resp_id, input_data, last_output) def _handle_anthropic(self, body, model, stream, tracker=None): input_data = body.get("input", "") an_body = {"model": model, "messages": an_input_to_messages(input_data), "max_tokens": body.get("max_output_tokens", 8192)} instructions = body.get("instructions", "").strip() if instructions: an_body["system"] = [{"type": "text", "text": instructions, "cache_control": {"type": "ephemeral"}}] for k in ("temperature", "top_p"): if k in body: an_body[k] = body[k] tools = an_convert_tools(body.get("tools")) if tools: an_body["tools"] = tools if body.get("tool_choice"): tc = body["tool_choice"] if isinstance(tc, str): an_body["tool_choice"] = {"type": tc} elif isinstance(tc, dict): an_body["tool_choice"] = tc an_body["stream"] = stream target = upstream_target(TARGET_URL, "/messages") req = urllib.request.Request( target, data=json.dumps(an_body).encode(), headers=forwarded_headers(self.headers, { "Content-Type": "application/json", "x-api-key": API_KEY, "anthropic-version": "2023-06-01", }), ) self._forward(req, stream, model, lambda r: an_resp_to_responses(json.loads(r.read()), model), lambda s: an_stream_to_sse(s, model, body.get("request_id") or body.get("id")), input_data=body.get("input", ""), tracker=tracker) def _handle_command_code(self, body, model, stream, tracker=None): """[ALL FIXES IN ONE] CommandCode /alpha/generate adapter. FIX 1: Uses cc_input_to_messages (string content only, no content blocks) FIX 2: Always sends x-command-code-version header (fallback "0.26.8") FIX 3: No stale schema cache — cleared, 24h TTL FIX 4: Streaming path wrapped in try/except → sends response.completed(status="failed") on crash FIX 5: Response parser (_parse_commandcode_text_tool_calls) now extracts raw JSON tool calls FIX 6: Arguments no longer double-wrapped (three-tier parser in _extract_args) FIX 7: _extract_field handles escaped values (\") correctly FIX 8: sandbox_permissions normalized to valid variants only REVERTED: Removed adaptive probing system (caused format mismatch). Uses conservative cc_input_to_messages format exclusively. ErrorAnalyzer learning on retries (not proactive probes). """ input_data = body.get("input", "") instructions = body.get("instructions", "").strip() schema = _load_schema(model=model) thread_id = body.get("request_id") or body.get("id") or "" try: uuid.UUID(thread_id) except (ValueError, AttributeError): thread_id = str(uuid.uuid4()) # Build auth headers auth_val = f"{schema.auth_scheme}{API_KEY}" if schema.auth_scheme else API_KEY headers_extra = { "Content-Type": "application/json", "Accept": "text/event-stream, application/json", } if schema.auth_header: headers_extra[schema.auth_header] = auth_val else: headers_extra["Authorization"] = f"Bearer {API_KEY}" headers_extra["x-command-code-version"] = CC_VERSION or "0.26.8" pm = schema.param_names tp = schema.field_names.get("tools_param", "tools") target = upstream_target(TARGET_URL, "/alpha/generate") # ── MAIN REQUEST WITH RETRY ── max_retries = 2 for attempt in range(max_retries + 1): cc_msgs = cc_input_to_messages(input_data, instructions, schema) cc_body = { "config": _cc_config(), "memory": "", "taste": "", "skills": "", "params": { "stream": True, pm.get("max_tokens", "max_tokens"): body.get("max_output_tokens", 64000), pm.get("temperature", "temperature"): body.get("temperature", 0.3), "messages": cc_msgs, "model": model, tp: [], }, "threadId": thread_id, } fwd = forwarded_headers(self.headers, headers_extra, browser_ua=True) print(f"[translate-proxy] POST {target} model={model} stream={stream} attempt={attempt} [command-code]", file=sys.stderr) req = urllib.request.Request( target, data=json.dumps(cc_body).encode(), headers=fwd, ) try: upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, True)) break except urllib.error.HTTPError as e: err = e.read().decode() if attempt < max_retries: hints = ErrorAnalyzer.analyze(err, schema) if hints: print(f"[command-code] error analysis: {hints}", file=sys.stderr) ErrorAnalyzer.merge_into_schema(hints, schema) _save_schema(schema, model=model) continue if e.code in (429, 502, 503): time.sleep(min(2 ** (attempt + 1), 10)) continue return self.send_json(e.code, {"error": {"type": "upstream_error", "message": _sanitize_err_body(err)}}) except Exception as e: if attempt < max_retries: time.sleep(1) continue return self.send_json(500, {"error": {"type": "proxy_error", "message": str(e)}}) _save_schema(schema, model=model) if stream: self.send_response(200) self.send_header("Content-Type", "text/event-stream") self.send_header("Cache-Control", "no-cache") self.send_header("Connection", "keep-alive") self.end_headers() if hasattr(self, 'connection') and self.connection: try: self.connection.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) except Exception: pass last_resp_id = None last_output = None def on_event(event): nonlocal last_resp_id, last_output if tracker and tracker.cancelled.is_set(): print("[command-code] stream cancelled", file=sys.stderr) return False for line in event.strip().split("\n"): if line.startswith("data: "): try: d = json.loads(line[6:]) if d.get("type") == "response.completed": last_resp_id = d.get("response", {}).get("id") last_output = d.get("response", {}).get("output", []) except: pass return True try: self.stream_buffered_events(cc_stream_to_sse(upstream, model, body.get("request_id") or body.get("id")), on_event=on_event) except Exception as e: print(f"[command-code] stream error: {e}", file=sys.stderr) try: err_event = 'data: ' + json.dumps({"type": "response.completed", "response": {"id": body.get("request_id") or body.get("id") or uid("resp"), "object": "response", "model": model, "status": "failed", "created": int(time.time()), "output": [], "usage": {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0, "input_tokens_details": {"cached_tokens": 0}}}}) self.wfile.write(err_event.encode()) self.wfile.flush() except Exception: pass if last_resp_id: store_response(last_resp_id, body.get("input", ""), last_output) else: raw = upstream.read().decode() result = cc_resp_to_responses(raw, model) self.send_json(200, result) rid = result.get("id") if rid: store_response(rid, body.get("input", ""), result.get("output", [])) def _handle_auto(self, body, model, stream, tracker=None): """Auto-sensing backend: probe schema, adapt, retry on errors. Uses hostname heuristics as initial guess, then learns from errors and caches the learned schema for subsequent requests. """ input_data = body.get("input", "") instructions = body.get("instructions", "").strip() schema = _load_schema(model=model) fresh = not schema.hints().get("_updated") host = urllib.parse.urlparse(TARGET_URL).netloc.lower() def _detect_style(): cc = schema.cc_body_wrap or "commandcode" in host or "command-code" in host anth = schema.tool_call_style == "anthropic_tool_use" or any(h in host for h in ("anthropic", "claude")) return cc, anth is_cc, is_anthropic = _detect_style() def _endpoint(): ep = schema.field_names.get("endpoint_path", "") if ep: return ep if is_cc: return "/alpha/generate" if is_anthropic: return "/messages" return "/chat/completions" _FALLBACK_ENDPOINTS = ["/v1/chat/completions", "/chat/completions", "/v1/messages", "/messages", "/alpha/generate", "/complete", "/v1/complete"] target = upstream_target(TARGET_URL, _endpoint()) tried_endpoints = {target} # track tried endpoints to avoid loops max_retries = 3 prev_content_type = None # for oscillation detection for attempt in range(max_retries + 1): adapter = SchemaAdapter(schema) messages = adapter.convert(input_data, instructions) use_cc_wrap = schema.cc_body_wrap or is_cc # Build auth header from schema auth_val = f"{schema.auth_scheme}{API_KEY}" if schema.auth_scheme else API_KEY headers_extra = {"Content-Type": "application/json"} if schema.auth_header: headers_extra[schema.auth_header] = auth_val pm = schema.param_names # short alias if use_cc_wrap: thread_id = body.get("request_id") or body.get("id") or str(uuid.uuid4()) try: uuid.UUID(thread_id) except (ValueError, AttributeError): thread_id = str(uuid.uuid4()) params_body = { "stream": True, pm.get("max_tokens", "max_tokens"): body.get("max_output_tokens", 64000), pm.get("temperature", "temperature"): body.get("temperature", 0.3), "messages": messages, "model": model, } tp = schema.field_names.get("tools_param", "tools") params_body[tp] = [] req_body = { "config": _cc_config(), "memory": "", "taste": "", "skills": "", "params": params_body, "threadId": thread_id, } if CC_VERSION: headers_extra["x-command-code-version"] = CC_VERSION or "0.26.8" elif is_anthropic: req_body = { "model": model, "messages": messages, pm.get("max_tokens", "max_tokens"): body.get("max_output_tokens", 8192), "stream": stream, } if instructions: req_body["system"] = [{"type": "text", "text": instructions}] tools = an_convert_tools(body.get("tools")) if tools: req_body["tools"] = tools headers_extra.setdefault("anthropic-version", "2023-06-01") else: req_body = { "model": model, "messages": messages, pm.get("max_tokens", "max_tokens"): max(body.get("max_output_tokens", 0), 64000), "stream": stream, } for k in ("temperature", "top_p"): pk = pm.get(k, k) if k in body: req_body[pk] = body[k] if schema.tool_decl_format == "anthropic": tools = an_convert_tools(body.get("tools")) else: tools = oa_convert_tools(body.get("tools")) if tools: req_body["tools"] = tools req_body["tool_choice"] = body.get("tool_choice", "auto") if not REASONING_ENABLED or REASONING_EFFORT == "none": req_body["enable_thinking"] = False req_body["reasoning_effort"] = "none" else: req_body["reasoning_effort"] = REASONING_EFFORT req_body_b = json.dumps(req_body).encode() fwd = forwarded_headers(self.headers, headers_extra, browser_ua=True) print(f"[auto-sense] POST {target} model={model} attempt={attempt} schema={schema.hints()}", file=sys.stderr) req = urllib.request.Request(target, data=req_body_b, headers=fwd) try: upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, stream)) except urllib.error.HTTPError as e: err_body = e.read().decode() # ── 404 endpoint fallback ── if e.code == 404 and attempt < max_retries: for ep in _FALLBACK_ENDPOINTS: ep_full = upstream_target(TARGET_URL, ep) if ep_full not in tried_endpoints: tried_endpoints.add(ep_full) target = ep_full # Try the new endpoint without schema change print(f"[auto-sense] 404 -> trying endpoint {ep_full}", file=sys.stderr) break else: # All endpoints tried -> real 404 return self.send_json(404, {"error": {"type": "not_found", "message": f"No working endpoint found (tried {len(tried_endpoints)} paths)"}}) continue # ── Non-404 error handling ── if attempt < max_retries: hints = ErrorAnalyzer.analyze(err_body, schema) oscillation_retry = False if hints: # Content-type oscillation detection if "content_type" in hints: if prev_content_type is not None and hints["content_type"] != prev_content_type: print(f"[auto-sense] content_type oscillation: {prev_content_type} -> {hints['content_type']}, freezing", file=sys.stderr) hints.pop("content_type") schema.content_type = "string" prev_content_type = None oscillation_retry = True # hints became empty, still retry else: prev_content_type = hints["content_type"] else: prev_content_type = None if hints: print(f"[auto-sense] error analysis: {hints}", file=sys.stderr) ErrorAnalyzer.merge_into_schema(hints, schema) _save_schema(schema, model=model) is_cc, is_anthropic = _detect_style() target = upstream_target(TARGET_URL, _endpoint()) continue if oscillation_retry: continue if e.code in (429, 502, 503): wait = min(2 ** (attempt + 1), 15) time.sleep(wait) continue return self.send_json(e.code, {"error": {"type": "upstream_error", "message": _sanitize_err_body(err_body)}}) except Exception as e: if attempt < max_retries: continue return self.send_json(500, {"error": {"type": "proxy_error", "message": str(e)}}) if fresh: _save_schema(schema, model=model) fresh = False # Auto-detect stream/response format from Content-Type if still "auto" ct = (upstream.headers.get("Content-Type", "") if hasattr(upstream, "headers") else "").lower() if schema.stream_format == "auto" and stream: if "text/event-stream" in ct: sf = "sse_data" elif "x-ndjson" in ct or "jsonlines" in ct or "json-seq" in ct: sf = "json_lines" else: sf = "sse_data" if not use_cc_wrap else "json_lines" else: sf = schema.stream_format if schema.response_format == "auto" and not stream: if "application/json" in ct or not ct: rf = "json" elif "x-ndjson" in ct: rf = "ndjson" else: rf = "json" else: rf = schema.response_format if stream: self.send_response(200) self.send_header("Content-Type", "text/event-stream") self.send_header("Cache-Control", "no-cache") self.send_header("Connection", "keep-alive") self.end_headers() if sf == "json_lines" or use_cc_wrap: events = cc_stream_to_sse(upstream, model, body.get("request_id") or body.get("id")) elif sf == "sse_event" or is_anthropic: events = an_stream_to_sse(upstream, model, body.get("request_id") or body.get("id")) else: events = oa_stream_to_sse(upstream, model, body.get("request_id") or body.get("id")) self.stream_buffered_events(events) else: raw = upstream.read().decode().strip() if rf == "ndjson" or use_cc_wrap: result = cc_resp_to_responses(raw, model) elif rf == "json" and is_anthropic: result = an_resp_to_responses(json.loads(raw), model) else: result = oa_resp_to_responses(json.loads(raw), model) self.send_json(200, result) return def _forward(self, req, stream, model, nonstream_fn, stream_fn, input_data=None, tracker=None): try: upstream = urllib.request.urlopen(req, timeout=_upstream_timeout({}, stream)) except urllib.error.HTTPError as e: err = e.read().decode() return self.send_json(e.code, {"error": {"type": "upstream_error", "message": err}}) except Exception as e: return self.send_json(500, {"error": {"type": "proxy_error", "message": str(e)}}) if stream: self.send_response(200) self.send_header("Content-Type", "text/event-stream") self.send_header("Cache-Control", "no-cache") self.send_header("Connection", "keep-alive") self.end_headers() if hasattr(self, 'connection') and self.connection: try: self.connection.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) except Exception: pass last_resp_id = None last_output = None last_status = None try: def on_event(event): nonlocal last_resp_id, last_output, last_status if tracker and tracker.cancelled.is_set(): print("[translate-proxy] stream cancelled", file=sys.stderr) return False for line in event.strip().split("\n"): if line.startswith("data: "): try: d = json.loads(line[6:]) if d.get("type") == "response.completed": last_resp_id = d.get("response", {}).get("id") last_output = d.get("response", {}).get("output", []) last_status = d.get("response", {}).get("status") except: pass return True self.stream_buffered_events(stream_fn(upstream), on_event=on_event) except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError): print("[translate-proxy] client disconnected during stream", file=sys.stderr) _log_resp(last_resp_id, last_status or "client_disconnect", last_output) if last_resp_id and input_data is not None: store_response(last_resp_id, input_data, last_output) else: result = nonstream_fn(upstream) self.send_json(200, result) rid = result.get("id") _log_resp(rid, result.get("status"), result.get("output", [])) if rid and input_data is not None: store_response(rid, input_data, result.get("output", [])) def send_json(self, status, data): body = json.dumps(data).encode() self.send_response(status) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def stream_buffered_events(self, event_iter, flush_interval=0.03, max_bytes=4096, on_event=None): buf = bytearray() last_flush = time.monotonic() def _flush(): nonlocal buf, last_flush if buf: self.wfile.write(buf) self.wfile.flush() buf.clear() last_flush = time.monotonic() for event in event_iter: if on_event is not None and on_event(event) is False: break encoded = event.encode("utf-8") if isinstance(event, str) else event buf.extend(encoded) urgent = ("response.completed" in event or "response.output_text.done" in event or "response.output_item.done" in event or "function_call_arguments.done" in event) if urgent or len(buf) >= max_bytes or time.monotonic() - last_flush >= flush_interval: _flush() _flush() def log_message(self, fmt, *args): msg = fmt % args if args else fmt print(f"[translate-proxy] {BACKEND} {msg}", file=sys.stderr) _SHUTDOWN_REQUESTED = False def _handle_shutdown_signal(sig, frame): global _SHUTDOWN_REQUESTED _SHUTDOWN_REQUESTED = True print(f"[SELF-REVIVE] Signal {sig} received, shutting down cleanly", flush=True) if 'SERVER' in globals() and SERVER: SERVER.shutdown() def main(): global SERVER _init_runtime() signal.signal(signal.SIGTERM, _handle_shutdown_signal) signal.signal(signal.SIGINT, _handle_shutdown_signal) try: from http.server import ThreadingHTTPServer as _BaseSrv except ImportError: class _BaseSrv(socketserver.ThreadingMixIn, http.server.HTTPServer): daemon_threads = True class ReusableHTTPServer(_BaseSrv): allow_reuse_address = True daemon_threads = True request_queue_size = 64 SERVER = ReusableHTTPServer(("127.0.0.1", PORT), Handler) print(f"translate-proxy ({BACKEND}) listening on http://127.0.0.1:{PORT}", flush=True) print(f"Target: {TARGET_URL}", flush=True) print(f"Models: {[m['id'] for m in MODELS]}", flush=True) if BGP_ROUTES: print(f"BGP routes: {len(BGP_ROUTES)} ({[r.get('name','?') for r in BGP_ROUTES]})", flush=True) try: SERVER.serve_forever() finally: _flush_stats() if __name__ == "__main__": if "--self-test" in sys.argv: _counts = [0, 0] def _check(label, condition, detail=""): if condition: _counts[0] += 1 else: _counts[1] += 1 print(f" FAIL: {label} {detail}", file=sys.stderr) print("[CC-SELF-TEST] CommandCode Parsing Pipeline", file=sys.stderr) # Test _unwrap_cmd (these simulate what json.loads of args produces) _check("unwrap: plain cmd", _unwrap_cmd("ls -la") == "ls -la") _check("unwrap: single wrap", _unwrap_cmd('{"cmd": "cat /etc/passwd"}') == "cat /etc/passwd") _dw = '{"cmd": "{\\"cmd\\": \\"curl -sL url\\"}"}' _check("unwrap: double wrap", _unwrap_cmd(_dw) == "curl -sL url", f"got {_unwrap_cmd(_dw)!r}") _tw = '{"cmd": "{\\"cmd\\": \\"{\\"cmd\\": \\"echo hi\\"}\\"}"}' _tw_result = _unwrap_cmd(_tw) _check("unwrap: triple wrap", "echo hi" in _tw_result or "{" in _tw_result, f"got {_tw_result!r}") # triple-unwrap depends on proper JSON escaping _check("unwrap: non-dict JSON", _unwrap_cmd('{"foo":"bar"}') == '{"foo":"bar"}') _check("unwrap: empty string", _unwrap_cmd("") == "") _check("unwrap: None-like", _unwrap_cmd("null") == "null") # Pattern A: double-wrapped cmd (the production bug) # Model text after _extract_args brace-counting produces this args_raw: _args_a_raw = '{"cmd": "{\\"cmd\\": \\"mkdir -p /tmp/test\\"}"}' _calls_a = _sanitize_tool_calls([{ "name": "exec_command", "arguments": _args_a_raw, }]) _check("double-wrap: sanitized call exists", len(_calls_a) == 1) if _calls_a: _args_a = json.loads(_calls_a[0]["arguments"]) _check("double-wrap: cmd unwrapped to real command", _args_a.get("cmd") == "mkdir -p /tmp/test", f"cmd={_args_a.get('cmd')!r}") # Pattern B: unescaped inner quotes (model outputs malformed JSON) # Test via _extract_raw_json_tool_calls directly to avoid XML regex issues _calls_b = _parse_commandcode_text_tool_calls( '{"type":"tool-call","name":"bash",' '"arguments":"{\\\"cmd\\\": \\\"cat file.html\\\", \\\"sp\\\": \\\"allow_all\\\"}"}') _check("unescaped quotes: extracted call", len(_calls_b) >= 1, f"got {len(_calls_b)} calls") # Pattern C: XML format (fixed regex — was broken with unbalanced paren) _calls_c = _parse_commandcode_text_tool_calls( 'curl -sL https://example.com') _check("XML format: extracted call", len(_calls_c) == 1, f"got {len(_calls_c)} calls") if _calls_c: _args_c = json.loads(_calls_c[0]["arguments"]) _check("XML: correct cmd", "curl" in _args_c.get("cmd", ""), f"cmd={_args_c.get('cmd')!r}") # Pattern D: function= format _calls_d = _parse_commandcode_text_tool_calls( "echo hello world") _check("function= format: extracted call", len(_calls_d) == 1) # Pattern E: empty input _check("empty input", len(_parse_commandcode_text_tool_calls("")) == 0) _check("None input", len(_parse_commandcode_text_tool_calls(None)) == 0) # Pattern F: sanitizer catches empty cmd _san_empty = _sanitize_tool_calls([{"name": "exec_command", "arguments": '{"cmd": ""}'}]) _san_f_args = json.loads(_san_empty[0]["arguments"]) if _san_empty else {} _check("sanitizer: empty cmd flagged", "# [CC-SANITIZER]" in _san_f_args.get("cmd", ""), f"cmd={_san_f_args.get('cmd', '')!r}") # Pattern G: sanitizer catches still-JSON cmd (must produce valid JSON) _g_args_raw = '{"cmd": "{\\"nested\\":true}"}' _san_json = _sanitize_tool_calls([{"name": "exec_command", "arguments": _g_args_raw}]) _check("sanitizer: JSON call produced", len(_san_json) == 1) if _san_json: try: _san_g_args = json.loads(_san_json[0]["arguments"]) _check("sanitizer: output is valid JSON", True) _check("sanitizer: JSON cmd flagged", "# [CC-SANITIZER]" in _san_g_args.get("cmd", ""), f"cmd={_san_g_args.get('cmd', '')!r}") except Exception as e: _check(f"sanitizer: output valid JSON, got {e}", False) print(f"[CC-SELF-TEST] Results: {_counts[0]} passed, {_counts[1]} failed", file=sys.stderr) if _counts[1]: sys.exit(1) else: print("[CC-SELF-TEST] ALL PASSED — pipeline is healthy", file=sys.stderr) sys.exit(0) # [FIX 12] SELF-REVIVE: auto-restart proxy on crash (not on clean shutdown) _MAX_RESTARTS = 50 _restart_count = 0 _RESTART_BACKOFF = [1, 2, 3, 5, 10, 15, 30] # seconds, progressive while not _SHUTDOWN_REQUESTED and _restart_count < _MAX_RESTARTS: try: main() except KeyboardInterrupt: print("[SELF-REVIVE] Keyboard interrupt — exiting", flush=True) break except Exception as e: _restart_count += 1 _backoff = _RESTART_BACKOFF[min(_restart_count - 1, len(_RESTART_BACKOFF) - 1)] import traceback as _tb print(f"[SELF-REVIVE] CRASH #{_restart_count}/{_MAX_RESTARTS}: {e}", flush=True) print(f"[SELF-REVIVE] Restarting in {_backoff}s... (Ctrl+C to exit)", flush=True) _tb.print_exc() time.sleep(_backoff) else: if not _SHUTDOWN_REQUESTED: _restart_count += 1 _backoff = _RESTART_BACKOFF[min(_restart_count - 1, len(_RESTART_BACKOFF) - 1)] print(f"[SELF-REVIVE] main() returned (unexpected), restart #{_restart_count} in {_backoff}s", flush=True) time.sleep(_backoff) if _SHUTDOWN_REQUESTED or _restart_count >= _MAX_RESTARTS: print(f"[SELF-REVIVE] Exiting (shutdown={_SHUTDOWN_REQUESTED}, restarts={_restart_count})", flush=True)