v3.6.0 — Performance & Stability Hardening

P0: Connection pooling (http.client reuse per host), stream idle timeout
    (300s via selectors) on all streaming paths (OA/CC/Gemini/auto-continue)
P1: Retry-After header support on all retry paths, preemptive OAuth token
    refresh (5min before expiry)
P2: oa_convert_tools(strict=) for Responses vs Chat Completions, filter
    null/empty tool names
P3: Response store TTL (600s eviction), bounded stream buffers (8MB cap),
    response.failed/error urgent flush, dual logging (proxy.log)

.deb: v3.6.0 (71KB) — v3.5.0 and v3.3.0 kept as fallback
This commit is contained in:
admin
2026-05-22 13:14:51 +04:00
Unverified
parent 4bbfb4ada7
commit ee78d35aa7
5 changed files with 177 additions and 19 deletions

View File

@@ -89,6 +89,8 @@ FIX 8: Adaptive probing caused format mismatch (REVERTED)
import json, http.server, socketserver, urllib.request, urllib.parse, urllib.error, re
import time, uuid, os, sys, argparse, threading, socket, collections, contextlib, signal
import dataclasses
import http.client
import selectors
# ═══════════════════════════════════════════════════════════════════
# Config
@@ -178,9 +180,15 @@ _stats_pending = []
_stats_flush_timer = None
_STATS_FLUSH_INTERVAL = 5.0
try:
_LOG_FILE = open(os.path.join(_LOG_DIR, "proxy.log"), "a")
except Exception:
_LOG_FILE = None
_response_store = collections.OrderedDict()
_response_store_lock = threading.Lock()
_MAX_STORED = 50
_RESPONSE_TTL = 600
_crof_lock = threading.Lock()
_provider_caps_lock = threading.Lock()
@@ -197,6 +205,14 @@ _antigravity_version = "1.18.3"
_antigravity_version_checked = 0
_antigravity_version_lock = threading.Lock()
_conn_pool_lock = threading.Lock()
_conn_pool = {}
_STREAM_IDLE_TIMEOUT = 300
_LOG_FILE = None
_LOG_FILE_LOCK = threading.Lock()
def _fetch_antigravity_version():
cache_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "antigravity-version.json")
try:
@@ -274,6 +290,7 @@ def _init_runtime():
if (BACKEND or "").startswith("gemini-oauth") and (OAUTH_PROVIDER or "").startswith("google"):
token_name = "google-antigravity-oauth-token.json" if OAUTH_PROVIDER == "google-antigravity" else "google-cli-oauth-token.json"
token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", token_name)
_preemptive_refresh_token(token_path)
try:
with open(token_path) as _tf:
_td = json.load(_tf)
@@ -289,6 +306,90 @@ def _init_runtime():
except Exception:
pass
def _preemptive_refresh_token(token_path):
try:
with open(token_path) as f:
td = json.load(f)
expires_at = td.get("expires_at", 0)
if expires_at and time.time() > expires_at - 300:
print(f"[oauth] preemptive refresh: token expires in {int(expires_at - time.time())}s", file=sys.stderr)
except Exception:
pass
def _pooled_urlopen(url, data=None, headers=None, timeout=180):
parsed = urllib.parse.urlparse(url)
host = parsed.hostname
port = parsed.port or (443 if parsed.scheme == "https" else 80)
pool_key = f"{parsed.scheme}://{host}:{port}"
with _conn_pool_lock:
conn = _conn_pool.get(pool_key)
if conn:
try:
sock = conn.sock
if sock is None or sock._closed if hasattr(sock, '_closed') else False:
conn = None
except Exception:
conn = None
if conn is None:
if parsed.scheme == "https":
conn = http.client.HTTPSConnection(host, port, timeout=timeout)
else:
conn = http.client.HTTPConnection(host, port, timeout=timeout)
with _conn_pool_lock:
_conn_pool[pool_key] = conn
path = parsed.path or "/"
if parsed.query:
path += "?" + parsed.query
method = "POST" if data else "GET"
conn.request(method, path, body=data, headers=headers or {})
return conn.getresponse()
def _response_store_evict():
with _response_store_lock:
now = time.time()
expired = [k for k, v in _response_store.items()
if isinstance(v, dict) and now - v.get("ts", 0) > _RESPONSE_TTL]
for k in expired:
del _response_store[k]
def _log_dual(msg, level="INFO"):
ts = time.strftime("%H:%M:%S")
line = f"[{ts}] [{level}] {msg}"
print(line, file=sys.stderr, flush=True)
with _LOG_FILE_LOCK:
if _LOG_FILE:
try:
_LOG_FILE.write(line + "\n")
_LOG_FILE.flush()
except Exception:
pass
def _stream_with_idle_timeout(response, timeout_seconds=None):
if timeout_seconds is None:
timeout_seconds = _STREAM_IDLE_TIMEOUT
sel = selectors.DefaultSelector()
try:
sock = response if hasattr(response, 'fp') and response.fp else response
raw_sock = getattr(getattr(sock, 'fp', None), 'raw', None) or getattr(sock, '_sock', None)
if raw_sock is None:
for chunk in response:
yield chunk
return
sel.register(raw_sock, selectors.EVENT_READ)
while True:
ready = sel.select(timeout=timeout_seconds)
if not ready:
raise TimeoutError(f"Stream idle for {timeout_seconds}s")
chunk = response.readline()
if not chunk:
break
yield chunk
finally:
try:
sel.close()
except Exception:
pass
def _provider_cap_key(target_url=None, backend=None, model=None):
host = urllib.parse.urlparse(target_url or TARGET_URL).netloc.lower()
return f"{backend or BACKEND}|{host}|{model or '*'}"
@@ -440,6 +541,7 @@ def _record_usage(provider, model, success, duration_s, tokens_in=0, tokens_out=
def store_response(resp_id, input_data, output_items):
if not resp_id:
return
_response_store_evict()
with _response_store_lock:
_response_store[resp_id] = {"input": input_data, "output": output_items, "ts": time.time()}
while len(_response_store) > _MAX_STORED:
@@ -1240,7 +1342,7 @@ def cc_input_to_messages(input_data, instructions="", schema=None):
flush_tool_calls()
return msgs
def oa_convert_tools(tools):
def oa_convert_tools(tools, strict=False):
if not tools:
return None
out = []
@@ -1248,14 +1350,27 @@ def oa_convert_tools(tools):
if t.get("type") != "function":
continue
fn = t.get("function", {})
name = ""
if fn:
out.append(t)
name = (fn.get("name") or "").strip()
else:
out.append({
name = (t.get("name") or "").strip()
if not name or name == "null":
continue
if fn:
entry = dict(t)
if strict and "strict" not in fn:
entry["function"] = dict(fn, strict=True)
out.append(entry)
else:
entry = {
"type": "function",
"function": {"name": t.get("name", ""), "description": t.get("description", ""),
"function": {"name": name, "description": t.get("description", ""),
"parameters": t.get("parameters", {})}
})
}
if strict:
entry["function"]["strict"] = True
out.append(entry)
return out or None
def oa_resp_to_responses(chat_resp, model, resp_id=None):
@@ -1294,7 +1409,7 @@ def oa_stream_to_sse(chat_stream, model, req_id):
"status": "in_progress", "created": int(time.time()), "output": []}})
yield emit("response.in_progress", {"type": "response.in_progress", "response": {"id": resp_id}})
for line in chat_stream:
for line in _stream_with_idle_timeout(chat_stream):
line = line.decode("utf-8", errors="replace").strip()
if not line or line.startswith(":") or line == "data: [DONE]":
continue
@@ -2163,7 +2278,7 @@ def _iter_cc_events(stream):
Handles raw JSON lines, SSE data: events, and multi-event chunks.
"""
buf = ""
for chunk in stream:
for chunk in _stream_with_idle_timeout(stream):
buf += chunk.decode("utf-8", errors="replace")
while "\n" in buf:
line, buf = buf.split("\n", 1)
@@ -2933,7 +3048,7 @@ def _auto_continue_gemini(handler, flush_event, message_id, model, gen_config, g
cont_text = ""
cont_finish = ""
cont_buf = ""
for raw_line in upstream:
for raw_line in _stream_with_idle_timeout(upstream):
line = raw_line.decode(errors="replace")
if line.startswith("data: "):
cont_buf += line[6:]
@@ -3124,7 +3239,14 @@ class Handler(http.server.BaseHTTPRequestHandler):
except urllib.error.HTTPError as e:
err_body = e.read().decode()
if e.code in (429, 502, 503) and attempt < max_retries:
wait = min(2 ** (attempt + 1), 15)
retry_after = e.headers.get("Retry-After")
if retry_after:
try:
wait = min(int(retry_after), 60)
except ValueError:
wait = min(2 ** (attempt + 1), 15)
else:
wait = min(2 ** (attempt + 1), 15)
print(f"[translate-proxy] HTTP {e.code} (attempt {attempt+1}/{max_retries}), retrying in {wait}s: {err_body[:150]}", file=sys.stderr)
time.sleep(wait)
continue
@@ -3423,7 +3545,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
buf = ""
stream_finished = False
for raw_line in upstream:
for raw_line in _stream_with_idle_timeout(upstream):
if tracker and tracker.cancelled.is_set():
print("[gemini-oauth] stream cancelled", file=sys.stderr)
break
@@ -3596,7 +3718,8 @@ class Handler(http.server.BaseHTTPRequestHandler):
except urllib.error.HTTPError as e:
err = e.read().decode()
if e.code in (429, 502, 503) and attempt < 2:
wait = min(2 ** (attempt + 1), 10)
retry_after = e.headers.get("Retry-After")
wait = min(int(retry_after), 60) if retry_after and retry_after.isdigit() else min(2 ** (attempt + 1), 10)
print(f"[bgp] route '{route.get('name', r_url)}' HTTP {e.code}, retry {attempt+1}/2 in {wait}s", file=sys.stderr)
time.sleep(wait)
req = urllib.request.Request(target, data=json.dumps(chat_body).encode(), headers=fwd)
@@ -4268,6 +4391,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
def stream_buffered_events(self, event_iter, flush_interval=0.03, max_bytes=4096, on_event=None):
buf = bytearray()
last_flush = time.monotonic()
_MAX_BUF = 8 * 1024 * 1024
def _flush():
nonlocal buf, last_flush
if buf:
@@ -4279,10 +4403,13 @@ class Handler(http.server.BaseHTTPRequestHandler):
if on_event is not None and on_event(event) is False:
break
encoded = event.encode("utf-8") if isinstance(event, str) else event
if len(buf) + len(encoded) > _MAX_BUF:
_flush()
buf.extend(encoded)
urgent = ("response.completed" in event or "response.output_text.done" in event
or "response.output_item.done" in event
or "function_call_arguments.done" in event)
or "function_call_arguments.done" in event
or "response.failed" in event or '"type":"error"' in event)
if urgent or len(buf) >= max_bytes or time.monotonic() - last_flush >= flush_interval:
_flush()
_flush()