#!/usr/bin/env python3 """Codex Launcher GUI (tkinter) — manage endpoints, launch Desktop or CLI with any provider. Windows-native tkinter GUI mirroring all features of the GTK version. Imports process management, config engine, proxy lifecycle from codex_launcher_lib. """ import tkinter as tk from tkinter import ttk, filedialog, messagebox, scrolledtext import json import os import shutil import socket import ssl import subprocess import sys import threading import time import urllib.error import urllib.parse import urllib.request import base64 import hashlib import secrets import http.server import collections from pathlib import Path from codex_launcher_lib import ( IS_WINDOWS, HOME, CONFIG, CONFIG_BAK, CONFIG_TXN, ENDPOINTS_FILE, BGP_POOLS_FILE, LAUNCH_LOG, LOG_DIR, PROXY_CONFIG_DIR, BIN_DIR, PROXY, CLEANUP, PID_REGISTRY, PROVIDER_PRESETS, CHANGELOG, DEFAULT_CONFIG, OAUTH_SECRETS_PATH, ANTIGRAVITY_MODELS, safe_name, label_for_backend, normalize_model_id, normalize_base_url, parse_model_list, now_utc_iso, apply_provider_preset, load_endpoints, save_endpoints, load_bgp_pools, save_bgp_pools, get_endpoint, build_profile_bundle, save_profile_bundle, import_profile_bundle, backup_config, restore_config, begin_config_transaction, end_config_transaction, recover_config_if_needed, write_config_for_native, write_config_for_translated, endpoint_models_url, endpoint_model_headers, fetch_models_for_endpoint, refresh_endpoint_models, run_endpoint_doctor, detect_codex_cli, detect_codex_desktop, check_codex_auth, last_log_lines, kill_existing_desktop, safe_cleanup_owned, start_proxy_for, stop_proxy, start_bgp_proxy, get_proxy_state, set_proxy_state, detect_terminal, open_url, open_file, write_secure_text, ensure_dirs, create_default_endpoints, load_monitoring_config, save_monitoring_config, load_incident_store, save_incident_store, load_usage_stats, monitoring_log, IncidentStore, AIDiagnosticAgent, HealthWatcher, load_oauth_secrets, save_oauth_secrets, _usage_theme, UA, ) # ═══════════════════════════════════════════════════════════════════════ # Helpers # ═══════════════════════════════════════════════════════════════════════ def _fmt_tok(n): if n >= 1_000_000: return f"{n/1_000_000:.1f}M" if n >= 1_000: return f"{n/1_000:.1f}K" return str(n) def _fmt_dur(s): if s >= 3600: return f"{s/3600:.1f}h" if s >= 60: return f"{s/60:.1f}m" return f"{s:.1f}s" def _status_pill(success_rate, fail_pct): U = _usage_theme() if fail_pct > 0.15: return ("ERR", U["red"]) if fail_pct > 0.05: return ("WARN", U["yellow"]) return ("OK", U["green"]) def _show_doctor_results_tk(parent, ep_name, checks): dlg = tk.Toplevel(parent) dlg.title(f"Doctor: {ep_name}") dlg.geometry("520x420") dlg.transient(parent) dlg.grab_set() passed = sum(1 for _, ok, _ in checks if ok is True) failed = sum(1 for _, ok, _ in checks if ok is False) warned = sum(1 for _, ok, _ in checks if ok is None) hdr = tk.Label(dlg, text=f"{ep_name} {passed} passed {failed} failed {warned} warnings", font=("Segoe UI", 10, "bold")) hdr.pack(padx=12, pady=(12, 4), anchor="w") ttk.Separator(dlg).pack(fill="x", padx=12) canvas = tk.Canvas(dlg) scrollbar = ttk.Scrollbar(dlg, orient="vertical", command=canvas.yview) inner = tk.Frame(canvas) inner.bind("", lambda e: canvas.configure(scrollregion=canvas.bbox("all"))) canvas.create_window((0, 0), window=inner, anchor="nw") canvas.configure(yscrollcommand=scrollbar.set) for name, ok, detail in checks: row = tk.Frame(inner) row.pack(fill="x", padx=12, pady=1) if ok is True: color, sym = "#27ae60", "✓" elif ok is False: color, sym = "#e74c3c", "✗" else: color, sym = "#f39c12", "○" tk.Label(row, text=sym, fg=color, font=("Segoe UI", 11, "bold")).pack(side="left") tk.Label(row, text=name, font=("Segoe UI", 9, "bold")).pack(side="left", padx=(4, 0)) if detail: tk.Label(row, text=detail, fg="#7f8c8d", font=("Segoe UI", 8)).pack(side="right") canvas.pack(side="left", fill="both", expand=True, padx=(12, 0), pady=6) scrollbar.pack(side="right", fill="y", pady=6) btn_frame = tk.Frame(dlg) btn_frame.pack(pady=(0, 10)) ttk.Button(btn_frame, text="Close", command=dlg.destroy).pack() # ═══════════════════════════════════════════════════════════════════════ # EditEndpointDialog # ═══════════════════════════════════════════════════════════════════════ class EditEndpointDialog: def __init__(self, parent, existing_name=None): self.result = False self._existing_name = existing_name self._parent_mgr = parent if existing_name: self._data = get_endpoint(existing_name) or {} else: self._data = { "name": "", "backend_type": "openai-compat", "base_url": "", "api_key": "", "default_model": "", "models": [], "provider_preset": "Custom", } self._dlg = tk.Toplevel(parent) title = "Edit Endpoint" if existing_name else "Add Endpoint" self._dlg.title(title) self._dlg.geometry("520x600") self._dlg.transient(parent) self._dlg.grab_set() main = ttk.Frame(self._dlg, padding=12) main.pack(fill="both", expand=True) grid = ttk.Frame(main) grid.pack(fill="x") row_idx = [0] def add_field(label, widget_factory): ttk.Label(grid, text=label).grid(row=row_idx[0], column=0, sticky="e", padx=(0, 6), pady=2) w = widget_factory() w.grid(row=row_idx[0], column=1, sticky="ew", pady=2) row_idx[0] += 1 return w self._entry_name = add_field("Name:", lambda: ttk.Entry(grid)) self._entry_name.insert(0, self._data.get("name", "")) self._combo_preset = ttk.Combobox(grid, values=list(PROVIDER_PRESETS.keys()), state="readonly") preset = self._data.get("provider_preset", "Custom") self._combo_preset.set(preset) add_field("Preset:", lambda: self._combo_preset) self._combo_preset.bind("<>", lambda e: self._apply_selected_preset(initial=False)) backend_types = [ ("openai-compat", "OpenAI-compatible (needs proxy)"), ("anthropic", "Anthropic (needs proxy)"), ("command-code", "Command Code (needs proxy)"), ("freebuff", "Freebuff - Free DeepSeek/Kimi (needs proxy)"), ("gemini-oauth-cli", "Gemini CLI OAuth (needs proxy)"), ("gemini-oauth-antigravity", "Antigravity OAuth (needs proxy)"), ("native", "Native OpenAI (no proxy)"), ] self._combo_type = ttk.Combobox(grid, values=[f"{v} - {l}" for v, l in backend_types], state="readonly") bt = self._data.get("backend_type", "openai-compat") bt_display = next((f"{v} - {l}" for v, l in backend_types if v == bt), backend_types[0][0] + " - " + backend_types[0][1]) self._combo_type.set(bt_display) add_field("Type:", lambda: self._combo_type) self._bt_map = {f"{v} - {l}": v for v, l in backend_types} self._entry_url = add_field("Base URL:", lambda: ttk.Entry(grid)) self._entry_url.insert(0, self._data.get("base_url", "")) key_frame = ttk.Frame(grid) self._entry_key = ttk.Entry(key_frame, show="*") self._entry_key.pack(side="left", fill="x", expand=True) self._entry_key.insert(0, self._data.get("api_key", "")) self._reveal_var = tk.BooleanVar(value=False) ttk.Checkbutton(key_frame, text="Show", variable=self._reveal_var, command=lambda: self._entry_key.configure(show="" if self._reveal_var.get() else "*")).pack(side="left", padx=(4, 0)) self._oauth_btn = ttk.Button(key_frame, text="OAuth Login", command=self._do_oauth_login) self._oauth_btn.pack(side="left", padx=(4, 0)) add_field("API Key:", lambda: key_frame) self._entry_cc_ver = add_field("CC Version:", lambda: ttk.Entry(grid)) self._entry_cc_ver.insert(0, self._data.get("cc_version", "")) reason_frame = ttk.Frame(grid) self._reason_var = tk.BooleanVar(value=self._data.get("reasoning_enabled", True)) self._reason_cb = ttk.Checkbutton(reason_frame, text="Reasoning ON", variable=self._reason_var, command=self._on_reasoning_toggled) self._reason_cb.pack(side="left") self._combo_effort = ttk.Combobox(reason_frame, values=["none", "minimal", "low", "medium", "high", "max"], state="readonly", width=10) self._combo_effort.set(self._data.get("reasoning_effort", "medium")) self._combo_effort.pack(side="left", padx=(8, 0)) ttk.Label(reason_frame, text="Effort").pack(side="left", padx=(4, 0)) add_field("Reasoning:", lambda: reason_frame) self._on_reasoning_toggled() grid.columnconfigure(1, weight=1) ttk.Label(main, text="Models:").pack(anchor="w", pady=(8, 2)) model_input_frame = ttk.Frame(main) model_input_frame.pack(fill="x") self._entry_model = ttk.Entry(model_input_frame) self._entry_model.pack(side="left", fill="x", expand=True) ttk.Button(model_input_frame, text="Add", command=self._add_model).pack(side="left", padx=(4, 0)) ttk.Button(model_input_frame, text="Bulk Add", command=self._add_models_from_text).pack(side="left", padx=(4, 0)) ttk.Button(model_input_frame, text="Fetch from API", command=self._fetch_models).pack(side="left", padx=(4, 0)) ttk.Button(model_input_frame, text="Test Endpoint", command=self._diagnose_endpoint).pack(side="left", padx=(4, 0)) ttk.Label(main, text="Bulk add (one per line or comma-separated):").pack(anchor="w", pady=(4, 0)) self._bulk_text = tk.Text(main, height=3, wrap="word") self._bulk_text.pack(fill="x", pady=(2, 4)) list_frame = ttk.Frame(main) list_frame.pack(fill="both", expand=True) self._model_listbox = tk.Listbox(list_frame, height=6) sb = ttk.Scrollbar(list_frame, orient="vertical", command=self._model_listbox.yview) self._model_listbox.configure(yscrollcommand=sb.set) self._model_listbox.pack(side="left", fill="both", expand=True) sb.pack(side="right", fill="y") self._model_listbox.bind("", lambda e: self._remove_selected_model()) for m in self._data.get("models", []): self._model_listbox.insert("end", m) default_frame = ttk.Frame(main) default_frame.pack(fill="x", pady=(4, 0)) ttk.Label(default_frame, text="Default Model:").pack(side="left") self._combo_default = ttk.Combobox(default_frame, state="readonly") self._combo_default.pack(side="left", fill="x", expand=True, padx=(6, 0)) self._refresh_default_combo() dm = self._data.get("default_model", "") if dm: self._combo_default.set(dm) self._apply_selected_preset(initial=True) btn_frame = ttk.Frame(main) btn_frame.pack(fill="x", pady=(8, 0)) ttk.Button(btn_frame, text="Cancel", command=self._cancel).pack(side="right") ttk.Button(btn_frame, text="Save", command=self._save).pack(side="right", padx=(8, 0)) def _on_reasoning_toggled(self): state = "readonly" if self._reason_var.get() else "disabled" self._combo_effort.configure(state=state) def _apply_selected_preset(self, initial=False): preset_name = self._combo_preset.get() or "Custom" preset = PROVIDER_PRESETS.get(preset_name, {}) is_oauth = bool(preset.get("oauth_provider")) self._oauth_btn.configure(state="normal" if is_oauth else "disabled") if not initial or self._existing_name is None: bt = preset.get("backend_type", "openai-compat") bt_display = next((k for k, v in self._bt_map.items() if v == bt), list(self._bt_map.keys())[0]) self._combo_type.set(bt_display) self._entry_url.delete(0, "end") self._entry_url.insert(0, preset.get("base_url", "")) cc_ver = preset.get("cc_version", "") if cc_ver and not self._entry_cc_ver.get().strip(): self._entry_cc_ver.delete(0, "end") self._entry_cc_ver.insert(0, cc_ver) if preset.get("models") and self._model_listbox.size() == 0: self._model_listbox.delete(0, "end") for mid in preset["models"]: self._model_listbox.insert("end", mid) self._refresh_default_combo() if preset["models"]: self._combo_default.set(preset["models"][0]) def _add_model(self): m = normalize_model_id(self._entry_model.get()) if m: self._model_listbox.insert("end", m) self._refresh_default_combo() self._entry_model.delete(0, "end") def _add_models_from_text(self): text = self._bulk_text.get("1.0", "end") models = parse_model_list(text) existing = set(self._model_listbox.get(i) for i in range(self._model_listbox.size())) for mid in models: if mid not in existing: self._model_listbox.insert("end", mid) self._bulk_text.delete("1.0", "end") self._refresh_default_combo() def _remove_selected_model(self): sel = self._model_listbox.curselection() if sel: self._model_listbox.delete(sel[0]) self._refresh_default_combo() def _refresh_default_combo(self): models = list(self._model_listbox.get(i) for i in range(self._model_listbox.size())) current = self._combo_default.get() self._combo_default["values"] = models if current in models: self._combo_default.set(current) elif models: self._combo_default.set(models[0]) else: self._combo_default.set("") def _fetch_models(self): ep = self._make_endpoint_snapshot() ids, err = fetch_models_for_endpoint(ep) if ids: existing = set(self._model_listbox.get(i) for i in range(self._model_listbox.size())) for mid in ids: if mid not in existing: self._model_listbox.insert("end", mid) self._refresh_default_combo() else: messagebox.showerror("Fetch Models", f"Failed:\n{err}", parent=self._dlg) def _diagnose_endpoint(self): ep = self._make_endpoint_snapshot() wait = tk.Toplevel(self._dlg) wait.title("Running Doctor...") wait.geometry("280x80") wait.transient(self._dlg) wait.grab_set() tk.Label(wait, text="Running endpoint diagnostics...").pack(expand=True) def _run(): checks = run_endpoint_doctor(ep) self._dlg.after(0, lambda: (wait.destroy(), _show_doctor_results_tk(self._dlg, ep.get("default_model", "endpoint"), checks))) threading.Thread(target=_run, daemon=True).start() def _make_endpoint_snapshot(self): bt_display = self._combo_type.get() bt = self._bt_map.get(bt_display, "openai-compat") return { "base_url": self._entry_url.get().strip(), "api_key": self._entry_key.get().strip(), "backend_type": bt, "default_model": self._combo_default.get() or "", } def _do_oauth_login(self): preset_name = self._combo_preset.get() or "Custom" preset = PROVIDER_PRESETS.get(preset_name, {}) provider = preset.get("oauth_provider", "") if (provider or "").startswith("google"): self._google_oauth_flow(provider) def _google_oauth_flow(self, oauth_provider="google-cli"): is_antigravity = oauth_provider == "google-antigravity" token_path = str(PROXY_CONFIG_DIR / ("google-antigravity-oauth-token.json" if is_antigravity else "google-cli-oauth-token.json")) _sec = load_oauth_secrets().get("antigravity" if is_antigravity else "gemini_cli", {}) CLIENT_ID = _sec.get("client_id", "") CLIENT_SECRET = _sec.get("client_secret", "") if is_antigravity: SCOPES = [ "https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/userinfo.email", "https://www.googleapis.com/auth/userinfo.profile", "https://www.googleapis.com/auth/cclog", "https://www.googleapis.com/auth/experimentsandconfigs", ] port = 51121 redirect_uri = f"http://localhost:{port}/oauth-callback" callback_path = "/oauth-callback" provider_kind = "antigravity" else: SCOPES = [ "https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/userinfo.email", "https://www.googleapis.com/auth/userinfo.profile", ] port = 0 redirect_uri = None callback_path = "/oauth2callback" provider_kind = "cli" state = secrets.token_hex(32) verifier = secrets.token_urlsafe(64) challenge = base64.urlsafe_b64encode(hashlib.sha256(verifier.encode()).digest()).rstrip(b"=").decode() if port == 0: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("127.0.0.1", 0)) port = s.getsockname()[1] redirect_uri = f"http://127.0.0.1:{port}/oauth2callback" scope_str = " ".join(SCOPES) auth_url = ( f"https://accounts.google.com/o/oauth2/v2/auth?" f"client_id={CLIENT_ID}" f"&redirect_uri={urllib.parse.quote(redirect_uri)}" f"&response_type=code" f"&scope={urllib.parse.quote(scope_str)}" f"&access_type=offline" f"&prompt=select_account%20consent" f"&state={state}" f"&code_challenge={challenge}" f"&code_challenge_method=S256" ) oauth_dlg = tk.Toplevel(self._dlg) oauth_dlg.title("Google OAuth (Gemini Mode)") oauth_dlg.geometry("520x280") oauth_dlg.transient(self._dlg) oauth_dlg.grab_set() tk.Label(oauth_dlg, text="Sign in with Google", font=("Segoe UI", 11, "bold")).pack(padx=16, pady=(12, 0), anchor="w") tk.Label(oauth_dlg, text=f"Using OAuth credentials from {OAUTH_SECRETS_PATH}").pack(padx=16, anchor="w") link_lbl = tk.Label(oauth_dlg, text="Click here to open Google authorization", fg="blue", cursor="hand2") link_lbl.pack(padx=16, pady=(8, 0), anchor="w") link_lbl.bind("", lambda e: open_url(auth_url)) self._oauth_status_var = tk.StringVar(value="Opening browser...") tk.Label(oauth_dlg, textvariable=self._oauth_status_var).pack(padx=16, pady=(8, 0), anchor="w") code_holder = [None] error_holder = [None] received_state = [None] class OAuthHandler(http.server.BaseHTTPRequestHandler): def do_GET(self2): qs = urllib.parse.urlparse(self2.path).query params = urllib.parse.parse_qs(qs) received_state[0] = params.get("state", [None])[0] if self2.path.find(callback_path) == -1: self2.send_response(302) self2.send_header("Location", "https://developers.google.com/gemini-code-assist/auth_failure_gemini") self2.end_headers() error_holder[0] = "unexpected request" return if "code" in params: if received_state[0] != state: self2.send_response(400) self2.send_header("Content-Type", "text/html") self2.end_headers() self2.wfile.write(b"

