Files
Codex-Launcher---Any-AI-Por…/codex_launcher_lib.py

2278 lines
98 KiB
Python

#!/usr/bin/env python3
"""Codex Launcher shared library — pure Python stdlib, zero GUI dependencies.
Provides cross-platform utility functions for both the GTK GUI (Linux) and
the tkinter GUI (Windows). No pip dependencies. No GTK/PyGObject imports.
"""
import base64
import collections
import contextlib
import hashlib
import json
import os
import re
import secrets
import shutil
import signal
import socket
import ssl
import subprocess
import sys
import tempfile
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
# Client identifier string for this launcher.
# NOTE(review): presumably sent as the HTTP User-Agent header — confirm at call sites.
UA = "codex-launcher/1.0"
# ═══════════════════════════════════════════════════════════════════════
# Platform detection
# ═══════════════════════════════════════════════════════════════════════
# True only on native Windows; every platform-specific branch keys off this.
IS_WINDOWS = sys.platform == "win32"
HOME = Path.home()

# Only the proxy data root and the install directory differ per platform;
# every other location below is derived from them or from HOME.
if IS_WINDOWS:
    _LOCAL_APPDATA = Path(os.environ.get("LOCALAPPDATA", HOME / "AppData/Local"))
    PROXY_CONFIG_DIR = _LOCAL_APPDATA / "codex-proxy"
    BIN_DIR = _LOCAL_APPDATA / "Programs" / "Codex-Launcher"
else:
    PROXY_CONFIG_DIR = HOME / ".cache/codex-proxy"
    BIN_DIR = HOME / ".local/bin"

# Codex's own configuration directory is the same on both platforms.
CONFIG_DIR = HOME / ".codex"

# Logs, the PID registry, usage stats, monitoring state and request snapshots
# all live inside the proxy data root.
LOG_DIR = PROXY_CONFIG_DIR
PID_REGISTRY = PROXY_CONFIG_DIR / "pids.json"
_USAGE_STATS_FILE = PROXY_CONFIG_DIR / "usage-stats.json"
MONITORING_FILE = PROXY_CONFIG_DIR / "monitoring-config.json"
INCIDENT_STORE_FILE = PROXY_CONFIG_DIR / "incident-store.json"
MONITORING_LOG = PROXY_CONFIG_DIR / "monitoring.log"
REQUEST_SNAP_DIR = PROXY_CONFIG_DIR / "requests"

# Files kept next to the Codex config.
CONFIG = CONFIG_DIR / "config.toml"
CONFIG_BAK = CONFIG_DIR / "config.toml.launcher-bak"
CONFIG_TXN = CONFIG_DIR / "config.toml.launcher-txn.json"
ENDPOINTS_FILE = CONFIG_DIR / "endpoints.json"
BGP_POOLS_FILE = CONFIG_DIR / "bgp-pools.json"

LAUNCH_LOG = LOG_DIR / "launcher.log"
# OAuth secrets are stored under ~/.config on every platform.
OAUTH_SECRETS_PATH = HOME / ".config" / "codex-launcher" / "oauth-secrets.json"
# The translate proxy script has the same name on every platform.
PROXY = BIN_DIR / "translate-proxy.py"
if IS_WINDOWS:
    # Windows uses a Python cleanup script and has no start.sh path.
    CLEANUP = BIN_DIR / "cleanup-codex-stale.py"
    START_SH = None
else:
    CLEANUP = BIN_DIR / "cleanup-codex-stale.sh"
    START_SH = Path("/opt/codex-desktop/start.sh")
# Minimal config text: three keys, each set to an empty string, one per line.
DEFAULT_CONFIG = (
    'model = ""\n'
    'model_provider = ""\n'
    'model_catalog_json = ""\n'
)
# Release history as a list of (version, release date "YYYY-MM-DD", [change notes])
# tuples. Entries are ordered with the most recent release date first.
# NOTE(review): the first entry is versioned "10.13.8" while all later entries
# follow a descending 3.x / 2.x sequence — confirm that version string is intended.
CHANGELOG = [
("10.13.8", "2026-05-27", [
"Fix: force_finalize skips Gemini call entirely (was hallucinating tool calls without tools)",
"Fix: _send_ag_finalize returns status=failed (was stored as valid history causing loops)",
"Fix: _forward_gemini_sse wrapped in try/except for TimeoutError/BrokenPipe",
"Fix: file tracker mutations inside lock scope (was racing in ThreadingHTTPServer)",
"Fix: compaction summary strips raw tool outputs (was re-triggering read loops)",
"Fix: post-compaction write directive when 10+ reads with 0 writes",
"Fix: detect get_goal/completion_budget null-tool loops (3+ → force finalize)",
"Fix: read-loop threshold raised to 8 same-file / 40 total (was too aggressive at 5/30)",
"Fix: strip timestamps from loop hash, base64 image data from normalizer",
]),
("3.12.1", "2026-05-27", [
"Fix Antigravity adapter (PR #15): simplify model resolution",
"Removed broken schema sanitization, restored correct headers",
"Expanded model alias map for all Antigravity variants",
"Re-enabled gRPC fallback by default",
]),
("3.12.0", "2026-05-27", [
"gRPC auto-fallback for Antigravity provider (PR #13)",
"New antigravity_grpc module with protobuf client",
"REST 404 triggers gRPC fallback using display names",
"gRPC supports streaming and unary generate",
"Dynamic version fetch with probe validation",
"Antigravity v2 handler rewrite (anti-api approach)",
"Safety settings, stopSequences, sessionId, requestType: agent",
]),
("3.11.11", "2026-05-26", [
"Final trimming only removes plain messages, never function_call_output",
]),
("3.11.10", "2026-05-26", [
"Fix Antigravity: interleave function_call/output pairs in correct sequence (PR #11)",
"Fix Gemini sanitizer: trim leading/trailing non-user turns for Google API compliance",
"Stricter function call/response isolation — no merging across role boundaries",
]),
("3.11.9", "2026-05-26", [
"Fix Antigravity: preserve functionCall/functionResponse in Gemini sanitizer (PR #10)",
"Prevents tool responses from being merged/dropped in multi-turn Antigravity sessions",
]),
("3.11.8", "2026-05-26", [
"Vision description cache persisted across requests (no redundant API calls for same image)",
"Merge PR #8: fix vision cache persistence across requests",
]),
("3.11.7", "2026-05-26", [
"Vision auto-detect: uses provider's own vision model (e.g. 0G-Qwen-VL) as fallback for image description",
"Vision preprocessing replaces image stripping: images described via API instead of just removed",
"Fix AttributeError in image_url handling when value is string not dict",
"Merge PR #6: vision/OCR preprocessing for text-only models",
"Merge PR #7: 177 unit tests for translate-proxy.py",
"Auth os error 2 fix: GUI shows config-missing message instead of raw error",
]),
("3.11.6", "2026-05-26", [
"Antigravity loop breakers: per-session tracking, edit-intent nudge (first turn only)",
"Loop breaker: same tool+args repeated 5+ times triggers force finalization",
"Latest user instruction appended exactly once per request",
"Detailed [antigravity-loop] logging for all tracking fields",
"has_content fix: function_call now counts as valid output (no more infinite loops)",
"Antigravity-only changes, no touch to other providers",
]),
("3.11.5", "2026-05-26", [
"Token-aware compaction: fixes context_length_exceeded on small-context models (25 items x 1600 tokens)",
"Proactive compaction triggers on token count (>80% model limit), not just item count",
"Universal adaptive compaction: removed crof.ai-only gates, all providers get compaction",
"Vision model detection: strips images for non-vision models, keeps for vision-capable ones",
"Per-model token limit learning from context_length_exceeded error messages",
"Compaction aggression levels: normal vs extreme when tokens > 1.5x model limit",
"Smart-continue text-tool detection: triggers on tool-call text patterns, not just function_call_output",
"Active endpoint sync: GUI auto-removes stale endpoint references on startup",
]),
("3.11.0", "2026-05-26", [
"Merge cobra PR: concurrency semaphore (max 3), auto-continue for truncated text",
"SO_REUSEADDR on sticky port, proxy-stderr.log, stream diagnostics logging",
"Timeout/OSError handler sends response.failed SSE instead of silent drop",
"Restart Proxy button: only restarts proxy without killing Codex Desktop",
"Tool call argument normalizer: fixes Arguments->arguments, strips markdown wrapping",
"Smart-continue loop (2x retries): escalating nudges when model stops text-only mid-task",
"XML tool call extraction: parses patterns from text, injects as real calls",
"Auto-continue + smart-continue ordered with skip guard to avoid double-firing",
"API key hot-reload with mtime tracking + /admin/reload + /admin/verify-key endpoints",
"GUI hot-reload: auto-refreshes proxy key on endpoint edit, verifies with upstream",
"Synthetic tool-results disabled: was causing deepseek-v4-pro truncation on opencode.ai",
]),
("3.10.12", "2026-05-26", [
"Sticky endpoint: caches last working endpoint, sequential fallback on failure",
"Endpoint order: cloudcode-pa first (matches agy CLI), daily-cloudcode-pa fallback",
"Anti-stall engine: kills stale proxy processes + clears pycache on startup",
"Smart error classification: quota vs capacity vs banned vs validation vs auth",
"Rate limit reset parsing: extracts cooldown from error body for accuracy",
"Missing headers: X-Client-Name, X-Client-Version, x-goog-api-client, sessionId",
"Guardrail skip: simple messages (hi) skip agent guardrail, no more tool-call loops",
"Claude fixes: preserve all tools, skip compaction/normalizer/sanitization for Claude",
"Normalizer model param: distinguishes Claude vs Gemini for correct behavior",
]),
("3.10.11", "2026-05-26", [
"Hybrid endpoint fallback: cloudcode-pa then daily-cloudcode-pa on 429",
"daily-cloudcode-pa.googleapis.com (same endpoint agy-core uses)",
"429 errors log full response body for debugging",
"Rate-limit marking only after ALL endpoints fail",
"Restored SERVICE_DISABLED (403) fallthrough",
]),
("3.10.10", "2026-05-25", [
"Fix normalizer stripping ALL context after compaction on resumed sessions",
"No auto-reset when compaction summary present (preserves 1925+ turn history)",
"Always preserve compaction summaries in normalizer output",
"Deduplicate consecutive identical goal_context messages",
"Emergency reset preserves compaction summaries",
"Fix hashlib NameError in _antigravity_normalize_context (string comparison instead)",
]),
("3.10.9", "2026-05-25", [
"Antigravity: production-only endpoints (cloudcode-pa.googleapis.com), sandbox blocked unless ALLOW_ANTIGRAVITY_STAGING=1",
"Antigravity: 403 SERVICE_DISABLED falls through, 429 returns to client (no sandbox fallback)",
"AntigravityContextNormalizer: bounded context — simple messages send minimal payload",
"Simple message detector: 'hi' etc sends only user message, no tool history",
"Auto-reset polluted context: 200+ items with simple message resets to minimal",
"Duplicate user message removal, tool output budget (max 2 verbatim, rest summarized)",
"Hard limits: 20 contents, 120K/250K/500K char budgets",
"Claude thinking fix: maxOutputTokens=64000, snake_case thinking config, VALIDATED toolConfig",
"Claude budgets: low=8192, medium=16384, high=32768",
"All fixes scoped to OAUTH_PROVIDER==google-antigravity only",
"Project discovery uses production endpoint (not staging)",
"z.ai: full OpenClaw attribution headers (cobra91 PR #4)",
"OpenRouter: X-OpenRouter-Cache header (cobra91 PR #4)",
"Fix Linux Re-OAuth: load_oauth_secrets() was undefined",
"Fix GLib.idle_add lambda returning truthy tuple",
]),
("3.10.7", "2026-05-25", [
"Prompt Enhancer: per-provider toggle to improve prompt clarity after compaction",
"Two modes: offline (template injection) and ai-powered (external LLM rewrites)",
"Offline mode: injects structured instructions to keep model focused post-compaction",
"AI-powered mode: uses configurable model/URL/key to rewrite prompts for clarity",
"Linux/Windows GUI: Prompt Enhancer switch + mode selector + model/URL/key fields",
"Prevents lost context issues in long sessions with aggressive compaction",
]),
("3.10.6", "2026-05-25", [
"Freebuff integration: free DeepSeek/Kimi via codebuff.com API",
"Fixed Freebuff User-Agent to match official SDK (ai-sdk/openai-compatible/1.0.25/codebuff)",
"Fixed Freebuff metadata: freebuff_instance_id + client_id (base36) + cost_mode: free",
"Fixed Codebuff OAuth: use www.codebuff.com (307 redirect on bare domain)",
"GUI preset aliases: Freebuff, FreeBuff, Codebuff all map to same backend",
"Windows GUI consolidated into src/ (merged by cobra91)",
"CROF adaptive logic gated to crof.ai only — no log pollution for other providers",
"Data dir consolidation: all data in codex-proxy/",
"Sticky proxy port: persists in .last-proxy-port for restart persistence",
"Adaptive compact budget raised 60% to 80% for large-context models",
"Config cleanup fix: stale proxy-*.json moved after _init_runtime()",
"Windows GUI: Clear Log, Restart Proxy, View Log buttons (cobra91 PR #3)",
"OAuth Secrets dialog shows all providers: Google + Freebuff/Codebuff",
"Re-OAuth buttons for each provider: re-authenticate Google or GitHub/Codebuff",
"Token status indicators (valid/missing) for each Google provider",
"Shows logged-in email and auth status for Freebuff/Codebuff",
"Linux/Windows feature parity: both GUIs have identical features",
"Windows: OAuth Secrets all-providers + Codebuff OAuth + Sync from Preset",
"Linux: Clear Log + Restart Proxy buttons added",
]),
("3.10.5", "2026-05-25", [
"Context compaction for Antigravity/Gemini OAuth — prevents token limit errors",
"Aggressive compaction policies at 60% of model context limit",
"Compaction for cloudcode-pa and googleapis provider policies",
"REST model IDs added to context size map (gemini-3-flash, etc.)",
"OAuth Secrets editor in GUI — update client ID/secret without editing files",
"Secrets stored in ~/.config/codex-launcher/oauth-secrets.json (not in repo)",
"Import JSON button — import client_secret_*.json from Google Cloud Console",
"All hardcoded OAuth secrets removed from source code",
"Antigravity model IDs fixed: display names → slug model IDs for REST API",
"Git history scrubbed of leaked credentials; pre-push hook installed",
"Antigravity REST API model IDs verified with live API testing",
"Gemini 3.5 Flash, 3.1 Pro, Claude 4.6, GPT-OSS 120B all working",
]),
("3.9.9", "2026-05-25", [
"Refresh Antigravity preset: Gemini 3.5 Flash, Gemini 3.1 Pro, Claude 4.6, GPT-OSS",
"Fix Antigravity alias map for tiered model IDs (high/medium/low/thinking)",
"Model context sizes for Gemini 3.5 Flash, 3.1 Pro, Claude 4.6, GPT-OSS 120B",
]),
("3.9.8", "2026-05-25", [
"Fix Desktop model leak — remap gpt-5.4-mini to user-selected model",
"send_json() catches BrokenPipeError globally — no crashes on disconnect",
"Proxy remaps Desktop forced models via CODEX_LAUNCHER_MODEL env",
]),
("3.9.7", "2026-05-25", [
"Forward real Codebuff error messages instead of generic 429",
"Return HTTP 200 with Responses API format for rate limits",
"Extract retryAfterMs from Codebuff 429 responses for cooldown",
"RateLimitError carries upstream message through all paths",
"BrokenPipeError fix on 'all accounts exhausted' response",
"Fix 3 SyntaxWarnings for invalid escape sequences",
"_codebuff_start_run returns actual error body",
]),
("3.9.6", "2026-05-25", [
"Fix Gemini follow-up turns: enforce latest user instruction as final turn",
"Edit-intent detection with tool-use nudge for file modifications",
"Thought signature preservation for Gemini 3 tool-call continuity",
"Smart tool output compaction: old=3000, recent=20000 chars",
"Multi-account rotation for codebuff, Google OAuth, API keys",
"/v1/accounts endpoint for account pool status",
]),
("3.9.0", "2026-05-24", [
"Multi-account rotation for OAuth providers (codebuff, Google, API keys)",
"Automatic failover on rate limit — next account used",
"Codebuff: accounts[] array in credentials.json",
"Google OAuth: multiple token files (google-*-oauth-token-N.json)",
"API keys: comma-separated keys rotate on 429 errors",
"/v1/accounts endpoint shows account pool status",
"x-codebuff-model and x-codebuff-instance-id headers",
]),
("3.8.4", "2026-05-24", [
"Codebuff streaming — SSE events reach Codex client",
"stream_buffered_events now called for codebuff",
"Codebuff OAuth built-in login flow (no external CLI)",
"Codebuff API: reverse-engineered www.codebuff.com endpoints",
"Codebuff session management with instance ID",
"Codebuff agent run lifecycle (start/finish) with model routing",
"Free DeepSeek V4 Pro, V4 Flash, Kimi K2.6, MiniMax M2.7",
"Reasoning mode works with codebuff (thinking tokens supported)",
"GUI: Sandbox mode selector (Read-only / Workspace / Full Access)",
"GUI: Approval mode selector (Untrusted / On Request / Full Auto)",
"GUI: Codebuff Login button in endpoint editor",
]),
("3.8.1", "2026-05-24", [
"Freebuff integration — free DeepSeek V4 Pro, V4 Flash, Kimi K2.6, MiniMax M2.7",
"Freebuff backend: auto agent-run lifecycle, credential detection, model routing",
"Restored all provider presets (Command Code, Crof, OpenAdapter, OpenRouter, etc.)",
"AI Monitoring — self-healing watchdog with 3-tier response system",
"HealthWatcher: monitors proxy health every 5s, auto-restarts on crash",
"LogAnalyzer: tails debug logs for 18 failure signal patterns",
"Tier 1: 14 rule-based auto-recovery rules (< 1 s response)",
"Tier 2: Incident pattern store with success rate tracking",
"Tier 3: AI diagnostic agent — configurable provider/model for novel failures",
"30 fault types catalogued across 5 categories (A-E)",
"GUI: AI Monitor panel with ON/OFF, provider selector, incident log",
"Enhanced /health endpoint with memory and uptime metrics",
]),
("3.7.0", "2026-05-22", [
"Intelligence Routing — self-healing parser system for Command Code",
"Layer 1: Deep URL extraction from nested JSON in explore_agent blocks",
"Layer 2: Auto-proceed on require_escalation / request_escalation_permission blocks",
"Layer 3: Intent-based command synthesis when all parsers fail (5 heuristics)",
"Module-level _build_explore_cmd() — reuses URL extraction across parser + stream",
"54 self-test patterns covering all three Intelligence Routing layers",
]),
("3.6.0", "2026-05-22", [
"Connection pooling — persistent HTTPS connections per host",
"Stream idle timeout (300s) — kills silent streams instead of hanging",
"Retry-After header support on all retry paths",
"Bounded stream buffers (8MB) — prevents OOM",
"Dual logging to proxy.log + stderr",
]),
("3.5.0", "2026-05-22", [
"Command Code adapter overhaul — 17 patches for multi-format tool-call parsing",
"DSML, XML, explore_agent, bash blocks, raw JSON parser chain",
"Self-revive watchdog — auto-restarts proxy on crash",
"Debug-to-file logging in cc-debug.log",
"Inline self-test (19 patterns)",
]),
("3.3.0", "2026-05-20", [
"Antigravity + Gemini CLI OAuth — full Codex agent loop working",
"Auto-continue on MAX_TOKENS for Gemini/Antigravity",
"BGP++ route scoring and provider policy layer",
]),
("3.0.0", "2026-05-20", [
"Major overhaul — ThreadingHTTPServer, thread-safe state, graceful shutdown",
"Dynamic port allocation, proxy health gating, atomic config",
"Usage Dashboard v2 with dark theme",
]),
("2.7.0", "2026-05-20", [
"Usage Dashboard redesigned (OpenUsage-inspired dark theme)",
"TCP_NODELAY streaming, Anthropic prompt caching",
]),
("2.6.1", "2026-05-20", [
"Google OAuth rebuilt to emulate Gemini CLI — no client_secret.json needed",
"Uses Google's public OAuth client_id (same as gemini-cli)",
"PKCE + CSRF state protection for secure auth",
"Just click OAuth Login — browser opens — authorize — done",
"Includes cloud-platform scope for Gemini Code Assist compatibility",
]),
("2.6.0", "2026-05-20", [
"Usage Dashboard — per-provider request/token/latency tracking",
"Visual cards with success rate bars, model breakdown, error tracking",
"Google OAuth: browse for client_secret.json instead of fixed path",
]),
("2.5.1", "2026-05-20", [
"Adaptive retry for 429/502/503 errors with exponential backoff",
"BGP routes also retry transient errors before failing over",
"Proxy socket reuse — no more 'Address already in use' crashes",
"BGP route count shown at proxy startup",
]),
("2.5.0", "2026-05-20", [
"AI BGP — multi-provider routing with automatic failover",
"Create BGP pools with ordered routes from any configured endpoint",
"Each route uses its own endpoint URL, API key, and model",
"Failover strategy: tries primary, falls back on error/timeout",
"BGP pools appear in endpoint dropdown with shuffle icon",
"Up/down reordering for route priority in pool editor",
"Fixed TOML config breakage from multi-line paste in fields",
]),
("2.4.0", "2026-05-20", [
"Added OpenAdapter provider preset (api.openadapter.in)",
"One API key access to 40+ models — GLM, DeepSeek, Kimi, Qwen, Claude, GPT, Gemini",
"Fixed Add/Edit dialog crash (missing _on_reasoning_toggled method)",
"Redesigned Google OAuth flow with live status dialog",
]),
("2.3.2", "2026-05-20", [
"Added Google Gemini provider with OAuth support",
"Two presets: 'Google Gemini (API Key)' and 'Google Gemini (OAuth)'",
"OAuth Login button in endpoint editor — full Google OAuth2 flow with auto-refresh",
"Auto-refreshes OAuth access tokens when expired (no manual re-login needed)",
"Supports gemini-2.5-flash, gemini-2.5-pro, gemini-2.0-flash, and more",
"Uses Gemini's OpenAI-compatible endpoint — works with existing proxy",
]),
("2.3.0", "2026-05-20", [
"Adaptive Crof self-healing system — auto-adjusts to Crof model limits",
"Tracks per-model success/failure history, learns item count limits dynamically",
"Proactively compacts input when above learned limit before sending to Crof",
"Auto-retries on finish_reason=length — aggressively compacts and resends",
"Prevents 'stream disconnected' and 'incomplete' errors on long conversations",
]),
("2.2.1", "2026-05-20", [
"Fixed compaction orphaning function_call_output items — root cause of Crof incomplete responses",
"Compaction now respects function_call/function_call_output pairs — no more dangling tool results",
"Fixed reasoning control: reasoning_effort=none now always sends enable_thinking=false too",
]),
("2.2.0", "2026-05-20", [
"Added per-provider Reasoning On/Off toggle in endpoint editor",
"Added Reasoning Effort level per provider: None, Minimal, Low, Medium, High, Max",
"When reasoning is OFF: sends enable_thinking=false + reasoning_effort=none to upstream API",
"When reasoning is ON: sends user-selected effort level (default: Medium)",
"Fixes Crof mimo-v2.5-pro and similar reasoning models exhausting output tokens",
"Strip reasoning_content from proxy output — Codex doesn't use it",
"Force max_tokens=64000 minimum for openai-compat providers",
]),
("2.1.3", "2026-05-19", [
"Fixed Crof mimo-v2.5-pro stopping: reasoning_content exhausted all output tokens",
"Strip reasoning_content from proxy output — Codex doesn't use it, avoids token waste",
"Force max_tokens=64000 minimum for openai-compat providers",
]),
("2.1.2", "2026-05-19", [
"Fixed Crof.ai and providers stopping after first tool call (root cause: None tool IDs)",
"Codex sends function_call items with id=None — proxy now matches tool results to calls by position",
"Fixed orphan message output item when response has only tool calls (no text)",
"Auto-trims long conversations (>30 items) to prevent context overflow on providers like Crof",
"Added request/response logging to ~/.cache/codex-proxy/requests.log",
]),
("2.1.1", "2026-05-19", [
"Fixed proxy: map 'developer' role to 'system' for Chat Completions providers",
"Fixed proxy: map 'developer' role to 'user' for Anthropic providers",
"Forward 'instructions' field from Responses API as system message/param",
"Fixes DeepSeek and other providers rejecting unknown 'developer' role",
]),
("2.1.0", "2026-05-19", [
"Added Codex auth status detection (codex login status)",
"Added Re-login button to re-authenticate via codex login",
"Auto-checks auth before launching Codex Default mode",
"Warns if OAuth token expired or missing before launch",
]),
("2.0.1", "2026-05-19", [
"Added Codex CLI/Desktop installation verifier to main page",
"Disables Desktop/CLI launch buttons when corresponding tool is missing",
"Shows install instructions in status area on startup",
]),
("2.0.0", "2026-05-19", [
"Initial release: multi-provider Codex Launcher",
"Translation proxy: Responses API to Chat Completions + Anthropic Messages",
"GTK endpoint manager with 10+ provider presets",
"Codex Default mode (built-in OAuth, zero config)",
"Browser UA injection for Cloudflare-protected providers (OpenCode)",
"Streaming SSE, tool calls, reasoning content support",
"Profile backup/import, model auto-fetch, bulk import",
"Refresh Models in background thread",
"URL normalization to prevent double-path bugs",
"Config backup/restore around sessions",
".deb installer package",
]),
]
# ═══════════════════════════════════════════════════════════════════════
# Provider presets (21 entries, including the blank "Custom" template)
# ═══════════════════════════════════════════════════════════════════════
# Maps a preset display name to its endpoint template. Every entry carries:
#   "backend_type" - backend identifier string (e.g. "openai-compat", "anthropic",
#                    "native", "command-code", "freebuff", "gemini-oauth-*")
#   "base_url"     - API root URL ("" for the blank "Custom" template)
#   "models"       - list of model IDs; an empty list means none are predefined
# Some entries add optional keys: "cc_version" (Command Code) and
# "oauth_provider" (the Google OAuth presets).
PROVIDER_PRESETS = {
"Custom": {
"backend_type": "openai-compat",
"base_url": "",
"models": [],
},
"OpenAI": {
"backend_type": "native",
"base_url": "https://api.openai.com/v1",
"models": ["gpt-4o", "gpt-4o-mini"],
},
"Anthropic": {
"backend_type": "anthropic",
"base_url": "https://api.anthropic.com/v1",
"models": ["claude-sonnet-4-5", "claude-3-5-haiku-latest"],
},
"OpenCode Zen (OpenAI-compatible)": {
"backend_type": "openai-compat",
"base_url": "https://opencode.ai/zen/v1",
"models": [
"glm-5.1", "glm-5", "kimi-k2.5", "kimi-k2.6",
"minimax-m2.7", "minimax-m2.5", "minimax-m2.5-free",
"deepseek-v4-flash-free", "nemotron-3-super-free",
"qwen3.6-plus", "qwen3.5-plus", "big-pickle",
],
},
"OpenCode Zen (Anthropic)": {
"backend_type": "anthropic",
"base_url": "https://opencode.ai/zen/v1",
"models": [
"claude-opus-4-7", "claude-opus-4-6", "claude-opus-4-5",
"claude-opus-4-1", "claude-sonnet-4-6", "claude-sonnet-4-5",
"claude-sonnet-4", "claude-haiku-4-5", "claude-3-5-haiku",
],
},
"OpenCode Go (OpenAI-compatible)": {
"backend_type": "openai-compat",
"base_url": "https://opencode.ai/zen/go/v1",
"models": [
"glm-5.1", "glm-5", "kimi-k2.5", "kimi-k2.6",
"mimo-v2.5", "mimo-v2.5-pro", "minimax-m2.7", "minimax-m2.5",
"qwen3.6-plus", "qwen3.5-plus", "deepseek-v4-pro", "deepseek-v4-flash",
],
},
"OpenCode Go (Anthropic)": {
"backend_type": "anthropic",
"base_url": "https://opencode.ai/zen/go/v1",
"models": ["minimax-m2.7", "minimax-m2.5"],
},
"Crof.ai": {
"backend_type": "openai-compat",
"base_url": "https://crof.ai/v1",
"models": [],
},
"Ocenza": {
"backend_type": "openai-compat",
"base_url": "https://global.ocenza.com/v1",
"models": [
"gpt-oss-120b", "mimo-v2-pro", "mimo-v2.5", "mimo-v2.5-pro",
],
},
"MiMo (Xiaomi)": {
"backend_type": "openai-compat",
"base_url": "https://token-plan-sgp.xiaomimimo.com/v1",
"models": [
"mimo-v2-omni", "mimo-v2-pro", "mimo-v2.5", "mimo-v2.5-pro",
],
},
"NVIDIA NIM": {
"backend_type": "openai-compat",
"base_url": "https://integrate.api.nvidia.com/v1",
"models": [],
},
"Kilo.ai Gateway": {
"backend_type": "openai-compat",
"base_url": "https://api.kilo.ai/api/gateway",
"models": [],
},
"Command Code": {
"backend_type": "command-code",
"base_url": "https://api.commandcode.ai",
"cc_version": "0.26.8",
"models": [
"deepseek/deepseek-v4-flash", "deepseek/deepseek-v4-pro",
"anthropic:claude-sonnet-4-6", "anthropic:claude-haiku-4-5-20251001",
"anthropic:claude-opus-4-7", "anthropic:claude-opus-4-6",
"openai:gpt-5.5", "openai:gpt-5.4", "openai:gpt-5.4-mini", "openai:gpt-5.3-codex",
"moonshotai/Kimi-K2.6", "moonshotai/Kimi-K2.5",
"zai-org/GLM-5.1", "zai-org/GLM-5",
"MiniMaxAI/MiniMax-M2.7", "MiniMaxAI/MiniMax-M2.5",
"Qwen/Qwen3.6-Max-Preview", "Qwen/Qwen3.6-Plus",
"stepfun/Step-3.5-Flash", "google/gemini-3.1-flash-lite",
],
},
"OpenRouter": {
"backend_type": "openai-compat",
"base_url": "https://openrouter.ai/api/v1",
"models": [],
},
"Google Gemini (API Key)": {
"backend_type": "openai-compat",
"base_url": "https://generativelanguage.googleapis.com/v1beta/openai",
"models": [
"gemini-2.5-flash", "gemini-2.5-pro",
"gemini-2.0-flash", "gemini-2.0-flash-lite",
"gemini-2.5-flash-preview-native-audio-dialog",
],
},
"Google Gemini (OAuth)": {
"backend_type": "gemini-oauth-cli",
"base_url": "https://cloudcode-pa.googleapis.com",
"oauth_provider": "google-cli",
"models": [
"gemini-2.5-flash", "gemini-2.5-pro",
],
},
"Google Antigravity (OAuth)": {
"backend_type": "gemini-oauth-antigravity",
"base_url": "https://cloudcode-pa.googleapis.com",
"oauth_provider": "google-antigravity",
"models": [
"antigravity-gemini-3-flash",
"antigravity-gemini-3-pro",
"antigravity-gemini-3.1-pro",
"antigravity-claude-sonnet-4-6",
"antigravity-claude-opus-4-6-thinking",
"gemini-2.5-flash", "gemini-2.5-pro",
"gemini-3-flash-preview", "gemini-3-pro-preview", "gemini-3.1-pro-preview",
],
},
"OpenAdapter": {
"backend_type": "openai-compat",
"base_url": "https://api.openadapter.in/v1",
"models": [
"0G-DeepSeek-V3",
"0G-DeepSeek-v4-Pro",
"0G-GLM-5",
"0G-GLM-5.1",
"0G-Qwen3.6",
"0G-Qwen-VL",
],
},
"Z.ai Coding": {
"backend_type": "openai-compat",
"base_url": "https://api.z.ai/api/coding/paas/v4",
"models": [
"glm-5.1", "glm-4.7", "GLM-4-Plus", "GLM-4-Long",
"GLM-4-Flash", "GLM-4-FlashX", "GLM-Z1-Flash",
],
},
"Freebuff (Free DeepSeek/Kimi)": {
"backend_type": "freebuff",
"base_url": "https://freebuff.com",
"models": [
"deepseek/deepseek-v4-pro", "deepseek/deepseek-v4-flash",
"moonshotai/kimi-k2.6", "minimax/minimax-m2.7",
],
},
"Ollama (local)": {
"backend_type": "openai-compat",
"base_url": "http://localhost:11434/v1",
"models": [],
},
}
# ═══════════════════════════════════════════════════════════════════════
# Cross-platform process management
# ═══════════════════════════════════════════════════════════════════════
def _subprocess_new_group_flag():
    """Return the process-creation flag used to put a child in a new group.

    Returns:
        ``subprocess.CREATE_NEW_PROCESS_GROUP`` on Windows, ``None`` on every
        other platform.
    """
    return subprocess.CREATE_NEW_PROCESS_GROUP if IS_WINDOWS else None
