Files
Codex-Launcher---Any-AI-Por…/src/codex-launcher-gui
Roman 8343837b3c v2.6.1: rebuild Google OAuth to emulate Gemini CLI
- Uses Google's public OAuth client_id (no client_secret.json needed)
- PKCE + CSRF state protection for secure auth
- Scopes: cloud-platform, generative-language, userinfo
- Just click OAuth Login -> browser -> authorize -> done
- Zero setup required
2026-05-20 17:38:08 +04:00

2701 lines
110 KiB
Python
Executable File

#!/usr/bin/env python3
"""Codex Launcher GUI — manage endpoints, launch Desktop or CLI with any provider."""
import gi
gi.require_version("Gtk", "3.0")
from gi.repository import Gtk, GLib
import subprocess, os, signal, sys, threading, time, json, urllib.request, tempfile, shutil
import hashlib
from pathlib import Path
# Filesystem locations used by the launcher. Everything except START_SH is
# resolved relative to the current user's home directory.
HOME = Path.home()
START_SH = Path("/opt/codex-desktop/start.sh")  # presence == Codex Desktop detected
CONFIG = HOME / ".codex/config.toml"  # written by write_config_for_*()
CONFIG_BAK = HOME / ".codex/config.toml.launcher-bak"  # copy made by backup_config()
CLEANUP = HOME / ".local/bin/cleanup-codex-stale.sh"  # run through bash by _run_cleanup()
PROXY = HOME / ".local/bin/translate-proxy.py"  # script started by _start_proxy_with_config()
ENDPOINTS_FILE = HOME / ".codex/endpoints.json"  # endpoint store (load/save_endpoints)
BGP_POOLS_FILE = HOME / ".codex/bgp-pools.json"  # pool store (load/save_bgp_pools)
LOG_DIR = HOME / ".cache/codex-desktop"
LAUNCH_LOG = LOG_DIR / "launcher.log"  # read by _last_log_lines() and the "View Log" button
PROXY_CONFIG_DIR = HOME / ".cache/codex-proxy"  # generated proxy-*.json / models-*.json files
# Config template with the three top-level keys left empty.
# NOTE(review): not referenced in the visible part of the file — confirm usage.
DEFAULT_CONFIG = """model = ""
model_provider = ""
model_catalog_json = ""
"""
# Release history, newest entry first. Each entry is a tuple of
# (version string, release date as YYYY-MM-DD, list of human-readable notes).
# NOTE(review): presumably rendered by LauncherWin._show_changelog, which is
# not visible in this part of the file — confirm before changing the shape.
CHANGELOG = [
    ("2.6.1", "2026-05-20", [
        "Google OAuth rebuilt to emulate Gemini CLI — no client_secret.json needed",
        "Uses Google's public OAuth client_id (same as gemini-cli)",
        "PKCE + CSRF state protection for secure auth",
        "Just click OAuth Login → browser opens → authorize → done",
        "Includes cloud-platform scope for Gemini Code Assist compatibility",
    ]),
    ("2.6.0", "2026-05-20", [
        "Usage Dashboard — per-provider request/token/latency tracking",
        "Visual cards with success rate bars, model breakdown, error tracking",
        "Google OAuth: browse for client_secret.json instead of fixed path",
    ]),
    ("2.5.1", "2026-05-20", [
        "Adaptive retry for 429/502/503 errors with exponential backoff",
        "BGP routes also retry transient errors before failing over",
        "Proxy socket reuse — no more 'Address already in use' crashes",
        "BGP route count shown at proxy startup",
    ]),
    ("2.5.0", "2026-05-20", [
        "AI BGP — multi-provider routing with automatic failover",
        "Create BGP pools with ordered routes from any configured endpoint",
        "Each route uses its own endpoint URL, API key, and model",
        "Failover strategy: tries primary, falls back on error/timeout",
        "BGP pools appear in endpoint dropdown with shuffle icon",
        "Up/down reordering for route priority in pool editor",
        "Fixed TOML config breakage from multi-line paste in fields",
    ]),
    ("2.4.0", "2026-05-20", [
        "Added OpenAdapter provider preset (api.openadapter.in)",
        "One API key access to 40+ models — GLM, DeepSeek, Kimi, Qwen, Claude, GPT, Gemini",
        "Fixed Add/Edit dialog crash (missing _on_reasoning_toggled method)",
        "Redesigned Google OAuth flow with live status dialog",
    ]),
    ("2.3.2", "2026-05-20", [
        "Added Google Gemini provider with OAuth support",
        "Two presets: 'Google Gemini (API Key)' and 'Google Gemini (OAuth)'",
        "OAuth Login button in endpoint editor — full Google OAuth2 flow with auto-refresh",
        "Auto-refreshes OAuth access tokens when expired (no manual re-login needed)",
        "Supports gemini-2.5-flash, gemini-2.5-pro, gemini-2.0-flash, and more",
        "Uses Gemini's OpenAI-compatible endpoint — works with existing proxy",
    ]),
    ("2.3.0", "2026-05-20", [
        "Adaptive Crof self-healing system — auto-adjusts to Crof model limits",
        "Tracks per-model success/failure history, learns item count limits dynamically",
        "Proactively compacts input when above learned limit before sending to Crof",
        "Auto-retries on finish_reason=length — aggressively compacts and resends",
        "Prevents 'stream disconnected' and 'incomplete' errors on long conversations",
    ]),
    ("2.2.1", "2026-05-20", [
        "Fixed compaction orphaning function_call_output items — root cause of Crof incomplete responses",
        "Compaction now respects function_call/function_call_output pairs — no more dangling tool results",
        "Fixed reasoning control: reasoning_effort=none now always sends enable_thinking=false too",
    ]),
    ("2.2.0", "2026-05-20", [
        "Added per-provider Reasoning On/Off toggle in endpoint editor",
        "Added Reasoning Effort level per provider: None, Minimal, Low, Medium, High, Max",
        "When reasoning is OFF: sends enable_thinking=false + reasoning_effort=none to upstream API",
        "When reasoning is ON: sends user-selected effort level (default: Medium)",
        "Fixes Crof mimo-v2.5-pro and similar reasoning models exhausting output tokens",
        "Strip reasoning_content from proxy output — Codex doesn't use it",
        "Force max_tokens=64000 minimum for openai-compat providers",
    ]),
    ("2.1.3", "2026-05-19", [
        "Fixed Crof mimo-v2.5-pro stopping: reasoning_content exhausted all output tokens",
        "Strip reasoning_content from proxy output — Codex doesn't use it, avoids token waste",
        "Force max_tokens=64000 minimum for openai-compat providers — gives models room for both reasoning and content",
    ]),
    ("2.1.2", "2026-05-19", [
        "Fixed Crof.ai and providers stopping after first tool call (root cause: None tool IDs)",
        "Codex sends function_call items with id=None — proxy now matches tool results to calls by position",
        "Fixed orphan message output item when response has only tool calls (no text)",
        "Auto-trims long conversations (>30 items) to prevent context overflow on providers like Crof",
        "Added request/response logging to ~/.cache/codex-proxy/requests.log",
    ]),
    ("2.1.1", "2026-05-19", [
        "Fixed proxy: map 'developer' role to 'system' for Chat Completions providers",
        "Fixed proxy: map 'developer' role to 'user' for Anthropic providers",
        "Forward 'instructions' field from Responses API as system message/param",
        "Fixes DeepSeek and other providers rejecting unknown 'developer' role",
    ]),
    ("2.1.0", "2026-05-19", [
        "Added Codex auth status detection (codex login status)",
        "Added Re-login button to re-authenticate via codex login",
        "Auto-checks auth before launching Codex Default mode",
        "Warns if OAuth token expired or missing before launch",
    ]),
    ("2.0.1", "2026-05-19", [
        "Added Codex CLI/Desktop installation verifier to main page",
        "Disables Desktop/CLI launch buttons when corresponding tool is missing",
        "Shows install instructions in status area on startup",
    ]),
    ("2.0.0", "2026-05-19", [
        "Initial release: multi-provider Codex Launcher",
        "Translation proxy: Responses API to Chat Completions + Anthropic Messages",
        "GTK endpoint manager with 10+ provider presets",
        "Codex Default mode (built-in OAuth, zero config)",
        "Browser UA injection for Cloudflare-protected providers (OpenCode)",
        "Streaming SSE, tool calls, reasoning content support",
        "Profile backup/import, model auto-fetch, bulk import",
        "Refresh Models in background thread",
        "URL normalization to prevent double-path bugs",
        "Config backup/restore around sessions",
        ".deb installer package",
    ]),
]
# Built-in provider templates, keyed by the preset's display name.
# Every preset carries:
#   backend_type - one of the identifiers known to label_for_backend()
#   base_url     - API root (normalized by apply_provider_preset())
#   models       - seed model list; an empty list means nothing is pre-filled
# Optional keys seen below: "cc_version" (Command Code) and "oauth_provider"
# (Google Gemini OAuth).
# NOTE(review): apply_provider_preset() copies cc_version but not
# oauth_provider — confirm that the endpoint editor sets it elsewhere.
PROVIDER_PRESETS = {
    "Custom": {
        "backend_type": "openai-compat",
        "base_url": "",
        "models": [],
    },
    "OpenAI": {
        "backend_type": "native",
        "base_url": "https://api.openai.com/v1",
        "models": ["gpt-4o", "gpt-4o-mini"],
    },
    "Anthropic": {
        "backend_type": "anthropic",
        "base_url": "https://api.anthropic.com/v1",
        "models": ["claude-sonnet-4-5", "claude-3-5-haiku-latest"],
    },
    "OpenCode Zen (OpenAI-compatible)": {
        "backend_type": "openai-compat",
        "base_url": "https://opencode.ai/zen/v1",
        "models": [
            "glm-5.1", "glm-5", "kimi-k2.5", "kimi-k2.6",
            "minimax-m2.7", "minimax-m2.5", "minimax-m2.5-free",
            "deepseek-v4-flash-free", "nemotron-3-super-free",
            "qwen3.6-plus", "qwen3.5-plus", "big-pickle",
        ],
    },
    "OpenCode Zen (Anthropic)": {
        "backend_type": "anthropic",
        "base_url": "https://opencode.ai/zen/v1",
        "models": [
            "claude-opus-4-7", "claude-opus-4-6", "claude-opus-4-5",
            "claude-opus-4-1", "claude-sonnet-4-6", "claude-sonnet-4-5",
            "claude-sonnet-4", "claude-haiku-4-5", "claude-3-5-haiku",
        ],
    },
    "OpenCode Go (OpenAI-compatible)": {
        "backend_type": "openai-compat",
        "base_url": "https://opencode.ai/zen/go/v1",
        "models": [
            "glm-5.1", "glm-5", "kimi-k2.5", "kimi-k2.6",
            "mimo-v2.5", "mimo-v2.5-pro", "minimax-m2.7", "minimax-m2.5",
            "qwen3.6-plus", "qwen3.5-plus", "deepseek-v4-pro", "deepseek-v4-flash",
        ],
    },
    "OpenCode Go (Anthropic)": {
        "backend_type": "anthropic",
        "base_url": "https://opencode.ai/zen/go/v1",
        "models": ["minimax-m2.7", "minimax-m2.5"],
    },
    "Crof.ai": {
        "backend_type": "openai-compat",
        "base_url": "https://crof.ai/v1",
        "models": [],
    },
    "NVIDIA NIM": {
        "backend_type": "openai-compat",
        "base_url": "https://integrate.api.nvidia.com/v1",
        "models": [],
    },
    "Kilo.ai Gateway": {
        "backend_type": "openai-compat",
        "base_url": "https://api.kilo.ai/api/gateway",
        "models": [],
    },
    "Command Code": {
        "backend_type": "command-code",
        "base_url": "https://api.commandcode.ai",
        "cc_version": "0.26.8",
        "models": [
            "deepseek/deepseek-v4-flash", "deepseek/deepseek-v4-pro",
            "anthropic:claude-sonnet-4-6", "anthropic:claude-haiku-4-5-20251001",
            "anthropic:claude-opus-4-7", "anthropic:claude-opus-4-6",
            "openai:gpt-5.5", "openai:gpt-5.4", "openai:gpt-5.4-mini", "openai:gpt-5.3-codex",
            "moonshotai/Kimi-K2.6", "moonshotai/Kimi-K2.5",
            "zai-org/GLM-5.1", "zai-org/GLM-5",
            "MiniMaxAI/MiniMax-M2.7", "MiniMaxAI/MiniMax-M2.5",
            "Qwen/Qwen3.6-Max-Preview", "Qwen/Qwen3.6-Plus",
            "stepfun/Step-3.5-Flash", "google/gemini-3.1-flash-lite",
        ],
    },
    "OpenRouter": {
        "backend_type": "openai-compat",
        "base_url": "https://openrouter.ai/api/v1",
        "models": [],
    },
    "Google Gemini (API Key)": {
        "backend_type": "openai-compat",
        "base_url": "https://generativelanguage.googleapis.com/v1beta/openai",
        "models": [
            "gemini-2.5-flash", "gemini-2.5-pro",
            "gemini-2.0-flash", "gemini-2.0-flash-lite",
            "gemini-2.5-flash-preview-native-audio-dialog",
        ],
    },
    "Google Gemini (OAuth)": {
        "backend_type": "openai-compat",
        "base_url": "https://generativelanguage.googleapis.com/v1beta/openai",
        "oauth_provider": "google",
        "models": [
            "gemini-2.5-flash", "gemini-2.5-pro",
            "gemini-2.0-flash", "gemini-2.0-flash-lite",
            "gemini-2.5-flash-preview-native-audio-dialog",
        ],
    },
    "OpenAdapter": {
        "backend_type": "openai-compat",
        "base_url": "https://api.openadapter.in/v1",
        "models": [
            "0G-DeepSeek-V3",
            "0G-DeepSeek-v4-Pro",
            "0G-GLM-5",
            "0G-GLM-5.1",
            "0G-Qwen3.6",
            "0G-Qwen-VL",
        ],
    },
}
def safe_name(name):
    """Turn an arbitrary endpoint name into a filename-friendly token.

    Characters other than alphanumerics and ``._-`` become ``_``; leading and
    trailing ``._-`` are trimmed, and an empty result falls back to
    ``"endpoint"``. The first 8 hex digits of the SHA-1 of the original name
    are appended so that different names that sanitize alike stay distinct.
    """
    punctuation = "._-"
    sanitized = []
    for char in name:
        keep = char.isalnum() or char in punctuation
        sanitized.append(char if keep else "_")
    stem = "".join(sanitized).strip(punctuation)
    if not stem:
        stem = "endpoint"
    fingerprint = hashlib.sha1(name.encode("utf-8")).hexdigest()[:8]
    return stem + "-" + fingerprint
def label_for_backend(backend_type):
    """Return the display label for a backend identifier.

    Identifiers without a known label are returned unchanged.
    """
    known_labels = {
        "openai-compat": "OpenAI-compatible",
        "anthropic": "Anthropic",
        "command-code": "Command Code",
        "native": "Native",
    }
    try:
        return known_labels[backend_type]
    except KeyError:
        return backend_type
def normalize_model_id(text):
    """Canonicalize a user-typed model identifier.

    The text is trimmed and lower-cased, ``/`` becomes ``-``, ``+`` becomes
    ``plus``, any character that is not alphanumeric, ``.`` or ``-`` becomes
    ``-``, runs of dashes collapse to one, and leading/trailing ``-``/``.``
    are removed. Blank input yields ``""``.
    """
    lowered = text.strip().lower()
    if lowered == "":
        return ""
    lowered = lowered.replace("/", "-").replace("+", "plus")
    pieces = []
    for char in lowered:
        mapped = char if (char.isalnum() or char in ".-") else "-"
        # Collapse dash runs on the fly: never emit a dash right after a dash.
        if mapped == "-" and pieces and pieces[-1] == "-":
            continue
        pieces.append(mapped)
    return "".join(pieces).strip("-.")
def normalize_base_url(url):
    """Reduce an API URL to its base form.

    Whitespace and trailing slashes are removed, then at most one trailing
    ``/chat/completions``, ``/responses`` or ``/messages`` segment is cut off.
    ``None`` and empty input yield ``""``.
    """
    trimmed = (url or "").strip().rstrip("/")
    operation_paths = ("/chat/completions", "/responses", "/messages")
    # Only the first matching suffix is removed, mirroring a loop-and-break.
    hit = next((path for path in operation_paths if trimmed.endswith(path)), None)
    if hit is not None:
        trimmed = trimmed[: len(trimmed) - len(hit)]
    return trimmed.rstrip("/")
def parse_model_list(text):
    """Parse a comma- and/or newline-separated list of model ids.

    Each entry is passed through ``normalize_model_id``; blanks are dropped
    and duplicates are removed while keeping first-seen order.
    """
    ordered_unique = {}
    for chunk in text.replace(",", "\n").splitlines():
        model_id = normalize_model_id(chunk)
        if model_id:
            # dict keys keep insertion order, giving an ordered de-duplication
            ordered_unique.setdefault(model_id, None)
    return list(ordered_unique)
def apply_provider_preset(endpoint, preset_name):
    """Return a copy of *endpoint* with the named preset applied.

    An unknown preset name returns *endpoint* itself, untouched. Otherwise the
    copy always takes the preset's backend type and normalized base URL, and
    takes ``cc_version``, ``models`` and ``default_model`` only when the
    endpoint does not already have a value for them.
    """
    preset = PROVIDER_PRESETS.get(preset_name)
    if not preset:
        return endpoint
    result = {
        **endpoint,
        "provider_preset": preset_name,
        "backend_type": preset["backend_type"],
        "base_url": normalize_base_url(preset["base_url"]),
    }
    preset_cc_version = preset.get("cc_version")
    if preset_cc_version and not result.get("cc_version"):
        result["cc_version"] = preset_cc_version
    if not result.get("models"):
        # Copy so later edits to the endpoint never touch the preset table.
        result["models"] = list(preset.get("models", []))
    models = result.get("models")
    if models and not result.get("default_model"):
        result["default_model"] = models[0]
    return result
def endpoint_models_url(endpoint):
    """Return the ``<base>/models`` URL for *endpoint*, or ``""`` if it has no base URL."""
    root = normalize_base_url(endpoint.get("base_url") or "")
    return f"{root}/models" if root else ""
