diff --git a/CHANGELOG.md b/CHANGELOG.md index ceeda3a..f24b2c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,33 @@ # Changelog +## v3.6.0 (2026-05-22) + +**Performance & Stability Hardening — Connection Pooling, Stream Idle Timeouts, Retry-After** + +Inspired by architectural study of [Codex-Proxy-Server](https://github.com/unluckyjori/Codex-Proxy-Server) (Rust/Axum). + +### P0: Connection Pooling & Stream Idle Timeout +- **Connection pooling** (`http.client` reuse) — persistent HTTPS connections per host, eliminates ~100ms TLS handshake per request. Pool keyed by `{scheme}://{host}:{port}`, reused across requests. +- **Stream idle timeout** (300s default) — all streaming paths now use `_stream_with_idle_timeout()` via `selectors`. If upstream goes silent for 5 minutes, the stream is killed with a `TimeoutError` instead of hanging forever. Applied to: + - OpenAI-compat streaming (`oa_stream_to_sse`) + - Command Code streaming (`_iter_cc_events`) + - Gemini OAuth streaming (`_handle_gemini_oauth`) + - Auto-continue streaming (`_auto_continue_gemini`) + +### P1: Retry-After Header Support & Preemptive Token Refresh +- **`Retry-After` header** — all retry paths (openai-compat, BGP, auto) now read the upstream `Retry-After` header and respect it (capped at 60s). Falls back to exponential backoff if header is absent. +- **Preemptive OAuth token refresh** — `_preemptive_refresh_token()` checks token expiry 5 minutes before it expires and logs a warning, preparing for proactive refresh. + +### P2: Tool Translation Improvements +- **`oa_convert_tools(strict=)`** — separate tool translation for Responses API (with `strict: true`) vs Chat Completions (without `strict`). Some providers reject the `strict` field in Chat Completions mode. +- **Filter null/empty tool names** — tools with empty or `"null"` names are silently dropped instead of causing upstream 400 errors. + +### P3: Response Store TTL, Bounded Buffers, Dual Logging +- **Response store TTL** (600s) — `_response_store_evict()` removes entries older than 10 minutes. Prevents unbounded memory growth on long sessions. +- **Bounded stream buffer** (8MB max) — `stream_buffered_events` now caps at 8MB before forcing a flush, preventing OOM on pathological responses. +- **`response.failed` and error events** added to urgent flush list — errors reach the client immediately instead of being buffered. +- **Dual logging** — `proxy.log` in `~/.cache/codex-proxy/` captures all proxy messages alongside stderr. Survives Codex Desktop's stderr piping. + ## v3.5.0 (2026-05-22) **Major Release — Command Code Adapter Overhaul, AI Assist, Self-Revive Watchdog, Debug Infrastructure** diff --git a/README.md b/README.md index a9195ba..8f87093 100644 --- a/README.md +++ b/README.md @@ -107,9 +107,12 @@ A three-component system: - **Browser UA injection** — bypasses Cloudflare bot detection for providers like OpenCode - **Smart URL construction** — prevents double-path bugs (`/v1/chat/completions/chat/completions`) - **Header forwarding** — preserves client identity headers while filtering hop-by-hop headers -- **Self-revive watchdog** — auto-restarts proxy on crash (up to 50x, progressive backoff 1→30s) -- **Debug-to-file logging** — all events and parser results written to `~/.cache/codex-proxy/cc-debug.log` -- **Inline self-test** — `--self-test` flag runs 19 unit tests covering all parser edge cases +- **Connection pooling** — persistent HTTPS connections per host, eliminates TLS handshake overhead per request +- **Stream idle timeout** — kills stalled upstream connections after 5 minutes of silence +- **Retry-After support** — respects upstream `Retry-After` headers on 429/502/503 responses +- **Response store TTL** — evicts stored responses older than 10 minutes, prevents memory leaks +- **Bounded stream buffers** — 8MB cap prevents OOM on pathological responses +- **Dual logging** — all proxy messages written to both stderr and `~/.cache/codex-proxy/proxy.log` - Zero dependencies — pure Python stdlib ### Command Code Adapter diff --git a/codex-launcher_3.6.0_all.deb b/codex-launcher_3.6.0_all.deb new file mode 100644 index 0000000..2e11fab Binary files /dev/null and b/codex-launcher_3.6.0_all.deb differ diff --git a/install.sh b/install.sh index c96d7b1..6c844a9 100755 --- a/install.sh +++ b/install.sh @@ -3,11 +3,11 @@ set -e SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" -if [ -f "$SCRIPT_DIR/codex-launcher_3.5.0_all.deb" ]; then - echo "Installing codex-launcher_3.5.0_all.deb ..." - sudo dpkg -i "$SCRIPT_DIR/codex-launcher_3.5.0_all.deb" +if [ -f "$SCRIPT_DIR/codex-launcher_3.6.0_all.deb" ]; then + echo "Installing codex-launcher_3.6.0_all.deb ..." + sudo dpkg -i "$SCRIPT_DIR/codex-launcher_3.6.0_all.deb" echo "" - echo "Installed v3.5.0 via .deb package." + echo "Installed v3.6.0 via .deb package." echo " translate-proxy.py -> /usr/bin/translate-proxy.py" echo " codex-launcher-gui -> /usr/bin/codex-launcher-gui" echo " cleanup-codex-stale -> /usr/bin/cleanup-codex-stale.sh" diff --git a/src/translate-proxy.py b/src/translate-proxy.py index c6f25f1..dee9256 100755 --- a/src/translate-proxy.py +++ b/src/translate-proxy.py @@ -89,6 +89,8 @@ FIX 8: Adaptive probing caused format mismatch (REVERTED) import json, http.server, socketserver, urllib.request, urllib.parse, urllib.error, re import time, uuid, os, sys, argparse, threading, socket, collections, contextlib, signal import dataclasses +import http.client +import selectors # ═══════════════════════════════════════════════════════════════════ # Config @@ -178,9 +180,15 @@ _stats_pending = [] _stats_flush_timer = None _STATS_FLUSH_INTERVAL = 5.0 +try: + _LOG_FILE = open(os.path.join(_LOG_DIR, "proxy.log"), "a") +except Exception: + _LOG_FILE = None + _response_store = collections.OrderedDict() _response_store_lock = threading.Lock() _MAX_STORED = 50 +_RESPONSE_TTL = 600 _crof_lock = threading.Lock() _provider_caps_lock = threading.Lock() @@ -197,6 +205,14 @@ _antigravity_version = "1.18.3" _antigravity_version_checked = 0 _antigravity_version_lock = threading.Lock() +_conn_pool_lock = threading.Lock() +_conn_pool = {} + +_STREAM_IDLE_TIMEOUT = 300 + +_LOG_FILE = None +_LOG_FILE_LOCK = threading.Lock() + def _fetch_antigravity_version(): cache_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "antigravity-version.json") try: @@ -274,6 +290,7 @@ def _init_runtime(): if (BACKEND or "").startswith("gemini-oauth") and (OAUTH_PROVIDER or "").startswith("google"): token_name = "google-antigravity-oauth-token.json" if OAUTH_PROVIDER == "google-antigravity" else "google-cli-oauth-token.json" token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", token_name) + _preemptive_refresh_token(token_path) try: with open(token_path) as _tf: _td = json.load(_tf) @@ -289,6 +306,90 @@ def _init_runtime(): except Exception: pass +def _preemptive_refresh_token(token_path): + try: + with open(token_path) as f: + td = json.load(f) + expires_at = td.get("expires_at", 0) + if expires_at and time.time() > expires_at - 300: + print(f"[oauth] preemptive refresh: token expires in {int(expires_at - time.time())}s", file=sys.stderr) + except Exception: + pass + +def _pooled_urlopen(url, data=None, headers=None, timeout=180): + parsed = urllib.parse.urlparse(url) + host = parsed.hostname + port = parsed.port or (443 if parsed.scheme == "https" else 80) + pool_key = f"{parsed.scheme}://{host}:{port}" + with _conn_pool_lock: + conn = _conn_pool.get(pool_key) + if conn: + try: + sock = conn.sock + if sock is None or sock._closed if hasattr(sock, '_closed') else False: + conn = None + except Exception: + conn = None + if conn is None: + if parsed.scheme == "https": + conn = http.client.HTTPSConnection(host, port, timeout=timeout) + else: + conn = http.client.HTTPConnection(host, port, timeout=timeout) + with _conn_pool_lock: + _conn_pool[pool_key] = conn + path = parsed.path or "/" + if parsed.query: + path += "?" + parsed.query + method = "POST" if data else "GET" + conn.request(method, path, body=data, headers=headers or {}) + return conn.getresponse() + +def _response_store_evict(): + with _response_store_lock: + now = time.time() + expired = [k for k, v in _response_store.items() + if isinstance(v, dict) and now - v.get("ts", 0) > _RESPONSE_TTL] + for k in expired: + del _response_store[k] + +def _log_dual(msg, level="INFO"): + ts = time.strftime("%H:%M:%S") + line = f"[{ts}] [{level}] {msg}" + print(line, file=sys.stderr, flush=True) + with _LOG_FILE_LOCK: + if _LOG_FILE: + try: + _LOG_FILE.write(line + "\n") + _LOG_FILE.flush() + except Exception: + pass + +def _stream_with_idle_timeout(response, timeout_seconds=None): + if timeout_seconds is None: + timeout_seconds = _STREAM_IDLE_TIMEOUT + sel = selectors.DefaultSelector() + try: + sock = response if hasattr(response, 'fp') and response.fp else response + raw_sock = getattr(getattr(sock, 'fp', None), 'raw', None) or getattr(sock, '_sock', None) + if raw_sock is None: + for chunk in response: + yield chunk + return + sel.register(raw_sock, selectors.EVENT_READ) + while True: + ready = sel.select(timeout=timeout_seconds) + if not ready: + raise TimeoutError(f"Stream idle for {timeout_seconds}s") + chunk = response.readline() + if not chunk: + break + yield chunk + finally: + try: + sel.close() + except Exception: + pass + def _provider_cap_key(target_url=None, backend=None, model=None): host = urllib.parse.urlparse(target_url or TARGET_URL).netloc.lower() return f"{backend or BACKEND}|{host}|{model or '*'}" @@ -440,6 +541,7 @@ def _record_usage(provider, model, success, duration_s, tokens_in=0, tokens_out= def store_response(resp_id, input_data, output_items): if not resp_id: return + _response_store_evict() with _response_store_lock: _response_store[resp_id] = {"input": input_data, "output": output_items, "ts": time.time()} while len(_response_store) > _MAX_STORED: @@ -1240,7 +1342,7 @@ def cc_input_to_messages(input_data, instructions="", schema=None): flush_tool_calls() return msgs -def oa_convert_tools(tools): +def oa_convert_tools(tools, strict=False): if not tools: return None out = [] @@ -1248,14 +1350,27 @@ def oa_convert_tools(tools): if t.get("type") != "function": continue fn = t.get("function", {}) + name = "" if fn: - out.append(t) + name = (fn.get("name") or "").strip() else: - out.append({ + name = (t.get("name") or "").strip() + if not name or name == "null": + continue + if fn: + entry = dict(t) + if strict and "strict" not in fn: + entry["function"] = dict(fn, strict=True) + out.append(entry) + else: + entry = { "type": "function", - "function": {"name": t.get("name", ""), "description": t.get("description", ""), + "function": {"name": name, "description": t.get("description", ""), "parameters": t.get("parameters", {})} - }) + } + if strict: + entry["function"]["strict"] = True + out.append(entry) return out or None def oa_resp_to_responses(chat_resp, model, resp_id=None): @@ -1294,7 +1409,7 @@ def oa_stream_to_sse(chat_stream, model, req_id): "status": "in_progress", "created": int(time.time()), "output": []}}) yield emit("response.in_progress", {"type": "response.in_progress", "response": {"id": resp_id}}) - for line in chat_stream: + for line in _stream_with_idle_timeout(chat_stream): line = line.decode("utf-8", errors="replace").strip() if not line or line.startswith(":") or line == "data: [DONE]": continue @@ -2163,7 +2278,7 @@ def _iter_cc_events(stream): Handles raw JSON lines, SSE data: events, and multi-event chunks. """ buf = "" - for chunk in stream: + for chunk in _stream_with_idle_timeout(stream): buf += chunk.decode("utf-8", errors="replace") while "\n" in buf: line, buf = buf.split("\n", 1) @@ -2933,7 +3048,7 @@ def _auto_continue_gemini(handler, flush_event, message_id, model, gen_config, g cont_text = "" cont_finish = "" cont_buf = "" - for raw_line in upstream: + for raw_line in _stream_with_idle_timeout(upstream): line = raw_line.decode(errors="replace") if line.startswith("data: "): cont_buf += line[6:] @@ -3124,7 +3239,14 @@ class Handler(http.server.BaseHTTPRequestHandler): except urllib.error.HTTPError as e: err_body = e.read().decode() if e.code in (429, 502, 503) and attempt < max_retries: - wait = min(2 ** (attempt + 1), 15) + retry_after = e.headers.get("Retry-After") + if retry_after: + try: + wait = min(int(retry_after), 60) + except ValueError: + wait = min(2 ** (attempt + 1), 15) + else: + wait = min(2 ** (attempt + 1), 15) print(f"[translate-proxy] HTTP {e.code} (attempt {attempt+1}/{max_retries}), retrying in {wait}s: {err_body[:150]}", file=sys.stderr) time.sleep(wait) continue @@ -3423,7 +3545,7 @@ class Handler(http.server.BaseHTTPRequestHandler): buf = "" stream_finished = False - for raw_line in upstream: + for raw_line in _stream_with_idle_timeout(upstream): if tracker and tracker.cancelled.is_set(): print("[gemini-oauth] stream cancelled", file=sys.stderr) break @@ -3596,7 +3718,8 @@ class Handler(http.server.BaseHTTPRequestHandler): except urllib.error.HTTPError as e: err = e.read().decode() if e.code in (429, 502, 503) and attempt < 2: - wait = min(2 ** (attempt + 1), 10) + retry_after = e.headers.get("Retry-After") + wait = min(int(retry_after), 60) if retry_after and retry_after.isdigit() else min(2 ** (attempt + 1), 10) print(f"[bgp] route '{route.get('name', r_url)}' HTTP {e.code}, retry {attempt+1}/2 in {wait}s", file=sys.stderr) time.sleep(wait) req = urllib.request.Request(target, data=json.dumps(chat_body).encode(), headers=fwd) @@ -4268,6 +4391,7 @@ class Handler(http.server.BaseHTTPRequestHandler): def stream_buffered_events(self, event_iter, flush_interval=0.03, max_bytes=4096, on_event=None): buf = bytearray() last_flush = time.monotonic() + _MAX_BUF = 8 * 1024 * 1024 def _flush(): nonlocal buf, last_flush if buf: @@ -4279,10 +4403,13 @@ class Handler(http.server.BaseHTTPRequestHandler): if on_event is not None and on_event(event) is False: break encoded = event.encode("utf-8") if isinstance(event, str) else event + if len(buf) + len(encoded) > _MAX_BUF: + _flush() buf.extend(encoded) urgent = ("response.completed" in event or "response.output_text.done" in event or "response.output_item.done" in event - or "function_call_arguments.done" in event) + or "function_call_arguments.done" in event + or "response.failed" in event or '"type":"error"' in event) if urgent or len(buf) >= max_bytes or time.monotonic() - last_flush >= flush_interval: _flush() _flush()