Files
Codex-Launcher---Any-AI-Por…/src/translate-proxy.py

860 lines
38 KiB
Python
Executable File

#!/usr/bin/env python3
"""
translate-proxy.py — Responses API → backend API translation proxy.
Backends:
openai-compat — any OpenAI-compatible Chat Completions API
anthropic — Anthropic Messages API
command-code — Command Code generate API (POST <target-url>/alpha/generate)
Usage:
python3 translate-proxy.py --config proxy-config.json
python3 translate-proxy.py --backend openai-compat --target-url https://... --api-key sk-...
"""
import json, http.server, urllib.request, time, uuid, os, sys, argparse
# ═══════════════════════════════════════════════════════════════════
# Config
# ═══════════════════════════════════════════════════════════════════
# Fallback model catalogue, keyed by backend type. An entry is used only when
# neither the config file nor --models-file supplies a model list.
DEFAULT_MODELS = {
    "openai-compat": [
        dict(id="gpt-4o-mini", object="model", created=1700000000, owned_by="custom"),
    ],
    "anthropic": [
        dict(id="claude-sonnet-4-20250514", object="model", created=1700000000,
             owned_by="anthropic"),
    ],
}
def load_config(argv=None):
    """Build the proxy configuration from CLI flags, a JSON file, and the environment.

    Precedence per setting (highest first): command-line flag, JSON config
    file, environment variable (``PROXY_*`` first, then the legacy ``ZAI_*``
    name where one exists), built-in default.

    Args:
        argv: Optional list of command-line arguments. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``, which is the original
            behaviour; passing a list allows programmatic use.

    Returns:
        dict with at least ``port``, ``backend_type``, ``target_url``,
        ``api_key`` and ``models``; any extra keys from the JSON config file
        are kept as-is.

    Raises:
        OSError: a given config or models file cannot be opened.
        json.JSONDecodeError: a given config or models file is not valid JSON.
        ValueError: the port environment variable is not an integer.
    """
    p = argparse.ArgumentParser(description="Responses API translation proxy")
    p.add_argument("--config", help="JSON config file path")
    p.add_argument("--port", type=int, default=None)
    p.add_argument("--backend", default=None, choices=["openai-compat", "anthropic", "command-code"])
    p.add_argument("--target-url", default=None)
    p.add_argument("--api-key", default=None)
    p.add_argument("--models-file", default=None, help="JSON file with model list array")
    args = p.parse_args(argv)

    cfg = {}
    if args.config:
        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        with open(args.config, encoding="utf-8") as f:
            cfg = json.load(f)

    # Explicit CLI flags override whatever the config file said.
    for cfg_key, arg_name in [("port", "port"), ("backend_type", "backend"),
                              ("target_url", "target_url"), ("api_key", "api_key")]:
        value = getattr(args, arg_name, None)
        if value is not None:
            cfg[cfg_key] = value

    # Environment variables only fill settings that are still missing.
    env_map = {
        "port": ("PROXY_PORT", "ZAI_PROXY_PORT", int),
        "backend_type": ("PROXY_BACKEND", None, str),
        "target_url": ("PROXY_TARGET_URL", "ZAI_BASE_URL", str),
        "api_key": ("PROXY_API_KEY", "ZAI_API_KEY", str),
    }
    for cfg_key, (primary, legacy, conv) in env_map.items():
        if cfg_key in cfg:
            continue
        raw = os.environ.get(primary) or (os.environ.get(legacy) if legacy else None)
        if raw:
            cfg[cfg_key] = conv(raw)

    cfg.setdefault("port", 8080)
    cfg.setdefault("backend_type", "openai-compat")
    cfg.setdefault("target_url", "http://localhost:11434/v1")
    cfg.setdefault("api_key", "")

    # Model list: config file first, then --models-file, then per-backend defaults.
    models = cfg.get("models", [])
    if not models and args.models_file:
        with open(args.models_file, encoding="utf-8") as f:
            models = json.load(f)
    if not models:
        models = DEFAULT_MODELS.get(cfg["backend_type"], [])
    cfg["models"] = models
    return cfg
# Configuration is resolved once, at import time. load_config() parses the
# process command line, so importing this module has that side effect.
CONFIG = load_config()
PORT = CONFIG["port"]
BACKEND = CONFIG["backend_type"]
# Trailing slashes are trimmed so endpoint suffixes can be appended directly.
TARGET_URL = CONFIG["target_url"].rstrip("/")
API_KEY = CONFIG["api_key"]
MODELS = CONFIG["models"]
# load_config() defines no CLI flag or environment variable for this key, so
# it is only set when the JSON config file contains "cc_version".
CC_VERSION = CONFIG.get("cc_version", "")
# ═══════════════════════════════════════════════════════════════════
# Shared helpers
# ═══════════════════════════════════════════════════════════════════
# Random 8-hex-digit tag generated once per process; uid() embeds it in every
# identifier it produces.
_pool = uuid.uuid4().hex[:8]
# Lower-cased request header names that forwarded_headers() never copies to the
# upstream request. Besides the connection-scoped headers this also lists
# "host" and "content-length".
_HOP_BY_HOP_HEADERS = {
    "connection",
    "keep-alive",
    "proxy-authenticate",
    "proxy-authorization",
    "te",
    "trailers",
    "transfer-encoding",
    "upgrade",
    "host",
    "content-length",
}
def uid(prefix="id"):
    """Return a fresh identifier shaped ``<prefix>-<process tag>-<12 hex digits>``."""
    random_part = uuid.uuid4().hex[:12]
    return "{}-{}-{}".format(prefix, _pool, random_part)
def emit(event, data):
    """Serialise one server-sent event: an ``event:`` line, a JSON ``data:`` line, a blank line."""
    payload = json.dumps(data)
    return "event: {}\ndata: {}\n\n".format(event, payload)