def endpoint_model_headers(endpoint):
    """Build the HTTP headers used when listing an endpoint's models.

    Anthropic backends get ``anthropic-version`` (plus ``x-api-key`` when a key
    is set); every other backend gets a ``Bearer`` authorization header when a
    key is set, and no headers otherwise.
    """
    token = (endpoint.get("api_key") or "").strip()
    is_anthropic = endpoint.get("backend_type", "openai-compat") == "anthropic"
    if not is_anthropic:
        return {"Authorization": f"Bearer {token}"} if token else {}
    result = {"x-api-key": token} if token else {}
    result["anthropic-version"] = "2023-06-01"
    return result
def fetch_models_for_endpoint(endpoint, timeout=10):
    """Query an endpoint's ``/models`` route and return its model ids.

    Args:
        endpoint: endpoint dict; ``base_url``, ``api_key`` and
            ``backend_type`` are read through the URL/header helpers.
        timeout: socket timeout in seconds for the HTTP request.

    Returns:
        ``(ids, None)`` on success, where *ids* is a de-duplicated list in
        server order, or ``(None, message)`` on any failure. This function
        never raises; every error is reported through the message.
    """
    url = endpoint_models_url(endpoint)
    if not url:
        return None, "Base URL is empty"
    try:
        req = urllib.request.Request(url, headers=endpoint_model_headers(endpoint))
        # Context manager closes the connection even if read() fails.
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
        payload = json.loads(raw)
        if isinstance(payload, dict):
            items = payload.get("data") or payload.get("models") or []
        elif isinstance(payload, list):
            # Tolerate servers that answer with a bare JSON array of models.
            items = payload
        else:
            items = []
        if not isinstance(items, list):
            items = []
        ids = []
        seen = set()
        for item in items:
            mid = item.get("id") if isinstance(item, dict) else None
            if mid and mid not in seen:
                seen.add(mid)
                ids.append(mid)
        if not ids:
            return None, "No models returned"
        return ids, None
    except Exception as e:
        # Deliberately broad: callers only understand the (None, message) form.
        return None, str(e)
def refresh_endpoint_models(endpoint):
    """Fetch the live model list and return an updated copy of *endpoint*.

    Returns ``(updated_endpoint, None)`` on success or ``(None, error)`` when
    the fetch fails. The default model is reset to the first fetched id when
    the current default is not in the fetched list.
    """
    model_ids, error = fetch_models_for_endpoint(endpoint)
    if not model_ids:
        return None, error
    refreshed = {**endpoint, "models": model_ids}
    if refreshed.get("default_model") not in model_ids:
        refreshed["default_model"] = model_ids[0]
    return refreshed, None
# ═══════════════════════════════════════════════════════════════════
# Endpoint storage
# ═══════════════════════════════════════════════════════════════════
def load_endpoints():
    """Load the endpoint store from ``ENDPOINTS_FILE``.

    Returns:
        A dict that always has an ``"endpoints"`` list and a ``"default"``
        key. A missing, unreadable, unparsable or wrongly shaped file yields
        the empty store ``{"default": None, "endpoints": []}``, so callers can
        index ``["endpoints"]`` without guarding. Problems with an existing
        file are reported on stderr instead of being dropped silently.
    """
    empty_store = {"default": None, "endpoints": []}
    if not ENDPOINTS_FILE.exists():
        return empty_store
    try:
        data = json.loads(ENDPOINTS_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both JSON syntax errors and bad UTF-8.
        print(f"codex-launcher: cannot read {ENDPOINTS_FILE}: {exc}", file=sys.stderr)
        return empty_store
    if not isinstance(data, dict) or not isinstance(data.get("endpoints"), list):
        print(f"codex-launcher: {ENDPOINTS_FILE} has an unexpected structure; ignoring it",
              file=sys.stderr)
        return empty_store
    data.setdefault("default", None)
    return data
def save_endpoints(data):
    """Write *data* to ``ENDPOINTS_FILE`` as indented JSON, creating its folder if needed."""
    folder = ENDPOINTS_FILE.parent
    folder.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, indent=2)
    ENDPOINTS_FILE.write_text(serialized)
def load_bgp_pools():
    """Load the BGP pool store from ``BGP_POOLS_FILE``.

    Returns:
        A dict with a ``"pools"`` list. A missing, unreadable, unparsable or
        wrongly shaped file yields ``{"pools": []}``, so callers can safely use
        ``.get("pools", [])`` and iterate the result. Problems with an
        existing file are reported on stderr.
    """
    empty_store = {"pools": []}
    if not BGP_POOLS_FILE.exists():
        return empty_store
    try:
        data = json.loads(BGP_POOLS_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both JSON syntax errors and bad UTF-8.
        print(f"codex-launcher: cannot read {BGP_POOLS_FILE}: {exc}", file=sys.stderr)
        return empty_store
    if not isinstance(data, dict) or not isinstance(data.get("pools", []), list):
        print(f"codex-launcher: {BGP_POOLS_FILE} has an unexpected structure; ignoring it",
              file=sys.stderr)
        return empty_store
    data.setdefault("pools", [])
    return data
def save_bgp_pools(data):
    """Write *data* to ``BGP_POOLS_FILE`` as indented JSON, creating its folder if needed."""
    folder = BGP_POOLS_FILE.parent
    folder.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, indent=2)
    BGP_POOLS_FILE.write_text(serialized)
def get_endpoint(name):
    """Return the stored endpoint whose ``name`` equals *name*, or ``None``.

    The endpoint store is re-read from disk on every call.
    """
    stored = load_endpoints()["endpoints"]
    return next((entry for entry in stored if entry["name"] == name), None)
def now_utc_iso():
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    utc_now = time.gmtime()
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", utc_now)
def build_profile_bundle():
    """Assemble the exportable profile: endpoint store plus current Codex config text.

    ``codex_config_toml`` is the UTF-8 text of ``CONFIG``, or ``""`` when that
    file does not exist.
    """
    bundle = {"version": 1}
    bundle["exported_at"] = now_utc_iso()
    bundle["endpoints"] = load_endpoints()
    if CONFIG.exists():
        bundle["codex_config_toml"] = CONFIG.read_text(encoding="utf-8")
    else:
        bundle["codex_config_toml"] = ""
    return bundle
def save_profile_bundle(path):
    """Serialize the current profile bundle to *path* as indented UTF-8 JSON."""
    destination = Path(path)
    payload = json.dumps(build_profile_bundle(), indent=2)
    destination.write_text(payload, encoding="utf-8")
def import_profile_bundle(path):
    """Replace the local profile with the bundle stored at *path*.

    The bundle must be a JSON object whose ``endpoints`` member is itself an
    object containing an ``endpoints`` key; otherwise ``ValueError`` is raised
    before anything is modified. Existing config and endpoint files are copied
    aside first so the import can be rolled back by hand.

    Returns:
        The imported endpoint store.
    """
    bundle = json.loads(Path(path).read_text(encoding="utf-8"))
    if not isinstance(bundle, dict):
        raise ValueError("Invalid profile bundle")
    endpoints = bundle.get("endpoints")
    well_formed = isinstance(endpoints, dict) and "endpoints" in endpoints
    if not well_formed:
        raise ValueError("Profile bundle missing endpoints")

    # Rollback copies: (live file, backup location), taken only for files that exist.
    rollback_pairs = (
        (CONFIG, CONFIG_BAK),
        (ENDPOINTS_FILE, ENDPOINTS_FILE.with_suffix(".json.import-bak")),
    )
    for live_file, backup_file in rollback_pairs:
        if live_file.exists():
            shutil.copy2(str(live_file), str(backup_file))

    save_endpoints(endpoints)

    config_text = bundle.get("codex_config_toml", "")
    if isinstance(config_text, str) and config_text.strip():
        CONFIG.parent.mkdir(parents=True, exist_ok=True)
        CONFIG.write_text(config_text, encoding="utf-8")
    return endpoints
# ═══════════════════════════════════════════════════════════════════
# Config management
# ═══════════════════════════════════════════════════════════════════
def backup_config():
    """Copy ``CONFIG`` to ``CONFIG_BAK`` (metadata included); no-op when CONFIG is absent."""
    if not CONFIG.exists():
        return
    shutil.copy2(str(CONFIG), str(CONFIG_BAK))
def restore_config():
    """Move ``CONFIG_BAK`` back over ``CONFIG``; no-op when there is no backup."""
    if not CONFIG_BAK.exists():
        return
    CONFIG_BAK.rename(CONFIG)
def write_config_for_native(endpoint, selected_model):
    """Write config for native OpenAI (no proxy needed).

    Backs up the current config, writes the model catalog JSON for this
    endpoint under ``PROXY_CONFIG_DIR``, then rewrites ``CONFIG`` with a
    provider table and a profile table that talk to the endpoint's own
    ``base_url``.

    Args:
        endpoint: endpoint dict; ``name``, ``base_url`` and ``api_key`` are
            required keys.
        selected_model: model id to make the active model.
    """
    backup_config()
    model_catalog = _gen_model_catalog(endpoint, selected_model)
    mc_path = PROXY_CONFIG_DIR / f"models-{safe_name(endpoint['name'])}.json"
    mc_path.parent.mkdir(parents=True, exist_ok=True)
    mc_path.write_text(json.dumps(model_catalog, indent=2))
    # Escape once and reuse: the quoted table keys must be escaped exactly like
    # the model_provider value that refers to them, or a name containing a
    # quote or newline yields a broken or mismatched config.
    name = _toml_safe(endpoint["name"])
    model = _toml_safe(selected_model)
    catalog = _toml_safe(mc_path)
    lines = [
        f'model = "{model}"\n',
        f'model_provider = "{name}"\n',
        f'model_catalog_json = "{catalog}"\n',
        f'\n[model_providers."{name}"]\n',
        f'name = "{name}"\n',
        f'base_url = "{_toml_safe(endpoint["base_url"])}"\n',
        f'experimental_bearer_token = "{_toml_safe(endpoint["api_key"])}"\n',
        f'\n[profiles."{name}"]\n',
        f'model_provider = "{name}"\n',
        f'model = "{model}"\n',
        f'model_catalog_json = "{catalog}"\n',
        'service_tier = "default"\n',
        'approvals_reviewer = "user"\n',
    ]
    # ~/.codex may not exist yet on a fresh machine.
    CONFIG.parent.mkdir(parents=True, exist_ok=True)
    # TOML files are UTF-8 by specification; do not depend on the locale.
    CONFIG.write_text("".join(lines), encoding="utf-8")
def _toml_safe(val):
    """Return *val* as text that can sit inside a TOML basic (``"..."``) string.

    Only the first line of ``str(val)`` is kept (guarding against multi-line
    pastes) and surrounding whitespace is trimmed. Backslashes and double
    quotes are backslash-escaped, and any remaining control character other
    than tab is written as a ``\\uXXXX`` escape.
    """
    lines = str(val).splitlines()
    first_line = lines[0].strip() if lines else ""
    # Double backslashes BEFORE escaping quotes; the other order would also
    # double the backslash that was just added in front of each quote.
    escaped = first_line.replace("\\", "\\\\").replace('"', '\\"')
    return "".join(
        ch if ch == "\t" or (ch >= " " and ch != "\x7f") else f"\\u{ord(ch):04X}"
        for ch in escaped
    )
def write_config_for_translated(endpoint, selected_model):
    """Write config pointing at local proxy.

    Backs up the current config, writes the model catalog JSON for this
    endpoint under ``PROXY_CONFIG_DIR``, then rewrites ``CONFIG`` with a
    provider table and a profile table whose ``base_url`` is the local
    translation proxy (``http://127.0.0.1:8080``).

    Args:
        endpoint: endpoint dict; ``name`` and ``api_key`` are required keys.
        selected_model: model id to make the active model.
    """
    backup_config()
    model_catalog = _gen_model_catalog(endpoint, selected_model)
    mc_path = PROXY_CONFIG_DIR / f"models-{safe_name(endpoint['name'])}.json"
    mc_path.parent.mkdir(parents=True, exist_ok=True)
    mc_path.write_text(json.dumps(model_catalog, indent=2))
    # Escape once and reuse: the quoted table keys must be escaped exactly like
    # the model_provider value that refers to them, or a name containing a
    # quote or newline yields a broken or mismatched config.
    name = _toml_safe(endpoint["name"])
    model = _toml_safe(selected_model)
    catalog = _toml_safe(mc_path)
    lines = [
        f'model = "{model}"\n',
        f'model_provider = "{name}"\n',
        f'model_catalog_json = "{catalog}"\n',
        f'\n[model_providers."{name}"]\n',
        f'name = "{name}"\n',
        'base_url = "http://127.0.0.1:8080"\n',
        f'experimental_bearer_token = "{_toml_safe(endpoint["api_key"])}"\n',
        f'\n[profiles."{name}"]\n',
        f'model_provider = "{name}"\n',
        f'model = "{model}"\n',
        f'model_catalog_json = "{catalog}"\n',
        'service_tier = "fast"\n',
        'approvals_reviewer = "user"\n',
    ]
    # ~/.codex may not exist yet on a fresh machine.
    CONFIG.parent.mkdir(parents=True, exist_ok=True)
    # TOML files are UTF-8 by specification; do not depend on the locale.
    CONFIG.write_text("".join(lines), encoding="utf-8")
def _gen_model_catalog(endpoint, selected_model=None):
default_model = selected_model or endpoint.get("default_model")
models = []
for mid in endpoint.get("models", []):
models.append({
"slug": mid, "model": mid, "display_name": mid,
"description": f"{endpoint['name']} {mid}",
"hidden": False, "isDefault": mid == default_model,
"shell_type": "shell_command", "visibility": "list",
"default_reasoning_level": "medium",
"supported_reasoning_levels": [
{"effort": "low", "description": "Fast"},
{"effort": "medium", "description": "Balanced"},
{"effort": "high", "description": "Deep"},
{"effort": "xhigh", "description": "Extra deep"},
],
"supportedReasoningEfforts": [
{"reasoningEffort": "low", "description": "Fast"},
{"reasoningEffort": "medium", "description": "Balanced"},
{"reasoningEffort": "high", "description": "Deep"},
{"reasoningEffort": "xhigh", "description": "Extra deep"},
],
"priority": 30, "context_size": 128000,
"additional_speed_tiers": [], "service_tiers": [],
"supports_reasoning_summaries": True, "support_verbosity": True,
"reasoning": True, "tool_call": True,
"supports_parallel_tool_calls": True,
"experimental_supported_tools": [], "supported_in_api": True,
"truncation_policy": {"mode": "tokens", "limit": 128000},
"base_instructions": "You are Codex, a coding agent.",
})
return {"models": models}
# ═══════════════════════════════════════════════════════════════════
# Proxy management
# ═══════════════════════════════════════════════════════════════════
# Popen handle of the translation proxy started by _start_proxy_with_config();
# reset to None by _stop_proxy().
_proxy_proc = None
def _start_proxy_for(endpoint, logfn):
    """Stop any running proxy, write this endpoint's proxy config and start the proxy.

    Args:
        endpoint: endpoint dict; ``backend_type``, ``base_url``, ``api_key``
            and ``name`` are required keys.
        logfn: callable taking one status-message string.
    """
    _stop_proxy()

    settings = {"port": 8080}
    settings["backend_type"] = endpoint["backend_type"]
    settings["target_url"] = normalize_base_url(endpoint["base_url"])
    settings["api_key"] = endpoint["api_key"]
    settings["cc_version"] = endpoint.get("cc_version", "")
    settings["oauth_provider"] = endpoint.get("oauth_provider", "")
    settings["reasoning_enabled"] = endpoint.get("reasoning_enabled", True)
    settings["reasoning_effort"] = endpoint.get("reasoning_effort", "medium")

    # One model-list entry per configured model, owned by this endpoint.
    advertised = []
    for model_id in endpoint.get("models", []):
        advertised.append({
            "id": model_id,
            "object": "model",
            "created": 1700000000,
            "owned_by": endpoint["name"],
        })
    settings["models"] = advertised

    config_file = PROXY_CONFIG_DIR / f"proxy-{safe_name(endpoint['name'])}.json"
    config_file.parent.mkdir(parents=True, exist_ok=True)
    config_file.write_text(json.dumps(settings, indent=2))
    _start_proxy_with_config(config_file, logfn)
def _start_proxy_with_config(pcfg_path, logfn):
    """Launch the translation proxy and wait until it answers on port 8080.

    The proxy runs in its own session (and therefore its own process group) so
    ``_stop_proxy`` can signal the whole group. Readiness is probed up to 30
    times; the wait ends early if the proxy process dies.

    Args:
        pcfg_path: path of the proxy JSON config passed via ``--config``.
        logfn: callable taking one status-message string.
    """
    global _proxy_proc
    _proxy_proc = subprocess.Popen(
        ["python3", str(PROXY), "--config", str(pcfg_path)],
        stdout=subprocess.DEVNULL,
        # Same effect as preexec_fn=os.setsid, but safe to use while other
        # threads are running (the launcher uses worker threads).
        start_new_session=True,
    )
    for _ in range(30):
        if _proxy_proc.poll() is not None:
            # The child is gone; further probing could only time out.
            logfn(f"ERROR: proxy exited with code {_proxy_proc.returncode} before becoming ready")
            return
        try:
            # Context manager closes the probe connection right away.
            with urllib.request.urlopen("http://127.0.0.1:8080/v1/models", timeout=2):
                pass
            logfn("Proxy ready on port 8080")
            return
        except Exception:
            # Best-effort probe: any failure just means "not ready yet".
            time.sleep(0.5)
    logfn("WARNING: proxy may not have started in time")
def _stop_proxy():
    """Terminate the running proxy's process group, if any, and forget the handle.

    Sends SIGTERM, gives the proxy up to half a second to exit, then falls
    back to SIGKILL. The child is waited on in both cases so it is reaped
    instead of lingering as a zombie. A process that has already vanished, or
    that may not be signalled, is ignored.
    """
    global _proxy_proc
    proc = _proxy_proc
    if proc is not None and proc.poll() is None:
        try:
            pgid = os.getpgid(proc.pid)
            os.killpg(pgid, signal.SIGTERM)
            try:
                proc.wait(timeout=0.5)
            except subprocess.TimeoutExpired:
                os.killpg(pgid, signal.SIGKILL)
                try:
                    proc.wait(timeout=2)
                except subprocess.TimeoutExpired:
                    # Could not reap in time; give up rather than block the UI.
                    pass
        except (ProcessLookupError, PermissionError):
            pass
    _proxy_proc = None