def _subprocess_preexec_fn():
    """Return the pre-exec hook for child processes.

    Returns:
        ``os.setsid`` on non-Windows platforms, ``None`` on Windows. The
        conditional is lazy, so ``os.setsid`` is never looked up on Windows.
    """
    return None if IS_WINDOWS else os.setsid
def _kill_process_group(pid):
    """Forcefully terminate *pid* together with its process group / tree.

    On Windows this runs ``taskkill /F /T`` and ignores any failure. Elsewhere
    it sends SIGTERM to the process group of *pid*, waits half a second, then
    sends SIGKILL to the same group. A missing process or lack of permission
    is silently ignored.

    Args:
        pid: ID of a process belonging to the group to kill.
    """
    if not IS_WINDOWS:
        # Any of the calls below may find the group already gone or off-limits;
        # either way there is nothing more to do.
        with contextlib.suppress(ProcessLookupError, PermissionError):
            group = os.getpgid(pid)
            os.killpg(group, signal.SIGTERM)
            time.sleep(0.5)
            os.killpg(group, signal.SIGKILL)
        return
    # Best effort: a failing or missing taskkill must not propagate.
    with contextlib.suppress(Exception):
        subprocess.run(
            ["taskkill", "/F", "/T", "/PID", str(pid)],
            capture_output=True, timeout=10,
        )
def _kill_process_group_soft(pid):
    """Ask *pid* and its process group / tree to terminate, without forcing.

    On Windows this runs ``taskkill /T`` (no ``/F``) and ignores any failure.
    Elsewhere it sends a single SIGTERM to the process group of *pid*; a
    missing process or lack of permission is silently ignored.

    Args:
        pid: ID of a process belonging to the group to signal.
    """
    if not IS_WINDOWS:
        with contextlib.suppress(ProcessLookupError, PermissionError):
            os.killpg(os.getpgid(pid), signal.SIGTERM)
        return
    # Best effort: a failing or missing taskkill must not propagate.
    with contextlib.suppress(Exception):
        subprocess.run(
            ["taskkill", "/T", "/PID", str(pid)],
            capture_output=True, timeout=10,
        )
def _register_pgid_entry(kind, pid):
    """Record *pid* and its process-group ID in the PID registry under *kind*.

    The stored entry is ``{"pid", "pgid", "ts"}`` where ``ts`` is the current
    ``time.time()``. On Windows the pid itself is stored as ``pgid``. On other
    platforms, if the process no longer exists, nothing is saved.

    Args:
        kind: Registry key for the entry.
        pid: Process ID to register.
    """
    registry = _load_pid_registry()
    if IS_WINDOWS:
        group_id = pid
    else:
        try:
            group_id = os.getpgid(pid)
        except ProcessLookupError:
            # Process already exited; leave the registry untouched.
            return
    registry[kind] = {"pid": pid, "pgid": group_id, "ts": time.time()}
    _save_pid_registry(registry)
# ═══════════════════════════════════════════════════════════════════════
# Cross-platform terminal detection
# ═══════════════════════════════════════════════════════════════════════
def detect_terminal():
    """Find the first available terminal program on ``PATH``.

    Candidates are probed in a fixed preference order that depends on the
    platform.

    Returns:
        A ``(name, exec_args, resolved_path)`` tuple for the first candidate
        found, where ``exec_args`` is the argument list paired with that
        candidate (always empty on Windows), or ``None`` if none is found.
    """
    if IS_WINDOWS:
        candidates = [(exe, []) for exe in ("wt.exe", "cmd.exe", "powershell.exe")]
    else:
        candidates = [
            ("x-terminal-emulator", ["-e"]),
            ("kgx", ["--"]),
            ("gnome-terminal", ["--"]),
            ("konsole", ["-e"]),
            ("xterm", ["-e"]),
        ]
    for program, exec_args in candidates:
        located = shutil.which(program)
        if located:
            return (program, exec_args, located)
    return None
# ═══════════════════════════════════════════════════════════════════════
# Cross-platform URL/file opening
# ═══════════════════════════════════════════════════════════════════════
def open_url(url):
    """Open *url* with the platform's default handler.

    Windows uses ``os.startfile``. Elsewhere ``xdg-open`` is used when it is
    on ``PATH``, otherwise ``open`` on macOS. If no opener applies, the call
    does nothing.

    Args:
        url: URL (or path string) handed to the opener.
    """
    if IS_WINDOWS:
        os.startfile(url)
        return
    if shutil.which("xdg-open"):
        opener = "xdg-open"
    elif sys.platform == "darwin":
        opener = "open"
    else:
        return
    # Detach the opener's output so it cannot write into our console.
    subprocess.Popen([opener, url], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def open_file(path):
    """Open *path* by passing its string form to ``open_url``.

    Args:
        path: File path; converted with ``str()`` before opening.
    """
    target = str(path)
    open_url(target)
# ═══════════════════════════════════════════════════════════════════════
# String / utility helpers
# ═══════════════════════════════════════════════════════════════════════
def safe_name(name):
    """Build a sanitized identifier from *name* with a short hash suffix.

    Characters that are not alphanumeric or one of ``. _ -`` become ``_``.
    Leading and trailing ``. _ -`` are then stripped, and an empty result is
    replaced by ``"endpoint"``. The first 8 hex digits of the SHA-1 of the
    original name (UTF-8) are appended after a hyphen.

    Args:
        name: Arbitrary display name.

    Returns:
        ``"<sanitized>-<8 hex digits>"``.
    """
    cleaned = []
    for ch in name:
        keep = ch.isalnum() or ch in "._-"
        cleaned.append(ch if keep else "_")
    stem = "".join(cleaned).strip("._-")
    if not stem:
        stem = "endpoint"
    suffix = hashlib.sha1(name.encode("utf-8")).hexdigest()[:8]
    return f"{stem}-{suffix}"
def _profile_slug(name):
    """Turn *name* into a hyphenated slug.

    Every non-alphanumeric character becomes ``-``; leading and trailing
    hyphens are stripped. Case is preserved and inner hyphen runs are kept.

    Args:
        name: Profile display name.

    Returns:
        The slug, or ``"default"`` when nothing is left after stripping.
    """
    pieces = [ch if ch.isalnum() else "-" for ch in name]
    slug = "".join(pieces).strip("-")
    return slug if slug else "default"
def label_for_backend(backend_type):
    """Return the display label for a backend type identifier.

    Args:
        backend_type: Backend identifier such as ``"openai-compat"``.

    Returns:
        The matching label, or *backend_type* itself when it has no entry.
    """
    labels = dict((
        ("openai-compat", "OpenAI-compatible"),
        ("anthropic", "Anthropic"),
        ("command-code", "Command Code"),
        ("freebuff", "Freebuff (Free AI)"),
        ("native", "Native"),
    ))
    try:
        return labels[backend_type]
    except KeyError:
        # Unknown backends are shown under their raw identifier.
        return backend_type
def normalize_model_id(text):
    """Canonicalize a model identifier.

    The input is stripped and lower-cased, ``/`` becomes ``-`` and ``+``
    becomes ``plus``. Any remaining character that is not alphanumeric, ``.``
    or ``-`` becomes ``-``. Hyphen runs are collapsed to one, and leading or
    trailing ``-`` and ``.`` are removed.

    Args:
        text: Raw model ID string.

    Returns:
        The normalized ID, or ``""`` when *text* is blank.
    """
    cleaned = text.strip().lower()
    if cleaned == "":
        return ""
    cleaned = cleaned.replace("/", "-").replace("+", "plus")
    chars = [ch if (ch.isalnum() or ch in ".-") else "-" for ch in cleaned]
    # One pass collapses every run of hyphens to a single hyphen.
    collapsed = re.sub(r"-{2,}", "-", "".join(chars))
    return collapsed.strip("-.")
def normalize_base_url(url):
    """Reduce an API URL to its base.

    Trims whitespace and trailing slashes, then removes at most one trailing
    ``/chat/completions``, ``/responses`` or ``/messages`` path. ``None`` is
    treated as an empty string.
    """
    trimmed = (url or "").strip().rstrip("/")
    known_suffixes = ("/chat/completions", "/responses", "/messages")
    matched = next((s for s in known_suffixes if trimmed.endswith(s)), None)
    if matched is not None:
        trimmed = trimmed[: len(trimmed) - len(matched)]
    return trimmed.rstrip("/")
def parse_model_list(text):
    """Parse a comma- or newline-separated list of model names.

    Each entry is passed through ``normalize_model_id``; empty results and
    duplicates are dropped while first-seen order is kept.
    """
    chunks = text.replace(",", "\n").splitlines()
    normalized = (normalize_model_id(chunk) for chunk in chunks)
    # dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys(mid for mid in normalized if mid))
def now_utc_iso():
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    utc_now = time.gmtime()
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", utc_now)
def _fmt_tok(n):
if n >= 1_000_000:
return f"{n/1_000_000:.1f}M"
if n >= 1_000:
return f"{n/1_000:.1f}K"
return str(n)
def _fmt_dur(s):
if s >= 3600:
return f"{s/3600:.1f}h"
if s >= 60:
return f"{s/60:.1f}m"
return f"{s:.1f}s"
def _status_pill(success_rate, fail_pct):
    """Return a ``(label, colour)`` pair for a failure fraction.

    ``fail_pct`` above 0.15 maps to ``ERR``/red, above 0.05 to
    ``WARN``/yellow, anything else to ``OK``/green. ``success_rate`` is
    accepted but not used.
    """
    palette = _usage_theme()
    if fail_pct > 0.15:
        label, color_key = "ERR", "red"
    elif fail_pct > 0.05:
        label, color_key = "WARN", "yellow"
    else:
        label, color_key = "OK", "green"
    return (label, palette[color_key])
def _usage_theme():
return {
"base": "#0C0E16", "surface0": "#161928", "surface1": "#1E2235",
"surface2": "#2A2F47", "text": "#E4E6F0", "subtext": "#B0B4C8",
"dim": "#5C6180", "accent": "#7EB8F7", "blue": "#5DA4E8",
"sapphire": "#4EC5C1", "green": "#59D4A0", "yellow": "#F0C75E",
"red": "#F06A77", "peach": "#F09860", "teal": "#4EC5C1",
"lavender": "#A899F0", "sky": "#70C8E8", "maroon": "#C44B5C",
"flamingo": "#E878B0", "rosewater": "#F0D0C0",
"model_palette": ["#F09860", "#4EC5C1", "#5DA4E8", "#59D4A0",
"#F0C75E", "#A899F0", "#70C8E8", "#E878B0",
"#C44B5C", "#F0D0C0", "#7EB8F7", "#F06A77"],
}
# ═══════════════════════════════════════════════════════════════════════
# Provider preset helpers
# ═══════════════════════════════════════════════════════════════════════
def apply_provider_preset(endpoint, preset_name):
    """Return a copy of *endpoint* with the named provider preset applied.

    Unknown preset names return *endpoint* itself, unmodified. Backend type
    and base URL always come from the preset. ``cc_version`` and
    ``default_model`` are only filled when missing. The model list is
    replaced when empty, or always for ``gemini-oauth*`` presets.
    """
    preset = PROVIDER_PRESETS.get(preset_name)
    if not preset:
        return endpoint

    result = {
        **endpoint,
        "provider_preset": preset_name,
        "backend_type": preset["backend_type"],
        "base_url": normalize_base_url(preset["base_url"]),
    }

    if preset.get("cc_version") and not result.get("cc_version"):
        result["cc_version"] = preset["cc_version"]

    is_gemini_oauth = (preset.get("backend_type") or "").startswith("gemini-oauth")
    if is_gemini_oauth or not result.get("models"):
        result["models"] = list(preset.get("models", []))

    if preset.get("oauth_provider"):
        result["oauth_provider"] = preset["oauth_provider"]

    if result.get("models") and not result.get("default_model"):
        result["default_model"] = result["models"][0]
    return result
# ═══════════════════════════════════════════════════════════════════════
# Endpoint CRUD
# ═══════════════════════════════════════════════════════════════════════
def load_endpoints():
    """Load the endpoints store from ``ENDPOINTS_FILE``.

    Returns the parsed JSON, or ``{"default": None, "endpoints": []}`` when
    the file is missing or cannot be read or parsed.
    """
    fallback = {"default": None, "endpoints": []}
    if not ENDPOINTS_FILE.exists():
        return fallback
    try:
        raw = ENDPOINTS_FILE.read_text()
        return json.loads(raw)
    except Exception:
        # Best-effort: an unreadable store behaves like an empty one.
        return fallback
def save_endpoints(data):
    """Persist the endpoints store to ``ENDPOINTS_FILE``.

    Endpoint records carry API keys, so the file is written through
    ``write_secure_text``: a temp file plus atomic replace, with owner-only
    permissions on non-Windows platforms. A crash mid-write can therefore no
    longer truncate the store.
    """
    write_secure_text(ENDPOINTS_FILE, json.dumps(data, indent=2))
def load_bgp_pools():
    """Load BGP pool definitions from ``BGP_POOLS_FILE``.

    Returns the parsed JSON, or ``{"pools": []}`` when the file is missing or
    cannot be read or parsed.
    """
    fallback = {"pools": []}
    if not BGP_POOLS_FILE.exists():
        return fallback
    try:
        raw = BGP_POOLS_FILE.read_text()
        return json.loads(raw)
    except Exception:
        # Best-effort: an unreadable file behaves like an empty pool list.
        return fallback
def save_bgp_pools(data):
    """Write *data* as indented JSON to ``BGP_POOLS_FILE``, creating parent dirs."""
    target = BGP_POOLS_FILE
    target.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, indent=2)
    target.write_text(serialized)
