#!/usr/bin/env python3
"""
translate-proxy.py — Responses API → backend API translation proxy.

Backends:
  openai-compat — any OpenAI-compatible Chat Completions API
  anthropic     — Anthropic Messages API
  command-code  — line-delimited JSON generate API (POST <target>/alpha/generate)

Usage:
  python3 translate-proxy.py --config proxy-config.json
  python3 translate-proxy.py --backend openai-compat --target-url https://... --api-key sk-...

Each setting is resolved in this order: command-line flag, JSON config file,
environment variable (PROXY_* first, then the legacy ZAI_* name), built-in
default.
"""

import argparse
import http.server
import json
import os
import sys
import time
import urllib.error
import urllib.request
import uuid

# ═══════════════════════════════════════════════════════════════════
# Config
# ═══════════════════════════════════════════════════════════════════

DEFAULT_MODELS = {
    "openai-compat": [
        {"id": "gpt-4o-mini", "object": "model", "created": 1700000000, "owned_by": "custom"},
    ],
    "anthropic": [
        {"id": "claude-sonnet-4-20250514", "object": "model", "created": 1700000000, "owned_by": "anthropic"},
    ],
}

# (config key, command-line attribute) pairs copied from argparse into the config.
_CLI_OVERRIDES = (
    ("port", "port"),
    ("backend_type", "backend"),
    ("target_url", "target_url"),
    ("api_key", "api_key"),
)

# (config key, primary env var, legacy env var or None, converter).
_ENV_SOURCES = (
    ("port", "PROXY_PORT", "ZAI_PROXY_PORT", int),
    ("backend_type", "PROXY_BACKEND", None, str),
    ("target_url", "PROXY_TARGET_URL", "ZAI_BASE_URL", str),
    ("api_key", "PROXY_API_KEY", "ZAI_API_KEY", str),
)


def load_config(argv=None):
    """Build the proxy configuration dict.

    Args:
        argv: Command-line arguments to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``; pass an explicit list to build a
            configuration without touching the process command line.

    Returns:
        A dict that always contains ``port``, ``backend_type``,
        ``target_url``, ``api_key`` and ``models``, plus any other keys found
        in the JSON config file.

    Raises:
        OSError / ValueError: if a config or models file cannot be read or
            parsed, or a port environment variable is not an integer.
    """
    parser = argparse.ArgumentParser(description="Responses API translation proxy")
    parser.add_argument("--config", help="JSON config file path")
    parser.add_argument("--port", type=int, default=None)
    parser.add_argument("--backend", default=None,
                        choices=["openai-compat", "anthropic", "command-code"])
    parser.add_argument("--target-url", default=None)
    parser.add_argument("--api-key", default=None)
    parser.add_argument("--models-file", default=None, help="JSON file with model list array")
    args = parser.parse_args(argv)

    cfg = {}
    if args.config:
        with open(args.config, encoding="utf-8") as f:
            cfg = json.load(f)

    # Explicit flags win over whatever the config file said.
    for cfg_key, arg_name in _CLI_OVERRIDES:
        value = getattr(args, arg_name, None)
        if value is not None:
            cfg[cfg_key] = value

    # Environment variables only fill in settings that are still missing.
    for cfg_key, primary, legacy, convert in _ENV_SOURCES:
        if cfg_key in cfg:
            continue
        value = os.environ.get(primary) or (os.environ.get(legacy) if legacy else None)
        if value:
            cfg[cfg_key] = convert(value)

    cfg.setdefault("port", 8080)
    cfg.setdefault("backend_type", "openai-compat")
    cfg.setdefault("target_url", "http://localhost:11434/v1")
    cfg.setdefault("api_key", "")

    models = cfg.get("models", [])
    if not models and args.models_file:
        with open(args.models_file, encoding="utf-8") as f:
            models = json.load(f)
    if not models:
        models = DEFAULT_MODELS.get(cfg["backend_type"], [])
    cfg["models"] = models
    return cfg


# Only read the real command line when executed as a script; importing the
# module (e.g. from a test runner) must not try to parse the host's argv.
CONFIG = load_config(None if __name__ == "__main__" else [])
PORT = CONFIG["port"]
BACKEND = CONFIG["backend_type"]
TARGET_URL = CONFIG["target_url"].rstrip("/")
API_KEY = CONFIG["api_key"]
MODELS = CONFIG["models"]
CC_VERSION = CONFIG.get("cc_version", "")

# ═══════════════════════════════════════════════════════════════════
# Shared helpers
# ═══════════════════════════════════════════════════════════════════

_pool = uuid.uuid4().hex[:8]

_HOP_BY_HOP_HEADERS = {
    "connection", "keep-alive", "proxy-authenticate", "proxy-authorization",
    "te", "trailers", "transfer-encoding", "upgrade", "host", "content-length",
}

# Accept-Encoding is dropped as well: urllib does not decompress responses,
# so a gzip-encoded upstream body could not be parsed by the translators.
_SKIPPED_REQUEST_HEADERS = _HOP_BY_HOP_HEADERS | {"accept-encoding"}

_BROWSER_HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
    "Accept": "application/json, text/event-stream, */*",
    "Accept-Language": "en-US,en;q=0.9",
    "Sec-Ch-Ua": '"Chromium";v="137", "Not/A)Brand";v="99"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": '"Linux"',
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
}