def _run_cleanup():
    """Run the ``CLEANUP`` script through bash, discarding its output (30 s timeout)."""
    command = ["bash", str(CLEANUP)]
    subprocess.run(command, capture_output=True, timeout=30)
def _last_log_lines(n=15):
    """Return the last *n* lines of ``LAUNCH_LOG``, or ``"(no log file)"`` on any error."""
    try:
        every_line = LAUNCH_LOG.read_text().splitlines()
        tail = every_line[-n:]
        return "\n".join(tail)
    except Exception:
        return "(no log file)"
def _detect_codex_cli():
    """Locate the ``codex`` CLI.

    Returns:
        ``(path, version_text)`` when ``codex`` is on PATH, where the version
        text is the first non-empty of stdout, stderr, or ``"unknown"``;
        ``None`` when it is not found or anything goes wrong.
    """
    try:
        location = shutil.which("codex")
        if not location:
            return None
        result = subprocess.run(
            ["codex", "--version"],
            capture_output=True,
            text=True,
            timeout=5,
        )
        version = (result.stdout or "").strip()
        if not version:
            version = (result.stderr or "").strip()
        if not version:
            version = "unknown"
        return (location, version)
    except Exception:
        return None
def _detect_codex_desktop():
    """Return the path of ``START_SH`` as a string if it exists, else ``None``."""
    return str(START_SH) if START_SH.exists() else None
def _check_codex_auth():
    """Run ``codex login status`` and classify the outcome.

    Returns:
        A ``(status, message)`` tuple where status is one of ``"logged_in"``
        (exit code 0 with output), ``"error"`` (output with a non-zero exit
        code, or an unexpected exception), ``"unknown"`` (no output at all) or
        ``"not_installed"`` (the ``codex`` executable could not be started).
    """
    try:
        result = subprocess.run(
            ["codex", "login", "status"],
            capture_output=True, text=True, timeout=10,
        )
    except FileNotFoundError:
        return ("not_installed", "codex not found")
    except Exception as e:
        return ("error", str(e))

    # Prefer stdout; fall back to stderr when stdout is blank.
    message = (result.stdout or "").strip() or (result.stderr or "").strip()
    if not message:
        return ("unknown", "No output from codex login status")
    state = "logged_in" if result.returncode == 0 else "error"
    return (state, message)
# ═══════════════════════════════════════════════════════════════════
# Main window
# ═══════════════════════════════════════════════════════════════════
class LauncherWin(Gtk.Window):
def __init__(self):
    """Build the main launcher window and start the initial status checks.

    Lays out, top to bottom: header with tool buttons, CLI/Desktop detection
    row, auth status row, profile operations, endpoint/model selectors, launch
    buttons, the scrolling status log and the bottom bar. Launch buttons for a
    tool that was not detected are disabled. The auth check runs on a daemon
    thread so window construction is not blocked by it.
    """
    super().__init__(title="Codex Launcher")
    self.set_default_size(560, 460)
    self.set_border_width(12)
    self.set_position(Gtk.WindowPosition.CENTER)
    self._proc = None
    self._endpoints_data = load_endpoints()
    vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)
    self.add(vbox)
    # header row
    hdr = Gtk.Box(spacing=8)
    vbox.pack_start(hdr, False, False, 0)
    lbl = Gtk.Label(label="<b>Codex Launcher v2.6.1</b>")
    lbl.set_use_markup(True)
    hdr.pack_start(lbl, False, False, 0)
    changelog_btn = Gtk.Button(label="Changelog")
    changelog_btn.connect("clicked", lambda b: self._show_changelog())
    hdr.pack_end(changelog_btn, False, False, 0)
    usage_btn = Gtk.Button(label="Usage")
    usage_btn.connect("clicked", lambda b: self._open_usage())
    hdr.pack_end(usage_btn, False, False, 0)
    bgp_btn = Gtk.Button(label="AI BGP")
    bgp_btn.connect("clicked", lambda b: self._open_bgp())
    hdr.pack_end(bgp_btn, False, False, 0)
    mgr_btn = Gtk.Button(label="Manage Endpoints")
    mgr_btn.connect("clicked", lambda b: self._open_mgr())
    hdr.pack_end(mgr_btn, False, False, 0)
    # verification status bar
    self._cli_info = _detect_codex_cli()
    self._desktop_info = _detect_codex_desktop()
    ver_box = Gtk.Box(spacing=12)
    vbox.pack_start(ver_box, False, False, 0)
    if self._cli_info:
        cli_path, cli_ver = self._cli_info
        # Version text and path come from outside the program; escape them so
        # a stray '&' or '<' cannot invalidate the Pango markup.
        cli_ver_markup = GLib.markup_escape_text(cli_ver, -1)
        cli_path_markup = GLib.markup_escape_text(cli_path, -1)
        cli_lbl = Gtk.Label()
        cli_lbl.set_markup(
            f"<span foreground='#2ea043'>✔ Codex CLI</span> "
            f"<small>{cli_ver_markup} ({cli_path_markup})</small>"
        )
        cli_lbl.set_use_markup(True)
        ver_box.pack_start(cli_lbl, False, False, 0)
    else:
        cli_lbl = Gtk.Label()
        cli_lbl.set_markup("<span foreground='#d29922'>✘ Codex CLI — not found</span>")
        cli_lbl.set_use_markup(True)
        ver_box.pack_start(cli_lbl, False, False, 0)
        cli_install_btn = Gtk.Button(label="Install")
        cli_install_btn.connect("clicked", lambda b: self._show_install_guide("cli"))
        ver_box.pack_start(cli_install_btn, False, False, 0)
    ver_box.pack_start(Gtk.Label(label=" "), False, False, 0)
    if self._desktop_info:
        desktop_markup = GLib.markup_escape_text(self._desktop_info, -1)
        desk_lbl = Gtk.Label()
        desk_lbl.set_markup(
            f"<span foreground='#2ea043'>✔ Codex Desktop</span> "
            f"<small>({desktop_markup})</small>"
        )
        desk_lbl.set_use_markup(True)
        ver_box.pack_start(desk_lbl, False, False, 0)
    else:
        desk_lbl = Gtk.Label()
        desk_lbl.set_markup("<span foreground='#d29922'>✘ Codex Desktop — not found</span>")
        desk_lbl.set_use_markup(True)
        ver_box.pack_start(desk_lbl, False, False, 0)
        desk_install_btn = Gtk.Button(label="Install")
        desk_install_btn.connect("clicked", lambda b: self._show_install_guide("desktop"))
        ver_box.pack_start(desk_install_btn, False, False, 0)
    # Names of the tools that were not detected ("cli" and/or "desktop").
    self._missing = []
    if not self._cli_info:
        self._missing.append("cli")
    if not self._desktop_info:
        self._missing.append("desktop")
    # auth status row
    auth_box = Gtk.Box(spacing=12)
    vbox.pack_start(auth_box, False, False, 0)
    self._auth_label = Gtk.Label()
    self._auth_label.set_markup("<span foreground='#888'>Checking auth…</span>")
    self._auth_label.set_use_markup(True)
    # NOTE(review): 3 is presumably Pango.EllipsizeMode.END — Pango is not
    # imported in the visible part of the file, so the raw value is kept.
    self._auth_label.set_ellipsize(3)
    auth_box.pack_start(self._auth_label, False, False, 0)
    self._relogin_btn = Gtk.Button(label="Re-login")
    self._relogin_btn.set_sensitive(False)
    self._relogin_btn.connect("clicked", lambda b: self._codex_relogin())
    auth_box.pack_end(self._relogin_btn, False, False, 0)
    # Started only after the label and button exist, since the result handler
    # updates both.
    threading.Thread(target=self._check_auth_async, daemon=True).start()
    # profile operations row
    ops_box = Gtk.Box(spacing=8)
    vbox.pack_start(ops_box, False, False, 0)
    self._refresh_all_btn = Gtk.Button(label="Refresh Models")
    self._refresh_all_btn.connect("clicked", lambda b: self._refresh_all_models())
    ops_box.pack_start(self._refresh_all_btn, False, False, 0)
    self._backup_btn = Gtk.Button(label="Backup Profile")
    self._backup_btn.connect("clicked", lambda b: self._backup_profile())
    ops_box.pack_start(self._backup_btn, False, False, 0)
    self._import_btn = Gtk.Button(label="Import Profile")
    self._import_btn.connect("clicked", lambda b: self._import_profile())
    ops_box.pack_start(self._import_btn, False, False, 0)
    # endpoint selector
    sel_box = Gtk.Box(spacing=6)
    vbox.pack_start(sel_box, False, False, 4)
    sel_box.pack_start(Gtk.Label(label="Endpoint:"), False, False, 0)
    self._combo = Gtk.ComboBoxText()
    self._combo.connect("changed", lambda c: self._on_endpoint_changed())
    sel_box.pack_start(self._combo, True, True, 0)
    # model selector
    sel_box.pack_start(Gtk.Label(label="Model:"), False, False, 0)
    self._model_combo = Gtk.ComboBoxText()
    sel_box.pack_start(self._model_combo, True, True, 0)
    # launch buttons
    btn_box = Gtk.Box(spacing=8, homogeneous=True)
    vbox.pack_start(btn_box, False, False, 8)
    self._btn_desktop = Gtk.Button(label="Launch Desktop")
    self._btn_desktop.connect("clicked", lambda b: self._launch("desktop"))
    if "desktop" in self._missing:
        self._btn_desktop.set_tooltip_text("Codex Desktop is not installed")
        self._btn_desktop.set_sensitive(False)
    btn_box.pack_start(self._btn_desktop, True, True, 0)
    self._btn_cli = Gtk.Button(label="Launch CLI")
    self._btn_cli.connect("clicked", lambda b: self._launch("cli"))
    if "cli" in self._missing:
        self._btn_cli.set_tooltip_text("Codex CLI is not installed")
        self._btn_cli.set_sensitive(False)
    btn_box.pack_start(self._btn_cli, True, True, 0)
    btn_box2 = Gtk.Box(spacing=8, homogeneous=True)
    vbox.pack_start(btn_box2, False, False, 0)
    self._btn_codex_desktop = Gtk.Button(label="Codex Default (Desktop)")
    self._btn_codex_desktop.connect("clicked", lambda b: self._launch_codex_default("desktop"))
    if "desktop" in self._missing:
        self._btn_codex_desktop.set_tooltip_text("Codex Desktop is not installed")
        self._btn_codex_desktop.set_sensitive(False)
    btn_box2.pack_start(self._btn_codex_desktop, True, True, 0)
    self._btn_codex_cli = Gtk.Button(label="Codex Default (CLI)")
    self._btn_codex_cli.connect("clicked", lambda b: self._launch_codex_default("cli"))
    if "cli" in self._missing:
        self._btn_codex_cli.set_tooltip_text("Codex CLI is not installed")
        self._btn_codex_cli.set_sensitive(False)
    btn_box2.pack_start(self._btn_codex_cli, True, True, 0)
    # status
    sw = Gtk.ScrolledWindow()
    sw.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC)
    vbox.pack_start(sw, True, True, 0)
    self._buf = Gtk.TextBuffer()
    self._tv = Gtk.TextView(buffer=self._buf)
    self._tv.set_editable(False)
    self._tv.set_cursor_visible(False)
    self._tv.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
    sw.add(self._tv)
    # bottom bar
    bb = Gtk.Box(spacing=8)
    vbox.pack_start(bb, False, False, 0)
    self._kill_btn = Gtk.Button(label="Kill && Cleanup")
    self._kill_btn.connect("clicked", lambda b: self._kill())
    self._kill_btn.set_sensitive(False)
    bb.pack_start(self._kill_btn, True, True, 0)
    self._view_log_btn = Gtk.Button(label="View Log")
    self._view_log_btn.connect("clicked", lambda b: subprocess.Popen(["xdg-open", str(LAUNCH_LOG)]))
    bb.pack_start(self._view_log_btn, False, False, 0)
    self._close_btn = Gtk.Button(label="Close")
    self._close_btn.connect("clicked", lambda b: self._do_close())
    bb.pack_start(self._close_btn, False, False, 0)
    self.show_all()
    self._rebuild_combo()
    self._log_dependency_status()
# ── helpers ──────────────────────────────────────────────────
def log(self, msg):
    """Schedule *msg* to be appended to the status view via ``GLib.idle_add``."""
    append_on_idle = self._append_log
    GLib.idle_add(append_on_idle, msg)
def _append_log(self, msg):
    """Append *msg* plus a newline to the status buffer and scroll to the end."""
    buffer = self._buf
    end_iter = buffer.get_end_iter()
    buffer.insert(end_iter, msg + "\n")
    # A temporary anonymous mark gives scroll_to_mark a target; it is removed
    # again right after scrolling.
    anchor = buffer.create_mark(None, end_iter, False)
    self._tv.scroll_to_mark(anchor, 0.0, True, 0.0, 0.5)
    buffer.delete_mark(anchor)
def _log_dependency_status(self):
    """Log one line each for CLI detection, Desktop detection and the overall result."""
    if self._cli_info:
        _, cli_version = self._cli_info
        cli_line = f"✔ Codex CLI detected ({cli_version})"
    else:
        cli_line = "✘ Codex CLI NOT found — CLI launch disabled. Click 'Install' above."

    if self._desktop_info:
        desktop_line = f"✔ Codex Desktop detected ({self._desktop_info})"
    else:
        desktop_line = "✘ Codex Desktop NOT found — Desktop launch disabled. Click 'Install' above."

    if self._missing:
        summary_line = "⚠ Install missing tools before using the launcher."
    else:
        summary_line = "All dependencies OK."

    for line in (cli_line, desktop_line, summary_line):
        self.log(line)
def _check_auth_async(self):
    """Query Codex auth state and hand the result to the UI updater via ``GLib.idle_add``."""
    state, detail = _check_codex_auth()
    updater = self._update_auth_status
    GLib.idle_add(updater, state, detail)
def _update_auth_status(self, status, msg):
    """Show the auth check result in the auth label and toggle the Re-login button.

    Args:
        status: status string from ``_check_codex_auth`` (``"logged_in"``,
            ``"not_installed"``, or anything else for a problem state).
        msg: human-readable detail; treated as plain text, not markup.

    Returns:
        ``False``, so ``GLib.idle_add`` does not schedule this callback again.
    """
    # msg is command output or an exception message and may contain '<' or
    # '&', which would make the Pango markup invalid if inserted unescaped.
    safe_msg = GLib.markup_escape_text(str(msg), -1)
    if status == "logged_in":
        self._auth_label.set_markup(f"<span foreground='#2ea043'>✔ Auth: {safe_msg}</span>")
        self._relogin_btn.set_sensitive("cli" not in self._missing)
    elif status == "not_installed":
        self._auth_label.set_markup("<span foreground='#888'>Auth: N/A (CLI not installed)</span>")
    else:
        self._auth_label.set_markup(f"<span foreground='#d29922'>⚠ Auth: {safe_msg}</span>")
        self._relogin_btn.set_sensitive("cli" not in self._missing)
    return False
def _codex_relogin(self):
    """Open ``codex login`` in the first available terminal emulator.

    Logs an error and returns when no known terminal is installed; otherwise
    starts the terminal detached and schedules a delayed auth re-check on a
    daemon thread.
    """
    self.log("Opening codex login in terminal…")
    # (executable, flags placed before the command to run), in preference order
    candidates = (
        ("x-terminal-emulator", ["-e"]),
        ("kgx", ["--"]),
        ("gnome-terminal", ["--"]),
        ("konsole", ["-e"]),
        ("xterm", ["-e"]),
    )
    chosen = next(
        ((exe, flags) for exe, flags in candidates if shutil.which(exe)),
        None,
    )
    if chosen is None:
        self.log("ERROR: no terminal emulator found for re-login")
        return
    exe, flags = chosen
    command = [exe, *flags, "codex", "login"]
    subprocess.Popen(command, preexec_fn=os.setsid)
    self.log("Login flow started in terminal. Re-checking auth in 30s…")
    self._auth_label.set_markup("<span foreground='#888'>Auth: waiting for login…</span>")
    rechecker = threading.Thread(target=self._delayed_auth_check, daemon=True)
    rechecker.start()
def _delayed_auth_check(self):
    """Wait 30 seconds, then run the auth check (meant to run on a worker thread)."""
    wait_seconds = 30
    time.sleep(wait_seconds)
    self._check_auth_async()
def _set_busy(self, busy):
    """Enable or disable the launch and kill buttons for a running/idle session.

    While busy, all launch buttons are disabled and Kill is enabled; when idle,
    each launch button is enabled only if its tool was detected. The widget
    changes are applied through ``GLib.idle_add``.
    """
    def apply_state():
        idle = not busy
        desktop_ok = idle and "desktop" not in self._missing
        cli_ok = idle and "cli" not in self._missing
        for button in (self._btn_desktop, self._btn_codex_desktop):
            button.set_sensitive(desktop_ok)
        for button in (self._btn_cli, self._btn_codex_cli):
            button.set_sensitive(cli_ok)
        self._kill_btn.set_sensitive(busy)

    GLib.idle_add(apply_state)
def _rebuild_combo(self):
    """Reload endpoints and BGP pools from disk and repopulate the endpoint dropdown.

    Endpoints are listed first, then pools prefixed with the shuffle icon. The
    stored default endpoint is selected when it exists, otherwise the first
    row. The model dropdown is refreshed afterwards in every case.
    """
    store = load_endpoints()
    self._endpoints_data = store
    dropdown = self._combo
    dropdown.remove_all()

    endpoint_names = [entry["name"] for entry in store["endpoints"]]
    for endpoint_name in endpoint_names:
        dropdown.append_text(endpoint_name)

    pool_names = [pool["name"] for pool in load_bgp_pools().get("pools", [])]
    for pool_name in pool_names:
        dropdown.append_text(f"🔀 {pool_name}")

    if endpoint_names or pool_names:
        preferred = store.get("default")
        if preferred and preferred in endpoint_names:
            row = endpoint_names.index(preferred)
        else:
            row = 0
        dropdown.set_active(row)
    self._on_endpoint_changed()
