#!/usr/bin/env python3 """Codex Launcher GUI — manage endpoints, launch Desktop or CLI with any provider.""" import gi gi.require_version("Gtk", "3.0") from gi.repository import Gtk, GLib import subprocess, os, signal, sys, threading, time, json, urllib.request, tempfile, shutil import hashlib from pathlib import Path HOME = Path.home() START_SH = Path("/opt/codex-desktop/start.sh") CONFIG = HOME / ".codex/config.toml" CONFIG_BAK = HOME / ".codex/config.toml.launcher-bak" CLEANUP = HOME / ".local/bin/cleanup-codex-stale.sh" PROXY = HOME / ".local/bin/translate-proxy.py" ENDPOINTS_FILE = HOME / ".codex/endpoints.json" BGP_POOLS_FILE = HOME / ".codex/bgp-pools.json" LOG_DIR = HOME / ".cache/codex-desktop" LAUNCH_LOG = LOG_DIR / "launcher.log" PROXY_CONFIG_DIR = HOME / ".cache/codex-proxy" DEFAULT_CONFIG = """model = "" model_provider = "" model_catalog_json = "" """ CHANGELOG = [ ("2.6.1", "2026-05-20", [ "Google OAuth rebuilt to emulate Gemini CLI — no client_secret.json needed", "Uses Google's public OAuth client_id (same as gemini-cli)", "PKCE + CSRF state protection for secure auth", "Just click OAuth Login → browser opens → authorize → done", "Includes cloud-platform scope for Gemini Code Assist compatibility", ]), ("2.6.0", "2026-05-20", [ "Usage Dashboard — per-provider request/token/latency tracking", "Visual cards with success rate bars, model breakdown, error tracking", "Google OAuth: browse for client_secret.json instead of fixed path", ]), ("2.5.1", "2026-05-20", [ "Adaptive retry for 429/502/503 errors with exponential backoff", "BGP routes also retry transient errors before failing over", "Proxy socket reuse — no more 'Address already in use' crashes", "BGP route count shown at proxy startup", ]), ("2.5.0", "2026-05-20", [ "AI BGP — multi-provider routing with automatic failover", "Create BGP pools with ordered routes from any configured endpoint", "Each route uses its own endpoint URL, API key, and model", "Failover strategy: tries primary, falls back 
on error/timeout", "BGP pools appear in endpoint dropdown with shuffle icon", "Up/down reordering for route priority in pool editor", "Fixed TOML config breakage from multi-line paste in fields", ]), ("2.4.0", "2026-05-20", [ "Added OpenAdapter provider preset (api.openadapter.in)", "One API key access to 40+ models — GLM, DeepSeek, Kimi, Qwen, Claude, GPT, Gemini", "Fixed Add/Edit dialog crash (missing _on_reasoning_toggled method)", "Redesigned Google OAuth flow with live status dialog", ]), ("2.3.2", "2026-05-20", [ "Added Google Gemini provider with OAuth support", "Two presets: 'Google Gemini (API Key)' and 'Google Gemini (OAuth)'", "OAuth Login button in endpoint editor — full Google OAuth2 flow with auto-refresh", "Auto-refreshes OAuth access tokens when expired (no manual re-login needed)", "Supports gemini-2.5-flash, gemini-2.5-pro, gemini-2.0-flash, and more", "Uses Gemini's OpenAI-compatible endpoint — works with existing proxy", ]), ("2.3.0", "2026-05-20", [ "Adaptive Crof self-healing system — auto-adjusts to Crof model limits", "Tracks per-model success/failure history, learns item count limits dynamically", "Proactively compacts input when above learned limit before sending to Crof", "Auto-retries on finish_reason=length — aggressively compacts and resends", "Prevents 'stream disconnected' and 'incomplete' errors on long conversations", ]), ("2.2.1", "2026-05-20", [ "Fixed compaction orphaning function_call_output items — root cause of Crof incomplete responses", "Compaction now respects function_call/function_call_output pairs — no more dangling tool results", "Fixed reasoning control: reasoning_effort=none now always sends enable_thinking=false too", ]), ("2.2.0", "2026-05-20", [ "Added per-provider Reasoning On/Off toggle in endpoint editor", "Added Reasoning Effort level per provider: None, Minimal, Low, Medium, High, Max", "When reasoning is OFF: sends enable_thinking=false + reasoning_effort=none to upstream API", "When reasoning is ON: sends 
user-selected effort level (default: Medium)", "Fixes Crof mimo-v2.5-pro and similar reasoning models exhausting output tokens", "Strip reasoning_content from proxy output — Codex doesn't use it", "Force max_tokens=64000 minimum for openai-compat providers", ]), ("2.1.3", "2026-05-19", [ "Fixed Crof mimo-v2.5-pro stopping: reasoning_content exhausted all output tokens", "Strip reasoning_content from proxy output — Codex doesn't use it, avoids token waste", "Force max_tokens=64000 minimum for openai-compat providers — gives models room for both reasoning and content", ]), ("2.1.2", "2026-05-19", [ "Fixed Crof.ai and providers stopping after first tool call (root cause: None tool IDs)", "Codex sends function_call items with id=None — proxy now matches tool results to calls by position", "Fixed orphan message output item when response has only tool calls (no text)", "Auto-trims long conversations (>30 items) to prevent context overflow on providers like Crof", "Added request/response logging to ~/.cache/codex-proxy/requests.log", ]), ("2.1.1", "2026-05-19", [ "Fixed proxy: map 'developer' role to 'system' for Chat Completions providers", "Fixed proxy: map 'developer' role to 'user' for Anthropic providers", "Forward 'instructions' field from Responses API as system message/param", "Fixes DeepSeek and other providers rejecting unknown 'developer' role", ]), ("2.1.0", "2026-05-19", [ "Added Codex auth status detection (codex login status)", "Added Re-login button to re-authenticate via codex login", "Auto-checks auth before launching Codex Default mode", "Warns if OAuth token expired or missing before launch", ]), ("2.0.1", "2026-05-19", [ "Added Codex CLI/Desktop installation verifier to main page", "Disables Desktop/CLI launch buttons when corresponding tool is missing", "Shows install instructions in status area on startup", ]), ("2.0.0", "2026-05-19", [ "Initial release: multi-provider Codex Launcher", "Translation proxy: Responses API to Chat Completions + 
Anthropic Messages", "GTK endpoint manager with 10+ provider presets", "Codex Default mode (built-in OAuth, zero config)", "Browser UA injection for Cloudflare-protected providers (OpenCode)", "Streaming SSE, tool calls, reasoning content support", "Profile backup/import, model auto-fetch, bulk import", "Refresh Models in background thread", "URL normalization to prevent double-path bugs", "Config backup/restore around sessions", ".deb installer package", ]), ] PROVIDER_PRESETS = { "Custom": { "backend_type": "openai-compat", "base_url": "", "models": [], }, "OpenAI": { "backend_type": "native", "base_url": "https://api.openai.com/v1", "models": ["gpt-4o", "gpt-4o-mini"], }, "Anthropic": { "backend_type": "anthropic", "base_url": "https://api.anthropic.com/v1", "models": ["claude-sonnet-4-5", "claude-3-5-haiku-latest"], }, "OpenCode Zen (OpenAI-compatible)": { "backend_type": "openai-compat", "base_url": "https://opencode.ai/zen/v1", "models": [ "glm-5.1", "glm-5", "kimi-k2.5", "kimi-k2.6", "minimax-m2.7", "minimax-m2.5", "minimax-m2.5-free", "deepseek-v4-flash-free", "nemotron-3-super-free", "qwen3.6-plus", "qwen3.5-plus", "big-pickle", ], }, "OpenCode Zen (Anthropic)": { "backend_type": "anthropic", "base_url": "https://opencode.ai/zen/v1", "models": [ "claude-opus-4-7", "claude-opus-4-6", "claude-opus-4-5", "claude-opus-4-1", "claude-sonnet-4-6", "claude-sonnet-4-5", "claude-sonnet-4", "claude-haiku-4-5", "claude-3-5-haiku", ], }, "OpenCode Go (OpenAI-compatible)": { "backend_type": "openai-compat", "base_url": "https://opencode.ai/zen/go/v1", "models": [ "glm-5.1", "glm-5", "kimi-k2.5", "kimi-k2.6", "mimo-v2.5", "mimo-v2.5-pro", "minimax-m2.7", "minimax-m2.5", "qwen3.6-plus", "qwen3.5-plus", "deepseek-v4-pro", "deepseek-v4-flash", ], }, "OpenCode Go (Anthropic)": { "backend_type": "anthropic", "base_url": "https://opencode.ai/zen/go/v1", "models": ["minimax-m2.7", "minimax-m2.5"], }, "Crof.ai": { "backend_type": "openai-compat", "base_url": 
"https://crof.ai/v1", "models": [], }, "NVIDIA NIM": { "backend_type": "openai-compat", "base_url": "https://integrate.api.nvidia.com/v1", "models": [], }, "Kilo.ai Gateway": { "backend_type": "openai-compat", "base_url": "https://api.kilo.ai/api/gateway", "models": [], }, "Command Code": { "backend_type": "command-code", "base_url": "https://api.commandcode.ai", "cc_version": "0.26.8", "models": [ "deepseek/deepseek-v4-flash", "deepseek/deepseek-v4-pro", "anthropic:claude-sonnet-4-6", "anthropic:claude-haiku-4-5-20251001", "anthropic:claude-opus-4-7", "anthropic:claude-opus-4-6", "openai:gpt-5.5", "openai:gpt-5.4", "openai:gpt-5.4-mini", "openai:gpt-5.3-codex", "moonshotai/Kimi-K2.6", "moonshotai/Kimi-K2.5", "zai-org/GLM-5.1", "zai-org/GLM-5", "MiniMaxAI/MiniMax-M2.7", "MiniMaxAI/MiniMax-M2.5", "Qwen/Qwen3.6-Max-Preview", "Qwen/Qwen3.6-Plus", "stepfun/Step-3.5-Flash", "google/gemini-3.1-flash-lite", ], }, "OpenRouter": { "backend_type": "openai-compat", "base_url": "https://openrouter.ai/api/v1", "models": [], }, "Google Gemini (API Key)": { "backend_type": "openai-compat", "base_url": "https://generativelanguage.googleapis.com/v1beta/openai", "models": [ "gemini-2.5-flash", "gemini-2.5-pro", "gemini-2.0-flash", "gemini-2.0-flash-lite", "gemini-2.5-flash-preview-native-audio-dialog", ], }, "Google Gemini (OAuth)": { "backend_type": "openai-compat", "base_url": "https://generativelanguage.googleapis.com/v1beta/openai", "oauth_provider": "google", "models": [ "gemini-2.5-flash", "gemini-2.5-pro", "gemini-2.0-flash", "gemini-2.0-flash-lite", "gemini-2.5-flash-preview-native-audio-dialog", ], }, "OpenAdapter": { "backend_type": "openai-compat", "base_url": "https://api.openadapter.in/v1", "models": [ "0G-DeepSeek-V3", "0G-DeepSeek-v4-Pro", "0G-GLM-5", "0G-GLM-5.1", "0G-Qwen3.6", "0G-Qwen-VL", ], }, } def safe_name(name): base = "".join(ch if ch.isalnum() or ch in "._-" else "_" for ch in name).strip("._-") or "endpoint" digest = 
def safe_name(name):
    """Return a filesystem-friendly identifier derived from *name*.

    Every character that is not alphanumeric or one of ``._-`` becomes ``_``;
    leading/trailing ``._-`` are stripped and ``"endpoint"`` is used when
    nothing is left.  The first 8 hex digits of the SHA-1 of the original
    name are appended so names that sanitise to the same base stay distinct.
    """
    sanitized = "".join(ch if ch.isalnum() or ch in "._-" else "_" for ch in name)
    base = sanitized.strip("._-") or "endpoint"
    digest = hashlib.sha1(name.encode("utf-8")).hexdigest()[:8]
    return f"{base}-{digest}"


def label_for_backend(backend_type):
    """Return the display label for *backend_type* (unknown values pass through)."""
    labels = {
        "openai-compat": "OpenAI-compatible",
        "anthropic": "Anthropic",
        "command-code": "Command Code",
        "native": "Native",
    }
    return labels.get(backend_type, backend_type)