def upstream_target(base_url, suffix):
    """Join *base_url* and *suffix*, unless the base already ends with that suffix.

    Trailing slashes on *base_url* are removed first, so a base that already
    names the full endpoint is returned unchanged (minus trailing slashes).
    """
    trimmed = base_url.rstrip("/")
    return trimmed if trimmed.endswith(suffix) else trimmed + suffix
# Browser-like request headers. forwarded_headers() seeds the outgoing header
# set with these when called with browser_ua=True; the client's own headers and
# the caller's "extra" headers are applied on top afterwards.
_BROWSER_HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
    "Accept": "application/json, text/event-stream, */*",
    "Accept-Language": "en-US,en;q=0.9",
    "Sec-Ch-Ua": '"Chromium";v="137", "Not/A)Brand";v="99"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": '"Linux"',
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
}
def forwarded_headers(request_headers, extra=None, browser_ua=False):
    """Build the header dict for the upstream request.

    Args:
        request_headers: Mapping-like object with ``items()`` holding the
            incoming client headers.
        extra: Optional dict applied last, so it overrides anything copied
            from the client.
        browser_ua: When true, start from ``_BROWSER_HEADERS`` and ignore the
            client's own ``User-Agent``.

    Returns:
        A new dict of header name -> value.
    """
    # Never relay the client's Accept-Encoding: the proxy reads the upstream
    # body with urllib, which does not decompress, so a gzip/br reply would
    # break both the JSON decode and the SSE line parser.
    skipped = set(_HOP_BY_HOP_HEADERS) | {"accept-encoding"}
    if browser_ua:
        skipped.add("user-agent")

    headers = dict(_BROWSER_HEADERS) if browser_ua else {}
    for key, value in request_headers.items():
        if key.lower() in skipped:
            continue
        headers[key] = value
    if extra:
        headers.update(extra)
    return headers
# ═══════════════════════════════════════════════════════════════════
# OpenAI-compat backend
# ═══════════════════════════════════════════════════════════════════
def oa_input_to_messages(input_data):
    """Translate a Responses API ``input`` value into Chat Completions messages.

    Args:
        input_data: Either a plain string (one user message) or a list of
            Responses input items. Handled item types: ``message`` (also
            items that carry a ``role`` but no ``type``), ``function_call``
            and ``function_call_output``. Other items are ignored.

    Returns:
        list of Chat Completions message dicts. Consecutive ``function_call``
        items are grouped into a single assistant message with ``tool_calls``.
        Any other input type yields an empty list.
    """
    if isinstance(input_data, str):
        return [{"role": "user", "content": input_data}]
    if not isinstance(input_data, list):
        return []

    msgs = []
    pending_tool_calls = []
    for item in input_data:
        if not isinstance(item, dict):
            continue
        t = item.get("type")
        if t is None and "role" in item:
            # Shorthand message form: {"role": ..., "content": ...} without "type".
            t = "message"

        if t == "function_call":
            # uid() is only a last resort; evaluate it lazily.
            call_id = item.get("call_id") or item.get("id") or uid("tc")
            pending_tool_calls.append(
                {"id": call_id,
                 "type": "function",
                 "function": {"name": item.get("name", ""),
                              "arguments": item.get("arguments", "{}")}})
            continue

        # Any non-call item closes the current run of tool calls.
        if pending_tool_calls:
            msgs.append({"role": "assistant", "content": None, "tool_calls": pending_tool_calls})
            pending_tool_calls = []

        if t == "message":
            role = item.get("role", "user")
            if role == "developer":
                role = "system"
            content = item.get("content", [])
            if isinstance(content, str):
                msgs.append({"role": role, "content": content})
                continue
            parts = []
            has_image = False
            for part in content or []:
                if not isinstance(part, dict):
                    continue
                pt = part.get("type", "")
                if pt in ("input_text", "output_text"):
                    parts.append({"type": "text", "text": part.get("text") or ""})
                elif pt == "input_image":
                    has_image = True
                    img = part.get("image_url", part)
                    if isinstance(img, str):
                        # Responses carries the URL as a bare string; Chat
                        # Completions expects an object with a "url" field.
                        img = {"url": img}
                        if part.get("detail"):
                            img["detail"] = part["detail"]
                    parts.append({"type": "image_url", "image_url": img})
            if has_image:
                # Keep every part, in order, instead of stopping at the first image.
                msgs.append({"role": role, "content": parts})
            else:
                msgs.append({"role": role, "content": "".join(p["text"] for p in parts)})
        elif t == "function_call_output":
            # The output refers to its call through "call_id"; "id" is only a fallback.
            msgs.append({"role": "tool",
                         "tool_call_id": item.get("call_id") or item.get("id", ""),
                         "content": item.get("output", "")})

    if pending_tool_calls:
        msgs.append({"role": "assistant", "content": None, "tool_calls": pending_tool_calls})
    return msgs
def oa_convert_tools(tools):
    """Convert Responses-style tool definitions to Chat Completions tool definitions.

    Only ``type == "function"`` tools are kept. A tool that already has a
    non-empty nested ``function`` object is passed through untouched; a flat
    definition is wrapped into the nested form.

    Returns:
        list of tool dicts, or ``None`` when nothing usable remains.
    """
    if not tools:
        return None

    def to_chat_tool(tool):
        if tool.get("function", {}):
            return tool
        return {
            "type": "function",
            "function": {"name": tool.get("name", ""), "description": tool.get("description", ""),
                         "parameters": tool.get("parameters", {})}
        }

    converted = [to_chat_tool(tool) for tool in tools if tool.get("type") == "function"]
    return converted if converted else None