def _on_endpoint_changed(self):
    """Refill the model combo for the currently selected endpoint or pool.

    For a BGP pool (label prefixed with "🔀 ") the distinct, non-empty
    route models are listed and the first one selected.  For a regular
    endpoint its models are listed and the default selection is deferred
    to ``_select_default_model`` via ``GLib.idle_add``.
    """
    name = self._combo.get_active_text()
    is_bgp = name and name.startswith("🔀 ")
    bgp_name = name[2:] if is_bgp else None
    ep = None
    if name and not is_bgp:
        ep = get_endpoint(name)

    model_combo = self._model_combo
    model_combo.remove_all()

    if is_bgp:
        pools = load_bgp_pools().get("pools", [])
        pool = next((p for p in pools if p["name"] == bgp_name), None)
        if not pool:
            return
        seen = set()
        for route in pool.get("routes", []):
            model_id = route.get("model", "")
            if not model_id or model_id in seen:
                continue
            model_combo.append_text(model_id)
            seen.add(model_id)
        if seen:
            model_combo.set_active(0)
        return

    if ep:
        for model_id in ep.get("models", []):
            model_combo.append_text(model_id)
        GLib.idle_add(self._select_default_model, ep)
def _select_default_model(self, ep):
    """Select the endpoint's default model in the model combo.

    Falls back to the first model when the default is not in the list and
    does nothing when the endpoint has no models.
    """
    default_model = ep.get("default_model", "")
    models = ep.get("models", [])
    if default_model in models:
        position = models.index(default_model)
    elif models:
        position = 0
    else:
        return
    self._model_combo.set_active(position)
# ── endpoint mgr ─────────────────────────────────────────────
def _open_mgr(self):
    """Open the endpoint manager window, reporting any failure in a dialog.

    The window is kept in ``self._mgr_window`` and the attribute is reset
    to ``None`` when the window is destroyed.
    """
    import traceback

    def _forget(*_):
        setattr(self, "_mgr_window", None)

    try:
        self._mgr_window = EndpointMgr(self)
        self._mgr_window.connect("destroy", _forget)
    except Exception as e:
        traceback.print_exc()
        dialog = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, f"Error: {e}")
        dialog.run()
        dialog.destroy()
def _open_bgp(self):
    """Open the BGP pool manager window, reporting any failure in a dialog.

    The window is kept in ``self._bgp_window`` and the attribute is reset
    to ``None`` when the window is destroyed.
    """
    import traceback

    def _forget(*_):
        setattr(self, "_bgp_window", None)

    try:
        self._bgp_window = BGPPoolMgr(self)
        self._bgp_window.connect("destroy", _forget)
    except Exception as e:
        traceback.print_exc()
        dialog = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, f"Error: {e}")
        dialog.run()
        dialog.destroy()
def _open_usage(self):
    """Open the usage dashboard window, reporting any failure in a dialog.

    The window is kept in ``self._usage_window`` and the attribute is reset
    to ``None`` when the window is destroyed.
    """
    import traceback

    def _forget(*_):
        setattr(self, "_usage_window", None)

    try:
        self._usage_window = UsageWindow(self)
        self._usage_window.connect("destroy", _forget)
    except Exception as e:
        traceback.print_exc()
        dialog = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, f"Error: {e}")
        dialog.run()
        dialog.destroy()
def _backup_profile(self):
    """Ask for a destination file and save the profile bundle to it.

    Does nothing when the chooser is cancelled.  A failure while saving is
    shown in an error dialog.
    """
    suggested_name = f"codex-profile-{time.strftime('%Y%m%d-%H%M%S')}.json"
    chooser = Gtk.FileChooserDialog(
        title="Backup Codex Profile",
        parent=self,
        action=Gtk.FileChooserAction.SAVE,
    )
    chooser.add_buttons(
        Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL,
        Gtk.STOCK_SAVE, Gtk.ResponseType.OK,
    )
    chooser.set_do_overwrite_confirmation(True)
    chooser.set_current_name(suggested_name)

    filename = None
    if chooser.run() == Gtk.ResponseType.OK:
        filename = chooser.get_filename()
    chooser.destroy()
    if not filename:
        return

    try:
        save_profile_bundle(filename)
        self.log(f"Profile backed up to {filename}")
    except Exception as e:
        self._show_message(Gtk.MessageType.ERROR, f"Backup failed:\n{e}")
def _refresh_all_models(self):
    """Start a background refresh of every provider's model list.

    A second call while a refresh is still running is ignored.  The
    refresh button is disabled until one of the finish callbacks runs.
    """
    already_running = getattr(self, "_refresh_running", False)
    if already_running:
        return
    self._refresh_running = True
    self._refresh_all_btn.set_sensitive(False)
    self.log("Refreshing models for all providers...")
    worker = threading.Thread(target=self._refresh_all_models_worker, daemon=True)
    worker.start()
def _refresh_all_models_worker(self):
    """Refresh the model list of every endpoint (runs in a worker thread).

    Endpoints that refresh successfully are replaced in the loaded data,
    which is saved only if at least one endpoint changed.  The outcome is
    handed to the finish callbacks through ``GLib.idle_add``.
    """
    try:
        data = load_endpoints()
        endpoints = data["endpoints"]
        failed = []
        updated = 0
        # Iterate over a snapshot because entries are replaced in place.
        for position, endpoint in enumerate(list(endpoints)):
            refreshed, err = refresh_endpoint_models(endpoint)
            if not refreshed:
                failed.append(f"{endpoint['name']}: {err}")
                continue
            endpoints[position] = refreshed
            updated += 1
        if updated:
            save_endpoints(data)
        GLib.idle_add(self._finish_refresh_all_models, updated, failed)
    except Exception as e:
        GLib.idle_add(self._finish_refresh_all_models_error, str(e))
def _finish_refresh_all_models(self, updated, failed):
    """Report the result of a "refresh all models" run.

    ``updated`` is the number of refreshed providers and ``failed`` a list
    of "name: error" strings.  Rebuilds the endpoint combo (and the
    endpoint manager list, if open) when something changed, shows a
    summary dialog, and always re-enables the refresh button.  Returns
    ``False``.
    """
    try:
        if updated:
            self._rebuild_combo()
            manager = getattr(self, "_mgr_window", None)
            if manager:
                try:
                    self._mgr_window._rebuild()
                except Exception:
                    # Best effort: a failing manager refresh must not block the summary.
                    pass
            self.log(f"Refreshed models for {updated} provider(s)")

        if failed:
            kind = Gtk.MessageType.WARNING
            text = (
                "Some providers could not auto-fetch models.\n\n"
                + "\n".join(failed)
                + "\n\nThose providers were left unchanged so you can manage them manually."
            )
        elif updated:
            kind = Gtk.MessageType.INFO
            text = f"Refreshed models for {updated} provider(s)."
        else:
            kind = Gtk.MessageType.INFO
            text = "No providers were refreshed."
        self._show_message(kind, text)
    finally:
        self._refresh_running = False
        self._refresh_all_btn.set_sensitive(True)
    return False
def _finish_refresh_all_models_error(self, err):
    """Show a refresh failure and re-enable the refresh button.

    Returns ``False``.
    """
    message = f"Refresh failed:\n{err}"
    try:
        self._show_message(Gtk.MessageType.ERROR, message)
    finally:
        self._refresh_running = False
        self._refresh_all_btn.set_sensitive(True)
    return False
def _import_profile(self):
    """Import a profile bundle chosen by the user.

    Refuses to run while a launched process is still alive.  The user
    picks a file and confirms the overwrite; the bundle is then imported
    and the endpoint combo rebuilt.  Failures are shown in an error dialog.
    """
    still_running = self._proc and self._proc.poll() is None
    if still_running:
        self._show_message(Gtk.MessageType.WARNING, "Stop Codex before importing a profile.")
        return

    chooser = Gtk.FileChooserDialog(
        title="Import Codex Profile",
        parent=self,
        action=Gtk.FileChooserAction.OPEN,
    )
    chooser.add_buttons(
        Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL,
        Gtk.STOCK_OPEN, Gtk.ResponseType.OK,
    )
    filename = None
    if chooser.run() == Gtk.ResponseType.OK:
        filename = chooser.get_filename()
    chooser.destroy()
    if not filename:
        return

    confirm = Gtk.MessageDialog(
        self, 0, Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO,
        "Importing will replace the current endpoints and Codex config. Continue?"
    )
    answer = confirm.run()
    confirm.destroy()
    if answer != Gtk.ResponseType.YES:
        return

    try:
        import_profile_bundle(filename)
        self._rebuild_combo()
        self.log(f"Profile imported from {filename}")
        self._show_message(Gtk.MessageType.INFO, "Profile imported successfully.")
    except Exception as e:
        self._show_message(Gtk.MessageType.ERROR, f"Import failed:\n{e}")
def _on_endpoints_updated(self):
    """Rebuild the endpoint combo after the stored endpoints changed."""
    rebuild = self._rebuild_combo
    rebuild()
def _show_message(self, msg_type, text):
    """Show a message dialog of the given type with a single OK button.

    Blocks in ``dialog.run()`` until the dialog is dismissed.
    """
    buttons = Gtk.ButtonsType.OK
    dialog = Gtk.MessageDialog(self, 0, msg_type, buttons, text)
    dialog.run()
    dialog.destroy()
def _show_changelog(self):
    """Show the CHANGELOG entries in a read-only, scrollable modal dialog.

    Each version heading is rendered in bold.  The text is inserted with
    ``insert_markup`` (a plain ``insert`` would display the ``<b>`` tags
    literally), so every value taken from CHANGELOG is markup-escaped.
    """
    d = Gtk.Dialog(title="Changelog", transient_for=self, modal=True)
    d.set_default_size(520, 480)
    d.add_button("Close", Gtk.ResponseType.CLOSE)
    area = d.get_content_area()
    area.set_margin_start(12)
    area.set_margin_end(12)
    area.set_margin_top(12)
    area.set_margin_bottom(12)
    sw = Gtk.ScrolledWindow()
    sw.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC)
    area.pack_start(sw, True, True, 0)
    buf = Gtk.TextBuffer()
    tv = Gtk.TextView(buffer=buf)
    tv.set_editable(False)
    tv.set_cursor_visible(False)
    tv.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
    sw.add(tv)

    esc = GLib.markup_escape_text
    lines = []
    for ver, date, items in CHANGELOG:
        lines.append(f"<b>v{esc(str(ver))}</b> ({esc(str(date))})")
        lines.extend(f" \u2022 {esc(str(item))}" for item in items)
        lines.append("")
    markup = "\n".join(lines).strip()
    # -1 lets GTK compute the length of the markup string itself.
    buf.insert_markup(buf.get_end_iter(), markup, -1)

    d.show_all()
    d.run()
    d.destroy()
def _show_install_guide(self, which):
    """Show installation instructions for a missing component.

    ``which`` equal to "cli" shows the Codex CLI guide; any other value
    shows the Codex Desktop guide.
    """
    cli_guide = (
        "Install Codex CLI",
        "Codex CLI is required to use CLI launch features.\n\n"
        "Install with npm:\n"
        " npm install -g @openai/codex\n\n"
        "Or download from:\n"
        " https://github.com/openai/codex\n\n"
        "After installing, restart the launcher.",
    )
    desktop_guide = (
        "Install Codex Desktop",
        "Codex Desktop is required to use Desktop launch features.\n\n"
        "Expected location: /opt/codex-desktop/start.sh\n\n"
        "Download from:\n"
        " https://codex.desktop.openai.com\n\n"
        "After installing, restart the launcher.",
    )
    title, guide = cli_guide if which == "cli" else desktop_guide
    dialog = Gtk.MessageDialog(self, 0, Gtk.MessageType.INFO, Gtk.ButtonsType.OK, guide)
    dialog.set_title(title)
    dialog.run()
    dialog.destroy()
# ── launch ───────────────────────────────────────────────────
def _launch(self, target):
    """Launch Codex Desktop or CLI with the selected endpoint/pool and model.

    ``target`` is "desktop" for the desktop app; any other value launches
    the CLI.  Logs an error and returns when no endpoint or model is
    selected or the selection cannot be resolved.  Otherwise the UI is
    marked busy and the launch runs in a daemon thread (``_run_bgp`` for
    BGP pools, ``_run`` for regular endpoints).
    """
    name = self._combo.get_active_text()
    if not name:
        self.log("ERROR: no endpoint selected")
        return
    model = self._model_combo.get_active_text()
    if not model:
        self.log("ERROR: no model selected")
        return

    target_label = "Desktop" if target == "desktop" else "CLI"
    bgp_prefix = "🔀 "
    if name.startswith(bgp_prefix):
        pool_name = name[len(bgp_prefix):]
        pools = load_bgp_pools().get("pools", [])
        pool = next((p for p in pools if p["name"] == pool_name), None)
        if not pool:
            self.log(f"ERROR: BGP pool '{pool_name}' not found")
            return
        self._set_busy(True)
        # The header used to run model and target together ("gpt-4Desktop");
        # use the same " → " separator as the Codex Default launch header.
        self.log(f"=== 🔀 BGP: {pool_name} / {model} → {target_label} ===")
        threading.Thread(target=self._run_bgp, args=(pool, model, target), daemon=True).start()
        return

    ep = get_endpoint(name)
    if not ep:
        self.log("ERROR: endpoint not found")
        return
    self._set_busy(True)
    self.log(f"=== {ep['name']} / {model} → {target_label} ===")
    threading.Thread(target=self._run, args=(ep, model, target), daemon=True).start()
def _launch_codex_default(self, target):
    """Launch Codex with its default (OAuth) configuration.

    When the CLI is available the auth status is checked first; if the
    user is not logged in they are asked whether to continue.  Declining
    clears the busy state and aborts.  Otherwise the launch runs in a
    daemon thread via ``_run_codex_default``.
    """
    cli_available = "cli" not in self._missing
    if cli_available:
        status, msg = _check_codex_auth()
        if status != "logged_in":
            prompt = Gtk.MessageDialog(
                self, 0, Gtk.MessageType.WARNING, Gtk.ButtonsType.YES_NO,
                f"Codex auth check: {msg}\n\n"
                "Launch may fail without valid authentication.\n"
                "Continue anyway?"
            )
            answer = prompt.run()
            prompt.destroy()
            if answer != Gtk.ResponseType.YES:
                self._set_busy(False)
                return

    target_label = "Desktop" if target == "desktop" else "CLI"
    self._set_busy(True)
    self.log(f"=== Codex Default (OAuth) → {target_label} ===")
    worker = threading.Thread(target=self._run_codex_default, args=(target,), daemon=True)
    worker.start()
def _run(self, ep, model, target):
    """Worker: configure Codex for ``ep``/``model`` and launch ``target``.

    Non-native backends get the translation proxy started first.  Any
    exception is logged.  Afterwards the proxy is stopped, the config is
    restored and the busy state cleared.
    """
    try:
        self.log("Cleaning up stale processes…")
        _run_cleanup()
        if ep["backend_type"] == "native":
            self.log(f"Configuring Codex for {ep['name']} (native)…")
            write_config_for_native(ep, model)
        else:
            self.log("Starting translation proxy…")
            _start_proxy_for(ep, self.log)
            self.log(f"Configuring Codex for {ep['name']} (proxied)…")
            write_config_for_translated(ep, model)

        launcher = self._launch_desktop if target == "desktop" else self._launch_cli
        launcher(ep, model)
    except Exception as e:
        self.log(f"ERROR: {e}")
    finally:
        _stop_proxy()
        restore_config()
        self._set_busy(False)
        self.log("Ready.")
def _run_bgp(self, pool, model, target):
    """Worker: start the proxy for a BGP pool and launch ``target``.

    Builds a synthetic endpoint plus a proxy config containing the pool's
    routes, writes that config under PROXY_CONFIG_DIR, starts the proxy
    with it and launches the Desktop or CLI.  Any exception is logged.
    Afterwards the proxy is stopped, the config is restored and the busy
    state cleared.
    """
    try:
        self.log("Cleaning up stale processes…")
        _run_cleanup()
        routes = pool.get("routes", [])
        self.log(f"Starting BGP proxy with {len(routes)} routes…")

        # Distinct route models in first-seen order; routes without a
        # model fall back to the selected one.
        route_models = list(dict.fromkeys(route.get("model", model) for route in routes))
        placeholder_url = "http://bgp.placeholder"
        bgp_ep = {
            "name": pool["name"],
            "backend_type": "openai-compat",
            "base_url": placeholder_url,
            "api_key": "",
            "default_model": model,
            "models": route_models,
        }
        model_entries = [
            {"id": model_id, "object": "model", "created": 1700000000, "owned_by": "bgp"}
            for model_id in bgp_ep["models"]
        ]
        pcfg = {
            "port": 8080,
            "backend_type": "openai-compat",
            "target_url": placeholder_url,
            "api_key": "",
            "bgp_routes": routes,
            "models": model_entries,
        }
        pcfg_path = PROXY_CONFIG_DIR / f"proxy-{safe_name(pool['name'])}.json"
        pcfg_path.parent.mkdir(parents=True, exist_ok=True)
        pcfg_path.write_text(json.dumps(pcfg, indent=2))

        _start_proxy_with_config(pcfg_path, self.log)
        write_config_for_translated(bgp_ep, model)
        launcher = self._launch_desktop if target == "desktop" else self._launch_cli
        launcher(bgp_ep, model)
    except Exception as e:
        self.log(f"ERROR: {e}")
    finally:
        _stop_proxy()
        restore_config()
        self._set_busy(False)
        self.log("Ready.")
def _run_codex_default(self, target):
    """Worker: launch Codex without the launcher-written config.

    Stops the proxy, backs up the config and removes the config file, then
    launches the Desktop or CLI.  Any exception is logged.  Afterwards the
    config is restored and the busy state cleared.
    """
    try:
        self.log("Cleaning up stale processes…")
        _run_cleanup()
        _stop_proxy()
        self.log("Resetting config to Codex defaults (OAuth)…")
        backup_config()
        if CONFIG.exists():
            CONFIG.unlink()
        launcher = self._launch_desktop_direct if target == "desktop" else self._launch_cli_default
        launcher()
    except Exception as e:
        self.log(f"ERROR: {e}")
    finally:
        restore_config()
        self._set_busy(False)
        self.log("Ready.")
