v3.0.0: ThreadingHTTPServer, dynamic ports, health gating, atomic config, safe cleanup, buffered SSE, batched stats, graceful shutdown

This commit is contained in:
Roman
2026-05-20 18:54:47 +04:00
Unverified
parent cf8ebd2064
commit b060706e18
4 changed files with 410 additions and 94 deletions

View File

@@ -1,5 +1,34 @@
# Changelog
## v3.0.0 (2026-05-20)
**Major architectural overhaul — Phase 0 + Phase 1 of engineering roadmap**
### Proxy (translate-proxy.py)
- **ThreadingHTTPServer** — serves concurrent requests (no more blocking)
- **Thread-safe shared state** — OrderedDict response store with locks, Crof state lock, stats lock
- **Batched + atomic stats writes** — stats buffered in memory, flushed every 5s via `os.replace()`
- **Graceful shutdown** — SIGTERM/SIGINT drain active connections (up to 5s), reject new with 503
- **Progressive upstream timeouts** — based on input size and tools (60-300s instead of flat 180s)
- **Lazy JSON parsing** — skip parsing SSE events unless they contain `response.completed`
- **Buffered SSE writes** — flush every 30ms, on urgent events, or at 4KB (reduces syscalls)
- **`/health` endpoint** — returns backend, target, models, BGP route count
- **Consolidated imports** — all at top, no more missing import crashes
- **`main()` entry point** — runtime init moved out of module level
- **TCP_NODELAY** — on all streaming paths (from v2.7.0)
- **Anthropic prompt caching** — `cache_control: ephemeral` on system prompts (from v2.7.0)
### Launcher (codex-launcher-gui)
- **Dynamic port allocation** — `_pick_free_port()` picks random free port, no more 8080 conflicts
- **Proxy health gating** — Codex will NOT launch if proxy fails health check within 15s
- **Error dialogs** — clear GTK error dialog when proxy startup fails
- **Atomic config backup/restore** — temp file + `os.replace()`, no more corrupted config.toml
- **Config transactions** — recovery from interrupted sessions on next startup
- **Safe cleanup (PID registry)** — only kills processes launched by the app (pids.json)
- **Proxy stderr piped to log** — real-time proxy logs in launcher UI
- **Bearer token** — Codex config uses `codex-launcher-local` instead of real API key
- **Usage Dashboard v2** — OpenUsage-inspired dark theme with status pills, KPI strip, model bars (from v2.7.0)
## v2.7.0 (2026-05-20)
- **Usage Dashboard redesigned** (inspired by OpenUsage design patterns)

Binary file not shown.

View File