def normalize_model_id(text):
    """Return a lower-case, dash-separated slug for a model id.

    ``/`` becomes ``-``, ``+`` becomes ``plus``, any other character that is
    not alphanumeric, ``.`` or ``-`` becomes ``-``, runs of dashes collapse
    and leading/trailing ``-``/``.`` are removed.  ``None`` and blank input
    yield ``""`` (matching how ``normalize_base_url`` treats ``None``).
    """
    value = (text or "").strip().lower()
    if not value:
        return ""
    value = value.replace("/", "-").replace("+", "plus")
    value = "".join(ch if ch.isalnum() or ch in ".-" else "-" for ch in value)
    while "--" in value:
        value = value.replace("--", "-")
    return value.strip("-.")


def normalize_base_url(url):
    """Strip whitespace, trailing slashes and one trailing API-route suffix.

    Removing ``/chat/completions``, ``/responses`` or ``/messages`` lets
    callers append their own route without doubling the path.
    """
    base = (url or "").strip().rstrip("/")
    for suffix in ("/chat/completions", "/responses", "/messages"):
        if base.endswith(suffix):
            base = base[: -len(suffix)]
            break
    return base.rstrip("/")


def parse_model_list(text):
    """Parse comma- or newline-separated model ids into a de-duplicated list.

    Each entry goes through ``normalize_model_id``; blanks are dropped and
    first-seen order is kept.  ``None`` yields ``[]``.
    """
    normalized = (
        normalize_model_id(raw)
        for raw in (text or "").replace(",", "\n").splitlines()
    )
    # dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys(mid for mid in normalized if mid))


def apply_provider_preset(endpoint, preset_name):
    """Return a copy of *endpoint* with the named preset applied.

    ``provider_preset``, ``backend_type`` and ``base_url`` are always taken
    from the preset; ``cc_version``, ``models`` and ``default_model`` are only
    filled in when the endpoint does not already have a value.  An unknown
    preset name returns *endpoint* itself, unchanged.
    """
    preset = PROVIDER_PRESETS.get(preset_name)
    if not preset:
        return endpoint
    updated = dict(endpoint)
    updated["provider_preset"] = preset_name
    updated["backend_type"] = preset["backend_type"]
    updated["base_url"] = normalize_base_url(preset["base_url"])
    if preset.get("cc_version") and not updated.get("cc_version"):
        updated["cc_version"] = preset["cc_version"]
    if not updated.get("models"):
        updated["models"] = list(preset.get("models", []))
    if not updated.get("default_model") and updated.get("models"):
        updated["default_model"] = updated["models"][0]
    # NOTE(review): a preset's "oauth_provider" is not copied here — confirm
    # that the endpoint editor sets it elsewhere.
    return updated


def endpoint_models_url(endpoint):
    """Return ``<base_url>/models`` for *endpoint*, or ``""`` without a base URL."""
    base = normalize_base_url(endpoint.get("base_url") or "")
    return f"{base}/models" if base else ""


def endpoint_model_headers(endpoint):
    """Build the HTTP headers used when listing an endpoint's models.

    Anthropic backends get ``x-api-key`` (when a key is set) plus a fixed
    ``anthropic-version``; every other backend gets a bearer token when a key
    is set and no headers otherwise.
    """
    key = (endpoint.get("api_key") or "").strip()
    backend = endpoint.get("backend_type", "openai-compat")
    headers = {}
    if backend == "anthropic":
        if key:
            headers["x-api-key"] = key
        headers["anthropic-version"] = "2023-06-01"
    elif key:
        headers["Authorization"] = f"Bearer {key}"
    return headers


def fetch_models_for_endpoint(endpoint, timeout=10):
    """Fetch the model ids advertised by *endpoint*.

    Returns ``(ids, None)`` on success or ``(None, error_message)`` on any
    failure (empty base URL, network/JSON error, or no ids in the response).
    The payload may be an object with a ``data`` or ``models`` array, or a
    bare array of ``{"id": ...}`` objects.
    """
    url = endpoint_models_url(endpoint)
    if not url:
        return None, "Base URL is empty"
    try:
        req = urllib.request.Request(url, headers=endpoint_model_headers(endpoint))
        # Context manager closes the connection even when parsing fails.
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            payload = json.loads(resp.read())
        if isinstance(payload, dict):
            items = payload.get("data") or payload.get("models") or []
        else:
            items = payload or []
        ids = []
        seen = set()
        for item in items:
            mid = item.get("id") if isinstance(item, dict) else None
            if mid and mid not in seen:
                seen.add(mid)
                ids.append(mid)
        if not ids:
            return None, "No models returned"
        return ids, None
    except Exception as e:
        # Best-effort probe: the caller only needs a printable reason.
        return None, str(e)


def refresh_endpoint_models(endpoint):
    """Return ``(updated_copy, None)`` with freshly fetched models, or ``(None, err)``.

    ``default_model`` is reset to the first fetched id when the current
    default is not in the fetched list.
    """
    ids, err = fetch_models_for_endpoint(endpoint)
    if not ids:
        return None, err
    updated = dict(endpoint)
    updated["models"] = ids
    if updated.get("default_model") not in ids:
        updated["default_model"] = ids[0]
    return updated, None


# ═══════════════════════════════════════════════════════════════════
# Endpoint storage
# ═══════════════════════════════════════════════════════════════════

def _load_json_store(path, list_key):
    """Return the JSON object stored at *path*, or ``None`` when unusable.

    "Unusable" means the file is missing/unreadable, is not valid JSON, is not
    an object, or does not hold a list under *list_key*.
    """
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return None
    if isinstance(data, dict) and isinstance(data.get(list_key), list):
        return data
    return None


def load_endpoints():
    """Load the endpoint store, falling back to an empty store.

    The result is always a dict whose ``"endpoints"`` value is a list, so
    callers can index it without re-validating.
    """
    data = _load_json_store(ENDPOINTS_FILE, "endpoints")
    return data if data is not None else {"default": None, "endpoints": []}


def save_endpoints(data):
    """Write the endpoint store to ``ENDPOINTS_FILE`` (creating its directory)."""
    ENDPOINTS_FILE.parent.mkdir(parents=True, exist_ok=True)
    ENDPOINTS_FILE.write_text(json.dumps(data, indent=2))


def load_bgp_pools():
    """Load the BGP pool store, falling back to ``{"pools": []}``."""
    data = _load_json_store(BGP_POOLS_FILE, "pools")
    return data if data is not None else {"pools": []}


def save_bgp_pools(data):
    """Write the BGP pool store to ``BGP_POOLS_FILE`` (creating its directory)."""
    BGP_POOLS_FILE.parent.mkdir(parents=True, exist_ok=True)
    BGP_POOLS_FILE.write_text(json.dumps(data, indent=2))


def get_endpoint(name):
    """Return the stored endpoint dict called *name*, or ``None``."""
    for entry in load_endpoints()["endpoints"]:
        if isinstance(entry, dict) and entry.get("name") == name:
            return entry
    return None


def now_utc_iso():
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
def build_profile_bundle():
    """Return an exportable snapshot of the endpoint store and Codex config.

    ``codex_config_toml`` is the raw text of ``CONFIG`` or ``""`` when the
    file does not exist.
    """
    return {
        "version": 1,
        "exported_at": now_utc_iso(),
        "endpoints": load_endpoints(),
        "codex_config_toml": CONFIG.read_text(encoding="utf-8") if CONFIG.exists() else "",
    }


def save_profile_bundle(path):
    """Write the current profile bundle to *path* as indented JSON."""
    bundle = build_profile_bundle()
    Path(path).write_text(json.dumps(bundle, indent=2), encoding="utf-8")


def import_profile_bundle(path):
    """Replace the local profile with the bundle stored at *path*.

    The bundle is validated before anything on disk is touched.  The current
    config and endpoint store are then copied to their backup files, the
    bundle's endpoints are saved, and a non-blank ``codex_config_toml``
    overwrites ``CONFIG``.

    Returns the imported endpoint store.
    Raises ``ValueError`` for a structurally invalid bundle (JSON syntax
    errors surface as ``json.JSONDecodeError``, a ``ValueError`` subclass).
    """
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    if not isinstance(data, dict):
        raise ValueError("Invalid profile bundle")
    endpoints = data.get("endpoints")
    if not isinstance(endpoints, dict) or "endpoints" not in endpoints:
        raise ValueError("Profile bundle missing endpoints")
    # A non-list here would be written to disk and break every later reader
    # that iterates the endpoint list.
    if not isinstance(endpoints["endpoints"], list):
        raise ValueError("Profile bundle has a malformed endpoint list")

    # Keep a local rollback point before overwriting the current profile.
    if CONFIG.exists():
        shutil.copy2(str(CONFIG), str(CONFIG_BAK))
    if ENDPOINTS_FILE.exists():
        shutil.copy2(str(ENDPOINTS_FILE), str(ENDPOINTS_FILE.with_suffix(".json.import-bak")))

    save_endpoints(endpoints)
    cfg = data.get("codex_config_toml", "")
    if isinstance(cfg, str) and cfg.strip():
        CONFIG.parent.mkdir(parents=True, exist_ok=True)
        CONFIG.write_text(cfg, encoding="utf-8")
    return endpoints


# ═══════════════════════════════════════════════════════════════════
# Config management
# ═══════════════════════════════════════════════════════════════════

def backup_config():
    """Copy ``CONFIG`` to ``CONFIG_BAK`` when a config file exists."""
    if CONFIG.exists():
        shutil.copy2(str(CONFIG), str(CONFIG_BAK))


def restore_config():
    """Move ``CONFIG_BAK`` back over ``CONFIG`` when a backup exists.

    ``Path.replace`` overwrites an existing destination on every platform,
    whereas ``Path.rename`` only guarantees that on POSIX.
    """
    if CONFIG_BAK.exists():
        CONFIG_BAK.replace(CONFIG)
def _toml_safe(val):
    """Return *val* as text that is safe inside a TOML basic (``"..."``) string.

    Only the first line is kept (so pasted multi-line text cannot inject extra
    keys), surrounding whitespace is stripped, and backslashes and double
    quotes are escaped.  Backslashes are escaped first so the escapes added
    for quotes are not themselves doubled.
    """
    first_line = str(val).split("\n", 1)[0].strip()
    return first_line.replace("\\", "\\\\").replace('"', '\\"')


def _write_codex_config(endpoint, selected_model, base_url, service_tier):
    """Back up the config, write the model catalog, then write ``CONFIG``.

    *base_url* is inserted verbatim between quotes, so callers must pass a
    value that is already TOML-safe.
    """
    backup_config()
    model_catalog = _gen_model_catalog(endpoint, selected_model)
    mc_path = PROXY_CONFIG_DIR / f"models-{safe_name(endpoint['name'])}.json"
    mc_path.parent.mkdir(parents=True, exist_ok=True)
    mc_path.write_text(json.dumps(model_catalog, indent=2))

    name = _toml_safe(endpoint["name"])
    model = _toml_safe(selected_model)
    catalog = _toml_safe(mc_path)
    # Endpoints without a stored key get an empty token instead of KeyError.
    token = _toml_safe(endpoint.get("api_key") or "")
    lines = [
        f'model = "{model}"\n',
        f'model_provider = "{name}"\n',
        f'model_catalog_json = "{catalog}"\n',
        f'\n[model_providers."{name}"]\n',
        f'name = "{name}"\n',
        f'base_url = "{base_url}"\n',
        f'experimental_bearer_token = "{token}"\n',
        f'\n[profiles."{name}"]\n',
        f'model_provider = "{name}"\n',
        f'model = "{model}"\n',
        f'model_catalog_json = "{catalog}"\n',
        f'service_tier = "{service_tier}"\n',
        'approvals_reviewer = "user"\n',
    ]
    CONFIG.parent.mkdir(parents=True, exist_ok=True)
    CONFIG.write_text("".join(lines))


def write_config_for_native(endpoint, selected_model):
    """Write config for native OpenAI (no proxy needed)."""
    _write_codex_config(
        endpoint,
        selected_model,
        base_url=_toml_safe(endpoint["base_url"]),
        service_tier="default",
    )


def write_config_for_translated(endpoint, selected_model):
    """Write config pointing at local proxy."""
    _write_codex_config(
        endpoint,
        selected_model,
        base_url="http://127.0.0.1:8080",
        service_tier="fast",
    )


def _gen_model_catalog(endpoint, selected_model=None):
    """Build the model-catalog dict written next to the Codex config.

    One entry is produced per id in ``endpoint["models"]``; ``isDefault`` is
    true for the entry equal to *selected_model* (or, when that is falsy, the
    endpoint's ``default_model``).  All other fields are fixed values.
    """
    default_model = selected_model or endpoint.get("default_model")
    tiers = (
        ("low", "Fast"),
        ("medium", "Balanced"),
        ("high", "Deep"),
        ("xhigh", "Extra deep"),
    )
    models = []
    for mid in endpoint.get("models", []):
        models.append({
            "slug": mid,
            "model": mid,
            "display_name": mid,
            "description": f"{endpoint['name']} {mid}",
            "hidden": False,
            "isDefault": mid == default_model,
            "shell_type": "shell_command",
            "visibility": "list",
            "default_reasoning_level": "medium",
            # The same tiers are emitted under both key spellings.
            "supported_reasoning_levels": [
                {"effort": effort, "description": desc} for effort, desc in tiers
            ],
            "supportedReasoningEfforts": [
                {"reasoningEffort": effort, "description": desc} for effort, desc in tiers
            ],
            "priority": 30,
            "context_size": 128000,
            "additional_speed_tiers": [],
            "service_tiers": [],
            "supports_reasoning_summaries": True,
            "support_verbosity": True,
            "reasoning": True,
            "tool_call": True,
            "supports_parallel_tool_calls": True,
            "experimental_supported_tools": [],
            "supported_in_api": True,
            "truncation_policy": {"mode": "tokens", "limit": 128000},
            "base_instructions": "You are Codex, a coding agent.",
        })
    return {"models": models}


# ═══════════════════════════════════════════════════════════════════
# Proxy management
# ═══════════════════════════════════════════════════════════════════

# Handle of the currently running translation-proxy subprocess, if any.
_proxy_proc = None


def _start_proxy_for(endpoint, logfn):
    """Stop any running proxy, write a proxy config for *endpoint*, start it."""
    global _proxy_proc
    _stop_proxy()
    pcfg = {
        "port": 8080,
        "backend_type": endpoint["backend_type"],
        "target_url": normalize_base_url(endpoint["base_url"]),
        "api_key": endpoint.get("api_key") or "",
        "cc_version": endpoint.get("cc_version", ""),
        "oauth_provider": endpoint.get("oauth_provider", ""),
        "reasoning_enabled": endpoint.get("reasoning_enabled", True),
        "reasoning_effort": endpoint.get("reasoning_effort", "medium"),
        "models": [
            {"id": m, "object": "model", "created": 1700000000, "owned_by": endpoint["name"]}
            for m in endpoint.get("models", [])
        ],
    }
    pcfg_path = PROXY_CONFIG_DIR / f"proxy-{safe_name(endpoint['name'])}.json"
    pcfg_path.parent.mkdir(parents=True, exist_ok=True)
    pcfg_path.write_text(json.dumps(pcfg, indent=2))
    _start_proxy_with_config(pcfg_path, logfn)


def _start_proxy_with_config(pcfg_path, logfn):
    """Launch the proxy with *pcfg_path* and wait until it answers on port 8080.

    Polls ``/v1/models`` up to 30 times.  *logfn* receives a one-line status:
    ready, exited early, or not ready in time.
    """
    global _proxy_proc
    _proxy_proc = subprocess.Popen(
        ["python3", str(PROXY), "--config", str(pcfg_path)],
        stdout=subprocess.DEVNULL,
        # Same effect as preexec_fn=os.setsid, but safe to use when the
        # parent process has other threads running.
        start_new_session=True,
    )
    for _ in range(30):
        # A dead child can never become ready; without this check another
        # server already bound to the port would be reported as "ready".
        if _proxy_proc.poll() is not None:
            logfn(f"ERROR: proxy exited early (code {_proxy_proc.returncode})")
            return
        try:
            with urllib.request.urlopen("http://127.0.0.1:8080/v1/models", timeout=2):
                pass
        except Exception:
            time.sleep(0.5)
        else:
            logfn("Proxy ready on port 8080")
            return
    logfn("WARNING: proxy may not have started in time")
def _stop_proxy():
    """Terminate the running proxy's process group, if there is one.

    Sends SIGTERM, waits up to 0.5 s, then escalates to SIGKILL.  The child is
    waited on in both cases so it is reaped instead of lingering as a zombie.
    ``_proxy_proc`` is always reset to ``None``.
    """
    global _proxy_proc
    proc = _proxy_proc
    _proxy_proc = None
    if proc is None or proc.poll() is not None:
        return
    try:
        pgid = os.getpgid(proc.pid)
        os.killpg(pgid, signal.SIGTERM)
        try:
            proc.wait(timeout=0.5)
        except subprocess.TimeoutExpired:
            os.killpg(pgid, signal.SIGKILL)
            try:
                proc.wait(timeout=2)
            except subprocess.TimeoutExpired:
                pass  # nothing more can be done from here
    except (ProcessLookupError, PermissionError):
        # Process already gone, or not ours to signal.
        pass


def _run_cleanup():
    """Run the stale-process cleanup script with bash (30 s limit, output captured)."""
    subprocess.run(["bash", str(CLEANUP)], capture_output=True, timeout=30)


def _last_log_lines(n=15):
    """Return the last *n* lines of the launch log, or ``"(no log file)"``."""
    try:
        text = LAUNCH_LOG.read_text()
    except Exception:
        return "(no log file)"
    return "\n".join(text.splitlines()[-n:])


def _detect_codex_cli():
    """Return ``(path, version_text)`` for the ``codex`` CLI, or ``None``.

    The version text is the first non-empty of stdout, stderr, ``"unknown"``.
    Any failure while probing is treated as "not installed".
    """
    try:
        path = shutil.which("codex")
        if not path:
            return None
        # Run the binary that was resolved so path and version always agree.
        out = subprocess.run([path, "--version"], capture_output=True, text=True, timeout=5)
        ver = (out.stdout or "").strip() or (out.stderr or "").strip() or "unknown"
        return (path, ver)
    except Exception:
        return None


def _detect_codex_desktop():
    """Return the Desktop start-script path as a string when it exists, else ``None``."""
    return str(START_SH) if START_SH.exists() else None


def _check_codex_auth():
    """Run ``codex login status`` and classify the outcome.

    Returns ``(status, message)`` where status is one of ``"logged_in"``
    (exit code 0 with output), ``"error"`` (non-zero exit with output, or an
    unexpected exception), ``"unknown"`` (no output at all) or
    ``"not_installed"`` (the executable was not found).
    """
    try:
        out = subprocess.run(
            ["codex", "login", "status"],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except FileNotFoundError:
        return ("not_installed", "codex not found")
    except Exception as e:
        return ("error", str(e))
    text = (out.stdout or "").strip() or (out.stderr or "").strip()
    if not text:
        return ("unknown", "No output from codex login status")
    if out.returncode == 0:
        return ("logged_in", text)
    return ("error", text)
class LauncherWin(Gtk.Window):
    """Main launcher window.

    Shows tool-detection and auth status, lets the user pick an endpoint and
    model, and hosts the launch / maintenance buttons plus a scrolling status
    log.
    """

    def __init__(self):
        """Build the widget tree, start the async auth check and fill the combos."""
        super().__init__(title="Codex Launcher")
        self.set_default_size(560, 460)
        self.set_border_width(12)
        self.set_position(Gtk.WindowPosition.CENTER)
        self._proc = None
        self._endpoints_data = load_endpoints()

        def button(label, on_click):
            """Return a button that calls *on_click()* when clicked."""
            btn = Gtk.Button(label=label)
            btn.connect("clicked", lambda _b: on_click())
            return btn

        def launch_button(label, on_click, tool, tooltip):
            """Like ``button`` but disabled (with *tooltip*) when *tool* is missing."""
            btn = button(label, on_click)
            if tool in self._missing:
                btn.set_tooltip_text(tooltip)
                btn.set_sensitive(False)
            return btn

        # Detected text (version strings, paths) goes into Pango markup, so
        # characters such as "&" and "<" must be escaped.
        esc = GLib.markup_escape_text

        vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)
        self.add(vbox)

        # header row
        hdr = Gtk.Box(spacing=8)
        vbox.pack_start(hdr, False, False, 0)
        lbl = Gtk.Label(label="Codex Launcher v2.6.1")
        lbl.set_use_markup(True)
        hdr.pack_start(lbl, False, False, 0)
        # pack_end stacks from the right edge, so the first button is right-most.
        hdr.pack_end(button("Changelog", lambda: self._show_changelog()), False, False, 0)
        hdr.pack_end(button("Usage", lambda: self._open_usage()), False, False, 0)
        hdr.pack_end(button("AI BGP", lambda: self._open_bgp()), False, False, 0)
        hdr.pack_end(button("Manage Endpoints", lambda: self._open_mgr()), False, False, 0)

        # verification status bar
        self._cli_info = _detect_codex_cli()
        self._desktop_info = _detect_codex_desktop()
        ver_box = Gtk.Box(spacing=12)
        vbox.pack_start(ver_box, False, False, 0)

        cli_lbl = Gtk.Label()
        if self._cli_info:
            cli_path, cli_ver = self._cli_info
            # set_markup() already switches the label into markup mode.
            cli_lbl.set_markup(f"✔ Codex CLI {esc(cli_ver)} ({esc(cli_path)})")
            ver_box.pack_start(cli_lbl, False, False, 0)
        else:
            cli_lbl.set_markup("✘ Codex CLI — not found")
            ver_box.pack_start(cli_lbl, False, False, 0)
            ver_box.pack_start(
                button("Install", lambda: self._show_install_guide("cli")), False, False, 0
            )

        ver_box.pack_start(Gtk.Label(label=" "), False, False, 0)

        desk_lbl = Gtk.Label()
        if self._desktop_info:
            desk_lbl.set_markup(f"✔ Codex Desktop ({esc(self._desktop_info)})")
            ver_box.pack_start(desk_lbl, False, False, 0)
        else:
            desk_lbl.set_markup("✘ Codex Desktop — not found")
            ver_box.pack_start(desk_lbl, False, False, 0)
            ver_box.pack_start(
                button("Install", lambda: self._show_install_guide("desktop")), False, False, 0
            )

        self._missing = []
        if not self._cli_info:
            self._missing.append("cli")
        if not self._desktop_info:
            self._missing.append("desktop")

        # auth row
        auth_box = Gtk.Box(spacing=12)
        vbox.pack_start(auth_box, False, False, 0)
        self._auth_label = Gtk.Label()
        self._auth_label.set_markup("Checking auth…")
        self._auth_label.set_ellipsize(3)  # 3 == Pango.EllipsizeMode.END
        auth_box.pack_start(self._auth_label, False, False, 0)
        self._relogin_btn = button("Re-login", lambda: self._codex_relogin())
        self._relogin_btn.set_sensitive(False)
        auth_box.pack_end(self._relogin_btn, False, False, 0)
        # The status probe shells out, so it runs off the GTK main thread.
        threading.Thread(target=self._check_auth_async, daemon=True).start()

        # maintenance row
        ops_box = Gtk.Box(spacing=8)
        vbox.pack_start(ops_box, False, False, 0)
        self._refresh_all_btn = button("Refresh Models", lambda: self._refresh_all_models())
        ops_box.pack_start(self._refresh_all_btn, False, False, 0)
        self._backup_btn = button("Backup Profile", lambda: self._backup_profile())
        ops_box.pack_start(self._backup_btn, False, False, 0)
        self._import_btn = button("Import Profile", lambda: self._import_profile())
        ops_box.pack_start(self._import_btn, False, False, 0)

        # endpoint selector
        sel_box = Gtk.Box(spacing=6)
        vbox.pack_start(sel_box, False, False, 4)
        sel_box.pack_start(Gtk.Label(label="Endpoint:"), False, False, 0)
        self._combo = Gtk.ComboBoxText()
        self._combo.connect("changed", lambda c: self._on_endpoint_changed())
        sel_box.pack_start(self._combo, True, True, 0)

        # model selector
        sel_box.pack_start(Gtk.Label(label="Model:"), False, False, 0)
        self._model_combo = Gtk.ComboBoxText()
        sel_box.pack_start(self._model_combo, True, True, 0)

        # launch buttons
        btn_box = Gtk.Box(spacing=8, homogeneous=True)
        vbox.pack_start(btn_box, False, False, 8)
        self._btn_desktop = launch_button(
            "Launch Desktop", lambda: self._launch("desktop"),
            "desktop", "Codex Desktop is not installed",
        )
        btn_box.pack_start(self._btn_desktop, True, True, 0)
        self._btn_cli = launch_button(
            "Launch CLI", lambda: self._launch("cli"),
            "cli", "Codex CLI is not installed",
        )
        btn_box.pack_start(self._btn_cli, True, True, 0)

        btn_box2 = Gtk.Box(spacing=8, homogeneous=True)
        vbox.pack_start(btn_box2, False, False, 0)
        self._btn_codex_desktop = launch_button(
            "Codex Default (Desktop)", lambda: self._launch_codex_default("desktop"),
            "desktop", "Codex Desktop is not installed",
        )
        btn_box2.pack_start(self._btn_codex_desktop, True, True, 0)
        self._btn_codex_cli = launch_button(
            "Codex Default (CLI)", lambda: self._launch_codex_default("cli"),
            "cli", "Codex CLI is not installed",
        )
        btn_box2.pack_start(self._btn_codex_cli, True, True, 0)

        # status
        sw = Gtk.ScrolledWindow()
        sw.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC)
        vbox.pack_start(sw, True, True, 0)
        self._buf = Gtk.TextBuffer()
        self._tv = Gtk.TextView(buffer=self._buf)
        self._tv.set_editable(False)
        self._tv.set_cursor_visible(False)
        self._tv.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
        sw.add(self._tv)

        # bottom bar
        bb = Gtk.Box(spacing=8)
        vbox.pack_start(bb, False, False, 0)
        self._kill_btn = button("Kill && Cleanup", lambda: self._kill())
        self._kill_btn.set_sensitive(False)
        bb.pack_start(self._kill_btn, True, True, 0)
        self._view_log_btn = button(
            "View Log", lambda: subprocess.Popen(["xdg-open", str(LAUNCH_LOG)])
        )
        bb.pack_start(self._view_log_btn, False, False, 0)
        self._close_btn = button("Close", lambda: self._do_close())
        bb.pack_start(self._close_btn, False, False, 0)

        self.show_all()
        self._rebuild_combo()
        self._log_dependency_status()

    # ── helpers ──────────────────────────────────────────────────

    def log(self, msg):
        """Queue *msg* for the status view; callable from any thread."""
        GLib.idle_add(self._append_log, msg)

    def _append_log(self, msg):
        """Append *msg* plus a newline to the status buffer and scroll to it."""
        end = self._buf.get_end_iter()
        self._buf.insert(end, msg + "\n")
        mark = self._buf.create_mark(None, end, False)
        self._tv.scroll_to_mark(mark, 0.0, True, 0.0, 0.5)
        self._buf.delete_mark(mark)
        return False  # one-shot idle callback