def _launch_desktop(self, ep, model):
    """Start Codex Desktop and block until it exits or is killed.

    Non-native backends are started with ``--ozone-platform=wayland``.
    Logs a stall warning once after 20 s and a hint when the process exits
    within 12 s.  ``model`` is accepted for signature parity with
    ``_launch_cli`` and is not used here.

    The process is tracked through a local handle: ``_kill`` may reset
    ``self._proc`` from the UI thread at any time, so re-reading the
    attribute inside the loop condition could dereference ``None``, and a
    stale watcher could otherwise follow or clear a newer process.
    """
    args = [str(START_SH)]
    if ep["backend_type"] != "native":
        args += ["--", "--ozone-platform=wayland"]
    # start_new_session=True performs setsid() in the child; unlike
    # preexec_fn it is safe when called from a worker thread.
    proc = subprocess.Popen(
        args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True
    )
    self._proc = proc
    self.log(f"Desktop started (PID {proc.pid})")
    self.log(f"Log: {LAUNCH_LOG}")
    t0 = time.time()
    stall_warned = False
    while self._proc is proc and proc.poll() is None:
        time.sleep(1.5)
        el = time.time() - t0
        if el > 20 and not stall_warned:
            self.log("⚠ Still starting after 20 s — possible stall. Click Kill if window doesn't appear.")
            self.log(f"--- last log lines ---\n{_last_log_lines()}")
            stall_warned = True
    # Only report and clear the handle if it is still ours (not killed or replaced).
    if self._proc is proc:
        rc = proc.poll()
        el = time.time() - t0
        self.log(f"Desktop exited (code {rc}) after {el:.0f}s")
        if el < 12:
            self.log("TIP: Quick exit — may be warm-start handoff (normal) or crash. Kill && retry if needed.")
            self.log(f"--- last log lines ---\n{_last_log_lines()}")
        self._proc = None
def _launch_cli(self, ep, model):
    """Launch the Codex CLI in a terminal and block until the terminal exits.

    Native endpoints run ``codex -c model=…``; proxied endpoints also pass
    ``--profile <endpoint name>`` (the proxy is expected to be running
    already, started by the caller).  Logs an error and returns when no
    terminal emulator is found.

    The process is tracked through a local handle: ``_kill`` may reset
    ``self._proc`` from the UI thread at any time, so re-reading the
    attribute inside the loop condition could dereference ``None``, and a
    stale watcher could otherwise follow or clear a newer process.
    """
    self.log(f"Launching Codex CLI with {ep['name']}")
    # (executable, flags that introduce the command to run)
    candidates = (
        ("x-terminal-emulator", ["-e"]),
        ("kgx", ["--"]),
        ("gnome-terminal", ["--"]),
        ("konsole", ["-e"]),
        ("xterm", ["-e"]),
    )
    found = next(((exe, flags) for exe, flags in candidates if shutil.which(exe)), None)
    if found is None:
        self.log("ERROR: no terminal emulator found (tried x-terminal-emulator, kgx, gnome-terminal, konsole, xterm)")
        return
    term, term_args = found

    cmd_parts = [term, *term_args]
    if ep["backend_type"] == "native":
        # config.toml is already set up, run codex directly.
        cmd_parts.extend(["codex", "-c", f"model={model}"])
    else:
        cmd_parts.extend(["codex", "--profile", ep["name"], "-c", f"model={model}"])
    self.log(f"Running: {' '.join(cmd_parts)}")

    # start_new_session=True performs setsid() in the child; unlike
    # preexec_fn it is safe when called from a worker thread.
    proc = subprocess.Popen(cmd_parts, start_new_session=True)
    self._proc = proc
    self.log(f"CLI started in terminal (PID {proc.pid})")
    while self._proc is proc and proc.poll() is None:
        time.sleep(1.5)
    # Only report and clear the handle if it is still ours (not killed or replaced).
    if self._proc is proc:
        rc = proc.poll()
        self.log(f"CLI exited (code {rc})")
        self._proc = None
def _launch_desktop_direct(self):
    """Start Codex Desktop with default (OAuth) settings and wait for exit.

    Logs a stall warning once after 20 s and a hint when the process exits
    within 12 s.

    The process is tracked through a local handle: ``_kill`` may reset
    ``self._proc`` from the UI thread at any time, so re-reading the
    attribute inside the loop condition could dereference ``None``, and a
    stale watcher could otherwise follow or clear a newer process.
    """
    self.log("Launching Codex Desktop (default OAuth)…")
    # start_new_session=True performs setsid() in the child; unlike
    # preexec_fn it is safe when called from a worker thread.
    proc = subprocess.Popen(
        [str(START_SH), "--", "--ozone-platform=wayland"],
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True,
    )
    self._proc = proc
    self.log(f"Desktop started (PID {proc.pid})")
    self.log(f"Log: {LAUNCH_LOG}")
    t0 = time.time()
    stall_warned = False
    while self._proc is proc and proc.poll() is None:
        time.sleep(1.5)
        el = time.time() - t0
        if el > 20 and not stall_warned:
            self.log("Still starting after 20s — possible stall. Click Kill if window doesn't appear.")
            self.log(f"--- last log lines ---\n{_last_log_lines()}")
            stall_warned = True
    # Only report and clear the handle if it is still ours (not killed or replaced).
    if self._proc is proc:
        rc = proc.poll()
        el = time.time() - t0
        self.log(f"Desktop exited (code {rc}) after {el:.0f}s")
        if el < 12:
            self.log("TIP: Quick exit — may be warm-start handoff (normal) or crash.")
            self.log(f"--- last log lines ---\n{_last_log_lines()}")
        self._proc = None
def _launch_cli_default(self):
    """Launch plain ``codex`` in a terminal and block until the terminal exits.

    Logs an error and returns when no terminal emulator is found.

    The process is tracked through a local handle: ``_kill`` may reset
    ``self._proc`` from the UI thread at any time, so re-reading the
    attribute inside the loop condition could dereference ``None``, and a
    stale watcher could otherwise follow or clear a newer process.
    """
    self.log("Launching Codex CLI (default OAuth)…")
    # (executable, flags that introduce the command to run)
    candidates = (
        ("x-terminal-emulator", ["-e"]),
        ("kgx", ["--"]),
        ("gnome-terminal", ["--"]),
        ("konsole", ["-e"]),
        ("xterm", ["-e"]),
    )
    found = next(((exe, flags) for exe, flags in candidates if shutil.which(exe)), None)
    if found is None:
        self.log("ERROR: no terminal emulator found")
        return
    term, term_args = found
    cmd_parts = [term, *term_args, "codex"]
    self.log(f"Running: {' '.join(cmd_parts)}")

    # start_new_session=True performs setsid() in the child; unlike
    # preexec_fn it is safe when called from a worker thread.
    proc = subprocess.Popen(cmd_parts, start_new_session=True)
    self._proc = proc
    self.log(f"CLI started in terminal (PID {proc.pid})")
    while self._proc is proc and proc.poll() is None:
        time.sleep(1.5)
    # Only report and clear the handle if it is still ours (not killed or replaced).
    if self._proc is proc:
        rc = proc.poll()
        self.log(f"CLI exited (code {rc})")
        self._proc = None
# ── kill ─────────────────────────────────────────────────────
def _kill(self):
    """Terminate the launched process group and reset launcher state.

    Sends SIGTERM to the process group, waits one second and escalates to
    SIGKILL if the process is still alive.  Then stops the proxy, runs the
    stale-process cleanup, restores the config, removes the launch log and
    clears the busy state.
    """
    self.log("=== Killing ===")
    # Snapshot the handle: the watcher thread may reset self._proc to None
    # between the check and the use, which would raise AttributeError here
    # and skip all of the cleanup below.
    proc = self._proc
    if proc is not None and proc.poll() is None:
        try:
            pgid = os.getpgid(proc.pid)
            os.killpg(pgid, signal.SIGTERM)
            time.sleep(1)
            if proc.poll() is None:
                os.killpg(pgid, signal.SIGKILL)
        except (ProcessLookupError, PermissionError):
            # Process already gone or not ours to signal; continue with cleanup.
            pass
    self._proc = None
    _stop_proxy()
    _run_cleanup()
    restore_config()
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    LAUNCH_LOG.unlink(missing_ok=True)
    self.log("Cleanup complete")
    self._set_busy(False)
    self.log("Ready.")
def _do_close(self):
    """Quit the application, offering to kill a still-running Codex first.

    If a launched process is alive the user is asked whether to kill it;
    answering anything but "Yes" aborts the close.  Otherwise the proxy is
    stopped and the GTK main loop is quit.
    """
    still_running = self._proc and self._proc.poll() is None
    if still_running:
        prompt = Gtk.MessageDialog(
            self, 0, Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO,
            "Codex is still running. Kill it?",
        )
        answer = prompt.run()
        prompt.destroy()
        if answer != Gtk.ResponseType.YES:
            return
        self._kill()
    _stop_proxy()
    Gtk.main_quit()
# ═══════════════════════════════════════════════════════════════════
# Endpoint manager dialog
# ═══════════════════════════════════════════════════════════════════
class EndpointMgr(Gtk.Window):
    """Modal window listing the stored endpoints with add/edit/delete/default actions."""

    def __init__(self, parent):
        """Build the endpoint list and button bar, then show the window."""
        super().__init__(title="Manage Endpoints")
        self._parent = parent
        self.set_transient_for(parent)
        self.set_modal(True)
        self.set_default_size(500, 350)
        self.set_border_width(12)
        self.set_position(Gtk.WindowPosition.CENTER_ON_PARENT)

        vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)
        self.add(vbox)

        title_lbl = Gtk.Label(label="<b>Endpoints</b>")
        title_lbl.set_use_markup(True)
        vbox.pack_start(title_lbl, False, False, 0)

        sw = Gtk.ScrolledWindow()
        vbox.pack_start(sw, True, True, 0)

        # Columns: name, provider, backend label, default model.
        self._store = Gtk.ListStore(str, str, str, str)
        self._tree = Gtk.TreeView(model=self._store)
        headings = ["Name", "Provider", "Type", "Default Model"]
        for column_index, heading in enumerate(headings):
            column = Gtk.TreeViewColumn(heading, Gtk.CellRendererText(), text=column_index)
            column.set_resizable(True)
            self._tree.append_column(column)
        sw.add(self._tree)

        btn_bar = Gtk.Box(spacing=8)
        vbox.pack_start(btn_bar, False, False, 0)

        def make_button(label, on_click, at_end=False):
            # Create a button, hook up its click handler and pack it into the bar.
            button = Gtk.Button(label=label)
            button.connect("clicked", on_click)
            if at_end:
                btn_bar.pack_end(button, False, False, 0)
            else:
                btn_bar.pack_start(button, False, False, 0)
            return button

        self._add_btn = make_button("Add", lambda b: self._add())
        self._edit_btn = make_button("Edit", lambda b: self._edit())
        self._delete_btn = make_button("Delete", lambda b: self._delete())
        self._default_btn = make_button("Set Default", lambda b: self._set_default())
        self._mgr_close_btn = make_button("Close", lambda b: self.destroy(), at_end=True)

        self._rebuild()
        self.show_all()

    def _rebuild(self):
        """Reload the endpoints from storage into the list store."""
        data = load_endpoints()
        self._store.clear()
        for ep in data["endpoints"]:
            row = [
                ep["name"],
                ep.get("provider_preset", "Custom"),
                label_for_backend(ep["backend_type"]),
                ep.get("default_model", ""),
            ]
            self._store.append(row)

    def _selected(self):
        """Return the name of the selected endpoint, or ``None`` if nothing is selected."""
        _model, tree_iter = self._tree.get_selection().get_selected()
        if tree_iter is None:
            return None
        return self._store[tree_iter][0]

    def _open_editor(self, existing_name):
        """Open the edit dialog for ``existing_name`` (``None`` adds a new endpoint)."""
        import traceback

        try:
            self._dialog = EditEndpointDialog(self, existing_name)
            self._dialog.connect("destroy", lambda *_: setattr(self, "_dialog", None))
        except Exception as e:
            traceback.print_exc()
            error = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, f"Error: {e}")
            error.run()
            error.destroy()

    def _add(self):
        """Open the dialog for creating a new endpoint."""
        self._open_editor(None)

    def _edit(self):
        """Open the dialog for the selected endpoint, if any."""
        name = self._selected()
        if name:
            self._open_editor(name)

    def _delete(self):
        """Delete the selected endpoint after confirmation.

        If the deleted endpoint was the default, the first remaining
        endpoint (or ``None``) becomes the new default.
        """
        name = self._selected()
        if not name:
            return
        prompt = Gtk.MessageDialog(
            self, 0, Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO,
            f'Delete endpoint "{name}"?',
        )
        answer = prompt.run()
        prompt.destroy()
        if answer != Gtk.ResponseType.YES:
            return

        data = load_endpoints()
        remaining = [entry for entry in data["endpoints"] if entry["name"] != name]
        data["endpoints"] = remaining
        if data.get("default") == name:
            data["default"] = remaining[0]["name"] if remaining else None
        save_endpoints(data)
        self._rebuild()
        self._parent._on_endpoints_updated()

    def _set_default(self):
        """Store the selected endpoint as the default and refresh both views."""
        name = self._selected()
        if not name:
            return
        data = load_endpoints()
        data["default"] = name
        save_endpoints(data)
        self._rebuild()
        self._parent._on_endpoints_updated()
# ═══════════════════════════════════════════════════════════════════
# Edit endpoint dialog
# ═══════════════════════════════════════════════════════════════════
class EditEndpointDialog(Gtk.Dialog):
def __init__(self, parent, existing_name):
    """Build the add/edit endpoint dialog and show it.

    ``parent`` is the endpoint manager window; ``existing_name`` is the
    name of the endpoint to edit, or ``None`` to create a new one (in
    which case the form starts from empty defaults).
    """
    title = "Edit Endpoint" if existing_name else "Add Endpoint"
    Gtk.Dialog.__init__(self, title=title)
    self.set_transient_for(parent)
    self.set_modal(True)
    self._parent_mgr = parent
    self._existing_name = existing_name
    self._data = get_endpoint(existing_name) if existing_name else {
        "name": "", "backend_type": "openai-compat",
        "base_url": "", "api_key": "", "default_model": "", "models": [],
        "provider_preset": "Custom",
    }
    self.set_default_size(480, 520)
    area = self.get_content_area()
    area.set_spacing(6)
    area.set_margin_start(12)
    area.set_margin_end(12)
    area.set_margin_top(12)
    area.set_margin_bottom(12)
    grid = Gtk.Grid(column_spacing=8, row_spacing=6)
    area.pack_start(grid, False, False, 0)

    def add_row(row, label, widget):
        # Right-aligned label in column 0, the input widget in column 1.
        grid.attach(Gtk.Label(label=label, xalign=1), 0, row, 1, 1)
        grid.attach(widget, 1, row, 1, 1)

    self._entry_name = Gtk.Entry(text=self._data.get("name", ""))
    add_row(0, "Name:", self._entry_name)

    self._combo_preset = Gtk.ComboBoxText()
    self._preset_names = list(PROVIDER_PRESETS.keys())
    for preset_name in self._preset_names:
        self._combo_preset.append_text(preset_name)
    current_preset = self._data.get("provider_preset", "Custom")
    self._combo_preset.set_active(
        self._preset_names.index(current_preset) if current_preset in self._preset_names else 0
    )
    # Connected only after the initial selection, so that selection does
    # not trigger the preset handler.
    self._combo_preset.connect("changed", lambda c: self._apply_selected_preset())
    add_row(1, "Preset:", self._combo_preset)

    self._combo_type = Gtk.ComboBoxText()
    for val, lab in [("openai-compat", "OpenAI-compatible (needs proxy)"),
                     ("anthropic", "Anthropic (needs proxy)"),
                     ("command-code", "Command Code (needs proxy)"),
                     ("native", "Native OpenAI (no proxy)")]:
        self._combo_type.append(val, lab)
    bt = self._data.get("backend_type", "openai-compat")
    self._combo_type.set_active_id(bt)
    add_row(2, "Type:", self._combo_type)

    self._entry_url = Gtk.Entry(text=self._data.get("base_url", ""))
    add_row(3, "Base URL:", self._entry_url)

    self._entry_key = Gtk.Entry(text=self._data.get("api_key", ""))
    self._entry_key.set_visibility(False)
    key_box = Gtk.Box(spacing=6)
    key_box.pack_start(self._entry_key, True, True, 0)
    self._oauth_btn = Gtk.Button(label="OAuth Login")
    self._oauth_btn.connect("clicked", lambda b: self._do_oauth_login())
    key_box.pack_start(self._oauth_btn, False, False, 0)
    add_row(4, "API Key:", key_box)
    # Without no-show-all the final show_all() below would make the button
    # visible again for every preset, overriding the visibility chosen here
    # and in _apply_selected_preset().
    self._oauth_btn.set_no_show_all(True)
    self._oauth_btn.set_visible(False)

    self._entry_cc_ver = Gtk.Entry(text=self._data.get("cc_version", ""))
    self._entry_cc_ver.set_placeholder_text("e.g. 0.26.8 (Command Code only)")
    add_row(5, "CC Version:", self._entry_cc_ver)

    reasoning_css = b"""
switch.reasoning-toggle {
min-width: 56px; min-height: 28px;
border-radius: 14px;
background: #e67e22;
border: 2px solid #cf6d17;
}
switch.reasoning-toggle:checked {
background: #2ecc71;
border: 2px solid #27ae60;
}
switch.reasoning-toggle slider {
min-width: 24px; min-height: 24px;
border-radius: 12px;
background: white;
border: 1px solid #bbb;
}
"""
    reasoning_box = Gtk.Box(spacing=10)
    self._switch_reasoning = Gtk.Switch()
    self._switch_reasoning.set_name("reasoning-toggle")
    ctx = self._switch_reasoning.get_style_context()
    ctx.add_class("reasoning-toggle")
    try:
        css_prov = Gtk.CssProvider()
        css_prov.load_from_data(reasoning_css)
        ctx.add_provider(css_prov, Gtk.STYLE_PROVIDER_PRIORITY_USER)
    except Exception:
        # Styling is cosmetic; the switch still works with the default theme.
        pass
    self._switch_reasoning.set_active(self._data.get("reasoning_enabled", True))
    self._switch_reasoning.connect("notify::active", lambda *a: self._on_reasoning_toggled())
    reasoning_box.pack_start(self._switch_reasoning, False, False, 0)
    self._lbl_reasoning = Gtk.Label()
    reasoning_box.pack_start(self._lbl_reasoning, False, False, 0)
    add_row(6, "Reasoning:", reasoning_box)

    self._combo_effort = Gtk.ComboBoxText()
    for ev, el in [("none", "None"), ("minimal", "Minimal"), ("low", "Low"),
                   ("medium", "Medium"), ("high", "High"), ("max", "Max")]:
        self._combo_effort.append(ev, el)
    saved_effort = self._data.get("reasoning_effort", "medium")
    self._combo_effort.set_active_id(saved_effort if saved_effort in ("none","minimal","low","medium","high","max") else "medium")
    add_row(7, "Effort:", self._combo_effort)
    self._on_reasoning_toggled()

    # Models
    mlbl = Gtk.Label(label="Models:", xalign=0)
    area.pack_start(mlbl, False, False, 4)
    mbox = Gtk.Box(spacing=6)
    area.pack_start(mbox, False, False, 0)
    self._entry_model = Gtk.Entry()
    mbox.pack_start(self._entry_model, True, True, 0)
    self._add_model_btn = Gtk.Button(label="Add")
    self._add_model_btn.connect("clicked", lambda b: self._add_model())
    mbox.pack_start(self._add_model_btn, False, False, 0)
    self._add_list_btn = Gtk.Button(label="Add List")
    self._add_list_btn.connect("clicked", lambda b: self._add_models_from_text())
    mbox.pack_start(self._add_list_btn, False, False, 0)
    self._fetch_models_btn = Gtk.Button(label="Fetch from API")
    self._fetch_models_btn.connect("clicked", lambda b: self._fetch_models())
    mbox.pack_start(self._fetch_models_btn, False, False, 0)

    bulk_lbl = Gtk.Label(label="Bulk add models (one per line or comma-separated):", xalign=0)
    area.pack_start(bulk_lbl, False, False, 2)
    bulk_sw = Gtk.ScrolledWindow()
    bulk_sw.set_min_content_height(72)
    area.pack_start(bulk_sw, False, False, 0)
    self._bulk_buf = Gtk.TextBuffer()
    self._bulk_text = Gtk.TextView(buffer=self._bulk_buf)
    self._bulk_text.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
    bulk_sw.add(self._bulk_text)

    sw = Gtk.ScrolledWindow()
    sw.set_min_content_height(120)
    area.pack_start(sw, True, True, 0)
    self._model_store = Gtk.ListStore(str)
    self._model_tree = Gtk.TreeView(model=self._model_store)
    self._model_tree.append_column(Gtk.TreeViewColumn("Model ID", Gtk.CellRendererText(), text=0))
    self._model_tree.set_rules_hint(True)
    sw.add(self._model_tree)
    # Activating (e.g. double-clicking) a row removes that model.
    self._model_tree.connect("row-activated", lambda t, p, c: self._remove_model(p))
    for m in self._data.get("models", []):
        self._model_store.append([m])

    # Default model combo
    dbox = Gtk.Box(spacing=6)
    area.pack_start(dbox, False, False, 0)
    dbox.pack_start(Gtk.Label(label="Default Model:"), False, False, 0)
    self._combo_default = Gtk.ComboBoxText()
    self._refresh_default_combo()
    dbox.pack_start(self._combo_default, True, True, 0)
    dm = self._data.get("default_model", "")
    if dm:
        self._combo_default.set_active_id(dm)
    self._apply_selected_preset(initial=True)

    # Buttons
    self.add_button("Cancel", Gtk.ResponseType.CANCEL)
    self.add_button("Save", Gtk.ResponseType.OK)
    self.connect("response", self._on_response)
    self.show_all()
