diff --git a/codex-launcher-gui.py b/codex-launcher-gui.py index a23bc0b..1ce59aa 100644 --- a/codex-launcher-gui.py +++ b/codex-launcher-gui.py @@ -444,18 +444,20 @@ PROVIDER_PRESETS = { "gemini-2.5-flash", "gemini-2.5-pro", ], }, - "Google Antigravity (OAuth)": { - "backend_type": "gemini-oauth-antigravity", - "base_url": "https://cloudcode-pa.googleapis.com", - "oauth_provider": "google-antigravity", - "models": [ - "Gemini 3.5 Flash (High)", "Gemini 3.5 Flash (Medium)", "Gemini 3.5 Flash (Low)", - "Gemini 3.1 Pro (High)", "Gemini 3.1 Pro (Low)", - "Claude Sonnet 4.6 (Thinking)", - "Claude Opus 4.6 (Thinking)", - "GPT-OSS 120B (Medium)", - ], - }, + "Google Antigravity (OAuth)": { + "backend_type": "gemini-oauth-antigravity", + "base_url": "https://cloudcode-pa.googleapis.com", + "oauth_provider": "google-antigravity", + "models": [ + "antigravity-gemini-3-flash", + "antigravity-gemini-3-pro", + "antigravity-gemini-3.1-pro", + "antigravity-claude-sonnet-4-6", + "antigravity-claude-opus-4-6-thinking", + "gemini-2.5-flash", "gemini-2.5-pro", + "gemini-3-flash-preview", "gemini-3-pro-preview", "gemini-3.1-pro-preview", + ], + }, "OpenAdapter": { "backend_type": "openai-compat", "base_url": "https://api.openadapter.in/v1", @@ -503,6 +505,46 @@ PROVIDER_PRESETS = { "moonshotai/kimi-k2.6", "minimax/minimax-m2.7", ], }, + "Perplexity": { + "backend_type": "openai-compat", + "base_url": "https://api.perplexity.ai", + "models": ["sonar", "sonar-pro", "sonar-reasoning-pro", "sonar-deep-research"], + }, + "Cohere": { + "backend_type": "openai-compat", + "base_url": "https://api.cohere.ai/compatibility/v1", + "models": [], + }, + "Hugging Face": { + "backend_type": "openai-compat", + "base_url": "https://router.huggingface.co/v1", + "models": [], + }, + "Together AI": { + "backend_type": "openai-compat", + "base_url": "https://api.together.xyz/v1", + "models": [], + }, + "Groq": { + "backend_type": "openai-compat", + "base_url": "https://api.groq.com/openai/v1", + "models": [], + }, + "Fireworks AI": { + "backend_type": "openai-compat", + "base_url": "https://api.fireworks.ai/inference/v1", + "models": [], + }, + "LM Studio (local)": { + "backend_type": "openai-compat", + "base_url": "http://127.0.0.1:1234/v1", + "models": [], + }, + "vLLM / OpenAI-Compatible (self-hosted)": { + "backend_type": "openai-compat", + "base_url": "http://localhost:8000/v1", + "models": [], + }, } def safe_name(name): @@ -519,6 +561,7 @@ def label_for_backend(backend_type): "anthropic": "Anthropic", "command-code": "Command Code", "codebuff": "Codebuff (Free AI)", + "freebuff": "Freebuff (Free AI)", "native": "Native", }.get(backend_type, backend_type) @@ -569,6 +612,27 @@ def apply_provider_preset(endpoint, preset_name): updated["default_model"] = updated["models"][0] return updated +def _check_provider_latency(endpoint, timeout=5): + import urllib.request + base_url = (endpoint.get("base_url") or "").rstrip("/") + api_key = endpoint.get("api_key", "") + bt = endpoint.get("backend_type", "openai-compat") + if not base_url or not api_key: + return None + url = base_url + "/models" if not base_url.endswith("/models") else base_url + headers = {"Authorization": f"Bearer {api_key}"} + if bt == "anthropic": + headers = {"x-api-key": api_key, "anthropic-version": "2023-06-01"} + url = base_url.replace("/v1", "") + "/v1/models" if "/v1" not in base_url else base_url + "/models" + try: + req = urllib.request.Request(url, headers=headers) + t0 = time.time() + with urllib.request.urlopen(req, timeout=timeout) as resp: + resp.read(1) + return time.time() - t0 + except Exception: + return None + def _doctor_check_streaming(base_url, key, bt, model, add): if bt == "anthropic": test_url = f"{base_url}/v1/messages" @@ -2086,6 +2150,10 @@ class LauncherWin(Gtk.Window): self._combo.connect("changed", lambda c: self._on_endpoint_changed()) sel_box.pack_start(self._combo, True, True, 0) + self._latency_label = Gtk.Label(label=" -- ") + self._latency_label.set_opacity(0.7) + sel_box.pack_start(self._latency_label, False, False, 4) + # model selector sel_box.pack_start(Gtk.Label(label="Model:"), False, False, 0) self._model_combo = Gtk.ComboBoxText() @@ -2288,10 +2356,11 @@ class LauncherWin(Gtk.Window): def _on_endpoint_changed(self): name = self._combo.get_active_text() - is_bgp = name and name.startswith("🔀 ") + is_bgp = name and name.startswith("\U0001f500 ") bgp_name = name[2:] if is_bgp else None ep = get_endpoint(name) if name and not is_bgp else None self._model_combo.remove_all() + self._check_latency(name) if is_bgp: pool = None for p in load_bgp_pools().get("pools", []): @@ -2320,6 +2389,34 @@ class LauncherWin(Gtk.Window): elif models: self._model_combo.set_active(0) + def _check_latency(self, ep_name): + self._latency_label.set_text("...") + is_bgp = ep_name and ep_name.startswith("\U0001f500 ") + if is_bgp or not ep_name: + self._latency_label.set_text(" -- ") + return + ep = get_endpoint(ep_name) + if not ep: + self._latency_label.set_text(" -- ") + return + + def _run(): + lat = _check_provider_latency(ep) + def _update(): + if lat is None: + self._latency_label.set_text(" -- ") + self._latency_label.set_opacity(0.5) + elif lat < 1.0: + self._latency_label.set_text(f" {lat:.2f}s") + self._latency_label.set_markup(f'{lat:.2f}s') + elif lat < 3.0: + self._latency_label.set_markup(f'{lat:.2f}s') + else: + self._latency_label.set_markup(f'{lat:.2f}s') + GLib.idle_add(_update) + + threading.Thread(target=_run, daemon=True).start() + # ── endpoint mgr ───────────────────────────────────────────── def _open_mgr(self): @@ -2406,7 +2503,7 @@ class LauncherWin(Gtk.Window): return for ep in load_endpoints().get("endpoints", []): if ep.get("name") == ep_name: - self._start_proxy(ep) + _start_proxy_for(ep, self.log) break except Exception as e: self.log(f"[AI Monitor] Proxy restart failed: {e}") @@ -2421,7 +2518,7 @@ class LauncherWin(Gtk.Window): return for ep in load_endpoints().get("endpoints", []): if ep.get("name") == ep_name: - self._start_proxy(ep) + _start_proxy_for(ep, self.log) self.log("Proxy restarted") break except Exception as e: diff --git a/translate-proxy.py b/translate-proxy.py index 0641346..ac44d7f 100755 --- a/translate-proxy.py +++ b/translate-proxy.py @@ -242,6 +242,7 @@ def load_config(): p = argparse.ArgumentParser(description="Responses API translation proxy") p.add_argument("--config", help="JSON config file path") p.add_argument("--port", type=int, default=None) + p.add_argument("--host", default=None, help="Bind address (default: 127.0.0.1, use 0.0.0.0 for Docker)") p.add_argument("--backend", default=None, choices=["openai-compat", "anthropic", "command-code", "codebuff", "freebuff", "auto"]) p.add_argument("--target-url", default=None) p.add_argument("--api-key", default=None) @@ -308,6 +309,8 @@ REASONING_ENABLED = True REASONING_EFFORT = "medium" FORCE_MODEL = "" BGP_ROUTES = [] +CAVEMAN_MODE = False +RTK_COMPRESSION = False PROMPT_ENHANCER = False PROMPT_ENHANCER_MODE = "offline" PROMPT_ENHANCER_MODEL = "" @@ -374,6 +377,54 @@ _antigravity_version = "2.0.1" _antigravity_version_checked = 0 _antigravity_version_lock = threading.Lock() _antigravity_version_validated = False +class CircuitBreaker: + def __init__(self, failure_threshold=5, recovery_timeout=60): + self.failure_threshold = failure_threshold + self.recovery_timeout = recovery_timeout + self.failure_count = 0 + self.state = "CLOSED" # CLOSED, OPEN, HALF_OPEN + self.last_state_change = time.time() + self.lock = threading.Lock() + + def can_execute(self): + with self.lock: + now = time.time() + if self.state == "OPEN": + if now - self.last_state_change > self.recovery_timeout: + self.state = "HALF_OPEN" + self.last_state_change = now + print(f"[circuit-breaker] Circuit transitioning from OPEN to HALF_OPEN", file=sys.stderr) + return True + return False + return True + + def record_success(self): + with self.lock: + if self.state == "HALF_OPEN": + print(f"[circuit-breaker] Circuit transitioning from HALF_OPEN to CLOSED (success)", file=sys.stderr) + self.failure_count = 0 + self.state = "CLOSED" + self.last_state_change = time.time() + + def record_failure(self): + with self.lock: + self.failure_count += 1 + now = time.time() + if self.state in ("CLOSED", "HALF_OPEN"): + if self.failure_count >= self.failure_threshold or self.state == "HALF_OPEN": + self.state = "OPEN" + self.last_state_change = now + print(f"[circuit-breaker] Circuit tripped! Transitioning to OPEN. Failure count: {self.failure_count}", file=sys.stderr) + +_circuit_breakers = {} +_circuit_breakers_lock = threading.Lock() + +def get_circuit_breaker(backend_name): + with _circuit_breakers_lock: + if backend_name not in _circuit_breakers: + _circuit_breakers[backend_name] = CircuitBreaker() + return _circuit_breakers[backend_name] + _last_user_urls = collections.deque(maxlen=20) _conn_pool_lock = threading.Lock() @@ -721,7 +772,7 @@ class GoogleAccountPool(AccountPool): self.variant = variant def _do_load(self): - cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy") + cache_dir = _LOG_DIR accounts = [] primary = f"google-{self.variant}-oauth-token.json" primary_path = os.path.join(cache_dir, primary) @@ -865,7 +916,7 @@ def _refresh_google_token(token_data, token_path): return token_data.get("access_token", "") def _force_refresh_google_token(): - token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", + token_path = os.path.join(_LOG_DIR, "google-antigravity-oauth-token.json" if OAUTH_PROVIDER == "google-antigravity" else "google-oauth-token.json") try: @@ -960,7 +1011,7 @@ def _validate_antigravity_version(version, access_token=None, project_id=None): if not access_token: access_token = _refresh_oauth_token() if not project_id: - token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "google-antigravity-oauth-token.json") + token_path = os.path.join(_LOG_DIR, "google-antigravity-oauth-token.json") try: with open(token_path) as f: project_id = json.load(f).get("project_id", "") @@ -1014,7 +1065,7 @@ def _validate_antigravity_version(version, access_token=None, project_id=None): return True def _fetch_antigravity_version(): - cache_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "antigravity-version.json") + cache_path = os.path.join(_LOG_DIR, "antigravity-version.json") try: with open(cache_path) as f: cached = json.load(f) @@ -1027,7 +1078,7 @@ def _fetch_antigravity_version(): project_id = None try: access_token = _refresh_oauth_token() - token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "google-antigravity-oauth-token.json") + token_path = os.path.join(_LOG_DIR, "google-antigravity-oauth-token.json") with open(token_path) as f: project_id = json.load(f).get("project_id", "") except Exception: @@ -1128,6 +1179,7 @@ def _init_runtime(): global MODELS, CC_VERSION, REASONING_ENABLED, REASONING_EFFORT, BGP_ROUTES global _api_key_pool, PROMPT_ENHANCER global VISION_FALLBACK_URL, VISION_FALLBACK_MODEL, VISION_FALLBACK_KEY + global CAVEMAN_MODE, RTK_COMPRESSION CONFIG = load_config() PORT = CONFIG["port"] @@ -1161,6 +1213,8 @@ def _init_runtime(): if not VISION_FALLBACK_KEY: VISION_FALLBACK_KEY = _vision_key BGP_ROUTES = CONFIG.get("bgp_routes", []) + CAVEMAN_MODE = CONFIG.get("caveman_mode", False) or os.environ.get("CAVEMAN_MODE") == "1" + RTK_COMPRESSION = CONFIG.get("rtk_compression", False) or os.environ.get("RTK_COMPRESSION") == "1" _api_key_pool = None if API_KEY and "," in API_KEY and not OAUTH_PROVIDER.startswith("google") and BACKEND not in ("codebuff", "freebuff"): _api_key_pool = APIKeyPool(BACKEND, API_KEY) @@ -1243,7 +1297,7 @@ def _hot_reload_api_key(): if (BACKEND or "").startswith("gemini-oauth") and (OAUTH_PROVIDER or "").startswith("google"): token_name = "google-antigravity-oauth-token.json" if OAUTH_PROVIDER == "google-antigravity" else "google-cli-oauth-token.json" - token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", token_name) + token_path = os.path.join(_LOG_DIR, token_name) _preemptive_refresh_token(token_path) try: with open(token_path) as _tf: @@ -1395,7 +1449,7 @@ def _refresh_oauth_token_for(api_key, oauth_provider): if not oauth_provider.startswith("google"): return api_key token_name = "google-antigravity-oauth-token.json" if oauth_provider == "google-antigravity" else "google-cli-oauth-token.json" - token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", token_name) + token_path = os.path.join(_LOG_DIR, token_name) if not os.path.exists(token_path): return api_key try: @@ -1764,7 +1818,7 @@ _CROF_ADAPTIVE = { _model_max_tokens = {} _model_max_tokens_lock = threading.Lock() -def _estimate_tokens(item): +def _estimate_item_tokens(item): if not isinstance(item, dict): return 4 t = item.get("type", "") @@ -1794,7 +1848,7 @@ def _estimate_tokens(item): def _estimate_input_tokens(input_data): if not isinstance(input_data, list): return 0 - return sum(_estimate_tokens(i) for i in input_data) + return sum(_estimate_item_tokens(i) for i in input_data) def _get_model_max_tokens(model): with _model_max_tokens_lock: @@ -1873,6 +1927,8 @@ def _sorted_bgp_routes(): return sorted(BGP_ROUTES, key=lambda r: _score_route(r, stats)) def _crof_record(model, n_items, success): + if "crof.ai" not in TARGET_URL: + return if not isinstance(n_items, int) or n_items < 1: return entry = {"model": model, "items": n_items, "ok": success} @@ -2010,9 +2066,89 @@ def _extract_files(items): pass return files +def _rtk_compress_tool_output(output_str): + if not isinstance(output_str, str) or not output_str: + return output_str + + lines = output_str.splitlines() + compressed_lines = [] + + # Heuristic 1: Compress git diffs + is_diff = output_str.startswith("diff --git") or "@@ -" in output_str + + if is_diff: + unchanged_count = 0 + held_lines = [] + for line in lines: + if line.startswith("+") or line.startswith("-") or line.startswith("@@") or line.startswith("diff --git") or line.startswith("---") or line.startswith("+++"): + if unchanged_count > 2: + compressed_lines.append(f"... [omitted {unchanged_count} unchanged lines] ...") + else: + compressed_lines.extend(held_lines) + held_lines = [] + unchanged_count = 0 + compressed_lines.append(line) + else: + held_lines.append(line) + unchanged_count += 1 + if held_lines: + if unchanged_count > 2: + compressed_lines.append(f"... [omitted {unchanged_count} unchanged lines] ...") + else: + compressed_lines.extend(held_lines) + return "\n".join(compressed_lines) + + # Heuristic 2: Compress directory listings / file trees + is_tree_or_list = any("node_modules" in line or ".git/" in line or "dist/" in line for line in lines[:50]) + if is_tree_or_list: + ignored_patterns = re.compile(r'(\.git/|node_modules/|__pycache__/|\.venv/|\.pyc$|\.o$|dist/|build/|\.next/)') + omitted = 0 + for line in lines: + if ignored_patterns.search(line): + omitted += 1 + else: + compressed_lines.append(line) + if omitted > 0: + compressed_lines.append(f"... [omitted {omitted} dependency/build files from tree] ...") + return "\n".join(compressed_lines) + + # Heuristic 3: Compress long logs or multiple repeating lines + last_line = None + repeat_count = 0 + for line in lines: + if line == last_line and line.strip(): + repeat_count += 1 + if repeat_count <= 1: + compressed_lines.append(line) + else: + if repeat_count > 1: + compressed_lines.append(f"... [repeated {repeat_count - 1} times] ...") + repeat_count = 0 + compressed_lines.append(line) + last_line = line + if repeat_count > 1: + compressed_lines.append(f"... [repeated {repeat_count - 1} times] ...") + + return "\n".join(compressed_lines) + def _compact_input(input_data): if isinstance(input_data, str): return input_data + + if RTK_COMPRESSION and isinstance(input_data, list): + compressed_data = [] + for item in input_data: + if isinstance(item, dict) and item.get("type") == "function_call_output": + o = item.get("output", "") + if isinstance(o, str) and o: + compressed = _rtk_compress_tool_output(o) + if len(compressed) < len(o): + print(f"[rtk] compressed tool output length: {len(o)} -> {len(compressed)} (-{((len(o)-len(compressed))/len(o))*100:.1f}%)", file=sys.stderr) + item = dict(item) + item["output"] = compressed + compressed_data.append(item) + input_data = compressed_data + if not isinstance(input_data, list) or len(input_data) <= _MAX_INPUT_ITEMS: out = [] for item in input_data: @@ -2129,11 +2265,21 @@ _PROVIDER_POLICIES = { "tool_output_limit": 8000, "max_input_items": 200}, "googleapis": {"compaction": "conservative", "context_size": 1000000, "tool_output_limit": 8000, "max_input_items": 250}, + "groq": {"reasoning_mode": "off", "max_tokens": 32768, "strip_reasoning": True, + "tool_output_limit": 4000, "max_input_items": 20, "compaction": "aggressive", + "prompt_caching": "none"}, + "cerebras": {"reasoning_mode": "off", "max_tokens": 32768, "strip_reasoning": True, + "tool_output_limit": 4000, "max_input_items": 20, "compaction": "aggressive", + "prompt_caching": "none"}, + "xiaomimimo": {"reasoning_mode": "provider_default", "max_tokens": 65536, "strip_reasoning": True, + "tool_output_limit": 8000, "max_input_items": 40, "compaction": "balanced", + "prompt_caching": "none"}, } _DEFAULT_PROVIDER_POLICY = { "compaction": "balanced", "context_size": 128000, "tool_output_limit": 6000, "max_input_items": 60, + "prompt_caching": "auto", } def provider_policy(target_url=None, backend=None): @@ -2923,12 +3069,14 @@ def oa_stream_to_sse(chat_stream, model, req_id, _reasoning_out=None): tc_buf = {} fr = None msg_opened = False + _last_chunk = {} yield emit("response.created", {"type": "response.created", "response": {"id": resp_id, "object": "response", "model": model, "status": "in_progress", "created": int(time.time()), "output": []}}) yield emit("response.in_progress", {"type": "response.in_progress", "response": {"id": resp_id}}) + _last_stream_usage = {} for line in _stream_with_idle_timeout(chat_stream): line = line.decode("utf-8", errors="replace").strip() if not line or line.startswith(":") or line == "data: [DONE]": @@ -2942,6 +3090,10 @@ def oa_stream_to_sse(chat_stream, model, req_id, _reasoning_out=None): choices = chunk.get("choices", []) if not choices: continue + usage = chunk.get("usage") + if usage: + _last_stream_usage = usage + fr = choices[0].get("finish_reason") delta = choices[0].get("delta", {}) fr = choices[0].get("finish_reason") @@ -3130,6 +3282,8 @@ def an_stream_to_sse(stream, model, req_id): tc_args = "" block_type = None stop_reason = "end_turn" + an_input_tokens = 0 + an_output_tokens = 0 yield emit("response.created", {"type": "response.created", "response": {"id": resp_id, "object": "response", "model": model, @@ -3153,7 +3307,9 @@ def an_stream_to_sse(stream, model, req_id): et = data.get("type", "") if et == "message_start": - pass + msg_usage = data.get("message", {}).get("usage", {}) + if msg_usage: + an_input_tokens = msg_usage.get("input_tokens", 0) elif et == "content_block_start": cb_type = data.get("content_block", {}).get("type", "") @@ -3219,14 +3375,24 @@ def an_stream_to_sse(stream, model, req_id): elif et == "message_delta": stop_reason = data.get("delta", {}).get("stop_reason", "end_turn") + delta_usage = data.get("usage", {}) + if delta_usage: + an_output_tokens = delta_usage.get("output_tokens", 0) elif et == "message_stop": sm = {"end_turn": "completed", "max_tokens": "incomplete", "stop_sequence": "completed", "tool_use": "completed"} status = sm.get(stop_reason, "incomplete") + _final_usage = { + "input_tokens": an_input_tokens, + "output_tokens": an_output_tokens, + "total_tokens": an_input_tokens + an_output_tokens + } yield emit("response.completed", {"type": "response.completed", "response": {"id": resp_id, "object": "response", "model": model, - "status": status, "created": int(time.time()), "output": completed}}) + "status": status, "created": int(time.time()), "output": completed, + "usage": _final_usage}}) + _DEFAULT_CC_CONFIG = { "workingDir": tempfile.gettempdir(), @@ -4086,7 +4252,7 @@ def cc_stream_to_sse(cc_stream, model, req_id): total_usage = {} _event_types_seen = set() - _debug_log_path = os.path.expanduser("~/.cache/codex-proxy/cc-debug.log") + _debug_log_path = os.path.join(_LOG_DIR, "cc-debug.log") _debug_fh = open(_debug_log_path, "a") # [FIX 14] always write debug to FILE (not just stderr which may be piped) _deflog = lambda *a, **kw: print(*a, file=_debug_fh, flush=True, **kw) @@ -4924,6 +5090,80 @@ def _preprocess_vision_input(input_data, schema): _MAX_REQLOG_LINES = 2000 +def _log_cache_stats(raw_resp): + try: + usage = raw_resp.get("usage", {}) + cached = usage.get("prompt_tokens_details", {}).get("cached_tokens", 0) + total = usage.get("prompt_tokens", 0) or usage.get("input_tokens", 0) + if cached and total: + pct = (cached / total * 100) if total else 0 + print(f"[cache] stats: cached={cached}/{total} ({pct:.1f}%)", file=sys.stderr) + except Exception: + pass + +def _extract_text_length(items): + if not items: + return 0 + if isinstance(items, str): + return len(items) + if isinstance(items, dict): + items = [items] + if not isinstance(items, list): + return 0 + + total_len = 0 + for item in items: + if not isinstance(item, dict): + continue + itype = item.get("type") + if itype == "message": + content = item.get("content", "") + if isinstance(content, str): + total_len += len(content) + elif isinstance(content, list): + for part in content: + if isinstance(part, dict): + total_len += len(part.get("text", part.get("input_text", ""))) + elif itype == "function_call": + total_len += len(item.get("name", "")) + args = item.get("arguments", "") + if isinstance(args, str): + total_len += len(args) + elif isinstance(args, dict): + total_len += len(json.dumps(args)) + elif itype == "function_call_output": + out = item.get("output", "") + if isinstance(out, str): + total_len += len(out) + elif isinstance(out, list): + for part in out: + if isinstance(part, dict): + total_len += len(part.get("text", "")) + return total_len + +def _estimate_tokens_from_items(items): + return _extract_text_length(items) // 4 + +def _record_usage_with_tokens(provider, model, success, duration_s, raw_resp, input_items=None, output_items=None, error_type=None, **kwargs): + try: + usage = raw_resp.get("usage", {}) if isinstance(raw_resp, dict) else {} + if not usage and isinstance(raw_resp, dict) and ("input_tokens" in raw_resp or "prompt_tokens" in raw_resp): + usage = raw_resp + + tokens_in = int(usage.get("prompt_tokens", 0) or usage.get("input_tokens", 0) or 0) + tokens_out = int(usage.get("completion_tokens", 0) or usage.get("output_tokens", 0) or 0) + + # Fallback estimation heuristic + if tokens_in == 0 and input_items: + tokens_in = _estimate_tokens_from_items(input_items) + if tokens_out == 0 and output_items: + tokens_out = _estimate_tokens_from_items(output_items) + + _record_usage(provider, model, success, duration_s, tokens_in, tokens_out, error_type) + except Exception as e: + print(f"[usage] error recording usage: {e}", file=sys.stderr) + _record_usage(provider, model, success, duration_s, error_type=error_type) + def _log_resp(resp_id, status, output): try: import datetime as _dt @@ -5341,6 +5581,7 @@ def _antigravity_normalize_context(input_data, model=""): class Handler(http.server.BaseHTTPRequestHandler): protocol_version = "HTTP/1.1" + _request_failed = False def do_GET(self): if self.path in ("/v1/models", "/models"): @@ -5431,9 +5672,24 @@ class Handler(http.server.BaseHTTPRequestHandler): except Exception as e: return self.send_json(400, {"error": {"message": f"Bad request: {e}"}}) + if CAVEMAN_MODE and isinstance(body, dict): + instructions = body.get("instructions", "").strip() + caveman_prompt = "You are in Caveman Mode. Answer concisely, using technical terms. Strip away all pleasantries, greetings, introductory filler, and concluding remarks. Go straight to the answer/code." + if instructions: + instructions = f"{instructions}\n\n{caveman_prompt}" + else: + instructions = caveman_prompt + body["instructions"] = instructions + self._session_id = uuid.uuid4().hex[:8] _sid = self._session_id + # Check Circuit Breaker + cb = get_circuit_breaker(BACKEND) + if not cb.can_execute(): + print(f"[{_sid}] Circuit breaker is OPEN for backend {BACKEND}. Rejecting request.", file=sys.stderr) + return self.send_json(503, {"error": {"type": "circuit_breaker_open", "message": f"Circuit breaker is open for backend {BACKEND}"}}) + import datetime as _dt _log_path = os.path.join(_LOG_DIR, "requests.log") _ts = _dt.datetime.now().isoformat() @@ -5493,6 +5749,9 @@ class Handler(http.server.BaseHTTPRequestHandler): wait_ms = (time.monotonic() - wait_start) * 1000 if wait_ms > 100: print(f"[{_sid}] waited {wait_ms:.0f}ms for upstream slot (concurrency gate)", file=sys.stderr) + self._last_status = 200 + self._request_failed = False + cb = get_circuit_breaker(BACKEND) try: with RequestTracker(request_id) as tracker: if BACKEND == "auto": @@ -5510,8 +5769,19 @@ class Handler(http.server.BaseHTTPRequestHandler): self._handle_gemini_oauth(body, model, stream, tracker) else: self._handle_openai_compat(body, model, stream, tracker) + + # Record success or failure + is_4xx = (400 <= getattr(self, "_last_status", 200) < 500) + if self._request_failed and not is_4xx: + cb.record_failure() + else: + cb.record_success() + update_snapshot_response(request_id, "completed", time.time() - _req_t0) except Exception as _snap_err: + is_4xx = (400 <= getattr(self, "_last_status", 200) < 500) + if not is_4xx: + cb.record_failure() update_snapshot_response(request_id, "error", time.time() - _req_t0, _snap_err) raise finally: @@ -5681,6 +5951,12 @@ class Handler(http.server.BaseHTTPRequestHandler): def _is_mimo_provider(): return "xiaomimimo.com" in TARGET_URL + @staticmethod + def _make_cache_key(model, instructions): + import hashlib + raw = f"{model}|{instructions}" + return hashlib.sha256(raw.encode()).hexdigest()[:16] + def _build_chat_body(self, model, messages, body, stream): chat_body = {"model": model, "messages": messages} is_mimo = self._is_mimo_provider() @@ -5698,6 +5974,12 @@ class Handler(http.server.BaseHTTPRequestHandler): if body.get("tool_choice"): chat_body["tool_choice"] = body["tool_choice"] chat_body["stream"] = stream + _pc_policy = provider_policy().get("prompt_caching", "auto") + if _pc_policy != "none" and not is_mimo: + _instr = body.get("instructions", "").strip() + if _instr: + chat_body["prompt_cache_key"] = self._make_cache_key(model, _instr) + chat_body["prompt_cache_retention"] = "24h" if is_mimo: if not REASONING_ENABLED or REASONING_EFFORT == "none": chat_body["thinking"] = {"type": "disabled"} @@ -5749,7 +6031,7 @@ class Handler(http.server.BaseHTTPRequestHandler): body["input"] = input_data access_token = _refresh_oauth_token() - token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "google-antigravity-oauth-token.json") + token_path = os.path.join(_LOG_DIR, "google-antigravity-oauth-token.json") project_id = "" try: with open(token_path) as f: @@ -6411,6 +6693,8 @@ class Handler(http.server.BaseHTTPRequestHandler): self.send_json(200, resp) def _handle_gemini_oauth(self, body, model, stream, tracker=None): + ag_state = {} + _mp_oa = {"max_tool_calls": 150, "warn_tool_calls": 100} input_data = body.get("input", "") policy = provider_policy() original_model = model @@ -6505,7 +6789,7 @@ class Handler(http.server.BaseHTTPRequestHandler): access_token = _refresh_oauth_token() token_name = "google-antigravity-oauth-token.json" if OAUTH_PROVIDER == "google-antigravity" else "google-cli-oauth-token.json" - token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", token_name) + token_path = os.path.join(_LOG_DIR, token_name) project_id = "" try: with open(token_path) as f: @@ -7083,6 +7367,7 @@ class Handler(http.server.BaseHTTPRequestHandler): buf = "" stream_finished = False last_finish = "" + _last_stream_usage = {} try: for raw_line in _stream_with_idle_timeout(upstream, _idle_timeout_for_model(model)): if tracker and tracker.cancelled.is_set(): @@ -7097,6 +7382,13 @@ class Handler(http.server.BaseHTTPRequestHandler): if not line.strip() and buf: try: chunk = json.loads(buf) + usage = chunk.get("response", chunk).get("usageMetadata", {}) + if usage: + _last_stream_usage = { + "prompt_tokens": usage.get("promptTokenCount", 0), + "completion_tokens": usage.get("candidatesTokenCount", 0), + "total_tokens": usage.get("totalTokenCount", 0) + } except Exception: buf = "" continue @@ -7165,6 +7457,7 @@ class Handler(http.server.BaseHTTPRequestHandler): buf += line except TimeoutError as te: print(f"[{self._session_id}] [antigravity-v2] STREAM TIMEOUT: {te}", file=sys.stderr) + self._request_failed = True _log_resp(resp_id, "stream_timeout", [{"type": "error", "code": "stream_timeout", "message": str(te)}]) try: flush_event("response.failed", {"type": "response.failed", "response": {"id": resp_id, "object": "response", "status": "failed", "error": {"type": "stream_timeout", "message": str(te)[:200]}}}) @@ -7176,6 +7469,11 @@ class Handler(http.server.BaseHTTPRequestHandler): print(f"[{self._session_id}] [antigravity-v2] client disconnected during stream", file=sys.stderr) _log_resp(resp_id, "client_disconnect", []) return + except Exception as e: + print(f"[{self._session_id}] [antigravity-v2] stream error: {e}", file=sys.stderr) + self._request_failed = True + self.close_connection = True + return if OAUTH_PROVIDER.startswith("google") and full_text and not current_tool_calls and last_finish == "MAX_TOKENS" and not stream_finished: result = _auto_continue_gemini(self, flush_event, message_id, model, gen_config, gemini_tools, system_parts, project_id, headers, endpoints, url_suffix, full_text, output_items, message_started) @@ -7205,6 +7503,10 @@ class Handler(http.server.BaseHTTPRequestHandler): flush_event("response.completed", {"type": "response.completed", "response": final_resp}) self.close_connection = True + provider = TARGET_URL.split("//")[-1].split("/")[0] + success = not self._request_failed + _record_usage_with_tokens(provider, model, success, time.time() - created, _last_stream_usage, input_items=input_data, output_items=out) + with _response_store_lock: _response_store[resp_id] = final_resp while len(_response_store) > _MAX_STORED: @@ -7239,6 +7541,17 @@ class Handler(http.server.BaseHTTPRequestHandler): _response_store.popitem(last=False) self.send_json(200, resp) + provider = TARGET_URL.split("//")[-1].split("/")[0] + usage = data.get("response", data).get("usageMetadata", {}) + _usage_dict = {} + if usage: + _usage_dict = { + "prompt_tokens": usage.get("promptTokenCount", 0), + "completion_tokens": usage.get("candidatesTokenCount", 0), + "total_tokens": usage.get("totalTokenCount", 0) + } + _record_usage_with_tokens(provider, model, True, time.time() - created, _usage_dict, input_items=input_data, output_items=out) + def _handle_bgp(self, body, model, stream, messages, input_data): routes = _sorted_bgp_routes() routes = [r for r in routes if _bucket_for_route(r).allow()] @@ -7376,13 +7689,15 @@ class Handler(http.server.BaseHTTPRequestHandler): collected_events.append(event) _observe_event(event) print(f"[{self._session_id}] stream ended: events={len(collected_events)} finish={finish_reason} has_content={has_content} has_message={has_message} has_tool_call={has_tool_call} elapsed={time.time()-t0:.1f}s", file=sys.stderr) + pass except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError): print("[translate-proxy] client disconnected during stream", file=sys.stderr) _crof_record(model, n_items, False) _log_resp(last_resp_id, "client_disconnect", last_output) return except (TimeoutError, OSError, urllib.error.URLError) as e: - print(f"[translate-proxy] upstream error during stream: {type(e).__name__}: {e}", file=sys.stderr) + print(f"[translate-proxy upstream error during stream: {type(e).__name__}: {e}", file=sys.stderr) + self._request_failed = True err_resp_id = body.get("request_id") or body.get("id") or uid("resp") try: self.wfile.write(emit("response.failed", {"type": "response.failed", @@ -7411,7 +7726,7 @@ class Handler(http.server.BaseHTTPRequestHandler): while len(_last_reasoning_store) > _MAX_STORED: oldest = next(iter(_last_reasoning_store)) del _last_reasoning_store[oldest] - _record_usage(provider, model, success, time.time() - t0, error_type="length" if not success else None) + _record_usage_with_tokens(provider, model, success, time.time() - t0, _last_stream_usage, error_type="length" if not success else None, input_items=input_data, output_items=last_output) # Auto-learn provider quirks before flushing the bad response to Codex. if finish_reason == "length" and not has_content and has_function_call_output(input_data): @@ -7647,15 +7962,17 @@ class Handler(http.server.BaseHTTPRequestHandler): self.stream_buffered_events(collected_events) else: - result = oa_resp_to_responses(json.loads(upstream.read()), model) + raw_resp = json.loads(upstream.read()) + result = oa_resp_to_responses(raw_resp, model) success = result.get("status") != "incomplete" + _log_cache_stats(raw_resp) _crof_record(model, n_items, success) + _record_usage_with_tokens(provider, model, success, time.time() - t0, raw_resp, input_items=input_data, output_items=result.get("output", [])) self.send_json(200, result) rid = result.get("id") _log_resp(rid, result.get("status"), result.get("output", [])) if rid and input_data is not None: store_response(rid, input_data, result.get("output", [])) - _record_usage(provider, model, success, time.time() - t0) def _forward_oa_compat_retry(self, req, model, chat_body, body, input_data, tracker=None): try: @@ -7692,7 +8009,7 @@ class Handler(http.server.BaseHTTPRequestHandler): last_resp_id = d.get("response", {}).get("id") last_output = d.get("response", {}).get("output", []) last_status = d.get("response", {}).get("status") - except: pass + except (json.JSONDecodeError, KeyError, TypeError): pass return True self.stream_buffered_events(oa_stream_to_sse(upstream, model, body.get("request_id") or body.get("id")), on_event=on_event) except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError): @@ -7834,11 +8151,14 @@ class Handler(http.server.BaseHTTPRequestHandler): _save_schema(schema, model=model) + t0 = time.time() + provider = TARGET_URL.split("//")[-1].split("/")[0] + if stream: self.send_response(200) self.send_header("Content-Type", "text/event-stream") self.send_header("Cache-Control", "no-cache") - self.send_header("Connection", "keep-alive") + self.send_header("Connection", "close") self.end_headers() if hasattr(self, 'connection') and self.connection: try: @@ -7847,8 +8167,9 @@ class Handler(http.server.BaseHTTPRequestHandler): pass last_resp_id = None last_output = None + last_usage = {} def on_event(event): - nonlocal last_resp_id, last_output + nonlocal last_resp_id, last_output, last_usage if tracker and tracker.cancelled.is_set(): print("[command-code] stream cancelled", file=sys.stderr) return False @@ -7859,12 +8180,14 @@ class Handler(http.server.BaseHTTPRequestHandler): if d.get("type") == "response.completed": last_resp_id = d.get("response", {}).get("id") last_output = d.get("response", {}).get("output", []) - except: pass + last_usage = d.get("response", {}).get("usage", {}) + except (json.JSONDecodeError, KeyError, TypeError): pass return True try: self.stream_buffered_events(cc_stream_to_sse(upstream, model, body.get("request_id") or body.get("id")), on_event=on_event) except Exception as e: print(f"[{self._session_id}] stream error: {e}", file=sys.stderr) + self._request_failed = True try: err_event = 'data: ' + json.dumps({"type": "response.completed", "response": {"id": body.get("request_id") or body.get("id") or uid("resp"), @@ -7876,6 +8199,9 @@ class Handler(http.server.BaseHTTPRequestHandler): self.wfile.flush() except Exception: pass + + success = not self._request_failed + _record_usage_with_tokens(provider, model, success, time.time() - t0, last_usage, input_items=body.get("input", ""), output_items=last_output) if last_resp_id: store_response(last_resp_id, body.get("input", ""), last_output) else: @@ -7883,6 +8209,8 @@ class Handler(http.server.BaseHTTPRequestHandler): result = cc_resp_to_responses(raw, model) self.send_json(200, result) rid = result.get("id") + success = result.get("status") != "incomplete" + _record_usage_with_tokens(provider, model, success, time.time() - t0, result.get("usage", {}), input_items=body.get("input", ""), output_items=result.get("output", [])) if rid: store_response(rid, body.get("input", ""), result.get("output", [])) @@ -8451,12 +8779,18 @@ class Handler(http.server.BaseHTTPRequestHandler): return def _forward(self, req, stream, model, nonstream_fn, stream_fn, input_data=None, tracker=None): + t0 = time.time() + provider = TARGET_URL.split("//")[-1].split("/")[0] try: upstream = urllib.request.urlopen(req, timeout=_upstream_timeout({}, stream)) except urllib.error.HTTPError as e: err = e.read().decode() + self._request_failed = True + _record_usage(provider, model, False, time.time() - t0, error_type=f"HTTP_{e.code}", tokens_in=_estimate_tokens_from_items(input_data)) return self.send_json(e.code, {"error": {"type": "upstream_error", "message": err}}) except Exception as e: + self._request_failed = True + _record_usage(provider, model, False, time.time() - t0, error_type=type(e).__name__, tokens_in=_estimate_tokens_from_items(input_data)) return self.send_json(500, {"error": {"type": "proxy_error", "message": str(e)}}) if stream: @@ -8473,9 +8807,10 @@ class Handler(http.server.BaseHTTPRequestHandler): last_resp_id = None last_output = None last_status = None + _last_stream_usage = {} try: def on_event(event): - nonlocal last_resp_id, last_output, last_status + nonlocal last_resp_id, last_output, last_status, _last_stream_usage if tracker and tracker.cancelled.is_set(): print("[translate-proxy] stream cancelled", file=sys.stderr) return False @@ -8487,11 +8822,20 @@ class Handler(http.server.BaseHTTPRequestHandler): last_resp_id = d.get("response", {}).get("id") last_output = d.get("response", {}).get("output", []) last_status = d.get("response", {}).get("status") - except: pass + resp_usage = d.get("response", {}).get("usage") + if resp_usage: + _last_stream_usage = resp_usage + except (json.JSONDecodeError, KeyError, TypeError): pass return True self.stream_buffered_events(stream_fn(upstream), on_event=on_event) except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError): print("[translate-proxy] client disconnected during stream", file=sys.stderr) + except Exception as e: + print(f"[translate-proxy] stream error: {e}", file=sys.stderr) + self._request_failed = True + + success = (last_status == "completed") + _record_usage_with_tokens(provider, model, success, time.time() - t0, _last_stream_usage, input_items=input_data, output_items=last_output) _log_resp(last_resp_id, last_status or "client_disconnect", last_output) if last_resp_id and input_data is not None: store_response(last_resp_id, input_data, last_output) @@ -8499,11 +8843,16 @@ class Handler(http.server.BaseHTTPRequestHandler): result = nonstream_fn(upstream) self.send_json(200, result) rid = result.get("id") + success = result.get("status") != "incomplete" + _record_usage_with_tokens(provider, model, success, time.time() - t0, result.get("usage", {}), input_items=input_data, output_items=result.get("output", [])) _log_resp(rid, result.get("status"), result.get("output", [])) if rid and input_data is not None: store_response(rid, input_data, result.get("output", [])) def send_json(self, status, data): + self._last_status = status + if status >= 500: + self._request_failed = True try: body = json.dumps(data).encode() self.send_response(status) @@ -8695,8 +9044,9 @@ def main(): allow_reuse_address = True daemon_threads = True request_queue_size = 64 - SERVER = ReusableHTTPServer(("127.0.0.1", PORT), Handler) - print(f"translate-proxy ({BACKEND}) listening on http://127.0.0.1:{PORT}", flush=True) + BIND_HOST = getattr(_args, "host", None) or os.environ.get("CODEX_HOST", "127.0.0.1") + SERVER = ReusableHTTPServer((BIND_HOST, PORT), Handler) + print(f"translate-proxy ({BACKEND}) listening on http://{BIND_HOST}:{PORT}", flush=True) print(f"Target: {TARGET_URL}", flush=True) print(f"Models: {[m['id'] for m in MODELS]}", flush=True) if BACKEND in ("codebuff", "freebuff"):