The vector-db API returns message content in a top-level "content" field with "author" and "channel" also top-level (not nested under metadata). Previous code read "text" and "metadata.author" which returned empty strings, making all vector search results invisible to the LLM. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
503 lines
18 KiB
Python
503 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""Wiki VectorDB Chat — Multi-Provider AI Chat with RAG (KB + VectorDB).
|
|
|
|
Serves at port 8770, proxied via nginx at /zportal/wiki/api/chat
|
|
Uses wiki-api (:8097) for KB search and vector-db (:8099) for vector search.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import re
|
|
import time
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
# On-disk JSON file holding provider definitions.
# NOTE(review): both constants point at the same path, and PROVIDERS_FILE is
# not referenced by any code visible in this file — confirm it is still needed.
PROVIDERS_FILE = Path("/opt/blog/wiki-chat-providers.json")
CUSTOM_PROVIDERS_FILE = Path("/opt/blog/wiki-chat-providers.json")

# Base URLs of the local services queried for RAG context.
WIKI_API = "http://127.0.0.1:8097"
VECTOR_DB = "http://127.0.0.1:8099"

# Shared API token for wiki-api and vector-db
_API_TOKEN = ""
try:
    _API_TOKEN = Path("/opt/blog/.wiki-api-token").read_text().strip()
except Exception:
    # Best-effort: if the token file cannot be read the token stays empty and
    # the module still imports.
    pass
|
|
|
|
# Built-in provider presets. None of them carries an API key.
#
# Icons outside the Basic Multilingual Plane must use the 8-digit ``\U``
# escape: ``"\u1f512"`` is parsed as U+1F51 followed by the digit "2", not as
# the U+1F512 emoji.
PRESETS = [
    {
        "id": "zai-coding",
        "name": "Z.ai Coding Plan",
        "base_url": "https://api.z.ai/api/coding/paas/v4",
        "model": "glm-4-plus",
        "format": "openai",
        "icon": "\u26a1",
        "description": "Official Z.ai coding plan API",
    },
    {
        "id": "openadapter",
        "name": "OpenAdapter",
        "base_url": "https://api.openadapter.com/v1",
        "model": "gpt-4o-mini",
        "format": "openai",
        "icon": "\U0001f512",
        "description": "OpenAdapter unified API",
    },
    {
        "id": "openrouter",
        "name": "OpenRouter",
        "base_url": "https://openrouter.ai/api/v1",
        "model": "anthropic/claude-sonnet-4",
        "format": "openrouter",
        "icon": "\U0001f6e3",
        "description": "Model router across providers",
    },
    {
        "id": "crofai",
        "name": "Crof.AI",
        "base_url": "https://api.crof.ai/v1",
        "model": "crof-4-plus",
        "format": "openai",
        "icon": "\U0001f42a",
        "description": "Crof AI models",
    },
    {
        "id": "opencode-zen",
        "name": "Opencode Zen",
        "base_url": "https://api.zen.opencode.com/v1",
        "model": "glm-4-plus",
        "format": "openai",
        "icon": "\U0001f9e0",
        "description": "Opencode Zen hosted models",
    },
]
|
|
|
|
|
|
def load_custom_providers():
    """Load the user-defined providers stored in CUSTOM_PROVIDERS_FILE.

    Returns:
        The list parsed from the file, or an empty list when the file is
        missing, unreadable, not valid JSON, or does not contain a JSON list.
    """
    try:
        data = json.loads(CUSTOM_PROVIDERS_FILE.read_text())
    except (OSError, ValueError):
        # OSError: file missing or unreadable. ValueError: undecodable bytes
        # or invalid JSON (json.JSONDecodeError is a ValueError subclass).
        return []
    # Callers iterate the result and call .get() on each entry, so any other
    # top-level JSON type (object, string, number) is treated as "no providers".
    return data if isinstance(data, list) else []
|
|
|
|
|
|
def save_custom_providers(providers):
    """Write *providers* to CUSTOM_PROVIDERS_FILE as 2-space indented JSON."""
    serialized = json.dumps(providers, indent=2)
    CUSTOM_PROVIDERS_FILE.write_text(serialized)