def _add_model(self):
    """Add the model ID typed into the entry to the model list.

    The text is normalized first; empty input is ignored.  An ID that is
    already listed is not added a second time, matching the bulk "Add
    List" path.  The current default-model selection is kept, or the new
    model becomes the default when nothing was selected.
    """
    m = normalize_model_id(self._entry_model.get_text())
    if not m:
        return
    already_listed = any(row[0] == m for row in self._model_store)
    if not already_listed:
        current = self._combo_default.get_active_text()
        self._model_store.append([m])
        self._refresh_default_combo(current or m)
    self._entry_model.set_text("")
def _add_models_from_text(self):
    """Add every model ID from the bulk text box that is not yet listed.

    Does nothing when the text yields no model IDs.  The current default
    selection is kept, or the first parsed model becomes the default when
    nothing was selected.  The bulk text box is cleared afterwards.
    """
    start = self._bulk_buf.get_start_iter()
    end = self._bulk_buf.get_end_iter()
    models = parse_model_list(self._bulk_buf.get_text(start, end, True))
    if not models:
        return

    current = self._combo_default.get_active_text()
    known = {self._model_store[index][0] for index in range(len(self._model_store))}
    added_any = False
    for model_id in models:
        if model_id in known:
            continue
        self._model_store.append([model_id])
        known.add(model_id)
        added_any = True

    if added_any:
        self._refresh_default_combo(current or models[0])
    self._bulk_buf.set_text("")
def _apply_selected_preset(self, initial=False):
    """Apply the provider preset selected in the preset combo to the form.

    Always updates the OAuth button visibility and the API-key placeholder.
    The backend type, base URL, CC version and preset models are applied
    only when the preset was changed by the user (``initial`` false) or
    when a new endpoint is being created.

    NOTE(review): the grouping of the statements under the
    ``not initial or self._existing_name is None`` guard was reconstructed
    from a source without indentation; confirm against the original file.
    """
    preset_name = self._combo_preset.get_active_text() or "Custom"
    # Unknown preset names fall back to the "Custom" preset.
    preset = PROVIDER_PRESETS.get(preset_name, PROVIDER_PRESETS["Custom"])
    is_oauth = bool(preset.get("oauth_provider"))
    self._oauth_btn.set_visible(is_oauth)
    if is_oauth:
        self._entry_key.set_placeholder_text("Auto-filled by OAuth")
    else:
        self._entry_key.set_placeholder_text("")
    if not initial or self._existing_name is None:
        self._combo_type.set_active_id(preset.get("backend_type", "openai-compat"))
        self._entry_url.set_text(preset.get("base_url", ""))
        # Normalizes a whitespace-only key to an empty string; a real key is kept.
        if not self._entry_key.get_text().strip():
            self._entry_key.set_text("")
        cc_ver = preset.get("cc_version", "")
        # Only fill the CC version when the field is still empty.
        if cc_ver and not self._entry_cc_ver.get_text().strip():
            self._entry_cc_ver.set_text(cc_ver)
        # Preset models are only used to seed an empty model list.
        if preset.get("models") and len(self._model_store) == 0:
            for mid in preset["models"]:
                self._model_store.append([mid])
            self._refresh_default_combo(preset["models"][0])
    # On first build, re-select the stored default model.
    if initial and self._data.get("models"):
        self._refresh_default_combo(self._data.get("default_model", ""))
def _on_reasoning_toggled(self, *_):
    """Sync the effort combo and the ON/OFF label with the reasoning switch."""
    enabled = self._switch_reasoning.get_active()
    self._combo_effort.set_sensitive(enabled)
    on_markup = '<span foreground="#27ae60" weight="bold">ON</span>'
    off_markup = '<span foreground="#e67e22" weight="bold">OFF</span>'
    self._lbl_reasoning.set_markup(on_markup if enabled else off_markup)
def _do_oauth_login(self):
    """Start the OAuth flow for the selected preset's provider.

    Only the "google" provider is handled; any other value does nothing.
    """
    selected = self._combo_preset.get_active_text() or "Custom"
    provider = PROVIDER_PRESETS.get(selected, {}).get("oauth_provider", "")
    if provider != "google":
        return
    self._google_oauth_flow()
def _google_oauth_flow(self):
    """Run the browser-based Google OAuth authorization-code flow with PKCE.

    Opens a modal status dialog, starts a local HTTP server on
    127.0.0.1:8085 to receive the redirect, and opens the authorization
    URL in the browser.  When the callback delivers a code or an error,
    ``_google_oauth_complete_gemini`` is invoked on the GTK main loop.
    The dialog stays open until the user closes it.

    Closing the dialog stops the callback server so the port is released,
    and suppresses the completion callback so it never touches destroyed
    widgets.  Requests for any path other than the callback path are
    answered with 404 and otherwise ignored.
    """
    import base64
    import hashlib
    import http.server
    import secrets
    from urllib.parse import parse_qs, quote, quote_plus, urlparse

    token_path = os.path.expanduser("~/.cache/codex-proxy/google-oauth-token.json")
    CLIENT_ID = "681255809395-oo8ft2oprdrnp9e3aqf6av3hmdib135j.apps.googleusercontent.com"
    CLIENT_SECRET = "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w"
    SCOPES = [
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/generative-language.retriever",
        "https://www.googleapis.com/auth/userinfo.email",
        "https://www.googleapis.com/auth/userinfo.profile",
    ]
    port = 8085
    callback_path = "/oauth2callback"
    state = secrets.token_hex(32)
    verifier = secrets.token_urlsafe(32)
    # PKCE S256: challenge = base64url(sha256(verifier)) without padding.
    challenge = hashlib.sha256(verifier.encode()).digest()
    challenge_b64 = quote_plus(base64.urlsafe_b64encode(challenge).rstrip(b"=").decode())
    redirect_uri = f"http://127.0.0.1:{port}{callback_path}"
    scope_str = " ".join(SCOPES)
    auth_url = (
        f"https://accounts.google.com/o/oauth2/v2/auth?"
        f"client_id={CLIENT_ID}"
        f"&redirect_uri={quote(redirect_uri)}"
        f"&response_type=code"
        f"&scope={quote(scope_str)}"
        f"&access_type=offline"
        f"&prompt=consent"
        f"&state={state}"
        f"&code_challenge={challenge_b64}"
        f"&code_challenge_method=S256"
    )

    dlg = Gtk.Dialog(title="Google OAuth (Gemini Mode)", transient_for=self, modal=True)
    dlg.add_button("Cancel", Gtk.ResponseType.CANCEL)
    dlg.set_default_size(520, 280)
    area = dlg.get_content_area()
    area.set_margin_start(16)
    area.set_margin_end(16)
    area.set_margin_top(12)
    area.set_margin_bottom(12)
    area.set_spacing(8)
    area.pack_start(Gtk.Label(label="<b>Sign in with Google</b>", use_markup=True, xalign=0), False, False, 0)
    area.pack_start(Gtk.Label(label="Emulating Gemini CLI OAuth — no client_secret.json needed.", xalign=0), False, False, 0)
    link_lbl = Gtk.Label()
    # The URL contains "&", which must be escaped inside Pango markup or
    # the whole label markup is rejected.
    link_lbl.set_markup(
        f'<a href="{GLib.markup_escape_text(auth_url)}">Click here to open Google authorization</a>'
    )
    link_lbl.set_line_wrap(True)
    area.pack_start(link_lbl, False, False, 4)
    self._oauth_status = Gtk.Label(label="Opening browser…", xalign=0)
    area.pack_start(self._oauth_status, False, False, 4)
    spinner = Gtk.Spinner()
    spinner.start()
    area.pack_start(spinner, False, False, 8)
    area.show_all()

    code_holder = [None]
    error_holder = [None]
    received_state = [None]

    class OAuthHandler(http.server.BaseHTTPRequestHandler):
        def do_GET(self2):
            parsed = urlparse(self2.path)
            if parsed.path != callback_path:
                # e.g. /favicon.ico — not part of the OAuth redirect.
                self2.send_error(404)
                return
            params = parse_qs(parsed.query)
            received_state[0] = params.get("state", [None])[0]
            if "code" in params:
                if received_state[0] != state:
                    self2.send_response(400)
                    self2.send_header("Content-Type", "text/html")
                    self2.end_headers()
                    self2.wfile.write(b"<html><body style='font-family:sans-serif;text-align:center;padding-top:80px'>"
                                      b"<h2 style='color:#e74c3c'>CSRF state mismatch.</h2></body></html>")
                    error_holder[0] = "CSRF state mismatch"
                    return
                code_holder[0] = params["code"][0]
                self2.send_response(302)
                self2.send_header("Location", "https://developers.google.com/gemini-code-assist/auth_success_gemini")
                self2.end_headers()
            else:
                error_holder[0] = params.get("error", ["unknown"])[0]
                self2.send_response(302)
                self2.send_header("Location", "https://developers.google.com/gemini-code-assist/auth_failure_gemini")
                self2.end_headers()

        def log_message(self2, *a):
            pass

    try:
        server = http.server.HTTPServer(("127.0.0.1", port), OAuthHandler)
    except OSError:
        self._oauth_status.set_text(f"Port {port} already in use — close other apps and retry.")
        spinner.stop()
        dlg.run()
        dlg.destroy()
        return

    # Poll with a short timeout so the loop notices cancellation promptly.
    server.timeout = 0.5
    cancelled = threading.Event()

    def finish():
        # Runs on the GTK main loop; skipped once the dialog has been closed.
        if not cancelled.is_set():
            self._google_oauth_complete_gemini(dlg, code_holder, error_holder,
                                               CLIENT_ID, CLIENT_SECRET, redirect_uri, token_path, spinner, verifier)
        return False

    def wait_for_code():
        try:
            while not cancelled.is_set() and code_holder[0] is None and error_holder[0] is None:
                server.handle_request()
        finally:
            server.server_close()
        GLib.idle_add(finish)

    threading.Thread(target=wait_for_code, daemon=True).start()
    try:
        subprocess.Popen(["xdg-open", auth_url], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except OSError:
        self._oauth_status.set_text("Could not open a browser automatically — use the link above.")
    dlg.run()
    cancelled.set()
    dlg.destroy()
def _google_oauth_complete_gemini(self, dlg, code_holder, error_holder,
                                  client_id, client_secret, redirect_uri, token_path, spinner, verifier):
    """Finish the Google OAuth flow after the local callback server has returned.

    Scheduled on the GTK main loop with ``GLib.idle_add`` by the thread that
    waits for the browser redirect.  Exchanges the authorization code for
    tokens, stores them in *token_path* and copies the access token into the
    API-key entry.  Progress and failures are reported through
    ``self._oauth_status``; nothing is raised to the caller.

    Args:
        dlg: The OAuth progress dialog (its title is updated on success).
        code_holder: One-element list holding the authorization code, or None.
        error_holder: One-element list holding an error string, or None.
        client_id: OAuth client id sent to the token endpoint.
        client_secret: OAuth client secret sent to the token endpoint.
        redirect_uri: Redirect URI that was used for the authorization request.
        token_path: File the token JSON is written to.
        spinner: Spinner widget to stop once the wait is over.
        verifier: PKCE code verifier matching the authorization request.
    """
    spinner.stop()
    if error_holder[0]:
        # The error text comes from the redirect's query string; escape it so
        # characters such as "<" or "&" cannot break the Pango markup.
        err_text = GLib.markup_escape_text(str(error_holder[0]))
        self._oauth_status.set_markup(f'<span foreground="#e74c3c">Error: {err_text}</span>')
        return
    if not code_holder[0]:
        self._oauth_status.set_text("No authorization code received.")
        return
    self._oauth_status.set_text("Exchanging code for token…")
    # NOTE(review): this request runs on the GTK main loop (idle callback), so
    # the UI is blocked for up to the 30 s timeout while it is in flight.
    try:
        token_data = urllib.parse.urlencode({
            "code": code_holder[0],
            "client_id": client_id,
            "client_secret": client_secret,
            "redirect_uri": redirect_uri,
            "grant_type": "authorization_code",
            "code_verifier": verifier,
        }).encode()
        req = urllib.request.Request("https://oauth2.googleapis.com/token", data=token_data,
                                     headers={"Content-Type": "application/x-www-form-urlencoded"})
        # Close the HTTP response deterministically instead of leaking it.
        with urllib.request.urlopen(req, timeout=30) as resp:
            tokens = json.loads(resp.read())
        tokens["client_id"] = client_id
        tokens["client_secret"] = client_secret
        tokens["expires_at"] = time.time() + tokens.get("expires_in", 3600)
        token_dir = os.path.dirname(token_path)
        if token_dir:
            os.makedirs(token_dir, exist_ok=True)
        # Create the file owner-only from the start and tighten a pre-existing
        # file *before* writing, so the secrets are never readable by others.
        fd = os.open(token_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            os.fchmod(f.fileno(), 0o600)
            json.dump(tokens, f, indent=2)
        self._entry_key.set_text(tokens.get("access_token", ""))
        self._oauth_status.set_markup('<span foreground="#27ae60" weight="bold">Authorization successful! Token saved.</span>')
        dlg.set_title("Google OAuth — Success")
    except Exception as e:
        # Best-effort UI boundary: show any failure instead of crashing the
        # main loop.  Escape the message because it is rendered as markup.
        failure = GLib.markup_escape_text(str(e))
        self._oauth_status.set_markup(f'<span foreground="#e74c3c">Token exchange failed: {failure}</span>')
def _remove_model(self, path):
    """Delete the model row at *path* and rebuild the default-model combo.

    The currently selected default is captured before the removal so the
    refresh can restore it when that model is still present.
    """
    previous_default = self._combo_default.get_active_text()
    doomed_row = self._model_store.get_iter(path)
    self._model_store.remove(doomed_row)
    self._refresh_default_combo(previous_default)
def _refresh_default_combo(self, active=None):
    """Repopulate the default-model combo from the model store.

    Args:
        active: Model id to re-select afterwards.  When None, the combo's
            current selection (read before it is cleared) is used.

    The requested model is selected if it is still in the store; otherwise
    the first model is selected, and nothing is selected for an empty store.
    """
    combo = self._combo_default
    wanted = combo.get_active_text() if active is None else active
    combo.remove_all()
    model_ids = [row[0] for row in self._model_store]
    for model_id in model_ids:
        # The model id doubles as both the combo id and the visible text.
        combo.append(model_id, model_id)
    if wanted and wanted in model_ids:
        combo.set_active_id(wanted)
    elif model_ids:
        combo.set_active(0)
def _fetch_models(self):
    """Fetch the endpoint's model list and report a failure in an error dialog."""
    succeeded, reason = self._try_fetch_models()
    if succeeded:
        return
    alert = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK,
                              f"Failed to fetch models:\n{reason}")
    alert.run()
    alert.destroy()
def _try_fetch_models(self):
    """Query the endpoint described by the form and merge its models into the store.

    Builds an endpoint description from the URL, API-key and backend-type
    widgets and passes it to ``fetch_models_for_endpoint``.  Model ids that
    are not yet in the model store are appended, then the default-model combo
    is refreshed with its previous selection.

    Returns:
        ``(True, None)`` when at least one model id was returned, otherwise
        ``(False, message)`` with the fetch error or a generic message.
    """
    endpoint = {
        "base_url": self._entry_url.get_text().strip(),
        "api_key": self._entry_key.get_text().strip(),
        "backend_type": self._combo_type.get_active_id() or "openai-compat",
    }
    ids, err = fetch_models_for_endpoint(endpoint)
    if not ids:
        return False, err or "No models returned by endpoint"
    current = self._combo_default.get_active_text()
    # Track known ids in a set so each duplicate check is O(1); ids added in
    # this loop are recorded too, so repeats within *ids* are skipped as well.
    known = {row[0] for row in self._model_store}
    for mid in ids:
        if mid not in known:
            self._model_store.append([mid])
            known.add(mid)
    self._refresh_default_combo(current)
    return True, None
def _on_response(self, dialog, response):
    """Validate the form and persist the endpoint when the dialog is confirmed.

    Presumably connected to the dialog's ``response`` signal (the connection
    is not visible here).  Any response other than OK closes the dialog.  On
    OK the name and model list are validated; returning early without
    destroying keeps the dialog open so the user can correct the input.

    Args:
        dialog: The dialog that emitted the response (unused).
        response: The ``Gtk.ResponseType`` that was chosen.
    """
    if response != Gtk.ResponseType.OK:
        self.destroy()
        return
    name = self._entry_name.get_text().strip()
    if not name:
        self._show_error("Name is required")
        return
    bt = self._combo_type.get_active_id()
    url = self._entry_url.get_text().strip()
    key = self._entry_key.get_text().strip()
    models = [row[0] for row in self._model_store]
    if not models:
        # No models entered: try to discover them from the endpoint first.
        ok, err = self._try_fetch_models()
        if ok:
            models = [row[0] for row in self._model_store]
        else:
            d = Gtk.MessageDialog(
                self, 0, Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO,
                f"Auto-fetch failed ({err}).\n\nAdd models manually now?"
            )
            r = d.run()
            d.destroy()
            if r == Gtk.ResponseType.YES:
                self._entry_model.grab_focus()
                return
            # The user declined to add models: discard the dialog unsaved.
            self.destroy()
            return
    if not models:
        self._show_error("At least one model is required")
        self._entry_model.grab_focus()
        return
    default = self._combo_default.get_active_text() or models[0]
    data = load_endpoints()
    renamed = bool(self._existing_name) and self._existing_name != name
    # If renaming, remove old entry
    if renamed:
        data["endpoints"] = [e for e in data["endpoints"] if e["name"] != self._existing_name]
    # Check for duplicate name (the entry being edited itself does not count)
    if any(e["name"] == name and e != self._data for e in data["endpoints"]):
        self._show_error(f'Endpoint "{name}" already exists')
        return
    preset_name = self._combo_preset.get_active_text() or "Custom"
    new_ep = {"name": name, "backend_type": bt, "base_url": url,
              "api_key": key, "default_model": default, "models": models,
              "provider_preset": preset_name}
    cc_ver = self._entry_cc_ver.get_text().strip()
    if cc_ver:
        new_ep["cc_version"] = cc_ver
    new_ep["reasoning_enabled"] = self._switch_reasoning.get_active()
    new_ep["reasoning_effort"] = self._combo_effort.get_active_id() or "medium"
    preset = PROVIDER_PRESETS.get(preset_name, {})
    if preset.get("oauth_provider"):
        new_ep["oauth_provider"] = preset["oauth_provider"]
    new_ep["base_url"] = normalize_base_url(new_ep["base_url"])
    # Update or append
    for i, e in enumerate(data["endpoints"]):
        if e["name"] == name:
            data["endpoints"][i] = new_ep
            break
    else:
        data["endpoints"].append(new_ep)
        if data.get("default") is None:
            data["default"] = name
    # Keep the default pointer valid when the default endpoint was renamed;
    # otherwise it would keep referring to a name that no longer exists.
    if renamed and data.get("default") == self._existing_name:
        data["default"] = name
    save_endpoints(data)
    self._parent_mgr._rebuild()
    self._parent_mgr._parent._on_endpoints_updated()
    self.destroy()
def _show_error(self, msg):
    """Show *msg* in a modal error dialog attached to this window until dismissed."""
    alert = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, msg)
    alert.run()
    alert.destroy()