Click 'Install' above.") if self._missing: self.log("⚠ Install missing tools before using the launcher.") else: self.log("All dependencies OK.") def _check_auth_async(self): status, msg = _check_codex_auth() GLib.idle_add(self._update_auth_status, status, msg) def _update_auth_status(self, status, msg): if status == "logged_in": self._auth_label.set_markup(f"✔ Auth: {msg}") self._relogin_btn.set_sensitive("cli" not in self._missing) elif status == "not_installed": self._auth_label.set_markup("Auth: N/A (CLI not installed)") else: self._auth_label.set_markup(f"⚠ Auth: {msg}") self._relogin_btn.set_sensitive("cli" not in self._missing) return False def _codex_relogin(self): self.log("Opening codex login in terminal…") terms = [ ("x-terminal-emulator", ["-e"]), ("kgx", ["--"]), ("gnome-terminal", ["--"]), ("konsole", ["-e"]), ("xterm", ["-e"]), ] term = None term_args = None for t in terms: if shutil.which(t[0]): term = t[0] term_args = t[1] break if not term: self.log("ERROR: no terminal emulator found for re-login") return cmd_parts = [term] + term_args + ["codex", "login"] subprocess.Popen(cmd_parts, preexec_fn=os.setsid) self.log("Login flow started in terminal. 
Re-checking auth in 30s…") self._auth_label.set_markup("Auth: waiting for login…") threading.Thread(target=self._delayed_auth_check, daemon=True).start() def _delayed_auth_check(self): time.sleep(30) self._check_auth_async() def _set_busy(self, busy): def _update(): has_cli = "cli" not in self._missing has_desk = "desktop" not in self._missing self._btn_desktop.set_sensitive(not busy and has_desk) self._btn_cli.set_sensitive(not busy and has_cli) self._btn_codex_desktop.set_sensitive(not busy and has_desk) self._btn_codex_cli.set_sensitive(not busy and has_cli) self._kill_btn.set_sensitive(busy) GLib.idle_add(_update) def _rebuild_combo(self): self._endpoints_data = load_endpoints() self._combo.remove_all() names = [e["name"] for e in self._endpoints_data["endpoints"]] for n in names: self._combo.append_text(n) bgp_names = [p["name"] for p in load_bgp_pools().get("pools", [])] for n in bgp_names: self._combo.append_text(f"🔀 {n}") if names or bgp_names: default = self._endpoints_data.get("default") if default and default in names: self._combo.set_active(names.index(default)) else: self._combo.set_active(0) self._on_endpoint_changed() def _on_endpoint_changed(self): name = self._combo.get_active_text() is_bgp = name and name.startswith("🔀 ") bgp_name = name[2:] if is_bgp else None ep = get_endpoint(name) if name and not is_bgp else None self._model_combo.remove_all() if is_bgp: pool = None for p in load_bgp_pools().get("pools", []): if p["name"] == bgp_name: pool = p break if pool: seen = set() for r in pool.get("routes", []): m = r.get("model", "") if m and m not in seen: self._model_combo.append_text(m) seen.add(m) if seen: self._model_combo.set_active(0) elif ep: for m in ep.get("models", []): self._model_combo.append_text(m) GLib.idle_add(self._select_default_model, ep) def _select_default_model(self, ep): dm = ep.get("default_model", "") models = ep.get("models", []) if dm in models: self._model_combo.set_active(models.index(dm)) elif models: 
def _backup_profile(self):
    """Ask for a destination file and write the profile bundle to it.

    A save dialog is shown with a timestamped default file name. If the user
    confirms, ``save_profile_bundle`` writes the bundle and the path is
    logged; any exception is reported in an error dialog instead of raised.
    Cancelling the dialog does nothing.
    """
    picker = Gtk.FileChooserDialog(
        title="Backup Codex Profile",
        parent=self,
        action=Gtk.FileChooserAction.SAVE,
    )
    picker.add_buttons(
        Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL,
        Gtk.STOCK_SAVE, Gtk.ResponseType.OK,
    )
    picker.set_do_overwrite_confirmation(True)
    stamp = time.strftime('%Y%m%d-%H%M%S')
    picker.set_current_name(f"codex-profile-{stamp}.json")

    destination = None
    if picker.run() == Gtk.ResponseType.OK:
        destination = picker.get_filename()
    picker.destroy()
    if not destination:
        return

    try:
        save_profile_bundle(destination)
        self.log(f"Profile backed up to {destination}")
    except Exception as exc:
        self._show_message(Gtk.MessageType.ERROR, f"Backup failed:\n{exc}")
def _refresh_all_models_worker(self):
    """Refresh the model list of every stored endpoint, then notify the UI.

    Started on a daemon thread by ``_refresh_all_models``. Each endpoint is
    passed to ``refresh_endpoint_models``; a truthy first result replaces the
    stored entry, otherwise ``"<name>: <error>"`` is recorded. The endpoints
    are saved only when at least one was refreshed. The outcome (or any
    exception, as a string) is delivered through ``GLib.idle_add``.
    """
    try:
        store = load_endpoints()
        refreshed_count = 0
        problems = []
        # Walk a snapshot because entries are replaced in place below.
        snapshot = list(store["endpoints"])
        for position, endpoint in enumerate(snapshot):
            result, error = refresh_endpoint_models(endpoint)
            if not result:
                problems.append(f"{endpoint['name']}: {error}")
                continue
            store["endpoints"][position] = result
            refreshed_count += 1
        if refreshed_count:
            save_endpoints(store)
        GLib.idle_add(self._finish_refresh_all_models, refreshed_count, problems)
    except Exception as exc:
        GLib.idle_add(self._finish_refresh_all_models_error, str(exc))
def _show_changelog(self):
    """Show the contents of ``CHANGELOG`` in a modal, read-only text dialog.

    Each release is rendered as a "v<version> (<date>)" heading followed by
    one bullet line per item and a blank separator line.
    """
    dialog = Gtk.Dialog(title="Changelog", transient_for=self, modal=True)
    dialog.set_default_size(520, 480)
    dialog.add_button("Close", Gtk.ResponseType.CLOSE)

    content = dialog.get_content_area()
    content.set_margin_start(12)
    content.set_margin_end(12)
    content.set_margin_top(12)
    content.set_margin_bottom(12)

    scroller = Gtk.ScrolledWindow()
    scroller.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC)
    content.pack_start(scroller, True, True, 0)

    text_buffer = Gtk.TextBuffer()
    viewer = Gtk.TextView(buffer=text_buffer)
    viewer.set_editable(False)
    viewer.set_cursor_visible(False)
    viewer.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
    scroller.add(viewer)

    rows = []
    for version, released, notes in CHANGELOG:
        rows.append(f"v{version} ({released})")
        rows.extend(f" \u2022 {note}" for note in notes)
        rows.append("")
    # strip() removes the separator left after the final release.
    text_buffer.insert(text_buffer.get_end_iter(), "\n".join(rows).strip())

    dialog.show_all()
    dialog.run()
    dialog.destroy()
) else: title = "Install Codex Desktop" guide = ( "Codex Desktop is required to use Desktop launch features.\n\n" "Expected location: /opt/codex-desktop/start.sh\n\n" "Download from:\n" " https://codex.desktop.openai.com\n\n" "After installing, restart the launcher." ) d = Gtk.MessageDialog(self, 0, Gtk.MessageType.INFO, Gtk.ButtonsType.OK, guide) d.set_title(title) d.run() d.destroy() # ── launch ─────────────────────────────────────────────────── def _launch(self, target): name = self._combo.get_active_text() if not name: self.log("ERROR: no endpoint selected") return model = self._model_combo.get_active_text() if not model: self.log("ERROR: no model selected") return is_bgp = name.startswith("🔀 ") if is_bgp: pool_name = name[2:] pool = None for p in load_bgp_pools().get("pools", []): if p["name"] == pool_name: pool = p break if not pool: self.log(f"ERROR: BGP pool '{pool_name}' not found") return self._set_busy(True) self.log(f"=== 🔀 BGP: {pool_name} / {model} → {'Desktop' if target == 'desktop' else 'CLI'} ===") threading.Thread(target=self._run_bgp, args=(pool, model, target), daemon=True).start() return ep = get_endpoint(name) if not ep: self.log("ERROR: endpoint not found") return self._set_busy(True) self.log(f"=== {ep['name']} / {model} → {'Desktop' if target == 'desktop' else 'CLI'} ===") threading.Thread(target=self._run, args=(ep, model, target), daemon=True).start() def _launch_codex_default(self, target): if "cli" not in self._missing: status, msg = _check_codex_auth() if status != "logged_in": d = Gtk.MessageDialog( self, 0, Gtk.MessageType.WARNING, Gtk.ButtonsType.YES_NO, f"Codex auth check: {msg}\n\n" "Launch may fail without valid authentication.\n" "Continue anyway?" 
def _run(self, ep, model, target):
    """Prepare Codex for an endpoint, launch it, and clean up afterwards.

    Started on a daemon thread by ``_launch``. Endpoints whose
    ``backend_type`` is not ``"native"`` get the translation proxy started
    and a proxied config written; native endpoints only get a native config.
    Any exception is logged as "ERROR: ...". The proxy is always stopped, the
    config restored and the busy state cleared when the launch returns.

    Args:
        ep: Endpoint dict; ``backend_type`` and ``name`` are read here.
        model: Model id passed on to the config writer and launcher.
        target: ``"desktop"`` for Codex Desktop, anything else for the CLI.
    """
    try:
        self.log("Cleaning up stale processes…")
        _run_cleanup()

        if ep["backend_type"] == "native":
            self.log(f"Configuring Codex for {ep['name']} (native)…")
            write_config_for_native(ep, model)
        else:
            self.log("Starting translation proxy…")
            _start_proxy_for(ep, self.log)
            self.log(f"Configuring Codex for {ep['name']} (proxied)…")
            write_config_for_translated(ep, model)

        launcher = self._launch_desktop if target == "desktop" else self._launch_cli
        launcher(ep, model)
    except Exception as exc:
        self.log(f"ERROR: {exc}")
    finally:
        _stop_proxy()
        restore_config()
        self._set_busy(False)
        self.log("Ready.")
