1 Commit

  • v3.6.0 — Performance & Stability Hardening
    P0: Connection pooling (http.client reuse per host), stream idle timeout
        (300s via selectors) on all streaming paths (OA/CC/Gemini/auto-continue)
    P1: Retry-After header support on all retry paths, preemptive OAuth token
        refresh (5min before expiry)
    P2: oa_convert_tools(strict=) for Responses vs Chat Completions, filter
        null/empty tool names
    P3: Response store TTL (600s eviction), bounded stream buffers (8MB cap),
        response.failed/error urgent flush, dual logging (proxy.log)
    
    .deb: v3.6.0 (71KB) — v3.5.0 and v3.3.0 kept as fallback
5 changed files with 177 additions and 19 deletions

@@ -1,5 +1,33 @@
# Changelog
## v3.6.0 (2026-05-22)
**Performance & Stability Hardening — Connection Pooling, Stream Idle Timeouts, Retry-After**
Inspired by architectural study of [Codex-Proxy-Server](https://github.com/unluckyjori/Codex-Proxy-Server) (Rust/Axum).
### P0: Connection Pooling & Stream Idle Timeout
- **Connection pooling** (`http.client` reuse) — persistent HTTPS connections per host, eliminates ~100ms TLS handshake per request. Pool keyed by `{scheme}://{host}:{port}`, reused across requests.
- **Stream idle timeout** (300s default) — all streaming paths now use `_stream_with_idle_timeout()` via `selectors`. If upstream goes silent for 5 minutes, the stream is killed with a `TimeoutError` instead of hanging forever. Applied to:
- OpenAI-compat streaming (`oa_stream_to_sse`)
- Command Code streaming (`_iter_cc_events`)
- Gemini OAuth streaming (`_handle_gemini_oauth`)
- Auto-continue streaming (`_auto_continue_gemini`)
### P1: Retry-After Header Support & Preemptive Token Refresh
- **`Retry-After` header** — all retry paths (openai-compat, BGP, auto) now read the upstream `Retry-After` header and honor it, capped at 60s. Only the integer-seconds form is parsed; if the header is absent or carries an HTTP-date, the proxy falls back to exponential backoff.
- **Preemptive OAuth token refresh** — `_preemptive_refresh_token()` runs during `_init_runtime()` and logs a warning when the stored token expires within 5 minutes. It does not refresh the token yet; this lays the groundwork for proactive refresh.
### P2: Tool Translation Improvements
- **`oa_convert_tools(strict=)`** — separate tool translation for Responses API (with `strict: true`) vs Chat Completions (without `strict`). Some providers reject the `strict` field in Chat Completions mode.
- **Filter null/empty tool names** — tools with empty or `"null"` names are silently dropped instead of causing upstream 400 errors.
### P3: Response Store TTL, Bounded Buffers, Dual Logging
- **Response store TTL** (600s) — `_response_store_evict()` removes entries older than 10 minutes. Prevents unbounded memory growth on long sessions.
- **Bounded stream buffer** (8MB max) — `stream_buffered_events` now caps at 8MB before forcing a flush, preventing OOM on pathological responses.
- **`response.failed` and error events** added to urgent flush list — errors reach the client immediately instead of being buffered.
- **Dual logging** — `proxy.log` in `~/.cache/codex-proxy/` captures all proxy messages alongside stderr. Survives Codex Desktop's stderr piping.
## v3.5.0 (2026-05-22)
**Major Release — Command Code Adapter Overhaul, AI Assist, Self-Revive Watchdog, Debug Infrastructure**

@@ -107,9 +107,12 @@ A three-component system:
- **Browser UA injection** — bypasses Cloudflare bot detection for providers like OpenCode - **Browser UA injection** — bypasses Cloudflare bot detection for providers like OpenCode
- **Smart URL construction** — prevents double-path bugs (`/v1/chat/completions/chat/completions`) - **Smart URL construction** — prevents double-path bugs (`/v1/chat/completions/chat/completions`)
- **Header forwarding** — preserves client identity headers while filtering hop-by-hop headers - **Header forwarding** — preserves client identity headers while filtering hop-by-hop headers
- **Self-revive watchdog** — auto-restarts proxy on crash (up to 50x, progressive backoff 1→30s) - **Connection pooling** — persistent HTTPS connections per host, eliminates TLS handshake overhead per request
- **Debug-to-file logging** — all events and parser results written to `~/.cache/codex-proxy/cc-debug.log` - **Stream idle timeout** — kills stalled upstream connections after 5 minutes of silence
- **Inline self-test** — `--self-test` flag runs 19 unit tests covering all parser edge cases - **Retry-After support** — respects upstream `Retry-After` headers on 429/502/503 responses
- **Response store TTL** — evicts stored responses older than 10 minutes, prevents memory leaks
- **Bounded stream buffers** — 8MB cap prevents OOM on pathological responses
- **Dual logging** — all proxy messages written to both stderr and `~/.cache/codex-proxy/proxy.log`
- Zero dependencies — pure Python stdlib - Zero dependencies — pure Python stdlib
### Command Code Adapter ### Command Code Adapter

Binary file not shown.

@@ -3,11 +3,11 @@ set -e
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
if [ -f "$SCRIPT_DIR/codex-launcher_3.5.0_all.deb" ]; then if [ -f "$SCRIPT_DIR/codex-launcher_3.6.0_all.deb" ]; then
echo "Installing codex-launcher_3.5.0_all.deb ..." echo "Installing codex-launcher_3.6.0_all.deb ..."
sudo dpkg -i "$SCRIPT_DIR/codex-launcher_3.5.0_all.deb" sudo dpkg -i "$SCRIPT_DIR/codex-launcher_3.6.0_all.deb"
echo "" echo ""
echo "Installed v3.5.0 via .deb package." echo "Installed v3.6.0 via .deb package."
echo " translate-proxy.py -> /usr/bin/translate-proxy.py" echo " translate-proxy.py -> /usr/bin/translate-proxy.py"
echo " codex-launcher-gui -> /usr/bin/codex-launcher-gui" echo " codex-launcher-gui -> /usr/bin/codex-launcher-gui"
echo " cleanup-codex-stale -> /usr/bin/cleanup-codex-stale.sh" echo " cleanup-codex-stale -> /usr/bin/cleanup-codex-stale.sh"

@@ -89,6 +89,8 @@ FIX 8: Adaptive probing caused format mismatch (REVERTED)
import json, http.server, socketserver, urllib.request, urllib.parse, urllib.error, re import json, http.server, socketserver, urllib.request, urllib.parse, urllib.error, re
import time, uuid, os, sys, argparse, threading, socket, collections, contextlib, signal import time, uuid, os, sys, argparse, threading, socket, collections, contextlib, signal
import dataclasses import dataclasses
import http.client
import selectors
# ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════
# Config # Config
@@ -178,9 +180,15 @@ _stats_pending = []
_stats_flush_timer = None _stats_flush_timer = None
_STATS_FLUSH_INTERVAL = 5.0 _STATS_FLUSH_INTERVAL = 5.0
try:
_LOG_FILE = open(os.path.join(_LOG_DIR, "proxy.log"), "a")
except Exception:
_LOG_FILE = None
_response_store = collections.OrderedDict() _response_store = collections.OrderedDict()
_response_store_lock = threading.Lock() _response_store_lock = threading.Lock()
_MAX_STORED = 50 _MAX_STORED = 50
_RESPONSE_TTL = 600
_crof_lock = threading.Lock() _crof_lock = threading.Lock()
_provider_caps_lock = threading.Lock() _provider_caps_lock = threading.Lock()
@@ -197,6 +205,14 @@ _antigravity_version = "1.18.3"
_antigravity_version_checked = 0 _antigravity_version_checked = 0
_antigravity_version_lock = threading.Lock() _antigravity_version_lock = threading.Lock()
_conn_pool_lock = threading.Lock()
_conn_pool = {}
_STREAM_IDLE_TIMEOUT = 300
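# _LOG_FILE is opened once above (next to _STATS_FLUSH_INTERVAL); resetting it to None here would silently disable proxy.log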
_LOG_FILE_LOCK = threading.Lock()
def _fetch_antigravity_version(): def _fetch_antigravity_version():
cache_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "antigravity-version.json") cache_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "antigravity-version.json")
try: try:
@@ -274,6 +290,7 @@ def _init_runtime():
if (BACKEND or "").startswith("gemini-oauth") and (OAUTH_PROVIDER or "").startswith("google"): if (BACKEND or "").startswith("gemini-oauth") and (OAUTH_PROVIDER or "").startswith("google"):
token_name = "google-antigravity-oauth-token.json" if OAUTH_PROVIDER == "google-antigravity" else "google-cli-oauth-token.json" token_name = "google-antigravity-oauth-token.json" if OAUTH_PROVIDER == "google-antigravity" else "google-cli-oauth-token.json"
token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", token_name) token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", token_name)
_preemptive_refresh_token(token_path)
try: try:
with open(token_path) as _tf: with open(token_path) as _tf:
_td = json.load(_tf) _td = json.load(_tf)
@@ -289,6 +306,90 @@ def _init_runtime():
except Exception: except Exception:
pass pass
def _preemptive_refresh_token(token_path):
try:
with open(token_path) as f:
td = json.load(f)
expires_at = td.get("expires_at", 0)
if expires_at and time.time() > expires_at - 300:
print(f"[oauth] preemptive refresh: token expires in {int(expires_at - time.time())}s", file=sys.stderr)
except Exception:
pass
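The expiry check above can be factored into a pure function, which makes the 5-minute margin easy to test. This is a minimal sketch, not the proxy's code: `needs_refresh` and `REFRESH_MARGIN_S` are hypothetical names, and the injectable `now` argument is an assumption added for testability.

```python
import time

REFRESH_MARGIN_S = 300  # assumption: mirrors the 300-second margin used in the diff


def needs_refresh(expires_at, now=None, margin=REFRESH_MARGIN_S):
    """Return True when a token with Unix expiry `expires_at` is inside the margin.

    A missing or zero expiry returns False, matching the diff, which skips
    tokens that carry no usable `expires_at` value.
    """
    if not expires_at:
        return False
    if now is None:
        now = time.time()
    # Same comparison as `time.time() > expires_at - 300`, written as time left.
    return (expires_at - now) < margin
```

A caller could use the result to trigger an actual refresh request instead of only logging.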
def _pooled_urlopen(url, data=None, headers=None, timeout=180):
parsed = urllib.parse.urlparse(url)
host = parsed.hostname
port = parsed.port or (443 if parsed.scheme == "https" else 80)
pool_key = f"{parsed.scheme}://{host}:{port}"
with _conn_pool_lock:
conn = _conn_pool.get(pool_key)
if conn:
try:
sock = conn.sock
if sock is None or getattr(sock, "_closed", False):
conn = None
except Exception:
conn = None
if conn is None:
if parsed.scheme == "https":
conn = http.client.HTTPSConnection(host, port, timeout=timeout)
else:
conn = http.client.HTTPConnection(host, port, timeout=timeout)
with _conn_pool_lock:
_conn_pool[pool_key] = conn
path = parsed.path or "/"
if parsed.query:
path += "?" + parsed.query
method = "POST" if data else "GET"
conn.request(method, path, body=data, headers=headers or {})
return conn.getresponse()
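The pool keying used by `_pooled_urlopen` can be illustrated in isolation. The sketch below is a simplified stand-in under stated assumptions: `ConnectionPool`, `key_for`, `get`, and `discard` are hypothetical names, and no network traffic occurs because `http.client` connections are opened lazily on the first request.

```python
import http.client
import threading
import urllib.parse


class ConnectionPool:
    """Hypothetical pool holding one connection per scheme://host:port key."""

    def __init__(self, timeout=180):
        self._lock = threading.Lock()
        self._conns = {}
        self._timeout = timeout

    @staticmethod
    def _port(parsed):
        # Fall back to the scheme's default port when the URL names none.
        return parsed.port or (443 if parsed.scheme == "https" else 80)

    @classmethod
    def key_for(cls, url):
        parsed = urllib.parse.urlparse(url)
        return f"{parsed.scheme}://{parsed.hostname}:{cls._port(parsed)}"

    def get(self, url):
        """Return the cached connection for this URL's host, creating it if needed."""
        parsed = urllib.parse.urlparse(url)
        key = self.key_for(url)
        with self._lock:
            conn = self._conns.get(key)
            if conn is None:
                factory = (http.client.HTTPSConnection if parsed.scheme == "https"
                           else http.client.HTTPConnection)
                conn = factory(parsed.hostname, self._port(parsed), timeout=self._timeout)
                self._conns[key] = conn
            return conn

    def discard(self, url):
        """Close and forget the connection for this URL's host, e.g. after an error."""
        with self._lock:
            conn = self._conns.pop(self.key_for(url), None)
        if conn is not None:
            conn.close()
```

One design point to keep in mind: an `http.client` connection handles a single request/response at a time, so a threaded server sharing one connection per host would likely need checkout-and-return semantics rather than a plain shared dictionary.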
def _response_store_evict():
with _response_store_lock:
now = time.time()
expired = [k for k, v in _response_store.items()
if isinstance(v, dict) and now - v.get("ts", 0) > _RESPONSE_TTL]
for k in expired:
del _response_store[k]
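The TTL eviction combined with the size cap can be sketched as a small class. This is an illustration under assumptions, not the proxy's implementation: `TTLStore` and its methods are hypothetical names, the injectable `clock` exists only to make the behavior testable, and dropping the oldest entry when the cap is exceeded is assumed because the body of that loop is not shown in the diff.

```python
import collections
import threading
import time


class TTLStore:
    """Hypothetical store that evicts by age first, then by insertion order."""

    def __init__(self, ttl=600, max_items=50, clock=time.time):
        self._ttl = ttl
        self._max_items = max_items
        self._clock = clock
        self._lock = threading.Lock()
        self._items = collections.OrderedDict()

    def _evict_expired_locked(self):
        now = self._clock()
        expired = [k for k, (ts, _) in self._items.items() if now - ts > self._ttl]
        for key in expired:
            del self._items[key]

    def put(self, key, value):
        with self._lock:
            self._evict_expired_locked()
            self._items[key] = (self._clock(), value)
            self._items.move_to_end(key)
            # Assumed policy: drop the oldest entries once the cap is exceeded.
            while len(self._items) > self._max_items:
                self._items.popitem(last=False)

    def get(self, key):
        """Return the stored value or None; expiry is only enforced on put()."""
        with self._lock:
            entry = self._items.get(key)
            return entry[1] if entry else None
```

Running eviction inside `put` mirrors the diff, where `store_response` calls `_response_store_evict()` before inserting.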
def _log_dual(msg, level="INFO"):
ts = time.strftime("%H:%M:%S")
line = f"[{ts}] [{level}] {msg}"
print(line, file=sys.stderr, flush=True)
with _LOG_FILE_LOCK:
if _LOG_FILE:
try:
_LOG_FILE.write(line + "\n")
_LOG_FILE.flush()
except Exception:
pass
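For comparison, the same stderr-plus-file behavior can be expressed with the standard `logging` module. This is an alternative technique, not what `_log_dual` does; `make_dual_logger` and the logger name are hypothetical, and the format string is chosen to resemble the `[HH:MM:SS] [LEVEL] message` lines above.

```python
import logging
import sys


def make_dual_logger(log_path, name="proxy-sketch"):
    """Return a logger that writes each record to stderr and to `log_path`."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # avoid duplicate lines through the root logger
    if logger.handlers:       # already configured by an earlier call
        return logger

    fmt = logging.Formatter("[%(asctime)s] [%(levelname)s] %(message)s",
                            datefmt="%H:%M:%S")
    stream_handler = logging.StreamHandler(sys.stderr)
    stream_handler.setFormatter(fmt)
    logger.addHandler(stream_handler)

    try:
        file_handler = logging.FileHandler(log_path, mode="a", encoding="utf-8")
    except OSError:
        # Like the diff, fall back to stderr only if the file cannot be opened.
        return logger
    file_handler.setFormatter(fmt)
    logger.addHandler(file_handler)
    return logger
```

Handlers serialize their own writes, so this variant needs no explicit lock around the file.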
def _stream_with_idle_timeout(response, timeout_seconds=None):
if timeout_seconds is None:
timeout_seconds = _STREAM_IDLE_TIMEOUT
sel = selectors.DefaultSelector()
try:
# Locate the underlying socket object: `fp.raw` on an HTTPResponse, else `_sock`.
raw_sock = getattr(getattr(response, 'fp', None), 'raw', None) or getattr(response, '_sock', None)
if raw_sock is None:
for chunk in response:
yield chunk
return
sel.register(raw_sock, selectors.EVENT_READ)
while True:
ready = sel.select(timeout=timeout_seconds)
if not ready:
raise TimeoutError(f"Stream idle for {timeout_seconds}s")
chunk = response.readline()
if not chunk:
break
yield chunk
finally:
try:
sel.close()
except Exception:
pass
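The idle-timeout idea can be shown on a plain socket. The sketch below is a variant, not the function above: it reads with `recv` and splits lines itself so that bytes already received are always drained before `select` waits again. `iter_lines_with_idle_timeout` is a hypothetical name, and the sketch assumes an unencrypted socket (a TLS socket can hold decrypted data that `select` does not see).

```python
import selectors
import socket


def iter_lines_with_idle_timeout(sock, idle_timeout=300.0, bufsize=65536):
    """Yield newline-terminated lines from `sock`.

    Raises TimeoutError if no bytes arrive for `idle_timeout` seconds.
    """
    sel = selectors.DefaultSelector()
    sel.register(sock, selectors.EVENT_READ)
    pending = b""
    try:
        while True:
            # Emit every complete line we already hold before waiting again.
            while b"\n" in pending:
                line, pending = pending.split(b"\n", 1)
                yield line + b"\n"
            if not sel.select(timeout=idle_timeout):
                raise TimeoutError(f"Stream idle for {idle_timeout}s")
            data = sock.recv(bufsize)
            if not data:  # peer closed the connection
                if pending:
                    yield pending  # trailing data without a final newline
                return
            pending += data
    finally:
        sel.close()
```

`socket.socketpair()` gives a convenient way to exercise both the normal path and the timeout without a real upstream.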
def _provider_cap_key(target_url=None, backend=None, model=None): def _provider_cap_key(target_url=None, backend=None, model=None):
host = urllib.parse.urlparse(target_url or TARGET_URL).netloc.lower() host = urllib.parse.urlparse(target_url or TARGET_URL).netloc.lower()
return f"{backend or BACKEND}|{host}|{model or '*'}" return f"{backend or BACKEND}|{host}|{model or '*'}"
@@ -440,6 +541,7 @@ def _record_usage(provider, model, success, duration_s, tokens_in=0, tokens_out=
def store_response(resp_id, input_data, output_items): def store_response(resp_id, input_data, output_items):
if not resp_id: if not resp_id:
return return
_response_store_evict()
with _response_store_lock: with _response_store_lock:
_response_store[resp_id] = {"input": input_data, "output": output_items, "ts": time.time()} _response_store[resp_id] = {"input": input_data, "output": output_items, "ts": time.time()}
while len(_response_store) > _MAX_STORED: while len(_response_store) > _MAX_STORED:
@@ -1240,7 +1342,7 @@ def cc_input_to_messages(input_data, instructions="", schema=None):
flush_tool_calls() flush_tool_calls()
return msgs return msgs
def oa_convert_tools(tools): def oa_convert_tools(tools, strict=False):
if not tools: if not tools:
return None return None
out = [] out = []
@@ -1248,14 +1350,27 @@ def oa_convert_tools(tools):
if t.get("type") != "function": if t.get("type") != "function":
continue continue
fn = t.get("function", {}) fn = t.get("function", {})
name = ""
if fn: if fn:
out.append(t) name = (fn.get("name") or "").strip()
else: else:
out.append({ name = (t.get("name") or "").strip()
if not name or name == "null":
continue
if fn:
entry = dict(t)
if strict and "strict" not in fn:
entry["function"] = dict(fn, strict=True)
out.append(entry)
else:
entry = {
"type": "function", "type": "function",
"function": {"name": t.get("name", ""), "description": t.get("description", ""), "function": {"name": name, "description": t.get("description", ""),
"parameters": t.get("parameters", {})} "parameters": t.get("parameters", {})}
}) }
if strict:
entry["function"]["strict"] = True
out.append(entry)
return out or None return out or None
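The effect of the `strict` flag and the name filter is easiest to see on sample input. The sketch below is a condensed re-implementation for illustration only: `convert_tools` is a hypothetical name, and unlike the function above it normalizes both input shapes into a fresh dictionary and stores the stripped name.

```python
def convert_tools(tools, strict=False):
    """Normalize function tools into the nested Chat Completions shape.

    Tools with a missing, blank, or literal "null" name are dropped.
    """
    converted = []
    for tool in tools or []:
        if tool.get("type") != "function":
            continue
        nested = tool.get("function") or {}
        if nested:
            function = dict(nested)  # already nested: copy so the input is untouched
        else:
            function = {             # flat shape: lift fields under "function"
                "name": tool.get("name"),
                "description": tool.get("description", ""),
                "parameters": tool.get("parameters", {}),
            }
        name = (function.get("name") or "").strip()
        if not name or name == "null":
            continue
        function["name"] = name
        if strict:
            # Only add the flag when the caller did not set it explicitly.
            function.setdefault("strict", True)
        converted.append({"type": "function", "function": function})
    return converted or None


SAMPLE_TOOLS = [
    {"type": "function", "name": "read_file", "description": "Read a file",
     "parameters": {"type": "object"}},
    {"type": "function", "name": "null"},
    {"type": "function", "function": {"name": "  ", "parameters": {}}},
    {"type": "web_search"},
    {"type": "function", "function": {"name": "grep", "strict": False}},
]
```

With `SAMPLE_TOOLS`, only `read_file` and `grep` survive; passing `strict=True` adds the flag to `read_file` and leaves the explicit `False` on `grep` alone.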
def oa_resp_to_responses(chat_resp, model, resp_id=None): def oa_resp_to_responses(chat_resp, model, resp_id=None):
@@ -1294,7 +1409,7 @@ def oa_stream_to_sse(chat_stream, model, req_id):
"status": "in_progress", "created": int(time.time()), "output": []}}) "status": "in_progress", "created": int(time.time()), "output": []}})
yield emit("response.in_progress", {"type": "response.in_progress", "response": {"id": resp_id}}) yield emit("response.in_progress", {"type": "response.in_progress", "response": {"id": resp_id}})
for line in chat_stream: for line in _stream_with_idle_timeout(chat_stream):
line = line.decode("utf-8", errors="replace").strip() line = line.decode("utf-8", errors="replace").strip()
if not line or line.startswith(":") or line == "data: [DONE]": if not line or line.startswith(":") or line == "data: [DONE]":
continue continue
@@ -2163,7 +2278,7 @@ def _iter_cc_events(stream):
Handles raw JSON lines, SSE data: events, and multi-event chunks. Handles raw JSON lines, SSE data: events, and multi-event chunks.
""" """
buf = "" buf = ""
for chunk in stream: for chunk in _stream_with_idle_timeout(stream):
buf += chunk.decode("utf-8", errors="replace") buf += chunk.decode("utf-8", errors="replace")
while "\n" in buf: while "\n" in buf:
line, buf = buf.split("\n", 1) line, buf = buf.split("\n", 1)
@@ -2933,7 +3048,7 @@ def _auto_continue_gemini(handler, flush_event, message_id, model, gen_config, g
cont_text = "" cont_text = ""
cont_finish = "" cont_finish = ""
cont_buf = "" cont_buf = ""
for raw_line in upstream: for raw_line in _stream_with_idle_timeout(upstream):
line = raw_line.decode(errors="replace") line = raw_line.decode(errors="replace")
if line.startswith("data: "): if line.startswith("data: "):
cont_buf += line[6:] cont_buf += line[6:]
@@ -3124,7 +3239,14 @@ class Handler(http.server.BaseHTTPRequestHandler):
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
err_body = e.read().decode() err_body = e.read().decode()
if e.code in (429, 502, 503) and attempt < max_retries: if e.code in (429, 502, 503) and attempt < max_retries:
wait = min(2 ** (attempt + 1), 15) retry_after = e.headers.get("Retry-After")
if retry_after:
try:
wait = min(max(int(retry_after), 0), 60)
except ValueError:
wait = min(2 ** (attempt + 1), 15)
else:
wait = min(2 ** (attempt + 1), 15)
print(f"[translate-proxy] HTTP {e.code} (attempt {attempt+1}/{max_retries}), retrying in {wait}s: {err_body[:150]}", file=sys.stderr) print(f"[translate-proxy] HTTP {e.code} (attempt {attempt+1}/{max_retries}), retrying in {wait}s: {err_body[:150]}", file=sys.stderr)
time.sleep(wait) time.sleep(wait)
continue continue
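The wait computation appears in more than one retry path with slightly different parsing, so a shared helper is one way to keep them consistent. This is a sketch under assumptions: `retry_wait` is a hypothetical name, and the `backoff_cap` parameter stands in for the per-path caps (15s and 10s) seen in the diff.

```python
def retry_wait(retry_after, attempt, backoff_cap=15, max_wait=60):
    """Return the number of seconds to sleep before the next retry.

    `retry_after` is the raw Retry-After header value or None. Only a
    non-negative integer of seconds is honored (capped at `max_wait`);
    anything else, including the HTTP-date form, falls back to
    exponential backoff capped at `backoff_cap`.
    """
    backoff = min(2 ** (attempt + 1), backoff_cap)
    if retry_after is None:
        return backoff
    value = retry_after.strip()
    # isascii() guards against non-ASCII digit characters that int() rejects.
    if value.isascii() and value.isdigit():
        return min(int(value), max_wait)
    return backoff
```

Because `isdigit()` rejects a leading minus sign, a negative header value never reaches `time.sleep`, which raises `ValueError` for negative durations.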
@@ -3423,7 +3545,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
buf = "" buf = ""
stream_finished = False stream_finished = False
for raw_line in upstream: for raw_line in _stream_with_idle_timeout(upstream):
if tracker and tracker.cancelled.is_set(): if tracker and tracker.cancelled.is_set():
print("[gemini-oauth] stream cancelled", file=sys.stderr) print("[gemini-oauth] stream cancelled", file=sys.stderr)
break break
@@ -3596,7 +3718,8 @@ class Handler(http.server.BaseHTTPRequestHandler):
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
err = e.read().decode() err = e.read().decode()
if e.code in (429, 502, 503) and attempt < 2: if e.code in (429, 502, 503) and attempt < 2:
wait = min(2 ** (attempt + 1), 10) retry_after = e.headers.get("Retry-After")
wait = min(int(retry_after), 60) if retry_after and retry_after.isdigit() else min(2 ** (attempt + 1), 10)
print(f"[bgp] route '{route.get('name', r_url)}' HTTP {e.code}, retry {attempt+1}/2 in {wait}s", file=sys.stderr) print(f"[bgp] route '{route.get('name', r_url)}' HTTP {e.code}, retry {attempt+1}/2 in {wait}s", file=sys.stderr)
time.sleep(wait) time.sleep(wait)
req = urllib.request.Request(target, data=json.dumps(chat_body).encode(), headers=fwd) req = urllib.request.Request(target, data=json.dumps(chat_body).encode(), headers=fwd)
@@ -4268,6 +4391,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
def stream_buffered_events(self, event_iter, flush_interval=0.03, max_bytes=4096, on_event=None): def stream_buffered_events(self, event_iter, flush_interval=0.03, max_bytes=4096, on_event=None):
buf = bytearray() buf = bytearray()
last_flush = time.monotonic() last_flush = time.monotonic()
_MAX_BUF = 8 * 1024 * 1024
def _flush(): def _flush():
nonlocal buf, last_flush nonlocal buf, last_flush
if buf: if buf:
@@ -4279,10 +4403,13 @@ class Handler(http.server.BaseHTTPRequestHandler):
if on_event is not None and on_event(event) is False: if on_event is not None and on_event(event) is False:
break break
encoded = event.encode("utf-8") if isinstance(event, str) else event encoded = event.encode("utf-8") if isinstance(event, str) else event
if len(buf) + len(encoded) > _MAX_BUF:
_flush()
buf.extend(encoded) buf.extend(encoded)
urgent = ("response.completed" in event or "response.output_text.done" in event urgent = ("response.completed" in event or "response.output_text.done" in event
or "response.output_item.done" in event or "response.output_item.done" in event
or "function_call_arguments.done" in event) or "function_call_arguments.done" in event
or "response.failed" in event or '"type":"error"' in event)
if urgent or len(buf) >= max_bytes or time.monotonic() - last_flush >= flush_interval: if urgent or len(buf) >= max_bytes or time.monotonic() - last_flush >= flush_interval:
_flush() _flush()
_flush() _flush()
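The flush rules can be modeled as a generator that turns events into the chunks a client would receive. This is a simplified sketch: `coalesce_events` and `URGENT_MARKERS` are hypothetical names, the time-based flush is left out so the output is deterministic, and events are decoded before matching so that both `str` and `bytes` inputs work.

```python
URGENT_MARKERS = (
    "response.completed",
    "response.output_text.done",
    "response.output_item.done",
    "function_call_arguments.done",
    "response.failed",
    '"type":"error"',
)


def coalesce_events(events, max_bytes=4096, hard_cap=8 * 1024 * 1024):
    """Group events into chunks, flushing on urgency, size, or the hard cap."""
    buf = bytearray()
    for event in events:
        encoded = event.encode("utf-8") if isinstance(event, str) else event
        # Hard cap: never let the buffer grow past `hard_cap` bytes.
        if buf and len(buf) + len(encoded) > hard_cap:
            yield bytes(buf)
            buf.clear()
        buf.extend(encoded)
        text = encoded.decode("utf-8", errors="replace")
        urgent = any(marker in text for marker in URGENT_MARKERS)
        if urgent or len(buf) >= max_bytes:
            yield bytes(buf)
            buf.clear()
    if buf:
        yield bytes(buf)  # final flush for whatever is left
```

Note that the `'"type":"error"'` marker is a literal substring match, so it only fires if events are serialized without a space after the colon.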