# ═══════════════════════════════════════════════════════════════════
# BGP Pool Manager
# (the program entry point, main(), is defined further down, after the
# usage dashboard)
# ═══════════════════════════════════════════════════════════════════
class BGPPoolMgr(Gtk.Window):
    """Window that lists the configured AI BGP pools and lets the user
    create, edit or delete them."""

    def __init__(self, parent):
        """Build the pool list and its action buttons, then show the window.

        Args:
            parent: Owning window; it is notified through
                ``_on_endpoints_updated()`` when a pool is deleted.
        """
        super().__init__(title="AI BGP — Pool Manager")
        self.set_transient_for(parent)
        self.set_default_size(620, 440)
        self._parent = parent
        vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)
        vbox.set_margin_start(12)
        vbox.set_margin_end(12)
        vbox.set_margin_top(12)
        vbox.set_margin_bottom(12)
        self.add(vbox)
        hdr = Gtk.Box(spacing=8)
        vbox.pack_start(hdr, False, False, 0)
        hdr.pack_start(Gtk.Label(label="<b>AI BGP Pools</b> — multi-provider routing with automatic failover", use_markup=True), False, False, 0)
        # Columns: pool name, human-readable route summary, strategy.
        self._store = Gtk.ListStore(str, str, str)
        self._tree = Gtk.TreeView(model=self._store)
        for i, (title, w) in enumerate([("Pool Name", 200), ("Routes", 250), ("Strategy", 100)]):
            r = Gtk.CellRendererText()
            c = Gtk.TreeViewColumn(title, r, text=i)
            c.set_min_width(w)
            self._tree.append_column(c)
        self._tree.set_headers_visible(True)
        sw = Gtk.ScrolledWindow()
        sw.add(self._tree)
        vbox.pack_start(sw, True, True, 0)
        sel = self._tree.get_selection()
        sel.connect("changed", lambda *_: self._on_select())
        bbox = Gtk.Box(spacing=8)
        vbox.pack_start(bbox, False, False, 0)
        self._add_btn = Gtk.Button(label="Create Pool")
        self._add_btn.connect("clicked", lambda b: self._add_pool())
        bbox.pack_start(self._add_btn, True, True, 0)
        self._edit_btn = Gtk.Button(label="Edit Pool")
        self._edit_btn.connect("clicked", lambda b: self._edit_pool())
        self._edit_btn.set_sensitive(False)
        bbox.pack_start(self._edit_btn, True, True, 0)
        self._del_btn = Gtk.Button(label="Delete Pool")
        self._del_btn.connect("clicked", lambda b: self._del_pool())
        self._del_btn.set_sensitive(False)
        bbox.pack_start(self._del_btn, True, True, 0)
        close_btn = Gtk.Button(label="Close")
        close_btn.connect("clicked", lambda b: self.destroy())
        bbox.pack_start(close_btn, True, True, 0)
        self._rebuild()
        self.show_all()

    def _rebuild(self):
        """Reload the pool list from ``load_bgp_pools()``."""
        self._store.clear()
        for pool in load_bgp_pools().get("pools", []):
            # Separate the routes visibly; joining with an empty string made
            # consecutive "name/model" entries run into each other.
            routes_str = " → ".join(
                f'{r.get("name","?")}/{r.get("model","?")}' for r in pool.get("routes", [])
            )
            self._store.append([pool["name"], routes_str, pool.get("strategy", "failover")])

    def _selected_name(self):
        """Return the name of the selected pool, or None when nothing is selected."""
        sel = self._tree.get_selection()
        m, i = sel.get_selected()
        return self._store[i][0] if i else None

    def _on_select(self):
        """Enable the Edit/Delete buttons only while a pool is selected."""
        name = self._selected_name()
        self._edit_btn.set_sensitive(bool(name))
        self._del_btn.set_sensitive(bool(name))

    def _add_pool(self):
        """Open the pool editor for a new pool and refresh the list afterwards."""
        # BGPPoolEditDialog runs its modal loop, saves and destroys itself
        # inside its constructor, so connecting to its "response" signal
        # afterwards can never fire.  Refresh directly once it returns.
        BGPPoolEditDialog(self, None)
        self._rebuild()

    def _edit_pool(self):
        """Open the pool editor for the selected pool and refresh the list afterwards."""
        name = self._selected_name()
        if name:
            # See _add_pool: the dialog has already finished when this returns.
            BGPPoolEditDialog(self, name)
            self._rebuild()

    def _del_pool(self):
        """Delete the selected pool after the user confirms."""
        name = self._selected_name()
        if not name:
            return
        d = Gtk.MessageDialog(self, 0, Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO,
                              f'Delete BGP pool "{name}"?')
        r = d.run()
        d.destroy()
        if r != Gtk.ResponseType.YES:
            return
        data = load_bgp_pools()
        # Tolerate a missing "pools" key, consistent with _rebuild().
        data["pools"] = [p for p in data.get("pools", []) if p.get("name") != name]
        save_bgp_pools(data)
        self._rebuild()
        self._parent._on_endpoints_updated()
class BGPPoolEditDialog(Gtk.Dialog):
    """Modal dialog that creates a new BGP pool or edits an existing one.

    The constructor builds the form, runs the modal loop and destroys the
    dialog before it returns, so callers only need to instantiate the class.
    """

    # Route store layout: six visible text columns followed by three hidden
    # columns that carry the per-route settings returned by BGPRouteDialog.
    _COL_NAME = 0
    _COL_ENDPOINT = 1
    _COL_URL = 2
    _COL_KEY = 3
    _COL_MODEL = 4
    _COL_PRIORITY = 5
    _COL_REASONING = 6
    _COL_EFFORT = 7
    _COL_OAUTH = 8

    def __init__(self, parent, existing_name):
        """Build the editor, run it modally and save on confirmation.

        Args:
            parent: The pool manager window; its ``_parent`` is notified
                through ``_on_endpoints_updated()`` after a save.
            existing_name: Name of the pool to edit, or None to create one.
        """
        title = "Edit BGP Pool" if existing_name else "Create BGP Pool"
        Gtk.Dialog.__init__(self, title=title, parent=parent, modal=True)
        self.add_button("Cancel", Gtk.ResponseType.CANCEL)
        self.add_button("Save", Gtk.ResponseType.OK)
        self.set_default_size(580, 480)
        self._existing_name = existing_name
        self._parent_mgr = parent
        data = load_bgp_pools()
        pool = None
        if existing_name:
            for p in data.get("pools", []):
                if p["name"] == existing_name:
                    pool = p
                    break
        if not pool:
            pool = {"name": "", "strategy": "failover", "routes": []}
        area = self.get_content_area()
        area.set_margin_start(12)
        area.set_margin_end(12)
        area.set_margin_top(12)
        area.set_margin_bottom(12)
        area.set_spacing(8)
        grid = Gtk.Grid(column_spacing=8, row_spacing=6)
        area.pack_start(grid, False, False, 0)
        grid.attach(Gtk.Label(label="Pool Name:", xalign=1), 0, 0, 1, 1)
        self._entry_name = Gtk.Entry(text=pool["name"])
        grid.attach(self._entry_name, 1, 0, 1, 1)
        grid.attach(Gtk.Label(label="Strategy:", xalign=1), 0, 1, 1, 1)
        self._combo_strategy = Gtk.ComboBoxText()
        self._combo_strategy.append("failover", "Failover (try primary, fall back on error)")
        self._combo_strategy.append("race", "Race (send to all, return fastest)")
        self._combo_strategy.set_active_id(pool.get("strategy", "failover"))
        grid.attach(self._combo_strategy, 1, 1, 1, 1)
        # The tree view is not drag-reorderable, so point at the buttons that
        # actually change the order instead of promising drag support.
        area.pack_start(Gtk.Label(label="<b>Routes</b> (use ↑ Up / ↓ Down to reorder priority)", use_markup=True, xalign=0), False, False, 8)
        self._route_store = Gtk.ListStore(str, str, str, str, str, str, bool, str, str)
        for r in pool.get("routes", []):
            self._route_store.append(self._route_to_row(r))
        self._route_tree = Gtk.TreeView(model=self._route_store)
        # NOTE(review): the "API Key" column renders the stored key as plain
        # text, while BGPRouteDialog hides it in its entry field.
        for i, (title, w) in enumerate([
            ("Route Name", 120), ("Endpoint", 120), ("URL", 150),
            ("API Key", 80), ("Model", 120), ("Priority", 60)
        ]):
            renderer = Gtk.CellRendererText()
            renderer.set_property("editable", False)
            col = Gtk.TreeViewColumn(title, renderer, text=i)
            col.set_min_width(w)
            col.set_resizable(True)
            self._route_tree.append_column(col)
        self._route_tree.set_headers_visible(True)
        sw = Gtk.ScrolledWindow()
        sw.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC)
        sw.add(self._route_tree)
        sw.set_min_content_height(200)
        area.pack_start(sw, True, True, 0)
        bbox = Gtk.Box(spacing=6)
        area.pack_start(bbox, False, False, 0)
        add_r = Gtk.Button(label="Add Route")
        add_r.connect("clicked", lambda b: self._add_route())
        bbox.pack_start(add_r, True, True, 0)
        edit_r = Gtk.Button(label="Edit Route")
        edit_r.connect("clicked", lambda b: self._edit_route())
        bbox.pack_start(edit_r, True, True, 0)
        rm_r = Gtk.Button(label="Remove Route")
        rm_r.connect("clicked", lambda b: self._remove_route())
        bbox.pack_start(rm_r, True, True, 0)
        up_r = Gtk.Button(label="↑ Up")
        up_r.connect("clicked", lambda b: self._move_route(-1))
        bbox.pack_start(up_r, True, True, 0)
        down_r = Gtk.Button(label="↓ Down")
        down_r.connect("clicked", lambda b: self._move_route(1))
        bbox.pack_start(down_r, True, True, 0)
        self.show_all()
        # Re-run the dialog while validation fails so that a rejected save
        # does not throw away what the user has entered.
        while self.run() == Gtk.ResponseType.OK:
            if self._save():
                break
        self.destroy()

    @staticmethod
    def _route_to_row(route, defaults=None):
        """Convert a route dict into a list matching the route store columns.

        Args:
            route: Route dict (as stored in a pool or returned by
                ``BGPRouteDialog.result``).
            defaults: Optional dict consulted for the hidden settings when
                *route* does not provide them.
        """
        defaults = defaults or {}

        def pick(key, fallback):
            value = route.get(key)
            if value is None:
                value = defaults.get(key)
            return fallback if value is None else value

        return [
            route.get("name", ""), route.get("endpoint_name", ""),
            route.get("target_url", ""), route.get("api_key", ""),
            route.get("model", ""), str(route.get("priority", 99)),
            bool(pick("reasoning_enabled", True)),
            str(pick("reasoning_effort", "medium")),
            str(pick("oauth_provider", "")),
        ]

    def _show_error(self, msg):
        """Show *msg* in a modal error dialog on top of this dialog."""
        d = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, msg)
        d.run()
        d.destroy()

    def _renumber_priorities(self):
        """Make the Priority column reflect the current row order (1-based)."""
        for idx, row in enumerate(self._route_store):
            row[self._COL_PRIORITY] = str(idx + 1)

    def _save(self):
        """Validate the form and write the pool to the pools file.

        Routes without a URL are skipped.  Priorities are taken from the row
        order.  An edited pool keeps its position in the pool list.

        Returns:
            True when the pool was saved, False when validation failed (an
            error dialog has been shown and nothing was written).
        """
        name = self._entry_name.get_text().strip()
        if not name:
            self._show_error("Pool name is required")
            return False
        strategy = self._combo_strategy.get_active_id() or "failover"
        routes = []
        for i, row in enumerate(self._route_store):
            if not row[self._COL_URL]:
                continue
            route = {
                "name": row[self._COL_NAME] or f"Route {i+1}",
                "endpoint_name": row[self._COL_ENDPOINT],
                "target_url": row[self._COL_URL],
                "api_key": row[self._COL_KEY],
                "model": row[self._COL_MODEL],
                "priority": i + 1,
                # Use the settings carried with the route instead of forcing
                # every route to reasoning on / "medium".
                "reasoning_enabled": row[self._COL_REASONING],
                "reasoning_effort": row[self._COL_EFFORT] or "medium",
            }
            if row[self._COL_OAUTH]:
                route["oauth_provider"] = row[self._COL_OAUTH]
            routes.append(route)
        data = load_bgp_pools()
        pools = data.get("pools", [])
        # Refuse a name that belongs to a different pool; appending it would
        # create two pools with the same name.
        if any(p.get("name") == name for p in pools
               if p.get("name") != self._existing_name):
            self._show_error(f'BGP pool "{name}" already exists')
            return False
        new_pool = {"name": name, "strategy": strategy, "routes": routes}
        kept = []
        replaced = False
        for p in pools:
            if self._existing_name and p.get("name") == self._existing_name:
                # Replace the edited pool in place; drop any further entries
                # that carry the same old name.
                if not replaced:
                    kept.append(new_pool)
                    replaced = True
                continue
            kept.append(p)
        if not replaced:
            kept.append(new_pool)
        data["pools"] = kept
        save_bgp_pools(data)
        # Refresh the manager's list when it offers a rebuild hook.
        rebuild = getattr(self._parent_mgr, "_rebuild", None)
        if callable(rebuild):
            rebuild()
        self._parent_mgr._parent._on_endpoints_updated()
        return True

    def _add_route(self):
        """Ask for a new route and append it to the route list."""
        endpoints = load_endpoints().get("endpoints", [])
        if not endpoints:
            d = Gtk.MessageDialog(self, 0, Gtk.MessageType.INFO, Gtk.ButtonsType.OK,
                                  "No endpoints configured. Add endpoints in Manage Endpoints first.")
            d.run()
            d.destroy()
            return
        d = BGPRouteDialog(self, endpoints, None)
        if d.result:
            self._route_store.append(self._route_to_row(d.result))
            self._renumber_priorities()

    def _edit_route(self):
        """Edit the selected route in a BGPRouteDialog and store the result."""
        sel = self._route_tree.get_selection()
        m, i = sel.get_selected()
        if not i:
            return
        endpoints = load_endpoints().get("endpoints", [])
        row = m[i]
        existing = {
            "name": row[self._COL_NAME], "endpoint_name": row[self._COL_ENDPOINT],
            "target_url": row[self._COL_URL], "api_key": row[self._COL_KEY],
            "model": row[self._COL_MODEL],
            "priority": int(row[self._COL_PRIORITY]) if row[self._COL_PRIORITY] else 99,
            "reasoning_enabled": row[self._COL_REASONING],
            "reasoning_effort": row[self._COL_EFFORT],
            "oauth_provider": row[self._COL_OAUTH],
        }
        d = BGPRouteDialog(self, endpoints, existing)
        if d.result:
            # Settings the dialog did not return fall back to the old values.
            for c, v in enumerate(self._route_to_row(d.result, defaults=existing)):
                row[c] = v
            self._renumber_priorities()

    def _remove_route(self):
        """Remove the selected route, if any."""
        sel = self._route_tree.get_selection()
        m, i = sel.get_selected()
        if i:
            self._route_store.remove(i)
            self._renumber_priorities()

    def _move_route(self, direction):
        """Move the selected route by *direction* rows (negative moves up).

        The row is moved inside the store rather than removed and re-inserted,
        so it stays selected and can be moved again immediately.
        """
        sel = self._route_tree.get_selection()
        m, i = sel.get_selected()
        if not i:
            return
        idx = m.get_path(i).get_indices()[0]
        new_idx = idx + direction
        if new_idx < 0 or new_idx >= len(self._route_store):
            return
        target = self._route_store.get_iter(Gtk.TreePath(new_idx))
        if direction < 0:
            self._route_store.move_before(i, target)
        else:
            self._route_store.move_after(i, target)
        self._renumber_priorities()