def get_endpoint(name):
    """Return the stored endpoint whose ``name`` equals *name*, or ``None``."""
    stored = load_endpoints()["endpoints"]
    return next((entry for entry in stored if entry["name"] == name), None)
# ═══════════════════════════════════════════════════════════════════════
# Profile bundle import/export
# ═══════════════════════════════════════════════════════════════════════
def build_profile_bundle():
    """Assemble an exportable bundle.

    The bundle holds a version tag, an export timestamp, the endpoints store
    and the text of the Codex config (empty string when it does not exist).
    """
    bundle = {"version": 1}
    bundle["exported_at"] = now_utc_iso()
    bundle["endpoints"] = load_endpoints()
    if CONFIG.exists():
        bundle["codex_config_toml"] = CONFIG.read_text(encoding="utf-8")
    else:
        bundle["codex_config_toml"] = ""
    return bundle
def save_profile_bundle(path):
    """Export the current profile bundle as JSON to *path*.

    The bundle embeds the endpoints store (including API keys) and the Codex
    config text. It is therefore written via ``write_secure_text``: atomic
    replace, and owner-only permissions on non-Windows platforms.
    """
    bundle = build_profile_bundle()
    write_secure_text(Path(path), json.dumps(bundle, indent=2))
def import_profile_bundle(path):
    """Import a profile bundle previously written by ``save_profile_bundle``.

    The bundle is validated before anything on disk is touched. The current
    config and endpoints file are then backed up, and the bundle's endpoints
    and non-empty config text are installed.

    Returns the imported endpoints store.
    Raises ``ValueError`` when the bundle is structurally invalid.
    """
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    if not isinstance(data, dict):
        raise ValueError("Invalid profile bundle")
    endpoints = data.get("endpoints")
    if (
        not isinstance(endpoints, dict)
        or "endpoints" not in endpoints
        # Callers iterate this value, so reject anything that is not a list.
        or not isinstance(endpoints["endpoints"], list)
    ):
        raise ValueError("Profile bundle missing endpoints")

    # Same atomic copy the other config writers use (no-op when CONFIG is absent).
    backup_config()
    if ENDPOINTS_FILE.exists():
        shutil.copy2(str(ENDPOINTS_FILE), str(ENDPOINTS_FILE.with_suffix(".json.import-bak")))
    save_endpoints(endpoints)

    cfg = data.get("codex_config_toml", "")
    if isinstance(cfg, str) and cfg.strip():
        # The config may hold a bearer token; write it atomically with
        # restricted permissions, like write_config_for_* do.
        write_secure_text(CONFIG, cfg)
    return endpoints
# ═══════════════════════════════════════════════════════════════════════
# Secure file write
# ═══════════════════════════════════════════════════════════════════════
def write_secure_text(path, text):
    """Atomically write *text* (UTF-8) to *path* with owner-only permissions.

    *path* must be a ``pathlib.Path``. The content goes to a sibling
    ``<name>.tmp`` file that is created with mode 0o600, so the data is never
    readable by other users, and is then moved over *path* with
    ``os.replace``. The temp file is removed if anything fails. On Windows
    the mode bits are not enforced and the chmod is skipped.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    fd = os.open(str(tmp), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            if not IS_WINDOWS:
                # The mode passed to os.open only applies to newly created
                # files; tighten a leftover temp file before writing secrets.
                os.chmod(str(tmp), 0o600)
            fh.write(text)
        os.replace(str(tmp), str(path))
    except BaseException:
        try:
            os.unlink(str(tmp))
        except OSError:
            pass
        raise
# ═══════════════════════════════════════════════════════════════════════
# Config management
# ═══════════════════════════════════════════════════════════════════════
def backup_config():
    """Copy ``CONFIG`` to ``CONFIG_BAK`` via a temp file; no-op if CONFIG is absent."""
    if not CONFIG.exists():
        return
    staging = CONFIG_BAK.with_suffix(".tmp")
    shutil.copy2(str(CONFIG), str(staging))
    # Replace in one step so a partial copy never becomes the backup.
    os.replace(str(staging), str(CONFIG_BAK))
def restore_config():
    """Copy ``CONFIG_BAK`` back over ``CONFIG`` via a temp file; no-op if no backup."""
    if not CONFIG_BAK.exists():
        return
    staging = CONFIG.with_suffix(".tmp")
    shutil.copy2(str(CONFIG_BAK), str(staging))
    # Replace in one step so a partial copy never becomes the live config.
    os.replace(str(staging), str(CONFIG))
def begin_config_transaction(reason):
    """Record the start of a config-modifying session.

    Backs up the current config (when present) and writes a JSON marker to
    ``CONFIG_TXN`` holding the start time, *reason*, whether a config existed
    and the backup path.
    """
    marker = {
        "started_at": time.time(),
        "reason": reason,
        "config_existed": CONFIG.exists(),
        "backup_path": str(CONFIG_BAK),
    }
    if CONFIG.exists():
        backup_config()
    CONFIG_TXN.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(marker, indent=2)
    CONFIG_TXN.write_text(serialized)
def end_config_transaction():
    """Delete the transaction marker; a missing marker is not an error."""
    marker = CONFIG_TXN
    marker.unlink(missing_ok=True)
def recover_config_if_needed(logfn=None):
    """Undo config changes left behind by an interrupted session.

    Does nothing unless a transaction marker (``CONFIG_TXN``) exists. When
    the marker says a config existed before the session, the backup is
    restored. When it says none existed, the generated config is removed.
    The marker is always deleted afterwards.

    The user's config is never deleted when it existed before the session,
    even if the backup has gone missing. An unreadable marker is logged and
    skipped rather than raised.

    logfn: optional callable receiving status messages.
    """
    if not CONFIG_TXN.exists():
        return

    def log(message):
        if logfn:
            logfn(message)

    try:
        try:
            txn = json.loads(CONFIG_TXN.read_text())
        except (OSError, ValueError) as exc:
            log(f"Ignoring unreadable config transaction marker: {exc}")
            return
        if not isinstance(txn, dict):
            log("Ignoring malformed config transaction marker.")
            return

        if txn.get("config_existed"):
            if CONFIG_BAK.exists():
                restore_config()
                log("Recovered Codex config from interrupted session.")
            else:
                # No backup to restore from: deleting the config here would
                # destroy user data, so leave it in place.
                log("Config backup missing; left existing Codex config untouched.")
        elif CONFIG.exists():
            CONFIG.unlink()
            log("Removed generated config from interrupted session.")
    finally:
        CONFIG_TXN.unlink(missing_ok=True)
def _toml_safe(val):
val = str(val).replace("\\", "/").replace('"', '\\"')
return val.split('\n', 1)[0].strip()
def _resolve_secret(value):
value = (value or "").strip()
m = re.fullmatch(r"\$\{ENV:([A-Z0-9_]+)\}", value)
if m:
return os.environ.get(m.group(1), "")
return value
def _merge_toml(existing_text, new_sections_text):
"""Merge launcher-generated TOML sections into an existing config.toml.
Preserves all existing sections/keys that are not overwritten by the
launcher. This is a simple line-based merge — good enough for the flat
TOML structure Codex uses.
"""
if not existing_text:
return new_sections_text
new_lines = new_sections_text.rstrip().splitlines()
root_keys = []
new_section_blocks = {}
current_section = None
current_block_lines = []
for line in new_lines:
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
if stripped.startswith("[") and not stripped.startswith("[["):
if current_section is not None:
new_section_blocks[current_section] = current_block_lines
current_section = stripped
current_block_lines = []
elif current_section is None:
root_keys.append(line)
else:
current_block_lines.append(line)
if current_section is not None:
new_section_blocks[current_section] = current_block_lines
existing_lines = existing_text.splitlines()
existing_sections = {}
existing_root_lines = []
existing_section_order = []
cur_sec = None
for line in existing_lines:
stripped = line.strip()
if stripped.startswith("[") and not stripped.startswith("[["):
if cur_sec is not None:
pass
cur_sec = stripped
existing_section_order.append(cur_sec)
existing_sections[cur_sec] = [line]
elif cur_sec is not None:
existing_sections[cur_sec].append(line)
else:
existing_root_lines.append(line)
merged_root = []
root_key_names = set()
for rk in root_keys:
key_name = rk.strip().split("=")[0].strip() if "=" in rk else ""
if key_name:
root_key_names.add(key_name)
for line in existing_root_lines:
stripped = line.strip()
if stripped.startswith("#") or not stripped:
merged_root.append(line)
continue
if "=" in stripped:
key_name = stripped.split("=")[0].strip()
if key_name in root_key_names:
continue
merged_root.append(line)
merged_root.extend(root_keys)
all_sections = list(existing_section_order)
for sec in new_section_blocks:
if sec not in all_sections:
all_sections.append(sec)
merged = list(merged_root)
if merged and merged[-1] != "":
merged.append("")
for sec in all_sections:
if sec in new_section_blocks:
merged.append(sec)
merged.extend(new_section_blocks[sec])
else:
merged.extend(existing_sections.get(sec, []))
merged.append("")
return "\n".join(merged).strip() + "\n"
def _gen_model_catalog(endpoint, selected_model=None):
default_model = selected_model or endpoint.get("default_model")
models = []
for mid in endpoint.get("models", []):
models.append({
"slug": mid, "model": mid, "display_name": mid,
"description": f"{endpoint['name']} {mid}",
"hidden": False, "isDefault": mid == default_model,
"shell_type": "shell_command", "visibility": "list",
"default_reasoning_level": "medium",
"supported_reasoning_levels": [
{"effort": "low", "description": "Fast"},
{"effort": "medium", "description": "Balanced"},
{"effort": "high", "description": "Deep"},
{"effort": "xhigh", "description": "Extra deep"},
],
"supportedReasoningEfforts": [
{"reasoningEffort": "low", "description": "Fast"},
{"reasoningEffort": "medium", "description": "Balanced"},
{"reasoningEffort": "high", "description": "Deep"},
{"reasoningEffort": "xhigh", "description": "Extra deep"},
],
"priority": 30, "context_size": 128000,
"additional_speed_tiers": [], "service_tiers": [],
"supports_reasoning_summaries": True, "support_verbosity": True,
"reasoning": True, "tool_call": True,
"supports_parallel_tool_calls": True,
"experimental_supported_tools": [], "supported_in_api": True,
"truncation_policy": {"mode": "tokens", "limit": 128000},
"base_instructions": "You are Codex, a coding agent.",
})
return {"models": models}
def write_config_for_native(endpoint, selected_model):
    """Write Codex config for an endpoint that Codex talks to directly.

    Steps:
      1. Back up the current config.
      2. Write the model catalog JSON under ``PROXY_CONFIG_DIR``.
      3. Merge the model/provider settings into ``CONFIG``.
      4. Write a standalone ``<slug>.config.toml`` profile next to it.

    The provider table key uses the same escaped name as ``model_provider``,
    so the two always refer to the same table even when the endpoint name
    contains quotes or backslashes. The bearer token is the endpoint's API
    key after ``${ENV:NAME}`` resolution; a missing key yields an empty
    token.
    """
    backup_config()

    provider = _toml_safe(endpoint["name"])
    model = _toml_safe(selected_model)

    mc_path = PROXY_CONFIG_DIR / f"models-{safe_name(endpoint['name'])}.json"
    mc_path.parent.mkdir(parents=True, exist_ok=True)
    mc_path.write_text(json.dumps(_gen_model_catalog(endpoint, selected_model), indent=2))
    mc_str = str(mc_path).replace("\\", "/")

    shared_lines = [
        f'model = "{model}"\n',
        f'model_provider = "{provider}"\n',
        f'model_catalog_json = "{mc_str}"\n',
    ]
    token = _toml_safe(_resolve_secret(endpoint.get("api_key")))
    main_config = shared_lines + [
        f'\n[model_providers."{provider}"]\n',
        f'name = "{provider}"\n',
        f'base_url = "{_toml_safe(endpoint["base_url"])}"\n',
        f'experimental_bearer_token = "{token}"\n',
    ]
    existing = CONFIG.read_text(encoding="utf-8") if CONFIG.exists() else ""
    write_secure_text(CONFIG, _merge_toml(existing, "".join(main_config)))

    profile_path = CONFIG.parent / f"{_profile_slug(endpoint['name'])}.config.toml"
    profile_lines = shared_lines + [
        'service_tier = "default"\n',
        'approvals_reviewer = "user"\n',
    ]
    write_secure_text(profile_path, "".join(profile_lines))
def write_config_for_translated(endpoint, selected_model, proxy_port=8080):
    """Write Codex config for an endpoint reached through the local proxy.

    Same layout as ``write_config_for_native``, except the provider points at
    ``http://127.0.0.1:<proxy_port>`` with a fixed local bearer token, and
    the profile uses ``service_tier = "fast"``.

    The provider table key uses the same escaped name as ``model_provider``,
    so the two always refer to the same table even when the endpoint name
    contains quotes or backslashes.
    """
    backup_config()

    provider = _toml_safe(endpoint["name"])
    model = _toml_safe(selected_model)

    mc_path = PROXY_CONFIG_DIR / f"models-{safe_name(endpoint['name'])}.json"
    mc_path.parent.mkdir(parents=True, exist_ok=True)
    mc_path.write_text(json.dumps(_gen_model_catalog(endpoint, selected_model), indent=2))
    mc_str = str(mc_path).replace("\\", "/")

    shared_lines = [
        f'model = "{model}"\n',
        f'model_provider = "{provider}"\n',
        f'model_catalog_json = "{mc_str}"\n',
    ]
    main_config = shared_lines + [
        f'\n[model_providers."{provider}"]\n',
        f'name = "{provider}"\n',
        f'base_url = "http://127.0.0.1:{proxy_port}"\n',
        'experimental_bearer_token = "codex-launcher-local"\n',
    ]
    existing = CONFIG.read_text(encoding="utf-8") if CONFIG.exists() else ""
    write_secure_text(CONFIG, _merge_toml(existing, "".join(main_config)))

    profile_path = CONFIG.parent / f"{_profile_slug(endpoint['name'])}.config.toml"
    profile_lines = shared_lines + [
        'service_tier = "fast"\n',
        'approvals_reviewer = "user"\n',
    ]
    write_secure_text(profile_path, "".join(profile_lines))
# ═══════════════════════════════════════════════════════════════════════
# Model fetching
# ═══════════════════════════════════════════════════════════════════════
def endpoint_models_url(endpoint):
    """Return ``<normalized base_url>/models``, or ``""`` when no base URL is set."""
    root = normalize_base_url(endpoint.get("base_url") or "")
    return f"{root}/models" if root else ""
def endpoint_model_headers(endpoint):
    """Build HTTP headers for querying an endpoint's model list.

    Anthropic backends get ``x-api-key`` (when a key is set) plus
    ``anthropic-version``. Every other backend gets a ``Bearer``
    authorization header when a key is set. The key is passed through
    ``_resolve_secret`` so ``${ENV:NAME}`` placeholders are expanded, as they
    are when the native config is written.
    """
    key = _resolve_secret(endpoint.get("api_key"))
    backend = endpoint.get("backend_type", "openai-compat")
    headers = {"User-Agent": UA}
    if backend == "anthropic":
        if key:
            headers["x-api-key"] = key
        headers["anthropic-version"] = "2023-06-01"
    elif key:
        headers["Authorization"] = f"Bearer {key}"
    return headers
def fetch_models_for_endpoint(endpoint, timeout=10):
    """Fetch the list of model ids an endpoint advertises.

    Returns ``(ids, None)`` on success or ``(None, error_message)`` on
    failure; it never raises. ``gemini-oauth-antigravity`` backends return
    the static ``ANTIGRAVITY_MODELS`` list without any network access.

    The response may be an object with a ``data`` or ``models`` array, or a
    bare JSON array. Items without an ``id`` are ignored and duplicates are
    dropped, keeping first-seen order.
    """
    bt = endpoint.get("backend_type", "")
    if bt == "gemini-oauth-antigravity":
        return list(ANTIGRAVITY_MODELS), None
    url = endpoint_models_url(endpoint)
    if not url:
        return None, "Base URL is empty"
    try:
        req = urllib.request.Request(url, headers=endpoint_model_headers(endpoint))
        # Context manager so the connection is released promptly.
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
        payload = json.loads(raw)
        if isinstance(payload, dict):
            items = payload.get("data") or payload.get("models") or []
        elif isinstance(payload, list):
            items = payload
        else:
            items = []
        ids = []
        seen = set()
        for item in items:
            mid = item.get("id") if isinstance(item, dict) else None
            if mid and mid not in seen:
                seen.add(mid)
                ids.append(mid)
        if not ids:
            return None, "No models returned"
        return ids, None
    except Exception as e:
        # Callers inspect the message text (e.g. for "401"), so report str(e).
        return None, str(e)
def refresh_endpoint_models(endpoint):
    """Return a copy of *endpoint* with its model list refreshed from the server.

    Returns ``(updated_endpoint, None)`` on success, ``(None, error)`` when
    no models could be fetched. The default model is reset to the first
    fetched id when it is not in the new list.
    """
    model_ids, error = fetch_models_for_endpoint(endpoint)
    if not model_ids:
        return None, error
    refreshed = {**endpoint, "models": model_ids}
    if refreshed.get("default_model") not in model_ids:
        refreshed["default_model"] = model_ids[0]
    return refreshed, None
# ═══════════════════════════════════════════════════════════════════════
# Antigravity model list (static — no /v1/models REST endpoint)
# ═══════════════════════════════════════════════════════════════════════
# Model names returned by fetch_models_for_endpoint() for the
# "gemini-oauth-antigravity" backend type instead of querying a /models URL.
ANTIGRAVITY_MODELS = [
"Gemini 3.5 Flash (High)", "Gemini 3.5 Flash (Medium)", "Gemini 3.5 Flash (Low)",
"Gemini 3.1 Pro (High)", "Gemini 3.1 Pro (Low)",
"Claude Sonnet 4.6 (Thinking)",
"Claude Opus 4.6 (Thinking)",
"GPT-OSS 120B (Medium)",
]
# ═══════════════════════════════════════════════════════════════════════
# OAuth secrets (local, never in repo)
# ═══════════════════════════════════════════════════════════════════════
def load_oauth_secrets():
    """Load the local OAuth secrets JSON; returns ``{}`` on any failure."""
    loaded = {}
    try:
        with open(OAUTH_SECRETS_PATH, encoding="utf-8") as handle:
            loaded = json.load(handle)
    except Exception:
        # A missing or unreadable secrets file is treated as "no secrets".
        loaded = {}
    return loaded
def save_oauth_secrets(data):
    """Atomically write the OAuth secrets JSON with owner-only permissions.

    The data goes to ``<path>.tmp``, which is created with mode 0o600 so the
    secrets are never readable by other users, and is then moved into place
    with ``os.replace``. On Windows the mode bits are not enforced and the
    chmod is skipped.
    """
    target = str(OAUTH_SECRETS_PATH)
    parent = os.path.dirname(target)
    if parent:
        # os.makedirs("") raises, so only create a directory when there is one.
        os.makedirs(parent, exist_ok=True)
    tmp = target + ".tmp"
    fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        if not IS_WINDOWS:
            # The os.open mode only applies to newly created files; tighten a
            # leftover temp file before writing secrets into it.
            os.chmod(tmp, 0o600)
        json.dump(data, f, indent=2)
    os.replace(tmp, target)
# ═══════════════════════════════════════════════════════════════════════
# Doctor checks
# ═══════════════════════════════════════════════════════════════════════
def _doctor_check_streaming(base_url, key, bt, model, add):
    """Doctor check: does the endpoint answer a streaming request with SSE?

    Sends a 1-token streaming chat request, to ``/v1/messages`` for Anthropic
    and ``/chat/completions`` otherwise, and reports the outcome through
    ``add(name, ok, detail)``. HTTP 429 is reported as a skip
    (``ok=None``); every other failure as ``ok=False``. Never raises.
    """
    messages = [{"role": "user", "content": "hi"}]
    if bt == "anthropic":
        test_url = f"{base_url}/v1/messages"
        headers = {"User-Agent": UA, "x-api-key": key, "anthropic-version": "2023-06-01", "content-type": "application/json"}
        body = json.dumps({"model": model or "claude-3-5-haiku-20241022", "max_tokens": 1, "stream": True,
                           "messages": messages}).encode()
    else:
        test_url = f"{base_url}/chat/completions"
        headers = {"User-Agent": UA, "Authorization": f"Bearer {key}", "content-type": "application/json"}
        body = json.dumps({"model": model, "max_tokens": 1, "stream": True,
                           "messages": messages}).encode()
    try:
        req = urllib.request.Request(test_url, data=body, headers=headers, method="POST")
        t0 = time.time()
        # Only the first bytes are needed; the context manager then closes
        # the (possibly still streaming) connection.
        with urllib.request.urlopen(req, timeout=20) as resp:
            content_type = resp.headers.get("content-type", "")
            first_chunk = resp.read(512)
        lat = (time.time() - t0) * 1000
        is_sse = "text/event-stream" in content_type or first_chunk.startswith(b"data:")
        if is_sse:
            add("Streaming support", True, f"SSE OK in {lat:.0f}ms")
        else:
            add("Streaming support", False, f"Expected SSE, got {content_type[:60]}")
    except urllib.error.HTTPError as e:
        body_text = ""
        try:
            body_text = e.read(200).decode(errors="replace")
        except Exception:
            pass
        if e.code == 429:
            add("Streaming support", None, "Rate limited (skipped)")
        elif e.code in (400, 404, 422):
            add("Streaming support", False, f"HTTP {e.code}: {body_text[:80]}")
        else:
            add("Streaming support", False, f"HTTP {e.code}")
    except Exception as e:
        add("Streaming support", False, str(e)[:100])
def _doctor_check_toolcall(base_url, key, bt, model, add):
    """Doctor check: does the endpoint return a tool call when asked to?

    Sends a non-streaming request that offers one ``test_tool`` function and
    reports through ``add(name, ok, detail)``. A response without a tool call
    and HTTP 429 are both reported as ``ok=None``; other failures as
    ``ok=False``. Never raises.

    The tool is declared in each API's own schema: Anthropic expects
    ``name``/``input_schema`` at the top level, while OpenAI-compatible
    endpoints expect ``{"type": "function", "function": {...}}``.
    """
    tool_schema = {"type": "object", "properties": {"x": {"type": "string"}}}
    messages = [{"role": "user", "content": "Use the test_tool with x=hello"}]
    if bt == "anthropic":
        test_url = f"{base_url}/v1/messages"
        headers = {"User-Agent": UA, "x-api-key": key, "anthropic-version": "2023-06-01", "content-type": "application/json"}
        tool = {"name": "test_tool", "description": "Test tool", "input_schema": tool_schema}
        body = json.dumps({"model": model or "claude-3-5-haiku-20241022", "max_tokens": 50, "stream": False,
                           "tools": [tool], "messages": messages}).encode()
    else:
        test_url = f"{base_url}/chat/completions"
        headers = {"User-Agent": UA, "Authorization": f"Bearer {key}", "content-type": "application/json"}
        tool = {"type": "function", "function": {"name": "test_tool", "parameters": tool_schema}}
        body = json.dumps({"model": model, "max_tokens": 50, "stream": False, "tools": [tool],
                           "messages": messages}).encode()
    try:
        req = urllib.request.Request(test_url, data=body, headers=headers, method="POST")
        t0 = time.time()
        with urllib.request.urlopen(req, timeout=30) as resp:
            raw = resp.read()
        lat = (time.time() - t0) * 1000
        payload = json.loads(raw)
        if bt == "anthropic":
            has_tools = any(
                block.get("type") == "tool_use" for block in (payload.get("content") or [])
            )
        else:
            has_tools = any(
                ch.get("message", {}).get("tool_calls") for ch in (payload.get("choices") or [])
            )
        if has_tools:
            add("Tool-call support", True, f"Tool call received in {lat:.0f}ms")
        else:
            add("Tool-call support", None, f"Responded but no tool_call ({lat:.0f}ms)")
    except urllib.error.HTTPError as e:
        if e.code == 429:
            add("Tool-call support", None, "Rate limited (skipped)")
        elif e.code in (400, 404, 422):
            err_body = ""
            try:
                err_body = e.read(200).decode(errors="replace")
            except Exception:
                pass
            add("Tool-call support", False, f"HTTP {e.code}: {err_body[:80]}")
        else:
            add("Tool-call support", False, f"HTTP {e.code}")
    except Exception as e:
        add("Tool-call support", False, str(e)[:100])
def run_endpoint_doctor(endpoint):
    """Comprehensive health checks for an endpoint. Returns [(name, ok, detail), ...].

    ok: True=pass, False=fail, None=warn/skip.

    Checks run in order and stop at the first hard connectivity failure:
      1. URL format
      2. DNS resolution
      3. TCP connection (plus TLS handshake for https)
      4. Backend-specific auth / model-list checks
      5. Streaming and tool-call probes (skipped for ``native`` and
         ``command-code`` backends)

    The TLS detail reports TCP connect time and handshake time separately.
    """
    checks = []

    def add(name, ok, detail=""):
        checks.append((name, ok, detail))

    url = normalize_base_url(endpoint.get("base_url") or "")
    key = (endpoint.get("api_key") or "").strip()
    # "or" rather than a .get default so an explicit None/"" also falls back.
    bt = endpoint.get("backend_type") or "openai-compat"
    models = endpoint.get("models") or []
    model = endpoint.get("default_model") or (models[0] if models else "")

    # 1. URL format
    parsed = urllib.parse.urlparse(url)
    has_url = bool(parsed.scheme and parsed.netloc)
    add("URL format", has_url, url if has_url else "Missing scheme or host")
    if not has_url:
        return checks
    host = parsed.hostname
    port = parsed.port or (443 if parsed.scheme == "https" else 80)

    # 2. DNS
    try:
        t0 = time.time()
        addrs = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
        dns_ms = (time.time() - t0) * 1000
        add("DNS resolution", True, f"{addrs[0][4][0]} ({dns_ms:.0f}ms)")
    except socket.gaierror as e:
        add("DNS resolution", False, str(e))
        return checks

    # 3. TCP (+ TLS)
    try:
        t0 = time.time()
        sock = socket.create_connection((host, port), timeout=10)
    except OSError as e:  # includes socket.timeout and ConnectionRefusedError
        add("TCP connection", False, str(e)[:100])
        return checks
    try:
        tcp_ms = (time.time() - t0) * 1000
        if parsed.scheme == "https":
            ctx = ssl.create_default_context()
            try:
                with ctx.wrap_socket(sock, server_hostname=host):
                    total_ms = (time.time() - t0) * 1000
                # total_ms includes the TCP connect; report the handshake alone.
                add("TLS connection", True,
                    f"TCP {tcp_ms:.0f}ms + handshake {total_ms - tcp_ms:.0f}ms")
            except ssl.SSLError as e:
                add("TLS certificate", False, str(e)[:120])
                return checks
            except OSError as e:
                add("TCP connection", False, str(e)[:100])
                return checks
        else:
            add("TCP connection", True, f"{tcp_ms:.0f}ms")
    finally:
        # Harmless if the TLS wrapper already took over and closed the socket.
        sock.close()

    # 4. Backend-specific checks
    if bt == "anthropic":
        add("/models endpoint", None, "Anthropic has no /models endpoint — testing via /messages")
        try:
            t0 = time.time()
            msg_url = f"{url}/v1/messages"
            body = json.dumps({"model": model or "claude-3-5-haiku-20241022", "max_tokens": 1,
                               "messages": [{"role": "user", "content": "hi"}]}).encode()
            req = urllib.request.Request(msg_url, data=body, headers={
                "User-Agent": UA, "x-api-key": key, "anthropic-version": "2023-06-01", "content-type": "application/json",
            }, method="POST")
            with urllib.request.urlopen(req, timeout=15):
                pass
            lat = (time.time() - t0) * 1000
            add("Auth valid", True, f"Responded in {lat:.0f}ms")
        except urllib.error.HTTPError as e:
            if e.code in (401, 403):
                add("Auth valid", False, f"HTTP {e.code} — check API key")
            elif e.code == 400:
                # The request was understood, so the key was accepted.
                add("Auth valid", True, "Authenticated (model or param error)")
            else:
                add("Auth valid", False, f"HTTP {e.code}")
        except Exception as e:
            add("Auth valid", False, str(e)[:100])
    elif bt.startswith("gemini-oauth"):
        token_name = "google-antigravity-oauth-token.json" if "antigravity" in bt else "google-cli-oauth-token.json"
        token_path = PROXY_CONFIG_DIR / token_name
        if token_path.exists():
            try:
                td = json.loads(token_path.read_text())
                exp = td.get("expires_at", 0)
                if exp > time.time():
                    remaining = exp - time.time()
                    add("OAuth token", True, f"Valid ({remaining / 60:.0f} min remaining)")
                else:
                    add("OAuth token", False, "Token expired — re-login required")
            except Exception as e:
                add("OAuth token", False, str(e)[:80])
        else:
            add("OAuth token", False, f"No token file ({token_name})")
        try:
            t0 = time.time()
            ids, err = fetch_models_for_endpoint(endpoint)
            lat = (time.time() - t0) * 1000
            if ids:
                add("Network reachable", True, f"{lat:.0f}ms")
                add("/models endpoint", True, f"{len(ids)} models ({lat:.0f}ms)")
                if model:
                    add("Selected model exists", model in ids,
                        model if model in ids else f"'{model}' not in {ids[:5]}...")
            elif err and ("401" in str(err) or "403" in str(err)):
                add("Network reachable", True, f"{lat:.0f}ms")
                add("Auth valid", False, str(err)[:100])
            else:
                add("Network reachable", False, str(err or "no response")[:100])
        except Exception as e:
            add("Network", False, str(e)[:100])
    else:
        try:
            t0 = time.time()
            ids, err = fetch_models_for_endpoint(endpoint)
            lat = (time.time() - t0) * 1000
            if ids:
                add("Network reachable", True, f"{lat:.0f}ms")
                add("Auth valid", True)
                add("/models endpoint", True, f"{len(ids)} models ({lat:.0f}ms)")
                if model:
                    add("Selected model exists", model in ids,
                        model if model in ids else f"'{model}' not found in {len(ids)} models")
                else:
                    add("Selected model", False, "No model selected")
            elif err and ("401" in str(err) or "403" in str(err)):
                add("Network reachable", True, f"{lat:.0f}ms")
                add("Auth valid", False, "HTTP 401/403 — check API key")
            elif err and "429" in str(err):
                add("Network reachable", True, f"{lat:.0f}ms")
                add("Auth valid", True, "Authenticated but rate-limited")
                add("/models endpoint", None, "Rate limited — skipped")
            else:
                add("Network reachable", False, str(err or "no response")[:100])
        except Exception as e:
            add("Network", False, str(e)[:100])

    # 5. Capability probes
    if bt not in ("native", "command-code"):
        _doctor_check_streaming(url, key, bt, model, add)
        _doctor_check_toolcall(url, key, bt, model, add)
    return checks
# ═══════════════════════════════════════════════════════════════════════
# PID registry
# ═══════════════════════════════════════════════════════════════════════
def _load_pid_registry():
    """Load the PID registry JSON; returns ``{}`` when missing or unreadable."""
    if not PID_REGISTRY.exists():
        return {}
    try:
        raw = PID_REGISTRY.read_text()
        return json.loads(raw)
    except Exception:
        # Best-effort: a corrupt registry behaves like an empty one.
        return {}
def _save_pid_registry(data):
    """Write the PID registry as JSON via a temp file and an atomic replace."""
    target = PID_REGISTRY
    target.parent.mkdir(parents=True, exist_ok=True)
    staging = target.with_suffix(".tmp")
    serialized = json.dumps(data, indent=2)
    staging.write_text(serialized)
    os.replace(str(staging), str(target))
def safe_cleanup_owned(logfn=None):
    """Stop every process recorded in the PID registry.

    Each entry's ``pid`` (or ``pgid``) is passed to ``_kill_process_group``.
    Entries that were stopped, or whose process no longer exists, are removed
    from the registry. Entries that could not be stopped are kept so a later
    cleanup can retry them. The registry is only rewritten when at least one
    entry was stopped or found gone.

    logfn: optional callable receiving status messages.
    """
    registry = _load_pid_registry()
    if not isinstance(registry, dict):
        return
    still_owned = {}
    changed = False
    for kind, meta in registry.items():
        pid = (meta.get("pid") or meta.get("pgid")) if isinstance(meta, dict) else None
        if not pid:
            continue
        try:
            _kill_process_group(pid)
            if logfn:
                logfn(f"Stopped {kind} (pid {pid})")
            changed = True
        except ProcessLookupError:
            # Already gone, so there is nothing left to track.
            changed = True
        except Exception as e:
            still_owned[kind] = meta
            if logfn:
                logfn(f"Could not stop {kind}: {e}")
    if changed:
        _save_pid_registry(still_owned)
# ═══════════════════════════════════════════════════════════════════════
# Proxy lifecycle
# ═══════════════════════════════════════════════════════════════════════
# Process handle of the current proxy; reset to None by stop_proxy().
_proxy_proc = None
# Port chosen for the current proxy by start_proxy_for()/start_bgp_proxy().
_proxy_port = None
# File holding the last chosen port; _pick_free_port() tries to reuse it.
_PROXY_PORT_FILE = PROXY_CONFIG_DIR / ".last-proxy-port"
def _pick_free_port():
    """Pick a local TCP port for the proxy.

    Prefers the port saved in ``_PROXY_PORT_FILE`` when it is a valid port
    number (1-65535) and can currently be bound on 127.0.0.1. Otherwise the
    OS is asked for a free ephemeral port.
    """
    try:
        saved = int(_PROXY_PORT_FILE.read_text().strip())
        # 0 would bind an arbitrary port yet be returned as "0"; values above
        # 65535 make bind() raise OverflowError.
        if 1 <= saved <= 65535:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                s.bind(("127.0.0.1", saved))
            return saved
    except (ValueError, OSError, OverflowError):
        pass
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]
def get_proxy_state():
    """Return the module's current ``(proxy_process, proxy_port)`` pair."""
    state = (_proxy_proc, _proxy_port)
    return state