def uid(prefix="id"):
    """Return a new identifier ``<prefix>-<process pool>-<12 random hex chars>``."""
    return f"{prefix}-{_pool}-{uuid.uuid4().hex[:12]}"


def emit(event, data):
    """Serialize one server-sent event with the given event name and JSON payload."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


def upstream_target(base_url, suffix):
    """Return ``base_url`` with ``suffix`` appended unless it already ends with it."""
    base = base_url.rstrip("/")
    if base.endswith(suffix):
        return base
    return f"{base}{suffix}"


def forwarded_headers(request_headers, extra=None, browser_ua=False):
    """Build the header dict sent upstream.

    Args:
        request_headers: Mapping-like object with ``.items()`` holding the
            client's request headers.
        extra: Headers that must win over anything the client sent.
        browser_ua: When true, start from the browser-like header set and
            ignore the client's own ``User-Agent``.

    Returns:
        A dict with at most one entry per header name, compared
        case-insensitively; later sources replace earlier ones
        (browser defaults < client headers < ``extra``).
    """
    headers = {}
    spelling = {}  # lower-cased header name -> key currently used in `headers`

    def put(name, value):
        lowered = name.lower()
        previous = spelling.get(lowered)
        if previous is not None and previous != name:
            del headers[previous]
        spelling[lowered] = name
        headers[name] = value

    if browser_ua:
        for name, value in _BROWSER_HEADERS.items():
            put(name, value)
    for name, value in request_headers.items():
        lowered = name.lower()
        if lowered in _SKIPPED_REQUEST_HEADERS:
            continue
        if browser_ua and lowered == "user-agent":
            continue
        put(name, value)
    for name, value in (extra or {}).items():
        put(name, value)
    return headers


def _json_object(text):
    """Parse ``text`` as JSON and return it only if it is an object, else None."""
    try:
        value = json.loads(text)
    except (TypeError, ValueError):
        return None
    return value if isinstance(value, dict) else None


def _sse_payload(raw):
    """Return the payload of an SSE ``data:`` line (bytes in), or None for other lines."""
    line = raw.decode("utf-8", errors="replace").strip()
    if not line.startswith("data:"):
        return None
    return line[5:].strip()


def _is_message_item(item):
    """True for Responses input items that are messages (``type`` may be omitted)."""
    item_type = item.get("type")
    return item_type == "message" or (item_type is None and "role" in item)


def _message_text(content):
    """Concatenate the text of a message ``content`` given as a string or part list."""
    if isinstance(content, str):
        return content
    return "".join(
        part.get("text") or ""
        for part in content or []
        if isinstance(part, dict) and part.get("type", "") in ("input_text", "output_text")
    )


def _tool_output_text(output):
    """Return a tool output as a string, JSON-encoding anything that is not one."""
    if isinstance(output, str):
        return output
    if output is None:
        return ""
    return json.dumps(output)


def _instructions_text(body):
    """Return the stripped ``instructions`` string of a request, or '' if absent/non-string."""
    instructions = body.get("instructions")
    return instructions.strip() if isinstance(instructions, str) else ""


def _usage(input_tokens, output_tokens, total_tokens=None, cached_tokens=0):
    """Build a Responses ``usage`` object; the total defaults to input + output."""
    input_tokens = input_tokens or 0
    output_tokens = output_tokens or 0
    if total_tokens is None:
        total_tokens = input_tokens + output_tokens
    return {"input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "total_tokens": total_tokens,
            "input_tokens_details": {"cached_tokens": cached_tokens or 0}}


def _text_part(text):
    """Build an ``output_text`` content part."""
    return {"type": "output_text", "text": text, "annotations": []}


def _message_item(msg_id, text):
    """Build a completed assistant message output item."""
    return {"type": "message", "id": msg_id, "role": "assistant",
            "status": "completed", "content": [_text_part(text)]}


def _function_call_item(item_id, call_id, name, arguments, status="completed"):
    """Build a ``function_call`` output item."""
    return {"type": "function_call", "id": item_id, "call_id": call_id,
            "name": name, "arguments": arguments, "status": status}


def _response_object(resp_id, model, status, outputs, usage):
    """Build a non-streaming Responses API response object."""
    return {"id": resp_id or uid("resp"), "object": "response",
            "created": int(time.time()), "model": model, "status": status,
            "output": outputs, "usage": usage}


def _stream_open(resp_id, model):
    """Yield the two events that start every translated stream."""
    yield emit("response.created", {
        "type": "response.created",
        "response": {"id": resp_id, "object": "response", "model": model,
                     "status": "in_progress", "created": int(time.time()), "output": []}})
    yield emit("response.in_progress", {
        "type": "response.in_progress", "response": {"id": resp_id}})


def _stream_close(resp_id, model, status, output, usage=None):
    """Return the terminal ``response.completed`` event."""
    response = {"id": resp_id, "object": "response", "model": model, "status": status,
                "created": int(time.time()), "output": output}
    if usage is not None:
        response["usage"] = usage
    return emit("response.completed", {"type": "response.completed", "response": response})


def _text_item_open(msg_id):
    """Yield the events announcing a new assistant text message item."""
    yield emit("response.output_item.added", {
        "type": "response.output_item.added",
        "item": {"type": "message", "id": msg_id, "role": "assistant",
                 "status": "in_progress", "content": []}})
    yield emit("response.content_part.added", {
        "type": "response.content_part.added", "part": _text_part(""), "item_id": msg_id})


def _text_delta(msg_id, delta):
    """Return one text delta event for message ``msg_id``."""
    return emit("response.output_text.delta", {
        "type": "response.output_text.delta", "delta": delta,
        "item_id": msg_id, "content_index": 0})


def _text_item_close(msg_id, text):
    """Yield the events that finish a text message item."""
    yield emit("response.output_text.done", {
        "type": "response.output_text.done", "text": text,
        "item_id": msg_id, "content_index": 0})
    yield emit("response.content_part.done", {
        "type": "response.content_part.done", "part": _text_part(text), "item_id": msg_id})
    yield emit("response.output_item.done", {
        "type": "response.output_item.done", "item": _message_item(msg_id, text)})


def _function_call_open(item_id, call_id, name):
    """Return the event announcing a new function-call item."""
    return emit("response.output_item.added", {
        "type": "response.output_item.added",
        "item": _function_call_item(item_id, call_id, name, "", status="in_progress")})


def _function_call_delta(item_id, delta):
    """Return one argument delta event; the SSE event name matches the payload type."""
    return emit("response.function_call_arguments.delta", {
        "type": "response.function_call_arguments.delta", "delta": delta, "item_id": item_id})


def _function_call_close(item_id, call_id, name, arguments):
    """Yield the events that finish a function-call item."""
    yield emit("response.function_call_arguments.done", {
        "type": "response.function_call_arguments.done",
        "item_id": item_id, "name": name, "arguments": arguments})
    yield emit("response.output_item.done", {
        "type": "response.output_item.done",
        "item": _function_call_item(item_id, call_id, name, arguments)})


# ═══════════════════════════════════════════════════════════════════
# OpenAI-compat backend
# ═══════════════════════════════════════════════════════════════════

_OA_FINISH_STATUS = {"stop": "completed", "length": "incomplete",
                     "tool_calls": "completed", "content_filter": "incomplete"}


def _oa_message_content(content):
    """Translate Responses message content into Chat Completions content.

    Returns a plain string when the message holds only text, otherwise a list
    of ``text`` / ``image_url`` parts in their original order.
    """
    if isinstance(content, str):
        return content
    parts = []
    has_image = False
    for part in content or []:
        if not isinstance(part, dict):
            continue
        kind = part.get("type", "")
        if kind in ("input_text", "output_text"):
            parts.append({"type": "text", "text": part.get("text") or ""})
        elif kind == "input_image":
            image = part.get("image_url")
            if isinstance(image, str):
                # Chat Completions wants an object, the Responses API a bare URL.
                image = {"url": image}
                if part.get("detail"):
                    image["detail"] = part["detail"]
            if not isinstance(image, dict):
                continue  # e.g. file_id references, which have no URL to forward
            parts.append({"type": "image_url", "image_url": image})
            has_image = True
    if not has_image:
        return "".join(p["text"] for p in parts)
    return parts


def oa_input_to_messages(input_data):
    """Convert a Responses API ``input`` value into Chat Completions messages.

    A string becomes a single user message. In a list, consecutive
    ``function_call`` items are merged into one assistant message with
    ``tool_calls``; ``function_call_output`` items become ``tool`` messages
    keyed by their ``call_id``; ``developer`` messages become ``system``.
    Any other input type yields an empty list.
    """
    if isinstance(input_data, str):
        return [{"role": "user", "content": input_data}]
    messages = []
    if not isinstance(input_data, list):
        return messages

    pending_calls = []
    for item in input_data:
        if not isinstance(item, dict):
            continue
        item_type = item.get("type")
        if item_type == "function_call":
            pending_calls.append({
                "id": item.get("call_id", item.get("id", uid("tc"))),
                "type": "function",
                "function": {"name": item.get("name", ""),
                             "arguments": item.get("arguments", "{}")},
            })
            continue
        if pending_calls:
            messages.append({"role": "assistant", "content": None, "tool_calls": pending_calls})
            pending_calls = []
        if _is_message_item(item):
            role = item.get("role", "user")
            if role == "developer":
                role = "system"
            messages.append({"role": role,
                             "content": _oa_message_content(item.get("content", []))})
        elif item_type == "function_call_output":
            messages.append({"role": "tool",
                             "tool_call_id": item.get("call_id", item.get("id", "")),
                             "content": _tool_output_text(item.get("output", ""))})
    if pending_calls:
        messages.append({"role": "assistant", "content": None, "tool_calls": pending_calls})
    return messages


def oa_convert_tools(tools):
    """Convert Responses function tools to Chat Completions tools.

    Non-function tools are skipped; tools that already carry a ``function``
    object are passed through untouched. Returns None when nothing remains.
    """
    if not tools:
        return None
    out = []
    for tool in tools:
        if tool.get("type") != "function":
            continue
        if tool.get("function"):
            out.append(tool)
        else:
            out.append({
                "type": "function",
                "function": {"name": tool.get("name", ""),
                             "description": tool.get("description", ""),
                             "parameters": tool.get("parameters", {})},
            })
    return out or None


def oa_convert_tool_choice(tool_choice):
    """Convert a Responses ``tool_choice`` to the Chat Completions form.

    Strings pass through; ``{"type": "function", "name": X}`` is rewritten to
    ``{"type": "function", "function": {"name": X}}``. Returns None when the
    request did not set a tool choice.
    """
    if not tool_choice:
        return None
    if (isinstance(tool_choice, dict) and tool_choice.get("type") == "function"
            and "function" not in tool_choice and tool_choice.get("name")):
        return {"type": "function", "function": {"name": tool_choice["name"]}}
    return tool_choice


def _oa_usage(usage):
    """Map a Chat Completions ``usage`` object to the Responses shape."""
    usage = usage or {}
    details = usage.get("prompt_tokens_details") or {}
    return _usage(usage.get("prompt_tokens"), usage.get("completion_tokens"),
                  usage.get("total_tokens"), details.get("cached_tokens"))


def oa_resp_to_responses(chat_resp, model, resp_id=None):
    """Convert a non-streaming Chat Completions response to a Responses object.

    Only the first choice is used.

    Raises:
        KeyError / IndexError: if the payload has no ``choices[0].message``
            (typically an upstream error body).
    """
    choice = chat_resp["choices"][0]
    msg = choice["message"]
    content = msg.get("content") or ""
    status = _OA_FINISH_STATUS.get(choice.get("finish_reason") or "stop", "incomplete")

    outputs = []
    reasoning = msg.get("reasoning_content")
    if reasoning:
        outputs.append({"type": "reasoning", "id": uid("rsn"), "status": "completed",
                        "content": [{"type": "text", "text": reasoning}]})
    if content:
        outputs.append(_message_item(uid("msg"), content))
    for tc in msg.get("tool_calls") or []:
        fn = tc.get("function") or {}
        outputs.append(_function_call_item(uid("fc"), tc.get("id"), fn.get("name"),
                                           fn.get("arguments", "{}")))
    return _response_object(resp_id, model, status, outputs, _oa_usage(chat_resp.get("usage")))


def oa_stream_to_sse(chat_stream, model, req_id):
    """Translate a Chat Completions SSE stream into Responses API events.

    Args:
        chat_stream: Iterable of raw ``bytes`` lines from the upstream body.
        model: Model name echoed in the response objects.
        req_id: Response id to use; a new one is generated when falsy.

    Yields:
        Serialized SSE events, ending with ``response.completed``. The text
        message item is only opened once the first text delta arrives, so a
        tool-call-only reply never leaves an unfinished message item behind.
    """
    resp_id = req_id or uid("resp")
    msg_id = None
    text_parts = []
    calls = {}            # tool-call index -> {"id", "call_id", "name", "args"}
    finish_reason = None
    usage = None

    yield from _stream_open(resp_id, model)

    for raw in chat_stream:
        payload = _sse_payload(raw)
        if payload is None:
            continue
        if payload == "[DONE]":
            break
        chunk = _json_object(payload)
        if chunk is None:
            continue
        if isinstance(chunk.get("usage"), dict):
            usage = chunk["usage"]
        choices = chunk.get("choices") or []
        if not choices:
            continue
        choice = choices[0]
        delta = choice.get("delta") or {}
        # Keep the last non-null reason; trailing chunks often carry null.
        finish_reason = choice.get("finish_reason") or finish_reason

        content = delta.get("content")
        if content:
            if msg_id is None:
                msg_id = uid("msg")
                yield from _text_item_open(msg_id)
            text_parts.append(content)
            yield _text_delta(msg_id, content)

        for tc in delta.get("tool_calls") or []:
            idx = tc.get("index", 0)
            fn = tc.get("function") or {}
            call = calls.get(idx)
            if call is None:
                item_id = uid("fc")
                call = calls[idx] = {"id": item_id, "call_id": tc.get("id") or item_id,
                                     "name": fn.get("name") or "", "args": ""}
                yield _function_call_open(call["id"], call["call_id"], call["name"])
            elif fn.get("name"):
                call["name"] = fn["name"]
            if fn.get("arguments"):
                call["args"] += fn["arguments"]
                yield _function_call_delta(call["id"], fn["arguments"])

        reasoning = delta.get("reasoning_content")
        if reasoning:
            yield emit("response.reasoning.delta",
                       {"type": "response.reasoning.delta", "delta": reasoning})

    final_out = []
    if msg_id is not None:
        text = "".join(text_parts)
        yield from _text_item_close(msg_id, text)
        final_out.append(_message_item(msg_id, text))
    for idx in sorted(calls):
        call = calls[idx]
        yield from _function_call_close(call["id"], call["call_id"], call["name"], call["args"])
        final_out.append(_function_call_item(call["id"], call["call_id"],
                                             call["name"], call["args"]))

    status = _OA_FINISH_STATUS.get(finish_reason, "incomplete")
    yield _stream_close(resp_id, model, status, final_out,
                        _oa_usage(usage) if usage else None)


# ═══════════════════════════════════════════════════════════════════
# Anthropic backend
# ═══════════════════════════════════════════════════════════════════

_AN_STOP_STATUS = {"end_turn": "completed", "max_tokens": "incomplete",
                   "stop_sequence": "completed", "tool_use": "completed"}

# Responses tool_choice strings -> Anthropic tool_choice types.
_AN_TOOL_CHOICE = {"auto": "auto", "required": "any", "none": "none"}


def _parse_tool_arguments(raw):
    """Return function-call arguments as a dict; empty or invalid JSON becomes {}."""
    if isinstance(raw, dict):
        return raw
    if not raw:
        return {}
    parsed = _json_object(raw)
    return parsed if parsed is not None else {}


def an_input_to_messages(input_data):
    """Convert a Responses API ``input`` value into Anthropic messages.

    Every message role other than ``assistant`` is sent as ``user``; only
    text parts are carried over. ``function_call`` items become assistant
    ``tool_use`` blocks and ``function_call_output`` items become user
    ``tool_result`` blocks keyed by ``call_id``.
    """
    if isinstance(input_data, str):
        return [{"role": "user", "content": input_data}]
    messages = []
    if not isinstance(input_data, list):
        return messages

    for item in input_data:
        if not isinstance(item, dict):
            continue
        item_type = item.get("type")
        if _is_message_item(item):
            role = "assistant" if item.get("role", "user") == "assistant" else "user"
            messages.append({"role": role, "content": _message_text(item.get("content", []))})
        elif item_type == "function_call":
            messages.append({"role": "assistant", "content": [
                {"type": "tool_use",
                 "id": item.get("call_id", item.get("id", uid("tu"))),
                 "name": item.get("name", ""),
                 "input": _parse_tool_arguments(item.get("arguments", "{}"))}
            ]})
        elif item_type == "function_call_output":
            messages.append({"role": "user", "content": [
                {"type": "tool_result",
                 "tool_use_id": item.get("call_id", item.get("id", "")),
                 "content": _tool_output_text(item.get("output", ""))}
            ]})
    return messages


def an_convert_tools(tools):
    """Convert Responses (or Chat Completions style) function tools to Anthropic tools.

    Non-function tools are skipped. Returns None when nothing remains.
    """
    if not tools:
        return None
    out = []
    for tool in tools:
        if tool.get("type") != "function":
            continue
        spec = tool.get("function") or tool
        out.append({"name": spec.get("name"),
                    "description": spec.get("description", ""),
                    "input_schema": spec.get("parameters",
                                             {"type": "object", "properties": {}})})
    return out or None


def an_convert_tool_choice(tool_choice):
    """Convert a Responses ``tool_choice`` to Anthropic's ``tool_choice`` object.

    ``"required"`` maps to ``{"type": "any"}``, a named function to
    ``{"type": "tool", "name": ...}``; other dicts pass through unchanged.
    Returns None when the request did not set a usable tool choice.
    """
    if not tool_choice:
        return None
    if isinstance(tool_choice, str):
        return {"type": _AN_TOOL_CHOICE.get(tool_choice, tool_choice)}
    if isinstance(tool_choice, dict):
        if tool_choice.get("type") == "function":
            name = tool_choice.get("name") or (tool_choice.get("function") or {}).get("name")
            return {"type": "tool", "name": name} if name else {"type": "any"}
        return tool_choice
    return None


def an_resp_to_responses(anthro_resp, model, resp_id=None):
    """Convert a non-streaming Anthropic Messages response to a Responses object."""
    status = _AN_STOP_STATUS.get(anthro_resp.get("stop_reason") or "end_turn", "incomplete")
    outputs = []
    for block in anthro_resp.get("content") or []:
        kind = block.get("type", "")
        if kind == "text":
            outputs.append(_message_item(uid("msg"), block.get("text", "")))
        elif kind == "tool_use":
            outputs.append(_function_call_item(uid("fc"), block.get("id", ""),
                                               block.get("name", ""),
                                               json.dumps(block.get("input", {}))))
        elif kind == "thinking":
            outputs.append({"type": "reasoning", "id": uid("rsn"), "status": "completed",
                            "content": [{"type": "text", "text": block.get("thinking", "")}]})
    usage = anthro_resp.get("usage") or {}
    return _response_object(resp_id, model, status, outputs,
                            _usage(usage.get("input_tokens"), usage.get("output_tokens")))


def an_stream_to_sse(stream, model, req_id):
    """Translate an Anthropic Messages SSE stream into Responses API events.

    Args:
        stream: Iterable of raw ``bytes`` lines from the upstream body.
        model: Model name echoed in the response objects.
        req_id: Response id to use; a new one is generated when falsy.

    Yields:
        Serialized SSE events. ``response.completed`` is always the last
        event; its status is ``incomplete`` if the upstream stream ended
        before ``message_stop``.
    """
    resp_id = req_id or uid("resp")
    completed = []
    msg_id = uid("msg")
    text_parts = []
    tc_id = None
    tc_call_id = None
    tc_name = ""
    tc_args = ""
    block_type = None
    stop_reason = None
    finished = False
    input_tokens = 0
    output_tokens = 0

    yield from _stream_open(resp_id, model)

    for raw in stream:
        # "event:" lines are skipped: every data payload repeats its type.
        payload = _sse_payload(raw)
        if payload is None:
            continue
        data = _json_object(payload)
        if data is None:
            continue
        event_type = data.get("type", "")

        if event_type == "message_start":
            started = (data.get("message") or {}).get("usage") or {}
            input_tokens = started.get("input_tokens") or 0
            output_tokens = started.get("output_tokens") or 0

        elif event_type == "content_block_start":
            block = data.get("content_block") or {}
            block_type = block.get("type", "")
            if block_type == "text":
                msg_id = uid("msg")
                text_parts = []
                yield from _text_item_open(msg_id)
            elif block_type == "tool_use":
                tc_id = uid("fc")
                tc_call_id = block.get("id") or tc_id
                tc_name = block.get("name", "")
                tc_args = ""
                yield _function_call_open(tc_id, tc_call_id, tc_name)

        elif event_type == "content_block_delta":
            delta = data.get("delta") or {}
            delta_type = delta.get("type", "")
            if delta_type == "text_delta":
                txt = delta.get("text", "")
                text_parts.append(txt)
                yield _text_delta(msg_id, txt)
            elif delta_type == "input_json_delta":
                partial = delta.get("partial_json", "")
                tc_args += partial
                yield _function_call_delta(tc_id, partial)
            elif delta_type == "thinking_delta":
                yield emit("response.reasoning.delta",
                           {"type": "response.reasoning.delta",
                            "delta": delta.get("thinking", "")})

        elif event_type == "content_block_stop":
            if block_type == "text":
                text = "".join(text_parts)
                yield from _text_item_close(msg_id, text)
                completed.append(_message_item(msg_id, text))
                text_parts = []
            elif block_type == "tool_use":
                # A tool called without arguments streams no JSON at all;
                # report an empty object so the arguments stay parseable.
                arguments = tc_args or "{}"
                yield from _function_call_close(tc_id, tc_call_id, tc_name, arguments)
                completed.append(_function_call_item(tc_id, tc_call_id, tc_name, arguments))
                tc_id = None
                tc_args = ""
            block_type = None

        elif event_type == "message_delta":
            stop_reason = (data.get("delta") or {}).get("stop_reason") or stop_reason
            delta_usage = data.get("usage") or {}
            if delta_usage.get("input_tokens") is not None:
                input_tokens = delta_usage["input_tokens"]
            if delta_usage.get("output_tokens") is not None:
                output_tokens = delta_usage["output_tokens"]

        elif event_type == "message_stop":
            finished = True
            break

    if finished:
        status = _AN_STOP_STATUS.get(stop_reason or "end_turn", "incomplete")
    else:
        status = "incomplete"
    yield _stream_close(resp_id, model, status, completed,
                        _usage(input_tokens, output_tokens))


# ═══════════════════════════════════════════════════════════════════
# Command-code backend
# ═══════════════════════════════════════════════════════════════════

_DEFAULT_CC_CONFIG = {
    "workingDir": "/tmp",
    "date": "",
    "environment": "linux",
    "shell": "bash",
    "files": [],
    "structure": [],
    "isGitRepo": False,
    "currentBranch": "",
    "mainBranch": "",
    "gitStatus": "",
    "recentCommits": [],
}


def _cc_config():
    """Return a copy of the default command-code config stamped with today's date."""
    cfg = dict(_DEFAULT_CC_CONFIG)
    cfg["date"] = time.strftime("%Y-%m-%d")
    return cfg


