#!/usr/bin/env python3 """Z.ai Wiki KB Search API - token-protected""" import json, os, sys, urllib.parse, hashlib from http.server import HTTPServer, BaseHTTPRequestHandler KB_PATH = "/opt/blog/wiki-kb.json" TOKEN_PATH = "/opt/blog/.wiki-api-token" PORT = 8097 LOG_PATH = "/opt/blog/data/search-logs.json" MAX_LOG_ENTRIES = 5000 def load_logs(): if os.path.exists(LOG_PATH): try: with open(LOG_PATH) as f: return json.load(f) except: pass return [] def save_logs(logs): os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True) # Keep only last MAX_LOG_ENTRIES with open(LOG_PATH, 'w') as f: json.dump(logs[-MAX_LOG_ENTRIES:], f) def log_search(source, query, results, meta=None): logs = load_logs() entry = { "ts": __import__('time').strftime("%Y-%m-%dT%H:%M:%SZ"), "source": source, "query": query, "result_count": len(results) if isinstance(results, list) else 0, "top_results": [ {"q": r.get("q", r.get("content", ""))[:100], "score": r.get("score", 0), "source": r.get("source", "")} for r in (results[:3] if isinstance(results, list) else []) ], } if meta: entry["meta"] = meta logs.append(entry) save_logs(logs) return entry # Load or generate token def load_token(): if os.path.exists(TOKEN_PATH): with open(TOKEN_PATH) as f: return f.read().strip() return None API_TOKEN = load_token() with open(KB_PATH) as f: KB = json.load(f) def search_kb(query, topic=None, limit=20): query_lower = query.lower() query_words = set(query_lower.split()) results = [] for entry in KB: score = 0 q_text = entry.get("q", "").lower() a_text = entry.get("a", "").lower() q_words = set(q_text.split()) a_words = set(a_text.split()) score += len(query_words & q_words) * 3 score += len(query_words & a_words) * 1 if query_lower in q_text: score += 10 if query_lower in a_text: score += 5 if topic and entry.get("topic", "").lower() != topic.lower(): score -= 50 if score > 0: results.append({**entry, "score": score}) results.sort(key=lambda x: -x["score"]) return results[:limit] def check_auth(params, headers): if not API_TOKEN: return False # Check query param ?token=... token = params.get("token", [""])[0] if token == API_TOKEN: return True # Check header Authorization: Bearer ... auth = headers.get("Authorization", "") if auth.startswith("Bearer "): if auth[7:] == API_TOKEN: return True # Check header X-API-Key api_key = headers.get("X-Api-Key", "") if api_key == API_TOKEN: return True return False class Handler(BaseHTTPRequestHandler): def do_GET(self): parsed = urllib.parse.urlparse(self.path) params = urllib.parse.parse_qs(parsed.query) if not check_auth(params, self.headers): self.send_response(401) self.send_header("Content-Type", "application/json") self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() self.wfile.write(json.dumps({"error": "Unauthorized. Provide ?token=YOUR_TOKEN or Authorization: Bearer YOUR_TOKEN"}).encode()) return if parsed.path == "/search": query = params.get("q", [""])[0] topic = params.get("topic", [None])[0] limit = int(params.get("limit", [20])[0]) if not query: body = json.dumps({"error": "Missing ?q= parameter"}).encode() else: results = search_kb(query, topic, limit) log_search("kb", query, results, {"topic": topic, "limit": limit, "ip": self.headers.get("X-Real-IP", self.client_address[0])}) body = json.dumps({"query": query, "count": len(results), "results": results}, ensure_ascii=False).encode() elif parsed.path == "/kb": body = json.dumps(KB, ensure_ascii=False).encode() elif parsed.path == "/logs": logs = load_logs() body = json.dumps({"total": len(logs), "logs": logs[::-1]}, ensure_ascii=False).encode() else: self.send_response(404) self.end_headers() return self.send_response(200) self.send_header("Content-Type", "application/json") self.send_header("Access-Control-Allow-Origin", "*") self.send_header("Content-Length", len(body)) self.end_headers() self.wfile.write(body) def do_DELETE(self): parsed = urllib.parse.urlparse(self.path) params = urllib.parse.parse_qs(parsed.query) if not check_auth(params, self.headers): self.send_response(401) self.send_header("Content-Type", "application/json") self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() self.wfile.write(json.dumps({"error": "Unauthorized"}).encode()) return if parsed.path == "/logs": save_logs([]) self.send_response(200) self.send_header("Content-Type", "application/json") self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() self.wfile.write(json.dumps({"cleared": True}).encode()) return self.send_response(404) self.end_headers() def do_OPTIONS(self): self.send_response(200) self.send_header("Access-Control-Allow-Origin", "*") self.send_header("Access-Control-Allow-Methods", "GET, POST, DELETE, OPTIONS") self.send_header("Access-Control-Allow-Headers", "Authorization, X-Api-Key, Content-Type") self.end_headers() def log_message(self, format, *args): pass if __name__ == "__main__": print(f"Wiki KB API running on port {PORT}") server = HTTPServer(("127.0.0.1", PORT), Handler) server.serve_forever()