def set_proxy_state(proc, port):
    """Record *proc* and *port* as the module's current proxy state."""
    global _proxy_proc, _proxy_port
    _proxy_proc, _proxy_port = proc, port
def stop_proxy():
    """Kill the tracked proxy process group if it is still running, then forget it."""
    global _proxy_proc
    proc = _proxy_proc
    if proc:
        if proc.poll() is None:
            _kill_process_group(proc.pid)
    _proxy_proc = None
def start_proxy_for(endpoint, logfn):
    """Start the translation proxy for an endpoint. Returns the port.

    logfn(msg) is used for status messages (may be called from any thread).

    Any running proxy is stopped first. The chosen port is persisted so the
    next launch can try to reuse it. The proxy config contains the
    endpoint's API key, so it is written through ``write_secure_text``. An
    endpoint without an ``api_key`` entry gets an empty key.

    Raises ``RuntimeError`` (from ``_start_proxy_with_config``) when the
    proxy exits early or fails its health check.
    """
    global _proxy_proc, _proxy_port
    stop_proxy()
    port = _pick_free_port()
    _proxy_port = port
    _PROXY_PORT_FILE.parent.mkdir(parents=True, exist_ok=True)
    _PROXY_PORT_FILE.write_text(str(port))

    model_list = endpoint.get("models", [])
    oauth_provider = endpoint.get("oauth_provider") or ""
    if (endpoint.get("backend_type") or "").startswith("gemini-oauth") and oauth_provider.startswith("google"):
        is_antigravity = oauth_provider == "google-antigravity"
        token_name = "google-antigravity-oauth-token.json" if is_antigravity else "google-cli-oauth-token.json"
        token_path = PROXY_CONFIG_DIR / token_name
        try:
            with open(token_path) as tf:
                td = json.load(tf)
            # Antigravity uses the configured list; other Google OAuth
            # providers prefer the models recorded in the token file.
            discovered = [] if is_antigravity else td.get("available_models", [])
            if discovered:
                model_list = discovered
        except Exception:
            pass

    pcfg = {
        "port": port,
        "backend_type": endpoint["backend_type"],
        "target_url": normalize_base_url(endpoint["base_url"]),
        "api_key": endpoint.get("api_key", ""),
        "cc_version": endpoint.get("cc_version", ""),
        "oauth_provider": endpoint.get("oauth_provider", ""),
        "reasoning_enabled": endpoint.get("reasoning_enabled", True),
        "reasoning_effort": endpoint.get("reasoning_effort", "medium"),
        "force_model": endpoint.get("default_model") or "",
        "models": [{"id": m, "object": "model", "created": 1700000000, "owned_by": endpoint["name"]}
                   for m in model_list],
    }
    pcfg_path = PROXY_CONFIG_DIR / f"proxy-{safe_name(endpoint['name'])}-{port}.json"
    write_secure_text(pcfg_path, json.dumps(pcfg, indent=2))
    _start_proxy_with_config(pcfg_path, port, logfn)
    return port
def _start_proxy_with_config(pcfg_path, port, logfn):
    """Launch the proxy script with *pcfg_path* and wait until it is healthy.

    The proxy runs in its own process group/session so it can be killed as a
    unit. Its stderr is forwarded line by line to ``logfn`` and appended to
    ``proxy-stderr.log`` by a daemon thread. The function polls
    ``/v1/models`` on the proxy for up to 15 seconds.

    Raises ``RuntimeError`` when the proxy exits early or never becomes
    healthy; in the latter case the process group is killed first.
    """
    global _proxy_proc
    popen_kwargs = {
        "stdout": subprocess.DEVNULL,
        "stderr": subprocess.PIPE,
        "text": True,
    }
    if IS_WINDOWS:
        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
    else:
        # Same effect as preexec_fn=os.setsid, without running Python code
        # between fork and exec (unsafe when other threads exist).
        popen_kwargs["start_new_session"] = True

    # Keep a local handle: the module global may be reset by stop_proxy()
    # while the reader thread or health loop is still running.
    proc = subprocess.Popen(
        [sys.executable, str(PROXY), "--config", str(pcfg_path)],
        **popen_kwargs,
    )
    _proxy_proc = proc
    _register_pgid_entry("proxy", proc.pid)
    log_path = PROXY_CONFIG_DIR / "proxy-stderr.log"

    def _pipe_stderr():
        if not proc.stderr:
            return
        try:
            log_file = open(log_path, "a", encoding="utf-8")
        except OSError:
            log_file = None  # still forward to logfn without the file copy
        try:
            for line in proc.stderr:
                logfn(f"[proxy] {line.rstrip()}")
                if log_file is not None:
                    try:
                        log_file.write(line)
                        log_file.flush()
                    except Exception:
                        pass
        finally:
            if log_file is not None:
                log_file.close()

    threading.Thread(target=_pipe_stderr, daemon=True).start()

    deadline = time.time() + 15
    last_err = None
    while time.time() < deadline:
        if proc.poll() is not None:
            raise RuntimeError(f"Proxy exited early with code {proc.returncode}")
        try:
            with urllib.request.urlopen(f"http://127.0.0.1:{port}/v1/models", timeout=2):
                pass
            logfn(f"Proxy ready on port {port}")
            return
        except Exception as e:
            last_err = e
            time.sleep(0.3)
    _kill_process_group(proc.pid)
    raise RuntimeError(f"Proxy failed health check on port {port}: {last_err}")
def start_bgp_proxy(pool, model, logfn):
    """Start a BGP proxy for a pool. Returns (port, bgp_endpoint).

    ``bgp_endpoint`` is a synthetic openai-compat endpoint dict named after
    the pool. Its model list is the distinct route models in route order,
    where a route without a ``model`` counts as *model*.

    The proxy config embeds the pool's routes verbatim, so it is written
    through ``write_secure_text`` in case they carry credentials.

    Raises ``RuntimeError`` (from ``_start_proxy_with_config``) when the
    proxy exits early or fails its health check.
    """
    global _proxy_proc, _proxy_port
    stop_proxy()
    port = _pick_free_port()
    _proxy_port = port
    _PROXY_PORT_FILE.parent.mkdir(parents=True, exist_ok=True)
    _PROXY_PORT_FILE.write_text(str(port))

    routes = pool.get("routes", [])
    bgp_ep = {
        "name": pool["name"],
        "backend_type": "openai-compat",
        "base_url": "http://bgp.placeholder",
        "api_key": "",
        "default_model": model,
        # dict.fromkeys de-duplicates while keeping route order.
        "models": list(dict.fromkeys(r.get("model", model) for r in routes)),
    }
    pcfg = {
        "port": port,
        "backend_type": "openai-compat",
        "target_url": "http://bgp.placeholder",
        "api_key": "",
        "bgp_routes": routes,
        "models": [{"id": m, "object": "model", "created": 1700000000, "owned_by": "bgp"} for m in bgp_ep["models"]],
    }
    pcfg_path = PROXY_CONFIG_DIR / f"proxy-{safe_name(pool['name'])}-{port}.json"
    write_secure_text(pcfg_path, json.dumps(pcfg, indent=2))
    _start_proxy_with_config(pcfg_path, port, logfn)
    return port, bgp_ep
# ═══════════════════════════════════════════════════════════════════════
# Codex detection
# ═══════════════════════════════════════════════════════════════════════
def detect_codex_cli():
    """Locate the ``codex`` CLI and query its version.

    Returns ``(path, version_text)``, or ``None`` when the CLI is not on PATH
    or cannot be run. The resolved path is executed rather than the bare
    name, so wrappers that ``shutil.which`` finds through PATHEXT on Windows
    (e.g. ``codex.cmd``) also work.
    """
    try:
        path = shutil.which("codex")
        if not path:
            return None
        out = subprocess.run([path, "--version"], capture_output=True, text=True, timeout=5)
        ver = (out.stdout or "").strip() or (out.stderr or "").strip() or "unknown"
        return (path, ver)
    except Exception:
        return None
def detect_codex_desktop():
    """Locate the Codex Desktop executable.

    On Windows, checks the usual install locations under LOCALAPPDATA and
    Program Files, then asks PowerShell for an MSIX install location. On
    other platforms, returns ``START_SH`` when it exists.

    Returns the path as a string, or ``None`` when nothing is found.
    """
    if not IS_WINDOWS:
        if START_SH and START_SH.exists():
            return str(START_SH)
        return None

    la = os.environ.get("LOCALAPPDATA", "")
    pf = os.environ.get("PROGRAMFILES", "")
    pf86 = os.environ.get("PROGRAMFILES(X86)", "")
    exe = "Codex Desktop.exe"
    locations = [
        (la, ("Programs", "Codex Desktop", exe)),
        (pf, ("Codex Desktop", exe)),
        (pf86, ("Codex Desktop", exe)),
        (la, ("OpenAI", "Codex Desktop", exe)),
    ]
    for root, parts in locations:
        if not root:
            # An unset variable would otherwise give a path relative to the
            # current working directory.
            continue
        candidate = Path(root).joinpath(*parts)
        if candidate.exists():
            return str(candidate)

    # MSIX / Microsoft Store install: locate via Get-AppxPackage
    try:
        r = subprocess.run(
            ["powershell", "-NoProfile", "-Command",
             "(Get-AppxPackage *OpenAI.Codex*).InstallLocation"],
            capture_output=True, text=True, timeout=10,
        )
        loc = r.stdout.strip() if r.returncode == 0 else ""
        if loc:
            msix_exe = Path(loc) / "app" / "Codex.exe"
            if msix_exe.exists():
                return str(msix_exe)
    except Exception:
        pass
    return None
def check_codex_auth():
    """Report the Codex CLI login state as ``(status, detail)``.

    status is one of ``"logged_in"``, ``"error"``, ``"unknown"``,
    ``"not_installed"`` or ``"not_configured"``. Never raises.

    The CLI is resolved with ``shutil.which`` and run by its full path, so
    wrappers found through PATHEXT on Windows (e.g. ``codex.cmd``) are not
    misreported as "not installed".
    """
    try:
        exe = shutil.which("codex")
        if not exe:
            return ("not_installed", "codex not found")
        out = subprocess.run(
            [exe, "login", "status"],
            capture_output=True, text=True, timeout=10,
        )
        text = (out.stdout or "").strip()
        if not text:
            text = (out.stderr or "").strip()
        if out.returncode == 0 and text:
            return ("logged_in", text)
        if text:
            return ("error", text)
        return ("unknown", "No output from codex login status")
    except FileNotFoundError:
        return ("not_installed", "codex not found")
    except OSError as e:
        if e.errno == 2:
            return ("not_configured", "Config not found — launch Codex once to create it")
        return ("error", str(e))
    except Exception as e:
        return ("error", str(e))
# ═══════════════════════════════════════════════════════════════════════
# Log helpers
# ═══════════════════════════════════════════════════════════════════════
def last_log_lines(n=15):
    """Return the last ``n`` lines of ``LAUNCH_LOG`` joined with newlines.

    Args:
        n: Number of trailing lines wanted.  Zero or a negative value yields
           an empty string (a plain ``[-n:]`` slice would return the whole
           file for 0 and drop *leading* lines for negative values).

    Returns:
        The joined tail, or ``"(no log file)"`` when the log cannot be read.
    """
    try:
        # errors="replace": a stray undecodable byte must not make an existing
        # log look like a missing one.
        lines = LAUNCH_LOG.read_text(errors="replace").splitlines()
    except Exception:
        return "(no log file)"
    if n <= 0:
        return ""
    return "\n".join(lines[-n:])