|
|
|
|
|
|
def get_all_providers():
    """Return presets + custom providers.

    Presets come first. A custom provider is appended only when its id has not
    been seen yet, so presets win on id clashes and duplicate custom ids keep
    their first occurrence. Entries that are not dicts or that have no id are
    skipped instead of raising.
    """
    seen = {p["id"] for p in PRESETS}
    result = list(PRESETS)
    for p in load_custom_providers():
        provider_id = p.get("id") if isinstance(p, dict) else None
        if provider_id is None or provider_id in seen:
            continue
        seen.add(provider_id)
        result.append(p)
    return result
|
|
|
|
|
|
def detect_provider_format(base_url: str) -> str:
    """Guess the wire format of a provider from the host in *base_url*.

    Args:
        base_url: Provider base URL.

    Returns:
        "ollama" when the host contains "ollama" or is localhost/127.0.0.1,
        "anthropic", "openrouter" or "groq" when the host contains that word,
        and "openai" otherwise — including when no host can be parsed
        (empty string or a URL without a scheme).
    """
    from urllib.parse import urlparse

    # hostname is None when the URL has no network location; treat that as an
    # unknown host rather than failing on None.lower().
    host = (urlparse(base_url).hostname or "").lower()
    if "ollama" in host or host in ("localhost", "127.0.0.1"):
        return "ollama"
    for marker in ("anthropic", "openrouter", "groq"):
        if marker in host:
            return marker
    return "openai"
|
|
|
|
|
|
async def search_kb(query: str, limit: int = 3) -> str:
    """Search wiki-kb.json via wiki-api and format the hits as prompt text.

    The blocking HTTP request runs in the default executor so the event loop
    stays responsive and searches started together with ``asyncio.gather``
    actually overlap.

    Args:
        query: Free-text search string; URL-quoted before it is sent.
        limit: Maximum number of results requested and rendered.

    Returns:
        One "[topic] Q: ... / A: ..." entry per hit, separated by blank lines;
        an empty string when there are no results; or a
        "(KB search error: ...)" string when the lookup fails.
    """
    from urllib.parse import quote

    def _fetch() -> dict:
        # Synchronous request; executed off the event loop.
        url = f"{WIKI_API}/search?q={quote(query)}&limit={limit}&token={_API_TOKEN}"
        req = urllib.request.Request(url)
        with urllib.request.urlopen(req, timeout=5) as resp:
            return json.loads(resp.read())

    try:
        data = await asyncio.get_running_loop().run_in_executor(None, _fetch)
        results = data.get("results", [])
        if not results:
            return ""
        lines = [
            f"[{r.get('topic', '')}] Q: {r.get('q', '')}\nA: {r.get('a', '')}"
            for r in results[:limit]
        ]
        return "\n\n".join(lines)
    except Exception as e:
        # Best-effort: the error text is returned so the caller can still
        # build a prompt instead of failing the whole chat request.
        return f"(KB search error: {e})"
|
|
|
|
|
|
async def search_vector(query: str, top_k: int = 5) -> str:
    """Search vector-db for related Discord/Reddit messages.

    The blocking HTTP request runs in the default executor so the event loop
    stays responsive and searches started together with ``asyncio.gather``
    actually overlap.

    Args:
        query: Free-text search string sent in the JSON request body.
        top_k: Maximum number of hits requested and rendered.

    Returns:
        One "[source] @author in #channel: preview (score: x.xx)" line per
        hit, separated by blank lines; an empty string when there are no
        hits; or a "(Vector search error: ...)" string when the lookup fails.
    """

    def _fetch() -> dict:
        # Synchronous POST; executed off the event loop.
        data = json.dumps({"query": query, "top_k": top_k}).encode()
        req = urllib.request.Request(
            f"{VECTOR_DB}/vector/search",
            data=data,
            headers={"Content-Type": "application/json", "x-api-key": _API_TOKEN},
        )
        with urllib.request.urlopen(req, timeout=8) as resp:
            return json.loads(resp.read())

    try:
        result = await asyncio.get_running_loop().run_in_executor(None, _fetch)
        hits = result.get("results", [])
        if not hits:
            return ""
        lines = []
        for h in hits[:top_k]:
            # Message body is read from "content", falling back to "text";
            # a null value must not break the preview.
            text = h.get("content", "") or h.get("text", "") or ""
            try:
                score = float(h.get("score", 0) or 0)
            except (TypeError, ValueError):
                # One malformed score must not discard every other hit.
                score = 0.0
            source = h.get("source", "unknown")
            author = h.get("author", "")
            channel = h.get("channel", "")
            preview = text.replace("\n", " ")[:200]
            lines.append(f"[{source}] @{author} in #{channel}: {preview} (score: {score:.2f})")
        return "\n\n".join(lines)
    except Exception as e:
        # Best-effort: the error text is returned so the caller can still
        # build a prompt instead of failing the whole chat request.
        return f"(Vector search error: {e})"
