Files
Codex-Launcher---Any-AI-Por…/src/codex-launcher-gui.py
cobra91 88e0183ddc fix: GUI improvements, CROF gate, data dir consolidation, sticky proxy port
- GUI: add Clear Log, Restart Proxy, View Log (opens requests.log) buttons
- CROF: skip entirely unless TARGET_URL contains crof.ai (no logs pollution)
- Consolidate all data dirs into codex-proxy (remove codex-desktop, codex-launcher)
- Proxy port persists across restarts via .last-proxy-port file
- Adaptive compact budget raised from 60% to 80% context window
- Startup config cleanup moved after _init_runtime() to avoid deleting active config
2026-05-25 15:43:49 +02:00

2822 lines
126 KiB
Python

#!/usr/bin/env python3
"""Codex Launcher GUI (tkinter) — manage endpoints, launch Desktop or CLI with any provider.
Windows-native tkinter GUI mirroring all features of the GTK version.
Imports process management, config engine, proxy lifecycle from codex_launcher_lib.
"""
import tkinter as tk
from tkinter import ttk, filedialog, messagebox, scrolledtext
import json
import os
import shutil
import socket
import ssl
import subprocess
import sys
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
import base64
import hashlib
import secrets
import http.server
import collections
from pathlib import Path
from codex_launcher_lib import (
IS_WINDOWS, HOME, CONFIG, CONFIG_BAK, CONFIG_TXN,
ENDPOINTS_FILE, BGP_POOLS_FILE, LAUNCH_LOG, LOG_DIR,
PROXY_CONFIG_DIR, BIN_DIR, PROXY, CLEANUP, PID_REGISTRY,
PROVIDER_PRESETS, CHANGELOG, DEFAULT_CONFIG, OAUTH_SECRETS_PATH,
ANTIGRAVITY_MODELS,
safe_name, label_for_backend, normalize_model_id, normalize_base_url,
parse_model_list, now_utc_iso, apply_provider_preset,
load_endpoints, save_endpoints, load_bgp_pools, save_bgp_pools,
get_endpoint, build_profile_bundle, save_profile_bundle, import_profile_bundle,
backup_config, restore_config, begin_config_transaction, end_config_transaction,
recover_config_if_needed, write_config_for_native, write_config_for_translated,
endpoint_models_url, endpoint_model_headers, fetch_models_for_endpoint,
refresh_endpoint_models, run_endpoint_doctor,
detect_codex_cli, detect_codex_desktop, check_codex_auth,
last_log_lines, kill_existing_desktop, safe_cleanup_owned,
start_proxy_for, stop_proxy, start_bgp_proxy, get_proxy_state, set_proxy_state,
detect_terminal, open_url, open_file, write_secure_text,
ensure_dirs, create_default_endpoints,
load_monitoring_config, save_monitoring_config,
load_incident_store, save_incident_store, load_usage_stats,
monitoring_log,
IncidentStore, AIDiagnosticAgent, HealthWatcher,
load_oauth_secrets, save_oauth_secrets,
_usage_theme, UA,
)
# ═══════════════════════════════════════════════════════════════════════
# Helpers
# ═══════════════════════════════════════════════════════════════════════
def _fmt_tok(n):
if n >= 1_000_000:
return f"{n/1_000_000:.1f}M"
if n >= 1_000:
return f"{n/1_000:.1f}K"
return str(n)
def _fmt_dur(s):
if s >= 3600:
return f"{s/3600:.1f}h"
if s >= 60:
return f"{s/60:.1f}m"
return f"{s:.1f}s"
def _status_pill(success_rate, fail_pct):
U = _usage_theme()
if fail_pct > 0.15:
return ("ERR", U["red"])
if fail_pct > 0.05:
return ("WARN", U["yellow"])
return ("OK", U["green"])
def _show_doctor_results_tk(parent, ep_name, checks):
dlg = tk.Toplevel(parent)
dlg.title(f"Doctor: {ep_name}")
dlg.geometry("520x420")
dlg.transient(parent)
dlg.grab_set()
passed = sum(1 for _, ok, _ in checks if ok is True)
failed = sum(1 for _, ok, _ in checks if ok is False)
warned = sum(1 for _, ok, _ in checks if ok is None)
hdr = tk.Label(dlg, text=f"{ep_name} {passed} passed {failed} failed {warned} warnings",
font=("Segoe UI", 10, "bold"))
hdr.pack(padx=12, pady=(12, 4), anchor="w")
ttk.Separator(dlg).pack(fill="x", padx=12)
canvas = tk.Canvas(dlg)
scrollbar = ttk.Scrollbar(dlg, orient="vertical", command=canvas.yview)
inner = tk.Frame(canvas)
inner.bind("<Configure>", lambda e: canvas.configure(scrollregion=canvas.bbox("all")))
canvas.create_window((0, 0), window=inner, anchor="nw")
canvas.configure(yscrollcommand=scrollbar.set)
for name, ok, detail in checks:
row = tk.Frame(inner)
row.pack(fill="x", padx=12, pady=1)
if ok is True:
color, sym = "#27ae60", ""
elif ok is False:
color, sym = "#e74c3c", ""
else:
color, sym = "#f39c12", ""
tk.Label(row, text=sym, fg=color, font=("Segoe UI", 11, "bold")).pack(side="left")
tk.Label(row, text=name, font=("Segoe UI", 9, "bold")).pack(side="left", padx=(4, 0))
if detail:
tk.Label(row, text=detail, fg="#7f8c8d", font=("Segoe UI", 8)).pack(side="right")
canvas.pack(side="left", fill="both", expand=True, padx=(12, 0), pady=6)
scrollbar.pack(side="right", fill="y", pady=6)
btn_frame = tk.Frame(dlg)
btn_frame.pack(pady=(0, 10))
ttk.Button(btn_frame, text="Close", command=dlg.destroy).pack()
# ═══════════════════════════════════════════════════════════════════════
# EditEndpointDialog
# ═══════════════════════════════════════════════════════════════════════
class EditEndpointDialog:
def __init__(self, parent, existing_name=None):
self.result = False
self._existing_name = existing_name
self._parent_mgr = parent
if existing_name:
self._data = get_endpoint(existing_name) or {}
else:
self._data = {
"name": "", "backend_type": "openai-compat",
"base_url": "", "api_key": "", "default_model": "",
"models": [], "provider_preset": "Custom",
}
self._dlg = tk.Toplevel(parent)
title = "Edit Endpoint" if existing_name else "Add Endpoint"
self._dlg.title(title)
self._dlg.geometry("520x600")
self._dlg.transient(parent)
self._dlg.grab_set()
main = ttk.Frame(self._dlg, padding=12)
main.pack(fill="both", expand=True)
grid = ttk.Frame(main)
grid.pack(fill="x")
row_idx = [0]
def add_field(label, widget_factory):
ttk.Label(grid, text=label).grid(row=row_idx[0], column=0, sticky="e", padx=(0, 6), pady=2)
w = widget_factory()
w.grid(row=row_idx[0], column=1, sticky="ew", pady=2)
row_idx[0] += 1
return w
self._entry_name = add_field("Name:", lambda: ttk.Entry(grid))
self._entry_name.insert(0, self._data.get("name", ""))
self._combo_preset = ttk.Combobox(grid, values=list(PROVIDER_PRESETS.keys()), state="readonly")
preset = self._data.get("provider_preset", "Custom")
self._combo_preset.set(preset)
add_field("Preset:", lambda: self._combo_preset)
self._combo_preset.bind("<<ComboboxSelected>>", lambda e: self._apply_selected_preset(initial=False))
backend_types = [
("openai-compat", "OpenAI-compatible (needs proxy)"),
("anthropic", "Anthropic (needs proxy)"),
("command-code", "Command Code (needs proxy)"),
("freebuff", "Freebuff - Free DeepSeek/Kimi (needs proxy)"),
("gemini-oauth-cli", "Gemini CLI OAuth (needs proxy)"),
("gemini-oauth-antigravity", "Antigravity OAuth (needs proxy)"),
("native", "Native OpenAI (no proxy)"),
]
self._combo_type = ttk.Combobox(grid, values=[f"{v} - {l}" for v, l in backend_types], state="readonly")
bt = self._data.get("backend_type", "openai-compat")
bt_display = next((f"{v} - {l}" for v, l in backend_types if v == bt), backend_types[0][0] + " - " + backend_types[0][1])
self._combo_type.set(bt_display)
add_field("Type:", lambda: self._combo_type)
self._bt_map = {f"{v} - {l}": v for v, l in backend_types}
self._entry_url = add_field("Base URL:", lambda: ttk.Entry(grid))
self._entry_url.insert(0, self._data.get("base_url", ""))
key_frame = ttk.Frame(grid)
self._entry_key = ttk.Entry(key_frame, show="*")
self._entry_key.pack(side="left", fill="x", expand=True)
self._entry_key.insert(0, self._data.get("api_key", ""))
self._reveal_var = tk.BooleanVar(value=False)
ttk.Checkbutton(key_frame, text="Show", variable=self._reveal_var,
command=lambda: self._entry_key.configure(show="" if self._reveal_var.get() else "*")).pack(side="left", padx=(4, 0))
self._oauth_btn = ttk.Button(key_frame, text="OAuth Login", command=self._do_oauth_login)
self._oauth_btn.pack(side="left", padx=(4, 0))
add_field("API Key:", lambda: key_frame)
self._entry_cc_ver = add_field("CC Version:", lambda: ttk.Entry(grid))
self._entry_cc_ver.insert(0, self._data.get("cc_version", ""))
reason_frame = ttk.Frame(grid)
self._reason_var = tk.BooleanVar(value=self._data.get("reasoning_enabled", True))
self._reason_cb = ttk.Checkbutton(reason_frame, text="Reasoning ON", variable=self._reason_var,
command=self._on_reasoning_toggled)
self._reason_cb.pack(side="left")
self._combo_effort = ttk.Combobox(reason_frame, values=["none", "minimal", "low", "medium", "high", "max"],
state="readonly", width=10)
self._combo_effort.set(self._data.get("reasoning_effort", "medium"))
self._combo_effort.pack(side="left", padx=(8, 0))
ttk.Label(reason_frame, text="Effort").pack(side="left", padx=(4, 0))
add_field("Reasoning:", lambda: reason_frame)
self._on_reasoning_toggled()
grid.columnconfigure(1, weight=1)
ttk.Label(main, text="Models:").pack(anchor="w", pady=(8, 2))
model_input_frame = ttk.Frame(main)
model_input_frame.pack(fill="x")
self._entry_model = ttk.Entry(model_input_frame)
self._entry_model.pack(side="left", fill="x", expand=True)
ttk.Button(model_input_frame, text="Add", command=self._add_model).pack(side="left", padx=(4, 0))
ttk.Button(model_input_frame, text="Bulk Add", command=self._add_models_from_text).pack(side="left", padx=(4, 0))
ttk.Button(model_input_frame, text="Fetch from API", command=self._fetch_models).pack(side="left", padx=(4, 0))
ttk.Button(model_input_frame, text="Test Endpoint", command=self._diagnose_endpoint).pack(side="left", padx=(4, 0))
ttk.Label(main, text="Bulk add (one per line or comma-separated):").pack(anchor="w", pady=(4, 0))
self._bulk_text = tk.Text(main, height=3, wrap="word")
self._bulk_text.pack(fill="x", pady=(2, 4))
list_frame = ttk.Frame(main)
list_frame.pack(fill="both", expand=True)
self._model_listbox = tk.Listbox(list_frame, height=6)
sb = ttk.Scrollbar(list_frame, orient="vertical", command=self._model_listbox.yview)
self._model_listbox.configure(yscrollcommand=sb.set)
self._model_listbox.pack(side="left", fill="both", expand=True)
sb.pack(side="right", fill="y")
self._model_listbox.bind("<Double-Button-1>", lambda e: self._remove_selected_model())
for m in self._data.get("models", []):
self._model_listbox.insert("end", m)
default_frame = ttk.Frame(main)
default_frame.pack(fill="x", pady=(4, 0))
ttk.Label(default_frame, text="Default Model:").pack(side="left")
self._combo_default = ttk.Combobox(default_frame, state="readonly")
self._combo_default.pack(side="left", fill="x", expand=True, padx=(6, 0))
self._refresh_default_combo()
dm = self._data.get("default_model", "")
if dm:
self._combo_default.set(dm)
self._apply_selected_preset(initial=True)
btn_frame = ttk.Frame(main)
btn_frame.pack(fill="x", pady=(8, 0))
ttk.Button(btn_frame, text="Cancel", command=self._cancel).pack(side="right")
ttk.Button(btn_frame, text="Save", command=self._save).pack(side="right", padx=(8, 0))
def _on_reasoning_toggled(self):
state = "readonly" if self._reason_var.get() else "disabled"
self._combo_effort.configure(state=state)
def _apply_selected_preset(self, initial=False):
preset_name = self._combo_preset.get() or "Custom"
preset = PROVIDER_PRESETS.get(preset_name, {})
is_oauth = bool(preset.get("oauth_provider"))
self._oauth_btn.configure(state="normal" if is_oauth else "disabled")
if not initial or self._existing_name is None:
bt = preset.get("backend_type", "openai-compat")
bt_display = next((k for k, v in self._bt_map.items() if v == bt), list(self._bt_map.keys())[0])
self._combo_type.set(bt_display)
self._entry_url.delete(0, "end")
self._entry_url.insert(0, preset.get("base_url", ""))
cc_ver = preset.get("cc_version", "")
if cc_ver and not self._entry_cc_ver.get().strip():
self._entry_cc_ver.delete(0, "end")
self._entry_cc_ver.insert(0, cc_ver)
if preset.get("models") and self._model_listbox.size() == 0:
self._model_listbox.delete(0, "end")
for mid in preset["models"]:
self._model_listbox.insert("end", mid)
self._refresh_default_combo()
if preset["models"]:
self._combo_default.set(preset["models"][0])
def _add_model(self):
m = normalize_model_id(self._entry_model.get())
if m:
self._model_listbox.insert("end", m)
self._refresh_default_combo()
self._entry_model.delete(0, "end")
def _add_models_from_text(self):
text = self._bulk_text.get("1.0", "end")
models = parse_model_list(text)
existing = set(self._model_listbox.get(i) for i in range(self._model_listbox.size()))
for mid in models:
if mid not in existing:
self._model_listbox.insert("end", mid)
self._bulk_text.delete("1.0", "end")
self._refresh_default_combo()
def _remove_selected_model(self):
sel = self._model_listbox.curselection()
if sel:
self._model_listbox.delete(sel[0])
self._refresh_default_combo()
def _refresh_default_combo(self):
models = list(self._model_listbox.get(i) for i in range(self._model_listbox.size()))
current = self._combo_default.get()
self._combo_default["values"] = models
if current in models:
self._combo_default.set(current)
elif models:
self._combo_default.set(models[0])
else:
self._combo_default.set("")
def _fetch_models(self):
ep = self._make_endpoint_snapshot()
ids, err = fetch_models_for_endpoint(ep)
if ids:
existing = set(self._model_listbox.get(i) for i in range(self._model_listbox.size()))
for mid in ids:
if mid not in existing:
self._model_listbox.insert("end", mid)
self._refresh_default_combo()
else:
messagebox.showerror("Fetch Models", f"Failed:\n{err}", parent=self._dlg)
def _diagnose_endpoint(self):
ep = self._make_endpoint_snapshot()
wait = tk.Toplevel(self._dlg)
wait.title("Running Doctor...")
wait.geometry("280x80")
wait.transient(self._dlg)
wait.grab_set()
tk.Label(wait, text="Running endpoint diagnostics...").pack(expand=True)
def _run():
checks = run_endpoint_doctor(ep)
self._dlg.after(0, lambda: (wait.destroy(), _show_doctor_results_tk(self._dlg, ep.get("default_model", "endpoint"), checks)))
threading.Thread(target=_run, daemon=True).start()
def _make_endpoint_snapshot(self):
bt_display = self._combo_type.get()
bt = self._bt_map.get(bt_display, "openai-compat")
return {
"base_url": self._entry_url.get().strip(),
"api_key": self._entry_key.get().strip(),
"backend_type": bt,
"default_model": self._combo_default.get() or "",
}
def _do_oauth_login(self):
preset_name = self._combo_preset.get() or "Custom"
preset = PROVIDER_PRESETS.get(preset_name, {})
provider = preset.get("oauth_provider", "")
if (provider or "").startswith("google"):
self._google_oauth_flow(provider)
def _google_oauth_flow(self, oauth_provider="google-cli"):
is_antigravity = oauth_provider == "google-antigravity"
token_path = str(PROXY_CONFIG_DIR / ("google-antigravity-oauth-token.json" if is_antigravity else "google-cli-oauth-token.json"))
_sec = load_oauth_secrets().get("antigravity" if is_antigravity else "gemini_cli", {})
CLIENT_ID = _sec.get("client_id", "")
CLIENT_SECRET = _sec.get("client_secret", "")
if is_antigravity:
SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
"https://www.googleapis.com/auth/userinfo.email",
"https://www.googleapis.com/auth/userinfo.profile",
"https://www.googleapis.com/auth/cclog",
"https://www.googleapis.com/auth/experimentsandconfigs",
]
port = 51121
redirect_uri = f"http://localhost:{port}/oauth-callback"
callback_path = "/oauth-callback"
provider_kind = "antigravity"
else:
SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
"https://www.googleapis.com/auth/userinfo.email",
"https://www.googleapis.com/auth/userinfo.profile",
]
port = 0
redirect_uri = None
callback_path = "/oauth2callback"
provider_kind = "cli"
state = secrets.token_hex(32)
verifier = secrets.token_urlsafe(64)
challenge = base64.urlsafe_b64encode(hashlib.sha256(verifier.encode()).digest()).rstrip(b"=").decode()
if port == 0:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
port = s.getsockname()[1]
redirect_uri = f"http://127.0.0.1:{port}/oauth2callback"
scope_str = " ".join(SCOPES)
auth_url = (
f"https://accounts.google.com/o/oauth2/v2/auth?"
f"client_id={CLIENT_ID}"
f"&redirect_uri={urllib.parse.quote(redirect_uri)}"
f"&response_type=code"
f"&scope={urllib.parse.quote(scope_str)}"
f"&access_type=offline"
f"&prompt=select_account%20consent"
f"&state={state}"
f"&code_challenge={challenge}"
f"&code_challenge_method=S256"
)
oauth_dlg = tk.Toplevel(self._dlg)
oauth_dlg.title("Google OAuth (Gemini Mode)")
oauth_dlg.geometry("520x280")
oauth_dlg.transient(self._dlg)
oauth_dlg.grab_set()
tk.Label(oauth_dlg, text="Sign in with Google", font=("Segoe UI", 11, "bold")).pack(padx=16, pady=(12, 0), anchor="w")
tk.Label(oauth_dlg, text=f"Using OAuth credentials from {OAUTH_SECRETS_PATH}").pack(padx=16, anchor="w")
link_lbl = tk.Label(oauth_dlg, text="Click here to open Google authorization", fg="blue", cursor="hand2")
link_lbl.pack(padx=16, pady=(8, 0), anchor="w")
link_lbl.bind("<Button-1>", lambda e: open_url(auth_url))
self._oauth_status_var = tk.StringVar(value="Opening browser...")
tk.Label(oauth_dlg, textvariable=self._oauth_status_var).pack(padx=16, pady=(8, 0), anchor="w")
code_holder = [None]
error_holder = [None]
received_state = [None]
class OAuthHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self2):
qs = urllib.parse.urlparse(self2.path).query
params = urllib.parse.parse_qs(qs)
received_state[0] = params.get("state", [None])[0]
if self2.path.find(callback_path) == -1:
self2.send_response(302)
self2.send_header("Location", "https://developers.google.com/gemini-code-assist/auth_failure_gemini")
self2.end_headers()
error_holder[0] = "unexpected request"
return
if "code" in params:
if received_state[0] != state:
self2.send_response(400)
self2.send_header("Content-Type", "text/html")
self2.end_headers()
self2.wfile.write(b"<html><body><h2>CSRF state mismatch.</h2></body></html>")
error_holder[0] = "CSRF state mismatch"
return
code_holder[0] = params["code"][0]
self2.send_response(302)
self2.send_header("Location", "https://developers.google.com/gemini-code-assist/auth_success_gemini")
self2.end_headers()
else:
error_holder[0] = params.get("error", ["unknown"])[0]
self2.send_response(302)
self2.send_header("Location", "https://developers.google.com/gemini-code-assist/auth_failure_gemini")
self2.end_headers()
def log_message(self2, fmt, *args):
pass
try:
bind_host = "localhost" if is_antigravity else "127.0.0.1"
server = http.server.HTTPServer((bind_host, port), OAuthHandler)
except OSError:
self._oauth_status_var.set(f"Port {port} already in use -- close other apps and retry.")
return
def wait_for_code():
deadline = time.time() + 120
while code_holder[0] is None and error_holder[0] is None and time.time() < deadline:
server.handle_request()
server.server_close()
if code_holder[0]:
try:
token_data = urllib.parse.urlencode({
"code": code_holder[0], "client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET, "redirect_uri": redirect_uri,
"grant_type": "authorization_code", "code_verifier": verifier,
}).encode()
req = urllib.request.Request("https://oauth2.googleapis.com/token", data=token_data,
headers={"Content-Type": "application/x-www-form-urlencoded"})
resp = urllib.request.urlopen(req, timeout=30)
tokens = json.loads(resp.read())
tokens["client_id"] = CLIENT_ID
tokens["client_secret"] = CLIENT_SECRET
tokens["provider_kind"] = provider_kind
tokens["expires_at"] = time.time() + tokens.get("expires_in", 3600)
os.makedirs(os.path.dirname(token_path), exist_ok=True)
with open(token_path, "w") as f:
json.dump(tokens, f, indent=2)
project_id = ""
try:
lr = urllib.request.Request(
"https://cloudcode-pa.googleapis.com/v1internal:loadCodeAssist",
data=json.dumps({}).encode(),
headers={"Content-Type": "application/json",
"Authorization": f"Bearer {tokens['access_token']}",
"User-Agent": "google-api-nodejs-client/9.15.1"})
lresp = urllib.request.urlopen(lr, timeout=15)
ldata = json.loads(lresp.read())
p = ldata.get("cloudaicompanionProject", "")
if isinstance(p, dict):
project_id = p.get("id", "")
elif isinstance(p, str):
project_id = p
if project_id:
tokens["project_id"] = project_id
with open(token_path, "w") as f2:
json.dump(tokens, f2, indent=2)
except Exception:
pass
found_models = []
if is_antigravity:
found_models = list(ANTIGRAVITY_MODELS)
else:
found_models = ["gemini-2.5-flash", "gemini-2.5-pro"]
if found_models:
tokens["available_models"] = found_models
with open(token_path, "w") as f3:
json.dump(tokens, f3, indent=2)
self._dlg.after(0, lambda: self._oauth_success(oauth_dlg, tokens.get("access_token", "")))
except Exception as e:
self._dlg.after(0, lambda: self._oauth_failed(oauth_dlg, str(e)))
else:
self._dlg.after(0, lambda: self._oauth_failed(oauth_dlg, error_holder[0] or "No authorization code received."))
threading.Thread(target=wait_for_code, daemon=True).start()
open_url(auth_url)
def _oauth_success(self, dlg, access_token):
self._entry_key.delete(0, "end")
self._entry_key.insert(0, access_token)
self._oauth_status_var.set("Authorization successful! Token saved.")
self._dlg.after(1500, dlg.destroy)
def _oauth_failed(self, dlg, msg):
self._oauth_status_var.set(f"Failed: {msg}")
self._dlg.after(3000, dlg.destroy)
def _cancel(self):
self._dlg.destroy()
def _save(self):
name = self._entry_name.get().strip()
if not name:
messagebox.showerror("Error", "Name is required", parent=self._dlg)
return
bt_display = self._combo_type.get()
bt = self._bt_map.get(bt_display, "openai-compat")
url = self._entry_url.get().strip()
key = self._entry_key.get().strip()
models = list(self._model_listbox.get(i) for i in range(self._model_listbox.size()))
if not models:
ep_snap = self._make_endpoint_snapshot()
ids, err = fetch_models_for_endpoint(ep_snap)
if ids:
for mid in ids:
self._model_listbox.insert("end", mid)
self._refresh_default_combo()
models = list(self._model_listbox.get(i) for i in range(self._model_listbox.size()))
else:
r = messagebox.askyesno("No Models", f"Auto-fetch failed ({err}).\n\nAdd models manually now?", parent=self._dlg)
if r:
self._entry_model.focus_set()
return
self._dlg.destroy()
return
if not models:
messagebox.showerror("Error", "At least one model is required", parent=self._dlg)
return
default = self._combo_default.get() or models[0]
data = load_endpoints()
if self._existing_name and self._existing_name != name:
data["endpoints"] = [e for e in data["endpoints"] if e["name"] != self._existing_name]
existing = [e for e in data["endpoints"] if e["name"] == name]
if existing:
messagebox.showerror("Error", f'Endpoint "{name}" already exists', parent=self._dlg)
return
new_ep = {
"name": name, "backend_type": bt, "base_url": normalize_base_url(url),
"api_key": key, "default_model": default, "models": models,
"provider_preset": self._combo_preset.get() or "Custom",
"reasoning_enabled": self._reason_var.get(),
"reasoning_effort": self._combo_effort.get() or "medium",
}
cc_ver = self._entry_cc_ver.get().strip()
if cc_ver:
new_ep["cc_version"] = cc_ver
preset_name = self._combo_preset.get() or "Custom"
preset = PROVIDER_PRESETS.get(preset_name, {})
if preset.get("oauth_provider"):
new_ep["oauth_provider"] = preset["oauth_provider"]
found = False
for i, e in enumerate(data["endpoints"]):
if e["name"] == name:
data["endpoints"][i] = new_ep
found = True
break
if not found:
data["endpoints"].append(new_ep)
if data.get("default") is None:
data["default"] = name
save_endpoints(data)
self.result = True
self._dlg.destroy()
# ═══════════════════════════════════════════════════════════════════════
# EndpointMgr
# ═══════════════════════════════════════════════════════════════════════
class EndpointMgr:
def __init__(self, parent, on_update=None):
self._parent = parent
self._on_update = on_update
self._dlg = tk.Toplevel(parent)
self._dlg.title("Manage Endpoints")
self._dlg.geometry("600x400")
self._dlg.transient(parent)
main = ttk.Frame(self._dlg, padding=12)
main.pack(fill="both", expand=True)
ttk.Label(main, text="Endpoints", font=("Segoe UI", 11, "bold")).pack(anchor="w")
tree_frame = ttk.Frame(main)
tree_frame.pack(fill="both", expand=True, pady=(4, 0))
cols = ("name", "provider", "backend", "default_model")
self._tree = ttk.Treeview(tree_frame, columns=cols, show="headings", selectmode="browse")
for col, heading, width in [("name", "Name", 140), ("provider", "Provider", 160),
("backend", "Type", 140), ("default_model", "Default Model", 140)]:
self._tree.heading(col, text=heading)
self._tree.column(col, width=width, minwidth=80)
sb = ttk.Scrollbar(tree_frame, orient="vertical", command=self._tree.yview)
self._tree.configure(yscrollcommand=sb.set)
self._tree.pack(side="left", fill="both", expand=True)
sb.pack(side="right", fill="y")
btn_frame = ttk.Frame(main)
btn_frame.pack(fill="x", pady=(8, 0))
ttk.Button(btn_frame, text="Add", command=self._add).pack(side="left", padx=(0, 4))
ttk.Button(btn_frame, text="Edit", command=self._edit).pack(side="left", padx=(0, 4))
ttk.Button(btn_frame, text="Delete", command=self._delete).pack(side="left", padx=(0, 4))
ttk.Button(btn_frame, text="Set Default", command=self._set_default).pack(side="left", padx=(0, 4))
ttk.Button(btn_frame, text="Doctor", command=self._doctor_selected).pack(side="left", padx=(0, 4))
ttk.Button(btn_frame, text="Doctor All", command=self._doctor_all).pack(side="left", padx=(0, 4))
ttk.Button(btn_frame, text="Close", command=self._dlg.destroy).pack(side="right")
self._rebuild()
def _rebuild(self):
for item in self._tree.get_children():
self._tree.delete(item)
data = load_endpoints()
for ep in data["endpoints"]:
provider = ep.get("provider_preset", "Custom")
bt = label_for_backend(ep["backend_type"])
self._tree.insert("", "end", values=(ep["name"], provider, bt, ep.get("default_model", "")))
def _selected_name(self):
sel = self._tree.selection()
if not sel:
return None
return self._tree.item(sel[0])["values"][0]
def _add(self):
d = EditEndpointDialog(self._dlg, None)
self._dlg.wait_window(d._dlg)
if d.result:
self._rebuild()
if self._on_update:
self._on_update()
def _edit(self):
name = self._selected_name()
if not name:
return
d = EditEndpointDialog(self._dlg, name)
self._dlg.wait_window(d._dlg)
if d.result:
self._rebuild()
if self._on_update:
self._on_update()
def _delete(self):
name = self._selected_name()
if not name:
return
if not messagebox.askyesno("Delete", f'Delete endpoint "{name}"?', parent=self._dlg):
return
data = load_endpoints()
data["endpoints"] = [e for e in data["endpoints"] if e["name"] != name]
if data.get("default") == name:
data["default"] = data["endpoints"][0]["name"] if data["endpoints"] else None
save_endpoints(data)
self._rebuild()
if self._on_update:
self._on_update()
def _set_default(self):
name = self._selected_name()
if not name:
return
data = load_endpoints()
data["default"] = name
save_endpoints(data)
self._rebuild()
if self._on_update:
self._on_update()
def _doctor_selected(self):
name = self._selected_name()
if not name:
return
ep = get_endpoint(name)
if not ep:
return
wait = tk.Toplevel(self._dlg)
wait.title(f"Doctor: {name}...")
wait.geometry("280x80")
wait.transient(self._dlg)
wait.grab_set()
tk.Label(wait, text=f"Running diagnostics for {name}...").pack(expand=True)
def _run():
checks = run_endpoint_doctor(ep)
self._dlg.after(0, lambda: (wait.destroy(), _show_doctor_results_tk(self._dlg, name, checks)))
threading.Thread(target=_run, daemon=True).start()
def _doctor_all(self):
data = load_endpoints()
endpoints = data.get("endpoints", [])
if not endpoints:
messagebox.showinfo("Doctor All", "No endpoints configured.", parent=self._dlg)
return
wait = tk.Toplevel(self._dlg)
wait.title("Doctor All...")
wait.geometry("320x80")
wait.transient(self._dlg)
wait.grab_set()
tk.Label(wait, text=f"Testing {len(endpoints)} endpoints...").pack(expand=True)
all_results = {}
def _run():
for ep in endpoints:
try:
all_results[ep["name"]] = run_endpoint_doctor(ep)
except Exception as e:
all_results[ep["name"]] = [("Doctor run", False, str(e)[:100])]
def _show():
wait.destroy()
dlg = tk.Toplevel(self._dlg)
dlg.title("Doctor All Results")
dlg.geometry("580x480")
dlg.transient(self._dlg)
canvas = tk.Canvas(dlg)
scrollbar = ttk.Scrollbar(dlg, orient="vertical", command=canvas.yview)
inner = tk.Frame(canvas)
inner.bind("<Configure>", lambda e: canvas.configure(scrollregion=canvas.bbox("all")))
canvas.create_window((0, 0), window=inner, anchor="nw")
canvas.configure(yscrollcommand=scrollbar.set)
for ep_name, checks in all_results.items():
passed = sum(1 for _, ok, _ in checks if ok is True)
failed = sum(1 for _, ok, _ in checks if ok is False)
color = "#e74c3c" if failed else "#27ae60"
status = f"{failed} failed" if failed else f"{passed} passed"
tk.Label(inner, text=f"{ep_name} {status}", fg=color,
font=("Segoe UI", 9, "bold")).pack(anchor="w", padx=12, pady=(8, 2))
for name, ok, detail in checks:
if ok is True:
sym, sc = "", "#27ae60"
elif ok is False:
sym, sc = "", "#e74c3c"
else:
sym, sc = "", "#f39c12"
row = tk.Frame(inner)
row.pack(anchor="w", padx=24, pady=0)
tk.Label(row, text=sym, fg=sc, font=("Segoe UI", 9, "bold")).pack(side="left")
txt = name
if detail:
txt += f" {detail}"
tk.Label(row, text=txt, fg="#7f8c8d", font=("Segoe UI", 8)).pack(side="left")
ttk.Separator(inner).pack(fill="x", padx=12, pady=4)
canvas.pack(side="left", fill="both", expand=True, padx=(12, 0))
scrollbar.pack(side="right", fill="y")
ttk.Button(dlg, text="Close", command=dlg.destroy).pack(pady=8)
self._dlg.after(0, _show)
threading.Thread(target=_run, daemon=True).start()
# ═══════════════════════════════════════════════════════════════════════
# BGP Pool Manager
# ═══════════════════════════════════════════════════════════════════════
class BGPRouteDialog:
def __init__(self, parent, endpoints, existing=None):
self.result = None
self._dlg = tk.Toplevel(parent)
self._dlg.title("BGP Route")
self._dlg.geometry("440x300")
self._dlg.transient(parent)
self._dlg.grab_set()
main = ttk.Frame(self._dlg, padding=12)
main.pack(fill="both", expand=True)
ttk.Label(main, text="Route Name:").grid(row=0, column=0, sticky="e", padx=(0, 6), pady=2)
self._entry_name = ttk.Entry(main)
self._entry_name.grid(row=0, column=1, sticky="ew", pady=2)
if existing:
self._entry_name.insert(0, existing.get("name", ""))
ttk.Label(main, text="Endpoint:").grid(row=1, column=0, sticky="e", padx=(0, 6), pady=2)
ep_names = [e["name"] for e in endpoints]
self._combo_ep = ttk.Combobox(main, values=ep_names, state="readonly")
self._combo_ep.grid(row=1, column=1, sticky="ew", pady=2)
if existing and existing.get("endpoint_name") in ep_names:
self._combo_ep.set(existing["endpoint_name"])
elif ep_names:
self._combo_ep.set(ep_names[0])
ttk.Label(main, text="URL:").grid(row=2, column=0, sticky="e", padx=(0, 6), pady=2)
self._entry_url = ttk.Entry(main)
self._entry_url.grid(row=2, column=1, sticky="ew", pady=2)
ttk.Label(main, text="API Key:").grid(row=3, column=0, sticky="e", padx=(0, 6), pady=2)
self._entry_key = ttk.Entry(main, show="*")
self._entry_key.grid(row=3, column=1, sticky="ew", pady=2)
ttk.Label(main, text="Model:").grid(row=4, column=0, sticky="e", padx=(0, 6), pady=2)
self._combo_model = ttk.Combobox(main, state="readonly")
self._combo_model.grid(row=4, column=1, sticky="ew", pady=2)
main.columnconfigure(1, weight=1)
self._endpoints = endpoints
self._combo_ep.bind("<<ComboboxSelected>>", lambda e: self._on_ep_changed())
self._on_ep_changed()
if existing:
self._entry_url.delete(0, "end")
self._entry_url.insert(0, existing.get("target_url", ""))
self._entry_key.delete(0, "end")
self._entry_key.insert(0, existing.get("api_key", ""))
if existing.get("model"):
self._combo_model.set(existing["model"])
btn_frame = ttk.Frame(main)
btn_frame.grid(row=5, column=0, columnspan=2, pady=(12, 0))
ttk.Button(btn_frame, text="Cancel", command=self._dlg.destroy).pack(side="right")
ttk.Button(btn_frame, text="OK", command=self._ok).pack(side="right", padx=(8, 0))
self._dlg.wait_window()
def _on_ep_changed(self):
ep_name = self._combo_ep.get()
ep = None
for e in self._endpoints:
if e["name"] == ep_name:
ep = e
break
if ep:
self._entry_url.delete(0, "end")
self._entry_url.insert(0, normalize_base_url(ep.get("base_url", "")))
self._entry_key.delete(0, "end")
self._entry_key.insert(0, ep.get("api_key", ""))
models = ep.get("models", [])
self._combo_model["values"] = models
if ep.get("default_model") and ep["default_model"] in models:
self._combo_model.set(ep["default_model"])
elif models:
self._combo_model.set(models[0])
def _ok(self):
ep_name = self._combo_ep.get()
ep = None
for e in self._endpoints:
if e["name"] == ep_name:
ep = e
break
self.result = {
"name": self._entry_name.get().strip() or ep_name,
"endpoint_name": ep_name,
"target_url": self._entry_url.get().strip(),
"api_key": self._entry_key.get().strip(),
"model": self._combo_model.get() or "",
"priority": 99,
}
if ep:
self.result["reasoning_enabled"] = ep.get("reasoning_enabled", True)
self.result["reasoning_effort"] = ep.get("reasoning_effort", "medium")
self.result["oauth_provider"] = ep.get("oauth_provider", "")
self._dlg.destroy()
class BGPPoolEditDialog:
def __init__(self, parent, existing_name=None):
self.result = False
self._existing_name = existing_name
self._parent_mgr = parent
self._dlg = tk.Toplevel(parent._dlg if hasattr(parent, "_dlg") else parent)
title = "Edit BGP Pool" if existing_name else "Create BGP Pool"
self._dlg.title(title)
self._dlg.geometry("620x500")
self._dlg.transient(parent._dlg if hasattr(parent, "_dlg") else parent)
self._dlg.grab_set()
data = load_bgp_pools()
pool = None
if existing_name:
for p in data.get("pools", []):
if p["name"] == existing_name:
pool = p
break
if not pool:
pool = {"name": "", "strategy": "failover", "routes": []}
main = ttk.Frame(self._dlg, padding=12)
main.pack(fill="both", expand=True)
grid = ttk.Frame(main)
grid.pack(fill="x")
ttk.Label(grid, text="Pool Name:").grid(row=0, column=0, sticky="e", padx=(0, 6), pady=2)
self._entry_name = ttk.Entry(grid)
self._entry_name.grid(row=0, column=1, sticky="ew", pady=2)
self._entry_name.insert(0, pool["name"])
ttk.Label(grid, text="Strategy:").grid(row=1, column=0, sticky="e", padx=(0, 6), pady=2)
self._combo_strategy = ttk.Combobox(grid, values=["failover", "race"], state="readonly")
self._combo_strategy.grid(row=1, column=1, sticky="ew", pady=2)
self._combo_strategy.set(pool.get("strategy", "failover"))
grid.columnconfigure(1, weight=1)
ttk.Label(main, text="Routes (double-click to remove):", font=("Segoe UI", 9, "bold")).pack(anchor="w", pady=(8, 2))
tree_frame = ttk.Frame(main)
tree_frame.pack(fill="both", expand=True)
cols = ("name", "endpoint", "url", "model", "priority")
self._route_tree = ttk.Treeview(tree_frame, columns=cols, show="headings", height=8)
for col, heading, w in [("name", "Route Name", 100), ("endpoint", "Endpoint", 120),
("url", "URL", 160), ("model", "Model", 120), ("priority", "Priority", 60)]:
self._route_tree.heading(col, text=heading)
self._route_tree.column(col, width=w, minwidth=50)
rsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self._route_tree.yview)
self._route_tree.configure(yscrollcommand=rsb.set)
self._route_tree.pack(side="left", fill="both", expand=True)
rsb.pack(side="right", fill="y")
self._routes = []
for r in pool.get("routes", []):
self._routes.append(dict(r))
self._route_tree.insert("", "end", values=(
r.get("name", ""), r.get("endpoint_name", ""),
r.get("target_url", ""), r.get("model", ""), r.get("priority", 99)))
btn_frame = ttk.Frame(main)
btn_frame.pack(fill="x", pady=(6, 0))
ttk.Button(btn_frame, text="Add Route", command=self._add_route).pack(side="left", padx=(0, 4))
ttk.Button(btn_frame, text="Edit Route", command=self._edit_route).pack(side="left", padx=(0, 4))
ttk.Button(btn_frame, text="Remove Route", command=self._remove_route).pack(side="left", padx=(0, 4))
ttk.Button(btn_frame, text="Up", command=lambda: self._move_route(-1)).pack(side="left", padx=(0, 4))
ttk.Button(btn_frame, text="Down", command=lambda: self._move_route(1)).pack(side="left", padx=(0, 4))
save_frame = ttk.Frame(main)
save_frame.pack(fill="x", pady=(8, 0))
ttk.Button(save_frame, text="Cancel", command=self._dlg.destroy).pack(side="right")
ttk.Button(save_frame, text="Save", command=self._save).pack(side="right", padx=(8, 0))
def _add_route(self):
endpoints = load_endpoints().get("endpoints", [])
if not endpoints:
messagebox.showinfo("Info", "No endpoints configured. Add endpoints first.", parent=self._dlg)
return
d = BGPRouteDialog(self._dlg, endpoints, None)
if d.result:
r = d.result
self._routes.append(r)
self._route_tree.insert("", "end", values=(
r.get("name", ""), r.get("endpoint_name", ""),
r.get("target_url", ""), r.get("model", ""), r.get("priority", 99)))
def _edit_route(self):
sel = self._route_tree.selection()
if not sel:
return
idx = self._route_tree.index(sel[0])
endpoints = load_endpoints().get("endpoints", [])
d = BGPRouteDialog(self._dlg, endpoints, self._routes[idx])
if d.result:
r = d.result
self._routes[idx] = r
self._route_tree.item(sel[0], values=(
r.get("name", ""), r.get("endpoint_name", ""),
r.get("target_url", ""), r.get("model", ""), r.get("priority", 99)))
def _remove_route(self):
sel = self._route_tree.selection()
if not sel:
return
idx = self._route_tree.index(sel[0])
self._route_tree.delete(sel[0])
del self._routes[idx]
def _move_route(self, direction):
sel = self._route_tree.selection()
if not sel:
return
idx = self._route_tree.index(sel[0])
new_idx = idx + direction
if new_idx < 0 or new_idx >= len(self._routes):
return
route = self._routes.pop(idx)
self._routes.insert(new_idx, route)
self._rebuild_routes_tree(new_idx)
def _rebuild_routes_tree(self, select_idx=None):
for item in self._route_tree.get_children():
self._route_tree.delete(item)
for r in self._routes:
self._route_tree.insert("", "end", values=(
r.get("name", ""), r.get("endpoint_name", ""),
r.get("target_url", ""), r.get("model", ""), r.get("priority", 99)))
if select_idx is not None:
children = self._route_tree.get_children()
if select_idx < len(children):
self._route_tree.selection_set(children[select_idx])
def _save(self):
name = self._entry_name.get().strip()
if not name:
return
strategy = self._combo_strategy.get() or "failover"
routes = []
for i, r in enumerate(self._routes):
if not r.get("target_url"):
continue
routes.append({
"name": r.get("name") or f"Route {i+1}",
"endpoint_name": r.get("endpoint_name", ""),
"target_url": r.get("target_url", ""),
"api_key": r.get("api_key", ""),
"model": r.get("model", ""),
"priority": i + 1,
"reasoning_enabled": True,
"reasoning_effort": "medium",
})
data = load_bgp_pools()
if self._existing_name:
data["pools"] = [p for p in data["pools"] if p["name"] != self._existing_name]
data["pools"].append({"name": name, "strategy": strategy, "routes": routes})
save_bgp_pools(data)
self.result = True
self._dlg.destroy()
class BGPPoolMgr:
def __init__(self, parent, on_update=None):
self._parent = parent
self._on_update = on_update
self._dlg = tk.Toplevel(parent)
self._dlg.title("AI BGP -- Pool Manager")
self._dlg.geometry("660x440")
self._dlg.transient(parent)
main = ttk.Frame(self._dlg, padding=12)
main.pack(fill="both", expand=True)
ttk.Label(main, text="AI BGP Pools -- multi-provider routing with automatic failover",
font=("Segoe UI", 10, "bold")).pack(anchor="w")
tree_frame = ttk.Frame(main)
tree_frame.pack(fill="both", expand=True, pady=(8, 0))
cols = ("name", "routes", "strategy")
self._tree = ttk.Treeview(tree_frame, columns=cols, show="headings", height=10)
for col, heading, w in [("name", "Pool Name", 180), ("routes", "Routes", 280), ("strategy", "Strategy", 100)]:
self._tree.heading(col, text=heading)
self._tree.column(col, width=w, minwidth=60)
sb = ttk.Scrollbar(tree_frame, orient="vertical", command=self._tree.yview)
self._tree.configure(yscrollcommand=sb.set)
self._tree.pack(side="left", fill="both", expand=True)
sb.pack(side="right", fill="y")
btn_frame = ttk.Frame(main)
btn_frame.pack(fill="x", pady=(8, 0))
ttk.Button(btn_frame, text="Create Pool", command=self._add_pool).pack(side="left", padx=(0, 4))
ttk.Button(btn_frame, text="Edit Pool", command=self._edit_pool).pack(side="left", padx=(0, 4))
ttk.Button(btn_frame, text="Delete Pool", command=self._del_pool).pack(side="left", padx=(0, 4))
ttk.Button(btn_frame, text="Close", command=self._dlg.destroy).pack(side="right")
self._rebuild()
def _rebuild(self):
for item in self._tree.get_children():
self._tree.delete(item)
for pool in load_bgp_pools().get("pools", []):
routes_str = " -> ".join(f'{r.get("name","?")}/{r.get("model","?")}' for r in pool.get("routes", []))
self._tree.insert("", "end", values=(pool["name"], routes_str, pool.get("strategy", "failover")))
def _selected_name(self):
sel = self._tree.selection()
if not sel:
return None
return self._tree.item(sel[0])["values"][0]
def _add_pool(self):
d = BGPPoolEditDialog(self, None)
self._dlg.wait_window(d._dlg)
if d.result:
self._rebuild()
if self._on_update:
self._on_update()
def _edit_pool(self):
name = self._selected_name()
if not name:
return
d = BGPPoolEditDialog(self, name)
self._dlg.wait_window(d._dlg)
if d.result:
self._rebuild()
if self._on_update:
self._on_update()
def _del_pool(self):
name = self._selected_name()
if not name:
return
if not messagebox.askyesno("Delete", f'Delete BGP pool "{name}"?', parent=self._dlg):
return
data = load_bgp_pools()
data["pools"] = [p for p in data["pools"] if p["name"] != name]
save_bgp_pools(data)
self._rebuild()
if self._on_update:
self._on_update()
# ═══════════════════════════════════════════════════════════════════════
# AI Monitoring Window
# ═══════════════════════════════════════════════════════════════════════
class AIMonitoringWindow:
def __init__(self, parent):
self._dlg = tk.Toplevel(parent)
self._dlg.title("AI Monitoring")
self._dlg.geometry("580x520")
self._dlg.transient(parent)
self._cfg = load_monitoring_config()
self._store = load_incident_store()
main = ttk.Frame(self._dlg, padding=12)
main.pack(fill="both", expand=True)
hdr = ttk.Frame(main)
hdr.pack(fill="x")
ttk.Label(hdr, text="AI Monitoring", font=("Segoe UI", 11, "bold")).pack(side="left")
self._toggle_var = tk.BooleanVar(value=self._cfg.get("enabled", False))
ttk.Checkbutton(hdr, text="Enabled", variable=self._toggle_var,
command=self._on_toggle).pack(side="right")
frame = ttk.LabelFrame(main, text="Diagnostic Agent", padding=8)
frame.pack(fill="x", pady=(8, 0))
grid = ttk.Frame(frame)
grid.pack(fill="x")
grid.columnconfigure(1, weight=1)
ttk.Label(grid, text="Provider URL:").grid(row=0, column=0, sticky="e", padx=(0, 6), pady=2)
self._url_entry = ttk.Entry(grid)
self._url_entry.grid(row=0, column=1, sticky="ew", pady=2)
self._url_entry.insert(0, self._cfg.get("provider_url", ""))
ttk.Label(grid, text="Model:").grid(row=1, column=0, sticky="e", padx=(0, 6), pady=2)
self._model_entry = ttk.Entry(grid)
self._model_entry.grid(row=1, column=1, sticky="ew", pady=2)
self._model_entry.insert(0, self._cfg.get("model", ""))
ttk.Label(grid, text="API Key:").grid(row=2, column=0, sticky="e", padx=(0, 6), pady=2)
key_frame = ttk.Frame(grid)
key_frame.grid(row=2, column=1, sticky="ew", pady=2)
self._key_entry = ttk.Entry(key_frame, show="*")
self._key_entry.pack(side="left", fill="x", expand=True)
self._key_entry.insert(0, self._cfg.get("api_key", ""))
self._reveal_key = tk.BooleanVar(value=False)
ttk.Checkbutton(key_frame, text="Show", variable=self._reveal_key,
command=lambda: self._key_entry.configure(show="" if self._reveal_key.get() else "*")).pack(side="left", padx=(4, 0))
ttk.Label(grid, text="Health Check:").grid(row=3, column=0, sticky="e", padx=(0, 6), pady=2)
spin_frame = ttk.Frame(grid)
spin_frame.grid(row=3, column=1, sticky="w", pady=2)
self._interval_spin = ttk.Spinbox(spin_frame, from_=2, to=30, width=5)
self._interval_spin.set(self._cfg.get("health_check_interval_s", 5))
self._interval_spin.pack(side="left")
ttk.Label(spin_frame, text="seconds").pack(side="left", padx=(4, 0))
opts_frame = ttk.Frame(frame)
opts_frame.pack(fill="x", pady=(4, 0))
self._auto_restart_var = tk.BooleanVar(value=self._cfg.get("auto_restart_proxy", True))
ttk.Checkbutton(opts_frame, text="Auto-restart proxy on crash",
variable=self._auto_restart_var).pack(side="left")
self._auto_switch_var = tk.BooleanVar(value=self._cfg.get("auto_switch_provider", False))
ttk.Checkbutton(opts_frame, text="Auto-switch provider on repeated failure",
variable=self._auto_switch_var).pack(side="left", padx=(12, 0))
ttk.Button(frame, text="Save Configuration", command=self._on_save).pack(pady=(8, 0))
stats = self._store.get("stats", {"ai_calls": 0, "tokens_used": 0})
stats_text = (f"AI diagnostic calls: {stats.get('ai_calls', 0)} | "
f"Tokens used: {stats.get('tokens_used', 0):,} | "
f"Known patterns: {len(self._store.get('incidents', {}))}")
ttk.Label(main, text=stats_text, font=("Segoe UI", 8)).pack(anchor="w", pady=(8, 0))
inc_frame = ttk.LabelFrame(main, text="Recent Incidents", padding=4)
inc_frame.pack(fill="both", expand=True, pady=(4, 0))
self._inc_text = tk.Text(inc_frame, height=8, wrap="word", state="disabled")
inc_sb = ttk.Scrollbar(inc_frame, orient="vertical", command=self._inc_text.yview)
self._inc_text.configure(yscrollcommand=inc_sb.set)
self._inc_text.pack(side="left", fill="both", expand=True)
inc_sb.pack(side="right", fill="y")
self._refresh_incidents()
btn_frame = ttk.Frame(main)
btn_frame.pack(fill="x", pady=(8, 0))
ttk.Button(btn_frame, text="View Monitoring Log",
command=lambda: open_file(str(PROXY_CONFIG_DIR / "monitoring.log"))).pack(side="left")
ttk.Button(btn_frame, text="Clear Incident Store", command=self._on_clear_store).pack(side="left", padx=(8, 0))
ttk.Button(btn_frame, text="Close", command=self._dlg.destroy).pack(side="right")
def _on_toggle(self):
self._cfg["enabled"] = self._toggle_var.get()
save_monitoring_config(self._cfg)
def _on_save(self):
self._cfg["provider_url"] = self._url_entry.get().strip()
self._cfg["model"] = self._model_entry.get().strip()
self._cfg["api_key"] = self._key_entry.get().strip()
try:
self._cfg["health_check_interval_s"] = int(self._interval_spin.get())
except ValueError:
pass
self._cfg["auto_restart_proxy"] = self._auto_restart_var.get()
self._cfg["auto_switch_provider"] = self._auto_switch_var.get()
save_monitoring_config(self._cfg)
self._inc_text.configure(state="normal")
self._inc_text.delete("1.0", "end")
self._inc_text.insert("end", "Configuration saved.\n")
self._inc_text.configure(state="disabled")
def _on_clear_store(self):
save_incident_store({"version": 1, "incidents": {}, "stats": {"ai_calls": 0, "tokens_used": 0}})
self._store = {"version": 1, "incidents": {}, "stats": {"ai_calls": 0, "tokens_used": 0}}
self._refresh_incidents()
def _refresh_incidents(self):
lines = []
for pattern, inc in sorted(self._store.get("incidents", {}).items(),
key=lambda x: x[1].get("last_seen", ""), reverse=True):
sc = inc.get("success_count", 0)
fc = inc.get("fail_count", 0)
rate = sc / max(sc + fc, 1)
lines.append(
f"[{inc.get('last_seen', '?')[:16]}] {pattern}\n"
f" fix={inc.get('fix', '?')} success_rate={rate:.0%} seen={inc.get('occurrences', 0)}x\n"
)
if not lines:
lines.append("No incidents recorded yet.\n\nEnable AI Monitoring and use Codex to populate the store.\n")
self._inc_text.configure(state="normal")
self._inc_text.delete("1.0", "end")
self._inc_text.insert("end", "\n".join(lines))
self._inc_text.configure(state="disabled")
# ═══════════════════════════════════════════════════════════════════════
# Usage Dashboard
# ═══════════════════════════════════════════════════════════════════════
class UsageWindow:
def __init__(self, parent):
self._U = _usage_theme()
self._dlg = tk.Toplevel(parent)
self._dlg.title("Usage Dashboard")
self._dlg.geometry("720x640")
self._dlg.transient(parent)
self._dlg.configure(bg=self._U["base"])
self._build_header()
self._build_summary_strip()
ttk.Separator(self._dlg).pack(fill="x", padx=16)
self._cards_frame = tk.Frame(self._dlg, bg=self._U["base"])
canvas = tk.Canvas(self._cards_frame, bg=self._U["base"], highlightthickness=0)
scrollbar = ttk.Scrollbar(self._cards_frame, orient="vertical", command=canvas.yview)
self._cards_inner = tk.Frame(canvas, bg=self._U["base"])
self._cards_inner.bind("<Configure>", lambda e: canvas.configure(scrollregion=canvas.bbox("all")))
canvas.create_window((0, 0), window=self._cards_inner, anchor="nw")
canvas.configure(yscrollcommand=scrollbar.set)
canvas.pack(side="left", fill="both", expand=True, padx=(16, 0))
scrollbar.pack(side="right", fill="y")
self._cards_frame.pack(fill="both", expand=True, pady=(8, 0))
self._refresh()
def _build_header(self):
U = self._U
hdr = tk.Frame(self._dlg, bg=U["base"])
hdr.pack(fill="x", padx=16, pady=(12, 6))
tk.Label(hdr, text="", fg=U["accent"], bg=U["base"], font=("Segoe UI", 14)).pack(side="left")
tk.Label(hdr, text="Usage Dashboard", fg=U["text"], bg=U["base"],
font=("Segoe UI", 14, "bold")).pack(side="left", padx=(4, 0))
self._status_dots = tk.Label(hdr, text="", fg=U["text"], bg=U["base"], font=("Segoe UI", 9))
self._status_dots.pack(side="left", padx=(8, 0))
self._updated_lbl = tk.Label(hdr, text="Never", fg=U["dim"], bg=U["base"], font=("Segoe UI", 8))
self._updated_lbl.pack(side="right")
refresh_btn = tk.Button(hdr, text="Refresh", fg=U["text"], bg=U["surface0"],
activebackground=U["surface1"], relief="flat", bd=0,
command=self._refresh, padx=12, pady=2)
refresh_btn.pack(side="right", padx=(8, 0))
def _build_summary_strip(self):
U = self._U
strip = tk.Frame(self._dlg, bg=U["surface0"], padx=12, pady=8)
strip.pack(fill="x", padx=16, pady=(0, 6))
self._kpi_labels = {}
for key, label, icon in [("providers", "Providers", "\U0001F4CA"),
("requests", "Requests", ""),
("tokens", "Tokens", "\U0001F9E0"),
("latency", "Avg Latency", "")]:
box = tk.Frame(strip, bg=U["surface0"])
box.pack(side="left", padx=(0, 20))
tk.Label(box, text=f"{icon} {label}", fg=U["dim"], bg=U["surface0"],
font=("Segoe UI", 8), anchor="w").pack(anchor="w")
val = tk.Label(box, text="-", fg=U["text"], bg=U["surface0"],
font=("Segoe UI", 9, "bold"), anchor="w")
val.pack(anchor="w")
self._kpi_labels[key] = val
def _refresh(self):
for w in self._cards_inner.winfo_children():
w.destroy()
stats = load_usage_stats()
updated = stats.get("updated")
if updated:
self._updated_lbl.configure(text=updated)
providers = stats.get("providers", {})
if not providers:
tk.Label(self._cards_inner, text="No usage data yet.\nLaunch a session to start tracking.",
fg=self._U["dim"], bg=self._U["base"], font=("Segoe UI", 11)).pack(pady=60)
return
total_req = total_tok_in = total_tok_out = 0
total_dur = 0.0
n_ok = n_warn = n_err = 0
sorted_providers = sorted(providers.items(), key=lambda x: x[1].get("total_requests", 0), reverse=True)
for prov_name, prov_data in sorted_providers:
t = prov_data.get("total_requests", 0)
total_req += t
total_tok_in += prov_data.get("total_tokens_in", 0)
total_tok_out += prov_data.get("total_tokens_out", 0)
total_dur += prov_data.get("total_duration_s", 0.0)
fail = prov_data.get("failures", 0)
fail_pct = fail / t if t > 0 else 0
if fail_pct > 0.15:
n_err += 1
elif fail_pct > 0.05:
n_warn += 1
else:
n_ok += 1
self._kpi_labels["providers"].configure(text=str(len(providers)))
self._kpi_labels["requests"].configure(text=f"{total_req:,}")
tok_sum = total_tok_in + total_tok_out
tok_str = f"{_fmt_tok(tok_sum)} in:{_fmt_tok(total_tok_in)} out:{_fmt_tok(total_tok_out)}" if tok_sum else "N/A"
self._kpi_labels["tokens"].configure(text=tok_str)
avg_lat = total_dur / total_req if total_req > 0 else 0
self._kpi_labels["latency"].configure(text=_fmt_dur(avg_lat))
dots = ""
if n_ok:
dots += f"{n_ok} "
if n_warn:
dots += f"{n_warn} "
if n_err:
dots += f"{n_err}"
self._status_dots.configure(text=dots)
for prov_name, prov_data in sorted_providers:
self._build_card(prov_name, prov_data)
def _build_card(self, name, data):
U = self._U
total = data.get("total_requests", 0)
ok = data.get("successes", 0)
fail = data.get("failures", 0)
success_rate = ok / total if total > 0 else 1.0
fail_pct = fail / total if total > 0 else 0
status_text, status_color = _status_pill(success_rate, fail_pct)
card = tk.Frame(self._cards_inner, bg=U["surface0"], padx=14, pady=10,
highlightbackground=status_color, highlightthickness=1)
card.pack(fill="x", pady=(0, 6))
top = tk.Frame(card, bg=U["surface0"])
top.pack(fill="x")
tk.Label(top, text="", fg=status_color, bg=U["surface0"], font=("Segoe UI", 10)).pack(side="left")
short = name.replace("https://", "").replace("http://", "").split("/")[0]
tk.Label(top, text=short, fg=U["text"], bg=U["surface0"],
font=("Segoe UI", 10, "bold")).pack(side="left", padx=(4, 0))
tk.Label(top, text=f" {status_text} ", fg=U["base"], bg=status_color,
font=("Segoe UI", 8, "bold")).pack(side="left", padx=(4, 0))
tk.Label(top, text=f"{total} req", fg=U["subtext"], bg=U["surface0"],
font=("Segoe UI", 8)).pack(side="left", padx=(6, 0))
last_used = data.get("last_used", "")
if last_used:
tk.Label(top, text=last_used, fg=U["dim"], bg=U["surface0"],
font=("Segoe UI", 7)).pack(side="right")
gauge = tk.Frame(card, bg=U["surface0"])
gauge.pack(fill="x", pady=(4, 0))
bar_frame = tk.Frame(gauge, bg=U["surface1"], height=12)
bar_frame.pack(fill="x", side="left", expand=True)
bar_frame.pack_propagate(False)
fill_pct = int(success_rate * 100)
fill_frame = tk.Frame(bar_frame, bg=status_color, height=12)
fill_frame.place(relwidth=success_rate, relheight=1.0)
tk.Label(gauge, text=f"{fill_pct}%", fg=U["subtext"], bg=U["surface0"],
font=("Segoe UI", 8)).pack(side="left", padx=(4, 0))
if fail > 0:
tk.Label(gauge, text=f"{fail} fail", fg=U["red"], bg=U["surface0"],
font=("Segoe UI", 8)).pack(side="right")
metrics = tk.Frame(card, bg=U["surface0"])
metrics.pack(fill="x", pady=(4, 0))
t_in = data.get("total_tokens_in", 0)
t_out = data.get("total_tokens_out", 0)
dur = data.get("total_duration_s", 0.0)
avg_dur = dur / total if total > 0 else 0
for label, value, color in [("Tokens In", _fmt_tok(t_in), U["sapphire"]),
("Tokens Out", _fmt_tok(t_out), U["peach"]),
("Avg Latency", _fmt_dur(avg_dur), U["sky"]),
("Duration", _fmt_dur(dur), U["lavender"])]:
box = tk.Frame(metrics, bg=U["surface0"])
box.pack(side="left", padx=(0, 16))
tk.Label(box, text=label, fg=U["dim"], bg=U["surface0"], font=("Segoe UI", 7)).pack(anchor="w")
tk.Label(box, text=value, fg=color, bg=U["surface0"],
font=("Segoe UI", 9, "bold")).pack(anchor="w")
models = data.get("models", {})
if models:
models_frame = tk.Frame(card, bg=U["surface0"])
models_frame.pack(fill="x", pady=(4, 0))
tk.Label(models_frame, text="Models:", fg=U["lavender"], bg=U["surface0"],
font=("Segoe UI", 8, "bold")).pack(anchor="w")
sorted_models = sorted(models.items(), key=lambda x: x[1].get("requests", 0), reverse=True)
for i, (mname, mdata) in enumerate(sorted_models[:6]):
m_req = mdata.get("requests", 0)
pct = m_req / total * 100 if total > 0 else 0
color = U["model_palette"][i % len(U["model_palette"])]
row = tk.Frame(models_frame, bg=U["surface0"])
row.pack(fill="x")
tk.Label(row, text=f"{mname}", fg=color, bg=U["surface0"],
font=("Segoe UI", 7)).pack(side="left")
tk.Label(row, text=f"{pct:.0f}% ({m_req})", fg=U["dim"], bg=U["surface0"],
font=("Segoe UI", 7)).pack(side="left", padx=(8, 0))
last_err = data.get("last_error")
if last_err:
err_frame = tk.Frame(card, bg=U["surface0"])
err_frame.pack(fill="x", pady=(4, 0))
tk.Label(err_frame, text=f"{last_err}", fg=U["red"], bg=U["surface0"],
font=("Segoe UI", 7)).pack(anchor="w")
# ═══════════════════════════════════════════════════════════════════════
# Request History Window
# ═══════════════════════════════════════════════════════════════════════
class RequestHistoryWindow:
def __init__(self, parent):
self._snap_dir = PROXY_CONFIG_DIR / "requests"
self._dlg = tk.Toplevel(parent)
self._dlg.title("Request History")
self._dlg.geometry("720x500")
self._dlg.transient(parent)
main = ttk.Frame(self._dlg, padding=10)
main.pack(fill="both", expand=True)
hdr = ttk.Frame(main)
hdr.pack(fill="x")
ttk.Label(hdr, text="Request History", font=("Segoe UI", 11, "bold")).pack(side="left")
ttk.Button(hdr, text="Clear All", command=self._clear_all).pack(side="right")
ttk.Button(hdr, text="Refresh", command=self._load).pack(side="right", padx=(0, 4))
paned = ttk.PanedWindow(main, orient="vertical")
paned.pack(fill="both", expand=True, pady=(6, 0))
top_frame = ttk.Frame(paned)
cols = ("time", "model", "status", "duration", "id", "error")
self._tree = ttk.Treeview(top_frame, columns=cols, show="headings", height=10)
for col, heading, w in [("time", "Time", 140), ("model", "Model", 140), ("status", "Status", 80),
("duration", "Duration", 70), ("id", "ID", 180), ("error", "Error", 120)]:
self._tree.heading(col, text=heading)
self._tree.column(col, width=w, minwidth=50)
tree_sb = ttk.Scrollbar(top_frame, orient="vertical", command=self._tree.yview)
self._tree.configure(yscrollcommand=tree_sb.set)
self._tree.pack(side="left", fill="both", expand=True)
tree_sb.pack(side="right", fill="y")
paned.add(top_frame, weight=1)
bottom_frame = ttk.Frame(paned)
self._detail = tk.Text(bottom_frame, height=10, wrap="word", font=("Consolas", 9))
detail_sb = ttk.Scrollbar(bottom_frame, orient="vertical", command=self._detail.yview)
self._detail.configure(yscrollcommand=detail_sb.set)
self._detail.pack(side="left", fill="both", expand=True)
detail_sb.pack(side="right", fill="y")
paned.add(bottom_frame, weight=1)
self._tree.bind("<<TreeviewSelect>>", self._on_select)
self._snapshots = []
self._load()
def _load(self):
for item in self._tree.get_children():
self._tree.delete(item)
self._snapshots = []
if not self._snap_dir.exists():
return
files = sorted(self._snap_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True)
for f in files[:200]:
try:
data = json.loads(f.read_text())
meta = data.get("_meta", {})
self._snapshots.append(data)
ts = meta.get("ts_iso", "")[:19].replace("T", " ")
model = meta.get("model", "?")
status = meta.get("status", "unknown")
dur = f"{meta['duration_s']:.1f}s" if meta.get("duration_s") is not None else "-"
rid = meta.get("request_id", "")[:28]
err = (meta.get("error") or "")[:60]
self._tree.insert("", "end", values=(ts, model, status, dur, rid, err))
except Exception:
pass
def _on_select(self, event):
sel = self._tree.selection()
if not sel:
return
idx = self._tree.index(sel[0])
if idx < len(self._snapshots):
data = self._snapshots[idx]
self._detail.delete("1.0", "end")
self._detail.insert("end", json.dumps(data, indent=2, ensure_ascii=False)[:50000])
def _clear_all(self):
if not messagebox.askyesno("Clear All", "Delete all request snapshots?", parent=self._dlg):
return
if self._snap_dir.exists():
for f in self._snap_dir.glob("*.json"):
try:
f.unlink()
except Exception:
pass
for item in self._tree.get_children():
self._tree.delete(item)
self._snapshots = []
self._detail.delete("1.0", "end")
# ═══════════════════════════════════════════════════════════════════════
# Benchmark Window
# ═══════════════════════════════════════════════════════════════════════
class BenchmarkWindow:
_BENCH_PROMPT = "In exactly 3 bullet points, explain why the sky is blue."
_BENCH_TOOLS = [{"type": "function", "function": {"name": "get_weather",
"parameters": {"type": "object", "properties": {"city": {"type": "string"}}}}}]
def __init__(self, parent):
self._dlg = tk.Toplevel(parent)
self._dlg.title("Model Benchmark")
self._dlg.geometry("820x560")
self._dlg.transient(parent)
self._running = False
self._ep_data = load_endpoints()
main = ttk.Frame(self._dlg, padding=10)
main.pack(fill="both", expand=True)
hdr = ttk.Frame(main)
hdr.pack(fill="x")
ttk.Label(hdr, text="Multi-Provider Benchmark", font=("Segoe UI", 11, "bold")).pack(side="left")
self._run_btn = ttk.Button(hdr, text="Run Benchmark", command=self._run)
self._run_btn.pack(side="right")
lanes_frame = ttk.Frame(main)
lanes_frame.pack(fill="x", pady=(8, 0))
self._lanes = []
self._c_var = tk.BooleanVar(value=False)
for i, lane_label in enumerate(["A", "B", "C"]):
if i == 2:
lf = ttk.LabelFrame(lanes_frame, text="Lane C (optional)")
cb = ttk.Checkbutton(lanes_frame, text="Enable Lane C", variable=self._c_var,
command=lambda: lf.configure() if not self._c_var.get() else None)
else:
lf = ttk.LabelFrame(lanes_frame, text=f"Lane {lane_label}")
lf.pack(side="left", fill="both", expand=True, padx=(0, 4 if i < 2 else 0))
ep_frame = ttk.Frame(lf, padding=4)
ep_frame.pack(fill="x")
ttk.Label(ep_frame, text="Endpoint:").pack(side="left")
ep_combo = ttk.Combobox(ep_frame, values=[e["name"] for e in self._ep_data.get("endpoints", [])], state="readonly")
ep_combo.pack(side="left", fill="x", expand=True, padx=(4, 0))
m_frame = ttk.Frame(lf, padding=4)
m_frame.pack(fill="x")
ttk.Label(m_frame, text="Model:").pack(side="left")
m_combo = ttk.Combobox(m_frame, state="readonly")
m_combo.pack(side="left", fill="x", expand=True, padx=(4, 0))
ep_combo.bind("<<ComboboxSelected>>", lambda e, mc=m_combo: self._update_lane_models(ep_combo, mc))
self._lanes.append({"ep": ep_combo, "model": m_combo})
default_name = self._ep_data.get("default")
eps = self._ep_data.get("endpoints", [])
if default_name:
self._lanes[0]["ep"].set(default_name)
if len(eps) > 1:
self._lanes[1]["ep"].set(eps[1]["name"])
elif eps:
self._lanes[1]["ep"].set(eps[0]["name"])
if len(eps) > 2:
self._lanes[2]["ep"].set(eps[2]["name"])
elif len(eps) > 1:
self._lanes[2]["ep"].set(eps[1]["name"])
tests_frame = ttk.Frame(main)
tests_frame.pack(fill="x", pady=(8, 0))
self._test_ttft = tk.BooleanVar(value=True)
self._test_total = tk.BooleanVar(value=True)
self._test_tools = tk.BooleanVar(value=True)
self._test_tps = tk.BooleanVar(value=True)
ttk.Checkbutton(tests_frame, text="Time to First Token", variable=self._test_ttft).pack(side="left")
ttk.Checkbutton(tests_frame, text="Total Latency", variable=self._test_total).pack(side="left", padx=(8, 0))
ttk.Checkbutton(tests_frame, text="Tool Call", variable=self._test_tools).pack(side="left", padx=(8, 0))
ttk.Checkbutton(tests_frame, text="Tokens/sec", variable=self._test_tps).pack(side="left", padx=(8, 0))
results_frame = ttk.Frame(main)
results_frame.pack(fill="both", expand=True, pady=(8, 0))
cols = ("test", "a", "b", "c", "winner")
self._results_tree = ttk.Treeview(results_frame, columns=cols, show="headings", height=6)
for col, heading in [("test", "Test"), ("a", "Lane A"), ("b", "Lane B"), ("c", "Lane C"), ("winner", "Winner")]:
self._results_tree.heading(col, text=heading)
self._results_tree.column(col, width=150, minwidth=80)
rsb = ttk.Scrollbar(results_frame, orient="vertical", command=self._results_tree.yview)
self._results_tree.configure(yscrollcommand=rsb.set)
self._results_tree.pack(side="left", fill="both", expand=True)
rsb.pack(side="right", fill="y")
self._status_var = tk.StringVar(value="Select endpoints and models per lane, then Run Benchmark.")
ttk.Label(main, textvariable=self._status_var).pack(anchor="w", pady=(4, 0))
def _update_lane_models(self, ep_combo, model_combo):
name = ep_combo.get()
if not name:
return
ep = get_endpoint(name)
models = (ep or {}).get("models", [])
model_combo["values"] = models
if models:
model_combo.set(models[0])
def _collect_lanes(self):
active = []
for i, lane in enumerate(self._lanes):
if i == 2 and not self._c_var.get():
continue
ep_name = lane["ep"].get()
model = lane["model"].get()
if not ep_name or not model:
continue
ep = get_endpoint(ep_name)
if not ep:
continue
active.append({"ep": ep, "model": model, "label": f"{ep_name}/{model}"})
return active
def _bench_single(self, ep, model, stream, with_tools=False):
url = normalize_base_url(ep.get("base_url", ""))
key = (ep.get("api_key") or "").strip()
bt = ep.get("backend_type", "openai-compat")
if bt == "anthropic":
test_url = f"{url}/v1/messages"
headers = {"User-Agent": UA, "x-api-key": key, "anthropic-version": "2023-06-01", "content-type": "application/json"}
body = {"model": model, "max_tokens": 100, "stream": stream,
"messages": [{"role": "user", "content": self._BENCH_PROMPT}]}
if with_tools:
body["tools"] = self._BENCH_TOOLS
body["messages"] = [{"role": "user", "content": "Use get_weather for Paris"}]
data = json.dumps(body).encode()
else:
test_url = f"{url}/chat/completions"
headers = {"User-Agent": UA, "Authorization": f"Bearer {key}", "content-type": "application/json"}
body = {"model": model, "max_tokens": 100, "stream": stream,
"messages": [{"role": "user", "content": self._BENCH_PROMPT}]}
if with_tools:
body["tools"] = self._BENCH_TOOLS
body["messages"] = [{"role": "user", "content": "Use get_weather for Paris"}]
data = json.dumps(body).encode()
req = urllib.request.Request(test_url, data=data, headers=headers, method="POST")
t0 = time.time()
ttft = None
try:
resp = urllib.request.urlopen(req, timeout=60)
if stream:
first_chunk_time = None
chunks = []
while True:
chunk = resp.read(4096)
if not chunk:
break
if first_chunk_time is None:
first_chunk_time = time.time()
ttft = first_chunk_time - t0
chunks.append(chunk)
total = time.time() - t0
result_text = b"".join(chunks).decode(errors="replace")[:300]
else:
raw = resp.read()
total = time.time() - t0
result_text = raw.decode(errors="replace")[:300]
payload = json.loads(raw)
choices = payload.get("choices", [])
if choices:
msg = choices[0].get("message", {})
if with_tools:
tcs = msg.get("tool_calls", [])
has_tools = len(tcs) > 0
return {"ttft": ttft or total, "total": total,
"detail": f"tools={has_tools}, tok={payload.get('usage', {}).get('total_tokens', '?')}"}
content = msg.get("content", "")[:50]
return {"ttft": ttft or total, "total": total,
"detail": f"{content[:40]}... tok={payload.get('usage', {}).get('total_tokens', '?')}"}
return {"ttft": ttft or total, "total": total, "detail": result_text[:60]}
except Exception as e:
total = time.time() - t0
return {"ttft": ttft or total, "total": total, "detail": f"Error: {str(e)[:40]}"}
def _bench_tps(self, ep, model):
url = normalize_base_url(ep.get("base_url", ""))
key = (ep.get("api_key") or "").strip()
bt = ep.get("backend_type", "openai-compat")
prompt = "Write a detailed paragraph about artificial intelligence in at least 150 words."
max_tok = 512
if bt == "anthropic":
test_url = f"{url}/v1/messages"
headers = {"User-Agent": UA, "x-api-key": key, "anthropic-version": "2023-06-01", "content-type": "application/json"}
else:
test_url = f"{url}/chat/completions"
headers = {"User-Agent": UA, "Authorization": f"Bearer {key}", "content-type": "application/json"}
body = json.dumps({"model": model, "max_tokens": max_tok, "stream": True,
"messages": [{"role": "user", "content": prompt}]}).encode()
req = urllib.request.Request(test_url, data=body, headers=headers, method="POST")
t0 = time.time()
first_token_t = None
token_count = 0
try:
resp = urllib.request.urlopen(req, timeout=90)
buf = b""
while True:
chunk = resp.read(4096)
if not chunk:
break
if first_token_t is None:
first_token_t = time.time()
buf += chunk
total = time.time() - t0
text = buf.decode(errors="replace")
for line in text.split("\n"):
if line.startswith("data: ") and line != "data: [DONE]":
try:
d = json.loads(line[6:])
content = d.get("choices", [{}])[0].get("delta", {}).get("content", "")
if content:
token_count += max(1, len(content) / 4)
except Exception:
pass
if token_count == 0:
token_count = max(1, len(text) / 4)
gen_time = (time.time() - first_token_t) if first_token_t else total
tps = token_count / gen_time if gen_time > 0 else 0
return {"tps": tps, "tokens": int(token_count), "gen_time": gen_time, "total": total,
"detail": f"{int(token_count)} tok / {gen_time:.1f}s"}
except Exception as e:
total = time.time() - t0
return {"tps": 0, "tokens": 0, "gen_time": total, "total": total, "detail": f"Error: {str(e)[:40]}"}
def _run(self):
if self._running:
return
lanes = self._collect_lanes()
if len(lanes) < 2:
self._status_var.set("Need at least 2 lanes with endpoint + model selected.")
return
self._running = True
self._run_btn.configure(state="disabled")
for item in self._results_tree.get_children():
self._results_tree.delete(item)
self._status_var.set("Running benchmark...")
threading.Thread(target=self._run_bench, args=(lanes,), daemon=True).start()
def _run_bench(self, lanes):
results = []
tests = []
if self._test_ttft.get():
tests.append(("TTFT (stream)", True, False))
if self._test_total.get():
tests.append(("Total latency", False, False))
if self._test_tools.get():
tests.append(("Tool call", False, True))
run_tps = self._test_tps.get()
for test_name, stream, tools in tests:
lane_results = []
for lane in lanes:
label = lane["label"]
self._dlg.after(0, lambda l=label: self._status_var.set(f"Running {test_name}: {l}..."))
r = self._bench_single(lane["ep"], lane["model"], stream, tools)
lane_results.append((label, r))
metric = "ttft" if stream else "total"
values = [(lr[0], lr[1][metric]) for lr in lane_results]
sorted_v = sorted(values, key=lambda x: x[1])
best_val = sorted_v[0][1]
second_val = sorted_v[1][1] if len(sorted_v) > 1 else best_val + 1
if best_val < second_val * 0.85:
winner = sorted_v[0][0]
else:
winner = "Tie"
cols = []
for lr in lane_results:
v = lr[1][metric]
cols.append(f"{v:.2f}s ({lr[1]['detail'][:30]})")
while len(cols) < 3:
cols.append("--")
cols.append(winner)
results.append(tuple([test_name] + cols))
if run_tps:
lane_tps = []
for lane in lanes:
label = lane["label"]
self._dlg.after(0, lambda l=label: self._status_var.set(f"Tokens/sec: {l}..."))
r = self._bench_tps(lane["ep"], lane["model"])
lane_tps.append((label, r))
tps_vals = [(lt[0], lt[1]["tps"]) for lt in lane_tps]
sorted_tps = sorted(tps_vals, key=lambda x: x[1], reverse=True)
best_tps = sorted_tps[0][1]
second_tps = sorted_tps[1][1] if len(sorted_tps) > 1 else 0
if best_tps > 0 and second_tps > 0 and best_tps > second_tps * 1.15:
winner_tps = sorted_tps[0][0]
else:
winner_tps = "Tie"
cols_tps = []
for lt in lane_tps:
tps = lt[1]["tps"]
cols_tps.append(f"{tps:.1f} t/s ({lt[1]['detail'][:25]})")
while len(cols_tps) < 3:
cols_tps.append("--")
cols_tps.append(winner_tps)
results.append(tuple(["Tokens/sec"] + cols_tps))
def _show():
for row in results:
self._results_tree.insert("", "end", values=row)
self._status_var.set("Benchmark complete.")
self._running = False
self._run_btn.configure(state="normal")
self._dlg.after(0, _show)
# ═══════════════════════════════════════════════════════════════════════
# Main Launcher Window
# ═══════════════════════════════════════════════════════════════════════
class LauncherWin:
def __init__(self, root):
self._root = root
self._proc = None
self._endpoints_data = load_endpoints()
self._refresh_running = False
recover_config_if_needed()
main = ttk.Frame(root, padding=16)
main.pack(fill="both", expand=True)
main.pack_propagate(False)
# Title
hdr = ttk.Frame(main)
hdr.pack(fill="x")
ttk.Label(hdr, text=f"Codex Launcher v{CHANGELOG[0][0]}", font=("Segoe UI", 13, "bold")).pack(side="left")
# Toolbar — two rows to fit all buttons
tb1 = ttk.Frame(main)
tb1.pack(fill="x", pady=(6, 0))
ttk.Button(tb1, text="Endpoints...", command=self._open_mgr).pack(side="left")
ttk.Button(tb1, text="AI Monitor", command=self._open_monitoring).pack(side="left", padx=(6, 0))
ttk.Button(tb1, text="AI BGP", command=self._open_bgp).pack(side="left", padx=(6, 0))
ttk.Button(tb1, text="Usage", command=self._open_usage).pack(side="left", padx=(6, 0))
ttk.Button(tb1, text="Benchmark", command=self._open_benchmark).pack(side="left", padx=(6, 0))
ttk.Button(tb1, text="History", command=self._open_history).pack(side="left", padx=(6, 0))
ttk.Button(tb1, text="OAuth Secrets", command=self._edit_oauth_secrets).pack(side="left", padx=(6, 0))
ttk.Button(tb1, text="Changelog", command=self._show_changelog).pack(side="right")
# Detection status — one row per item so long paths don't truncate
self._cli_info = detect_codex_cli()
self._desktop_info = detect_codex_desktop()
cli_row = ttk.Frame(main)
cli_row.pack(fill="x", pady=(4, 0))
if self._cli_info:
cli_path, cli_ver = self._cli_info
ttk.Label(cli_row, text=f"✓ Codex CLI {cli_ver}", foreground="#2ea043").pack(side="left")
ttk.Label(cli_row, text=f" ({cli_path})", foreground="gray").pack(side="left")
else:
ttk.Label(cli_row, text="✗ Codex CLI -- not found", foreground="#d29922").pack(side="left")
ttk.Button(cli_row, text="Install", command=lambda: self._show_install_guide("cli")).pack(side="left", padx=(6, 0))
desk_row = ttk.Frame(main)
desk_row.pack(fill="x", pady=(2, 0))
if self._desktop_info:
ttk.Label(desk_row, text="✓ Codex Desktop", foreground="#2ea043").pack(side="left")
ttk.Label(desk_row, text=f" ({self._desktop_info})", foreground="gray").pack(side="left")
else:
ttk.Label(desk_row, text="✗ Codex Desktop -- not found", foreground="#d29922").pack(side="left")
ttk.Button(desk_row, text="Install", command=lambda: self._show_install_guide("desktop")).pack(side="left", padx=(6, 0))
self._missing = []
if not self._cli_info:
self._missing.append("cli")
if not self._desktop_info:
self._missing.append("desktop")
# Auth status
auth_frame = ttk.Frame(main)
auth_frame.pack(fill="x", pady=(6, 0))
self._auth_label = ttk.Label(auth_frame, text="Checking auth...")
self._auth_label.pack(side="left")
self._relogin_btn = ttk.Button(auth_frame, text="Re-login", command=self._codex_relogin, state="disabled")
self._relogin_btn.pack(side="right")
threading.Thread(target=self._check_auth_async, daemon=True).start()
# Ops bar
ops_frame = ttk.Frame(main)
ops_frame.pack(fill="x", pady=(6, 0))
self._refresh_all_btn = ttk.Button(ops_frame, text="Refresh Models", command=self._refresh_all_models)
self._refresh_all_btn.pack(side="left")
ttk.Button(ops_frame, text="Backup Profile", command=self._backup_profile).pack(side="left", padx=(8, 0))
ttk.Button(ops_frame, text="Import Profile", command=self._import_profile).pack(side="left", padx=(8, 0))
# Endpoint + Model selectors
sel_frame = ttk.Frame(main)
sel_frame.pack(fill="x", pady=(6, 0))
ttk.Label(sel_frame, text="Endpoint:").pack(side="left")
self._combo_ep = ttk.Combobox(sel_frame, state="readonly", width=24)
self._combo_ep.pack(side="left", padx=(4, 0))
self._combo_ep.bind("<<ComboboxSelected>>", lambda e: self._on_endpoint_changed())
ttk.Label(sel_frame, text="Model:").pack(side="left", padx=(12, 0))
self._combo_model = ttk.Combobox(sel_frame, state="readonly", width=24)
self._combo_model.pack(side="left", padx=(4, 0))
# Launch buttons
btn_frame1 = ttk.Frame(main)
btn_frame1.pack(fill="x", pady=(8, 0))
self._btn_desktop = ttk.Button(btn_frame1, text="Launch Desktop", command=lambda: self._launch("desktop"))
if "desktop" in self._missing:
self._btn_desktop.configure(state="disabled")
self._btn_desktop.pack(side="left", fill="x", expand=True, padx=(0, 4))
self._btn_cli = ttk.Button(btn_frame1, text="Launch CLI", command=lambda: self._launch("cli"))
if "cli" in self._missing:
self._btn_cli.configure(state="disabled")
self._btn_cli.pack(side="left", fill="x", expand=True)
btn_frame2 = ttk.Frame(main)
btn_frame2.pack(fill="x", pady=(4, 0))
self._btn_codex_desktop = ttk.Button(btn_frame2, text="Codex Default (Desktop)",
command=lambda: self._launch_codex_default("desktop"))
if "desktop" in self._missing:
self._btn_codex_desktop.configure(state="disabled")
self._btn_codex_desktop.pack(side="left", fill="x", expand=True, padx=(0, 4))
self._btn_codex_cli = ttk.Button(btn_frame2, text="Codex Default (CLI)",
command=lambda: self._launch_codex_default("cli"))
if "cli" in self._missing:
self._btn_codex_cli.configure(state="disabled")
self._btn_codex_cli.pack(side="left", fill="x", expand=True)
# Log area
self._log_text = scrolledtext.ScrolledText(main, height=10, state="disabled", wrap="word",
font=("Consolas", 9))
self._log_text.pack(fill="both", expand=True, pady=(8, 0))
# Bottom bar
bb = ttk.Frame(main)
bb.pack(fill="x", pady=(6, 0))
ttk.Button(bb, text="Clear Log", command=self._clear_log).pack(side="left")
self._restart_btn = ttk.Button(bb, text="Restart Proxy", command=self._restart_proxy, state="disabled")
self._restart_btn.pack(side="left", padx=(4, 0))
ttk.Button(bb, text="AI Assistant", command=self._open_assistant).pack(side="left", padx=(4, 0))
self._kill_btn = ttk.Button(bb, text="Kill && Cleanup", command=self._kill, state="disabled")
self._kill_btn.pack(side="left", fill="x", expand=True, padx=(8, 0))
ttk.Button(bb, text="View Log", command=self._open_proxy_log_dir).pack(side="left")
ttk.Button(bb, text="Close", command=self._do_close).pack(side="left", padx=(8, 0))
self._rebuild_combo()
self._log_dependency_status()
self._start_watcher()
# ── Logging ──────────────────────────────────────────────────────
def log(self, msg):
self._root.after(0, self._append_log, msg)
def _append_log(self, msg):
self._log_text.configure(state="normal")
self._log_text.insert("end", msg + "\n")
self._log_text.see("end")
self._log_text.configure(state="disabled")
def _clear_log(self):
self._log_text.configure(state="normal")
self._log_text.delete("1.0", "end")
self._log_text.configure(state="disabled")
def _restart_proxy(self):
self._kill()
ep_name = load_endpoints().get("default")
if not ep_name:
self.log("No default endpoint set.")
return
for ep in load_endpoints().get("endpoints", []):
if ep.get("name") == ep_name:
time.sleep(0.3)
start_proxy_for(ep, self.log)
self.log(f"Proxy restarted for {ep_name}")
return
self.log(f"Endpoint '{ep_name}' not found.")
def _log_dependency_status(self):
if self._cli_info:
_, ver = self._cli_info
self.log(f"✓ Codex CLI detected ({ver})")
else:
self.log("✗ Codex CLI NOT found -- CLI launch disabled.")
if self._desktop_info:
self.log(f"✓ Codex Desktop detected ({self._desktop_info})")
else:
self.log("✗ Codex Desktop NOT found -- Desktop launch disabled.")
if self._missing:
self.log("Install missing tools before using the launcher.")
else:
self.log("All dependencies OK.")
# ── Auth ─────────────────────────────────────────────────────────
def _check_auth_async(self):
status, msg = check_codex_auth()
self._root.after(0, lambda: self._update_auth_status(status, msg))
def _update_auth_status(self, status, msg):
if status == "logged_in":
self._auth_label.configure(text=f"✓ Auth: {msg}", foreground="#2ea043")
self._relogin_btn.configure(state="normal" if "cli" not in self._missing else "disabled")
elif status == "not_installed":
self._auth_label.configure(text="Auth: N/A (CLI not installed)", foreground="#888")
else:
self._auth_label.configure(text=f"⚠ Auth: {msg}", foreground="#d29922")
self._relogin_btn.configure(state="normal" if "cli" not in self._missing else "disabled")
def _codex_relogin(self):
self.log("Opening codex login in terminal...")
term = detect_terminal()
if not term:
self.log("ERROR: no terminal emulator found for re-login")
return
term_name, term_args, term_path = term
cmd_parts = [term_name] + term_args + ["codex", "login"]
if IS_WINDOWS:
subprocess.Popen(cmd_parts, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
else:
subprocess.Popen(cmd_parts, preexec_fn=os.setsid)
self.log("Login flow started in terminal. Re-checking auth in 30s...")
self._auth_label.configure(text="Auth: waiting for login...")
threading.Thread(target=lambda: (time.sleep(30), self._check_auth_async()), daemon=True).start()
# ── Combo management ─────────────────────────────────────────────
def _rebuild_combo(self):
self._endpoints_data = load_endpoints()
ep_names = [e["name"] for e in self._endpoints_data["endpoints"]]
bgp_names = [f"\U0001F500 {p['name']}" for p in load_bgp_pools().get("pools", [])]
all_names = ep_names + bgp_names
self._combo_ep["values"] = all_names
if all_names:
default = self._endpoints_data.get("default")
if default and default in ep_names:
self._combo_ep.set(default)
else:
self._combo_ep.set(all_names[0])
self._on_endpoint_changed()
def _on_endpoint_changed(self):
name = self._combo_ep.get()
is_bgp = name.startswith("\U0001F500 ")
bgp_name = name[2:] if is_bgp else None
ep = get_endpoint(name) if name and not is_bgp else None
models = []
if is_bgp:
for p in load_bgp_pools().get("pools", []):
if p["name"] == bgp_name:
seen = set()
for r in p.get("routes", []):
m = r.get("model", "")
if m and m not in seen:
models.append(m)
seen.add(m)
break
elif ep:
models = ep.get("models", [])
self._combo_model["values"] = models
if ep and ep.get("default_model") in models:
self._combo_model.set(ep["default_model"])
elif models:
self._combo_model.set(models[0])
else:
self._combo_model.set("")
# ── Window openers ───────────────────────────────────────────────
def _on_endpoints_updated(self):
self._rebuild_combo()
def _open_mgr(self):
EndpointMgr(self._root, on_update=self._on_endpoints_updated)
def _open_bgp(self):
BGPPoolMgr(self._root, on_update=self._on_endpoints_updated)
def _open_monitoring(self):
AIMonitoringWindow(self._root)
def _open_usage(self):
UsageWindow(self._root)
def _open_history(self):
RequestHistoryWindow(self._root)
def _open_benchmark(self):
BenchmarkWindow(self._root)
def _open_proxy_log_dir(self):
log_dir = str(PROXY_CONFIG_DIR)
req_log = PROXY_CONFIG_DIR / "requests.log"
if IS_WINDOWS:
if req_log.exists():
os.startfile(str(req_log))
else:
os.startfile(log_dir)
else:
import subprocess as _sp
_sp.Popen(["xdg-open", log_dir])
def _open_assistant(self):
assist_path = str(Path(__file__).resolve().parent / "flet-codex-assist.py")
if Path(assist_path).exists():
subprocess.Popen([sys.executable, assist_path], creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if IS_WINDOWS else 0)
def _edit_oauth_secrets(self):
data = load_oauth_secrets()
if not data:
data = {"antigravity": {"client_id": "", "client_secret": ""},
"gemini_cli": {"client_id": "", "client_secret": ""}}
dlg = tk.Toplevel(self._root)
dlg.title("OAuth 2.0 Client Secrets")
dlg.geometry("600x450")
dlg.transient(self._root)
dlg.grab_set()
frame = ttk.Frame(dlg, padding=16)
frame.pack(fill="both", expand=True)
ttk.Label(frame, text="Google OAuth 2.0 credentials", font=("Segoe UI", 10, "bold")).pack(anchor="w")
ttk.Label(frame, text=f"Stored locally in {OAUTH_SECRETS_PATH}", foreground="gray").pack(anchor="w", pady=(0, 8))
fields = {}
nf = ttk.Frame(frame)
nf.pack(fill="x")
row = 0
for section_key, section_label in [("antigravity", "Antigravity (CloudCode)"), ("gemini_cli", "Gemini CLI")]:
ttk.Label(nf, text=f"\n{section_label}", font=("Segoe UI", 9, "bold")).grid(row=row, column=0, columnspan=3, sticky="w", pady=(8, 2))
row += 1
sec = data.get(section_key, {})
import_btn = ttk.Button(nf, text="Import JSON",
command=lambda sk=section_key: self._import_oauth_json(fields, sk))
import_btn.grid(row=row, column=2, padx=(4, 0), pady=2, sticky="e")
for fk, fl in [("client_id", "Client ID"), ("client_secret", "Client Secret")]:
ttk.Label(nf, text=fl + ":").grid(row=row, column=0, sticky="w", padx=(8, 4), pady=2)
entry = ttk.Entry(nf, width=60)
entry.insert(0, sec.get(fk, ""))
entry.grid(row=row, column=1, sticky="ew", pady=2)
if fk == "client_secret":
entry.configure(show="*")
fields[(section_key, fk)] = entry
row += 1
nf.columnconfigure(1, weight=1)
ttk.Label(frame, text="\nImport a client_secret_*.json from Google Cloud Console\nconsole.cloud.google.com → Credentials", foreground="gray").pack(anchor="w")
btnf = ttk.Frame(frame)
btnf.pack(fill="x", pady=(12, 0))
ttk.Button(btnf, text="Cancel", command=dlg.destroy).pack(side="right", padx=(4, 0))
save_btn = ttk.Button(btnf, text="Save")
save_btn.pack(side="right", padx=(4, 0))
def _save():
for (sk, fk), entry in fields.items():
if sk not in data:
data[sk] = {}
data[sk][fk] = entry.get().strip()
try:
save_oauth_secrets(data)
except Exception as e:
messagebox.showerror("Save failed", str(e), parent=dlg)
return
dlg.destroy()
save_btn.configure(command=_save)
def _import_oauth_json(self, fields, section_key):
path = filedialog.askopenfilename(
title="Import Google OAuth Client Secret JSON",
filetypes=[("JSON files", "*.json")])
if not path:
return
try:
with open(path, encoding="utf-8") as f:
raw = json.load(f)
creds = raw.get("installed") or raw.get("web") or raw
cid = creds.get("client_id", "")
csec = creds.get("client_secret", "")
if not cid or not csec:
raise ValueError("JSON does not contain client_id and client_secret")
if (section_key, "client_id") in fields:
fields[(section_key, "client_id")].delete(0, "end")
fields[(section_key, "client_id")].insert(0, cid)
if (section_key, "client_secret") in fields:
fields[(section_key, "client_secret")].delete(0, "end")
fields[(section_key, "client_secret")].insert(0, csec)
except Exception as e:
messagebox.showerror("Import failed", str(e))
# ── Watcher ──────────────────────────────────────────────────────
def _start_watcher(self):
cfg = load_monitoring_config()
if not cfg.get("enabled"):
return
self._watcher = HealthWatcher(
on_failure=lambda c: self.log(f"[AI Monitor] Proxy unresponsive (failures={c})"),
on_recovery=lambda: self.log("[AI Monitor] Proxy recovered"),
on_signal=lambda fid, cat, line: None,
on_action=self._on_watcher_action,
)
self._watcher.start()
self.log("AI Monitoring: watchdog started")
def _on_watcher_action(self, action, trigger):
cfg = load_monitoring_config()
if action == "restart_proxy" and cfg.get("auto_restart_proxy"):
self.log(f"[AI Monitor] Auto-restarting proxy (trigger: {trigger})")
self._root.after(0, self._restart_proxy_from_watcher)
elif action in ("clear_schema_cache", "delete_provider_caps"):
try:
cap_file = PROXY_CONFIG_DIR / "provider-caps.json"
if cap_file.exists():
cap_file.unlink()
self.log("[AI Monitor] Cleared corrupt schema cache")
except Exception as e:
self.log(f"[AI Monitor] Failed to clear cache: {e}")
elif action == "kill_stale_restart":
self.log(f"[AI Monitor] Killing stale processes + restarting (trigger: {trigger})")
self._kill()
self._root.after(0, self._restart_proxy_from_watcher)
else:
self.log(f"[AI Monitor] Alert: {action} (trigger: {trigger})")
def _restart_proxy_from_watcher(self):
try:
ep_name = load_endpoints().get("default")
if not ep_name:
return
for ep in load_endpoints().get("endpoints", []):
if ep.get("name") == ep_name:
start_proxy_for(ep, self.log)
break
except Exception as e:
self.log(f"[AI Monitor] Proxy restart failed: {e}")
# ── Profile operations ───────────────────────────────────────────
def _backup_profile(self):
filename = filedialog.asksaveasfilename(
title="Backup Codex Profile",
defaultextension=".json",
initialfile=f"codex-profile-{time.strftime('%Y%m%d-%H%M%S')}.json",
filetypes=[("JSON files", "*.json"), ("All files", "*.*")],
)
if not filename:
return
try:
save_profile_bundle(filename)
self.log(f"Profile backed up to {filename}")
except Exception as e:
messagebox.showerror("Backup Failed", str(e))
def _refresh_all_models(self):
if self._refresh_running:
return
self._refresh_running = True
self._refresh_all_btn.configure(state="disabled")
self.log("Refreshing models for all providers...")
threading.Thread(target=self._refresh_all_models_worker, daemon=True).start()
def _refresh_all_models_worker(self):
try:
data = load_endpoints()
updated = 0
failed = []
for idx, ep in enumerate(list(data["endpoints"])):
refreshed, err = refresh_endpoint_models(ep)
if refreshed:
data["endpoints"][idx] = refreshed
updated += 1
else:
failed.append(f"{ep['name']}: {err}")
if updated:
save_endpoints(data)
self._root.after(0, lambda: self._finish_refresh(updated, failed))
except Exception as e:
self._root.after(0, lambda: self._finish_refresh_error(str(e)))
def _finish_refresh(self, updated, failed):
if updated:
self._rebuild_combo()
self.log(f"Refreshed models for {updated} provider(s)")
if failed:
messagebox.showwarning("Refresh", "Some providers could not auto-fetch models.\n\n" +
"\n".join(failed))
elif updated:
messagebox.showinfo("Refresh", f"Refreshed models for {updated} provider(s).")
else:
messagebox.showinfo("Refresh", "No providers were refreshed.")
self._refresh_running = False
self._refresh_all_btn.configure(state="normal")
def _finish_refresh_error(self, err):
messagebox.showerror("Refresh Failed", err)
self._refresh_running = False
self._refresh_all_btn.configure(state="normal")
def _import_profile(self):
if self._proc and self._proc.poll() is None:
messagebox.showwarning("Import", "Stop Codex before importing a profile.")
return
filename = filedialog.askopenfilename(
title="Import Codex Profile",
filetypes=[("JSON files", "*.json"), ("All files", "*.*")],
)
if not filename:
return
if not messagebox.askyesno("Import",
"Importing will replace the current endpoints and Codex config. Continue?"):
return
try:
import_profile_bundle(filename)
self._rebuild_combo()
self.log(f"Profile imported from {filename}")
messagebox.showinfo("Import", "Profile imported successfully.")
except Exception as e:
messagebox.showerror("Import Failed", str(e))
# ── Dialogs ──────────────────────────────────────────────────────
def _show_changelog(self):
dlg = tk.Toplevel(self._root)
dlg.title("Changelog")
dlg.geometry("540x480")
dlg.transient(self._root)
text = scrolledtext.ScrolledText(dlg, wrap="word", font=("Segoe UI", 9))
text.pack(fill="both", expand=True, padx=12, pady=12)
for ver, date, items in CHANGELOG:
text.insert("end", f"v{ver} ({date})\n")
for item in items:
text.insert("end", f"{item}\n")
text.insert("end", "\n")
text.configure(state="disabled")
ttk.Button(dlg, text="Close", command=dlg.destroy).pack(pady=(0, 10))
def _show_install_guide(self, which):
if which == "cli":
guide = ("Codex CLI is required to use CLI launch features.\n\n"
"Install with npm:\n npm install -g @openai/codex\n\n"
"Or download from:\n https://github.com/openai/codex\n\n"
"After installing, restart the launcher.")
else:
guide = ("Codex Desktop is required to use Desktop launch features.\n\n"
"Download from:\n https://codex.desktop.openai.com\n\n"
"After installing, restart the launcher.")
messagebox.showinfo(f"Install Codex {which.title()}", guide)
# ── Launch ───────────────────────────────────────────────────────
def _set_busy(self, busy):
has_cli = "cli" not in self._missing
has_desk = "desktop" not in self._missing
def _update():
self._btn_desktop.configure(state="disabled" if busy or not has_desk else "normal")
self._btn_cli.configure(state="disabled" if busy or not has_cli else "normal")
self._btn_codex_desktop.configure(state="disabled" if busy or not has_desk else "normal")
self._btn_codex_cli.configure(state="disabled" if busy or not has_cli else "normal")
self._kill_btn.configure(state="normal" if busy else "disabled")
self._restart_btn.configure(state="normal" if busy else "disabled")
self._root.after(0, _update)
def _launch(self, target):
name = self._combo_ep.get()
if not name:
self.log("ERROR: no endpoint selected")
return
model = self._combo_model.get()
if not model:
self.log("ERROR: no model selected")
return
is_bgp = name.startswith("\U0001F500 ")
if is_bgp:
pool_name = name[2:]
pool = None
for p in load_bgp_pools().get("pools", []):
if p["name"] == pool_name:
pool = p
break
if not pool:
self.log(f"ERROR: BGP pool '{pool_name}' not found")
return
self._set_busy(True)
target_name = "Desktop" if target == "desktop" else "CLI"
self.log(f"=== BGP: {pool_name} / {model} -> {target_name} ===")
threading.Thread(target=self._run_bgp, args=(pool, model, target), daemon=True).start()
return
ep = get_endpoint(name)
if not ep:
self.log("ERROR: endpoint not found")
return
self._set_busy(True)
target_name = "Desktop" if target == "desktop" else "CLI"
self.log(f"=== {ep['name']} / {model} -> {target_name} ===")
threading.Thread(target=self._run, args=(ep, model, target), daemon=True).start()
def _launch_codex_default(self, target):
if "cli" not in self._missing:
status, msg = check_codex_auth()
if status != "logged_in":
if not messagebox.askyesno("Auth Warning",
f"Codex auth check: {msg}\n\n"
"Launch may fail without valid authentication.\nContinue anyway?"):
self._set_busy(False)
return
self._set_busy(True)
target_name = "Desktop" if target == "desktop" else "CLI"
self.log(f"=== Codex Default (OAuth) -> {target_name} ===")
threading.Thread(target=self._run_codex_default, args=(target,), daemon=True).start()
def _run(self, ep, model, target):
keep_session_alive = False
try:
self.log("Cleaning up stale processes...")
safe_cleanup_owned(self.log)
recover_config_if_needed(self.log)
needs_proxy = ep["backend_type"] != "native"
if needs_proxy:
self.log("Starting translation proxy...")
try:
proxy_port = start_proxy_for(ep, self.log)
except RuntimeError as e:
self._root.after(0, lambda: messagebox.showerror("Proxy Failed", str(e)))
return
self.log(f"Configuring Codex for {ep['name']} (proxied on :{proxy_port})...")
begin_config_transaction(f"launch:{ep['name']}")
write_config_for_translated(ep, model, proxy_port)
else:
self.log(f"Configuring Codex for {ep['name']} (native)...")
begin_config_transaction(f"launch:{ep['name']}")
write_config_for_native(ep, model)
if target == "desktop":
if needs_proxy:
kill_existing_desktop(self.log)
keep_session_alive = self._launch_desktop(ep, model)
else:
self._launch_cli(ep, model)
except Exception as e:
self.log(f"ERROR: {e}")
finally:
if keep_session_alive:
self.log("Warm-start handoff detected; keeping proxy/config active for running Desktop.")
self._set_busy(False)
self.log("Ready. Use Kill && Cleanup when finished.")
else:
stop_proxy()
restore_config()
end_config_transaction()
self._set_busy(False)
self.log("Ready.")
def _run_bgp(self, pool, model, target):
keep_session_alive = False
try:
self.log("Cleaning up stale processes...")
safe_cleanup_owned(self.log)
recover_config_if_needed(self.log)
self.log(f"Starting BGP proxy with {len(pool.get('routes', []))} routes...")
port, bgp_ep = start_bgp_proxy(pool, model, self.log)
begin_config_transaction(f"launch:bgp:{pool['name']}")
write_config_for_translated(bgp_ep, model, port)
if target == "desktop":
kill_existing_desktop(self.log)
keep_session_alive = self._launch_desktop(bgp_ep, model)
else:
self._launch_cli(bgp_ep, model)
except Exception as e:
self.log(f"ERROR: {e}")
finally:
if keep_session_alive:
self.log("Warm-start handoff detected; keeping proxy/config active.")
self._set_busy(False)
self.log("Ready. Use Kill && Cleanup when finished.")
else:
stop_proxy()
restore_config()
end_config_transaction()
self._set_busy(False)
self.log("Ready.")
def _run_codex_default(self, target):
try:
self.log("Cleaning up stale processes...")
safe_cleanup_owned(self.log)
stop_proxy()
recover_config_if_needed(self.log)
self.log("Resetting config to Codex defaults (OAuth)...")
begin_config_transaction("launch:default")
if CONFIG.exists():
CONFIG.unlink()
if target == "desktop":
self._launch_desktop_direct()
else:
self._launch_cli_default()
except Exception as e:
self.log(f"ERROR: {e}")
finally:
restore_config()
end_config_transaction()
self._set_busy(False)
self.log("Ready.")
def _launch_desktop(self, ep, model):
desktop_path = self._desktop_info
if not desktop_path:
self.log("ERROR: Codex Desktop not found")
return False
if IS_WINDOWS:
self._proc = subprocess.Popen(
[desktop_path],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
else:
self._proc = subprocess.Popen(
[desktop_path], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
preexec_fn=os.setsid)
pid = self._proc.pid
self.log(f"Desktop started (PID {pid})")
self.log(f"Log: {LAUNCH_LOG}")
t0 = time.time()
stall_warned = False
while self._proc and self._proc.poll() is None:
time.sleep(1.5)
el = time.time() - t0
if el > 20 and not stall_warned:
self.log("Still starting after 20s -- possible stall. Click Kill if window doesn't appear.")
self.log(f"--- last log lines ---\n{last_log_lines()}")
stall_warned = True
if self._proc:
rc = self._proc.poll()
el = time.time() - t0
self.log(f"Desktop exited (code {rc}) after {el:.0f}s")
if el < 12:
self.log("TIP: Quick exit -- may be warm-start handoff (normal) or crash.")
last_lines = last_log_lines()
self.log(f"--- last log lines ---\n{last_lines}")
if rc == 0 and "warm-start" in last_lines.lower():
self._proc = None
return True
self._proc = None
return False
def _launch_cli(self, ep, model):
self.log(f"Launching Codex CLI with {ep['name']}...")
term = detect_terminal()
if not term:
self.log("ERROR: no terminal found")
return
term_name, term_args, _ = term
cmd_parts = [term_name] + term_args
if ep["backend_type"] == "native":
cmd_parts.extend(["codex", "-c", f"model={model}"])
else:
cmd_parts.extend(["codex", "--profile", ep["name"], "-c", f"model={model}"])
self.log(f"Running: {' '.join(cmd_parts)}")
if IS_WINDOWS:
self._proc = subprocess.Popen(cmd_parts, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
else:
self._proc = subprocess.Popen(cmd_parts, preexec_fn=os.setsid)
pid = self._proc.pid
self.log(f"CLI started in terminal (PID {pid})")
while self._proc and self._proc.poll() is None:
time.sleep(1.5)
if self._proc:
rc = self._proc.poll()
self.log(f"CLI exited (code {rc})")
self._proc = None
def _launch_desktop_direct(self):
self.log("Launching Codex Desktop (default OAuth)...")
desktop_path = self._desktop_info
if not desktop_path:
self.log("ERROR: Codex Desktop not found")
return
if IS_WINDOWS:
self._proc = subprocess.Popen(
[desktop_path], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
else:
self._proc = subprocess.Popen(
[desktop_path], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
preexec_fn=os.setsid)
pid = self._proc.pid
self.log(f"Desktop started (PID {pid})")
t0 = time.time()
stall_warned = False
while self._proc and self._proc.poll() is None:
time.sleep(1.5)
el = time.time() - t0
if el > 20 and not stall_warned:
self.log("Still starting after 20s -- possible stall.")
self.log(f"--- last log lines ---\n{last_log_lines()}")
stall_warned = True
if self._proc:
rc = self._proc.poll()
el = time.time() - t0
self.log(f"Desktop exited (code {rc}) after {el:.0f}s")
self._proc = None
def _launch_cli_default(self):
self.log("Launching Codex CLI (default OAuth)...")
term = detect_terminal()
if not term:
self.log("ERROR: no terminal found")
return
term_name, term_args, _ = term
cmd_parts = [term_name] + term_args + ["codex"]
self.log(f"Running: {' '.join(cmd_parts)}")
if IS_WINDOWS:
self._proc = subprocess.Popen(cmd_parts, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
else:
self._proc = subprocess.Popen(cmd_parts, preexec_fn=os.setsid)
pid = self._proc.pid
self.log(f"CLI started in terminal (PID {pid})")
while self._proc and self._proc.poll() is None:
time.sleep(1.5)
if self._proc:
rc = self._proc.poll()
self.log(f"CLI exited (code {rc})")
self._proc = None
# ── Kill ─────────────────────────────────────────────────────────
def _kill(self):
self.log("=== Killing ===")
if self._proc and self._proc.poll() is None:
try:
if IS_WINDOWS:
subprocess.run(["taskkill", "/F", "/T", "/PID", str(self._proc.pid)],
capture_output=True, timeout=10)
else:
import signal as sig
pgid = os.getpgid(self._proc.pid)
os.killpg(pgid, sig.SIGTERM)
time.sleep(1)
if self._proc.poll() is None:
os.killpg(pgid, sig.SIGKILL)
except (ProcessLookupError, PermissionError):
pass
self._proc = None
stop_proxy()
safe_cleanup_owned(self.log)
restore_config()
end_config_transaction()
LOG_DIR.mkdir(parents=True, exist_ok=True)
if LAUNCH_LOG.exists():
try:
LAUNCH_LOG.unlink()
except Exception:
pass
self.log("Cleanup complete")
self._set_busy(False)
self.log("Ready.")
def _do_close(self):
if self._proc and self._proc.poll() is None:
if not messagebox.askyesno("Confirm", "Codex is still running. Kill it?"):
return
self._kill()
stop_proxy()
self._root.destroy()
# ═══════════════════════════════════════════════════════════════════════
# Entry point
# ═══════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
ensure_dirs()
create_default_endpoints()
root = tk.Tk()
root.title("Codex Launcher")
root.geometry("800x680")
root.minsize(640, 520)
app = LauncherWin(root)
root.mainloop()