@@ -5,7 +5,7 @@ import gi
gi.require_version("Gtk", "3.0") gi.require_version("Gtk", "3.0")
from gi.repository import Gtk, GLib from gi.repository import Gtk, GLib
import subprocess, os, signal, sys, threading, time, json, urllib.request, tempfile, shutil import subprocess, os, signal, sys, threading, time, json, urllib.request, tempfile, shutil
import hashlib import hashlib, socket, contextlib
from pathlib import Path from pathlib import Path
HOME = Path.home() HOME = Path.home()
@@ -435,11 +435,51 @@ def import_profile_bundle(path):
def backup_config(): def backup_config():
if CONFIG.exists(): if CONFIG.exists():
shutil.copy2(str(CONFIG), str(CONFIG_BAK)) tmp = CONFIG_BAK.with_suffix(".tmp")
shutil.copy2(str(CONFIG), str(tmp))
os.replace(str(tmp), str(CONFIG_BAK))
def restore_config(): def restore_config():
if CONFIG_BAK.exists(): if CONFIG_BAK.exists():
CONFIG_BAK.rename(CONFIG) tmp = CONFIG.with_suffix(".tmp")
shutil.copy2(str(CONFIG_BAK), str(tmp))
os.replace(str(tmp), str(CONFIG))
def write_secure_text(path, text):
    """Atomically write *text* to *path* with owner-only (0600) permissions.

    The text goes to a sibling ``<name>.tmp`` file which is then moved over
    *path* with ``os.replace()``, so a reader never sees a half-written file.

    Args:
        path: ``pathlib.Path`` of the destination; missing parent directories
            are created.
        text: Content to write, encoded as UTF-8.

    Raises:
        OSError: If the temp file cannot be created, written or moved. The
            temp file is removed before the error propagates.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    # Create the temp file already restricted to the owner so the content is
    # never readable by other users between the write and the chmod.
    fd = os.open(str(tmp), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            # The mode passed to os.open() is ignored when a leftover temp
            # file already exists, so set it explicitly before writing.
            os.chmod(str(tmp), 0o600)
            fh.write(text)
        os.replace(str(tmp), str(path))
    except BaseException:
        # Do not leave a stray temp file behind on failure.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise
CONFIG_TXN = HOME / ".codex/config.toml.launcher-txn.json"
def begin_config_transaction(reason):
    """Back up the current config and write the transaction marker.

    The marker at ``CONFIG_TXN`` is what ``recover_config_if_needed()`` reads
    on a later start to undo a launch that never reached
    ``end_config_transaction()``.

    Args:
        reason: Free-form label stored in the marker (e.g. ``"launch:<name>"``).
    """
    # Check once so the recorded flag and the backup decision cannot disagree.
    existed = CONFIG.exists()
    txn = {"started_at": time.time(), "reason": reason,
           "config_existed": existed, "backup_path": str(CONFIG_BAK)}
    if existed:
        backup_config()
    CONFIG_TXN.parent.mkdir(parents=True, exist_ok=True)
    # Write via temp file + os.replace() so an interrupted write can never
    # leave a truncated marker for the recovery code to parse.
    tmp = CONFIG_TXN.with_suffix(CONFIG_TXN.suffix + ".tmp")
    tmp.write_text(json.dumps(txn, indent=2))
    os.replace(str(tmp), str(CONFIG_TXN))
def end_config_transaction():
    """Delete the transaction marker; a missing marker is not an error."""
    CONFIG_TXN.unlink(missing_ok=True)
def recover_config_if_needed(logfn=None):
    """Undo config changes left behind by an interrupted session.

    Does nothing when no transaction marker exists. Otherwise:

    * if the marker says a config existed and the backup is still present,
      the backup is restored;
    * else, if a config file exists, it is removed;
    * if the marker cannot be read or is not a JSON object, the config is
      left untouched, because its recorded state is unknown.

    The marker is always removed afterwards.

    Args:
        logfn: Optional callable taking one message string.
    """
    if not CONFIG_TXN.exists():
        return
    try:
        try:
            txn = json.loads(CONFIG_TXN.read_text())
        except (OSError, ValueError) as exc:
            # ValueError covers both invalid JSON and undecodable bytes.
            txn, problem = None, exc
        else:
            problem = None if isinstance(txn, dict) else "not a JSON object"
        if problem is not None:
            # This runs during window construction; a damaged marker must not
            # raise there, and must not trigger the destructive branch below.
            if logfn:
                logfn(f"Ignoring unreadable config transaction marker: {problem}")
            return
        if txn.get("config_existed") and CONFIG_BAK.exists():
            restore_config()
            if logfn:
                logfn("Recovered Codex config from interrupted session.")
        elif CONFIG.exists():
            CONFIG.unlink()
            if logfn:
                logfn("Removed generated config from interrupted session.")
    finally:
        CONFIG_TXN.unlink(missing_ok=True)
def write_config_for_native(endpoint, selected_model): def write_config_for_native(endpoint, selected_model):
"""Write config for native OpenAI (no proxy needed).""" """Write config for native OpenAI (no proxy needed)."""
@@ -470,8 +510,7 @@ def _toml_safe(val):
val = str(val).replace('"', '\\"') val = str(val).replace('"', '\\"')
return val.split('\n', 1)[0].strip() return val.split('\n', 1)[0].strip()
def write_config_for_translated(endpoint, selected_model): def write_config_for_translated(endpoint, selected_model, proxy_port=8080):
"""Write config pointing at local proxy."""
backup_config() backup_config()
model_catalog = _gen_model_catalog(endpoint, selected_model) model_catalog = _gen_model_catalog(endpoint, selected_model)
mc_path = PROXY_CONFIG_DIR / f"models-{safe_name(endpoint['name'])}.json" mc_path = PROXY_CONFIG_DIR / f"models-{safe_name(endpoint['name'])}.json"
@@ -484,8 +523,8 @@ def write_config_for_translated(endpoint, selected_model):
f'model_catalog_json = "{mc_path}"\n', f'model_catalog_json = "{mc_path}"\n',
f'\n[model_providers."{endpoint["name"]}"]\n', f'\n[model_providers."{endpoint["name"]}"]\n',
f'name = "{_toml_safe(endpoint["name"])}"\n', f'name = "{_toml_safe(endpoint["name"])}"\n',
f'base_url = "http://127.0.0.1:8080"\n', f'base_url = "http://127.0.0.1:{proxy_port}"\n',
f'experimental_bearer_token = "{_toml_safe(endpoint["api_key"])}"\n', f'experimental_bearer_token = "codex-launcher-local"\n',
f'\n[profiles."{endpoint["name"]}"]\n', f'\n[profiles."{endpoint["name"]}"]\n',
f'model_provider = "{_toml_safe(endpoint["name"])}"\n', f'model_provider = "{_toml_safe(endpoint["name"])}"\n',
f'model = "{_toml_safe(selected_model)}"\n', f'model = "{_toml_safe(selected_model)}"\n',
@@ -493,7 +532,7 @@ def write_config_for_translated(endpoint, selected_model):
f'service_tier = "fast"\n', f'service_tier = "fast"\n',
f'approvals_reviewer = "user"\n', f'approvals_reviewer = "user"\n',
] ]
CONFIG.write_text("".join(lines)) write_secure_text(CONFIG, "".join(lines))
def _gen_model_catalog(endpoint, selected_model=None): def _gen_model_catalog(endpoint, selected_model=None):
default_model = selected_model or endpoint.get("default_model") default_model = selected_model or endpoint.get("default_model")
@@ -533,13 +572,66 @@ def _gen_model_catalog(endpoint, selected_model=None):
# ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════
_proxy_proc = None _proxy_proc = None
_proxy_port = None
PID_REGISTRY = HOME / ".cache" / "codex-launcher" / "pids.json"
def _pick_free_port():
    """Return a TCP port number on 127.0.0.1 that was free when probed.

    Binds a socket to port 0 so the OS assigns a port, reads the number back
    and closes the socket. The port is released again on return, so another
    process could take it before the caller binds it.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.bind(("127.0.0.1", 0))
        return probe.getsockname()[1]
def _load_pid_registry():
    """Return the PID registry as a dict, or ``{}`` if it is unusable.

    A missing, unreadable or malformed registry file yields an empty dict, as
    does valid JSON that is not an object (callers iterate ``.items()``).
    """
    try:
        data = json.loads(PID_REGISTRY.read_text())
    except (OSError, ValueError):
        # OSError: file missing/unreadable; ValueError: bad JSON or encoding.
        return {}
    return data if isinstance(data, dict) else {}
def _save_pid_registry(data):
    """Write *data* to the PID registry file as indented JSON.

    The content is written to a sibling ``.tmp`` file and moved into place
    with ``os.replace()`` so the registry is never left half-written.

    Args:
        data: JSON-serialisable mapping to store.
    """
    PID_REGISTRY.parent.mkdir(parents=True, exist_ok=True)
    tmp = PID_REGISTRY.with_suffix(".tmp")
    tmp.write_text(json.dumps(data, indent=2))
    os.replace(str(tmp), str(PID_REGISTRY))
