29 Commits

20 changed files with 5407 additions and 6903 deletions

View File

@@ -1,5 +1,196 @@
# Changelog
## v3.12.0 (2026-05-27)
**gRPC Auto-Fallback for Antigravity Provider (PR #13)**
### New Features
- **gRPC auto-fallback**: When REST API returns 404 (model not found), automatically retries via gRPC
- **New `antigravity_grpc` module**: Full protobuf client with CloudCode PredictionService stubs
- **Display name remapping**: gRPC uses display names (e.g. "Gemini 3.5 Flash (High)") instead of REST slugs
- **Streaming and unary support**: gRPC fallback works for both streaming and non-streaming requests
- **Dynamic version fetch with validation**: Probes fetched versions to ensure they work before caching
- **Antigravity v2 handler rewrite**: Based on anti-api approach with proper safety settings, stopSequences, sessionId
- **Lazy import**: grpcio is only imported when needed — zero impact if not installed
### Bug Fixes
- Antigravity 404 caused by invalid version — now validates with probe requests
- Version fallback: auto-retries with re-fetched version if all endpoints return 404
## v3.11.12 (2026-05-26)
**New Antigravity v2 Handler (Mimicking anti-api)**
### New Features
- **Complete rewrite of Antigravity handler** based on https://github.com/ink1ing/anti-api approach
- Safety settings (all OFF), stopSequences, sessionId, requestType: agent
- functionResponse uses `response: { result: string }` format matching anti-api
- Endpoint priority: `daily-cloudcode-pa.googleapis.com` first
- Simplified sanitizer: only deduplicates consecutive user text, never touches tool messages
## v3.11.11 (2026-05-26)
## v3.11.11 (2026-05-26)
**Antigravity Fix: Stricter function_call/output Pairing + Gemini Sanitizer Rewrite (PR #12)**
### Bug Fixes
- **Stricter function_call/output pairing**: Only includes pairs where BOTH call and output exist — no orphan calls sent to Gemini
- **Gemini sanitizer rewritten**: Tool messages (`functionCall`/`functionResponse`) are always preserved as-is, never merged or skipped
- **Text merging more conservative**: Checks last message for tool content before merging consecutive text messages
- **Final trimming safe**: Only removes plain `message` items, never `function_call_output` (which would break tool pairs)
- **Merge PR #12**: Fix by qwen-chat coder
## v3.11.10 (2026-05-26)
## v3.11.10 (2026-05-26)
**Antigravity Fix: Interleave function_call/output Pairs, Gemini Turn Trimming (PR #11)**
### Bug Fixes
- **Fix Antigravity function_call/output ordering**: Tool calls and their responses are now properly interleaved in sequence (`function_call``function_call_output``function_call` → ...) instead of being grouped separately
- **Gemini sanitizer trimming**: Leading/trailing non-user turns removed for Google API compliance (Google requires conversation to start and end with user turn)
- **Stricter role boundary enforcement**: `functionCall` (model) and `functionResponse` (user) never merged across role boundaries
- **Merge PR #11**: Fix by qwen-chat coder
## v3.11.9 (2026-05-26)
## v3.11.9 (2026-05-26)
**Antigravity Fix: Preserve functionCall/functionResponse in Gemini Sanitizer (PR #10)**
### Bug Fixes
- **Fix Antigravity multi-turn tool use**: The Gemini message sanitizer was incorrectly merging/dropping `functionCall` and `functionResponse` turns, causing Antigravity to think forever without responding. These turns are now always preserved as separate messages.
- **Merge PR #10**: `fix: preserve functionCall/functionResponse in Gemini sanitizer` (qwen-chat coder)
## v3.11.8 (2026-05-26)
## v3.11.8 (2026-05-26)
**Vision Cache Persistence, PR #8 Merge**
### New Features
- **Vision description cache persisted across requests**: Image descriptions from the vision fallback API are now cached in a file (`~/.cache/codex-proxy/vision-cache.json`) so the same image URL is never described twice — saves API calls and latency
- **Merge PR #8**: `fix: persist vision description cache across requests` (cobra91)
## v3.11.7 (2026-05-26)
**Vision Auto-Detect, Proactive Non-Vision Model Detection, Unit Tests, Bug Fixes**
### New Features
- **Vision auto-detect fallback**: When no explicit vision fallback is configured, automatically uses the current provider's own vision model (e.g., `0G-Qwen-VL` for OpenAdapter) as the image description API — no separate API key needed
- **Proactive non-vision model detection**: Models matching name patterns (`glm`, `deepseek`, `llama`, `qwen` without `vl`, etc.) are detected as non-vision on first request without waiting for an error from the provider
- **Vision preprocessing is now the primary image handling solution**: Replaces old `_strip_images_from_input()` (which just removed images with a placeholder). Images are now described via API and sent as rich text descriptions to text-only models
- **Merge PR #6**: Vision/OCR preprocessing for text-only models (cobra91)
- **Merge PR #7**: 177 unit tests for translate-proxy.py (cobra91)
### Bug Fixes
- **AttributeError fix**: `image_url` field can be a string (bare URL) not always a dict — fixed in both `_preprocess_vision_input()` and old strip function
- **Auth os error 2 fix**: GUI shows "Config missing" message instead of raw OSError when `~/.codex/` directory doesn't exist
- **Removed duplicate vision functions**: Cleaned up duplicate `_vision_describe_image()`, `_preprocess_vision()`, `_preprocess_vision_input()` from merge
## v3.11.6 (2026-05-26)
**Antigravity Loop Breakers, Vision/OCR Preprocessing, has_content Fix, Auth Error Fix**
### New Features (Antigravity-only, no other providers affected)
- **Per-session loop tracking**: `_ANTIGRAVITY_LOOP_TRACKER` global dict with `_antigravity_loop_key()` function tracks state per session: `latest_user_hash`, `nudge_injected`, `latest_user_appended`, `tool_calls_for_request`, `repeated_tool`, `force_finalize`, `last_tool`, `last_tool_count`
- **Edit-intent nudge injection**: Injected only on the first turn per request, preventing duplicate nudges across retries
- **Latest user instruction append**: Appended exactly once per request to prevent redundant instruction stacking
- **Loop breaker**: If the same tool + arguments is repeated ≥ 5 times in a session, `force_finalize` is triggered to break the infinite loop
- **Detailed `[antigravity-loop]` logging**: All tracking fields logged on every Antigravity request for debugging
### New Features (All OpenAI-compatible providers)
- **Vision/OCR preprocessing**: When a provider doesn't support images (detected via error messages like "unknown variant image_url", "does not support image"), the proxy automatically calls a configurable vision fallback API (default: Kilo.ai) to describe images as text, then replaces image blocks with text descriptions before sending to text-only models
- **`_vision_describe_image()`**: Calls vision fallback model to describe a single image, with MD5-based caching to avoid re-describing same URL
- **`_preprocess_vision()`**: Replaces `image_url`/`input_image` blocks in Chat Completions message format with text descriptions when provider lacks vision support
- **`_preprocess_vision_input()`**: Same for Responses API input format — runs BEFORE adapter conversion so images are replaced early
- **Vision error retry**: On HTTP 4xx errors containing image-related keywords, automatically retries with images preprocessed instead of failing
- **Configurable via env vars**: `VISION_FALLBACK_URL`, `VISION_FALLBACK_MODEL`, `VISION_FALLBACK_KEY`
- **ProviderSchema `supports_vision` field**: Auto-detected from error responses and persisted in provider-caps.json
### Critical Fixes
- **`has_content` now includes `function_call`** (v3.11.5 fix): `_observe_event` only checked for `"type": "message"` — when models return only tool calls (no text), `has_content` was `False`, causing Codex to loop infinitely and build context until `context_length_exceeded`. Now checks both `"message"` and `"function_call"`.
- **`has_message`/`has_tool_call` initialized in all 5 locations**: Previous fix added variables inside `_observe_event` closure but missed 4 other `has_content = False` locations, causing `NameError: name 'has_message' is not defined` crashes.
- **Auth config-not-found error handling**: When Codex's `config.toml` is missing or deleted, `codex login status` returns "Error loading configuration: No such file or directory (os error 2)". Now caught specifically (`OSError errno==2`) and returns ("not_configured", "Config missing — launch once to create") with clear GUI guidance.
### Bug Fixes (GUI)
- **Active endpoint sync**: GUI auto-removes stale endpoint references on startup
## v3.11.5 (2026-05-26)
**Vision Filter, Token-Aware Compaction, Universal Adaptive Compaction, Smart-Continue Text Detection**
### Critical Fixes
- **Token-aware compaction for small-context models (FIX)**: `_crof_compact_for_retry()` had an early return at `len(input_data) <= limit` (item count) — if you had 25 items × 1600 tokens = 40K tokens, it skipped compaction entirely because 25 < 30 (the default item limit). Now also checks estimated token count vs learned model max, and compacts when either item count OR token count exceeds limits. Fixes repeated `context_length_exceeded` errors on models like 0G-GLM-5.1 (~35K token context).
- **Proactive compaction now token-aware**: Previously only triggered when item count > 30. Now also triggers when estimated tokens exceed 80% of the model's learned token limit, even if item count is below the threshold. Prevents the first-request failure pattern on small-context models.
- **Compaction aggression threshold**: Changed `est > max_tok` to `est >= max_tok * 0.9` to avoid edge case where estimated tokens exactly equal the limit and compaction is skipped.
- **Removed all `crof.ai` gates from adaptive compaction**: Proactive compaction, `finish_reason=length` retry, `_crof_record`, and compaction logging were gated behind `"crof.ai" in TARGET_URL`. These gates prevented OpenAdapter and other providers from getting proactive/retry compaction, causing repeated `context_length_exceeded` failures. Now applies universally to ALL providers.
### New Features
- **Vision model detection + image stripping**: `_strip_images_from_input()` and `_model_supports_vision()` detect vision capability by model name pattern. Non-vision models (deepseek, glm, mixtral, llama, command, dbrx, qwen, phi-3) have `input_image`/`image_url` parts stripped and replaced with `[User attached image: filename — this model does not support vision]` text notice. Vision models (gpt-4o, gemini, claude, qwen-vl, glm-5v) keep images intact. Applied in 3 paths: main request, context_length_exceeded retry, smart-continue nudge.
- **Token estimation and per-model limit learning**: `_estimate_tokens()`, `_estimate_input_tokens()`, `_get_model_max_tokens()`, `_set_model_max_tokens()`. Extracts `~N tokens` from `context_length_exceeded` error messages and stores per-model token limits. Used by proactive compaction and retry compaction to adjust `keep` count dynamically.
- **Compaction aggression levels**: `_crof_compact_for_retry()` accepts `aggression` parameter (0=normal, 1=extreme). Extreme mode kicks in when estimated tokens > 1.5× the learned limit or on 2nd+ retry attempt. Reduces `keep` count to minimum, ensuring the compacted request fits within model limits.
- **Smart-continue text-tool detection**: Removed hard requirement for `has_function_call_output(input_data)`. Added `_TOOL_CALL_TEXT_PATTERNS` and `_text_looks_like_tool_calls()` to trigger nudging when model outputs text matching tool-call patterns (e.g., `• (exec_command cmd ...)`, `write_to_file`, `exec_command`) even without prior `function_call_output` in context. Essential for models like 0G-GLM-5.1 that never emit real `function_call_output` items.
- **Parenthesized tool call regex**: `_PAREN_TC_RE` pattern to match `• (name args...)` format from non-vision models that output tool calls as parenthesized text.
### GUI Fixes
- **Active endpoint sync**: Added `set_active_endpoint()` and `validate_active_endpoint()` to Linux GTK GUI. Syncs `.active-endpoint.json` with `config.toml` on every launch; auto-removes stale references to deleted providers. Fixed `"Error loading configuration: No such file or directory (os error 2)"` crash when active endpoint referenced a deleted provider.
- **Config state**: `~/.codex/.active-endpoint.json` and `config.toml` model catalog path validated and auto-corrected on GUI startup.
## v3.11.0 (2026-05-26)
**Cobra PR Merge + Smart Continuation + API Key Hot-Reload**
### New Features
- **Concurrency semaphore (max 3)**: limits parallel upstream requests to prevent rate-limiting
- **Auto-continue for truncated text**: detects text ending in `:`, `(`, `;`, `…` or `finish_reason=length`, continues seamlessly
- **SO_REUSEADDR on sticky port**: prevents `TIME_WAIT` from changing port on restart
- **proxy-stderr.log**: persistent log file for proxy errors
- **Stream diagnostics**: logs event count, finish reason, content flag, elapsed time after each stream
- **Timeout/OSError handler**: sends proper `response.failed` SSE event instead of silently dropping connection
- **Restart Proxy button**: now only restarts proxy without killing Codex Desktop
- **Tool call argument normalizer**: fixes capital-A `Arguments` key, strips markdown/JSON code block wrapping from tool call arguments
- **Smart-continue loop (2× retries)**: escalating nudge messages when model returns text-only stop mid-task
- **XML tool call extraction**: parses `<tool_call>name{args}</tool_call>` from model text output, injects as real `function_call` items
- **Auto-continue + smart-continue ordered with skip guard**: prevents both from double-firing on the same response
- **API key hot-reload**: mtime tracking detects config changes, `/admin/reload` endpoint triggers hot-reload, `/admin/verify-key` tests key against upstream
- **GUI hot-reload**: auto-refreshes proxy key on endpoint edit, verifies with upstream — no proxy restart needed
- **Synthetic tool-results disabled**: was causing deepseek-v4-pro truncation on opencode.ai
## v3.10.12 (2026-05-26)
**Sticky Endpoint, Claude Fixes, Guardrail Skip, Anti-Stall**
### New Features
- **Sticky endpoint caching**: remembers which endpoint last succeeded, reuses it on every subsequent request (zero overhead)
- **Sequential fallback**: if sticky endpoint fails (429/502/503), tries next endpoint in order — no parallel probing, no wasted requests
- **Endpoint order**: `cloudcode-pa.googleapis.com` first (matches agy CLI), `daily-cloudcode-pa.googleapis.com` as fallback
- **Anti-stall engine**: kills stale proxy processes and clears `__pycache__` on every new session start
- **Smart error classification**: distinguishes `quota_exhausted` vs `capacity_exhausted` vs `account_banned` vs `validation_required` vs `service_disabled` vs `auth_permanent`
- **Rate limit reset time parsing**: extracts cooldown from error body (`quotaResetDelay`, `Resets in ~1h27m`, etc.) for accurate cooldown
- **Missing Antigravity headers**: `X-Client-Name`, `X-Client-Version`, `x-goog-api-client`, platform-aware `User-Agent`
- **Session ID**: added `sessionId` to request wrapper for proper session tracking
### Bug Fixes (TRAE Agent)
- **Guardrail skip for simple messages**: when user sends simple messages (e.g. "hi"), skip injecting `_GEMINI_AGENT_GUARDRAIL` — prevents model from aggressively calling tools and looping `ls -la` 50+ times
- **Claude tool preservation**: Claude models through Antigravity now keep ALL tool outputs in normalizer (no summarization/truncation) — prevents context loss that broke Claude sessions
- **Claude compaction guard**: `_adaptive_compact` skipped for Claude models — Claude handles its own context, no forced compaction
- **Claude normalizer guard**: `_antigravity_normalize_context` skipped for Claude models — avoids stripping Claude-specific message structure
- **Claude sanitization guard**: Google content sanitization loop skipped for Claude models — prevents mangling Claude's response format
- **Normalizer model parameter**: `_antigravity_normalize_context` now receives `model` param to distinguish Claude vs Gemini behavior
## v3.10.11 (2026-05-26)
**Hybrid Endpoint Fallback — Redundant Antigravity Endpoints**

View File

@@ -130,6 +130,14 @@ A three-component system:
- **Response store TTL** — evicts stored responses older than 10 minutes, prevents memory leaks
- **Bounded stream buffers** — 8MB cap prevents OOM on pathological responses
- **Dual logging** — all proxy messages written to both stderr and `~/.cache/codex-proxy/proxy.log`
- **Vision model detection** (v3.11.5) — automatically strips images for non-vision models (DeepSeek, GLM, Qwen, etc.) and replaces with text notice; vision-capable models (GPT-4o, Gemini, Claude, Qwen-VL) keep images intact
- **Token-aware compaction** (v3.11.5) — learns per-model token limits from `context_length_exceeded` errors; proactively compacts when estimated tokens exceed 80% of limit; prevents repeated context overflow on small-context models (~35K tokens)
- **Universal adaptive compaction** (v3.11.5) — compaction now works for ALL providers (was Crof.ai-only); proactive + retry compaction with aggression levels (normal/extreme)
- **Smart-continue text detection** (v3.11.5) — triggers continuation nudging when model outputs text matching tool-call patterns, essential for text-only models that never emit real `function_call_output` items
- **Antigravity loop breakers** (v3.11.6) — per-session tracking with automatic finalization when same tool+args repeats 5+ times; edit-intent nudge injected only on first turn; latest user instruction appended exactly once per request
- **has_content function_call fix** (v3.11.6) — tool-call-only responses now correctly flagged as having content, preventing infinite loops on OpenAdapter/Z.AI/OpenRouter providers
- **Vision/OCR preprocessing** (v3.11.6) — when provider rejects images, automatically calls a configurable vision fallback API (Kilo.ai) to describe images as text for text-only models; MD5-cached; retries on vision errors with preprocessed text
- **Auth config-missing fix** (v3.11.6) — graceful handling when Codex config.toml is missing instead of showing raw os error
- Zero dependencies — pure Python stdlib
### Command Code Adapter
@@ -554,6 +562,7 @@ The launcher generates model catalog JSON with dual field naming to satisfy both
Codex Launcher includes special handling for Gemini 3 / Antigravity OAuth:
- **Sticky endpoint with parallel discovery**: First request probes `cloudcode-pa.googleapis.com` and `daily-cloudcode-pa.googleapis.com` simultaneously — first 200 wins and is cached. All subsequent requests go straight to the cached endpoint. If it fails (429/502/503), cache is cleared and all endpoints are re-probed in parallel. Zero wasted time on rate-limited endpoints.
- **Thought signature preservation**: Captures `thoughtSignature` from Gemini responses
and reattaches them on follow-up requests to maintain tool-call continuity.
- **Edit-intent detection**: When follow-up requests contain edit keywords, a tool-use
@@ -561,7 +570,7 @@ Codex Launcher includes special handling for Gemini 3 / Antigravity OAuth:
- **User instruction enforcement**: The latest user message is guaranteed to be the
final content turn sent to Gemini, even after compaction.
- **Smart compaction**: Old tool outputs capped at 3000 chars, recent 6 at 20000 chars.
- **Context compaction**: Aggressive auto-trimming when approaching 60% of model context
- **Context compaction**: Aggressive auto-trimming when approaching 80% of model context
limit (1M tokens Gemini, 200K Claude, 128K GPT-OSS). Prevents token limit errors.
- **Model ID mapping**: Display names (e.g. `Gemini 3.5 Flash (High)`) mapped to REST API
slugs (e.g. `gemini-3-flash`). See `docs/ANTIGRAVITY.md` for details.

Binary file not shown.

Binary file not shown.

127
install.ps1 Normal file
View File

@@ -0,0 +1,127 @@
<#
.SYNOPSIS
Codex Launcher Windows Installer
.DESCRIPTION
Installs Codex Launcher for the current user.
.NOTES
Requires: Python 3.8+ (stdlib only, zero pip dependencies).
#>
param(
[switch]$Uninstall
)
$ErrorActionPreference = 'Stop'
$BinDir = Join-Path $env:LOCALAPPDATA 'Programs\Codex-Launcher'
$StartMenu = Join-Path $env:APPDATA 'Microsoft\Windows\Start Menu\Programs'
if ($Uninstall) {
Write-Host 'Uninstalling Codex Launcher...' -ForegroundColor Yellow
if (Test-Path $BinDir) {
Remove-Item -Recurse -Force $BinDir
Write-Host " Removed $BinDir"
}
$shortcut = Join-Path $StartMenu 'Codex Launcher.lnk'
if (Test-Path $shortcut) {
Remove-Item -Force $shortcut
Write-Host ' Removed Start Menu shortcut'
}
$userPath = [Environment]::GetEnvironmentVariable('PATH', 'User')
if ($userPath -like "*$BinDir*") {
$newPath = ($userPath -split ';' | Where-Object { $_ -ne $BinDir }) -join ';'
[Environment]::SetEnvironmentVariable('PATH', $newPath, 'User')
Write-Host ' Removed from PATH'
}
Write-Host 'Uninstall complete.' -ForegroundColor Green
return
}
Write-Host ''
Write-Host ' Codex Launcher - Windows Installer' -ForegroundColor Cyan
Write-Host ' ====================================' -ForegroundColor Cyan
Write-Host ''
# Check Python
$pythonExe = Get-Command python -ErrorAction SilentlyContinue
if (-not $pythonExe) {
$pythonExe = Get-Command python3 -ErrorAction SilentlyContinue
}
if (-not $pythonExe) {
Write-Host 'ERROR: Python not found. Install Python 3.8+ and add to PATH.' -ForegroundColor Red
exit 1
}
Write-Host " Python: $($pythonExe.Source)" -ForegroundColor Gray
# Create install directory
New-Item -ItemType Directory -Force -Path $BinDir | Out-Null
# Copy files
$srcDir = Join-Path $PSScriptRoot 'src'
$files = @(
'translate-proxy.py',
'codex-launcher-gui.py',
'codex_launcher_lib.py',
'cleanup-codex-stale.py'
)
foreach ($file in $files) {
$src = Join-Path $srcDir $file
if (Test-Path $src) {
Copy-Item -Force $src $BinDir
Write-Host " Installed: $file" -ForegroundColor Green
} else {
Write-Host " WARNING: $file not found in src/" -ForegroundColor Yellow
}
}
# Create Start Menu shortcut
$WshShell = New-Object -ComObject WScript.Shell
$shortcutPath = Join-Path $StartMenu 'Codex Launcher.lnk'
$Shortcut = $WshShell.CreateShortcut($shortcutPath)
# Find pythonw.exe for no-console launch
$pythonw = Get-Command pythonw -ErrorAction SilentlyContinue
if (-not $pythonw) {
$pythonDir = Split-Path $pythonExe.Source
$pythonwCandidate = Join-Path $pythonDir 'pythonw.exe'
if (Test-Path $pythonwCandidate) {
$pythonw = $pythonwCandidate
}
}
if ($pythonw) {
$targetPath = if ($pythonw.Source) { $pythonw.Source } else { $pythonw }
} else {
$targetPath = $pythonExe.Source
}
$Shortcut.TargetPath = $targetPath
$guiPath = Join-Path $BinDir 'codex-launcher-gui.py'
$Shortcut.Arguments = $guiPath
$Shortcut.WorkingDirectory = $BinDir
$Shortcut.Description = 'Launch Codex Desktop with any AI provider'
$Shortcut.Save()
Write-Host ' Created Start Menu shortcut' -ForegroundColor Green
# Add to PATH
$userPath = [Environment]::GetEnvironmentVariable('PATH', 'User')
if ($userPath -notlike "*$BinDir*") {
$newUserPath = $userPath + ';' + $BinDir
[Environment]::SetEnvironmentVariable('PATH', $newUserPath, 'User')
$env:PATH = $env:PATH + ';' + $BinDir
Write-Host ' Added to user PATH' -ForegroundColor Green
}
# Verify
Write-Host ''
Write-Host ' Installation complete!' -ForegroundColor Cyan
Write-Host " Install dir: $BinDir" -ForegroundColor Gray
Write-Host ''
Write-Host ' Launch options:' -ForegroundColor White
Write-Host ' Start Menu: Codex Launcher' -ForegroundColor Gray
Write-Host ' Command: codex-launcher-gui.py' -ForegroundColor Gray
Write-Host ' Uninstall: powershell -File install.ps1 -Uninstall' -ForegroundColor Gray
Write-Host ''

View File

@@ -3,11 +3,13 @@ set -e
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
if [ -f "$SCRIPT_DIR/codex-launcher_3.10.11_all.deb" ]; then
echo "Installing codex-launcher_3.10.11_all.deb ..."
sudo dpkg -i "$SCRIPT_DIR/codex-launcher_3.10.11_all.deb"
echo ""
echo "Installed v3.10.11 via .deb package."
if [ -f "$SCRIPT_DIR/codex-launcher_3.11.6_all.deb" ]; then
echo "Installing codex-launcher_3.11.6_all.deb ..."
sudo dpkg -i "$SCRIPT_DIR/codex-launcher_3.11.6_all.deb"
else
echo "WARNING: codex-launcher_3.11.6_all.deb not found; copying files manually."
fi
echo "Installed v3.11.6 via .deb package."
echo " translate-proxy.py -> /usr/bin/translate-proxy.py"
echo " codex-launcher-gui -> /usr/bin/codex-launcher-gui"
echo " cleanup-codex-stale -> /usr/bin/cleanup-codex-stale.sh"

View File

@@ -0,0 +1,24 @@
"""
antigravity_grpc — gRPC fallback client for Google CloudCode (Antigravity).
When the REST API rejects a request (404 model not found, 400 bad request due to
model ID mismatch, etc.), this module provides a gRPC fallback path that uses
Google's native PredictionService protocol — the same one the agy CLI uses.
This module is imported lazily and only when grpcio is installed. If grpcio is
not available, the fallback is silently skipped.
"""
from .client import (
GrpcFallbackResult,
AntigravityGrpcClient,
is_grpc_available,
get_client,
)
__all__ = [
"GrpcFallbackResult",
"AntigravityGrpcClient",
"is_grpc_available",
"get_client",
]

View File

@@ -0,0 +1,609 @@
"""
antigravity_grpc.client — gRPC fallback client for Google CloudCode (Antigravity).
This module provides a gRPC client that can be used as an automatic fallback when
the CloudCode REST API rejects requests. The gRPC path uses the same
PredictionService that the native agy CLI binary uses, giving access to models
that are unavailable via REST (e.g. models that return 404 on REST but work on gRPC).
Key design decisions:
- Lazy import: grpcio is only imported when actually needed. If not installed,
is_grpc_available() returns False and the fallback is silently skipped.
- Zero impact on other providers: this module is only called from
_handle_antigravity_v2() when REST returns a fallback-eligible error.
- Same output format as REST: the client returns structured dicts that match
the SSE/JSON response shapes the proxy already processes.
- Thread-safe: the gRPC channel is created once per endpoint and reused.
Usage from translate-proxy.py:
from antigravity_grpc import is_grpc_available, AntigravityGrpcClient
if is_grpc_available():
client = AntigravityGrpcClient()
result = client.try_generate(request_dict, stream=False)
if result.ok:
# Use result.response_data (dict matching REST response shape)
else:
# gRPC also failed, fall through to error
"""
import json
import os
import sys
import time
import threading
import collections
# ═══════════════════════════════════════════════════════════════════
# Lazy gRPC import — never crash if grpcio is missing
# ═══════════════════════════════════════════════════════════════════
_grpc = None
_pb2 = None
_pb2_grpc = None
_import_error = None
def _try_import():
global _grpc, _pb2, _pb2_grpc, _import_error
if _grpc is not None:
return _grpc is not False
try:
import grpc as _real_grpc
# Import the generated stubs relative to this package
from . import cloudcode_pb2 as _real_pb2
from . import cloudcode_pb2_grpc as _real_pb2_grpc
_grpc = _real_grpc
_pb2 = _real_pb2
_pb2_grpc = _real_pb2_grpc
return True
except Exception as e:
_import_error = str(e)
_grpc = False
return False
def is_grpc_available():
"""Return True if grpcio and the generated stubs are importable."""
return _try_import()
# ═══════════════════════════════════════════════════════════════════
# gRPC endpoints for Antigravity (same hosts, different port/path)
# ═══════════════════════════════════════════════════════════════════
# The CloudCode gRPC service runs on the same hosts as REST but uses
# the gRPC protocol. The agy CLI connects to:
# - cloudcode-pa.googleapis.com:443
# - daily-cloudcode-pa.googleapis.com:443
# - daily-cloudcode-pa.sandbox.googleapis.com:443
_GRPC_ENDPOINTS = [
"daily-cloudcode-pa.googleapis.com:443",
"cloudcode-pa.googleapis.com:443",
]
_ALLOW_STAGING_ENV = "ALLOW_ANTIGRAVITY_STAGING"
# ═══════════════════════════════════════════════════════════════════
# Result type
# ═══════════════════════════════════════════════════════════════════
class GrpcFallbackResult:
"""Result of a gRPC fallback attempt."""
__slots__ = ("ok", "response_data", "stream_chunks", "error_message",
"endpoint_used", "model_used", "elapsed_s")
def __init__(self, ok=False, response_data=None, stream_chunks=None,
error_message="", endpoint_used="", model_used="", elapsed_s=0.0):
self.ok = ok
self.response_data = response_data # dict (non-streaming)
self.stream_chunks = stream_chunks # list[dict] (streaming)
self.error_message = error_message
self.endpoint_used = endpoint_used
self.model_used = model_used
self.elapsed_s = elapsed_s
def __repr__(self):
if self.ok:
if self.stream_chunks is not None:
return f"<GrpcFallbackResult OK stream chunks={len(self.stream_chunks)}>"
return f"<GrpcFallbackResult OK data_keys={list(self.response_data.keys()) if self.response_data else None}>"
return f"<GrpcFallbackResult FAIL error={self.error_message!r}>"
# ═══════════════════════════════════════════════════════════════════
# JSON → Protobuf conversion helpers
# ═══════════════════════════════════════════════════════════════════
def _struct_to_protobuf(d, struct_obj=None):
"""Convert a Python dict to a google.protobuf.Struct."""
from google.protobuf.struct_pb2 import Struct, Value, NullValue, ListValue
if struct_obj is None:
struct_obj = Struct()
if isinstance(d, dict):
for k, v in d.items():
if isinstance(v, str):
struct_obj.fields[k].string_value = v
elif isinstance(v, bool):
struct_obj.fields[k].bool_value = v
elif isinstance(v, int):
struct_obj.fields[k].number_value = float(v)
elif isinstance(v, float):
struct_obj.fields[k].number_value = v
elif isinstance(v, dict):
_struct_to_protobuf(v, struct_obj.fields[k].struct_value)
elif isinstance(v, list):
lst = struct_obj.fields[k].list_value
for item in v:
if isinstance(item, str):
lst.values.add().string_value = item
elif isinstance(item, bool):
lst.values.add().bool_value = item
elif isinstance(item, (int, float)):
lst.values.add().number_value = float(item)
elif isinstance(item, dict):
_struct_to_protobuf(item, lst.values.add().struct_value)
elif item is None:
lst.values.add().null_value = 0
elif v is None:
struct_obj.fields[k].null_value = 0
return struct_obj
def _protobuf_struct_to_dict(struct):
"""Convert a google.protobuf.Struct to a Python dict."""
from google.protobuf.struct_pb2 import Value, NullValue
result = {}
for k, v in struct.fields.items():
kind = v.WhichOneof("kind")
if kind == "null_value":
result[k] = None
elif kind == "number_value":
result[k] = v.number_value
elif kind == "string_value":
result[k] = v.string_value
elif kind == "bool_value":
result[k] = v.bool_value
elif kind == "struct_value":
result[k] = _protobuf_struct_to_dict(v.struct_value)
elif kind == "list_value":
result[k] = [_value_to_python(item) for item in v.list_value.values]
else:
result[k] = None
return result
def _value_to_python(v):
"""Convert a google.protobuf.Value to a Python value."""
kind = v.WhichOneof("kind")
if kind == "null_value":
return None
elif kind == "number_value":
return v.number_value
elif kind == "string_value":
return v.string_value
elif kind == "bool_value":
return v.bool_value
elif kind == "struct_value":
return _protobuf_struct_to_dict(v.struct_value)
elif kind == "list_value":
return [_value_to_python(item) for item in v.list_value.values]
return None
def _json_parts_to_proto(parts_json):
"""Convert a list of JSON content parts to protobuf Part messages."""
result = []
for p in parts_json:
if not isinstance(p, dict):
continue
part = _pb2.Part()
# Thought signature
sig = p.get("thoughtSignature") or p.get("thought_signature")
if sig:
part.thought_signature = sig
if p.get("thought"):
part.thought = True
if "text" in p:
part.text = p["text"]
elif "text" in p and "functionCall" not in p:
part.text = p["text"]
elif "functionCall" in p:
fc = p["functionCall"]
part.function_call.name = fc.get("name", "")
part.function_call.id = fc.get("id", "")
args = fc.get("args", fc.get("arguments", {}))
if isinstance(args, dict):
_struct_to_protobuf(args, part.function_call.args)
elif isinstance(args, str):
try:
_struct_to_protobuf(json.loads(args), part.function_call.args)
except Exception:
pass
elif "functionResponse" in p:
fr = p["functionResponse"]
part.function_response.name = fr.get("name", "")
part.function_response.id = fr.get("id", "")
resp = fr.get("response", {})
if "result" in resp:
result_val = resp["result"]
if isinstance(result_val, (dict, list)):
_struct_to_protobuf({"result": result_val}, part.function_response.response)
else:
_struct_to_protobuf({"result": str(result_val)}, part.function_response.response)
elif isinstance(resp, dict):
_struct_to_protobuf(resp, part.function_response.response)
elif "inlineData" in p:
idata = p["inlineData"]
import base64
part.inline_data.mime_type = idata.get("mimeType", "image/png")
b64data = idata.get("data", "")
part.inline_data.data = base64.b64decode(b64data) if b64data else b""
result.append(part)
return result
def _json_contents_to_proto(contents_json):
"""Convert a list of JSON content objects to protobuf Content messages."""
result = []
for c in contents_json:
if not isinstance(c, dict):
continue
content = _pb2.Content()
content.role = c.get("role", "user")
for part in _json_parts_to_proto(c.get("parts", [])):
content.parts.append(part)
result.append(content)
return result
def _proto_candidate_to_json(candidate):
"""Convert a protobuf Candidate to a JSON-compatible dict."""
content_json = {"role": candidate.content.role, "parts": []}
for part in candidate.content.parts:
p = {}
if part.thought_signature:
p["thoughtSignature"] = part.thought_signature
if part.thought:
p["thought"] = True
if part.text:
p["text"] = part.text
elif part.text and not part.HasField("function_call"):
p["text"] = part.text
elif part.HasField("function_call"):
fc = part.function_call
args_dict = _protobuf_struct_to_dict(fc.args) if fc.HasField("args") else {}
p["functionCall"] = {
"name": fc.name,
"args": args_dict,
"id": fc.id,
}
elif part.HasField("function_response"):
fr = part.function_response
resp_dict = _protobuf_struct_to_dict(fr.response) if fr.HasField("response") else {}
p["functionResponse"] = {
"name": fr.name,
"response": resp_dict,
"id": fr.id,
}
elif part.HasField("inline_data"):
import base64
p["inlineData"] = {
"mimeType": part.inline_data.mime_type,
"data": base64.b64encode(part.inline_data.data).decode(),
}
if p:
content_json["parts"].append(p)
return {
"content": content_json,
"finishReason": candidate.finish_reason,
"index": candidate.index,
}
# ═══════════════════════════════════════════════════════════════════
# Client
# ═══════════════════════════════════════════════════════════════════
class AntigravityGrpcClient:
"""
gRPC fallback client for Google CloudCode Antigravity.
Thread-safe. Channels are cached per endpoint and reused.
"""
def __init__(self):
self._channels = {}
self._stubs = {}
self._lock = threading.Lock()
def _get_channel(self, endpoint):
"""Get or create a gRPC channel for the given endpoint."""
with self._lock:
if endpoint not in self._channels:
# Use secure channel with default SSL credentials
creds = _grpc.ssl_channel_credentials()
channel = _grpc.secure_channel(endpoint, creds)
self._channels[endpoint] = channel
self._stubs[endpoint] = _pb2_grpc.PredictionServiceStub(channel)
return self._channels[endpoint], self._stubs[endpoint]
def _build_request(self, wrapped_dict):
"""
Build a GenerateContentRequest protobuf from the same wrapped dict
that the REST API uses.
wrapped_dict shape:
{
"project": "...",
"model": "...",
"requestType": "agent",
"userAgent": "antigravity/...",
"requestId": "agent-...",
"request": {
"contents": [...],
"systemInstruction": {...},
"generationConfig": {...},
"tools": [...],
"safetySettings": [...],
"toolConfig": {...},
"sessionId": "..."
}
}
"""
req = _pb2.GenerateContentRequest()
req.project = wrapped_dict.get("project", "")
req.model = wrapped_dict.get("model", "")
req.request_type = wrapped_dict.get("requestType", "agent")
req.user_agent = wrapped_dict.get("userAgent", "")
req.request_id = wrapped_dict.get("requestId", "")
inner = wrapped_dict.get("request", {})
# Contents
for c in _json_contents_to_proto(inner.get("contents", [])):
req.request.contents.append(c)
# System instruction
si = inner.get("systemInstruction", {})
if si:
si_parts = si.get("parts", [])
if si.get("role"):
req.request.system_instruction.role = si.get("role", "user")
for part in _json_parts_to_proto(si_parts):
req.request.system_instruction.parts.append(part)
# Generation config
gc = inner.get("generationConfig", {})
if gc:
cfg = req.request.generation_config
if "maxOutputTokens" in gc:
cfg.max_output_tokens = int(gc["maxOutputTokens"])
if "temperature" in gc:
cfg.temperature = float(gc["temperature"])
if "topP" in gc:
cfg.top_p = float(gc["top_p" if "top_p" in gc else "topP"])
for ss in gc.get("stopSequences", []):
cfg.stop_sequences.append(ss)
# Thinking config (Gemini 3 native)
tc = gc.get("thinkingConfig", gc.get("thinking_config"))
if tc:
cfg.thinking_config.include_thoughts = tc.get("includeThoughts", tc.get("include_thoughts", False))
cfg.thinking_config.thinking_budget = int(tc.get("thinkingBudget", tc.get("thinking_budget", 8192)))
# Legacy thinking fields
if "includeThoughts" in gc and not tc:
cfg.thinking_config.include_thoughts = gc["includeThoughts"]
if "thinkingBudget" in gc and not tc:
cfg.thinking_config.thinking_budget = int(gc["thinkingBudget"])
# Tools
for tool_json in inner.get("tools", []):
tool = _pb2.Tool()
for fd_json in tool_json.get("functionDeclarations", []):
fd = tool.function_declarations.add()
fd.name = fd_json.get("name", "")
fd.description = fd_json.get("description", "")
params = fd_json.get("parameters", {})
if isinstance(params, dict) and params:
_struct_to_protobuf(params, fd.parameters)
req.request.tools.append(tool)
# Safety settings
for ss in inner.get("safetySettings", []):
ss_msg = _pb2.SafetySetting()
ss_msg.category = ss.get("category", "")
ss_msg.threshold = ss.get("threshold", "OFF")
req.request.safety_settings.append(ss_msg)
# Tool config
tcfg = inner.get("toolConfig", {})
if tcfg:
fcc = tcfg.get("functionCallingConfig", {})
if fcc:
req.request.tool_config.function_calling_config.mode = fcc.get("mode", "AUTO")
for afn in fcc.get("allowed_function_names", []):
req.request.tool_config.function_calling_config.allowed_function_names.append(afn)
# Session ID
sid = inner.get("sessionId", "")
if sid:
req.request.session_id = sid
return req
def try_generate(self, wrapped_dict, stream=False, access_token="",
timeout_s=180):
"""
Try a gRPC GenerateContent or StreamGenerateContent request.
Args:
wrapped_dict: The same wrapped dict used for REST requests.
stream: If True, use server-streaming RPC.
access_token: OAuth2 Bearer token for authentication.
timeout_s: Request timeout in seconds.
Returns:
GrpcFallbackResult with ok=True if successful.
For non-streaming: result.response_data is a dict matching
the REST JSON response shape.
For streaming: result.stream_chunks is a list of dicts matching
REST SSE chunk shapes.
"""
if not is_grpc_available():
return GrpcFallbackResult(ok=False, error_message="grpcio not installed")
t0 = time.time()
# Build metadata (gRPC uses metadata instead of HTTP headers)
metadata = []
if access_token:
metadata.append(("authorization", f"Bearer {access_token}"))
ua = wrapped_dict.get("userAgent", "")
if ua:
metadata.append(("user-agent", ua))
metadata.append(("x-client-name", "antigravity"))
# Required for Google's gRPC gateway
metadata.append(("x-goog-api-client", "gl-node/18.18.2 fire/0.8.6 grpc/1.10.x"))
# Build endpoints list
endpoints = list(_GRPC_ENDPOINTS)
if os.environ.get(_ALLOW_STAGING_ENV, "0") == "1":
endpoints.append("daily-cloudcode-pa.sandbox.googleapis.com:443")
endpoints.append("autopush-cloudcode-pa.sandbox.googleapis.com:443")
model = wrapped_dict.get("model", "?")
last_error = ""
for ep in endpoints:
try:
channel, stub = self._get_channel(ep)
req = self._build_request(wrapped_dict)
if stream:
return self._do_stream(stub, req, metadata, ep, model,
timeout_s, t0)
else:
return self._do_unary(stub, req, metadata, ep, model,
timeout_s, t0)
except Exception as e:
last_error = str(e)
err_str = last_error.lower()
print(f"[antigravity-grpc] {ep} failed: {last_error[:300]}", file=sys.stderr)
# Don't retry on auth errors
if "unauthenticated" in err_str or "permission" in err_str:
break
# Don't retry on invalid argument (model truly doesn't exist)
if "not_found" in err_str or "not found" in err_str:
break
continue
elapsed = time.time() - t0
return GrpcFallbackResult(
ok=False,
error_message=f"All gRPC endpoints failed: {last_error}",
model_used=model,
elapsed_s=elapsed,
)
def _do_unary(self, stub, req, metadata, endpoint, model, timeout_s, t0):
"""Execute a unary (non-streaming) gRPC call."""
response = stub.GenerateContent(
req,
metadata=metadata,
timeout=timeout_s,
)
elapsed = time.time() - t0
# Convert protobuf response to REST-compatible JSON shape
candidates_json = []
for candidate in response.response.candidates:
candidates_json.append(_proto_candidate_to_json(candidate))
# Match the REST response envelope:
# { "response": { "candidates": [...] } }
rest_shape = {
"response": {
"candidates": candidates_json,
}
}
print(f"[antigravity-grpc] {endpoint} unary OK, candidates={len(candidates_json)}, elapsed={elapsed:.1f}s", file=sys.stderr)
return GrpcFallbackResult(
ok=True,
response_data=rest_shape,
endpoint_used=endpoint,
model_used=model,
elapsed_s=elapsed,
)
def _do_stream(self, stub, req, metadata, endpoint, model, timeout_s, t0):
"""Execute a server-streaming gRPC call."""
chunks = []
chunk_count = 0
response_iter = stub.StreamGenerateContent(
req,
metadata=metadata,
timeout=timeout_s,
)
for chunk_proto in response_iter:
chunk_count += 1
# Each chunk_proto is a StreamGenerateContentChunk
# which wraps a Response with candidates
candidates_json = []
for candidate in chunk_proto.response.candidates:
candidates_json.append(_proto_candidate_to_json(candidate))
# Match REST SSE chunk shape: { "response": { "candidates": [...] } }
chunk_json = {
"response": {
"candidates": candidates_json,
}
}
chunks.append(chunk_json)
elapsed = time.time() - t0
print(f"[antigravity-grpc] {endpoint} stream OK, chunks={chunk_count}, elapsed={elapsed:.1f}s", file=sys.stderr)
return GrpcFallbackResult(
ok=True,
stream_chunks=chunks,
endpoint_used=endpoint,
model_used=model,
elapsed_s=elapsed,
)
def close(self):
"""Close all gRPC channels."""
with self._lock:
for ep, channel in self._channels.items():
try:
channel.close()
except Exception:
pass
self._channels.clear()
self._stubs.clear()
# ═══════════════════════════════════════════════════════════════════
# Module-level singleton
# ═══════════════════════════════════════════════════════════════════
_client = None
_client_lock = threading.Lock()
def get_client():
"""Get the module-level AntigravityGrpcClient singleton."""
global _client
with _client_lock:
if _client is None:
_client = AntigravityGrpcClient()
return _client

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,275 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings
from antigravity_grpc import cloudcode_pb2 as cloudcode__pb2
GRPC_GENERATED_VERSION = '1.80.0'
GRPC_VERSION = grpc.__version__
_version_not_supported = False
try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True
if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ ' but the generated code in cloudcode_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)
class PredictionServiceStub(object):
"""─── Service ──────────────────────────────────────────────────────────
"""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.GenerateContent = channel.unary_unary(
'/google.internal.cloud.code.v1internal.PredictionService/GenerateContent',
request_serializer=cloudcode__pb2.GenerateContentRequest.SerializeToString,
response_deserializer=cloudcode__pb2.GenerateContentResponse.FromString,
_registered_method=True)
self.StreamGenerateContent = channel.unary_stream(
'/google.internal.cloud.code.v1internal.PredictionService/StreamGenerateContent',
request_serializer=cloudcode__pb2.GenerateContentRequest.SerializeToString,
response_deserializer=cloudcode__pb2.StreamGenerateContentChunk.FromString,
_registered_method=True)
self.FetchAvailableModels = channel.unary_unary(
'/google.internal.cloud.code.v1internal.PredictionService/FetchAvailableModels',
request_serializer=cloudcode__pb2.FetchAvailableModelsRequest.SerializeToString,
response_deserializer=cloudcode__pb2.FetchAvailableModelsResponse.FromString,
_registered_method=True)
self.CountTokens = channel.unary_unary(
'/google.internal.cloud.code.v1internal.PredictionService/CountTokens',
request_serializer=cloudcode__pb2.CountTokensRequest.SerializeToString,
response_deserializer=cloudcode__pb2.CountTokensResponse.FromString,
_registered_method=True)
self.RetrieveUserQuota = channel.unary_unary(
'/google.internal.cloud.code.v1internal.PredictionService/RetrieveUserQuota',
request_serializer=cloudcode__pb2.RetrieveUserQuotaRequest.SerializeToString,
response_deserializer=cloudcode__pb2.RetrieveUserQuotaResponse.FromString,
_registered_method=True)
class PredictionServiceServicer(object):
"""─── Service ──────────────────────────────────────────────────────────
"""
def GenerateContent(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def StreamGenerateContent(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def FetchAvailableModels(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def CountTokens(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def RetrieveUserQuota(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_PredictionServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
'GenerateContent': grpc.unary_unary_rpc_method_handler(
servicer.GenerateContent,
request_deserializer=cloudcode__pb2.GenerateContentRequest.FromString,
response_serializer=cloudcode__pb2.GenerateContentResponse.SerializeToString,
),
'StreamGenerateContent': grpc.unary_stream_rpc_method_handler(
servicer.StreamGenerateContent,
request_deserializer=cloudcode__pb2.GenerateContentRequest.FromString,
response_serializer=cloudcode__pb2.StreamGenerateContentChunk.SerializeToString,
),
'FetchAvailableModels': grpc.unary_unary_rpc_method_handler(
servicer.FetchAvailableModels,
request_deserializer=cloudcode__pb2.FetchAvailableModelsRequest.FromString,
response_serializer=cloudcode__pb2.FetchAvailableModelsResponse.SerializeToString,
),
'CountTokens': grpc.unary_unary_rpc_method_handler(
servicer.CountTokens,
request_deserializer=cloudcode__pb2.CountTokensRequest.FromString,
response_serializer=cloudcode__pb2.CountTokensResponse.SerializeToString,
),
'RetrieveUserQuota': grpc.unary_unary_rpc_method_handler(
servicer.RetrieveUserQuota,
request_deserializer=cloudcode__pb2.RetrieveUserQuotaRequest.FromString,
response_serializer=cloudcode__pb2.RetrieveUserQuotaResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'google.internal.cloud.code.v1internal.PredictionService', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
server.add_registered_method_handlers('google.internal.cloud.code.v1internal.PredictionService', rpc_method_handlers)
# This class is part of an EXPERIMENTAL API.
class PredictionService(object):
"""─── Service ──────────────────────────────────────────────────────────
"""
@staticmethod
def GenerateContent(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/google.internal.cloud.code.v1internal.PredictionService/GenerateContent',
cloudcode__pb2.GenerateContentRequest.SerializeToString,
cloudcode__pb2.GenerateContentResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def StreamGenerateContent(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_stream(
request,
target,
'/google.internal.cloud.code.v1internal.PredictionService/StreamGenerateContent',
cloudcode__pb2.GenerateContentRequest.SerializeToString,
cloudcode__pb2.StreamGenerateContentChunk.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def FetchAvailableModels(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/google.internal.cloud.code.v1internal.PredictionService/FetchAvailableModels',
cloudcode__pb2.FetchAvailableModelsRequest.SerializeToString,
cloudcode__pb2.FetchAvailableModelsResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def CountTokens(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/google.internal.cloud.code.v1internal.PredictionService/CountTokens',
cloudcode__pb2.CountTokensRequest.SerializeToString,
cloudcode__pb2.CountTokensResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def RetrieveUserQuota(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/google.internal.cloud.code.v1internal.PredictionService/RetrieveUserQuota',
cloudcode__pb2.RetrieveUserQuotaRequest.SerializeToString,
cloudcode__pb2.RetrieveUserQuotaResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

View File

@@ -0,0 +1,183 @@
// Copyright 2026 Codex Launcher Contributors
// SPDX-License-Identifier: MIT
//
// CloudCode internal gRPC service definitions.
// Reverse-engineered from the agy-core binary for Antigravity proxy fallback.
// Service: google.internal.cloud.code.v1internal.PredictionService
//
// NOTE: google/api/annotations.proto is NOT imported here because it conflicts
// with the google namespace package at runtime. The HTTP annotations are only
// needed for Google's Envoy/gRPC-gateway and are unnecessary for our client.
syntax = "proto3";
package google.internal.cloud.code.v1internal;
import "google/protobuf/struct.proto";
option go_package = "google.golang.org/internal/cloud/code/v1internal";
// ─── Reused message types ───────────────────────────────────────────
message Content {
string role = 1;
repeated Part parts = 2;
}
message Part {
oneof data {
string text = 1;
InlineData inline_data = 2;
FunctionCall function_call = 3;
FunctionResponse function_response = 4;
}
// Thought signature for Gemini continuity
string thought_signature = 10;
// Thought part (reasoning)
bool thought = 11;
}
message InlineData {
string mime_type = 1;
bytes data = 2;
}
message FunctionCall {
string name = 1;
google.protobuf.Struct args = 2;
string id = 3;
}
message FunctionResponse {
string name = 1;
google.protobuf.Struct response = 2;
string id = 3;
}
message SafetySetting {
string category = 1;
string threshold = 2;
}
message GenerationConfig {
int32 max_output_tokens = 1;
float temperature = 2;
float top_p = 3;
int32 thinking_budget = 4;
bool include_thoughts = 5;
repeated string stop_sequences = 6;
message ThinkingConfig {
bool include_thoughts = 1;
int32 thinking_budget = 2;
}
ThinkingConfig thinking_config = 7;
}
message Tool {
repeated FunctionDeclaration function_declarations = 1;
}
message FunctionDeclaration {
string name = 1;
string description = 2;
google.protobuf.Struct parameters = 3;
}
message ToolConfig {
message FunctionCallingConfig {
string mode = 1; // "AUTO", "ANY", "NONE", "VALIDATED"
repeated string allowed_function_names = 2;
}
FunctionCallingConfig function_calling_config = 1;
}
message Candidate {
Content content = 1;
string finish_reason = 2;
int32 index = 3;
}
// ─── GenerateContent ─────────────────────────────────────────────────
message GenerateContentRequest {
string project = 1;
string model = 2;
string request_type = 3;
string user_agent = 4;
string request_id = 5;
message InnerRequest {
repeated Content contents = 1;
Content system_instruction = 2;
GenerationConfig generation_config = 3;
repeated Tool tools = 4;
repeated SafetySetting safety_settings = 5;
ToolConfig tool_config = 6;
string session_id = 7;
}
InnerRequest request = 10;
}
message GenerateContentResponse {
message Response {
repeated Candidate candidates = 1;
}
Response response = 1;
}
// ─── StreamGenerateContent ────────────────────────────────────────────
message StreamGenerateContentChunk {
GenerateContentResponse.Response response = 1;
}
// ─── FetchAvailableModels ────────────────────────────────────────────
message FetchAvailableModelsRequest {
string project = 1;
}
message FetchAvailableModelsResponse {
message ModelInfo {
string name = 1;
string display_name = 2;
string description = 3;
int64 context_window = 4;
}
repeated ModelInfo models = 1;
}
// ─── CountTokens ──────────────────────────────────────────────────────
message CountTokensRequest {
string project = 1;
string model = 2;
repeated Content contents = 3;
}
message CountTokensResponse {
int32 total_tokens = 1;
}
// ─── RetrieveUserQuota ───────────────────────────────────────────────
message RetrieveUserQuotaRequest {
string project = 1;
}
message RetrieveUserQuotaResponse {
int64 daily_limit = 1;
int64 daily_usage = 2;
int64 daily_remaining = 3;
}
// ─── Service ──────────────────────────────────────────────────────────
service PredictionService {
rpc GenerateContent(GenerateContentRequest) returns (GenerateContentResponse);
rpc StreamGenerateContent(GenerateContentRequest) returns (stream StreamGenerateContentChunk);
rpc FetchAvailableModels(FetchAvailableModelsRequest) returns (FetchAvailableModelsResponse);
rpc CountTokens(CountTokensRequest) returns (CountTokensResponse);
rpc RetrieveUserQuota(RetrieveUserQuotaRequest) returns (RetrieveUserQuotaResponse);
}

View File

@@ -0,0 +1,14 @@
// Minimal google/api/annotations.proto for code generation.
syntax = "proto3";
package google.api;
import "google/api/http.proto";
import "google/protobuf/descriptor.proto";
option go_package = "google.golang.org/genproto/googleapis/api/annotations";
extend google.protobuf.MethodOptions {
HttpRule http = 72295728;
}

View File

@@ -0,0 +1,18 @@
// Minimal google/api/http.proto for code generation.
syntax = "proto3";
package google.api;
option go_package = "google.golang.org/genproto/googleapis/api/annotations";
message HttpRule {
string get = 1;
string put = 2;
string post = 3;
string delete = 4;
string patch = 5;
repeated HttpRule additional_bindings = 11;
string body = 7;
string response_body = 12;
}

View File

@@ -20,12 +20,65 @@ BGP_POOLS_FILE = HOME / ".codex/bgp-pools.json"
LOG_DIR = HOME / ".cache/codex-desktop"
LAUNCH_LOG = LOG_DIR / "launcher.log"
PROXY_CONFIG_DIR = HOME / ".cache/codex-proxy"
ACTIVE_ENDPOINT_FILE = HOME / ".codex/.active-endpoint.json"
DEFAULT_CONFIG = """model = ""
model_provider = ""
model_catalog_json = ""
"""
CHANGELOG = [
("3.12.0", "2026-05-27", [
"gRPC auto-fallback for Antigravity (PR #13)",
"Dynamic version fetch with probe validation",
"Antigravity v2 handler rewrite (anti-api)",
]),
("3.11.10", "2026-05-26", [
"Fix Antigravity: interleave function_call/output pairs (PR #11)",
"Gemini sanitizer: trim non-user turns for Google API compliance",
]),
("3.11.9", "2026-05-26", [
"Fix Antigravity: preserve functionCall/functionResponse (PR #10)",
"Prevents tool responses from being dropped in multi-turn sessions",
]),
("3.11.8", "2026-05-26", [
"Vision cache persisted across requests (PR #8 merge)",
"No redundant vision API calls for same image URL",
]),
("3.11.7", "2026-05-26", [
"Vision auto-detect: uses provider's vision model for image description",
"Vision preprocessing replaces image stripping",
"Fix AttributeError in image_url string handling",
"Merge PR #6: vision/OCR preprocessing, PR #7: 177 unit tests",
"Auth os error 2 fix: proper config-missing message in GUI",
]),
("3.11.6", "2026-05-26", [
"Antigravity loop breakers: per-session tracking, repeated tool detection",
"has_content fix: function_call counts as valid output",
"Latest user instruction appended once per request for Antigravity",
"Antigravity-only changes, no touch to other providers",
]),
("3.11.5", "2026-05-26", [
"Token-aware compaction: fixes context_length_exceeded on small-context models",
"Proactive compaction triggers on token count, not just item count",
"Universal adaptive compaction for all providers (removed crof.ai gates)",
"Vision model detection + image stripping for non-vision models",
"Per-model token limit learning from error messages",
"Smart-continue text-tool detection for text-only models",
"Active endpoint sync: auto-removes stale references on startup",
]),
("3.11.0", "2026-05-26", [
"Merge cobra PR: concurrency semaphore (max 3), auto-continue for truncated text",
"SO_REUSEADDR on sticky port, proxy-stderr.log, stream diagnostics logging",
"Timeout/OSError handler sends response.failed SSE instead of silent drop",
"Restart Proxy button: only restarts proxy without killing Codex Desktop",
"Tool call argument normalizer: fixes Arguments→arguments, strips markdown wrapping",
"Smart-continue loop (2× retries): escalating nudges when model stops text-only mid-task",
"XML tool call extraction: parses <name> patterns from text, injects as real calls",
"Auto-continue + smart-continue ordered with skip guard to avoid double-firing",
"API key hot-reload with mtime tracking + /admin/reload + /admin/verify-key endpoints",
"GUI hot-reload: auto-refreshes proxy key on endpoint edit, verifies with upstream",
"Synthetic tool-results disabled: was causing deepseek-v4-pro truncation on opencode.ai",
]),
("3.10.4", "2026-05-25", [
"OAuth Secrets editor in GUI — update client ID/secret without editing files",
"Secrets stored in ~/.config/codex-launcher/oauth-secrets.json (not in repo)",
@@ -361,7 +414,7 @@ PROVIDER_PRESETS = {
},
"Google Antigravity (OAuth)": {
"backend_type": "gemini-oauth-antigravity",
"base_url": "https://daily-cloudcode-pa.sandbox.googleapis.com",
"base_url": "https://cloudcode-pa.googleapis.com",
"oauth_provider": "google-antigravity",
"models": [
"Gemini 3.5 Flash (High)", "Gemini 3.5 Flash (Medium)", "Gemini 3.5 Flash (Low)",
@@ -910,6 +963,27 @@ def restore_config():
shutil.copy2(str(CONFIG_BAK), str(tmp))
os.replace(str(tmp), str(CONFIG))
def set_active_endpoint(name):
ACTIVE_ENDPOINT_FILE.parent.mkdir(parents=True, exist_ok=True)
write_secure_text(ACTIVE_ENDPOINT_FILE, json.dumps({"active": name}, indent=2))
def validate_active_endpoint(logfn=None):
if not ACTIVE_ENDPOINT_FILE.exists():
return
try:
d = json.loads(ACTIVE_ENDPOINT_FILE.read_text())
active = d.get("active", "")
if not active:
return
eps = load_endpoints()
names = {ep.get("name", "") for ep in eps}
if active not in names:
ACTIVE_ENDPOINT_FILE.unlink()
if logfn:
logfn(f"Removed stale active-endpoint '{active}' (provider no longer exists)")
except Exception:
pass
def write_secure_text(path, text):
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
@@ -1253,6 +1327,9 @@ def _check_codex_auth():
if out.returncode == 0 and text:
return ("logged_in", text)
if text:
_tl = text.lower()
if "no such file" in _tl or "os error 2" in _tl or "not found" in _tl:
return ("not_configured", "Config missing — launch once to create")
return ("error", text)
return ("unknown", "No output from codex login status")
except FileNotFoundError:
@@ -1782,6 +1859,64 @@ class AIMonitoringWindow(Gtk.Window):
# Main window
# ═══════════════════════════════════════════════════════════════════
def _oauth_discover_project(access_token, token_path, tokens):
project_id = ""
try:
lr = urllib.request.Request(
"https://cloudcode-pa.googleapis.com/v1internal:loadCodeAssist",
data=json.dumps({}).encode(),
headers={"Content-Type": "application/json",
"Authorization": f"Bearer {access_token}",
"User-Agent": "google-api-nodejs-client/9.15.1"})
lresp = urllib.request.urlopen(lr, timeout=15)
ldata = json.loads(lresp.read())
p = ldata.get("cloudaicompanionProject", "")
if isinstance(p, dict):
project_id = p.get("id", "")
elif isinstance(p, str):
project_id = p
except Exception:
pass
if not project_id:
return ""
try:
test_url = f"https://cloudcode-pa.googleapis.com/v1internal:listModels?project={project_id}"
test_req = urllib.request.Request(test_url,
headers={"Authorization": f"Bearer {access_token}",
"User-Agent": "google-api-nodejs-client/9.15.1"})
urllib.request.urlopen(test_req, timeout=10)
except urllib.error.HTTPError as e:
if e.code == 403 and "SERVICE_DISABLED" in (e.read().decode()[:500]):
print(f"[oauth] project {project_id} has API disabled, searching for valid project...", file=sys.stderr)
try:
list_req = urllib.request.Request(
"https://cloudresourcemanager.googleapis.com/v1/projects?filter=lifecycleState:ACTIVE",
headers={"Authorization": f"Bearer {access_token}"})
list_resp = urllib.request.urlopen(list_req, timeout=15)
projects = json.loads(list_resp.read()).get("projects", [])
for proj in projects:
pid = proj.get("projectId", "")
if not pid or pid == project_id:
continue
try:
t2 = urllib.request.Request(
f"https://cloudcode-pa.googleapis.com/v1internal:listModels?project={pid}",
headers={"Authorization": f"Bearer {access_token}",
"User-Agent": "google-api-nodejs-client/9.15.1"})
urllib.request.urlopen(t2, timeout=10)
project_id = pid
print(f"[oauth] found working project: {pid}", file=sys.stderr)
break
except Exception:
continue
except Exception:
pass
tokens["project_id"] = project_id
with open(token_path, "w") as f:
json.dump(tokens, f, indent=2)
os.chmod(token_path, 0o600)
return project_id
class LauncherWin(Gtk.Window):
def __init__(self):
super().__init__(title="Codex Launcher")
@@ -1791,6 +1926,7 @@ class LauncherWin(Gtk.Window):
self._proc = None
self._endpoints_data = load_endpoints()
recover_config_if_needed()
validate_active_endpoint()
vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)
self.add(vbox)
@@ -1798,7 +1934,7 @@ class LauncherWin(Gtk.Window):
# header row
hdr = Gtk.Box(spacing=8)
vbox.pack_start(hdr, False, False, 0)
lbl = Gtk.Label(label="<b>Codex Launcher v3.10.7</b>")
lbl = Gtk.Label(label=f"<b>Codex Launcher v{CHANGELOG[0][0]}</b>")
lbl.set_use_markup(True)
hdr.pack_start(lbl, False, False, 0)
changelog_btn = Gtk.Button(label="Changelog")
@@ -2037,6 +2173,8 @@ class LauncherWin(Gtk.Window):
self._relogin_btn.set_sensitive("cli" not in self._missing)
elif status == "not_installed":
self._auth_label.set_markup("<span foreground='#888'>Auth: N/A (CLI not installed)</span>")
elif status == "not_configured":
self._auth_label.set_markup("<span foreground='#d29922'>⚠ Config missing — launch once to create</span>")
else:
self._auth_label.set_markup(f"<span foreground='#d29922'>⚠ Auth: {msg}</span>")
self._relogin_btn.set_sensitive("cli" not in self._missing)
@@ -2536,6 +2674,8 @@ class LauncherWin(Gtk.Window):
begin_config_transaction(f"launch:{ep['name']}")
write_config_for_native(ep, model)
set_active_endpoint(ep["name"])
if target == "desktop":
if needs_proxy:
_kill_existing_desktop(self.log)
@@ -2593,6 +2733,7 @@ class LauncherWin(Gtk.Window):
begin_config_transaction(f"launch:bgp:{pool['name']}")
write_config_for_translated(bgp_ep, model, port)
set_active_endpoint(pool["name"])
if target == "desktop":
_kill_existing_desktop(self.log)
@@ -2832,63 +2973,163 @@ class LauncherWin(Gtk.Window):
_stop_proxy()
Gtk.main_quit()
def _google_reoauth(self, provider):
secrets_path = os.path.expanduser("~/.config/codex-launcher/oauth-secrets.json")
try:
with open(secrets_path) as f:
secrets = json.load(f)
except Exception:
secrets = {}
def _google_reoauth(self, provider, parent_dlg=None):
import http.server
is_antigravity = provider == "google-antigravity"
sec_key = "antigravity" if is_antigravity else "gemini_cli"
sec = secrets.get(sec_key, {})
client_id = sec.get("client_id", "")
client_secret = sec.get("client_secret", "")
if not client_id or not client_secret:
_sp = os.path.expanduser("~/.config/codex-launcher/oauth-secrets.json")
try:
with open(_sp) as _f:
_secrets_data = json.load(_f)
except Exception:
_secrets_data = {}
sec = _secrets_data.get(sec_key, {})
CLIENT_ID = sec.get("client_id", "")
CLIENT_SECRET = sec.get("client_secret", "")
if not CLIENT_ID or not CLIENT_SECRET:
self._show_error_dialog("Missing OAuth secrets",
f"No client_id/client_secret for {sec_key}.\nSet them in OAuth Secrets first.")
return
token_file = "google-antigravity-oauth-token.json" if is_antigravity else "google-cli-oauth-token.json"
token_path = os.path.expanduser(f"~/.cache/codex-proxy/{token_file}")
redirect = "urn:ietf:wg:oauth:2.0:oob"
auth_url = (f"https://accounts.google.com/o/oauth2/v2/auth?client_id={client_id}"
f"&redirect_uri={urllib.parse.quote(redirect)}"
f"&response_type=code&scope={urllib.parse.quote('https://www.googleapis.com/auth/cloud-platform')}"
f"&access_type=offline&prompt=consent")
webbrowser.open(auth_url)
code_dlg = Gtk.Dialog(title=f"Re-OAuth: {'Antigravity' if is_antigravity else 'Gemini CLI'}", parent=self, modal=True)
code_dlg.add_button("Cancel", Gtk.ResponseType.CANCEL)
code_dlg.add_button("Exchange", Gtk.ResponseType.OK)
code_dlg.set_default_size(500, 180)
ca = code_dlg.get_content_area()
provider_kind = "antigravity" if is_antigravity else "cli"
if is_antigravity:
SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
"https://www.googleapis.com/auth/userinfo.email",
"https://www.googleapis.com/auth/userinfo.profile",
"https://www.googleapis.com/auth/cclog",
"https://www.googleapis.com/auth/experimentsandconfigs",
]
port = 51121
redirect_uri = f"http://localhost:{port}/oauth-callback"
callback_path = "/oauth-callback"
else:
SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
"https://www.googleapis.com/auth/userinfo.email",
"https://www.googleapis.com/auth/userinfo.profile",
]
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
port = s.getsockname()[1]
redirect_uri = f"http://127.0.0.1:{port}/oauth2callback"
callback_path = "/oauth2callback"
state = secrets.token_hex(32)
verifier = secrets.token_urlsafe(64)
challenge = base64.urlsafe_b64encode(hashlib.sha256(verifier.encode()).digest()).rstrip(b"=").decode()
scope_str = " ".join(SCOPES)
auth_url = (
f"https://accounts.google.com/o/oauth2/v2/auth?"
f"client_id={CLIENT_ID}"
f"&redirect_uri={urllib.parse.quote(redirect_uri)}"
f"&response_type=code"
f"&scope={urllib.parse.quote(scope_str)}"
f"&access_type=offline"
f"&prompt=select_account%20consent"
f"&state={state}"
f"&code_challenge={challenge}"
f"&code_challenge_method=S256"
)
oauth_dlg = Gtk.Dialog(title=f"Re-OAuth: {'Antigravity' if is_antigravity else 'Gemini CLI'}", parent=parent_dlg or self, modal=True)
oauth_dlg.add_button("Cancel", Gtk.ResponseType.CANCEL)
oauth_dlg.set_default_size(520, 200)
ca = oauth_dlg.get_content_area()
ca.set_margin_start(12)
ca.set_margin_end(12)
ca.set_spacing(6)
ca.pack_start(Gtk.Label(label="Browser opened for Google OAuth.\nPaste the authorization code below:", xalign=0), False, False, 0)
code_entry = Gtk.Entry()
code_entry.set_placeholder_text("4/0AX...")
ca.pack_start(code_entry, False, False, 4)
ca.pack_start(Gtk.Label(label=f"<b>Re-authenticating {'Antigravity' if is_antigravity else 'Gemini CLI'}</b>", use_markup=True, xalign=0), False, False, 0)
link_lbl = Gtk.Label(label="Click here to open Google authorization", use_markup=True, xalign=0)
link_lbl.set_markup(f'<a href="{auth_url}">Click here to open Google authorization</a>')
ca.pack_start(link_lbl, False, False, 4)
status_lbl = Gtk.Label(label="Waiting for browser callback...", xalign=0)
ca.pack_start(status_lbl, False, False, 4)
ca.show_all()
if code_dlg.run() == Gtk.ResponseType.OK:
code = code_entry.get_text().strip()
if code:
code_holder = [None]
error_holder = [None]
class OAuthHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self2):
qs = urllib.parse.urlparse(self2.path).query
params = urllib.parse.parse_qs(qs)
if "code" in params:
if params.get("state", [None])[0] != state:
self2.send_response(400)
self2.end_headers()
self2.wfile.write(b"CSRF state mismatch")
error_holder[0] = "CSRF state mismatch"
return
code_holder[0] = params["code"][0]
self2.send_response(302)
self2.send_header("Location", "https://developers.google.com/gemini-code-assist/auth_success_gemini")
self2.end_headers()
else:
error_holder[0] = params.get("error", ["unknown"])[0]
self2.send_response(302)
self2.send_header("Location", "https://developers.google.com/gemini-code-assist/auth_failure_gemini")
self2.end_headers()
def log_message(self2, fmt, *args):
pass
try:
bind_host = "localhost" if is_antigravity else "127.0.0.1"
server = http.server.HTTPServer((bind_host, port), OAuthHandler)
except OSError:
status_lbl.set_text(f"Port {port} in use — close other apps and retry.")
oauth_dlg.run()
oauth_dlg.destroy()
return
def _wait():
deadline = time.time() + 120
while code_holder[0] is None and error_holder[0] is None and time.time() < deadline:
server.handle_request()
server.server_close()
if code_holder[0]:
try:
tok_req = urllib.request.Request("https://oauth2.googleapis.com/token",
data=urllib.parse.urlencode({
"code": code, "client_id": client_id, "client_secret": client_secret,
"redirect_uri": redirect, "grant_type": "authorization_code"
}).encode(),
tok_data = urllib.parse.urlencode({
"code": code_holder[0], "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET,
"redirect_uri": redirect_uri, "grant_type": "authorization_code",
"code_verifier": verifier,
}).encode()
req = urllib.request.Request("https://oauth2.googleapis.com/token", data=tok_data,
headers={"Content-Type": "application/x-www-form-urlencoded"})
tok_resp = urllib.request.urlopen(tok_req, timeout=30)
tok_data = json.loads(tok_resp.read())
tok_data["_updated"] = time.time()
resp = urllib.request.urlopen(req, timeout=30)
tokens = json.loads(resp.read())
tokens["client_id"] = CLIENT_ID
tokens["client_secret"] = CLIENT_SECRET
tokens["provider_kind"] = provider_kind
tokens["expires_at"] = time.time() + tokens.get("expires_in", 3600)
os.makedirs(os.path.dirname(token_path), exist_ok=True)
with open(token_path, "w") as f:
json.dump(tok_data, f, indent=2)
self._log(f"[oauth] Refreshed {provider} token → {token_path}")
json.dump(tokens, f, indent=2)
os.chmod(token_path, 0o600)
project_id = _oauth_discover_project(tokens["access_token"], token_path, tokens)
def _on_success():
status_lbl.set_text(f"Authorization successful! Project: {project_id or 'none'}")
GLib.timeout_add_seconds(2, lambda: oauth_dlg.destroy())
return False
GLib.idle_add(_on_success)
except Exception as e:
self._show_error_dialog("Token exchange failed", str(e)[:300])
code_dlg.destroy()
def _on_err(exc=str(e)):
status_lbl.set_text(f"Token exchange failed: {exc[:200]}")
return False
GLib.idle_add(_on_err)
else:
def _on_fail(err=error_holder[0]):
status_lbl.set_text(f"Failed: {err or 'No code received'}")
return False
GLib.idle_add(_on_fail)
webbrowser.open(auth_url)
threading.Thread(target=_wait, daemon=True).start()
oauth_dlg.run()
oauth_dlg.destroy()
def _codebuff_reoauth(self):
self._codebuff_oauth_standalone()
@@ -3019,7 +3260,7 @@ class LauncherWin(Gtk.Window):
hdr_row.pack_start(Gtk.Label(label=f"\n<b>{section_label}</b>", use_markup=True, xalign=0), True, True, 0)
reauth_btn = Gtk.Button(label="Re-OAuth")
reauth_btn.set_size_request(80, -1)
reauth_btn.connect("clicked", lambda b, p=oauth_prov: self._google_reoauth(p))
reauth_btn.connect("clicked", lambda b, p=oauth_prov: self._google_reoauth(p, dlg))
hdr_row.pack_end(reauth_btn, False, False, 0)
import_btn = Gtk.Button(label="Import JSON")
import_btn.set_size_request(100, -1)
@@ -3868,32 +4109,8 @@ class EditEndpointDialog(Gtk.Dialog):
json.dump(tokens, f, indent=2)
os.chmod(token_path, 0o600)
_oauth_log(f"Token saved to {token_path}")
project_id = ""
try:
_oauth_log("Discovering project ID via loadCodeAssist...")
lr = urllib.request.Request(
"https://cloudcode-pa.googleapis.com/v1internal:loadCodeAssist",
data=json.dumps({}).encode(),
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {tokens['access_token']}",
"User-Agent": "google-api-nodejs-client/9.15.1",
})
lresp = urllib.request.urlopen(lr, timeout=15)
ldata = json.loads(lresp.read())
p = ldata.get("cloudaicompanionProject", "")
if isinstance(p, dict):
project_id = p.get("id", "")
elif isinstance(p, str):
project_id = p
_oauth_log(f"Project ID: {project_id or '(none)'}")
if project_id:
tokens["project_id"] = project_id
with open(token_path, "w") as f2:
json.dump(tokens, f2, indent=2)
os.chmod(token_path, 0o600)
except Exception as pe:
_oauth_log(f"loadCodeAssist failed (non-fatal): {pe}")
project_id = _oauth_discover_project(tokens["access_token"], token_path, tokens)
_oauth_log(f"Project ID: {project_id or '(none)'}")
if is_antigravity:
found_models = [
"gemini-2.5-flash", "gemini-2.5-pro",
@@ -3915,7 +4132,7 @@ class EditEndpointDialog(Gtk.Dialog):
for mc in probe_candidates:
try:
pr = urllib.request.Request(
"https://daily-cloudcode-pa.sandbox.googleapis.com/v1internal:generateContent",
"https://cloudcode-pa.googleapis.com/v1internal:generateContent",
data=json.dumps({
"project": project_id,
"model": mc,
@@ -4264,10 +4481,54 @@ class EditEndpointDialog(Gtk.Dialog):
data["default"] = name
save_endpoints(data)
self._hot_reload_proxy_key(new_ep)
self._parent_mgr._rebuild()
self._parent_mgr._parent._on_endpoints_updated()
self.destroy()
def _hot_reload_proxy_key(self, ep):
try:
ep_name = ep.get("name", "")
proxy_port = None
import glob as _glob
for cfg_file in _glob.glob(str(PROXY_CONFIG_DIR / "proxy-*.json")):
try:
with open(cfg_file) as f:
pcfg = json.load(f)
if ep_name.lower().replace(" ", "-") in cfg_file.lower():
proxy_port = pcfg.get("port")
pcfg["api_key"] = ep.get("api_key", "")
with open(cfg_file, "w") as f:
json.dump(pcfg, f, indent=2)
break
except Exception:
continue
if proxy_port:
import urllib.request as _ur
try:
url = f"http://127.0.0.1:{proxy_port}/admin/reload"
resp = _ur.urlopen(url, timeout=3)
result = json.loads(resp.read())
reloaded = result.get("reloaded", False)
preview = result.get("api_key_preview", "?")
self._parent_mgr._parent.log(
f"[hot-reload] key {'updated' if reloaded else 'unchanged'}: {preview}")
if reloaded:
verify_url = f"http://127.0.0.1:{proxy_port}/admin/verify-key"
vresp = _ur.urlopen(verify_url, timeout=10)
vresult = json.loads(vresp.read())
valid = vresult.get("valid", False)
if valid:
self._parent_mgr._parent.log(
f"[hot-reload] key verified OK ({vresult.get('models', '?')} models)")
else:
self._parent_mgr._parent.log(
f"[hot-reload] WARNING: key verification failed: {vresult.get('error', 'unknown')}")
except Exception:
pass
except Exception:
pass
def _show_error(self, msg):
d = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, msg)
d.run(); d.destroy()

View File

@@ -83,6 +83,81 @@ model_catalog_json = ""
"""
CHANGELOG = [
("3.12.0", "2026-05-27", [
"gRPC auto-fallback for Antigravity provider (PR #13)",
"New antigravity_grpc module with protobuf client",
"REST 404 triggers gRPC fallback using display names",
"gRPC supports streaming and unary generate",
"Dynamic version fetch with probe validation",
"Antigravity v2 handler rewrite (anti-api approach)",
"Safety settings, stopSequences, sessionId, requestType: agent",
]),
("3.11.11", "2026-05-26", [
"Final trimming only removes plain messages, never function_call_output",
]),
("3.11.10", "2026-05-26", [
"Fix Antigravity: interleave function_call/output pairs in correct sequence (PR #11)",
"Fix Gemini sanitizer: trim leading/trailing non-user turns for Google API compliance",
"Stricter function call/response isolation — no merging across role boundaries",
]),
("3.11.9", "2026-05-26", [
"Fix Antigravity: preserve functionCall/functionResponse in Gemini sanitizer (PR #10)",
"Prevents tool responses from being merged/dropped in multi-turn Antigravity sessions",
]),
("3.11.8", "2026-05-26", [
"Vision description cache persisted across requests (no redundant API calls for same image)",
"Merge PR #8: fix vision cache persistence across requests",
]),
("3.11.7", "2026-05-26", [
"Vision auto-detect: uses provider's own vision model (e.g. 0G-Qwen-VL) as fallback for image description",
"Vision preprocessing replaces image stripping: images described via API instead of just removed",
"Fix AttributeError in image_url handling when value is string not dict",
"Merge PR #6: vision/OCR preprocessing for text-only models",
"Merge PR #7: 177 unit tests for translate-proxy.py",
"Auth os error 2 fix: GUI shows config-missing message instead of raw error",
]),
("3.11.6", "2026-05-26", [
"Antigravity loop breakers: per-session tracking, edit-intent nudge (first turn only)",
"Loop breaker: same tool+args repeated 5+ times triggers force finalization",
"Latest user instruction appended exactly once per request",
"Detailed [antigravity-loop] logging for all tracking fields",
"has_content fix: function_call now counts as valid output (no more infinite loops)",
"Antigravity-only changes, no touch to other providers",
]),
("3.11.5", "2026-05-26", [
"Token-aware compaction: fixes context_length_exceeded on small-context models (25 items x 1600 tokens)",
"Proactive compaction triggers on token count (>80% model limit), not just item count",
"Universal adaptive compaction: removed crof.ai-only gates, all providers get compaction",
"Vision model detection: strips images for non-vision models, keeps for vision-capable ones",
"Per-model token limit learning from context_length_exceeded error messages",
"Compaction aggression levels: normal vs extreme when tokens > 1.5x model limit",
"Smart-continue text-tool detection: triggers on tool-call text patterns, not just function_call_output",
"Active endpoint sync: GUI auto-removes stale endpoint references on startup",
]),
("3.11.0", "2026-05-26", [
"Merge cobra PR: concurrency semaphore (max 3), auto-continue for truncated text",
"SO_REUSEADDR on sticky port, proxy-stderr.log, stream diagnostics logging",
"Timeout/OSError handler sends response.failed SSE instead of silent drop",
"Restart Proxy button: only restarts proxy without killing Codex Desktop",
"Tool call argument normalizer: fixes Arguments->arguments, strips markdown wrapping",
"Smart-continue loop (2x retries): escalating nudges when model stops text-only mid-task",
"XML tool call extraction: parses patterns from text, injects as real calls",
"Auto-continue + smart-continue ordered with skip guard to avoid double-firing",
"API key hot-reload with mtime tracking + /admin/reload + /admin/verify-key endpoints",
"GUI hot-reload: auto-refreshes proxy key on endpoint edit, verifies with upstream",
"Synthetic tool-results disabled: was causing deepseek-v4-pro truncation on opencode.ai",
]),
("3.10.12", "2026-05-26", [
"Sticky endpoint: caches last working endpoint, sequential fallback on failure",
"Endpoint order: cloudcode-pa first (matches agy CLI), daily-cloudcode-pa fallback",
"Anti-stall engine: kills stale proxy processes + clears pycache on startup",
"Smart error classification: quota vs capacity vs banned vs validation vs auth",
"Rate limit reset parsing: extracts cooldown from error body for accuracy",
"Missing headers: X-Client-Name, X-Client-Version, x-goog-api-client, sessionId",
"Guardrail skip: simple messages (hi) skip agent guardrail, no more tool-call loops",
"Claude fixes: preserve all tools, skip compaction/normalizer/sanitization for Claude",
"Normalizer model param: distinguishes Claude vs Gemini for correct behavior",
]),
("3.10.11", "2026-05-26", [
"Hybrid endpoint fallback: cloudcode-pa then daily-cloudcode-pa on 429",
"daily-cloudcode-pa.googleapis.com (same endpoint agy-core uses)",
@@ -1457,6 +1532,7 @@ def _pick_free_port():
try:
saved = int(_PROXY_PORT_FILE.read_text().strip())
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("127.0.0.1", saved))
return saved
except (ValueError, OSError, FileNotFoundError):
@@ -1548,11 +1624,19 @@ def _start_proxy_with_config(pcfg_path, port, logfn):
)
_register_pgid_entry("proxy", _proxy_proc.pid)
_proxy_log_path = PROXY_CONFIG_DIR / "proxy-stderr.log"
_proxy_log_file = open(_proxy_log_path, "a", encoding="utf-8")
def _pipe_stderr():
if not _proxy_proc.stderr:
return
for line in _proxy_proc.stderr:
logfn(f"[proxy] {line.rstrip()}")
try:
_proxy_log_file.write(line)
_proxy_log_file.flush()
except Exception:
pass
threading.Thread(target=_pipe_stderr, daemon=True).start()
@@ -1670,6 +1754,10 @@ def check_codex_auth():
return ("unknown", "No output from codex login status")
except FileNotFoundError:
return ("not_installed", "codex not found")
except OSError as e:
if e.errno == 2:
return ("not_configured", "Config not found — launch Codex once to create it")
return ("error", str(e))
except Exception as e:
return ("error", str(e))

File diff suppressed because it is too large Load Diff

0
tests/__init__.py Normal file
View File

View File

@@ -0,0 +1,396 @@
#!/usr/bin/env python3
"""
Unit tests for the Antigravity gRPC fallback module.
Tests cover:
1. Module import and availability detection
2. Protobuf conversion helpers (JSON <-> protobuf)
3. Request building from wrapped REST dict
4. Reverse alias map correctness
5. GrpcFallbackResult type
6. Integration: _try_grpc_fallback triggers correctly on REST 404
"""
import json
import os
import sys
import unittest
from unittest.mock import patch, MagicMock
# Add src to path so we can import the antigravity_grpc package
_src_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "src")
if _src_dir not in sys.path:
sys.path.insert(0, _src_dir)
class TestGrpcModuleAvailability(unittest.TestCase):
"""Tests for is_grpc_available() and module loading."""
def test_is_grpc_available_returns_bool(self):
"""is_grpc_available should return a boolean."""
from antigravity_grpc import is_grpc_available
result = is_grpc_available()
self.assertIsInstance(result, bool)
def test_is_grpc_available_true_when_installed(self):
"""If grpcio is installed and stubs are loadable, should return True."""
from antigravity_grpc import is_grpc_available
# grpcio was installed at test time, so this should be True
self.assertTrue(is_grpc_available())
def test_client_instantiation(self):
"""AntigravityGrpcClient should be instantiatable."""
from antigravity_grpc import AntigravityGrpcClient
client = AntigravityGrpcClient()
self.assertIsNotNone(client)
def test_get_client_singleton(self):
"""get_client should return the same singleton."""
from antigravity_grpc import get_client
c1 = get_client()
c2 = get_client()
self.assertIs(c1, c2)
class TestGrpcFallbackResult(unittest.TestCase):
"""Tests for GrpcFallbackResult type."""
def test_default_values(self):
from antigravity_grpc import GrpcFallbackResult
r = GrpcFallbackResult()
self.assertFalse(r.ok)
self.assertIsNone(r.response_data)
self.assertIsNone(r.stream_chunks)
self.assertEqual(r.error_message, "")
self.assertEqual(r.endpoint_used, "")
self.assertEqual(r.model_used, "")
self.assertEqual(r.elapsed_s, 0.0)
def test_success_result(self):
from antigravity_grpc import GrpcFallbackResult
r = GrpcFallbackResult(ok=True, response_data={"response": {"candidates": []}},
endpoint_used="daily-cloudcode-pa.googleapis.com:443",
model_used="Gemini 3.5 Flash (High)",
elapsed_s=2.5)
self.assertTrue(r.ok)
self.assertIsNotNone(r.response_data)
self.assertEqual(r.elapsed_s, 2.5)
def test_failure_result(self):
from antigravity_grpc import GrpcFallbackResult
r = GrpcFallbackResult(ok=False, error_message="All gRPC endpoints failed")
self.assertFalse(r.ok)
self.assertIn("failed", r.error_message)
def test_repr(self):
from antigravity_grpc import GrpcFallbackResult
r_ok = GrpcFallbackResult(ok=True, response_data={"response": {"candidates": []}})
self.assertIn("OK", repr(r_ok))
r_fail = GrpcFallbackResult(ok=False, error_message="timeout")
self.assertIn("FAIL", repr(r_fail))
class TestReverseAliasMap(unittest.TestCase):
"""Tests for the _GRPC_REVERSE_ALIAS map in translate-proxy.py."""
def test_import_reverse_alias(self):
"""The reverse alias map should be importable from the proxy module."""
import importlib
_spec = importlib.util.spec_from_file_location(
"translate_proxy",
os.path.join(_src_dir, "translate-proxy.py"),
)
tp = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(tp)
self.assertIsInstance(tp._GRPC_REVERSE_ALIAS, dict)
def test_key_models_have_reverse_aliases(self):
"""All key REST model slugs should have gRPC display name mappings."""
import importlib
_spec = importlib.util.spec_from_file_location(
"translate_proxy",
os.path.join(_src_dir, "translate-proxy.py"),
)
tp = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(tp)
required_slugs = [
"gemini-3-flash",
"gemini-3.5-flash-low",
"gemini-3.1-pro-low",
"claude-sonnet-4-6",
"claude-opus-4-6-thinking",
"gemini-2.5-flash",
]
for slug in required_slugs:
self.assertIn(slug, tp._GRPC_REVERSE_ALIAS,
f"Missing reverse alias for REST slug '{slug}'")
def test_reverse_alias_values_are_display_names(self):
"""gRPC display names should contain spaces and parentheses, not hyphens."""
import importlib
_spec = importlib.util.spec_from_file_location(
"translate_proxy",
os.path.join(_src_dir, "translate-proxy.py"),
)
tp = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(tp)
for slug, display_name in tp._GRPC_REVERSE_ALIAS.items():
# Display names typically have spaces (e.g. "Gemini 3.5 Flash (High)")
# while slugs use hyphens (e.g. "gemini-3-flash")
self.assertNotEqual(slug, display_name,
f"Reverse alias for '{slug}' should differ from slug (gRPC uses display names)")
class TestProtobufConversion(unittest.TestCase):
"""Tests for JSON -> protobuf conversion helpers."""
def test_struct_to_protobuf(self):
"""_struct_to_protobuf should convert a simple dict to Struct."""
from antigravity_grpc.client import _struct_to_protobuf
result = _struct_to_protobuf({"key": "value", "num": 42})
self.assertIsNotNone(result)
# Verify round-trip
from antigravity_grpc.client import _protobuf_struct_to_dict
d = _protobuf_struct_to_dict(result)
self.assertEqual(d["key"], "value")
self.assertEqual(d["num"], 42.0)
def test_struct_round_trip_nested(self):
"""Nested dicts should survive a round-trip through protobuf."""
from antigravity_grpc.client import _struct_to_protobuf, _protobuf_struct_to_dict
original = {"outer": {"inner": "hello"}, "list_val": [1, 2, 3]}
proto = _struct_to_protobuf(original)
result = _protobuf_struct_to_dict(proto)
self.assertEqual(result["outer"]["inner"], "hello")
self.assertEqual(result["list_val"], [1.0, 2.0, 3.0])
def test_json_parts_to_proto_text(self):
"""Text parts should convert to protobuf Part with text field."""
from antigravity_grpc.client import _json_parts_to_proto
parts = _json_parts_to_proto([{"text": "Hello world"}])
self.assertEqual(len(parts), 1)
self.assertEqual(parts[0].text, "Hello world")
def test_json_parts_to_proto_function_call(self):
"""FunctionCall parts should convert correctly."""
from antigravity_grpc.client import _json_parts_to_proto
parts = _json_parts_to_proto([{
"functionCall": {
"name": "exec_command",
"args": {"cmd": "ls -la"},
"id": "call_123"
}
}])
self.assertEqual(len(parts), 1)
self.assertTrue(parts[0].HasField("function_call"))
self.assertEqual(parts[0].function_call.name, "exec_command")
self.assertEqual(parts[0].function_call.id, "call_123")
def test_json_parts_to_proto_function_response(self):
"""FunctionResponse parts should convert correctly."""
from antigravity_grpc.client import _json_parts_to_proto
parts = _json_parts_to_proto([{
"functionResponse": {
"name": "exec_command",
"response": {"result": "file1.txt"},
"id": "call_123"
}
}])
self.assertEqual(len(parts), 1)
self.assertTrue(parts[0].HasField("function_response"))
self.assertEqual(parts[0].function_response.name, "exec_command")
def test_json_contents_to_proto(self):
"""Content objects should convert correctly."""
from antigravity_grpc.client import _json_contents_to_proto
contents = _json_contents_to_proto([
{"role": "user", "parts": [{"text": "Hello"}]},
{"role": "model", "parts": [{"text": "Hi there"}]},
])
self.assertEqual(len(contents), 2)
self.assertEqual(contents[0].role, "user")
self.assertEqual(contents[1].role, "model")
def test_proto_candidate_to_json(self):
"""Protobuf candidates should convert back to JSON-compatible dicts."""
from antigravity_grpc.client import _json_contents_to_proto, _proto_candidate_to_json
from antigravity_grpc import cloudcode_pb2 as pb2
# Build a candidate manually
candidate = pb2.Candidate()
candidate.content.role = "model"
candidate.content.parts.add().text = "Hello from gRPC"
candidate.finish_reason = "STOP"
candidate.index = 0
result = _proto_candidate_to_json(candidate)
self.assertEqual(result["finishReason"], "STOP")
self.assertEqual(result["content"]["role"], "model")
self.assertEqual(result["content"]["parts"][0]["text"], "Hello from gRPC")
class TestGrpcRequestBuilding(unittest.TestCase):
"""Tests for _build_request (wrapped REST dict → protobuf)."""
def _get_client(self):
from antigravity_grpc import AntigravityGrpcClient
return AntigravityGrpcClient()
def test_build_request_basic(self):
"""Basic request fields should be populated correctly."""
client = self._get_client()
wrapped = {
"project": "test-project-123",
"model": "Gemini 3.5 Flash (High)",
"requestType": "agent",
"userAgent": "antigravity/2.0.6",
"requestId": "agent-test123",
"request": {
"contents": [
{"role": "user", "parts": [{"text": "Say hello"}]}
],
"safetySettings": [
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "OFF"},
],
}
}
req = client._build_request(wrapped)
self.assertEqual(req.project, "test-project-123")
self.assertEqual(req.model, "Gemini 3.5 Flash (High)")
self.assertEqual(req.request_type, "agent")
self.assertEqual(len(req.request.contents), 1)
self.assertEqual(req.request.contents[0].role, "user")
def test_build_request_with_tools(self):
"""Tools should be converted to function declarations."""
client = self._get_client()
wrapped = {
"project": "test-project",
"model": "gemini-3-flash",
"request": {
"contents": [],
"tools": [{
"functionDeclarations": [{
"name": "exec_command",
"description": "Run a shell command",
"parameters": {"type": "object", "properties": {"cmd": {"type": "string"}}}
}]
}],
}
}
req = client._build_request(wrapped)
self.assertEqual(len(req.request.tools), 1)
self.assertEqual(req.request.tools[0].function_declarations[0].name, "exec_command")
def test_build_request_with_generation_config(self):
"""Generation config should be populated correctly."""
client = self._get_client()
wrapped = {
"project": "test-project",
"model": "gemini-3-flash",
"request": {
"contents": [],
"generationConfig": {
"maxOutputTokens": 64000,
"temperature": 0.7,
"stopSequences": ["\n\nHuman:"],
"thinkingConfig": {
"includeThoughts": True,
"thinkingBudget": 8192,
}
}
}
}
req = client._build_request(wrapped)
self.assertEqual(req.request.generation_config.max_output_tokens, 64000)
self.assertAlmostEqual(req.request.generation_config.temperature, 0.7, places=2)
self.assertTrue(req.request.generation_config.thinking_config.include_thoughts)
self.assertEqual(req.request.generation_config.thinking_config.thinking_budget, 8192)
def test_build_request_with_function_call_history(self):
"""Function call/response pairs in contents should be preserved."""
client = self._get_client()
wrapped = {
"project": "test-project",
"model": "gemini-3-flash",
"request": {
"contents": [
{"role": "user", "parts": [{"text": "List files"}]},
{"role": "model", "parts": [{
"functionCall": {"name": "exec_command", "args": {"cmd": "ls"}, "id": "call_1"}
}]},
{"role": "user", "parts": [{
"functionResponse": {"name": "exec_command", "response": {"result": "file.txt"}, "id": "call_1"}
}]},
]
}
}
req = client._build_request(wrapped)
self.assertEqual(len(req.request.contents), 3)
# Verify function call preserved
self.assertTrue(req.request.contents[1].parts[0].HasField("function_call"))
self.assertEqual(req.request.contents[1].parts[0].function_call.name, "exec_command")
# Verify function response preserved
self.assertTrue(req.request.contents[2].parts[0].HasField("function_response"))
self.assertEqual(req.request.contents[2].parts[0].function_response.name, "exec_command")
class TestGrpcEndpointsConfig(unittest.TestCase):
"""Tests for gRPC endpoint configuration."""
def test_default_endpoints(self):
"""Default endpoints should include production and daily."""
from antigravity_grpc.client import _GRPC_ENDPOINTS
self.assertGreaterEqual(len(_GRPC_ENDPOINTS), 2)
hostnames = [ep.split(":")[0] for ep in _GRPC_ENDPOINTS]
self.assertIn("daily-cloudcode-pa.googleapis.com", hostnames)
self.assertIn("cloudcode-pa.googleapis.com", hostnames)
def test_staging_env_var(self):
"""Staging endpoints should be controlled by env var."""
from antigravity_grpc.client import _ALLOW_STAGING_ENV
self.assertEqual(_ALLOW_STAGING_ENV, "ALLOW_ANTIGRAVITY_STAGING")
class TestProxyIntegration(unittest.TestCase):
"""Tests for the proxy's gRPC fallback integration."""
def _load_proxy_module(self):
import importlib
_spec = importlib.util.spec_from_file_location(
"translate_proxy",
os.path.join(_src_dir, "translate-proxy.py"),
)
tp = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(tp)
return tp
def test_get_grpc_client_function_exists(self):
"""_get_grpc_client should exist as a module-level function."""
tp = self._load_proxy_module()
self.assertTrue(callable(tp._get_grpc_client))
def test_grpc_fallback_errors_set(self):
"""_GRPC_FALLBACK_REST_ERRORS should include 404."""
tp = self._load_proxy_module()
self.assertIn(404, tp._GRPC_FALLBACK_REST_ERRORS)
def test_versions_bug_fixed(self):
"""The _versions[0] NameError should be fixed (should be _fetched_ver)."""
# Read the source file and verify _versions is not used incorrectly
with open(os.path.join(_src_dir, "translate-proxy.py")) as f:
source = f.read()
# The bug was: ver={_versions[0]} -- should be ver={_fetched_ver}
self.assertNotIn("_versions[0]", source,
"Bug: _versions[0] should have been replaced with _fetched_ver")
if __name__ == "__main__":
print("=" * 70)
print("Antigravity gRPC Fallback - Unit Tests")
print("=" * 70)
print()
unittest.main(verbosity=2)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff