/**
 * Streaming chat completion via SSE.
 * POSTs to /chat/completions with stream:true and pipes each token chunk to
 * the onDelta() callback in real time as `data:` lines arrive.
 * Falls back to the non-streaming chatWithAI() path if the HTTP call fails
 * before any content was delivered.
 *
 * @param {object} svc - service handle; svc.api.config supplies baseUrl/apiKey
 * @param {object} body - chat-completions request body (messages, max_tokens, …)
 * @param {Array}  tools - tool schemas (streamed tool_calls are not executed yet)
 * @param {object} toolHandlers - tool dispatch map (unused until tool_calls are handled)
 * @param {(token: string) => void} onDelta - invoked once per content chunk
 * @returns {Promise<string>} the accumulated assistant text
 */
async function chatWithAIStream(svc, body, tools, toolHandlers, onDelta) {
  const baseUrl = svc.api?.config?.baseUrl || 'https://api.z.ai/api/coding/paas/v4';
  const apiKey = svc.api?.config?.apiKey || '';

  let fullResponse = '';

  // Parse one SSE line; returns the content delta ('' if none).
  const parseLine = (line) => {
    const trimmed = line.trim();
    if (!trimmed || !trimmed.startsWith('data: ')) return '';
    const data = trimmed.slice(6);
    if (data === '[DONE]') return '';
    try {
      const delta = JSON.parse(data).choices?.[0]?.delta || {};
      // NOTE(review): delta.tool_calls is currently ignored in streaming mode;
      // tool calls still go through the non-streaming path.
      return delta.content || '';
    } catch {
      return ''; // Ignore malformed JSON lines
    }
  };

  // Translate request-body fields into chatWithAI() option names for the
  // fallback path — it expects { temperature, maxTokens }, not max_tokens.
  // (The old `{ ...body, stream: false }` spread silently dropped the
  // caller's token limit back to the 4096 default.)
  const fallbackOpts = () => ({
    temperature: body.temperature,
    maxTokens: body.max_tokens,
  });

  try {
    const response = await fetch(`${baseUrl}/chat/completions`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${apiKey}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify(body),
    });

    if (!response.ok || !response.body) {
      const errText = response.ok ? 'empty response body' : await response.text();
      logger.error(`SSE error ${response.status}: ${errText}`);
      // Fallback to non-streaming
      return await chatWithAI(body.messages, fallbackOpts());
    }

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() || ''; // Keep incomplete line in buffer

      for (const line of lines) {
        const content = parseLine(line);
        if (content) {
          fullResponse += content;
          onDelta(content);
        }
      }
    }

    // Flush any final line still in the buffer — the stream may not end
    // with a trailing '\n', and decode() flushes leftover multibyte state.
    const tail = parseLine(buffer + decoder.decode());
    if (tail) {
      fullResponse += tail;
      onDelta(tail);
    }
  } catch (e) {
    logger.error('SSE stream error:', e.message);
    // Fallback to non-streaming only if nothing was delivered yet.
    if (!fullResponse) {
      return await chatWithAI(body.messages, fallbackOpts());
    }
  }

  return fullResponse || '✅ Done.';
}
toolHandlers = { bash: async (args) => { const tool = svc.toolMap.get('bash'); @@ -437,9 +521,13 @@ export async function initBot(config, api, tools, skills, agents) { await queueRequest(key, text, async () => { await ctx.api.sendChatAction(ctx.chat.id, 'typing'); - // Wrap chatWithAI with self-correction + // Create stream consumer for real-time edit-in-place + const consumer = new StreamConsumer(ctx, { editInterval: 1000 }); + const runPromise = consumer.run(); + + // Wrap chatWithAI with self-correction + streaming const chatWithCorrection = withSelfCorrection(async (msgs) => { - return await chatWithAI(msgs, {}); + return await chatWithAI(msgs, { onDelta: (token) => consumer.onDelta(token) }); }); const result = await chatWithCorrection([ @@ -447,8 +535,14 @@ export async function initBot(config, api, tools, skills, agents) { { role: 'user', content: text }, ]); - // Send with streaming effect - await sendStreamingMessage(ctx, result); + // Signal completion and wait for final edit + consumer.finish(); + await runPromise; + + // If streaming failed to deliver (no message sent), fallback to plain send + if (!consumer.alreadySent && result) { + await sendFormatted(ctx, result); + } }); }); diff --git a/src/bot/message-sender.js b/src/bot/message-sender.js index de48d641..c6760abf 100644 --- a/src/bot/message-sender.js +++ b/src/bot/message-sender.js @@ -1,10 +1,22 @@ -// Adapted from claudegram's message-sender.ts — send with retry, chunking, streaming +/** + * Streaming message consumer — bridges SSE token stream to Telegram editMessageText. + * + * Architecture (adapted from Hermes Agent's GatewayStreamConsumer): + * 1. Agent fires onDelta(token) for each SSE chunk + * 2. Tokens accumulate in a buffer + * 3. An async run() loop edits a single message at ~1s intervals + * 4. 
// ── Streaming constants ──
const MAX_MSG_LENGTH = 4096;           // Telegram hard limit per message
const DEFAULT_EDIT_INTERVAL_MS = 1000; // ~30 edits/min/chat keeps us under flood control
const DEFAULT_BUFFER_THRESHOLD = 40;   // flush early once this many chars are pending
const MAX_FLOOD_STRIKES = 3;           // consecutive edit failures before fallback mode
const CURSOR = ' ▉';                   // typing cursor appended to partial messages

/**
 * StreamConsumer — progressive edit-in-place streaming for Telegram.
 *
 * Architecture (adapted from Hermes Agent's GatewayStreamConsumer):
 * 1. Agent fires onDelta(token) for each SSE chunk
 * 2. Tokens accumulate in a buffer
 * 3. An async run() loop edits a single message at ~1s intervals
 * 4. Adaptive backoff on flood control, graceful fallback to plain send
 *
 * Usage:
 *   const consumer = new StreamConsumer(ctx, { editInterval: 1000 });
 *   const runPromise = consumer.run();      // start async edit loop
 *   // ... call consumer.onDelta(token) for each SSE chunk ...
 *   consumer.finish();
 *   await runPromise;                       // wait for final edit
 */
export class StreamConsumer {
  /**
   * @param {object} ctx - grammY-style context (ctx.api, ctx.chat, ctx.reply)
   * @param {object} [options]
   * @param {number} [options.editInterval] - minimum ms between message edits
   * @param {number} [options.bufferThreshold] - pending chars that force a flush
   * @param {string} [options.cursor] - suffix shown on partial messages ('' disables)
   */
  constructor(ctx, options = {}) {
    this.ctx = ctx;
    this.editInterval = options.editInterval || DEFAULT_EDIT_INTERVAL_MS;
    this.bufferThreshold = options.bufferThreshold || DEFAULT_BUFFER_THRESHOLD;
    this.cursor = options.cursor !== undefined ? options.cursor : CURSOR;

    // Internal state
    this._queue = [];                 // tokens not yet folded into _accumulated
    this._done = false;               // finish() was called
    this._accumulated = '';           // full text received so far
    this._messageId = null;           // Telegram message being edited in place
    this._chatId = null;
    this._alreadySent = false;        // at least one send/edit succeeded
    this._editSupported = true;       // flips off after repeated edit failures
    this._lastEditTime = 0;
    this._lastSentText = '';          // dedupe identical edits
    this._fallbackFinalSend = false;  // deliver the remainder via plain sends
    this._fallbackPrefix = '';
    this._floodStrikes = 0;
    this._currentEditInterval = this.editInterval; // grows on flood control
    this._finalResponseSent = false;
    this._drainResolve = null;
    this._drainPromise = new Promise(r => { this._drainResolve = r; });
  }

  /**
   * Callback for each SSE token chunk — enqueues the token and wakes the
   * run() loop if it is parked on the drain promise.
   */
  onDelta(text) {
    if (text) {
      this._queue.push(text);
      // Wake up the run() loop
      if (this._drainResolve) {
        const r = this._drainResolve;
        this._drainResolve = null;
        this._drainPromise = new Promise(resolve => { this._drainResolve = resolve; });
        r();
      }
    }
  }

  /** Signal stream completion. */
  finish() {
    this._done = true;
    if (this._drainResolve) {
      const r = this._drainResolve;
      this._drainResolve = null;
      r();
    }
  }

  /** True once at least one message reached the chat (send or edit). */
  get alreadySent() { return this._alreadySent; }

  /** True once the final, cursor-free text has been delivered. */
  get finalResponseSent() { return this._finalResponseSent; }

  /**
   * Main edit loop. Drains queued tokens, periodically edits the in-place
   * message (with cursor suffix), splits into a new message on overflow,
   * and performs a final cursor-free edit/send when finish() is seen.
   */
  async run() {
    const safeLimit = MAX_MSG_LENGTH - 20;

    try {
      while (true) {
        // Drain all available items from queue
        while (this._queue.length > 0) {
          this._accumulated += this._queue.shift();
        }

        // Check if done
        if (this._done) break;

        // Decide whether to flush an edit: interval elapsed with pending
        // text, or the pending buffer grew large enough to flush early.
        const now = Date.now();
        const elapsed = now - this._lastEditTime;
        const shouldEdit =
          (elapsed >= this._currentEditInterval && this._accumulated.length > 0) ||
          this._accumulated.length >= this.bufferThreshold;

        if (shouldEdit && this._accumulated.trim()) {
          // Overflow: finalize the current message at a newline boundary and
          // continue in a fresh one.
          // FIX: String.lastIndexOf takes (search, fromIndex); the previous
          // lastIndexOf('\n', 0, safeLimit) only ever inspected index 0, so
          // the splitter always hard-cut at safeLimit.
          while (this._accumulated.length > safeLimit && this._messageId !== null && this._editSupported) {
            const splitAt = this._accumulated.lastIndexOf('\n', safeLimit);
            const chunk = this._accumulated.slice(0, splitAt > safeLimit / 2 ? splitAt : safeLimit);
            const ok = await this._sendOrEdit(chunk);
            if (this._fallbackFinalSend || !ok) break;
            this._accumulated = this._accumulated.slice(chunk.length).replace(/^\n+/, '');
            this._messageId = null;
            this._lastSentText = '';
          }

          // Add cursor if not final
          const displayText = this._accumulated + this.cursor;
          await this._sendOrEdit(displayText);
          this._lastEditTime = Date.now();
        }

        // Wait for more data or completion
        if (!this._done && this._queue.length === 0) {
          await Promise.race([
            this._drainPromise,
            new Promise(r => setTimeout(r, 50)),
          ]);
        }
      }

      // Final edit/send without cursor (the two non-fallback branches were
      // identical in the original — merged).
      if (this._accumulated.trim()) {
        if (this._fallbackFinalSend) {
          await this._sendFallbackFinal(this._accumulated);
        } else {
          await this._sendOrEdit(this._accumulated);
          this._finalResponseSent = true;
        }
      }
    } catch (e) {
      logger.error('StreamConsumer error:', e);
    }
  }

  /**
   * Send the text as a new message (first call) or edit the in-place message.
   * Returns true on success or harmless no-op; false when the caller should
   * back off / stop editing.
   */
  async _sendOrEdit(text) {
    if (!text.trim()) return true;

    // Skip cursor-only messages for first send
    const visibleWithoutCursor = text.replace(this.cursor, '').trim();
    if (!visibleWithoutCursor) return true;

    // Don't create tiny first messages with just cursor
    if (this._messageId === null && this.cursor && text.includes(this.cursor) && visibleWithoutCursor.length < 4) {
      return true;
    }

    try {
      if (this._messageId !== null) {
        if (!this._editSupported) return false;

        // Skip if identical
        if (text === this._lastSentText) return true;

        const result = await this._editMessage(this._messageId, this._chatId, text);
        if (result) {
          this._alreadySent = true;
          this._lastSentText = text;
          this._floodStrikes = 0;
          return true;
        }

        // Flood control — adaptive backoff
        this._floodStrikes++;
        this._currentEditInterval = Math.min(this._currentEditInterval * 2, 10000);
        logger.debug(
          `Flood control on edit (strike ${this._floodStrikes}/${MAX_FLOOD_STRIKES}), ` +
          `backoff → ${this._currentEditInterval}ms`
        );
        if (this._floodStrikes >= MAX_FLOOD_STRIKES) {
          // Enter fallback mode: stop editing, deliver remainder as plain sends
          logger.debug(`Edit failed (strikes=${this._floodStrikes}), entering fallback mode`);
          this._fallbackPrefix = this._visiblePrefix();
          this._fallbackFinalSend = true;
          this._editSupported = false;
          this._alreadySent = true;
          await this._tryStripCursor();
          return false;
        }
        this._lastEditTime = Date.now();
        return false;
      }

      // First message — send new
      try {
        const msg = await this.ctx.api.sendMessage(this.ctx.chat.id, text, { parse_mode: undefined });
        if (msg && msg.message_id) {
          this._messageId = msg.message_id;
          this._chatId = msg.chat.id;
          this._alreadySent = true;
          this._lastSentText = text;
          return true;
        }
        this._editSupported = false;
        return false;
      } catch (sendErr) {
        logger.warn('Initial send failed:', sendErr.message);
        this._editSupported = false;
        return false;
      }
    } catch (e) {
      logger.error('Stream send/edit error:', e);
      return false;
    }
  }

  /**
   * Low-level edit with error classification: "not modified" is success,
   * over-length is retried truncated, flood/rate errors return false so the
   * caller can back off; anything else is logged and treated as failure.
   */
  async _editMessage(messageId, chatId, text) {
    try {
      await this.ctx.api.editMessageText(chatId, messageId, text, { parse_mode: undefined });
      return true;
    } catch (err) {
      const msg = err.message || '';
      if (msg.includes('not modified')) return true;
      if (msg.includes('too long') || msg.includes('message_too_long')) {
        // Truncate and retry once
        const truncated = text.slice(0, MAX_MSG_LENGTH - 20) + '…';
        try {
          await this.ctx.api.editMessageText(chatId, messageId, truncated, { parse_mode: undefined });
          return true;
        } catch { return false; }
      }
      if (msg.includes('Flood') || msg.includes('flood') || msg.includes('retry after') || msg.includes('rate')) {
        return false; // Caller handles backoff
      }
      logger.warn(`Edit error: ${msg}`);
      return false;
    }
  }

  /** Last successfully delivered text, minus the trailing cursor if present. */
  _visiblePrefix() {
    let prefix = this._lastSentText || '';
    if (this.cursor && prefix.endsWith(this.cursor)) {
      prefix = prefix.slice(0, -this.cursor.length);
    }
    return prefix;
  }

  /** Best-effort edit to remove the cursor from the last partial message. */
  async _tryStripCursor() {
    if (!this._messageId) return;
    const prefix = this._visiblePrefix();
    if (!prefix.trim()) return;
    try {
      await this.ctx.api.editMessageText(this._chatId, this._messageId, prefix, { parse_mode: undefined });
      this._lastSentText = prefix;
    } catch { /* best-effort */ }
  }

  /**
   * Fallback delivery: send the not-yet-delivered continuation of `text`
   * as one or more plain messages (used after repeated edit failures).
   */
  async _sendFallbackFinal(text) {
    const prefix = this._fallbackPrefix || this._visiblePrefix();
    let continuation = text;
    if (prefix && text.startsWith(prefix)) {
      continuation = text.slice(prefix.length).replace(/^\s+/, '');
    }
    this._fallbackFinalSend = false;

    if (!continuation.trim()) {
      this._alreadySent = true;
      this._finalResponseSent = true;
      return;
    }

    // Try to strip cursor from last partial
    await this._tryStripCursor();

    const chunks = splitMessage(continuation);
    let sentAny = false;
    for (const chunk of chunks) {
      try {
        await this.ctx.reply(chunk, { parse_mode: undefined });
        sentAny = true;
      } catch (e) {
        logger.warn('Fallback send chunk error:', e.message);
      }
    }
    // FIX: was `this._already_sent = sentAny` (snake_case leftover from the
    // Python port) — the alreadySent getter reads _alreadySent, so the
    // caller's duplicate-send guard never saw the fallback delivery.
    this._alreadySent = sentAny;
    this._finalResponseSent = sentAny;
  }
}
/**
 * Simulated streaming — edits a single message in place as content grows.
 * Used when the AI doesn't support SSE streaming (full response received
 * first): the text is replayed sentence-by-sentence through a StreamConsumer
 * so delivery matches the real streaming path.
 * Falls back to plain sends inside StreamConsumer if editing fails.
 *
 * @deprecated Prefer StreamConsumer with real SSE streaming
 * @param {object} ctx - grammY-style context
 * @param {string} text - complete response text to replay
 * @param {object} [options] - reserved (the legacy sentenceMode flag is ignored)
 */
export async function sendStreamingMessage(ctx, text, options = {}) {
  if (!text) return;
  const consumer = new StreamConsumer(ctx, {
    editInterval: 800,
    bufferThreshold: 20,
    cursor: '',
  });

  const runPromise = consumer.run();

  // Sentence-level segmentation.
  // FIX: the old `text.match(/[^.!?]+[.!?]+[\s]*/g) || [text]` silently
  // dropped any trailing text without terminal punctuation ("Done. ok"
  // lost "ok") and any unmatched gaps. Rebuild segments so that
  // segments.join('') === text and no characters are ever lost.
  const sentenceRe = /[^.!?]+[.!?]+[\s]*/g;
  const segments = [];
  let consumedEnd = 0;
  let m;
  while ((m = sentenceRe.exec(text)) !== null) {
    if (m.index > consumedEnd) segments.push(text.slice(consumedEnd, m.index));
    segments.push(m[0]);
    consumedEnd = m.index + m[0].length;
  }
  if (consumedEnd < text.length) segments.push(text.slice(consumedEnd));
  if (segments.length === 0) segments.push(text);

  // Feed segments with a short delay to simulate live token arrival.
  for (const segment of segments) {
    consumer.onDelta(segment);
    await new Promise(r => setTimeout(r, 600));
  }

  consumer.finish();
  await runPromise;
}