class BGPRouteDialog(Gtk.Dialog):
    """Modal dialog for adding or editing one route of a BGP pool.

    The constructor runs the dialog and destroys it before returning.  The
    outcome is exposed as ``self.result``: a route dict when the user
    confirmed, None when the dialog was cancelled.
    """

    def __init__(self, parent, endpoints, existing):
        """Build the form, run it modally and populate ``self.result``.

        Args:
            parent: Window the dialog is transient for.
            endpoints: List of endpoint dicts offered in the endpoint combo.
            existing: Route dict to edit, or None when adding a route.
        """
        Gtk.Dialog.__init__(self, title="BGP Route", parent=parent, modal=True)
        self.add_button("Cancel", Gtk.ResponseType.CANCEL)
        self.add_button("OK", Gtk.ResponseType.OK)
        self.set_default_size(440, 300)
        self.result = None
        existing = existing or {}
        area = self.get_content_area()
        area.set_margin_start(12)
        area.set_margin_end(12)
        area.set_margin_top(12)
        area.set_margin_bottom(12)
        area.set_spacing(6)
        grid = Gtk.Grid(column_spacing=8, row_spacing=6)
        area.pack_start(grid, False, False, 0)

        def add_row(row, label, widget):
            grid.attach(Gtk.Label(label=label, xalign=1), 0, row, 1, 1)
            grid.attach(widget, 1, row, 1, 1)

        self._entry_name = Gtk.Entry(text=existing.get("name", ""))
        add_row(0, "Route Name:", self._entry_name)
        self._combo_ep = Gtk.ComboBoxText()
        ep_names = [e["name"] for e in endpoints]
        for en in ep_names:
            self._combo_ep.append(en, en)
        if existing.get("endpoint_name") in ep_names:
            self._combo_ep.set_active_id(existing["endpoint_name"])
        elif ep_names:
            self._combo_ep.set_active(0)
        # Connected only after the initial selection so the handler does not
        # run before the widgets it fills have been created.
        self._combo_ep.connect("changed", lambda b: self._on_ep_changed(endpoints))
        add_row(1, "Endpoint:", self._combo_ep)
        self._entry_url = Gtk.Entry()
        add_row(2, "URL:", self._entry_url)
        self._entry_key = Gtk.Entry()
        self._entry_key.set_visibility(False)
        add_row(3, "API Key:", self._entry_key)
        self._combo_model = Gtk.ComboBoxText()
        add_row(4, "Model:", self._combo_model)
        # Fill URL / key / models from the selected endpoint first, then put
        # the route's own stored values on top.  Doing it the other way round
        # let the endpoint defaults overwrite the values being edited.
        self._on_ep_changed(endpoints)
        if existing.get("target_url"):
            self._entry_url.set_text(existing["target_url"])
        if existing.get("api_key"):
            self._entry_key.set_text(existing["api_key"])
        if existing.get("model"):
            self._select_model(existing["model"])
        self.show_all()
        if self.run() == Gtk.ResponseType.OK:
            ep_name = self._combo_ep.get_active_text() or ""
            ep = next((e for e in endpoints if e["name"] == ep_name), None)
            self.result = {
                "name": self._entry_name.get_text().strip() or ep_name,
                "endpoint_name": ep_name,
                "target_url": self._entry_url.get_text().strip(),
                "api_key": self._entry_key.get_text().strip(),
                "model": self._combo_model.get_active_text() or "",
                # Keep the priority of an edited route; new routes get 99.
                "priority": existing.get("priority", 99),
            }
            if ep:
                self.result["reasoning_enabled"] = ep.get("reasoning_enabled", True)
                self.result["reasoning_effort"] = ep.get("reasoning_effort", "medium")
                self.result["oauth_provider"] = ep.get("oauth_provider", "")
        self.destroy()

    def _select_model(self, model):
        """Select *model* in the model combo if the endpoint offers it.

        Combo ids are normalised model ids while routes store the visible
        text, so the raw value is tried first and the normalised form second.
        The current selection is left unchanged when neither matches.
        """
        if self._combo_model.set_active_id(model):
            return
        self._combo_model.set_active_id(normalize_model_id(model))

    def _on_ep_changed(self, endpoints):
        """Load URL, API key and model list of the selected endpoint into the form."""
        ep_name = self._combo_ep.get_active_text()
        ep = next((e for e in endpoints if e["name"] == ep_name), None)
        if not ep:
            return
        self._entry_url.set_text(normalize_base_url(ep.get("base_url", "")))
        self._entry_key.set_text(ep.get("api_key", "") or "")
        self._combo_model.remove_all()
        # Skip empty entries; they would show up as blank combo rows.
        models = [m for m in ep.get("models", []) if m]
        for m in models:
            self._combo_model.append(normalize_model_id(m), m)
        default = ep.get("default_model")
        selected = bool(default) and self._combo_model.set_active_id(normalize_model_id(default))
        # Fall back to the first model when there is no default or the
        # default is not part of the endpoint's model list.
        if not selected and models:
            self._combo_model.set_active(0)
# Named hex colours used by the usage dashboard widgets.
_USAGE_COLORS = {
    "green": "#27ae60",
    "yellow": "#f39c12",
    "orange": "#e67e22",
    "red": "#e74c3c",
    "blue": "#3498db",
    "purple": "#9b59b6",
    "dark": "#2c3e50",
    "light": "#ecf0f1",
    "mid": "#bdc3c7",
}

# JSON file that _load_usage_stats() reads the usage statistics from.
_USAGE_STATS_FILE = HOME / ".cache" / "codex-proxy" / "usage-stats.json"
def _load_usage_stats():
    """Return the usage statistics stored in ``_USAGE_STATS_FILE``.

    Returns:
        The parsed JSON object, or ``{"providers": {}, "updated": None}``
        when the file is missing, unreadable, not valid JSON, or does not
        contain a JSON object.
    """
    empty = {"providers": {}, "updated": None}
    try:
        stats = json.loads(_USAGE_STATS_FILE.read_text())
    except (OSError, ValueError):
        # OSError: missing or unreadable file.  ValueError: malformed JSON or
        # undecodable bytes.  Both mean "no usable stats yet", which is the
        # best-effort behaviour callers rely on.
        return empty
    # Callers use dict methods on the result, so reject any other JSON type.
    return stats if isinstance(stats, dict) else empty
def _bar_color(pct):
    """Map a fraction to a colour from ``_USAGE_COLORS``.

    Values below 0.5 map to green, values below 0.8 to yellow and everything
    else to red.
    """
    for upper_bound, colour_name in ((0.5, "green"), (0.8, "yellow")):
        if pct < upper_bound:
            return _USAGE_COLORS[colour_name]
    return _USAGE_COLORS["red"]
class UsageWindow(Gtk.Window):
    """Usage dashboard: one card per provider showing request counts, success
    rate, token totals, average latency, a model breakdown and the last error.

    The data comes from ``_load_usage_stats()`` and is re-read on Refresh.
    """

    def __init__(self, parent):
        """Build the header and the scrollable card area, then show the window.

        Args:
            parent: Window this dashboard is transient for.
        """
        super().__init__(title="Usage Stats")
        self.set_transient_for(parent)
        self.set_default_size(640, 560)
        self.set_position(Gtk.WindowPosition.CENTER)
        self._parent = parent
        vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0)
        self.add(vbox)
        header = Gtk.Box(spacing=8)
        header.set_margin_start(16)
        header.set_margin_end(16)
        header.set_margin_top(12)
        header.set_margin_bottom(8)
        vbox.pack_start(header, False, False, 0)
        title = Gtk.Label()
        title.set_markup('<span font="14" weight="bold" foreground="#2c3e50">Usage Dashboard</span>')
        header.pack_start(title, False, False, 0)
        refresh_btn = Gtk.Button(label="Refresh")
        refresh_btn.connect("clicked", lambda b: self._refresh())
        header.pack_end(refresh_btn, False, False, 0)
        self._updated_lbl = Gtk.Label()
        self._updated_lbl.set_markup('<span foreground="#95a5a6" size="small">Never</span>')
        header.pack_end(self._updated_lbl, False, False, 8)
        sep = Gtk.Separator()
        vbox.pack_start(sep, False, False, 0)
        self._cards_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)
        sw = Gtk.ScrolledWindow()
        sw.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC)
        sw.add(self._cards_box)
        vbox.pack_start(sw, True, True, 0)
        self._refresh()
        self.show_all()

    @staticmethod
    def _esc(value):
        """Return *value* as text that is safe to embed in Pango markup.

        Everything read from the stats file is free-form text; an unescaped
        "<" or "&" would make ``set_markup`` reject the whole string.
        """
        return GLib.markup_escape_text(str(value))

    def _refresh(self):
        """Reload the statistics and rebuild all provider cards."""
        for c in self._cards_box.get_children():
            self._cards_box.remove(c)
        stats = _load_usage_stats()
        updated = stats.get("updated")
        if updated:
            self._updated_lbl.set_markup(
                f'<span foreground="#95a5a6" size="small">Updated: {self._esc(updated)}</span>')
        providers = stats.get("providers", {})
        if not isinstance(providers, dict):
            providers = {}
        # Ignore malformed entries instead of crashing the whole dashboard.
        providers = {k: v for k, v in providers.items() if isinstance(v, dict)}
        if not providers:
            empty = Gtk.Label()
            empty.set_markup('<span foreground="#95a5a6" size="large">No usage data yet.\nLaunch a session to start tracking.</span>')
            empty.set_margin_top(60)
            self._cards_box.pack_start(empty, False, False, 0)
            self._cards_box.show_all()
            return
        # Busiest providers first.
        sorted_providers = sorted(providers.items(), key=lambda x: x[1].get("total_requests", 0), reverse=True)
        for prov_name, prov_data in sorted_providers:
            card = self._build_card(prov_name, prov_data)
            self._cards_box.pack_start(card, False, False, 0)
        self._cards_box.show_all()

    def _build_card(self, name, data):
        """Build the card widget for one provider.

        Args:
            name: Provider key from the stats file; a URL is shortened to its
                host part for display.
            data: Per-provider statistics dict.

        Returns:
            A ``Gtk.Frame`` containing the card.
        """
        frame = Gtk.Frame()
        frame.set_margin_start(12)
        frame.set_margin_end(12)
        frame.set_margin_top(4)
        frame.set_margin_bottom(4)
        style = frame.get_style_context()
        style.add_class("card")
        outer = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4)
        outer.set_margin_start(12)
        outer.set_margin_end(12)
        outer.set_margin_top(8)
        outer.set_margin_bottom(8)
        frame.add(outer)
        top_row = Gtk.Box(spacing=8)
        outer.pack_start(top_row, False, False, 0)
        total = data.get("total_requests", 0)
        ok = data.get("successes", 0)
        fail = data.get("failures", 0)
        # A provider without requests is shown as 100% successful.  The value
        # is clamped so inconsistent counters cannot leave the 0..1 range.
        success_rate = min(max(ok / total, 0.0), 1.0) if total > 0 else 1.0
        name_lbl = Gtk.Label()
        short = str(name).replace("https://", "").replace("http://", "").split("/")[0]
        name_lbl.set_markup(f'<span weight="bold" foreground="#2c3e50" size="medium">{self._esc(short)}</span>')
        top_row.pack_start(name_lbl, False, False, 0)
        req_lbl = Gtk.Label()
        req_lbl.set_markup(f'<span foreground="#7f8c8d" size="small">{total} requests</span>')
        top_row.pack_start(req_lbl, False, False, 8)
        if fail > 0:
            err_lbl = Gtk.Label()
            err_lbl.set_markup(f'<span foreground="{_USAGE_COLORS["red"]}" size="small">{fail} failed</span>')
            top_row.pack_start(err_lbl, False, False, 4)
        last_used = data.get("last_used", "")
        if last_used:
            lu_lbl = Gtk.Label()
            lu_lbl.set_markup(f'<span foreground="#95a5a6" size="x-small">{self._esc(last_used)}</span>')
            top_row.pack_end(lu_lbl, False, False, 0)
        # Progress bar for success rate
        bar = Gtk.ProgressBar()
        bar.set_fraction(success_rate)
        bar_pct = int(success_rate * 100)
        bar.set_text(f"{bar_pct}% success")
        bar.set_show_text(True)
        bar.set_margin_top(2)
        bar.set_margin_bottom(2)
        # _bar_color maps a fraction to a colour, so pass the failure share.
        color = _bar_color(1.0 - success_rate)
        bar_css = f'progress {{ background-color: {color}; border-radius: 4px; }} trough {{ border-radius: 4px; min-height: 10px; }}'
        provider = Gtk.CssProvider()
        provider.load_from_data(bar_css.encode())
        bar.get_style_context().add_provider(provider, Gtk.STYLE_PROVIDER_PRIORITY_USER)
        outer.pack_start(bar, False, False, 0)
        # Stats row
        stats_row = Gtk.Box(spacing=16)
        outer.pack_start(stats_row, False, False, 0)
        t_in = data.get("total_tokens_in", 0)
        t_out = data.get("total_tokens_out", 0)
        dur = data.get("total_duration_s", 0.0)
        avg_dur = dur / total if total > 0 else 0
        for label, value in [
            ("Tokens In", f"{t_in:,}"),
            ("Tokens Out", f"{t_out:,}"),
            ("Avg Latency", f"{avg_dur:.1f}s"),
        ]:
            box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=1)
            caption = Gtk.Label()
            caption.set_markup(f'<span foreground="#95a5a6" size="x-small">{label}</span>')
            box.pack_start(caption, False, False, 0)
            v = Gtk.Label()
            v.set_markup(f'<span weight="bold" foreground="#2c3e50" size="small">{value}</span>')
            box.pack_start(v, False, False, 0)
            stats_row.pack_start(box, False, False, 0)
        # Models breakdown (the four most used models)
        models = data.get("models", {})
        if isinstance(models, dict) and models:
            usable = [(m, md) for m, md in models.items() if isinstance(md, dict)]
            usable.sort(key=lambda x: x[1].get("requests", 0), reverse=True)
            model_str = " ".join(
                f'<span foreground="#3498db" size="x-small">{self._esc(m)}</span> '
                f'<span foreground="#7f8c8d" size="x-small">({self._esc(md.get("requests",0))})</span>'
                for m, md in usable[:4]
            )
            m_lbl = Gtk.Label()
            m_lbl.set_markup(f'<span size="x-small">Models:</span> {model_str}')
            m_lbl.set_line_wrap(True)
            m_lbl.set_xalign(0)
            outer.pack_start(m_lbl, False, False, 2)
        # Error info
        last_err = data.get("last_error")
        if last_err:
            err_lbl = Gtk.Label()
            err_lbl.set_markup(f'<span foreground="{_USAGE_COLORS["red"]}" size="x-small">Last error: {self._esc(last_err)}</span>')
            # Error messages can be long; wrap them instead of stretching the
            # card past the window width.
            err_lbl.set_line_wrap(True)
            err_lbl.set_xalign(0)
            outer.pack_start(err_lbl, False, False, 0)
        return frame
def main():
    """Program entry point.

    Creates the log and proxy-config directories, seeds the endpoints file
    with two default endpoints when it does not exist yet, then opens the
    launcher window and runs the GTK main loop until the window is destroyed.
    """
    for directory in (LOG_DIR, PROXY_CONFIG_DIR):
        directory.mkdir(parents=True, exist_ok=True)

    # Create default endpoints if none exist
    if not ENDPOINTS_FILE.exists():
        openai_endpoint = {
            "name": "OpenAI",
            "backend_type": "native",
            "base_url": "https://api.openai.com/v1",
            "api_key": "",
            "default_model": "gpt-4o",
            "models": ["gpt-4o", "gpt-4o-mini"],
            "provider_preset": "OpenAI",
        }
        zai_endpoint = {
            "name": "Z.AI",
            "backend_type": "openai-compat",
            "base_url": "https://api.z.ai/api/coding/paas/v4",
            "api_key": "",
            "default_model": "glm-5.1",
            "models": ["glm-4.5", "glm-4.5-air", "glm-4.6", "glm-4.7", "glm-5", "glm-5-turbo", "glm-5.1"],
            "provider_preset": "Custom",
        }
        save_endpoints({
            "default": "OpenAI",
            "endpoints": [openai_endpoint, zai_endpoint],
        })

    window = LauncherWin()
    # Leaving the main loop when the window goes away ends the program.
    window.connect("destroy", Gtk.main_quit)
    Gtk.main()


if __name__ == "__main__":
    main()