# ═══════════════════════════════════════════════════════════════════════
# Process helpers (desktop kill etc.)
# ═══════════════════════════════════════════════════════════════════════
def kill_existing_desktop(logfn=None):
    """Terminate a running Codex Desktop instance (best effort, never raises
    unless ``logfn`` itself does).

    Windows: every ``Codex Desktop.exe`` listed by ``tasklist`` is handed to
    ``_kill_process_group``.  Other platforms: the first ``pgrep -f`` match
    for ``/opt/codex-desktop/electron`` receives SIGTERM, a 2 s grace period,
    then SIGKILL.

    Args:
        logfn: Optional callable taking a single message string.
    """
    def note(message):
        if logfn:
            logfn(message)

    try:
        if IS_WINDOWS:
            _kill_desktop_windows(note)
        else:
            _kill_desktop_posix(note)
    except Exception as e:
        note(f"Note: could not kill existing Desktop: {e}")


def _kill_desktop_windows(note):
    """Kill every ``Codex Desktop.exe`` process reported by ``tasklist``."""
    out = subprocess.run(
        ["tasklist", "/FI", "IMAGENAME eq Codex Desktop.exe", "/FO", "CSV", "/NH"],
        capture_output=True, text=True, timeout=5,
    )
    killed_any = False
    for line in out.stdout.strip().splitlines():
        parts = line.split(",")
        if len(parts) < 2:
            continue
        # CSV columns are quoted; the second column is the PID.
        pid_str = parts[1].strip('"')
        if not pid_str.isdigit():
            continue
        pid = int(pid_str)
        _kill_process_group(pid)
        killed_any = True
        note(f"Killed existing Codex Desktop (pid {pid})")
    # One grace period for the whole batch: several processes can share the
    # image name, and sleeping per process multiplied the delay.
    if killed_any:
        time.sleep(2)


def _kill_desktop_posix(note):
    """SIGTERM, wait 2 s, then SIGKILL the Desktop's process group."""
    out = subprocess.run(
        ["pgrep", "-f", "/opt/codex-desktop/electron"],
        capture_output=True, text=True, timeout=5,
    )
    pids = [p for p in out.stdout.strip().splitlines() if p.strip().isdigit()]
    if not pids:
        return
    main_pid = int(pids[0])
    pgid = os.getpgid(main_pid)
    if pgid <= 0:
        return

    if pgid == os.getpgrp():
        # The Desktop shares *our* process group: killpg() would signal the
        # launcher as well, so only the Desktop's main pid is signalled.
        def send(sig):
            os.kill(main_pid, sig)
    else:
        def send(sig):
            os.killpg(pgid, sig)

    send(signal.SIGTERM)
    note(f"Killed existing Codex Desktop (pid {main_pid}, pgid {pgid})")
    time.sleep(2)
    try:
        send(signal.SIGKILL)
    except (ProcessLookupError, PermissionError):
        # Already gone (or no longer ours) after the grace period.
        pass
# ═══════════════════════════════════════════════════════════════════════
# AI Monitoring — Self-Healing Watchdog
# ═══════════════════════════════════════════════════════════════════════
# Tier 1 rule table. Each row is (trigger, action, cooldown_seconds).
# HealthWatcher._handle_failure uses the first row whose trigger matches and
# skips it while the same action last ran less than cooldown_seconds ago
# (a cooldown of 0 therefore never suppresses the action).
_TIER1_RULES = [
    ("proxy_health_fail", "restart_proxy", 30),
    ("proxy_port_conflict", "kill_stale_restart", 60),
    ("upstream_429", "wait_retry", 0),
    ("upstream_502_503", "retry_backoff", 30),
    ("upstream_500_repeat", "switch_provider", 60),
    ("upstream_timeout", "retry_increase_timeout", 30),
    ("upstream_401_403", "alert_bad_key", 0),
    ("stream_broken_pipe", "restart_proxy", 30),
    ("stream_reset", "restart_proxy", 30),
    ("parsed_tool_calls_0_x3", "clear_schema_cache", 300),
    ("sanitizer_suspicious_5x", "alert_model_issue", 0),
    ("stuck_recovery_x5", "suggest_switch_model", 0),
    ("codex_process_dead", "alert_restart", 0),
    ("schema_corrupt", "delete_provider_caps", 0),
]
# Log-line signatures: regex -> (fault_id, category).
# The keys are passed to re.search() by _LogAnalyzerThread, which stops at the
# first match in insertion order, so every key must be a valid regex and
# literal metacharacters must be escaped.  An unescaped "[STUCK-RECOVERY]" is
# a character class (S, T, U, C, K-R, E, O, V, Y) that matches nearly any line
# containing an uppercase letter and would shadow every entry after it.
_FAILURE_SIGNALS = {
    r"parsed_tool_calls=0": ("C1", "parser_empty"),
    r"\[STUCK-RECOVERY\]": ("C3", "stuck_recovery"),
    r"suspicious cmd": ("C4", "sanitizer_flag"),
    r"empty cmd recovered": ("C6", "empty_cmd"),
    r"HTTP 429": ("B1", "rate_limited"),
    r"HTTP 500": ("B2", "server_error"),
    r"HTTP 502": ("B2", "server_error"),
    r"HTTP 503": ("B2", "server_error"),
    r"HTTP 401": ("B3", "auth_failure"),
    r"HTTP 403": ("B4", "forbidden"),
    r"Connection refused": ("A1", "proxy_dead"),
    r"Address already in use": ("A2", "port_conflict"),
    r"Broken pipe": ("B7", "broken_pipe"),
    r"Connection reset": ("B6", "connection_reset"),
    r"timed out": ("B5", "timeout"),
    r"SELF-REVIVE CRASH": ("A5", "proxy_crash"),
    r"stream error": ("B6", "stream_error"),
    r"content_type.*array": ("E1", "schema_corrupt"),
}
# System prompt for the Tier 3 diagnostic model.  AIDiagnosticAgent._call_model
# sends it as the "system" message and parses the reply with json.loads, so the
# prompt demands a bare JSON object of the form {action, reason, confidence}.
# NOTE(review): the action names offered here (e.g. kill_stale_processes,
# increase_timeout) differ from the Tier 1 action names in _TIER1_RULES
# (kill_stale_restart, retry_increase_timeout) — confirm the on_action
# consumer understands both vocabularies.
_DIAGNOSTIC_SYSTEM_PROMPT = (
    'You are a diagnostic agent for "Codex Launcher" — a desktop app that runs a local '
    'translation proxy between OpenAI Codex CLI/Desktop and AI providers.\n\n'
    'Analyze the incident and respond with ONLY a JSON object:\n'
    '{"action": "...", "reason": "...", "confidence": 0.0-1.0}\n\n'
    'Available actions: restart_proxy, kill_stale_processes, clear_schema_cache, '
    'switch_provider, increase_timeout, regenerate_config, cleanup_stale, '
    'alert_user, ignore, retry_now\n\n'
    'Rules:\n'
    '- upstream 401/403 with auth error -> alert_user\n'
    '- proxy dead -> restart_proxy\n'
    '- same error 5+ times -> switch_provider or alert_user\n'
    '- schema/content_type error -> clear_schema_cache\n'
    '- "Address already in use" -> kill_stale_processes then restart_proxy\n'
    '- timeout on slow upstream -> increase_timeout\n'
    '- single transient 429/502/503 -> ignore\n'
    '- "stream disconnected" + proxy healthy -> ignore\n'
    '- no extra text, no markdown, just the JSON object'
)
def load_monitoring_config():
    """Load the monitoring configuration from ``MONITORING_FILE``.

    Returns:
        A fresh dict holding the built-in defaults overlaid with whatever the
        file provides.  A missing, unreadable, malformed or non-object file
        yields the defaults alone, so callers can always use ``.get``.
    """
    defaults = {
        "enabled": False,
        "provider_url": "",
        "model": "",
        "api_key": "",
        "health_check_interval_s": 5,
        "auto_restart_proxy": True,
        "auto_switch_provider": False,
    }
    try:
        loaded = json.loads(MONITORING_FILE.read_text())
    except Exception:
        # Missing file, unreadable file or invalid JSON.
        return defaults
    if not isinstance(loaded, dict):
        # Valid JSON of the wrong shape (list, string, ...) would break every
        # caller that does cfg.get(...).
        return defaults
    # Keys absent from an older or hand-edited file keep their default value.
    return {**defaults, **loaded}
def save_monitoring_config(cfg):
    """Write ``cfg`` to ``MONITORING_FILE`` as 2-space-indented JSON.

    The parent directory is created first if it does not exist yet.
    """
    target = MONITORING_FILE
    target.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(cfg, indent=2)
    target.write_text(serialized)
def load_incident_store():
    """Load the persisted incident store from ``INCIDENT_STORE_FILE``.

    Returns:
        The stored dict, or a fresh empty store
        (``{"version": 1, "incidents": {}, "stats": {...}}``) when the file is
        missing, unreadable, malformed, or does not contain a JSON object.
    """
    try:
        if INCIDENT_STORE_FILE.exists():
            loaded = json.loads(INCIDENT_STORE_FILE.read_text())
            # IncidentStore calls .get / .setdefault on the result, so any
            # other JSON type is treated like a corrupt file.
            if isinstance(loaded, dict):
                return loaded
    except Exception:
        pass
    return {"version": 1, "incidents": {}, "stats": {"ai_calls": 0, "tokens_used": 0}}
def save_incident_store(store):
    """Write ``store`` to ``INCIDENT_STORE_FILE`` as 2-space-indented JSON.

    The parent directory is created first if it does not exist yet.
    """
    destination = INCIDENT_STORE_FILE
    destination.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(store, indent=2)
    destination.write_text(payload)
def monitoring_log(msg):
    """Append ``msg`` to ``MONITORING_LOG`` as ``[HH:MM:SS] msg``.

    Best effort: any error while opening or writing the file is swallowed, so
    logging can never break the caller.
    """
    with contextlib.suppress(Exception):
        with open(str(MONITORING_LOG), "a") as log_file:
            stamp = time.strftime('%H:%M:%S')
            log_file.write(f"[{stamp}] {msg}\n")
class IncidentStore:
    """In-memory view of the persisted incident store with lazy write-back.

    Each incident maps a pattern string to the fix that was applied plus
    success / failure counters.  Changes only mark the store dirty; nothing is
    written to disk until ``flush()`` is called.
    """

    def __init__(self):
        self._store = load_incident_store()
        self._dirty = False

    def lookup(self, pattern):
        """Return the incident for ``pattern`` when its fix is trustworthy.

        A fix is trusted when it has at least one recorded success and its
        success rate is strictly above 50 %.  Otherwise returns ``None``.
        """
        inc = self._store.get("incidents", {}).get(pattern)
        if not inc:
            return None
        wins = inc.get("success_count", 0)
        if wins <= 0:
            return None
        rate = wins / max(wins + inc.get("fail_count", 0), 1)
        return inc if rate > 0.5 else None

    def record(self, pattern, fix, success=True):
        """Record one occurrence of ``pattern`` and the outcome of ``fix``.

        The counters always describe the *stored* fix:

        * same fix as stored  -> bump its success or failure counter;
        * different fix that succeeded -> it replaces the stored fix and the
          counters restart, so a stale fix never inherits its successes;
        * different fix that failed -> only the occurrence is noted, the
          stored fix and its counters are left alone.
        """
        now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        incs = self._store.setdefault("incidents", {})
        inc = incs.setdefault(pattern, {
            "fix": fix, "success_count": 0, "fail_count": 0,
            "last_seen": now,
            "occurrences": 0,
        })
        inc["last_seen"] = now
        inc["occurrences"] = inc.get("occurrences", 0) + 1
        self._dirty = True

        if inc.get("fix") != fix:
            if not success:
                return
            inc["fix"] = fix
            inc["success_count"] = 0
            inc["fail_count"] = 0

        if success:
            inc["success_count"] = inc.get("success_count", 0) + 1
        else:
            inc["fail_count"] = inc.get("fail_count", 0) + 1

    def record_ai_call(self, tokens=0):
        """Count one AI diagnostic call and add ``tokens`` to the running total."""
        stats = self._store.setdefault("stats", {"ai_calls": 0, "tokens_used": 0})
        stats["ai_calls"] = stats.get("ai_calls", 0) + 1
        stats["tokens_used"] = stats.get("tokens_used", 0) + tokens
        self._dirty = True

    def flush(self):
        """Persist the store via ``save_incident_store`` if anything changed."""
        if self._dirty:
            save_incident_store(self._store)
            self._dirty = False

    @property
    def stats(self):
        """The ``stats`` dict (``ai_calls`` / ``tokens_used``), zeroed when absent."""
        return self._store.get("stats", {"ai_calls": 0, "tokens_used": 0})
class AIDiagnosticAgent:
    """Tier 2 / Tier 3 incident diagnosis.

    Tier 2 answers from the incident store when the pattern already has a
    trusted fix; Tier 3 POSTs a chat-completion style request to
    ``provider_url`` and expects a JSON object
    ``{"action": ..., "reason": ..., "confidence": ...}`` back.
    """

    # Prefix of the ``reason`` field used when the model call itself failed.
    _FAILURE_REASON_PREFIX = "ai_diag_failed"
    # Token count booked per call when the response carries no usage figure.
    _DEFAULT_TOKEN_ESTIMATE = 800
    # Maximum number of log characters included in the prompt.
    _LOG_TAIL_CHARS = 1500

    def __init__(self, provider_url, model, api_key):
        self.provider_url = provider_url
        self.model = model
        self.api_key = api_key
        self.incident_store = IncidentStore()

    def diagnose(self, context):
        """Return an action dict for the incident described by ``context``.

        Returns:
            ``{"action", "reason", "confidence", "tier"}`` with ``tier`` 2 for
            a known pattern and 3 for a model verdict (or the ``alert_user``
            fallback when the model call failed).
        """
        pattern = self._extract_pattern(context)
        known = self.incident_store.lookup(pattern)
        if known:
            monitoring_log(f"Tier 2 HIT: pattern={pattern} fix={known['fix']}")
            return {"action": known["fix"], "reason": "known_pattern", "confidence": 0.9, "tier": 2}
        action = self._call_model(context)
        if action:
            reason = str(action.get("reason", ""))
            # Only genuine model verdicts are remembered.  Recording the
            # failure fallback would store "alert_user" as a successful fix
            # and every later lookup would short-circuit to it as a Tier 2
            # hit, so the model would never be consulted for this pattern again.
            if not reason.startswith(self._FAILURE_REASON_PREFIX):
                self.incident_store.record(pattern, action.get("action", "unknown"))
            # Still flush: the AI call statistics may have changed.
            self.incident_store.flush()
        return action

    def _extract_pattern(self, context):
        """Build the store key: up to three of the sorted signals followed by
        ``http_<code>``, joined by ``+`` (``"unknown"`` when empty)."""
        parts = sorted(context.get("signals", []))
        if context.get("http_code"):
            parts.append(f"http_{context['http_code']}")
        return "+".join(parts[:3]) or "unknown"

    @staticmethod
    def _parse_action(text):
        """Decode the model reply into an action dict.

        Accepts a bare JSON object and, as a fallback, an object wrapped in a
        Markdown code fence or surrounding prose (models do not always obey
        "no markdown").

        Raises:
            ValueError: when no JSON object can be extracted.
        """
        text = text.strip()
        try:
            parsed = json.loads(text)
        except ValueError:
            start, end = text.find("{"), text.rfind("}")
            if start == -1 or end <= start:
                raise
            parsed = json.loads(text[start:end + 1])
        if not isinstance(parsed, dict):
            raise ValueError(f"expected a JSON object, got {type(parsed).__name__}")
        return parsed

    def _call_model(self, context):
        """Ask the configured model for a verdict (Tier 3).

        Never raises: on any failure an ``alert_user`` action with confidence
        0.0 and a reason starting with ``ai_diag_failed`` is returned.
        """
        try:
            # Keep the *end* of the tail: the newest lines are the relevant ones.
            log_tail = str(context.get("log_tail") or "")[-self._LOG_TAIL_CHARS:]
            prompt = (
                f"INCIDENT REPORT:\n"
                f"Time: {time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}\n"
                f"Proxy health: {context.get('proxy_alive', 'unknown')}\n"
                f"Upstream: {context.get('upstream_url', 'unknown')}\n"
                f"Model: {context.get('model', 'unknown')}\n"
                f"Last HTTP code: {context.get('http_code', 'n/a')}\n"
                f"Recent signals: {context.get('signals', [])}\n"
                f"Recent log tail:\n{log_tail}\n"
            )
            body = {
                "model": self.model,
                "messages": [
                    {"role": "system", "content": _DIAGNOSTIC_SYSTEM_PROMPT},
                    {"role": "user", "content": prompt},
                ],
                "max_tokens": 200,
                "temperature": 0.1,
            }
            req = urllib.request.Request(
                self.provider_url,
                data=json.dumps(body).encode(),
                headers={
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {self.api_key}",
                },
            )
            # Context manager closes the HTTP response deterministically.
            with urllib.request.urlopen(req, timeout=15) as resp:
                result = json.loads(resp.read())
            text = result["choices"][0]["message"]["content"].strip()

            # Book the real token usage when the provider reports one,
            # otherwise fall back to the fixed estimate.
            usage = result.get("usage") if isinstance(result, dict) else None
            tokens = usage.get("total_tokens") if isinstance(usage, dict) else None
            if not isinstance(tokens, int) or isinstance(tokens, bool):
                tokens = self._DEFAULT_TOKEN_ESTIMATE
            self.incident_store.record_ai_call(tokens=tokens)

            action = self._parse_action(text)
            action["tier"] = 3
            monitoring_log(f"Tier 3 AI: action={action.get('action')} reason={action.get('reason')}")
            return action
        except Exception as e:
            monitoring_log(f"Tier 3 AI FAILED: {e}")
            return {"action": "alert_user", "reason": f"ai_diag_failed: {e}", "confidence": 0.0, "tier": 3}