def oa_resp_to_responses(chat_resp, model, resp_id=None):
    """Convert a non-streaming Chat Completions reply into a Responses API object.

    Args:
        chat_resp: Decoded Chat Completions JSON body.
        model: Model name to report in the result.
        resp_id: Response id to use; a fresh one is generated when falsy.

    Returns:
        dict shaped like a Responses API ``response`` object, with optional
        ``reasoning``, ``message`` and ``function_call`` output items (in that
        order) and a ``usage`` block.

    Raises:
        KeyError, IndexError: ``chat_resp`` has no ``choices[0].message``.
    """
    choice = chat_resp["choices"][0]
    msg = choice["message"]
    content = msg.get("content") or ""
    finish = choice.get("finish_reason", "stop")
    fm = {"stop": "completed", "length": "incomplete", "tool_calls": "completed", "content_filter": "incomplete"}
    status = fm.get(finish, "incomplete")

    outputs = []
    rc = msg.get("reasoning_content")
    if rc:
        outputs.append({"type": "reasoning", "id": uid("rsn"), "status": "completed",
                        "content": [{"type": "text", "text": rc}]})
    if content:
        outputs.append({"type": "message", "id": uid("msg"), "role": "assistant", "status": "completed",
                        "content": [{"type": "output_text", "text": content, "annotations": []}]})
    for tc in msg.get("tool_calls") or []:
        fn = tc.get("function") or {}
        fc_id = uid("fc")
        # A missing or null upstream id must not become a null call_id: the
        # client echoes call_id back with the tool result.
        outputs.append({"type": "function_call", "id": fc_id, "call_id": tc.get("id") or fc_id,
                        "name": fn.get("name"), "arguments": fn.get("arguments", "{}"), "status": "completed"})

    # Backends may send "usage": null or "prompt_tokens_details": null;
    # treat both like an absent object instead of crashing on .get().
    usage = chat_resp.get("usage") or {}
    details = usage.get("prompt_tokens_details") or {}
    return {"id": resp_id or uid("resp"), "object": "response", "created": int(time.time()),
            "model": model, "status": status, "output": outputs,
            "usage": {"input_tokens": usage.get("prompt_tokens", 0),
                      "output_tokens": usage.get("completion_tokens", 0),
                      "total_tokens": usage.get("total_tokens", 0),
                      "input_tokens_details": {"cached_tokens": details.get("cached_tokens", 0)}}}
def oa_stream_to_sse(chat_stream, model, req_id):
    """Translate a Chat Completions SSE stream into Responses API SSE events.

    Args:
        chat_stream: Iterable of raw ``bytes`` lines from the upstream reply.
        model: Model name reported in the emitted response objects.
        req_id: Response id to use; a fresh one is generated when falsy.

    Yields:
        str: serialised SSE events, from ``response.created`` through
        ``response.completed``. The assistant message item is announced up
        front; it is only closed (and listed in the final output) when some
        text was actually received.
    """
    resp_id = req_id or uid("resp")
    msg_id = uid("msg")
    text_buf = ""
    tc_buf = {}  # upstream tool-call index -> accumulated call state
    finish_reason = None

    yield emit("response.created", {"type": "response.created",
        "response": {"id": resp_id, "object": "response", "model": model,
                     "status": "in_progress", "created": int(time.time()), "output": []}})
    yield emit("response.in_progress", {"type": "response.in_progress", "response": {"id": resp_id}})
    yield emit("response.output_item.added", {"type": "response.output_item.added",
        "item": {"type": "message", "id": msg_id, "role": "assistant", "status": "in_progress", "content": []}})
    yield emit("response.content_part.added", {"type": "response.content_part.added",
        "part": {"type": "output_text", "text": "", "annotations": []}, "item_id": msg_id})

    for raw in chat_stream:
        line = raw.decode("utf-8", errors="replace").strip()
        # Blank lines, ":" comments and other SSE fields are ignored. The space
        # after "data:" is optional in SSE, so accept both spellings.
        if not line.startswith("data:"):
            continue
        payload = line[5:].strip()
        if not payload or payload == "[DONE]":
            continue
        try:
            chunk = json.loads(payload)
        except json.JSONDecodeError:
            continue
        if not isinstance(chunk, dict):
            continue
        choices = chunk.get("choices") or []
        if not choices:
            continue
        choice = choices[0]
        delta = choice.get("delta") or {}
        # Keep the last non-null finish reason; a later chunk that carries
        # "finish_reason": null must not erase it.
        if choice.get("finish_reason") is not None:
            finish_reason = choice["finish_reason"]

        content = delta.get("content")
        if content:
            text_buf += content
            yield emit("response.output_text.delta", {"type": "response.output_text.delta",
                "delta": content, "item_id": msg_id, "content_index": 0})

        for tc in delta.get("tool_calls") or []:
            idx = tc.get("index", 0)
            fn = tc.get("function") or {}
            entry = tc_buf.get(idx)
            if entry is None:
                fid = uid("fc")
                # Read the name before announcing the item so the "added"
                # event carries it; fall back to our own id for call_id.
                entry = tc_buf[idx] = {"id": fid, "call_id": tc.get("id") or fid,
                                       "name": fn.get("name") or "", "args": ""}
                yield emit("response.output_item.added", {"type": "response.output_item.added",
                    "item": {"type": "function_call", "id": fid, "call_id": entry["call_id"],
                             "name": entry["name"], "arguments": "", "status": "in_progress"}})
            elif fn.get("name"):
                entry["name"] = fn["name"]
            args = fn.get("arguments")
            if args:
                entry["args"] += args
                # The SSE event name must match the payload type.
                yield emit("response.function_call_arguments.delta",
                    {"type": "response.function_call_arguments.delta",
                     "delta": args, "item_id": entry["id"]})

        rc = delta.get("reasoning_content")
        if rc:
            yield emit("response.reasoning.delta", {"type": "response.reasoning.delta", "delta": rc})

    if text_buf:
        yield emit("response.output_text.done", {"type": "response.output_text.done",
            "text": text_buf, "item_id": msg_id, "content_index": 0})
        yield emit("response.content_part.done", {"type": "response.content_part.done",
            "part": {"type": "output_text", "text": text_buf, "annotations": []}, "item_id": msg_id})
        yield emit("response.output_item.done", {"type": "response.output_item.done",
            "item": {"type": "message", "id": msg_id, "role": "assistant", "status": "completed",
                     "content": [{"type": "output_text", "text": text_buf, "annotations": []}]}})
    for idx in sorted(tc_buf):
        t = tc_buf[idx]
        yield emit("response.function_call_arguments.done", {"type": "response.function_call_arguments.done",
            "item_id": t["id"], "name": t["name"], "arguments": t["args"]})
        yield emit("response.output_item.done", {"type": "response.output_item.done",
            "item": {"type": "function_call", "id": t["id"], "call_id": t["call_id"],
                     "name": t["name"], "arguments": t["args"], "status": "completed"}})

    fm = {"stop": "completed", "length": "incomplete", "tool_calls": "completed", "content_filter": "incomplete"}
    status = fm.get(finish_reason, "incomplete")
    final_out = []
    if text_buf:
        final_out.append({"type": "message", "id": msg_id, "role": "assistant", "status": "completed",
                          "content": [{"type": "output_text", "text": text_buf, "annotations": []}]})
    for idx in sorted(tc_buf):
        t = tc_buf[idx]
        final_out.append({"type": "function_call", "id": t["id"], "call_id": t["call_id"],
                          "name": t["name"], "arguments": t["args"], "status": "completed"})
    yield emit("response.completed", {"type": "response.completed",
        "response": {"id": resp_id, "object": "response", "model": model,
                     "status": status, "created": int(time.time()), "output": final_out}})