|
|
|
|
|
|
async def build_rag_context(user_message: str, rag_wiki: bool = True, rag_vector: bool = True) -> str:
    """Assemble the system prompt from the enabled retrieval sources.

    Runs the KB search (3 results) and/or the vector search (5 results) for
    *user_message*, then renders the fixed instructions followed by one
    section per enabled source.
    """
    lookups = []
    if rag_wiki:
        lookups.append(("kb", search_kb(user_message, 3)))
    if rag_vector:
        lookups.append(("vec", search_vector(user_message, 5)))

    found = {"kb": "", "vec": ""}
    if lookups:
        answers = await asyncio.gather(*(pending for _, pending in lookups))
        for (label, _), answer in zip(lookups, answers):
            found[label] = answer

    parts = [
        "You are Z.ai Wiki Assistant. Use ALL the knowledge sources below to answer the user's question.",
        "Draw from both the Wiki KB and Community Messages. Synthesize information even from partial matches.",
        "If the context mentions anything relevant, include it in your answer. Be specific — quote authors, channels, and details when available.",
        "Only say you don't have information if the sources are truly empty or completely unrelated.",
        "",
    ]
    if rag_wiki:
        parts.extend(["=== Wiki Knowledge Base ===", found["kb"] or "(no KB results found)", ""])
    if rag_vector:
        parts.extend([
            "=== Related Community Messages (Discord/Reddit) ===",
            found["vec"] or "(no community messages found)",
        ])
    if not (rag_wiki or rag_vector):
        parts.append("(RAG sources disabled for this session)")
    return "\n".join(parts)
|
|
|
|
|
|
# ── LLM Provider Calls ──
|
|
|
|
def format_messages_openai(system: str, messages: list, model: str) -> dict:
    """Build the request body for an OpenAI-compatible /chat/completions call.

    The system prompt becomes the first message, followed by *messages*
    unchanged; sampling settings are fixed and streaming is enabled.
    """
    system_entry = {"role": "system", "content": system}
    payload = {"model": model}
    payload["messages"] = [system_entry] + messages
    payload["temperature"] = 0.7
    payload["max_tokens"] = 2048
    payload["stream"] = True
    return payload
|
|
|
|
|
|
def format_messages_anthropic(system: str, messages: list, model: str) -> dict:
    """Build an Anthropic Messages request body from OpenAI-style messages.

    The system prompt goes in the top-level "system" field. Every message
    whose role is not "user" is sent with the "assistant" role.
    """

    def _convert(entry):
        speaker = "assistant" if entry["role"] != "user" else "user"
        return {"role": speaker, "content": entry["content"]}

    return {
        "model": model,
        "system": system,
        "messages": [_convert(entry) for entry in messages],
        "max_tokens": 2048,
        "stream": True,
    }