CSRF state mismatch.

") error_holder[0] = "CSRF state mismatch" return code_holder[0] = params["code"][0] self2.send_response(302) self2.send_header("Location", "https://developers.google.com/gemini-code-assist/auth_success_gemini") self2.end_headers() else: error_holder[0] = params.get("error", ["unknown"])[0] self2.send_response(302) self2.send_header("Location", "https://developers.google.com/gemini-code-assist/auth_failure_gemini") self2.end_headers() def log_message(self2, fmt, *args): pass try: bind_host = "localhost" if is_antigravity else "127.0.0.1" server = http.server.HTTPServer((bind_host, port), OAuthHandler) except OSError: self._oauth_status_var.set(f"Port {port} already in use -- close other apps and retry.") return def wait_for_code(): deadline = time.time() + 120 while code_holder[0] is None and error_holder[0] is None and time.time() < deadline: server.handle_request() server.server_close() if code_holder[0]: try: token_data = urllib.parse.urlencode({ "code": code_holder[0], "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "redirect_uri": redirect_uri, "grant_type": "authorization_code", "code_verifier": verifier, }).encode() req = urllib.request.Request("https://oauth2.googleapis.com/token", data=token_data, headers={"Content-Type": "application/x-www-form-urlencoded"}) resp = urllib.request.urlopen(req, timeout=30) tokens = json.loads(resp.read()) tokens["client_id"] = CLIENT_ID tokens["client_secret"] = CLIENT_SECRET tokens["provider_kind"] = provider_kind tokens["expires_at"] = time.time() + tokens.get("expires_in", 3600) os.makedirs(os.path.dirname(token_path), exist_ok=True) with open(token_path, "w") as f: json.dump(tokens, f, indent=2) project_id = "" try: lr = urllib.request.Request( "https://cloudcode-pa.googleapis.com/v1internal:loadCodeAssist", data=json.dumps({}).encode(), headers={"Content-Type": "application/json", "Authorization": f"Bearer {tokens['access_token']}", "User-Agent": "google-api-nodejs-client/9.15.1"}) lresp = urllib.request.urlopen(lr, timeout=15) ldata = json.loads(lresp.read()) p = ldata.get("cloudaicompanionProject", "") if isinstance(p, dict): project_id = p.get("id", "") elif isinstance(p, str): project_id = p if project_id: tokens["project_id"] = project_id with open(token_path, "w") as f2: json.dump(tokens, f2, indent=2) except Exception: pass found_models = [] if is_antigravity: found_models = list(ANTIGRAVITY_MODELS) else: found_models = ["gemini-2.5-flash", "gemini-2.5-pro"] if found_models: tokens["available_models"] = found_models with open(token_path, "w") as f3: json.dump(tokens, f3, indent=2) self._dlg.after(0, lambda: self._oauth_success(oauth_dlg, tokens.get("access_token", ""))) except Exception as e: self._dlg.after(0, lambda: self._oauth_failed(oauth_dlg, str(e))) else: self._dlg.after(0, lambda: self._oauth_failed(oauth_dlg, error_holder[0] or "No authorization code received.")) threading.Thread(target=wait_for_code, daemon=True).start() open_url(auth_url) def _oauth_success(self, dlg, access_token): self._entry_key.delete(0, "end") self._entry_key.insert(0, access_token) self._oauth_status_var.set("Authorization successful! Token saved.") self._dlg.after(1500, dlg.destroy) def _oauth_failed(self, dlg, msg): self._oauth_status_var.set(f"Failed: {msg}") self._dlg.after(3000, dlg.destroy) def _cancel(self): self._dlg.destroy() def _save(self): name = self._entry_name.get().strip() if not name: messagebox.showerror("Error", "Name is required", parent=self._dlg) return bt_display = self._combo_type.get() bt = self._bt_map.get(bt_display, "openai-compat") url = self._entry_url.get().strip() key = self._entry_key.get().strip() models = list(self._model_listbox.get(i) for i in range(self._model_listbox.size())) if not models: ep_snap = self._make_endpoint_snapshot() ids, err = fetch_models_for_endpoint(ep_snap) if ids: for mid in ids: self._model_listbox.insert("end", mid) self._refresh_default_combo() models = list(self._model_listbox.get(i) for i in range(self._model_listbox.size())) else: r = messagebox.askyesno("No Models", f"Auto-fetch failed ({err}).\n\nAdd models manually now?", parent=self._dlg) if r: self._entry_model.focus_set() return self._dlg.destroy() return if not models: messagebox.showerror("Error", "At least one model is required", parent=self._dlg) return default = self._combo_default.get() or models[0] data = load_endpoints() if self._existing_name and self._existing_name != name: data["endpoints"] = [e for e in data["endpoints"] if e["name"] != self._existing_name] existing = [e for e in data["endpoints"] if e["name"] == name] if existing: messagebox.showerror("Error", f'Endpoint "{name}" already exists', parent=self._dlg) return new_ep = { "name": name, "backend_type": bt, "base_url": normalize_base_url(url), "api_key": key, "default_model": default, "models": models, "provider_preset": self._combo_preset.get() or "Custom", "reasoning_enabled": self._reason_var.get(), "reasoning_effort": self._combo_effort.get() or "medium", } cc_ver = self._entry_cc_ver.get().strip() if cc_ver: new_ep["cc_version"] = cc_ver preset_name = self._combo_preset.get() or "Custom" preset = PROVIDER_PRESETS.get(preset_name, {}) if preset.get("oauth_provider"): new_ep["oauth_provider"] = preset["oauth_provider"] found = False for i, e in enumerate(data["endpoints"]): if e["name"] == name: data["endpoints"][i] = new_ep found = True break if not found: data["endpoints"].append(new_ep) if data.get("default") is None: data["default"] = name save_endpoints(data) self.result = True self._dlg.destroy() # ═══════════════════════════════════════════════════════════════════════ # EndpointMgr # ═══════════════════════════════════════════════════════════════════════ class EndpointMgr: def __init__(self, parent, on_update=None): self._parent = parent self._on_update = on_update self._dlg = tk.Toplevel(parent) self._dlg.title("Manage Endpoints") self._dlg.geometry("600x400") self._dlg.transient(parent) main = ttk.Frame(self._dlg, padding=12) main.pack(fill="both", expand=True) ttk.Label(main, text="Endpoints", font=("Segoe UI", 11, "bold")).pack(anchor="w") tree_frame = ttk.Frame(main) tree_frame.pack(fill="both", expand=True, pady=(4, 0)) cols = ("name", "provider", "backend", "default_model") self._tree = ttk.Treeview(tree_frame, columns=cols, show="headings", selectmode="browse") for col, heading, width in [("name", "Name", 140), ("provider", "Provider", 160), ("backend", "Type", 140), ("default_model", "Default Model", 140)]: self._tree.heading(col, text=heading) self._tree.column(col, width=width, minwidth=80) sb = ttk.Scrollbar(tree_frame, orient="vertical", command=self._tree.yview) self._tree.configure(yscrollcommand=sb.set) self._tree.pack(side="left", fill="both", expand=True) sb.pack(side="right", fill="y") btn_frame = ttk.Frame(main) btn_frame.pack(fill="x", pady=(8, 0)) ttk.Button(btn_frame, text="Add", command=self._add).pack(side="left", padx=(0, 4)) ttk.Button(btn_frame, text="Edit", command=self._edit).pack(side="left", padx=(0, 4)) ttk.Button(btn_frame, text="Delete", command=self._delete).pack(side="left", padx=(0, 4)) ttk.Button(btn_frame, text="Set Default", command=self._set_default).pack(side="left", padx=(0, 4)) ttk.Button(btn_frame, text="Doctor", command=self._doctor_selected).pack(side="left", padx=(0, 4)) ttk.Button(btn_frame, text="Doctor All", command=self._doctor_all).pack(side="left", padx=(0, 4)) ttk.Button(btn_frame, text="Close", command=self._dlg.destroy).pack(side="right") self._rebuild() def _rebuild(self): for item in self._tree.get_children(): self._tree.delete(item) data = load_endpoints() for ep in data["endpoints"]: provider = ep.get("provider_preset", "Custom") bt = label_for_backend(ep["backend_type"]) self._tree.insert("", "end", values=(ep["name"], provider, bt, ep.get("default_model", ""))) def _selected_name(self): sel = self._tree.selection() if not sel: return None return self._tree.item(sel[0])["values"][0] def _add(self): d = EditEndpointDialog(self._dlg, None) self._dlg.wait_window(d._dlg) if d.result: self._rebuild() if self._on_update: self._on_update() def _edit(self): name = self._selected_name() if not name: return d = EditEndpointDialog(self._dlg, name) self._dlg.wait_window(d._dlg) if d.result: self._rebuild() if self._on_update: self._on_update() def _delete(self): name = self._selected_name() if not name: return if not messagebox.askyesno("Delete", f'Delete endpoint "{name}"?', parent=self._dlg): return data = load_endpoints() data["endpoints"] = [e for e in data["endpoints"] if e["name"] != name] if data.get("default") == name: data["default"] = data["endpoints"][0]["name"] if data["endpoints"] else None save_endpoints(data) self._rebuild() if self._on_update: self._on_update() def _set_default(self): name = self._selected_name() if not name: return data = load_endpoints() data["default"] = name save_endpoints(data) self._rebuild() if self._on_update: self._on_update() def _doctor_selected(self): name = self._selected_name() if not name: return ep = get_endpoint(name) if not ep: return wait = tk.Toplevel(self._dlg) wait.title(f"Doctor: {name}...") wait.geometry("280x80") wait.transient(self._dlg) wait.grab_set() tk.Label(wait, text=f"Running diagnostics for {name}...").pack(expand=True) def _run(): checks = run_endpoint_doctor(ep) self._dlg.after(0, lambda: (wait.destroy(), _show_doctor_results_tk(self._dlg, name, checks))) threading.Thread(target=_run, daemon=True).start() def _doctor_all(self): data = load_endpoints() endpoints = data.get("endpoints", []) if not endpoints: messagebox.showinfo("Doctor All", "No endpoints configured.", parent=self._dlg) return wait = tk.Toplevel(self._dlg) wait.title("Doctor All...") wait.geometry("320x80") wait.transient(self._dlg) wait.grab_set() tk.Label(wait, text=f"Testing {len(endpoints)} endpoints...").pack(expand=True) all_results = {} def _run(): for ep in endpoints: try: all_results[ep["name"]] = run_endpoint_doctor(ep) except Exception as e: all_results[ep["name"]] = [("Doctor run", False, str(e)[:100])] def _show(): wait.destroy() dlg = tk.Toplevel(self._dlg) dlg.title("Doctor All Results") dlg.geometry("580x480") dlg.transient(self._dlg) canvas = tk.Canvas(dlg) scrollbar = ttk.Scrollbar(dlg, orient="vertical", command=canvas.yview) inner = tk.Frame(canvas) inner.bind("", lambda e: canvas.configure(scrollregion=canvas.bbox("all"))) canvas.create_window((0, 0), window=inner, anchor="nw") canvas.configure(yscrollcommand=scrollbar.set) for ep_name, checks in all_results.items(): passed = sum(1 for _, ok, _ in checks if ok is True) failed = sum(1 for _, ok, _ in checks if ok is False) color = "#e74c3c" if failed else "#27ae60" status = f"{failed} failed" if failed else f"{passed} passed" tk.Label(inner, text=f"{ep_name} {status}", fg=color, font=("Segoe UI", 9, "bold")).pack(anchor="w", padx=12, pady=(8, 2)) for name, ok, detail in checks: if ok is True: sym, sc = "✓", "#27ae60" elif ok is False: sym, sc = "✗", "#e74c3c" else: sym, sc = "○", "#f39c12" row = tk.Frame(inner) row.pack(anchor="w", padx=24, pady=0) tk.Label(row, text=sym, fg=sc, font=("Segoe UI", 9, "bold")).pack(side="left") txt = name if detail: txt += f" {detail}" tk.Label(row, text=txt, fg="#7f8c8d", font=("Segoe UI", 8)).pack(side="left") ttk.Separator(inner).pack(fill="x", padx=12, pady=4) canvas.pack(side="left", fill="both", expand=True, padx=(12, 0)) scrollbar.pack(side="right", fill="y") ttk.Button(dlg, text="Close", command=dlg.destroy).pack(pady=8) self._dlg.after(0, _show) threading.Thread(target=_run, daemon=True).start() # ═══════════════════════════════════════════════════════════════════════ # BGP Pool Manager # ═══════════════════════════════════════════════════════════════════════ class BGPRouteDialog: def __init__(self, parent, endpoints, existing=None): self.result = None self._dlg = tk.Toplevel(parent) self._dlg.title("BGP Route") self._dlg.geometry("440x300") self._dlg.transient(parent) self._dlg.grab_set() main = ttk.Frame(self._dlg, padding=12) main.pack(fill="both", expand=True) ttk.Label(main, text="Route Name:").grid(row=0, column=0, sticky="e", padx=(0, 6), pady=2) self._entry_name = ttk.Entry(main) self._entry_name.grid(row=0, column=1, sticky="ew", pady=2) if existing: self._entry_name.insert(0, existing.get("name", "")) ttk.Label(main, text="Endpoint:").grid(row=1, column=0, sticky="e", padx=(0, 6), pady=2) ep_names = [e["name"] for e in endpoints] self._combo_ep = ttk.Combobox(main, values=ep_names, state="readonly") self._combo_ep.grid(row=1, column=1, sticky="ew", pady=2) if existing and existing.get("endpoint_name") in ep_names: self._combo_ep.set(existing["endpoint_name"]) elif ep_names: self._combo_ep.set(ep_names[0]) ttk.Label(main, text="URL:").grid(row=2, column=0, sticky="e", padx=(0, 6), pady=2) self._entry_url = ttk.Entry(main) self._entry_url.grid(row=2, column=1, sticky="ew", pady=2) ttk.Label(main, text="API Key:").grid(row=3, column=0, sticky="e", padx=(0, 6), pady=2) self._entry_key = ttk.Entry(main, show="*") self._entry_key.grid(row=3, column=1, sticky="ew", pady=2) ttk.Label(main, text="Model:").grid(row=4, column=0, sticky="e", padx=(0, 6), pady=2) self._combo_model = ttk.Combobox(main, state="readonly") self._combo_model.grid(row=4, column=1, sticky="ew", pady=2) main.columnconfigure(1, weight=1) self._endpoints = endpoints self._combo_ep.bind("<>", lambda e: self._on_ep_changed()) self._on_ep_changed() if existing: self._entry_url.delete(0, "end") self._entry_url.insert(0, existing.get("target_url", "")) self._entry_key.delete(0, "end") self._entry_key.insert(0, existing.get("api_key", "")) if existing.get("model"): self._combo_model.set(existing["model"]) btn_frame = ttk.Frame(main) btn_frame.grid(row=5, column=0, columnspan=2, pady=(12, 0)) ttk.Button(btn_frame, text="Cancel", command=self._dlg.destroy).pack(side="right") ttk.Button(btn_frame, text="OK", command=self._ok).pack(side="right", padx=(8, 0)) self._dlg.wait_window() def _on_ep_changed(self): ep_name = self._combo_ep.get() ep = None for e in self._endpoints: if e["name"] == ep_name: ep = e break if ep: self._entry_url.delete(0, "end") self._entry_url.insert(0, normalize_base_url(ep.get("base_url", ""))) self._entry_key.delete(0, "end") self._entry_key.insert(0, ep.get("api_key", "")) models = ep.get("models", []) self._combo_model["values"] = models if ep.get("default_model") and ep["default_model"] in models: self._combo_model.set(ep["default_model"]) elif models: self._combo_model.set(models[0]) def _ok(self): ep_name = self._combo_ep.get() ep = None for e in self._endpoints: if e["name"] == ep_name: ep = e break self.result = { "name": self._entry_name.get().strip() or ep_name, "endpoint_name": ep_name, "target_url": self._entry_url.get().strip(), "api_key": self._entry_key.get().strip(), "model": self._combo_model.get() or "", "priority": 99, } if ep: self.result["reasoning_enabled"] = ep.get("reasoning_enabled", True) self.result["reasoning_effort"] = ep.get("reasoning_effort", "medium") self.result["oauth_provider"] = ep.get("oauth_provider", "") self._dlg.destroy() class BGPPoolEditDialog: def __init__(self, parent, existing_name=None): self.result = False self._existing_name = existing_name self._parent_mgr = parent self._dlg = tk.Toplevel(parent._dlg if hasattr(parent, "_dlg") else parent) title = "Edit BGP Pool" if existing_name else "Create BGP Pool" self._dlg.title(title) self._dlg.geometry("620x500") self._dlg.transient(parent._dlg if hasattr(parent, "_dlg") else parent) self._dlg.grab_set() data = load_bgp_pools() pool = None if existing_name: for p in data.get("pools", []): if p["name"] == existing_name: pool = p break if not pool: pool = {"name": "", "strategy": "failover", "routes": []} main = ttk.Frame(self._dlg, padding=12) main.pack(fill="both", expand=True) grid = ttk.Frame(main) grid.pack(fill="x") ttk.Label(grid, text="Pool Name:").grid(row=0, column=0, sticky="e", padx=(0, 6), pady=2) self._entry_name = ttk.Entry(grid) self._entry_name.grid(row=0, column=1, sticky="ew", pady=2) self._entry_name.insert(0, pool["name"]) ttk.Label(grid, text="Strategy:").grid(row=1, column=0, sticky="e", padx=(0, 6), pady=2) self._combo_strategy = ttk.Combobox(grid, values=["failover", "race"], state="readonly") self._combo_strategy.grid(row=1, column=1, sticky="ew", pady=2) self._combo_strategy.set(pool.get("strategy", "failover")) grid.columnconfigure(1, weight=1) ttk.Label(main, text="Routes (double-click to remove):", font=("Segoe UI", 9, "bold")).pack(anchor="w", pady=(8, 2)) tree_frame = ttk.Frame(main) tree_frame.pack(fill="both", expand=True) cols = ("name", "endpoint", "url", "model", "priority") self._route_tree = ttk.Treeview(tree_frame, columns=cols, show="headings", height=8) for col, heading, w in [("name", "Route Name", 100), ("endpoint", "Endpoint", 120), ("url", "URL", 160), ("model", "Model", 120), ("priority", "Priority", 60)]: self._route_tree.heading(col, text=heading) self._route_tree.column(col, width=w, minwidth=50) rsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self._route_tree.yview) self._route_tree.configure(yscrollcommand=rsb.set) self._route_tree.pack(side="left", fill="both", expand=True) rsb.pack(side="right", fill="y") self._routes = [] for r in pool.get("routes", []): self._routes.append(dict(r)) self._route_tree.insert("", "end", values=( r.get("name", ""), r.get("endpoint_name", ""), r.get("target_url", ""), r.get("model", ""), r.get("priority", 99))) btn_frame = ttk.Frame(main) btn_frame.pack(fill="x", pady=(6, 0)) ttk.Button(btn_frame, text="Add Route", command=self._add_route).pack(side="left", padx=(0, 4)) ttk.Button(btn_frame, text="Edit Route", command=self._edit_route).pack(side="left", padx=(0, 4)) ttk.Button(btn_frame, text="Remove Route", command=self._remove_route).pack(side="left", padx=(0, 4)) ttk.Button(btn_frame, text="Up", command=lambda: self._move_route(-1)).pack(side="left", padx=(0, 4)) ttk.Button(btn_frame, text="Down", command=lambda: self._move_route(1)).pack(side="left", padx=(0, 4)) save_frame = ttk.Frame(main) save_frame.pack(fill="x", pady=(8, 0)) ttk.Button(save_frame, text="Cancel", command=self._dlg.destroy).pack(side="right") ttk.Button(save_frame, text="Save", command=self._save).pack(side="right", padx=(8, 0)) def _add_route(self): endpoints = load_endpoints().get("endpoints", []) if not endpoints: messagebox.showinfo("Info", "No endpoints configured. Add endpoints first.", parent=self._dlg) return d = BGPRouteDialog(self._dlg, endpoints, None) if d.result: r = d.result self._routes.append(r) self._route_tree.insert("", "end", values=( r.get("name", ""), r.get("endpoint_name", ""), r.get("target_url", ""), r.get("model", ""), r.get("priority", 99))) def _edit_route(self): sel = self._route_tree.selection() if not sel: return idx = self._route_tree.index(sel[0]) endpoints = load_endpoints().get("endpoints", []) d = BGPRouteDialog(self._dlg, endpoints, self._routes[idx]) if d.result: r = d.result self._routes[idx] = r self._route_tree.item(sel[0], values=( r.get("name", ""), r.get("endpoint_name", ""), r.get("target_url", ""), r.get("model", ""), r.get("priority", 99))) def _remove_route(self): sel = self._route_tree.selection() if not sel: return idx = self._route_tree.index(sel[0]) self._route_tree.delete(sel[0]) del self._routes[idx] def _move_route(self, direction): sel = self._route_tree.selection() if not sel: return idx = self._route_tree.index(sel[0]) new_idx = idx + direction if new_idx < 0 or new_idx >= len(self._routes): return route = self._routes.pop(idx) self._routes.insert(new_idx, route) self._rebuild_routes_tree(new_idx) def _rebuild_routes_tree(self, select_idx=None): for item in self._route_tree.get_children(): self._route_tree.delete(item) for r in self._routes: self._route_tree.insert("", "end", values=( r.get("name", ""), r.get("endpoint_name", ""), r.get("target_url", ""), r.get("model", ""), r.get("priority", 99))) if select_idx is not None: children = self._route_tree.get_children() if select_idx < len(children): self._route_tree.selection_set(children[select_idx]) def _save(self): name = self._entry_name.get().strip() if not name: return strategy = self._combo_strategy.get() or "failover" routes = [] for i, r in enumerate(self._routes): if not r.get("target_url"): continue routes.append({ "name": r.get("name") or f"Route {i+1}", "endpoint_name": r.get("endpoint_name", ""), "target_url": r.get("target_url", ""), "api_key": r.get("api_key", ""), "model": r.get("model", ""), "priority": i + 1, "reasoning_enabled": True, "reasoning_effort": "medium", }) data = load_bgp_pools() if self._existing_name: data["pools"] = [p for p in data["pools"] if p["name"] != self._existing_name] data["pools"].append({"name": name, "strategy": strategy, "routes": routes}) save_bgp_pools(data) self.result = True self._dlg.destroy() class BGPPoolMgr: def __init__(self, parent, on_update=None): self._parent = parent self._on_update = on_update self._dlg = tk.Toplevel(parent) self._dlg.title("AI BGP -- Pool Manager") self._dlg.geometry("660x440") self._dlg.transient(parent) main = ttk.Frame(self._dlg, padding=12) main.pack(fill="both", expand=True) ttk.Label(main, text="AI BGP Pools -- multi-provider routing with automatic failover", font=("Segoe UI", 10, "bold")).pack(anchor="w") tree_frame = ttk.Frame(main) tree_frame.pack(fill="both", expand=True, pady=(8, 0)) cols = ("name", "routes", "strategy") self._tree = ttk.Treeview(tree_frame, columns=cols, show="headings", height=10) for col, heading, w in [("name", "Pool Name", 180), ("routes", "Routes", 280), ("strategy", "Strategy", 100)]: self._tree.heading(col, text=heading) self._tree.column(col, width=w, minwidth=60) sb = ttk.Scrollbar(tree_frame, orient="vertical", command=self._tree.yview) self._tree.configure(yscrollcommand=sb.set) self._tree.pack(side="left", fill="both", expand=True) sb.pack(side="right", fill="y") btn_frame = ttk.Frame(main) btn_frame.pack(fill="x", pady=(8, 0)) ttk.Button(btn_frame, text="Create Pool", command=self._add_pool).pack(side="left", padx=(0, 4)) ttk.Button(btn_frame, text="Edit Pool", command=self._edit_pool).pack(side="left", padx=(0, 4)) ttk.Button(btn_frame, text="Delete Pool", command=self._del_pool).pack(side="left", padx=(0, 4)) ttk.Button(btn_frame, text="Close", command=self._dlg.destroy).pack(side="right") self._rebuild() def _rebuild(self): for item in self._tree.get_children(): self._tree.delete(item) for pool in load_bgp_pools().get("pools", []): routes_str = " -> ".join(f'{r.get("name","?")}/{r.get("model","?")}' for r in pool.get("routes", [])) self._tree.insert("", "end", values=(pool["name"], routes_str, pool.get("strategy", "failover"))) def _selected_name(self): sel = self._tree.selection() if not sel: return None return self._tree.item(sel[0])["values"][0] def _add_pool(self): d = BGPPoolEditDialog(self, None) self._dlg.wait_window(d._dlg) if d.result: self._rebuild() if self._on_update: self._on_update() def _edit_pool(self): name = self._selected_name() if not name: return d = BGPPoolEditDialog(self, name) self._dlg.wait_window(d._dlg) if d.result: self._rebuild() if self._on_update: self._on_update() def _del_pool(self): name = self._selected_name() if not name: return if not messagebox.askyesno("Delete", f'Delete BGP pool "{name}"?', parent=self._dlg): return data = load_bgp_pools() data["pools"] = [p for p in data["pools"] if p["name"] != name] save_bgp_pools(data) self._rebuild() if self._on_update: self._on_update() # ═══════════════════════════════════════════════════════════════════════ # AI Monitoring Window # ═══════════════════════════════════════════════════════════════════════ class AIMonitoringWindow: def __init__(self, parent): self._dlg = tk.Toplevel(parent) self._dlg.title("AI Monitoring") self._dlg.geometry("580x520") self._dlg.transient(parent) self._cfg = load_monitoring_config() self._store = load_incident_store() main = ttk.Frame(self._dlg, padding=12) main.pack(fill="both", expand=True) hdr = ttk.Frame(main) hdr.pack(fill="x") ttk.Label(hdr, text="AI Monitoring", font=("Segoe UI", 11, "bold")).pack(side="left") self._toggle_var = tk.BooleanVar(value=self._cfg.get("enabled", False)) ttk.Checkbutton(hdr, text="Enabled", variable=self._toggle_var, command=self._on_toggle).pack(side="right") frame = ttk.LabelFrame(main, text="Diagnostic Agent", padding=8) frame.pack(fill="x", pady=(8, 0)) grid = ttk.Frame(frame) grid.pack(fill="x") grid.columnconfigure(1, weight=1) ttk.Label(grid, text="Provider URL:").grid(row=0, column=0, sticky="e", padx=(0, 6), pady=2) self._url_entry = ttk.Entry(grid) self._url_entry.grid(row=0, column=1, sticky="ew", pady=2) self._url_entry.insert(0, self._cfg.get("provider_url", "")) ttk.Label(grid, text="Model:").grid(row=1, column=0, sticky="e", padx=(0, 6), pady=2) self._model_entry = ttk.Entry(grid) self._model_entry.grid(row=1, column=1, sticky="ew", pady=2) self._model_entry.insert(0, self._cfg.get("model", "")) ttk.Label(grid, text="API Key:").grid(row=2, column=0, sticky="e", padx=(0, 6), pady=2) key_frame = ttk.Frame(grid) key_frame.grid(row=2, column=1, sticky="ew", pady=2) self._key_entry = ttk.Entry(key_frame, show="*") self._key_entry.pack(side="left", fill="x", expand=True) self._key_entry.insert(0, self._cfg.get("api_key", "")) self._reveal_key = tk.BooleanVar(value=False) ttk.Checkbutton(key_frame, text="Show", variable=self._reveal_key, command=lambda: self._key_entry.configure(show="" if self._reveal_key.get() else "*")).pack(side="left", padx=(4, 0)) ttk.Label(grid, text="Health Check:").grid(row=3, column=0, sticky="e", padx=(0, 6), pady=2) spin_frame = ttk.Frame(grid) spin_frame.grid(row=3, column=1, sticky="w", pady=2) self._interval_spin = ttk.Spinbox(spin_frame, from_=2, to=30, width=5) self._interval_spin.set(self._cfg.get("health_check_interval_s", 5)) self._interval_spin.pack(side="left") ttk.Label(spin_frame, text="seconds").pack(side="left", padx=(4, 0)) opts_frame = ttk.Frame(frame) opts_frame.pack(fill="x", pady=(4, 0)) self._auto_restart_var = tk.BooleanVar(value=self._cfg.get("auto_restart_proxy", True)) ttk.Checkbutton(opts_frame, text="Auto-restart proxy on crash", variable=self._auto_restart_var).pack(side="left") self._auto_switch_var = tk.BooleanVar(value=self._cfg.get("auto_switch_provider", False)) ttk.Checkbutton(opts_frame, text="Auto-switch provider on repeated failure", variable=self._auto_switch_var).pack(side="left", padx=(12, 0)) ttk.Button(frame, text="Save Configuration", command=self._on_save).pack(pady=(8, 0)) stats = self._store.get("stats", {"ai_calls": 0, "tokens_used": 0}) stats_text = (f"AI diagnostic calls: {stats.get('ai_calls', 0)} | " f"Tokens used: {stats.get('tokens_used', 0):,} | " f"Known patterns: {len(self._store.get('incidents', {}))}") ttk.Label(main, text=stats_text, font=("Segoe UI", 8)).pack(anchor="w", pady=(8, 0)) inc_frame = ttk.LabelFrame(main, text="Recent Incidents", padding=4) inc_frame.pack(fill="both", expand=True, pady=(4, 0)) self._inc_text = tk.Text(inc_frame, height=8, wrap="word", state="disabled") inc_sb = ttk.Scrollbar(inc_frame, orient="vertical", command=self._inc_text.yview) self._inc_text.configure(yscrollcommand=inc_sb.set) self._inc_text.pack(side="left", fill="both", expand=True) inc_sb.pack(side="right", fill="y") self._refresh_incidents() btn_frame = ttk.Frame(main) btn_frame.pack(fill="x", pady=(8, 0)) ttk.Button(btn_frame, text="View Monitoring Log", command=lambda: open_file(str(PROXY_CONFIG_DIR / "monitoring.log"))).pack(side="left") ttk.Button(btn_frame, text="Clear Incident Store", command=self._on_clear_store).pack(side="left", padx=(8, 0)) ttk.Button(btn_frame, text="Close", command=self._dlg.destroy).pack(side="right") def _on_toggle(self): self._cfg["enabled"] = self._toggle_var.get() save_monitoring_config(self._cfg) def _on_save(self): self._cfg["provider_url"] = self._url_entry.get().strip() self._cfg["model"] = self._model_entry.get().strip() self._cfg["api_key"] = self._key_entry.get().strip() try: self._cfg["health_check_interval_s"] = int(self._interval_spin.get()) except ValueError: pass self._cfg["auto_restart_proxy"] = self._auto_restart_var.get() self._cfg["auto_switch_provider"] = self._auto_switch_var.get() save_monitoring_config(self._cfg) self._inc_text.configure(state="normal") self._inc_text.delete("1.0", "end") self._inc_text.insert("end", "Configuration saved.\n") self._inc_text.configure(state="disabled") def _on_clear_store(self): save_incident_store({"version": 1, "incidents": {}, "stats": {"ai_calls": 0, "tokens_used": 0}}) self._store = {"version": 1, "incidents": {}, "stats": {"ai_calls": 0, "tokens_used": 0}} self._refresh_incidents() def _refresh_incidents(self): lines = [] for pattern, inc in sorted(self._store.get("incidents", {}).items(), key=lambda x: x[1].get("last_seen", ""), reverse=True): sc = inc.get("success_count", 0) fc = inc.get("fail_count", 0) rate = sc / max(sc + fc, 1) lines.append( f"[{inc.get('last_seen', '?')[:16]}] {pattern}\n" f" fix={inc.get('fix', '?')} success_rate={rate:.0%} seen={inc.get('occurrences', 0)}x\n" ) if not lines: lines.append("No incidents recorded yet.\n\nEnable AI Monitoring and use Codex to populate the store.\n") self._inc_text.configure(state="normal") self._inc_text.delete("1.0", "end") self._inc_text.insert("end", "\n".join(lines)) self._inc_text.configure(state="disabled") # ═══════════════════════════════════════════════════════════════════════ # Usage Dashboard # ═══════════════════════════════════════════════════════════════════════ class UsageWindow: def __init__(self, parent): self._U = _usage_theme() self._dlg = tk.Toplevel(parent) self._dlg.title("Usage Dashboard") self._dlg.geometry("720x640") self._dlg.transient(parent) self._dlg.configure(bg=self._U["base"]) self._build_header() self._build_summary_strip() ttk.Separator(self._dlg).pack(fill="x", padx=16) self._cards_frame = tk.Frame(self._dlg, bg=self._U["base"]) canvas = tk.Canvas(self._cards_frame, bg=self._U["base"], highlightthickness=0) scrollbar = ttk.Scrollbar(self._cards_frame, orient="vertical", command=canvas.yview) self._cards_inner = tk.Frame(canvas, bg=self._U["base"]) self._cards_inner.bind("", lambda e: canvas.configure(scrollregion=canvas.bbox("all"))) canvas.create_window((0, 0), window=self._cards_inner, anchor="nw") canvas.configure(yscrollcommand=scrollbar.set) canvas.pack(side="left", fill="both", expand=True, padx=(16, 0)) scrollbar.pack(side="right", fill="y") self._cards_frame.pack(fill="both", expand=True, pady=(8, 0)) self._refresh() def _build_header(self): U = self._U hdr = tk.Frame(self._dlg, bg=U["base"]) hdr.pack(fill="x", padx=16, pady=(12, 6)) tk.Label(hdr, text="⚡", fg=U["accent"], bg=U["base"], font=("Segoe UI", 14)).pack(side="left") tk.Label(hdr, text="Usage Dashboard", fg=U["text"], bg=U["base"], font=("Segoe UI", 14, "bold")).pack(side="left", padx=(4, 0)) self._status_dots = tk.Label(hdr, text="", fg=U["text"], bg=U["base"], font=("Segoe UI", 9)) self._status_dots.pack(side="left", padx=(8, 0)) self._updated_lbl = tk.Label(hdr, text="Never", fg=U["dim"], bg=U["base"], font=("Segoe UI", 8)) self._updated_lbl.pack(side="right") refresh_btn = tk.Button(hdr, text="Refresh", fg=U["text"], bg=U["surface0"], activebackground=U["surface1"], relief="flat", bd=0, command=self._refresh, padx=12, pady=2) refresh_btn.pack(side="right", padx=(8, 0)) def _build_summary_strip(self): U = self._U strip = tk.Frame(self._dlg, bg=U["surface0"], padx=12, pady=8) strip.pack(fill="x", padx=16, pady=(0, 6)) self._kpi_labels = {} for key, label, icon in [("providers", "Providers", "\U0001F4CA"), ("requests", "Requests", "⚡"), ("tokens", "Tokens", "\U0001F9E0"), ("latency", "Avg Latency", "⏱")]: box = tk.Frame(strip, bg=U["surface0"]) box.pack(side="left", padx=(0, 20)) tk.Label(box, text=f"{icon} {label}", fg=U["dim"], bg=U["surface0"], font=("Segoe UI", 8), anchor="w").pack(anchor="w") val = tk.Label(box, text="-", fg=U["text"], bg=U["surface0"], font=("Segoe UI", 9, "bold"), anchor="w") val.pack(anchor="w") self._kpi_labels[key] = val def _refresh(self): for w in self._cards_inner.winfo_children(): w.destroy() stats = load_usage_stats() updated = stats.get("updated") if updated: self._updated_lbl.configure(text=updated) providers = stats.get("providers", {}) if not providers: tk.Label(self._cards_inner, text="No usage data yet.\nLaunch a session to start tracking.", fg=self._U["dim"], bg=self._U["base"], font=("Segoe UI", 11)).pack(pady=60) return total_req = total_tok_in = total_tok_out = 0 total_dur = 0.0 n_ok = n_warn = n_err = 0 sorted_providers = sorted(providers.items(), key=lambda x: x[1].get("total_requests", 0), reverse=True) for prov_name, prov_data in sorted_providers: t = prov_data.get("total_requests", 0) total_req += t total_tok_in += prov_data.get("total_tokens_in", 0) total_tok_out += prov_data.get("total_tokens_out", 0) total_dur += prov_data.get("total_duration_s", 0.0) fail = prov_data.get("failures", 0) fail_pct = fail / t if t > 0 else 0 if fail_pct > 0.15: n_err += 1 elif fail_pct > 0.05: n_warn += 1 else: n_ok += 1 self._kpi_labels["providers"].configure(text=str(len(providers))) self._kpi_labels["requests"].configure(text=f"{total_req:,}") tok_sum = total_tok_in + total_tok_out tok_str = f"{_fmt_tok(tok_sum)} in:{_fmt_tok(total_tok_in)} out:{_fmt_tok(total_tok_out)}" if tok_sum else "N/A" self._kpi_labels["tokens"].configure(text=tok_str) avg_lat = total_dur / total_req if total_req > 0 else 0 self._kpi_labels["latency"].configure(text=_fmt_dur(avg_lat)) dots = "" if n_ok: dots += f"●{n_ok} " if n_warn: dots += f"◐{n_warn} " if n_err: dots += f"✗{n_err}" self._status_dots.configure(text=dots) for prov_name, prov_data in sorted_providers: self._build_card(prov_name, prov_data) def _build_card(self, name, data): U = self._U total = data.get("total_requests", 0) ok = data.get("successes", 0) fail = data.get("failures", 0) success_rate = ok / total if total > 0 else 1.0 fail_pct = fail / total if total > 0 else 0 status_text, status_color = _status_pill(success_rate, fail_pct) card = tk.Frame(self._cards_inner, bg=U["surface0"], padx=14, pady=10, highlightbackground=status_color, highlightthickness=1) card.pack(fill="x", pady=(0, 6)) top = tk.Frame(card, bg=U["surface0"]) top.pack(fill="x") tk.Label(top, text="●", fg=status_color, bg=U["surface0"], font=("Segoe UI", 10)).pack(side="left") short = name.replace("https://", "").replace("http://", "").split("/")[0] tk.Label(top, text=short, fg=U["text"], bg=U["surface0"], font=("Segoe UI", 10, "bold")).pack(side="left", padx=(4, 0)) tk.Label(top, text=f" {status_text} ", fg=U["base"], bg=status_color, font=("Segoe UI", 8, "bold")).pack(side="left", padx=(4, 0)) tk.Label(top, text=f"{total} req", fg=U["subtext"], bg=U["surface0"], font=("Segoe UI", 8)).pack(side="left", padx=(6, 0)) last_used = data.get("last_used", "") if last_used: tk.Label(top, text=last_used, fg=U["dim"], bg=U["surface0"], font=("Segoe UI", 7)).pack(side="right") gauge = tk.Frame(card, bg=U["surface0"]) gauge.pack(fill="x", pady=(4, 0)) bar_frame = tk.Frame(gauge, bg=U["surface1"], height=12) bar_frame.pack(fill="x", side="left", expand=True) bar_frame.pack_propagate(False) fill_pct = int(success_rate * 100) fill_frame = tk.Frame(bar_frame, bg=status_color, height=12) fill_frame.place(relwidth=success_rate, relheight=1.0) tk.Label(gauge, text=f"{fill_pct}%", fg=U["subtext"], bg=U["surface0"], font=("Segoe UI", 8)).pack(side="left", padx=(4, 0)) if fail > 0: tk.Label(gauge, text=f"{fail} fail", fg=U["red"], bg=U["surface0"], font=("Segoe UI", 8)).pack(side="right") metrics = tk.Frame(card, bg=U["surface0"]) metrics.pack(fill="x", pady=(4, 0)) t_in = data.get("total_tokens_in", 0) t_out = data.get("total_tokens_out", 0) dur = data.get("total_duration_s", 0.0) avg_dur = dur / total if total > 0 else 0 for label, value, color in [("Tokens In", _fmt_tok(t_in), U["sapphire"]), ("Tokens Out", _fmt_tok(t_out), U["peach"]), ("Avg Latency", _fmt_dur(avg_dur), U["sky"]), ("Duration", _fmt_dur(dur), U["lavender"])]: box = tk.Frame(metrics, bg=U["surface0"]) box.pack(side="left", padx=(0, 16)) tk.Label(box, text=label, fg=U["dim"], bg=U["surface0"], font=("Segoe UI", 7)).pack(anchor="w") tk.Label(box, text=value, fg=color, bg=U["surface0"], font=("Segoe UI", 9, "bold")).pack(anchor="w") models = data.get("models", {}) if models: models_frame = tk.Frame(card, bg=U["surface0"]) models_frame.pack(fill="x", pady=(4, 0)) tk.Label(models_frame, text="Models:", fg=U["lavender"], bg=U["surface0"], font=("Segoe UI", 8, "bold")).pack(anchor="w") sorted_models = sorted(models.items(), key=lambda x: x[1].get("requests", 0), reverse=True) for i, (mname, mdata) in enumerate(sorted_models[:6]): m_req = mdata.get("requests", 0) pct = m_req / total * 100 if total > 0 else 0 color = U["model_palette"][i % len(U["model_palette"])] row = tk.Frame(models_frame, bg=U["surface0"]) row.pack(fill="x") tk.Label(row, text=f"● {mname}", fg=color, bg=U["surface0"], font=("Segoe UI", 7)).pack(side="left") tk.Label(row, text=f"{pct:.0f}% ({m_req})", fg=U["dim"], bg=U["surface0"], font=("Segoe UI", 7)).pack(side="left", padx=(8, 0)) last_err = data.get("last_error") if last_err: err_frame = tk.Frame(card, bg=U["surface0"]) err_frame.pack(fill="x", pady=(4, 0)) tk.Label(err_frame, text=f"⚠ {last_err}", fg=U["red"], bg=U["surface0"], font=("Segoe UI", 7)).pack(anchor="w") # ═══════════════════════════════════════════════════════════════════════ # Request History Window # ═══════════════════════════════════════════════════════════════════════ class RequestHistoryWindow: def __init__(self, parent): self._snap_dir = PROXY_CONFIG_DIR / "requests" self._dlg = tk.Toplevel(parent) self._dlg.title("Request History") self._dlg.geometry("720x500") self._dlg.transient(parent) main = ttk.Frame(self._dlg, padding=10) main.pack(fill="both", expand=True) hdr = ttk.Frame(main) hdr.pack(fill="x") ttk.Label(hdr, text="Request History", font=("Segoe UI", 11, "bold")).pack(side="left") ttk.Button(hdr, text="Clear All", command=self._clear_all).pack(side="right") ttk.Button(hdr, text="Refresh", command=self._load).pack(side="right", padx=(0, 4)) paned = ttk.PanedWindow(main, orient="vertical") paned.pack(fill="both", expand=True, pady=(6, 0)) top_frame = ttk.Frame(paned) cols = ("time", "model", "status", "duration", "id", "error") self._tree = ttk.Treeview(top_frame, columns=cols, show="headings", height=10) for col, heading, w in [("time", "Time", 140), ("model", "Model", 140), ("status", "Status", 80), ("duration", "Duration", 70), ("id", "ID", 180), ("error", "Error", 120)]: self._tree.heading(col, text=heading) self._tree.column(col, width=w, minwidth=50) tree_sb = ttk.Scrollbar(top_frame, orient="vertical", command=self._tree.yview) self._tree.configure(yscrollcommand=tree_sb.set) self._tree.pack(side="left", fill="both", expand=True) tree_sb.pack(side="right", fill="y") paned.add(top_frame, weight=1) bottom_frame = ttk.Frame(paned) self._detail = tk.Text(bottom_frame, height=10, wrap="word", font=("Consolas", 9)) detail_sb = ttk.Scrollbar(bottom_frame, orient="vertical", command=self._detail.yview) self._detail.configure(yscrollcommand=detail_sb.set) self._detail.pack(side="left", fill="both", expand=True) detail_sb.pack(side="right", fill="y") paned.add(bottom_frame, weight=1) self._tree.bind("<>", self._on_select) self._snapshots = [] self._load() def _load(self): for item in self._tree.get_children(): self._tree.delete(item) self._snapshots = [] if not self._snap_dir.exists(): return files = sorted(self._snap_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True) for f in files[:200]: try: data = json.loads(f.read_text()) meta = data.get("_meta", {}) self._snapshots.append(data) ts = meta.get("ts_iso", "")[:19].replace("T", " ") model = meta.get("model", "?") status = meta.get("status", "unknown") dur = f"{meta['duration_s']:.1f}s" if meta.get("duration_s") is not None else "-" rid = meta.get("request_id", "")[:28] err = (meta.get("error") or "")[:60] self._tree.insert("", "end", values=(ts, model, status, dur, rid, err)) except Exception: pass def _on_select(self, event): sel = self._tree.selection() if not sel: return idx = self._tree.index(sel[0]) if idx < len(self._snapshots): data = self._snapshots[idx] self._detail.delete("1.0", "end") self._detail.insert("end", json.dumps(data, indent=2, ensure_ascii=False)[:50000]) def _clear_all(self): if not messagebox.askyesno("Clear All", "Delete all request snapshots?", parent=self._dlg): return if self._snap_dir.exists(): for f in self._snap_dir.glob("*.json"): try: f.unlink() except Exception: pass for item in self._tree.get_children(): self._tree.delete(item) self._snapshots = [] self._detail.delete("1.0", "end") # ═══════════════════════════════════════════════════════════════════════ # Benchmark Window # ═══════════════════════════════════════════════════════════════════════ class BenchmarkWindow: _BENCH_PROMPT = "In exactly 3 bullet points, explain why the sky is blue." _BENCH_TOOLS = [{"type": "function", "function": {"name": "get_weather", "parameters": {"type": "object", "properties": {"city": {"type": "string"}}}}}] def __init__(self, parent): self._dlg = tk.Toplevel(parent) self._dlg.title("Model Benchmark") self._dlg.geometry("820x560") self._dlg.transient(parent) self._running = False self._ep_data = load_endpoints() main = ttk.Frame(self._dlg, padding=10) main.pack(fill="both", expand=True) hdr = ttk.Frame(main) hdr.pack(fill="x") ttk.Label(hdr, text="Multi-Provider Benchmark", font=("Segoe UI", 11, "bold")).pack(side="left") self._run_btn = ttk.Button(hdr, text="Run Benchmark", command=self._run) self._run_btn.pack(side="right") lanes_frame = ttk.Frame(main) lanes_frame.pack(fill="x", pady=(8, 0)) self._lanes = [] self._c_var = tk.BooleanVar(value=False) for i, lane_label in enumerate(["A", "B", "C"]): if i == 2: lf = ttk.LabelFrame(lanes_frame, text="Lane C (optional)") cb = ttk.Checkbutton(lanes_frame, text="Enable Lane C", variable=self._c_var, command=lambda: lf.configure() if not self._c_var.get() else None) else: lf = ttk.LabelFrame(lanes_frame, text=f"Lane {lane_label}") lf.pack(side="left", fill="both", expand=True, padx=(0, 4 if i < 2 else 0)) ep_frame = ttk.Frame(lf, padding=4) ep_frame.pack(fill="x") ttk.Label(ep_frame, text="Endpoint:").pack(side="left") ep_combo = ttk.Combobox(ep_frame, values=[e["name"] for e in self._ep_data.get("endpoints", [])], state="readonly") ep_combo.pack(side="left", fill="x", expand=True, padx=(4, 0)) m_frame = ttk.Frame(lf, padding=4) m_frame.pack(fill="x") ttk.Label(m_frame, text="Model:").pack(side="left") m_combo = ttk.Combobox(m_frame, state="readonly") m_combo.pack(side="left", fill="x", expand=True, padx=(4, 0)) ep_combo.bind("<>", lambda e, mc=m_combo: self._update_lane_models(ep_combo, mc)) self._lanes.append({"ep": ep_combo, "model": m_combo}) default_name = self._ep_data.get("default") eps = self._ep_data.get("endpoints", []) if default_name: self._lanes[0]["ep"].set(default_name) if len(eps) > 1: self._lanes[1]["ep"].set(eps[1]["name"]) elif eps: self._lanes[1]["ep"].set(eps[0]["name"]) if len(eps) > 2: self._lanes[2]["ep"].set(eps[2]["name"]) elif len(eps) > 1: self._lanes[2]["ep"].set(eps[1]["name"]) tests_frame = ttk.Frame(main) tests_frame.pack(fill="x", pady=(8, 0)) self._test_ttft = tk.BooleanVar(value=True) self._test_total = tk.BooleanVar(value=True) self._test_tools = tk.BooleanVar(value=True) self._test_tps = tk.BooleanVar(value=True) ttk.Checkbutton(tests_frame, text="Time to First Token", variable=self._test_ttft).pack(side="left") ttk.Checkbutton(tests_frame, text="Total Latency", variable=self._test_total).pack(side="left", padx=(8, 0)) ttk.Checkbutton(tests_frame, text="Tool Call", variable=self._test_tools).pack(side="left", padx=(8, 0)) ttk.Checkbutton(tests_frame, text="Tokens/sec", variable=self._test_tps).pack(side="left", padx=(8, 0)) results_frame = ttk.Frame(main) results_frame.pack(fill="both", expand=True, pady=(8, 0)) cols = ("test", "a", "b", "c", "winner") self._results_tree = ttk.Treeview(results_frame, columns=cols, show="headings", height=6) for col, heading in [("test", "Test"), ("a", "Lane A"), ("b", "Lane B"), ("c", "Lane C"), ("winner", "Winner")]: self._results_tree.heading(col, text=heading) self._results_tree.column(col, width=150, minwidth=80) rsb = ttk.Scrollbar(results_frame, orient="vertical", command=self._results_tree.yview) self._results_tree.configure(yscrollcommand=rsb.set) self._results_tree.pack(side="left", fill="both", expand=True) rsb.pack(side="right", fill="y") self._status_var = tk.StringVar(value="Select endpoints and models per lane, then Run Benchmark.") ttk.Label(main, textvariable=self._status_var).pack(anchor="w", pady=(4, 0)) def _update_lane_models(self, ep_combo, model_combo): name = ep_combo.get() if not name: return ep = get_endpoint(name) models = (ep or {}).get("models", []) model_combo["values"] = models if models: model_combo.set(models[0]) def _collect_lanes(self): active = [] for i, lane in enumerate(self._lanes): if i == 2 and not self._c_var.get(): continue ep_name = lane["ep"].get() model = lane["model"].get() if not ep_name or not model: continue ep = get_endpoint(ep_name) if not ep: continue active.append({"ep": ep, "model": model, "label": f"{ep_name}/{model}"}) return active def _bench_single(self, ep, model, stream, with_tools=False): url = normalize_base_url(ep.get("base_url", "")) key = (ep.get("api_key") or "").strip() bt = ep.get("backend_type", "openai-compat") if bt == "anthropic": test_url = f"{url}/v1/messages" headers = {"User-Agent": UA, "x-api-key": key, "anthropic-version": "2023-06-01", "content-type": "application/json"} body = {"model": model, "max_tokens": 100, "stream": stream, "messages": [{"role": "user", "content": self._BENCH_PROMPT}]} if with_tools: body["tools"] = self._BENCH_TOOLS body["messages"] = [{"role": "user", "content": "Use get_weather for Paris"}] data = json.dumps(body).encode() else: test_url = f"{url}/chat/completions" headers = {"User-Agent": UA, "Authorization": f"Bearer {key}", "content-type": "application/json"} body = {"model": model, "max_tokens": 100, "stream": stream, "messages": [{"role": "user", "content": self._BENCH_PROMPT}]} if with_tools: body["tools"] = self._BENCH_TOOLS body["messages"] = [{"role": "user", "content": "Use get_weather for Paris"}] data = json.dumps(body).encode() req = urllib.request.Request(test_url, data=data, headers=headers, method="POST") t0 = time.time() ttft = None try: resp = urllib.request.urlopen(req, timeout=60) if stream: first_chunk_time = None chunks = [] while True: chunk = resp.read(4096) if not chunk: break if first_chunk_time is None: first_chunk_time = time.time() ttft = first_chunk_time - t0 chunks.append(chunk) total = time.time() - t0 result_text = b"".join(chunks).decode(errors="replace")[:300] else: raw = resp.read() total = time.time() - t0 result_text = raw.decode(errors="replace")[:300] payload = json.loads(raw) choices = payload.get("choices", []) if choices: msg = choices[0].get("message", {}) if with_tools: tcs = msg.get("tool_calls", []) has_tools = len(tcs) > 0 return {"ttft": ttft or total, "total": total, "detail": f"tools={has_tools}, tok={payload.get('usage', {}).get('total_tokens', '?')}"} content = msg.get("content", "")[:50] return {"ttft": ttft or total, "total": total, "detail": f"{content[:40]}... tok={payload.get('usage', {}).get('total_tokens', '?')}"} return {"ttft": ttft or total, "total": total, "detail": result_text[:60]} except Exception as e: total = time.time() - t0 return {"ttft": ttft or total, "total": total, "detail": f"Error: {str(e)[:40]}"} def _bench_tps(self, ep, model): url = normalize_base_url(ep.get("base_url", "")) key = (ep.get("api_key") or "").strip() bt = ep.get("backend_type", "openai-compat") prompt = "Write a detailed paragraph about artificial intelligence in at least 150 words." max_tok = 512 if bt == "anthropic": test_url = f"{url}/v1/messages" headers = {"User-Agent": UA, "x-api-key": key, "anthropic-version": "2023-06-01", "content-type": "application/json"} else: test_url = f"{url}/chat/completions" headers = {"User-Agent": UA, "Authorization": f"Bearer {key}", "content-type": "application/json"} body = json.dumps({"model": model, "max_tokens": max_tok, "stream": True, "messages": [{"role": "user", "content": prompt}]}).encode() req = urllib.request.Request(test_url, data=body, headers=headers, method="POST") t0 = time.time() first_token_t = None token_count = 0 try: resp = urllib.request.urlopen(req, timeout=90) buf = b"" while True: chunk = resp.read(4096) if not chunk: break if first_token_t is None: first_token_t = time.time() buf += chunk total = time.time() - t0 text = buf.decode(errors="replace") for line in text.split("\n"): if line.startswith("data: ") and line != "data: [DONE]": try: d = json.loads(line[6:]) content = d.get("choices", [{}])[0].get("delta", {}).get("content", "") if content: token_count += max(1, len(content) / 4) except Exception: pass if token_count == 0: token_count = max(1, len(text) / 4) gen_time = (time.time() - first_token_t) if first_token_t else total tps = token_count / gen_time if gen_time > 0 else 0 return {"tps": tps, "tokens": int(token_count), "gen_time": gen_time, "total": total, "detail": f"{int(token_count)} tok / {gen_time:.1f}s"} except Exception as e: total = time.time() - t0 return {"tps": 0, "tokens": 0, "gen_time": total, "total": total, "detail": f"Error: {str(e)[:40]}"} def _run(self): if self._running: return lanes = self._collect_lanes() if len(lanes) < 2: self._status_var.set("Need at least 2 lanes with endpoint + model selected.") return self._running = True self._run_btn.configure(state="disabled") for item in self._results_tree.get_children(): self._results_tree.delete(item) self._status_var.set("Running benchmark...") threading.Thread(target=self._run_bench, args=(lanes,), daemon=True).start() def _run_bench(self, lanes): results = [] tests = [] if self._test_ttft.get(): tests.append(("TTFT (stream)", True, False)) if self._test_total.get(): tests.append(("Total latency", False, False)) if self._test_tools.get(): tests.append(("Tool call", False, True)) run_tps = self._test_tps.get() for test_name, stream, tools in tests: lane_results = [] for lane in lanes: label = lane["label"] self._dlg.after(0, lambda l=label: self._status_var.set(f"Running {test_name}: {l}...")) r = self._bench_single(lane["ep"], lane["model"], stream, tools) lane_results.append((label, r)) metric = "ttft" if stream else "total" values = [(lr[0], lr[1][metric]) for lr in lane_results] sorted_v = sorted(values, key=lambda x: x[1]) best_val = sorted_v[0][1] second_val = sorted_v[1][1] if len(sorted_v) > 1 else best_val + 1 if best_val < second_val * 0.85: winner = sorted_v[0][0] else: winner = "Tie" cols = [] for lr in lane_results: v = lr[1][metric] cols.append(f"{v:.2f}s ({lr[1]['detail'][:30]})") while len(cols) < 3: cols.append("--") cols.append(winner) results.append(tuple([test_name] + cols)) if run_tps: lane_tps = [] for lane in lanes: label = lane["label"] self._dlg.after(0, lambda l=label: self._status_var.set(f"Tokens/sec: {l}...")) r = self._bench_tps(lane["ep"], lane["model"]) lane_tps.append((label, r)) tps_vals = [(lt[0], lt[1]["tps"]) for lt in lane_tps] sorted_tps = sorted(tps_vals, key=lambda x: x[1], reverse=True) best_tps = sorted_tps[0][1] second_tps = sorted_tps[1][1] if len(sorted_tps) > 1 else 0 if best_tps > 0 and second_tps > 0 and best_tps > second_tps * 1.15: winner_tps = sorted_tps[0][0] else: winner_tps = "Tie" cols_tps = [] for lt in lane_tps: tps = lt[1]["tps"] cols_tps.append(f"{tps:.1f} t/s ({lt[1]['detail'][:25]})") while len(cols_tps) < 3: cols_tps.append("--") cols_tps.append(winner_tps) results.append(tuple(["Tokens/sec"] + cols_tps)) def _show(): for row in results: self._results_tree.insert("", "end", values=row) self._status_var.set("Benchmark complete.") self._running = False self._run_btn.configure(state="normal") self._dlg.after(0, _show) # ═══════════════════════════════════════════════════════════════════════ # Main Launcher Window # ═══════════════════════════════════════════════════════════════════════ class LauncherWin: def __init__(self, root): self._root = root self._proc = None self._endpoints_data = load_endpoints() self._refresh_running = False recover_config_if_needed() main = ttk.Frame(root, padding=16) main.pack(fill="both", expand=True) main.pack_propagate(False) # Title hdr = ttk.Frame(main) hdr.pack(fill="x") ttk.Label(hdr, text=f"Codex Launcher v{CHANGELOG[0][0]}", font=("Segoe UI", 13, "bold")).pack(side="left") # Toolbar — two rows to fit all buttons tb1 = ttk.Frame(main) tb1.pack(fill="x", pady=(6, 0)) ttk.Button(tb1, text="Endpoints...", command=self._open_mgr).pack(side="left") ttk.Button(tb1, text="AI Monitor", command=self._open_monitoring).pack(side="left", padx=(6, 0)) ttk.Button(tb1, text="AI BGP", command=self._open_bgp).pack(side="left", padx=(6, 0)) ttk.Button(tb1, text="Usage", command=self._open_usage).pack(side="left", padx=(6, 0)) ttk.Button(tb1, text="Benchmark", command=self._open_benchmark).pack(side="left", padx=(6, 0)) ttk.Button(tb1, text="History", command=self._open_history).pack(side="left", padx=(6, 0)) ttk.Button(tb1, text="OAuth Secrets", command=self._edit_oauth_secrets).pack(side="left", padx=(6, 0)) ttk.Button(tb1, text="Changelog", command=self._show_changelog).pack(side="right") # Detection status — one row per item so long paths don't truncate self._cli_info = detect_codex_cli() self._desktop_info = detect_codex_desktop() cli_row = ttk.Frame(main) cli_row.pack(fill="x", pady=(4, 0)) if self._cli_info: cli_path, cli_ver = self._cli_info ttk.Label(cli_row, text=f"✓ Codex CLI {cli_ver}", foreground="#2ea043").pack(side="left") ttk.Label(cli_row, text=f" ({cli_path})", foreground="gray").pack(side="left") else: ttk.Label(cli_row, text="✗ Codex CLI -- not found", foreground="#d29922").pack(side="left") ttk.Button(cli_row, text="Install", command=lambda: self._show_install_guide("cli")).pack(side="left", padx=(6, 0)) desk_row = ttk.Frame(main) desk_row.pack(fill="x", pady=(2, 0)) if self._desktop_info: ttk.Label(desk_row, text="✓ Codex Desktop", foreground="#2ea043").pack(side="left") ttk.Label(desk_row, text=f" ({self._desktop_info})", foreground="gray").pack(side="left") else: ttk.Label(desk_row, text="✗ Codex Desktop -- not found", foreground="#d29922").pack(side="left") ttk.Button(desk_row, text="Install", command=lambda: self._show_install_guide("desktop")).pack(side="left", padx=(6, 0)) self._missing = [] if not self._cli_info: self._missing.append("cli") if not self._desktop_info: self._missing.append("desktop") # Auth status auth_frame = ttk.Frame(main) auth_frame.pack(fill="x", pady=(6, 0)) self._auth_label = ttk.Label(auth_frame, text="Checking auth...") self._auth_label.pack(side="left") self._relogin_btn = ttk.Button(auth_frame, text="Re-login", command=self._codex_relogin, state="disabled") self._relogin_btn.pack(side="right") threading.Thread(target=self._check_auth_async, daemon=True).start() # Ops bar ops_frame = ttk.Frame(main) ops_frame.pack(fill="x", pady=(6, 0)) self._refresh_all_btn = ttk.Button(ops_frame, text="Refresh Models", command=self._refresh_all_models) self._refresh_all_btn.pack(side="left") ttk.Button(ops_frame, text="Backup Profile", command=self._backup_profile).pack(side="left", padx=(8, 0)) ttk.Button(ops_frame, text="Import Profile", command=self._import_profile).pack(side="left", padx=(8, 0)) # Endpoint + Model selectors sel_frame = ttk.Frame(main) sel_frame.pack(fill="x", pady=(6, 0)) ttk.Label(sel_frame, text="Endpoint:").pack(side="left") self._combo_ep = ttk.Combobox(sel_frame, state="readonly", width=24) self._combo_ep.pack(side="left", padx=(4, 0)) self._combo_ep.bind("<>", lambda e: self._on_endpoint_changed()) ttk.Label(sel_frame, text="Model:").pack(side="left", padx=(12, 0)) self._combo_model = ttk.Combobox(sel_frame, state="readonly", width=24) self._combo_model.pack(side="left", padx=(4, 0)) # Launch buttons btn_frame1 = ttk.Frame(main) btn_frame1.pack(fill="x", pady=(8, 0)) self._btn_desktop = ttk.Button(btn_frame1, text="Launch Desktop", command=lambda: self._launch("desktop")) if "desktop" in self._missing: self._btn_desktop.configure(state="disabled") self._btn_desktop.pack(side="left", fill="x", expand=True, padx=(0, 4)) self._btn_cli = ttk.Button(btn_frame1, text="Launch CLI", command=lambda: self._launch("cli")) if "cli" in self._missing: self._btn_cli.configure(state="disabled") self._btn_cli.pack(side="left", fill="x", expand=True) btn_frame2 = ttk.Frame(main) btn_frame2.pack(fill="x", pady=(4, 0)) self._btn_codex_desktop = ttk.Button(btn_frame2, text="Codex Default (Desktop)", command=lambda: self._launch_codex_default("desktop")) if "desktop" in self._missing: self._btn_codex_desktop.configure(state="disabled") self._btn_codex_desktop.pack(side="left", fill="x", expand=True, padx=(0, 4)) self._btn_codex_cli = ttk.Button(btn_frame2, text="Codex Default (CLI)", command=lambda: self._launch_codex_default("cli")) if "cli" in self._missing: self._btn_codex_cli.configure(state="disabled") self._btn_codex_cli.pack(side="left", fill="x", expand=True) # Log area self._log_text = scrolledtext.ScrolledText(main, height=10, state="disabled", wrap="word", font=("Consolas", 9)) self._log_text.pack(fill="both", expand=True, pady=(8, 0)) # Bottom bar bb = ttk.Frame(main) bb.pack(fill="x", pady=(6, 0)) ttk.Button(bb, text="AI Assistant", command=self._open_assistant).pack(side="left") self._kill_btn = ttk.Button(bb, text="Kill && Cleanup", command=self._kill, state="disabled") self._kill_btn.pack(side="left", fill="x", expand=True, padx=(8, 0)) ttk.Button(bb, text="View Log", command=lambda: open_file(str(LAUNCH_LOG))).pack(side="left") ttk.Button(bb, text="Close", command=self._do_close).pack(side="left", padx=(8, 0)) self._rebuild_combo() self._log_dependency_status() self._start_watcher() # ── Logging ────────────────────────────────────────────────────── def log(self, msg): self._root.after(0, self._append_log, msg) def _append_log(self, msg): self._log_text.configure(state="normal") self._log_text.insert("end", msg + "\n") self._log_text.see("end") self._log_text.configure(state="disabled") def _log_dependency_status(self): if self._cli_info: _, ver = self._cli_info self.log(f"✓ Codex CLI detected ({ver})") else: self.log("✗ Codex CLI NOT found -- CLI launch disabled.") if self._desktop_info: self.log(f"✓ Codex Desktop detected ({self._desktop_info})") else: self.log("✗ Codex Desktop NOT found -- Desktop launch disabled.") if self._missing: self.log("Install missing tools before using the launcher.") else: self.log("All dependencies OK.") # ── Auth ───────────────────────────────────────────────────────── def _check_auth_async(self): status, msg = check_codex_auth() self._root.after(0, lambda: self._update_auth_status(status, msg)) def _update_auth_status(self, status, msg): if status == "logged_in": self._auth_label.configure(text=f"✓ Auth: {msg}", foreground="#2ea043") self._relogin_btn.configure(state="normal" if "cli" not in self._missing else "disabled") elif status == "not_installed": self._auth_label.configure(text="Auth: N/A (CLI not installed)", foreground="#888") else: self._auth_label.configure(text=f"⚠ Auth: {msg}", foreground="#d29922") self._relogin_btn.configure(state="normal" if "cli" not in self._missing else "disabled") def _codex_relogin(self): self.log("Opening codex login in terminal...") term = detect_terminal() if not term: self.log("ERROR: no terminal emulator found for re-login") return term_name, term_args, term_path = term cmd_parts = [term_name] + term_args + ["codex", "login"] if IS_WINDOWS: subprocess.Popen(cmd_parts, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) else: subprocess.Popen(cmd_parts, preexec_fn=os.setsid) self.log("Login flow started in terminal. Re-checking auth in 30s...") self._auth_label.configure(text="Auth: waiting for login...") threading.Thread(target=lambda: (time.sleep(30), self._check_auth_async()), daemon=True).start() # ── Combo management ───────────────────────────────────────────── def _rebuild_combo(self): self._endpoints_data = load_endpoints() ep_names = [e["name"] for e in self._endpoints_data["endpoints"]] bgp_names = [f"\U0001F500 {p['name']}" for p in load_bgp_pools().get("pools", [])] all_names = ep_names + bgp_names self._combo_ep["values"] = all_names if all_names: default = self._endpoints_data.get("default") if default and default in ep_names: self._combo_ep.set(default) else: self._combo_ep.set(all_names[0]) self._on_endpoint_changed() def _on_endpoint_changed(self): name = self._combo_ep.get() is_bgp = name.startswith("\U0001F500 ") bgp_name = name[2:] if is_bgp else None ep = get_endpoint(name) if name and not is_bgp else None models = [] if is_bgp: for p in load_bgp_pools().get("pools", []): if p["name"] == bgp_name: seen = set() for r in p.get("routes", []): m = r.get("model", "") if m and m not in seen: models.append(m) seen.add(m) break elif ep: models = ep.get("models", []) self._combo_model["values"] = models if ep and ep.get("default_model") in models: self._combo_model.set(ep["default_model"]) elif models: self._combo_model.set(models[0]) else: self._combo_model.set("") # ── Window openers ─────────────────────────────────────────────── def _on_endpoints_updated(self): self._rebuild_combo() def _open_mgr(self): EndpointMgr(self._root, on_update=self._on_endpoints_updated) def _open_bgp(self): BGPPoolMgr(self._root, on_update=self._on_endpoints_updated) def _open_monitoring(self): AIMonitoringWindow(self._root) def _open_usage(self): UsageWindow(self._root) def _open_history(self): RequestHistoryWindow(self._root) def _open_benchmark(self): BenchmarkWindow(self._root) def _open_assistant(self): assist_path = str(Path(__file__).resolve().parent / "flet-codex-assist.py") if Path(assist_path).exists(): subprocess.Popen([sys.executable, assist_path], creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if IS_WINDOWS else 0) def _edit_oauth_secrets(self): data = load_oauth_secrets() if not data: data = {"antigravity": {"client_id": "", "client_secret": ""}, "gemini_cli": {"client_id": "", "client_secret": ""}} dlg = tk.Toplevel(self._root) dlg.title("OAuth 2.0 Client Secrets") dlg.geometry("600x450") dlg.transient(self._root) dlg.grab_set() frame = ttk.Frame(dlg, padding=16) frame.pack(fill="both", expand=True) ttk.Label(frame, text="Google OAuth 2.0 credentials", font=("Segoe UI", 10, "bold")).pack(anchor="w") ttk.Label(frame, text=f"Stored locally in {OAUTH_SECRETS_PATH}", foreground="gray").pack(anchor="w", pady=(0, 8)) fields = {} nf = ttk.Frame(frame) nf.pack(fill="x") row = 0 for section_key, section_label in [("antigravity", "Antigravity (CloudCode)"), ("gemini_cli", "Gemini CLI")]: ttk.Label(nf, text=f"\n{section_label}", font=("Segoe UI", 9, "bold")).grid(row=row, column=0, columnspan=3, sticky="w", pady=(8, 2)) row += 1 sec = data.get(section_key, {}) import_btn = ttk.Button(nf, text="Import JSON", command=lambda sk=section_key: self._import_oauth_json(fields, sk)) import_btn.grid(row=row, column=2, padx=(4, 0), pady=2, sticky="e") for fk, fl in [("client_id", "Client ID"), ("client_secret", "Client Secret")]: ttk.Label(nf, text=fl + ":").grid(row=row, column=0, sticky="w", padx=(8, 4), pady=2) entry = ttk.Entry(nf, width=60) entry.insert(0, sec.get(fk, "")) entry.grid(row=row, column=1, sticky="ew", pady=2) if fk == "client_secret": entry.configure(show="*") fields[(section_key, fk)] = entry row += 1 nf.columnconfigure(1, weight=1) ttk.Label(frame, text="\nImport a client_secret_*.json from Google Cloud Console\nconsole.cloud.google.com → Credentials", foreground="gray").pack(anchor="w") btnf = ttk.Frame(frame) btnf.pack(fill="x", pady=(12, 0)) ttk.Button(btnf, text="Cancel", command=dlg.destroy).pack(side="right", padx=(4, 0)) save_btn = ttk.Button(btnf, text="Save") save_btn.pack(side="right", padx=(4, 0)) def _save(): for (sk, fk), entry in fields.items(): if sk not in data: data[sk] = {} data[sk][fk] = entry.get().strip() try: save_oauth_secrets(data) except Exception as e: messagebox.showerror("Save failed", str(e), parent=dlg) return dlg.destroy() save_btn.configure(command=_save) def _import_oauth_json(self, fields, section_key): path = filedialog.askopenfilename( title="Import Google OAuth Client Secret JSON", filetypes=[("JSON files", "*.json")]) if not path: return try: with open(path, encoding="utf-8") as f: raw = json.load(f) creds = raw.get("installed") or raw.get("web") or raw cid = creds.get("client_id", "") csec = creds.get("client_secret", "") if not cid or not csec: raise ValueError("JSON does not contain client_id and client_secret") if (section_key, "client_id") in fields: fields[(section_key, "client_id")].delete(0, "end") fields[(section_key, "client_id")].insert(0, cid) if (section_key, "client_secret") in fields: fields[(section_key, "client_secret")].delete(0, "end") fields[(section_key, "client_secret")].insert(0, csec) except Exception as e: messagebox.showerror("Import failed", str(e)) # ── Watcher ────────────────────────────────────────────────────── def _start_watcher(self): cfg = load_monitoring_config() if not cfg.get("enabled"): return self._watcher = HealthWatcher( on_failure=lambda c: self.log(f"[AI Monitor] Proxy unresponsive (failures={c})"), on_recovery=lambda: self.log("[AI Monitor] Proxy recovered"), on_signal=lambda fid, cat, line: None, on_action=self._on_watcher_action, ) self._watcher.start() self.log("AI Monitoring: watchdog started") def _on_watcher_action(self, action, trigger): cfg = load_monitoring_config() if action == "restart_proxy" and cfg.get("auto_restart_proxy"): self.log(f"[AI Monitor] Auto-restarting proxy (trigger: {trigger})") self._root.after(0, self._restart_proxy_from_watcher) elif action in ("clear_schema_cache", "delete_provider_caps"): try: cap_file = PROXY_CONFIG_DIR / "provider-caps.json" if cap_file.exists(): cap_file.unlink() self.log("[AI Monitor] Cleared corrupt schema cache") except Exception as e: self.log(f"[AI Monitor] Failed to clear cache: {e}") elif action == "kill_stale_restart": self.log(f"[AI Monitor] Killing stale processes + restarting (trigger: {trigger})") self._kill() self._root.after(0, self._restart_proxy_from_watcher) else: self.log(f"[AI Monitor] Alert: {action} (trigger: {trigger})") def _restart_proxy_from_watcher(self): try: ep_name = load_endpoints().get("default") if not ep_name: return for ep in load_endpoints().get("endpoints", []): if ep.get("name") == ep_name: start_proxy_for(ep, self.log) break except Exception as e: self.log(f"[AI Monitor] Proxy restart failed: {e}") # ── Profile operations ─────────────────────────────────────────── def _backup_profile(self): filename = filedialog.asksaveasfilename( title="Backup Codex Profile", defaultextension=".json", initialfile=f"codex-profile-{time.strftime('%Y%m%d-%H%M%S')}.json", filetypes=[("JSON files", "*.json"), ("All files", "*.*")], ) if not filename: return try: save_profile_bundle(filename) self.log(f"Profile backed up to {filename}") except Exception as e: messagebox.showerror("Backup Failed", str(e)) def _refresh_all_models(self): if self._refresh_running: return self._refresh_running = True self._refresh_all_btn.configure(state="disabled") self.log("Refreshing models for all providers...") threading.Thread(target=self._refresh_all_models_worker, daemon=True).start() def _refresh_all_models_worker(self): try: data = load_endpoints() updated = 0 failed = [] for idx, ep in enumerate(list(data["endpoints"])): refreshed, err = refresh_endpoint_models(ep) if refreshed: data["endpoints"][idx] = refreshed updated += 1 else: failed.append(f"{ep['name']}: {err}") if updated: save_endpoints(data) self._root.after(0, lambda: self._finish_refresh(updated, failed)) except Exception as e: self._root.after(0, lambda: self._finish_refresh_error(str(e))) def _finish_refresh(self, updated, failed): if updated: self._rebuild_combo() self.log(f"Refreshed models for {updated} provider(s)") if failed: messagebox.showwarning("Refresh", "Some providers could not auto-fetch models.\n\n" + "\n".join(failed)) elif updated: messagebox.showinfo("Refresh", f"Refreshed models for {updated} provider(s).") else: messagebox.showinfo("Refresh", "No providers were refreshed.") self._refresh_running = False self._refresh_all_btn.configure(state="normal") def _finish_refresh_error(self, err): messagebox.showerror("Refresh Failed", err) self._refresh_running = False self._refresh_all_btn.configure(state="normal") def _import_profile(self): if self._proc and self._proc.poll() is None: messagebox.showwarning("Import", "Stop Codex before importing a profile.") return filename = filedialog.askopenfilename( title="Import Codex Profile", filetypes=[("JSON files", "*.json"), ("All files", "*.*")], ) if not filename: return if not messagebox.askyesno("Import", "Importing will replace the current endpoints and Codex config. Continue?"): return try: import_profile_bundle(filename) self._rebuild_combo() self.log(f"Profile imported from {filename}") messagebox.showinfo("Import", "Profile imported successfully.") except Exception as e: messagebox.showerror("Import Failed", str(e)) # ── Dialogs ────────────────────────────────────────────────────── def _show_changelog(self): dlg = tk.Toplevel(self._root) dlg.title("Changelog") dlg.geometry("540x480") dlg.transient(self._root) text = scrolledtext.ScrolledText(dlg, wrap="word", font=("Segoe UI", 9)) text.pack(fill="both", expand=True, padx=12, pady=12) for ver, date, items in CHANGELOG: text.insert("end", f"v{ver} ({date})\n") for item in items: text.insert("end", f" • {item}\n") text.insert("end", "\n") text.configure(state="disabled") ttk.Button(dlg, text="Close", command=dlg.destroy).pack(pady=(0, 10)) def _show_install_guide(self, which): if which == "cli": guide = ("Codex CLI is required to use CLI launch features.\n\n" "Install with npm:\n npm install -g @openai/codex\n\n" "Or download from:\n https://github.com/openai/codex\n\n" "After installing, restart the launcher.") else: guide = ("Codex Desktop is required to use Desktop launch features.\n\n" "Download from:\n https://codex.desktop.openai.com\n\n" "After installing, restart the launcher.") messagebox.showinfo(f"Install Codex {which.title()}", guide) # ── Launch ─────────────────────────────────────────────────────── def _set_busy(self, busy): has_cli = "cli" not in self._missing has_desk = "desktop" not in self._missing def _update(): self._btn_desktop.configure(state="disabled" if busy or not has_desk else "normal") self._btn_cli.configure(state="disabled" if busy or not has_cli else "normal") self._btn_codex_desktop.configure(state="disabled" if busy or not has_desk else "normal") self._btn_codex_cli.configure(state="disabled" if busy or not has_cli else "normal") self._kill_btn.configure(state="normal" if busy else "disabled") self._root.after(0, _update) def _launch(self, target): name = self._combo_ep.get() if not name: self.log("ERROR: no endpoint selected") return model = self._combo_model.get() if not model: self.log("ERROR: no model selected") return is_bgp = name.startswith("\U0001F500 ") if is_bgp: pool_name = name[2:] pool = None for p in load_bgp_pools().get("pools", []): if p["name"] == pool_name: pool = p break if not pool: self.log(f"ERROR: BGP pool '{pool_name}' not found") return self._set_busy(True) target_name = "Desktop" if target == "desktop" else "CLI" self.log(f"=== BGP: {pool_name} / {model} -> {target_name} ===") threading.Thread(target=self._run_bgp, args=(pool, model, target), daemon=True).start() return ep = get_endpoint(name) if not ep: self.log("ERROR: endpoint not found") return self._set_busy(True) target_name = "Desktop" if target == "desktop" else "CLI" self.log(f"=== {ep['name']} / {model} -> {target_name} ===") threading.Thread(target=self._run, args=(ep, model, target), daemon=True).start() def _launch_codex_default(self, target): if "cli" not in self._missing: status, msg = check_codex_auth() if status != "logged_in": if not messagebox.askyesno("Auth Warning", f"Codex auth check: {msg}\n\n" "Launch may fail without valid authentication.\nContinue anyway?"): self._set_busy(False) return self._set_busy(True) target_name = "Desktop" if target == "desktop" else "CLI" self.log(f"=== Codex Default (OAuth) -> {target_name} ===") threading.Thread(target=self._run_codex_default, args=(target,), daemon=True).start() def _run(self, ep, model, target): keep_session_alive = False try: self.log("Cleaning up stale processes...") safe_cleanup_owned(self.log) recover_config_if_needed(self.log) needs_proxy = ep["backend_type"] != "native" if needs_proxy: self.log("Starting translation proxy...") try: proxy_port = start_proxy_for(ep, self.log) except RuntimeError as e: self._root.after(0, lambda: messagebox.showerror("Proxy Failed", str(e))) return self.log(f"Configuring Codex for {ep['name']} (proxied on :{proxy_port})...") begin_config_transaction(f"launch:{ep['name']}") write_config_for_translated(ep, model, proxy_port) else: self.log(f"Configuring Codex for {ep['name']} (native)...") begin_config_transaction(f"launch:{ep['name']}") write_config_for_native(ep, model) if target == "desktop": if needs_proxy: kill_existing_desktop(self.log) keep_session_alive = self._launch_desktop(ep, model) else: self._launch_cli(ep, model) except Exception as e: self.log(f"ERROR: {e}") finally: if keep_session_alive: self.log("Warm-start handoff detected; keeping proxy/config active for running Desktop.") self._set_busy(False) self.log("Ready. Use Kill && Cleanup when finished.") else: stop_proxy() restore_config() end_config_transaction() self._set_busy(False) self.log("Ready.") def _run_bgp(self, pool, model, target): keep_session_alive = False try: self.log("Cleaning up stale processes...") safe_cleanup_owned(self.log) recover_config_if_needed(self.log) self.log(f"Starting BGP proxy with {len(pool.get('routes', []))} routes...") port, bgp_ep = start_bgp_proxy(pool, model, self.log) begin_config_transaction(f"launch:bgp:{pool['name']}") write_config_for_translated(bgp_ep, model, port) if target == "desktop": kill_existing_desktop(self.log) keep_session_alive = self._launch_desktop(bgp_ep, model) else: self._launch_cli(bgp_ep, model) except Exception as e: self.log(f"ERROR: {e}") finally: if keep_session_alive: self.log("Warm-start handoff detected; keeping proxy/config active.") self._set_busy(False) self.log("Ready. Use Kill && Cleanup when finished.") else: stop_proxy() restore_config() end_config_transaction() self._set_busy(False) self.log("Ready.") def _run_codex_default(self, target): try: self.log("Cleaning up stale processes...") safe_cleanup_owned(self.log) stop_proxy() recover_config_if_needed(self.log) self.log("Resetting config to Codex defaults (OAuth)...") begin_config_transaction("launch:default") if CONFIG.exists(): CONFIG.unlink() if target == "desktop": self._launch_desktop_direct() else: self._launch_cli_default() except Exception as e: self.log(f"ERROR: {e}") finally: restore_config() end_config_transaction() self._set_busy(False) self.log("Ready.") def _launch_desktop(self, ep, model): desktop_path = self._desktop_info if not desktop_path: self.log("ERROR: Codex Desktop not found") return False if IS_WINDOWS: self._proc = subprocess.Popen( [desktop_path], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) else: self._proc = subprocess.Popen( [desktop_path], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, preexec_fn=os.setsid) pid = self._proc.pid self.log(f"Desktop started (PID {pid})") self.log(f"Log: {LAUNCH_LOG}") t0 = time.time() stall_warned = False while self._proc and self._proc.poll() is None: time.sleep(1.5) el = time.time() - t0 if el > 20 and not stall_warned: self.log("Still starting after 20s -- possible stall. Click Kill if window doesn't appear.") self.log(f"--- last log lines ---\n{last_log_lines()}") stall_warned = True if self._proc: rc = self._proc.poll() el = time.time() - t0 self.log(f"Desktop exited (code {rc}) after {el:.0f}s") if el < 12: self.log("TIP: Quick exit -- may be warm-start handoff (normal) or crash.") last_lines = last_log_lines() self.log(f"--- last log lines ---\n{last_lines}") if rc == 0 and "warm-start" in last_lines.lower(): self._proc = None return True self._proc = None return False def _launch_cli(self, ep, model): self.log(f"Launching Codex CLI with {ep['name']}...") term = detect_terminal() if not term: self.log("ERROR: no terminal found") return term_name, term_args, _ = term cmd_parts = [term_name] + term_args if ep["backend_type"] == "native": cmd_parts.extend(["codex", "-c", f"model={model}"]) else: cmd_parts.extend(["codex", "--profile", ep["name"], "-c", f"model={model}"]) self.log(f"Running: {' '.join(cmd_parts)}") if IS_WINDOWS: self._proc = subprocess.Popen(cmd_parts, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) else: self._proc = subprocess.Popen(cmd_parts, preexec_fn=os.setsid) pid = self._proc.pid self.log(f"CLI started in terminal (PID {pid})") while self._proc and self._proc.poll() is None: time.sleep(1.5) if self._proc: rc = self._proc.poll() self.log(f"CLI exited (code {rc})") self._proc = None def _launch_desktop_direct(self): self.log("Launching Codex Desktop (default OAuth)...") desktop_path = self._desktop_info if not desktop_path: self.log("ERROR: Codex Desktop not found") return if IS_WINDOWS: self._proc = subprocess.Popen( [desktop_path], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) else: self._proc = subprocess.Popen( [desktop_path], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, preexec_fn=os.setsid) pid = self._proc.pid self.log(f"Desktop started (PID {pid})") t0 = time.time() stall_warned = False while self._proc and self._proc.poll() is None: time.sleep(1.5) el = time.time() - t0 if el > 20 and not stall_warned: self.log("Still starting after 20s -- possible stall.") self.log(f"--- last log lines ---\n{last_log_lines()}") stall_warned = True if self._proc: rc = self._proc.poll() el = time.time() - t0 self.log(f"Desktop exited (code {rc}) after {el:.0f}s") self._proc = None def _launch_cli_default(self): self.log("Launching Codex CLI (default OAuth)...") term = detect_terminal() if not term: self.log("ERROR: no terminal found") return term_name, term_args, _ = term cmd_parts = [term_name] + term_args + ["codex"] self.log(f"Running: {' '.join(cmd_parts)}") if IS_WINDOWS: self._proc = subprocess.Popen(cmd_parts, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) else: self._proc = subprocess.Popen(cmd_parts, preexec_fn=os.setsid) pid = self._proc.pid self.log(f"CLI started in terminal (PID {pid})") while self._proc and self._proc.poll() is None: time.sleep(1.5) if self._proc: rc = self._proc.poll() self.log(f"CLI exited (code {rc})") self._proc = None # ── Kill ───────────────────────────────────────────────────────── def _kill(self): self.log("=== Killing ===") if self._proc and self._proc.poll() is None: try: if IS_WINDOWS: subprocess.run(["taskkill", "/F", "/T", "/PID", str(self._proc.pid)], capture_output=True, timeout=10) else: import signal as sig pgid = os.getpgid(self._proc.pid) os.killpg(pgid, sig.SIGTERM) time.sleep(1) if self._proc.poll() is None: os.killpg(pgid, sig.SIGKILL) except (ProcessLookupError, PermissionError): pass self._proc = None stop_proxy() safe_cleanup_owned(self.log) restore_config() end_config_transaction() LOG_DIR.mkdir(parents=True, exist_ok=True) if LAUNCH_LOG.exists(): try: LAUNCH_LOG.unlink() except Exception: pass self.log("Cleanup complete") self._set_busy(False) self.log("Ready.") def _do_close(self): if self._proc and self._proc.poll() is None: if not messagebox.askyesno("Confirm", "Codex is still running. Kill it?"): return self._kill() stop_proxy() self._root.destroy() # ═══════════════════════════════════════════════════════════════════════ # Entry point # ═══════════════════════════════════════════════════════════════════════ if __name__ == "__main__": ensure_dirs() create_default_endpoints() root = tk.Tk() root.title("Codex Launcher") root.geometry("800x680") root.minsize(640, 520) app = LauncherWin(root) root.mainloop()