feat: add 9 provider presets, latency indicator, fix _start_proxy bug, fix sandbox/approval flags, update Antigravity models

This commit is contained in:
Roman | RyzenAdvanced
2026-05-29 09:47:04 +04:00
Unverified
parent 745d3f9eb1
commit 64049f5c94
2 changed files with 488 additions and 41 deletions

View File

@@ -444,18 +444,20 @@ PROVIDER_PRESETS = {
"gemini-2.5-flash", "gemini-2.5-pro",
],
},
"Google Antigravity (OAuth)": {
"backend_type": "gemini-oauth-antigravity",
"base_url": "https://cloudcode-pa.googleapis.com",
"oauth_provider": "google-antigravity",
"models": [
"Gemini 3.5 Flash (High)", "Gemini 3.5 Flash (Medium)", "Gemini 3.5 Flash (Low)",
"Gemini 3.1 Pro (High)", "Gemini 3.1 Pro (Low)",
"Claude Sonnet 4.6 (Thinking)",
"Claude Opus 4.6 (Thinking)",
"GPT-OSS 120B (Medium)",
],
},
"Google Antigravity (OAuth)": {
"backend_type": "gemini-oauth-antigravity",
"base_url": "https://cloudcode-pa.googleapis.com",
"oauth_provider": "google-antigravity",
"models": [
"antigravity-gemini-3-flash",
"antigravity-gemini-3-pro",
"antigravity-gemini-3.1-pro",
"antigravity-claude-sonnet-4-6",
"antigravity-claude-opus-4-6-thinking",
"gemini-2.5-flash", "gemini-2.5-pro",
"gemini-3-flash-preview", "gemini-3-pro-preview", "gemini-3.1-pro-preview",
],
},
"OpenAdapter": {
"backend_type": "openai-compat",
"base_url": "https://api.openadapter.in/v1",
@@ -503,6 +505,46 @@ PROVIDER_PRESETS = {
"moonshotai/kimi-k2.6", "minimax/minimax-m2.7",
],
},
"Perplexity": {
"backend_type": "openai-compat",
"base_url": "https://api.perplexity.ai",
"models": ["sonar", "sonar-pro", "sonar-reasoning-pro", "sonar-deep-research"],
},
"Cohere": {
"backend_type": "openai-compat",
"base_url": "https://api.cohere.ai/compatibility/v1",
"models": [],
},
"Hugging Face": {
"backend_type": "openai-compat",
"base_url": "https://router.huggingface.co/v1",
"models": [],
},
"Together AI": {
"backend_type": "openai-compat",
"base_url": "https://api.together.xyz/v1",
"models": [],
},
"Groq": {
"backend_type": "openai-compat",
"base_url": "https://api.groq.com/openai/v1",
"models": [],
},
"Fireworks AI": {
"backend_type": "openai-compat",
"base_url": "https://api.fireworks.ai/inference/v1",
"models": [],
},
"LM Studio (local)": {
"backend_type": "openai-compat",
"base_url": "http://127.0.0.1:1234/v1",
"models": [],
},
"vLLM / OpenAI-Compatible (self-hosted)": {
"backend_type": "openai-compat",
"base_url": "http://localhost:8000/v1",
"models": [],
},
}
def safe_name(name):
@@ -519,6 +561,7 @@ def label_for_backend(backend_type):
"anthropic": "Anthropic",
"command-code": "Command Code",
"codebuff": "Codebuff (Free AI)",
"freebuff": "Freebuff (Free AI)",
"native": "Native",
}.get(backend_type, backend_type)
@@ -569,6 +612,27 @@ def apply_provider_preset(endpoint, preset_name):
updated["default_model"] = updated["models"][0]
return updated
def _check_provider_latency(endpoint, timeout=5):
import urllib.request
base_url = (endpoint.get("base_url") or "").rstrip("/")
api_key = endpoint.get("api_key", "")
bt = endpoint.get("backend_type", "openai-compat")
if not base_url or not api_key:
return None
url = base_url + "/models" if not base_url.endswith("/models") else base_url
headers = {"Authorization": f"Bearer {api_key}"}
if bt == "anthropic":
headers = {"x-api-key": api_key, "anthropic-version": "2023-06-01"}
url = base_url.replace("/v1", "") + "/v1/models" if "/v1" not in base_url else base_url + "/models"
try:
req = urllib.request.Request(url, headers=headers)
t0 = time.time()
with urllib.request.urlopen(req, timeout=timeout) as resp:
resp.read(1)
return time.time() - t0
except Exception:
return None
def _doctor_check_streaming(base_url, key, bt, model, add):
if bt == "anthropic":
test_url = f"{base_url}/v1/messages"
@@ -2086,6 +2150,10 @@ class LauncherWin(Gtk.Window):
self._combo.connect("changed", lambda c: self._on_endpoint_changed())
sel_box.pack_start(self._combo, True, True, 0)
self._latency_label = Gtk.Label(label=" -- ")
self._latency_label.set_opacity(0.7)
sel_box.pack_start(self._latency_label, False, False, 4)
# model selector
sel_box.pack_start(Gtk.Label(label="Model:"), False, False, 0)
self._model_combo = Gtk.ComboBoxText()
@@ -2288,10 +2356,11 @@ class LauncherWin(Gtk.Window):
def _on_endpoint_changed(self):
name = self._combo.get_active_text()
is_bgp = name and name.startswith("🔀 ")
is_bgp = name and name.startswith("\U0001f500 ")
bgp_name = name[2:] if is_bgp else None
ep = get_endpoint(name) if name and not is_bgp else None
self._model_combo.remove_all()
self._check_latency(name)
if is_bgp:
pool = None
for p in load_bgp_pools().get("pools", []):
@@ -2320,6 +2389,34 @@ class LauncherWin(Gtk.Window):
elif models:
self._model_combo.set_active(0)
def _check_latency(self, ep_name):
self._latency_label.set_text("...")
is_bgp = ep_name and ep_name.startswith("\U0001f500 ")
if is_bgp or not ep_name:
self._latency_label.set_text(" -- ")
return
ep = get_endpoint(ep_name)
if not ep:
self._latency_label.set_text(" -- ")
return
def _run():
lat = _check_provider_latency(ep)
def _update():
if lat is None:
self._latency_label.set_text(" -- ")
self._latency_label.set_opacity(0.5)
elif lat < 1.0:
self._latency_label.set_text(f" {lat:.2f}s")
self._latency_label.set_markup(f'<span foreground="#2ea043">{lat:.2f}s</span>')
elif lat < 3.0:
self._latency_label.set_markup(f'<span foreground="#d29922">{lat:.2f}s</span>')
else:
self._latency_label.set_markup(f'<span foreground="#e74c3c">{lat:.2f}s</span>')
GLib.idle_add(_update)
threading.Thread(target=_run, daemon=True).start()
# ── endpoint mgr ─────────────────────────────────────────────
def _open_mgr(self):
@@ -2406,7 +2503,7 @@ class LauncherWin(Gtk.Window):
return
for ep in load_endpoints().get("endpoints", []):
if ep.get("name") == ep_name:
self._start_proxy(ep)
_start_proxy_for(ep, self.log)
break
except Exception as e:
self.log(f"[AI Monitor] Proxy restart failed: {e}")
@@ -2421,7 +2518,7 @@ class LauncherWin(Gtk.Window):
return
for ep in load_endpoints().get("endpoints", []):
if ep.get("name") == ep_name:
self._start_proxy(ep)
_start_proxy_for(ep, self.log)
self.log("Proxy restarted")
break
except Exception as e:

View File

@@ -242,6 +242,7 @@ def load_config():
p = argparse.ArgumentParser(description="Responses API translation proxy")
p.add_argument("--config", help="JSON config file path")
p.add_argument("--port", type=int, default=None)
p.add_argument("--host", default=None, help="Bind address (default: 127.0.0.1, use 0.0.0.0 for Docker)")
p.add_argument("--backend", default=None, choices=["openai-compat", "anthropic", "command-code", "codebuff", "freebuff", "auto"])
p.add_argument("--target-url", default=None)
p.add_argument("--api-key", default=None)
@@ -308,6 +309,8 @@ REASONING_ENABLED = True
REASONING_EFFORT = "medium"
FORCE_MODEL = ""
BGP_ROUTES = []
CAVEMAN_MODE = False
RTK_COMPRESSION = False
PROMPT_ENHANCER = False
PROMPT_ENHANCER_MODE = "offline"
PROMPT_ENHANCER_MODEL = ""
@@ -374,6 +377,54 @@ _antigravity_version = "2.0.1"
_antigravity_version_checked = 0
_antigravity_version_lock = threading.Lock()
_antigravity_version_validated = False
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.state = "CLOSED" # CLOSED, OPEN, HALF_OPEN
self.last_state_change = time.time()
self.lock = threading.Lock()
def can_execute(self):
with self.lock:
now = time.time()
if self.state == "OPEN":
if now - self.last_state_change > self.recovery_timeout:
self.state = "HALF_OPEN"
self.last_state_change = now
print(f"[circuit-breaker] Circuit transitioning from OPEN to HALF_OPEN", file=sys.stderr)
return True
return False
return True
def record_success(self):
with self.lock:
if self.state == "HALF_OPEN":
print(f"[circuit-breaker] Circuit transitioning from HALF_OPEN to CLOSED (success)", file=sys.stderr)
self.failure_count = 0
self.state = "CLOSED"
self.last_state_change = time.time()
def record_failure(self):
with self.lock:
self.failure_count += 1
now = time.time()
if self.state in ("CLOSED", "HALF_OPEN"):
if self.failure_count >= self.failure_threshold or self.state == "HALF_OPEN":
self.state = "OPEN"
self.last_state_change = now
print(f"[circuit-breaker] Circuit tripped! Transitioning to OPEN. Failure count: {self.failure_count}", file=sys.stderr)
_circuit_breakers = {}
_circuit_breakers_lock = threading.Lock()
def get_circuit_breaker(backend_name):
with _circuit_breakers_lock:
if backend_name not in _circuit_breakers:
_circuit_breakers[backend_name] = CircuitBreaker()
return _circuit_breakers[backend_name]
_last_user_urls = collections.deque(maxlen=20)
_conn_pool_lock = threading.Lock()
@@ -721,7 +772,7 @@ class GoogleAccountPool(AccountPool):
self.variant = variant
def _do_load(self):
cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy")
cache_dir = _LOG_DIR
accounts = []
primary = f"google-{self.variant}-oauth-token.json"
primary_path = os.path.join(cache_dir, primary)
@@ -865,7 +916,7 @@ def _refresh_google_token(token_data, token_path):
return token_data.get("access_token", "")
def _force_refresh_google_token():
token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy",
token_path = os.path.join(_LOG_DIR,
"google-antigravity-oauth-token.json" if OAUTH_PROVIDER == "google-antigravity"
else "google-oauth-token.json")
try:
@@ -960,7 +1011,7 @@ def _validate_antigravity_version(version, access_token=None, project_id=None):
if not access_token:
access_token = _refresh_oauth_token()
if not project_id:
token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "google-antigravity-oauth-token.json")
token_path = os.path.join(_LOG_DIR, "google-antigravity-oauth-token.json")
try:
with open(token_path) as f:
project_id = json.load(f).get("project_id", "")
@@ -1014,7 +1065,7 @@ def _validate_antigravity_version(version, access_token=None, project_id=None):
return True
def _fetch_antigravity_version():
cache_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "antigravity-version.json")
cache_path = os.path.join(_LOG_DIR, "antigravity-version.json")
try:
with open(cache_path) as f:
cached = json.load(f)
@@ -1027,7 +1078,7 @@ def _fetch_antigravity_version():
project_id = None
try:
access_token = _refresh_oauth_token()
token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "google-antigravity-oauth-token.json")
token_path = os.path.join(_LOG_DIR, "google-antigravity-oauth-token.json")
with open(token_path) as f:
project_id = json.load(f).get("project_id", "")
except Exception:
@@ -1128,6 +1179,7 @@ def _init_runtime():
global MODELS, CC_VERSION, REASONING_ENABLED, REASONING_EFFORT, BGP_ROUTES
global _api_key_pool, PROMPT_ENHANCER
global VISION_FALLBACK_URL, VISION_FALLBACK_MODEL, VISION_FALLBACK_KEY
global CAVEMAN_MODE, RTK_COMPRESSION
CONFIG = load_config()
PORT = CONFIG["port"]
@@ -1161,6 +1213,8 @@ def _init_runtime():
if not VISION_FALLBACK_KEY:
VISION_FALLBACK_KEY = _vision_key
BGP_ROUTES = CONFIG.get("bgp_routes", [])
CAVEMAN_MODE = CONFIG.get("caveman_mode", False) or os.environ.get("CAVEMAN_MODE") == "1"
RTK_COMPRESSION = CONFIG.get("rtk_compression", False) or os.environ.get("RTK_COMPRESSION") == "1"
_api_key_pool = None
if API_KEY and "," in API_KEY and not OAUTH_PROVIDER.startswith("google") and BACKEND not in ("codebuff", "freebuff"):
_api_key_pool = APIKeyPool(BACKEND, API_KEY)
@@ -1243,7 +1297,7 @@ def _hot_reload_api_key():
if (BACKEND or "").startswith("gemini-oauth") and (OAUTH_PROVIDER or "").startswith("google"):
token_name = "google-antigravity-oauth-token.json" if OAUTH_PROVIDER == "google-antigravity" else "google-cli-oauth-token.json"
token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", token_name)
token_path = os.path.join(_LOG_DIR, token_name)
_preemptive_refresh_token(token_path)
try:
with open(token_path) as _tf:
@@ -1395,7 +1449,7 @@ def _refresh_oauth_token_for(api_key, oauth_provider):
if not oauth_provider.startswith("google"):
return api_key
token_name = "google-antigravity-oauth-token.json" if oauth_provider == "google-antigravity" else "google-cli-oauth-token.json"
token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", token_name)
token_path = os.path.join(_LOG_DIR, token_name)
if not os.path.exists(token_path):
return api_key
try:
@@ -1764,7 +1818,7 @@ _CROF_ADAPTIVE = {
_model_max_tokens = {}
_model_max_tokens_lock = threading.Lock()
def _estimate_tokens(item):
def _estimate_item_tokens(item):
if not isinstance(item, dict):
return 4
t = item.get("type", "")
@@ -1794,7 +1848,7 @@ def _estimate_tokens(item):
def _estimate_input_tokens(input_data):
if not isinstance(input_data, list):
return 0
return sum(_estimate_tokens(i) for i in input_data)
return sum(_estimate_item_tokens(i) for i in input_data)
def _get_model_max_tokens(model):
with _model_max_tokens_lock:
@@ -1873,6 +1927,8 @@ def _sorted_bgp_routes():
return sorted(BGP_ROUTES, key=lambda r: _score_route(r, stats))
def _crof_record(model, n_items, success):
if "crof.ai" not in TARGET_URL:
return
if not isinstance(n_items, int) or n_items < 1:
return
entry = {"model": model, "items": n_items, "ok": success}
@@ -2010,9 +2066,89 @@ def _extract_files(items):
pass
return files
def _rtk_compress_tool_output(output_str):
if not isinstance(output_str, str) or not output_str:
return output_str
lines = output_str.splitlines()
compressed_lines = []
# Heuristic 1: Compress git diffs
is_diff = output_str.startswith("diff --git") or "@@ -" in output_str
if is_diff:
unchanged_count = 0
held_lines = []
for line in lines:
if line.startswith("+") or line.startswith("-") or line.startswith("@@") or line.startswith("diff --git") or line.startswith("---") or line.startswith("+++"):
if unchanged_count > 2:
compressed_lines.append(f"... [omitted {unchanged_count} unchanged lines] ...")
else:
compressed_lines.extend(held_lines)
held_lines = []
unchanged_count = 0
compressed_lines.append(line)
else:
held_lines.append(line)
unchanged_count += 1
if held_lines:
if unchanged_count > 2:
compressed_lines.append(f"... [omitted {unchanged_count} unchanged lines] ...")
else:
compressed_lines.extend(held_lines)
return "\n".join(compressed_lines)
# Heuristic 2: Compress directory listings / file trees
is_tree_or_list = any("node_modules" in line or ".git/" in line or "dist/" in line for line in lines[:50])
if is_tree_or_list:
ignored_patterns = re.compile(r'(\.git/|node_modules/|__pycache__/|\.venv/|\.pyc$|\.o$|dist/|build/|\.next/)')
omitted = 0
for line in lines:
if ignored_patterns.search(line):
omitted += 1
else:
compressed_lines.append(line)
if omitted > 0:
compressed_lines.append(f"... [omitted {omitted} dependency/build files from tree] ...")
return "\n".join(compressed_lines)
# Heuristic 3: Compress long logs or multiple repeating lines
last_line = None
repeat_count = 0
for line in lines:
if line == last_line and line.strip():
repeat_count += 1
if repeat_count <= 1:
compressed_lines.append(line)
else:
if repeat_count > 1:
compressed_lines.append(f"... [repeated {repeat_count - 1} times] ...")
repeat_count = 0
compressed_lines.append(line)
last_line = line
if repeat_count > 1:
compressed_lines.append(f"... [repeated {repeat_count - 1} times] ...")
return "\n".join(compressed_lines)
def _compact_input(input_data):
if isinstance(input_data, str):
return input_data
if RTK_COMPRESSION and isinstance(input_data, list):
compressed_data = []
for item in input_data:
if isinstance(item, dict) and item.get("type") == "function_call_output":
o = item.get("output", "")
if isinstance(o, str) and o:
compressed = _rtk_compress_tool_output(o)
if len(compressed) < len(o):
print(f"[rtk] compressed tool output length: {len(o)} -> {len(compressed)} (-{((len(o)-len(compressed))/len(o))*100:.1f}%)", file=sys.stderr)
item = dict(item)
item["output"] = compressed
compressed_data.append(item)
input_data = compressed_data
if not isinstance(input_data, list) or len(input_data) <= _MAX_INPUT_ITEMS:
out = []
for item in input_data:
@@ -2129,11 +2265,21 @@ _PROVIDER_POLICIES = {
"tool_output_limit": 8000, "max_input_items": 200},
"googleapis": {"compaction": "conservative", "context_size": 1000000,
"tool_output_limit": 8000, "max_input_items": 250},
"groq": {"reasoning_mode": "off", "max_tokens": 32768, "strip_reasoning": True,
"tool_output_limit": 4000, "max_input_items": 20, "compaction": "aggressive",
"prompt_caching": "none"},
"cerebras": {"reasoning_mode": "off", "max_tokens": 32768, "strip_reasoning": True,
"tool_output_limit": 4000, "max_input_items": 20, "compaction": "aggressive",
"prompt_caching": "none"},
"xiaomimimo": {"reasoning_mode": "provider_default", "max_tokens": 65536, "strip_reasoning": True,
"tool_output_limit": 8000, "max_input_items": 40, "compaction": "balanced",
"prompt_caching": "none"},
}
_DEFAULT_PROVIDER_POLICY = {
"compaction": "balanced", "context_size": 128000,
"tool_output_limit": 6000, "max_input_items": 60,
"prompt_caching": "auto",
}
def provider_policy(target_url=None, backend=None):
@@ -2923,12 +3069,14 @@ def oa_stream_to_sse(chat_stream, model, req_id, _reasoning_out=None):
tc_buf = {}
fr = None
msg_opened = False
_last_chunk = {}
yield emit("response.created", {"type": "response.created",
"response": {"id": resp_id, "object": "response", "model": model,
"status": "in_progress", "created": int(time.time()), "output": []}})
yield emit("response.in_progress", {"type": "response.in_progress", "response": {"id": resp_id}})
_last_stream_usage = {}
for line in _stream_with_idle_timeout(chat_stream):
line = line.decode("utf-8", errors="replace").strip()
if not line or line.startswith(":") or line == "data: [DONE]":
@@ -2942,6 +3090,10 @@ def oa_stream_to_sse(chat_stream, model, req_id, _reasoning_out=None):
choices = chunk.get("choices", [])
if not choices:
continue
usage = chunk.get("usage")
if usage:
_last_stream_usage = usage
fr = choices[0].get("finish_reason")
delta = choices[0].get("delta", {})
fr = choices[0].get("finish_reason")
@@ -3130,6 +3282,8 @@ def an_stream_to_sse(stream, model, req_id):
tc_args = ""
block_type = None
stop_reason = "end_turn"
an_input_tokens = 0
an_output_tokens = 0
yield emit("response.created", {"type": "response.created",
"response": {"id": resp_id, "object": "response", "model": model,
@@ -3153,7 +3307,9 @@ def an_stream_to_sse(stream, model, req_id):
et = data.get("type", "")
if et == "message_start":
pass
msg_usage = data.get("message", {}).get("usage", {})
if msg_usage:
an_input_tokens = msg_usage.get("input_tokens", 0)
elif et == "content_block_start":
cb_type = data.get("content_block", {}).get("type", "")
@@ -3219,14 +3375,24 @@ def an_stream_to_sse(stream, model, req_id):
elif et == "message_delta":
stop_reason = data.get("delta", {}).get("stop_reason", "end_turn")
delta_usage = data.get("usage", {})
if delta_usage:
an_output_tokens = delta_usage.get("output_tokens", 0)
elif et == "message_stop":
sm = {"end_turn": "completed", "max_tokens": "incomplete",
"stop_sequence": "completed", "tool_use": "completed"}
status = sm.get(stop_reason, "incomplete")
_final_usage = {
"input_tokens": an_input_tokens,
"output_tokens": an_output_tokens,
"total_tokens": an_input_tokens + an_output_tokens
}
yield emit("response.completed", {"type": "response.completed",
"response": {"id": resp_id, "object": "response", "model": model,
"status": status, "created": int(time.time()), "output": completed}})
"status": status, "created": int(time.time()), "output": completed,
"usage": _final_usage}})
_DEFAULT_CC_CONFIG = {
"workingDir": tempfile.gettempdir(),
@@ -4086,7 +4252,7 @@ def cc_stream_to_sse(cc_stream, model, req_id):
total_usage = {}
_event_types_seen = set()
_debug_log_path = os.path.expanduser("~/.cache/codex-proxy/cc-debug.log")
_debug_log_path = os.path.join(_LOG_DIR, "cc-debug.log")
_debug_fh = open(_debug_log_path, "a") # [FIX 14] always write debug to FILE (not just stderr which may be piped)
_deflog = lambda *a, **kw: print(*a, file=_debug_fh, flush=True, **kw)
@@ -4924,6 +5090,80 @@ def _preprocess_vision_input(input_data, schema):
_MAX_REQLOG_LINES = 2000
def _log_cache_stats(raw_resp):
try:
usage = raw_resp.get("usage", {})
cached = usage.get("prompt_tokens_details", {}).get("cached_tokens", 0)
total = usage.get("prompt_tokens", 0) or usage.get("input_tokens", 0)
if cached and total:
pct = (cached / total * 100) if total else 0
print(f"[cache] stats: cached={cached}/{total} ({pct:.1f}%)", file=sys.stderr)
except Exception:
pass
def _extract_text_length(items):
if not items:
return 0
if isinstance(items, str):
return len(items)
if isinstance(items, dict):
items = [items]
if not isinstance(items, list):
return 0
total_len = 0
for item in items:
if not isinstance(item, dict):
continue
itype = item.get("type")
if itype == "message":
content = item.get("content", "")
if isinstance(content, str):
total_len += len(content)
elif isinstance(content, list):
for part in content:
if isinstance(part, dict):
total_len += len(part.get("text", part.get("input_text", "")))
elif itype == "function_call":
total_len += len(item.get("name", ""))
args = item.get("arguments", "")
if isinstance(args, str):
total_len += len(args)
elif isinstance(args, dict):
total_len += len(json.dumps(args))
elif itype == "function_call_output":
out = item.get("output", "")
if isinstance(out, str):
total_len += len(out)
elif isinstance(out, list):
for part in out:
if isinstance(part, dict):
total_len += len(part.get("text", ""))
return total_len
def _estimate_tokens_from_items(items):
return _extract_text_length(items) // 4
def _record_usage_with_tokens(provider, model, success, duration_s, raw_resp, input_items=None, output_items=None, error_type=None, **kwargs):
try:
usage = raw_resp.get("usage", {}) if isinstance(raw_resp, dict) else {}
if not usage and isinstance(raw_resp, dict) and ("input_tokens" in raw_resp or "prompt_tokens" in raw_resp):
usage = raw_resp
tokens_in = int(usage.get("prompt_tokens", 0) or usage.get("input_tokens", 0) or 0)
tokens_out = int(usage.get("completion_tokens", 0) or usage.get("output_tokens", 0) or 0)
# Fallback estimation heuristic
if tokens_in == 0 and input_items:
tokens_in = _estimate_tokens_from_items(input_items)
if tokens_out == 0 and output_items:
tokens_out = _estimate_tokens_from_items(output_items)
_record_usage(provider, model, success, duration_s, tokens_in, tokens_out, error_type)
except Exception as e:
print(f"[usage] error recording usage: {e}", file=sys.stderr)
_record_usage(provider, model, success, duration_s, error_type=error_type)
def _log_resp(resp_id, status, output):
try:
import datetime as _dt
@@ -5341,6 +5581,7 @@ def _antigravity_normalize_context(input_data, model=""):
class Handler(http.server.BaseHTTPRequestHandler):
protocol_version = "HTTP/1.1"
_request_failed = False
def do_GET(self):
if self.path in ("/v1/models", "/models"):
@@ -5431,9 +5672,24 @@ class Handler(http.server.BaseHTTPRequestHandler):
except Exception as e:
return self.send_json(400, {"error": {"message": f"Bad request: {e}"}})
if CAVEMAN_MODE and isinstance(body, dict):
instructions = body.get("instructions", "").strip()
caveman_prompt = "You are in Caveman Mode. Answer concisely, using technical terms. Strip away all pleasantries, greetings, introductory filler, and concluding remarks. Go straight to the answer/code."
if instructions:
instructions = f"{instructions}\n\n{caveman_prompt}"
else:
instructions = caveman_prompt
body["instructions"] = instructions
self._session_id = uuid.uuid4().hex[:8]
_sid = self._session_id
# Check Circuit Breaker
cb = get_circuit_breaker(BACKEND)
if not cb.can_execute():
print(f"[{_sid}] Circuit breaker is OPEN for backend {BACKEND}. Rejecting request.", file=sys.stderr)
return self.send_json(503, {"error": {"type": "circuit_breaker_open", "message": f"Circuit breaker is open for backend {BACKEND}"}})
import datetime as _dt
_log_path = os.path.join(_LOG_DIR, "requests.log")
_ts = _dt.datetime.now().isoformat()
@@ -5493,6 +5749,9 @@ class Handler(http.server.BaseHTTPRequestHandler):
wait_ms = (time.monotonic() - wait_start) * 1000
if wait_ms > 100:
print(f"[{_sid}] waited {wait_ms:.0f}ms for upstream slot (concurrency gate)", file=sys.stderr)
self._last_status = 200
self._request_failed = False
cb = get_circuit_breaker(BACKEND)
try:
with RequestTracker(request_id) as tracker:
if BACKEND == "auto":
@@ -5510,8 +5769,19 @@ class Handler(http.server.BaseHTTPRequestHandler):
self._handle_gemini_oauth(body, model, stream, tracker)
else:
self._handle_openai_compat(body, model, stream, tracker)
# Record success or failure
is_4xx = (400 <= getattr(self, "_last_status", 200) < 500)
if self._request_failed and not is_4xx:
cb.record_failure()
else:
cb.record_success()
update_snapshot_response(request_id, "completed", time.time() - _req_t0)
except Exception as _snap_err:
is_4xx = (400 <= getattr(self, "_last_status", 200) < 500)
if not is_4xx:
cb.record_failure()
update_snapshot_response(request_id, "error", time.time() - _req_t0, _snap_err)
raise
finally:
@@ -5681,6 +5951,12 @@ class Handler(http.server.BaseHTTPRequestHandler):
def _is_mimo_provider():
return "xiaomimimo.com" in TARGET_URL
@staticmethod
def _make_cache_key(model, instructions):
import hashlib
raw = f"{model}|{instructions}"
return hashlib.sha256(raw.encode()).hexdigest()[:16]
def _build_chat_body(self, model, messages, body, stream):
chat_body = {"model": model, "messages": messages}
is_mimo = self._is_mimo_provider()
@@ -5698,6 +5974,12 @@ class Handler(http.server.BaseHTTPRequestHandler):
if body.get("tool_choice"):
chat_body["tool_choice"] = body["tool_choice"]
chat_body["stream"] = stream
_pc_policy = provider_policy().get("prompt_caching", "auto")
if _pc_policy != "none" and not is_mimo:
_instr = body.get("instructions", "").strip()
if _instr:
chat_body["prompt_cache_key"] = self._make_cache_key(model, _instr)
chat_body["prompt_cache_retention"] = "24h"
if is_mimo:
if not REASONING_ENABLED or REASONING_EFFORT == "none":
chat_body["thinking"] = {"type": "disabled"}
@@ -5749,7 +6031,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
body["input"] = input_data
access_token = _refresh_oauth_token()
token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "google-antigravity-oauth-token.json")
token_path = os.path.join(_LOG_DIR, "google-antigravity-oauth-token.json")
project_id = ""
try:
with open(token_path) as f:
@@ -6411,6 +6693,8 @@ class Handler(http.server.BaseHTTPRequestHandler):
self.send_json(200, resp)
def _handle_gemini_oauth(self, body, model, stream, tracker=None):
ag_state = {}
_mp_oa = {"max_tool_calls": 150, "warn_tool_calls": 100}
input_data = body.get("input", "")
policy = provider_policy()
original_model = model
@@ -6505,7 +6789,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
access_token = _refresh_oauth_token()
token_name = "google-antigravity-oauth-token.json" if OAUTH_PROVIDER == "google-antigravity" else "google-cli-oauth-token.json"
token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", token_name)
token_path = os.path.join(_LOG_DIR, token_name)
project_id = ""
try:
with open(token_path) as f:
@@ -7083,6 +7367,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
buf = ""
stream_finished = False
last_finish = ""
_last_stream_usage = {}
try:
for raw_line in _stream_with_idle_timeout(upstream, _idle_timeout_for_model(model)):
if tracker and tracker.cancelled.is_set():
@@ -7097,6 +7382,13 @@ class Handler(http.server.BaseHTTPRequestHandler):
if not line.strip() and buf:
try:
chunk = json.loads(buf)
usage = chunk.get("response", chunk).get("usageMetadata", {})
if usage:
_last_stream_usage = {
"prompt_tokens": usage.get("promptTokenCount", 0),
"completion_tokens": usage.get("candidatesTokenCount", 0),
"total_tokens": usage.get("totalTokenCount", 0)
}
except Exception:
buf = ""
continue
@@ -7165,6 +7457,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
buf += line
except TimeoutError as te:
print(f"[{self._session_id}] [antigravity-v2] STREAM TIMEOUT: {te}", file=sys.stderr)
self._request_failed = True
_log_resp(resp_id, "stream_timeout", [{"type": "error", "code": "stream_timeout", "message": str(te)}])
try:
flush_event("response.failed", {"type": "response.failed", "response": {"id": resp_id, "object": "response", "status": "failed", "error": {"type": "stream_timeout", "message": str(te)[:200]}}})
@@ -7176,6 +7469,11 @@ class Handler(http.server.BaseHTTPRequestHandler):
print(f"[{self._session_id}] [antigravity-v2] client disconnected during stream", file=sys.stderr)
_log_resp(resp_id, "client_disconnect", [])
return
except Exception as e:
print(f"[{self._session_id}] [antigravity-v2] stream error: {e}", file=sys.stderr)
self._request_failed = True
self.close_connection = True
return
if OAUTH_PROVIDER.startswith("google") and full_text and not current_tool_calls and last_finish == "MAX_TOKENS" and not stream_finished:
result = _auto_continue_gemini(self, flush_event, message_id, model, gen_config, gemini_tools, system_parts, project_id, headers, endpoints, url_suffix, full_text, output_items, message_started)
@@ -7205,6 +7503,10 @@ class Handler(http.server.BaseHTTPRequestHandler):
flush_event("response.completed", {"type": "response.completed", "response": final_resp})
self.close_connection = True
provider = TARGET_URL.split("//")[-1].split("/")[0]
success = not self._request_failed
_record_usage_with_tokens(provider, model, success, time.time() - created, _last_stream_usage, input_items=input_data, output_items=out)
with _response_store_lock:
_response_store[resp_id] = final_resp
while len(_response_store) > _MAX_STORED:
@@ -7239,6 +7541,17 @@ class Handler(http.server.BaseHTTPRequestHandler):
_response_store.popitem(last=False)
self.send_json(200, resp)
provider = TARGET_URL.split("//")[-1].split("/")[0]
usage = data.get("response", data).get("usageMetadata", {})
_usage_dict = {}
if usage:
_usage_dict = {
"prompt_tokens": usage.get("promptTokenCount", 0),
"completion_tokens": usage.get("candidatesTokenCount", 0),
"total_tokens": usage.get("totalTokenCount", 0)
}
_record_usage_with_tokens(provider, model, True, time.time() - created, _usage_dict, input_items=input_data, output_items=out)
def _handle_bgp(self, body, model, stream, messages, input_data):
routes = _sorted_bgp_routes()
routes = [r for r in routes if _bucket_for_route(r).allow()]
@@ -7376,13 +7689,15 @@ class Handler(http.server.BaseHTTPRequestHandler):
collected_events.append(event)
_observe_event(event)
print(f"[{self._session_id}] stream ended: events={len(collected_events)} finish={finish_reason} has_content={has_content} has_message={has_message} has_tool_call={has_tool_call} elapsed={time.time()-t0:.1f}s", file=sys.stderr)
pass
except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError):
print("[translate-proxy] client disconnected during stream", file=sys.stderr)
_crof_record(model, n_items, False)
_log_resp(last_resp_id, "client_disconnect", last_output)
return
except (TimeoutError, OSError, urllib.error.URLError) as e:
print(f"[translate-proxy] upstream error during stream: {type(e).__name__}: {e}", file=sys.stderr)
print(f"[translate-proxy upstream error during stream: {type(e).__name__}: {e}", file=sys.stderr)
self._request_failed = True
err_resp_id = body.get("request_id") or body.get("id") or uid("resp")
try:
self.wfile.write(emit("response.failed", {"type": "response.failed",
@@ -7411,7 +7726,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
while len(_last_reasoning_store) > _MAX_STORED:
oldest = next(iter(_last_reasoning_store))
del _last_reasoning_store[oldest]
_record_usage(provider, model, success, time.time() - t0, error_type="length" if not success else None)
_record_usage_with_tokens(provider, model, success, time.time() - t0, _last_stream_usage, error_type="length" if not success else None, input_items=input_data, output_items=last_output)
# Auto-learn provider quirks before flushing the bad response to Codex.
if finish_reason == "length" and not has_content and has_function_call_output(input_data):
@@ -7647,15 +7962,17 @@ class Handler(http.server.BaseHTTPRequestHandler):
self.stream_buffered_events(collected_events)
else:
result = oa_resp_to_responses(json.loads(upstream.read()), model)
raw_resp = json.loads(upstream.read())
result = oa_resp_to_responses(raw_resp, model)
success = result.get("status") != "incomplete"
_log_cache_stats(raw_resp)
_crof_record(model, n_items, success)
_record_usage_with_tokens(provider, model, success, time.time() - t0, raw_resp, input_items=input_data, output_items=result.get("output", []))
self.send_json(200, result)
rid = result.get("id")
_log_resp(rid, result.get("status"), result.get("output", []))
if rid and input_data is not None:
store_response(rid, input_data, result.get("output", []))
_record_usage(provider, model, success, time.time() - t0)
def _forward_oa_compat_retry(self, req, model, chat_body, body, input_data, tracker=None):
try:
@@ -7692,7 +8009,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
last_resp_id = d.get("response", {}).get("id")
last_output = d.get("response", {}).get("output", [])
last_status = d.get("response", {}).get("status")
except: pass
except (json.JSONDecodeError, KeyError, TypeError): pass
return True
self.stream_buffered_events(oa_stream_to_sse(upstream, model, body.get("request_id") or body.get("id")), on_event=on_event)
except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError):
@@ -7834,11 +8151,14 @@ class Handler(http.server.BaseHTTPRequestHandler):
_save_schema(schema, model=model)
t0 = time.time()
provider = TARGET_URL.split("//")[-1].split("/")[0]
if stream:
self.send_response(200)
self.send_header("Content-Type", "text/event-stream")
self.send_header("Cache-Control", "no-cache")
self.send_header("Connection", "keep-alive")
self.send_header("Connection", "close")
self.end_headers()
if hasattr(self, 'connection') and self.connection:
try:
@@ -7847,8 +8167,9 @@ class Handler(http.server.BaseHTTPRequestHandler):
pass
last_resp_id = None
last_output = None
last_usage = {}
def on_event(event):
nonlocal last_resp_id, last_output
nonlocal last_resp_id, last_output, last_usage
if tracker and tracker.cancelled.is_set():
print("[command-code] stream cancelled", file=sys.stderr)
return False
@@ -7859,12 +8180,14 @@ class Handler(http.server.BaseHTTPRequestHandler):
if d.get("type") == "response.completed":
last_resp_id = d.get("response", {}).get("id")
last_output = d.get("response", {}).get("output", [])
except: pass
last_usage = d.get("response", {}).get("usage", {})
except (json.JSONDecodeError, KeyError, TypeError): pass
return True
try:
self.stream_buffered_events(cc_stream_to_sse(upstream, model, body.get("request_id") or body.get("id")), on_event=on_event)
except Exception as e:
print(f"[{self._session_id}] stream error: {e}", file=sys.stderr)
self._request_failed = True
try:
err_event = 'data: ' + json.dumps({"type": "response.completed",
"response": {"id": body.get("request_id") or body.get("id") or uid("resp"),
@@ -7876,6 +8199,9 @@ class Handler(http.server.BaseHTTPRequestHandler):
self.wfile.flush()
except Exception:
pass
success = not self._request_failed
_record_usage_with_tokens(provider, model, success, time.time() - t0, last_usage, input_items=body.get("input", ""), output_items=last_output)
if last_resp_id:
store_response(last_resp_id, body.get("input", ""), last_output)
else:
@@ -7883,6 +8209,8 @@ class Handler(http.server.BaseHTTPRequestHandler):
result = cc_resp_to_responses(raw, model)
self.send_json(200, result)
rid = result.get("id")
success = result.get("status") != "incomplete"
_record_usage_with_tokens(provider, model, success, time.time() - t0, result.get("usage", {}), input_items=body.get("input", ""), output_items=result.get("output", []))
if rid:
store_response(rid, body.get("input", ""), result.get("output", []))
@@ -8451,12 +8779,18 @@ class Handler(http.server.BaseHTTPRequestHandler):
return
def _forward(self, req, stream, model, nonstream_fn, stream_fn, input_data=None, tracker=None):
t0 = time.time()
provider = TARGET_URL.split("//")[-1].split("/")[0]
try:
upstream = urllib.request.urlopen(req, timeout=_upstream_timeout({}, stream))
except urllib.error.HTTPError as e:
err = e.read().decode()
self._request_failed = True
_record_usage(provider, model, False, time.time() - t0, error_type=f"HTTP_{e.code}", tokens_in=_estimate_tokens_from_items(input_data))
return self.send_json(e.code, {"error": {"type": "upstream_error", "message": err}})
except Exception as e:
self._request_failed = True
_record_usage(provider, model, False, time.time() - t0, error_type=type(e).__name__, tokens_in=_estimate_tokens_from_items(input_data))
return self.send_json(500, {"error": {"type": "proxy_error", "message": str(e)}})
if stream:
@@ -8473,9 +8807,10 @@ class Handler(http.server.BaseHTTPRequestHandler):
last_resp_id = None
last_output = None
last_status = None
_last_stream_usage = {}
try:
def on_event(event):
nonlocal last_resp_id, last_output, last_status
nonlocal last_resp_id, last_output, last_status, _last_stream_usage
if tracker and tracker.cancelled.is_set():
print("[translate-proxy] stream cancelled", file=sys.stderr)
return False
@@ -8487,11 +8822,20 @@ class Handler(http.server.BaseHTTPRequestHandler):
last_resp_id = d.get("response", {}).get("id")
last_output = d.get("response", {}).get("output", [])
last_status = d.get("response", {}).get("status")
except: pass
resp_usage = d.get("response", {}).get("usage")
if resp_usage:
_last_stream_usage = resp_usage
except (json.JSONDecodeError, KeyError, TypeError): pass
return True
self.stream_buffered_events(stream_fn(upstream), on_event=on_event)
except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError):
print("[translate-proxy] client disconnected during stream", file=sys.stderr)
except Exception as e:
print(f"[translate-proxy] stream error: {e}", file=sys.stderr)
self._request_failed = True
success = (last_status == "completed")
_record_usage_with_tokens(provider, model, success, time.time() - t0, _last_stream_usage, input_items=input_data, output_items=last_output)
_log_resp(last_resp_id, last_status or "client_disconnect", last_output)
if last_resp_id and input_data is not None:
store_response(last_resp_id, input_data, last_output)
@@ -8499,11 +8843,16 @@ class Handler(http.server.BaseHTTPRequestHandler):
result = nonstream_fn(upstream)
self.send_json(200, result)
rid = result.get("id")
success = result.get("status") != "incomplete"
_record_usage_with_tokens(provider, model, success, time.time() - t0, result.get("usage", {}), input_items=input_data, output_items=result.get("output", []))
_log_resp(rid, result.get("status"), result.get("output", []))
if rid and input_data is not None:
store_response(rid, input_data, result.get("output", []))
def send_json(self, status, data):
self._last_status = status
if status >= 500:
self._request_failed = True
try:
body = json.dumps(data).encode()
self.send_response(status)
@@ -8695,8 +9044,9 @@ def main():
allow_reuse_address = True
daemon_threads = True
request_queue_size = 64
SERVER = ReusableHTTPServer(("127.0.0.1", PORT), Handler)
print(f"translate-proxy ({BACKEND}) listening on http://127.0.0.1:{PORT}", flush=True)
BIND_HOST = getattr(_args, "host", None) or os.environ.get("CODEX_HOST", "127.0.0.1")
SERVER = ReusableHTTPServer((BIND_HOST, PORT), Handler)
print(f"translate-proxy ({BACKEND}) listening on http://{BIND_HOST}:{PORT}", flush=True)
print(f"Target: {TARGET_URL}", flush=True)
print(f"Models: {[m['id'] for m in MODELS]}", flush=True)
if BACKEND in ("codebuff", "freebuff"):