3 Commits

14 changed files with 1892 additions and 9 deletions

View File

@@ -1,5 +1,22 @@
# Changelog # Changelog
## v3.12.0 (2026-05-27)
**gRPC Auto-Fallback for Antigravity Provider (PR #13)**
### New Features
- **gRPC auto-fallback**: When REST API returns 404 (model not found), automatically retries via gRPC
- **New `antigravity_grpc` module**: Full protobuf client with CloudCode PredictionService stubs
- **Display name remapping**: gRPC uses display names (e.g. "Gemini 3.5 Flash (High)") instead of REST slugs
- **Streaming and unary support**: gRPC fallback works for both streaming and non-streaming requests
- **Dynamic version fetch with validation**: Probes fetched versions to ensure they work before caching
- **Antigravity v2 handler rewrite**: Based on anti-api approach with proper safety settings, stopSequences, sessionId
- **Lazy import**: grpcio is only imported when needed — zero impact if not installed
### Bug Fixes
- Antigravity 404 caused by invalid version — now validates with probe requests
- Version fallback: auto-retries with re-fetched version if all endpoints return 404
## v3.11.12 (2026-05-26) ## v3.11.12 (2026-05-26)
**New Antigravity v2 Handler (Mimicking anti-api)** **New Antigravity v2 Handler (Mimicking anti-api)**

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,24 @@
"""
antigravity_grpc — gRPC fallback client for Google CloudCode (Antigravity).
When the REST API rejects a request (404 model not found, 400 bad request due to
model ID mismatch, etc.), this module provides a gRPC fallback path that uses
Google's native PredictionService protocol — the same one the agy CLI uses.
This module is imported lazily and only when grpcio is installed. If grpcio is
not available, the fallback is silently skipped.
"""
from .client import (
GrpcFallbackResult,
AntigravityGrpcClient,
is_grpc_available,
get_client,
)
__all__ = [
"GrpcFallbackResult",
"AntigravityGrpcClient",
"is_grpc_available",
"get_client",
]

View File

@@ -0,0 +1,609 @@
"""
antigravity_grpc.client — gRPC fallback client for Google CloudCode (Antigravity).
This module provides a gRPC client that can be used as an automatic fallback when
the CloudCode REST API rejects requests. The gRPC path uses the same
PredictionService that the native agy CLI binary uses, giving access to models
that are unavailable via REST (e.g. models that return 404 on REST but work on gRPC).
Key design decisions:
- Lazy import: grpcio is only imported when actually needed. If not installed,
is_grpc_available() returns False and the fallback is silently skipped.
- Zero impact on other providers: this module is only called from
_handle_antigravity_v2() when REST returns a fallback-eligible error.
- Same output format as REST: the client returns structured dicts that match
the SSE/JSON response shapes the proxy already processes.
- Thread-safe: the gRPC channel is created once per endpoint and reused.
Usage from translate-proxy.py:
from antigravity_grpc import is_grpc_available, AntigravityGrpcClient
if is_grpc_available():
client = AntigravityGrpcClient()
result = client.try_generate(request_dict, stream=False)
if result.ok:
# Use result.response_data (dict matching REST response shape)
else:
# gRPC also failed, fall through to error
"""
import json
import os
import sys
import time
import threading
import collections
# ═══════════════════════════════════════════════════════════════════
# Lazy gRPC import — never crash if grpcio is missing
# ═══════════════════════════════════════════════════════════════════
_grpc = None
_pb2 = None
_pb2_grpc = None
_import_error = None
def _try_import():
global _grpc, _pb2, _pb2_grpc, _import_error
if _grpc is not None:
return _grpc is not False
try:
import grpc as _real_grpc
# Import the generated stubs relative to this package
from . import cloudcode_pb2 as _real_pb2
from . import cloudcode_pb2_grpc as _real_pb2_grpc
_grpc = _real_grpc
_pb2 = _real_pb2
_pb2_grpc = _real_pb2_grpc
return True
except Exception as e:
_import_error = str(e)
_grpc = False
return False
def is_grpc_available():
"""Return True if grpcio and the generated stubs are importable."""
return _try_import()
# ═══════════════════════════════════════════════════════════════════
# gRPC endpoints for Antigravity (same hosts, different port/path)
# ═══════════════════════════════════════════════════════════════════
# The CloudCode gRPC service runs on the same hosts as REST but uses
# the gRPC protocol. The agy CLI connects to:
# - cloudcode-pa.googleapis.com:443
# - daily-cloudcode-pa.googleapis.com:443
# - daily-cloudcode-pa.sandbox.googleapis.com:443
_GRPC_ENDPOINTS = [
"daily-cloudcode-pa.googleapis.com:443",
"cloudcode-pa.googleapis.com:443",
]
_ALLOW_STAGING_ENV = "ALLOW_ANTIGRAVITY_STAGING"
# ═══════════════════════════════════════════════════════════════════
# Result type
# ═══════════════════════════════════════════════════════════════════
class GrpcFallbackResult:
"""Result of a gRPC fallback attempt."""
__slots__ = ("ok", "response_data", "stream_chunks", "error_message",
"endpoint_used", "model_used", "elapsed_s")
def __init__(self, ok=False, response_data=None, stream_chunks=None,
error_message="", endpoint_used="", model_used="", elapsed_s=0.0):
self.ok = ok
self.response_data = response_data # dict (non-streaming)
self.stream_chunks = stream_chunks # list[dict] (streaming)
self.error_message = error_message
self.endpoint_used = endpoint_used
self.model_used = model_used
self.elapsed_s = elapsed_s
def __repr__(self):
if self.ok:
if self.stream_chunks is not None:
return f"<GrpcFallbackResult OK stream chunks={len(self.stream_chunks)}>"
return f"<GrpcFallbackResult OK data_keys={list(self.response_data.keys()) if self.response_data else None}>"
return f"<GrpcFallbackResult FAIL error={self.error_message!r}>"
# ═══════════════════════════════════════════════════════════════════
# JSON → Protobuf conversion helpers
# ═══════════════════════════════════════════════════════════════════
def _struct_to_protobuf(d, struct_obj=None):
"""Convert a Python dict to a google.protobuf.Struct."""
from google.protobuf.struct_pb2 import Struct, Value, NullValue, ListValue
if struct_obj is None:
struct_obj = Struct()
if isinstance(d, dict):
for k, v in d.items():
if isinstance(v, str):
struct_obj.fields[k].string_value = v
elif isinstance(v, bool):
struct_obj.fields[k].bool_value = v
elif isinstance(v, int):
struct_obj.fields[k].number_value = float(v)
elif isinstance(v, float):
struct_obj.fields[k].number_value = v
elif isinstance(v, dict):
_struct_to_protobuf(v, struct_obj.fields[k].struct_value)
elif isinstance(v, list):
lst = struct_obj.fields[k].list_value
for item in v:
if isinstance(item, str):
lst.values.add().string_value = item
elif isinstance(item, bool):
lst.values.add().bool_value = item
elif isinstance(item, (int, float)):
lst.values.add().number_value = float(item)
elif isinstance(item, dict):
_struct_to_protobuf(item, lst.values.add().struct_value)
elif item is None:
lst.values.add().null_value = 0
elif v is None:
struct_obj.fields[k].null_value = 0
return struct_obj
def _protobuf_struct_to_dict(struct):
"""Convert a google.protobuf.Struct to a Python dict."""
from google.protobuf.struct_pb2 import Value, NullValue
result = {}
for k, v in struct.fields.items():
kind = v.WhichOneof("kind")
if kind == "null_value":
result[k] = None
elif kind == "number_value":
result[k] = v.number_value
elif kind == "string_value":
result[k] = v.string_value
elif kind == "bool_value":
result[k] = v.bool_value
elif kind == "struct_value":
result[k] = _protobuf_struct_to_dict(v.struct_value)
elif kind == "list_value":
result[k] = [_value_to_python(item) for item in v.list_value.values]
else:
result[k] = None
return result
def _value_to_python(v):
"""Convert a google.protobuf.Value to a Python value."""
kind = v.WhichOneof("kind")
if kind == "null_value":
return None
elif kind == "number_value":
return v.number_value
elif kind == "string_value":
return v.string_value
elif kind == "bool_value":
return v.bool_value
elif kind == "struct_value":
return _protobuf_struct_to_dict(v.struct_value)
elif kind == "list_value":
return [_value_to_python(item) for item in v.list_value.values]
return None
def _json_parts_to_proto(parts_json):
"""Convert a list of JSON content parts to protobuf Part messages."""
result = []
for p in parts_json:
if not isinstance(p, dict):
continue
part = _pb2.Part()
# Thought signature
sig = p.get("thoughtSignature") or p.get("thought_signature")
if sig:
part.thought_signature = sig
if p.get("thought"):
part.thought = True
if "text" in p:
part.text = p["text"]
elif "text" in p and "functionCall" not in p:
part.text = p["text"]
elif "functionCall" in p:
fc = p["functionCall"]
part.function_call.name = fc.get("name", "")
part.function_call.id = fc.get("id", "")
args = fc.get("args", fc.get("arguments", {}))
if isinstance(args, dict):
_struct_to_protobuf(args, part.function_call.args)
elif isinstance(args, str):
try:
_struct_to_protobuf(json.loads(args), part.function_call.args)
except Exception:
pass
elif "functionResponse" in p:
fr = p["functionResponse"]
part.function_response.name = fr.get("name", "")
part.function_response.id = fr.get("id", "")
resp = fr.get("response", {})
if "result" in resp:
result_val = resp["result"]
if isinstance(result_val, (dict, list)):
_struct_to_protobuf({"result": result_val}, part.function_response.response)
else:
_struct_to_protobuf({"result": str(result_val)}, part.function_response.response)
elif isinstance(resp, dict):
_struct_to_protobuf(resp, part.function_response.response)
elif "inlineData" in p:
idata = p["inlineData"]
import base64
part.inline_data.mime_type = idata.get("mimeType", "image/png")
b64data = idata.get("data", "")
part.inline_data.data = base64.b64decode(b64data) if b64data else b""
result.append(part)
return result
def _json_contents_to_proto(contents_json):
"""Convert a list of JSON content objects to protobuf Content messages."""
result = []
for c in contents_json:
if not isinstance(c, dict):
continue
content = _pb2.Content()
content.role = c.get("role", "user")
for part in _json_parts_to_proto(c.get("parts", [])):
content.parts.append(part)
result.append(content)
return result
def _proto_candidate_to_json(candidate):
"""Convert a protobuf Candidate to a JSON-compatible dict."""
content_json = {"role": candidate.content.role, "parts": []}
for part in candidate.content.parts:
p = {}
if part.thought_signature:
p["thoughtSignature"] = part.thought_signature
if part.thought:
p["thought"] = True
if part.text:
p["text"] = part.text
elif part.text and not part.HasField("function_call"):
p["text"] = part.text
elif part.HasField("function_call"):
fc = part.function_call
args_dict = _protobuf_struct_to_dict(fc.args) if fc.HasField("args") else {}
p["functionCall"] = {
"name": fc.name,
"args": args_dict,
"id": fc.id,
}
elif part.HasField("function_response"):
fr = part.function_response
resp_dict = _protobuf_struct_to_dict(fr.response) if fr.HasField("response") else {}
p["functionResponse"] = {
"name": fr.name,
"response": resp_dict,
"id": fr.id,
}
elif part.HasField("inline_data"):
import base64
p["inlineData"] = {
"mimeType": part.inline_data.mime_type,
"data": base64.b64encode(part.inline_data.data).decode(),
}
if p:
content_json["parts"].append(p)
return {
"content": content_json,
"finishReason": candidate.finish_reason,
"index": candidate.index,
}
# ═══════════════════════════════════════════════════════════════════
# Client
# ═══════════════════════════════════════════════════════════════════
class AntigravityGrpcClient:
"""
gRPC fallback client for Google CloudCode Antigravity.
Thread-safe. Channels are cached per endpoint and reused.
"""
def __init__(self):
self._channels = {}
self._stubs = {}
self._lock = threading.Lock()
def _get_channel(self, endpoint):
"""Get or create a gRPC channel for the given endpoint."""
with self._lock:
if endpoint not in self._channels:
# Use secure channel with default SSL credentials
creds = _grpc.ssl_channel_credentials()
channel = _grpc.secure_channel(endpoint, creds)
self._channels[endpoint] = channel
self._stubs[endpoint] = _pb2_grpc.PredictionServiceStub(channel)
return self._channels[endpoint], self._stubs[endpoint]
def _build_request(self, wrapped_dict):
"""
Build a GenerateContentRequest protobuf from the same wrapped dict
that the REST API uses.
wrapped_dict shape:
{
"project": "...",
"model": "...",
"requestType": "agent",
"userAgent": "antigravity/...",
"requestId": "agent-...",
"request": {
"contents": [...],
"systemInstruction": {...},
"generationConfig": {...},
"tools": [...],
"safetySettings": [...],
"toolConfig": {...},
"sessionId": "..."
}
}
"""
req = _pb2.GenerateContentRequest()
req.project = wrapped_dict.get("project", "")
req.model = wrapped_dict.get("model", "")
req.request_type = wrapped_dict.get("requestType", "agent")
req.user_agent = wrapped_dict.get("userAgent", "")
req.request_id = wrapped_dict.get("requestId", "")
inner = wrapped_dict.get("request", {})
# Contents
for c in _json_contents_to_proto(inner.get("contents", [])):
req.request.contents.append(c)
# System instruction
si = inner.get("systemInstruction", {})
if si:
si_parts = si.get("parts", [])
if si.get("role"):
req.request.system_instruction.role = si.get("role", "user")
for part in _json_parts_to_proto(si_parts):
req.request.system_instruction.parts.append(part)
# Generation config
gc = inner.get("generationConfig", {})
if gc:
cfg = req.request.generation_config
if "maxOutputTokens" in gc:
cfg.max_output_tokens = int(gc["maxOutputTokens"])
if "temperature" in gc:
cfg.temperature = float(gc["temperature"])
if "topP" in gc:
cfg.top_p = float(gc["top_p" if "top_p" in gc else "topP"])
for ss in gc.get("stopSequences", []):
cfg.stop_sequences.append(ss)
# Thinking config (Gemini 3 native)
tc = gc.get("thinkingConfig", gc.get("thinking_config"))
if tc:
cfg.thinking_config.include_thoughts = tc.get("includeThoughts", tc.get("include_thoughts", False))
cfg.thinking_config.thinking_budget = int(tc.get("thinkingBudget", tc.get("thinking_budget", 8192)))
# Legacy thinking fields
if "includeThoughts" in gc and not tc:
cfg.thinking_config.include_thoughts = gc["includeThoughts"]
if "thinkingBudget" in gc and not tc:
cfg.thinking_config.thinking_budget = int(gc["thinkingBudget"])
# Tools
for tool_json in inner.get("tools", []):
tool = _pb2.Tool()
for fd_json in tool_json.get("functionDeclarations", []):
fd = tool.function_declarations.add()
fd.name = fd_json.get("name", "")
fd.description = fd_json.get("description", "")
params = fd_json.get("parameters", {})
if isinstance(params, dict) and params:
_struct_to_protobuf(params, fd.parameters)
req.request.tools.append(tool)
# Safety settings
for ss in inner.get("safetySettings", []):
ss_msg = _pb2.SafetySetting()
ss_msg.category = ss.get("category", "")
ss_msg.threshold = ss.get("threshold", "OFF")
req.request.safety_settings.append(ss_msg)
# Tool config
tcfg = inner.get("toolConfig", {})
if tcfg:
fcc = tcfg.get("functionCallingConfig", {})
if fcc:
req.request.tool_config.function_calling_config.mode = fcc.get("mode", "AUTO")
for afn in fcc.get("allowed_function_names", []):
req.request.tool_config.function_calling_config.allowed_function_names.append(afn)
# Session ID
sid = inner.get("sessionId", "")
if sid:
req.request.session_id = sid
return req
def try_generate(self, wrapped_dict, stream=False, access_token="",
timeout_s=180):
"""
Try a gRPC GenerateContent or StreamGenerateContent request.
Args:
wrapped_dict: The same wrapped dict used for REST requests.
stream: If True, use server-streaming RPC.
access_token: OAuth2 Bearer token for authentication.
timeout_s: Request timeout in seconds.
Returns:
GrpcFallbackResult with ok=True if successful.
For non-streaming: result.response_data is a dict matching
the REST JSON response shape.
For streaming: result.stream_chunks is a list of dicts matching
REST SSE chunk shapes.
"""
if not is_grpc_available():
return GrpcFallbackResult(ok=False, error_message="grpcio not installed")
t0 = time.time()
# Build metadata (gRPC uses metadata instead of HTTP headers)
metadata = []
if access_token:
metadata.append(("authorization", f"Bearer {access_token}"))
ua = wrapped_dict.get("userAgent", "")
if ua:
metadata.append(("user-agent", ua))
metadata.append(("x-client-name", "antigravity"))
# Required for Google's gRPC gateway
metadata.append(("x-goog-api-client", "gl-node/18.18.2 fire/0.8.6 grpc/1.10.x"))
# Build endpoints list
endpoints = list(_GRPC_ENDPOINTS)
if os.environ.get(_ALLOW_STAGING_ENV, "0") == "1":
endpoints.append("daily-cloudcode-pa.sandbox.googleapis.com:443")
endpoints.append("autopush-cloudcode-pa.sandbox.googleapis.com:443")
model = wrapped_dict.get("model", "?")
last_error = ""
for ep in endpoints:
try:
channel, stub = self._get_channel(ep)
req = self._build_request(wrapped_dict)
if stream:
return self._do_stream(stub, req, metadata, ep, model,
timeout_s, t0)
else:
return self._do_unary(stub, req, metadata, ep, model,
timeout_s, t0)
except Exception as e:
last_error = str(e)
err_str = last_error.lower()
print(f"[antigravity-grpc] {ep} failed: {last_error[:300]}", file=sys.stderr)
# Don't retry on auth errors
if "unauthenticated" in err_str or "permission" in err_str:
break
# Don't retry on invalid argument (model truly doesn't exist)
if "not_found" in err_str or "not found" in err_str:
break
continue
elapsed = time.time() - t0
return GrpcFallbackResult(
ok=False,
error_message=f"All gRPC endpoints failed: {last_error}",
model_used=model,
elapsed_s=elapsed,
)
def _do_unary(self, stub, req, metadata, endpoint, model, timeout_s, t0):
"""Execute a unary (non-streaming) gRPC call."""
response = stub.GenerateContent(
req,
metadata=metadata,
timeout=timeout_s,
)
elapsed = time.time() - t0
# Convert protobuf response to REST-compatible JSON shape
candidates_json = []
for candidate in response.response.candidates:
candidates_json.append(_proto_candidate_to_json(candidate))
# Match the REST response envelope:
# { "response": { "candidates": [...] } }
rest_shape = {
"response": {
"candidates": candidates_json,
}
}
print(f"[antigravity-grpc] {endpoint} unary OK, candidates={len(candidates_json)}, elapsed={elapsed:.1f}s", file=sys.stderr)
return GrpcFallbackResult(
ok=True,
response_data=rest_shape,
endpoint_used=endpoint,
model_used=model,
elapsed_s=elapsed,
)
def _do_stream(self, stub, req, metadata, endpoint, model, timeout_s, t0):
"""Execute a server-streaming gRPC call."""
chunks = []
chunk_count = 0
response_iter = stub.StreamGenerateContent(
req,
metadata=metadata,
timeout=timeout_s,
)
for chunk_proto in response_iter:
chunk_count += 1
# Each chunk_proto is a StreamGenerateContentChunk
# which wraps a Response with candidates
candidates_json = []
for candidate in chunk_proto.response.candidates:
candidates_json.append(_proto_candidate_to_json(candidate))
# Match REST SSE chunk shape: { "response": { "candidates": [...] } }
chunk_json = {
"response": {
"candidates": candidates_json,
}
}
chunks.append(chunk_json)
elapsed = time.time() - t0
print(f"[antigravity-grpc] {endpoint} stream OK, chunks={chunk_count}, elapsed={elapsed:.1f}s", file=sys.stderr)
return GrpcFallbackResult(
ok=True,
stream_chunks=chunks,
endpoint_used=endpoint,
model_used=model,
elapsed_s=elapsed,
)
def close(self):
"""Close all gRPC channels."""
with self._lock:
for ep, channel in self._channels.items():
try:
channel.close()
except Exception:
pass
self._channels.clear()
self._stubs.clear()
# ═══════════════════════════════════════════════════════════════════
# Module-level singleton
# ═══════════════════════════════════════════════════════════════════
_client = None
_client_lock = threading.Lock()
def get_client():
"""Get the module-level AntigravityGrpcClient singleton."""
global _client
with _client_lock:
if _client is None:
_client = AntigravityGrpcClient()
return _client

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,275 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings
from antigravity_grpc import cloudcode_pb2 as cloudcode__pb2
GRPC_GENERATED_VERSION = '1.80.0'
GRPC_VERSION = grpc.__version__
_version_not_supported = False
try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True
if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ ' but the generated code in cloudcode_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)
class PredictionServiceStub(object):
"""─── Service ──────────────────────────────────────────────────────────
"""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.GenerateContent = channel.unary_unary(
'/google.internal.cloud.code.v1internal.PredictionService/GenerateContent',
request_serializer=cloudcode__pb2.GenerateContentRequest.SerializeToString,
response_deserializer=cloudcode__pb2.GenerateContentResponse.FromString,
_registered_method=True)
self.StreamGenerateContent = channel.unary_stream(
'/google.internal.cloud.code.v1internal.PredictionService/StreamGenerateContent',
request_serializer=cloudcode__pb2.GenerateContentRequest.SerializeToString,
response_deserializer=cloudcode__pb2.StreamGenerateContentChunk.FromString,
_registered_method=True)
self.FetchAvailableModels = channel.unary_unary(
'/google.internal.cloud.code.v1internal.PredictionService/FetchAvailableModels',
request_serializer=cloudcode__pb2.FetchAvailableModelsRequest.SerializeToString,
response_deserializer=cloudcode__pb2.FetchAvailableModelsResponse.FromString,
_registered_method=True)
self.CountTokens = channel.unary_unary(
'/google.internal.cloud.code.v1internal.PredictionService/CountTokens',
request_serializer=cloudcode__pb2.CountTokensRequest.SerializeToString,
response_deserializer=cloudcode__pb2.CountTokensResponse.FromString,
_registered_method=True)
self.RetrieveUserQuota = channel.unary_unary(
'/google.internal.cloud.code.v1internal.PredictionService/RetrieveUserQuota',
request_serializer=cloudcode__pb2.RetrieveUserQuotaRequest.SerializeToString,
response_deserializer=cloudcode__pb2.RetrieveUserQuotaResponse.FromString,
_registered_method=True)
class PredictionServiceServicer(object):
"""─── Service ──────────────────────────────────────────────────────────
"""
def GenerateContent(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def StreamGenerateContent(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def FetchAvailableModels(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def CountTokens(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def RetrieveUserQuota(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_PredictionServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
'GenerateContent': grpc.unary_unary_rpc_method_handler(
servicer.GenerateContent,
request_deserializer=cloudcode__pb2.GenerateContentRequest.FromString,
response_serializer=cloudcode__pb2.GenerateContentResponse.SerializeToString,
),
'StreamGenerateContent': grpc.unary_stream_rpc_method_handler(
servicer.StreamGenerateContent,
request_deserializer=cloudcode__pb2.GenerateContentRequest.FromString,
response_serializer=cloudcode__pb2.StreamGenerateContentChunk.SerializeToString,
),
'FetchAvailableModels': grpc.unary_unary_rpc_method_handler(
servicer.FetchAvailableModels,
request_deserializer=cloudcode__pb2.FetchAvailableModelsRequest.FromString,
response_serializer=cloudcode__pb2.FetchAvailableModelsResponse.SerializeToString,
),
'CountTokens': grpc.unary_unary_rpc_method_handler(
servicer.CountTokens,
request_deserializer=cloudcode__pb2.CountTokensRequest.FromString,
response_serializer=cloudcode__pb2.CountTokensResponse.SerializeToString,
),
'RetrieveUserQuota': grpc.unary_unary_rpc_method_handler(
servicer.RetrieveUserQuota,
request_deserializer=cloudcode__pb2.RetrieveUserQuotaRequest.FromString,
response_serializer=cloudcode__pb2.RetrieveUserQuotaResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'google.internal.cloud.code.v1internal.PredictionService', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
server.add_registered_method_handlers('google.internal.cloud.code.v1internal.PredictionService', rpc_method_handlers)
# This class is part of an EXPERIMENTAL API.
class PredictionService(object):
"""─── Service ──────────────────────────────────────────────────────────
"""
@staticmethod
def GenerateContent(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/google.internal.cloud.code.v1internal.PredictionService/GenerateContent',
cloudcode__pb2.GenerateContentRequest.SerializeToString,
cloudcode__pb2.GenerateContentResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def StreamGenerateContent(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_stream(
request,
target,
'/google.internal.cloud.code.v1internal.PredictionService/StreamGenerateContent',
cloudcode__pb2.GenerateContentRequest.SerializeToString,
cloudcode__pb2.StreamGenerateContentChunk.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def FetchAvailableModels(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/google.internal.cloud.code.v1internal.PredictionService/FetchAvailableModels',
cloudcode__pb2.FetchAvailableModelsRequest.SerializeToString,
cloudcode__pb2.FetchAvailableModelsResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def CountTokens(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/google.internal.cloud.code.v1internal.PredictionService/CountTokens',
cloudcode__pb2.CountTokensRequest.SerializeToString,
cloudcode__pb2.CountTokensResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def RetrieveUserQuota(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/google.internal.cloud.code.v1internal.PredictionService/RetrieveUserQuota',
cloudcode__pb2.RetrieveUserQuotaRequest.SerializeToString,
cloudcode__pb2.RetrieveUserQuotaResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

View File

@@ -0,0 +1,183 @@
// Copyright 2026 Codex Launcher Contributors
// SPDX-License-Identifier: MIT
//
// CloudCode internal gRPC service definitions.
// Reverse-engineered from the agy-core binary for Antigravity proxy fallback.
// Service: google.internal.cloud.code.v1internal.PredictionService
//
// NOTE: google/api/annotations.proto is NOT imported here because it conflicts
// with the google namespace package at runtime. The HTTP annotations are only
// needed for Google's Envoy/gRPC-gateway and are unnecessary for our client.
syntax = "proto3";
package google.internal.cloud.code.v1internal;
import "google/protobuf/struct.proto";
option go_package = "google.golang.org/internal/cloud/code/v1internal";
// ─── Reused message types ───────────────────────────────────────────
message Content {
string role = 1;
repeated Part parts = 2;
}
message Part {
oneof data {
string text = 1;
InlineData inline_data = 2;
FunctionCall function_call = 3;
FunctionResponse function_response = 4;
}
// Thought signature for Gemini continuity
string thought_signature = 10;
// Thought part (reasoning)
bool thought = 11;
}
message InlineData {
string mime_type = 1;
bytes data = 2;
}
message FunctionCall {
string name = 1;
google.protobuf.Struct args = 2;
string id = 3;
}
message FunctionResponse {
string name = 1;
google.protobuf.Struct response = 2;
string id = 3;
}
message SafetySetting {
string category = 1;
string threshold = 2;
}
message GenerationConfig {
int32 max_output_tokens = 1;
float temperature = 2;
float top_p = 3;
int32 thinking_budget = 4;
bool include_thoughts = 5;
repeated string stop_sequences = 6;
message ThinkingConfig {
bool include_thoughts = 1;
int32 thinking_budget = 2;
}
ThinkingConfig thinking_config = 7;
}
message Tool {
repeated FunctionDeclaration function_declarations = 1;
}
message FunctionDeclaration {
string name = 1;
string description = 2;
google.protobuf.Struct parameters = 3;
}
message ToolConfig {
message FunctionCallingConfig {
string mode = 1; // "AUTO", "ANY", "NONE", "VALIDATED"
repeated string allowed_function_names = 2;
}
FunctionCallingConfig function_calling_config = 1;
}
message Candidate {
Content content = 1;
string finish_reason = 2;
int32 index = 3;
}
// ─── GenerateContent ─────────────────────────────────────────────────
message GenerateContentRequest {
string project = 1;
string model = 2;
string request_type = 3;
string user_agent = 4;
string request_id = 5;
message InnerRequest {
repeated Content contents = 1;
Content system_instruction = 2;
GenerationConfig generation_config = 3;
repeated Tool tools = 4;
repeated SafetySetting safety_settings = 5;
ToolConfig tool_config = 6;
string session_id = 7;
}
InnerRequest request = 10;
}
message GenerateContentResponse {
message Response {
repeated Candidate candidates = 1;
}
Response response = 1;
}
// ─── StreamGenerateContent ────────────────────────────────────────────
message StreamGenerateContentChunk {
GenerateContentResponse.Response response = 1;
}
// ─── FetchAvailableModels ────────────────────────────────────────────
message FetchAvailableModelsRequest {
string project = 1;
}
message FetchAvailableModelsResponse {
message ModelInfo {
string name = 1;
string display_name = 2;
string description = 3;
int64 context_window = 4;
}
repeated ModelInfo models = 1;
}
// ─── CountTokens ──────────────────────────────────────────────────────
message CountTokensRequest {
string project = 1;
string model = 2;
repeated Content contents = 3;
}
message CountTokensResponse {
int32 total_tokens = 1;
}
// ─── RetrieveUserQuota ───────────────────────────────────────────────
message RetrieveUserQuotaRequest {
string project = 1;
}
message RetrieveUserQuotaResponse {
int64 daily_limit = 1;
int64 daily_usage = 2;
int64 daily_remaining = 3;
}
// ─── Service ──────────────────────────────────────────────────────────
service PredictionService {
rpc GenerateContent(GenerateContentRequest) returns (GenerateContentResponse);
rpc StreamGenerateContent(GenerateContentRequest) returns (stream StreamGenerateContentChunk);
rpc FetchAvailableModels(FetchAvailableModelsRequest) returns (FetchAvailableModelsResponse);
rpc CountTokens(CountTokensRequest) returns (CountTokensResponse);
rpc RetrieveUserQuota(RetrieveUserQuotaRequest) returns (RetrieveUserQuotaResponse);
}

View File

@@ -0,0 +1,14 @@
// Minimal google/api/annotations.proto for code generation.
syntax = "proto3";
package google.api;
import "google/api/http.proto";
import "google/protobuf/descriptor.proto";
option go_package = "google.golang.org/genproto/googleapis/api/annotations";
extend google.protobuf.MethodOptions {
HttpRule http = 72295728;
}

View File

@@ -0,0 +1,18 @@
// Minimal google/api/http.proto for code generation.
syntax = "proto3";
package google.api;
option go_package = "google.golang.org/genproto/googleapis/api/annotations";
message HttpRule {
string get = 1;
string put = 2;
string post = 3;
string delete = 4;
string patch = 5;
repeated HttpRule additional_bindings = 11;
string body = 7;
string response_body = 12;
}

View File

@@ -27,9 +27,15 @@ model_catalog_json = ""
""" """
CHANGELOG = [ CHANGELOG = [
("3.11.12", "2026-05-26", [ ("3.12.1", "2026-05-27", [
"New Antigravity v2 handler mimicking anti-api", "Fix Antigravity adapter (PR #15): simplified model resolution",
"Safety settings, stopSequences, simplified sanitizer", "Removed broken schema sanitization, restored headers",
"Re-enabled gRPC fallback by default",
]),
("3.12.0", "2026-05-27", [
"gRPC auto-fallback for Antigravity (PR #13)",
"Dynamic version fetch with probe validation",
"Antigravity v2 handler rewrite (anti-api)",
]), ]),
("3.11.10", "2026-05-26", [ ("3.11.10", "2026-05-26", [
"Fix Antigravity: interleave function_call/output pairs (PR #11)", "Fix Antigravity: interleave function_call/output pairs (PR #11)",

View File

@@ -83,12 +83,20 @@ model_catalog_json = ""
""" """
CHANGELOG = [ CHANGELOG = [
("3.11.12", "2026-05-26", [ ("3.12.1", "2026-05-27", [
"New Antigravity v2 handler mimicking anti-api approach", "Fix Antigravity adapter (PR #15): simplify model resolution",
"Removed broken schema sanitization, restored correct headers",
"Expanded model alias map for all Antigravity variants",
"Re-enabled gRPC fallback by default",
]),
("3.12.0", "2026-05-27", [
"gRPC auto-fallback for Antigravity provider (PR #13)",
"New antigravity_grpc module with protobuf client",
"REST 404 triggers gRPC fallback using display names",
"gRPC supports streaming and unary generate",
"Dynamic version fetch with probe validation",
"Antigravity v2 handler rewrite (anti-api approach)",
"Safety settings, stopSequences, sessionId, requestType: agent", "Safety settings, stopSequences, sessionId, requestType: agent",
"Simplified sanitizer preserving functionCall/functionResponse",
"Endpoint priority: daily-cloudcode-pa first",
"functionResponse uses response.result (string) format",
]), ]),
("3.11.11", "2026-05-26", [ ("3.11.11", "2026-05-26", [
"Final trimming only removes plain messages, never function_call_output", "Final trimming only removes plain messages, never function_call_output",

View File

@@ -165,6 +165,56 @@ import tempfile
_IS_WINDOWS = sys.platform == "win32" _IS_WINDOWS = sys.platform == "win32"
# ═══════════════════════════════════════════════════════════════════
# Lazy gRPC import for Antigravity fallback
# ═══════════════════════════════════════════════════════════════════
_antigravity_grpc_client = None
_antigravity_grpc_available = None
def _get_grpc_client():
"""Lazy-load the Antigravity gRPC client. Returns None if grpcio is not installed."""
global _antigravity_grpc_client, _antigravity_grpc_available
if _antigravity_grpc_available is False:
return None
if _antigravity_grpc_client is not None:
return _antigravity_grpc_client
try:
# Add the src directory to sys.path so antigravity_grpc package is found
_src_dir = os.path.dirname(os.path.abspath(__file__))
if _src_dir not in sys.path:
sys.path.insert(0, _src_dir)
from antigravity_grpc import is_grpc_available, AntigravityGrpcClient, get_client
if is_grpc_available():
_antigravity_grpc_client = get_client()
_antigravity_grpc_available = True
print("[antigravity-grpc] gRPC fallback module loaded OK", file=sys.stderr)
return _antigravity_grpc_client
else:
_antigravity_grpc_available = False
print("[antigravity-grpc] grpcio available but stubs failed to load, gRPC fallback disabled", file=sys.stderr)
return None
except ImportError as e:
_antigravity_grpc_available = False
print(f"[antigravity-grpc] grpcio not installed ({e}), gRPC fallback disabled", file=sys.stderr)
return None
# Reverse alias map: REST slug → gRPC display name
# gRPC uses display names (e.g. "Gemini 3.5 Flash (High)") while REST uses slugs (e.g. "gemini-3-flash")
_GRPC_REVERSE_ALIAS = {
"gemini-3-flash": "Gemini 3.5 Flash (High)",
"gemini-3.5-flash-low": "Gemini 3.5 Flash (Low)",
"gemini-3.1-pro-low": "Gemini 3.1 Pro (High)",
"claude-sonnet-4-6": "Claude Sonnet 4.6 (Thinking)",
"claude-opus-4-6-thinking": "Claude Opus 4.6 (Thinking)",
"gpt-oss-120b-medium": "GPT-OSS 120B (Medium)",
"gemini-2.5-flash": "Gemini 2.5 Flash",
"gemini-2.5-pro": "Gemini 2.5 Pro",
"gemini-2.5-flash-lite": "Gemini 2.5 Flash Lite",
}
# Errors from REST that should trigger gRPC fallback
_GRPC_FALLBACK_REST_ERRORS = {404} # Model not found via REST (model exists in gRPC but not REST)
# ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════
# Config # Config
# ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════
@@ -5762,7 +5812,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
_antigravity_endpoints.append("https://autopush-cloudcode-pa.sandbox.googleapis.com") _antigravity_endpoints.append("https://autopush-cloudcode-pa.sandbox.googleapis.com")
body_b = json.dumps(wrapped).encode() body_b = json.dumps(wrapped).encode()
print(f"[{self._session_id}] [antigravity-v2] model={model} stream={stream} contents={len(contents)} tools={bool(gemini_tools)} project={project_id} ver={_versions[0]}", file=sys.stderr) print(f"[{self._session_id}] [antigravity-v2] model={model} stream={stream} contents={len(contents)} tools={bool(gemini_tools)} project={project_id} ver={_fetched_ver}", file=sys.stderr)
try: try:
debug_path = os.path.join(_LOG_DIR, f"antigravity-v2-request-{self._session_id}.json") debug_path = os.path.join(_LOG_DIR, f"antigravity-v2-request-{self._session_id}.json")
with open(debug_path, "w") as dbg: with open(debug_path, "w") as dbg:
@@ -5863,6 +5913,14 @@ class Handler(http.server.BaseHTTPRequestHandler):
continue continue
if upstream is None: if upstream is None:
# ─── gRPC FALLBACK ─────────────────────────────────────────
# If REST failed with 404 (model not available via REST API),
# try gRPC which supports display names and has a wider model catalog.
if _all_404:
grpc_result = self._try_grpc_fallback(wrapped, access_token, stream, tracker)
if grpc_result is not None:
return # gRPC succeeded, response already sent
# ─── END gRPC FALLBACK ─────────────────────────────────────
return self.send_json(502, {"error": {"type": "proxy_error", "message": "All endpoints failed"}}) return self.send_json(502, {"error": {"type": "proxy_error", "message": "All endpoints failed"}})
if stream: if stream:
@@ -5870,6 +5928,190 @@ class Handler(http.server.BaseHTTPRequestHandler):
else: else:
self._forward_gemini_json(upstream, model, body, input_data) self._forward_gemini_json(upstream, model, body, input_data)
# ═══════════════════════════════════════════════════════════════════
# gRPC Fallback for Antigravity
# ═══════════════════════════════════════════════════════════════════
def _try_grpc_fallback(self, wrapped_dict, access_token, stream, tracker=None):
"""
Try gRPC fallback when REST API returns 404 (model not found).
gRPC uses display names (e.g. "Gemini 3.5 Flash (High)") instead of
REST slugs (e.g. "gemini-3-flash"), so models unavailable via REST
may work via gRPC.
Returns None if gRPC is unavailable or also failed (caller should
send its own error response). Returns True if gRPC succeeded and
the response was already sent to the client.
"""
grpc_client = _get_grpc_client()
if grpc_client is None:
print(f"[{self._session_id}] [antigravity-grpc] gRPC fallback not available (grpcio not installed), skipping", file=sys.stderr)
return None
# gRPC uses display names, not REST slugs — remap the model ID
grpc_wrapped = dict(wrapped_dict)
rest_model = grpc_wrapped.get("model", "")
grpc_model = _GRPC_REVERSE_ALIAS.get(rest_model, rest_model)
grpc_wrapped["model"] = grpc_model
if grpc_model != rest_model:
print(f"[{self._session_id}] [antigravity-grpc] model remapped for gRPC: REST={rest_model} -> gRPC={grpc_model}", file=sys.stderr)
print(f"[{self._session_id}] [antigravity-grpc] REST 404, trying gRPC fallback with model={grpc_model} stream={stream}", file=sys.stderr)
try:
result = grpc_client.try_generate(
grpc_wrapped,
stream=stream,
access_token=access_token,
timeout_s=180,
)
except Exception as e:
print(f"[{self._session_id}] [antigravity-grpc] gRPC call exception: {e}", file=sys.stderr)
return None
if not result.ok:
print(f"[{self._session_id}] [antigravity-grpc] gRPC fallback also failed: {result.error_message}", file=sys.stderr)
return None
print(f"[{self._session_id}] [antigravity-grpc] gRPC fallback OK! endpoint={result.endpoint_used} model={result.model_used} elapsed={result.elapsed_s:.1f}s", file=sys.stderr)
# Process the gRPC response through the same forwarding paths as REST
if stream and result.stream_chunks is not None:
self._forward_grpc_sse(result, grpc_model)
elif not stream and result.response_data is not None:
self._forward_grpc_json(result, grpc_model)
else:
print(f"[{self._session_id}] [antigravity-grpc] unexpected result shape, no data to forward", file=sys.stderr)
return None
return True # Response sent successfully via gRPC
def _forward_grpc_sse(self, grpc_result, model):
"""
Forward a gRPC streaming result to the client as SSE events.
The gRPC result contains stream_chunks that match the REST SSE chunk shape,
so we can process them through the same _forward_gemini_sse logic.
"""
resp_id = f"resp-{uuid.uuid4().hex[:24]}"
created = int(time.time())
self.send_response(200)
self.send_header("Content-Type", "text/event-stream")
self.send_header("Cache-Control", "no-cache")
self.send_header("Connection", "keep-alive")
self.end_headers()
full_text = ""
output_items = []
current_tool_calls = {}
message_started = False
message_id = f"msg-{uuid.uuid4().hex[:24]}"
def flush_event(event_type, data):
self.wfile.write(f"event: {event_type}\ndata: {json.dumps(data)}\n\n".encode())
self.wfile.flush()
flush_event("response.created", {"type": "response.created", "response": {"id": resp_id, "object": "response", "model": model, "status": "in_progress", "created": created, "output": []}})
flush_event("response.in_progress", {"type": "response.in_progress", "response": {"id": resp_id}})
# Process each gRPC chunk (same shape as REST SSE chunks)
for chunk in grpc_result.stream_chunks:
candidates = chunk.get("response", chunk).get("candidates", [])
if not candidates:
continue
parts = candidates[0].get("content", {}).get("parts", [])
for part in parts:
sig = _extract_gemini_sig(part)
if sig:
if part.get("functionCall"):
fc_id = part["functionCall"].get("id") or part["functionCall"].get("name")
fc_name = part["functionCall"].get("name")
if fc_id:
_gemini_store_sig(f"fc:{fc_id}", sig)
if fc_name:
_gemini_store_sig(f"fc:{fc_name}", sig)
_gemini_store_sig(f"turn:{resp_id}", sig)
if part.get("thought"):
sig_from_thought = _extract_gemini_sig(part)
if sig_from_thought:
_gemini_store_sig(f"turn:{resp_id}", sig_from_thought)
continue
if "text" in part and not part.get("functionCall"):
text_delta = part["text"]
if not text_delta:
continue
full_text += text_delta
if not message_started:
flush_event("response.output_item.added", {"type": "response.output_item.added", "output_index": 0, "item": {"type": "message", "id": message_id, "role": "assistant", "content": []}})
flush_event("response.content_part.added", {"type": "response.content_part.added", "output_index": 0, "content_index": 0, "part": {"type": "output_text", "text": ""}})
output_items.append({"text": True})
message_started = True
flush_event("response.output_text.delta", {"type": "response.output_text.delta", "output_index": 0, "content_index": 0, "delta": text_delta})
elif part.get("functionCall"):
fc = part["functionCall"]
call_id = f"call_{uuid.uuid4().hex[:24]}"
args_str = json.dumps(fc.get("args", fc.get("arguments", {})))
output_index = len(output_items)
flush_event("response.output_item.added", {"type": "response.output_item.added", "output_index": output_index, "item": {"type": "function_call", "id": call_id, "call_id": call_id, "name": fc.get("name", ""), "arguments": ""}})
flush_event("response.function_call_arguments.delta", {"type": "response.function_call_arguments.delta", "output_index": output_index, "item_id": call_id, "delta": args_str})
flush_event("response.function_call_arguments.done", {"type": "response.function_call_arguments.done", "output_index": output_index, "item_id": call_id, "arguments": args_str})
current_tool_calls[call_id] = fc
output_items.append({"tool": True})
# Build final response
out = []
if full_text:
out.append({"type": "message", "id": message_id, "role": "assistant", "content": [{"type": "output_text", "text": full_text}]})
tool_outputs = []
for cid, fc in current_tool_calls.items():
tool_outputs.append({"type": "function_call", "id": cid, "call_id": cid, "name": fc.get("name", ""), "arguments": json.dumps(fc.get("args", fc.get("arguments", {})))})
out.extend(tool_outputs)
final_resp = {"id": resp_id, "object": "response", "model": model, "status": "completed", "created": created, "output": out}
if full_text:
flush_event("response.output_text.done", {"type": "response.output_text.done", "output_index": 0, "content_index": 0, "text": full_text})
flush_event("response.content_part.done", {"type": "response.content_part.done", "output_index": 0, "content_index": 0, "part": {"type": "output_text", "text": full_text}})
flush_event("response.output_item.done", {"type": "response.output_item.done", "output_index": 0, "item": out[0]})
for idx, item in enumerate(tool_outputs, start=(1 if full_text else 0)):
flush_event("response.output_item.done", {"type": "response.output_item.done", "output_index": idx, "item": item})
flush_event("response.completed", {"type": "response.completed", "response": final_resp})
self.close_connection = True
with _response_store_lock:
_response_store[resp_id] = final_resp
while len(_response_store) > _MAX_STORED:
_response_store.popitem(last=False)
def _forward_grpc_json(self, grpc_result, model):
"""Forward a gRPC non-streaming result to the client as JSON."""
resp_id = f"resp-{uuid.uuid4().hex[:24]}"
created = int(time.time())
out = []
full_text = ""
data = grpc_result.response_data
candidates = data.get("response", data).get("candidates", [])
if candidates:
parts = candidates[0].get("content", {}).get("parts", [])
text_parts = []
for part in parts:
if part.get("thought"):
continue
if "text" in part and not part.get("functionCall"):
text_parts.append(part["text"])
elif part.get("functionCall"):
fc = part["functionCall"]
call_id = f"call_{uuid.uuid4().hex[:24]}"
out.append({"type": "function_call", "id": call_id, "call_id": call_id, "name": fc.get("name", ""), "arguments": json.dumps(fc.get("args", fc.get("arguments", {})))})
if text_parts:
full_text = "".join(text_parts)
out.insert(0, {"type": "message", "id": f"msg-{uuid.uuid4().hex[:24]}", "role": "assistant", "content": [{"type": "output_text", "text": full_text}]})
resp = {"id": resp_id, "object": "response", "model": model, "status": "completed", "created": created, "output": out}
with _response_store_lock:
_response_store[resp_id] = resp
while len(_response_store) > _MAX_STORED:
_response_store.popitem(last=False)
self.send_json(200, resp)
def _handle_gemini_oauth(self, body, model, stream, tracker=None): def _handle_gemini_oauth(self, body, model, stream, tracker=None):
input_data = body.get("input", "") input_data = body.get("input", "")
policy = provider_policy() policy = provider_policy()
@@ -6329,6 +6571,9 @@ class Handler(http.server.BaseHTTPRequestHandler):
headers["X-Client-Name"] = "antigravity" headers["X-Client-Name"] = "antigravity"
headers["X-Client-Version"] = _ensure_antigravity_client_version() headers["X-Client-Version"] = _ensure_antigravity_client_version()
headers["x-goog-api-client"] = "gl-node/18.18.2 fire/0.8.6 grpc/1.10.x" headers["x-goog-api-client"] = "gl-node/18.18.2 fire/0.8.6 grpc/1.10.x"
# Add X-Machine-Session-Id header as seen in badrisnarayanan/antigravity-claude-proxy
if "request" in wrapped and "sessionId" in wrapped["request"]:
headers["X-Machine-Session-Id"] = wrapped["request"]["sessionId"]
else: else:
headers["User-Agent"] = "google-api-nodejs-client/9.15.1" headers["User-Agent"] = "google-api-nodejs-client/9.15.1"
headers["X-Goog-Api-Client"] = "gl-node/22.17.0" headers["X-Goog-Api-Client"] = "gl-node/22.17.0"

View File

@@ -0,0 +1,396 @@
#!/usr/bin/env python3
"""
Unit tests for the Antigravity gRPC fallback module.
Tests cover:
1. Module import and availability detection
2. Protobuf conversion helpers (JSON <-> protobuf)
3. Request building from wrapped REST dict
4. Reverse alias map correctness
5. GrpcFallbackResult type
6. Integration: _try_grpc_fallback triggers correctly on REST 404
"""
import json
import os
import sys
import unittest
from unittest.mock import patch, MagicMock
# Add src to path so we can import the antigravity_grpc package
_src_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "src")
if _src_dir not in sys.path:
sys.path.insert(0, _src_dir)
class TestGrpcModuleAvailability(unittest.TestCase):
"""Tests for is_grpc_available() and module loading."""
def test_is_grpc_available_returns_bool(self):
"""is_grpc_available should return a boolean."""
from antigravity_grpc import is_grpc_available
result = is_grpc_available()
self.assertIsInstance(result, bool)
def test_is_grpc_available_true_when_installed(self):
"""If grpcio is installed and stubs are loadable, should return True."""
from antigravity_grpc import is_grpc_available
# grpcio was installed at test time, so this should be True
self.assertTrue(is_grpc_available())
def test_client_instantiation(self):
"""AntigravityGrpcClient should be instantiatable."""
from antigravity_grpc import AntigravityGrpcClient
client = AntigravityGrpcClient()
self.assertIsNotNone(client)
def test_get_client_singleton(self):
"""get_client should return the same singleton."""
from antigravity_grpc import get_client
c1 = get_client()
c2 = get_client()
self.assertIs(c1, c2)
class TestGrpcFallbackResult(unittest.TestCase):
"""Tests for GrpcFallbackResult type."""
def test_default_values(self):
from antigravity_grpc import GrpcFallbackResult
r = GrpcFallbackResult()
self.assertFalse(r.ok)
self.assertIsNone(r.response_data)
self.assertIsNone(r.stream_chunks)
self.assertEqual(r.error_message, "")
self.assertEqual(r.endpoint_used, "")
self.assertEqual(r.model_used, "")
self.assertEqual(r.elapsed_s, 0.0)
def test_success_result(self):
from antigravity_grpc import GrpcFallbackResult
r = GrpcFallbackResult(ok=True, response_data={"response": {"candidates": []}},
endpoint_used="daily-cloudcode-pa.googleapis.com:443",
model_used="Gemini 3.5 Flash (High)",
elapsed_s=2.5)
self.assertTrue(r.ok)
self.assertIsNotNone(r.response_data)
self.assertEqual(r.elapsed_s, 2.5)
def test_failure_result(self):
from antigravity_grpc import GrpcFallbackResult
r = GrpcFallbackResult(ok=False, error_message="All gRPC endpoints failed")
self.assertFalse(r.ok)
self.assertIn("failed", r.error_message)
def test_repr(self):
from antigravity_grpc import GrpcFallbackResult
r_ok = GrpcFallbackResult(ok=True, response_data={"response": {"candidates": []}})
self.assertIn("OK", repr(r_ok))
r_fail = GrpcFallbackResult(ok=False, error_message="timeout")
self.assertIn("FAIL", repr(r_fail))
class TestReverseAliasMap(unittest.TestCase):
"""Tests for the _GRPC_REVERSE_ALIAS map in translate-proxy.py."""
def test_import_reverse_alias(self):
"""The reverse alias map should be importable from the proxy module."""
import importlib
_spec = importlib.util.spec_from_file_location(
"translate_proxy",
os.path.join(_src_dir, "translate-proxy.py"),
)
tp = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(tp)
self.assertIsInstance(tp._GRPC_REVERSE_ALIAS, dict)
def test_key_models_have_reverse_aliases(self):
"""All key REST model slugs should have gRPC display name mappings."""
import importlib
_spec = importlib.util.spec_from_file_location(
"translate_proxy",
os.path.join(_src_dir, "translate-proxy.py"),
)
tp = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(tp)
required_slugs = [
"gemini-3-flash",
"gemini-3.5-flash-low",
"gemini-3.1-pro-low",
"claude-sonnet-4-6",
"claude-opus-4-6-thinking",
"gemini-2.5-flash",
]
for slug in required_slugs:
self.assertIn(slug, tp._GRPC_REVERSE_ALIAS,
f"Missing reverse alias for REST slug '{slug}'")
def test_reverse_alias_values_are_display_names(self):
"""gRPC display names should contain spaces and parentheses, not hyphens."""
import importlib
_spec = importlib.util.spec_from_file_location(
"translate_proxy",
os.path.join(_src_dir, "translate-proxy.py"),
)
tp = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(tp)
for slug, display_name in tp._GRPC_REVERSE_ALIAS.items():
# Display names typically have spaces (e.g. "Gemini 3.5 Flash (High)")
# while slugs use hyphens (e.g. "gemini-3-flash")
self.assertNotEqual(slug, display_name,
f"Reverse alias for '{slug}' should differ from slug (gRPC uses display names)")
class TestProtobufConversion(unittest.TestCase):
"""Tests for JSON -> protobuf conversion helpers."""
def test_struct_to_protobuf(self):
"""_struct_to_protobuf should convert a simple dict to Struct."""
from antigravity_grpc.client import _struct_to_protobuf
result = _struct_to_protobuf({"key": "value", "num": 42})
self.assertIsNotNone(result)
# Verify round-trip
from antigravity_grpc.client import _protobuf_struct_to_dict
d = _protobuf_struct_to_dict(result)
self.assertEqual(d["key"], "value")
self.assertEqual(d["num"], 42.0)
def test_struct_round_trip_nested(self):
"""Nested dicts should survive a round-trip through protobuf."""
from antigravity_grpc.client import _struct_to_protobuf, _protobuf_struct_to_dict
original = {"outer": {"inner": "hello"}, "list_val": [1, 2, 3]}
proto = _struct_to_protobuf(original)
result = _protobuf_struct_to_dict(proto)
self.assertEqual(result["outer"]["inner"], "hello")
self.assertEqual(result["list_val"], [1.0, 2.0, 3.0])
def test_json_parts_to_proto_text(self):
"""Text parts should convert to protobuf Part with text field."""
from antigravity_grpc.client import _json_parts_to_proto
parts = _json_parts_to_proto([{"text": "Hello world"}])
self.assertEqual(len(parts), 1)
self.assertEqual(parts[0].text, "Hello world")
def test_json_parts_to_proto_function_call(self):
"""FunctionCall parts should convert correctly."""
from antigravity_grpc.client import _json_parts_to_proto
parts = _json_parts_to_proto([{
"functionCall": {
"name": "exec_command",
"args": {"cmd": "ls -la"},
"id": "call_123"
}
}])
self.assertEqual(len(parts), 1)
self.assertTrue(parts[0].HasField("function_call"))
self.assertEqual(parts[0].function_call.name, "exec_command")
self.assertEqual(parts[0].function_call.id, "call_123")
def test_json_parts_to_proto_function_response(self):
"""FunctionResponse parts should convert correctly."""
from antigravity_grpc.client import _json_parts_to_proto
parts = _json_parts_to_proto([{
"functionResponse": {
"name": "exec_command",
"response": {"result": "file1.txt"},
"id": "call_123"
}
}])
self.assertEqual(len(parts), 1)
self.assertTrue(parts[0].HasField("function_response"))
self.assertEqual(parts[0].function_response.name, "exec_command")
def test_json_contents_to_proto(self):
"""Content objects should convert correctly."""
from antigravity_grpc.client import _json_contents_to_proto
contents = _json_contents_to_proto([
{"role": "user", "parts": [{"text": "Hello"}]},
{"role": "model", "parts": [{"text": "Hi there"}]},
])
self.assertEqual(len(contents), 2)
self.assertEqual(contents[0].role, "user")
self.assertEqual(contents[1].role, "model")
def test_proto_candidate_to_json(self):
"""Protobuf candidates should convert back to JSON-compatible dicts."""
from antigravity_grpc.client import _json_contents_to_proto, _proto_candidate_to_json
from antigravity_grpc import cloudcode_pb2 as pb2
# Build a candidate manually
candidate = pb2.Candidate()
candidate.content.role = "model"
candidate.content.parts.add().text = "Hello from gRPC"
candidate.finish_reason = "STOP"
candidate.index = 0
result = _proto_candidate_to_json(candidate)
self.assertEqual(result["finishReason"], "STOP")
self.assertEqual(result["content"]["role"], "model")
self.assertEqual(result["content"]["parts"][0]["text"], "Hello from gRPC")
class TestGrpcRequestBuilding(unittest.TestCase):
"""Tests for _build_request (wrapped REST dict → protobuf)."""
def _get_client(self):
from antigravity_grpc import AntigravityGrpcClient
return AntigravityGrpcClient()
def test_build_request_basic(self):
"""Basic request fields should be populated correctly."""
client = self._get_client()
wrapped = {
"project": "test-project-123",
"model": "Gemini 3.5 Flash (High)",
"requestType": "agent",
"userAgent": "antigravity/2.0.6",
"requestId": "agent-test123",
"request": {
"contents": [
{"role": "user", "parts": [{"text": "Say hello"}]}
],
"safetySettings": [
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "OFF"},
],
}
}
req = client._build_request(wrapped)
self.assertEqual(req.project, "test-project-123")
self.assertEqual(req.model, "Gemini 3.5 Flash (High)")
self.assertEqual(req.request_type, "agent")
self.assertEqual(len(req.request.contents), 1)
self.assertEqual(req.request.contents[0].role, "user")
def test_build_request_with_tools(self):
"""Tools should be converted to function declarations."""
client = self._get_client()
wrapped = {
"project": "test-project",
"model": "gemini-3-flash",
"request": {
"contents": [],
"tools": [{
"functionDeclarations": [{
"name": "exec_command",
"description": "Run a shell command",
"parameters": {"type": "object", "properties": {"cmd": {"type": "string"}}}
}]
}],
}
}
req = client._build_request(wrapped)
self.assertEqual(len(req.request.tools), 1)
self.assertEqual(req.request.tools[0].function_declarations[0].name, "exec_command")
def test_build_request_with_generation_config(self):
"""Generation config should be populated correctly."""
client = self._get_client()
wrapped = {
"project": "test-project",
"model": "gemini-3-flash",
"request": {
"contents": [],
"generationConfig": {
"maxOutputTokens": 64000,
"temperature": 0.7,
"stopSequences": ["\n\nHuman:"],
"thinkingConfig": {
"includeThoughts": True,
"thinkingBudget": 8192,
}
}
}
}
req = client._build_request(wrapped)
self.assertEqual(req.request.generation_config.max_output_tokens, 64000)
self.assertAlmostEqual(req.request.generation_config.temperature, 0.7, places=2)
self.assertTrue(req.request.generation_config.thinking_config.include_thoughts)
self.assertEqual(req.request.generation_config.thinking_config.thinking_budget, 8192)
def test_build_request_with_function_call_history(self):
"""Function call/response pairs in contents should be preserved."""
client = self._get_client()
wrapped = {
"project": "test-project",
"model": "gemini-3-flash",
"request": {
"contents": [
{"role": "user", "parts": [{"text": "List files"}]},
{"role": "model", "parts": [{
"functionCall": {"name": "exec_command", "args": {"cmd": "ls"}, "id": "call_1"}
}]},
{"role": "user", "parts": [{
"functionResponse": {"name": "exec_command", "response": {"result": "file.txt"}, "id": "call_1"}
}]},
]
}
}
req = client._build_request(wrapped)
self.assertEqual(len(req.request.contents), 3)
# Verify function call preserved
self.assertTrue(req.request.contents[1].parts[0].HasField("function_call"))
self.assertEqual(req.request.contents[1].parts[0].function_call.name, "exec_command")
# Verify function response preserved
self.assertTrue(req.request.contents[2].parts[0].HasField("function_response"))
self.assertEqual(req.request.contents[2].parts[0].function_response.name, "exec_command")
class TestGrpcEndpointsConfig(unittest.TestCase):
"""Tests for gRPC endpoint configuration."""
def test_default_endpoints(self):
"""Default endpoints should include production and daily."""
from antigravity_grpc.client import _GRPC_ENDPOINTS
self.assertGreaterEqual(len(_GRPC_ENDPOINTS), 2)
hostnames = [ep.split(":")[0] for ep in _GRPC_ENDPOINTS]
self.assertIn("daily-cloudcode-pa.googleapis.com", hostnames)
self.assertIn("cloudcode-pa.googleapis.com", hostnames)
def test_staging_env_var(self):
"""Staging endpoints should be controlled by env var."""
from antigravity_grpc.client import _ALLOW_STAGING_ENV
self.assertEqual(_ALLOW_STAGING_ENV, "ALLOW_ANTIGRAVITY_STAGING")
class TestProxyIntegration(unittest.TestCase):
"""Tests for the proxy's gRPC fallback integration."""
def _load_proxy_module(self):
import importlib
_spec = importlib.util.spec_from_file_location(
"translate_proxy",
os.path.join(_src_dir, "translate-proxy.py"),
)
tp = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(tp)
return tp
def test_get_grpc_client_function_exists(self):
"""_get_grpc_client should exist as a module-level function."""
tp = self._load_proxy_module()
self.assertTrue(callable(tp._get_grpc_client))
def test_grpc_fallback_errors_set(self):
"""_GRPC_FALLBACK_REST_ERRORS should include 404."""
tp = self._load_proxy_module()
self.assertIn(404, tp._GRPC_FALLBACK_REST_ERRORS)
def test_versions_bug_fixed(self):
"""The _versions[0] NameError should be fixed (should be _fetched_ver)."""
# Read the source file and verify _versions is not used incorrectly
with open(os.path.join(_src_dir, "translate-proxy.py")) as f:
source = f.read()
# The bug was: ver={_versions[0]} -- should be ver={_fetched_ver}
self.assertNotIn("_versions[0]", source,
"Bug: _versions[0] should have been replaced with _fetched_ver")
if __name__ == "__main__":
print("=" * 70)
print("Antigravity gRPC Fallback - Unit Tests")
print("=" * 70)
print()
unittest.main(verbosity=2)