def _register_pgid(kind, pid):
    """Record the process group of *pid* in the PID registry under *kind*.

    The entry stores the pid, its process-group id and the current time, and
    replaces any previous entry for the same *kind*. Nothing is recorded if
    the process no longer exists.

    Args:
        kind: Registry key for the process (e.g. ``"proxy"``).
        pid: Process id of the launched child.
    """
    data = _load_pid_registry()
    try:
        pgid = os.getpgid(pid)
    except ProcessLookupError:
        return
    data[kind] = {"pid": pid, "pgid": pgid, "ts": time.time()}
    _save_pid_registry(data)
def safe_cleanup_owned(logfn=None):
    """Send SIGTERM to every process group listed in the PID registry.

    Only groups recorded in the registry are signalled. The registry is
    cleared when at least one group was signalled or found already gone.

    Args:
        logfn: Optional callable taking one message string.
    """
    data = _load_pid_registry()
    if not isinstance(data, dict):
        data = {}
    own_pgid = os.getpgrp()
    changed = False
    for kind, meta in data.items():
        pgid = meta.get("pgid") if isinstance(meta, dict) else None
        # Skip malformed entries. Also never signal pgid 0/1 or our own
        # group: a corrupt or stale registry must not take down the launcher
        # (or, for 1, the init process group).
        if not isinstance(pgid, int) or isinstance(pgid, bool):
            continue
        if pgid <= 1 or pgid == own_pgid:
            continue
        try:
            os.killpg(pgid, signal.SIGTERM)
            if logfn:
                logfn(f"Stopped {kind} (pgid {pgid})")
            changed = True
        except ProcessLookupError:
            # Group already gone: the entry is stale and can be dropped.
            changed = True
        except Exception as e:
            if logfn:
                logfn(f"Could not stop {kind}: {e}")
    if changed:
        _save_pid_registry({})
