10 Commits

14 changed files with 2512 additions and 40 deletions

View File

@@ -1,5 +1,71 @@
# Changelog
## v3.12.0 (2026-05-27)
**gRPC Auto-Fallback for Antigravity Provider (PR #13)**
### New Features
- **gRPC auto-fallback**: When REST API returns 404 (model not found), automatically retries via gRPC
- **New `antigravity_grpc` module**: Full protobuf client with CloudCode PredictionService stubs
- **Display name remapping**: gRPC uses display names (e.g. "Gemini 3.5 Flash (High)") instead of REST slugs
- **Streaming and unary support**: gRPC fallback works for both streaming and non-streaming requests
- **Dynamic version fetch with validation**: Probes fetched versions to ensure they work before caching
- **Antigravity v2 handler rewrite**: Based on anti-api approach with proper safety settings, stopSequences, sessionId
- **Lazy import**: grpcio is only imported when needed — zero impact if not installed
### Bug Fixes
- Antigravity 404 caused by invalid version — now validates with probe requests
- Version fallback: auto-retries with re-fetched version if all endpoints return 404
## v3.11.12 (2026-05-26)
**New Antigravity v2 Handler (Mimicking anti-api)**
### New Features
- **Complete rewrite of Antigravity handler** based on https://github.com/ink1ing/anti-api approach
- Safety settings (all OFF), stopSequences, sessionId, requestType: agent
- functionResponse uses `response: { result: string }` format matching anti-api
- Endpoint priority: `daily-cloudcode-pa.googleapis.com` first
- Simplified sanitizer: only deduplicates consecutive user text, never touches tool messages
## v3.11.11 (2026-05-26)
## v3.11.11 (2026-05-26)
**Antigravity Fix: Stricter function_call/output Pairing + Gemini Sanitizer Rewrite (PR #12)**
### Bug Fixes
- **Stricter function_call/output pairing**: Only includes pairs where BOTH call and output exist — no orphan calls sent to Gemini
- **Gemini sanitizer rewritten**: Tool messages (`functionCall`/`functionResponse`) are always preserved as-is, never merged or skipped
- **Text merging more conservative**: Checks last message for tool content before merging consecutive text messages
- **Final trimming safe**: Only removes plain `message` items, never `function_call_output` (which would break tool pairs)
- **Merge PR #12**: Fix by qwen-chat coder
## v3.11.10 (2026-05-26)
## v3.11.10 (2026-05-26)
**Antigravity Fix: Interleave function_call/output Pairs, Gemini Turn Trimming (PR #11)**
### Bug Fixes
- **Fix Antigravity function_call/output ordering**: Tool calls and their responses are now properly interleaved in sequence (`function_call``function_call_output``function_call` → ...) instead of being grouped separately
- **Gemini sanitizer trimming**: Leading/trailing non-user turns removed for Google API compliance (Google requires conversation to start and end with user turn)
- **Stricter role boundary enforcement**: `functionCall` (model) and `functionResponse` (user) never merged across role boundaries
- **Merge PR #11**: Fix by qwen-chat coder
## v3.11.9 (2026-05-26)
## v3.11.9 (2026-05-26)
**Antigravity Fix: Preserve functionCall/functionResponse in Gemini Sanitizer (PR #10)**
### Bug Fixes
- **Fix Antigravity multi-turn tool use**: The Gemini message sanitizer was incorrectly merging/dropping `functionCall` and `functionResponse` turns, causing Antigravity to think forever without responding. These turns are now always preserved as separate messages.
- **Merge PR #10**: `fix: preserve functionCall/functionResponse in Gemini sanitizer` (qwen-chat coder)
## v3.11.8 (2026-05-26)
## v3.11.8 (2026-05-26)
**Vision Cache Persistence, PR #8 Merge**

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,24 @@
"""
antigravity_grpc — gRPC fallback client for Google CloudCode (Antigravity).
When the REST API rejects a request (404 model not found, 400 bad request due to
model ID mismatch, etc.), this module provides a gRPC fallback path that uses
Google's native PredictionService protocol — the same one the agy CLI uses.
This module is imported lazily and only when grpcio is installed. If grpcio is
not available, the fallback is silently skipped.
"""
from .client import (
GrpcFallbackResult,
AntigravityGrpcClient,
is_grpc_available,
get_client,
)
__all__ = [
"GrpcFallbackResult",
"AntigravityGrpcClient",
"is_grpc_available",
"get_client",
]

View File

@@ -0,0 +1,609 @@
"""
antigravity_grpc.client — gRPC fallback client for Google CloudCode (Antigravity).
This module provides a gRPC client that can be used as an automatic fallback when
the CloudCode REST API rejects requests. The gRPC path uses the same
PredictionService that the native agy CLI binary uses, giving access to models
that are unavailable via REST (e.g. models that return 404 on REST but work on gRPC).
Key design decisions:
- Lazy import: grpcio is only imported when actually needed. If not installed,
is_grpc_available() returns False and the fallback is silently skipped.
- Zero impact on other providers: this module is only called from
_handle_antigravity_v2() when REST returns a fallback-eligible error.
- Same output format as REST: the client returns structured dicts that match
the SSE/JSON response shapes the proxy already processes.
- Thread-safe: the gRPC channel is created once per endpoint and reused.
Usage from translate-proxy.py:
from antigravity_grpc import is_grpc_available, AntigravityGrpcClient
if is_grpc_available():
client = AntigravityGrpcClient()
result = client.try_generate(request_dict, stream=False)
if result.ok:
# Use result.response_data (dict matching REST response shape)
else:
# gRPC also failed, fall through to error
"""
import json
import os
import sys
import time
import threading
import collections
# ═══════════════════════════════════════════════════════════════════
# Lazy gRPC import — never crash if grpcio is missing
# ═══════════════════════════════════════════════════════════════════
_grpc = None
_pb2 = None
_pb2_grpc = None
_import_error = None
def _try_import():
global _grpc, _pb2, _pb2_grpc, _import_error
if _grpc is not None:
return _grpc is not False
try:
import grpc as _real_grpc
# Import the generated stubs relative to this package
from . import cloudcode_pb2 as _real_pb2
from . import cloudcode_pb2_grpc as _real_pb2_grpc
_grpc = _real_grpc
_pb2 = _real_pb2
_pb2_grpc = _real_pb2_grpc
return True
except Exception as e:
_import_error = str(e)
_grpc = False
return False
def is_grpc_available():
"""Return True if grpcio and the generated stubs are importable."""
return _try_import()
# ═══════════════════════════════════════════════════════════════════
# gRPC endpoints for Antigravity (same hosts, different port/path)
# ═══════════════════════════════════════════════════════════════════
# The CloudCode gRPC service runs on the same hosts as REST but uses
# the gRPC protocol. The agy CLI connects to:
# - cloudcode-pa.googleapis.com:443
# - daily-cloudcode-pa.googleapis.com:443
# - daily-cloudcode-pa.sandbox.googleapis.com:443
_GRPC_ENDPOINTS = [
"daily-cloudcode-pa.googleapis.com:443",
"cloudcode-pa.googleapis.com:443",
]
_ALLOW_STAGING_ENV = "ALLOW_ANTIGRAVITY_STAGING"
# ═══════════════════════════════════════════════════════════════════
# Result type
# ═══════════════════════════════════════════════════════════════════
class GrpcFallbackResult:
"""Result of a gRPC fallback attempt."""
__slots__ = ("ok", "response_data", "stream_chunks", "error_message",
"endpoint_used", "model_used", "elapsed_s")
def __init__(self, ok=False, response_data=None, stream_chunks=None,
error_message="", endpoint_used="", model_used="", elapsed_s=0.0):
self.ok = ok
self.response_data = response_data # dict (non-streaming)
self.stream_chunks = stream_chunks # list[dict] (streaming)
self.error_message = error_message
self.endpoint_used = endpoint_used
self.model_used = model_used
self.elapsed_s = elapsed_s
def __repr__(self):
if self.ok:
if self.stream_chunks is not None:
return f"<GrpcFallbackResult OK stream chunks={len(self.stream_chunks)}>"
return f"<GrpcFallbackResult OK data_keys={list(self.response_data.keys()) if self.response_data else None}>"
return f"<GrpcFallbackResult FAIL error={self.error_message!r}>"
# ═══════════════════════════════════════════════════════════════════
# JSON → Protobuf conversion helpers
# ═══════════════════════════════════════════════════════════════════
def _struct_to_protobuf(d, struct_obj=None):
"""Convert a Python dict to a google.protobuf.Struct."""
from google.protobuf.struct_pb2 import Struct, Value, NullValue, ListValue
if struct_obj is None:
struct_obj = Struct()
if isinstance(d, dict):
for k, v in d.items():
if isinstance(v, str):
struct_obj.fields[k].string_value = v
elif isinstance(v, bool):
struct_obj.fields[k].bool_value = v
elif isinstance(v, int):
struct_obj.fields[k].number_value = float(v)
elif isinstance(v, float):
struct_obj.fields[k].number_value = v
elif isinstance(v, dict):
_struct_to_protobuf(v, struct_obj.fields[k].struct_value)
elif isinstance(v, list):
lst = struct_obj.fields[k].list_value
for item in v:
if isinstance(item, str):
lst.values.add().string_value = item
elif isinstance(item, bool):
lst.values.add().bool_value = item
elif isinstance(item, (int, float)):
lst.values.add().number_value = float(item)
elif isinstance(item, dict):
_struct_to_protobuf(item, lst.values.add().struct_value)
elif item is None:
lst.values.add().null_value = 0
elif v is None:
struct_obj.fields[k].null_value = 0
return struct_obj
def _protobuf_struct_to_dict(struct):
"""Convert a google.protobuf.Struct to a Python dict."""
from google.protobuf.struct_pb2 import Value, NullValue
result = {}
for k, v in struct.fields.items():
kind = v.WhichOneof("kind")
if kind == "null_value":
result[k] = None
elif kind == "number_value":
result[k] = v.number_value
elif kind == "string_value":
result[k] = v.string_value
elif kind == "bool_value":
result[k] = v.bool_value
elif kind == "struct_value":
result[k] = _protobuf_struct_to_dict(v.struct_value)
elif kind == "list_value":
result[k] = [_value_to_python(item) for item in v.list_value.values]
else:
result[k] = None
return result
def _value_to_python(v):
"""Convert a google.protobuf.Value to a Python value."""
kind = v.WhichOneof("kind")
if kind == "null_value":
return None
elif kind == "number_value":
return v.number_value
elif kind == "string_value":
return v.string_value
elif kind == "bool_value":
return v.bool_value
elif kind == "struct_value":
return _protobuf_struct_to_dict(v.struct_value)
elif kind == "list_value":
return [_value_to_python(item) for item in v.list_value.values]
return None
def _json_parts_to_proto(parts_json):
"""Convert a list of JSON content parts to protobuf Part messages."""
result = []
for p in parts_json:
if not isinstance(p, dict):
continue
part = _pb2.Part()
# Thought signature
sig = p.get("thoughtSignature") or p.get("thought_signature")
if sig:
part.thought_signature = sig
if p.get("thought"):
part.thought = True
if "text" in p:
part.text = p["text"]
elif "text" in p and "functionCall" not in p:
part.text = p["text"]
elif "functionCall" in p:
fc = p["functionCall"]
part.function_call.name = fc.get("name", "")
part.function_call.id = fc.get("id", "")
args = fc.get("args", fc.get("arguments", {}))
if isinstance(args, dict):
_struct_to_protobuf(args, part.function_call.args)
elif isinstance(args, str):
try:
_struct_to_protobuf(json.loads(args), part.function_call.args)
except Exception:
pass
elif "functionResponse" in p:
fr = p["functionResponse"]
part.function_response.name = fr.get("name", "")
part.function_response.id = fr.get("id", "")
resp = fr.get("response", {})
if "result" in resp:
result_val = resp["result"]
if isinstance(result_val, (dict, list)):
_struct_to_protobuf({"result": result_val}, part.function_response.response)
else:
_struct_to_protobuf({"result": str(result_val)}, part.function_response.response)
elif isinstance(resp, dict):
_struct_to_protobuf(resp, part.function_response.response)
elif "inlineData" in p:
idata = p["inlineData"]
import base64
part.inline_data.mime_type = idata.get("mimeType", "image/png")
b64data = idata.get("data", "")
part.inline_data.data = base64.b64decode(b64data) if b64data else b""
result.append(part)
return result
def _json_contents_to_proto(contents_json):
"""Convert a list of JSON content objects to protobuf Content messages."""
result = []
for c in contents_json:
if not isinstance(c, dict):
continue
content = _pb2.Content()
content.role = c.get("role", "user")
for part in _json_parts_to_proto(c.get("parts", [])):
content.parts.append(part)
result.append(content)
return result
def _proto_candidate_to_json(candidate):
"""Convert a protobuf Candidate to a JSON-compatible dict."""
content_json = {"role": candidate.content.role, "parts": []}
for part in candidate.content.parts:
p = {}
if part.thought_signature:
p["thoughtSignature"] = part.thought_signature
if part.thought:
p["thought"] = True
if part.text:
p["text"] = part.text
elif part.text and not part.HasField("function_call"):
p["text"] = part.text
elif part.HasField("function_call"):
fc = part.function_call
args_dict = _protobuf_struct_to_dict(fc.args) if fc.HasField("args") else {}
p["functionCall"] = {
"name": fc.name,
"args": args_dict,
"id": fc.id,
}
elif part.HasField("function_response"):
fr = part.function_response
resp_dict = _protobuf_struct_to_dict(fr.response) if fr.HasField("response") else {}
p["functionResponse"] = {
"name": fr.name,
"response": resp_dict,
"id": fr.id,
}
elif part.HasField("inline_data"):
import base64
p["inlineData"] = {
"mimeType": part.inline_data.mime_type,
"data": base64.b64encode(part.inline_data.data).decode(),
}
if p:
content_json["parts"].append(p)
return {
"content": content_json,
"finishReason": candidate.finish_reason,
"index": candidate.index,
}
# ═══════════════════════════════════════════════════════════════════
# Client
# ═══════════════════════════════════════════════════════════════════
class AntigravityGrpcClient:
"""
gRPC fallback client for Google CloudCode Antigravity.
Thread-safe. Channels are cached per endpoint and reused.
"""
def __init__(self):
self._channels = {}
self._stubs = {}
self._lock = threading.Lock()
def _get_channel(self, endpoint):
"""Get or create a gRPC channel for the given endpoint."""
with self._lock:
if endpoint not in self._channels:
# Use secure channel with default SSL credentials
creds = _grpc.ssl_channel_credentials()
channel = _grpc.secure_channel(endpoint, creds)
self._channels[endpoint] = channel
self._stubs[endpoint] = _pb2_grpc.PredictionServiceStub(channel)
return self._channels[endpoint], self._stubs[endpoint]
def _build_request(self, wrapped_dict):
"""
Build a GenerateContentRequest protobuf from the same wrapped dict
that the REST API uses.
wrapped_dict shape:
{
"project": "...",
"model": "...",
"requestType": "agent",
"userAgent": "antigravity/...",
"requestId": "agent-...",
"request": {
"contents": [...],
"systemInstruction": {...},
"generationConfig": {...},
"tools": [...],
"safetySettings": [...],
"toolConfig": {...},
"sessionId": "..."
}
}
"""
req = _pb2.GenerateContentRequest()
req.project = wrapped_dict.get("project", "")
req.model = wrapped_dict.get("model", "")
req.request_type = wrapped_dict.get("requestType", "agent")
req.user_agent = wrapped_dict.get("userAgent", "")
req.request_id = wrapped_dict.get("requestId", "")
inner = wrapped_dict.get("request", {})
# Contents
for c in _json_contents_to_proto(inner.get("contents", [])):
req.request.contents.append(c)
# System instruction
si = inner.get("systemInstruction", {})
if si:
si_parts = si.get("parts", [])
if si.get("role"):
req.request.system_instruction.role = si.get("role", "user")
for part in _json_parts_to_proto(si_parts):
req.request.system_instruction.parts.append(part)
# Generation config
gc = inner.get("generationConfig", {})
if gc:
cfg = req.request.generation_config
if "maxOutputTokens" in gc:
cfg.max_output_tokens = int(gc["maxOutputTokens"])
if "temperature" in gc:
cfg.temperature = float(gc["temperature"])
if "topP" in gc:
cfg.top_p = float(gc["top_p" if "top_p" in gc else "topP"])
for ss in gc.get("stopSequences", []):
cfg.stop_sequences.append(ss)
# Thinking config (Gemini 3 native)
tc = gc.get("thinkingConfig", gc.get("thinking_config"))
if tc:
cfg.thinking_config.include_thoughts = tc.get("includeThoughts", tc.get("include_thoughts", False))
cfg.thinking_config.thinking_budget = int(tc.get("thinkingBudget", tc.get("thinking_budget", 8192)))
# Legacy thinking fields
if "includeThoughts" in gc and not tc:
cfg.thinking_config.include_thoughts = gc["includeThoughts"]
if "thinkingBudget" in gc and not tc:
cfg.thinking_config.thinking_budget = int(gc["thinkingBudget"])
# Tools
for tool_json in inner.get("tools", []):
tool = _pb2.Tool()
for fd_json in tool_json.get("functionDeclarations", []):
fd = tool.function_declarations.add()
fd.name = fd_json.get("name", "")
fd.description = fd_json.get("description", "")
params = fd_json.get("parameters", {})
if isinstance(params, dict) and params:
_struct_to_protobuf(params, fd.parameters)
req.request.tools.append(tool)
# Safety settings
for ss in inner.get("safetySettings", []):
ss_msg = _pb2.SafetySetting()
ss_msg.category = ss.get("category", "")
ss_msg.threshold = ss.get("threshold", "OFF")
req.request.safety_settings.append(ss_msg)
# Tool config
tcfg = inner.get("toolConfig", {})
if tcfg:
fcc = tcfg.get("functionCallingConfig", {})
if fcc:
req.request.tool_config.function_calling_config.mode = fcc.get("mode", "AUTO")
for afn in fcc.get("allowed_function_names", []):
req.request.tool_config.function_calling_config.allowed_function_names.append(afn)
# Session ID
sid = inner.get("sessionId", "")
if sid:
req.request.session_id = sid
return req
def try_generate(self, wrapped_dict, stream=False, access_token="",
timeout_s=180):
"""
Try a gRPC GenerateContent or StreamGenerateContent request.
Args:
wrapped_dict: The same wrapped dict used for REST requests.
stream: If True, use server-streaming RPC.
access_token: OAuth2 Bearer token for authentication.
timeout_s: Request timeout in seconds.
Returns:
GrpcFallbackResult with ok=True if successful.
For non-streaming: result.response_data is a dict matching
the REST JSON response shape.
For streaming: result.stream_chunks is a list of dicts matching
REST SSE chunk shapes.
"""
if not is_grpc_available():
return GrpcFallbackResult(ok=False, error_message="grpcio not installed")
t0 = time.time()
# Build metadata (gRPC uses metadata instead of HTTP headers)
metadata = []
if access_token:
metadata.append(("authorization", f"Bearer {access_token}"))
ua = wrapped_dict.get("userAgent", "")
if ua:
metadata.append(("user-agent", ua))
metadata.append(("x-client-name", "antigravity"))
# Required for Google's gRPC gateway
metadata.append(("x-goog-api-client", "gl-node/18.18.2 fire/0.8.6 grpc/1.10.x"))
# Build endpoints list
endpoints = list(_GRPC_ENDPOINTS)
if os.environ.get(_ALLOW_STAGING_ENV, "0") == "1":
endpoints.append("daily-cloudcode-pa.sandbox.googleapis.com:443")
endpoints.append("autopush-cloudcode-pa.sandbox.googleapis.com:443")
model = wrapped_dict.get("model", "?")
last_error = ""
for ep in endpoints:
try:
channel, stub = self._get_channel(ep)
req = self._build_request(wrapped_dict)
if stream:
return self._do_stream(stub, req, metadata, ep, model,
timeout_s, t0)
else:
return self._do_unary(stub, req, metadata, ep, model,
timeout_s, t0)
except Exception as e:
last_error = str(e)
err_str = last_error.lower()
print(f"[antigravity-grpc] {ep} failed: {last_error[:300]}", file=sys.stderr)
# Don't retry on auth errors
if "unauthenticated" in err_str or "permission" in err_str:
break
# Don't retry on invalid argument (model truly doesn't exist)
if "not_found" in err_str or "not found" in err_str:
break
continue
elapsed = time.time() - t0
return GrpcFallbackResult(
ok=False,
error_message=f"All gRPC endpoints failed: {last_error}",
model_used=model,
elapsed_s=elapsed,
)
def _do_unary(self, stub, req, metadata, endpoint, model, timeout_s, t0):
"""Execute a unary (non-streaming) gRPC call."""
response = stub.GenerateContent(
req,
metadata=metadata,
timeout=timeout_s,
)
elapsed = time.time() - t0
# Convert protobuf response to REST-compatible JSON shape
candidates_json = []
for candidate in response.response.candidates:
candidates_json.append(_proto_candidate_to_json(candidate))
# Match the REST response envelope:
# { "response": { "candidates": [...] } }
rest_shape = {
"response": {
"candidates": candidates_json,
}
}
print(f"[antigravity-grpc] {endpoint} unary OK, candidates={len(candidates_json)}, elapsed={elapsed:.1f}s", file=sys.stderr)
return GrpcFallbackResult(
ok=True,
response_data=rest_shape,
endpoint_used=endpoint,
model_used=model,
elapsed_s=elapsed,
)
def _do_stream(self, stub, req, metadata, endpoint, model, timeout_s, t0):
"""Execute a server-streaming gRPC call."""
chunks = []
chunk_count = 0
response_iter = stub.StreamGenerateContent(
req,
metadata=metadata,
timeout=timeout_s,
)
for chunk_proto in response_iter:
chunk_count += 1
# Each chunk_proto is a StreamGenerateContentChunk
# which wraps a Response with candidates
candidates_json = []
for candidate in chunk_proto.response.candidates:
candidates_json.append(_proto_candidate_to_json(candidate))
# Match REST SSE chunk shape: { "response": { "candidates": [...] } }
chunk_json = {
"response": {
"candidates": candidates_json,
}
}
chunks.append(chunk_json)
elapsed = time.time() - t0
print(f"[antigravity-grpc] {endpoint} stream OK, chunks={chunk_count}, elapsed={elapsed:.1f}s", file=sys.stderr)
return GrpcFallbackResult(
ok=True,
stream_chunks=chunks,
endpoint_used=endpoint,
model_used=model,
elapsed_s=elapsed,
)
def close(self):
"""Close all gRPC channels."""
with self._lock:
for ep, channel in self._channels.items():
try:
channel.close()
except Exception:
pass
self._channels.clear()
self._stubs.clear()
# ═══════════════════════════════════════════════════════════════════
# Module-level singleton
# ═══════════════════════════════════════════════════════════════════
_client = None
_client_lock = threading.Lock()
def get_client():
"""Get the module-level AntigravityGrpcClient singleton."""
global _client
with _client_lock:
if _client is None:
_client = AntigravityGrpcClient()
return _client

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,275 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings
from antigravity_grpc import cloudcode_pb2 as cloudcode__pb2
GRPC_GENERATED_VERSION = '1.80.0'
GRPC_VERSION = grpc.__version__
_version_not_supported = False
try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True
if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ ' but the generated code in cloudcode_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)
class PredictionServiceStub(object):
"""─── Service ──────────────────────────────────────────────────────────
"""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.GenerateContent = channel.unary_unary(
'/google.internal.cloud.code.v1internal.PredictionService/GenerateContent',
request_serializer=cloudcode__pb2.GenerateContentRequest.SerializeToString,
response_deserializer=cloudcode__pb2.GenerateContentResponse.FromString,
_registered_method=True)
self.StreamGenerateContent = channel.unary_stream(
'/google.internal.cloud.code.v1internal.PredictionService/StreamGenerateContent',
request_serializer=cloudcode__pb2.GenerateContentRequest.SerializeToString,
response_deserializer=cloudcode__pb2.StreamGenerateContentChunk.FromString,
_registered_method=True)
self.FetchAvailableModels = channel.unary_unary(
'/google.internal.cloud.code.v1internal.PredictionService/FetchAvailableModels',
request_serializer=cloudcode__pb2.FetchAvailableModelsRequest.SerializeToString,
response_deserializer=cloudcode__pb2.FetchAvailableModelsResponse.FromString,
_registered_method=True)
self.CountTokens = channel.unary_unary(
'/google.internal.cloud.code.v1internal.PredictionService/CountTokens',
request_serializer=cloudcode__pb2.CountTokensRequest.SerializeToString,
response_deserializer=cloudcode__pb2.CountTokensResponse.FromString,
_registered_method=True)
self.RetrieveUserQuota = channel.unary_unary(
'/google.internal.cloud.code.v1internal.PredictionService/RetrieveUserQuota',
request_serializer=cloudcode__pb2.RetrieveUserQuotaRequest.SerializeToString,
response_deserializer=cloudcode__pb2.RetrieveUserQuotaResponse.FromString,
_registered_method=True)
class PredictionServiceServicer(object):
"""─── Service ──────────────────────────────────────────────────────────
"""
def GenerateContent(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def StreamGenerateContent(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def FetchAvailableModels(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def CountTokens(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def RetrieveUserQuota(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_PredictionServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
'GenerateContent': grpc.unary_unary_rpc_method_handler(
servicer.GenerateContent,
request_deserializer=cloudcode__pb2.GenerateContentRequest.FromString,
response_serializer=cloudcode__pb2.GenerateContentResponse.SerializeToString,
),
'StreamGenerateContent': grpc.unary_stream_rpc_method_handler(
servicer.StreamGenerateContent,
request_deserializer=cloudcode__pb2.GenerateContentRequest.FromString,
response_serializer=cloudcode__pb2.StreamGenerateContentChunk.SerializeToString,
),
'FetchAvailableModels': grpc.unary_unary_rpc_method_handler(
servicer.FetchAvailableModels,
request_deserializer=cloudcode__pb2.FetchAvailableModelsRequest.FromString,
response_serializer=cloudcode__pb2.FetchAvailableModelsResponse.SerializeToString,
),
'CountTokens': grpc.unary_unary_rpc_method_handler(
servicer.CountTokens,
request_deserializer=cloudcode__pb2.CountTokensRequest.FromString,
response_serializer=cloudcode__pb2.CountTokensResponse.SerializeToString,
),
'RetrieveUserQuota': grpc.unary_unary_rpc_method_handler(
servicer.RetrieveUserQuota,
request_deserializer=cloudcode__pb2.RetrieveUserQuotaRequest.FromString,
response_serializer=cloudcode__pb2.RetrieveUserQuotaResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'google.internal.cloud.code.v1internal.PredictionService', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
server.add_registered_method_handlers('google.internal.cloud.code.v1internal.PredictionService', rpc_method_handlers)
# This class is part of an EXPERIMENTAL API.
class PredictionService(object):
"""─── Service ──────────────────────────────────────────────────────────
"""
@staticmethod
def GenerateContent(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/google.internal.cloud.code.v1internal.PredictionService/GenerateContent',
cloudcode__pb2.GenerateContentRequest.SerializeToString,
cloudcode__pb2.GenerateContentResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def StreamGenerateContent(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_stream(
request,
target,
'/google.internal.cloud.code.v1internal.PredictionService/StreamGenerateContent',
cloudcode__pb2.GenerateContentRequest.SerializeToString,
cloudcode__pb2.StreamGenerateContentChunk.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def FetchAvailableModels(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/google.internal.cloud.code.v1internal.PredictionService/FetchAvailableModels',
cloudcode__pb2.FetchAvailableModelsRequest.SerializeToString,
cloudcode__pb2.FetchAvailableModelsResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def CountTokens(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/google.internal.cloud.code.v1internal.PredictionService/CountTokens',
cloudcode__pb2.CountTokensRequest.SerializeToString,
cloudcode__pb2.CountTokensResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def RetrieveUserQuota(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/google.internal.cloud.code.v1internal.PredictionService/RetrieveUserQuota',
cloudcode__pb2.RetrieveUserQuotaRequest.SerializeToString,
cloudcode__pb2.RetrieveUserQuotaResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

View File

@@ -0,0 +1,183 @@
// Copyright 2026 Codex Launcher Contributors
// SPDX-License-Identifier: MIT
//
// CloudCode internal gRPC service definitions.
// Reverse-engineered from the agy-core binary for Antigravity proxy fallback.
// Service: google.internal.cloud.code.v1internal.PredictionService
//
// NOTE: google/api/annotations.proto is NOT imported here because it conflicts
// with the google namespace package at runtime. The HTTP annotations are only
// needed for Google's Envoy/gRPC-gateway and are unnecessary for our client.
syntax = "proto3";
package google.internal.cloud.code.v1internal;
import "google/protobuf/struct.proto";
option go_package = "google.golang.org/internal/cloud/code/v1internal";
// ─── Reused message types ───────────────────────────────────────────
message Content {
string role = 1;
repeated Part parts = 2;
}
message Part {
oneof data {
string text = 1;
InlineData inline_data = 2;
FunctionCall function_call = 3;
FunctionResponse function_response = 4;
}
// Thought signature for Gemini continuity
string thought_signature = 10;
// Thought part (reasoning)
bool thought = 11;
}
message InlineData {
string mime_type = 1;
bytes data = 2;
}
message FunctionCall {
string name = 1;
google.protobuf.Struct args = 2;
string id = 3;
}
message FunctionResponse {
string name = 1;
google.protobuf.Struct response = 2;
string id = 3;
}
message SafetySetting {
string category = 1;
string threshold = 2;
}
message GenerationConfig {
int32 max_output_tokens = 1;
float temperature = 2;
float top_p = 3;
int32 thinking_budget = 4;
bool include_thoughts = 5;
repeated string stop_sequences = 6;
message ThinkingConfig {
bool include_thoughts = 1;
int32 thinking_budget = 2;
}
ThinkingConfig thinking_config = 7;
}
message Tool {
repeated FunctionDeclaration function_declarations = 1;
}
message FunctionDeclaration {
string name = 1;
string description = 2;
google.protobuf.Struct parameters = 3;
}
message ToolConfig {
message FunctionCallingConfig {
string mode = 1; // "AUTO", "ANY", "NONE", "VALIDATED"
repeated string allowed_function_names = 2;
}
FunctionCallingConfig function_calling_config = 1;
}
message Candidate {
Content content = 1;
string finish_reason = 2;
int32 index = 3;
}
// ─── GenerateContent ─────────────────────────────────────────────────
message GenerateContentRequest {
string project = 1;
string model = 2;
string request_type = 3;
string user_agent = 4;
string request_id = 5;
message InnerRequest {
repeated Content contents = 1;
Content system_instruction = 2;
GenerationConfig generation_config = 3;
repeated Tool tools = 4;
repeated SafetySetting safety_settings = 5;
ToolConfig tool_config = 6;
string session_id = 7;
}
InnerRequest request = 10;
}
message GenerateContentResponse {
message Response {
repeated Candidate candidates = 1;
}
Response response = 1;
}
// ─── StreamGenerateContent ────────────────────────────────────────────
message StreamGenerateContentChunk {
GenerateContentResponse.Response response = 1;
}
// ─── FetchAvailableModels ────────────────────────────────────────────
message FetchAvailableModelsRequest {
string project = 1;
}
message FetchAvailableModelsResponse {
message ModelInfo {
string name = 1;
string display_name = 2;
string description = 3;
int64 context_window = 4;
}
repeated ModelInfo models = 1;
}
// ─── CountTokens ──────────────────────────────────────────────────────
message CountTokensRequest {
string project = 1;
string model = 2;
repeated Content contents = 3;
}
message CountTokensResponse {
int32 total_tokens = 1;
}
// ─── RetrieveUserQuota ───────────────────────────────────────────────
message RetrieveUserQuotaRequest {
string project = 1;
}
message RetrieveUserQuotaResponse {
int64 daily_limit = 1;
int64 daily_usage = 2;
int64 daily_remaining = 3;
}
// ─── Service ──────────────────────────────────────────────────────────
service PredictionService {
rpc GenerateContent(GenerateContentRequest) returns (GenerateContentResponse);
rpc StreamGenerateContent(GenerateContentRequest) returns (stream StreamGenerateContentChunk);
rpc FetchAvailableModels(FetchAvailableModelsRequest) returns (FetchAvailableModelsResponse);
rpc CountTokens(CountTokensRequest) returns (CountTokensResponse);
rpc RetrieveUserQuota(RetrieveUserQuotaRequest) returns (RetrieveUserQuotaResponse);
}

View File

@@ -0,0 +1,14 @@
// Minimal google/api/annotations.proto for code generation.
syntax = "proto3";
package google.api;
import "google/api/http.proto";
import "google/protobuf/descriptor.proto";
option go_package = "google.golang.org/genproto/googleapis/api/annotations";
extend google.protobuf.MethodOptions {
HttpRule http = 72295728;
}

View File

@@ -0,0 +1,18 @@
// Minimal google/api/http.proto for code generation.
syntax = "proto3";
package google.api;
option go_package = "google.golang.org/genproto/googleapis/api/annotations";
message HttpRule {
string get = 1;
string put = 2;
string post = 3;
string delete = 4;
string patch = 5;
repeated HttpRule additional_bindings = 11;
string body = 7;
string response_body = 12;
}

View File

@@ -27,6 +27,24 @@ model_catalog_json = ""
"""
CHANGELOG = [
("3.12.1", "2026-05-27", [
"Fix Antigravity adapter (PR #15): simplified model resolution",
"Removed broken schema sanitization, restored headers",
"Re-enabled gRPC fallback by default",
]),
("3.12.0", "2026-05-27", [
"gRPC auto-fallback for Antigravity (PR #13)",
"Dynamic version fetch with probe validation",
"Antigravity v2 handler rewrite (anti-api)",
]),
("3.11.10", "2026-05-26", [
"Fix Antigravity: interleave function_call/output pairs (PR #11)",
"Gemini sanitizer: trim non-user turns for Google API compliance",
]),
("3.11.9", "2026-05-26", [
"Fix Antigravity: preserve functionCall/functionResponse (PR #10)",
"Prevents tool responses from being dropped in multi-turn sessions",
]),
("3.11.8", "2026-05-26", [
"Vision cache persisted across requests (PR #8 merge)",
"No redundant vision API calls for same image URL",

View File

@@ -83,6 +83,33 @@ model_catalog_json = ""
"""
CHANGELOG = [
("3.12.1", "2026-05-27", [
"Fix Antigravity adapter (PR #15): simplify model resolution",
"Removed broken schema sanitization, restored correct headers",
"Expanded model alias map for all Antigravity variants",
"Re-enabled gRPC fallback by default",
]),
("3.12.0", "2026-05-27", [
"gRPC auto-fallback for Antigravity provider (PR #13)",
"New antigravity_grpc module with protobuf client",
"REST 404 triggers gRPC fallback using display names",
"gRPC supports streaming and unary generate",
"Dynamic version fetch with probe validation",
"Antigravity v2 handler rewrite (anti-api approach)",
"Safety settings, stopSequences, sessionId, requestType: agent",
]),
("3.11.11", "2026-05-26", [
"Final trimming only removes plain messages, never function_call_output",
]),
("3.11.10", "2026-05-26", [
"Fix Antigravity: interleave function_call/output pairs in correct sequence (PR #11)",
"Fix Gemini sanitizer: trim leading/trailing non-user turns for Google API compliance",
"Stricter function call/response isolation — no merging across role boundaries",
]),
("3.11.9", "2026-05-26", [
"Fix Antigravity: preserve functionCall/functionResponse in Gemini sanitizer (PR #10)",
"Prevents tool responses from being merged/dropped in multi-turn Antigravity sessions",
]),
("3.11.8", "2026-05-26", [
"Vision description cache persisted across requests (no redundant API calls for same image)",
"Merge PR #8: fix vision cache persistence across requests",

View File

@@ -165,6 +165,56 @@ import tempfile
_IS_WINDOWS = sys.platform == "win32"
# ═══════════════════════════════════════════════════════════════════
# Lazy gRPC import for Antigravity fallback
# ═══════════════════════════════════════════════════════════════════
_antigravity_grpc_client = None
_antigravity_grpc_available = None
def _get_grpc_client():
"""Lazy-load the Antigravity gRPC client. Returns None if grpcio is not installed."""
global _antigravity_grpc_client, _antigravity_grpc_available
if _antigravity_grpc_available is False:
return None
if _antigravity_grpc_client is not None:
return _antigravity_grpc_client
try:
# Add the src directory to sys.path so antigravity_grpc package is found
_src_dir = os.path.dirname(os.path.abspath(__file__))
if _src_dir not in sys.path:
sys.path.insert(0, _src_dir)
from antigravity_grpc import is_grpc_available, AntigravityGrpcClient, get_client
if is_grpc_available():
_antigravity_grpc_client = get_client()
_antigravity_grpc_available = True
print("[antigravity-grpc] gRPC fallback module loaded OK", file=sys.stderr)
return _antigravity_grpc_client
else:
_antigravity_grpc_available = False
print("[antigravity-grpc] grpcio available but stubs failed to load, gRPC fallback disabled", file=sys.stderr)
return None
except ImportError as e:
_antigravity_grpc_available = False
print(f"[antigravity-grpc] grpcio not installed ({e}), gRPC fallback disabled", file=sys.stderr)
return None
# Reverse alias map: REST slug → gRPC display name
# gRPC uses display names (e.g. "Gemini 3.5 Flash (High)") while REST uses slugs (e.g. "gemini-3-flash")
_GRPC_REVERSE_ALIAS = {
"gemini-3-flash": "Gemini 3.5 Flash (High)",
"gemini-3.5-flash-low": "Gemini 3.5 Flash (Low)",
"gemini-3.1-pro-low": "Gemini 3.1 Pro (High)",
"claude-sonnet-4-6": "Claude Sonnet 4.6 (Thinking)",
"claude-opus-4-6-thinking": "Claude Opus 4.6 (Thinking)",
"gpt-oss-120b-medium": "GPT-OSS 120B (Medium)",
"gemini-2.5-flash": "Gemini 2.5 Flash",
"gemini-2.5-pro": "Gemini 2.5 Pro",
"gemini-2.5-flash-lite": "Gemini 2.5 Flash Lite",
}
# Errors from REST that should trigger gRPC fallback
_GRPC_FALLBACK_REST_ERRORS = {404} # Model not found via REST (model exists in gRPC but not REST)
# ═══════════════════════════════════════════════════════════════════
# Config
# ═══════════════════════════════════════════════════════════════════
@@ -320,9 +370,10 @@ _active_requests = {}
_active_requests_lock = threading.Lock()
_pool = uuid.uuid4().hex[:8]
_antigravity_version = "1.18.3"
_antigravity_version = "2.0.1"
_antigravity_version_checked = 0
_antigravity_version_lock = threading.Lock()
_antigravity_version_validated = False
_last_user_urls = collections.deque(maxlen=20)
_conn_pool_lock = threading.Lock()
@@ -798,49 +849,137 @@ _ANTIGRAVITY_LOOP_TRACKER_LOCK = threading.Lock()
def _antigravity_loop_key(session_id):
return f"ag:{session_id}"
def _validate_antigravity_version(version, access_token=None, project_id=None):
if not version or not re.match(r"^\d+\.\d+\.\d+$", version):
return False
try:
if not access_token:
access_token = _refresh_oauth_token()
if not project_id:
token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "google-antigravity-oauth-token.json")
try:
with open(token_path) as f:
project_id = json.load(f).get("project_id", "")
except Exception:
pass
if not access_token or not project_id:
return True
import platform as _plat
_os_name = _plat.system().lower()
_os_arch = _plat.machine().lower().replace("x86_64", "x64").replace("aarch64", "arm64")
ua = f"antigravity/{version} {_os_name}/{_os_arch}"
body = {
"project": project_id,
"model": "gemini-3-flash",
"requestType": "agent",
"userAgent": ua,
"requestId": f"probe-{uuid.uuid4().hex[:8]}",
"request": {
"contents": [{"role": "user", "parts": [{"text": "hi"}]}],
"sessionId": f"probe{int(time.time()*1000)}",
"safetySettings": [
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "OFF"},
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "OFF"},
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "OFF"},
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "OFF"},
{"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "OFF"},
],
"generationConfig": {"maxOutputTokens": 32, "stopSequences": ["\n\nHuman:", "[DONE]"]},
}
}
url = "https://daily-cloudcode-pa.googleapis.com/v1internal:streamGenerateContent?alt=sse"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {access_token}",
"User-Agent": ua,
}
req = urllib.request.Request(url, data=json.dumps(body).encode(), headers=headers)
resp = urllib.request.urlopen(req, timeout=15)
data = resp.read().decode()
if "no longer supported" in data.lower():
print(f"[antigravity-version] version {version} rejected (deprecated)", file=sys.stderr)
return False
return True
except urllib.error.HTTPError as e:
if e.code == 404:
print(f"[antigravity-version] version {version} rejected (404)", file=sys.stderr)
return False
return True
except Exception as e:
print(f"[antigravity-version] probe error for {version}: {e}", file=sys.stderr)
return True
def _fetch_antigravity_version():
cache_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "antigravity-version.json")
try:
with open(cache_path) as f:
cached = json.load(f)
if cached.get("version") and cached.get("checked_at", 0) > time.time() - 6 * 3600:
if cached.get("version") and cached.get("validated") and cached.get("checked_at", 0) > time.time() - 6 * 3600:
return cached["version"]
except Exception:
pass
urls = [
access_token = None
project_id = None
try:
access_token = _refresh_oauth_token()
token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "google-antigravity-oauth-token.json")
with open(token_path) as f:
project_id = json.load(f).get("project_id", "")
except Exception:
pass
sources = [
("https://antigravity-auto-updater-974169037036.us-central1.run.app", None),
("https://antigravity.google/changelog", 5000),
]
for url, limit in urls:
candidates = []
for url, limit in sources:
try:
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
resp = urllib.request.urlopen(req, timeout=5)
text = resp.read().decode(errors="replace")
if limit:
text = text[:limit]
m = re.search(r"\d+\.\d+\.\d+", text)
if m:
version = m.group(0)
try:
os.makedirs(os.path.dirname(cache_path), exist_ok=True)
with open(cache_path, "w", encoding="utf-8") as f:
json.dump({"version": version, "checked_at": time.time()}, f)
except Exception:
pass
return version
for m in re.finditer(r"\d+\.\d+\.\d+", text):
ver = m.group(0)
if ver not in candidates:
candidates.append(ver)
except Exception:
pass
return _antigravity_version
for ver in candidates:
if _validate_antigravity_version(ver, access_token, project_id):
print(f"[antigravity-version] fetched version {ver} validated", file=sys.stderr)
try:
os.makedirs(os.path.dirname(cache_path), exist_ok=True)
with open(cache_path, "w", encoding="utf-8") as f:
json.dump({"version": ver, "validated": True, "checked_at": time.time()}, f)
except Exception:
pass
return ver
fallback = "2.0.1"
print(f"[antigravity-version] all candidates failed, using fallback {fallback}", file=sys.stderr)
try:
os.makedirs(os.path.dirname(cache_path), exist_ok=True)
with open(cache_path, "w", encoding="utf-8") as f:
json.dump({"version": fallback, "validated": False, "checked_at": time.time()}, f)
except Exception:
pass
return fallback
def _ensure_antigravity_version():
global _antigravity_version, _antigravity_version_checked
if time.time() - _antigravity_version_checked < 6 * 3600:
global _antigravity_version, _antigravity_version_checked, _antigravity_version_validated
if _antigravity_version_validated and time.time() - _antigravity_version_checked < 6 * 3600:
return _antigravity_version
with _antigravity_version_lock:
if time.time() - _antigravity_version_checked < 6 * 3600:
if _antigravity_version_validated and time.time() - _antigravity_version_checked < 6 * 3600:
return _antigravity_version
_antigravity_version = _fetch_antigravity_version()
_antigravity_version_checked = time.time()
_antigravity_version_validated = True
return _antigravity_version
_antigravity_client_version = "1.110.0"
@@ -4841,6 +4980,14 @@ def _antigravity_is_simple_user(text):
return False
def _antigravity_normalize_context(input_data, model=""):
"""
Normalize context for Antigravity while PRESERVING function_call -> function_call_output pairs.
Google's Gemini API requires STRICT alternation:
- functionCall (role=model) MUST be immediately followed by functionResponse (role=user)
This function compacts old history but NEVER breaks tool call/response pairs.
"""
if not isinstance(input_data, list) or len(input_data) < 2:
return input_data
is_claude_model = "claude" in model.lower()
@@ -4889,7 +5036,7 @@ def _antigravity_normalize_context(input_data, model=""):
dev_messages = []
recent_items = []
tool_outputs = []
other_items = []
tool_calls = []
for i, item in enumerate(input_data):
if not isinstance(item, dict):
@@ -4899,8 +5046,8 @@ def _antigravity_normalize_context(input_data, model=""):
dev_messages.append(item)
elif t == "function_call_output":
tool_outputs.append((i, item))
elif t in ("function_call",):
other_items.append((i, item))
elif t == "function_call":
tool_calls.append((i, item))
elif t == "message":
recent_items.append((i, item))
@@ -4946,18 +5093,14 @@ def _antigravity_normalize_context(input_data, model=""):
deduped_tail.append((idx, msg_item))
recent_tail = deduped_tail if deduped_tail else recent_tail
tool_call_ids = set()
for _, t_item in kept_tools:
cid = t_item.get("call_id", t_item.get("id", ""))
# Build call_id -> function_call mapping
tool_call_map = {}
for _, call_item in tool_calls:
cid = call_item.get("call_id", call_item.get("id", ""))
if cid:
tool_call_ids.add(cid)
paired_calls = []
for idx, item in other_items:
cid = item.get("call_id", item.get("id", ""))
if cid in tool_call_ids:
paired_calls.append((idx, item))
tool_call_map[cid] = call_item
# Build result: maintain PAIRED sequence (function_call -> function_call_output)
result = list(dev_messages)
compaction_summaries = []
@@ -4973,11 +5116,22 @@ def _antigravity_normalize_context(input_data, model=""):
summary_text = f"[Tool history summary: {n_summarized} older tool outputs omitted. {n_tool_calls} prior function calls were made for file inspection/editing.]"
result.append({"type": "message", "role": "user", "content": [{"type": "input_text", "text": summary_text}]})
for _, call_item in paired_calls:
result.append(call_item)
# CRITICAL: Add tool CALLS and their corresponding OUTPUTS in PAIRED ORDER
# Only include pairs where BOTH call and output are present
added_pairs = set()
for _, tool_item in kept_tools:
result.append(tool_item)
cid = tool_item.get("call_id", tool_item.get("id", ""))
if cid and cid in tool_call_map and cid not in added_pairs:
# Add function_call FIRST, then function_call_output IMMEDIATELY
result.append(tool_call_map[cid])
result.append(tool_item)
added_pairs.add(cid)
# Add any orphan tool outputs (no matching call found) - these go at the end before messages
for _, tool_item in kept_tools:
cid = tool_item.get("call_id", tool_item.get("id", ""))
if cid not in added_pairs:
result.append(tool_item)
for cs_item in compaction_summaries:
result.append(cs_item)
@@ -5017,7 +5171,7 @@ def _antigravity_normalize_context(input_data, model=""):
while len(result) > _ANTIGRAVITY_MAX_CONTENTS and total_chars > _ANTIGRAVITY_SOFT_CHARS:
for i in range(1, len(result) - 1):
if isinstance(result[i], dict) and result[i].get("type") in ("message", "function_call_output"):
if isinstance(result[i], dict) and result[i].get("type") in ("message",):
removed = result.pop(i)
total_chars -= len(json.dumps(removed, ensure_ascii=False))
break
@@ -5197,7 +5351,10 @@ class Handler(http.server.BaseHTTPRequestHandler):
elif BACKEND in ("codebuff", "freebuff"):
self._handle_codebuff(body, model, stream, tracker)
elif (BACKEND or "").startswith("gemini-oauth"):
self._handle_gemini_oauth(body, model, stream, tracker)
if OAUTH_PROVIDER == "google-antigravity":
self._handle_antigravity_v2(body, model, stream, tracker)
else:
self._handle_gemini_oauth(body, model, stream, tracker)
else:
self._handle_openai_compat(body, model, stream, tracker)
update_snapshot_response(request_id, "completed", time.time() - _req_t0)
@@ -5386,6 +5543,575 @@ class Handler(http.server.BaseHTTPRequestHandler):
chat_body["reasoning_effort"] = REASONING_EFFORT
return chat_body
def _handle_antigravity_v2(self, body, model, stream, tracker=None):
input_data = body.get("input", "")
_schema = _load_schema(model=model)
if _schema and not _schema.supports_vision:
input_data = _preprocess_vision_input(input_data, _schema)
body = dict(body)
body["input"] = input_data
if isinstance(input_data, list) and len(input_data) > 30:
input_data = _antigravity_normalize_context(input_data, model)
body = dict(body)
body["input"] = input_data
access_token = _refresh_oauth_token()
token_path = os.path.join(os.path.expanduser("~"), ".cache", "codex-proxy", "google-antigravity-oauth-token.json")
project_id = ""
try:
with open(token_path) as f:
project_id = json.load(f).get("project_id", "")
except Exception:
pass
tool_call_names = {}
contents = []
if isinstance(input_data, list):
for item in input_data:
t = item.get("type")
if t == "message":
role = "user" if item.get("role") == "user" else "model"
content = item.get("content", "")
parts = []
if isinstance(content, list):
for c in content:
ct = c.get("type")
if ct in ("input_text", "text"):
parts.append({"text": c.get("text", "")})
elif ct in ("input_image", "image_url"):
iu = c.get("image_url") or c.get("url", {})
url = iu.get("url", iu) if isinstance(iu, dict) else iu
if isinstance(url, str) and url.startswith("data:"):
mime, _, b64 = url.partition(";base64,")
mime = mime.replace("data:", "") or "image/png"
parts.append({"inlineData": {"mimeType": mime, "data": b64}})
else:
parts.append({"text": str(url)})
elif isinstance(content, str):
parts.append({"text": content})
if parts:
contents.append({"role": role, "parts": parts})
elif t == "function_call":
call_id = item.get("call_id") or item.get("id") or f"call_{uuid.uuid4().hex[:24]}"
fname = item.get("name", "")
if call_id and fname:
tool_call_names[call_id] = fname
args = item.get("arguments", "{}")
if isinstance(args, str):
try:
args = json.loads(args)
except Exception:
args = {}
fc_part = {"functionCall": {"name": fname, "args": args, "id": call_id}}
stored_sig = _gemini_get_sig(f"fc:{call_id}") or _gemini_get_sig(f"fc:{fname}")
if stored_sig:
fc_part["thoughtSignature"] = stored_sig
fc_part["thought_signature"] = stored_sig
else:
fc_part["thought_signature"] = "skip_thought_signature_validator"
contents.append({"role": "model", "parts": [fc_part]})
elif t == "function_call_output":
call_id = item.get("call_id", item.get("id", ""))
output = item.get("output", "")
fname = item.get("name", "") or tool_call_names.get(call_id, "")
resp_part = {"functionResponse": {"name": fname or "unknown", "response": {"result": str(output)}}}
if call_id:
resp_part["functionResponse"]["id"] = call_id
contents.append({"role": "user", "parts": [resp_part]})
sanitized = []
last_user_text = None
last_role = None
for content in contents:
role = content.get("role")
parts = [p for p in content.get("parts", []) if isinstance(p, dict)]
if not parts:
continue
has_function_call = any("functionCall" in p for p in parts)
has_function_response = any("functionResponse" in p for p in parts)
text_key = "\n".join([p.get("text", "") for p in parts if "text" in p]).strip()
if has_function_call or has_function_response:
sanitized.append({"role": role, "parts": parts})
last_role = role
continue
if role == "user" and text_key and text_key == last_user_text:
continue
if role == last_role and role in ("user", "model") and sanitized:
last_parts = sanitized[-1].get("parts", [])
last_has_tool = any("functionCall" in p or "functionResponse" in p for p in last_parts)
if not last_has_tool:
sanitized[-1].setdefault("parts", []).extend(parts)
if role == "user" and text_key:
last_user_text = text_key
last_role = role
continue
sanitized.append({"role": role, "parts": parts})
if role == "user" and text_key:
last_user_text = text_key
last_role = role
while sanitized and sanitized[0].get("role") != "user":
sanitized.pop(0)
contents = sanitized
instructions = body.get("instructions", "").strip()
ag_identity = "You are Antigravity, a powerful agentic AI coding assistant designed by the Google Deepmind team working on Advanced Agentic Coding.\nYou are pair programming with a USER to solve their coding task. The task may require creating a new codebase, modifying or debugging an existing codebase, or simply answering a question.\n**Absolute paths only**\n**Proactiveness**"
system_parts = [{"text": ag_identity}, {"text": "\n--- [SYSTEM_PROMPT_END] ---"}]
if instructions:
system_parts.append({"text": instructions})
gen_config = {"maxOutputTokens": body.get("max_output_tokens", 64000), "stopSequences": ["\n\nHuman:", "[DONE]"]}
if body.get("temperature") is not None:
gen_config["temperature"] = body["temperature"]
if body.get("top_p") is not None:
gen_config["topP"] = body["top_p"]
_is_claude_model = "claude" in model.lower()
_is_claude_thinking = _is_claude_model and "thinking" in model.lower()
if REASONING_ENABLED and REASONING_EFFORT != "none":
if _is_claude_thinking:
budget = {"low": 8192, "medium": 16384, "high": 32768}.get(REASONING_EFFORT, 16384)
gen_config["thinkingConfig"] = {"include_thoughts": True, "thinking_budget": budget}
if gen_config.get("maxOutputTokens", 0) <= budget:
gen_config["maxOutputTokens"] = 64000
elif not _is_claude_model:
budget = {"low": 2048, "medium": 8192, "high": 24576}.get(REASONING_EFFORT, 8192)
gen_config["thinkingConfig"] = {"includeThoughts": True, "thinkingBudget": budget}
oa_tools = body.get("tools", [])
gemini_tools = []
if oa_tools:
func_decls = []
for tool in oa_tools:
ttype = tool.get("type", "function")
fname = tool.get("name", "")
if ttype == "function":
fn = tool.get("function", tool)
name = fn.get("name", fname)
desc = fn.get("description", "")
params = fn.get("parameters", fn.get("input_schema", {}))
func_decls.append({"name": name, "description": desc, "parameters": params})
elif fname:
func_decls.append({"name": fname, "description": tool.get("description", ""), "parameters": tool.get("parameters", {"type": "object", "properties": {}})})
if func_decls:
gemini_tools = [{"functionDeclarations": func_decls}]
contents = _gemini_reattach_sigs(contents)
ag_key = _antigravity_loop_key(self._session_id)
with _ANTIGRAVITY_LOOP_TRACKER_LOCK:
if ag_key not in _ANTIGRAVITY_LOOP_TRACKER:
_ANTIGRAVITY_LOOP_TRACKER[ag_key] = {
"latest_user_hash": None, "nudge_injected": False, "latest_user_appended": False,
"tool_calls_for_request": 0, "repeated_tool": False, "force_finalize": False,
"last_tool": None, "last_tool_count": 0,
}
ag_state = _ANTIGRAVITY_LOOP_TRACKER[ag_key]
latest_user = ""
if isinstance(input_data, list):
for item in reversed(input_data):
if item.get("type") == "message" and item.get("role") == "user":
c = item.get("content", "")
if isinstance(c, str):
latest_user = c
elif isinstance(c, list):
latest_user = "\n".join(p.get("text", p.get("input_text", "")) for p in c if isinstance(p, dict))
break
if latest_user:
latest_norm = " ".join(latest_user.strip().split())[:200]
latest_user_hash = hashlib.sha256(latest_norm.encode()).hexdigest()[:16]
if latest_user_hash != ag_state.get("latest_user_hash"):
ag_state["latest_user_hash"] = latest_user_hash
ag_state["nudge_injected"] = False
ag_state["latest_user_appended"] = False
ag_state["tool_calls_for_request"] = 0
ag_state["repeated_tool"] = False
ag_state["force_finalize"] = False
ag_state["last_tool"] = None
ag_state["last_tool_count"] = 0
n_tool_calls = sum(1 for it in input_data if isinstance(it, dict) and it.get("type") == "function_call")
ag_state["tool_calls_for_request"] = n_tool_calls
last_tool_key = None
for item in reversed(input_data):
if isinstance(item, dict) and item.get("type") == "function_call":
fname = item.get("name", "")
args_str = json.dumps(item.get("arguments", {}), sort_keys=True)[:100]
last_tool_key = f"{fname}:{args_str}"
break
if last_tool_key:
if last_tool_key == ag_state.get("last_tool"):
ag_state["last_tool_count"] = ag_state.get("last_tool_count", 0) + 1
if ag_state["last_tool_count"] >= 5:
ag_state["repeated_tool"] = True
ag_state["force_finalize"] = True
else:
ag_state["last_tool"] = last_tool_key
ag_state["last_tool_count"] = 1
if ag_state.get("force_finalize"):
contents.append({"role": "user", "parts": [{"text": "STOP CALLING TOOLS. APPLY THE FINAL EDIT OR SUMMARIZE WHAT BLOCKED YOU. DO NOT CALL ANY MORE TOOLS."}]})
if not _antigravity_is_simple_user(latest_user):
contents.insert(0, {"role": "user", "parts": [{"text": _GEMINI_AGENT_GUARDRAIL}]})
request_body = {"contents": contents, "safetySettings": [
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "OFF"},
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "OFF"},
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "OFF"},
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "OFF"},
{"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "OFF"},
]}
request_body["systemInstruction"] = {"role": "user", "parts": system_parts}
if gen_config:
request_body["generationConfig"] = gen_config
if gemini_tools:
request_body["tools"] = gemini_tools
if _is_claude_model and gemini_tools:
request_body["toolConfig"] = {"functionCallingConfig": {"mode": "VALIDATED"}}
import platform as _plat
_os_name = _plat.system().lower()
_os_arch = _plat.machine().lower().replace("x86_64", "x64").replace("aarch64", "arm64")
_fetched_ver = _ensure_antigravity_version()
_ag_ua = f"antigravity/{_fetched_ver} {_os_name}/{_os_arch}"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {access_token}",
"User-Agent": _ag_ua,
"X-Client-Name": "antigravity",
"X-Client-Version": _ensure_antigravity_client_version(),
"x-goog-api-client": "gl-node/18.18.2 fire/0.8.6 grpc/1.10.x",
}
wrapped = {
"project": project_id,
"model": model,
"requestType": "agent",
"userAgent": _ag_ua,
"requestId": f"agent-{uuid.uuid4().hex[:12]}",
"request": request_body,
}
wrapped["request"]["sessionId"] = f"{uuid.uuid4().hex}{int(time.time()*1000)}"
_allow_staging = os.environ.get("ALLOW_ANTIGRAVITY_STAGING", "0") == "1"
_antigravity_endpoints = [
"https://daily-cloudcode-pa.googleapis.com",
"https://daily-cloudcode-pa.sandbox.googleapis.com",
"https://cloudcode-pa.googleapis.com",
]
if _allow_staging:
_antigravity_endpoints.append("https://autopush-cloudcode-pa.sandbox.googleapis.com")
body_b = json.dumps(wrapped).encode()
print(f"[{self._session_id}] [antigravity-v2] model={model} stream={stream} contents={len(contents)} tools={bool(gemini_tools)} project={project_id} ver={_fetched_ver}", file=sys.stderr)
try:
debug_path = os.path.join(_LOG_DIR, f"antigravity-v2-request-{self._session_id}.json")
with open(debug_path, "w") as dbg:
json.dump(wrapped, dbg, indent=2)
except Exception:
pass
upstream = None
chosen_ep = None
global _antigravity_preferred_endpoint
with _antigravity_endpoint_lock:
_pref = _antigravity_preferred_endpoint
ordered = ([_pref] + [e for e in _antigravity_endpoints if e != _pref]) if _pref and _pref in _antigravity_endpoints else list(_antigravity_endpoints)
_all_404 = True
for ep in ordered:
action = "streamGenerateContent" if stream else "generateContent"
url_suffix = f"v1internal:{action}?alt=sse" if stream else f"v1internal:{action}"
target = f"{ep}/{url_suffix}"
req = urllib.request.Request(target, data=body_b, headers=headers)
try:
upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, stream))
chosen_ep = ep
_all_404 = False
with _antigravity_endpoint_lock:
_antigravity_preferred_endpoint = ep
break
except urllib.error.HTTPError as e:
err_body = e.read().decode()
err_class = _classify_antigravity_error(e.code, err_body)
print(f"[{self._session_id}] [antigravity-v2] {ep.replace('https://','')} {e.code} class={err_class} body={err_body[:300]}", file=sys.stderr)
if e.code != 404:
_all_404 = False
if e.code in (400, 404):
try:
debug_path = os.path.join(_LOG_DIR, f"antigravity-v2-{e.code}.json")
with open(debug_path, "w") as dbg:
json.dump({"endpoint": ep, "url": target, "model": model, "wrapped": wrapped, "error": err_body}, dbg, indent=2)
except Exception:
pass
if e.code == 400:
return self.send_json(e.code, {"error": {"type": "upstream_error", "message": _sanitize_err_body(err_body)}})
if err_class in ("auth_permanent", "service_disabled", "forbidden", "account_banned", "validation_required"):
return self.send_json(e.code, {"error": {"type": "upstream_error", "message": _sanitize_err_body(err_body)}})
if err_class in ("quota_exhausted", "rate_limited"):
pool = _google_antigravity_pool
_, acct = _get_google_account(OAUTH_PROVIDER)
if acct:
reset_s = _parse_rate_limit_reset(err_body)
cooldown = reset_s if reset_s and reset_s > 10 else 60
pool.mark_rate_limited(acct, cooldown)
return self.send_json(e.code, {"error": {"type": "upstream_error", "message": _sanitize_err_body(err_body)}})
if ep == ordered[-1] and not _all_404:
return self.send_json(e.code, {"error": {"type": "upstream_error", "message": _sanitize_err_body(err_body)}})
continue
except Exception as e:
_all_404 = False
print(f"[{self._session_id}] [antigravity-v2] {ep.replace('https://','')} conn failed: {e}", file=sys.stderr)
if ep == ordered[-1]:
return self.send_json(502, {"error": {"type": "proxy_error", "message": str(e)}})
continue
if _all_404 and upstream is None:
print(f"[{self._session_id}] [antigravity-v2] all endpoints 404, invalidating version cache and re-fetching", file=sys.stderr)
global _antigravity_version_validated
with _antigravity_version_lock:
_antigravity_version_validated = False
_antigravity_version_checked = 0
_new_ver = _ensure_antigravity_version()
if _new_ver != _fetched_ver:
print(f"[{self._session_id}] [antigravity-v2] version changed {_fetched_ver} -> {_new_ver}, retrying", file=sys.stderr)
_ag_ua_new = f"antigravity/{_new_ver} {_os_name}/{_os_arch}"
headers["User-Agent"] = _ag_ua_new
wrapped["userAgent"] = _ag_ua_new
body_b = json.dumps(wrapped).encode()
for ep in ordered:
action = "streamGenerateContent" if stream else "generateContent"
url_suffix = f"v1internal:{action}?alt=sse" if stream else f"v1internal:{action}"
target = f"{ep}/{url_suffix}"
req = urllib.request.Request(target, data=body_b, headers=headers)
try:
upstream = urllib.request.urlopen(req, timeout=_upstream_timeout(body, stream))
chosen_ep = ep
with _antigravity_endpoint_lock:
_antigravity_preferred_endpoint = ep
break
except urllib.error.HTTPError as e:
err_body = e.read().decode()
print(f"[{self._session_id}] [antigravity-v2-retry] {ep.replace('https://','')} {e.code}", file=sys.stderr)
if e.code == 400:
return self.send_json(e.code, {"error": {"type": "upstream_error", "message": _sanitize_err_body(err_body)}})
if ep == ordered[-1]:
return self.send_json(e.code, {"error": {"type": "upstream_error", "message": _sanitize_err_body(err_body)}})
continue
except Exception as e:
if ep == ordered[-1]:
return self.send_json(502, {"error": {"type": "proxy_error", "message": str(e)}})
continue
if upstream is None:
# ─── gRPC FALLBACK ─────────────────────────────────────────
# If REST failed with 404 (model not available via REST API),
# try gRPC which supports display names and has a wider model catalog.
if _all_404:
grpc_result = self._try_grpc_fallback(wrapped, access_token, stream, tracker)
if grpc_result is not None:
return # gRPC succeeded, response already sent
# ─── END gRPC FALLBACK ─────────────────────────────────────
return self.send_json(502, {"error": {"type": "proxy_error", "message": "All endpoints failed"}})
if stream:
self._forward_gemini_sse(upstream, model, body, input_data, tracker)
else:
self._forward_gemini_json(upstream, model, body, input_data)
# ═══════════════════════════════════════════════════════════════════
# gRPC Fallback for Antigravity
# ═══════════════════════════════════════════════════════════════════
def _try_grpc_fallback(self, wrapped_dict, access_token, stream, tracker=None):
"""
Try gRPC fallback when REST API returns 404 (model not found).
gRPC uses display names (e.g. "Gemini 3.5 Flash (High)") instead of
REST slugs (e.g. "gemini-3-flash"), so models unavailable via REST
may work via gRPC.
Returns None if gRPC is unavailable or also failed (caller should
send its own error response). Returns True if gRPC succeeded and
the response was already sent to the client.
"""
grpc_client = _get_grpc_client()
if grpc_client is None:
print(f"[{self._session_id}] [antigravity-grpc] gRPC fallback not available (grpcio not installed), skipping", file=sys.stderr)
return None
# gRPC uses display names, not REST slugs — remap the model ID
grpc_wrapped = dict(wrapped_dict)
rest_model = grpc_wrapped.get("model", "")
grpc_model = _GRPC_REVERSE_ALIAS.get(rest_model, rest_model)
grpc_wrapped["model"] = grpc_model
if grpc_model != rest_model:
print(f"[{self._session_id}] [antigravity-grpc] model remapped for gRPC: REST={rest_model} -> gRPC={grpc_model}", file=sys.stderr)
print(f"[{self._session_id}] [antigravity-grpc] REST 404, trying gRPC fallback with model={grpc_model} stream={stream}", file=sys.stderr)
try:
result = grpc_client.try_generate(
grpc_wrapped,
stream=stream,
access_token=access_token,
timeout_s=180,
)
except Exception as e:
print(f"[{self._session_id}] [antigravity-grpc] gRPC call exception: {e}", file=sys.stderr)
return None
if not result.ok:
print(f"[{self._session_id}] [antigravity-grpc] gRPC fallback also failed: {result.error_message}", file=sys.stderr)
return None
print(f"[{self._session_id}] [antigravity-grpc] gRPC fallback OK! endpoint={result.endpoint_used} model={result.model_used} elapsed={result.elapsed_s:.1f}s", file=sys.stderr)
# Process the gRPC response through the same forwarding paths as REST
if stream and result.stream_chunks is not None:
self._forward_grpc_sse(result, grpc_model)
elif not stream and result.response_data is not None:
self._forward_grpc_json(result, grpc_model)
else:
print(f"[{self._session_id}] [antigravity-grpc] unexpected result shape, no data to forward", file=sys.stderr)
return None
return True # Response sent successfully via gRPC
def _forward_grpc_sse(self, grpc_result, model):
"""
Forward a gRPC streaming result to the client as SSE events.
The gRPC result contains stream_chunks that match the REST SSE chunk shape,
so we can process them through the same _forward_gemini_sse logic.
"""
resp_id = f"resp-{uuid.uuid4().hex[:24]}"
created = int(time.time())
self.send_response(200)
self.send_header("Content-Type", "text/event-stream")
self.send_header("Cache-Control", "no-cache")
self.send_header("Connection", "keep-alive")
self.end_headers()
full_text = ""
output_items = []
current_tool_calls = {}
message_started = False
message_id = f"msg-{uuid.uuid4().hex[:24]}"
def flush_event(event_type, data):
self.wfile.write(f"event: {event_type}\ndata: {json.dumps(data)}\n\n".encode())
self.wfile.flush()
flush_event("response.created", {"type": "response.created", "response": {"id": resp_id, "object": "response", "model": model, "status": "in_progress", "created": created, "output": []}})
flush_event("response.in_progress", {"type": "response.in_progress", "response": {"id": resp_id}})
# Process each gRPC chunk (same shape as REST SSE chunks)
for chunk in grpc_result.stream_chunks:
candidates = chunk.get("response", chunk).get("candidates", [])
if not candidates:
continue
parts = candidates[0].get("content", {}).get("parts", [])
for part in parts:
sig = _extract_gemini_sig(part)
if sig:
if part.get("functionCall"):
fc_id = part["functionCall"].get("id") or part["functionCall"].get("name")
fc_name = part["functionCall"].get("name")
if fc_id:
_gemini_store_sig(f"fc:{fc_id}", sig)
if fc_name:
_gemini_store_sig(f"fc:{fc_name}", sig)
_gemini_store_sig(f"turn:{resp_id}", sig)
if part.get("thought"):
sig_from_thought = _extract_gemini_sig(part)
if sig_from_thought:
_gemini_store_sig(f"turn:{resp_id}", sig_from_thought)
continue
if "text" in part and not part.get("functionCall"):
text_delta = part["text"]
if not text_delta:
continue
full_text += text_delta
if not message_started:
flush_event("response.output_item.added", {"type": "response.output_item.added", "output_index": 0, "item": {"type": "message", "id": message_id, "role": "assistant", "content": []}})
flush_event("response.content_part.added", {"type": "response.content_part.added", "output_index": 0, "content_index": 0, "part": {"type": "output_text", "text": ""}})
output_items.append({"text": True})
message_started = True
flush_event("response.output_text.delta", {"type": "response.output_text.delta", "output_index": 0, "content_index": 0, "delta": text_delta})
elif part.get("functionCall"):
fc = part["functionCall"]
call_id = f"call_{uuid.uuid4().hex[:24]}"
args_str = json.dumps(fc.get("args", fc.get("arguments", {})))
output_index = len(output_items)
flush_event("response.output_item.added", {"type": "response.output_item.added", "output_index": output_index, "item": {"type": "function_call", "id": call_id, "call_id": call_id, "name": fc.get("name", ""), "arguments": ""}})
flush_event("response.function_call_arguments.delta", {"type": "response.function_call_arguments.delta", "output_index": output_index, "item_id": call_id, "delta": args_str})
flush_event("response.function_call_arguments.done", {"type": "response.function_call_arguments.done", "output_index": output_index, "item_id": call_id, "arguments": args_str})
current_tool_calls[call_id] = fc
output_items.append({"tool": True})
# Build final response
out = []
if full_text:
out.append({"type": "message", "id": message_id, "role": "assistant", "content": [{"type": "output_text", "text": full_text}]})
tool_outputs = []
for cid, fc in current_tool_calls.items():
tool_outputs.append({"type": "function_call", "id": cid, "call_id": cid, "name": fc.get("name", ""), "arguments": json.dumps(fc.get("args", fc.get("arguments", {})))})
out.extend(tool_outputs)
final_resp = {"id": resp_id, "object": "response", "model": model, "status": "completed", "created": created, "output": out}
if full_text:
flush_event("response.output_text.done", {"type": "response.output_text.done", "output_index": 0, "content_index": 0, "text": full_text})
flush_event("response.content_part.done", {"type": "response.content_part.done", "output_index": 0, "content_index": 0, "part": {"type": "output_text", "text": full_text}})
flush_event("response.output_item.done", {"type": "response.output_item.done", "output_index": 0, "item": out[0]})
for idx, item in enumerate(tool_outputs, start=(1 if full_text else 0)):
flush_event("response.output_item.done", {"type": "response.output_item.done", "output_index": idx, "item": item})
flush_event("response.completed", {"type": "response.completed", "response": final_resp})
self.close_connection = True
with _response_store_lock:
_response_store[resp_id] = final_resp
while len(_response_store) > _MAX_STORED:
_response_store.popitem(last=False)
def _forward_grpc_json(self, grpc_result, model):
"""Forward a gRPC non-streaming result to the client as JSON."""
resp_id = f"resp-{uuid.uuid4().hex[:24]}"
created = int(time.time())
out = []
full_text = ""
data = grpc_result.response_data
candidates = data.get("response", data).get("candidates", [])
if candidates:
parts = candidates[0].get("content", {}).get("parts", [])
text_parts = []
for part in parts:
if part.get("thought"):
continue
if "text" in part and not part.get("functionCall"):
text_parts.append(part["text"])
elif part.get("functionCall"):
fc = part["functionCall"]
call_id = f"call_{uuid.uuid4().hex[:24]}"
out.append({"type": "function_call", "id": call_id, "call_id": call_id, "name": fc.get("name", ""), "arguments": json.dumps(fc.get("args", fc.get("arguments", {})))})
if text_parts:
full_text = "".join(text_parts)
out.insert(0, {"type": "message", "id": f"msg-{uuid.uuid4().hex[:24]}", "role": "assistant", "content": [{"type": "output_text", "text": full_text}]})
resp = {"id": resp_id, "object": "response", "model": model, "status": "completed", "created": created, "output": out}
with _response_store_lock:
_response_store[resp_id] = resp
while len(_response_store) > _MAX_STORED:
_response_store.popitem(last=False)
self.send_json(200, resp)
def _handle_gemini_oauth(self, body, model, stream, tracker=None):
input_data = body.get("input", "")
policy = provider_policy()
@@ -5553,6 +6279,9 @@ class Handler(http.server.BaseHTTPRequestHandler):
resp_part["functionResponse"]["id"] = call_id
contents.append({"role": "user", "parts": [resp_part]})
# CRITICAL FIX: Sanitize contents while PRESERVING functionCall -> functionResponse alternation.
# Google's Gemini API REQUIRES: functionCall (role=model) must be immediately followed by functionResponse (role=user).
# We NEVER merge, skip, or reorder tool-related messages.
if OAUTH_PROVIDER.startswith("google") and "claude" not in model.lower():
sanitized = []
last_user_text = None
@@ -5562,18 +6291,40 @@ class Handler(http.server.BaseHTTPRequestHandler):
parts = [p for p in content.get("parts", []) if isinstance(p, dict)]
if not parts:
continue
# Check if this content has functionCall or functionResponse - these MUST be preserved as-is
has_function_call = any("functionCall" in p for p in parts)
has_function_response = any("functionResponse" in p for p in parts)
text_key = "\n".join([p.get("text", "") for p in parts if "text" in p]).strip()
# Tool calls/responses are NEVER merged or skipped - they must maintain strict order
if has_function_call or has_function_response:
sanitized.append({"role": role, "parts": parts})
continue
# For plain text messages only: skip duplicate consecutive user text
if role == "user" and text_key and text_key == last_user_text:
continue
# Merge consecutive same-role TEXT-ONLY messages (no tool content)
if role == last_role and role in ("user", "model") and sanitized:
sanitized[-1].setdefault("parts", []).extend(parts)
else:
sanitized.append({"role": role, "parts": parts})
last_parts = sanitized[-1].get("parts", [])
# Only merge if the last message is also text-only (no functionCall/functionResponse)
last_has_tool = any("functionCall" in p or "functionResponse" in p for p in last_parts)
if not last_has_tool:
sanitized[-1].setdefault("parts", []).extend(parts)
if role == "user" and text_key:
last_user_text = text_key
continue
sanitized.append({"role": role, "parts": parts})
if role == "user" and text_key:
last_user_text = text_key
last_role = role
# Trim leading non-user messages (Google expects conversation to start with user)
while sanitized and sanitized[0].get("role") != "user":
sanitized.pop(0)
# Trim trailing non-user messages (must end with user turn for continuation)
while sanitized and sanitized[-1].get("role") != "user":
sanitized.pop()
contents = sanitized
@@ -5820,6 +6571,9 @@ class Handler(http.server.BaseHTTPRequestHandler):
headers["X-Client-Name"] = "antigravity"
headers["X-Client-Version"] = _ensure_antigravity_client_version()
headers["x-goog-api-client"] = "gl-node/18.18.2 fire/0.8.6 grpc/1.10.x"
# Add X-Machine-Session-Id header as seen in badrisnarayanan/antigravity-claude-proxy
if "request" in wrapped and "sessionId" in wrapped["request"]:
headers["X-Machine-Session-Id"] = wrapped["request"]["sessionId"]
else:
headers["User-Agent"] = "google-api-nodejs-client/9.15.1"
headers["X-Goog-Api-Client"] = "gl-node/22.17.0"

View File

@@ -0,0 +1,396 @@
#!/usr/bin/env python3
"""
Unit tests for the Antigravity gRPC fallback module.
Tests cover:
1. Module import and availability detection
2. Protobuf conversion helpers (JSON <-> protobuf)
3. Request building from wrapped REST dict
4. Reverse alias map correctness
5. GrpcFallbackResult type
6. Integration: _try_grpc_fallback triggers correctly on REST 404
"""
import json
import os
import sys
import unittest
from unittest.mock import patch, MagicMock
# Add src to path so we can import the antigravity_grpc package
_src_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "src")
if _src_dir not in sys.path:
sys.path.insert(0, _src_dir)
class TestGrpcModuleAvailability(unittest.TestCase):
"""Tests for is_grpc_available() and module loading."""
def test_is_grpc_available_returns_bool(self):
"""is_grpc_available should return a boolean."""
from antigravity_grpc import is_grpc_available
result = is_grpc_available()
self.assertIsInstance(result, bool)
def test_is_grpc_available_true_when_installed(self):
"""If grpcio is installed and stubs are loadable, should return True."""
from antigravity_grpc import is_grpc_available
# grpcio was installed at test time, so this should be True
self.assertTrue(is_grpc_available())
def test_client_instantiation(self):
"""AntigravityGrpcClient should be instantiatable."""
from antigravity_grpc import AntigravityGrpcClient
client = AntigravityGrpcClient()
self.assertIsNotNone(client)
def test_get_client_singleton(self):
"""get_client should return the same singleton."""
from antigravity_grpc import get_client
c1 = get_client()
c2 = get_client()
self.assertIs(c1, c2)
class TestGrpcFallbackResult(unittest.TestCase):
"""Tests for GrpcFallbackResult type."""
def test_default_values(self):
from antigravity_grpc import GrpcFallbackResult
r = GrpcFallbackResult()
self.assertFalse(r.ok)
self.assertIsNone(r.response_data)
self.assertIsNone(r.stream_chunks)
self.assertEqual(r.error_message, "")
self.assertEqual(r.endpoint_used, "")
self.assertEqual(r.model_used, "")
self.assertEqual(r.elapsed_s, 0.0)
def test_success_result(self):
from antigravity_grpc import GrpcFallbackResult
r = GrpcFallbackResult(ok=True, response_data={"response": {"candidates": []}},
endpoint_used="daily-cloudcode-pa.googleapis.com:443",
model_used="Gemini 3.5 Flash (High)",
elapsed_s=2.5)
self.assertTrue(r.ok)
self.assertIsNotNone(r.response_data)
self.assertEqual(r.elapsed_s, 2.5)
def test_failure_result(self):
from antigravity_grpc import GrpcFallbackResult
r = GrpcFallbackResult(ok=False, error_message="All gRPC endpoints failed")
self.assertFalse(r.ok)
self.assertIn("failed", r.error_message)
def test_repr(self):
from antigravity_grpc import GrpcFallbackResult
r_ok = GrpcFallbackResult(ok=True, response_data={"response": {"candidates": []}})
self.assertIn("OK", repr(r_ok))
r_fail = GrpcFallbackResult(ok=False, error_message="timeout")
self.assertIn("FAIL", repr(r_fail))
class TestReverseAliasMap(unittest.TestCase):
"""Tests for the _GRPC_REVERSE_ALIAS map in translate-proxy.py."""
def test_import_reverse_alias(self):
"""The reverse alias map should be importable from the proxy module."""
import importlib
_spec = importlib.util.spec_from_file_location(
"translate_proxy",
os.path.join(_src_dir, "translate-proxy.py"),
)
tp = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(tp)
self.assertIsInstance(tp._GRPC_REVERSE_ALIAS, dict)
def test_key_models_have_reverse_aliases(self):
"""All key REST model slugs should have gRPC display name mappings."""
import importlib
_spec = importlib.util.spec_from_file_location(
"translate_proxy",
os.path.join(_src_dir, "translate-proxy.py"),
)
tp = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(tp)
required_slugs = [
"gemini-3-flash",
"gemini-3.5-flash-low",
"gemini-3.1-pro-low",
"claude-sonnet-4-6",
"claude-opus-4-6-thinking",
"gemini-2.5-flash",
]
for slug in required_slugs:
self.assertIn(slug, tp._GRPC_REVERSE_ALIAS,
f"Missing reverse alias for REST slug '{slug}'")
def test_reverse_alias_values_are_display_names(self):
"""gRPC display names should contain spaces and parentheses, not hyphens."""
import importlib
_spec = importlib.util.spec_from_file_location(
"translate_proxy",
os.path.join(_src_dir, "translate-proxy.py"),
)
tp = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(tp)
for slug, display_name in tp._GRPC_REVERSE_ALIAS.items():
# Display names typically have spaces (e.g. "Gemini 3.5 Flash (High)")
# while slugs use hyphens (e.g. "gemini-3-flash")
self.assertNotEqual(slug, display_name,
f"Reverse alias for '{slug}' should differ from slug (gRPC uses display names)")
class TestProtobufConversion(unittest.TestCase):
"""Tests for JSON -> protobuf conversion helpers."""
def test_struct_to_protobuf(self):
"""_struct_to_protobuf should convert a simple dict to Struct."""
from antigravity_grpc.client import _struct_to_protobuf
result = _struct_to_protobuf({"key": "value", "num": 42})
self.assertIsNotNone(result)
# Verify round-trip
from antigravity_grpc.client import _protobuf_struct_to_dict
d = _protobuf_struct_to_dict(result)
self.assertEqual(d["key"], "value")
self.assertEqual(d["num"], 42.0)
def test_struct_round_trip_nested(self):
"""Nested dicts should survive a round-trip through protobuf."""
from antigravity_grpc.client import _struct_to_protobuf, _protobuf_struct_to_dict
original = {"outer": {"inner": "hello"}, "list_val": [1, 2, 3]}
proto = _struct_to_protobuf(original)
result = _protobuf_struct_to_dict(proto)
self.assertEqual(result["outer"]["inner"], "hello")
self.assertEqual(result["list_val"], [1.0, 2.0, 3.0])
def test_json_parts_to_proto_text(self):
"""Text parts should convert to protobuf Part with text field."""
from antigravity_grpc.client import _json_parts_to_proto
parts = _json_parts_to_proto([{"text": "Hello world"}])
self.assertEqual(len(parts), 1)
self.assertEqual(parts[0].text, "Hello world")
def test_json_parts_to_proto_function_call(self):
"""FunctionCall parts should convert correctly."""
from antigravity_grpc.client import _json_parts_to_proto
parts = _json_parts_to_proto([{
"functionCall": {
"name": "exec_command",
"args": {"cmd": "ls -la"},
"id": "call_123"
}
}])
self.assertEqual(len(parts), 1)
self.assertTrue(parts[0].HasField("function_call"))
self.assertEqual(parts[0].function_call.name, "exec_command")
self.assertEqual(parts[0].function_call.id, "call_123")
def test_json_parts_to_proto_function_response(self):
"""FunctionResponse parts should convert correctly."""
from antigravity_grpc.client import _json_parts_to_proto
parts = _json_parts_to_proto([{
"functionResponse": {
"name": "exec_command",
"response": {"result": "file1.txt"},
"id": "call_123"
}
}])
self.assertEqual(len(parts), 1)
self.assertTrue(parts[0].HasField("function_response"))
self.assertEqual(parts[0].function_response.name, "exec_command")
def test_json_contents_to_proto(self):
"""Content objects should convert correctly."""
from antigravity_grpc.client import _json_contents_to_proto
contents = _json_contents_to_proto([
{"role": "user", "parts": [{"text": "Hello"}]},
{"role": "model", "parts": [{"text": "Hi there"}]},
])
self.assertEqual(len(contents), 2)
self.assertEqual(contents[0].role, "user")
self.assertEqual(contents[1].role, "model")
def test_proto_candidate_to_json(self):
"""Protobuf candidates should convert back to JSON-compatible dicts."""
from antigravity_grpc.client import _json_contents_to_proto, _proto_candidate_to_json
from antigravity_grpc import cloudcode_pb2 as pb2
# Build a candidate manually
candidate = pb2.Candidate()
candidate.content.role = "model"
candidate.content.parts.add().text = "Hello from gRPC"
candidate.finish_reason = "STOP"
candidate.index = 0
result = _proto_candidate_to_json(candidate)
self.assertEqual(result["finishReason"], "STOP")
self.assertEqual(result["content"]["role"], "model")
self.assertEqual(result["content"]["parts"][0]["text"], "Hello from gRPC")
class TestGrpcRequestBuilding(unittest.TestCase):
"""Tests for _build_request (wrapped REST dict → protobuf)."""
def _get_client(self):
from antigravity_grpc import AntigravityGrpcClient
return AntigravityGrpcClient()
def test_build_request_basic(self):
"""Basic request fields should be populated correctly."""
client = self._get_client()
wrapped = {
"project": "test-project-123",
"model": "Gemini 3.5 Flash (High)",
"requestType": "agent",
"userAgent": "antigravity/2.0.6",
"requestId": "agent-test123",
"request": {
"contents": [
{"role": "user", "parts": [{"text": "Say hello"}]}
],
"safetySettings": [
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "OFF"},
],
}
}
req = client._build_request(wrapped)
self.assertEqual(req.project, "test-project-123")
self.assertEqual(req.model, "Gemini 3.5 Flash (High)")
self.assertEqual(req.request_type, "agent")
self.assertEqual(len(req.request.contents), 1)
self.assertEqual(req.request.contents[0].role, "user")
def test_build_request_with_tools(self):
"""Tools should be converted to function declarations."""
client = self._get_client()
wrapped = {
"project": "test-project",
"model": "gemini-3-flash",
"request": {
"contents": [],
"tools": [{
"functionDeclarations": [{
"name": "exec_command",
"description": "Run a shell command",
"parameters": {"type": "object", "properties": {"cmd": {"type": "string"}}}
}]
}],
}
}
req = client._build_request(wrapped)
self.assertEqual(len(req.request.tools), 1)
self.assertEqual(req.request.tools[0].function_declarations[0].name, "exec_command")
def test_build_request_with_generation_config(self):
"""Generation config should be populated correctly."""
client = self._get_client()
wrapped = {
"project": "test-project",
"model": "gemini-3-flash",
"request": {
"contents": [],
"generationConfig": {
"maxOutputTokens": 64000,
"temperature": 0.7,
"stopSequences": ["\n\nHuman:"],
"thinkingConfig": {
"includeThoughts": True,
"thinkingBudget": 8192,
}
}
}
}
req = client._build_request(wrapped)
self.assertEqual(req.request.generation_config.max_output_tokens, 64000)
self.assertAlmostEqual(req.request.generation_config.temperature, 0.7, places=2)
self.assertTrue(req.request.generation_config.thinking_config.include_thoughts)
self.assertEqual(req.request.generation_config.thinking_config.thinking_budget, 8192)
def test_build_request_with_function_call_history(self):
"""Function call/response pairs in contents should be preserved."""
client = self._get_client()
wrapped = {
"project": "test-project",
"model": "gemini-3-flash",
"request": {
"contents": [
{"role": "user", "parts": [{"text": "List files"}]},
{"role": "model", "parts": [{
"functionCall": {"name": "exec_command", "args": {"cmd": "ls"}, "id": "call_1"}
}]},
{"role": "user", "parts": [{
"functionResponse": {"name": "exec_command", "response": {"result": "file.txt"}, "id": "call_1"}
}]},
]
}
}
req = client._build_request(wrapped)
self.assertEqual(len(req.request.contents), 3)
# Verify function call preserved
self.assertTrue(req.request.contents[1].parts[0].HasField("function_call"))
self.assertEqual(req.request.contents[1].parts[0].function_call.name, "exec_command")
# Verify function response preserved
self.assertTrue(req.request.contents[2].parts[0].HasField("function_response"))
self.assertEqual(req.request.contents[2].parts[0].function_response.name, "exec_command")
class TestGrpcEndpointsConfig(unittest.TestCase):
"""Tests for gRPC endpoint configuration."""
def test_default_endpoints(self):
"""Default endpoints should include production and daily."""
from antigravity_grpc.client import _GRPC_ENDPOINTS
self.assertGreaterEqual(len(_GRPC_ENDPOINTS), 2)
hostnames = [ep.split(":")[0] for ep in _GRPC_ENDPOINTS]
self.assertIn("daily-cloudcode-pa.googleapis.com", hostnames)
self.assertIn("cloudcode-pa.googleapis.com", hostnames)
def test_staging_env_var(self):
"""Staging endpoints should be controlled by env var."""
from antigravity_grpc.client import _ALLOW_STAGING_ENV
self.assertEqual(_ALLOW_STAGING_ENV, "ALLOW_ANTIGRAVITY_STAGING")
class TestProxyIntegration(unittest.TestCase):
"""Tests for the proxy's gRPC fallback integration."""
def _load_proxy_module(self):
import importlib
_spec = importlib.util.spec_from_file_location(
"translate_proxy",
os.path.join(_src_dir, "translate-proxy.py"),
)
tp = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(tp)
return tp
def test_get_grpc_client_function_exists(self):
"""_get_grpc_client should exist as a module-level function."""
tp = self._load_proxy_module()
self.assertTrue(callable(tp._get_grpc_client))
def test_grpc_fallback_errors_set(self):
"""_GRPC_FALLBACK_REST_ERRORS should include 404."""
tp = self._load_proxy_module()
self.assertIn(404, tp._GRPC_FALLBACK_REST_ERRORS)
def test_versions_bug_fixed(self):
"""The _versions[0] NameError should be fixed (should be _fetched_ver)."""
# Read the source file and verify _versions is not used incorrectly
with open(os.path.join(_src_dir, "translate-proxy.py")) as f:
source = f.read()
# The bug was: ver={_versions[0]} -- should be ver={_fetched_ver}
self.assertNotIn("_versions[0]", source,
"Bug: _versions[0] should have been replaced with _fetched_ver")
if __name__ == "__main__":
print("=" * 70)
print("Antigravity gRPC Fallback - Unit Tests")
print("=" * 70)
print()
unittest.main(verbosity=2)