def _cc_usage(event):
    """Map the ``usage`` of a command-code ``finish-step`` event to the Responses shape."""
    usage = event.get("usage") or {}
    return _usage(usage.get("inputTokens"), usage.get("outputTokens"))


def _cc_thread_id(candidate):
    """Return ``candidate`` if it parses as a UUID, otherwise a fresh UUID string."""
    try:
        uuid.UUID(candidate)
    except (ValueError, AttributeError, TypeError):
        return str(uuid.uuid4())
    return candidate


def cc_input_to_messages(input_data):
    """Convert Responses ``input`` for command-code (same as the OpenAI conversion)."""
    return oa_input_to_messages(input_data)


def cc_convert_tools(tools):
    """Convert tools for command-code (same as the OpenAI conversion)."""
    return oa_convert_tools(tools)


def cc_messages_from_chat(raw_msgs, instructions=""):
    """Reshape Chat Completions style messages into command-code messages.

    ``instructions`` (if any) are sent as a leading user message and
    ``system`` messages are sent as ``user``. String content is wrapped in a
    single text part. Each source message produces exactly one output
    message, except assistant tool calls, which are emitted one message per
    call.
    """
    out = []
    if instructions:
        out.append({"role": "user", "content": [{"type": "text", "text": instructions}]})
    for msg in raw_msgs:
        tool_calls = msg.get("tool_calls") or []
        if tool_calls:
            for tc in tool_calls:
                fn = tc.get("function") or {}
                out.append({"role": "assistant",
                            "content": [{"type": "text", "text": ""}],
                            "tool_calls": [{"id": tc.get("id", uid("tc")),
                                            "type": "function",
                                            "function": {"name": fn.get("name", ""),
                                                         "arguments": fn.get("arguments", "{}")}}]})
            continue
        if msg.get("tool_call_id"):
            out.append({"role": "tool", "tool_call_id": msg["tool_call_id"],
                        "content": [{"type": "text", "text": msg.get("content", "")}]})
            continue
        role = msg.get("role", "user")
        if role == "system":
            role = "user"
        content = msg.get("content", "")
        if isinstance(content, str):
            content = [{"type": "text", "text": content}]
        elif content is None:
            content = [{"type": "text", "text": ""}]
        out.append({"role": role, "content": content})
    return out