|
|
|
|
|
|
def format_messages_ollama(system: str, messages: list, model: str) -> dict:
    """Format for Ollama /api/chat endpoint.

    Args:
        system: System prompt (the RAG context). Sent as a leading "system"
            message when non-empty; previously it was dropped, so Ollama
            models never received the retrieved context.
        messages: OpenAI-style messages. Every role other than "user" is sent
            as "assistant".
        model: Ollama model name.

    Returns:
        The request body with streaming enabled.
    """
    ollama_msgs = []
    if system:
        ollama_msgs.append({"role": "system", "content": system})
    for m in messages:
        role = "user" if m["role"] == "user" else "assistant"
        ollama_msgs.append({"role": role, "content": m["content"]})
    return {"model": model, "messages": ollama_msgs, "stream": True}
|
|
|
|
|
|
def _parse_stream_line(fmt: str, line: str):
    """Translate one decoded, non-empty stream line into chat events.

    Args:
        fmt: Provider wire format ("anthropic", "ollama", or anything else
            for OpenAI-compatible streams).
        line: One stripped line of the response body.

    Returns:
        ``(events, finished)`` where *events* is a list of event dicts to
        emit and *finished* is True when the stream should stop.
    """
    if line.startswith("data:"):
        data_str = line[5:].strip()
    elif fmt == "ollama" and line.startswith("{"):
        # Ollama streams newline-delimited JSON without an SSE "data:" prefix.
        data_str = line
    else:
        # SSE event names, comments and keep-alives carry no payload.
        return [], False

    if not data_str:
        return [], False
    if data_str == "[DONE]":
        return [{"type": "done"}], True

    try:
        chunk_data = json.loads(data_str)
    except json.JSONDecodeError:
        return [{"delta": data_str, "type": "raw"}], False
    if not isinstance(chunk_data, dict):
        return [], False

    # Anthropic SSE format
    if fmt == "anthropic":
        evt_type = chunk_data.get("type", "")
        if evt_type == "content_block_delta":
            text = (chunk_data.get("delta") or {}).get("text", "")
            return ([{"delta": text, "type": "delta"}] if text else []), False
        if evt_type == "message_stop":
            return [{"type": "done"}], True
        if evt_type == "error":
            err_msg = (chunk_data.get("error") or {}).get("message", str(chunk_data))
            return [{"type": "error", "delta": err_msg}], True
        return [], False

    # Ollama NDJSON format: {"message": {"content": ...}, "done": bool}
    if fmt == "ollama":
        if chunk_data.get("error"):
            return [{"type": "error", "delta": str(chunk_data["error"])}], True
        events = []
        text = (chunk_data.get("message") or {}).get("content", "")
        if text:
            events.append({"delta": text, "type": "delta"})
        if chunk_data.get("done"):
            events.append({"type": "done"})
            return events, True
        return events, False

    # OpenAI-compatible SSE format. "choices" may be missing or empty (for
    # example on a usage-only chunk), which must not abort the stream.
    events = []
    choice = (chunk_data.get("choices") or [{}])[0] or {}
    deltas = choice.get("delta") or {}
    content = deltas.get("content", "")
    if content:
        events.append({"delta": content, "type": "delta"})
    tool_calls = deltas.get("tool_calls")
    if tool_calls:
        names = [(tc.get("function") or {}).get("name") or "?" for tc in tool_calls]
        events.append({"delta": f"\n[Using tools: {', '.join(names)}]", "type": "tool"})
    # finish_reason lives on the choice; the top-level lookup is kept for
    # providers that put it there.
    if choice.get("finish_reason") or chunk_data.get("finish_reason"):
        events.append({"type": "done"})
        return events, True
    return events, False


