From fe6d3f4db85d24f9b7011c903efef86ee7ea6e7a Mon Sep 17 00:00:00 2001
From: admin
Date: Tue, 5 May 2026 17:12:16 +0000
Subject: [PATCH] fix: rewrite chatWithAI as unified agentic tool loop
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

OLD: streaming and non-streaming were separate paths. Streaming detected
tool calls and recursively called non-streaming, which only did ONE round
of tool execution with no loop-back. This caused silent hangs.

NEW: single chatWithAI with an internal while loop (max 10 turns):
  1. Call API (stream or non-stream)
  2. If tool_calls → execute all → append results → loop
  3. If text content → return final answer
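
Sketch of that control flow (illustrative only; simplified from the diff
below, reusing MAX_TOOL_TURNS, toolSchemas, toolHandlers, streamChat and
nonStreamChat defined there; schema building, logging and per-turn error
handling are omitted):

    async function agenticChat(messages, opts = {}) {
      const loopMessages = [...messages];
      for (let turn = 0; turn < MAX_TOOL_TURNS; turn++) {
        const body = { model: opts.model, messages: loopMessages, tools: toolSchemas };
        // Streaming and non-streaming return the same { content, tool_calls, error } shape.
        const res = opts.onDelta
          ? await streamChat(svc, body, opts.onDelta)
          : await nonStreamChat(body);
        if (res.error) return `❌ ${res.error}`;
        if (!res.tool_calls?.length) return res.content;   // no tool calls -> final answer
        loopMessages.push({ role: 'assistant', tool_calls: res.tool_calls });
        for (const tc of res.tool_calls) {                  // execute every requested call
          const out = await toolHandlers[tc.function.name](JSON.parse(tc.function.arguments || '{}'));
          loopMessages.push({ role: 'tool', tool_call_id: tc.id, content: String(out) });
        }
      }
      // Out of turns: one last call without tools forces a plain-text answer.
      return (await nonStreamChat({ model: opts.model, messages: loopMessages })).content;
    }

The shared { content, tool_calls, error } return shape is what keeps the
loop transport-agnostic.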

Key fixes:
- streamChat now ACCUMULATES tool_call deltas instead of aborting
- Tool results are fed back to the AI in the same conversation
- Multi-turn: AI can call tools multiple times before answering
- Max 10 turns with forced final answer as safety net
- Proper { content, tool_calls, error } return type from both paths
- Non-streaming fallback if SSE fails
- No more recursive calls between stream/non-stream
---
 src/bot/index.js | 218 +++++++++++++++++++++++++++++------------------
 1 file changed, 135 insertions(+), 83 deletions(-)

diff --git a/src/bot/index.js b/src/bot/index.js
index 3b0cb1a5..509be9e8 100644
--- a/src/bot/index.js
+++ b/src/bot/index.js
@@ -353,101 +353,142 @@ export async function initBot(config, api, tools, skills, agents) {
     },
   };
 
-  // ── AI chat with function calling ──
+  // ── AI chat with agentic tool loop ──
+  // Unified streaming + non-streaming with multi-turn tool execution.
+  // Pattern: call API → if tool_calls → execute → feed back → loop → else return text.
+  const MAX_TOOL_TURNS = 10;
+
   async function chatWithAI(messages, opts = {}) {
     const model = opts.model || svc.config?.api?.models?.default || 'glm-5.1';
-    const tools = [];
     const toolMap = svc.toolMap;
+    const onDelta = opts.onDelta || null;
 
-    // Register all tools that have a matching class loaded
+    // Build tool schemas (only once, reused across turns)
+    const toolSchemas = [];
     for (const [name, def] of Object.entries(TOOL_DEFS)) {
       if (name === 'delegate_agent' && !svc.agents.length) continue;
       if (name === 'run_skill' && !svc.skills.length) continue;
-      // delegate is special — dynamically created, always available
       if (!toolMap.has(name) && name !== 'delegate_agent' && name !== 'run_skill' && name !== 'delegate') continue;
-      tools.push({ type: 'function', function: { name, ...def } });
+      toolSchemas.push({ type: 'function', function: { name, ...def } });
     }
 
-    try {
+    // Working copy of messages — tool results get appended here
+    const loopMessages = [...messages];
+    let turns = 0;
+
+    while (turns < MAX_TOOL_TURNS) {
       const body = {
         model,
-        messages,
+        messages: loopMessages,
         temperature: opts.temperature ?? 0.7,
         max_tokens: opts.maxTokens || 4096,
-        stream: !!opts.onDelta, // Enable SSE when delta callback is provided
       };
-      if (tools.length) body.tools = tools;
+      if (toolSchemas.length) body.tools = toolSchemas;
 
-      // ── Streaming path (SSE) ──
-      if (opts.onDelta) {
-        return await chatWithAIStream(svc, body, tools, toolHandlers, opts.onDelta);
+      let response; // { content: string, tool_calls: array|null }
+
+      if (onDelta) {
+        // ── Streaming path (SSE) ──
+        response = await streamChat(svc, body, onDelta);
+      } else {
+        // ── Non-streaming path ──
+        response = await nonStreamChat(body);
       }
 
-      // ── Non-streaming path (original) ──
-      const response = await api.client.post('/chat/completions', body);
-      const choice = response.data.choices?.[0];
-      if (!choice) return '❌ No response from model.';
+      if (response.error) {
+        // On first turn, return error. On subsequent turns, return what we have.
+        if (turns === 0) return `❌ ${response.error}`;
+        logger.error(`AI error on turn ${turns}: ${response.error}`);
+        return response.content || `❌ ${response.error}`;
+      }
 
-      const msg = choice.message;
-      if (msg.tool_calls?.length) {
-        // Execute all tool calls, feed results back to AI for final answer
-        const toolMessages = [{ role: 'assistant', tool_calls: msg.tool_calls }];
-        for (const tc of msg.tool_calls) {
-          const fn = tc.function;
-          try {
-            const handler = toolHandlers[fn.name];
-            if (!handler) {
-              toolMessages.push({ role: 'tool', tool_call_id: tc.id, content: `❌ Unknown tool: ${fn.name}` });
-              continue;
-            }
-            const args = JSON.parse(fn.arguments);
-            logger.info(`🔧 ${fn.name}(${fn.arguments?.slice(0, 80)})`);
-            const result = await handler(args);
-            toolMessages.push({ role: 'tool', tool_call_id: tc.id, content: String(result).slice(0, 4000) });
-          } catch (e) {
-            toolMessages.push({ role: 'tool', tool_call_id: tc.id, content: `❌ Error: ${e.message}` });
+      // No tool calls → final text answer
+      if (!response.tool_calls?.length) {
+        return response.content || '✅ Done.';
+      }
+
+      // ── Execute tool calls ──
+      turns++;
+      logger.info(`🔧 Tool turn ${turns}/${MAX_TOOL_TURNS} — ${response.tool_calls.length} call(s)`);
+
+      // Append assistant message with tool_calls to conversation
+      loopMessages.push({ role: 'assistant', tool_calls: response.tool_calls });
+
+      for (const tc of response.tool_calls) {
+        const fn = tc.function;
+        let result;
+        try {
+          const handler = toolHandlers[fn.name];
+          if (!handler) {
+            result = `❌ Unknown tool: ${fn.name}`;
+          } else {
+            const args = JSON.parse(fn.arguments || '{}');
+            logger.info(`  → ${fn.name}(${fn.arguments?.slice(0, 100)})`);
+            result = String(await handler(args)).slice(0, 8000);
           }
+        } catch (e) {
+          result = `❌ ${fn.name} error: ${e.message}`;
+          logger.error(`  → ${fn.name} failed: ${e.message}`);
         }
-        // Ask AI to produce final text answer using tool results
-        const followUp = [...body.messages, ...toolMessages, { role: 'user', content: 'Based on the tool results above, provide your final answer.' }];
-        const followRes = await api.client.post('/chat/completions', { ...body, messages: followUp, tools: [] });
-        return followRes.data.choices?.[0]?.message?.content || '✅ Done.';
+        loopMessages.push({ role: 'tool', tool_call_id: tc.id, content: result });
       }
 
-      return msg.content || '✅ Done.';
-    } catch (error) {
-      logger.error('AI error:', error.response?.data || error.message);
-      return `❌ ${error.response?.data?.error?.message || error.message}`;
+      // Loop continues — AI will see tool results and either call more tools or answer
    }
+
+    // Exhausted turns — do one final call without tools to force a text answer
+    logger.warn(`⚠ Max tool turns (${MAX_TOOL_TURNS}) reached, forcing final answer`);
+    try {
+      const final = await nonStreamChat({
+        model, messages: loopMessages, temperature: 0.3,
+        max_tokens: opts.maxTokens || 4096,
+      });
+      return final.content || '✅ Done (max tool turns reached).';
+    } catch (e) {
+      return `⚠ Max tool turns reached. Last error: ${e.message}`;
     }
   }
 
-  /**
-   * Streaming chat completion via SSE.
-   * Pipes each token chunk to onDelta() callback in real-time.
-   * Falls back to non-streaming if SSE fails.
-   */
-  async function chatWithAIStream(svc, body, tools, toolHandlers, onDelta) {
+  // ── Non-streaming API call — returns { content, tool_calls, error } ──
+  async function nonStreamChat(body) {
+    try {
+      const res = await api.client.post('/chat/completions', { ...body, stream: false });
+      const choice = res.data.choices?.[0];
+      if (!choice) return { content: '', tool_calls: null, error: 'No response from model' };
+      const msg = choice.message || {};
+      return {
+        content: msg.content || '',
+        tool_calls: msg.tool_calls || null,
+        error: null,
+      };
+    } catch (e) {
+      return { content: '', tool_calls: null, error: e.response?.data?.error?.message || e.message };
+    }
+  }
+
+  // ── Streaming API call (SSE) — returns { content, tool_calls, error } ──
+  // Streams tokens via onDelta. If tool_calls detected, accumulates them and returns.
+  async function streamChat(svc, body, onDelta) {
     const baseUrl = svc.api?.config?.baseUrl || 'https://api.z.ai/api/coding/paas/v4';
     const apiKey = svc.api?.config?.apiKey || '';
+    let fullContent = '';
+    const toolCallMap = {}; // index → { id, name, arguments }
+    let finishReason = null;
 
-    let fullResponse = '';
     try {
-      const response = await fetch(`${baseUrl}/chat/completions`, {
+      const res = await fetch(`${baseUrl}/chat/completions`, {
         method: 'POST',
-        headers: {
-          'Authorization': `Bearer ${apiKey}`,
-          'Content-Type': 'application/json',
-        },
-        body: JSON.stringify(body),
+        headers: { 'Authorization': `Bearer ${apiKey}`, 'Content-Type': 'application/json' },
+        body: JSON.stringify({ ...body, stream: true }),
       });
-      if (!response.ok) {
-        const errText = await response.text();
-        logger.error(`SSE error ${response.status}: ${errText}`);
+      if (!res.ok) {
+        const errText = await res.text();
+        logger.error(`SSE ${res.status}: ${errText.slice(0, 200)}`);
         // Fallback to non-streaming
-        return await chatWithAI(body.messages, { ...body, stream: false });
+        return await nonStreamChat(body);
       }
 
-      const reader = response.body.getReader();
+      const reader = res.body.getReader();
       const decoder = new TextDecoder();
       let buffer = '';
@@ -457,45 +498,56 @@ export async function initBot(config, api, tools, skills, agents) {
         buffer += decoder.decode(value, { stream: true });
         const lines = buffer.split('\n');
-        buffer = lines.pop() || ''; // Keep incomplete line in buffer
+        buffer = lines.pop() || '';
 
         for (const line of lines) {
           const trimmed = line.trim();
-          if (!trimmed || !trimmed.startsWith('data: ')) continue;
+          if (!trimmed.startsWith('data: ')) continue;
           const data = trimmed.slice(6);
           if (data === '[DONE]') continue;
 
           try {
             const parsed = JSON.parse(data);
-            const choices = parsed.choices || [];
-            if (choices.length > 0) {
-              const delta = choices[0].delta || {};
-              const content = delta.content || '';
-              if (content) {
-                fullResponse += content;
-                onDelta(content);
-              }
-              // Tool calls in streaming - abort stream, fall back to non-streaming (handles tools)
-              if (delta.tool_calls) {
-                logger.info("Tool call in stream, falling back to non-streaming");
-                reader.cancel().catch(() => {});
-                return await chatWithAI(body.messages, { maxTokens: body.max_tokens });
+            const choice = parsed.choices?.[0];
+            if (!choice) continue;
+            finishReason = choice.finish_reason;
+
+            const delta = choice.delta || {};
+            // Stream text content
+            if (delta.content) {
+              fullContent += delta.content;
+              onDelta(delta.content);
+            }
+            // Accumulate tool calls from stream deltas
+            if (delta.tool_calls) {
+              for (const tc of delta.tool_calls) {
+                const idx = tc.index ?? 0;
+                if (!toolCallMap[idx]) toolCallMap[idx] = { id: tc.id || '', name: '', arguments: '' };
+                if (tc.id) toolCallMap[idx].id = tc.id;
+                if (tc.function?.name) toolCallMap[idx].name += tc.function.name;
+                if (tc.function?.arguments) toolCallMap[idx].arguments += tc.function.arguments;
               }
             }
-          } catch {
-            // Ignore malformed JSON lines
-          }
+          } catch { /* skip malformed chunks */ }
         }
       }
     } catch (e) {
-      logger.error('SSE stream error:', e.message);
-      // Fallback to non-streaming
-      if (!fullResponse) {
-        return await chatWithAI(body.messages, { maxTokens: body.max_tokens });
+      logger.error('SSE error:', e.message);
+      if (!fullContent && !Object.keys(toolCallMap).length) {
+        return await nonStreamChat(body);
      }
    }
 
-    return fullResponse || '✅ Done.';
+    // Build tool_calls array from accumulated deltas
+    const toolCalls = Object.keys(toolCallMap).length > 0
+      ? Object.values(toolCallMap).map(tc => ({
+          id: tc.id,
+          type: 'function',
+          function: { name: tc.name, arguments: tc.arguments },
+        }))
+      : null;
+
+    return { content: fullContent, tool_calls: toolCalls, error: null };
   }
 
   // ── Tool handlers: route API tool_calls to tool class methods ──