def _start_proxy_for(endpoint, logfn): def _start_proxy_for(endpoint, logfn):
global _proxy_proc global _proxy_proc, _proxy_port
_stop_proxy() _stop_proxy()
port = _pick_free_port()
_proxy_port = port
pcfg = { pcfg = {
"port": 8080, "port": port,
"backend_type": endpoint["backend_type"], "backend_type": endpoint["backend_type"],
"target_url": normalize_base_url(endpoint["base_url"]), "target_url": normalize_base_url(endpoint["base_url"]),
"api_key": endpoint["api_key"], "api_key": endpoint["api_key"],
@@ -550,26 +642,49 @@ def _start_proxy_for(endpoint, logfn):
"models": [{"id": m, "object": "model", "created": 1700000000, "owned_by": endpoint["name"]} "models": [{"id": m, "object": "model", "created": 1700000000, "owned_by": endpoint["name"]}
for m in endpoint.get("models", [])], for m in endpoint.get("models", [])],
} }
pcfg_path = PROXY_CONFIG_DIR / f"proxy-{safe_name(endpoint['name'])}.json" pcfg_path = PROXY_CONFIG_DIR / f"proxy-{safe_name(endpoint['name'])}-{port}.json"
pcfg_path.parent.mkdir(parents=True, exist_ok=True) pcfg_path.parent.mkdir(parents=True, exist_ok=True)
pcfg_path.write_text(json.dumps(pcfg, indent=2)) pcfg_path.write_text(json.dumps(pcfg, indent=2))
_start_proxy_with_config(pcfg_path, logfn) _start_proxy_with_config(pcfg_path, port, logfn)
return port
def _start_proxy_with_config(pcfg_path, logfn): def _start_proxy_with_config(pcfg_path, port, logfn):
global _proxy_proc global _proxy_proc
_proxy_proc = subprocess.Popen( _proxy_proc = subprocess.Popen(
["python3", str(PROXY), "--config", str(pcfg_path)], ["python3", str(PROXY), "--config", str(pcfg_path)],
stdout=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
preexec_fn=os.setsid, preexec_fn=os.setsid,
text=True,
) )
for _ in range(30): _register_pgid("proxy", _proxy_proc.pid)
try:
urllib.request.urlopen("http://127.0.0.1:8080/v1/models", timeout=2) def _pipe_stderr():
logfn("Proxy ready on port 8080") if not _proxy_proc.stderr:
return return
except Exception: for line in _proxy_proc.stderr:
time.sleep(0.5) GLib.idle_add(logfn, f"[proxy] {line.rstrip()}")
logfn("WARNING: proxy may not have started in time") threading.Thread(target=_pipe_stderr, daemon=True).start()
deadline = time.time() + 15
last_err = None
while time.time() < deadline:
if _proxy_proc.poll() is not None:
raise RuntimeError(f"Proxy exited early with code {_proxy_proc.returncode}")
try:
urllib.request.urlopen(f"http://127.0.0.1:{port}/v1/models", timeout=2)
logfn(f"Proxy ready on port {port}")
return
except Exception as e:
last_err = e
time.sleep(0.3)
try:
os.killpg(os.getpgid(_proxy_proc.pid), signal.SIGTERM)
_proxy_proc.wait(timeout=3)
except Exception:
with contextlib.suppress(Exception):
os.killpg(os.getpgid(_proxy_proc.pid), signal.SIGKILL)
raise RuntimeError(f"Proxy failed health check on port {port}: {last_err}")
def _stop_proxy(): def _stop_proxy():
global _proxy_proc global _proxy_proc
@@ -583,8 +698,8 @@ def _stop_proxy():
pass pass
_proxy_proc = None _proxy_proc = None
def _run_cleanup(): def _run_cleanup(logfn=None):
subprocess.run(["bash", str(CLEANUP)], capture_output=True, timeout=30) safe_cleanup_owned(logfn)
def _last_log_lines(n=15): def _last_log_lines(n=15):
try: try:
@@ -640,6 +755,7 @@ class LauncherWin(Gtk.Window):
self.set_position(Gtk.WindowPosition.CENTER) self.set_position(Gtk.WindowPosition.CENTER)
self._proc = None self._proc = None
self._endpoints_data = load_endpoints() self._endpoints_data = load_endpoints()
recover_config_if_needed()
vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)
self.add(vbox) self.add(vbox)
@@ -647,7 +763,7 @@ class LauncherWin(Gtk.Window):
# header row # header row
hdr = Gtk.Box(spacing=8) hdr = Gtk.Box(spacing=8)
vbox.pack_start(hdr, False, False, 0) vbox.pack_start(hdr, False, False, 0)
lbl = Gtk.Label(label="<b>Codex Launcher v2.7.0</b>") lbl = Gtk.Label(label="<b>Codex Launcher v3.0.0</b>")
lbl.set_use_markup(True) lbl.set_use_markup(True)
hdr.pack_start(lbl, False, False, 0) hdr.pack_start(lbl, False, False, 0)
changelog_btn = Gtk.Button(label="Changelog") changelog_btn = Gtk.Button(label="Changelog")
@@ -1207,17 +1323,24 @@ class LauncherWin(Gtk.Window):
def _run(self, ep, model, target): def _run(self, ep, model, target):
try: try:
self.log("Cleaning up stale processes…") self.log("Cleaning up stale processes…")
_run_cleanup() _run_cleanup(self.log)
recover_config_if_needed(self.log)
needs_proxy = ep["backend_type"] != "native" needs_proxy = ep["backend_type"] != "native"
if needs_proxy: if needs_proxy:
self.log("Starting translation proxy…") self.log("Starting translation proxy…")
_start_proxy_for(ep, self.log) try:
self.log(f"Configuring Codex for {ep['name']} (proxied)…") proxy_port = _start_proxy_for(ep, self.log)
write_config_for_translated(ep, model) except RuntimeError as e:
GLib.idle_add(self._show_error_dialog, "Proxy startup failed", str(e))
return
self.log(f"Configuring Codex for {ep['name']} (proxied on :{proxy_port})…")
begin_config_transaction(f"launch:{ep['name']}")
write_config_for_translated(ep, model, proxy_port)
else: else:
self.log(f"Configuring Codex for {ep['name']} (native)…") self.log(f"Configuring Codex for {ep['name']} (native)…")
begin_config_transaction(f"launch:{ep['name']}")
write_config_for_native(ep, model) write_config_for_native(ep, model)
if target == "desktop": if target == "desktop":
@@ -1230,15 +1353,18 @@ class LauncherWin(Gtk.Window):
finally: finally:
_stop_proxy() _stop_proxy()
restore_config() restore_config()
end_config_transaction()
self._set_busy(False) self._set_busy(False)
self.log("Ready.") self.log("Ready.")
def _run_bgp(self, pool, model, target): def _run_bgp(self, pool, model, target):
try: try:
self.log("Cleaning up stale processes…") self.log("Cleaning up stale processes…")
_run_cleanup() _run_cleanup(self.log)
recover_config_if_needed(self.log)
self.log(f"Starting BGP proxy with {len(pool.get('routes', []))} routes…") port = _pick_free_port()
self.log(f"Starting BGP proxy with {len(pool.get('routes', []))} routes on :{port}…")
bgp_ep = { bgp_ep = {
"name": pool["name"], "name": pool["name"],
"backend_type": "openai-compat", "backend_type": "openai-compat",
@@ -1248,19 +1374,24 @@ class LauncherWin(Gtk.Window):
"models": list(dict.fromkeys(r.get("model", model) for r in pool.get("routes", []))), "models": list(dict.fromkeys(r.get("model", model) for r in pool.get("routes", []))),
} }
pcfg = { pcfg = {
"port": 8080, "port": port,
"backend_type": "openai-compat", "backend_type": "openai-compat",
"target_url": "http://bgp.placeholder", "target_url": "http://bgp.placeholder",
"api_key": "", "api_key": "",
"bgp_routes": pool.get("routes", []), "bgp_routes": pool.get("routes", []),
"models": [{"id": m, "object": "model", "created": 1700000000, "owned_by": "bgp"} for m in bgp_ep["models"]], "models": [{"id": m, "object": "model", "created": 1700000000, "owned_by": "bgp"} for m in bgp_ep["models"]],
} }
pcfg_path = PROXY_CONFIG_DIR / f"proxy-{safe_name(pool['name'])}.json" pcfg_path = PROXY_CONFIG_DIR / f"proxy-{safe_name(pool['name'])}-{port}.json"
pcfg_path.parent.mkdir(parents=True, exist_ok=True) pcfg_path.parent.mkdir(parents=True, exist_ok=True)
pcfg_path.write_text(json.dumps(pcfg, indent=2)) pcfg_path.write_text(json.dumps(pcfg, indent=2))
_start_proxy_with_config(pcfg_path, self.log) try:
_start_proxy_with_config(pcfg_path, port, self.log)
except RuntimeError as e:
GLib.idle_add(self._show_error_dialog, "BGP proxy startup failed", str(e))
return
write_config_for_translated(bgp_ep, model) begin_config_transaction(f"launch:bgp:{pool['name']}")
write_config_for_translated(bgp_ep, model, port)
if target == "desktop": if target == "desktop":
self._launch_desktop(bgp_ep, model) self._launch_desktop(bgp_ep, model)
@@ -1272,17 +1403,19 @@ class LauncherWin(Gtk.Window):
finally: finally:
_stop_proxy() _stop_proxy()
restore_config() restore_config()
end_config_transaction()
self._set_busy(False) self._set_busy(False)
self.log("Ready.") self.log("Ready.")
def _run_codex_default(self, target): def _run_codex_default(self, target):
try: try:
self.log("Cleaning up stale processes…") self.log("Cleaning up stale processes…")
_run_cleanup() _run_cleanup(self.log)
_stop_proxy() _stop_proxy()
recover_config_if_needed(self.log)
self.log("Resetting config to Codex defaults (OAuth)…") self.log("Resetting config to Codex defaults (OAuth)…")
backup_config() begin_config_transaction("launch:default")
if CONFIG.exists(): if CONFIG.exists():
CONFIG.unlink() CONFIG.unlink()
@@ -1294,9 +1427,19 @@ class LauncherWin(Gtk.Window):
self.log(f"ERROR: {e}") self.log(f"ERROR: {e}")
finally: finally:
restore_config() restore_config()
end_config_transaction()
self._set_busy(False) self._set_busy(False)
self.log("Ready.") self.log("Ready.")
def _show_error_dialog(self, title, message):
    """Show a modal GTK error dialog and block until it is closed.

    Scheduled by the worker methods through ``GLib.idle_add``.

    Args:
        title: Primary dialog text (converted with ``str``).
        message: Secondary dialog text (converted with ``str``).

    Returns:
        ``False``, so an idle source that invoked this is not rescheduled.
    """
    dialog = Gtk.MessageDialog(
        transient_for=self, flags=0,
        message_type=Gtk.MessageType.ERROR,
        buttons=Gtk.ButtonsType.CLOSE, text=str(title))
    dialog.format_secondary_text(str(message))
    try:
        dialog.run()
    finally:
        # Always tear the dialog down, even if run() raises.
        dialog.destroy()
    return False
def _launch_desktop(self, ep, model): def _launch_desktop(self, ep, model):
args = [str(START_SH)] args = [str(START_SH)]
if ep["backend_type"] != "native": if ep["backend_type"] != "native":
@@ -1454,8 +1597,9 @@ class LauncherWin(Gtk.Window):
pass pass
self._proc = None self._proc = None
_stop_proxy() _stop_proxy()
_run_cleanup() _run_cleanup(self.log)
restore_config() restore_config()
end_config_transaction()
LOG_DIR.mkdir(parents=True, exist_ok=True) LOG_DIR.mkdir(parents=True, exist_ok=True)
LAUNCH_LOG.unlink(missing_ok=True) LAUNCH_LOG.unlink(missing_ok=True)
self.log("Cleanup complete") self.log("Cleanup complete")

View File

@@ -11,7 +11,8 @@ Usage:
python3 translate-proxy.py --backend openai-compat --target-url https://... --api-key sk-... python3 translate-proxy.py --backend openai-compat --target-url https://... --api-key sk-...
""" """
import json, http.server, urllib.request, time, uuid, os, sys, argparse, threading, socket import json, http.server, socketserver, urllib.request, urllib.parse, urllib.error
import time, uuid, os, sys, argparse, threading, socket, collections, contextlib, signal
# ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════
# Config # Config
@@ -74,25 +75,64 @@ def load_config():
return cfg return cfg
CONFIG = load_config() CONFIG = None
PORT = CONFIG["port"] PORT = 8080
BACKEND = CONFIG["backend_type"] BACKEND = "openai-compat"
TARGET_URL = CONFIG["target_url"].rstrip("/") TARGET_URL = ""
API_KEY = CONFIG["api_key"] API_KEY = ""
OAUTH_PROVIDER = CONFIG.get("oauth_provider", "") OAUTH_PROVIDER = ""
MODELS = CONFIG["models"] MODELS = []
CC_VERSION = CONFIG.get("cc_version", "") CC_VERSION = ""
REASONING_ENABLED = CONFIG.get("reasoning_enabled", True) REASONING_ENABLED = True
REASONING_EFFORT = CONFIG.get("reasoning_effort", "medium") REASONING_EFFORT = "medium"
BGP_ROUTES = CONFIG.get("bgp_routes", []) BGP_ROUTES = []
BGP_MODELS = [] SERVER = None
for _r in BGP_ROUTES:
for _m in _r.get("models", [{"id": _r.get("model", "unknown")}]): _LOG_DIR = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy")
if _m.get("id", _m) not in BGP_MODELS: os.makedirs(_LOG_DIR, exist_ok=True)
BGP_MODELS.append(_m.get("id", _m) if isinstance(_m, dict) else _m) _stats_path = os.path.join(_LOG_DIR, "usage-stats.json")
if BGP_ROUTES and not MODELS: _stats_lock = threading.Lock()
MODELS = [{"id": m, "object": "model", "created": 1700000000, "owned_by": "bgp"} for m in BGP_MODELS] _stats_pending = []
CONFIG["models"] = MODELS _stats_flush_timer = None
_STATS_FLUSH_INTERVAL = 5.0
_response_store = collections.OrderedDict()
_response_store_lock = threading.Lock()
_MAX_STORED = 50
_crof_lock = threading.Lock()
_shutdown_requested = False
_active_connections = 0
_active_connections_lock = threading.Lock()
_pool = uuid.uuid4().hex[:8]
def _init_runtime():
    """Load the configuration and populate the module-level settings.

    Reads the dict returned by ``load_config()`` into the module globals
    (port, backend, target URL, API key, models, reasoning options, BGP
    routes). When BGP routes are configured but no models are, the model list
    is derived from the routes.
    """
    global CONFIG, PORT, BACKEND, TARGET_URL, API_KEY, OAUTH_PROVIDER
    global MODELS, CC_VERSION, REASONING_ENABLED, REASONING_EFFORT, BGP_ROUTES
    CONFIG = load_config()
    PORT = CONFIG["port"]
    BACKEND = CONFIG["backend_type"]
    TARGET_URL = CONFIG["target_url"].rstrip("/")
    API_KEY = CONFIG["api_key"]
    OAUTH_PROVIDER = CONFIG.get("oauth_provider", "")
    MODELS = CONFIG["models"]
    CC_VERSION = CONFIG.get("cc_version", "")
    REASONING_ENABLED = CONFIG.get("reasoning_enabled", True)
    REASONING_EFFORT = CONFIG.get("reasoning_effort", "medium")
    BGP_ROUTES = CONFIG.get("bgp_routes", [])

    # Collect unique model ids across routes, keeping first-seen order. A
    # route without "models" contributes its single "model" value.
    bgp_models = []
    for route in BGP_ROUTES:
        for entry in route.get("models", [{"id": route.get("model", "unknown")}]):
            mid = entry.get("id", entry) if isinstance(entry, dict) else entry
            if mid not in bgp_models:
                bgp_models.append(mid)
    if BGP_ROUTES and not MODELS:
        MODELS = [{"id": m, "object": "model", "created": 1700000000, "owned_by": "bgp"} for m in bgp_models]
        CONFIG["models"] = MODELS