async def call_llm_stream(provider: dict, system: str, messages: list):
    """Call LLM provider and yield SSE delta chunks.

    Args:
        provider: Provider definition; reads "base_url" (required) and the
            optional "format", "api_key" and "model" keys.
        system: System prompt.
        messages: OpenAI-style chat messages.

    Yields:
        Event dicts with a "type" of "delta", "tool", "raw", "error" or
        "done"; all but "done" carry the text in "delta". Failures (bad
        provider data, HTTP errors, network errors) are reported as a single
        "error" event instead of being raised.
    """
    loop = asyncio.get_running_loop()
    resp = None
    try:
        base_url = provider["base_url"].rstrip("/")
        fmt = provider.get("format", detect_provider_format(base_url))
        api_key = provider.get("api_key", "")
        model = provider.get("model", "gpt-4o-mini")

        if fmt == "anthropic":
            payload = format_messages_anthropic(system, messages, model)
            url = f"{base_url}/v1/messages"
            headers = {"x-api-key": api_key, "Content-Type": "application/json",
                       "anthropic-version": "2023-06-01"}
        elif fmt == "ollama":
            payload = format_messages_ollama(system, messages, model)
            url = f"{base_url}/api/chat"
            headers = {"Content-Type": "application/json"}
        else:
            # openai / openrouter / groq / default
            payload = format_messages_openai(system, messages, model)
            url = f"{base_url}/chat/completions"
            headers = {}
            if api_key:
                headers["Authorization"] = f"Bearer {api_key}"
            if fmt == "openrouter":
                headers["HTTP-OpenRouter-AI-Model"] = model
            headers["Content-Type"] = "application/json"

        data = json.dumps(payload).encode()
        req = urllib.request.Request(url, data=data, headers=headers)

        # urllib is blocking, so the request and every read run in the
        # default executor to keep the event loop free for other requests.
        resp = await loop.run_in_executor(
            None, lambda: urllib.request.urlopen(req, timeout=60)
        )
        while True:
            # readline() returns as soon as one line is available, whereas
            # read(4096) waits for 4 KiB and would deliver deltas in batches.
            raw = await loop.run_in_executor(None, resp.readline)
            if not raw:
                break
            line = raw.decode("utf-8", errors="replace").strip()
            if not line:
                continue
            events, finished = _parse_stream_line(fmt, line)
            for event in events:
                yield event
            if finished:
                return
    except urllib.error.HTTPError as e:
        body = e.read().decode("utf-8", errors="replace")[:500]
        yield {"type": "error", "delta": f"HTTP {e.code}: {body}"}
    except Exception as e:
        yield {"type": "error", "delta": str(e)}
    finally:
        if resp is not None:
            resp.close()
|
|
|
|
|
|
# ── FastAPI App ──
|
|
|
|
try:
    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse, JSONResponse, Response
    from pydantic import BaseModel
except ImportError:
    # Best-effort bootstrap: install the web stack and retry the imports.
    # pip is invoked through the running interpreter so the packages land in
    # the environment this process imports from; a bare "pip" on PATH may
    # belong to a different Python. A failed install is not raised here —
    # the retried imports below report it.
    import subprocess
    import sys

    print("Installing fastapi...")
    subprocess.call(
        [sys.executable, "-m", "pip", "install", "fastapi", "uvicorn", "httpx", "-q"]
    )
    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse, JSONResponse, Response
    from pydantic import BaseModel

# ASGI application; the route handlers below register themselves on it.
app = FastAPI(title="Wiki VectorDB Chat")
|
|
|
|
|
|
class ChatMessage(BaseModel):
    """Request body accepted by the chat endpoints."""

    # Text of the user's new message.
    message: str
    # Id of the provider to use; matched against the "id" of the entries
    # returned by get_all_providers().
    provider_id: str = "zai-coding"
    # Earlier conversation messages; the chat endpoints forward the last 10.
    history: list = []
    # Retrieval toggles passed to build_rag_context by /chat/message.
    rag_wiki: bool = True
    rag_vector: bool = True
    # Key selecting an optional system-prompt suffix in /chat/message
    # ("chat", "code" or "brain"; unknown values add nothing).
    mode: str = "chat"
|
|
|
|
|
|
class ProviderSave(BaseModel):
    """Request body for saving a custom provider; stored via model_dump()."""

    # Identifier used to update an existing custom provider or add a new one.
    id: str
    # Display name.
    name: str
    # Base URL of the provider API.
    base_url: str
    # Model name sent to the provider.
    model: str
    # API key; empty by default.
    api_key: str = ""
    # Wire format; call_llm_stream special-cases "anthropic", "ollama" and
    # "openrouter" and treats everything else as OpenAI-compatible.
    format: str = "openai"
    # Icon character for the provider.
    icon: str = "\u2b99"
    # Free-text description.
    description: str = ""