def cc_resp_to_responses(cc_lines, model, resp_id=None):
    """Fold buffered command-code event lines into a Responses object.

    ``text-delta`` events are concatenated into one assistant message and the
    last ``finish-step`` event supplies the usage; lines that are not JSON
    objects are ignored.
    """
    text_parts = []
    usage = _usage(0, 0)
    for line in cc_lines:
        event = _json_object(line)
        if event is None:
            continue
        kind = event.get("type", "")
        if kind == "text-delta":
            text_parts.append(event.get("text") or "")
        elif kind == "finish-step":
            usage = _cc_usage(event)
    text = "".join(text_parts)
    outputs = [_message_item(uid("msg"), text)] if text else []
    return _response_object(resp_id, model, "completed", outputs, usage)


def cc_stream_to_sse(cc_stream, model, req_id):
    """Translate a command-code line-delimited JSON stream into Responses events.

    Args:
        cc_stream: Iterable of raw ``bytes`` lines, one JSON event per line.
        model: Model name echoed in the response objects.
        req_id: Response id to use; a new one is generated when falsy.

    Yields:
        Serialized SSE events, ending with ``response.completed``.
    """
    resp_id = req_id or uid("resp")
    msg_id = None
    text_parts = []
    usage = _usage(0, 0)

    yield from _stream_open(resp_id, model)

    for raw in cc_stream:
        event = _json_object(raw.decode("utf-8", errors="replace").strip())
        if event is None:
            continue
        kind = event.get("type", "")
        if kind == "text-delta":
            txt = event.get("text") or ""
            if txt:
                if msg_id is None:
                    msg_id = uid("msg")
                    yield from _text_item_open(msg_id)
                text_parts.append(txt)
                yield _text_delta(msg_id, txt)
        elif kind == "finish-step":
            usage = _cc_usage(event)

    final_out = []
    if msg_id is not None:
        text = "".join(text_parts)
        yield from _text_item_close(msg_id, text)
        final_out.append(_message_item(msg_id, text))
    yield _stream_close(resp_id, model, "completed", final_out, usage)


