fix: rewrite chatWithAI as unified agentic tool loop

OLD: streaming and non-streaming were separate paths. Streaming detected tool
calls and recursively called non-streaming which only did ONE round of tool
execution with no loop-back. This caused silent hangs.

NEW: single chatWithAI with internal while loop (max 10 turns):
  1. Call API (stream or non-stream)
  2. If tool_calls → execute all → append results → loop
  3. If text content → return final answer

Key fixes:
- streamChat now ACCUMULATES tool_call deltas instead of aborting
- Tool results are fed back to the AI in the same conversation
- Multi-turn: AI can call tools multiple times before answering
- Max 10 turns with forced final answer as safety net
- Proper { content, tool_calls, error } return type from both paths
- Non-streaming fallback if SSE fails
- No more recursive calls between stream/non-stream
This commit is contained in:
admin
2026-05-05 17:12:16 +00:00
Unverified
parent 78349bb22b
commit fe6d3f4db8

View File

@@ -353,101 +353,142 @@ export async function initBot(config, api, tools, skills, agents) {
}, },
}; };
// ── AI chat with function calling ── // ── AI chat with agentic tool loop ──
// Unified streaming + non-streaming with multi-turn tool execution.
// Pattern: call API → if tool_calls → execute → feed back → loop → else return text.
const MAX_TOOL_TURNS = 10;
async function chatWithAI(messages, opts = {}) { async function chatWithAI(messages, opts = {}) {
const model = opts.model || svc.config?.api?.models?.default || 'glm-5.1'; const model = opts.model || svc.config?.api?.models?.default || 'glm-5.1';
const tools = [];
const toolMap = svc.toolMap; const toolMap = svc.toolMap;
const onDelta = opts.onDelta || null;
// Register all tools that have a matching class loaded // Build tool schemas (only once, reused across turns)
const toolSchemas = [];
for (const [name, def] of Object.entries(TOOL_DEFS)) { for (const [name, def] of Object.entries(TOOL_DEFS)) {
if (name === 'delegate_agent' && !svc.agents.length) continue; if (name === 'delegate_agent' && !svc.agents.length) continue;
if (name === 'run_skill' && !svc.skills.length) continue; if (name === 'run_skill' && !svc.skills.length) continue;
// delegate is special — dynamically created, always available
if (!toolMap.has(name) && name !== 'delegate_agent' && name !== 'run_skill' && name !== 'delegate') continue; if (!toolMap.has(name) && name !== 'delegate_agent' && name !== 'run_skill' && name !== 'delegate') continue;
tools.push({ type: 'function', function: { name, ...def } }); toolSchemas.push({ type: 'function', function: { name, ...def } });
} }
try { // Working copy of messages — tool results get appended here
const loopMessages = [...messages];
let turns = 0;
while (turns < MAX_TOOL_TURNS) {
const body = { const body = {
model, model,
messages, messages: loopMessages,
temperature: opts.temperature ?? 0.7, temperature: opts.temperature ?? 0.7,
max_tokens: opts.maxTokens || 4096, max_tokens: opts.maxTokens || 4096,
stream: !!opts.onDelta, // Enable SSE when delta callback is provided
}; };
if (tools.length) body.tools = tools; if (toolSchemas.length) body.tools = toolSchemas;
// ── Streaming path (SSE) ── let response; // { content: string, tool_calls: array|null }
if (opts.onDelta) {
return await chatWithAIStream(svc, body, tools, toolHandlers, opts.onDelta); if (onDelta) {
// ── Streaming path (SSE) ──
response = await streamChat(svc, body, onDelta);
} else {
// ── Non-streaming path ──
response = await nonStreamChat(body);
} }
// ── Non-streaming path (original) ── if (response.error) {
const response = await api.client.post('/chat/completions', body); // On first turn, return error. On subsequent turns, return what we have.
const choice = response.data.choices?.[0]; if (turns === 0) return `${response.error}`;
if (!choice) return '❌ No response from model.'; logger.error(`AI error on turn ${turns}: ${response.error}`);
return response.content || `${response.error}`;
}
const msg = choice.message; // No tool calls → final text answer
if (msg.tool_calls?.length) { if (!response.tool_calls?.length) {
// Execute all tool calls, feed results back to AI for final answer return response.content || '✅ Done.';
const toolMessages = [{ role: 'assistant', tool_calls: msg.tool_calls }]; }
for (const tc of msg.tool_calls) {
const fn = tc.function; // ── Execute tool calls ──
try { turns++;
const handler = toolHandlers[fn.name]; logger.info(`🔧 Tool turn ${turns}/${MAX_TOOL_TURNS}${response.tool_calls.length} call(s)`);
if (!handler) {
toolMessages.push({ role: 'tool', tool_call_id: tc.id, content: `❌ Unknown tool: ${fn.name}` }); // Append assistant message with tool_calls to conversation
continue; loopMessages.push({ role: 'assistant', tool_calls: response.tool_calls });
}
const args = JSON.parse(fn.arguments); for (const tc of response.tool_calls) {
logger.info(`🔧 ${fn.name}(${fn.arguments?.slice(0, 80)})`); const fn = tc.function;
const result = await handler(args); let result;
toolMessages.push({ role: 'tool', tool_call_id: tc.id, content: String(result).slice(0, 4000) }); try {
} catch (e) { const handler = toolHandlers[fn.name];
toolMessages.push({ role: 'tool', tool_call_id: tc.id, content: `❌ Error: ${e.message}` }); if (!handler) {
result = `❌ Unknown tool: ${fn.name}`;
} else {
const args = JSON.parse(fn.arguments || '{}');
logger.info(`${fn.name}(${fn.arguments?.slice(0, 100)})`);
result = String(await handler(args)).slice(0, 8000);
} }
} catch (e) {
result = `${fn.name} error: ${e.message}`;
logger.error(`${fn.name} failed: ${e.message}`);
} }
// Ask AI to produce final text answer using tool results loopMessages.push({ role: 'tool', tool_call_id: tc.id, content: result });
const followUp = [...body.messages, ...toolMessages, { role: 'user', content: 'Based on the tool results above, provide your final answer.' }];
const followRes = await api.client.post('/chat/completions', { ...body, messages: followUp, tools: [] });
return followRes.data.choices?.[0]?.message?.content || '✅ Done.';
} }
return msg.content || '✅ Done.'; // Loop continues — AI will see tool results and either call more tools or answer
} catch (error) { }
logger.error('AI error:', error.response?.data || error.message);
return `${error.response?.data?.error?.message || error.message}`; // Exhausted turns — do one final call without tools to force a text answer
logger.warn(`⚠ Max tool turns (${MAX_TOOL_TURNS}) reached, forcing final answer`);
try {
const final = await nonStreamChat({
model, messages: loopMessages, temperature: 0.3,
max_tokens: opts.maxTokens || 4096,
});
return final.content || '✅ Done (max tool turns reached).';
} catch (e) {
return `⚠ Max tool turns reached. Last error: ${e.message}`;
} }
} }
// ── Non-streaming API call — returns { content, tool_calls, error } ──
/**
 * Single non-streaming chat completion.
 *
 * Always forces `stream: false` so callers can pass the same body object used
 * for the streaming path. Never throws: API/transport failures are folded into
 * the `error` field of the result object.
 *
 * @param {object} body - Request body (model, messages, temperature, max_tokens, tools).
 * @returns {Promise<{content: string, tool_calls: (Array|null), error: (string|null)}>}
 */
