Files
Zportal-Wiki-VectorDB-Chat/wiki-api.py
admin ae621ecbb5 Initial release: Multi-provider AI chat with RAG
FastAPI backend (wiki-vector-chat.py) with Odysseus-style frontend.
Features: multi-provider LLM, Wiki KB + VectorDB RAG, session history,
chat modes, save-to-wiki, markdown rendering, SSE streaming.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-03 10:25:29 +00:00

172 lines
6.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""Z.ai Wiki KB Search API - token-protected"""
import hashlib
import hmac
import json
import os
import sys
import time
import urllib.parse
from http.server import HTTPServer, BaseHTTPRequestHandler
# JSON knowledge base file; opened and parsed once when the module is loaded.
KB_PATH = "/opt/blog/wiki-kb.json"
# File holding the shared API token; read by load_token().
TOKEN_PATH = "/opt/blog/.wiki-api-token"
# TCP port the HTTP server binds to.
PORT = 8097
# JSON file the search log is read from and rewritten to.
LOG_PATH = "/opt/blog/data/search-logs.json"
# save_logs() keeps only this many of the most recent log entries.
MAX_LOG_ENTRIES = 5000
def load_logs():
    """Return the persisted search log as a list of entries.

    Returns:
        The list stored in ``LOG_PATH``. An empty list is returned when the
        file is missing, unreadable, not valid JSON, or does not hold a list,
        so callers can always ``append`` to the result.
    """
    try:
        with open(LOG_PATH, encoding="utf-8") as f:
            logs = json.load(f)
    except (OSError, ValueError):
        # Best-effort read: a missing or corrupt log must not break searching.
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return []
    return logs if isinstance(logs, list) else []
def save_logs(logs):
    """Persist the most recent ``MAX_LOG_ENTRIES`` entries of *logs*.

    The data is written to a temporary sibling file and then moved over
    ``LOG_PATH`` with ``os.replace``, so an interrupted write can never leave
    a truncated log file behind.

    Args:
        logs: List of JSON-serialisable log entries, oldest first.

    Raises:
        OSError: If the directory or file cannot be written.
        TypeError: If an entry is not JSON-serialisable.
    """
    log_dir = os.path.dirname(LOG_PATH)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    # A plain logs[-0:] would keep everything, so treat a non-positive cap
    # explicitly as "keep nothing".
    recent = logs[-MAX_LOG_ENTRIES:] if MAX_LOG_ENTRIES > 0 else []
    tmp_path = f"{LOG_PATH}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(recent, f)
    os.replace(tmp_path, LOG_PATH)
def log_search(source, query, results, meta=None):
    """Append one search record to the persisted log and return it.

    Args:
        source: Label identifying which backend served the search.
        query: The raw query string.
        results: List of result dicts; anything that is not a list is
            recorded as zero results.
        meta: Optional extra data stored under the ``"meta"`` key.

    Returns:
        The log entry dict that was appended.
    """
    hits = results if isinstance(results, list) else []

    top_results = []
    for hit in hits[:3]:
        if not isinstance(hit, dict):
            continue
        text = hit.get("q", hit.get("content", ""))
        if not isinstance(text, str):
            # A null or non-string field must not break the slice below.
            text = "" if text is None else str(text)
        top_results.append({
            "q": text[:100],
            "score": hit.get("score", 0),
            "source": hit.get("source", ""),
        })

    entry = {
        # The trailing "Z" denotes UTC, so format gmtime() rather than the
        # local time that strftime() uses by default.
        "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "source": source,
        "query": query,
        "result_count": len(hits),
        "top_results": top_results,
    }
    if meta:
        entry["meta"] = meta

    logs = load_logs()
    logs.append(entry)
    save_logs(logs)
    return entry
# Load the API token from disk (this script never generates one)
def load_token():
    """Read the API token from ``TOKEN_PATH``.

    Returns:
        The token with surrounding whitespace stripped, or ``None`` when the
        file does not exist or contains only whitespace.
    """
    try:
        with open(TOKEN_PATH, encoding="utf-8") as f:
            token = f.read().strip()
    except FileNotFoundError:
        # Opening directly avoids the gap between an exists() check and open().
        return None
    # An empty file is treated the same as a missing one.
    return token or None
# Shared secret checked by check_auth(); when no token could be loaded,
# check_auth() rejects every request.
API_TOKEN = load_token()
# The knowledge base is parsed once at import time. JSON text is UTF-8, so
# decode it explicitly instead of relying on the locale's default encoding.
with open(KB_PATH, encoding="utf-8") as f:
    KB = json.load(f)
def search_kb(query, topic=None, limit=20, kb=None):
    """Rank knowledge-base entries against *query* with keyword scoring.

    Scoring per entry (all comparisons are case-insensitive):
        * +3 for each query word that is also a word of the entry's ``"q"``
        * +1 for each query word that is also a word of the entry's ``"a"``
        * +10 if the whole query is a substring of ``"q"``
        * +5 if the whole query is a substring of ``"a"``
        * -50 if *topic* is given and differs from the entry's ``"topic"``

    Only entries with a positive score are returned. The topic mismatch is a
    penalty, not a hard filter, so an off-topic entry scoring above 50 would
    still be returned.

    Args:
        query: Search text. A blank or whitespace-only query matches nothing.
        topic: Optional topic name to favour.
        limit: Maximum number of results; ``None`` means no cap and values
            below 1 yield an empty list.
        kb: Optional iterable of entry dicts to search instead of the
            module-level ``KB``.

    Returns:
        A list of copies of the matching entries, each with an added
        ``"score"`` key, sorted by descending score.
    """
    entries = KB if kb is None else kb
    query_lower = (query or "").strip().lower()
    # An empty string is a substring of every text, which would otherwise
    # make a blank query match the whole knowledge base.
    if not query_lower:
        return []
    # A negative limit would silently slice results off the tail.
    if limit is not None and limit <= 0:
        return []

    query_words = set(query_lower.split())
    topic_lower = topic.lower() if topic else None

    results = []
    for entry in entries:
        # "or ''" also covers fields that are present but null.
        q_text = str(entry.get("q") or "").lower()
        a_text = str(entry.get("a") or "").lower()

        score = 3 * len(query_words & set(q_text.split()))
        score += len(query_words & set(a_text.split()))
        if query_lower in q_text:
            score += 10
        if query_lower in a_text:
            score += 5
        if topic_lower is not None:
            if str(entry.get("topic") or "").lower() != topic_lower:
                score -= 50

        if score > 0:
            results.append({**entry, "score": score})

    results.sort(key=lambda r: r["score"], reverse=True)
    return results[:limit]
def check_auth(params, headers, expected_token=None):
    """Return True when the request carries the configured API token.

    The token is accepted from any of three places:
        * the ``?token=`` query parameter,
        * an ``Authorization: Bearer <token>`` header,
        * an ``X-Api-Key`` header.

    Args:
        params: Query parameters as returned by ``urllib.parse.parse_qs``
            (a mapping from name to a list of values).
        headers: Mapping-like object of request headers supporting
            ``get(name, default)``.
        expected_token: Token to compare against; defaults to the
            module-level ``API_TOKEN``.

    Returns:
        ``True`` if a supplied credential equals the expected token. Always
        ``False`` when no token is configured.
    """
    expected = API_TOKEN if expected_token is None else expected_token
    if not expected:
        return False
    expected_bytes = expected.encode("utf-8")

    auth = headers.get("Authorization", "")
    candidates = (
        params.get("token", [""])[0],
        auth[7:] if auth.startswith("Bearer ") else "",
        headers.get("X-Api-Key", ""),
    )

    matched = False
    for candidate in candidates:
        # compare_digest avoids leaking the match length through timing.
        # It rejects non-ASCII str arguments, so compare bytes instead.
        if candidate and hmac.compare_digest(
            candidate.encode("utf-8"), expected_bytes
        ):
            matched = True
    return matched
class Handler(BaseHTTPRequestHandler):
    """HTTP request handler for the token-protected wiki KB API.

    Routes (every GET and DELETE must pass ``check_auth``):
        GET    /search?q=...&topic=...&limit=N   keyword search over the KB
        GET    /kb                               the whole knowledge base
        GET    /logs                             search log, newest first
        DELETE /logs                             clear the search log
        OPTIONS *                                CORS preflight (no auth)

    All JSON responses carry ``Access-Control-Allow-Origin: *``.
    """

    def _send_json(self, status, payload):
        """Serialise *payload* as JSON and send it with *status* and CORS headers."""
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _authorized(self, params, message):
        """Return True if the request is authenticated, else send a 401 with *message*."""
        if check_auth(params, self.headers):
            return True
        self._send_json(401, {"error": message})
        return False

    def _send_not_found(self):
        """Send a 404 as JSON so cross-origin callers can see the status."""
        self._send_json(404, {"error": "Not found"})

    def _handle_search(self, params):
        """Serve GET /search: validate the parameters, search, log, respond."""
        query = params.get("q", [""])[0]
        if not query:
            # Status stays 200 here for compatibility with existing clients
            # that only inspect the body for an "error" key.
            self._send_json(200, {"error": "Missing ?q= parameter"})
            return

        topic = params.get("topic", [None])[0]
        try:
            limit = int(params.get("limit", ["20"])[0])
        except ValueError:
            # Previously a non-numeric limit raised and dropped the connection.
            self._send_json(
                400, {"error": "Invalid ?limit= parameter: expected an integer"}
            )
            return
        # A negative limit would slice results off the end of the list.
        limit = max(limit, 0)

        results = search_kb(query, topic, limit)
        client_ip = self.headers.get("X-Real-IP", self.client_address[0])
        try:
            log_search(
                "kb", query, results,
                {"topic": topic, "limit": limit, "ip": client_ip},
            )
        except OSError as exc:
            # Logging is best-effort: a full or read-only disk must not turn
            # a successful search into a failed request. Note log_error() is
            # routed through log_message(), which this class silences.
            self.log_error("search log write failed: %s", exc)

        self._send_json(
            200, {"query": query, "count": len(results), "results": results}
        )

    def do_GET(self):
        """Dispatch authenticated GET requests to /search, /kb or /logs."""
        parsed = urllib.parse.urlparse(self.path)
        params = urllib.parse.parse_qs(parsed.query)
        if not self._authorized(
            params,
            "Unauthorized. Provide ?token=YOUR_TOKEN or Authorization: Bearer YOUR_TOKEN",
        ):
            return

        if parsed.path == "/search":
            self._handle_search(params)
        elif parsed.path == "/kb":
            self._send_json(200, KB)
        elif parsed.path == "/logs":
            logs = load_logs()
            # Reverse so the most recent entry comes first.
            self._send_json(200, {"total": len(logs), "logs": logs[::-1]})
        else:
            self._send_not_found()

    def do_DELETE(self):
        """Handle authenticated DELETE /logs by emptying the search log."""
        parsed = urllib.parse.urlparse(self.path)
        params = urllib.parse.parse_qs(parsed.query)
        if not self._authorized(params, "Unauthorized"):
            return

        if parsed.path == "/logs":
            save_logs([])
            self._send_json(200, {"cleared": True})
            return
        self._send_not_found()

    def do_OPTIONS(self):
        """Answer CORS preflight requests; no authentication is required."""
        self.send_response(200)
        self.send_header("Access-Control-Allow-Origin", "*")
        # NOTE(review): POST is advertised but this class defines no do_POST.
        self.send_header("Access-Control-Allow-Methods", "GET, POST, DELETE, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Authorization, X-Api-Key, Content-Type")
        self.send_header("Content-Length", "0")
        self.end_headers()

    def log_message(self, format, *args):
        """Suppress the base class's per-request logging."""
        pass
if __name__ == "__main__":
    # Listens on the loopback interface only. The context manager closes the
    # listening socket on exit.
    with HTTPServer(("127.0.0.1", PORT), Handler) as server:
        # Announce only after the bind has succeeded.
        print(f"Wiki KB API running on port {PORT}")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the server; exit without a traceback.
            pass