Switch to async httpx for RAG searches to prevent event loop blocking
The multi-query vector search with blocking urllib.request.urlopen calls was stalling the single-threaded uvicorn event loop. Now uses async httpx.AsyncClient with asyncio.gather for parallel requests. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -125,11 +125,17 @@ async def search_kb(query: str, limit: int = 5) -> str:
|
||||
queries = list(dict.fromkeys(queries))[:2]
|
||||
|
||||
all_results = {}
|
||||
for q in queries:
|
||||
url = f"{WIKI_API}/search?q={urllib.parse.quote(q)}&limit={limit}&token={_API_TOKEN}"
|
||||
req = urllib.request.Request(url)
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
data = json.loads(resp.read())
|
||||
import httpx
|
||||
async with httpx.AsyncClient(timeout=5) as client:
|
||||
tasks = []
|
||||
for q in queries:
|
||||
url = f"{WIKI_API}/search?q={urllib.parse.quote(q)}&limit={limit}&token={_API_TOKEN}"
|
||||
tasks.append(client.get(url))
|
||||
responses = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
for resp in responses:
|
||||
if isinstance(resp, Exception):
|
||||
continue
|
||||
data = resp.json()
|
||||
for r in data.get("results", []):
|
||||
key = r.get("q", "")[:80]
|
||||
if key not in all_results:
|
||||
@@ -173,17 +179,22 @@ async def search_vector(query: str, top_k: int = 10) -> str:
|
||||
queries = list(dict.fromkeys(queries))[:4]
|
||||
|
||||
all_hits = {}
|
||||
for q in queries:
|
||||
data = json.dumps({"query": q, "top_k": 30}).encode()
|
||||
req = urllib.request.Request(
|
||||
f"{VECTOR_DB}/vector/search",
|
||||
data=data,
|
||||
headers={"Content-Type": "application/json", "x-api-key": _API_TOKEN},
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=8) as resp:
|
||||
result = json.loads(resp.read())
|
||||
# Use async httpx to avoid blocking the event loop
|
||||
import httpx
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
tasks = []
|
||||
for q in queries:
|
||||
tasks.append(client.post(
|
||||
f"{VECTOR_DB}/vector/search",
|
||||
json={"query": q, "top_k": 30},
|
||||
headers={"Content-Type": "application/json", "x-api-key": _API_TOKEN},
|
||||
))
|
||||
responses = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
for resp in responses:
|
||||
if isinstance(resp, Exception):
|
||||
continue
|
||||
result = resp.json()
|
||||
for h in result.get("results", []):
|
||||
# Deduplicate by content
|
||||
text = h.get("content", "") or h.get("text", "")
|
||||
key = text[:80]
|
||||
if key not in all_hits or h.get("score", 0) > all_hits[key].get("score", 0):
|
||||
|
||||
Reference in New Issue
Block a user