async function nonStreamChat(body) {
  try {
    const res = await api.client.post('/chat/completions', { ...body, stream: false });
    const choice = res.data.choices?.[0];
    if (!choice) return { content: '', tool_calls: null, error: 'No response from model' };
    const msg = choice.message || {};
    return {
      content: msg.content || '',
      tool_calls: msg.tool_calls || null,
      error: null,
    };
  } catch (e) {
    // Prefer the structured API error message when present; fall back to the transport error.
    return { content: '', tool_calls: null, error: e.response?.data?.error?.message || e.message };
  }
}
// ── Streaming API call (SSE) — returns { content, tool_calls, error } ──
// Streams tokens via onDelta. If tool_calls are detected, their deltas are
// accumulated (not aborted) and returned alongside any streamed text.
/**
 * Streaming chat completion over Server-Sent Events.
 *
 * Text deltas are piped to `onDelta` as they arrive. Tool-call deltas are
 * accumulated per index (id + name + argument fragments) and assembled into
 * complete tool_call objects at the end. Falls back to nonStreamChat() when
 * the HTTP response is not OK, or when the stream dies before producing any
 * content or tool calls.
 *
 * @param {object} svc - Service context (provides api.config.baseUrl / apiKey).
 * @param {object} body - Request body; `stream: true` is forced here.
 * @param {function(string): void} onDelta - Called with each text token chunk.
 * @returns {Promise<{content: string, tool_calls: (Array|null), error: (string|null)}>}
 */
