#!/usr/bin/env python3 """Codex Launcher GUI — manage endpoints, launch Desktop or CLI with any provider.""" import gi gi.require_version("Gtk", "3.0") from gi.repository import Gtk, GLib import subprocess, os, signal, sys, threading, time, json, urllib.request, urllib.parse, urllib.error, tempfile, shutil import hashlib, socket, ssl, contextlib, re import base64, secrets from pathlib import Path HOME = Path.home() START_SH = Path("/opt/codex-desktop/start.sh") CONFIG = HOME / ".codex/config.toml" CONFIG_BAK = HOME / ".codex/config.toml.launcher-bak" CLEANUP = HOME / ".local/bin/cleanup-codex-stale.sh" PROXY = HOME / ".local/bin/translate-proxy.py" ENDPOINTS_FILE = HOME / ".codex/endpoints.json" BGP_POOLS_FILE = HOME / ".codex/bgp-pools.json" LOG_DIR = HOME / ".cache/codex-desktop" LAUNCH_LOG = LOG_DIR / "launcher.log" PROXY_CONFIG_DIR = HOME / ".cache/codex-proxy" DEFAULT_CONFIG = """model = "" model_provider = "" model_catalog_json = "" """ CHANGELOG = [ ("2.6.1", "2026-05-20", [ "Google OAuth rebuilt to emulate Gemini CLI — no client_secret.json needed", "Uses Google's public OAuth client_id (same as gemini-cli)", "PKCE + CSRF state protection for secure auth", "Just click OAuth Login → browser opens → authorize → done", "Includes cloud-platform scope for Gemini Code Assist compatibility", ]), ("2.6.0", "2026-05-20", [ "Usage Dashboard — per-provider request/token/latency tracking", "Visual cards with success rate bars, model breakdown, error tracking", "Google OAuth: browse for client_secret.json instead of fixed path", ]), ("2.5.1", "2026-05-20", [ "Adaptive retry for 429/502/503 errors with exponential backoff", "BGP routes also retry transient errors before failing over", "Proxy socket reuse — no more 'Address already in use' crashes", "BGP route count shown at proxy startup", ]), ("2.5.0", "2026-05-20", [ "AI BGP — multi-provider routing with automatic failover", "Create BGP pools with ordered routes from any configured endpoint", "Each route uses its own endpoint URL, API key, and model", "Failover strategy: tries primary, falls back on error/timeout", "BGP pools appear in endpoint dropdown with shuffle icon", "Up/down reordering for route priority in pool editor", "Fixed TOML config breakage from multi-line paste in fields", ]), ("2.4.0", "2026-05-20", [ "Added OpenAdapter provider preset (api.openadapter.in)", "One API key access to 40+ models — GLM, DeepSeek, Kimi, Qwen, Claude, GPT, Gemini", "Fixed Add/Edit dialog crash (missing _on_reasoning_toggled method)", "Redesigned Google OAuth flow with live status dialog", ]), ("2.3.2", "2026-05-20", [ "Added Google Gemini provider with OAuth support", "Two presets: 'Google Gemini (API Key)' and 'Google Gemini (OAuth)'", "OAuth Login button in endpoint editor — full Google OAuth2 flow with auto-refresh", "Auto-refreshes OAuth access tokens when expired (no manual re-login needed)", "Supports gemini-2.5-flash, gemini-2.5-pro, gemini-2.0-flash, and more", "Uses Gemini's OpenAI-compatible endpoint — works with existing proxy", ]), ("2.3.0", "2026-05-20", [ "Adaptive Crof self-healing system — auto-adjusts to Crof model limits", "Tracks per-model success/failure history, learns item count limits dynamically", "Proactively compacts input when above learned limit before sending to Crof", "Auto-retries on finish_reason=length — aggressively compacts and resends", "Prevents 'stream disconnected' and 'incomplete' errors on long conversations", ]), ("2.2.1", "2026-05-20", [ "Fixed compaction orphaning function_call_output items — root cause of Crof incomplete responses", "Compaction now respects function_call/function_call_output pairs — no more dangling tool results", "Fixed reasoning control: reasoning_effort=none now always sends enable_thinking=false too", ]), ("2.2.0", "2026-05-20", [ "Added per-provider Reasoning On/Off toggle in endpoint editor", "Added Reasoning Effort level per provider: None, Minimal, Low, Medium, High, Max", "When reasoning is OFF: sends enable_thinking=false + reasoning_effort=none to upstream API", "When reasoning is ON: sends user-selected effort level (default: Medium)", "Fixes Crof mimo-v2.5-pro and similar reasoning models exhausting output tokens", "Strip reasoning_content from proxy output — Codex doesn't use it", "Force max_tokens=64000 minimum for openai-compat providers", ]), ("2.1.3", "2026-05-19", [ "Fixed Crof mimo-v2.5-pro stopping: reasoning_content exhausted all output tokens", "Strip reasoning_content from proxy output — Codex doesn't use it, avoids token waste", "Force max_tokens=64000 minimum for openai-compat providers — gives models room for both reasoning and content", ]), ("2.1.2", "2026-05-19", [ "Fixed Crof.ai and providers stopping after first tool call (root cause: None tool IDs)", "Codex sends function_call items with id=None — proxy now matches tool results to calls by position", "Fixed orphan message output item when response has only tool calls (no text)", "Auto-trims long conversations (>30 items) to prevent context overflow on providers like Crof", "Added request/response logging to ~/.cache/codex-proxy/requests.log", ]), ("2.1.1", "2026-05-19", [ "Fixed proxy: map 'developer' role to 'system' for Chat Completions providers", "Fixed proxy: map 'developer' role to 'user' for Anthropic providers", "Forward 'instructions' field from Responses API as system message/param", "Fixes DeepSeek and other providers rejecting unknown 'developer' role", ]), ("2.1.0", "2026-05-19", [ "Added Codex auth status detection (codex login status)", "Added Re-login button to re-authenticate via codex login", "Auto-checks auth before launching Codex Default mode", "Warns if OAuth token expired or missing before launch", ]), ("2.0.1", "2026-05-19", [ "Added Codex CLI/Desktop installation verifier to main page", "Disables Desktop/CLI launch buttons when corresponding tool is missing", "Shows install instructions in status area on startup", ]), ("2.0.0", "2026-05-19", [ "Initial release: multi-provider Codex Launcher", "Translation proxy: Responses API to Chat Completions + Anthropic Messages", "GTK endpoint manager with 10+ provider presets", "Codex Default mode (built-in OAuth, zero config)", "Browser UA injection for Cloudflare-protected providers (OpenCode)", "Streaming SSE, tool calls, reasoning content support", "Profile backup/import, model auto-fetch, bulk import", "Refresh Models in background thread", "URL normalization to prevent double-path bugs", "Config backup/restore around sessions", ".deb installer package", ]), ] PROVIDER_PRESETS = { "Custom": { "backend_type": "openai-compat", "base_url": "", "models": [], }, "OpenAI": { "backend_type": "native", "base_url": "https://api.openai.com/v1", "models": ["gpt-4o", "gpt-4o-mini"], }, "Anthropic": { "backend_type": "anthropic", "base_url": "https://api.anthropic.com/v1", "models": ["claude-sonnet-4-5", "claude-3-5-haiku-latest"], }, "OpenCode Zen (OpenAI-compatible)": { "backend_type": "openai-compat", "base_url": "https://opencode.ai/zen/v1", "models": [ "glm-5.1", "glm-5", "kimi-k2.5", "kimi-k2.6", "minimax-m2.7", "minimax-m2.5", "minimax-m2.5-free", "deepseek-v4-flash-free", "nemotron-3-super-free", "qwen3.6-plus", "qwen3.5-plus", "big-pickle", ], }, "OpenCode Zen (Anthropic)": { "backend_type": "anthropic", "base_url": "https://opencode.ai/zen/v1", "models": [ "claude-opus-4-7", "claude-opus-4-6", "claude-opus-4-5", "claude-opus-4-1", "claude-sonnet-4-6", "claude-sonnet-4-5", "claude-sonnet-4", "claude-haiku-4-5", "claude-3-5-haiku", ], }, "OpenCode Go (OpenAI-compatible)": { "backend_type": "openai-compat", "base_url": "https://opencode.ai/zen/go/v1", "models": [ "glm-5.1", "glm-5", "kimi-k2.5", "kimi-k2.6", "mimo-v2.5", "mimo-v2.5-pro", "minimax-m2.7", "minimax-m2.5", "qwen3.6-plus", "qwen3.5-plus", "deepseek-v4-pro", "deepseek-v4-flash", ], }, "OpenCode Go (Anthropic)": { "backend_type": "anthropic", "base_url": "https://opencode.ai/zen/go/v1", "models": ["minimax-m2.7", "minimax-m2.5"], }, "Crof.ai": { "backend_type": "openai-compat", "base_url": "https://crof.ai/v1", "models": [], }, "NVIDIA NIM": { "backend_type": "openai-compat", "base_url": "https://integrate.api.nvidia.com/v1", "models": [], }, "Kilo.ai Gateway": { "backend_type": "openai-compat", "base_url": "https://api.kilo.ai/api/gateway", "models": [], }, "Command Code": { "backend_type": "command-code", "base_url": "https://api.commandcode.ai", "cc_version": "0.26.8", "models": [ "deepseek/deepseek-v4-flash", "deepseek/deepseek-v4-pro", "anthropic:claude-sonnet-4-6", "anthropic:claude-haiku-4-5-20251001", "anthropic:claude-opus-4-7", "anthropic:claude-opus-4-6", "openai:gpt-5.5", "openai:gpt-5.4", "openai:gpt-5.4-mini", "openai:gpt-5.3-codex", "moonshotai/Kimi-K2.6", "moonshotai/Kimi-K2.5", "zai-org/GLM-5.1", "zai-org/GLM-5", "MiniMaxAI/MiniMax-M2.7", "MiniMaxAI/MiniMax-M2.5", "Qwen/Qwen3.6-Max-Preview", "Qwen/Qwen3.6-Plus", "stepfun/Step-3.5-Flash", "google/gemini-3.1-flash-lite", ], }, "OpenRouter": { "backend_type": "openai-compat", "base_url": "https://openrouter.ai/api/v1", "models": [], }, "Google Gemini (API Key)": { "backend_type": "openai-compat", "base_url": "https://generativelanguage.googleapis.com/v1beta/openai", "models": [ "gemini-2.5-flash", "gemini-2.5-pro", "gemini-2.0-flash", "gemini-2.0-flash-lite", "gemini-2.5-flash-preview-native-audio-dialog", ], }, "Google Gemini (OAuth)": { "backend_type": "gemini-oauth-cli", "base_url": "https://cloudcode-pa.googleapis.com", "oauth_provider": "google-cli", "models": [ "gemini-2.5-flash", "gemini-2.5-pro", ], }, "Google Antigravity (OAuth)": { "backend_type": "gemini-oauth-antigravity", "base_url": "https://daily-cloudcode-pa.sandbox.googleapis.com", "oauth_provider": "google-antigravity", "models": [ "antigravity-gemini-3-flash", "antigravity-gemini-3-pro", "antigravity-gemini-3.1-pro", "antigravity-claude-sonnet-4-6", "antigravity-claude-opus-4-6-thinking", "gemini-2.5-flash", "gemini-2.5-pro", "gemini-3-flash-preview", "gemini-3-pro-preview", "gemini-3.1-pro-preview", ], }, "OpenAdapter": { "backend_type": "openai-compat", "base_url": "https://api.openadapter.in/v1", "models": [ "0G-DeepSeek-V3", "0G-DeepSeek-v4-Pro", "0G-GLM-5", "0G-GLM-5.1", "0G-Qwen3.6", "0G-Qwen-VL", ], }, "Z.ai Coding": { "backend_type": "openai-compat", "base_url": "https://api.z.ai/api/coding/paas/v4", "models": [ "glm-5.1", "glm-4.7", "GLM-4-Plus", "GLM-4-Long", "GLM-4-Flash", "GLM-4-FlashX", "GLM-Z1-Flash", ], }, } def safe_name(name): base = "".join(ch if ch.isalnum() or ch in "._-" else "_" for ch in name).strip("._-") or "endpoint" digest = hashlib.sha1(name.encode("utf-8")).hexdigest()[:8] return f"{base}-{digest}" def label_for_backend(backend_type): return { "openai-compat": "OpenAI-compatible", "anthropic": "Anthropic", "command-code": "Command Code", "native": "Native", }.get(backend_type, backend_type) def normalize_model_id(text): value = text.strip().lower() if not value: return "" value = value.replace("/", "-") value = value.replace("+", "plus") value = "".join(ch if ch.isalnum() or ch in ".-" else "-" for ch in value) while "--" in value: value = value.replace("--", "-") return value.strip("-.") def normalize_base_url(url): base = (url or "").strip().rstrip("/") for suffix in ("/chat/completions", "/responses", "/messages"): if base.endswith(suffix): base = base[: -len(suffix)] break return base.rstrip("/") def parse_model_list(text): out = [] seen = set() for raw in text.replace(",", "\n").splitlines(): mid = normalize_model_id(raw) if mid and mid not in seen: seen.add(mid) out.append(mid) return out def apply_provider_preset(endpoint, preset_name): preset = PROVIDER_PRESETS.get(preset_name) if not preset: return endpoint updated = dict(endpoint) updated["provider_preset"] = preset_name updated["backend_type"] = preset["backend_type"] updated["base_url"] = normalize_base_url(preset["base_url"]) if preset.get("cc_version") and not updated.get("cc_version"): updated["cc_version"] = preset["cc_version"] if not updated.get("models") or (preset.get("backend_type") or "").startswith("gemini-oauth"): updated["models"] = list(preset.get("models", [])) if preset.get("oauth_provider"): updated["oauth_provider"] = preset["oauth_provider"] if not updated.get("default_model") and updated.get("models"): updated["default_model"] = updated["models"][0] return updated def _doctor_check_streaming(base_url, key, bt, model, add): if bt == "anthropic": test_url = f"{base_url}/v1/messages" headers = {"x-api-key": key, "anthropic-version": "2023-06-01", "content-type": "application/json"} body = json.dumps({"model": model or "claude-3-5-haiku-20241022", "max_tokens": 1, "stream": True, "messages": [{"role": "user", "content": "hi"}]}).encode() else: test_url = f"{base_url}/chat/completions" headers = {"Authorization": f"Bearer {key}", "content-type": "application/json"} body = json.dumps({"model": model, "max_tokens": 1, "stream": True, "messages": [{"role": "user", "content": "hi"}]}).encode() try: req = urllib.request.Request(test_url, data=body, headers=headers, method="POST") t0 = time.time() resp = urllib.request.urlopen(req, timeout=20) content_type = resp.headers.get("content-type", "") first_chunk = resp.read(512) lat = (time.time() - t0) * 1000 is_sse = "text/event-stream" in content_type or first_chunk.startswith(b"data:") if is_sse: add("Streaming support", True, f"SSE OK in {lat:.0f}ms") else: add("Streaming support", False, f"Expected SSE, got {content_type[:60]}") except urllib.error.HTTPError as e: body_text = "" try: body_text = e.read(200).decode(errors="replace") except Exception: pass if e.code == 429: add("Streaming support", None, "Rate limited (skipped)") elif e.code in (400, 404, 422): add("Streaming support", False, f"HTTP {e.code}: {body_text[:80]}") else: add("Streaming support", False, f"HTTP {e.code}") except Exception as e: add("Streaming support", False, str(e)[:100]) def _doctor_check_toolcall(base_url, key, bt, model, add): tool = {"type": "function", "function": {"name": "test_tool", "parameters": {"type": "object", "properties": {"x": {"type": "string"}}}}} if bt == "anthropic": test_url = f"{base_url}/v1/messages" headers = {"x-api-key": key, "anthropic-version": "2023-06-01", "content-type": "application/json"} body = json.dumps({"model": model or "claude-3-5-haiku-20241022", "max_tokens": 50, "stream": False, "tools": [tool], "messages": [{"role": "user", "content": "Use the test_tool with x=hello"}]}).encode() else: test_url = f"{base_url}/chat/completions" headers = {"Authorization": f"Bearer {key}", "content-type": "application/json"} body = json.dumps({"model": model, "max_tokens": 50, "stream": False, "tools": [tool], "messages": [{"role": "user", "content": "Use the test_tool with x=hello"}]}).encode() try: req = urllib.request.Request(test_url, data=body, headers=headers, method="POST") t0 = time.time() resp = urllib.request.urlopen(req, timeout=30) raw = resp.read() lat = (time.time() - t0) * 1000 payload = json.loads(raw) has_tools = False if bt == "anthropic": for block in (payload.get("content") or []): if block.get("type") == "tool_use": has_tools = True break else: choices = payload.get("choices") or [] for ch in choices: if (ch.get("message", {}).get("tool_calls")): has_tools = True break if has_tools: add("Tool-call support", True, f"Tool call received in {lat:.0f}ms") else: add("Tool-call support", None, f"Responded but no tool_call ({lat:.0f}ms)") except urllib.error.HTTPError as e: if e.code == 429: add("Tool-call support", None, "Rate limited (skipped)") elif e.code in (400, 404, 422): err_body = "" try: err_body = e.read(200).decode(errors="replace") except Exception: pass add("Tool-call support", False, f"HTTP {e.code}: {err_body[:80]}") else: add("Tool-call support", False, f"HTTP {e.code}") except Exception as e: add("Tool-call support", False, str(e)[:100]) def run_endpoint_doctor(endpoint): """Comprehensive health checks for an endpoint. Returns [(name, ok, detail), ...]. ok: True=pass, False=fail, None=warn/skip.""" checks = [] def add(name, ok, detail=""): checks.append((name, ok, detail)) url = normalize_base_url(endpoint.get("base_url") or "") key = (endpoint.get("api_key") or "").strip() bt = endpoint.get("backend_type", "openai-compat") model = endpoint.get("default_model") or endpoint.get("models", [""])[0] if endpoint.get("models") else "" # 1. URL format parsed = urllib.parse.urlparse(url) has_url = bool(parsed.scheme and parsed.netloc) add("URL format", has_url, url if has_url else "Missing scheme or host") if not has_url: return checks host = parsed.hostname port = parsed.port or (443 if parsed.scheme == "https" else 80) # 2. DNS resolution try: t0 = time.time() addrs = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM) dns_ms = (time.time() - t0) * 1000 add("DNS resolution", True, f"{addrs[0][4][0]} ({dns_ms:.0f}ms)") except socket.gaierror as e: add("DNS resolution", False, str(e)) return checks # 3. TCP/TLS connection try: t0 = time.time() sock = socket.create_connection((host, port), timeout=10) tcp_ms = (time.time() - t0) * 1000 if parsed.scheme == "https": ctx = ssl.create_default_context() try: ssock = ctx.wrap_socket(sock, server_hostname=host) tls_ms = (time.time() - t0) * 1000 add("TLS connection", True, f"TCP {tcp_ms:.0f}ms + handshake {tls_ms:.0f}ms") ssock.close() except ssl.SSLError as e: add("TLS certificate", False, str(e)[:120]) sock.close() return checks else: add("TCP connection", True, f"{tcp_ms:.0f}ms") sock.close() except (socket.timeout, ConnectionRefusedError, OSError) as e: add("TCP connection", False, str(e)[:100]) return checks # 4. Auth + /models (backend-aware) if bt == "anthropic": add("/models endpoint", None, "Anthropic has no /models endpoint — testing via /messages") try: t0 = time.time() msg_url = f"{url}/v1/messages" body = json.dumps({"model": model or "claude-3-5-haiku-20241022", "max_tokens": 1, "messages": [{"role": "user", "content": "hi"}]}).encode() req = urllib.request.Request(msg_url, data=body, headers={ "x-api-key": key, "anthropic-version": "2023-06-01", "content-type": "application/json", }, method="POST") urllib.request.urlopen(req, timeout=15) lat = (time.time() - t0) * 1000 add("Auth valid", True, f"Responded in {lat:.0f}ms") except urllib.error.HTTPError as e: if e.code in (401, 403): add("Auth valid", False, f"HTTP {e.code} — check API key") elif e.code == 400: add("Auth valid", True, "Authenticated (model or param error)") else: add("Auth valid", False, f"HTTP {e.code}") except Exception as e: add("Auth valid", False, str(e)[:100]) elif bt.startswith("gemini-oauth"): token_name = "google-antigravity-oauth-token.json" if "antigravity" in bt else "google-cli-oauth-token.json" token_path = Path.home() / f".cache/codex-proxy/{token_name}" if token_path.exists(): try: td = json.loads(token_path.read_text()) exp = td.get("expires_at", 0) if exp > time.time(): remaining = exp - time.time() add("OAuth token", True, f"Valid ({remaining / 60:.0f} min remaining)") else: add("OAuth token", False, "Token expired — re-login required") except Exception as e: add("OAuth token", False, str(e)[:80]) else: add("OAuth token", False, f"No token file ({token_name})") try: t0 = time.time() ids, err = fetch_models_for_endpoint(endpoint) lat = (time.time() - t0) * 1000 if ids: add("Network reachable", True, f"{lat:.0f}ms") add("/models endpoint", True, f"{len(ids)} models ({lat:.0f}ms)") if model: add("Selected model exists", model in ids, model if model in ids else f"'{model}' not in {ids[:5]}...") elif err and ("401" in str(err) or "403" in str(err)): add("Network reachable", True, f"{lat:.0f}ms") add("Auth valid", False, str(err)[:100]) else: add("Network reachable", False, str(err or "no response")[:100]) except Exception as e: add("Network", False, str(e)[:100]) else: try: t0 = time.time() ids, err = fetch_models_for_endpoint(endpoint) lat = (time.time() - t0) * 1000 if ids: add("Network reachable", True, f"{lat:.0f}ms") add("Auth valid", True) add("/models endpoint", True, f"{len(ids)} models ({lat:.0f}ms)") if model: add("Selected model exists", model in ids, model if model in ids else f"'{model}' not found in {len(ids)} models") else: add("Selected model", False, "No model selected") elif err and ("401" in str(err) or "403" in str(err)): add("Network reachable", True, f"{lat:.0f}ms") add("Auth valid", False, f"HTTP 401/403 — check API key") elif err and "429" in str(err): add("Network reachable", True, f"{lat:.0f}ms") add("Auth valid", True, "Authenticated but rate-limited") add("/models endpoint", None, "Rate limited — skipped") else: add("Network reachable", False, str(err or "no response")[:100]) except Exception as e: add("Network", False, str(e)[:100]) # 5. Streaming smoke test if bt not in ("native", "command-code"): _doctor_check_streaming(url, key, bt, model, add) # 6. Tool-call support test if bt not in ("native", "command-code"): _doctor_check_toolcall(url, key, bt, model, add) return checks def _show_doctor_results(parent, endpoint_name, checks): dlg = Gtk.Dialog(title=f"Doctor: {endpoint_name}", parent=parent, modal=True) dlg.add_button("Close", Gtk.ResponseType.CLOSE) dlg.set_default_size(480, 400) area = dlg.get_content_area() area.set_margin_start(12) area.set_margin_end(12) area.set_margin_top(12) area.set_margin_bottom(12) area.set_spacing(4) passed = sum(1 for _, ok, _ in checks if ok is True) failed = sum(1 for _, ok, _ in checks if ok is False) warned = sum(1 for _, ok, _ in checks if ok is None) hdr = Gtk.Label() hdr.set_markup(f'{endpoint_name} ' f'{passed} passed ' f'{failed} failed ' f'{warned} warnings') area.pack_start(hdr, False, False, 6) sep = Gtk.Separator() area.pack_start(sep, False, False, 4) for name, ok, detail in checks: row = Gtk.Box(spacing=6) if ok is True: color, sym = "#27ae60", "\u2713" elif ok is False: color, sym = "#e74c3c", "\u2717" else: color, sym = "#f39c12", "\u25CB" icon = Gtk.Label() icon.set_markup(f'{sym}') row.pack_start(icon, False, False, 0) lbl = Gtk.Label() lbl.set_markup(f'{name}') row.pack_start(lbl, False, False, 0) if detail: det = Gtk.Label() det.set_markup(f'{detail}') det.set_line_wrap(True) row.pack_end(det, False, False, 0) area.pack_start(row, False, False, 2) dlg.show_all() dlg.run() dlg.destroy() def endpoint_models_url(endpoint): base = normalize_base_url(endpoint.get("base_url") or "") if not base: return "" return f"{base}/models" def endpoint_model_headers(endpoint): key = (endpoint.get("api_key") or "").strip() backend = endpoint.get("backend_type", "openai-compat") headers = {} if backend == "anthropic": if key: headers["x-api-key"] = key headers["anthropic-version"] = "2023-06-01" elif key: headers["Authorization"] = f"Bearer {key}" return headers def fetch_models_for_endpoint(endpoint, timeout=10): url = endpoint_models_url(endpoint) if not url: return None, "Base URL is empty" try: req = urllib.request.Request(url, headers=endpoint_model_headers(endpoint)) raw = urllib.request.urlopen(req, timeout=timeout).read() payload = json.loads(raw) items = payload.get("data") or payload.get("models") or [] ids = [] seen = set() for item in items: mid = item.get("id") if isinstance(item, dict) else None if mid and mid not in seen: seen.add(mid) ids.append(mid) if not ids: return None, "No models returned" return ids, None except Exception as e: return None, str(e) def refresh_endpoint_models(endpoint): ids, err = fetch_models_for_endpoint(endpoint) if not ids: return None, err updated = dict(endpoint) updated["models"] = ids if updated.get("default_model") not in ids: updated["default_model"] = ids[0] return updated, None # ═══════════════════════════════════════════════════════════════════ # Endpoint storage # ═══════════════════════════════════════════════════════════════════ def load_endpoints(): if ENDPOINTS_FILE.exists(): try: return json.loads(ENDPOINTS_FILE.read_text()) except Exception: pass return {"default": None, "endpoints": []} def save_endpoints(data): ENDPOINTS_FILE.parent.mkdir(parents=True, exist_ok=True) ENDPOINTS_FILE.write_text(json.dumps(data, indent=2)) def load_bgp_pools(): if BGP_POOLS_FILE.exists(): try: return json.loads(BGP_POOLS_FILE.read_text()) except Exception: pass return {"pools": []} def save_bgp_pools(data): BGP_POOLS_FILE.parent.mkdir(parents=True, exist_ok=True) BGP_POOLS_FILE.write_text(json.dumps(data, indent=2)) def get_endpoint(name): for e in load_endpoints()["endpoints"]: if e["name"] == name: return e return None def now_utc_iso(): return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) def build_profile_bundle(): return { "version": 1, "exported_at": now_utc_iso(), "endpoints": load_endpoints(), "codex_config_toml": CONFIG.read_text(encoding="utf-8") if CONFIG.exists() else "", } def save_profile_bundle(path): bundle = build_profile_bundle() Path(path).write_text(json.dumps(bundle, indent=2), encoding="utf-8") def import_profile_bundle(path): data = json.loads(Path(path).read_text(encoding="utf-8")) if not isinstance(data, dict): raise ValueError("Invalid profile bundle") endpoints = data.get("endpoints") if not isinstance(endpoints, dict) or "endpoints" not in endpoints: raise ValueError("Profile bundle missing endpoints") # Keep a local rollback point before overwriting the current profile. if CONFIG.exists(): shutil.copy2(str(CONFIG), str(CONFIG_BAK)) if ENDPOINTS_FILE.exists(): shutil.copy2(str(ENDPOINTS_FILE), str(ENDPOINTS_FILE.with_suffix(".json.import-bak"))) save_endpoints(endpoints) cfg = data.get("codex_config_toml", "") if isinstance(cfg, str) and cfg.strip(): CONFIG.parent.mkdir(parents=True, exist_ok=True) CONFIG.write_text(cfg, encoding="utf-8") return endpoints # ═══════════════════════════════════════════════════════════════════ # Config management # ═══════════════════════════════════════════════════════════════════ def backup_config(): if CONFIG.exists(): tmp = CONFIG_BAK.with_suffix(".tmp") shutil.copy2(str(CONFIG), str(tmp)) os.replace(str(tmp), str(CONFIG_BAK)) def restore_config(): if CONFIG_BAK.exists(): tmp = CONFIG.with_suffix(".tmp") shutil.copy2(str(CONFIG_BAK), str(tmp)) os.replace(str(tmp), str(CONFIG)) def write_secure_text(path, text): path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(path.suffix + ".tmp") tmp.write_text(text, encoding="utf-8") os.chmod(str(tmp), 0o600) os.replace(str(tmp), str(path)) CONFIG_TXN = HOME / ".codex/config.toml.launcher-txn.json" def begin_config_transaction(reason): txn = {"started_at": time.time(), "reason": reason, "config_existed": CONFIG.exists(), "backup_path": str(CONFIG_BAK)} if CONFIG.exists(): backup_config() CONFIG_TXN.parent.mkdir(parents=True, exist_ok=True) CONFIG_TXN.write_text(json.dumps(txn, indent=2)) def end_config_transaction(): CONFIG_TXN.unlink(missing_ok=True) def recover_config_if_needed(logfn=None): if not CONFIG_TXN.exists(): return try: txn = json.loads(CONFIG_TXN.read_text()) if txn.get("config_existed") and CONFIG_BAK.exists(): restore_config() if logfn: logfn("Recovered Codex config from interrupted session.") elif CONFIG.exists(): CONFIG.unlink() if logfn: logfn("Removed generated config from interrupted session.") finally: CONFIG_TXN.unlink(missing_ok=True) def write_config_for_native(endpoint, selected_model): """Write config for native OpenAI (no proxy needed).""" backup_config() model_catalog = _gen_model_catalog(endpoint, selected_model) mc_path = PROXY_CONFIG_DIR / f"models-{safe_name(endpoint['name'])}.json" mc_path.parent.mkdir(parents=True, exist_ok=True) mc_path.write_text(json.dumps(model_catalog, indent=2)) lines = [ f'model = "{_toml_safe(selected_model)}"\n', f'model_provider = "{_toml_safe(endpoint["name"])}"\n', f'model_catalog_json = "{mc_path}"\n', f'\n[model_providers."{endpoint["name"]}"]\n', f'name = "{_toml_safe(endpoint["name"])}"\n', f'base_url = "{_toml_safe(endpoint["base_url"])}"\n', f'experimental_bearer_token = "{_toml_safe(_resolve_secret(endpoint["api_key"]))}"\n', f'\n[profiles."{endpoint["name"]}"]\n', f'model_provider = "{_toml_safe(endpoint["name"])}"\n', f'model = "{_toml_safe(selected_model)}"\n', f'model_catalog_json = "{mc_path}"\n', f'service_tier = "default"\n', f'approvals_reviewer = "user"\n', ] write_secure_text(CONFIG, "".join(lines)) def _toml_safe(val): val = str(val).replace('"', '\\"') return val.split('\n', 1)[0].strip() def _resolve_secret(value): value = (value or "").strip() m = re.fullmatch(r"\$\{ENV:([A-Z0-9_]+)\}", value) if m: return os.environ.get(m.group(1), "") return value def write_config_for_translated(endpoint, selected_model, proxy_port=8080): backup_config() model_catalog = _gen_model_catalog(endpoint, selected_model) mc_path = PROXY_CONFIG_DIR / f"models-{safe_name(endpoint['name'])}.json" mc_path.parent.mkdir(parents=True, exist_ok=True) mc_path.write_text(json.dumps(model_catalog, indent=2)) lines = [ f'model = "{_toml_safe(selected_model)}"\n', f'model_provider = "{_toml_safe(endpoint["name"])}"\n', f'model_catalog_json = "{mc_path}"\n', f'\n[model_providers."{endpoint["name"]}"]\n', f'name = "{_toml_safe(endpoint["name"])}"\n', f'base_url = "http://127.0.0.1:{proxy_port}"\n', f'experimental_bearer_token = "codex-launcher-local"\n', f'\n[profiles."{endpoint["name"]}"]\n', f'model_provider = "{_toml_safe(endpoint["name"])}"\n', f'model = "{_toml_safe(selected_model)}"\n', f'model_catalog_json = "{mc_path}"\n', f'service_tier = "fast"\n', f'approvals_reviewer = "user"\n', ] write_secure_text(CONFIG, "".join(lines)) def _gen_model_catalog(endpoint, selected_model=None): default_model = selected_model or endpoint.get("default_model") models = [] for mid in endpoint.get("models", []): models.append({ "slug": mid, "model": mid, "display_name": mid, "description": f"{endpoint['name']} {mid}", "hidden": False, "isDefault": mid == default_model, "shell_type": "shell_command", "visibility": "list", "default_reasoning_level": "medium", "supported_reasoning_levels": [ {"effort": "low", "description": "Fast"}, {"effort": "medium", "description": "Balanced"}, {"effort": "high", "description": "Deep"}, {"effort": "xhigh", "description": "Extra deep"}, ], "supportedReasoningEfforts": [ {"reasoningEffort": "low", "description": "Fast"}, {"reasoningEffort": "medium", "description": "Balanced"}, {"reasoningEffort": "high", "description": "Deep"}, {"reasoningEffort": "xhigh", "description": "Extra deep"}, ], "priority": 30, "context_size": 128000, "additional_speed_tiers": [], "service_tiers": [], "supports_reasoning_summaries": True, "support_verbosity": True, "reasoning": True, "tool_call": True, "supports_parallel_tool_calls": True, "experimental_supported_tools": [], "supported_in_api": True, "truncation_policy": {"mode": "tokens", "limit": 128000}, "base_instructions": "You are Codex, a coding agent.", }) return {"models": models} # ═══════════════════════════════════════════════════════════════════ # Proxy management # ═══════════════════════════════════════════════════════════════════ _proxy_proc = None _proxy_port = None PID_REGISTRY = HOME / ".cache" / "codex-launcher" / "pids.json" def _pick_free_port(): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("127.0.0.1", 0)) return s.getsockname()[1] def _load_pid_registry(): if PID_REGISTRY.exists(): try: return json.loads(PID_REGISTRY.read_text()) except Exception: pass return {} def _save_pid_registry(data): PID_REGISTRY.parent.mkdir(parents=True, exist_ok=True) tmp = PID_REGISTRY.with_suffix(".tmp") tmp.write_text(json.dumps(data, indent=2)) os.replace(str(tmp), str(PID_REGISTRY)) def _register_pgid(kind, pid): data = _load_pid_registry() try: pgid = os.getpgid(pid) except ProcessLookupError: return data[kind] = {"pid": pid, "pgid": pgid, "ts": time.time()} _save_pid_registry(data) def safe_cleanup_owned(logfn=None): data = _load_pid_registry() changed = False for kind, meta in list(data.items()): pgid = meta.get("pgid") if not pgid: continue try: os.killpg(pgid, signal.SIGTERM) if logfn: logfn(f"Stopped {kind} (pgid {pgid})") changed = True except ProcessLookupError: changed = True except Exception as e: if logfn: logfn(f"Could not stop {kind}: {e}") if changed: _save_pid_registry({}) def _start_proxy_for(endpoint, logfn): global _proxy_proc, _proxy_port _stop_proxy() port = _pick_free_port() _proxy_port = port model_list = endpoint.get("models", []) if (endpoint.get("backend_type") or "").startswith("gemini-oauth") and (endpoint.get("oauth_provider") or "").startswith("google"): token_name = "google-antigravity-oauth-token.json" if endpoint.get("oauth_provider") == "google-antigravity" else "google-cli-oauth-token.json" token_path = os.path.expanduser(f"~/.cache/codex-proxy/{token_name}") try: with open(token_path) as tf: td = json.load(tf) discovered = [] if endpoint.get("oauth_provider") == "google-antigravity" else td.get("available_models", []) if discovered: model_list = discovered except Exception: pass pcfg = { "port": port, "backend_type": endpoint["backend_type"], "target_url": normalize_base_url(endpoint["base_url"]), "api_key": endpoint["api_key"], "cc_version": endpoint.get("cc_version", ""), "oauth_provider": endpoint.get("oauth_provider", ""), "reasoning_enabled": endpoint.get("reasoning_enabled", True), "reasoning_effort": endpoint.get("reasoning_effort", "medium"), "models": [{"id": m, "object": "model", "created": 1700000000, "owned_by": endpoint["name"]} for m in model_list], } pcfg_path = PROXY_CONFIG_DIR / f"proxy-{safe_name(endpoint['name'])}-{port}.json" pcfg_path.parent.mkdir(parents=True, exist_ok=True) pcfg_path.write_text(json.dumps(pcfg, indent=2)) _start_proxy_with_config(pcfg_path, port, logfn) return port def _start_proxy_with_config(pcfg_path, port, logfn): global _proxy_proc _proxy_proc = subprocess.Popen( ["python3", str(PROXY), "--config", str(pcfg_path)], stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, preexec_fn=os.setsid, text=True, ) _register_pgid("proxy", _proxy_proc.pid) def _pipe_stderr(): if not _proxy_proc.stderr: return for line in _proxy_proc.stderr: GLib.idle_add(logfn, f"[proxy] {line.rstrip()}") threading.Thread(target=_pipe_stderr, daemon=True).start() deadline = time.time() + 15 last_err = None while time.time() < deadline: if _proxy_proc.poll() is not None: raise RuntimeError(f"Proxy exited early with code {_proxy_proc.returncode}") try: urllib.request.urlopen(f"http://127.0.0.1:{port}/v1/models", timeout=2) logfn(f"Proxy ready on port {port}") return except Exception as e: last_err = e time.sleep(0.3) try: os.killpg(os.getpgid(_proxy_proc.pid), signal.SIGTERM) _proxy_proc.wait(timeout=3) except Exception: with contextlib.suppress(Exception): os.killpg(os.getpgid(_proxy_proc.pid), signal.SIGKILL) raise RuntimeError(f"Proxy failed health check on port {port}: {last_err}") def _stop_proxy(): global _proxy_proc if _proxy_proc and _proxy_proc.poll() is None: try: os.killpg(os.getpgid(_proxy_proc.pid), signal.SIGTERM) time.sleep(0.5) if _proxy_proc.poll() is None: os.killpg(os.getpgid(_proxy_proc.pid), signal.SIGKILL) except (ProcessLookupError, PermissionError): pass _proxy_proc = None def _kill_existing_desktop(logfn=None): import subprocess as _sp try: out = _sp.run(["pgrep", "-f", "/opt/codex-desktop/electron"], capture_output=True, text=True, timeout=5) pids = [p for p in out.stdout.strip().splitlines() if p.strip().isdigit()] if not pids: return main_pid = int(pids[0]) pgid = os.getpgid(main_pid) if pgid > 0: os.killpg(pgid, signal.SIGTERM) if logfn: logfn(f"Killed existing Codex Desktop (pid {main_pid}, pgid {pgid})") time.sleep(2) try: os.killpg(pgid, signal.SIGKILL) except (ProcessLookupError, PermissionError): pass except Exception as e: if logfn: logfn(f"Note: could not kill existing Desktop: {e}") def _run_cleanup(logfn=None): safe_cleanup_owned(logfn) def _last_log_lines(n=15): try: t = LAUNCH_LOG.read_text() return "\n".join(t.splitlines()[-n:]) except Exception: return "(no log file)" def _detect_codex_cli(): try: path = shutil.which("codex") if not path: return None out = subprocess.run(["codex", "--version"], capture_output=True, text=True, timeout=5) ver = (out.stdout or "").strip() or (out.stderr or "").strip() or "unknown" return (path, ver) except Exception: return None def _detect_codex_desktop(): if START_SH.exists(): return str(START_SH) return None def _check_codex_auth(): try: out = subprocess.run( ["codex", "login", "status"], capture_output=True, text=True, timeout=10, ) text = (out.stdout or "").strip() if not text: text = (out.stderr or "").strip() if out.returncode == 0 and text: return ("logged_in", text) if text: return ("error", text) return ("unknown", "No output from codex login status") except FileNotFoundError: return ("not_installed", "codex not found") except Exception as e: return ("error", str(e)) # ═══════════════════════════════════════════════════════════════════ # Main window # ═══════════════════════════════════════════════════════════════════ class LauncherWin(Gtk.Window): def __init__(self): super().__init__(title="Codex Launcher") self.set_default_size(560, 460) self.set_border_width(12) self.set_position(Gtk.WindowPosition.CENTER) self._proc = None self._endpoints_data = load_endpoints() recover_config_if_needed() vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) self.add(vbox) # header row hdr = Gtk.Box(spacing=8) vbox.pack_start(hdr, False, False, 0) lbl = Gtk.Label(label="Codex Launcher v3.3.0") lbl.set_use_markup(True) hdr.pack_start(lbl, False, False, 0) changelog_btn = Gtk.Button(label="Changelog") changelog_btn.connect("clicked", lambda b: self._show_changelog()) hdr.pack_end(changelog_btn, False, False, 0) history_btn = Gtk.Button(label="History") history_btn.connect("clicked", lambda b: self._open_history()) hdr.pack_end(history_btn, False, False, 0) bench_btn = Gtk.Button(label="Benchmark") bench_btn.connect("clicked", lambda b: self._open_benchmark()) hdr.pack_end(bench_btn, False, False, 0) usage_btn = Gtk.Button(label="Usage") usage_btn.connect("clicked", lambda b: self._open_usage()) hdr.pack_end(usage_btn, False, False, 0) bgp_btn = Gtk.Button(label="AI BGP") bgp_btn.connect("clicked", lambda b: self._open_bgp()) hdr.pack_end(bgp_btn, False, False, 0) mgr_btn = Gtk.Button(label="Manage Endpoints") mgr_btn.connect("clicked", lambda b: self._open_mgr()) hdr.pack_end(mgr_btn, False, False, 0) # verification status bar self._cli_info = _detect_codex_cli() self._desktop_info = _detect_codex_desktop() ver_box = Gtk.Box(spacing=12) vbox.pack_start(ver_box, False, False, 0) if self._cli_info: cli_path, cli_ver = self._cli_info cli_lbl = Gtk.Label() cli_lbl.set_markup(f"✔ Codex CLI {cli_ver} ({cli_path})") cli_lbl.set_use_markup(True) ver_box.pack_start(cli_lbl, False, False, 0) else: cli_lbl = Gtk.Label() cli_lbl.set_markup("✘ Codex CLI — not found") cli_lbl.set_use_markup(True) ver_box.pack_start(cli_lbl, False, False, 0) cli_install_btn = Gtk.Button(label="Install") cli_install_btn.connect("clicked", lambda b: self._show_install_guide("cli")) ver_box.pack_start(cli_install_btn, False, False, 0) ver_box.pack_start(Gtk.Label(label=" "), False, False, 0) if self._desktop_info: desk_lbl = Gtk.Label() desk_lbl.set_markup(f"✔ Codex Desktop ({self._desktop_info})") desk_lbl.set_use_markup(True) ver_box.pack_start(desk_lbl, False, False, 0) else: desk_lbl = Gtk.Label() desk_lbl.set_markup("✘ Codex Desktop — not found") desk_lbl.set_use_markup(True) ver_box.pack_start(desk_lbl, False, False, 0) desk_install_btn = Gtk.Button(label="Install") desk_install_btn.connect("clicked", lambda b: self._show_install_guide("desktop")) ver_box.pack_start(desk_install_btn, False, False, 0) self._missing = [] if not self._cli_info: self._missing.append("cli") if not self._desktop_info: self._missing.append("desktop") auth_box = Gtk.Box(spacing=12) vbox.pack_start(auth_box, False, False, 0) self._auth_label = Gtk.Label() self._auth_label.set_markup("Checking auth…") self._auth_label.set_use_markup(True) self._auth_label.set_ellipsize(3) auth_box.pack_start(self._auth_label, False, False, 0) self._relogin_btn = Gtk.Button(label="Re-login") self._relogin_btn.set_sensitive(False) self._relogin_btn.connect("clicked", lambda b: self._codex_relogin()) auth_box.pack_end(self._relogin_btn, False, False, 0) threading.Thread(target=self._check_auth_async, daemon=True).start() ops_box = Gtk.Box(spacing=8) vbox.pack_start(ops_box, False, False, 0) self._refresh_all_btn = Gtk.Button(label="Refresh Models") self._refresh_all_btn.connect("clicked", lambda b: self._refresh_all_models()) ops_box.pack_start(self._refresh_all_btn, False, False, 0) self._backup_btn = Gtk.Button(label="Backup Profile") self._backup_btn.connect("clicked", lambda b: self._backup_profile()) ops_box.pack_start(self._backup_btn, False, False, 0) self._import_btn = Gtk.Button(label="Import Profile") self._import_btn.connect("clicked", lambda b: self._import_profile()) ops_box.pack_start(self._import_btn, False, False, 0) # endpoint selector sel_box = Gtk.Box(spacing=6) vbox.pack_start(sel_box, False, False, 4) sel_box.pack_start(Gtk.Label(label="Endpoint:"), False, False, 0) self._combo = Gtk.ComboBoxText() self._combo.connect("changed", lambda c: self._on_endpoint_changed()) sel_box.pack_start(self._combo, True, True, 0) # model selector sel_box.pack_start(Gtk.Label(label="Model:"), False, False, 0) self._model_combo = Gtk.ComboBoxText() sel_box.pack_start(self._model_combo, True, True, 0) # launch buttons btn_box = Gtk.Box(spacing=8, homogeneous=True) vbox.pack_start(btn_box, False, False, 8) self._btn_desktop = Gtk.Button(label="Launch Desktop") self._btn_desktop.connect("clicked", lambda b: self._launch("desktop")) if "desktop" in self._missing: self._btn_desktop.set_tooltip_text("Codex Desktop is not installed") self._btn_desktop.set_sensitive(False) btn_box.pack_start(self._btn_desktop, True, True, 0) self._btn_cli = Gtk.Button(label="Launch CLI") self._btn_cli.connect("clicked", lambda b: self._launch("cli")) if "cli" in self._missing: self._btn_cli.set_tooltip_text("Codex CLI is not installed") self._btn_cli.set_sensitive(False) btn_box.pack_start(self._btn_cli, True, True, 0) btn_box2 = Gtk.Box(spacing=8, homogeneous=True) vbox.pack_start(btn_box2, False, False, 0) self._btn_codex_desktop = Gtk.Button(label="Codex Default (Desktop)") self._btn_codex_desktop.connect("clicked", lambda b: self._launch_codex_default("desktop")) if "desktop" in self._missing: self._btn_codex_desktop.set_tooltip_text("Codex Desktop is not installed") self._btn_codex_desktop.set_sensitive(False) btn_box2.pack_start(self._btn_codex_desktop, True, True, 0) self._btn_codex_cli = Gtk.Button(label="Codex Default (CLI)") self._btn_codex_cli.connect("clicked", lambda b: self._launch_codex_default("cli")) if "cli" in self._missing: self._btn_codex_cli.set_tooltip_text("Codex CLI is not installed") self._btn_codex_cli.set_sensitive(False) btn_box2.pack_start(self._btn_codex_cli, True, True, 0) # status sw = Gtk.ScrolledWindow() sw.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC) vbox.pack_start(sw, True, True, 0) self._buf = Gtk.TextBuffer() self._tv = Gtk.TextView(buffer=self._buf) self._tv.set_editable(False) self._tv.set_cursor_visible(False) self._tv.set_wrap_mode(Gtk.WrapMode.WORD_CHAR) sw.add(self._tv) # bottom bar bb = Gtk.Box(spacing=8) vbox.pack_start(bb, False, False, 0) assist_btn = Gtk.Button(label="AI Assistant") assist_btn.get_style_context().add_class("suggested-action") assist_btn.connect("clicked", lambda b: self._open_assistant()) assist_btn.set_tooltip_text("Open AI coding assistant with streaming, tools, and session management") bb.pack_start(assist_btn, False, False, 0) self._kill_btn = Gtk.Button(label="Kill && Cleanup") self._kill_btn.connect("clicked", lambda b: self._kill()) self._kill_btn.set_sensitive(False) bb.pack_start(self._kill_btn, True, True, 0) self._view_log_btn = Gtk.Button(label="View Log") self._view_log_btn.connect("clicked", lambda b: subprocess.Popen(["xdg-open", str(LAUNCH_LOG)])) bb.pack_start(self._view_log_btn, False, False, 0) self._close_btn = Gtk.Button(label="Close") self._close_btn.connect("clicked", lambda b: self._do_close()) bb.pack_start(self._close_btn, False, False, 0) self.show_all() self._rebuild_combo() self._log_dependency_status() # ── helpers ────────────────────────────────────────────────── def log(self, msg): GLib.idle_add(self._append_log, msg) def _append_log(self, msg): e = self._buf.get_end_iter() self._buf.insert(e, msg + "\n") m = self._buf.create_mark(None, e, False) self._tv.scroll_to_mark(m, 0.0, True, 0.0, 0.5) self._buf.delete_mark(m) def _log_dependency_status(self): if self._cli_info: _, ver = self._cli_info self.log(f"✔ Codex CLI detected ({ver})") else: self.log("✘ Codex CLI NOT found — CLI launch disabled. Click 'Install' above.") if self._desktop_info: self.log(f"✔ Codex Desktop detected ({self._desktop_info})") else: self.log("✘ Codex Desktop NOT found — Desktop launch disabled. Click 'Install' above.") if self._missing: self.log("⚠ Install missing tools before using the launcher.") else: self.log("All dependencies OK.") def _check_auth_async(self): status, msg = _check_codex_auth() GLib.idle_add(self._update_auth_status, status, msg) def _update_auth_status(self, status, msg): if status == "logged_in": self._auth_label.set_markup(f"✔ Auth: {msg}") self._relogin_btn.set_sensitive("cli" not in self._missing) elif status == "not_installed": self._auth_label.set_markup("Auth: N/A (CLI not installed)") else: self._auth_label.set_markup(f"⚠ Auth: {msg}") self._relogin_btn.set_sensitive("cli" not in self._missing) return False def _codex_relogin(self): self.log("Opening codex login in terminal…") terms = [ ("x-terminal-emulator", ["-e"]), ("kgx", ["--"]), ("gnome-terminal", ["--"]), ("konsole", ["-e"]), ("xterm", ["-e"]), ] term = None term_args = None for t in terms: if shutil.which(t[0]): term = t[0] term_args = t[1] break if not term: self.log("ERROR: no terminal emulator found for re-login") return cmd_parts = [term] + term_args + ["codex", "login"] subprocess.Popen(cmd_parts, preexec_fn=os.setsid) self.log("Login flow started in terminal. Re-checking auth in 30s…") self._auth_label.set_markup("Auth: waiting for login…") threading.Thread(target=self._delayed_auth_check, daemon=True).start() def _delayed_auth_check(self): time.sleep(30) self._check_auth_async() def _set_busy(self, busy): def _update(): has_cli = "cli" not in self._missing has_desk = "desktop" not in self._missing self._btn_desktop.set_sensitive(not busy and has_desk) self._btn_cli.set_sensitive(not busy and has_cli) self._btn_codex_desktop.set_sensitive(not busy and has_desk) self._btn_codex_cli.set_sensitive(not busy and has_cli) self._kill_btn.set_sensitive(busy) GLib.idle_add(_update) def _rebuild_combo(self): self._endpoints_data = load_endpoints() self._combo.remove_all() names = [e["name"] for e in self._endpoints_data["endpoints"]] for n in names: self._combo.append_text(n) bgp_names = [p["name"] for p in load_bgp_pools().get("pools", [])] for n in bgp_names: self._combo.append_text(f"🔀 {n}") if names or bgp_names: default = self._endpoints_data.get("default") if default and default in names: self._combo.set_active(names.index(default)) else: self._combo.set_active(0) self._on_endpoint_changed() def _on_endpoint_changed(self): name = self._combo.get_active_text() is_bgp = name and name.startswith("🔀 ") bgp_name = name[2:] if is_bgp else None ep = get_endpoint(name) if name and not is_bgp else None self._model_combo.remove_all() if is_bgp: pool = None for p in load_bgp_pools().get("pools", []): if p["name"] == bgp_name: pool = p break if pool: seen = set() for r in pool.get("routes", []): m = r.get("model", "") if m and m not in seen: self._model_combo.append_text(m) seen.add(m) if seen: self._model_combo.set_active(0) elif ep: for m in ep.get("models", []): self._model_combo.append_text(m) GLib.idle_add(self._select_default_model, ep) def _select_default_model(self, ep): dm = ep.get("default_model", "") models = ep.get("models", []) if dm in models: self._model_combo.set_active(models.index(dm)) elif models: self._model_combo.set_active(0) # ── endpoint mgr ───────────────────────────────────────────── def _open_mgr(self): try: self._mgr_window = EndpointMgr(self) self._mgr_window.connect("destroy", lambda *_: setattr(self, "_mgr_window", None)) except Exception as e: import traceback; traceback.print_exc() d = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, f"Error: {e}") d.run(); d.destroy() def _open_bgp(self): try: self._bgp_window = BGPPoolMgr(self) self._bgp_window.connect("destroy", lambda *_: setattr(self, "_bgp_window", None)) except Exception as e: import traceback; traceback.print_exc() d = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, f"Error: {e}") d.run(); d.destroy() def _open_usage(self): try: self._usage_window = UsageWindow(self) self._usage_window.connect("destroy", lambda *_: setattr(self, "_usage_window", None)) except Exception as e: import traceback; traceback.print_exc() d = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, f"Error: {e}") d.run(); d.destroy() def _open_history(self): try: self._history_window = RequestHistoryWindow(self) self._history_window.connect("destroy", lambda *_: setattr(self, "_history_window", None)) except Exception as e: import traceback; traceback.print_exc() d = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, f"Error: {e}") d.run(); d.destroy() def _open_benchmark(self): try: self._benchmark_window = BenchmarkWindow(self) self._benchmark_window.connect("destroy", lambda *_: setattr(self, "_benchmark_window", None)) except Exception as e: import traceback; traceback.print_exc() d = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, f"Error: {e}") d.run(); d.destroy() def _open_assistant(self): import subprocess, sys _py = str(Path(__file__).resolve().parent / "flet-codex-assist.py") subprocess.Popen([sys.executable, _py], start_new_session=True) def _backup_profile(self): chooser = Gtk.FileChooserDialog( title="Backup Codex Profile", parent=self, action=Gtk.FileChooserAction.SAVE, ) chooser.add_buttons(Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL, Gtk.STOCK_SAVE, Gtk.ResponseType.OK) chooser.set_do_overwrite_confirmation(True) chooser.set_current_name(f"codex-profile-{time.strftime('%Y%m%d-%H%M%S')}.json") resp = chooser.run() filename = chooser.get_filename() if resp == Gtk.ResponseType.OK else None chooser.destroy() if not filename: return try: save_profile_bundle(filename) self.log(f"Profile backed up to {filename}") except Exception as e: self._show_message(Gtk.MessageType.ERROR, f"Backup failed:\n{e}") def _refresh_all_models(self): if getattr(self, "_refresh_running", False): return self._refresh_running = True self._refresh_all_btn.set_sensitive(False) self.log("Refreshing models for all providers...") threading.Thread(target=self._refresh_all_models_worker, daemon=True).start() def _refresh_all_models_worker(self): try: data = load_endpoints() updated = 0 failed = [] for idx, ep in enumerate(list(data["endpoints"])): refreshed, err = refresh_endpoint_models(ep) if refreshed: data["endpoints"][idx] = refreshed updated += 1 else: failed.append(f"{ep['name']}: {err}") if updated: save_endpoints(data) GLib.idle_add(self._finish_refresh_all_models, updated, failed) except Exception as e: GLib.idle_add(self._finish_refresh_all_models_error, str(e)) def _finish_refresh_all_models(self, updated, failed): try: if updated: self._rebuild_combo() if getattr(self, "_mgr_window", None): try: self._mgr_window._rebuild() except Exception: pass self.log(f"Refreshed models for {updated} provider(s)") if failed: self._show_message( Gtk.MessageType.WARNING, "Some providers could not auto-fetch models.\n\n" + "\n".join(failed) + "\n\nThose providers were left unchanged so you can manage them manually." ) elif updated: self._show_message(Gtk.MessageType.INFO, f"Refreshed models for {updated} provider(s).") else: self._show_message(Gtk.MessageType.INFO, "No providers were refreshed.") finally: self._refresh_running = False self._refresh_all_btn.set_sensitive(True) return False def _finish_refresh_all_models_error(self, err): try: self._show_message(Gtk.MessageType.ERROR, f"Refresh failed:\n{err}") finally: self._refresh_running = False self._refresh_all_btn.set_sensitive(True) return False def _import_profile(self): if self._proc and self._proc.poll() is None: self._show_message(Gtk.MessageType.WARNING, "Stop Codex before importing a profile.") return chooser = Gtk.FileChooserDialog( title="Import Codex Profile", parent=self, action=Gtk.FileChooserAction.OPEN, ) chooser.add_buttons(Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL, Gtk.STOCK_OPEN, Gtk.ResponseType.OK) resp = chooser.run() filename = chooser.get_filename() if resp == Gtk.ResponseType.OK else None chooser.destroy() if not filename: return confirm = Gtk.MessageDialog( self, 0, Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO, "Importing will replace the current endpoints and Codex config. Continue?" ) ok = confirm.run() == Gtk.ResponseType.YES confirm.destroy() if not ok: return try: import_profile_bundle(filename) self._rebuild_combo() self.log(f"Profile imported from {filename}") self._show_message(Gtk.MessageType.INFO, "Profile imported successfully.") except Exception as e: self._show_message(Gtk.MessageType.ERROR, f"Import failed:\n{e}") def _on_endpoints_updated(self): self._rebuild_combo() def _show_message(self, msg_type, text): d = Gtk.MessageDialog(self, 0, msg_type, Gtk.ButtonsType.OK, text) d.run() d.destroy() def _show_changelog(self): d = Gtk.Dialog(title="Changelog", transient_for=self, modal=True) d.set_default_size(520, 480) d.add_button("Close", Gtk.ResponseType.CLOSE) area = d.get_content_area() area.set_margin_start(12) area.set_margin_end(12) area.set_margin_top(12) area.set_margin_bottom(12) sw = Gtk.ScrolledWindow() sw.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC) area.pack_start(sw, True, True, 0) buf = Gtk.TextBuffer() tv = Gtk.TextView(buffer=buf) tv.set_editable(False) tv.set_cursor_visible(False) tv.set_wrap_mode(Gtk.WrapMode.WORD_CHAR) sw.add(tv) lines = [] for ver, date, items in CHANGELOG: lines.append(f"v{ver} ({date})") for item in items: lines.append(f" \u2022 {item}") lines.append("") txt = "\n".join(lines).strip() buf.insert(buf.get_end_iter(), txt) d.show_all() d.run() d.destroy() def _show_install_guide(self, which): if which == "cli": title = "Install Codex CLI" guide = ( "Codex CLI is required to use CLI launch features.\n\n" "Install with npm:\n" " npm install -g @openai/codex\n\n" "Or download from:\n" " https://github.com/openai/codex\n\n" "After installing, restart the launcher." ) else: title = "Install Codex Desktop" guide = ( "Codex Desktop is required to use Desktop launch features.\n\n" "Expected location: /opt/codex-desktop/start.sh\n\n" "Download from:\n" " https://codex.desktop.openai.com\n\n" "After installing, restart the launcher." ) d = Gtk.MessageDialog(self, 0, Gtk.MessageType.INFO, Gtk.ButtonsType.OK, guide) d.set_title(title) d.run() d.destroy() # ── launch ─────────────────────────────────────────────────── def _launch(self, target): name = self._combo.get_active_text() if not name: self.log("ERROR: no endpoint selected") return model = self._model_combo.get_active_text() if not model: self.log("ERROR: no model selected") return is_bgp = bool(name and name.startswith("🔀 ")) if is_bgp: pool_name = name[2:] pool = None for p in load_bgp_pools().get("pools", []): if p["name"] == pool_name: pool = p break if not pool: self.log(f"ERROR: BGP pool '{pool_name}' not found") return self._set_busy(True) self.log(f"=== 🔀 BGP: {pool_name} / {model} → {'Desktop' if target == 'desktop' else 'CLI'} ===") threading.Thread(target=self._run_bgp, args=(pool, model, target), daemon=True).start() return ep = get_endpoint(name) if not ep: self.log("ERROR: endpoint not found") return self._set_busy(True) self.log(f"=== {ep['name']} / {model} → {'Desktop' if target == 'desktop' else 'CLI'} ===") threading.Thread(target=self._run, args=(ep, model, target), daemon=True).start() def _launch_codex_default(self, target): if "cli" not in self._missing: status, msg = _check_codex_auth() if status != "logged_in": d = Gtk.MessageDialog( self, 0, Gtk.MessageType.WARNING, Gtk.ButtonsType.YES_NO, f"Codex auth check: {msg}\n\n" "Launch may fail without valid authentication.\n" "Continue anyway?" ) r = d.run() d.destroy() if r != Gtk.ResponseType.YES: self._set_busy(False) return self._set_busy(True) self.log(f"=== Codex Default (OAuth) → {'Desktop' if target == 'desktop' else 'CLI'} ===") threading.Thread(target=self._run_codex_default, args=(target,), daemon=True).start() def _run(self, ep, model, target): keep_session_alive = False try: self.log("Cleaning up stale processes…") _run_cleanup(self.log) recover_config_if_needed(self.log) needs_proxy = ep["backend_type"] != "native" if needs_proxy: self.log("Starting translation proxy…") try: proxy_port = _start_proxy_for(ep, self.log) except RuntimeError as e: GLib.idle_add(self._show_error_dialog, "Proxy startup failed", str(e)) return self.log(f"Configuring Codex for {ep['name']} (proxied on :{proxy_port})…") begin_config_transaction(f"launch:{ep['name']}") write_config_for_translated(ep, model, proxy_port) else: self.log(f"Configuring Codex for {ep['name']} (native)…") begin_config_transaction(f"launch:{ep['name']}") write_config_for_native(ep, model) if target == "desktop": if needs_proxy: _kill_existing_desktop(self.log) keep_session_alive = self._launch_desktop(ep, model) else: self._launch_cli(ep, model) except Exception as e: self.log(f"ERROR: {e}") finally: if keep_session_alive: self.log("Warm-start handoff detected; keeping proxy/config active for running Desktop.") self._set_busy(False) self.log("Ready. Use Kill && Cleanup when finished.") else: _stop_proxy() restore_config() end_config_transaction() self._set_busy(False) self.log("Ready.") def _run_bgp(self, pool, model, target): keep_session_alive = False try: self.log("Cleaning up stale processes…") _run_cleanup(self.log) recover_config_if_needed(self.log) port = _pick_free_port() self.log(f"Starting BGP proxy with {len(pool.get('routes', []))} routes on :{port}…") bgp_ep = { "name": pool["name"], "backend_type": "openai-compat", "base_url": "http://bgp.placeholder", "api_key": "", "default_model": model, "models": list(dict.fromkeys(r.get("model", model) for r in pool.get("routes", []))), } pcfg = { "port": port, "backend_type": "openai-compat", "target_url": "http://bgp.placeholder", "api_key": "", "bgp_routes": pool.get("routes", []), "models": [{"id": m, "object": "model", "created": 1700000000, "owned_by": "bgp"} for m in bgp_ep["models"]], } pcfg_path = PROXY_CONFIG_DIR / f"proxy-{safe_name(pool['name'])}-{port}.json" pcfg_path.parent.mkdir(parents=True, exist_ok=True) pcfg_path.write_text(json.dumps(pcfg, indent=2)) try: _start_proxy_with_config(pcfg_path, port, self.log) except RuntimeError as e: GLib.idle_add(self._show_error_dialog, "BGP proxy startup failed", str(e)) return begin_config_transaction(f"launch:bgp:{pool['name']}") write_config_for_translated(bgp_ep, model, port) if target == "desktop": _kill_existing_desktop(self.log) keep_session_alive = self._launch_desktop(bgp_ep, model) else: self._launch_cli(bgp_ep, model) except Exception as e: self.log(f"ERROR: {e}") finally: if keep_session_alive: self.log("Warm-start handoff detected; keeping proxy/config active for running Desktop.") self._set_busy(False) self.log("Ready. Use Kill && Cleanup when finished.") else: _stop_proxy() restore_config() end_config_transaction() self._set_busy(False) self.log("Ready.") def _run_codex_default(self, target): try: self.log("Cleaning up stale processes…") _run_cleanup(self.log) _stop_proxy() recover_config_if_needed(self.log) self.log("Resetting config to Codex defaults (OAuth)…") begin_config_transaction("launch:default") if CONFIG.exists(): CONFIG.unlink() if target == "desktop": self._launch_desktop_direct() else: self._launch_cli_default() except Exception as e: self.log(f"ERROR: {e}") finally: restore_config() end_config_transaction() self._set_busy(False) self.log("Ready.") def _show_error_dialog(self, title, message): dialog = Gtk.MessageDialog( transient_for=self, flags=0, message_type=Gtk.MessageType.ERROR, buttons=Gtk.ButtonsType.CLOSE, text=str(title)) dialog.format_secondary_text(str(message)) dialog.run() dialog.destroy() def _launch_desktop(self, ep, model): args = [str(START_SH)] if ep["backend_type"] != "native": args += ["--", "--ozone-platform=wayland"] self._proc = subprocess.Popen(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, preexec_fn=os.setsid) pid = self._proc.pid self.log(f"Desktop started (PID {pid})") self.log(f"Log: {LAUNCH_LOG}") t0 = time.time() stall_warned = False while self._proc and self._proc.poll() is None: time.sleep(1.5) el = time.time() - t0 if el > 20 and not stall_warned: self.log("⚠ Still starting after 20 s — possible stall. Click Kill if window doesn't appear.") self.log(f"--- last log lines ---\n{_last_log_lines()}") stall_warned = True if self._proc: rc = self._proc.poll() el = time.time() - t0 self.log(f"Desktop exited (code {rc}) after {el:.0f}s") if el < 12: self.log("TIP: Quick exit — may be warm-start handoff (normal) or crash. Kill && retry if needed.") last_lines = _last_log_lines() self.log(f"--- last log lines ---\n{last_lines}") if rc == 0 and "warm-start" in last_lines.lower(): self._proc = None return True self._proc = None return False def _launch_cli(self, ep, model): """Launch codex CLI in a terminal with the selected endpoint.""" self.log(f"Launching Codex CLI with {ep['name']}…") # Find a terminal emulator terms = [ ("x-terminal-emulator", ["-e"]), ("kgx", ["--"]), ("gnome-terminal", ["--"]), ("konsole", ["-e"]), ("xterm", ["-e"]), ] term = None term_args = None for t in terms: if shutil.which(t[0]): term = t[0] term_args = t[1] break if not term: self.log("ERROR: no terminal emulator found (tried x-terminal-emulator, kgx, gnome-terminal, konsole, xterm)") return # For proxied endpoints, the proxy is already running (from _run) # For native, no proxy needed cmd_parts = [term] + term_args if ep["backend_type"] == "native": # Just run codex directly — config.toml is already set up cmd_parts.extend(["codex", "-c", f"model={model}"]) else: # Proxy is running, run codex with the profile cmd_parts.extend(["codex", "--profile", ep["name"], "-c", f"model={model}"]) self.log(f"Running: {' '.join(cmd_parts)}") self._proc = subprocess.Popen(cmd_parts, preexec_fn=os.setsid) pid = self._proc.pid self.log(f"CLI started in terminal (PID {pid})") # Wait for terminal process while self._proc and self._proc.poll() is None: time.sleep(1.5) if self._proc: rc = self._proc.poll() self.log(f"CLI exited (code {rc})") self._proc = None def _launch_desktop_direct(self): self.log("Launching Codex Desktop (default OAuth)…") self._proc = subprocess.Popen( [str(START_SH), "--", "--ozone-platform=wayland"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, preexec_fn=os.setsid, ) pid = self._proc.pid self.log(f"Desktop started (PID {pid})") self.log(f"Log: {LAUNCH_LOG}") t0 = time.time() stall_warned = False while self._proc and self._proc.poll() is None: time.sleep(1.5) el = time.time() - t0 if el > 20 and not stall_warned: self.log("Still starting after 20s — possible stall. Click Kill if window doesn't appear.") self.log(f"--- last log lines ---\n{_last_log_lines()}") stall_warned = True if self._proc: rc = self._proc.poll() el = time.time() - t0 self.log(f"Desktop exited (code {rc}) after {el:.0f}s") if el < 12: self.log("TIP: Quick exit — may be warm-start handoff (normal) or crash.") self.log(f"--- last log lines ---\n{_last_log_lines()}") self._proc = None def _launch_cli_default(self): self.log("Launching Codex CLI (default OAuth)…") terms = [ ("x-terminal-emulator", ["-e"]), ("kgx", ["--"]), ("gnome-terminal", ["--"]), ("konsole", ["-e"]), ("xterm", ["-e"]), ] term = None term_args = None for t in terms: if shutil.which(t[0]): term = t[0] term_args = t[1] break if not term: self.log("ERROR: no terminal emulator found") return cmd_parts = [term] + term_args + ["codex"] self.log(f"Running: {' '.join(cmd_parts)}") self._proc = subprocess.Popen(cmd_parts, preexec_fn=os.setsid) pid = self._proc.pid self.log(f"CLI started in terminal (PID {pid})") while self._proc and self._proc.poll() is None: time.sleep(1.5) if self._proc: rc = self._proc.poll() self.log(f"CLI exited (code {rc})") self._proc = None # ── kill ───────────────────────────────────────────────────── def _kill(self): self.log("=== Killing ===") if self._proc and self._proc.poll() is None: try: pgid = os.getpgid(self._proc.pid) os.killpg(pgid, signal.SIGTERM) time.sleep(1) if self._proc.poll() is None: os.killpg(pgid, signal.SIGKILL) except (ProcessLookupError, PermissionError): pass self._proc = None _stop_proxy() _run_cleanup(self.log) restore_config() end_config_transaction() LOG_DIR.mkdir(parents=True, exist_ok=True) LAUNCH_LOG.unlink(missing_ok=True) self.log("Cleanup complete") self._set_busy(False) self.log("Ready.") def _do_close(self): if self._proc and self._proc.poll() is None: d = Gtk.MessageDialog(self, 0, Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO, "Codex is still running. Kill it?") r = d.run() d.destroy() if r != Gtk.ResponseType.YES: return self._kill() _stop_proxy() Gtk.main_quit() # ═══════════════════════════════════════════════════════════════════ # Endpoint manager dialog # ═══════════════════════════════════════════════════════════════════ class EndpointMgr(Gtk.Window): def __init__(self, parent): super().__init__(title="Manage Endpoints") self.set_transient_for(parent) self.set_modal(True) self._parent = parent self.set_default_size(500, 350) self.set_border_width(12) self.set_position(Gtk.WindowPosition.CENTER_ON_PARENT) vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) self.add(vbox) title_lbl = Gtk.Label(label="Endpoints") title_lbl.set_use_markup(True) vbox.pack_start(title_lbl, False, False, 0) sw = Gtk.ScrolledWindow() vbox.pack_start(sw, True, True, 0) self._store = Gtk.ListStore(str, str, str, str) # name, provider, backend, default_model self._tree = Gtk.TreeView(model=self._store) for i, title in enumerate(["Name", "Provider", "Type", "Default Model"]): col = Gtk.TreeViewColumn(title, Gtk.CellRendererText(), text=i) col.set_resizable(True) self._tree.append_column(col) sw.add(self._tree) btn_bar = Gtk.Box(spacing=8) vbox.pack_start(btn_bar, False, False, 0) self._add_btn = Gtk.Button(label="Add") self._add_btn.connect("clicked", lambda b: self._add()) btn_bar.pack_start(self._add_btn, False, False, 0) self._edit_btn = Gtk.Button(label="Edit") self._edit_btn.connect("clicked", lambda b: self._edit()) btn_bar.pack_start(self._edit_btn, False, False, 0) self._delete_btn = Gtk.Button(label="Delete") self._delete_btn.connect("clicked", lambda b: self._delete()) btn_bar.pack_start(self._delete_btn, False, False, 0) self._default_btn = Gtk.Button(label="Set Default") self._default_btn.connect("clicked", lambda b: self._set_default()) btn_bar.pack_start(self._default_btn, False, False, 0) self._doctor_btn = Gtk.Button(label="Doctor") self._doctor_btn.connect("clicked", lambda b: self._doctor_selected()) btn_bar.pack_start(self._doctor_btn, False, False, 0) self._doctor_all_btn = Gtk.Button(label="Doctor All") self._doctor_all_btn.connect("clicked", lambda b: self._doctor_all()) btn_bar.pack_start(self._doctor_all_btn, False, False, 0) self._mgr_close_btn = Gtk.Button(label="Close") self._mgr_close_btn.connect("clicked", lambda b: self.destroy()) btn_bar.pack_end(self._mgr_close_btn, False, False, 0) self._rebuild() self.show_all() def _rebuild(self): data = load_endpoints() self._store.clear() for ep in data["endpoints"]: provider = ep.get("provider_preset", "Custom") bt = label_for_backend(ep["backend_type"]) self._store.append([ep["name"], provider, bt, ep.get("default_model", "")]) def _selected(self): sel = self._tree.get_selection() m, i = sel.get_selected() if i is None: return None return self._store[i][0] def _add(self): try: self._dialog = EditEndpointDialog(self, None) self._dialog.connect("destroy", lambda *_: setattr(self, "_dialog", None)) except Exception as e: import traceback; traceback.print_exc() d = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, f"Error: {e}") d.run(); d.destroy() def _edit(self): name = self._selected() if name: try: self._dialog = EditEndpointDialog(self, name) self._dialog.connect("destroy", lambda *_: setattr(self, "_dialog", None)) except Exception as e: import traceback; traceback.print_exc() d = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, f"Error: {e}") d.run(); d.destroy() def _delete(self): name = self._selected() if not name: return d = Gtk.MessageDialog(self, 0, Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO, f'Delete endpoint "{name}"?') r = d.run() d.destroy() if r != Gtk.ResponseType.YES: return data = load_endpoints() data["endpoints"] = [e for e in data["endpoints"] if e["name"] != name] if data.get("default") == name: data["default"] = data["endpoints"][0]["name"] if data["endpoints"] else None save_endpoints(data) self._rebuild() self._parent._on_endpoints_updated() def _set_default(self): name = self._selected() if not name: return data = load_endpoints() data["default"] = name save_endpoints(data) self._rebuild() self._parent._on_endpoints_updated() def _doctor_selected(self): name = self._selected() if not name: return ep = get_endpoint(name) if not ep: return wait_dlg = Gtk.Dialog(title=f"Doctor: {name}…", parent=self, modal=True) wait_dlg.set_default_size(280, 80) lbl = Gtk.Label(label=f"Running diagnostics for {name}…") lbl.set_margin_top(16) lbl.set_margin_bottom(16) wait_dlg.get_content_area().pack_start(lbl, True, True, 0) wait_dlg.show_all() def _run(): checks = run_endpoint_doctor(ep) GLib.idle_add(wait_dlg.destroy) GLib.idle_add(_show_doctor_results, self, name, checks) threading.Thread(target=_run, daemon=True).start() wait_dlg.run() def _doctor_all(self): data = load_endpoints() endpoints = data.get("endpoints", []) if not endpoints: d = Gtk.MessageDialog(self, 0, Gtk.MessageType.INFO, Gtk.ButtonsType.OK, "No endpoints configured.") d.run() d.destroy() return wait_dlg = Gtk.Dialog(title="Doctor All…", parent=self, modal=True) wait_dlg.set_default_size(320, 80) lbl = Gtk.Label(label=f"Testing {len(endpoints)} endpoints…") lbl.set_margin_top(16) lbl.set_margin_bottom(16) wait_dlg.get_content_area().pack_start(lbl, True, True, 0) wait_dlg.show_all() all_results = {} def _run(): for ep in endpoints: try: all_results[ep["name"]] = run_endpoint_doctor(ep) except Exception as e: all_results[ep["name"]] = [("Doctor run", False, str(e)[:100])] GLib.idle_add(wait_dlg.destroy) GLib.idle_add(self._show_doctor_all_results, all_results) threading.Thread(target=_run, daemon=True).start() wait_dlg.run() def _show_doctor_all_results(self, all_results): dlg = Gtk.Dialog(title="Doctor All Results", parent=self, modal=True) dlg.add_button("Close", Gtk.ResponseType.CLOSE) dlg.set_default_size(560, 450) sw = Gtk.ScrolledWindow() sw.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC) area = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) area.set_margin_start(12) area.set_margin_end(12) area.set_margin_top(12) area.set_margin_bottom(12) sw.add(area) for ep_name, checks in all_results.items(): passed = sum(1 for _, ok, _ in checks if ok is True) failed = sum(1 for _, ok, _ in checks if ok is False) if failed: color, status = "#e74c3c", f"{failed} failed" else: color, status = "#27ae60", f"{passed} passed" hdr = Gtk.Label() hdr.set_markup(f'{ep_name} {status}') hdr.set_xalign(0) area.pack_start(hdr, False, False, 4) for name, ok, detail in checks: if ok is True: sym, sc = "\u2713", "#27ae60" elif ok is False: sym, sc = "\u2717", "#e74c3c" else: sym, sc = "\u25CB", "#f39c12" row = Gtk.Box(spacing=4) row.set_margin_start(12) icon = Gtk.Label() icon.set_markup(f'{sym}') lbl = Gtk.Label() lbl.set_markup(f'{name}' + (f' {detail}' if detail else '') + '') lbl.set_xalign(0) row.pack_start(icon, False, False, 0) row.pack_start(lbl, False, False, 0) area.pack_start(row, False, False, 1) sep = Gtk.Separator() area.pack_start(sep, False, False, 4) dlg.get_content_area().pack_start(sw, True, True, 0) dlg.show_all() dlg.run() dlg.destroy() class EditEndpointDialog(Gtk.Dialog): def __init__(self, parent, existing_name): title = "Edit Endpoint" if existing_name else "Add Endpoint" Gtk.Dialog.__init__(self, title=title) self.set_transient_for(parent) self.set_modal(True) self._parent_mgr = parent self._existing_name = existing_name self._data = get_endpoint(existing_name) if existing_name else { "name": "", "backend_type": "openai-compat", "base_url": "", "api_key": "", "default_model": "", "models": [], "provider_preset": "Custom", } self.set_default_size(480, 520) area = self.get_content_area() area.set_spacing(6) area.set_margin_start(12) area.set_margin_end(12) area.set_margin_top(12) area.set_margin_bottom(12) grid = Gtk.Grid(column_spacing=8, row_spacing=6) area.pack_start(grid, False, False, 0) def add_row(row, label, widget): grid.attach(Gtk.Label(label=label, xalign=1), 0, row, 1, 1) grid.attach(widget, 1, row, 1, 1) self._entry_name = Gtk.Entry(text=self._data.get("name", "")) add_row(0, "Name:", self._entry_name) self._combo_preset = Gtk.ComboBoxText() self._preset_names = list(PROVIDER_PRESETS.keys()) for preset_name in self._preset_names: self._combo_preset.append_text(preset_name) self._combo_preset.set_active(self._preset_names.index(self._data.get("provider_preset", "Custom")) if self._data.get("provider_preset", "Custom") in self._preset_names else 0) self._combo_preset.connect("changed", lambda c: self._apply_selected_preset()) add_row(1, "Preset:", self._combo_preset) self._combo_type = Gtk.ComboBoxText() for val, lab in [("openai-compat", "OpenAI-compatible (needs proxy)"), ("anthropic", "Anthropic (needs proxy)"), ("command-code", "Command Code (needs proxy)"), ("gemini-oauth-cli", "Gemini CLI OAuth (needs proxy)"), ("gemini-oauth-antigravity", "Antigravity OAuth (needs proxy)"), ("native", "Native OpenAI (no proxy)")]: self._combo_type.append(val, lab) bt = self._data.get("backend_type", "openai-compat") self._combo_type.set_active_id(bt) add_row(2, "Type:", self._combo_type) self._entry_url = Gtk.Entry(text=self._data.get("base_url", "")) add_row(3, "Base URL:", self._entry_url) self._entry_key = Gtk.Entry(text=self._data.get("api_key", "")) self._entry_key.set_visibility(False) key_box = Gtk.Box(spacing=6) key_box.pack_start(self._entry_key, True, True, 0) self._oauth_btn = Gtk.Button(label="OAuth Login") self._oauth_btn.connect("clicked", lambda b: self._do_oauth_login()) key_box.pack_start(self._oauth_btn, False, False, 0) add_row(4, "API Key:", key_box) self._oauth_btn.set_visible(False) self._entry_cc_ver = Gtk.Entry(text=self._data.get("cc_version", "")) self._entry_cc_ver.set_placeholder_text("e.g. 0.26.8 (Command Code only)") add_row(5, "CC Version:", self._entry_cc_ver) reasoning_css = b""" switch.reasoning-toggle { min-width: 56px; min-height: 28px; border-radius: 14px; background: #e67e22; border: 2px solid #cf6d17; } switch.reasoning-toggle:checked { background: #2ecc71; border: 2px solid #27ae60; } switch.reasoning-toggle slider { min-width: 24px; min-height: 24px; border-radius: 12px; background: white; border: 1px solid #bbb; } """ reasoning_box = Gtk.Box(spacing=10) self._switch_reasoning = Gtk.Switch() self._switch_reasoning.set_name("reasoning-toggle") ctx = self._switch_reasoning.get_style_context() ctx.add_class("reasoning-toggle") try: css_prov = Gtk.CssProvider() css_prov.load_from_data(reasoning_css) ctx.add_provider(css_prov, Gtk.STYLE_PROVIDER_PRIORITY_USER) except Exception: pass self._switch_reasoning.set_active(self._data.get("reasoning_enabled", True)) self._switch_reasoning.connect("notify::active", lambda *a: self._on_reasoning_toggled()) reasoning_box.pack_start(self._switch_reasoning, False, False, 0) self._lbl_reasoning = Gtk.Label() reasoning_box.pack_start(self._lbl_reasoning, False, False, 0) add_row(6, "Reasoning:", reasoning_box) self._combo_effort = Gtk.ComboBoxText() for ev, el in [("none", "None"), ("minimal", "Minimal"), ("low", "Low"), ("medium", "Medium"), ("high", "High"), ("max", "Max")]: self._combo_effort.append(ev, el) saved_effort = self._data.get("reasoning_effort", "medium") self._combo_effort.set_active_id(saved_effort if saved_effort in ("none","minimal","low","medium","high","max") else "medium") add_row(7, "Effort:", self._combo_effort) self._on_reasoning_toggled() # Models mlbl = Gtk.Label(label="Models:", xalign=0) area.pack_start(mlbl, False, False, 4) mbox = Gtk.Box(spacing=6) area.pack_start(mbox, False, False, 0) self._entry_model = Gtk.Entry() mbox.pack_start(self._entry_model, True, True, 0) self._add_model_btn = Gtk.Button(label="Add") self._add_model_btn.connect("clicked", lambda b: self._add_model()) mbox.pack_start(self._add_model_btn, False, False, 0) self._add_list_btn = Gtk.Button(label="Add List") self._add_list_btn.connect("clicked", lambda b: self._add_models_from_text()) mbox.pack_start(self._add_list_btn, False, False, 0) self._fetch_models_btn = Gtk.Button(label="Fetch from API") self._fetch_models_btn.connect("clicked", lambda b: self._fetch_models()) mbox.pack_start(self._fetch_models_btn, False, False, 0) self._test_btn = Gtk.Button(label="Test Endpoint") self._test_btn.connect("clicked", lambda b: self._diagnose_endpoint()) mbox.pack_start(self._test_btn, False, False, 0) bulk_lbl = Gtk.Label(label="Bulk add models (one per line or comma-separated):", xalign=0) area.pack_start(bulk_lbl, False, False, 2) bulk_sw = Gtk.ScrolledWindow() bulk_sw.set_min_content_height(72) area.pack_start(bulk_sw, False, False, 0) self._bulk_buf = Gtk.TextBuffer() self._bulk_text = Gtk.TextView(buffer=self._bulk_buf) self._bulk_text.set_wrap_mode(Gtk.WrapMode.WORD_CHAR) bulk_sw.add(self._bulk_text) sw = Gtk.ScrolledWindow() sw.set_min_content_height(120) area.pack_start(sw, True, True, 0) self._model_store = Gtk.ListStore(str) self._model_tree = Gtk.TreeView(model=self._model_store) self._model_tree.append_column(Gtk.TreeViewColumn("Model ID", Gtk.CellRendererText(), text=0)) self._model_tree.set_rules_hint(True) sw.add(self._model_tree) self._model_tree.connect("row-activated", lambda t, p, c: self._remove_model(p)) for m in self._data.get("models", []): self._model_store.append([m]) # Default model combo dbox = Gtk.Box(spacing=6) area.pack_start(dbox, False, False, 0) dbox.pack_start(Gtk.Label(label="Default Model:"), False, False, 0) self._combo_default = Gtk.ComboBoxText() self._refresh_default_combo() dbox.pack_start(self._combo_default, True, True, 0) dm = self._data.get("default_model", "") if dm: self._combo_default.set_active_id(dm) self._apply_selected_preset(initial=True) # Buttons self.add_button("Cancel", Gtk.ResponseType.CANCEL) self.add_button("Save", Gtk.ResponseType.OK) self.connect("response", self._on_response) self.show_all() def _add_model(self): m = normalize_model_id(self._entry_model.get_text()) if m: current = self._combo_default.get_active_text() self._model_store.append([m]) self._refresh_default_combo(current or m) self._entry_model.set_text("") def _add_models_from_text(self): buf = self._bulk_buf.get_text(self._bulk_buf.get_start_iter(), self._bulk_buf.get_end_iter(), True) models = parse_model_list(buf) if not models: return current = self._combo_default.get_active_text() existing = {self._model_store[i][0] for i in range(len(self._model_store))} added = False for mid in models: if mid not in existing: self._model_store.append([mid]) existing.add(mid) added = True if added: self._refresh_default_combo(current or models[0]) self._bulk_buf.set_text("") def _apply_selected_preset(self, initial=False): preset_name = self._combo_preset.get_active_text() or "Custom" preset = PROVIDER_PRESETS.get(preset_name, PROVIDER_PRESETS["Custom"]) is_oauth = bool(preset.get("oauth_provider")) self._oauth_btn.set_visible(is_oauth) if is_oauth: self._entry_key.set_placeholder_text("Auto-filled by OAuth") else: self._entry_key.set_placeholder_text("") if not initial or self._existing_name is None: self._combo_type.set_active_id(preset.get("backend_type", "openai-compat")) self._entry_url.set_text(preset.get("base_url", "")) if not self._entry_key.get_text().strip(): self._entry_key.set_text("") cc_ver = preset.get("cc_version", "") if cc_ver and not self._entry_cc_ver.get_text().strip(): self._entry_cc_ver.set_text(cc_ver) if preset.get("models") and len(self._model_store) == 0: for mid in preset["models"]: self._model_store.append([mid]) self._refresh_default_combo(preset["models"][0]) if initial and self._data.get("models"): self._refresh_default_combo(self._data.get("default_model", "")) def _on_reasoning_toggled(self, *_): active = self._switch_reasoning.get_active() self._combo_effort.set_sensitive(active) if active: self._lbl_reasoning.set_markup('ON') else: self._lbl_reasoning.set_markup('OFF') def _do_oauth_login(self): preset_name = self._combo_preset.get_active_text() or "Custom" preset = PROVIDER_PRESETS.get(preset_name, {}) provider = preset.get("oauth_provider", "") if (provider or "").startswith("google"): self._google_oauth_flow(provider) def _google_oauth_flow(self, oauth_provider="google-cli"): is_antigravity = oauth_provider == "google-antigravity" token_path = os.path.expanduser("~/.cache/codex-proxy/google-antigravity-oauth-token.json" if is_antigravity else "~/.cache/codex-proxy/google-cli-oauth-token.json") if is_antigravity: CLIENT_ID = "1071006060591-tmhssin2h21lcre235vtolojh4g403ep.apps.googleusercontent.com" CLIENT_SECRET = "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" SCOPES = [ "https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/userinfo.email", "https://www.googleapis.com/auth/userinfo.profile", "https://www.googleapis.com/auth/cclog", "https://www.googleapis.com/auth/experimentsandconfigs", ] port = 51121 redirect_uri = f"http://localhost:{port}/oauth-callback" callback_path = "/oauth-callback" provider_kind = "antigravity" else: CLIENT_ID = "681255809395-oo8ft2oprdrnp9e3aqf6av3hmdib135j.apps.googleusercontent.com" CLIENT_SECRET = "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" SCOPES = [ "https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/userinfo.email", "https://www.googleapis.com/auth/userinfo.profile", ] port = 0 redirect_uri = None callback_path = "/oauth2callback" provider_kind = "cli" import http.server state = secrets.token_hex(32) verifier = secrets.token_urlsafe(64) challenge = base64.urlsafe_b64encode(hashlib.sha256(verifier.encode()).digest()).rstrip(b"=").decode() if port == 0: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("127.0.0.1", 0)) port = s.getsockname()[1] redirect_uri = f"http://127.0.0.1:{port}/oauth2callback" scope_str = " ".join(SCOPES) auth_url = ( f"https://accounts.google.com/o/oauth2/v2/auth?" f"client_id={CLIENT_ID}" f"&redirect_uri={urllib.parse.quote(redirect_uri)}" f"&response_type=code" f"&scope={urllib.parse.quote(scope_str)}" f"&access_type=offline" f"&prompt=select_account%20consent" f"&state={state}" f"&code_challenge={challenge}" f"&code_challenge_method=S256" ) dlg = Gtk.Dialog(title="Google OAuth (Gemini Mode)", parent=self, modal=True) dlg.add_button("Cancel", Gtk.ResponseType.CANCEL) dlg.set_default_size(520, 280) area = dlg.get_content_area() area.set_margin_start(16) area.set_margin_end(16) area.set_margin_top(12) area.set_margin_bottom(12) area.set_spacing(8) area.pack_start(Gtk.Label(label="Sign in with Google", use_markup=True, xalign=0), False, False, 0) area.pack_start(Gtk.Label(label="Emulating Gemini CLI OAuth — no client_secret.json needed.", xalign=0), False, False, 0) link_lbl = Gtk.Label() link_lbl.set_markup(f'Click here to open Google authorization') link_lbl.set_line_wrap(True) area.pack_start(link_lbl, False, False, 4) self._oauth_status = Gtk.Label(label="Opening browser…", xalign=0) area.pack_start(self._oauth_status, False, False, 4) spinner = Gtk.Spinner() spinner.start() area.pack_start(spinner, False, False, 8) area.show_all() code_holder = [None] error_holder = [None] received_state = [None] class OAuthHandler(http.server.BaseHTTPRequestHandler): def do_GET(self2): qs = urllib.parse.urlparse(self2.path).query params = urllib.parse.parse_qs(qs) received_state[0] = params.get("state", [None])[0] with open("/tmp/codex-oauth-debug.log", "a") as _dbg: _dbg.write(f"[{time.strftime('%H:%M:%S')}] GET {self2.path} state={received_state[0]} code={'code' in params}\n") if self2.path.find(callback_path) == -1: self2.send_response(302) self2.send_header("Location", "https://developers.google.com/gemini-code-assist/auth_failure_gemini") self2.end_headers() error_holder[0] = "unexpected request" return if "code" in params: if received_state[0] != state: self2.send_response(400) self2.send_header("Content-Type", "text/html") self2.end_headers() self2.wfile.write(b"" b"

