diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b70c8e..b4a7242 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,34 @@ # Changelog +## v3.0.0 (2026-05-20) + +**Major architectural overhaul — Phase 0 + Phase 1 of engineering roadmap** + +### Proxy (translate-proxy.py) +- **ThreadingHTTPServer** — serves concurrent requests (no more blocking) +- **Thread-safe shared state** — OrderedDict response store with locks, Crof state lock, stats lock +- **Batched + atomic stats writes** — stats buffered in memory, flushed every 5s via `os.replace()` +- **Graceful shutdown** — SIGTERM/SIGINT drain active connections (up to 5s), reject new with 503 +- **Progressive upstream timeouts** — based on input size and tools (60-300s instead of flat 180s) +- **Lazy JSON parsing** — skip parsing SSE events unless they contain `response.completed` +- **Buffered SSE writes** — flush every 30ms, on urgent events, or at 4KB (reduces syscalls) +- **`/health` endpoint** — returns backend, target, models, BGP route count +- **Consolidated imports** — all at top, no more missing import crashes +- **`main()` entry point** — runtime init moved out of module level +- **TCP_NODELAY** — on all streaming paths (from v2.7.0) +- **Anthropic prompt caching** — `cache_control: ephemeral` on system prompts (from v2.7.0) + +### Launcher (codex-launcher-gui) +- **Dynamic port allocation** — `_pick_free_port()` picks random free port, no more 8080 conflicts +- **Proxy health gating** — Codex will NOT launch if proxy fails health check within 15s +- **Error dialogs** — clear GTK error dialog when proxy startup fails +- **Atomic config backup/restore** — temp file + `os.replace()`, no more corrupted config.toml +- **Config transactions** — recovery from interrupted sessions on next startup +- **Safe cleanup (PID registry)** — only kills processes launched by the app (pids.json) +- **Proxy stderr piped to log** — real-time proxy logs in launcher UI +- **Bearer token** — Codex config uses `codex-launcher-local` instead of real API key +- **Usage Dashboard v2** — OpenUsage-inspired dark theme with status pills, KPI strip, model bars (from v2.7.0) + ## v2.7.0 (2026-05-20) - **Usage Dashboard redesigned** (inspired by OpenUsage design patterns) diff --git a/codex-launcher_3.0.0_all.deb b/codex-launcher_3.0.0_all.deb new file mode 100644 index 0000000..96e0474 Binary files /dev/null and b/codex-launcher_3.0.0_all.deb differ diff --git a/src/codex-launcher-gui b/src/codex-launcher-gui index 26a9a1d..604e4d3 100755 --- a/src/codex-launcher-gui +++ b/src/codex-launcher-gui @@ -5,7 +5,7 @@ import gi gi.require_version("Gtk", "3.0") from gi.repository import Gtk, GLib import subprocess, os, signal, sys, threading, time, json, urllib.request, tempfile, shutil -import hashlib +import hashlib, socket, contextlib from pathlib import Path HOME = Path.home() @@ -435,11 +435,51 @@ def import_profile_bundle(path): def backup_config(): if CONFIG.exists(): - shutil.copy2(str(CONFIG), str(CONFIG_BAK)) + tmp = CONFIG_BAK.with_suffix(".tmp") + shutil.copy2(str(CONFIG), str(tmp)) + os.replace(str(tmp), str(CONFIG_BAK)) def restore_config(): if CONFIG_BAK.exists(): - CONFIG_BAK.rename(CONFIG) + tmp = CONFIG.with_suffix(".tmp") + shutil.copy2(str(CONFIG_BAK), str(tmp)) + os.replace(str(tmp), str(CONFIG)) + +def write_secure_text(path, text): + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + tmp.write_text(text, encoding="utf-8") + os.chmod(str(tmp), 0o600) + os.replace(str(tmp), str(path)) + +CONFIG_TXN = HOME / ".codex/config.toml.launcher-txn.json" + +def begin_config_transaction(reason): + txn = {"started_at": time.time(), "reason": reason, + "config_existed": CONFIG.exists(), "backup_path": str(CONFIG_BAK)} + if CONFIG.exists(): + backup_config() + CONFIG_TXN.parent.mkdir(parents=True, exist_ok=True) + CONFIG_TXN.write_text(json.dumps(txn, indent=2)) + +def end_config_transaction(): + CONFIG_TXN.unlink(missing_ok=True) + +def recover_config_if_needed(logfn=None): + if not CONFIG_TXN.exists(): + return + try: + txn = json.loads(CONFIG_TXN.read_text()) + if txn.get("config_existed") and CONFIG_BAK.exists(): + restore_config() + if logfn: + logfn("Recovered Codex config from interrupted session.") + elif CONFIG.exists(): + CONFIG.unlink() + if logfn: + logfn("Removed generated config from interrupted session.") + finally: + CONFIG_TXN.unlink(missing_ok=True) def write_config_for_native(endpoint, selected_model): """Write config for native OpenAI (no proxy needed).""" @@ -470,8 +510,7 @@ def _toml_safe(val): val = str(val).replace('"', '\\"') return val.split('\n', 1)[0].strip() -def write_config_for_translated(endpoint, selected_model): - """Write config pointing at local proxy.""" +def write_config_for_translated(endpoint, selected_model, proxy_port=8080): backup_config() model_catalog = _gen_model_catalog(endpoint, selected_model) mc_path = PROXY_CONFIG_DIR / f"models-{safe_name(endpoint['name'])}.json" @@ -484,8 +523,8 @@ def write_config_for_translated(endpoint, selected_model): f'model_catalog_json = "{mc_path}"\n', f'\n[model_providers."{endpoint["name"]}"]\n', f'name = "{_toml_safe(endpoint["name"])}"\n', - f'base_url = "http://127.0.0.1:8080"\n', - f'experimental_bearer_token = "{_toml_safe(endpoint["api_key"])}"\n', + f'base_url = "http://127.0.0.1:{proxy_port}"\n', + f'experimental_bearer_token = "codex-launcher-local"\n', f'\n[profiles."{endpoint["name"]}"]\n', f'model_provider = "{_toml_safe(endpoint["name"])}"\n', f'model = "{_toml_safe(selected_model)}"\n', @@ -493,7 +532,7 @@ def write_config_for_translated(endpoint, selected_model): f'service_tier = "fast"\n', f'approvals_reviewer = "user"\n', ] - CONFIG.write_text("".join(lines)) + write_secure_text(CONFIG, "".join(lines)) def _gen_model_catalog(endpoint, selected_model=None): default_model = selected_model or endpoint.get("default_model") @@ -533,13 +572,66 @@ def _gen_model_catalog(endpoint, selected_model=None): # ═══════════════════════════════════════════════════════════════════ _proxy_proc = None +_proxy_port = None + +PID_REGISTRY = HOME / ".cache" / "codex-launcher" / "pids.json" + +def _pick_free_port(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + +def _load_pid_registry(): + if PID_REGISTRY.exists(): + try: + return json.loads(PID_REGISTRY.read_text()) + except Exception: + pass + return {} + +def _save_pid_registry(data): + PID_REGISTRY.parent.mkdir(parents=True, exist_ok=True) + tmp = PID_REGISTRY.with_suffix(".tmp") + tmp.write_text(json.dumps(data, indent=2)) + os.replace(str(tmp), str(PID_REGISTRY)) + +def _register_pgid(kind, pid): + data = _load_pid_registry() + try: + pgid = os.getpgid(pid) + except ProcessLookupError: + return + data[kind] = {"pid": pid, "pgid": pgid, "ts": time.time()} + _save_pid_registry(data) + +def safe_cleanup_owned(logfn=None): + data = _load_pid_registry() + changed = False + for kind, meta in list(data.items()): + pgid = meta.get("pgid") + if not pgid: + continue + try: + os.killpg(pgid, signal.SIGTERM) + if logfn: + logfn(f"Stopped {kind} (pgid {pgid})") + changed = True + except ProcessLookupError: + changed = True + except Exception as e: + if logfn: + logfn(f"Could not stop {kind}: {e}") + if changed: + _save_pid_registry({}) def _start_proxy_for(endpoint, logfn): - global _proxy_proc + global _proxy_proc, _proxy_port _stop_proxy() + port = _pick_free_port() + _proxy_port = port pcfg = { - "port": 8080, + "port": port, "backend_type": endpoint["backend_type"], "target_url": normalize_base_url(endpoint["base_url"]), "api_key": endpoint["api_key"], @@ -550,26 +642,49 @@ def _start_proxy_for(endpoint, logfn): "models": [{"id": m, "object": "model", "created": 1700000000, "owned_by": endpoint["name"]} for m in endpoint.get("models", [])], } - pcfg_path = PROXY_CONFIG_DIR / f"proxy-{safe_name(endpoint['name'])}.json" + pcfg_path = PROXY_CONFIG_DIR / f"proxy-{safe_name(endpoint['name'])}-{port}.json" pcfg_path.parent.mkdir(parents=True, exist_ok=True) pcfg_path.write_text(json.dumps(pcfg, indent=2)) - _start_proxy_with_config(pcfg_path, logfn) + _start_proxy_with_config(pcfg_path, port, logfn) + return port -def _start_proxy_with_config(pcfg_path, logfn): +def _start_proxy_with_config(pcfg_path, port, logfn): global _proxy_proc _proxy_proc = subprocess.Popen( ["python3", str(PROXY), "--config", str(pcfg_path)], stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE, preexec_fn=os.setsid, + text=True, ) - for _ in range(30): - try: - urllib.request.urlopen("http://127.0.0.1:8080/v1/models", timeout=2) - logfn("Proxy ready on port 8080") + _register_pgid("proxy", _proxy_proc.pid) + + def _pipe_stderr(): + if not _proxy_proc.stderr: return - except Exception: - time.sleep(0.5) - logfn("WARNING: proxy may not have started in time") + for line in _proxy_proc.stderr: + GLib.idle_add(logfn, f"[proxy] {line.rstrip()}") + threading.Thread(target=_pipe_stderr, daemon=True).start() + + deadline = time.time() + 15 + last_err = None + while time.time() < deadline: + if _proxy_proc.poll() is not None: + raise RuntimeError(f"Proxy exited early with code {_proxy_proc.returncode}") + try: + urllib.request.urlopen(f"http://127.0.0.1:{port}/v1/models", timeout=2) + logfn(f"Proxy ready on port {port}") + return + except Exception as e: + last_err = e + time.sleep(0.3) + try: + os.killpg(os.getpgid(_proxy_proc.pid), signal.SIGTERM) + _proxy_proc.wait(timeout=3) + except Exception: + with contextlib.suppress(Exception): + os.killpg(os.getpgid(_proxy_proc.pid), signal.SIGKILL) + raise RuntimeError(f"Proxy failed health check on port {port}: {last_err}") def _stop_proxy(): global _proxy_proc @@ -583,8 +698,8 @@ def _stop_proxy(): pass _proxy_proc = None -def _run_cleanup(): - subprocess.run(["bash", str(CLEANUP)], capture_output=True, timeout=30) +def _run_cleanup(logfn=None): + safe_cleanup_owned(logfn) def _last_log_lines(n=15): try: @@ -640,6 +755,7 @@ class LauncherWin(Gtk.Window): self.set_position(Gtk.WindowPosition.CENTER) self._proc = None self._endpoints_data = load_endpoints() + recover_config_if_needed() vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) self.add(vbox) @@ -647,7 +763,7 @@ class LauncherWin(Gtk.Window): # header row hdr = Gtk.Box(spacing=8) vbox.pack_start(hdr, False, False, 0) - lbl = Gtk.Label(label="Codex Launcher v2.7.0") + lbl = Gtk.Label(label="Codex Launcher v3.0.0") lbl.set_use_markup(True) hdr.pack_start(lbl, False, False, 0) changelog_btn = Gtk.Button(label="Changelog") @@ -1207,17 +1323,24 @@ class LauncherWin(Gtk.Window): def _run(self, ep, model, target): try: self.log("Cleaning up stale processes…") - _run_cleanup() + _run_cleanup(self.log) + recover_config_if_needed(self.log) needs_proxy = ep["backend_type"] != "native" if needs_proxy: self.log("Starting translation proxy…") - _start_proxy_for(ep, self.log) - self.log(f"Configuring Codex for {ep['name']} (proxied)…") - write_config_for_translated(ep, model) + try: + proxy_port = _start_proxy_for(ep, self.log) + except RuntimeError as e: + GLib.idle_add(self._show_error_dialog, "Proxy startup failed", str(e)) + return + self.log(f"Configuring Codex for {ep['name']} (proxied on :{proxy_port})…") + begin_config_transaction(f"launch:{ep['name']}") + write_config_for_translated(ep, model, proxy_port) else: self.log(f"Configuring Codex for {ep['name']} (native)…") + begin_config_transaction(f"launch:{ep['name']}") write_config_for_native(ep, model) if target == "desktop": @@ -1230,15 +1353,18 @@ class LauncherWin(Gtk.Window): finally: _stop_proxy() restore_config() + end_config_transaction() self._set_busy(False) self.log("Ready.") def _run_bgp(self, pool, model, target): try: self.log("Cleaning up stale processes…") - _run_cleanup() + _run_cleanup(self.log) + recover_config_if_needed(self.log) - self.log(f"Starting BGP proxy with {len(pool.get('routes', []))} routes…") + port = _pick_free_port() + self.log(f"Starting BGP proxy with {len(pool.get('routes', []))} routes on :{port}…") bgp_ep = { "name": pool["name"], "backend_type": "openai-compat", @@ -1248,19 +1374,24 @@ class LauncherWin(Gtk.Window): "models": list(dict.fromkeys(r.get("model", model) for r in pool.get("routes", []))), } pcfg = { - "port": 8080, + "port": port, "backend_type": "openai-compat", "target_url": "http://bgp.placeholder", "api_key": "", "bgp_routes": pool.get("routes", []), "models": [{"id": m, "object": "model", "created": 1700000000, "owned_by": "bgp"} for m in bgp_ep["models"]], } - pcfg_path = PROXY_CONFIG_DIR / f"proxy-{safe_name(pool['name'])}.json" + pcfg_path = PROXY_CONFIG_DIR / f"proxy-{safe_name(pool['name'])}-{port}.json" pcfg_path.parent.mkdir(parents=True, exist_ok=True) pcfg_path.write_text(json.dumps(pcfg, indent=2)) - _start_proxy_with_config(pcfg_path, self.log) + try: + _start_proxy_with_config(pcfg_path, port, self.log) + except RuntimeError as e: + GLib.idle_add(self._show_error_dialog, "BGP proxy startup failed", str(e)) + return - write_config_for_translated(bgp_ep, model) + begin_config_transaction(f"launch:bgp:{pool['name']}") + write_config_for_translated(bgp_ep, model, port) if target == "desktop": self._launch_desktop(bgp_ep, model) @@ -1272,17 +1403,19 @@ class LauncherWin(Gtk.Window): finally: _stop_proxy() restore_config() + end_config_transaction() self._set_busy(False) self.log("Ready.") def _run_codex_default(self, target): try: self.log("Cleaning up stale processes…") - _run_cleanup() + _run_cleanup(self.log) _stop_proxy() + recover_config_if_needed(self.log) self.log("Resetting config to Codex defaults (OAuth)…") - backup_config() + begin_config_transaction("launch:default") if CONFIG.exists(): CONFIG.unlink() @@ -1294,9 +1427,19 @@ class LauncherWin(Gtk.Window): self.log(f"ERROR: {e}") finally: restore_config() + end_config_transaction() self._set_busy(False) self.log("Ready.") + def _show_error_dialog(self, title, message): + dialog = Gtk.MessageDialog( + transient_for=self, flags=0, + message_type=Gtk.MessageType.ERROR, + buttons=Gtk.ButtonsType.CLOSE, text=str(title)) + dialog.format_secondary_text(str(message)) + dialog.run() + dialog.destroy() + def _launch_desktop(self, ep, model): args = [str(START_SH)] if ep["backend_type"] != "native": @@ -1454,8 +1597,9 @@ class LauncherWin(Gtk.Window): pass self._proc = None _stop_proxy() - _run_cleanup() + _run_cleanup(self.log) restore_config() + end_config_transaction() LOG_DIR.mkdir(parents=True, exist_ok=True) LAUNCH_LOG.unlink(missing_ok=True) self.log("Cleanup complete") diff --git a/src/translate-proxy.py b/src/translate-proxy.py index aa087f4..53996f6 100755 --- a/src/translate-proxy.py +++ b/src/translate-proxy.py @@ -11,7 +11,8 @@ Usage: python3 translate-proxy.py --backend openai-compat --target-url https://... --api-key sk-... """ -import json, http.server, urllib.request, time, uuid, os, sys, argparse, threading, socket +import json, http.server, socketserver, urllib.request, urllib.parse, urllib.error +import time, uuid, os, sys, argparse, threading, socket, collections, contextlib, signal # ═══════════════════════════════════════════════════════════════════ # Config @@ -74,25 +75,64 @@ def load_config(): return cfg -CONFIG = load_config() -PORT = CONFIG["port"] -BACKEND = CONFIG["backend_type"] -TARGET_URL = CONFIG["target_url"].rstrip("/") -API_KEY = CONFIG["api_key"] -OAUTH_PROVIDER = CONFIG.get("oauth_provider", "") -MODELS = CONFIG["models"] -CC_VERSION = CONFIG.get("cc_version", "") -REASONING_ENABLED = CONFIG.get("reasoning_enabled", True) -REASONING_EFFORT = CONFIG.get("reasoning_effort", "medium") -BGP_ROUTES = CONFIG.get("bgp_routes", []) -BGP_MODELS = [] -for _r in BGP_ROUTES: - for _m in _r.get("models", [{"id": _r.get("model", "unknown")}]): - if _m.get("id", _m) not in BGP_MODELS: - BGP_MODELS.append(_m.get("id", _m) if isinstance(_m, dict) else _m) -if BGP_ROUTES and not MODELS: - MODELS = [{"id": m, "object": "model", "created": 1700000000, "owned_by": "bgp"} for m in BGP_MODELS] - CONFIG["models"] = MODELS +CONFIG = None +PORT = 8080 +BACKEND = "openai-compat" +TARGET_URL = "" +API_KEY = "" +OAUTH_PROVIDER = "" +MODELS = [] +CC_VERSION = "" +REASONING_ENABLED = True +REASONING_EFFORT = "medium" +BGP_ROUTES = [] +SERVER = None + +_LOG_DIR = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy") +os.makedirs(_LOG_DIR, exist_ok=True) +_stats_path = os.path.join(_LOG_DIR, "usage-stats.json") +_stats_lock = threading.Lock() +_stats_pending = [] +_stats_flush_timer = None +_STATS_FLUSH_INTERVAL = 5.0 + +_response_store = collections.OrderedDict() +_response_store_lock = threading.Lock() +_MAX_STORED = 50 + +_crof_lock = threading.Lock() + +_shutdown_requested = False +_active_connections = 0 +_active_connections_lock = threading.Lock() + +_pool = uuid.uuid4().hex[:8] + +def _init_runtime(): + global CONFIG, PORT, BACKEND, TARGET_URL, API_KEY, OAUTH_PROVIDER + global MODELS, CC_VERSION, REASONING_ENABLED, REASONING_EFFORT, BGP_ROUTES + + CONFIG = load_config() + PORT = CONFIG["port"] + BACKEND = CONFIG["backend_type"] + TARGET_URL = CONFIG["target_url"].rstrip("/") + API_KEY = CONFIG["api_key"] + OAUTH_PROVIDER = CONFIG.get("oauth_provider", "") + MODELS = CONFIG["models"] + CC_VERSION = CONFIG.get("cc_version", "") + REASONING_ENABLED = CONFIG.get("reasoning_enabled", True) + REASONING_EFFORT = CONFIG.get("reasoning_effort", "medium") + BGP_ROUTES = CONFIG.get("bgp_routes", []) + + bgp_models = [] + for _r in BGP_ROUTES: + for _m in _r.get("models", [{"id": _r.get("model", "unknown")}]): + mid = _m.get("id", _m) if isinstance(_m, dict) else _m + if mid not in bgp_models: + bgp_models.append(mid) + if BGP_ROUTES and not MODELS: + MODELS = [{"id": m, "object": "model", "created": 1700000000, "owned_by": "bgp"} for m in bgp_models] + CONFIG["models"] = MODELS def _refresh_oauth_token(): return _refresh_oauth_token_for(API_KEY, OAUTH_PROVIDER) @@ -138,14 +178,6 @@ def _refresh_oauth_token_for(api_key, oauth_provider): _pool = uuid.uuid4().hex[:8] -_response_store = {} -_MAX_STORED = 50 - -_LOG_DIR = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy") -os.makedirs(_LOG_DIR, exist_ok=True) -_stats_path = os.path.join(_LOG_DIR, "usage-stats.json") -_stats_lock = threading.Lock() - def _load_stats(): try: if os.path.exists(_stats_path): @@ -154,46 +186,78 @@ def _load_stats(): pass return {"providers": {}, "updated": None} -def _record_usage(provider, model, success, duration_s, tokens_in=0, tokens_out=0, error_type=None): +def _atomic_write_json(path, obj): + tmp = path + ".tmp" + with open(tmp, "w") as f: + json.dump(obj, f, indent=2, ensure_ascii=False) + os.replace(tmp, path) + +def _flush_stats(): + global _stats_flush_timer with _stats_lock: - stats = _load_stats() + batch = list(_stats_pending) + _stats_pending.clear() + _stats_flush_timer = None + if not batch: + return + stats = _load_stats() + for entry in batch: + provider = entry["provider"] + model = entry["model"] p = stats["providers"].setdefault(provider, { "total_requests": 0, "successes": 0, "failures": 0, "total_tokens_in": 0, "total_tokens_out": 0, "total_duration_s": 0.0, "models": {}, "last_used": None, "last_error": None, }) p["total_requests"] += 1 - p["total_tokens_in"] += tokens_in - p["total_tokens_out"] += tokens_out - p["total_duration_s"] += duration_s - p["last_used"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) - if success: + p["total_tokens_in"] += entry["tokens_in"] + p["total_tokens_out"] += entry["tokens_out"] + p["total_duration_s"] += entry["duration_s"] + p["last_used"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(entry["ts"])) + if entry["success"]: p["successes"] += 1 else: p["failures"] += 1 - p["last_error"] = error_type or "unknown" + p["last_error"] = entry.get("error_type") or "unknown" m = p["models"].setdefault(model, {"requests": 0, "tokens_in": 0, "tokens_out": 0}) m["requests"] += 1 - m["tokens_in"] += tokens_in - m["tokens_out"] += tokens_out - stats["updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) - with open(_stats_path, "w") as f: - json.dump(stats, f, indent=2) + m["tokens_in"] += entry["tokens_in"] + m["tokens_out"] += entry["tokens_out"] + stats["updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + _atomic_write_json(_stats_path, stats) + +def _record_usage(provider, model, success, duration_s, tokens_in=0, tokens_out=0, error_type=None): + global _stats_flush_timer + entry = { + "provider": provider or "unknown", "model": model or "unknown", + "success": bool(success), "duration_s": float(duration_s or 0), + "tokens_in": int(tokens_in or 0), "tokens_out": int(tokens_out or 0), + "error_type": error_type, "ts": time.time(), + } + with _stats_lock: + _stats_pending.append(entry) + if _stats_flush_timer is None: + _stats_flush_timer = threading.Timer(_STATS_FLUSH_INTERVAL, _flush_stats) + _stats_flush_timer.daemon = True + _stats_flush_timer.start() def store_response(resp_id, input_data, output_items): if not resp_id: return - _response_store[resp_id] = {"input": input_data, "output": output_items} - if len(_response_store) > _MAX_STORED: - oldest = list(_response_store.keys())[0] - del _response_store[oldest] + with _response_store_lock: + _response_store[resp_id] = {"input": input_data, "output": output_items, "ts": time.time()} + while len(_response_store) > _MAX_STORED: + _response_store.popitem(last=False) def resolve_previous_response(body): prev_id = body.get("previous_response_id") input_data = body.get("input", "") - if not prev_id or prev_id not in _response_store: + if not prev_id: + return input_data + with _response_store_lock: + stored = _response_store.get(prev_id) + if not stored: return input_data - stored = _response_store[prev_id] prev_input = stored["input"] prev_output = stored["output"] new_input = input_data if isinstance(input_data, list) else [] @@ -983,18 +1047,60 @@ def _log_resp(resp_id, status, output): except Exception: pass +class ConnectionTracker: + def __enter__(self): + global _active_connections + with _active_connections_lock: + _active_connections += 1 + def __exit__(self, *a): + global _active_connections + with _active_connections_lock: + _active_connections -= 1 + +def _handle_shutdown_signal(signum, frame): + global _shutdown_requested + _shutdown_requested = True + print("[proxy] shutdown requested; draining connections", file=sys.stderr) + def _drain(): + deadline = time.time() + 5 + while time.time() < deadline: + with _active_connections_lock: + if _active_connections == 0: + break + time.sleep(0.1) + if SERVER is not None: + SERVER.shutdown() + threading.Thread(target=_drain, daemon=True).start() + +def _upstream_timeout(body, stream): + input_data = body.get("input", "") + n_items = len(input_data) if isinstance(input_data, list) else 1 + has_tools = bool(body.get("tools")) + if stream: + return min((180 if has_tools else 120) + n_items * 2, 300) + return min(60 + n_items * 2, 120) + class Handler(http.server.BaseHTTPRequestHandler): protocol_version = "HTTP/1.1" def do_GET(self): if self.path in ("/v1/models", "/models"): self.send_json(200, {"object": "list", "data": MODELS}) + elif self.path in ("/health", "/v1/health"): + self.send_json(200, {"ok": True, "backend": BACKEND, + "target_url": TARGET_URL, + "models": [m.get("id") for m in MODELS], + "bgp_routes": len(BGP_ROUTES)}) else: self.send_error(404) def do_POST(self): + if _shutdown_requested: + return self.send_json(503, {"error": {"type": "proxy_shutting_down", + "message": "Proxy is shutting down"}}) if self.path in ("/v1/responses", "/responses"): - self._handle() + with ConnectionTracker(): + self._handle() else: self.send_error(404) @@ -1082,7 +1188,7 @@ class Handler(http.server.BaseHTTPRequestHandler): for attempt in range(max_retries + 1): req = urllib.request.Request(target, data=chat_body_b, headers=fwd) try: - upstream = urllib.request.urlopen(req, timeout=180) + upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, stream)) except urllib.error.HTTPError as e: err_body = e.read().decode() if e.code in (429, 502, 503) and attempt < max_retries: @@ -1163,7 +1269,7 @@ class Handler(http.server.BaseHTTPRequestHandler): route_ok = False for attempt in range(3): try: - upstream = urllib.request.urlopen(req, timeout=180) + upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, stream)) print(f"[bgp] route '{route.get('name', r_url)}' connected OK", file=sys.stderr) self._forward_oa_compat(upstream, stream, r_model, chat_body, body, input_data, fwd, target) return @@ -1284,7 +1390,7 @@ class Handler(http.server.BaseHTTPRequestHandler): def _forward_oa_compat_retry(self, req, model, chat_body, body, input_data): try: - upstream = urllib.request.urlopen(req, timeout=180) + upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, True)) except Exception as e: print(f"[crof-adaptive] retry failed: {e}", file=sys.stderr) return @@ -1427,7 +1533,7 @@ class Handler(http.server.BaseHTTPRequestHandler): if stream: try: - upstream = urllib.request.urlopen(req, timeout=180) + upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, True)) except urllib.error.HTTPError as e: err = e.read().decode() return self.send_json(e.code, {"error": {"type": "upstream_error", "message": err}}) @@ -1461,7 +1567,7 @@ class Handler(http.server.BaseHTTPRequestHandler): store_response(last_resp_id, body.get("input", ""), last_output) else: try: - upstream = urllib.request.urlopen(req, timeout=180) + upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, False)) except urllib.error.HTTPError as e: err = e.read().decode() return self.send_json(e.code, {"error": {"type": "upstream_error", "message": err}}) @@ -1478,7 +1584,7 @@ class Handler(http.server.BaseHTTPRequestHandler): def _forward(self, req, stream, model, nonstream_fn, stream_fn, input_data=None): try: - upstream = urllib.request.urlopen(req, timeout=180) + upstream = urllib.request.urlopen(req, timeout=_upstream_timeout({}, stream)) except urllib.error.HTTPError as e: err = e.read().decode() return self.send_json(e.code, {"error": {"type": "upstream_error", "message": err}}) @@ -1533,17 +1639,54 @@ class Handler(http.server.BaseHTTPRequestHandler): self.end_headers() self.wfile.write(body) + def stream_buffered_events(self, event_iter, flush_interval=0.03, max_bytes=4096): + buf = bytearray() + last_flush = time.monotonic() + def _flush(): + nonlocal buf, last_flush + if buf: + self.wfile.write(buf) + self.wfile.flush() + buf.clear() + last_flush = time.monotonic() + for event in event_iter: + encoded = event.encode("utf-8") if isinstance(event, str) else event + buf.extend(encoded) + urgent = ("response.completed" in event or "response.output_text.done" in event + or "response.output_item.done" in event + or "function_call_arguments.done" in event) + if urgent or len(buf) >= max_bytes or time.monotonic() - last_flush >= flush_interval: + _flush() + _flush() + def log_message(self, fmt, *args): msg = fmt % args if args else fmt print(f"[translate-proxy] {BACKEND} {msg}", file=sys.stderr) -if __name__ == "__main__": - class ReusableHTTPServer(http.server.HTTPServer): +def main(): + global SERVER + _init_runtime() + signal.signal(signal.SIGTERM, _handle_shutdown_signal) + signal.signal(signal.SIGINT, _handle_shutdown_signal) + try: + from http.server import ThreadingHTTPServer as _BaseSrv + except ImportError: + class _BaseSrv(socketserver.ThreadingMixIn, http.server.HTTPServer): + daemon_threads = True + class ReusableHTTPServer(_BaseSrv): allow_reuse_address = True - server = ReusableHTTPServer(("127.0.0.1", PORT), Handler) + daemon_threads = True + request_queue_size = 64 + SERVER = ReusableHTTPServer(("127.0.0.1", PORT), Handler) print(f"translate-proxy ({BACKEND}) listening on http://127.0.0.1:{PORT}", flush=True) print(f"Target: {TARGET_URL}", flush=True) print(f"Models: {[m['id'] for m in MODELS]}", flush=True) if BGP_ROUTES: print(f"BGP routes: {len(BGP_ROUTES)} ({[r.get('name','?') for r in BGP_ROUTES]})", flush=True) - server.serve_forever() + try: + SERVER.serve_forever() + finally: + _flush_stats() + +if __name__ == "__main__": + main()