def _refresh_oauth_token(): def _refresh_oauth_token():
return _refresh_oauth_token_for(API_KEY, OAUTH_PROVIDER) return _refresh_oauth_token_for(API_KEY, OAUTH_PROVIDER)
@@ -138,14 +178,6 @@ def _refresh_oauth_token_for(api_key, oauth_provider):
_pool = uuid.uuid4().hex[:8] _pool = uuid.uuid4().hex[:8]
_response_store = {}
_MAX_STORED = 50
_LOG_DIR = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy")
os.makedirs(_LOG_DIR, exist_ok=True)
_stats_path = os.path.join(_LOG_DIR, "usage-stats.json")
_stats_lock = threading.Lock()
def _load_stats(): def _load_stats():
try: try:
if os.path.exists(_stats_path): if os.path.exists(_stats_path):
@@ -154,46 +186,78 @@ def _load_stats():
pass pass
return {"providers": {}, "updated": None} return {"providers": {}, "updated": None}
def _record_usage(provider, model, success, duration_s, tokens_in=0, tokens_out=0, error_type=None): def _atomic_write_json(path, obj):
tmp = path + ".tmp"
with open(tmp, "w") as f:
json.dump(obj, f, indent=2, ensure_ascii=False)
os.replace(tmp, path)
def _flush_stats():
global _stats_flush_timer
with _stats_lock: with _stats_lock:
stats = _load_stats() batch = list(_stats_pending)
_stats_pending.clear()
_stats_flush_timer = None
if not batch:
return
stats = _load_stats()
for entry in batch:
provider = entry["provider"]
model = entry["model"]
p = stats["providers"].setdefault(provider, { p = stats["providers"].setdefault(provider, {
"total_requests": 0, "successes": 0, "failures": 0, "total_requests": 0, "successes": 0, "failures": 0,
"total_tokens_in": 0, "total_tokens_out": 0, "total_tokens_in": 0, "total_tokens_out": 0,
"total_duration_s": 0.0, "models": {}, "last_used": None, "last_error": None, "total_duration_s": 0.0, "models": {}, "last_used": None, "last_error": None,
}) })
p["total_requests"] += 1 p["total_requests"] += 1
p["total_tokens_in"] += tokens_in p["total_tokens_in"] += entry["tokens_in"]
p["total_tokens_out"] += tokens_out p["total_tokens_out"] += entry["tokens_out"]
p["total_duration_s"] += duration_s p["total_duration_s"] += entry["duration_s"]
p["last_used"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) p["last_used"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(entry["ts"]))
if success: if entry["success"]:
p["successes"] += 1 p["successes"] += 1
else: else:
p["failures"] += 1 p["failures"] += 1
p["last_error"] = error_type or "unknown" p["last_error"] = entry.get("error_type") or "unknown"
m = p["models"].setdefault(model, {"requests": 0, "tokens_in": 0, "tokens_out": 0}) m = p["models"].setdefault(model, {"requests": 0, "tokens_in": 0, "tokens_out": 0})
m["requests"] += 1 m["requests"] += 1
m["tokens_in"] += tokens_in m["tokens_in"] += entry["tokens_in"]
m["tokens_out"] += tokens_out m["tokens_out"] += entry["tokens_out"]
stats["updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) stats["updated"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
with open(_stats_path, "w") as f: _atomic_write_json(_stats_path, stats)
json.dump(stats, f, indent=2)
def _record_usage(provider, model, success, duration_s, tokens_in=0, tokens_out=0, error_type=None):
    """Queue one usage record and make sure a stats flush is scheduled.

    The record is appended to the in-memory pending list. If no flush timer is
    currently armed, a daemon ``threading.Timer`` is started that calls
    ``_flush_stats`` after ``_STATS_FLUSH_INTERVAL`` seconds.

    Args:
        provider: Provider name; falsy values are stored as ``"unknown"``.
        model: Model name; falsy values are stored as ``"unknown"``.
        success: Whether the request succeeded (coerced to ``bool``).
        duration_s: Request duration in seconds; falsy values become ``0.0``.
        tokens_in: Input token count; falsy values become ``0``.
        tokens_out: Output token count; falsy values become ``0``.
        error_type: Optional error label, stored as given.
    """
    global _stats_flush_timer
    entry = {
        "provider": provider or "unknown", "model": model or "unknown",
        "success": bool(success), "duration_s": float(duration_s or 0),
        "tokens_in": int(tokens_in or 0), "tokens_out": int(tokens_out or 0),
        "error_type": error_type, "ts": time.time(),
    }
    with _stats_lock:
        _stats_pending.append(entry)
        # Arm at most one timer; later records join the already-scheduled flush.
        if _stats_flush_timer is None:
            _stats_flush_timer = threading.Timer(_STATS_FLUSH_INTERVAL, _flush_stats)
            _stats_flush_timer.daemon = True
            _stats_flush_timer.start()
def store_response(resp_id, input_data, output_items): def store_response(resp_id, input_data, output_items):
if not resp_id: if not resp_id:
return return
_response_store[resp_id] = {"input": input_data, "output": output_items} with _response_store_lock:
if len(_response_store) > _MAX_STORED: _response_store[resp_id] = {"input": input_data, "output": output_items, "ts": time.time()}
oldest = list(_response_store.keys())[0] while len(_response_store) > _MAX_STORED:
del _response_store[oldest] _response_store.popitem(last=False)
def resolve_previous_response(body): def resolve_previous_response(body):
prev_id = body.get("previous_response_id") prev_id = body.get("previous_response_id")
input_data = body.get("input", "") input_data = body.get("input", "")
if not prev_id or prev_id not in _response_store: if not prev_id:
return input_data
with _response_store_lock:
stored = _response_store.get(prev_id)
if not stored:
return input_data return input_data
stored = _response_store[prev_id]
prev_input = stored["input"] prev_input = stored["input"]
prev_output = stored["output"] prev_output = stored["output"]
new_input = input_data if isinstance(input_data, list) else [] new_input = input_data if isinstance(input_data, list) else []
@@ -983,18 +1047,60 @@ def _log_resp(resp_id, status, output):
except Exception: except Exception:
pass pass
class ConnectionTracker:
    """Context manager that counts a request as active while it runs.

    Increments the module-level ``_active_connections`` counter on entry and
    decrements it on exit, both under ``_active_connections_lock``. Exceptions
    raised inside the ``with`` body are not suppressed.
    """

    def __enter__(self):
        global _active_connections
        with _active_connections_lock:
            _active_connections += 1
        # Return the manager so ``with ConnectionTracker() as t`` is usable.
        return self

    def __exit__(self, *a):
        global _active_connections
        with _active_connections_lock:
            _active_connections -= 1
        return False
def _handle_shutdown_signal(signum, frame):
    """Signal handler: start a graceful shutdown of the proxy.

    Sets ``_shutdown_requested`` and starts a daemon thread that waits up to
    5 seconds for ``_active_connections`` to reach zero, then calls
    ``SERVER.shutdown()`` if a server exists. A second signal while a
    shutdown is already in progress is ignored.

    Args:
        signum: Signal number (unused).
        frame: Interrupted stack frame (unused).
    """
    global _shutdown_requested
    if _shutdown_requested:
        # Already draining; do not start a second drain thread.
        return
    _shutdown_requested = True

    def _drain():
        # Log from the worker thread rather than the handler itself: a handler
        # can interrupt an in-progress stderr write, and re-entering a
        # buffered stream raises RuntimeError.
        print("[proxy] shutdown requested; draining connections", file=sys.stderr)
        deadline = time.time() + 5
        while time.time() < deadline:
            with _active_connections_lock:
                if _active_connections == 0:
                    break
            time.sleep(0.1)
        if SERVER is not None:
            SERVER.shutdown()

    threading.Thread(target=_drain, daemon=True).start()
def _upstream_timeout(body, stream):
    """Return the upstream request timeout in seconds.

    The timeout grows by 2 seconds per input item. Streaming requests start
    at 120 s (180 s when tools are present) and are capped at 300 s;
    non-streaming requests start at 60 s and are capped at 120 s.

    Args:
        body: Request body dict. ``None`` or any non-dict value is treated as
            an empty body.
        stream: Truthy for streaming requests.

    Returns:
        Timeout in seconds as an ``int``.
    """
    if not isinstance(body, dict):
        body = {}
    input_data = body.get("input", "")
    # A non-list input (e.g. a plain string) counts as a single item.
    n_items = len(input_data) if isinstance(input_data, list) else 1
    if stream:
        base = 180 if body.get("tools") else 120
        return min(base + n_items * 2, 300)
    return min(60 + n_items * 2, 120)
class Handler(http.server.BaseHTTPRequestHandler): class Handler(http.server.BaseHTTPRequestHandler):
protocol_version = "HTTP/1.1" protocol_version = "HTTP/1.1"
def do_GET(self): def do_GET(self):
if self.path in ("/v1/models", "/models"): if self.path in ("/v1/models", "/models"):
self.send_json(200, {"object": "list", "data": MODELS}) self.send_json(200, {"object": "list", "data": MODELS})
elif self.path in ("/health", "/v1/health"):
self.send_json(200, {"ok": True, "backend": BACKEND,
"target_url": TARGET_URL,
"models": [m.get("id") for m in MODELS],
"bgp_routes": len(BGP_ROUTES)})
else: else:
self.send_error(404) self.send_error(404)
def do_POST(self): def do_POST(self):
if _shutdown_requested:
return self.send_json(503, {"error": {"type": "proxy_shutting_down",
"message": "Proxy is shutting down"}})
if self.path in ("/v1/responses", "/responses"): if self.path in ("/v1/responses", "/responses"):
self._handle() with ConnectionTracker():
self._handle()
else: else:
self.send_error(404) self.send_error(404)
@@ -1082,7 +1188,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
for attempt in range(max_retries + 1): for attempt in range(max_retries + 1):
req = urllib.request.Request(target, data=chat_body_b, headers=fwd) req = urllib.request.Request(target, data=chat_body_b, headers=fwd)
try: try:
upstream = urllib.request.urlopen(req, timeout=180) upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, stream))
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
err_body = e.read().decode() err_body = e.read().decode()
if e.code in (429, 502, 503) and attempt < max_retries: if e.code in (429, 502, 503) and attempt < max_retries:
@@ -1163,7 +1269,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
route_ok = False route_ok = False
for attempt in range(3): for attempt in range(3):
try: try:
upstream = urllib.request.urlopen(req, timeout=180) upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, stream))
print(f"[bgp] route '{route.get('name', r_url)}' connected OK", file=sys.stderr) print(f"[bgp] route '{route.get('name', r_url)}' connected OK", file=sys.stderr)
self._forward_oa_compat(upstream, stream, r_model, chat_body, body, input_data, fwd, target) self._forward_oa_compat(upstream, stream, r_model, chat_body, body, input_data, fwd, target)
return return
@@ -1284,7 +1390,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
def _forward_oa_compat_retry(self, req, model, chat_body, body, input_data): def _forward_oa_compat_retry(self, req, model, chat_body, body, input_data):
try: try:
upstream = urllib.request.urlopen(req, timeout=180) upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, True))
except Exception as e: except Exception as e:
print(f"[crof-adaptive] retry failed: {e}", file=sys.stderr) print(f"[crof-adaptive] retry failed: {e}", file=sys.stderr)
return return
@@ -1427,7 +1533,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
if stream: if stream:
try: try:
upstream = urllib.request.urlopen(req, timeout=180) upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, True))
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
err = e.read().decode() err = e.read().decode()
return self.send_json(e.code, {"error": {"type": "upstream_error", "message": err}}) return self.send_json(e.code, {"error": {"type": "upstream_error", "message": err}})
@@ -1461,7 +1567,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
store_response(last_resp_id, body.get("input", ""), last_output) store_response(last_resp_id, body.get("input", ""), last_output)
else: else:
try: try:
upstream = urllib.request.urlopen(req, timeout=180) upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, False))
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
err = e.read().decode() err = e.read().decode()
return self.send_json(e.code, {"error": {"type": "upstream_error", "message": err}}) return self.send_json(e.code, {"error": {"type": "upstream_error", "message": err}})
@@ -1478,7 +1584,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
def _forward(self, req, stream, model, nonstream_fn, stream_fn, input_data=None): def _forward(self, req, stream, model, nonstream_fn, stream_fn, input_data=None):
try: try:
upstream = urllib.request.urlopen(req, timeout=180) upstream = urllib.request.urlopen(req, timeout=_upstream_timeout({}, stream))
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
err = e.read().decode() err = e.read().decode()
return self.send_json(e.code, {"error": {"type": "upstream_error", "message": err}}) return self.send_json(e.code, {"error": {"type": "upstream_error", "message": err}})
@@ -1533,17 +1639,54 @@ class Handler(http.server.BaseHTTPRequestHandler):
self.end_headers() self.end_headers()
self.wfile.write(body) self.wfile.write(body)
def stream_buffered_events(self, event_iter, flush_interval=0.03, max_bytes=4096):
    """Write events to ``self.wfile``, coalescing small writes.

    Events are appended to a buffer that is flushed when an urgent event
    arrives, when the buffer reaches *max_bytes*, or when *flush_interval*
    seconds have passed since the last flush. Whatever remains is flushed
    once the iterator is exhausted.

    Args:
        event_iter: Iterable of events, each ``str`` (encoded as UTF-8) or
            ``bytes``.
        flush_interval: Maximum seconds between flushes while events arrive.
        max_bytes: Buffer size that forces a flush.
    """
    # Matched against the encoded bytes so that both str and bytes events
    # work; a str-in-bytes test raises TypeError.
    urgent_markers = (
        b"response.completed",
        b"response.output_text.done",
        b"response.output_item.done",
        b"function_call_arguments.done",
    )
    buf = bytearray()
    last_flush = time.monotonic()

    def _flush():
        nonlocal last_flush
        if buf:
            self.wfile.write(buf)
            self.wfile.flush()
            buf.clear()
        last_flush = time.monotonic()

    for event in event_iter:
        encoded = event.encode("utf-8") if isinstance(event, str) else event
        buf.extend(encoded)
        urgent = any(marker in encoded for marker in urgent_markers)
        if urgent or len(buf) >= max_bytes or time.monotonic() - last_flush >= flush_interval:
            _flush()
    _flush()
