v3.11.0: merge cobra PR, smart-continue, hot-reload, XML extraction
- Merge PR #5 from cobra91: concurrency semaphore, auto-continue, SO_REUSEADDR, proxy-stderr.log, stream diagnostics, timeout handler, restart proxy fix - Tool call argument normalizer, smart-continue loop, XML extraction - API key hot-reload with mtime tracking + /admin/ endpoints - GUI hot-reload on endpoint edit with upstream verification - Synthetic tool-results disabled (caused deepseek-v4-pro truncation) - Version bump 3.10.12 -> 3.11.0, rebuild .deb
This commit is contained in:
@@ -323,6 +323,8 @@ _conn_pool_lock = threading.Lock()
|
||||
_conn_pool = {}
|
||||
|
||||
_STREAM_IDLE_TIMEOUT = 300
|
||||
_MAX_CONCURRENT_REQUESTS = 3
|
||||
_request_semaphore = threading.Semaphore(_MAX_CONCURRENT_REQUESTS)
|
||||
|
||||
_CODEBUFF_AUTH_URL = "https://www.codebuff.com"
|
||||
_CODEBUFF_API_URL = "https://www.codebuff.com"
|
||||
@@ -4829,6 +4831,11 @@ class Handler(http.server.BaseHTTPRequestHandler):
|
||||
_last_user_urls.append(url_m.group(0))
|
||||
save_request_snapshot(request_id, body)
|
||||
_req_t0 = time.time()
|
||||
wait_start = time.monotonic()
|
||||
_request_semaphore.acquire()
|
||||
wait_ms = (time.monotonic() - wait_start) * 1000
|
||||
if wait_ms > 100:
|
||||
print(f"[{_sid}] waited {wait_ms:.0f}ms for upstream slot (concurrency gate)", file=sys.stderr)
|
||||
try:
|
||||
with RequestTracker(request_id) as tracker:
|
||||
if BACKEND == "auto":
|
||||
@@ -4847,6 +4854,8 @@ class Handler(http.server.BaseHTTPRequestHandler):
|
||||
except Exception as _snap_err:
|
||||
update_snapshot_response(request_id, "error", time.time() - _req_t0, _snap_err)
|
||||
raise
|
||||
finally:
|
||||
_request_semaphore.release()
|
||||
|
||||
def _handle_openai_compat(self, body, model, stream, tracker=None):
|
||||
input_data = body.get("input", "")
|
||||
@@ -4859,7 +4868,8 @@ class Handler(http.server.BaseHTTPRequestHandler):
|
||||
body = dict(body)
|
||||
body["input"] = input_data
|
||||
|
||||
if (policy.get("synthetic_tool_results") or _provider_cap(model, "synthetic_tool_results", False)) and isinstance(input_data, list):
|
||||
# synthetic tool-results disabled: causes deepseek-v4-pro truncation on opencode.ai
|
||||
if False and (policy.get("synthetic_tool_results") or _provider_cap(model, "synthetic_tool_results", False)) and isinstance(input_data, list):
|
||||
input_data, synthesized = synthesize_tool_results_for_chat(input_data)
|
||||
if synthesized:
|
||||
print("[provider-adapter] using synthetic tool-result continuation", file=sys.stderr)
|
||||
@@ -5739,11 +5749,25 @@ class Handler(http.server.BaseHTTPRequestHandler):
|
||||
break
|
||||
collected_events.append(event)
|
||||
_observe_event(event)
|
||||
print(f"[{self._session_id}] stream ended: events={len(collected_events)} finish={finish_reason} has_content={has_content} elapsed={time.time()-t0:.1f}s", file=sys.stderr)
|
||||
except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError):
|
||||
print("[translate-proxy] client disconnected during stream", file=sys.stderr)
|
||||
_crof_record(model, n_items, False)
|
||||
_log_resp(last_resp_id, "client_disconnect", last_output)
|
||||
return
|
||||
except (TimeoutError, OSError, urllib.error.URLError) as e:
|
||||
print(f"[translate-proxy] upstream error during stream: {type(e).__name__}: {e}", file=sys.stderr)
|
||||
err_resp_id = body.get("request_id") or body.get("id") or uid("resp")
|
||||
try:
|
||||
self.wfile.write(emit("response.failed", {"type": "response.failed",
|
||||
"response": {"id": err_resp_id, "error": {"type": "upstream_error",
|
||||
"code": "stream_interrupted", "message": str(e)[:200]}}}).encode())
|
||||
self.wfile.flush()
|
||||
except Exception:
|
||||
pass
|
||||
_crof_record(model, n_items, False)
|
||||
_log_resp(last_resp_id, "upstream_error", last_output)
|
||||
return
|
||||
|
||||
# Record outcome
|
||||
success = (finish_reason != "length")
|
||||
@@ -5819,43 +5843,160 @@ class Handler(http.server.BaseHTTPRequestHandler):
|
||||
except Exception as e:
|
||||
print(f"[crof-adaptive] retry failed: {e}", file=sys.stderr)
|
||||
|
||||
# ── Auto-continue for truncated responses ── (cobra PR)
|
||||
_ac_did_run = False
|
||||
if stream and collected_events:
|
||||
_ac_text = ""
|
||||
_ac_msg_id = _ac_resp_id = None
|
||||
for _ev in collected_events:
|
||||
for _ln in _ev.strip().split("\n"):
|
||||
if not _ln.startswith("data: "):
|
||||
continue
|
||||
try:
|
||||
_d = json.loads(_ln[6:])
|
||||
_t = _d.get("type")
|
||||
if _t == "response.output_text.done":
|
||||
_ac_text = _d.get("text", "")
|
||||
elif _t == "response.output_item.added" and _d.get("item",{}).get("type") == "message":
|
||||
_ac_msg_id = _d.get("item",{}).get("id")
|
||||
elif _t == "response.completed":
|
||||
_ac_resp_id = _d.get("response",{}).get("id")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
_ac_tc = reasoning_out.get("tool_calls", [])
|
||||
_ac_truncated = False
|
||||
if not _ac_tc and _ac_text:
|
||||
_ac_stripped = _ac_text.rstrip()
|
||||
if finish_reason == "length":
|
||||
_ac_truncated = True
|
||||
elif len(_ac_stripped) > 10 and _ac_stripped[-1] in "(:,;…":
|
||||
_ac_truncated = True
|
||||
|
||||
if _ac_truncated and _ac_text:
|
||||
print(f"[{self._session_id}] auto-continue: truncated (finish={finish_reason}, ends '{_ac_text.rstrip()[-10:]}')", file=sys.stderr)
|
||||
_ac_did_run = True
|
||||
_ac_cut = len(collected_events)
|
||||
for _i, _ev2 in enumerate(collected_events):
|
||||
if "response.output_text.done" in _ev2:
|
||||
_ac_cut = _i
|
||||
break
|
||||
collected_events = collected_events[:_ac_cut]
|
||||
|
||||
_ac_accumulated = _ac_text
|
||||
_ac_max = 3
|
||||
for _ac_attempt in range(_ac_max):
|
||||
try:
|
||||
_ac_cont_msgs = list(chat_body.get("messages", []))
|
||||
_ac_cont_msgs.append({"role": "assistant", "content": _ac_accumulated})
|
||||
_ac_cont_msgs.append({"role": "user", "content": "Continue exactly where you left off. Do not repeat anything already written."})
|
||||
_ac_cont_body = dict(chat_body)
|
||||
_ac_cont_body["messages"] = _ac_cont_msgs
|
||||
_ac_cont_body["stream"] = False
|
||||
_ac_cont_req = urllib.request.Request(target, data=json.dumps(_ac_cont_body).encode(), headers=fwd)
|
||||
_ac_cont_resp = json.loads(urllib.request.urlopen(_ac_cont_req, timeout=120).read())
|
||||
_ac_choices = _ac_cont_resp.get("choices", [])
|
||||
if _ac_choices:
|
||||
_ac_chunk = _ac_choices[0].get("message",{}).get("content","")
|
||||
if not _ac_chunk:
|
||||
_ac_chunk = _ac_choices[0].get("delta",{}).get("content","")
|
||||
_ac_finish = _ac_choices[0].get("finish_reason")
|
||||
if _ac_chunk:
|
||||
_ac_accumulated += _ac_chunk
|
||||
collected_events.append(emit("response.output_text.delta", {
|
||||
"type": "response.output_text.delta",
|
||||
"delta": _ac_chunk, "item_id": _ac_msg_id, "content_index": 0}))
|
||||
if _ac_finish != "length":
|
||||
break
|
||||
_ac_text = _ac_accumulated
|
||||
except Exception as _ac_e:
|
||||
print(f"[{self._session_id}] auto-continue attempt {_ac_attempt+1} failed: {_ac_e}", file=sys.stderr)
|
||||
break
|
||||
|
||||
if _ac_msg_id:
|
||||
collected_events.append(emit("response.output_text.done", {
|
||||
"type": "response.output_text.done",
|
||||
"text": _ac_accumulated, "item_id": _ac_msg_id, "content_index": 0}))
|
||||
collected_events.append(emit("response.content_part.done", {
|
||||
"type": "response.content_part.done",
|
||||
"part": {"type": "output_text", "text": _ac_accumulated, "annotations": []}, "item_id": _ac_msg_id}))
|
||||
collected_events.append(emit("response.output_item.done", {
|
||||
"type": "response.output_item.done",
|
||||
"item": {"type": "message", "id": _ac_msg_id, "role": "assistant", "status": "completed",
|
||||
"content": [{"type": "output_text", "text": _ac_accumulated, "annotations": []}]}}))
|
||||
if _ac_resp_id:
|
||||
collected_events.append(emit("response.completed", {
|
||||
"type": "response.completed",
|
||||
"response": {"id": _ac_resp_id, "object": "response", "model": model,
|
||||
"status": "completed", "created": int(time.time()),
|
||||
"output": [{"type": "message", "id": _ac_msg_id, "role": "assistant",
|
||||
"status": "completed",
|
||||
"content": [{"type": "output_text", "text": _ac_accumulated, "annotations": []}]}]}}))
|
||||
has_content = True
|
||||
finish_reason = "stop"
|
||||
print(f"[{self._session_id}] auto-continue done: {len(_ac_text)} -> {len(_ac_accumulated)} chars", file=sys.stderr)
|
||||
|
||||
# Smart continuation: loop with escalating nudges when model stops text-only mid-task.
|
||||
_smart_max = 2
|
||||
_smart_attempt = 0
|
||||
while _smart_attempt < _smart_max:
|
||||
_has_tool_calls_in_output = any(o.get("type") == "function_call" for o in (last_output or []))
|
||||
if not (finish_reason == "stop" and has_content and not _has_tool_calls_in_output
|
||||
and isinstance(input_data, list) and len(input_data) >= 3
|
||||
and has_function_call_output(input_data)):
|
||||
break
|
||||
_smart_attempt += 1
|
||||
_nudges = [
|
||||
"Continue with the task using tool calls. Do NOT describe what to do — call the appropriate functions.",
|
||||
"You MUST use tool calls to complete the task. Read files, run commands, and make changes using tools. Do NOT output XML tool calls as text.",
|
||||
]
|
||||
nudge_text = _nudges[min(_smart_attempt - 1, len(_nudges) - 1)]
|
||||
# Try extracting XML tool calls from text as fallback before nudging
|
||||
last_text = ""
|
||||
for o in (last_output or []):
|
||||
if o.get("type") == "message":
|
||||
for c in (o.get("content") or []):
|
||||
if isinstance(c, dict) and c.get("type") == "output_text":
|
||||
last_text += c.get("text", "")
|
||||
xml_fc = _extract_xml_tool_calls(last_text)
|
||||
if xml_fc:
|
||||
print(f"[{self._session_id}] [smart-continue] extracted {len(xml_fc)} XML tool calls from text, injecting and retrying", file=sys.stderr)
|
||||
fake_input = list(input_data)
|
||||
for xfc in xml_fc:
|
||||
fake_input.append({"type": "function_call", "id": uid("fcx"), "call_id": uid("fcx"),
|
||||
"name": xfc["name"], "arguments": xfc["args"], "status": "completed"})
|
||||
fake_messages = oa_input_to_messages(fake_input)
|
||||
# Skip if auto-continue already handled the response.
|
||||
if not _ac_did_run:
|
||||
_smart_max = 2
|
||||
_smart_attempt = 0
|
||||
while _smart_attempt < _smart_max:
|
||||
_has_tool_calls_in_output = any(o.get("type") == "function_call" for o in (last_output or []))
|
||||
if not (finish_reason == "stop" and has_content and not _has_tool_calls_in_output
|
||||
and isinstance(input_data, list) and len(input_data) >= 3
|
||||
and has_function_call_output(input_data)):
|
||||
break
|
||||
_smart_attempt += 1
|
||||
_nudges = [
|
||||
"Continue with the task using tool calls. Do NOT describe what to do — call the appropriate functions.",
|
||||
"You MUST use tool calls to complete the task. Read files, run commands, and make changes using tools. Do NOT output XML tool calls as text.",
|
||||
]
|
||||
nudge_text = _nudges[min(_smart_attempt - 1, len(_nudges) - 1)]
|
||||
# Try extracting XML tool calls from text as fallback before nudging
|
||||
last_text = ""
|
||||
for o in (last_output or []):
|
||||
if o.get("type") == "message":
|
||||
for c in (o.get("content") or []):
|
||||
if isinstance(c, dict) and c.get("type") == "output_text":
|
||||
last_text += c.get("text", "")
|
||||
xml_fc = _extract_xml_tool_calls(last_text)
|
||||
if xml_fc:
|
||||
print(f"[{self._session_id}] [smart-continue] extracted {len(xml_fc)} XML tool calls from text, injecting and retrying", file=sys.stderr)
|
||||
fake_input = list(input_data)
|
||||
for xfc in xml_fc:
|
||||
fake_input.append({"type": "function_call", "id": uid("fcx"), "call_id": uid("fcx"),
|
||||
"name": xfc["name"], "arguments": xfc["args"], "status": "completed"})
|
||||
fake_messages = oa_input_to_messages(fake_input)
|
||||
instructions = body.get("instructions", "").strip()
|
||||
if instructions:
|
||||
fake_messages.insert(0, {"role": "system", "content": instructions})
|
||||
fake_chat_body = self._build_chat_body(model, fake_messages, body, stream)
|
||||
fake_req = urllib.request.Request(target, data=json.dumps(fake_chat_body).encode(), headers=fwd)
|
||||
try:
|
||||
retry_upstream = urllib.request.urlopen(fake_req, timeout=_upstream_timeout(body, True))
|
||||
collected_events = []
|
||||
last_resp_id = last_output = last_status = None
|
||||
finish_reason = None
|
||||
has_content = False
|
||||
for event in oa_stream_to_sse(retry_upstream, model, body.get("request_id") or body.get("id")):
|
||||
collected_events.append(event)
|
||||
_observe_event(event)
|
||||
input_data = fake_input
|
||||
continue
|
||||
except Exception as e:
|
||||
print(f"[{self._session_id}] [smart-continue] XML injection retry failed: {e}", file=sys.stderr)
|
||||
break
|
||||
_nudge_msg = {"role": "user", "content": nudge_text}
|
||||
nudge_messages = oa_input_to_messages(input_data) + [_nudge_msg]
|
||||
instructions = body.get("instructions", "").strip()
|
||||
if instructions:
|
||||
fake_messages.insert(0, {"role": "system", "content": instructions})
|
||||
fake_chat_body = self._build_chat_body(model, fake_messages, body, stream)
|
||||
fake_req = urllib.request.Request(target, data=json.dumps(fake_chat_body).encode(), headers=fwd)
|
||||
nudge_messages.insert(0, {"role": "system", "content": instructions})
|
||||
nudge_chat_body = self._build_chat_body(model, nudge_messages, body, stream)
|
||||
nudge_req = urllib.request.Request(target, data=json.dumps(nudge_chat_body).encode(), headers=fwd)
|
||||
print(f"[{self._session_id}] [smart-continue] attempt {_smart_attempt}/{_smart_max}: model stopped mid-task, nudging", file=sys.stderr)
|
||||
try:
|
||||
retry_upstream = urllib.request.urlopen(fake_req, timeout=_upstream_timeout(body, True))
|
||||
retry_upstream = urllib.request.urlopen(nudge_req, timeout=_upstream_timeout(body, True))
|
||||
collected_events = []
|
||||
last_resp_id = last_output = last_status = None
|
||||
finish_reason = None
|
||||
@@ -5863,31 +6004,9 @@ class Handler(http.server.BaseHTTPRequestHandler):
|
||||
for event in oa_stream_to_sse(retry_upstream, model, body.get("request_id") or body.get("id")):
|
||||
collected_events.append(event)
|
||||
_observe_event(event)
|
||||
input_data = fake_input
|
||||
continue
|
||||
except Exception as e:
|
||||
print(f"[{self._session_id}] [smart-continue] XML injection retry failed: {e}", file=sys.stderr)
|
||||
print(f"[{self._session_id}] [smart-continue] nudge attempt {_smart_attempt} failed: {e}", file=sys.stderr)
|
||||
break
|
||||
_nudge_msg = {"role": "user", "content": nudge_text}
|
||||
nudge_messages = oa_input_to_messages(input_data) + [_nudge_msg]
|
||||
instructions = body.get("instructions", "").strip()
|
||||
if instructions:
|
||||
nudge_messages.insert(0, {"role": "system", "content": instructions})
|
||||
nudge_chat_body = self._build_chat_body(model, nudge_messages, body, stream)
|
||||
nudge_req = urllib.request.Request(target, data=json.dumps(nudge_chat_body).encode(), headers=fwd)
|
||||
print(f"[{self._session_id}] [smart-continue] attempt {_smart_attempt}/{_smart_max}: model stopped mid-task, nudging", file=sys.stderr)
|
||||
try:
|
||||
retry_upstream = urllib.request.urlopen(nudge_req, timeout=_upstream_timeout(body, True))
|
||||
collected_events = []
|
||||
last_resp_id = last_output = last_status = None
|
||||
finish_reason = None
|
||||
has_content = False
|
||||
for event in oa_stream_to_sse(retry_upstream, model, body.get("request_id") or body.get("id")):
|
||||
collected_events.append(event)
|
||||
_observe_event(event)
|
||||
except Exception as e:
|
||||
print(f"[{self._session_id}] [smart-continue] nudge attempt {_smart_attempt} failed: {e}", file=sys.stderr)
|
||||
break
|
||||
|
||||
self.stream_buffered_events(collected_events)
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user