# ═══════════════════════════════════════════════════════════════════
# Anthropic backend
# ═══════════════════════════════════════════════════════════════════
def an_input_to_messages(input_data):
    """Translate a Responses API ``input`` value into Anthropic Messages API messages.

    Args:
        input_data: Either a plain string (one user message) or a list of
            Responses input items. Handled item types: ``message`` (also
            items that carry a ``role`` but no ``type``), ``function_call``
            and ``function_call_output``. Other items are ignored.

    Returns:
        list of Anthropic message dicts. Every role other than ``assistant``
        is sent as ``user``. Only text parts of a message are carried over.
        Any other input type yields an empty list.
    """
    if isinstance(input_data, str):
        return [{"role": "user", "content": input_data}]
    if not isinstance(input_data, list):
        return []

    msgs = []
    for item in input_data:
        if not isinstance(item, dict):
            continue
        t = item.get("type")
        if t is None and "role" in item:
            # Shorthand message form: {"role": ..., "content": ...} without "type".
            t = "message"

        if t == "message":
            role = "assistant" if item.get("role", "user") == "assistant" else "user"
            content = item.get("content", [])
            if isinstance(content, str):
                text = content
            else:
                text = "".join(
                    part.get("text") or ""
                    for part in content or []
                    if isinstance(part, dict) and part.get("type", "") in ("input_text", "output_text"))
            msgs.append({"role": role, "content": text})
        elif t == "function_call":
            raw_args = item.get("arguments", "{}")
            if isinstance(raw_args, dict):
                tool_input = raw_args
            else:
                # Empty or malformed argument strings must not abort the
                # whole request; fall back to an empty input object.
                try:
                    tool_input = json.loads(raw_args or "{}")
                except (TypeError, ValueError):
                    tool_input = {}
            msgs.append({"role": "assistant", "content": [
                {"type": "tool_use",
                 # uid() is only a last resort; evaluate it lazily.
                 "id": item.get("call_id") or item.get("id") or uid("tu"),
                 "name": item.get("name", ""),
                 "input": tool_input}
            ]})
        elif t == "function_call_output":
            # The output refers to its call through "call_id"; "id" is only a fallback.
            msgs.append({"role": "user", "content": [
                {"type": "tool_result", "tool_use_id": item.get("call_id") or item.get("id", ""),
                 "content": item.get("output", "")}
            ]})
    return msgs
def an_convert_tools(tools):
    """Convert function tool definitions into Anthropic ``tools`` entries.

    Accepts both the nested form (fields under ``function``) and the flat
    form (fields on the tool itself); tools whose ``type`` is not
    ``"function"`` are skipped.

    Returns:
        list of ``{"name", "description", "input_schema"}`` dicts, or ``None``
        when nothing usable remains.
    """
    if not tools:
        return None
    converted = []
    for tool in tools:
        if tool.get("type") != "function":
            continue
        # A non-empty nested "function" object wins; otherwise read the tool itself.
        spec = tool.get("function", {}) or tool
        converted.append({
            "name": spec.get("name"),
            "description": spec.get("description", ""),
            "input_schema": spec.get("parameters", {"type": "object", "properties": {}}),
        })
    return converted if converted else None
def an_resp_to_responses(anthro_resp, model, resp_id=None):
    """Convert a non-streaming Anthropic Messages reply into a Responses API object.

    ``text`` blocks become ``message`` items, ``tool_use`` blocks become
    ``function_call`` items (input re-serialised as a JSON string) and
    ``thinking`` blocks become ``reasoning`` items; other block types are
    dropped. ``total_tokens`` is the sum of input and output tokens.
    """
    status_by_stop_reason = {"end_turn": "completed", "max_tokens": "incomplete",
                             "stop_sequence": "completed", "tool_use": "completed"}
    status = status_by_stop_reason.get(anthro_resp.get("stop_reason", "end_turn"), "incomplete")

    def to_output_item(block):
        kind = block.get("type", "")
        if kind == "text":
            return {"type": "message", "id": uid("msg"), "role": "assistant", "status": "completed",
                    "content": [{"type": "output_text", "text": block.get("text", ""), "annotations": []}]}
        if kind == "tool_use":
            return {"type": "function_call", "id": uid("fc"), "call_id": block.get("id", ""),
                    "name": block.get("name", ""), "arguments": json.dumps(block.get("input", {})),
                    "status": "completed"}
        if kind == "thinking":
            return {"type": "reasoning", "id": uid("rsn"), "status": "completed",
                    "content": [{"type": "text", "text": block.get("thinking", "")}]}
        return None

    outputs = []
    for block in anthro_resp.get("content", []):
        converted = to_output_item(block)
        if converted is not None:
            outputs.append(converted)

    usage = anthro_resp.get("usage", {})
    tokens_in = usage.get("input_tokens", 0)
    tokens_out = usage.get("output_tokens", 0)
    return {"id": resp_id or uid("resp"), "object": "response", "created": int(time.time()),
            "model": model, "status": status, "output": outputs,
            "usage": {"input_tokens": tokens_in,
                      "output_tokens": tokens_out,
                      "total_tokens": tokens_in + tokens_out,
                      "input_tokens_details": {"cached_tokens": 0}}}