def _run_codex_default(self, target):
    """Launch Codex with its stock (OAuth) configuration, then restore ours.

    Started on a daemon thread by ``_launch_codex_default``. The current
    config is backed up and removed so Codex starts without launcher
    settings. Any exception is logged as "ERROR: ..."; the config is always
    restored and the busy state cleared afterwards.

    Args:
        target: ``"desktop"`` for Codex Desktop, anything else for the CLI.
    """
    try:
        self.log("Cleaning up stale processes…")
        _run_cleanup()
        _stop_proxy()

        self.log("Resetting config to Codex defaults (OAuth)…")
        backup_config()
        if CONFIG.exists():
            CONFIG.unlink()

        start = (
            self._launch_desktop_direct
            if target == "desktop"
            else self._launch_cli_default
        )
        start()
    except Exception as exc:
        self.log(f"ERROR: {exc}")
    finally:
        restore_config()
        self._set_busy(False)
        self.log("Ready.")
def _launch_cli(self, ep, model):
    """Launch codex CLI in a terminal with the selected endpoint.

    The first terminal emulator found on PATH is used. Native endpoints run
    ``codex -c model=<model>``; all others also pass ``--profile <name>``.
    The call blocks, polling every 1.5 s, until the terminal process exits or
    ``self._proc`` is cleared elsewhere, then logs the exit code.

    Args:
        ep: Endpoint dict; ``name`` and ``backend_type`` are read here.
        model: Model id passed to codex through ``-c model=...``.
    """
    self.log(f"Launching Codex CLI with {ep['name']}…")

    # Terminal emulators in preference order, each with the flag(s) that
    # introduce the command to execute.
    candidates = (
        ("x-terminal-emulator", ["-e"]),
        ("kgx", ["--"]),
        ("gnome-terminal", ["--"]),
        ("konsole", ["-e"]),
        ("xterm", ["-e"]),
    )
    found = next(
        ((exe, flags) for exe, flags in candidates if shutil.which(exe)),
        None,
    )
    if found is None:
        self.log("ERROR: no terminal emulator found (tried x-terminal-emulator, kgx, gnome-terminal, konsole, xterm)")
        return
    exe, flags = found

    # For proxied endpoints the proxy is already running (started by _run);
    # native endpoints rely on config.toml alone.
    codex_cmd = ["codex"]
    if ep["backend_type"] != "native":
        codex_cmd += ["--profile", ep["name"]]
    codex_cmd += ["-c", f"model={model}"]
    cmd_parts = [exe, *flags, *codex_cmd]

    self.log(f"Running: {' '.join(cmd_parts)}")
    self._proc = subprocess.Popen(cmd_parts, preexec_fn=os.setsid)
    self.log(f"CLI started in terminal (PID {self._proc.pid})")

    # Wait for the terminal process to finish.
    # NOTE(review): some terminals may hand off to a server process and
    # return immediately, ending this wait early — confirm per emulator.
    while self._proc and self._proc.poll() is None:
        time.sleep(1.5)
    if self._proc:
        exit_code = self._proc.poll()
        self.log(f"CLI exited (code {exit_code})")
    self._proc = None
def _kill(self):
    """Terminate the launched process group and reset launcher state.

    Sends SIGTERM to the process group of ``self._proc`` (children are
    started with ``os.setsid``, so they lead their own group), waits one
    second, and escalates to SIGKILL if the process is still alive. Then the
    proxy is stopped, stale processes cleaned up, the config restored, the
    launch log removed and the busy state cleared.
    """
    self.log("=== Killing ===")

    # Work on a local reference. The launch worker thread sets self._proc to
    # None as soon as it notices the process has exited, which can happen
    # during the grace period below; dereferencing self._proc again would
    # then raise AttributeError and skip the rest of the cleanup.
    proc = self._proc
    if proc is not None and proc.poll() is None:
        try:
            pgid = os.getpgid(proc.pid)
            os.killpg(pgid, signal.SIGTERM)
            time.sleep(1)
            if proc.poll() is None:
                os.killpg(pgid, signal.SIGKILL)
        except (ProcessLookupError, PermissionError):
            # Already gone, or not ours to signal: nothing more to do.
            pass
    self._proc = None

    _stop_proxy()
    _run_cleanup()
    restore_config()
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    LAUNCH_LOG.unlink(missing_ok=True)
    self.log("Cleanup complete")
    self._set_busy(False)
    self.log("Ready.")
class EndpointMgr(Gtk.Window):
    """Modal window listing stored endpoints with add/edit/delete/default actions."""

    def __init__(self, parent):
        """Build the endpoint list and action buttons, then show the window.

        Args:
            parent: Owning window; it is made the transient parent and its
                ``_on_endpoints_updated`` is called after delete/set-default.
        """
        super().__init__(title="Manage Endpoints")
        self.set_transient_for(parent)
        self.set_modal(True)
        self._parent = parent
        self.set_default_size(500, 350)
        self.set_border_width(12)
        self.set_position(Gtk.WindowPosition.CENTER_ON_PARENT)

        layout = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)
        self.add(layout)

        heading = Gtk.Label(label="Endpoints")
        heading.set_use_markup(True)
        layout.pack_start(heading, False, False, 0)

        scroller = Gtk.ScrolledWindow()
        layout.pack_start(scroller, True, True, 0)

        # Columns: name, provider, backend, default_model
        self._store = Gtk.ListStore(str, str, str, str)
        self._tree = Gtk.TreeView(model=self._store)
        headers = ["Name", "Provider", "Type", "Default Model"]
        for index, header in enumerate(headers):
            column = Gtk.TreeViewColumn(header, Gtk.CellRendererText(), text=index)
            column.set_resizable(True)
            self._tree.append_column(column)
        scroller.add(self._tree)

        button_row = Gtk.Box(spacing=8)
        layout.pack_start(button_row, False, False, 0)

        self._add_btn = self._action_button("Add", self._add)
        self._edit_btn = self._action_button("Edit", self._edit)
        self._delete_btn = self._action_button("Delete", self._delete)
        self._default_btn = self._action_button("Set Default", self._set_default)
        for button in (self._add_btn, self._edit_btn,
                       self._delete_btn, self._default_btn):
            button_row.pack_start(button, False, False, 0)

        self._mgr_close_btn = self._action_button("Close", self.destroy)
        button_row.pack_end(self._mgr_close_btn, False, False, 0)

        self._rebuild()
        self.show_all()

    @staticmethod
    def _action_button(label, action):
        """Return a button that calls ``action()`` with no arguments on click."""
        button = Gtk.Button(label=label)
        button.connect("clicked", lambda _button: action())
        return button

    def _rebuild(self):
        """Reload the endpoints from storage into the list store."""
        self._store.clear()
        for ep in load_endpoints()["endpoints"]:
            self._store.append([
                ep["name"],
                ep.get("provider_preset", "Custom"),
                label_for_backend(ep["backend_type"]),
                ep.get("default_model", ""),
            ])

    def _selected(self):
        """Return the name of the selected endpoint, or None if none is selected."""
        _model, row_iter = self._tree.get_selection().get_selected()
        return None if row_iter is None else self._store[row_iter][0]

    def _open_editor(self, existing_name):
        """Open the edit dialog; show an error dialog if it cannot be created."""
        try:
            self._dialog = EditEndpointDialog(self, existing_name)
            self._dialog.connect(
                "destroy", lambda *_: setattr(self, "_dialog", None)
            )
        except Exception as exc:
            import traceback
            traceback.print_exc()
            alert = Gtk.MessageDialog(
                self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, f"Error: {exc}"
            )
            alert.run()
            alert.destroy()

    def _add(self):
        """Open the dialog for creating a new endpoint."""
        self._open_editor(None)

    def _edit(self):
        """Open the dialog for the selected endpoint; no-op without a selection."""
        name = self._selected()
        if name:
            self._open_editor(name)

    def _delete(self):
        """Delete the selected endpoint after confirmation.

        If the deleted endpoint was the default, the first remaining endpoint
        becomes the default (or None when the list is empty).
        """
        name = self._selected()
        if not name:
            return

        prompt = Gtk.MessageDialog(
            self, 0, Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO,
            f'Delete endpoint "{name}"?'
        )
        answer = prompt.run()
        prompt.destroy()
        if answer != Gtk.ResponseType.YES:
            return

        data = load_endpoints()
        data["endpoints"] = [e for e in data["endpoints"] if e["name"] != name]
        if data.get("default") == name:
            remaining = data["endpoints"]
            data["default"] = remaining[0]["name"] if remaining else None
        save_endpoints(data)
        self._rebuild()
        self._parent._on_endpoints_updated()

    def _set_default(self):
        """Store the selected endpoint as the default; no-op without a selection."""
        name = self._selected()
        if not name:
            return
        data = load_endpoints()
        data["default"] = name
        save_endpoints(data)
        self._rebuild()
        self._parent._on_endpoints_updated()