def log_message(self, fmt, *args): def log_message(self, fmt, *args):
msg = fmt % args if args else fmt msg = fmt % args if args else fmt
print(f"[translate-proxy] {BACKEND} {msg}", file=sys.stderr) print(f"[translate-proxy] {BACKEND} {msg}", file=sys.stderr)
if __name__ == "__main__": def main():
class ReusableHTTPServer(http.server.HTTPServer): global SERVER
_init_runtime()
signal.signal(signal.SIGTERM, _handle_shutdown_signal)
signal.signal(signal.SIGINT, _handle_shutdown_signal)
try:
from http.server import ThreadingHTTPServer as _BaseSrv
except ImportError:
class _BaseSrv(socketserver.ThreadingMixIn, http.server.HTTPServer):
daemon_threads = True
class ReusableHTTPServer(_BaseSrv):
allow_reuse_address = True allow_reuse_address = True
server = ReusableHTTPServer(("127.0.0.1", PORT), Handler) daemon_threads = True
request_queue_size = 64
SERVER = ReusableHTTPServer(("127.0.0.1", PORT), Handler)
print(f"translate-proxy ({BACKEND}) listening on http://127.0.0.1:{PORT}", flush=True) print(f"translate-proxy ({BACKEND}) listening on http://127.0.0.1:{PORT}", flush=True)
print(f"Target: {TARGET_URL}", flush=True) print(f"Target: {TARGET_URL}", flush=True)
print(f"Models: {[m['id'] for m in MODELS]}", flush=True) print(f"Models: {[m['id'] for m in MODELS]}", flush=True)
if BGP_ROUTES: if BGP_ROUTES:
print(f"BGP routes: {len(BGP_ROUTES)} ({[r.get('name','?') for r in BGP_ROUTES]})", flush=True) print(f"BGP routes: {len(BGP_ROUTES)} ({[r.get('name','?') for r in BGP_ROUTES]})", flush=True)
server.serve_forever() try:
SERVER.serve_forever()
finally:
_flush_stats()
if __name__ == "__main__":
main()