v3.3.0: Antigravity OAuth + Gemini CLI OAuth, full Codex agent loop with tool calls, history hardening, SSE fixes

This commit is contained in:
Roman
2026-05-20 21:44:33 +04:00
Unverified
parent c0c4d7e420
commit 7aa1f10877
6 changed files with 1085 additions and 87 deletions

View File

@@ -11,7 +11,7 @@ Usage:
python3 translate-proxy.py --backend openai-compat --target-url https://... --api-key sk-...
"""
import json, http.server, socketserver, urllib.request, urllib.parse, urllib.error
import json, http.server, socketserver, urllib.request, urllib.parse, urllib.error, re
import time, uuid, os, sys, argparse, threading, socket, collections, contextlib, signal
# ═══════════════════════════════════════════════════════════════════
@@ -107,9 +107,57 @@ _active_connections = 0
_active_connections_lock = threading.Lock()
_pool = uuid.uuid4().hex[:8]
_antigravity_version = "1.18.3"
_antigravity_version_checked = 0
_antigravity_version_lock = threading.Lock()
def _fetch_antigravity_version():
cache_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "antigravity-version.json")
try:
with open(cache_path) as f:
cached = json.load(f)
if cached.get("version") and cached.get("checked_at", 0) > time.time() - 6 * 3600:
return cached["version"]
except Exception:
pass
urls = [
("https://antigravity-auto-updater-974169037036.us-central1.run.app", None),
("https://antigravity.google/changelog", 5000),
]
for url, limit in urls:
try:
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
resp = urllib.request.urlopen(req, timeout=5)
text = resp.read().decode(errors="replace")
if limit:
text = text[:limit]
m = re.search(r"\d+\.\d+\.\d+", text)
if m:
version = m.group(0)
try:
os.makedirs(os.path.dirname(cache_path), exist_ok=True)
with open(cache_path, "w") as f:
json.dump({"version": version, "checked_at": time.time()}, f)
except Exception:
pass
return version
except Exception:
pass
return _antigravity_version
def _ensure_antigravity_version():
    """Return the Antigravity version, re-fetching it at most once every six hours.

    The freshness test is done once without the lock and repeated under
    ``_antigravity_version_lock`` before ``_fetch_antigravity_version`` is
    called, after which the module-level version and timestamp are updated.
    """
    global _antigravity_version, _antigravity_version_checked
    refresh_after_s = 6 * 3600
    if time.time() - _antigravity_version_checked >= refresh_after_s:
        with _antigravity_version_lock:
            # Re-check: another thread may have refreshed while we waited.
            if time.time() - _antigravity_version_checked >= refresh_after_s:
                _antigravity_version = _fetch_antigravity_version()
                _antigravity_version_checked = time.time()
    return _antigravity_version
def _init_runtime():
global CONFIG, PORT, BACKEND, TARGET_URL, API_KEY, OAUTH_PROVIDER
global CONFIG, PORT, BACKEND, TARGET_URL, API_KEY, OAUTH_PROVIDER, _antigravity_version
global MODELS, CC_VERSION, REASONING_ENABLED, REASONING_EFFORT, BGP_ROUTES
CONFIG = load_config()
@@ -117,12 +165,15 @@ def _init_runtime():
BACKEND = CONFIG["backend_type"]
TARGET_URL = CONFIG["target_url"].rstrip("/")
API_KEY = CONFIG["api_key"]
OAUTH_PROVIDER = CONFIG.get("oauth_provider", "")
OAUTH_PROVIDER = CONFIG.get("oauth_provider") or ""
MODELS = CONFIG["models"]
CC_VERSION = CONFIG.get("cc_version", "")
REASONING_ENABLED = CONFIG.get("reasoning_enabled", True)
REASONING_EFFORT = CONFIG.get("reasoning_effort", "medium")
BGP_ROUTES = CONFIG.get("bgp_routes", [])
if OAUTH_PROVIDER == "google-antigravity":
_antigravity_version = _ensure_antigravity_version()
print(f"[antigravity] version={_antigravity_version}", file=sys.stderr)
bgp_models = []
for _r in BGP_ROUTES:
@@ -134,13 +185,33 @@ def _init_runtime():
MODELS = [{"id": m, "object": "model", "created": 1700000000, "owned_by": "bgp"} for m in bgp_models]
CONFIG["models"] = MODELS
if (BACKEND or "").startswith("gemini-oauth") and (OAUTH_PROVIDER or "").startswith("google"):
token_name = "google-antigravity-oauth-token.json" if OAUTH_PROVIDER == "google-antigravity" else "google-cli-oauth-token.json"
token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", token_name)
try:
with open(token_path) as _tf:
_td = json.load(_tf)
_discovered = [] if OAUTH_PROVIDER == "google-antigravity" else _td.get("available_models", [])
if _discovered:
_seen = []
for _m in _discovered:
if _m not in _seen:
_seen.append(_m)
MODELS = [{"id": m, "object": "model", "created": 1700000000, "owned_by": "gemini-oauth"} for m in _seen]
CONFIG["models"] = MODELS
print(f"[gemini-oauth] loaded {len(_seen)} discovered models: {_seen}", file=sys.stderr)
except Exception:
pass
def _refresh_oauth_token():
    """Call ``_refresh_oauth_token_for`` with the module-level API_KEY and OAUTH_PROVIDER."""
    return _refresh_oauth_token_for(api_key=API_KEY, oauth_provider=OAUTH_PROVIDER)
def _refresh_oauth_token_for(api_key, oauth_provider):
if oauth_provider != "google":
oauth_provider = oauth_provider or ""
if not oauth_provider.startswith("google"):
return api_key
token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "google-oauth-token.json")
token_name = "google-antigravity-oauth-token.json" if oauth_provider == "google-antigravity" else "google-cli-oauth-token.json"
token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", token_name)
if not os.path.exists(token_path):
return api_key
try:
@@ -329,6 +400,70 @@ _CROF_ADAPTIVE = {
"min_keep_recent": 4,
}
_BGP_STATS_PATH = os.path.join(_LOG_DIR, "bgp-route-stats.json")
_bgp_stats_lock = threading.Lock()
def _route_key(route):
return f"{route.get('name', '')}::{route.get('target_url', '')}::{route.get('model', '')}"
def _load_bgp_stats():
    """Load the persisted per-route statistics from ``_BGP_STATS_PATH``.

    Returns the decoded JSON object, or an empty dict when the file is
    missing, unreadable, not valid JSON, or does not contain a JSON object
    (callers use ``.get`` / ``.setdefault`` on the result).
    """
    try:
        # `with` guarantees the handle is closed; the original left it open.
        with open(_BGP_STATS_PATH) as f:
            data = json.load(f)
    except (OSError, ValueError):
        return {}
    return data if isinstance(data, dict) else {}
def _save_bgp_stats(stats):
    """Write ``stats`` as indented JSON to ``_BGP_STATS_PATH``.

    The data is first written to a ``.tmp`` sibling and then moved over the
    real file with ``os.replace``.
    """
    scratch_path = f"{_BGP_STATS_PATH}.tmp"
    with open(scratch_path, "w") as handle:
        json.dump(stats, handle, indent=2)
    os.replace(scratch_path, _BGP_STATS_PATH)
def _score_route(route, stats):
    """Return a sort score for ``route`` (lower is preferred).

    A route whose ``open_until_ts`` lies in the future scores 1_000_000.
    Otherwise the score is the route priority (default 99) plus a latency
    term (EWMA seconds * 5, capped at 50), 20 per consecutive failure, and
    500 while ``rate_limited_until`` lies in the future.
    """
    entry = stats.get(_route_key(route), {})
    now = time.time()
    if float(entry.get("open_until_ts", 0)) > now:
        return 1_000_000
    base = int(route.get("priority", 99))
    latency_term = min(float(entry.get("ewma_latency_s", 0)) * 5, 50)
    failure_term = int(entry.get("consecutive_failures", 0)) * 20
    total = base + latency_term + failure_term
    throttled = float(entry.get("rate_limited_until", 0)) > now
    return total + 500 if throttled else total
def _update_route_stats(route, success, duration_s, http_code=None, error_type=None):
    """Record the outcome of one attempt on ``route`` and persist the stats.

    Args:
        route: route dict; its stats entry is keyed by ``_route_key(route)``.
        success: True when the upstream connection succeeded.
        duration_s: elapsed seconds for the attempt, folded into the latency
            EWMA (alpha = 0.25).
        http_code: upstream HTTP status for failures, if any. 429 marks the
            route rate-limited for 120 seconds.
        error_type: short error label stored as ``last_error``; falls back to
            ``http_<code>`` or ``"unknown"``.

    Three consecutive failures set ``open_until_ts`` 60 seconds ahead and
    reset the failure counter. Failure to write the stats file is logged to
    stderr instead of raised, so bookkeeping problems cannot fail a request.
    """
    with _bgp_stats_lock:
        stats = _load_bgp_stats()
        key = _route_key(route)
        rs = stats.setdefault(key, {
            "ewma_latency_s": duration_s, "consecutive_failures": 0,
            "last_success": None, "last_failure": None,
            "open_until_ts": 0, "rate_limited_until": 0, "last_error": None,
        })
        alpha = 0.25
        rs["ewma_latency_s"] = alpha * duration_s + (1 - alpha) * float(rs.get("ewma_latency_s", duration_s))
        now = time.time()
        if success:
            rs["consecutive_failures"] = 0
            rs["last_success"] = now
        else:
            rs["consecutive_failures"] = int(rs.get("consecutive_failures", 0)) + 1
            rs["last_failure"] = now
            rs["last_error"] = error_type or (f"http_{http_code}" if http_code else "unknown")
            if http_code == 429:
                rs["rate_limited_until"] = now + 120
            if rs["consecutive_failures"] >= 3:
                rs["open_until_ts"] = now + 60
                rs["consecutive_failures"] = 0
        try:
            _save_bgp_stats(stats)
        except (OSError, TypeError, ValueError) as exc:
            # Stats are advisory: callers invoke this from inside their
            # request try/except, so a write error must not look like a
            # route failure.
            print(f"[bgp] could not persist route stats: {exc}", file=sys.stderr)
def _sorted_bgp_routes():
    """Return ``BGP_ROUTES`` ordered by ``_score_route`` using the persisted stats."""
    with _bgp_stats_lock:
        snapshot = _load_bgp_stats()

    def _rank(route):
        return _score_route(route, snapshot)

    return sorted(BGP_ROUTES, key=_rank)
def _crof_record(model, n_items, success):
if not isinstance(n_items, int) or n_items < 1:
return
@@ -536,6 +671,193 @@ def _compact_input(input_data):
print(f"[compact] {len(input_data)} items -> {len(head) + 1 + len(tail)} (compacted {len(body)} old items into summary)", file=sys.stderr)
return head + [summary_msg] + tail
# ═══════════════════════════════════════════════════════════════════
# Provider policies
# ═══════════════════════════════════════════════════════════════════
_PROVIDER_POLICIES = {
"crof": {"reasoning_mode": "off", "max_tokens": 32768, "strip_reasoning": True,
"tool_output_limit": 4000, "max_input_items": 18, "compaction": "aggressive"},
"chats-llm": {"reasoning_mode": "off", "max_tokens": 32768, "strip_reasoning": True,
"tool_output_limit": 4000, "max_input_items": 20, "compaction": "aggressive"},
"z.ai": {"reasoning_mode": "medium", "max_tokens": 65536, "strip_reasoning": True,
"tool_output_limit": 8000, "max_input_items": 40, "compaction": "balanced"},
"openrouter": {"reasoning_mode": "provider_default", "max_tokens": 32768, "strip_reasoning": True,
"tool_output_limit": 6000, "max_input_items": 35, "compaction": "balanced"},
"openadapter": {"reasoning_mode": "off", "max_tokens": 32768, "strip_reasoning": True,
"tool_output_limit": 6000, "max_input_items": 30, "compaction": "balanced"},
}
def provider_policy(target_url=None, backend=None):
host = urllib.parse.urlparse(target_url or TARGET_URL).netloc.lower()
for key, policy in _PROVIDER_POLICIES.items():
if key in host:
return policy
return {}
# ═══════════════════════════════════════════════════════════════════
# Adaptive context compaction (model-aware)
# ═══════════════════════════════════════════════════════════════════
_MODEL_CONTEXT = {
"gpt-4o": 128000, "gpt-4o-mini": 128000, "gpt-5": 128000,
"claude-sonnet": 200000, "claude-haiku": 200000,
"glm-5.1": 128000, "glm-5": 128000, "glm-4": 128000,
"deepseek": 64000, "gemini-2.5-flash": 1000000, "gemini-2.5-pro": 2000000,
"mimo": 32768, "minimax": 32768, "kimi": 128000,
"_default": 32768,
}
def _context_limit_for_model(model):
if not model:
return _MODEL_CONTEXT["_default"]
ml = model.lower()
for key, limit in _MODEL_CONTEXT.items():
if key != "_default" and key in ml:
return limit
return _MODEL_CONTEXT["_default"]
def _estimate_tokens(obj):
if obj is None:
return 0
if isinstance(obj, str):
return max(1, len(obj) // 4)
try:
raw = json.dumps(obj, ensure_ascii=False)
except Exception:
raw = str(obj)
return max(1, len(raw) // 4)
def _adaptive_compact(input_data, model, policy=None):
    """Shrink a Responses-API input list that exceeds the model's input budget.

    Args:
        input_data: the request ``input``; only lists are ever compacted.
        model: model name, used for the context-size lookup and in log text.
        policy: optional provider policy; ``context_size`` overrides the
            per-model context size.

    Returns:
        ``(data, compacted)``. ``compacted`` is True only when items were
        actually replaced by a summary message; otherwise the original
        ``input_data`` object is returned with False.

    The budget is 60% of the context size. The leading system/developer
    (and directly following user) messages are kept as the head, a recent
    tail is kept, and the items in between are replaced by one user message
    summarising the last five removed items.
    """
    policy = policy or {}
    context_size = int(policy.get("context_size", _context_limit_for_model(model)))
    input_budget = int(context_size * 0.60)
    estimated = _estimate_tokens(input_data)
    if estimated <= input_budget or not isinstance(input_data, list):
        return input_data, False
    # Aim to keep a fraction of the items proportional to the overshoot,
    # but never fewer than 15% of them or fewer than 6 items.
    reduction = max(0.15, input_budget / max(estimated, 1))
    target_items = max(int(len(input_data) * reduction), 6)
    if target_items >= len(input_data):
        return input_data, False
    # Head: the unbroken run of leading developer/system messages plus user
    # messages that directly continue that run.
    head_end = 0
    for i, item in enumerate(input_data):
        t = item.get("type")
        if t == "message" and item.get("role") in ("developer", "system"):
            head_end = i + 1
        elif t == "message" and item.get("role") == "user" and head_end == i:
            head_end = i + 1
        else:
            break
    head = input_data[:head_end]
    keep = max(4, target_items // 3)
    tail_start = max(head_end, len(input_data) - keep)
    # Pull the cut earlier while the first kept item is a tool call, a tool
    # output or an assistant message (presumably so the tail does not begin
    # in the middle of a tool exchange).
    while tail_start > head_end:
        t = input_data[tail_start].get("type")
        if t in ("function_call_output", "function_call"):
            tail_start -= 1
        elif t == "message" and input_data[tail_start].get("role") == "assistant":
            tail_start -= 1
        else:
            break
    tail = input_data[tail_start:]
    body = input_data[head_end:tail_start]
    if not body:
        # Head and tail already cover every item, so nothing was removed.
        # Report "not compacted" so callers can still apply their own
        # fallback compaction instead of believing the work is done.
        return input_data, False
    summary_lines = [f"[Auto-compacted: {len(body)} turns removed (budget={input_budget}tok, model={model})]"]
    for item in body[-5:]:
        summary_lines.append(_item_summary(item, max_len=120))
    summary_msg = {"type": "message", "role": "user",
                   "content": [{"type": "input_text", "text": "\n".join(summary_lines)}]}
    print(f"[adaptive-compact] model={model} est={estimated}tok budget={input_budget}tok "
          f"items {len(input_data)}->{len(head)+1+len(tail)}", file=sys.stderr)
    return head + [summary_msg] + tail, True
# ═══════════════════════════════════════════════════════════════════
# Tool-call pairing validator
# ═══════════════════════════════════════════════════════════════════
def validate_tool_pairs(input_items):
    """Find ``function_call_output`` items that have no earlier ``function_call``.

    Items are matched on ``call_id`` (falling back to ``id``). Returns a list
    of ``{"index", "call_id", "error"}`` dicts, one per orphaned output; a
    non-list input yields an empty list.
    """
    if not isinstance(input_items, list):
        return []
    seen_calls = {}
    orphans = []
    for position, entry in enumerate(input_items):
        kind = entry.get("type")
        if kind not in ("function_call", "function_call_output"):
            continue
        ident = entry.get("call_id") or entry.get("id")
        if kind == "function_call":
            if ident:
                seen_calls[ident] = position
        elif not ident or ident not in seen_calls:
            orphans.append({"index": position, "call_id": ident, "error": "orphan_function_call_output"})
    return orphans
def repair_orphan_tool_outputs(input_items, errors):
    """Replace each item flagged in ``errors`` with a plain user message.

    ``errors`` is the list produced by ``validate_tool_pairs``; only each
    entry's ``index`` is used. The replacement message carries the first
    4000 characters of the item's ``output``. Other items are passed through
    unchanged, and a new list is returned.
    """
    flagged = {err["index"] for err in errors}

    def _as_user_text(entry):
        payload = str(entry.get("output", ""))[:4000]
        return {"type": "message", "role": "user",
                "content": [{"type": "input_text",
                             "text": f"[Proxy: unmatched tool output]\n{payload}"}]}

    return [_as_user_text(entry) if position in flagged else entry
            for position, entry in enumerate(input_items)]
# ═══════════════════════════════════════════════════════════════════
# Log redaction
# ═══════════════════════════════════════════════════════════════════
_SECRET_PATTERNS = [
(r"sk-[A-Za-z0-9_\-]{20,}", "[REDACTED:key]"),
(r"sk-ant-[A-Za-z0-9_\-]{20,}", "[REDACTED:anthropic]"),
(r"gh[pousr]_[A-Za-z0-9_]{20,}", "[REDACTED:github]"),
(r"Bearer\s+[A-Za-z0-9._\-]{20,}", "Bearer [REDACTED]"),
]
def _redact(text):
if not text:
return text
import re
for pattern, replacement in _SECRET_PATTERNS:
text = re.sub(pattern, replacement, text)
return text
# ═══════════════════════════════════════════════════════════════════
# Rate-limit token buckets
# ═══════════════════════════════════════════════════════════════════
class TokenBucket:
    """Token bucket guarded by a lock.

    The bucket starts full with ``capacity`` tokens and gains ``refill``
    tokens per second of ``time.monotonic()`` time, never exceeding
    ``capacity``.
    """

    def __init__(self, capacity=10, refill=1.0):
        self.lock = threading.Lock()
        self.capacity = float(capacity)
        self.refill = float(refill)
        self.tokens = float(capacity)
        self.updated = time.monotonic()

    def allow(self, cost=1):
        """Top up the bucket, then take ``cost`` tokens if available.

        Returns True when the tokens were taken, False when the bucket holds
        fewer than ``cost`` tokens (nothing is deducted in that case).
        """
        with self.lock:
            now = time.monotonic()
            elapsed = now - self.updated
            self.tokens = min(self.capacity, self.tokens + elapsed * self.refill)
            self.updated = now
            if self.tokens < cost:
                return False
            self.tokens -= cost
            return True
# One TokenBucket per route, created on first use and keyed by route name.
_rate_buckets = {}
_rate_buckets_lock = threading.Lock()


def _bucket_for_route(route):
    """Return the shared TokenBucket for ``route``, creating it on first use.

    The bucket is keyed by the route's ``name``, else its ``target_url``,
    else ``"default"``. New buckets hold 10 tokens and refill at 1 per second.
    """
    bucket_key = route.get("name") or route.get("target_url") or "default"
    with _rate_buckets_lock:
        bucket = _rate_buckets.get(bucket_key)
        if bucket is None:
            bucket = _rate_buckets[bucket_key] = TokenBucket(capacity=10, refill=1.0)
        return bucket
# ═══════════════════════════════════════════════════════════════════
# OpenAI-compat backend
# ═══════════════════════════════════════════════════════════════════
@@ -1154,14 +1476,31 @@ class Handler(http.server.BaseHTTPRequestHandler):
self._handle_anthropic(body, model, stream)
elif BACKEND == "command-code":
self._handle_command_code(body, model, stream)
elif (BACKEND or "").startswith("gemini-oauth"):
self._handle_gemini_oauth(body, model, stream)
else:
self._handle_openai_compat(body, model, stream)
def _handle_openai_compat(self, body, model, stream):
input_data = body.get("input", "")
policy = provider_policy()
pair_errors = validate_tool_pairs(input_data)
if pair_errors:
print(f"[tool-validator] repairing {len(pair_errors)} orphan tool outputs", file=sys.stderr)
input_data = repair_orphan_tool_outputs(input_data, pair_errors)
body = dict(body)
body["input"] = input_data
compacted = False
if policy.get("compaction") and isinstance(input_data, list):
input_data, compacted = _adaptive_compact(input_data, model, policy)
if compacted:
body = dict(body)
body["input"] = input_data
crof_limit = _crof_item_limit(model)
if isinstance(input_data, list) and len(input_data) > crof_limit:
if not compacted and isinstance(input_data, list) and len(input_data) > crof_limit:
print(f"[crof-adaptive] proactive compact: {len(input_data)} items > limit {crof_limit}", file=sys.stderr)
input_data = _crof_compact_for_retry(input_data, model)
body = dict(body)
@@ -1228,8 +1567,379 @@ class Handler(http.server.BaseHTTPRequestHandler):
chat_body["reasoning_effort"] = REASONING_EFFORT
return chat_body
def _handle_gemini_oauth(self, body, model, stream):
# Translate a Responses-API style request into a Gemini "v1internal"
# generateContent / streamGenerateContent call authenticated with the Google
# OAuth token, then pass the upstream response to _forward_gemini_sse (stream)
# or _forward_gemini_json (non-stream).
#   body   - decoded request dict (input, instructions, tools, sampling options)
#   model  - requested model id (may be remapped for Antigravity below)
#   stream - True to request and forward an SSE stream
# Upstream failures are answered with self.send_json(...) and an early return.
input_data = body.get("input", "")
policy = provider_policy()
if OAUTH_PROVIDER == "google-antigravity":
# Map client-facing aliases to the model names sent upstream for Antigravity.
alias_map = {
"antigravity-gemini-3-flash": "gemini-3-flash",
"antigravity-gemini-3-pro": "gemini-3-pro-low",
"antigravity-gemini-3.1-pro": "gemini-3.1-pro-low",
"gemini-3-flash-preview": "gemini-3-flash",
"gemini-3-pro-preview": "gemini-3-pro-low",
"gemini-3.1-pro-preview": "gemini-3.1-pro-low",
"gemini-3-pro": "gemini-3-pro-low",
"gemini-3.1-pro": "gemini-3.1-pro-low",
"antigravity-claude-sonnet-4-6": "claude-sonnet-4-6",
"antigravity-claude-opus-4-6-thinking": "claude-opus-4-6-thinking",
}
model = alias_map.get(model, model)
# Tool outputs without a matching earlier call are rewritten as user text.
pair_errors = validate_tool_pairs(input_data)
if pair_errors:
input_data = repair_orphan_tool_outputs(input_data, pair_errors)
body = dict(body)
body["input"] = input_data
compacted = False
# Compaction only runs when the provider policy defines a "compaction" mode.
if policy.get("compaction") and isinstance(input_data, list):
input_data, compacted = _adaptive_compact(input_data, model, policy)
if compacted:
body = dict(body)
body["input"] = input_data
access_token = _refresh_oauth_token()
# The provider-specific token file is also read for the project id that is
# sent in the request wrapper; a missing/unreadable file leaves it empty.
token_name = "google-antigravity-oauth-token.json" if OAUTH_PROVIDER == "google-antigravity" else "google-cli-oauth-token.json"
token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", token_name)
project_id = ""
try:
with open(token_path) as f:
project_id = json.load(f).get("project_id", "")
except Exception:
pass
# ---- Responses input items -> Gemini `contents` ----
contents = []
system_parts = []
instructions = body.get("instructions", "").strip()
# call_id -> function name, so outputs without a name can still be labelled.
tool_call_names = {}
# NOTE(review): only list input is converted; a plain-string `input` adds
# nothing to `contents`.
if isinstance(input_data, list):
for item in input_data:
t = item.get("type")
if t == "message":
# Any role other than "user" is sent upstream as "model".
role = "user" if item.get("role") == "user" else "model"
content = item.get("content", "")
if isinstance(content, list):
parts = []
for c in content:
ct = c.get("type")
if ct == "input_text":
parts.append({"text": c.get("text", "")})
elif ct == "text":
parts.append({"text": c.get("text", "")})
elif ct == "input_image" or ct == "image_url":
# data: URLs become inlineData; any other image reference is sent as text.
iu = c.get("image_url") or c.get("url", {})
url = iu.get("url", iu) if isinstance(iu, dict) else iu
if isinstance(url, str) and url.startswith("data:"):
mime, _, b64 = url.partition(";base64,")
mime = mime.replace("data:", "") or "image/png"
parts.append({"inlineData": {"mimeType": mime, "data": b64}})
else:
parts.append({"text": str(url)})
if parts:
contents.append({"role": role, "parts": parts})
elif isinstance(content, str):
contents.append({"role": role, "parts": [{"text": content}]})
elif t == "function_call":
call_id = item.get("call_id") or item.get("id") or f"call_{uuid.uuid4().hex[:24]}"
fname = item.get("name", "")
if call_id and fname:
tool_call_names[call_id] = fname
# Arguments arrive as a JSON string; unparsable text becomes an empty object.
args = item.get("arguments", "{}")
if isinstance(args, str):
try:
args = json.loads(args)
except Exception:
args = {}
# NOTE(review): the fixed thoughtSignature value looks like a placeholder
# meant to satisfy upstream validation — confirm against the upstream API.
contents.append({"role": "model", "parts": [{"functionCall": {"name": fname, "args": args, "id": call_id}, "thoughtSignature": "skip_thought_signature_validator"}]})
elif t == "function_call_output":
call_id = item.get("call_id", item.get("id", ""))
output = item.get("output", "")
fname = item.get("name", "") or tool_call_names.get(call_id, "")
# JSON-looking output is sent structured (dict/list); anything else is sent as-is.
try:
output_parsed = json.loads(output) if isinstance(output, str) else output
except Exception:
output_parsed = output
resp_part = {"functionResponse": {"name": fname or "unknown", "response": {"result": output_parsed if isinstance(output_parsed, (dict, list)) else output}}}
if call_id:
resp_part["functionResponse"]["id"] = call_id
contents.append({"role": "user", "parts": [resp_part]})
# ---- Normalise the turn sequence ----
# Drops turns without dict parts, skips a user turn whose text equals the
# most recently kept user text, merges consecutive turns of the same role,
# and trims so the list both starts and ends with a "user" turn.
if OAUTH_PROVIDER.startswith("google"):
sanitized = []
last_user_text = None
last_role = None
for content in contents:
role = content.get("role")
parts = [p for p in content.get("parts", []) if isinstance(p, dict)]
if not parts:
continue
text_key = "\n".join([p.get("text", "") for p in parts if "text" in p]).strip()
if role == "user" and text_key and text_key == last_user_text:
continue
if role == last_role and role in ("user", "model") and sanitized:
sanitized[-1].setdefault("parts", []).extend(parts)
else:
sanitized.append({"role": role, "parts": parts})
if role == "user" and text_key:
last_user_text = text_key
last_role = role
while sanitized and sanitized[0].get("role") != "user":
sanitized.pop(0)
while sanitized and sanitized[-1].get("role") != "user":
sanitized.pop()
contents = sanitized
# ---- System instruction ----
if instructions:
system_parts.append({"text": instructions})
if OAUTH_PROVIDER == "google-antigravity":
# Extra steering text appended for Antigravity only.
system_parts.append({"text": (
"You are connected through a Responses API translation proxy. "
"If tools are available and the user's request requires changing files, call the appropriate tool immediately. "
"Do not announce plans, do not say you will list files, browse, fetch, inspect, or start by exploring unless you are emitting the actual tool call in the same response. "
"For file creation requests, use tools to create or modify the file instead of only printing code in chat. "
"If no suitable tool is available, answer directly with the complete result. "
"Never answer only with a plan such as 'I will start by...' or 'I am going to...'."
)})
# ---- Generation config: only options present in the request are forwarded ----
gen_config = {}
mot = body.get("max_output_tokens", 0)
if mot:
gen_config["maxOutputTokens"] = mot
if body.get("temperature") is not None:
gen_config["temperature"] = body["temperature"]
if body.get("top_p") is not None:
gen_config["topP"] = body["top_p"]
# Reasoning effort maps to a thinking budget; unknown effort values use 8192.
if REASONING_ENABLED and REASONING_EFFORT != "none":
budget = {"low": 2048, "medium": 8192, "high": 24576}.get(REASONING_EFFORT, 8192)
gen_config["thinkingConfig"] = {"includeThoughts": True, "thinkingBudget": budget}
# ---- Tools -> functionDeclarations ----
# Function tools may be flat or nested under "function"; other tool types
# are declared only when they carry a name.
oa_tools = body.get("tools", [])
gemini_tools = []
if oa_tools:
func_decls = []
for tool in oa_tools:
ttype = tool.get("type", "function")
fname = tool.get("name", "")
if ttype == "function":
fn = tool.get("function", tool)
name = fn.get("name", fname)
desc = fn.get("description", "")
params = fn.get("parameters", fn.get("input_schema", {}))
func_decls.append({"name": name, "description": desc, "parameters": params})
elif fname:
func_decls.append({"name": fname, "description": tool.get("description", ""), "parameters": tool.get("parameters", {"type": "object", "properties": {}})})
if func_decls:
gemini_tools = [{"functionDeclarations": func_decls}]
# ---- Assemble the wrapped request ----
request_body = {"contents": contents}
if system_parts:
request_body["systemInstruction"] = {"parts": system_parts}
if gen_config:
request_body["generationConfig"] = gen_config
if gemini_tools:
request_body["tools"] = gemini_tools
wrapped = {
"project": project_id,
"model": model,
"request": request_body,
}
if OAUTH_PROVIDER == "google-antigravity":
wrapped["requestType"] = "agent"
wrapped["userAgent"] = "antigravity"
wrapped["requestId"] = f"agent-{uuid.uuid4().hex[:12]}"
# Antigravity tries the two sandbox hosts before the production host;
# every other provider uses the production host only.
endpoints = ([
"https://daily-cloudcode-pa.sandbox.googleapis.com",
"https://autopush-cloudcode-pa.sandbox.googleapis.com",
"https://cloudcode-pa.googleapis.com",
] if OAUTH_PROVIDER == "google-antigravity" else [
"https://cloudcode-pa.googleapis.com",
])
action = "streamGenerateContent" if stream else "generateContent"
url_suffix = f"v1internal:{action}?alt=sse" if stream else f"v1internal:{action}"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {access_token}",
}
if OAUTH_PROVIDER == "google-antigravity":
version = _ensure_antigravity_version()
headers["User-Agent"] = f"antigravity/{version} darwin/arm64"
else:
headers["User-Agent"] = "google-api-nodejs-client/9.15.1"
headers["X-Goog-Api-Client"] = "gl-node/22.17.0"
headers["Client-Metadata"] = "ideType=IDE_UNSPECIFIED,platform=PLATFORM_UNSPECIFIED,pluginType=GEMINI"
body_b = json.dumps(wrapped).encode()
print(f"[gemini-oauth] model={model} stream={stream} items={len(input_data) if isinstance(input_data, list) else 1} project={project_id}", file=sys.stderr)
# ---- Try each endpoint in order ----
# HTTP 429 or a transport error moves on to the next endpoint (if any);
# any other HTTP error is returned to the client immediately. On HTTP 400
# the wrapped request and error body are written to _LOG_DIR for debugging.
for ep in endpoints:
target = f"{ep}/{url_suffix}"
req = urllib.request.Request(target, data=body_b, headers=headers)
try:
upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, stream))
break
except urllib.error.HTTPError as e:
err_body = e.read().decode()
if e.code == 400 and OAUTH_PROVIDER.startswith("google"):
try:
debug_path = os.path.join(_LOG_DIR, "gemini-last-400-request.json")
with open(debug_path, "w") as dbg:
json.dump({"endpoint": ep, "model": model, "wrapped": wrapped, "error": err_body}, dbg, indent=2)
print(f"[gemini-oauth] saved 400 debug request to {debug_path}", file=sys.stderr)
except Exception:
pass
if e.code == 429 and ep != endpoints[-1]:
print(f"[gemini-oauth] {ep} HTTP 429, trying next endpoint", file=sys.stderr)
continue
return self.send_json(e.code, {"error": {"type": "upstream_error", "message": err_body}})
except Exception as e:
if ep == endpoints[-1]:
return self.send_json(502, {"error": {"type": "proxy_error", "message": str(e)}})
print(f"[gemini-oauth] {ep} failed: {e}, trying next", file=sys.stderr)
continue
# `upstream` is bound here: the loop is only left via `break` or a return.
if stream:
self._forward_gemini_sse(upstream, model, body, input_data)
else:
self._forward_gemini_json(upstream, model, body, input_data)
def _forward_gemini_sse(self, upstream, model, body, input_data):
# Relay a Gemini SSE stream to the client as Responses-API style events
# (response.created ... response.completed) and store the final response
# object in _response_store.
#   upstream   - open upstream response, iterated line by line
#   model      - model id echoed in the emitted response objects
#   body, input_data - accepted for signature parity; not read in this method
resp_id = f"resp-{uuid.uuid4().hex[:24]}"
created = int(time.time())
self.send_response(200)
self.send_header("Content-Type", "text/event-stream")
self.send_header("Cache-Control", "no-cache")
self.send_header("Connection", "keep-alive")
self.end_headers()
full_text = ""
# output_items only tracks how many items were announced (its length is
# used as the next output_index for tool calls).
output_items = []
# call_id -> upstream functionCall dict, in arrival order.
current_tool_calls = {}
message_started = False
message_id = f"msg-{uuid.uuid4().hex[:24]}"
# Write one SSE event and flush it to the client immediately.
def flush_event(event_type, data):
self.wfile.write(f"event: {event_type}\ndata: {json.dumps(data)}\n\n".encode())
self.wfile.flush()
flush_event("response.created", {"type": "response.created", "response": {"id": resp_id, "object": "response", "model": model, "status": "in_progress", "created": created, "output": []}})
flush_event("response.in_progress", {"type": "response.in_progress", "response": {"id": resp_id}})
# `buf` collects the payload of consecutive "data: " lines; a blank line
# ends the event and triggers parsing.
# NOTE(review): a last event that is not followed by a blank line is never parsed.
buf = ""
stream_finished = False
for raw_line in upstream:
if stream_finished:
break
line = raw_line.decode(errors="replace")
if line.startswith("data: "):
buf += line[6:]
continue
if not line.strip() and buf:
# Unparsable payloads are dropped silently.
try:
chunk = json.loads(buf)
except Exception:
buf = ""
continue
buf = ""
# The payload may be wrapped in {"response": {...}}; fall back to the chunk itself.
candidates = chunk.get("response", chunk).get("candidates", [])
if not candidates:
if chunk.get("error"):
print(f"[gemini-oauth] stream error chunk: {str(chunk.get('error'))[:300]}", file=sys.stderr)
continue
if candidates[0].get("finishReason") and not candidates[0].get("content", {}).get("parts"):
print(f"[gemini-oauth] finish without parts: {candidates[0].get('finishReason')}", file=sys.stderr)
# Only the first candidate is forwarded.
parts = candidates[0].get("content", {}).get("parts", [])
for part in parts:
# Parts flagged as "thought" are not forwarded to the client.
if part.get("thought"):
continue
if "text" in part and not part.get("functionCall"):
text_delta = part["text"]
if not text_delta:
continue
full_text += text_delta
# The first text delta opens the assistant message item (always at output_index 0).
if not message_started:
flush_event("response.output_item.added", {"type": "response.output_item.added", "output_index": 0, "item": {"type": "message", "id": message_id, "role": "assistant", "content": []}})
flush_event("response.content_part.added", {"type": "response.content_part.added", "output_index": 0, "content_index": 0, "part": {"type": "output_text", "text": ""}})
output_items.append({"text": True})
message_started = True
flush_event("response.output_text.delta", {"type": "response.output_text.delta", "output_index": 0, "content_index": 0, "delta": text_delta})
elif part.get("functionCall"):
fc = part["functionCall"]
# A fresh call_id is generated here; any id in the upstream part is not reused.
call_id = f"call_{uuid.uuid4().hex[:24]}"
args_str = json.dumps(fc.get("args", fc.get("arguments", {})))
output_index = len(output_items)
# Arguments are sent as a single delta followed directly by the done event.
flush_event("response.output_item.added", {"type": "response.output_item.added", "output_index": output_index, "item": {"type": "function_call", "id": call_id, "call_id": call_id, "name": fc.get("name", ""), "arguments": ""}})
flush_event("response.function_call_arguments.delta", {"type": "response.function_call_arguments.delta", "output_index": output_index, "item_id": call_id, "delta": args_str})
flush_event("response.function_call_arguments.done", {"type": "response.function_call_arguments.done", "output_index": output_index, "item_id": call_id, "arguments": args_str})
current_tool_calls[call_id] = fc
output_items.append({"tool": True})
# Antigravity only: stop reading once a finishReason arrives and text was produced.
if OAUTH_PROVIDER == "google-antigravity" and full_text and candidates[0].get("finishReason"):
stream_finished = True
break
# ---- Build the final response object and emit the closing events ----
out = []
if not full_text and not current_tool_calls:
print("[gemini-oauth] WARNING: completed with empty output", file=sys.stderr)
if full_text:
out.append({"type": "message", "id": message_id, "role": "assistant", "content": [{"type": "output_text", "text": full_text}]})
tool_outputs = []
for cid, fc in current_tool_calls.items():
tool_outputs.append({"type": "function_call", "id": cid, "call_id": cid, "name": fc.get("name", ""), "arguments": json.dumps(fc.get("args", fc.get("arguments", {})))})
out.extend(tool_outputs)
final_resp = {"id": resp_id, "object": "response", "model": model, "status": "completed", "created": created, "output": out}
if full_text:
flush_event("response.output_text.done", {"type": "response.output_text.done", "output_index": 0, "content_index": 0, "text": full_text})
flush_event("response.content_part.done", {"type": "response.content_part.done", "output_index": 0, "content_index": 0, "part": {"type": "output_text", "text": full_text}})
flush_event("response.output_item.done", {"type": "response.output_item.done", "output_index": 0, "item": out[0]})
# NOTE(review): tool-call done events are numbered after the message (from 1
# when text exists), which can differ from the output_index announced in
# output_item.added when a tool call arrived before the first text delta.
for idx, item in enumerate(tool_outputs, start=(1 if full_text else 0)):
flush_event("response.output_item.done", {"type": "response.output_item.done", "output_index": idx, "item": item})
flush_event("response.completed", {"type": "response.completed", "response": final_resp})
# No Content-Length was sent, so the connection is closed to mark the end of the stream.
self.close_connection = True
# Keep the finished response; the oldest entries are evicted beyond _MAX_STORED.
with _response_store_lock:
_response_store[resp_id] = final_resp
while len(_response_store) > _MAX_STORED:
_response_store.popitem(last=False)
def _forward_gemini_json(self, upstream, model, body, input_data):
    """Convert a non-streaming Gemini reply into a Responses-API object and send it.

    Args:
        upstream: open upstream HTTP response whose body is one JSON document.
        model: model id echoed in the response object.
        body, input_data: accepted for signature parity with the streaming
            forwarder; not read here.

    Text parts of the first candidate are joined into one assistant message
    (placed first in ``output``); each ``functionCall`` part becomes a
    ``function_call`` item with a locally generated call id. Parts flagged
    as ``thought`` are dropped. The result is stored in ``_response_store``
    and sent with status 200. An unreadable or non-JSON upstream body is
    answered with a 502 ``proxy_error`` instead of raising.
    """
    try:
        data = json.loads(upstream.read().decode(errors="replace"))
    except (OSError, ValueError) as exc:
        return self.send_json(502, {"error": {"type": "proxy_error", "message": f"invalid upstream response: {exc}"}})
    if not isinstance(data, dict):
        return self.send_json(502, {"error": {"type": "proxy_error", "message": "invalid upstream response: expected a JSON object"}})
    resp_id = f"resp-{uuid.uuid4().hex[:24]}"
    created = int(time.time())
    out = []
    # The payload may be wrapped in {"response": {...}}; fall back to the document itself.
    candidates = data.get("response", data).get("candidates", [])
    if candidates:
        # Only the first candidate is used.
        parts = candidates[0].get("content", {}).get("parts", [])
        text_parts = []
        for part in parts:
            if part.get("thought"):
                continue
            if "text" in part and not part.get("functionCall"):
                text_parts.append(part["text"])
            elif part.get("functionCall"):
                fc = part["functionCall"]
                call_id = f"call_{uuid.uuid4().hex[:24]}"
                out.append({"type": "function_call", "id": call_id, "call_id": call_id, "name": fc.get("name", ""), "arguments": json.dumps(fc.get("args", fc.get("arguments", {})))})
        if text_parts:
            full_text = "".join(text_parts)
            # The assistant message always precedes the tool calls in `output`.
            out.insert(0, {"type": "message", "id": f"msg-{uuid.uuid4().hex[:24]}", "role": "assistant", "content": [{"type": "output_text", "text": full_text}]})
    resp = {"id": resp_id, "object": "response", "model": model, "status": "completed", "created": created, "output": out}
    # Keep the response; the oldest entries are evicted beyond _MAX_STORED.
    with _response_store_lock:
        _response_store[resp_id] = resp
        while len(_response_store) > _MAX_STORED:
            _response_store.popitem(last=False)
    self.send_json(200, resp)
def _handle_bgp(self, body, model, stream, messages, input_data):
routes = sorted(BGP_ROUTES, key=lambda r: r.get("priority", 99))
routes = _sorted_bgp_routes()
routes = [r for r in routes if _bucket_for_route(r).allow()]
if not routes:
return self.send_json(503, {"error": {"type": "bgp_rate_limited", "message": "All routes rate-limited"}})
errors = []
for route in routes:
r_model = route.get("model", model)
@@ -1266,11 +1976,13 @@ class Handler(http.server.BaseHTTPRequestHandler):
}, browser_ua=True)
print(f"[bgp] trying route '{route.get('name', r_url)}' model={r_model}", file=sys.stderr)
req = urllib.request.Request(target, data=json.dumps(chat_body).encode(), headers=fwd)
t0_route = time.time()
route_ok = False
for attempt in range(3):
try:
upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, stream))
print(f"[bgp] route '{route.get('name', r_url)}' connected OK", file=sys.stderr)
_update_route_stats(route, True, time.time() - t0_route)
self._forward_oa_compat(upstream, stream, r_model, chat_body, body, input_data, fwd, target)
return
except urllib.error.HTTPError as e:
@@ -1282,6 +1994,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
req = urllib.request.Request(target, data=json.dumps(chat_body).encode(), headers=fwd)
continue
print(f"[bgp] route '{route.get('name', r_url)}' FAILED: HTTP {e.code}: {err[:200]}", file=sys.stderr)
_update_route_stats(route, False, time.time() - t0_route, http_code=e.code)
errors.append(f"{route.get('name','?')}: HTTP {e.code}")
break
except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError) as e:
@@ -1291,10 +2004,12 @@ class Handler(http.server.BaseHTTPRequestHandler):
time.sleep(wait)
req = urllib.request.Request(target, data=json.dumps(chat_body).encode(), headers=fwd)
continue
_update_route_stats(route, False, time.time() - t0_route, error_type=str(e))
errors.append(f"{route.get('name','?')}: {e}")
break
except Exception as e:
print(f"[bgp] route '{route.get('name', r_url)}' FAILED: {e}", file=sys.stderr)
_update_route_stats(route, False, time.time() - t0_route, error_type=str(e))
errors.append(f"{route.get('name','?')}: {e}")
break