class HealthWatcher(threading.Thread):
    """Daemon thread that watches the local proxy and reacts to failures.

    While monitoring is enabled it polls ``http://localhost:<port>/health``
    and, through a ``_LogAnalyzerThread``, receives failure signals found in
    the proxy logs.  Failures are resolved by the Tier 1 rule table first and
    handed to ``AIDiagnosticAgent`` (Tier 2/3) when no rule matches.

    Callbacks (all supplied by the caller):
        on_failure:  stored for the caller's use; not invoked by this class.
        on_recovery: called with no arguments when health returns after failures.
        on_signal:   called as ``(fault_id, category, line[:200])`` per log signal.
        on_action:   called as ``(action, trigger)`` when an action is chosen.

    ``on_signal`` / ``on_action`` may run on the log-analyzer thread.
    """

    # category -> (occurrences needed, trigger handed to _handle_failure).
    # NOTE(review): "proxy_dead", "port_conflict", "server_error_repeat" and
    # "timeout_repeat" have no row in _TIER1_RULES (which lists
    # proxy_port_conflict / upstream_500_repeat / upstream_timeout instead), so
    # those triggers always fall through to Tier 2/3 — confirm whether the
    # names were meant to line up.
    _SIGNAL_THRESHOLDS = {
        "proxy_dead": (2, "proxy_dead"),
        "port_conflict": (2, "port_conflict"),
        "server_error": (3, "server_error_repeat"),
        "timeout": (3, "timeout_repeat"),
        "sanitizer_flag": (5, "sanitizer_suspicious_5x"),
        "stuck_recovery": (5, "stuck_recovery_x5"),
        "parser_empty": (3, "parsed_tool_calls_0_x3"),
        "schema_corrupt": (1, "schema_corrupt"),
    }
    # Seconds between health checks when the configured value is unusable.
    _DEFAULT_INTERVAL_S = 5

    def __init__(self, on_failure, on_recovery, on_signal, on_action):
        super().__init__(daemon=True)
        self.cfg = load_monitoring_config()
        self.on_failure = on_failure
        self.on_recovery = on_recovery
        self.on_signal = on_signal
        self.on_action = on_action
        self.failures = 0
        self.running = False
        self._signal_counts = collections.defaultdict(int)
        self._last_actions = {}
        self._restart_count = 0
        self._last_restart_time = 0
        # The incident store is touched from this thread (flush, Tier 1
        # records) and from the log-analyzer thread (signal-driven records).
        self._store_lock = threading.Lock()

    def run(self):
        """Main loop: reload config, check proxy health, flush the store, sleep."""
        self.running = True
        self.incident_store = IncidentStore()
        self._log_analyzer = _LogAnalyzerThread(self._on_log_signal)
        self._log_analyzer.start()
        while self.running:
            self.cfg = load_monitoring_config()
            if not self.cfg.get("enabled"):
                time.sleep(5)
                continue
            port = self._get_proxy_port()
            if port:
                if self._check_health(port):
                    if self.failures > 0:
                        self.failures = 0
                        self.on_recovery()
                else:
                    self.failures += 1
                    if self.failures >= 3:
                        self._handle_failure("proxy_health_fail")
            self._flush_store()
            time.sleep(self._poll_interval())

    def stop(self):
        """Ask both the watcher loop and the log analyzer to finish."""
        self.running = False
        if hasattr(self, '_log_analyzer'):
            self._log_analyzer.running = False

    def _poll_interval(self):
        """Return the configured health-check interval in seconds.

        Falls back to the default for values ``time.sleep`` would reject or
        that would spin the loop (non-numeric, NaN, infinite, zero, negative),
        so a bad config entry cannot kill the watcher thread.
        """
        raw = self.cfg.get("health_check_interval_s", self._DEFAULT_INTERVAL_S)
        try:
            interval = float(raw)
        except (TypeError, ValueError):
            return self._DEFAULT_INTERVAL_S
        if interval != interval or interval in (float("inf"), float("-inf")) or interval <= 0:
            return self._DEFAULT_INTERVAL_S
        return interval

    def _flush_store(self):
        """Persist the incident store unless another thread is using it.

        Non-blocking on purpose: a Tier 3 model call may hold the lock for
        several seconds, and the next loop iteration will flush anyway.
        """
        if self._store_lock.acquire(blocking=False):
            try:
                self.incident_store.flush()
            finally:
                self._store_lock.release()

    def _get_proxy_port(self):
        """Read ``port`` from ``proxy-config.json``; ``None`` when unavailable."""
        try:
            cfg_path = PROXY_CONFIG_DIR / "proxy-config.json"
            if cfg_path.exists():
                d = json.loads(cfg_path.read_text())
                return d.get("port")
        except Exception:
            pass
        return None

    def _check_health(self, port):
        """Return True when ``GET /health`` on the local proxy answers 200."""
        try:
            req = urllib.request.Request(f"http://localhost:{port}/health")
            # Context manager closes the response instead of leaving it to GC;
            # this runs every few seconds for the lifetime of the app.
            with urllib.request.urlopen(req, timeout=5) as resp:
                return resp.status == 200
        except Exception:
            return False

    def _due_trigger(self, category):
        """Return the trigger for ``category`` once its threshold is reached.

        The counter is reset when the trigger fires, so "x3" means "every
        three occurrences" rather than "every occurrence after the third" —
        the latter re-fired alerts and model calls on each further log line.
        Returns ``None`` for unknown categories or while below the threshold.
        """
        rule = self._SIGNAL_THRESHOLDS.get(category)
        if rule is None:
            return None
        needed, trigger = rule
        if self._signal_counts[category] < needed:
            return None
        self._signal_counts[category] = 0
        return trigger

    def _on_log_signal(self, fault_id, category, line):
        """Callback for the log analyzer: count the signal, notify, escalate."""
        self._signal_counts[category] += 1
        self.on_signal(fault_id, category, line[:200])
        trigger = self._due_trigger(category)
        if trigger:
            self._handle_failure(trigger)

    def _handle_failure(self, trigger):
        """Apply the matching Tier 1 rule, or escalate to Tier 2/3."""
        now = time.time()
        for rule_trigger, action, cooldown in _TIER1_RULES:
            if rule_trigger != trigger:
                continue
            # The cooldown is tracked per *action*, not per trigger.
            last_t = self._last_actions.get(action, 0)
            if now - last_t < cooldown:
                return
            self._last_actions[action] = now
            monitoring_log(f"Tier 1: trigger={trigger} action={action}")
            self.on_action(action, trigger)
            with self._store_lock:
                self.incident_store.record(trigger, action, success=True)
            return
        self._try_tier2_3(trigger)

    def _try_tier2_3(self, trigger):
        """Ask the diagnostic agent; alert the user when no AI is configured."""
        cfg = self.cfg
        if not cfg.get("provider_url") or not cfg.get("model") or not cfg.get("api_key"):
            monitoring_log(f"No AI configured for Tier 2/3 — alerting user for trigger={trigger}")
            self.on_action("alert_user", trigger)
            return
        agent = AIDiagnosticAgent(cfg["provider_url"], cfg["model"], cfg["api_key"])
        # Share this watcher's store.  The agent otherwise works on its own
        # copy loaded from disk, and whichever copy is flushed last silently
        # overwrites the other's incidents and AI statistics.
        agent.incident_store = self.incident_store
        context = {
            "signals": [trigger],
            "proxy_alive": self.failures == 0,
            "log_tail": self._get_recent_log(),
        }
        with self._store_lock:
            result = agent.diagnose(context)
        if result:
            action = result.get("action", "alert_user")
            monitoring_log(f"Tier {result.get('tier', '?')}: action={action}")
            self.on_action(action, trigger)

    def _get_recent_log(self):
        """Return up to 30 lines: the last 20 of each proxy log, newest kept."""
        lines = []
        for log_name in ["cc-debug.log", "proxy.log"]:
            log_path = PROXY_CONFIG_DIR / log_name
            try:
                text = log_path.read_text()
                lines.extend(text.splitlines()[-20:])
            except Exception:
                # Missing or unreadable log: simply contributes nothing.
                pass
        return "\n".join(lines[-30:])
class _LogAnalyzerThread(threading.Thread):
    """Daemon thread that tails the proxy logs for known failure signatures.

    Each new log line is tested against the regexes in ``_FAILURE_SIGNALS``
    (first match wins) and reported as
    ``on_signal(fault_id, category, stripped_line)``.  Set ``running`` to
    False to make the loop exit.

    NOTE: truncation or rotation of a log that is already open is not detected.
    """

    _LOG_NAMES = ("cc-debug.log", "proxy.log")

    def __init__(self, on_signal):
        super().__init__(daemon=True)
        self.on_signal = on_signal
        self.running = False

    def run(self):
        """Tail the logs until ``running`` is cleared, then close the files."""
        self.running = True
        patterns = self._compile_patterns()
        log_paths = [str(PROXY_CONFIG_DIR / name) for name in self._LOG_NAMES]
        handles = {}
        # Logs that already exist are tailed from their current end so that
        # old history is not replayed as fresh signals.
        self._open_missing(log_paths, handles, seek_end=True)
        try:
            while self.running:
                if self._poll_once(handles, patterns):
                    continue
                time.sleep(0.5)
                # A log that did not exist at start-up (the proxy may start
                # after the watcher) is picked up once it appears and read
                # from the beginning: everything in it is new.
                if len(handles) < len(log_paths):
                    self._open_missing(log_paths, handles, seek_end=False)
        finally:
            for fh in handles.values():
                with contextlib.suppress(Exception):
                    fh.close()

    @staticmethod
    def _compile_patterns():
        """Compile ``_FAILURE_SIGNALS`` once; invalid regexes are skipped."""
        compiled = []
        for pattern, (fault_id, category) in _FAILURE_SIGNALS.items():
            try:
                compiled.append((re.compile(pattern), fault_id, category))
            except re.error:
                continue
        return compiled

    @staticmethod
    def _open_missing(log_paths, handles, seek_end):
        """Open every path not yet in ``handles``; unreadable paths are skipped."""
        for path in log_paths:
            if path in handles:
                continue
            try:
                # errors="replace": one undecodable byte must not stall the tail.
                fh = open(path, "r", errors="replace")
            except OSError:
                continue
            try:
                if seek_end:
                    fh.seek(0, 2)
            except Exception:
                fh.close()
                continue
            handles[path] = fh

    def _poll_once(self, handles, patterns):
        """Read at most one line from each open log and dispatch matches.

        Args:
            handles: mapping of path -> open text file object.
            patterns: list of ``(compiled_regex, fault_id, category)``.

        Returns:
            True when at least one file produced a line.
        """
        activity = False
        for fh in handles.values():
            try:
                line = fh.readline()
            except Exception:
                continue
            if not line:
                continue
            activity = True
            for regex, fault_id, category in patterns:
                if not regex.search(line):
                    continue
                try:
                    self.on_signal(fault_id, category, line.strip())
                except Exception as exc:
                    # A failing handler must not stop the tail, but it should
                    # leave a trace instead of vanishing silently.
                    monitoring_log(f"Log signal handler failed for {category}: {exc}")
                break
        return activity
# ═══════════════════════════════════════════════════════════════════════
# Usage stats
# ═══════════════════════════════════════════════════════════════════════
def load_usage_stats():
    """Load the usage statistics from ``_USAGE_STATS_FILE``.

    Returns:
        The stored dict, or ``{"providers": {}, "updated": None}`` when the
        file is missing, unreadable, malformed, or does not hold a JSON object.
    """
    try:
        if _USAGE_STATS_FILE.exists():
            data = json.loads(_USAGE_STATS_FILE.read_text())
            # Valid JSON of another type (list, string, ...) is treated like a
            # corrupt file so callers can rely on getting a dict.
            if isinstance(data, dict):
                return data
    except Exception:
        pass
    return {"providers": {}, "updated": None}
# ═══════════════════════════════════════════════════════════════════════
# Default endpoints creation
# ═══════════════════════════════════════════════════════════════════════
def create_default_endpoints():
    """Write the starter endpoint list when ``ENDPOINTS_FILE`` does not exist.

    The starter list holds a native OpenAI entry (marked as the default) and
    an OpenAI-compatible Z.AI entry, both with empty API keys.  An existing
    file is left untouched.
    """
    if ENDPOINTS_FILE.exists():
        return

    openai_entry = {
        "name": "OpenAI",
        "backend_type": "native",
        "base_url": "https://api.openai.com/v1",
        "api_key": "",
        "default_model": "gpt-4o",
        "models": ["gpt-4o", "gpt-4o-mini"],
        "provider_preset": "OpenAI",
    }
    zai_entry = {
        "name": "Z.AI",
        "backend_type": "openai-compat",
        "base_url": "https://api.z.ai/api/coding/paas/v4",
        "api_key": "",
        "default_model": "glm-5.1",
        "models": ["glm-4.5", "glm-4.5-air", "glm-4.6", "glm-4.7", "glm-5", "glm-5-turbo", "glm-5.1"],
        "provider_preset": "Custom",
    }
    save_endpoints({
        "default": "OpenAI",
        "endpoints": [openai_entry, zai_entry],
    })
def ensure_dirs():
for d in [LOG_DIR, PROXY_CONFIG_DIR]:
d.mkdir(parents=True, exist_ok=True)