def an_stream_to_sse(stream, model, req_id):
    """Translate an Anthropic Messages SSE stream into Responses API SSE events.

    Args:
        stream: Iterable of raw ``bytes`` lines from the upstream reply.
        model: Model name reported in the emitted response objects.
        req_id: Response id to use; a fresh one is generated when falsy.

    Yields:
        str: serialised SSE events. Text and tool_use content blocks are
        opened on ``content_block_start`` and closed on ``content_block_stop``;
        ``response.completed`` is sent on ``message_stop``, or with status
        ``incomplete`` if the stream ends without one.
    """
    resp_id = req_id or uid("resp")
    completed = []  # finished output items, in arrival order
    msg_id = uid("msg")
    text_buf = ""
    tc_id = None
    tc_call_id = None
    tc_name = ""
    tc_args = ""
    block_type = None
    stop_reason = "end_turn"
    finished = False
    status_map = {"end_turn": "completed", "max_tokens": "incomplete",
                  "stop_sequence": "completed", "tool_use": "completed"}

    def completed_event(status):
        return emit("response.completed", {"type": "response.completed",
            "response": {"id": resp_id, "object": "response", "model": model,
                         "status": status, "created": int(time.time()), "output": completed}})

    yield emit("response.created", {"type": "response.created",
        "response": {"id": resp_id, "object": "response", "model": model,
                     "status": "in_progress", "created": int(time.time()), "output": []}})
    yield emit("response.in_progress", {"type": "response.in_progress", "response": {"id": resp_id}})

    for raw in stream:
        line = raw.decode("utf-8", errors="replace").strip()
        # Only "data:" lines matter: every payload carries its own "type", so
        # the "event:" lines are redundant. The space after "data:" is optional.
        if not line.startswith("data:"):
            continue
        try:
            data = json.loads(line[5:].strip())
        except json.JSONDecodeError:
            continue
        if not isinstance(data, dict):
            continue
        et = data.get("type", "")

        if et == "content_block_start":
            cb = data.get("content_block") or {}
            block_type = cb.get("type", "")
            if block_type == "text":
                msg_id = uid("msg")
                yield emit("response.output_item.added", {"type": "response.output_item.added",
                    "item": {"type": "message", "id": msg_id, "role": "assistant",
                             "status": "in_progress", "content": []}})
                yield emit("response.content_part.added", {"type": "response.content_part.added",
                    "part": {"type": "output_text", "text": "", "annotations": []}, "item_id": msg_id})
            elif block_type == "tool_use":
                tc_id = uid("fc")
                tc_call_id = cb.get("id") or tc_id
                tc_name = cb.get("name", "")
                tc_args = ""
                yield emit("response.output_item.added", {"type": "response.output_item.added",
                    "item": {"type": "function_call", "id": tc_id, "call_id": tc_call_id,
                             "name": tc_name, "arguments": "", "status": "in_progress"}})
            # "thinking" blocks need no opening event; their deltas are relayed below.
        elif et == "content_block_delta":
            dd = data.get("delta") or {}
            dt = dd.get("type", "")
            if dt == "text_delta":
                txt = dd.get("text", "")
                text_buf += txt
                yield emit("response.output_text.delta", {"type": "response.output_text.delta",
                    "delta": txt, "item_id": msg_id, "content_index": 0})
            elif dt == "input_json_delta":
                pj = dd.get("partial_json", "")
                tc_args += pj
                # The SSE event name must match the payload type.
                yield emit("response.function_call_arguments.delta",
                    {"type": "response.function_call_arguments.delta",
                     "delta": pj, "item_id": tc_id})
            elif dt == "thinking_delta":
                tk = dd.get("thinking", "")
                yield emit("response.reasoning.delta", {"type": "response.reasoning.delta", "delta": tk})
        elif et == "content_block_stop":
            if block_type == "text":
                yield emit("response.output_text.done", {"type": "response.output_text.done",
                    "text": text_buf, "item_id": msg_id, "content_index": 0})
                yield emit("response.content_part.done", {"type": "response.content_part.done",
                    "part": {"type": "output_text", "text": text_buf, "annotations": []}, "item_id": msg_id})
                yield emit("response.output_item.done", {"type": "response.output_item.done",
                    "item": {"type": "message", "id": msg_id, "role": "assistant", "status": "completed",
                             "content": [{"type": "output_text", "text": text_buf, "annotations": []}]}})
                completed.append({"type": "message", "id": msg_id, "role": "assistant", "status": "completed",
                                  "content": [{"type": "output_text", "text": text_buf, "annotations": []}]})
                text_buf = ""
            elif block_type == "tool_use":
                yield emit("response.function_call_arguments.done", {"type": "response.function_call_arguments.done",
                    "item_id": tc_id, "name": tc_name, "arguments": tc_args})
                yield emit("response.output_item.done", {"type": "response.output_item.done",
                    "item": {"type": "function_call", "id": tc_id, "call_id": tc_call_id,
                             "name": tc_name, "arguments": tc_args, "status": "completed"}})
                completed.append({"type": "function_call", "id": tc_id, "call_id": tc_call_id,
                                  "name": tc_name, "arguments": tc_args, "status": "completed"})
                tc_id = None
                tc_args = ""
            block_type = None
        elif et == "message_delta":
            stop_reason = (data.get("delta") or {}).get("stop_reason", "end_turn")
        elif et == "message_stop":
            finished = True
            yield completed_event(status_map.get(stop_reason, "incomplete"))

    if not finished:
        # The upstream stream ended without message_stop; still terminate the
        # Responses stream so the client is not left waiting.
        yield completed_event("incomplete")