# ═══════════════════════════════════════════════════════════════════
# HTTP Server
# ═══════════════════════════════════════════════════════════════════

class Handler(http.server.BaseHTTPRequestHandler):
    """Serve ``GET /v1/models`` and ``POST /v1/responses`` for the configured backend."""

    protocol_version = "HTTP/1.1"

    def do_GET(self):
        """Return the configured model list, or 404 for any other path."""
        if self.path in ("/v1/models", "/models"):
            self.send_json(200, {"object": "list", "data": MODELS})
        else:
            self.send_error(404)

    def do_POST(self):
        """Dispatch Responses API requests, or 404 for any other path."""
        if self.path in ("/v1/responses", "/responses"):
            self._handle()
        else:
            self.send_error(404)

    def _handle(self):
        """Parse the JSON request body and hand it to the active backend."""
        try:
            length = int(self.headers.get("Content-Length", 0))
            body = json.loads(self.rfile.read(max(length, 0)))
        except (ValueError, OSError) as e:
            return self.send_json(400, {"error": {"message": f"Bad request: {e}"}})
        if not isinstance(body, dict):
            return self.send_json(
                400, {"error": {"message": "Bad request: JSON body must be an object"}})

        model = body.get("model") or (MODELS[0]["id"] if MODELS else "unknown")
        stream = bool(body.get("stream", False))
        if BACKEND == "anthropic":
            self._handle_anthropic(body, model, stream)
        elif BACKEND == "command-code":
            self._handle_command_code(body, model, stream)
        else:
            self._handle_openai_compat(body, model, stream)

    def _handle_openai_compat(self, body, model, stream):
        """Translate the request to Chat Completions and forward it."""
        messages = oa_input_to_messages(body.get("input", ""))
        instructions = _instructions_text(body)
        if instructions:
            messages.insert(0, {"role": "system", "content": instructions})

        chat_body = {"model": model, "messages": messages}
        for key in ("temperature", "top_p", "max_output_tokens"):
            if body.get(key) is not None:
                chat_body["max_tokens" if key == "max_output_tokens" else key] = body[key]
        tools = oa_convert_tools(body.get("tools"))
        if tools:
            chat_body["tools"] = tools
            # tool_choice is only meaningful (and accepted) alongside tools.
            tool_choice = oa_convert_tool_choice(body.get("tool_choice"))
            if tool_choice:
                chat_body["tool_choice"] = tool_choice
        chat_body["stream"] = stream

        target = upstream_target(TARGET_URL, "/chat/completions")
        fwd = forwarded_headers(self.headers, {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {API_KEY}",
        }, browser_ua=True)
        print(f"[translate-proxy] POST {target} model={model} stream={stream} ua={fwd.get('User-Agent','')[:50]}", file=sys.stderr)
        req = urllib.request.Request(target, data=json.dumps(chat_body).encode(), headers=fwd)
        request_id = body.get("request_id") or body.get("id")
        self._forward(req, stream, model,
                      lambda r: oa_resp_to_responses(json.loads(r.read()), model),
                      lambda s: oa_stream_to_sse(s, model, request_id))

    def _handle_anthropic(self, body, model, stream):
        """Translate the request to the Anthropic Messages API and forward it."""
        an_body = {"model": model,
                   "messages": an_input_to_messages(body.get("input", "")),
                   "max_tokens": body.get("max_output_tokens") or 8192}
        instructions = _instructions_text(body)
        if instructions:
            an_body["system"] = instructions
        for key in ("temperature", "top_p"):
            if body.get(key) is not None:
                an_body[key] = body[key]
        tools = an_convert_tools(body.get("tools"))
        if tools:
            an_body["tools"] = tools
            tool_choice = an_convert_tool_choice(body.get("tool_choice"))
            if tool_choice:
                an_body["tool_choice"] = tool_choice
        an_body["stream"] = stream

        target = upstream_target(TARGET_URL, "/messages")
        req = urllib.request.Request(
            target, data=json.dumps(an_body).encode(),
            headers=forwarded_headers(self.headers, {
                "Content-Type": "application/json",
                "x-api-key": API_KEY,
                "anthropic-version": "2023-06-01",
            }),
        )
        request_id = body.get("request_id") or body.get("id")
        self._forward(req, stream, model,
                      lambda r: an_resp_to_responses(json.loads(r.read()), model),
                      lambda s: an_stream_to_sse(s, model, request_id))

    def _handle_command_code(self, body, model, stream):
        """Translate the request to the command-code generate API and forward it.

        The upstream call always asks for a stream; a non-streaming client
        request is answered by buffering that stream into one response.
        """
        raw_msgs = oa_input_to_messages(body.get("input", ""))
        cc_msgs = cc_messages_from_chat(raw_msgs, _instructions_text(body))
        request_id = body.get("request_id") or body.get("id")

        cc_body = {
            "config": _cc_config(),
            "memory": "",
            "taste": "",
            "skills": "",
            "params": {
                "stream": True,
                "max_tokens": body.get("max_output_tokens") or 64000,
                "temperature": body.get("temperature", 0.3),
                "messages": cc_msgs,
                "model": model,
                "tools": [],
            },
            "threadId": _cc_thread_id(request_id or ""),
        }

        target = upstream_target(TARGET_URL, "/alpha/generate")
        fwd = forwarded_headers(self.headers, {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {API_KEY}",
            "Accept": "text/event-stream, application/json",
            "x-command-code-version": CC_VERSION or "0.26.8",
        }, browser_ua=True)
        print(f"[translate-proxy] POST {target} model={model} stream={stream} [command-code]", file=sys.stderr)
        req = urllib.request.Request(target, data=json.dumps(cc_body).encode(), headers=fwd)
        self._forward(
            req, stream, model,
            lambda r: cc_resp_to_responses(
                r.read().decode("utf-8", errors="replace").strip().split("\n"), model),
            lambda s: cc_stream_to_sse(s, model, request_id))

    def _forward(self, req, stream, model, nonstream_fn, stream_fn):
        """Send ``req`` upstream and relay the translated result to the client.

        Args:
            req: Prepared ``urllib.request.Request``.
            stream: Whether the client asked for an SSE stream.
            model: Model name (unused here; kept for the callers' signature).
            nonstream_fn: Callable turning the upstream response into a
                Responses object (non-streaming path).
            stream_fn: Callable turning the upstream response into an
                iterator of serialized SSE events (streaming path).
        """
        try:
            upstream = urllib.request.urlopen(req)
        except urllib.error.HTTPError as e:
            err = e.read().decode("utf-8", errors="replace")
            e.close()
            return self.send_json(e.code, {"error": {"type": "upstream_error", "message": err}})
        except Exception as e:
            return self.send_json(500, {"error": {"type": "proxy_error", "message": str(e)}})

        with upstream:
            if not stream:
                try:
                    result = nonstream_fn(upstream)
                except Exception as e:
                    # Headers are not sent yet, so a malformed upstream body
                    # can still be reported as a proper error response.
                    return self.send_json(502, {"error": {
                        "type": "proxy_error", "message": f"Bad upstream response: {e}"}})
                return self.send_json(200, result)

            self.send_response(200)
            self.send_header("Content-Type", "text/event-stream")
            self.send_header("Cache-Control", "no-cache")
            # The body has neither a length nor chunked framing, so closing
            # the connection is the only way to mark the end of the stream.
            self.send_header("Connection", "close")
            self.end_headers()
            try:
                for event in stream_fn(upstream):
                    self.wfile.write(event.encode("utf-8"))
                    self.wfile.flush()
            except (BrokenPipeError, ConnectionResetError):
                self.log_message("client disconnected during stream")

    def send_json(self, status, data):
        """Send ``data`` as a JSON response with the given HTTP status."""
        body = json.dumps(data).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        """Write access-log lines to stderr with the proxy/backend prefix."""
        msg = fmt % args if args else fmt
        print(f"[translate-proxy] {BACKEND} {msg}", file=sys.stderr)


if __name__ == "__main__":
    server = http.server.HTTPServer(("127.0.0.1", PORT), Handler)
    print(f"translate-proxy ({BACKEND}) listening on http://127.0.0.1:{PORT}", flush=True)
    print(f"Target: {TARGET_URL}", flush=True)
    print(f"Models: {[m['id'] for m in MODELS]}", flush=True)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.server_close()