|
|
|
|
|
|
@app.get("/providers/presets")
async def get_presets():
    """Return the built-in provider presets (the PRESETS list)."""
    return PRESETS
|
|
|
|
|
|
@app.get("/providers")
async def list_providers():
    """Return every provider: presets followed by custom providers."""
    # NOTE(review): custom entries are returned as stored by save_provider,
    # which includes their "api_key" field — confirm this endpoint is only
    # reachable by trusted clients.
    return get_all_providers()
|
|
|
|
|
|
@app.post("/providers/save")
async def save_provider(p: ProviderSave):
    """Insert or replace a custom provider and persist the list.

    An existing custom provider with the same id is overwritten in place;
    otherwise the new provider is appended. Returns the stored record.
    """
    record = p.model_dump()
    stored = load_custom_providers()
    slot = next(
        (pos for pos, item in enumerate(stored) if item.get("id") == p.id),
        None,
    )
    if slot is None:
        stored.append(record)
    else:
        stored[slot] = record
    save_custom_providers(stored)
    return {"ok": True, "provider": record}
|
|
|
|
|
|
@app.delete("/providers/{provider_id}")
async def delete_provider(provider_id: str):
    """Remove every custom provider whose id equals *provider_id*.

    The remaining list is written back even when nothing matched.
    """
    remaining = [
        entry for entry in load_custom_providers()
        if entry.get("id") != provider_id
    ]
    save_custom_providers(remaining)
    return {"ok": True}
|
|
|
|
|
|
@app.post("/chat/message")
async def chat_message(msg: ChatMessage):
    """Stream a chat completion for *msg* as server-sent events.

    Looks up the requested provider, builds the RAG system prompt using the
    request's toggles, appends the mode-specific hint and relays every event
    from call_llm_stream, followed by a final "done" event.
    """

    async def generate():
        provider = None
        for candidate in get_all_providers():
            if candidate.get("id") == msg.provider_id:
                provider = candidate
                break
        if not provider:
            not_found = {'type': 'error', 'delta': 'Provider not found'}
            yield f"data: {json.dumps(not_found)}\n\n"
            return

        # Last 10 history entries followed by the new user message.
        messages = list(msg.history[-10:])
        messages.append({"role": "user", "content": msg.message})

        # Retrieval honours the per-session toggles.
        rag_context = await build_rag_context(msg.message, msg.rag_wiki, msg.rag_vector)

        # Optional suffix appended to the system prompt for the chosen mode.
        mode_hints = {
            "chat": "",
            "code": "\n\nMODE: Coding. The user is working on code. Provide precise, well-structured code examples with explanations. Use markdown code blocks. Be concise and technical.",
            "brain": "\n\nMODE: Brainstorm. The user wants creative exploration. Think freely, offer multiple perspectives, suggest unconventional approaches. Be enthusiastic and expansive.",
        }
        system_prompt = rag_context + mode_hints.get(msg.mode, "")

        async for chunk in call_llm_stream(provider, system_prompt, messages):
            encoded = json.dumps(chunk, ensure_ascii=False)
            yield f"data: {encoded}\n\n"
        closing = json.dumps({'type': 'done'})
        yield f"data: {closing}\n\n"

    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return StreamingResponse(generate(), media_type="text/event-stream",
                             headers=sse_headers)
|
|
|
|
|
|
def _may_receive_server_token(provider: dict) -> bool:
    """Return True when the server-side Z.ai token may be sent to *provider*.

    Custom providers are created through the API with an arbitrary base_url,
    so handing the server token to any key-less provider would let a caller
    point it at a host they control. The token is therefore only attached to
    built-in presets and to providers whose host is z.ai or a subdomain.
    """
    from urllib.parse import urlparse

    # Presets take precedence over custom entries with the same id, so a
    # provider carrying a preset id is the built-in definition.
    if provider.get("id") in {preset["id"] for preset in PRESETS}:
        return True
    try:
        host = (urlparse(provider.get("base_url") or "").hostname or "").lower()
    except (TypeError, ValueError):
        return False
    return host == "z.ai" or host.endswith(".z.ai")


