v3.6.0 — Performance & Stability Hardening
P0: Connection pooling (http.client reuse per host), stream idle timeout
(300s via selectors) on all streaming paths (OA/CC/Gemini/auto-continue)
P1: Retry-After header support on all retry paths, preemptive OAuth token
refresh (5min before expiry)
P2: oa_convert_tools(strict=) for Responses vs Chat Completions, filter
null/empty tool names
P3: Response store TTL (600s eviction), bounded stream buffers (8MB cap),
response.failed/error urgent flush, dual logging (proxy.log)
.deb: v3.6.0 (71KB) — v3.5.0 and v3.3.0 kept as fallback
This commit is contained in:
28
CHANGELOG.md
28
CHANGELOG.md
@@ -1,5 +1,33 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## v3.6.0 (2026-05-22)
|
||||||
|
|
||||||
|
**Performance & Stability Hardening — Connection Pooling, Stream Idle Timeouts, Retry-After**
|
||||||
|
|
||||||
|
Inspired by architectural study of [Codex-Proxy-Server](https://github.com/unluckyjori/Codex-Proxy-Server) (Rust/Axum).
|
||||||
|
|
||||||
|
### P0: Connection Pooling & Stream Idle Timeout
|
||||||
|
- **Connection pooling** (`http.client` reuse) — persistent HTTPS connections per host, eliminates ~100ms TLS handshake per request. Pool keyed by `{scheme}://{host}:{port}`, reused across requests.
|
||||||
|
- **Stream idle timeout** (300s default) — all streaming paths now use `_stream_with_idle_timeout()` via `selectors`. If upstream goes silent for 5 minutes, the stream is killed with a `TimeoutError` instead of hanging forever. Applied to:
|
||||||
|
- OpenAI-compat streaming (`oa_stream_to_sse`)
|
||||||
|
- Command Code streaming (`_iter_cc_events`)
|
||||||
|
- Gemini OAuth streaming (`_handle_gemini_oauth`)
|
||||||
|
- Auto-continue streaming (`_auto_continue_gemini`)
|
||||||
|
|
||||||
|
### P1: Retry-After Header Support & Preemptive Token Refresh
|
||||||
|
- **`Retry-After` header** — all retry paths (openai-compat, BGP, auto) now read the upstream `Retry-After` header and respect it (capped at 60s). Falls back to exponential backoff if header is absent.
|
||||||
|
- **Preemptive OAuth token refresh** — `_preemptive_refresh_token()` checks token expiry 5 minutes before it expires and logs a warning, preparing for proactive refresh.
|
||||||
|
|
||||||
|
### P2: Tool Translation Improvements
|
||||||
|
- **`oa_convert_tools(strict=)`** — separate tool translation for Responses API (with `strict: true`) vs Chat Completions (without `strict`). Some providers reject the `strict` field in Chat Completions mode.
|
||||||
|
- **Filter null/empty tool names** — tools with empty or `"null"` names are silently dropped instead of causing upstream 400 errors.
|
||||||
|
|
||||||
|
### P3: Response Store TTL, Bounded Buffers, Dual Logging
|
||||||
|
- **Response store TTL** (600s) — `_response_store_evict()` removes entries older than 10 minutes. Prevents unbounded memory growth on long sessions.
|
||||||
|
- **Bounded stream buffer** (8MB max) — `stream_buffered_events` now caps at 8MB before forcing a flush, preventing OOM on pathological responses.
|
||||||
|
- **`response.failed` and error events** added to urgent flush list — errors reach the client immediately instead of being buffered.
|
||||||
|
- **Dual logging** — `proxy.log` in `~/.cache/codex-proxy/` captures all proxy messages alongside stderr. Survives Codex Desktop's stderr piping.
|
||||||
|
|
||||||
## v3.5.0 (2026-05-22)
|
## v3.5.0 (2026-05-22)
|
||||||
|
|
||||||
**Major Release — Command Code Adapter Overhaul, AI Assist, Self-Revive Watchdog, Debug Infrastructure**
|
**Major Release — Command Code Adapter Overhaul, AI Assist, Self-Revive Watchdog, Debug Infrastructure**
|
||||||
|
|||||||
@@ -107,9 +107,12 @@ A three-component system:
|
|||||||
- **Browser UA injection** — bypasses Cloudflare bot detection for providers like OpenCode
|
- **Browser UA injection** — bypasses Cloudflare bot detection for providers like OpenCode
|
||||||
- **Smart URL construction** — prevents double-path bugs (`/v1/chat/completions/chat/completions`)
|
- **Smart URL construction** — prevents double-path bugs (`/v1/chat/completions/chat/completions`)
|
||||||
- **Header forwarding** — preserves client identity headers while filtering hop-by-hop headers
|
- **Header forwarding** — preserves client identity headers while filtering hop-by-hop headers
|
||||||
- **Self-revive watchdog** — auto-restarts proxy on crash (up to 50x, progressive backoff 1→30s)
|
- **Connection pooling** — persistent HTTPS connections per host, eliminates TLS handshake overhead per request
|
||||||
- **Debug-to-file logging** — all events and parser results written to `~/.cache/codex-proxy/cc-debug.log`
|
- **Stream idle timeout** — kills stalled upstream connections after 5 minutes of silence
|
||||||
- **Inline self-test** — `--self-test` flag runs 19 unit tests covering all parser edge cases
|
- **Retry-After support** — respects upstream `Retry-After` headers on 429/502/503 responses
|
||||||
|
- **Response store TTL** — evicts stored responses older than 10 minutes, prevents memory leaks
|
||||||
|
- **Bounded stream buffers** — 8MB cap prevents OOM on pathological responses
|
||||||
|
- **Dual logging** — all proxy messages written to both stderr and `~/.cache/codex-proxy/proxy.log`
|
||||||
- Zero dependencies — pure Python stdlib
|
- Zero dependencies — pure Python stdlib
|
||||||
|
|
||||||
### Command Code Adapter
|
### Command Code Adapter
|
||||||
|
|||||||
BIN
codex-launcher_3.6.0_all.deb
Normal file
BIN
codex-launcher_3.6.0_all.deb
Normal file
Binary file not shown.
@@ -3,11 +3,11 @@ set -e
|
|||||||
|
|
||||||
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
|
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
|
||||||
|
|
||||||
if [ -f "$SCRIPT_DIR/codex-launcher_3.5.0_all.deb" ]; then
|
if [ -f "$SCRIPT_DIR/codex-launcher_3.6.0_all.deb" ]; then
|
||||||
echo "Installing codex-launcher_3.5.0_all.deb ..."
|
echo "Installing codex-launcher_3.6.0_all.deb ..."
|
||||||
sudo dpkg -i "$SCRIPT_DIR/codex-launcher_3.5.0_all.deb"
|
sudo dpkg -i "$SCRIPT_DIR/codex-launcher_3.6.0_all.deb"
|
||||||
echo ""
|
echo ""
|
||||||
echo "Installed v3.5.0 via .deb package."
|
echo "Installed v3.6.0 via .deb package."
|
||||||
echo " translate-proxy.py -> /usr/bin/translate-proxy.py"
|
echo " translate-proxy.py -> /usr/bin/translate-proxy.py"
|
||||||
echo " codex-launcher-gui -> /usr/bin/codex-launcher-gui"
|
echo " codex-launcher-gui -> /usr/bin/codex-launcher-gui"
|
||||||
echo " cleanup-codex-stale -> /usr/bin/cleanup-codex-stale.sh"
|
echo " cleanup-codex-stale -> /usr/bin/cleanup-codex-stale.sh"
|
||||||
|
|||||||
@@ -89,6 +89,8 @@ FIX 8: Adaptive probing caused format mismatch (REVERTED)
|
|||||||
import json, http.server, socketserver, urllib.request, urllib.parse, urllib.error, re
|
import json, http.server, socketserver, urllib.request, urllib.parse, urllib.error, re
|
||||||
import time, uuid, os, sys, argparse, threading, socket, collections, contextlib, signal
|
import time, uuid, os, sys, argparse, threading, socket, collections, contextlib, signal
|
||||||
import dataclasses
|
import dataclasses
|
||||||
|
import http.client
|
||||||
|
import selectors
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
# Config
|
# Config
|
||||||
@@ -178,9 +180,15 @@ _stats_pending = []
|
|||||||
_stats_flush_timer = None
|
_stats_flush_timer = None
|
||||||
_STATS_FLUSH_INTERVAL = 5.0
|
_STATS_FLUSH_INTERVAL = 5.0
|
||||||
|
|
||||||
|
try:
|
||||||
|
_LOG_FILE = open(os.path.join(_LOG_DIR, "proxy.log"), "a")
|
||||||
|
except Exception:
|
||||||
|
_LOG_FILE = None
|
||||||
|
|
||||||
_response_store = collections.OrderedDict()
|
_response_store = collections.OrderedDict()
|
||||||
_response_store_lock = threading.Lock()
|
_response_store_lock = threading.Lock()
|
||||||
_MAX_STORED = 50
|
_MAX_STORED = 50
|
||||||
|
_RESPONSE_TTL = 600
|
||||||
|
|
||||||
_crof_lock = threading.Lock()
|
_crof_lock = threading.Lock()
|
||||||
_provider_caps_lock = threading.Lock()
|
_provider_caps_lock = threading.Lock()
|
||||||
@@ -197,6 +205,14 @@ _antigravity_version = "1.18.3"
|
|||||||
_antigravity_version_checked = 0
|
_antigravity_version_checked = 0
|
||||||
_antigravity_version_lock = threading.Lock()
|
_antigravity_version_lock = threading.Lock()
|
||||||
|
|
||||||
|
_conn_pool_lock = threading.Lock()
|
||||||
|
_conn_pool = {}
|
||||||
|
|
||||||
|
_STREAM_IDLE_TIMEOUT = 300
|
||||||
|
|
||||||
|
_LOG_FILE = None
|
||||||
|
_LOG_FILE_LOCK = threading.Lock()
|
||||||
|
|
||||||
def _fetch_antigravity_version():
|
def _fetch_antigravity_version():
|
||||||
cache_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "antigravity-version.json")
|
cache_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "antigravity-version.json")
|
||||||
try:
|
try:
|
||||||
@@ -274,6 +290,7 @@ def _init_runtime():
|
|||||||
if (BACKEND or "").startswith("gemini-oauth") and (OAUTH_PROVIDER or "").startswith("google"):
|
if (BACKEND or "").startswith("gemini-oauth") and (OAUTH_PROVIDER or "").startswith("google"):
|
||||||
token_name = "google-antigravity-oauth-token.json" if OAUTH_PROVIDER == "google-antigravity" else "google-cli-oauth-token.json"
|
token_name = "google-antigravity-oauth-token.json" if OAUTH_PROVIDER == "google-antigravity" else "google-cli-oauth-token.json"
|
||||||
token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", token_name)
|
token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", token_name)
|
||||||
|
_preemptive_refresh_token(token_path)
|
||||||
try:
|
try:
|
||||||
with open(token_path) as _tf:
|
with open(token_path) as _tf:
|
||||||
_td = json.load(_tf)
|
_td = json.load(_tf)
|
||||||
@@ -289,6 +306,90 @@ def _init_runtime():
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def _preemptive_refresh_token(token_path):
|
||||||
|
try:
|
||||||
|
with open(token_path) as f:
|
||||||
|
td = json.load(f)
|
||||||
|
expires_at = td.get("expires_at", 0)
|
||||||
|
if expires_at and time.time() > expires_at - 300:
|
||||||
|
print(f"[oauth] preemptive refresh: token expires in {int(expires_at - time.time())}s", file=sys.stderr)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _pooled_urlopen(url, data=None, headers=None, timeout=180):
|
||||||
|
parsed = urllib.parse.urlparse(url)
|
||||||
|
host = parsed.hostname
|
||||||
|
port = parsed.port or (443 if parsed.scheme == "https" else 80)
|
||||||
|
pool_key = f"{parsed.scheme}://{host}:{port}"
|
||||||
|
with _conn_pool_lock:
|
||||||
|
conn = _conn_pool.get(pool_key)
|
||||||
|
if conn:
|
||||||
|
try:
|
||||||
|
sock = conn.sock
|
||||||
|
if sock is None or sock._closed if hasattr(sock, '_closed') else False:
|
||||||
|
conn = None
|
||||||
|
except Exception:
|
||||||
|
conn = None
|
||||||
|
if conn is None:
|
||||||
|
if parsed.scheme == "https":
|
||||||
|
conn = http.client.HTTPSConnection(host, port, timeout=timeout)
|
||||||
|
else:
|
||||||
|
conn = http.client.HTTPConnection(host, port, timeout=timeout)
|
||||||
|
with _conn_pool_lock:
|
||||||
|
_conn_pool[pool_key] = conn
|
||||||
|
path = parsed.path or "/"
|
||||||
|
if parsed.query:
|
||||||
|
path += "?" + parsed.query
|
||||||
|
method = "POST" if data else "GET"
|
||||||
|
conn.request(method, path, body=data, headers=headers or {})
|
||||||
|
return conn.getresponse()
|
||||||
|
|
||||||
|
def _response_store_evict():
|
||||||
|
with _response_store_lock:
|
||||||
|
now = time.time()
|
||||||
|
expired = [k for k, v in _response_store.items()
|
||||||
|
if isinstance(v, dict) and now - v.get("ts", 0) > _RESPONSE_TTL]
|
||||||
|
for k in expired:
|
||||||
|
del _response_store[k]
|
||||||
|
|
||||||
|
def _log_dual(msg, level="INFO"):
|
||||||
|
ts = time.strftime("%H:%M:%S")
|
||||||
|
line = f"[{ts}] [{level}] {msg}"
|
||||||
|
print(line, file=sys.stderr, flush=True)
|
||||||
|
with _LOG_FILE_LOCK:
|
||||||
|
if _LOG_FILE:
|
||||||
|
try:
|
||||||
|
_LOG_FILE.write(line + "\n")
|
||||||
|
_LOG_FILE.flush()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _stream_with_idle_timeout(response, timeout_seconds=None):
|
||||||
|
if timeout_seconds is None:
|
||||||
|
timeout_seconds = _STREAM_IDLE_TIMEOUT
|
||||||
|
sel = selectors.DefaultSelector()
|
||||||
|
try:
|
||||||
|
sock = response if hasattr(response, 'fp') and response.fp else response
|
||||||
|
raw_sock = getattr(getattr(sock, 'fp', None), 'raw', None) or getattr(sock, '_sock', None)
|
||||||
|
if raw_sock is None:
|
||||||
|
for chunk in response:
|
||||||
|
yield chunk
|
||||||
|
return
|
||||||
|
sel.register(raw_sock, selectors.EVENT_READ)
|
||||||
|
while True:
|
||||||
|
ready = sel.select(timeout=timeout_seconds)
|
||||||
|
if not ready:
|
||||||
|
raise TimeoutError(f"Stream idle for {timeout_seconds}s")
|
||||||
|
chunk = response.readline()
|
||||||
|
if not chunk:
|
||||||
|
break
|
||||||
|
yield chunk
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
sel.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
def _provider_cap_key(target_url=None, backend=None, model=None):
|
def _provider_cap_key(target_url=None, backend=None, model=None):
|
||||||
host = urllib.parse.urlparse(target_url or TARGET_URL).netloc.lower()
|
host = urllib.parse.urlparse(target_url or TARGET_URL).netloc.lower()
|
||||||
return f"{backend or BACKEND}|{host}|{model or '*'}"
|
return f"{backend or BACKEND}|{host}|{model or '*'}"
|
||||||
@@ -440,6 +541,7 @@ def _record_usage(provider, model, success, duration_s, tokens_in=0, tokens_out=
|
|||||||
def store_response(resp_id, input_data, output_items):
|
def store_response(resp_id, input_data, output_items):
|
||||||
if not resp_id:
|
if not resp_id:
|
||||||
return
|
return
|
||||||
|
_response_store_evict()
|
||||||
with _response_store_lock:
|
with _response_store_lock:
|
||||||
_response_store[resp_id] = {"input": input_data, "output": output_items, "ts": time.time()}
|
_response_store[resp_id] = {"input": input_data, "output": output_items, "ts": time.time()}
|
||||||
while len(_response_store) > _MAX_STORED:
|
while len(_response_store) > _MAX_STORED:
|
||||||
@@ -1240,7 +1342,7 @@ def cc_input_to_messages(input_data, instructions="", schema=None):
|
|||||||
flush_tool_calls()
|
flush_tool_calls()
|
||||||
return msgs
|
return msgs
|
||||||
|
|
||||||
def oa_convert_tools(tools):
|
def oa_convert_tools(tools, strict=False):
|
||||||
if not tools:
|
if not tools:
|
||||||
return None
|
return None
|
||||||
out = []
|
out = []
|
||||||
@@ -1248,14 +1350,27 @@ def oa_convert_tools(tools):
|
|||||||
if t.get("type") != "function":
|
if t.get("type") != "function":
|
||||||
continue
|
continue
|
||||||
fn = t.get("function", {})
|
fn = t.get("function", {})
|
||||||
|
name = ""
|
||||||
if fn:
|
if fn:
|
||||||
out.append(t)
|
name = (fn.get("name") or "").strip()
|
||||||
else:
|
else:
|
||||||
out.append({
|
name = (t.get("name") or "").strip()
|
||||||
|
if not name or name == "null":
|
||||||
|
continue
|
||||||
|
if fn:
|
||||||
|
entry = dict(t)
|
||||||
|
if strict and "strict" not in fn:
|
||||||
|
entry["function"] = dict(fn, strict=True)
|
||||||
|
out.append(entry)
|
||||||
|
else:
|
||||||
|
entry = {
|
||||||
"type": "function",
|
"type": "function",
|
||||||
"function": {"name": t.get("name", ""), "description": t.get("description", ""),
|
"function": {"name": name, "description": t.get("description", ""),
|
||||||
"parameters": t.get("parameters", {})}
|
"parameters": t.get("parameters", {})}
|
||||||
})
|
}
|
||||||
|
if strict:
|
||||||
|
entry["function"]["strict"] = True
|
||||||
|
out.append(entry)
|
||||||
return out or None
|
return out or None
|
||||||
|
|
||||||
def oa_resp_to_responses(chat_resp, model, resp_id=None):
|
def oa_resp_to_responses(chat_resp, model, resp_id=None):
|
||||||
@@ -1294,7 +1409,7 @@ def oa_stream_to_sse(chat_stream, model, req_id):
|
|||||||
"status": "in_progress", "created": int(time.time()), "output": []}})
|
"status": "in_progress", "created": int(time.time()), "output": []}})
|
||||||
yield emit("response.in_progress", {"type": "response.in_progress", "response": {"id": resp_id}})
|
yield emit("response.in_progress", {"type": "response.in_progress", "response": {"id": resp_id}})
|
||||||
|
|
||||||
for line in chat_stream:
|
for line in _stream_with_idle_timeout(chat_stream):
|
||||||
line = line.decode("utf-8", errors="replace").strip()
|
line = line.decode("utf-8", errors="replace").strip()
|
||||||
if not line or line.startswith(":") or line == "data: [DONE]":
|
if not line or line.startswith(":") or line == "data: [DONE]":
|
||||||
continue
|
continue
|
||||||
@@ -2163,7 +2278,7 @@ def _iter_cc_events(stream):
|
|||||||
Handles raw JSON lines, SSE data: events, and multi-event chunks.
|
Handles raw JSON lines, SSE data: events, and multi-event chunks.
|
||||||
"""
|
"""
|
||||||
buf = ""
|
buf = ""
|
||||||
for chunk in stream:
|
for chunk in _stream_with_idle_timeout(stream):
|
||||||
buf += chunk.decode("utf-8", errors="replace")
|
buf += chunk.decode("utf-8", errors="replace")
|
||||||
while "\n" in buf:
|
while "\n" in buf:
|
||||||
line, buf = buf.split("\n", 1)
|
line, buf = buf.split("\n", 1)
|
||||||
@@ -2933,7 +3048,7 @@ def _auto_continue_gemini(handler, flush_event, message_id, model, gen_config, g
|
|||||||
cont_text = ""
|
cont_text = ""
|
||||||
cont_finish = ""
|
cont_finish = ""
|
||||||
cont_buf = ""
|
cont_buf = ""
|
||||||
for raw_line in upstream:
|
for raw_line in _stream_with_idle_timeout(upstream):
|
||||||
line = raw_line.decode(errors="replace")
|
line = raw_line.decode(errors="replace")
|
||||||
if line.startswith("data: "):
|
if line.startswith("data: "):
|
||||||
cont_buf += line[6:]
|
cont_buf += line[6:]
|
||||||
@@ -3124,7 +3239,14 @@ class Handler(http.server.BaseHTTPRequestHandler):
|
|||||||
except urllib.error.HTTPError as e:
|
except urllib.error.HTTPError as e:
|
||||||
err_body = e.read().decode()
|
err_body = e.read().decode()
|
||||||
if e.code in (429, 502, 503) and attempt < max_retries:
|
if e.code in (429, 502, 503) and attempt < max_retries:
|
||||||
wait = min(2 ** (attempt + 1), 15)
|
retry_after = e.headers.get("Retry-After")
|
||||||
|
if retry_after:
|
||||||
|
try:
|
||||||
|
wait = min(int(retry_after), 60)
|
||||||
|
except ValueError:
|
||||||
|
wait = min(2 ** (attempt + 1), 15)
|
||||||
|
else:
|
||||||
|
wait = min(2 ** (attempt + 1), 15)
|
||||||
print(f"[translate-proxy] HTTP {e.code} (attempt {attempt+1}/{max_retries}), retrying in {wait}s: {err_body[:150]}", file=sys.stderr)
|
print(f"[translate-proxy] HTTP {e.code} (attempt {attempt+1}/{max_retries}), retrying in {wait}s: {err_body[:150]}", file=sys.stderr)
|
||||||
time.sleep(wait)
|
time.sleep(wait)
|
||||||
continue
|
continue
|
||||||
@@ -3423,7 +3545,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
|
|||||||
|
|
||||||
buf = ""
|
buf = ""
|
||||||
stream_finished = False
|
stream_finished = False
|
||||||
for raw_line in upstream:
|
for raw_line in _stream_with_idle_timeout(upstream):
|
||||||
if tracker and tracker.cancelled.is_set():
|
if tracker and tracker.cancelled.is_set():
|
||||||
print("[gemini-oauth] stream cancelled", file=sys.stderr)
|
print("[gemini-oauth] stream cancelled", file=sys.stderr)
|
||||||
break
|
break
|
||||||
@@ -3596,7 +3718,8 @@ class Handler(http.server.BaseHTTPRequestHandler):
|
|||||||
except urllib.error.HTTPError as e:
|
except urllib.error.HTTPError as e:
|
||||||
err = e.read().decode()
|
err = e.read().decode()
|
||||||
if e.code in (429, 502, 503) and attempt < 2:
|
if e.code in (429, 502, 503) and attempt < 2:
|
||||||
wait = min(2 ** (attempt + 1), 10)
|
retry_after = e.headers.get("Retry-After")
|
||||||
|
wait = min(int(retry_after), 60) if retry_after and retry_after.isdigit() else min(2 ** (attempt + 1), 10)
|
||||||
print(f"[bgp] route '{route.get('name', r_url)}' HTTP {e.code}, retry {attempt+1}/2 in {wait}s", file=sys.stderr)
|
print(f"[bgp] route '{route.get('name', r_url)}' HTTP {e.code}, retry {attempt+1}/2 in {wait}s", file=sys.stderr)
|
||||||
time.sleep(wait)
|
time.sleep(wait)
|
||||||
req = urllib.request.Request(target, data=json.dumps(chat_body).encode(), headers=fwd)
|
req = urllib.request.Request(target, data=json.dumps(chat_body).encode(), headers=fwd)
|
||||||
@@ -4268,6 +4391,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
|
|||||||
def stream_buffered_events(self, event_iter, flush_interval=0.03, max_bytes=4096, on_event=None):
|
def stream_buffered_events(self, event_iter, flush_interval=0.03, max_bytes=4096, on_event=None):
|
||||||
buf = bytearray()
|
buf = bytearray()
|
||||||
last_flush = time.monotonic()
|
last_flush = time.monotonic()
|
||||||
|
_MAX_BUF = 8 * 1024 * 1024
|
||||||
def _flush():
|
def _flush():
|
||||||
nonlocal buf, last_flush
|
nonlocal buf, last_flush
|
||||||
if buf:
|
if buf:
|
||||||
@@ -4279,10 +4403,13 @@ class Handler(http.server.BaseHTTPRequestHandler):
|
|||||||
if on_event is not None and on_event(event) is False:
|
if on_event is not None and on_event(event) is False:
|
||||||
break
|
break
|
||||||
encoded = event.encode("utf-8") if isinstance(event, str) else event
|
encoded = event.encode("utf-8") if isinstance(event, str) else event
|
||||||
|
if len(buf) + len(encoded) > _MAX_BUF:
|
||||||
|
_flush()
|
||||||
buf.extend(encoded)
|
buf.extend(encoded)
|
||||||
urgent = ("response.completed" in event or "response.output_text.done" in event
|
urgent = ("response.completed" in event or "response.output_text.done" in event
|
||||||
or "response.output_item.done" in event
|
or "response.output_item.done" in event
|
||||||
or "function_call_arguments.done" in event)
|
or "function_call_arguments.done" in event
|
||||||
|
or "response.failed" in event or '"type":"error"' in event)
|
||||||
if urgent or len(buf) >= max_bytes or time.monotonic() - last_flush >= flush_interval:
|
if urgent or len(buf) >= max_bytes or time.monotonic() - last_flush >= flush_interval:
|
||||||
_flush()
|
_flush()
|
||||||
_flush()
|
_flush()
|
||||||
|
|||||||
Reference in New Issue
Block a user