CSRF state mismatch.

") error_holder[0] = "CSRF state mismatch" return code_holder[0] = params["code"][0] self2.send_response(302) self2.send_header("Location", "https://developers.google.com/gemini-code-assist/auth_success_gemini") self2.end_headers() else: error_holder[0] = params.get("error", ["unknown"])[0] self2.send_response(302) self2.send_header("Location", "https://developers.google.com/gemini-code-assist/auth_failure_gemini") self2.end_headers() def log_message(self2, fmt, *args): with open("/tmp/codex-oauth-debug.log", "a") as _dbg: _dbg.write(f"[{time.strftime('%H:%M:%S')}] {fmt % args}\n") try: bind_host = "localhost" if is_antigravity else "127.0.0.1" server = http.server.HTTPServer((bind_host, port), OAuthHandler) except OSError: self._oauth_status.set_text(f"Port {port} already in use — close other apps and retry.") spinner.stop() dlg.run(); dlg.destroy() return def _oauth_log(msg): with open("/tmp/codex-oauth-debug.log", "a") as _f: _f.write(f"[{time.strftime('%H:%M:%S')}] {msg}\n") _oauth_log(f"Starting OAuth: port={port} redirect_uri={redirect_uri}") def wait_for_code(): _oauth_log("wait_for_code thread started") deadline = time.time() + 120 while code_holder[0] is None and error_holder[0] is None and time.time() < deadline: server.handle_request() server.server_close() _oauth_log(f"Server closed. code={'yes' if code_holder[0] else 'no'} error={'yes' if error_holder[0] else 'no'}") if code_holder[0]: try: _oauth_log("Exchanging code for token...") token_data = urllib.parse.urlencode({ "code": code_holder[0], "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "redirect_uri": redirect_uri, "grant_type": "authorization_code", "code_verifier": verifier, }).encode() req = urllib.request.Request("https://oauth2.googleapis.com/token", data=token_data, headers={"Content-Type": "application/x-www-form-urlencoded"}) resp = urllib.request.urlopen(req, timeout=30) tokens = json.loads(resp.read()) tokens["client_id"] = CLIENT_ID tokens["client_secret"] = CLIENT_SECRET tokens["provider_kind"] = provider_kind tokens["expires_at"] = time.time() + tokens.get("expires_in", 3600) os.makedirs(os.path.dirname(token_path), exist_ok=True) with open(token_path, "w") as f: json.dump(tokens, f, indent=2) os.chmod(token_path, 0o600) _oauth_log(f"Token saved to {token_path}") project_id = "" try: _oauth_log("Discovering project ID via loadCodeAssist...") lr = urllib.request.Request( "https://cloudcode-pa.googleapis.com/v1internal:loadCodeAssist", data=json.dumps({}).encode(), headers={ "Content-Type": "application/json", "Authorization": f"Bearer {tokens['access_token']}", "User-Agent": "google-api-nodejs-client/9.15.1", }) lresp = urllib.request.urlopen(lr, timeout=15) ldata = json.loads(lresp.read()) p = ldata.get("cloudaicompanionProject", "") if isinstance(p, dict): project_id = p.get("id", "") elif isinstance(p, str): project_id = p _oauth_log(f"Project ID: {project_id or '(none)'}") if project_id: tokens["project_id"] = project_id with open(token_path, "w") as f2: json.dump(tokens, f2, indent=2) os.chmod(token_path, 0o600) except Exception as pe: _oauth_log(f"loadCodeAssist failed (non-fatal): {pe}") if is_antigravity: found_models = [ "gemini-2.5-flash", "gemini-2.5-pro", "gemini-3-flash-preview", "gemini-3-pro-preview", "gemini-3.1-pro-preview", "gemini-3-pro-low", "gemini-3-pro-high", "gemini-3.1-pro-low", "gemini-3.1-pro-high", "gemini-3-flash-low", "gemini-3-flash-medium", "gemini-3-flash-high", "claude-sonnet-4-6", "claude-opus-4-6-thinking", "claude-opus-4-6-thinking-low", "claude-opus-4-6-thinking-medium", "claude-opus-4-6-thinking-high", "gemini-claude-sonnet-4-6", "gemini-claude-opus-4-6-thinking-low", "gemini-claude-opus-4-6-thinking-medium", "gemini-claude-opus-4-6-thinking-high", "gemini-3-pro-image", ] probe_candidates = [ "gemini-2.5-flash", "gemini-2.5-pro", "gemini-3-flash-preview", "gemini-3-pro-preview", "gemini-3.1-pro-preview", ] _oauth_log(f"Probing {len(probe_candidates)} model candidates...") for mc in probe_candidates: try: pr = urllib.request.Request( "https://daily-cloudcode-pa.sandbox.googleapis.com/v1internal:generateContent", data=json.dumps({ "project": project_id, "model": mc, "request": {"contents": [{"role": "user", "parts": [{"text": "x"}]}], "generationConfig": {"maxOutputTokens": 1}}, }).encode(), headers={ "Content-Type": "application/json", "Authorization": f"Bearer {tokens['access_token']}", "User-Agent": "google-api-nodejs-client/9.15.1", "Client-Metadata": "ideType=IDE_UNSPECIFIED,platform=PLATFORM_UNSPECIFIED,pluginType=GEMINI", }) pr.get_method = lambda: "POST" resp = urllib.request.urlopen(pr, timeout=10) resp.read() found_models.append(mc) _oauth_log(f" {mc} → available") except urllib.error.HTTPError as e: if e.code == 429: found_models.append(mc) _oauth_log(f" {mc} → available (rate limited)") else: e.read() _oauth_log(f" {mc} → HTTP {e.code}") except Exception as e: _oauth_log(f" {mc} → error: {e}") else: found_models = ["gemini-2.5-flash", "gemini-2.5-pro"] if found_models: tokens["available_models"] = found_models with open(token_path, "w") as f3: json.dump(tokens, f3, indent=2) os.chmod(token_path, 0o600) _oauth_log(f"Discovered {len(found_models)} models: {found_models}") else: _oauth_log("No models discovered (will use defaults)") GLib.idle_add(self._oauth_success, dlg, tokens.get("access_token", ""), spinner) return except urllib.error.HTTPError as e: body = e.read().decode(errors='replace') _oauth_log(f"Token exchange HTTP {e.code}: {body}") GLib.idle_add(self._oauth_failed, dlg, f"Token exchange failed ({e.code}): {body[:200]}", spinner) return except Exception as e: _oauth_log(f"Token exchange FAILED: {e}") GLib.idle_add(self._oauth_failed, dlg, f"Token exchange failed: {e}", spinner) return _oauth_log(f"OAuth failed: {error_holder[0] or 'timeout'}") GLib.idle_add(self._oauth_failed, dlg, error_holder[0] or "No authorization code received.", spinner) threading.Thread(target=wait_for_code, daemon=True).start() subprocess.Popen(["xdg-open", auth_url], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) dlg.connect("response", lambda d, r: d.destroy()) dlg.run() def _oauth_success(self, dlg, access_token, spinner): spinner.stop() self._entry_key.set_text(access_token) self._oauth_status.set_markup('Authorization successful! Token saved.') dlg.set_title("Google OAuth — Success") GLib.timeout_add(1500, lambda: dlg.response(Gtk.ResponseType.OK)) def _oauth_failed(self, dlg, msg, spinner): spinner.stop() self._oauth_status.set_markup(f'{msg}') GLib.timeout_add(3000, lambda: dlg.response(Gtk.ResponseType.CANCEL)) def _remove_model(self, path): current = self._combo_default.get_active_text() self._model_store.remove(self._model_store.get_iter(path)) self._refresh_default_combo(current) def _refresh_default_combo(self, active=None): if active is None: active = self._combo_default.get_active_text() self._combo_default.remove_all() for row in self._model_store: self._combo_default.append(row[0], row[0]) if active and any(row[0] == active for row in self._model_store): self._combo_default.set_active_id(active) elif len(self._model_store) > 0: self._combo_default.set_active(0) def _fetch_models(self): ok, err = self._try_fetch_models() if not ok: d = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, f"Failed to fetch models:\n{err}") d.run() d.destroy() def _try_fetch_models(self): endpoint = { "base_url": self._entry_url.get_text().strip(), "api_key": self._entry_key.get_text().strip(), "backend_type": self._combo_type.get_active_id() or "openai-compat", } ids, err = fetch_models_for_endpoint(endpoint) if ids: current = self._combo_default.get_active_text() added = 0 for mid in ids: # check dupes found = any(self._model_store[i][0] == mid for i in range(len(self._model_store))) if not found: self._model_store.append([mid]) added += 1 self._refresh_default_combo(current) return True, None return False, err or "No models returned by endpoint" def _diagnose_endpoint(self): ep = { "base_url": self._entry_url.get_text().strip(), "api_key": self._entry_key.get_text().strip(), "backend_type": self._combo_type.get_active_id() or "openai-compat", "default_model": self._combo_default.get_active_text() or "", } name = ep.get("default_model") or "endpoint" wait_dlg = Gtk.Dialog(title="Running Doctor…", parent=self, modal=True) wait_dlg.set_default_size(280, 80) lbl = Gtk.Label(label="Running endpoint diagnostics…") lbl.set_margin_top(16) lbl.set_margin_bottom(16) wait_dlg.get_content_area().pack_start(lbl, True, True, 0) wait_dlg.show_all() def _run(): checks = run_endpoint_doctor(ep) GLib.idle_add(wait_dlg.destroy) GLib.idle_add(_show_doctor_results, self, name, checks) threading.Thread(target=_run, daemon=True).start() wait_dlg.run() def _on_response(self, dialog, response): if response != Gtk.ResponseType.OK: self.destroy() return name = self._entry_name.get_text().strip() if not name: self._show_error("Name is required") return bt = self._combo_type.get_active_id() or PROVIDER_PRESETS.get(self._combo_preset.get_active_text() or "", {}).get("backend_type") or "openai-compat" url = self._entry_url.get_text().strip() key = self._entry_key.get_text().strip() models = [self._model_store[i][0] for i in range(len(self._model_store))] if not models: ok, err = self._try_fetch_models() if ok: models = [self._model_store[i][0] for i in range(len(self._model_store))] else: d = Gtk.MessageDialog( self, 0, Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO, f"Auto-fetch failed ({err}).\n\nAdd models manually now?" ) r = d.run() d.destroy() if r == Gtk.ResponseType.YES: self._entry_model.grab_focus() return self.destroy() return if not models: self._show_error("At least one model is required") self._entry_model.grab_focus() return default = self._combo_default.get_active_text() or models[0] data = load_endpoints() # If renaming, remove old entry if self._existing_name and self._existing_name != name: data["endpoints"] = [e for e in data["endpoints"] if e["name"] != self._existing_name] # Check for duplicate name existing = [e for e in data["endpoints"] if e["name"] == name and e != self._data] if existing: self._show_error(f'Endpoint "{name}" already exists') return new_ep = {"name": name, "backend_type": bt, "base_url": url, "api_key": key, "default_model": default, "models": models, "provider_preset": self._combo_preset.get_active_text() or "Custom"} cc_ver = self._entry_cc_ver.get_text().strip() if cc_ver: new_ep["cc_version"] = cc_ver new_ep["reasoning_enabled"] = self._switch_reasoning.get_active() new_ep["reasoning_effort"] = self._combo_effort.get_active_id() or "medium" preset_name = self._combo_preset.get_active_text() or "Custom" preset = PROVIDER_PRESETS.get(preset_name, {}) if preset.get("oauth_provider"): new_ep["oauth_provider"] = preset["oauth_provider"] new_ep["base_url"] = normalize_base_url(new_ep["base_url"]) # Update or append found = False for i, e in enumerate(data["endpoints"]): if e["name"] == name: data["endpoints"][i] = new_ep found = True break if not found: data["endpoints"].append(new_ep) if data.get("default") is None: data["default"] = name save_endpoints(data) self._parent_mgr._rebuild() self._parent_mgr._parent._on_endpoints_updated() self.destroy() def _show_error(self, msg): d = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, msg) d.run(); d.destroy() # ═══════════════════════════════════════════════════════════════════ # Entry point # ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════ # BGP Pool Manager # ═══════════════════════════════════════════════════════════════════ class BGPPoolMgr(Gtk.Window): def __init__(self, parent): super().__init__(title="AI BGP — Pool Manager") self.set_transient_for(parent) self.set_default_size(620, 440) self._parent = parent vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) vbox.set_margin_start(12) vbox.set_margin_end(12) vbox.set_margin_top(12) vbox.set_margin_bottom(12) self.add(vbox) hdr = Gtk.Box(spacing=8) vbox.pack_start(hdr, False, False, 0) hdr.pack_start(Gtk.Label(label="AI BGP Pools — multi-provider routing with automatic failover", use_markup=True), False, False, 0) self._store = Gtk.ListStore(str, str, str) self._tree = Gtk.TreeView(model=self._store) for i, (title, w) in enumerate([("Pool Name", 200), ("Routes", 250), ("Strategy", 100)]): r = Gtk.CellRendererText() c = Gtk.TreeViewColumn(title, r, text=i) c.set_min_width(w) self._tree.append_column(c) self._tree.set_headers_visible(True) sw = Gtk.ScrolledWindow() sw.add(self._tree) vbox.pack_start(sw, True, True, 0) sel = self._tree.get_selection() sel.connect("changed", lambda *_: self._on_select()) bbox = Gtk.Box(spacing=8) vbox.pack_start(bbox, False, False, 0) self._add_btn = Gtk.Button(label="Create Pool") self._add_btn.connect("clicked", lambda b: self._add_pool()) bbox.pack_start(self._add_btn, True, True, 0) self._edit_btn = Gtk.Button(label="Edit Pool") self._edit_btn.connect("clicked", lambda b: self._edit_pool()) self._edit_btn.set_sensitive(False) bbox.pack_start(self._edit_btn, True, True, 0) self._del_btn = Gtk.Button(label="Delete Pool") self._del_btn.connect("clicked", lambda b: self._del_pool()) self._del_btn.set_sensitive(False) bbox.pack_start(self._del_btn, True, True, 0) close_btn = Gtk.Button(label="Close") close_btn.connect("clicked", lambda b: self.destroy()) bbox.pack_start(close_btn, True, True, 0) self._rebuild() self.show_all() def _rebuild(self): self._store.clear() for pool in load_bgp_pools().get("pools", []): routes_str = " → ".join(f'{r.get("name","?")}/{r.get("model","?")}' for r in pool.get("routes", [])) self._store.append([pool["name"], routes_str, pool.get("strategy", "failover")]) def _selected_name(self): sel = self._tree.get_selection() m, i = sel.get_selected() return self._store[i][0] if i else None def _on_select(self): name = self._selected_name() self._edit_btn.set_sensitive(bool(name)) self._del_btn.set_sensitive(bool(name)) def _add_pool(self): d = BGPPoolEditDialog(self, None) d.connect("response", lambda *_: self._rebuild()) def _edit_pool(self): name = self._selected_name() if name: d = BGPPoolEditDialog(self, name) d.connect("response", lambda *_: self._rebuild()) def _del_pool(self): name = self._selected_name() if not name: return d = Gtk.MessageDialog(self, 0, Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO, f'Delete BGP pool "{name}"?') r = d.run(); d.destroy() if r != Gtk.ResponseType.YES: return data = load_bgp_pools() data["pools"] = [p for p in data["pools"] if p["name"] != name] save_bgp_pools(data) self._rebuild() self._parent._on_endpoints_updated() class BGPPoolEditDialog(Gtk.Dialog): def __init__(self, parent, existing_name): title = "Edit BGP Pool" if existing_name else "Create BGP Pool" Gtk.Dialog.__init__(self, title=title, parent=parent, modal=True) self.add_button("Cancel", Gtk.ResponseType.CANCEL) self.add_button("Save", Gtk.ResponseType.OK) self.set_default_size(580, 480) self._existing_name = existing_name self._parent_mgr = parent data = load_bgp_pools() pool = None if existing_name: for p in data.get("pools", []): if p["name"] == existing_name: pool = p break if not pool: pool = {"name": "", "strategy": "failover", "routes": []} area = self.get_content_area() area.set_margin_start(12) area.set_margin_end(12) area.set_margin_top(12) area.set_margin_bottom(12) area.set_spacing(8) grid = Gtk.Grid(column_spacing=8, row_spacing=6) area.pack_start(grid, False, False, 0) grid.attach(Gtk.Label(label="Pool Name:", xalign=1), 0, 0, 1, 1) self._entry_name = Gtk.Entry(text=pool["name"]) grid.attach(self._entry_name, 1, 0, 1, 1) grid.attach(Gtk.Label(label="Strategy:", xalign=1), 0, 1, 1, 1) self._combo_strategy = Gtk.ComboBoxText() self._combo_strategy.append("failover", "Failover (try primary, fall back on error)") self._combo_strategy.append("race", "Race (send to all, return fastest)") self._combo_strategy.set_active_id(pool.get("strategy", "failover")) grid.attach(self._combo_strategy, 1, 1, 1, 1) area.pack_start(Gtk.Label(label="Routes (drag to reorder priority)", use_markup=True, xalign=0), False, False, 8) self._route_store = Gtk.ListStore(str, str, str, str, str, str) for r in pool.get("routes", []): self._route_store.append([ r.get("name", ""), r.get("endpoint_name", ""), r.get("target_url", ""), r.get("api_key", ""), r.get("model", ""), str(r.get("priority", 99)) ]) self._route_tree = Gtk.TreeView(model=self._route_store) for i, (title, w) in enumerate([ ("Route Name", 120), ("Endpoint", 120), ("URL", 150), ("API Key", 80), ("Model", 120), ("Priority", 60) ]): renderer = Gtk.CellRendererText() renderer.set_property("editable", False) col = Gtk.TreeViewColumn(title, renderer, text=i) col.set_min_width(w) col.set_resizable(True) self._route_tree.append_column(col) self._route_tree.set_headers_visible(True) sw = Gtk.ScrolledWindow() sw.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC) sw.add(self._route_tree) sw.set_min_content_height(200) area.pack_start(sw, True, True, 0) bbox = Gtk.Box(spacing=6) area.pack_start(bbox, False, False, 0) add_r = Gtk.Button(label="Add Route") add_r.connect("clicked", lambda b: self._add_route()) bbox.pack_start(add_r, True, True, 0) edit_r = Gtk.Button(label="Edit Route") edit_r.connect("clicked", lambda b: self._edit_route()) bbox.pack_start(edit_r, True, True, 0) rm_r = Gtk.Button(label="Remove Route") rm_r.connect("clicked", lambda b: self._remove_route()) bbox.pack_start(rm_r, True, True, 0) up_r = Gtk.Button(label="↑ Up") up_r.connect("clicked", lambda b: self._move_route(-1)) bbox.pack_start(up_r, True, True, 0) down_r = Gtk.Button(label="↓ Down") down_r.connect("clicked", lambda b: self._move_route(1)) bbox.pack_start(down_r, True, True, 0) self.show_all() if self.run() == Gtk.ResponseType.OK: self._save() self.destroy() def _save(self): name = self._entry_name.get_text().strip() if not name: return strategy = self._combo_strategy.get_active_id() or "failover" routes = [] for i, row in enumerate(self._route_store): if not row[2]: continue routes.append({ "name": row[0] or f"Route {i+1}", "endpoint_name": row[1], "target_url": row[2], "api_key": row[3], "model": row[4], "priority": i + 1, "reasoning_enabled": True, "reasoning_effort": "medium", }) data = load_bgp_pools() if self._existing_name: data["pools"] = [p for p in data["pools"] if p["name"] != self._existing_name] data["pools"].append({"name": name, "strategy": strategy, "routes": routes}) save_bgp_pools(data) self._parent_mgr._parent._on_endpoints_updated() def _add_route(self): endpoints = load_endpoints().get("endpoints", []) if not endpoints: d = Gtk.MessageDialog(self, 0, Gtk.MessageType.INFO, Gtk.ButtonsType.OK, "No endpoints configured. Add endpoints in Manage Endpoints first.") d.run(); d.destroy() return d = BGPRouteDialog(self, endpoints, None) if d.result: r = d.result self._route_store.append([ r.get("name", ""), r.get("endpoint_name", ""), r.get("target_url", ""), r.get("api_key", ""), r.get("model", ""), str(r.get("priority", 99)) ]) def _edit_route(self): sel = self._route_tree.get_selection() m, i = sel.get_selected() if not i: return endpoints = load_endpoints().get("endpoints", []) existing = { "name": m[i][0], "endpoint_name": m[i][1], "target_url": m[i][2], "api_key": m[i][3], "model": m[i][4], "priority": int(m[i][5]) if m[i][5] else 99, } d = BGPRouteDialog(self, endpoints, existing) if d.result: r = d.result m[i][0] = r.get("name", "") m[i][1] = r.get("endpoint_name", "") m[i][2] = r.get("target_url", "") m[i][3] = r.get("api_key", "") m[i][4] = r.get("model", "") m[i][5] = str(r.get("priority", 99)) def _remove_route(self): sel = self._route_tree.get_selection() m, i = sel.get_selected() if i: self._route_store.remove(i) def _move_route(self, direction): sel = self._route_tree.get_selection() m, i = sel.get_selected() if not i: return path = m.get_path(i) idx = path.get_indices()[0] new_idx = idx + direction if new_idx < 0 or new_idx >= len(self._route_store): return row_data = [m[idx][c] for c in range(6)] self._route_store.remove(m.get_iter(Gtk.TreePath(idx))) new_iter = self._route_store.insert(new_idx) for c, v in enumerate(row_data): self._route_store.set_value(new_iter, c, v) class BGPRouteDialog(Gtk.Dialog): def __init__(self, parent, endpoints, existing): Gtk.Dialog.__init__(self, title="BGP Route", parent=parent, modal=True) self.add_button("Cancel", Gtk.ResponseType.CANCEL) self.add_button("OK", Gtk.ResponseType.OK) self.set_default_size(440, 300) self.result = None area = self.get_content_area() area.set_margin_start(12) area.set_margin_end(12) area.set_margin_top(12) area.set_margin_bottom(12) area.set_spacing(6) grid = Gtk.Grid(column_spacing=8, row_spacing=6) area.pack_start(grid, False, False, 0) def add_row(row, label, widget): grid.attach(Gtk.Label(label=label, xalign=1), 0, row, 1, 1) grid.attach(widget, 1, row, 1, 1) self._entry_name = Gtk.Entry(text=existing.get("name", "") if existing else "") add_row(0, "Route Name:", self._entry_name) self._combo_ep = Gtk.ComboBoxText() ep_names = [e["name"] for e in endpoints] for en in ep_names: self._combo_ep.append(en, en) if existing and existing.get("endpoint_name") in ep_names: self._combo_ep.set_active_id(existing["endpoint_name"]) elif ep_names: self._combo_ep.set_active(0) self._combo_ep.connect("changed", lambda b: self._on_ep_changed(endpoints)) add_row(1, "Endpoint:", self._combo_ep) self._entry_url = Gtk.Entry() add_row(2, "URL:", self._entry_url) self._entry_key = Gtk.Entry() self._entry_key.set_visibility(False) add_row(3, "API Key:", self._entry_key) self._combo_model = Gtk.ComboBoxText() add_row(4, "Model:", self._combo_model) if existing: self._entry_url.set_text(existing.get("target_url", "")) self._entry_key.set_text(existing.get("api_key", "")) self._on_ep_changed(endpoints) if existing and existing.get("model"): self._combo_model.set_active_id(existing["model"]) self.show_all() if self.run() == Gtk.ResponseType.OK: ep_name = self._combo_ep.get_active_text() or "" ep = None for e in endpoints: if e["name"] == ep_name: ep = e break self.result = { "name": self._entry_name.get_text().strip() or ep_name, "endpoint_name": ep_name, "target_url": self._entry_url.get_text().strip(), "api_key": self._entry_key.get_text().strip(), "model": self._combo_model.get_active_text() or "", "priority": 99, } if ep: self.result["reasoning_enabled"] = ep.get("reasoning_enabled", True) self.result["reasoning_effort"] = ep.get("reasoning_effort", "medium") self.result["oauth_provider"] = ep.get("oauth_provider", "") self.destroy() def _on_ep_changed(self, endpoints): ep_name = self._combo_ep.get_active_text() ep = None for e in endpoints: if e["name"] == ep_name: ep = e break if ep: self._entry_url.set_text(normalize_base_url(ep.get("base_url", ""))) self._entry_key.set_text(ep.get("api_key", "")) self._combo_model.remove_all() for m in ep.get("models", []): mid = normalize_model_id(m) if m else "" self._combo_model.append(mid, m) if ep.get("default_model"): self._combo_model.set_active_id(normalize_model_id(ep["default_model"])) elif len(ep.get("models", [])) > 0: self._combo_model.set_active(0) _U = { "base": "#0C0E16", "surface0": "#161928", "surface1": "#1E2235", "surface2": "#2A2F47", "text": "#E4E6F0", "subtext": "#B0B4C8", "dim": "#5C6180", "accent": "#7EB8F7", "blue": "#5DA4E8", "sapphire": "#4EC5C1", "green": "#59D4A0", "yellow": "#F0C75E", "red": "#F06A77", "peach": "#F09860", "teal": "#4EC5C1", "lavender": "#A899F0", "sky": "#70C8E8", "maroon": "#C44B5C", "flamingo": "#E878B0", "rosewater": "#F0D0C0", "model_palette": ["#F09860", "#4EC5C1", "#5DA4E8", "#59D4A0", "#F0C75E", "#A899F0", "#70C8E8", "#E878B0", "#C44B5C", "#F0D0C0", "#7EB8F7", "#F06A77"], } _USAGE_STATS_FILE = HOME / ".cache/codex-proxy/usage-stats.json" def _load_usage_stats(): try: if _USAGE_STATS_FILE.exists(): return json.loads(_USAGE_STATS_FILE.read_text()) except Exception: pass return {"providers": {}, "updated": None} def _fmt_tok(n): if n >= 1_000_000: return f"{n/1_000_000:.1f}M" if n >= 1_000: return f"{n/1_000:.1f}K" return str(n) def _fmt_dur(s): if s >= 3600: return f"{s/3600:.1f}h" if s >= 60: return f"{s/60:.1f}m" return f"{s:.1f}s" def _status_pill(success_rate, fail_pct): if fail_pct > 0.15: return ("ERR", _U["red"]) if fail_pct > 0.05: return ("WARN", _U["yellow"]) return ("OK", _U["green"]) def _make_css_widget(css_str): p = Gtk.CssProvider() p.load_from_data(css_str.encode()) return p def _apply_css(widget, css_str): ctx = widget.get_style_context() ctx.add_provider(_make_css_widget(css_str), Gtk.STYLE_PROVIDER_PRIORITY_USER) class UsageWindow(Gtk.Window): def __init__(self, parent): super().__init__(title="Usage Dashboard") self.set_transient_for(parent) self.set_default_size(720, 640) self.set_position(Gtk.WindowPosition.CENTER) self._parent = parent _apply_css(self, f""" window {{ background-color: {_U["base"]}; }} separator {{ background-color: {_U["surface1"]}; }} """) vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) self.add(vbox) self._build_header(vbox) self._build_summary_strip(vbox) sep = Gtk.Separator() vbox.pack_start(sep, False, False, 0) self._cards_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6) self._cards_box.set_margin_top(8) sw = Gtk.ScrolledWindow() sw.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC) sw.add(self._cards_box) vbox.pack_start(sw, True, True, 0) self._refresh() self.show_all() def _build_header(self, parent): hdr = Gtk.Box(spacing=8) hdr.set_margin_start(16) hdr.set_margin_end(16) hdr.set_margin_top(12) hdr.set_margin_bottom(6) parent.pack_start(hdr, False, False, 0) bolt = Gtk.Label() bolt.set_markup(f'\u26A1') hdr.pack_start(bolt, False, False, 0) title = Gtk.Label() title.set_markup(f'Usage Dashboard') hdr.pack_start(title, False, False, 0) self._status_dots = Gtk.Label() hdr.pack_start(self._status_dots, False, False, 8) self._updated_lbl = Gtk.Label() self._updated_lbl.set_markup(f'Never') hdr.pack_end(self._updated_lbl, False, False, 4) refresh_btn = Gtk.Button(label="Refresh") _apply_css(refresh_btn, f""" button {{ color: {_U["text"]}; background-color: {_U["surface0"]}; border: 1px solid {_U["surface1"]}; border-radius: 6px; padding: 4px 12px; }} button:hover {{ background-color: {_U["surface1"]}; }} """) refresh_btn.connect("clicked", lambda b: self._refresh()) hdr.pack_end(refresh_btn, False, False, 0) def _build_summary_strip(self, parent): strip = Gtk.Box(spacing=0) strip.set_margin_start(16) strip.set_margin_end(16) strip.set_margin_bottom(6) _apply_css(strip, f"box {{ background-color: {_U["surface0"]}; border-radius: 8px; padding: 8px 12px; }}") parent.pack_start(strip, False, False, 0) self._kpi_boxes = {} for key, label, icon in [ ("providers", "Providers", "\U0001F4CA"), ("requests", "Requests", "\u26A1"), ("tokens", "Tokens", "\U0001F9E0"), ("latency", "Avg Latency", "\u23F1"), ]: box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=1) lbl = Gtk.Label() lbl.set_markup(f'{icon} {label}') lbl.set_xalign(0) box.pack_start(lbl, False, False, 0) val = Gtk.Label() val.set_markup(f'-') val.set_xalign(0) box.pack_start(val, False, False, 0) box.set_margin_end(20) strip.pack_start(box, False, False, 0) self._kpi_boxes[key] = val def _refresh(self): for c in self._cards_box.get_children(): self._cards_box.remove(c) stats = _load_usage_stats() updated = stats.get("updated") if updated: self._updated_lbl.set_markup(f'{updated}') providers = stats.get("providers", {}) if not providers: empty = Gtk.Label() empty.set_markup(f'No usage data yet.\nLaunch a session to start tracking.') empty.set_margin_top(60) self._cards_box.pack_start(empty, False, False, 0) self._cards_box.show_all() return total_req = 0 total_tok_in = 0 total_tok_out = 0 total_dur = 0.0 n_ok = 0 n_warn = 0 n_err = 0 sorted_providers = sorted(providers.items(), key=lambda x: x[1].get("total_requests", 0), reverse=True) for prov_name, prov_data in sorted_providers: t = prov_data.get("total_requests", 0) total_req += t total_tok_in += prov_data.get("total_tokens_in", 0) total_tok_out += prov_data.get("total_tokens_out", 0) total_dur += prov_data.get("total_duration_s", 0.0) fail = prov_data.get("failures", 0) fail_pct = fail / t if t > 0 else 0 _, sc = _status_pill(0, fail_pct) if fail_pct > 0.15: n_err += 1 elif fail_pct > 0.05: n_warn += 1 else: n_ok += 1 self._kpi_boxes["providers"].set_markup( f'{len(providers)}') self._kpi_boxes["requests"].set_markup( f'{total_req:,}') tok_sum = total_tok_in + total_tok_out tok_str = f"{_fmt_tok(tok_sum)} in:{_fmt_tok(total_tok_in)} out:{_fmt_tok(total_tok_out)}" if tok_sum else "N/A" self._kpi_boxes["tokens"].set_markup( f'{tok_str}') avg_lat = total_dur / total_req if total_req > 0 else 0 self._kpi_boxes["latency"].set_markup( f'{_fmt_dur(avg_lat)}') dots_parts = [] if n_ok: dots_parts.append(f'\u25CF{n_ok}') if n_warn: dots_parts.append(f'\u25D0{n_warn}') if n_err: dots_parts.append(f'\u2717{n_err}') if dots_parts: self._status_dots.set_markup(" ".join(dots_parts)) for prov_name, prov_data in sorted_providers: card = self._build_card(prov_name, prov_data) self._cards_box.pack_start(card, False, False, 0) self._cards_box.show_all() def _build_card(self, name, data): card = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) card.set_margin_start(12) card.set_margin_end(12) _apply_css(card, f""" box {{ background-color: {_U["surface0"]}; border-radius: 10px; border: 1px solid {_U["surface1"]}; }} """) total = data.get("total_requests", 0) ok = data.get("successes", 0) fail = data.get("failures", 0) success_rate = ok / total if total > 0 else 1.0 fail_pct = fail / total if total > 0 else 0 status_text, status_color = _status_pill(success_rate, fail_pct) border_color = status_color _apply_css(card, f""" box {{ background-color: {_U["surface0"]}; border-radius: 10px; border: 1px solid {border_color}; }} """) inner = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=3) inner.set_margin_start(14) inner.set_margin_end(14) inner.set_margin_top(10) inner.set_margin_bottom(10) card.pack_start(inner, False, False, 0) top = Gtk.Box(spacing=6) inner.pack_start(top, False, False, 0) dot = Gtk.Label() dot.set_markup(f'\u25CF') top.pack_start(dot, False, False, 0) name_lbl = Gtk.Label() short = name.replace("https://", "").replace("http://", "").split("/")[0] name_lbl.set_markup(f'{short}') top.pack_start(name_lbl, False, False, 0) pill = Gtk.Label() pill.set_markup(f' {status_text} ') top.pack_start(pill, False, False, 4) req_lbl = Gtk.Label() req_lbl.set_markup(f'{total} req') top.pack_start(req_lbl, False, False, 6) last_used = data.get("last_used", "") if last_used: lu_lbl = Gtk.Label() lu_lbl.set_markup(f'{last_used}') top.pack_end(lu_lbl, False, False, 0) sep1 = Gtk.Separator() _apply_css(sep1, f"separator {{ background-color: {status_color}; margin-top: 4px; }}") inner.pack_start(sep1, False, False, 0) gauge_box = Gtk.Box(spacing=4) gauge_box.set_margin_top(4) inner.pack_start(gauge_box, False, False, 0) gauge_label = Gtk.Label() gauge_label.set_markup(f'\u26A1') gauge_box.pack_start(gauge_label, False, False, 0) bar = Gtk.ProgressBar() bar.set_fraction(success_rate) bar_pct = int(success_rate * 100) bar.set_text(f"{bar_pct}%") bar.set_show_text(True) bar_css = f""" progress {{ background-color: {status_color}; border-radius: 6px; }} trough {{ background-color: {_U["surface1"]}; border-radius: 6px; min-height: 12px; }} """ _apply_css(bar, bar_css) bar.set_hexpand(True) gauge_box.pack_start(bar, True, True, 0) if fail > 0: fail_lbl = Gtk.Label() fail_lbl.set_markup(f'{fail} fail') gauge_box.pack_end(fail_lbl, False, False, 0) metrics_box = Gtk.Box(spacing=0) metrics_box.set_margin_top(4) inner.pack_start(metrics_box, False, False, 0) t_in = data.get("total_tokens_in", 0) t_out = data.get("total_tokens_out", 0) dur = data.get("total_duration_s", 0.0) avg_dur = dur / total if total > 0 else 0 for label, value, color in [ ("Tokens In", f"{_fmt_tok(t_in)}", _U["sapphire"]), ("Tokens Out", f"{_fmt_tok(t_out)}", _U["peach"]), ("Avg Latency", _fmt_dur(avg_dur), _U["sky"]), ("Duration", _fmt_dur(dur), _U["lavender"]), ]: box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) l = Gtk.Label() l.set_markup(f'{label}') l.set_xalign(0) box.pack_start(l, False, False, 0) v = Gtk.Label() v.set_markup(f'{value}') v.set_xalign(0) box.pack_start(v, False, False, 0) box.set_margin_end(16) metrics_box.pack_start(box, False, False, 0) models = data.get("models", {}) if models: self._build_models_section(inner, models, total) last_err = data.get("last_error") if last_err: err_box = Gtk.Box(spacing=4) err_box.set_margin_top(4) inner.pack_start(err_box, False, False, 0) icon = Gtk.Label() icon.set_markup(f'\u26A0') err_box.pack_start(icon, False, False, 0) err_lbl = Gtk.Label() err_lbl.set_markup(f'{last_err}') err_lbl.set_xalign(0) err_lbl.set_line_wrap(True) err_box.pack_start(err_lbl, False, False, 0) return card def _build_models_section(self, parent, models, total_req): sep_m = Gtk.Separator() _apply_css(sep_m, f"separator {{ background-color: {_U["lavender"]}; margin-top: 4px; margin-bottom: 2px; }}") parent.pack_start(sep_m, False, False, 0) header = Gtk.Box(spacing=4) header.set_margin_top(2) parent.pack_start(header, False, False, 0) icon = Gtk.Label() icon.set_markup(f'\U0001F916') header.pack_start(icon, False, False, 0) lbl = Gtk.Label() lbl.set_markup(f'Models') header.pack_start(lbl, False, False, 0) sorted_models = sorted(models.items(), key=lambda x: x[1].get("requests", 0), reverse=True) if total_req > 0: comp_bar = Gtk.Box(spacing=0) _apply_css(comp_bar, f"box {{ background-color: {_U["surface1"]}; border-radius: 4px; min-height: 8px; margin-top: 2px; }}") parent.pack_start(comp_bar, False, False, 0) for i, (mname, mdata) in enumerate(sorted_models): m_req = mdata.get("requests", 0) pct = m_req / total_req if pct < 0.01: continue seg = Gtk.Box() color = _U["model_palette"][i % len(_U["model_palette"])] _apply_css(seg, f"box {{ background-color: {color}; min-height: 8px; }}") seg.set_size_request(max(int(pct * 400), 4), 8) comp_bar.pack_start(seg, False, False, 0) models_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=1) models_box.set_margin_top(2) parent.pack_start(models_box, False, False, 0) for i, (mname, mdata) in enumerate(sorted_models[:6]): row = Gtk.Box(spacing=6) models_box.pack_start(row, False, False, 0) color = _U["model_palette"][i % len(_U["model_palette"])] dot = Gtk.Label() dot.set_markup(f'\u25CF') row.pack_start(dot, False, False, 0) m_lbl = Gtk.Label() m_lbl.set_markup(f'{mname}') m_lbl.set_xalign(0) m_lbl.set_size_request(120, -1) row.pack_start(m_lbl, False, False, 0) m_req = mdata.get("requests", 0) pct = m_req / total_req * 100 if total_req > 0 else 0 m_bar = Gtk.ProgressBar() m_bar.set_fraction(m_req / total_req if total_req > 0 else 0) _apply_css(m_bar, f""" progress {{ background-color: {color}; border-radius: 3px; }} trough {{ background-color: {_U["surface1"]}; border-radius: 3px; min-height: 6px; }} """) m_bar.set_size_request(80, -1) row.pack_start(m_bar, False, False, 0) pct_lbl = Gtk.Label() pct_lbl.set_markup(f'{pct:.0f}% ({m_req})') row.pack_start(pct_lbl, False, False, 0) m_in = mdata.get("tokens_in", 0) m_out = mdata.get("tokens_out", 0) if m_in or m_out: tok_lbl = Gtk.Label() tok_lbl.set_markup(f'in:{_fmt_tok(m_in)} out:{_fmt_tok(m_out)}') row.pack_end(tok_lbl, False, False, 0) def main(): for d in [LOG_DIR, PROXY_CONFIG_DIR]: d.mkdir(parents=True, exist_ok=True) # Create default endpoints if none exist if not ENDPOINTS_FILE.exists(): save_endpoints({ "default": "OpenAI", "endpoints": [ {"name": "OpenAI", "backend_type": "native", "base_url": "https://api.openai.com/v1", "api_key": "", "default_model": "gpt-4o", "models": ["gpt-4o", "gpt-4o-mini"], "provider_preset": "OpenAI"}, {"name": "Z.AI", "backend_type": "openai-compat", "base_url": "https://api.z.ai/api/coding/paas/v4", "api_key": "", "default_model": "glm-5.1", "models": ["glm-4.5", "glm-4.5-air", "glm-4.6", "glm-4.7", "glm-5", "glm-5-turbo", "glm-5.1"], "provider_preset": "Custom"}, ], }) w = LauncherWin() w.connect("destroy", Gtk.main_quit) Gtk.main() class RequestHistoryWindow(Gtk.Window): _SNAP_DIR = Path.home() / ".cache/codex-proxy/requests" def __init__(self, parent): Gtk.Window.__init__(self, title="Request History") self.set_transient_for(parent) self.set_default_size(720, 500) self.set_position(Gtk.WindowPosition.CENTER) vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6) vbox.set_margin_start(10) vbox.set_margin_end(10) vbox.set_margin_top(10) vbox.set_margin_bottom(10) self.add(vbox) hdr = Gtk.Box(spacing=8) vbox.pack_start(hdr, False, False, 0) lbl = Gtk.Label(label="Request History") lbl.set_use_markup(True) hdr.pack_start(lbl, False, False, 0) refresh_btn = Gtk.Button(label="Refresh") refresh_btn.connect("clicked", lambda b: self._load()) hdr.pack_end(refresh_btn, False, False, 0) clear_btn = Gtk.Button(label="Clear All") clear_btn.connect("clicked", lambda b: self._clear_all()) hdr.pack_end(clear_btn, False, False, 0) paned = Gtk.Paned(orientation=Gtk.Orientation.VERTICAL) vbox.pack_start(paned, True, True, 0) top_sw = Gtk.ScrolledWindow() top_sw.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC) paned.pack1(top_sw, resize=True, shrink=False) self._store = Gtk.ListStore(str, str, str, str, str, str) self._tree = Gtk.TreeView(model=self._store) for i, (title, w) in enumerate([("Time", 140), ("Model", 140), ("Status", 80), ("Duration", 70), ("ID", 180), ("Error", 120)]): col = Gtk.TreeViewColumn(title, Gtk.CellRendererText(), text=i) col.set_resizable(True) col.set_min_width(w) self._tree.append_column(col) self._tree.connect("row-activated", self._on_row_activated) top_sw.add(self._tree) self._detail = Gtk.TextView() self._detail.set_editable(False) self._detail.set_monospace(True) self._detail.set_wrap_mode(Gtk.WrapMode.WORD_CHAR) bottom_sw = Gtk.ScrolledWindow() bottom_sw.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC) bottom_sw.add(self._detail) paned.pack2(bottom_sw, resize=True, shrink=False) self._snapshots = [] self._load() self.show_all() def _load(self): self._store.clear() self._snapshots = [] snap_dir = self._SNAP_DIR if not snap_dir.exists(): return files = sorted(snap_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True) for f in files[:200]: try: data = json.loads(f.read_text()) meta = data.get("_meta", {}) self._snapshots.append(data) ts = meta.get("ts_iso", "")[:19].replace("T", " ") model = meta.get("model", "?") status = meta.get("status", "unknown") dur = f"{meta['duration_s']:.1f}s" if meta.get("duration_s") is not None else "-" rid = meta.get("request_id", "")[:28] err = (meta.get("error") or "")[:60] self._store.append([ts, model, status, dur, rid, err]) except Exception: pass def _on_row_activated(self, tree, path, column): idx = path[0] if idx < len(self._snapshots): data = self._snapshots[idx] buf = self._detail.get_buffer() buf.set_text(json.dumps(data, indent=2, ensure_ascii=False)[:50000]) def _clear_all(self): d = Gtk.MessageDialog(self, 0, Gtk.MessageType.WARNING, Gtk.ButtonsType.YES_NO, "Delete all request snapshots?") r = d.run() d.destroy() if r != Gtk.ResponseType.YES: return snap_dir = self._SNAP_DIR if snap_dir.exists(): for f in snap_dir.glob("*.json"): try: f.unlink() except Exception: pass self._store.clear() self._snapshots = [] self._detail.get_buffer().set_text("") class BenchmarkWindow(Gtk.Window): _BENCH_PROMPT = "In exactly 3 bullet points, explain why the sky is blue." _BENCH_TOOLS = [{"type": "function", "function": {"name": "get_weather", "parameters": {"type": "object", "properties": {"city": {"type": "string"}}}}}] def __init__(self, parent): Gtk.Window.__init__(self, title="Model Benchmark") self.set_transient_for(parent) self.set_default_size(820, 560) self.set_position(Gtk.WindowPosition.CENTER) self._running = False self._ep_data = load_endpoints() vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) vbox.set_margin_start(10) vbox.set_margin_end(10) vbox.set_margin_top(10) vbox.set_margin_bottom(10) self.add(vbox) hdr = Gtk.Box(spacing=8) vbox.pack_start(hdr, False, False, 0) lbl = Gtk.Label(label="Multi-Provider Benchmark") lbl.set_use_markup(True) hdr.pack_start(lbl, False, False, 0) self._run_btn = Gtk.Button(label="Run Benchmark") self._run_btn.connect("clicked", lambda b: self._run()) hdr.pack_end(self._run_btn, False, False, 0) lanes_box = Gtk.Box(spacing=6) vbox.pack_start(lanes_box, False, False, 0) self._lanes = [] for i in range(3): frame = Gtk.Frame(label=f"{'A' if i == 0 else 'B' if i == 1 else 'C'}" if i < 2 else None) if i == 2: self._c_frame = frame self._c_check = Gtk.CheckButton(label="Enable Lane C") self._c_check.set_active(False) frame.set_label_widget(self._c_check) frame.set_sensitive(False) self._c_check.connect("toggled", lambda b: frame.set_sensitive(b.get_active())) inner = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4) inner.set_margin_start(6) inner.set_margin_end(6) inner.set_margin_top(4) inner.set_margin_bottom(4) frame.add(inner) lanes_box.pack_start(frame, True, True, 0) row_ep = Gtk.Box(spacing=4) inner.pack_start(row_ep, False, False, 0) row_ep.pack_start(Gtk.Label(label="Endpoint:"), False, False, 0) ep_combo = Gtk.ComboBoxText() for ep in self._ep_data.get("endpoints", []): ep_combo.append(ep["name"], ep["name"]) row_ep.pack_start(ep_combo, True, True, 0) row_m = Gtk.Box(spacing=4) inner.pack_start(row_m, False, False, 0) row_m.pack_start(Gtk.Label(label="Model:"), False, False, 0) m_combo = Gtk.ComboBoxText() m_combo.set_entry_text_column(0) row_m.pack_start(m_combo, True, True, 0) ep_combo.connect("changed", lambda b, mc=m_combo: self._update_lane_models(b, mc)) self._lanes.append({"ep": ep_combo, "model": m_combo}) default_name = self._ep_data.get("default") if default_name: self._lanes[0]["ep"].set_active_id(default_name) eps = self._ep_data.get("endpoints", []) if len(eps) > 1: self._lanes[1]["ep"].set_active_id(eps[1]["name"]) elif eps: self._lanes[1]["ep"].set_active_id(eps[0]["name"]) if len(eps) > 2: self._lanes[2]["ep"].set_active_id(eps[2]["name"]) elif len(eps) > 1: self._lanes[2]["ep"].set_active_id(eps[1]["name"]) tests_box = Gtk.Box(spacing=6) vbox.pack_start(tests_box, False, False, 0) self._test_ttft = Gtk.CheckButton(label="Time to First Token") self._test_ttft.set_active(True) tests_box.pack_start(self._test_ttft, False, False, 0) self._test_total = Gtk.CheckButton(label="Total Latency") self._test_total.set_active(True) tests_box.pack_start(self._test_total, False, False, 0) self._test_tools = Gtk.CheckButton(label="Tool Call") self._test_tools.set_active(True) tests_box.pack_start(self._test_tools, False, False, 0) self._test_tps = Gtk.CheckButton(label="Tokens/sec") self._test_tps.set_active(True) tests_box.pack_start(self._test_tps, False, False, 0) results_sw = Gtk.ScrolledWindow() results_sw.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC) vbox.pack_start(results_sw, True, True, 0) self._results_store = Gtk.ListStore(str, str, str, str, str) self._results_tree = Gtk.TreeView(model=self._results_store) for i, title in enumerate(["Test", "Lane A", "Lane B", "Lane C", "Winner"]): col = Gtk.TreeViewColumn(title, Gtk.CellRendererText(), text=i) col.set_resizable(True) self._results_tree.append_column(col) results_sw.add(self._results_tree) self._status = Gtk.Label(label="Select endpoints and models per lane, then Run Benchmark.") self._status.set_xalign(0) vbox.pack_start(self._status, False, False, 0) self.show_all() def _update_lane_models(self, ep_combo, model_combo): name = ep_combo.get_active_text() if not name: return ep = get_endpoint(name) models = (ep or {}).get("models", []) active = model_combo.get_active_text() model_combo.remove_all() for m in models: model_combo.append(m, m) if active and any(m == active for m in models): model_combo.set_active_id(active) elif models: model_combo.set_active(0) def _collect_lanes(self): active = [] for i, lane in enumerate(self._lanes): if i == 2 and not self._c_check.get_active(): continue ep_name = lane["ep"].get_active_text() model = lane["model"].get_active_text() if not ep_name or not model: continue ep = get_endpoint(ep_name) if not ep: continue active.append({"ep": ep, "model": model, "label": f"{ep_name}/{model}"}) return active def _run(self): if self._running: return lanes = self._collect_lanes() if len(lanes) < 2: self._status.set_text("Need at least 2 lanes with endpoint + model selected.") return self._running = True self._run_btn.set_sensitive(False) self._results_store.clear() self._status.set_text("Running benchmark…") threading.Thread(target=self._run_bench, args=(lanes,), daemon=True).start() def _bench_single(self, ep, model, stream, with_tools=False): url = normalize_base_url(ep.get("base_url", "")) key = (ep.get("api_key") or "").strip() bt = ep.get("backend_type", "openai-compat") if bt == "anthropic": test_url = f"{url}/v1/messages" headers = {"x-api-key": key, "anthropic-version": "2023-06-01", "content-type": "application/json"} body = {"model": model, "max_tokens": 100, "stream": stream, "messages": [{"role": "user", "content": self._BENCH_PROMPT}]} if with_tools: body["tools"] = self._BENCH_TOOLS body["messages"] = [{"role": "user", "content": "Use get_weather for Paris"}] data = json.dumps(body).encode() elif bt.startswith("gemini-oauth"): token_name = "google-antigravity-oauth-token.json" if "antigravity" in bt else "google-cli-oauth-token.json" token_path = Path.home() / f".cache/codex-proxy/{token_name}" oauth_token = "" if token_path.exists(): try: td = json.loads(token_path.read_text()) oauth_token = td.get("access_token", "") except Exception: pass test_url = f"{url}/v1/chat/completions" headers = {"Authorization": f"Bearer {oauth_token}", "content-type": "application/json"} body = {"model": model, "max_tokens": 100, "stream": stream, "messages": [{"role": "user", "content": self._BENCH_PROMPT}]} if with_tools: body["tools"] = self._BENCH_TOOLS body["messages"] = [{"role": "user", "content": "Use get_weather for Paris"}] data = json.dumps(body).encode() else: test_url = f"{url}/chat/completions" headers = {"Authorization": f"Bearer {key}", "content-type": "application/json"} body = {"model": model, "max_tokens": 100, "stream": stream, "messages": [{"role": "user", "content": self._BENCH_PROMPT}]} if with_tools: body["tools"] = self._BENCH_TOOLS body["messages"] = [{"role": "user", "content": "Use get_weather for Paris"}] data = json.dumps(body).encode() req = urllib.request.Request(test_url, data=data, headers=headers, method="POST") t0 = time.time() ttft = None try: resp = urllib.request.urlopen(req, timeout=60) if stream: first_chunk_time = None chunks = [] while True: chunk = resp.read(4096) if not chunk: break if first_chunk_time is None: first_chunk_time = time.time() ttft = first_chunk_time - t0 chunks.append(chunk) total = time.time() - t0 result_text = b"".join(chunks).decode(errors="replace")[:300] else: raw = resp.read() total = time.time() - t0 result_text = raw.decode(errors="replace")[:300] payload = json.loads(raw) choices = payload.get("choices", []) if choices: msg = choices[0].get("message", {}) if with_tools: tcs = msg.get("tool_calls", []) has_tools = len(tcs) > 0 return {"ttft": ttft or total, "total": total, "detail": f"tools={has_tools}, tok={payload.get('usage', {}).get('total_tokens', '?')}"} content = msg.get("content", "")[:50] return {"ttft": ttft or total, "total": total, "detail": f"{content[:40]}… tok={payload.get('usage', {}).get('total_tokens', '?')}"} return {"ttft": ttft or total, "total": total, "detail": result_text[:60]} except Exception as e: total = time.time() - t0 return {"ttft": ttft or total, "total": total, "detail": f"Error: {str(e)[:40]}"} def _bench_tps(self, ep, model): url = normalize_base_url(ep.get("base_url", "")) key = (ep.get("api_key") or "").strip() bt = ep.get("backend_type", "openai-compat") prompt = "Write a detailed paragraph about artificial intelligence in at least 150 words." max_tok = 512 if bt == "anthropic": test_url = f"{url}/v1/messages" headers = {"x-api-key": key, "anthropic-version": "2023-06-01", "content-type": "application/json"} body = json.dumps({"model": model, "max_tokens": max_tok, "stream": True, "messages": [{"role": "user", "content": prompt}]}).encode() elif bt.startswith("gemini-oauth"): token_name = "google-antigravity-oauth-token.json" if "antigravity" in bt else "google-cli-oauth-token.json" token_path = Path.home() / f".cache/codex-proxy/{token_name}" oauth_token = "" if token_path.exists(): try: td = json.loads(token_path.read_text()) oauth_token = td.get("access_token", "") except Exception: pass test_url = f"{url}/v1/chat/completions" headers = {"Authorization": f"Bearer {oauth_token}", "content-type": "application/json"} body = json.dumps({"model": model, "max_tokens": max_tok, "stream": True, "messages": [{"role": "user", "content": prompt}]}).encode() else: test_url = f"{url}/chat/completions" headers = {"Authorization": f"Bearer {key}", "content-type": "application/json"} body = json.dumps({"model": model, "max_tokens": max_tok, "stream": True, "messages": [{"role": "user", "content": prompt}]}).encode() req = urllib.request.Request(test_url, data=body, headers=headers, method="POST") t0 = time.time() first_token_t = None token_count = 0 try: resp = urllib.request.urlopen(req, timeout=90) buf = b"" while True: chunk = resp.read(4096) if not chunk: break if first_token_t is None: first_token_t = time.time() buf += chunk total = time.time() - t0 text = buf.decode(errors="replace") if bt == "anthropic": for line in text.split("\n"): if "content_block_delta" in line and "text_delta" in line: try: idx = line.index("{") evt = json.loads(line[idx:]) delta = evt.get("delta", {}) token_count += len(delta.get("text", "")) / 4 except Exception: pass if token_count == 0: token_count = max(1, len(text) / 4) else: for line in text.split("\n"): if line.startswith("data: ") and line != "data: [DONE]": try: d = json.loads(line[6:]) content = d.get("choices", [{}])[0].get("delta", {}).get("content", "") if content: token_count += max(1, len(content) / 4) except Exception: pass if token_count == 0: token_count = max(1, len(text) / 4) gen_time = (time.time() - first_token_t) if first_token_t else total tps = token_count / gen_time if gen_time > 0 else 0 return {"tps": tps, "tokens": int(token_count), "gen_time": gen_time, "total": total, "detail": f"{int(token_count)} tok / {gen_time:.1f}s"} except Exception as e: total = time.time() - t0 return {"tps": 0, "tokens": 0, "gen_time": total, "total": total, "detail": f"Error: {str(e)[:40]}"} def _run_bench(self, lanes): results = [] tests = [] if self._test_ttft.get_active(): tests.append(("TTFT (stream)", True, False)) if self._test_total.get_active(): tests.append(("Total latency", False, False)) if self._test_tools.get_active(): tests.append(("Tool call", False, True)) run_tps = self._test_tps.get_active() for test_name, stream, tools in tests: lane_results = [] for lane in lanes: label = lane["label"] GLib.idle_add(self._status.set_text, f"{test_name}: {label}…") r = self._bench_single(lane["ep"], lane["model"], stream, tools) lane_results.append((label, r)) metric = "ttft" if stream else "total" values = [(lr[0], lr[1][metric]) for lr in lane_results] sorted_v = sorted(values, key=lambda x: x[1]) best_val = sorted_v[0][1] second_val = sorted_v[1][1] if best_val < second_val * 0.85: winner = sorted_v[0][0] else: winner = "Tie" cols = [] for lr in lane_results: v = lr[1][metric] cols.append(f"{v:.2f}s ({lr[1]['detail'][:30]})") while len(cols) < 3: cols.append("—") cols.append(winner) results.append(tuple([test_name] + cols)) if run_tps: lane_tps = [] for lane in lanes: label = lane["label"] GLib.idle_add(self._status.set_text, f"Tokens/sec: {label}…") r = self._bench_tps(lane["ep"], lane["model"]) lane_tps.append((label, r)) tps_vals = [(lt[0], lt[1]["tps"]) for lt in lane_tps] sorted_tps = sorted(tps_vals, key=lambda x: x[1], reverse=True) best_tps = sorted_tps[0][1] second_tps = sorted_tps[1][1] if len(sorted_tps) > 1 else 0 if best_tps > 0 and second_tps > 0 and best_tps > second_tps * 1.15: winner_tps = sorted_tps[0][0] else: winner_tps = "Tie" cols_tps = [] for lt in lane_tps: tps = lt[1]["tps"] cols_tps.append(f"{tps:.1f} t/s ({lt[1]['detail'][:25]})") while len(cols_tps) < 3: cols_tps.append("—") cols_tps.append(winner_tps) results.append(tuple(["Tokens/sec"] + cols_tps)) def _show(): for row in results: self._results_store.append(row) self._status.set_text("Benchmark complete.") self._running = False self._run_btn.set_sensitive(True) GLib.idle_add(_show) if __name__ == "__main__": main()