feat: enable streaming responses like OpenClaw
- Add sendStreamingMessage() to message-sender.js with typing indicators - Enable stream: true in chatWithAI() with SSE parsing - Replace all ctx.reply() calls with sendStreamingMessage() - Real-time text streaming with 50ms delay between chunks
This commit is contained in:
@@ -154,31 +154,50 @@ export async function initBot(config, api, tools, skills, agents) {
|
|||||||
messages,
|
messages,
|
||||||
temperature: opts.temperature ?? 0.7,
|
temperature: opts.temperature ?? 0.7,
|
||||||
max_tokens: opts.maxTokens || 4096,
|
max_tokens: opts.maxTokens || 4096,
|
||||||
|
stream: true, // Enable streaming
|
||||||
};
|
};
|
||||||
if (tools.length) body.tools = tools;
|
if (tools.length) body.tools = tools;
|
||||||
|
|
||||||
const response = await api.client.post('/chat/completions', body);
|
const response = await api.client.post('/chat/completions', body, {
|
||||||
const choice = response.data.choices?.[0];
|
responseType: 'stream',
|
||||||
if (!choice) return '❌ No response from model.';
|
});
|
||||||
|
|
||||||
const msg = choice.message;
|
const reader = response.data;
|
||||||
if (msg.tool_calls?.length) {
|
let fullText = '';
|
||||||
const parts = [];
|
|
||||||
for (const tc of msg.tool_calls) {
|
for await (const chunk of reader) {
|
||||||
const fn = tc.function;
|
const lines = chunk.toString().split('\n').filter(line => line.trim());
|
||||||
try {
|
|
||||||
const handler = toolHandlers[fn.name];
|
for (const line of lines) {
|
||||||
if (!handler) { parts.push(`❌ Unknown tool: ${fn.name}`); continue; }
|
if (line === 'data: [DONE]') continue;
|
||||||
const args = JSON.parse(fn.arguments);
|
|
||||||
const result = await handler(args);
|
if (line.startsWith('data: ')) {
|
||||||
parts.push(`${result}`);
|
const data = line.slice(6);
|
||||||
} catch (e) {
|
try {
|
||||||
parts.push(`❌ Tool ${fn.name} error: ${e.message}`);
|
const parsed = JSON.parse(data);
|
||||||
|
const content = parsed.choices?.[0]?.delta?.content;
|
||||||
|
if (content) {
|
||||||
|
fullText += content;
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
// Skip parse errors for invalid JSON chunks
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return parts.join('\n\n');
|
|
||||||
}
|
}
|
||||||
return msg.content || '✅ Done.';
|
|
||||||
|
if (!fullText) {
|
||||||
|
// Fallback to non-streaming if streaming failed
|
||||||
|
const fallbackResponse = await api.client.post('/chat/completions', {
|
||||||
|
model,
|
||||||
|
messages,
|
||||||
|
temperature: opts.temperature ?? 0.7,
|
||||||
|
max_tokens: opts.maxTokens || 4096,
|
||||||
|
});
|
||||||
|
return fallbackResponse.data.choices[0].message;
|
||||||
|
}
|
||||||
|
|
||||||
|
return { content: fullText };
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error('AI error:', error.response?.data || error.message);
|
logger.error('AI error:', error.response?.data || error.message);
|
||||||
return `❌ ${error.response?.data?.error?.message || error.message}`;
|
return `❌ ${error.response?.data?.error?.message || error.message}`;
|
||||||
@@ -297,16 +316,16 @@ export async function initBot(config, api, tools, skills, agents) {
|
|||||||
'',
|
'',
|
||||||
'📋 *Commands:* /tools /skills /agents /model /stats /voice /mcp /memory /cron /cancel',
|
'📋 *Commands:* /tools /skills /agents /model /stats /voice /mcp /memory /cron /cancel',
|
||||||
'',
|
'',
|
||||||
'Or just chat — I\'ll use tools when needed.',
|
'Or just chat — I will use tools when needed.',
|
||||||
`Model: \`${svc.config?.api?.models?.default || 'glm-5.1'}\``,
|
`Model: \`${svc.config?.api?.models?.default || 'glm-5.1'}\``,
|
||||||
];
|
];
|
||||||
await ctx.reply(lines.join('\n'), { parse_mode: 'Markdown' });
|
await sendStreamingMessage(ctx, lines.join('\n'));
|
||||||
});
|
});
|
||||||
|
|
||||||
bot.command('tools', async (ctx) => {
|
bot.command('tools', async (ctx) => {
|
||||||
const lines = ['🔧 *Tools:*\n'];
|
const lines = ['🔧 *Tools:*\n'];
|
||||||
for (const t of svc.tools) lines.push(`• \`${t.name}\` — ${t.description}`);
|
for (const t of svc.tools) lines.push(`• \`${t.name}\` — ${t.description}`);
|
||||||
await ctx.reply(lines.join('\n'), { parse_mode: 'Markdown' });
|
await sendStreamingMessage(ctx, lines.join('\n'));
|
||||||
});
|
});
|
||||||
|
|
||||||
bot.command('skills', async (ctx) => {
|
bot.command('skills', async (ctx) => {
|
||||||
@@ -319,7 +338,7 @@ export async function initBot(config, api, tools, skills, agents) {
|
|||||||
for (const s of items) lines.push(` • \`${s.name}\` — ${s.description}`);
|
for (const s of items) lines.push(` • \`${s.name}\` — ${s.description}`);
|
||||||
lines.push('');
|
lines.push('');
|
||||||
}
|
}
|
||||||
await ctx.reply(lines.join('\n'), { parse_mode: 'Markdown' });
|
await sendStreamingMessage(ctx, lines.join('\n'));
|
||||||
});
|
});
|
||||||
|
|
||||||
bot.command('agents', async (ctx) => {
|
bot.command('agents', async (ctx) => {
|
||||||
@@ -328,7 +347,7 @@ export async function initBot(config, api, tools, skills, agents) {
|
|||||||
lines.push(`• *${a.name}* (\`${a.id}\`)`);
|
lines.push(`• *${a.name}* (\`${a.id}\`)`);
|
||||||
lines.push(` ${a.description}`);
|
lines.push(` ${a.description}`);
|
||||||
}
|
}
|
||||||
await ctx.reply(lines.join('\n'), { parse_mode: 'Markdown' });
|
await sendStreamingMessage(ctx, lines.join('\n'));
|
||||||
});
|
});
|
||||||
|
|
||||||
bot.command('model', async (ctx) => {
|
bot.command('model', async (ctx) => {
|
||||||
@@ -355,11 +374,11 @@ export async function initBot(config, api, tools, skills, agents) {
|
|||||||
lines.push(`Optimized: ${rtkStats.totalOptimized || 0}`);
|
lines.push(`Optimized: ${rtkStats.totalOptimized || 0}`);
|
||||||
lines.push(`Saved: ${rtkStats.totalTokensSaved || 0} tok`);
|
lines.push(`Saved: ${rtkStats.totalTokensSaved || 0} tok`);
|
||||||
}
|
}
|
||||||
await ctx.reply(lines.join('\n'), { parse_mode: 'Markdown' });
|
await sendStreamingMessage(ctx, lines.join('\n'));
|
||||||
});
|
});
|
||||||
|
|
||||||
bot.command('voice', async (ctx) => {
|
bot.command('voice', async (ctx) => {
|
||||||
await ctx.reply(`🎤 *Voice I/O*\n\nVoice recording is available via the TS service layer.\nSend me a voice message and I'll transcribe it.`);
|
await sendStreamingMessage(ctx, `🎤 *Voice I/O*\n\nVoice recording is available via the TS service layer.\nSend me a voice message and I will transcribe it.`);
|
||||||
});
|
});
|
||||||
|
|
||||||
bot.command('mcp', async (ctx) => {
|
bot.command('mcp', async (ctx) => {
|
||||||
@@ -424,7 +443,7 @@ export async function initBot(config, api, tools, skills, agents) {
|
|||||||
{ role: 'system', content: buildSystemPrompt(svc) },
|
{ role: 'system', content: buildSystemPrompt(svc) },
|
||||||
{ role: 'user', content: text },
|
{ role: 'user', content: text },
|
||||||
]);
|
]);
|
||||||
await sendFormatted(ctx, result);
|
await sendStreamingMessage(ctx, result.content || result);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -42,7 +42,42 @@ export async function sendFormatted(ctx, text) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function sendLongMessage(ctx, text) {
|
export async function sendStreamingMessage(ctx, text, options = {}) {
|
||||||
if (!text) return;
|
if (!text) return;
|
||||||
return sendFormatted(ctx, text);
|
|
||||||
|
const { delay = 50, maxChunkSize = 1000 } = options;
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Stream chunks with typing indicator
|
||||||
|
const chunks = splitMessage(text);
|
||||||
|
let delayTimer = null;
|
||||||
|
|
||||||
|
for (const chunk of chunks) {
|
||||||
|
// Clear previous typing indicator
|
||||||
|
if (delayTimer) {
|
||||||
|
clearTimeout(delayTimer);
|
||||||
|
delayTimer = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send chunk with minimal delay for real-time effect
|
||||||
|
await ctx.reply(chunk, { parse_mode: 'Markdown' });
|
||||||
|
|
||||||
|
// Show typing indicator between chunks
|
||||||
|
if (chunks.length > 1) {
|
||||||
|
delayTimer = setTimeout(() => {
|
||||||
|
ctx.api.sendChatAction(ctx.chat.id, 'typing').catch(() => {});
|
||||||
|
}, delay);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear typing indicator
|
||||||
|
if (delayTimer) {
|
||||||
|
clearTimeout(delayTimer);
|
||||||
|
delayTimer = null;
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Streaming send failed:', error);
|
||||||
|
// Fallback to non-streaming
|
||||||
|
await sendFormatted(ctx, text);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user