async function streamChat(svc, body, onDelta) {
  const baseUrl = svc.api?.config?.baseUrl || 'https://api.z.ai/api/coding/paas/v4';
  const apiKey = svc.api?.config?.apiKey || '';
  let fullContent = '';
  const toolCallMap = {}; // index → { id, name, arguments }
  let finishReason = null; // NOTE(review): captured from the stream but not currently consumed

  try {
    const res = await fetch(`${baseUrl}/chat/completions`, {
      method: 'POST',
      headers: { 'Authorization': `Bearer ${apiKey}`, 'Content-Type': 'application/json' },
      body: JSON.stringify({ ...body, stream: true }),
    });

    if (!res.ok) {
      const errText = await res.text();
      logger.error(`SSE ${res.status}: ${errText.slice(0, 200)}`);
      // Fallback to non-streaming
      return await nonStreamChat(body);
    }

    const reader = res.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    // NOTE(review): the read loop below was elided by the diff hunk boundary;
    // reconstructed as the standard reader pattern the buffer logic requires — confirm.
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() || ''; // keep incomplete line for the next chunk

      for (const line of lines) {
        const trimmed = line.trim();
        if (!trimmed.startsWith('data: ')) continue;
        const data = trimmed.slice(6);
        if (data === '[DONE]') continue;
        try {
          const parsed = JSON.parse(data);
          const choice = parsed.choices?.[0];
          if (!choice) continue;
          finishReason = choice.finish_reason;

          const delta = choice.delta || {};
          // Stream text content
          if (delta.content) {
            fullContent += delta.content;
            onDelta(delta.content);
          }
          // Accumulate tool calls from stream deltas: ids arrive once,
          // names and argument JSON arrive as fragments keyed by index.
          if (delta.tool_calls) {
            for (const tc of delta.tool_calls) {
              const idx = tc.index ?? 0;
              if (!toolCallMap[idx]) toolCallMap[idx] = { id: tc.id || '', name: '', arguments: '' };
              if (tc.id) toolCallMap[idx].id = tc.id;
              if (tc.function?.name) toolCallMap[idx].name += tc.function.name;
              if (tc.function?.arguments) toolCallMap[idx].arguments += tc.function.arguments;
            }
          }
        } catch { /* skip malformed chunks */ }
      }
    }
  } catch (e) {
    logger.error('SSE error:', e.message);
    // Only fall back if the stream produced nothing usable; otherwise return the partial result.
    if (!fullContent && !Object.keys(toolCallMap).length) {
      return await nonStreamChat(body);
    }
  }

  // Build tool_calls array from accumulated deltas
  const toolCalls = Object.keys(toolCallMap).length > 0
    ? Object.values(toolCallMap).map(tc => ({
        id: tc.id,
        type: 'function',
        function: { name: tc.name, arguments: tc.arguments },
      }))
    : null;
  return { content: fullContent, tool_calls: toolCalls, error: null };
}
// ── Tool handlers: route API tool_calls to tool class methods ── // ── Tool handlers: route API tool_calls to tool class methods ──