# Template for the "config" object sent with every command-code request.
# _cc_config() copies it and fills in "date"; the other values are sent as-is.
_DEFAULT_CC_CONFIG = {
    "workingDir": "/tmp",
    "date": "",
    "environment": "linux",
    "shell": "bash",
    "files": [],
    "structure": [],
    "isGitRepo": False,
    "currentBranch": "",
    "mainBranch": "",
    "gitStatus": "",
    "recentCommits": [],
}
def _cc_config():
    """Return a shallow copy of ``_DEFAULT_CC_CONFIG`` with ``date`` set to today (YYYY-MM-DD, local time)."""
    today = time.strftime("%Y-%m-%d")
    return {**_DEFAULT_CC_CONFIG, "date": today}
def cc_input_to_messages(input_data):
    """Convert Responses ``input`` for the command-code backend by delegating to ``oa_input_to_messages``."""
    messages = oa_input_to_messages(input_data)
    return messages
def cc_convert_tools(tools):
    """Convert tool definitions for the command-code backend by delegating to ``oa_convert_tools``."""
    converted = oa_convert_tools(tools)
    return converted
def cc_resp_to_responses(cc_lines, model, resp_id=None):
    """Fold a fully-read command-code reply (one JSON document per line) into a Responses object.

    ``text-delta`` records are concatenated into a single assistant message;
    the usage of the last ``finish-step`` record is reported. Lines that are
    not valid JSON are skipped. The status is always ``completed``.
    """
    pieces = []
    tokens_in = tokens_out = tokens_total = 0
    for raw in cc_lines:
        try:
            record = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            continue
        kind = record.get("type", "")
        if kind == "text-delta":
            pieces.append(record.get("text", ""))
        elif kind == "finish-step":
            step_usage = record.get("usage", {})
            tokens_in = step_usage.get("inputTokens", 0)
            tokens_out = step_usage.get("outputTokens", 0)
            tokens_total = tokens_in + tokens_out

    text = "".join(pieces)
    outputs = []
    if text:
        outputs.append({"type": "message", "id": uid("msg"), "role": "assistant",
                        "status": "completed",
                        "content": [{"type": "output_text", "text": text, "annotations": []}]})
    return {"id": resp_id or uid("resp"), "object": "response", "created": int(time.time()),
            "model": model, "status": "completed", "output": outputs,
            "usage": {"input_tokens": tokens_in,
                      "output_tokens": tokens_out,
                      "total_tokens": tokens_total,
                      "input_tokens_details": {"cached_tokens": 0}}}
def cc_stream_to_sse(cc_stream, model, req_id):
    """Translate a command-code stream (one JSON document per line) into Responses SSE events.

    ``text-delta`` records are relayed as ``response.output_text.delta``
    events; the usage of the last ``finish-step`` record is attached to the
    final ``response.completed`` event. The message item is closed only when
    some text was received.
    """
    response_id = req_id or uid("resp")
    item_id = uid("msg")
    collected = ""
    usage_summary = {}

    yield emit("response.created", {"type": "response.created",
        "response": {"id": response_id, "object": "response", "model": model,
                     "status": "in_progress", "created": int(time.time()), "output": []}})
    yield emit("response.in_progress", {"type": "response.in_progress", "response": {"id": response_id}})
    yield emit("response.output_item.added", {"type": "response.output_item.added",
        "item": {"type": "message", "id": item_id, "role": "assistant", "status": "in_progress", "content": []}})
    yield emit("response.content_part.added", {"type": "response.content_part.added",
        "part": {"type": "output_text", "text": "", "annotations": []}, "item_id": item_id})

    for raw in cc_stream:
        line = raw.decode("utf-8", errors="replace").strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue
        kind = record.get("type", "")
        if kind == "finish-step":
            step_usage = record.get("usage", {})
            usage_summary = {
                "input_tokens": step_usage.get("inputTokens", 0),
                "output_tokens": step_usage.get("outputTokens", 0),
                "total_tokens": step_usage.get("inputTokens", 0) + step_usage.get("outputTokens", 0),
            }
            continue
        if kind != "text-delta":
            continue
        fragment = record.get("text", "")
        if not fragment:
            continue
        collected += fragment
        yield emit("response.output_text.delta", {"type": "response.output_text.delta",
            "delta": fragment, "item_id": item_id, "content_index": 0})

    final_items = []
    if collected:
        text_part = {"type": "output_text", "text": collected, "annotations": []}
        finished_item = {"type": "message", "id": item_id, "role": "assistant", "status": "completed",
                         "content": [text_part]}
        yield emit("response.output_text.done", {"type": "response.output_text.done",
            "text": collected, "item_id": item_id, "content_index": 0})
        yield emit("response.content_part.done", {"type": "response.content_part.done",
            "part": text_part, "item_id": item_id})
        yield emit("response.output_item.done", {"type": "response.output_item.done",
            "item": finished_item})
        final_items.append(finished_item)

    yield emit("response.completed", {"type": "response.completed",
        "response": {"id": response_id, "object": "response", "model": model,
                     "status": "completed", "created": int(time.time()), "output": final_items,
                     "usage": usage_summary}})
