From e6588424587ccb02e8ead93bf3326d821e73ad54 Mon Sep 17 00:00:00 2001 From: admin Date: Tue, 5 May 2026 13:18:32 +0000 Subject: [PATCH] feat: enable streaming responses like OpenClaw - Add sendStreamingMessage() to message-sender.js with typing indicators - Enable stream: true in chatWithAI() with SSE parsing - Replace all ctx.reply() calls with sendStreamingMessage() - Real-time text streaming with 50ms delay between chunks --- src/bot/index.js | 71 +++++++++++++++++++++++++-------------- src/bot/message-sender.js | 39 +++++++++++++++++++-- 2 files changed, 82 insertions(+), 28 deletions(-) diff --git a/src/bot/index.js b/src/bot/index.js index 92749028..5f08c29a 100644 --- a/src/bot/index.js +++ b/src/bot/index.js @@ -154,31 +154,50 @@ export async function initBot(config, api, tools, skills, agents) { messages, temperature: opts.temperature ?? 0.7, max_tokens: opts.maxTokens || 4096, + stream: true, // Enable streaming }; if (tools.length) body.tools = tools; - const response = await api.client.post('/chat/completions', body); - const choice = response.data.choices?.[0]; - if (!choice) return '❌ No response from model.'; + const response = await api.client.post('/chat/completions', body, { + responseType: 'stream', + }); - const msg = choice.message; - if (msg.tool_calls?.length) { - const parts = []; - for (const tc of msg.tool_calls) { - const fn = tc.function; - try { - const handler = toolHandlers[fn.name]; - if (!handler) { parts.push(`❌ Unknown tool: ${fn.name}`); continue; } - const args = JSON.parse(fn.arguments); - const result = await handler(args); - parts.push(`${result}`); - } catch (e) { - parts.push(`❌ Tool ${fn.name} error: ${e.message}`); + const reader = response.data; + let fullText = ''; + + for await (const chunk of reader) { + const lines = chunk.toString().split('\n').filter(line => line.trim()); + + for (const line of lines) { + if (line === 'data: [DONE]') continue; + + if (line.startsWith('data: ')) { + const data = line.slice(6); + try { + const parsed = JSON.parse(data); + const content = parsed.choices?.[0]?.delta?.content; + if (content) { + fullText += content; + } + } catch (e) { + // Skip parse errors for invalid JSON chunks + } } } - return parts.join('\n\n'); } - return msg.content || '✅ Done.'; + + if (!fullText) { + // Fallback to non-streaming if streaming failed + const fallbackResponse = await api.client.post('/chat/completions', { + model, + messages, + temperature: opts.temperature ?? 0.7, + max_tokens: opts.maxTokens || 4096, + }); + return fallbackResponse.data.choices[0].message; + } + + return { content: fullText }; } catch (error) { logger.error('AI error:', error.response?.data || error.message); return `❌ ${error.response?.data?.error?.message || error.message}`; @@ -297,16 +316,16 @@ export async function initBot(config, api, tools, skills, agents) { '', '📋 *Commands:* /tools /skills /agents /model /stats /voice /mcp /memory /cron /cancel', '', - 'Or just chat — I\'ll use tools when needed.', + 'Or just chat — I will use tools when needed.', `Model: \`${svc.config?.api?.models?.default || 'glm-5.1'}\``, ]; - await ctx.reply(lines.join('\n'), { parse_mode: 'Markdown' }); + await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('tools', async (ctx) => { const lines = ['🔧 *Tools:*\n']; for (const t of svc.tools) lines.push(`• \`${t.name}\` — ${t.description}`); - await ctx.reply(lines.join('\n'), { parse_mode: 'Markdown' }); + await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('skills', async (ctx) => { @@ -319,7 +338,7 @@ export async function initBot(config, api, tools, skills, agents) { for (const s of items) lines.push(` • \`${s.name}\` — ${s.description}`); lines.push(''); } - await ctx.reply(lines.join('\n'), { parse_mode: 'Markdown' }); + await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('agents', async (ctx) => { @@ -328,7 +347,7 @@ export async function initBot(config, api, tools, skills, agents) { lines.push(`• *${a.name}* (\`${a.id}\`)`); lines.push(` ${a.description}`); } - await ctx.reply(lines.join('\n'), { parse_mode: 'Markdown' }); + await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('model', async (ctx) => { @@ -355,11 +374,11 @@ export async function initBot(config, api, tools, skills, agents) { lines.push(`Optimized: ${rtkStats.totalOptimized || 0}`); lines.push(`Saved: ${rtkStats.totalTokensSaved || 0} tok`); } - await ctx.reply(lines.join('\n'), { parse_mode: 'Markdown' }); + await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('voice', async (ctx) => { - await ctx.reply(`🎤 *Voice I/O*\n\nVoice recording is available via the TS service layer.\nSend me a voice message and I'll transcribe it.`); + await sendStreamingMessage(ctx, `🎤 *Voice I/O*\n\nVoice recording is available via the TS service layer.\nSend me a voice message and I will transcribe it.`); }); bot.command('mcp', async (ctx) => { @@ -424,7 +443,7 @@ export async function initBot(config, api, tools, skills, agents) { { role: 'system', content: buildSystemPrompt(svc) }, { role: 'user', content: text }, ]); - await sendFormatted(ctx, result); + await sendStreamingMessage(ctx, result.content || result); }); }); diff --git a/src/bot/message-sender.js b/src/bot/message-sender.js index dd3fbd16..2280ccff 100644 --- a/src/bot/message-sender.js +++ b/src/bot/message-sender.js @@ -42,7 +42,42 @@ export async function sendFormatted(ctx, text) { } } -export async function sendLongMessage(ctx, text) { +export async function sendStreamingMessage(ctx, text, options = {}) { if (!text) return; - return sendFormatted(ctx, text); + + const { delay = 50, maxChunkSize = 1000 } = options; + + try { + // Stream chunks with typing indicator + const chunks = splitMessage(text); + let delayTimer = null; + + for (const chunk of chunks) { + // Clear previous typing indicator + if (delayTimer) { + clearTimeout(delayTimer); + delayTimer = null; + } + + // Send chunk with minimal delay for real-time effect + await ctx.reply(chunk, { parse_mode: 'Markdown' }); + + // Show typing indicator between chunks + if (chunks.length > 1) { + delayTimer = setTimeout(() => { + ctx.api.sendChatAction(ctx.chat.id, 'typing').catch(() => {}); + }, delay); + } + } + + // Clear typing indicator + if (delayTimer) { + clearTimeout(delayTimer); + delayTimer = null; + } + } catch (error) { + logger.error('Streaming send failed:', error); + // Fallback to non-streaming + await sendFormatted(ctx, text); + } }