def __init__(self, parent, existing_name):
    """Build the add/edit endpoint dialog and show it.

    Widget construction order matters: initial values are applied to the
    preset combo and the reasoning switch *before* their change handlers are
    connected, so the handlers do not fire during construction.

    Args:
        parent: The endpoint manager window; used as transient parent and
            stored as ``self._parent_mgr``.
        existing_name: Name of the endpoint to edit, or a falsy value to
            create a new endpoint pre-filled with empty defaults.
    """
    title = "Edit Endpoint" if existing_name else "Add Endpoint"
    Gtk.Dialog.__init__(self, title=title)
    self.set_transient_for(parent)
    self.set_modal(True)
    self._parent_mgr = parent
    self._existing_name = existing_name
    self._data = get_endpoint(existing_name) if existing_name else {
        "name": "",
        "backend_type": "openai-compat",
        "base_url": "",
        "api_key": "",
        "default_model": "",
        "models": [],
        "provider_preset": "Custom",
    }
    self.set_default_size(480, 520)

    area = self.get_content_area()
    area.set_spacing(6)
    area.set_margin_start(12)
    area.set_margin_end(12)
    area.set_margin_top(12)
    area.set_margin_bottom(12)

    grid = Gtk.Grid(column_spacing=8, row_spacing=6)
    area.pack_start(grid, False, False, 0)

    def add_row(row, label, widget):
        # Right-aligned caption in column 0, the widget in column 1.
        grid.attach(Gtk.Label(label=label, xalign=1), 0, row, 1, 1)
        grid.attach(widget, 1, row, 1, 1)

    self._entry_name = Gtk.Entry(text=self._data.get("name", ""))
    add_row(0, "Name:", self._entry_name)

    # Provider preset; an unknown stored preset falls back to the first entry.
    self._combo_preset = Gtk.ComboBoxText()
    self._preset_names = list(PROVIDER_PRESETS.keys())
    for preset_name in self._preset_names:
        self._combo_preset.append_text(preset_name)
    stored_preset = self._data.get("provider_preset", "Custom")
    self._combo_preset.set_active(
        self._preset_names.index(stored_preset)
        if stored_preset in self._preset_names else 0
    )
    self._combo_preset.connect("changed", lambda c: self._apply_selected_preset())
    add_row(1, "Preset:", self._combo_preset)

    self._combo_type = Gtk.ComboBoxText()
    for val, lab in [("openai-compat", "OpenAI-compatible (needs proxy)"),
                     ("anthropic", "Anthropic (needs proxy)"),
                     ("command-code", "Command Code (needs proxy)"),
                     ("native", "Native OpenAI (no proxy)")]:
        self._combo_type.append(val, lab)
    bt = self._data.get("backend_type", "openai-compat")
    self._combo_type.set_active_id(bt)
    add_row(2, "Type:", self._combo_type)

    self._entry_url = Gtk.Entry(text=self._data.get("base_url", ""))
    add_row(3, "Base URL:", self._entry_url)

    # API key entry (masked) with an optional OAuth button next to it.
    self._entry_key = Gtk.Entry(text=self._data.get("api_key", ""))
    self._entry_key.set_visibility(False)
    key_box = Gtk.Box(spacing=6)
    key_box.pack_start(self._entry_key, True, True, 0)
    self._oauth_btn = Gtk.Button(label="OAuth Login")
    self._oauth_btn.connect("clicked", lambda b: self._do_oauth_login())
    key_box.pack_start(self._oauth_btn, False, False, 0)
    add_row(4, "API Key:", key_box)
    # Without no-show-all, the show_all() at the end of this constructor
    # would make the button visible again for every preset; visibility is
    # governed solely by _apply_selected_preset().
    self._oauth_btn.set_no_show_all(True)
    self._oauth_btn.set_visible(False)

    self._entry_cc_ver = Gtk.Entry(text=self._data.get("cc_version", ""))
    self._entry_cc_ver.set_placeholder_text("e.g. 0.26.8 (Command Code only)")
    add_row(5, "CC Version:", self._entry_cc_ver)

    # Custom colours for the reasoning switch (orange off, green on).
    reasoning_css = (
        b" switch.reasoning-toggle {"
        b" min-width: 56px; min-height: 28px; border-radius: 14px;"
        b" background: #e67e22; border: 2px solid #cf6d17; }"
        b" switch.reasoning-toggle:checked {"
        b" background: #2ecc71; border: 2px solid #27ae60; }"
        b" switch.reasoning-toggle slider {"
        b" min-width: 24px; min-height: 24px; border-radius: 12px;"
        b" background: white; border: 1px solid #bbb; } "
    )
    reasoning_box = Gtk.Box(spacing=10)
    self._switch_reasoning = Gtk.Switch()
    self._switch_reasoning.set_name("reasoning-toggle")
    ctx = self._switch_reasoning.get_style_context()
    ctx.add_class("reasoning-toggle")
    try:
        css_prov = Gtk.CssProvider()
        css_prov.load_from_data(reasoning_css)
        ctx.add_provider(css_prov, Gtk.STYLE_PROVIDER_PRIORITY_USER)
    except Exception:
        # Styling is cosmetic; the switch still works with the theme default.
        pass
    self._switch_reasoning.set_active(self._data.get("reasoning_enabled", True))
    self._switch_reasoning.connect("notify::active", lambda *a: self._on_reasoning_toggled())
    reasoning_box.pack_start(self._switch_reasoning, False, False, 0)
    self._lbl_reasoning = Gtk.Label()
    reasoning_box.pack_start(self._lbl_reasoning, False, False, 0)
    add_row(6, "Reasoning:", reasoning_box)

    self._combo_effort = Gtk.ComboBoxText()
    for ev, el in [("none", "None"), ("minimal", "Minimal"), ("low", "Low"),
                   ("medium", "Medium"), ("high", "High"), ("max", "Max")]:
        self._combo_effort.append(ev, el)
    saved_effort = self._data.get("reasoning_effort", "medium")
    self._combo_effort.set_active_id(
        saved_effort
        if saved_effort in ("none", "minimal", "low", "medium", "high", "max")
        else "medium"
    )
    add_row(7, "Effort:", self._combo_effort)
    # Sync the ON/OFF label and effort sensitivity with the initial state;
    # this must come after both widgets exist.
    self._on_reasoning_toggled()

    # Models
    mlbl = Gtk.Label(label="Models:", xalign=0)
    area.pack_start(mlbl, False, False, 4)
    mbox = Gtk.Box(spacing=6)
    area.pack_start(mbox, False, False, 0)
    self._entry_model = Gtk.Entry()
    mbox.pack_start(self._entry_model, True, True, 0)
    self._add_model_btn = Gtk.Button(label="Add")
    self._add_model_btn.connect("clicked", lambda b: self._add_model())
    mbox.pack_start(self._add_model_btn, False, False, 0)
    self._add_list_btn = Gtk.Button(label="Add List")
    self._add_list_btn.connect("clicked", lambda b: self._add_models_from_text())
    mbox.pack_start(self._add_list_btn, False, False, 0)
    self._fetch_models_btn = Gtk.Button(label="Fetch from API")
    self._fetch_models_btn.connect("clicked", lambda b: self._fetch_models())
    mbox.pack_start(self._fetch_models_btn, False, False, 0)

    bulk_lbl = Gtk.Label(label="Bulk add models (one per line or comma-separated):", xalign=0)
    area.pack_start(bulk_lbl, False, False, 2)
    bulk_sw = Gtk.ScrolledWindow()
    bulk_sw.set_min_content_height(72)
    area.pack_start(bulk_sw, False, False, 0)
    self._bulk_buf = Gtk.TextBuffer()
    self._bulk_text = Gtk.TextView(buffer=self._bulk_buf)
    self._bulk_text.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
    bulk_sw.add(self._bulk_text)

    sw = Gtk.ScrolledWindow()
    sw.set_min_content_height(120)
    area.pack_start(sw, True, True, 0)
    self._model_store = Gtk.ListStore(str)
    self._model_tree = Gtk.TreeView(model=self._model_store)
    self._model_tree.append_column(Gtk.TreeViewColumn("Model ID", Gtk.CellRendererText(), text=0))
    self._model_tree.set_rules_hint(True)
    sw.add(self._model_tree)
    # Activating (e.g. double-clicking) a row removes that model.
    self._model_tree.connect("row-activated", lambda t, p, c: self._remove_model(p))
    for m in self._data.get("models", []):
        self._model_store.append([m])

    # Default model combo
    dbox = Gtk.Box(spacing=6)
    area.pack_start(dbox, False, False, 0)
    dbox.pack_start(Gtk.Label(label="Default Model:"), False, False, 0)
    self._combo_default = Gtk.ComboBoxText()
    self._refresh_default_combo()
    dbox.pack_start(self._combo_default, True, True, 0)
    dm = self._data.get("default_model", "")
    if dm:
        self._combo_default.set_active_id(dm)

    # Apply preset-dependent state (OAuth button, placeholders, defaults).
    self._apply_selected_preset(initial=True)

    # Buttons
    self.add_button("Cancel", Gtk.ResponseType.CANCEL)
    self.add_button("Save", Gtk.ResponseType.OK)
    self.connect("response", self._on_response)
    self.show_all()