# ═══════════════════════════════════════════════════════════════════
# HTTP Server
# ═══════════════════════════════════════════════════════════════════
class Handler(http.server.BaseHTTPRequestHandler):
    """HTTP front end that exposes a Responses-style API over the configured backend.

    Routes:
        GET  /v1/models, /models        -> the static model list in ``MODELS``
        POST /v1/responses, /responses  -> translated and forwarded per ``BACKEND``
    Every other path is answered with 404.
    """

    protocol_version = "HTTP/1.1"

    # ── pure request-translation helpers ────────────────────────────

    @staticmethod
    def _instructions(body):
        """Return the request's ``instructions`` stripped, or ``""`` when absent, null or not a string."""
        value = body.get("instructions")
        return value.strip() if isinstance(value, str) else ""

    @staticmethod
    def _oa_tool_choice(tool_choice):
        """Map a Responses ``tool_choice`` to Chat Completions form.

        The flat ``{"type": "function", "name": X}`` form is nested under
        ``function``; strings and anything else pass through unchanged.
        Returns ``None`` for a falsy value.
        """
        if not tool_choice:
            return None
        if (isinstance(tool_choice, dict) and tool_choice.get("type") == "function"
                and "function" not in tool_choice and tool_choice.get("name")):
            return {"type": "function", "function": {"name": tool_choice["name"]}}
        return tool_choice

    @staticmethod
    def _an_tool_choice(tool_choice):
        """Map a Responses ``tool_choice`` to Anthropic form.

        ``"required"`` becomes ``{"type": "any"}``, other strings become
        ``{"type": <string>}``, and a function choice (flat or nested) becomes
        ``{"type": "tool", "name": X}``. Other dicts pass through unchanged.
        Returns ``None`` when there is nothing to send.
        """
        if not tool_choice:
            return None
        if isinstance(tool_choice, str):
            return {"type": "any" if tool_choice == "required" else tool_choice}
        if isinstance(tool_choice, dict):
            if tool_choice.get("type") == "function":
                name = tool_choice.get("name") or (tool_choice.get("function") or {}).get("name")
                if name:
                    return {"type": "tool", "name": name}
            return tool_choice
        return None

    # ── routing ─────────────────────────────────────────────────────

    def do_GET(self):
        """Serve the model list; 404 for anything else."""
        if self.path in ("/v1/models", "/models"):
            self.send_json(200, {"object": "list", "data": MODELS})
        else:
            self.send_error(404)

    def do_POST(self):
        """Dispatch Responses API calls; 404 for anything else."""
        if self.path in ("/v1/responses", "/responses"):
            self._handle()
        else:
            self.send_error(404)

    def _handle(self):
        """Parse the JSON request body and hand it to the handler for ``BACKEND``."""
        try:
            clen = int(self.headers.get("Content-Length", 0))
            body = json.loads(self.rfile.read(clen))
        except Exception as e:
            return self.send_json(400, {"error": {"message": f"Bad request: {e}"}})
        if not isinstance(body, dict):
            # Everything below relies on body.get(); reject arrays and scalars cleanly.
            return self.send_json(400, {"error": {"message": "Bad request: JSON body must be an object"}})
        model = body.get("model", MODELS[0]["id"] if MODELS else "unknown")
        stream = body.get("stream", False)
        if BACKEND == "anthropic":
            self._handle_anthropic(body, model, stream)
        elif BACKEND == "command-code":
            self._handle_command_code(body, model, stream)
        else:
            self._handle_openai_compat(body, model, stream)

    # ── backends ────────────────────────────────────────────────────

    def _handle_openai_compat(self, body, model, stream):
        """Forward the request to ``<TARGET_URL>/chat/completions``."""
        messages = oa_input_to_messages(body.get("input", ""))
        instructions = self._instructions(body)
        if instructions:
            messages.insert(0, {"role": "system", "content": instructions})
        chat_body = {"model": model, "messages": messages}
        for k in ("temperature", "top_p", "max_output_tokens"):
            # Explicit nulls are treated like absent keys.
            if body.get(k) is not None:
                chat_body["max_tokens" if k == "max_output_tokens" else k] = body[k]
        tools = oa_convert_tools(body.get("tools"))
        if tools:
            chat_body["tools"] = tools
            tool_choice = self._oa_tool_choice(body.get("tool_choice"))
            if tool_choice:
                chat_body["tool_choice"] = tool_choice
        chat_body["stream"] = stream
        target = upstream_target(TARGET_URL, "/chat/completions")
        fwd = forwarded_headers(self.headers, {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {API_KEY}",
        }, browser_ua=True)
        print(f"[translate-proxy] POST {target} model={model} stream={stream} ua={fwd.get('User-Agent','')[:50]}", file=sys.stderr)
        req = urllib.request.Request(
            target,
            data=json.dumps(chat_body).encode(),
            headers=fwd,
        )
        self._forward(req, stream, model,
                      lambda r: oa_resp_to_responses(json.loads(r.read()), model),
                      lambda s: oa_stream_to_sse(s, model, body.get("request_id") or body.get("id")))

    def _handle_anthropic(self, body, model, stream):
        """Forward the request to ``<TARGET_URL>/messages``."""
        an_body = {"model": model, "messages": an_input_to_messages(body.get("input", "")),
                   # max_tokens is always sent; a missing or null value falls back to 8192.
                   "max_tokens": body.get("max_output_tokens") or 8192}
        instructions = self._instructions(body)
        if instructions:
            an_body["system"] = instructions
        for k in ("temperature", "top_p"):
            if body.get(k) is not None:
                an_body[k] = body[k]
        tools = an_convert_tools(body.get("tools"))
        if tools:
            an_body["tools"] = tools
            tool_choice = self._an_tool_choice(body.get("tool_choice"))
            if tool_choice:
                an_body["tool_choice"] = tool_choice
        an_body["stream"] = stream
        target = upstream_target(TARGET_URL, "/messages")
        req = urllib.request.Request(
            target,
            data=json.dumps(an_body).encode(),
            headers=forwarded_headers(self.headers, {
                "Content-Type": "application/json",
                "x-api-key": API_KEY,
                "anthropic-version": "2023-06-01",
            }),
        )
        self._forward(req, stream, model,
                      lambda r: an_resp_to_responses(json.loads(r.read()), model),
                      lambda s: an_stream_to_sse(s, model, body.get("request_id") or body.get("id")))

    def _handle_command_code(self, body, model, stream):
        """Forward the request to ``<TARGET_URL>/alpha/generate``.

        The upstream call always asks for a stream (``params.stream`` is
        ``True``); for a non-streaming client the whole reply is read and
        folded into one Responses object.
        """
        raw_msgs = oa_input_to_messages(body.get("input", ""))
        instructions = self._instructions(body)
        cc_msgs = []
        if instructions:
            cc_msgs.append({"role": "user", "content": [{"type": "text", "text": instructions}]})
        for m in raw_msgs:
            role = m.get("role", "user")
            if role == "system":
                role = "user"
            content = m.get("content", "")
            if isinstance(content, str):
                content = [{"type": "text", "text": content}]
            elif content is None:
                content = [{"type": "text", "text": ""}]
            cc_msgs.append({"role": role, "content": content})
            # NOTE(review): messages with tool_calls / tool_call_id are emitted a
            # second time below, in addition to the generic copy above. Kept
            # as-is because the upstream message schema is not visible here —
            # confirm whether the duplicate is intended.
            for tc in m.get("tool_calls") or []:
                fn = tc.get("function", {})
                cc_msgs.append({"role": "assistant", "content": [{"type": "text", "text": ""}],
                                "tool_calls": [{"id": tc.get("id", uid("tc")), "type": "function",
                                                "function": {"name": fn.get("name", ""),
                                                             "arguments": fn.get("arguments", "{}")}}]})
            if m.get("tool_call_id"):
                cc_msgs.append({"role": "tool", "tool_call_id": m["tool_call_id"],
                                "content": [{"type": "text", "text": m.get("content", "")}]})

        # The thread id must parse as a UUID; otherwise generate one.
        thread_id = body.get("request_id") or body.get("id") or ""
        try:
            uuid.UUID(thread_id)
        except (ValueError, AttributeError, TypeError):
            thread_id = str(uuid.uuid4())
        cc_body = {
            "config": _cc_config(),
            "memory": "",
            "taste": "",
            "skills": "",
            "params": {
                "stream": True,
                "max_tokens": body.get("max_output_tokens", 64000),
                "temperature": body.get("temperature", 0.3),
                "messages": cc_msgs,
                "model": model,
                "tools": [],
            },
            "threadId": thread_id,
        }
        target = upstream_target(TARGET_URL, "/alpha/generate")
        fwd = forwarded_headers(self.headers, {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {API_KEY}",
            "Accept": "text/event-stream, application/json",
            "x-command-code-version": CC_VERSION or "0.26.8",
        }, browser_ua=True)
        print(f"[translate-proxy] POST {target} model={model} stream={stream} [command-code]", file=sys.stderr)
        req = urllib.request.Request(
            target,
            data=json.dumps(cc_body).encode(),
            headers=fwd,
        )
        # Same request/response plumbing as the other backends.
        self._forward(req, stream, model,
                      lambda r: cc_resp_to_responses(r.read().decode().strip().split("\n"), model),
                      lambda s: cc_stream_to_sse(s, model, body.get("request_id") or body.get("id")))

    # ── plumbing ────────────────────────────────────────────────────

    def _forward(self, req, stream, model, nonstream_fn, stream_fn):
        """Send *req* upstream and relay the translated result to the client.

        Args:
            req: Prepared ``urllib.request.Request``.
            stream: Whether the client asked for an SSE stream.
            model: Model name (unused here; kept for the existing call signature).
            nonstream_fn: Callable taking the upstream response and returning
                the JSON-serialisable Responses object.
            stream_fn: Callable taking the upstream response and returning an
                iterable of serialised SSE events.

        Upstream HTTP errors are relayed with their status code; failures to
        reach the upstream answer 500; an unparseable non-streaming reply
        answers 502.
        """
        try:
            upstream = urllib.request.urlopen(req)
        except urllib.error.HTTPError as e:
            err = e.read().decode(errors="replace")
            return self.send_json(e.code, {"error": {"type": "upstream_error", "message": err}})
        except Exception as e:
            return self.send_json(500, {"error": {"type": "proxy_error", "message": str(e)}})
        # Always release the upstream connection, including on client disconnect.
        with upstream:
            if stream:
                self._send_event_stream(stream_fn(upstream))
                return
            try:
                result = nonstream_fn(upstream)
            except Exception as e:
                return self.send_json(502, {"error": {"type": "proxy_error",
                                                      "message": f"Bad upstream response: {e}"}})
            self.send_json(200, result)

    def _send_event_stream(self, events):
        """Write *events* (an iterable of serialised SSE strings) as a 200 event-stream response."""
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        # The body has no Content-Length and is not chunked, so under HTTP/1.1
        # its end can only be signalled by closing the connection.
        self.send_header("Connection", "close")
        self.end_headers()
        self.close_connection = True
        try:
            for event in events:
                self.wfile.write(event.encode("utf-8"))
                self.wfile.flush()
        except (BrokenPipeError, ConnectionResetError):
            # The client went away mid-stream; nothing left to deliver.
            self.log_message("client disconnected during stream")

    def send_json(self, status, data):
        """Send *data* as a JSON response with the given HTTP status."""
        body = json.dumps(data).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        """Write access-log lines to stderr, prefixed with the proxy name and backend."""
        msg = fmt % args if args else fmt
        print(f"[translate-proxy] {BACKEND} {msg}", file=sys.stderr)
if __name__ == "__main__":
    # The handler speaks HTTP/1.1 and streams long responses. With the
    # single-threaded HTTPServer one open connection blocks every other
    # client, so use the threaded server when this Python provides it.
    _server_cls = getattr(http.server, "ThreadingHTTPServer", http.server.HTTPServer)
    server = _server_cls(("127.0.0.1", PORT), Handler)
    print(f"translate-proxy ({BACKEND}) listening on http://127.0.0.1:{PORT}", flush=True)
    print(f"Target: {TARGET_URL}", flush=True)
    print(f"Models: {[m['id'] for m in MODELS]}", flush=True)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the proxy; exit without a traceback.
        pass
    finally:
        server.server_close()