@app.post("/chat/tunnel")
async def chat_tunnel(msg: ChatMessage):
    """Server-side token chat — uses ZAI_API_TOKEN env var if available.

    Behaves like /chat/message for provider lookup, history and the RAG
    toggles, but fills in the server's token when the selected provider has
    no api_key of its own and is allowed to receive it.
    """
    async def generate():
        providers = get_all_providers()
        provider = next((p for p in providers if p.get("id") == msg.provider_id), None)
        if not provider:
            yield f"data: {json.dumps({'type':'error','delta':'Provider not found'})}\n\n"
            return

        # Use server-side token if available (for tunnel mode)
        token = os.environ.get("ZAI_API_TOKEN", "")
        if token and not provider.get("api_key") and _may_receive_server_token(provider):
            # Copy first so the shared provider definition is never mutated.
            provider = dict(provider)
            provider["api_key"] = token

        messages = list(msg.history[-10:])
        messages.append({"role": "user", "content": msg.message})

        # Honour the per-session RAG toggles, consistent with /chat/message.
        # NOTE(review): msg.mode is not applied here — confirm whether tunnel
        # mode should also append the mode-specific prompt hint.
        rag_context = await build_rag_context(msg.message, msg.rag_wiki, msg.rag_vector)

        async for chunk in call_llm_stream(provider, rag_context, messages):
            data = json.dumps(chunk, ensure_ascii=False)
            yield f"data: {data}\n\n"
        yield f"data: {json.dumps({'type':'done'})}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream",
                             headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
|
|
|
|
|
|
@app.get("/health")
async def health():
    """Report service status together with the number of known providers."""
    return {"status": "ok", "providers": len(get_all_providers())}
|
|
|
|
|
|
class WikiSave(BaseModel):
    """Request body for saving a Q&A pair to the wiki knowledge base."""

    # Stored under the "q" key of the KB entry.
    question: str
    # Stored under the "a" key of the KB entry.
    answer: str
    # Stored under the "topic" key of the KB entry.
    topic: str = "chat-saved"
|
|
|
|
|
|
@app.post("/chat/save-to-wiki")
async def save_to_wiki(item: WikiSave):
    """Save a Q&A pair directly to wiki-kb.json.

    Appends one entry to the JSON list in /opt/blog/wiki-kb.json and rewrites
    the file.

    Returns:
        {"ok": True, "total": <entry count>} on success, or
        {"ok": False, "error": <message>} when reading, parsing or writing
        fails.
    """
    try:
        kb_path = Path("/opt/blog/wiki-kb.json")
        # The file is written with ensure_ascii=False, so it can contain
        # non-ASCII text. Read and write it as UTF-8 explicitly: with the
        # locale default, a non-UTF-8 locale makes write_text() truncate the
        # file and then fail on the first unencodable character.
        kb = json.loads(kb_path.read_text(encoding="utf-8"))
        entry = {
            "q": item.question,
            "a": item.answer,
            "topic": item.topic,
            "author": "chat-assistant",
            "source": "chat-saved",
            "timestamp": time.strftime("%Y-%m-%d %H:%M"),
        }
        kb.append(entry)
        kb_path.write_text(
            json.dumps(kb, ensure_ascii=False, indent=2), encoding="utf-8"
        )
        return {"ok": True, "total": len(kb)}
    except Exception as e:
        # Endpoint boundary: report the failure in the response body.
        return {"ok": False, "error": str(e)}
|
|
|
|
|
|
def main():
    """Start the uvicorn server on 127.0.0.1.

    The port defaults to 8770 and can be overridden with ``--port N`` on the
    command line; when the flag is repeated the last value wins.
    """
    import uvicorn
    import sys

    argv = sys.argv
    port = 8770
    # Only positions that have a following argument can carry a port value.
    for pos, value in enumerate(argv[:-1]):
        if value == "--port":
            port = int(argv[pos + 1])
    print(f"Wiki VectorDB Chat starting on port {port}")
    uvicorn.run(app, host="127.0.0.1", port=port, log_level="warning")


if __name__ == "__main__":
    main()
|