self._entry_cc_ver.get_text().strip(): self._entry_cc_ver.set_text(cc_ver) if preset.get("models") and len(self._model_store) == 0: for mid in preset["models"]: self._model_store.append([mid]) self._refresh_default_combo(preset["models"][0]) if initial and self._data.get("models"): self._refresh_default_combo(self._data.get("default_model", "")) def _on_reasoning_toggled(self, *_): active = self._switch_reasoning.get_active() self._combo_effort.set_sensitive(active) if active: self._lbl_reasoning.set_markup('ON') else: self._lbl_reasoning.set_markup('OFF') def _do_oauth_login(self): preset_name = self._combo_preset.get_active_text() or "Custom" preset = PROVIDER_PRESETS.get(preset_name, {}) provider = preset.get("oauth_provider", "") if provider == "google": self._google_oauth_flow() def _google_oauth_flow(self): token_path = os.path.expanduser("~/.cache/codex-proxy/google-oauth-token.json") CLIENT_ID = "681255809395-oo8ft2oprdrnp9e3aqf6av3hmdib135j.apps.googleusercontent.com" CLIENT_SECRET = "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w" SCOPES = [ "https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/generative-language.retriever", "https://www.googleapis.com/auth/userinfo.email", "https://www.googleapis.com/auth/userinfo.profile", ] import http.server, hashlib, secrets, socket port = 8085 state = secrets.token_hex(32) verifier = secrets.token_urlsafe(32) challenge = hashlib.sha256(verifier.encode()).digest() challenge_b64 = urllib.parse.quote_plus(__import__('base64').urlsafe_b64encode(challenge).rstrip(b'=').decode()) redirect_uri = f"http://127.0.0.1:{port}/oauth2callback" scope_str = " ".join(SCOPES) auth_url = ( f"https://accounts.google.com/o/oauth2/v2/auth?" 
f"client_id={CLIENT_ID}" f"&redirect_uri={urllib.parse.quote(redirect_uri)}" f"&response_type=code" f"&scope={urllib.parse.quote(scope_str)}" f"&access_type=offline" f"&prompt=consent" f"&state={state}" f"&code_challenge={challenge_b64}" f"&code_challenge_method=S256" ) dlg = Gtk.Dialog(title="Google OAuth (Gemini Mode)", parent=self, modal=True) dlg.add_button("Cancel", Gtk.ResponseType.CANCEL) dlg.set_default_size(520, 280) area = dlg.get_content_area() area.set_margin_start(16) area.set_margin_end(16) area.set_margin_top(12) area.set_margin_bottom(12) area.set_spacing(8) area.pack_start(Gtk.Label(label="Sign in with Google", use_markup=True, xalign=0), False, False, 0) area.pack_start(Gtk.Label(label="Emulating Gemini CLI OAuth — no client_secret.json needed.", xalign=0), False, False, 0) link_lbl = Gtk.Label() link_lbl.set_markup(f'Click here to open Google authorization') link_lbl.set_line_wrap(True) area.pack_start(link_lbl, False, False, 4) self._oauth_status = Gtk.Label(label="Opening browser…", xalign=0) area.pack_start(self._oauth_status, False, False, 4) spinner = Gtk.Spinner() spinner.start() area.pack_start(spinner, False, False, 8) area.show_all() code_holder = [None] error_holder = [None] received_state = [None] class OAuthHandler(http.server.BaseHTTPRequestHandler): def do_GET(self2): qs = urllib.parse.urlparse(self2.path).query params = urllib.parse.parse_qs(qs) received_state[0] = params.get("state", [None])[0] if "code" in params: if received_state[0] != state: self2.send_response(400) self2.send_header("Content-Type", "text/html") self2.end_headers() self2.wfile.write(b"" b"

CSRF state mismatch.

") error_holder[0] = "CSRF state mismatch" return code_holder[0] = params["code"][0] self2.send_response(302) self2.send_header("Location", "https://developers.google.com/gemini-code-assist/auth_success_gemini") self2.end_headers() else: error_holder[0] = params.get("error", ["unknown"])[0] self2.send_response(302) self2.send_header("Location", "https://developers.google.com/gemini-code-assist/auth_failure_gemini") self2.end_headers() def log_message(self2, *a): pass try: server = http.server.HTTPServer(("127.0.0.1", port), OAuthHandler) except OSError: self._oauth_status.set_text(f"Port {port} already in use — close other apps and retry.") spinner.stop() dlg.run(); dlg.destroy() return def wait_for_code(): server.handle_request() server.server_close() GLib.idle_add(self._google_oauth_complete_gemini, dlg, code_holder, error_holder, CLIENT_ID, CLIENT_SECRET, redirect_uri, token_path, spinner, verifier) threading.Thread(target=wait_for_code, daemon=True).start() subprocess.Popen(["xdg-open", auth_url], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) dlg.run() dlg.destroy() def _google_oauth_complete_gemini(self, dlg, code_holder, error_holder, client_id, client_secret, redirect_uri, token_path, spinner, verifier): spinner.stop() if error_holder[0]: self._oauth_status.set_markup(f'Error: {error_holder[0]}') return if not code_holder[0]: self._oauth_status.set_text("No authorization code received.") return self._oauth_status.set_text("Exchanging code for token…") try: token_data = urllib.parse.urlencode({ "code": code_holder[0], "client_id": client_id, "client_secret": client_secret, "redirect_uri": redirect_uri, "grant_type": "authorization_code", "code_verifier": verifier, }).encode() req = urllib.request.Request("https://oauth2.googleapis.com/token", data=token_data, headers={"Content-Type": "application/x-www-form-urlencoded"}) resp = urllib.request.urlopen(req, timeout=30) tokens = json.loads(resp.read()) tokens["client_id"] = client_id 
tokens["client_secret"] = client_secret tokens["expires_at"] = time.time() + tokens.get("expires_in", 3600) os.makedirs(os.path.dirname(token_path), exist_ok=True) with open(token_path, "w") as f: json.dump(tokens, f, indent=2) os.chmod(token_path, 0o600) self._entry_key.set_text(tokens.get("access_token", "")) self._oauth_status.set_markup('Authorization successful! Token saved.') dlg.set_title("Google OAuth — Success") except Exception as e: self._oauth_status.set_markup(f'Token exchange failed: {e}') def _remove_model(self, path): current = self._combo_default.get_active_text() self._model_store.remove(self._model_store.get_iter(path)) self._refresh_default_combo(current) def _refresh_default_combo(self, active=None): if active is None: active = self._combo_default.get_active_text() self._combo_default.remove_all() for row in self._model_store: self._combo_default.append(row[0], row[0]) if active and any(row[0] == active for row in self._model_store): self._combo_default.set_active_id(active) elif len(self._model_store) > 0: self._combo_default.set_active(0) def _fetch_models(self): ok, err = self._try_fetch_models() if not ok: d = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, f"Failed to fetch models:\n{err}") d.run() d.destroy() def _try_fetch_models(self): endpoint = { "base_url": self._entry_url.get_text().strip(), "api_key": self._entry_key.get_text().strip(), "backend_type": self._combo_type.get_active_id() or "openai-compat", } ids, err = fetch_models_for_endpoint(endpoint) if ids: current = self._combo_default.get_active_text() added = 0 for mid in ids: # check dupes found = any(self._model_store[i][0] == mid for i in range(len(self._model_store))) if not found: self._model_store.append([mid]) added += 1 self._refresh_default_combo(current) return True, None return False, err or "No models returned by endpoint" def _on_response(self, dialog, response): if response != Gtk.ResponseType.OK: self.destroy() return name = 
self._entry_name.get_text().strip() if not name: self._show_error("Name is required") return bt = self._combo_type.get_active_id() url = self._entry_url.get_text().strip() key = self._entry_key.get_text().strip() models = [self._model_store[i][0] for i in range(len(self._model_store))] if not models: ok, err = self._try_fetch_models() if ok: models = [self._model_store[i][0] for i in range(len(self._model_store))] else: d = Gtk.MessageDialog( self, 0, Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO, f"Auto-fetch failed ({err}).\n\nAdd models manually now?" ) r = d.run() d.destroy() if r == Gtk.ResponseType.YES: self._entry_model.grab_focus() return self.destroy() return if not models: self._show_error("At least one model is required") self._entry_model.grab_focus() return default = self._combo_default.get_active_text() or models[0] data = load_endpoints() # If renaming, remove old entry if self._existing_name and self._existing_name != name: data["endpoints"] = [e for e in data["endpoints"] if e["name"] != self._existing_name] # Check for duplicate name existing = [e for e in data["endpoints"] if e["name"] == name and e != self._data] if existing: self._show_error(f'Endpoint "{name}" already exists') return new_ep = {"name": name, "backend_type": bt, "base_url": url, "api_key": key, "default_model": default, "models": models, "provider_preset": self._combo_preset.get_active_text() or "Custom"} cc_ver = self._entry_cc_ver.get_text().strip() if cc_ver: new_ep["cc_version"] = cc_ver new_ep["reasoning_enabled"] = self._switch_reasoning.get_active() new_ep["reasoning_effort"] = self._combo_effort.get_active_id() or "medium" preset_name = self._combo_preset.get_active_text() or "Custom" preset = PROVIDER_PRESETS.get(preset_name, {}) if preset.get("oauth_provider"): new_ep["oauth_provider"] = preset["oauth_provider"] new_ep["base_url"] = normalize_base_url(new_ep["base_url"]) # Update or append found = False for i, e in enumerate(data["endpoints"]): if e["name"] == name: 
data["endpoints"][i] = new_ep found = True break if not found: data["endpoints"].append(new_ep) if data.get("default") is None: data["default"] = name save_endpoints(data) self._parent_mgr._rebuild() self._parent_mgr._parent._on_endpoints_updated() self.destroy() def _show_error(self, msg): d = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, msg) d.run(); d.destroy() # ═══════════════════════════════════════════════════════════════════ # Entry point # ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════ # BGP Pool Manager # ═══════════════════════════════════════════════════════════════════ class BGPPoolMgr(Gtk.Window): def __init__(self, parent): super().__init__(title="AI BGP — Pool Manager") self.set_transient_for(parent) self.set_default_size(620, 440) self._parent = parent vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) vbox.set_margin_start(12) vbox.set_margin_end(12) vbox.set_margin_top(12) vbox.set_margin_bottom(12) self.add(vbox) hdr = Gtk.Box(spacing=8) vbox.pack_start(hdr, False, False, 0) hdr.pack_start(Gtk.Label(label="AI BGP Pools — multi-provider routing with automatic failover", use_markup=True), False, False, 0) self._store = Gtk.ListStore(str, str, str) self._tree = Gtk.TreeView(model=self._store) for i, (title, w) in enumerate([("Pool Name", 200), ("Routes", 250), ("Strategy", 100)]): r = Gtk.CellRendererText() c = Gtk.TreeViewColumn(title, r, text=i) c.set_min_width(w) self._tree.append_column(c) self._tree.set_headers_visible(True) sw = Gtk.ScrolledWindow() sw.add(self._tree) vbox.pack_start(sw, True, True, 0) sel = self._tree.get_selection() sel.connect("changed", lambda *_: self._on_select()) bbox = Gtk.Box(spacing=8) vbox.pack_start(bbox, False, False, 0) self._add_btn = Gtk.Button(label="Create Pool") self._add_btn.connect("clicked", lambda b: self._add_pool()) bbox.pack_start(self._add_btn, True, True, 0) 
self._edit_btn = Gtk.Button(label="Edit Pool") self._edit_btn.connect("clicked", lambda b: self._edit_pool()) self._edit_btn.set_sensitive(False) bbox.pack_start(self._edit_btn, True, True, 0) self._del_btn = Gtk.Button(label="Delete Pool") self._del_btn.connect("clicked", lambda b: self._del_pool()) self._del_btn.set_sensitive(False) bbox.pack_start(self._del_btn, True, True, 0) close_btn = Gtk.Button(label="Close") close_btn.connect("clicked", lambda b: self.destroy()) bbox.pack_start(close_btn, True, True, 0) self._rebuild() self.show_all() def _rebuild(self): self._store.clear() for pool in load_bgp_pools().get("pools", []): routes_str = " → ".join(f'{r.get("name","?")}/{r.get("model","?")}' for r in pool.get("routes", [])) self._store.append([pool["name"], routes_str, pool.get("strategy", "failover")]) def _selected_name(self): sel = self._tree.get_selection() m, i = sel.get_selected() return self._store[i][0] if i else None def _on_select(self): name = self._selected_name() self._edit_btn.set_sensitive(bool(name)) self._del_btn.set_sensitive(bool(name)) def _add_pool(self): d = BGPPoolEditDialog(self, None) d.connect("response", lambda *_: self._rebuild()) def _edit_pool(self): name = self._selected_name() if name: d = BGPPoolEditDialog(self, name) d.connect("response", lambda *_: self._rebuild()) def _del_pool(self): name = self._selected_name() if not name: return d = Gtk.MessageDialog(self, 0, Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO, f'Delete BGP pool "{name}"?') r = d.run(); d.destroy() if r != Gtk.ResponseType.YES: return data = load_bgp_pools() data["pools"] = [p for p in data["pools"] if p["name"] != name] save_bgp_pools(data) self._rebuild() self._parent._on_endpoints_updated() class BGPPoolEditDialog(Gtk.Dialog): def __init__(self, parent, existing_name): title = "Edit BGP Pool" if existing_name else "Create BGP Pool" Gtk.Dialog.__init__(self, title=title, parent=parent, modal=True) self.add_button("Cancel", Gtk.ResponseType.CANCEL) 
self.add_button("Save", Gtk.ResponseType.OK) self.set_default_size(580, 480) self._existing_name = existing_name self._parent_mgr = parent data = load_bgp_pools() pool = None if existing_name: for p in data.get("pools", []): if p["name"] == existing_name: pool = p break if not pool: pool = {"name": "", "strategy": "failover", "routes": []} area = self.get_content_area() area.set_margin_start(12) area.set_margin_end(12) area.set_margin_top(12) area.set_margin_bottom(12) area.set_spacing(8) grid = Gtk.Grid(column_spacing=8, row_spacing=6) area.pack_start(grid, False, False, 0) grid.attach(Gtk.Label(label="Pool Name:", xalign=1), 0, 0, 1, 1) self._entry_name = Gtk.Entry(text=pool["name"]) grid.attach(self._entry_name, 1, 0, 1, 1) grid.attach(Gtk.Label(label="Strategy:", xalign=1), 0, 1, 1, 1) self._combo_strategy = Gtk.ComboBoxText() self._combo_strategy.append("failover", "Failover (try primary, fall back on error)") self._combo_strategy.append("race", "Race (send to all, return fastest)") self._combo_strategy.set_active_id(pool.get("strategy", "failover")) grid.attach(self._combo_strategy, 1, 1, 1, 1) area.pack_start(Gtk.Label(label="Routes (drag to reorder priority)", use_markup=True, xalign=0), False, False, 8) self._route_store = Gtk.ListStore(str, str, str, str, str, str) for r in pool.get("routes", []): self._route_store.append([ r.get("name", ""), r.get("endpoint_name", ""), r.get("target_url", ""), r.get("api_key", ""), r.get("model", ""), str(r.get("priority", 99)) ]) self._route_tree = Gtk.TreeView(model=self._route_store) for i, (title, w) in enumerate([ ("Route Name", 120), ("Endpoint", 120), ("URL", 150), ("API Key", 80), ("Model", 120), ("Priority", 60) ]): renderer = Gtk.CellRendererText() renderer.set_property("editable", False) col = Gtk.TreeViewColumn(title, renderer, text=i) col.set_min_width(w) col.set_resizable(True) self._route_tree.append_column(col) self._route_tree.set_headers_visible(True) sw = Gtk.ScrolledWindow() 
sw.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC) sw.add(self._route_tree) sw.set_min_content_height(200) area.pack_start(sw, True, True, 0) bbox = Gtk.Box(spacing=6) area.pack_start(bbox, False, False, 0) add_r = Gtk.Button(label="Add Route") add_r.connect("clicked", lambda b: self._add_route()) bbox.pack_start(add_r, True, True, 0) edit_r = Gtk.Button(label="Edit Route") edit_r.connect("clicked", lambda b: self._edit_route()) bbox.pack_start(edit_r, True, True, 0) rm_r = Gtk.Button(label="Remove Route") rm_r.connect("clicked", lambda b: self._remove_route()) bbox.pack_start(rm_r, True, True, 0) up_r = Gtk.Button(label="↑ Up") up_r.connect("clicked", lambda b: self._move_route(-1)) bbox.pack_start(up_r, True, True, 0) down_r = Gtk.Button(label="↓ Down") down_r.connect("clicked", lambda b: self._move_route(1)) bbox.pack_start(down_r, True, True, 0) self.show_all() if self.run() == Gtk.ResponseType.OK: self._save() self.destroy() def _save(self): name = self._entry_name.get_text().strip() if not name: return strategy = self._combo_strategy.get_active_id() or "failover" routes = [] for i, row in enumerate(self._route_store): if not row[2]: continue routes.append({ "name": row[0] or f"Route {i+1}", "endpoint_name": row[1], "target_url": row[2], "api_key": row[3], "model": row[4], "priority": i + 1, "reasoning_enabled": True, "reasoning_effort": "medium", }) data = load_bgp_pools() if self._existing_name: data["pools"] = [p for p in data["pools"] if p["name"] != self._existing_name] data["pools"].append({"name": name, "strategy": strategy, "routes": routes}) save_bgp_pools(data) self._parent_mgr._parent._on_endpoints_updated() def _add_route(self): endpoints = load_endpoints().get("endpoints", []) if not endpoints: d = Gtk.MessageDialog(self, 0, Gtk.MessageType.INFO, Gtk.ButtonsType.OK, "No endpoints configured. 
Add endpoints in Manage Endpoints first.") d.run(); d.destroy() return d = BGPRouteDialog(self, endpoints, None) if d.result: r = d.result self._route_store.append([ r.get("name", ""), r.get("endpoint_name", ""), r.get("target_url", ""), r.get("api_key", ""), r.get("model", ""), str(r.get("priority", 99)) ]) def _edit_route(self): sel = self._route_tree.get_selection() m, i = sel.get_selected() if not i: return endpoints = load_endpoints().get("endpoints", []) existing = { "name": m[i][0], "endpoint_name": m[i][1], "target_url": m[i][2], "api_key": m[i][3], "model": m[i][4], "priority": int(m[i][5]) if m[i][5] else 99, } d = BGPRouteDialog(self, endpoints, existing) if d.result: r = d.result m[i][0] = r.get("name", "") m[i][1] = r.get("endpoint_name", "") m[i][2] = r.get("target_url", "") m[i][3] = r.get("api_key", "") m[i][4] = r.get("model", "") m[i][5] = str(r.get("priority", 99)) def _remove_route(self): sel = self._route_tree.get_selection() m, i = sel.get_selected() if i: self._route_store.remove(i) def _move_route(self, direction): sel = self._route_tree.get_selection() m, i = sel.get_selected() if not i: return path = m.get_path(i) idx = path.get_indices()[0] new_idx = idx + direction if new_idx < 0 or new_idx >= len(self._route_store): return row_data = [m[idx][c] for c in range(6)] self._route_store.remove(m.get_iter(Gtk.TreePath(idx))) new_iter = self._route_store.insert(new_idx) for c, v in enumerate(row_data): self._route_store.set_value(new_iter, c, v) class BGPRouteDialog(Gtk.Dialog): def __init__(self, parent, endpoints, existing): Gtk.Dialog.__init__(self, title="BGP Route", parent=parent, modal=True) self.add_button("Cancel", Gtk.ResponseType.CANCEL) self.add_button("OK", Gtk.ResponseType.OK) self.set_default_size(440, 300) self.result = None area = self.get_content_area() area.set_margin_start(12) area.set_margin_end(12) area.set_margin_top(12) area.set_margin_bottom(12) area.set_spacing(6) grid = Gtk.Grid(column_spacing=8, row_spacing=6) 
area.pack_start(grid, False, False, 0) def add_row(row, label, widget): grid.attach(Gtk.Label(label=label, xalign=1), 0, row, 1, 1) grid.attach(widget, 1, row, 1, 1) self._entry_name = Gtk.Entry(text=existing.get("name", "") if existing else "") add_row(0, "Route Name:", self._entry_name) self._combo_ep = Gtk.ComboBoxText() ep_names = [e["name"] for e in endpoints] for en in ep_names: self._combo_ep.append(en, en) if existing and existing.get("endpoint_name") in ep_names: self._combo_ep.set_active_id(existing["endpoint_name"]) elif ep_names: self._combo_ep.set_active(0) self._combo_ep.connect("changed", lambda b: self._on_ep_changed(endpoints)) add_row(1, "Endpoint:", self._combo_ep) self._entry_url = Gtk.Entry() add_row(2, "URL:", self._entry_url) self._entry_key = Gtk.Entry() self._entry_key.set_visibility(False) add_row(3, "API Key:", self._entry_key) self._combo_model = Gtk.ComboBoxText() add_row(4, "Model:", self._combo_model) if existing: self._entry_url.set_text(existing.get("target_url", "")) self._entry_key.set_text(existing.get("api_key", "")) self._on_ep_changed(endpoints) if existing and existing.get("model"): self._combo_model.set_active_id(existing["model"]) self.show_all() if self.run() == Gtk.ResponseType.OK: ep_name = self._combo_ep.get_active_text() or "" ep = None for e in endpoints: if e["name"] == ep_name: ep = e break self.result = { "name": self._entry_name.get_text().strip() or ep_name, "endpoint_name": ep_name, "target_url": self._entry_url.get_text().strip(), "api_key": self._entry_key.get_text().strip(), "model": self._combo_model.get_active_text() or "", "priority": 99, } if ep: self.result["reasoning_enabled"] = ep.get("reasoning_enabled", True) self.result["reasoning_effort"] = ep.get("reasoning_effort", "medium") self.result["oauth_provider"] = ep.get("oauth_provider", "") self.destroy() def _on_ep_changed(self, endpoints): ep_name = self._combo_ep.get_active_text() ep = None for e in endpoints: if e["name"] == ep_name: ep = e break 
if ep: self._entry_url.set_text(normalize_base_url(ep.get("base_url", ""))) self._entry_key.set_text(ep.get("api_key", "")) self._combo_model.remove_all() for m in ep.get("models", []): mid = normalize_model_id(m) if m else "" self._combo_model.append(mid, m) if ep.get("default_model"): self._combo_model.set_active_id(normalize_model_id(ep["default_model"])) elif len(ep.get("models", [])) > 0: self._combo_model.set_active(0) _USAGE_COLORS = { "green": "#27ae60", "yellow": "#f39c12", "orange": "#e67e22", "red": "#e74c3c", "blue": "#3498db", "purple": "#9b59b6", "dark": "#2c3e50", "light": "#ecf0f1", "mid": "#bdc3c7", } _USAGE_STATS_FILE = HOME / ".cache/codex-proxy/usage-stats.json" def _load_usage_stats(): try: if _USAGE_STATS_FILE.exists(): return json.loads(_USAGE_STATS_FILE.read_text()) except Exception: pass return {"providers": {}, "updated": None} def _bar_color(pct): if pct < 0.5: return _USAGE_COLORS["green"] if pct < 0.8: return _USAGE_COLORS["yellow"] return _USAGE_COLORS["red"] class UsageWindow(Gtk.Window): def __init__(self, parent): super().__init__(title="Usage Stats") self.set_transient_for(parent) self.set_default_size(640, 560) self.set_position(Gtk.WindowPosition.CENTER) self._parent = parent vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) self.add(vbox) header = Gtk.Box(spacing=8) header.set_margin_start(16) header.set_margin_end(16) header.set_margin_top(12) header.set_margin_bottom(8) vbox.pack_start(header, False, False, 0) title = Gtk.Label() title.set_markup('Usage Dashboard') header.pack_start(title, False, False, 0) refresh_btn = Gtk.Button(label="Refresh") refresh_btn.connect("clicked", lambda b: self._refresh()) header.pack_end(refresh_btn, False, False, 0) self._updated_lbl = Gtk.Label() self._updated_lbl.set_markup('Never') header.pack_end(self._updated_lbl, False, False, 8) sep = Gtk.Separator() vbox.pack_start(sep, False, False, 0) self._cards_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) sw = 
Gtk.ScrolledWindow() sw.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC) sw.add(self._cards_box) vbox.pack_start(sw, True, True, 0) self._refresh() self.show_all() def _refresh(self): for c in self._cards_box.get_children(): self._cards_box.remove(c) stats = _load_usage_stats() updated = stats.get("updated") if updated: self._updated_lbl.set_markup(f'Updated: {updated}') providers = stats.get("providers", {}) if not providers: empty = Gtk.Label() empty.set_markup('No usage data yet.\nLaunch a session to start tracking.') empty.set_margin_top(60) self._cards_box.pack_start(empty, False, False, 0) self._cards_box.show_all() return sorted_providers = sorted(providers.items(), key=lambda x: x[1].get("total_requests", 0), reverse=True) for prov_name, prov_data in sorted_providers: card = self._build_card(prov_name, prov_data) self._cards_box.pack_start(card, False, False, 0) self._cards_box.show_all() def _build_card(self, name, data): frame = Gtk.Frame() frame.set_margin_start(12) frame.set_margin_end(12) frame.set_margin_top(4) frame.set_margin_bottom(4) style = frame.get_style_context() style.add_class("card") outer = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4) outer.set_margin_start(12) outer.set_margin_end(12) outer.set_margin_top(8) outer.set_margin_bottom(8) frame.add(outer) top_row = Gtk.Box(spacing=8) outer.pack_start(top_row, False, False, 0) total = data.get("total_requests", 0) ok = data.get("successes", 0) fail = data.get("failures", 0) success_rate = ok / total if total > 0 else 1.0 name_lbl = Gtk.Label() short = name.replace("https://", "").replace("http://", "").split("/")[0] name_lbl.set_markup(f'{short}') top_row.pack_start(name_lbl, False, False, 0) req_lbl = Gtk.Label() req_lbl.set_markup(f'{total} requests') top_row.pack_start(req_lbl, False, False, 8) if fail > 0: err_lbl = Gtk.Label() err_lbl.set_markup(f'{fail} failed') top_row.pack_start(err_lbl, False, False, 4) last_used = data.get("last_used", "") if last_used: lu_lbl = 
Gtk.Label() lu_lbl.set_markup(f'{last_used}') top_row.pack_end(lu_lbl, False, False, 0) # Progress bar for success rate bar = Gtk.ProgressBar() bar.set_fraction(success_rate) bar_pct = int(success_rate * 100) bar.set_text(f"{bar_pct}% success") bar.set_show_text(True) bar.set_margin_top(2) bar.set_margin_bottom(2) color = _bar_color(1.0 - success_rate) bar_css = f'progress {{ background-color: {color}; border-radius: 4px; }} trough {{ border-radius: 4px; min-height: 10px; }}' provider = Gtk.CssProvider() provider.load_from_data(bar_css.encode()) bar.get_style_context().add_provider(provider, Gtk.STYLE_PROVIDER_PRIORITY_USER) outer.pack_start(bar, False, False, 0) # Stats row stats_row = Gtk.Box(spacing=16) outer.pack_start(stats_row, False, False, 0) t_in = data.get("total_tokens_in", 0) t_out = data.get("total_tokens_out", 0) dur = data.get("total_duration_s", 0.0) avg_dur = dur / total if total > 0 else 0 for label, value in [ ("Tokens In", f"{t_in:,}"), ("Tokens Out", f"{t_out:,}"), ("Avg Latency", f"{avg_dur:.1f}s"), ]: box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=1) l = Gtk.Label() l.set_markup(f'{label}') box.pack_start(l, False, False, 0) v = Gtk.Label() v.set_markup(f'{value}') box.pack_start(v, False, False, 0) stats_row.pack_start(box, False, False, 0) # Models breakdown models = data.get("models", {}) if len(models) > 0: model_str = " ".join( f'{m} ' f'({md.get("requests",0)})' for m, md in sorted(models.items(), key=lambda x: x[1].get("requests", 0), reverse=True)[:4] ) m_lbl = Gtk.Label() m_lbl.set_markup(f'Models: {model_str}') m_lbl.set_line_wrap(True) m_lbl.set_xalign(0) outer.pack_start(m_lbl, False, False, 2) # Error info last_err = data.get("last_error") if last_err: err_lbl = Gtk.Label() err_lbl.set_markup(f'Last error: {last_err}') err_lbl.set_xalign(0) outer.pack_start(err_lbl, False, False, 0) return frame def main(): for d in [LOG_DIR, PROXY_CONFIG_DIR]: d.mkdir(parents=True, exist_ok=True) # Create default endpoints if none 
exist if not ENDPOINTS_FILE.exists(): save_endpoints({ "default": "OpenAI", "endpoints": [ {"name": "OpenAI", "backend_type": "native", "base_url": "https://api.openai.com/v1", "api_key": "", "default_model": "gpt-4o", "models": ["gpt-4o", "gpt-4o-mini"], "provider_preset": "OpenAI"}, {"name": "Z.AI", "backend_type": "openai-compat", "base_url": "https://api.z.ai/api/coding/paas/v4", "api_key": "", "default_model": "glm-5.1", "models": ["glm-4.5", "glm-4.5-air", "glm-4.6", "glm-4.7", "glm-5", "glm-5-turbo", "glm-5.1"], "provider_preset": "Custom"}, ], }) w = LauncherWin() w.connect("destroy", Gtk.main_quit) Gtk.main() if __name__ == "__main__": main()