v3.10.12: replace parallel probe with sticky+sequential fallback (matches reference)
This commit is contained in:
Binary file not shown.
@@ -5186,22 +5186,25 @@ class Handler(http.server.BaseHTTPRequestHandler):
|
|||||||
_pref = _antigravity_preferred_endpoint
|
_pref = _antigravity_preferred_endpoint
|
||||||
|
|
||||||
if _pref and _pref in endpoints:
|
if _pref and _pref in endpoints:
|
||||||
ep = _pref
|
ordered = [_pref] + [e for e in endpoints if e != _pref]
|
||||||
|
else:
|
||||||
|
ordered = list(endpoints)
|
||||||
|
|
||||||
|
for ep in ordered:
|
||||||
target = f"{ep}/{url_suffix}"
|
target = f"{ep}/{url_suffix}"
|
||||||
req = urllib.request.Request(target, data=body_b, headers=headers)
|
req = urllib.request.Request(target, data=body_b, headers=headers)
|
||||||
try:
|
try:
|
||||||
upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, stream))
|
upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, stream))
|
||||||
chosen_ep = ep
|
chosen_ep = ep
|
||||||
print(f"[{self._session_id}] sticky OK: {ep.replace('https://','')}", file=sys.stderr)
|
with _antigravity_endpoint_lock:
|
||||||
|
_antigravity_preferred_endpoint = ep
|
||||||
|
if ep != _pref:
|
||||||
|
print(f"[{self._session_id}] fallback OK: {ep.replace('https://','')}", file=sys.stderr)
|
||||||
|
break
|
||||||
except urllib.error.HTTPError as e:
|
except urllib.error.HTTPError as e:
|
||||||
err_body = e.read().decode()
|
err_body = e.read().decode()
|
||||||
if e.code in (429, 502, 503):
|
err_class = _classify_antigravity_error(e.code, err_body)
|
||||||
print(f"[{self._session_id}] sticky {ep.replace('https://','')} failed ({e.code}), parallel probing all", file=sys.stderr)
|
print(f"[{self._session_id}] {ep.replace('https://','')} {e.code} class={err_class}", file=sys.stderr)
|
||||||
with _antigravity_endpoint_lock:
|
|
||||||
if _antigravity_preferred_endpoint == ep:
|
|
||||||
_antigravity_preferred_endpoint = None
|
|
||||||
upstream = None
|
|
||||||
else:
|
|
||||||
if e.code == 400 and OAUTH_PROVIDER.startswith("google"):
|
if e.code == 400 and OAUTH_PROVIDER.startswith("google"):
|
||||||
try:
|
try:
|
||||||
debug_path = os.path.join(_LOG_DIR, "gemini-last-400-request.json")
|
debug_path = os.path.join(_LOG_DIR, "gemini-last-400-request.json")
|
||||||
@@ -5211,84 +5214,36 @@ class Handler(http.server.BaseHTTPRequestHandler):
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
return self.send_json(e.code, {"error": {"type": "upstream_error", "message": _sanitize_err_body(err_body)}})
|
return self.send_json(e.code, {"error": {"type": "upstream_error", "message": _sanitize_err_body(err_body)}})
|
||||||
except Exception:
|
if err_class == "auth_permanent":
|
||||||
with _antigravity_endpoint_lock:
|
return self.send_json(e.code, {"error": {"type": "upstream_error", "message": _sanitize_err_body(err_body)}})
|
||||||
if _antigravity_preferred_endpoint == ep:
|
|
||||||
_antigravity_preferred_endpoint = None
|
|
||||||
upstream = None
|
|
||||||
print(f"[{self._session_id}] sticky {ep.replace('https://','')} conn failed, parallel probing", file=sys.stderr)
|
|
||||||
|
|
||||||
if upstream is None:
|
|
||||||
_probe_results = {}
|
|
||||||
_probe_winner = threading.Event()
|
|
||||||
_probe_data = [None, None]
|
|
||||||
|
|
||||||
def _probe_try(ep):
|
|
||||||
if _probe_winner.is_set():
|
|
||||||
return
|
|
||||||
target = f"{ep}/{url_suffix}"
|
|
||||||
req = urllib.request.Request(target, data=body_b, headers=headers)
|
|
||||||
try:
|
|
||||||
resp = urllib.request.urlopen(req, timeout=30)
|
|
||||||
if _probe_winner.is_set():
|
|
||||||
try: resp.close()
|
|
||||||
except: pass
|
|
||||||
return
|
|
||||||
_probe_data[0] = resp
|
|
||||||
_probe_data[1] = ep
|
|
||||||
_probe_winner.set()
|
|
||||||
except urllib.error.HTTPError as e:
|
|
||||||
err_body = e.read().decode()
|
|
||||||
_probe_results[ep] = (e.code, err_body)
|
|
||||||
except Exception as e:
|
|
||||||
_probe_results[ep] = (0, str(e))
|
|
||||||
|
|
||||||
probe_threads = []
|
|
||||||
for ep in endpoints:
|
|
||||||
t = threading.Thread(target=_probe_try, args=(ep,), daemon=True)
|
|
||||||
t.start()
|
|
||||||
probe_threads.append(t)
|
|
||||||
|
|
||||||
_probe_winner.wait(timeout=30)
|
|
||||||
|
|
||||||
if _probe_data[0] is not None:
|
|
||||||
upstream = _probe_data[0]
|
|
||||||
chosen_ep = _probe_data[1]
|
|
||||||
with _antigravity_endpoint_lock:
|
|
||||||
_antigravity_preferred_endpoint = chosen_ep
|
|
||||||
print(f"[{self._session_id}] parallel probe winner: {chosen_ep.replace('https://','')}", file=sys.stderr)
|
|
||||||
else:
|
|
||||||
for t in probe_threads:
|
|
||||||
t.join(timeout=5)
|
|
||||||
best_err = None
|
|
||||||
best_ep = None
|
|
||||||
for ep in endpoints:
|
|
||||||
if ep in _probe_results:
|
|
||||||
code, err_body = _probe_results[ep]
|
|
||||||
if code == 400 and OAUTH_PROVIDER.startswith("google"):
|
|
||||||
try:
|
|
||||||
debug_path = os.path.join(_LOG_DIR, "gemini-last-400-request.json")
|
|
||||||
with open(debug_path, "w", encoding="utf-8") as dbg:
|
|
||||||
json.dump({"endpoint": ep, "model": model, "wrapped": wrapped, "error": err_body}, dbg, indent=2)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
if best_err is None:
|
|
||||||
best_err = (code, err_body)
|
|
||||||
best_ep = ep
|
|
||||||
if best_err:
|
|
||||||
code, err_body = best_err
|
|
||||||
err_class = _classify_antigravity_error(code, err_body)
|
|
||||||
print(f"[{self._session_id}] all endpoints failed: {code} class={err_class}", file=sys.stderr)
|
|
||||||
if err_class in ("quota_exhausted", "rate_limited"):
|
if err_class in ("quota_exhausted", "rate_limited"):
|
||||||
reset_s = _parse_rate_limit_reset(err_body)
|
reset_s = _parse_rate_limit_reset(err_body)
|
||||||
|
if ep == ordered[-1]:
|
||||||
pool = _google_antigravity_pool if OAUTH_PROVIDER == "google-antigravity" else _google_cli_pool
|
pool = _google_antigravity_pool if OAUTH_PROVIDER == "google-antigravity" else _google_cli_pool
|
||||||
_, acct = _get_google_account(OAUTH_PROVIDER)
|
_, acct = _get_google_account(OAUTH_PROVIDER)
|
||||||
if acct:
|
if acct:
|
||||||
cooldown = reset_s if reset_s and reset_s > 10 else 60
|
cooldown = reset_s if reset_s and reset_s > 10 else 60
|
||||||
pool.mark_rate_limited(acct, cooldown)
|
pool.mark_rate_limited(acct, cooldown)
|
||||||
if reset_s:
|
|
||||||
print(f"[{self._session_id}] quota reset in ~{reset_s}s, cooldown={cooldown}s", file=sys.stderr)
|
print(f"[{self._session_id}] quota reset in ~{reset_s}s, cooldown={cooldown}s", file=sys.stderr)
|
||||||
return self.send_json(code, {"error": {"type": "upstream_error", "message": _sanitize_err_body(err_body)}})
|
return self.send_json(e.code, {"error": {"type": "upstream_error", "message": _sanitize_err_body(err_body)}})
|
||||||
|
print(f"[{self._session_id}] {ep.replace('https://','')} 429, trying next", file=sys.stderr)
|
||||||
|
with _antigravity_endpoint_lock:
|
||||||
|
_antigravity_preferred_endpoint = None
|
||||||
|
continue
|
||||||
|
if err_class in ("service_disabled", "forbidden", "account_banned", "validation_required"):
|
||||||
|
if ep == ordered[-1]:
|
||||||
|
return self.send_json(e.code, {"error": {"type": "upstream_error", "message": _sanitize_err_body(err_body)}})
|
||||||
|
continue
|
||||||
|
if ep == ordered[-1]:
|
||||||
|
return self.send_json(e.code, {"error": {"type": "upstream_error", "message": _sanitize_err_body(err_body)}})
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[{self._session_id}] {ep.replace('https://','')} conn failed: {e}", file=sys.stderr)
|
||||||
|
if ep == ordered[-1]:
|
||||||
|
return self.send_json(502, {"error": {"type": "proxy_error", "message": str(e)}})
|
||||||
|
continue
|
||||||
|
|
||||||
|
if upstream is None:
|
||||||
return self.send_json(502, {"error": {"type": "proxy_error", "message": "All endpoints failed"}})
|
return self.send_json(502, {"error": {"type": "proxy_error", "message": "All endpoints failed"}})
|
||||||
|
|
||||||
if stream:
|
if stream:
|
||||||
|
|||||||
Reference in New Issue
Block a user