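v10.13.8: FIX D force_finalize returns a finalize response without calling Gemini, FIX A finalize response reports status=failed, FIX B stream timeout handling, FIX C lock scope, file-read loop thresholds raised from 5/30 to 8/40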

This commit is contained in:
Roman | RyzenAdvanced
2026-05-27 17:52:17 +04:00
Unverified
parent 6861700c0d
commit 5055ff894d
4 changed files with 199 additions and 159 deletions

View File

@@ -5901,21 +5901,21 @@ class Handler(http.server.BaseHTTPRequestHandler):
if ag_key not in _ANTIGRAVITY_FILE_TRACKER:
_ANTIGRAVITY_FILE_TRACKER[ag_key] = {"last_path": None, "path_counts": {}, "total_reads": 0}
ft = _ANTIGRAVITY_FILE_TRACKER[ag_key]
for item in reversed(input_data):
if isinstance(item, dict) and item.get("type") == "function_call":
args_str = json.dumps(item.get("arguments", {}))
file_match = re.search(r'(/[\w/.-]+\.(?:html|py|js|ts|css|json|md|yaml|yml|xml|txt|sh))', args_str)
if file_match:
detected_path = file_match.group(1)
ft["total_reads"] += 1
ft["path_counts"][detected_path] = ft["path_counts"].get(detected_path, 0) + 1
ft["last_path"] = detected_path
if ft["path_counts"][detected_path] >= 5 or ft["total_reads"] > 30:
ag_state["force_finalize"] = True
print(f"[antigravity-loop] FILE READ LOOP: {detected_path} read "
f"{ft['path_counts'][detected_path]}x, total={ft['total_reads']}",
file=sys.stderr)
break
for item in reversed(input_data):
if isinstance(item, dict) and item.get("type") == "function_call":
args_str = json.dumps(item.get("arguments", {}))
file_match = re.search(r'(/[\w/.-]+\.(?:html|py|js|ts|css|json|md|yaml|yml|xml|txt|sh))', args_str)
if file_match:
detected_path = file_match.group(1)
ft["total_reads"] += 1
ft["path_counts"][detected_path] = ft["path_counts"].get(detected_path, 0) + 1
ft["last_path"] = detected_path
if ft["path_counts"][detected_path] >= 8 or ft["total_reads"] > 40:
ag_state["force_finalize"] = True
print(f"[antigravity-loop] FILE READ LOOP: {detected_path} read "
f"{ft['path_counts'][detected_path]}x, total={ft['total_reads']}",
file=sys.stderr)
break
null_tool_names = {"get_goal", "get_remaining_tokens", "get_completion_budget", "status"}
consecutive_null = 0
@@ -5947,7 +5947,10 @@ class Handler(http.server.BaseHTTPRequestHandler):
ag_state["last_tool_count"] = 1
if ag_state.get("force_finalize"):
contents.append({"role": "user", "parts": [{"text": "STOP CALLING TOOLS. APPLY THE FINAL EDIT OR SUMMARIZE WHAT BLOCKED YOU. DO NOT CALL ANY MORE TOOLS."}]})
return self._send_ag_finalize(
"Loop detected. The proxy is forcing a stop because the model repeatedly "
"called tools without making progress. Try a more specific or smaller request.",
stream=body.get("stream", False))
if not _antigravity_is_simple_user(latest_user):
contents.insert(0, {"role": "user", "parts": [{"text": _GEMINI_AGENT_GUARDRAIL}]})
@@ -6730,20 +6733,20 @@ class Handler(http.server.BaseHTTPRequestHandler):
if ag_key not in _ANTIGRAVITY_FILE_TRACKER:
_ANTIGRAVITY_FILE_TRACKER[ag_key] = {"last_path": None, "path_counts": {}, "total_reads": 0}
ft = _ANTIGRAVITY_FILE_TRACKER[ag_key]
for item in reversed(input_data):
if isinstance(item, dict) and item.get("type") == "function_call":
args_str = json.dumps(item.get("arguments", {}))
file_match = re.search(r'(/[\w/.-]+\.(?:html|py|js|ts|css|json|md|yaml|yml|xml|txt|sh))', args_str)
if file_match:
dp = file_match.group(1)
ft["total_reads"] += 1
ft["path_counts"][dp] = ft["path_counts"].get(dp, 0) + 1
ft["last_path"] = dp
if ft["path_counts"][dp] >= 5 or ft["total_reads"] > 30:
ag_state["force_finalize"] = True
print(f"[antigravity-loop] FILE READ LOOP: {dp} read "
f"{ft['path_counts'][dp]}x, total={ft['total_reads']}", file=sys.stderr)
break
for item in reversed(input_data):
if isinstance(item, dict) and item.get("type") == "function_call":
args_str = json.dumps(item.get("arguments", {}))
file_match = re.search(r'(/[\w/.-]+\.(?:html|py|js|ts|css|json|md|yaml|yml|xml|txt|sh))', args_str)
if file_match:
dp = file_match.group(1)
ft["total_reads"] += 1
ft["path_counts"][dp] = ft["path_counts"].get(dp, 0) + 1
ft["last_path"] = dp
if ft["path_counts"][dp] >= 8 or ft["total_reads"] > 40:
ag_state["force_finalize"] = True
print(f"[antigravity-loop] FILE READ LOOP: {dp} read "
f"{ft['path_counts'][dp]}x, total={ft['total_reads']}", file=sys.stderr)
break
null_tool_names = {"get_goal", "get_remaining_tokens", "get_completion_budget", "status"}
consecutive_null = 0
@@ -6785,8 +6788,11 @@ class Handler(http.server.BaseHTTPRequestHandler):
break
if ag_state["force_finalize"]:
contents.append({"role": "user", "parts": [{"text": "STOP CALLING TOOLS. APPLY THE FINAL EDIT OR SUMMARIZE WHAT BLOCKED YOU. DO NOT CALL ANY MORE TOOLS. DO NOT PRODUCE ANY MORE PLANNING TEXT. DO NOT PRODUCE ANY MORE EXPLORATORY TOOL CALLS. PRODUCE A FINAL ANSWER OR A CLEAR STATEMENT OF WHAT IS PREVENTING YOU FROM COMPLETING THE TASK."}]})
elif latest_lower and any(w in latest_lower for w in _EDIT_WORDS) and not ag_state["nudge_injected"] and not ag_state["force_finalize"]:
return self._send_ag_finalize(
"Loop detected. The proxy is forcing a stop because the model repeatedly "
"called tools without making progress. Try a more specific or smaller request.",
stream=body.get("stream", False) if isinstance(body, dict) else False)
elif latest_lower and any(w in latest_lower for w in _EDIT_WORDS) and not ag_state["nudge_injected"]:
contents.append({"role": "user", "parts": [{"text": "!!! ABSOLUTELY NO PLANNING - EMIT THE TOOL CALL NOW !!! IMPORTANT: The user is requesting a modification to existing files. You MUST use tools (exec_command, read_files, write, etc.) to make the changes RIGHT NOW. Do NOT just describe what to do — actually CALL THE TOOLS IN THIS RESPONSE. IMMEDIATELY INSPECT THE FILE OR LIST FILES USING exec_command TOOL CALL."}]})
ag_state["nudge_injected"] = True
print(f"[antigravity] edit-intent detected; injected tool-use nudge (first time for this request)", file=sys.stderr)
@@ -7014,82 +7020,100 @@ class Handler(http.server.BaseHTTPRequestHandler):
buf = ""
stream_finished = False
for raw_line in _stream_with_idle_timeout(upstream, _idle_timeout_for_model(model)):
if tracker and tracker.cancelled.is_set():
print("[gemini-oauth] stream cancelled", file=sys.stderr)
break
if stream_finished:
break
line = raw_line.decode(errors="replace")
if line.startswith("data: "):
buf += line[6:]
continue
if not line.strip() and buf:
try:
chunk = json.loads(buf)
except Exception:
buf = ""
continue
buf = ""
candidates = chunk.get("response", chunk).get("candidates", [])
if not candidates:
if chunk.get("error"):
print(f"[{self._session_id}] stream error chunk: {str(chunk.get('error'))[:300]}", file=sys.stderr)
continue
if candidates[0].get("finishReason") and not candidates[0].get("content", {}).get("parts"):
print(f"[{self._session_id}] finish without parts: {candidates[0].get('finishReason')}", file=sys.stderr)
parts = candidates[0].get("content", {}).get("parts", [])
for part in parts:
sig = _extract_gemini_sig(part)
if sig:
if part.get("functionCall"):
fc_id = part["functionCall"].get("id") or part["functionCall"].get("name")
fc_name = part["functionCall"].get("name")
if fc_id:
_gemini_store_sig(f"fc:{fc_id}", sig)
if fc_name:
_gemini_store_sig(f"fc:{fc_name}", sig)
_gemini_store_sig(f"turn:{resp_id}", sig)
if part.get("thought"):
sig_from_thought = _extract_gemini_sig(part)
if sig_from_thought:
_gemini_store_sig(f"turn:{resp_id}", sig_from_thought)
continue
if "text" in part and not part.get("functionCall"):
text_delta = part["text"]
if not text_delta:
continue
full_text += text_delta
if not message_started:
flush_event("response.output_item.added", {"type": "response.output_item.added", "output_index": 0, "item": {"type": "message", "id": message_id, "role": "assistant", "content": []}})
flush_event("response.content_part.added", {"type": "response.content_part.added", "output_index": 0, "content_index": 0, "part": {"type": "output_text", "text": ""}})
output_items.append({"text": True})
message_started = True
flush_event("response.output_text.delta", {"type": "response.output_text.delta", "output_index": 0, "content_index": 0, "delta": text_delta})
elif part.get("functionCall"):
fc = part["functionCall"]
call_id = f"call_{uuid.uuid4().hex[:24]}"
args_str = json.dumps(fc.get("args", fc.get("arguments", {})))
output_index = len(output_items)
flush_event("response.output_item.added", {"type": "response.output_item.added", "output_index": output_index, "item": {"type": "function_call", "id": call_id, "call_id": call_id, "name": fc.get("name", ""), "arguments": ""}})
flush_event("response.function_call_arguments.delta", {"type": "response.function_call_arguments.delta", "output_index": output_index, "item_id": call_id, "delta": args_str})
flush_event("response.function_call_arguments.done", {"type": "response.function_call_arguments.done", "output_index": output_index, "item_id": call_id, "arguments": args_str})
current_tool_calls[call_id] = fc
output_items.append({"tool": True})
last_finish = candidates[0].get("finishReason", "")
if last_finish:
part_kinds = []
for p in parts:
if "text" in p: part_kinds.append("text")
if "functionCall" in p: part_kinds.append("functionCall")
if _extract_gemini_sig(p): part_kinds.append("thoughtSignature")
print(f"[{self._session_id}] [antigravity] finish={last_finish} parts={part_kinds} tool_calls={len(current_tool_calls)}", file=sys.stderr)
if OAUTH_PROVIDER == "google-antigravity" and last_finish == "MAX_TOKENS" and full_text and not current_tool_calls:
print(f"[{self._session_id}] MAX_TOKENS hit ({len(full_text)} chars), auto-continuing...", file=sys.stderr)
break
stream_finished = True
last_finish = ""
try:
for raw_line in _stream_with_idle_timeout(upstream, _idle_timeout_for_model(model)):
if tracker and tracker.cancelled.is_set():
print("[gemini-oauth] stream cancelled", file=sys.stderr)
break
if stream_finished:
break
line = raw_line.decode(errors="replace")
if line.startswith("data: "):
buf += line[6:]
continue
if not line.strip() and buf:
try:
chunk = json.loads(buf)
except Exception:
buf = ""
continue
buf = ""
candidates = chunk.get("response", chunk).get("candidates", [])
if not candidates:
if chunk.get("error"):
print(f"[{self._session_id}] stream error chunk: {str(chunk.get('error'))[:300]}", file=sys.stderr)
continue
if candidates[0].get("finishReason") and not candidates[0].get("content", {}).get("parts"):
print(f"[{self._session_id}] finish without parts: {candidates[0].get('finishReason')}", file=sys.stderr)
parts = candidates[0].get("content", {}).get("parts", [])
for part in parts:
sig = _extract_gemini_sig(part)
if sig:
if part.get("functionCall"):
fc_id = part["functionCall"].get("id") or part["functionCall"].get("name")
fc_name = part["functionCall"].get("name")
if fc_id:
_gemini_store_sig(f"fc:{fc_id}", sig)
if fc_name:
_gemini_store_sig(f"fc:{fc_name}", sig)
_gemini_store_sig(f"turn:{resp_id}", sig)
if part.get("thought"):
sig_from_thought = _extract_gemini_sig(part)
if sig_from_thought:
_gemini_store_sig(f"turn:{resp_id}", sig_from_thought)
continue
if "text" in part and not part.get("functionCall"):
text_delta = part["text"]
if not text_delta:
continue
full_text += text_delta
if not message_started:
flush_event("response.output_item.added", {"type": "response.output_item.added", "output_index": 0, "item": {"type": "message", "id": message_id, "role": "assistant", "content": []}})
flush_event("response.content_part.added", {"type": "response.content_part.added", "output_index": 0, "content_index": 0, "part": {"type": "output_text", "text": ""}})
output_items.append({"text": True})
message_started = True
flush_event("response.output_text.delta", {"type": "response.output_text.delta", "output_index": 0, "content_index": 0, "delta": text_delta})
elif part.get("functionCall"):
fc = part["functionCall"]
call_id = f"call_{uuid.uuid4().hex[:24]}"
args_str = json.dumps(fc.get("args", fc.get("arguments", {})))
output_index = len(output_items)
flush_event("response.output_item.added", {"type": "response.output_item.added", "output_index": output_index, "item": {"type": "function_call", "id": call_id, "call_id": call_id, "name": fc.get("name", ""), "arguments": ""}})
flush_event("response.function_call_arguments.delta", {"type": "response.function_call_arguments.delta", "output_index": output_index, "item_id": call_id, "delta": args_str})
flush_event("response.function_call_arguments.done", {"type": "response.function_call_arguments.done", "output_index": output_index, "item_id": call_id, "arguments": args_str})
current_tool_calls[call_id] = fc
output_items.append({"tool": True})
last_finish = candidates[0].get("finishReason", "")
if last_finish:
part_kinds = []
for p in parts:
if "text" in p: part_kinds.append("text")
if "functionCall" in p: part_kinds.append("functionCall")
if _extract_gemini_sig(p): part_kinds.append("thoughtSignature")
print(f"[{self._session_id}] [antigravity] finish={last_finish} parts={part_kinds} tool_calls={len(current_tool_calls)}", file=sys.stderr)
if OAUTH_PROVIDER == "google-antigravity" and last_finish == "MAX_TOKENS" and full_text and not current_tool_calls:
print(f"[{self._session_id}] MAX_TOKENS hit ({len(full_text)} chars), auto-continuing...", file=sys.stderr)
break
stream_finished = True
break
else:
if line.strip():
buf += line
except TimeoutError as te:
print(f"[{self._session_id}] [antigravity-v2] STREAM TIMEOUT: {te}", file=sys.stderr)
_log_resp(resp_id, "stream_timeout", [{"type": "error", "code": "stream_timeout", "message": str(te)}])
try:
flush_event("response.failed", {"type": "response.failed", "response": {"id": resp_id, "object": "response", "status": "failed", "error": {"type": "stream_timeout", "message": str(te)[:200]}}})
except Exception:
pass
self.close_connection = True
return
except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError):
print(f"[{self._session_id}] [antigravity-v2] client disconnected during stream", file=sys.stderr)
_log_resp(resp_id, "client_disconnect", [])
return
if OAUTH_PROVIDER.startswith("google") and full_text and not current_tool_calls and last_finish == "MAX_TOKENS" and not stream_finished:
result = _auto_continue_gemini(self, flush_event, message_id, model, gen_config, gemini_tools, system_parts, project_id, headers, endpoints, url_suffix, full_text, output_items, message_started)
@@ -8430,12 +8454,13 @@ class Handler(http.server.BaseHTTPRequestHandler):
def _send_ag_finalize(self, text, stream=False, is_responses_api=True):
sid = getattr(self, '_session_id', 'fin')
print(f"[{sid}] [antigravity-finalize] Sending finalize response: {text[:80]}...", file=sys.stderr)
_log_resp(f"finalize-{sid}", "finalized", [{"type": "message", "content": [{"text": text}]}])
print(f"[{sid}] [antigravity-finalize] Sending finalize-as-failed: {text[:80]}...", file=sys.stderr)
_log_resp(f"finalize-{sid}", "failed", [{"type": "error", "code": "rate_limit_error", "message": text}])
resp_id = f"resp_{uuid.uuid4().hex[:12]}"
msg_id = f"msg_{uuid.uuid4().hex[:12]}"
output_obj = [{"type": "message", "id": msg_id, "role": "assistant",
"content": [{"type": "output_text", "text": text}]}]
error_output = [{"type": "error", "code": "rate_limit_error", "message": text}]
text_output = [{"type": "message", "id": msg_id, "role": "assistant",
"content": [{"type": "output_text", "text": text}]}]
if stream:
events = [
f"event: response.created\ndata: {json.dumps({'type':'response.created','response':{'id':resp_id,'object':'response','status':'in_progress'}})}\n\n",
@@ -8445,7 +8470,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
f"event: response.output_text.done\ndata: {json.dumps({'type':'response.output_text.done','output_index':0,'content_index':0,'text':text})}\n\n",
f"event: response.content_part.done\ndata: {json.dumps({'type':'response.content_part.done','output_index':0,'content_index':0,'part':{'type':'output_text','text':text}})}\n\n",
f"event: response.output_item.done\ndata: {json.dumps({'type':'response.output_item.done','output_index':0,'item':{'type':'message','id':msg_id,'role':'assistant','content':[{'type':'output_text','text':text}]}})}\n\n",
f"event: response.completed\ndata: {json.dumps({'type':'response.completed','response':{'id':resp_id,'object':'response','status':'completed','output':output_obj}})}\n\n",
f"event: response.failed\ndata: {json.dumps({'type':'response.failed','response':{'id':resp_id,'object':'response','status':'failed','output':error_output}})}\n\n",
]
self.send_response(200)
self.send_header("Content-Type", "text/event-stream")
@@ -8456,8 +8481,9 @@ class Handler(http.server.BaseHTTPRequestHandler):
self.wfile.write(evt.encode())
self.wfile.flush()
else:
self.send_json(200, {"id": resp_id, "object": "response", "status": "completed",
"output": output_obj, "model": "gemini-3-flash"})
self.send_json(200, {"id": resp_id, "object": "response", "status": "failed",
"output": error_output + text_output, "model": "gemini-3-flash",
"error": {"type": "rate_limit_error", "message": text}})
return None
def stream_buffered_events(self, event_iter, flush_interval=0.03, max_bytes=4096, on_event=None):