/**
 * Streaming message consumer — bridges SSE token stream to Telegram editMessageText.
 *
 * Architecture (adapted from Hermes Agent's GatewayStreamConsumer):
 * 1. Agent fires onDelta(token) for each SSE chunk
 * 2. Tokens accumulate in a buffer
 * 3. An async run() loop edits a single message at ~1s intervals
 * 4. Adaptive backoff on flood control, graceful fallback to plain send
 * 5. Final message delivered with HTML formatting
 *
 * Credit: Hermes Agent gateway/stream_consumer.py (NousResearch/hermes-agent)
 */

import { logger } from '../utils/logger.js';

const MAX_MSG_LENGTH = 4096;
const DEFAULT_EDIT_INTERVAL_MS = 1000;
const DEFAULT_BUFFER_THRESHOLD = 40;
const MAX_FLOOD_STRIKES = 3;
const CURSOR = ' ▉';

// ───────────────────────────────────────────
// Markdown → Telegram HTML converter
// ───────────────────────────────────────────

/**
 * Escape the three characters that are significant in Telegram HTML text.
 *
 * @param {string} text - Raw text.
 * @returns {string} Text with `&`, `<` and `>` replaced by entities.
 */
function escapeHtml(text) {
  return text
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;');
}

/**
 * Turn a chunk of HTML produced by markdownToHtml() back into plain text.
 * Used when Telegram rejects a chunk in HTML mode so the user does not see raw
 * tags or entities.
 *
 * @param {string} html - HTML chunk (possibly cut mid-tag by splitMessage).
 * @returns {string} Plain text with tags removed and entities decoded.
 */
function htmlToPlain(html) {
  return html
    // markdownToHtml escapes every literal < and >, so a raw ">" before any "<"
    // (or a raw "<" with no closing ">") can only be a tag cut in half by chunking.
    .replace(/^[^<>]*>/, '')
    .replace(/<[^<>]*$/, '')
    .replace(/<\/?[a-z][^<>]*>/gi, '')
    .replace(/&lt;/g, '<')
    .replace(/&gt;/g, '>')
    .replace(/&quot;/g, '"')
    // Decode &amp; last so "&amp;lt;" becomes "&lt;" and not "<".
    .replace(/&amp;/g, '&');
}

/**
 * Convert common Markdown to Telegram-compatible HTML.
 * Handles: **bold**, *italic*, `code`, fenced code blocks, [links](url),
 * ~~strike~~, headings, block quotes, lists.
 * Code content is properly escaped; surrounding text is escaped before tag insertion.
 *
 * @param {string} text - Markdown source.
 * @returns {string} Telegram HTML ('' for empty input).
 */
export function markdownToHtml(text) {
  if (!text) return '';

  // 1. Extract fenced code blocks → protect from escaping and inline formatting
  const codeBlocks = [];
  text = text.replace(/```(\w*)\n?([\s\S]*?)```/g, (_, lang, code) => {
    const idx = codeBlocks.length;
    const open = lang ? `<pre><code class="language-${lang}">` : '<pre><code>';
    codeBlocks.push(`${open}${escapeHtml(code)}</code></pre>`);
    return `\x00CB${idx}\x00`;
  });

  // 2. Extract inline code → protect from escaping and inline formatting
  const inlineCodes = [];
  text = text.replace(/`([^`\n]+)`/g, (_, code) => {
    const idx = inlineCodes.length;
    inlineCodes.push(`<code>${escapeHtml(code)}</code>`);
    return `\x00IC${idx}\x00`;
  });

  // 3. Escape HTML entities in remaining text
  text = escapeHtml(text);

  // 4. Convert Markdown patterns → HTML tags
  text = text
    .replace(/\*\*([\s\S]+?)\*\*/g, '<b>$1</b>') // **bold** (multiline)
    // *italic*: single stars only, and the stars must hug the text so that
    // "* list item" bullets are not mistaken for emphasis.
    .replace(/(?<!\*)\*(?![\s*])(.+?)(?<![\s*])\*(?!\*)/g, '<i>$1</i>')
    .replace(/~~(.+?)~~/g, '<s>$1</s>') // ~~strike~~
    .replace(
      /\[(.+?)\]\((.+?)\)/g, // [link](url)
      (_, label, url) => `<a href="${url.replace(/"/g, '&quot;')}">${label}</a>`
    )
    .replace(/^####\s+(.+)$/gm, '<b>$1</b>') // h4
    .replace(/^###\s+(.+)$/gm, '<b>$1</b>') // h3
    .replace(/^##\s+(.+)$/gm, '<b>$1</b>') // h2
    .replace(/^#\s+(.+)$/gm, '<b>$1</b>') // h1
    // ">" was already escaped in step 3, so the quote marker is "&gt;" here.
    .replace(/^&gt;\s+(.+)$/gm, '<blockquote>$1</blockquote>') // > quote
    .replace(/^[-*]\s+/gm, '• '); // - or * list → bullet

  // 5. Restore protected code blocks and inline code.
  // Function replacers are required: with a string replacement, "$$", "$&",
  // "$`" and "$'" inside the code would be interpreted as substitution patterns.
  for (let i = 0; i < codeBlocks.length; i++) {
    text = text.replace(`\x00CB${i}\x00`, () => codeBlocks[i]);
  }
  for (let i = 0; i < inlineCodes.length; i++) {
    text = text.replace(`\x00IC${i}\x00`, () => inlineCodes[i]);
  }

  return text;
}

/**
 * Sanitize text for plain-text Telegram messages (no parse_mode).
 * Strips markdown formatting symbols so they don't show as raw text.
 *
 * @param {string} text - Markdown source.
 * @returns {string} Plain text ('' for empty input).
 */
export function stripMarkdown(text) {
  if (!text) return '';
  return text
    // Frame fenced code with box-drawing rules. Opening and closing fences are
    // handled in one match so the closing fence is not mistaken for an opener.
    .replace(
      /```\w*\n?([\s\S]*?)```/g,
      (_, code) => `┌──\n${code.replace(/\n$/, '')}\n└──`
    )
    .replace(/\*\*(.+?)\*\*/g, '$1')
    .replace(/\*(.+?)\*/g, '$1')
    .replace(/~~(.+?)~~/g, '$1')
    .replace(/`([^`\n]+)`/g, '「$1」')
    .replace(/\[(.+?)\]\((.+?)\)/g, '$1 ($2)')
    .replace(/^#{1,4}\s+/gm, '')
    .replace(/^[-*]\s+/gm, '• ');
}

/**
 * Split text into chunks no longer than Telegram's message limit.
 * Prefers to break just after a newline when one exists in the second half of
 * the window; otherwise cuts at the limit without separating a surrogate pair.
 * Concatenating the chunks yields the original text.
 *
 * @param {string} text - Text to split.
 * @returns {string[]} Chunks, each at most MAX_MSG_LENGTH UTF-16 units.
 */
export function splitMessage(text) {
  if (text.length <= MAX_MSG_LENGTH) return [text];

  const chunks = [];
  let rest = text;
  while (rest.length > MAX_MSG_LENGTH) {
    // Index just past the last newline that still fits in this chunk (0 if none).
    let cut = rest.lastIndexOf('\n', MAX_MSG_LENGTH - 1) + 1;
    if (cut <= MAX_MSG_LENGTH / 2) {
      cut = MAX_MSG_LENGTH;
      const lastUnit = rest.charCodeAt(cut - 1);
      // Do not leave a lone high surrogate at the end of the chunk.
      if (lastUnit >= 0xd800 && lastUnit <= 0xdbff) cut -= 1;
    }
    chunks.push(rest.slice(0, cut));
    rest = rest.slice(cut);
  }
  if (rest.length > 0) chunks.push(rest);
  return chunks;
}

/**
 * Backslash-escape the characters `_`, `*`, `[` and backtick.
 *
 * @param {string} text - Raw text.
 * @returns {string} Escaped text ('' for empty input).
 */
export function escapeMarkdown(text) {
  if (!text) return '';
  return text
    .replace(/_/g, '\\_')
    .replace(/\*/g, '\\*')
    .replace(/\[/g, '\\[')
    .replace(/`/g, '\\`');
}

/**
 * Send Markdown text as one or more HTML-formatted replies.
 *
 * If the very first HTML chunk is rejected, the whole message is re-sent as
 * stripped plain text. If a later chunk is rejected, only that chunk is re-sent
 * as plain text, so content that was already delivered is not duplicated.
 * Errors from the plain-text sends propagate to the caller.
 *
 * @param {object} ctx - grammY context (uses ctx.reply).
 * @param {string} text - Markdown source; empty input is a no-op.
 * @returns {Promise<void>}
 */
export async function sendFormatted(ctx, text) {
  if (!text) return;

  const htmlChunks = splitMessage(markdownToHtml(text));
  for (let i = 0; i < htmlChunks.length; i++) {
    try {
      await ctx.reply(htmlChunks[i], { parse_mode: 'HTML' });
    } catch (err) {
      if (i === 0) {
        logger.warn('HTML send failed, falling back to stripped plain text');
        for (const plainChunk of splitMessage(stripMarkdown(text))) {
          await ctx.reply(plainChunk, { parse_mode: undefined });
        }
        return;
      }
      logger.warn(`HTML send failed for chunk ${i + 1}/${htmlChunks.length}, sending it as plain text`);
      await ctx.reply(htmlToPlain(htmlChunks[i]), { parse_mode: undefined });
    }
  }
}

/**
 * StreamConsumer — progressive edit-in-place streaming for Telegram.
 *
 * - Intermediate edits: plain text (no formatting — partial HTML would break)
 * - Final message: converted to Telegram HTML with full formatting
 *
 * Usage:
 *   const consumer = new StreamConsumer(ctx, { editInterval: 1000 });
 *   const runPromise = consumer.run(); // start async edit loop
 *   // ... call consumer.onDelta(token) for each SSE chunk ...
 *   consumer.finish();
 *   await runPromise; // wait for final edit (HTML formatted)
 */
export class StreamConsumer {
  /**
   * @param {object} ctx - grammY context (uses ctx.api, ctx.chat.id, ctx.reply).
   * @param {object} [options]
   * @param {number} [options.editInterval] - Minimum ms between edits of the draft.
   * @param {number} [options.bufferThreshold] - Buffered characters that allow the
   *   first message of a bubble to be sent without waiting for the interval.
   * @param {string} [options.cursor] - Suffix shown while streaming ('' disables it).
   */
  constructor(ctx, options = {}) {
    this.ctx = ctx;
    this.editInterval = options.editInterval || DEFAULT_EDIT_INTERVAL_MS;
    this.bufferThreshold = options.bufferThreshold || DEFAULT_BUFFER_THRESHOLD;
    // Treat null like "not provided"; otherwise it would be rendered as "null".
    this.cursor =
      options.cursor === undefined || options.cursor === null ? CURSOR : options.cursor;

    // Internal state
    this._queue = [];
    this._done = false;
    this._accumulated = '';
    this._messageId = null;
    this._chatId = null;
    this._alreadySent = false;
    this._editSupported = true;
    this._lastEditTime = 0;
    this._lastSentText = '';
    this._fallbackFinalSend = false;
    this._fallbackPrefix = '';
    this._floodStrikes = 0;
    this._currentEditInterval = this.editInterval;
    this._finalResponseSent = false;
    this._drainResolve = null;
    this._drainPromise = new Promise((resolve) => {
      this._drainResolve = resolve;
    });
  }

  /**
   * Queue one SSE token chunk and wake the run() loop.
   *
   * @param {string} text - Token text; empty values are ignored.
   */
  onDelta(text) {
    if (!text) return;
    this._queue.push(text);

    // Wake up the run() loop and arm a fresh promise for the next wait.
    if (this._drainResolve) {
      const wake = this._drainResolve;
      this._drainResolve = null;
      this._drainPromise = new Promise((resolve) => {
        this._drainResolve = resolve;
      });
      wake();
    }
  }

  /** Signal stream completion. */
  finish() {
    this._done = true;
    if (this._drainResolve) {
      const wake = this._drainResolve;
      this._drainResolve = null;
      wake();
    }
  }

  /** True once any message (draft or final) has been delivered. */
  get alreadySent() {
    return this._alreadySent;
  }

  /** True once the final response has been delivered. */
  get finalResponseSent() {
    return this._finalResponseSent;
  }

  /**
   * Edit loop: drains queued tokens, periodically pushes the accumulated text to
   * Telegram as a plain-text draft, and sends the formatted final message once
   * finish() has been called. Errors are logged, never thrown.
   *
   * @returns {Promise<void>}
   */
  async run() {
    const safeLimit = MAX_MSG_LENGTH - 20;

    try {
      while (true) {
        // Drain all available items from queue
        if (this._queue.length > 0) {
          this._accumulated += this._queue.splice(0).join('');
        }

        // Check if done
        if (this._done) break;

        // Decide whether to flush an edit.
        // Edits of an existing draft are always gated by the (possibly backed-off)
        // interval; otherwise flood-control backoff would have no effect. The
        // buffer threshold only lets the first message of a bubble go out early.
        const elapsed = Date.now() - this._lastEditTime;
        const intervalElapsed = elapsed >= this._currentEditInterval;
        const firstSendReady =
          this._messageId === null &&
          this._editSupported &&
          this._accumulated.length >= this.bufferThreshold;
        const shouldEdit = intervalElapsed || firstSendReady;

        if (shouldEdit && this._accumulated.trim()) {
          // Handle overflow: if text exceeds limit, freeze the current message
          // and continue in a new one.
          while (
            this._accumulated.length > safeLimit &&
            this._messageId !== null &&
            this._editSupported
          ) {
            // Last newline strictly inside the first safeLimit characters.
            const splitAt = this._accumulated.lastIndexOf('\n', safeLimit - 1);
            const chunk = this._accumulated.slice(
              0,
              splitAt > safeLimit / 2 ? splitAt : safeLimit
            );
            const ok = await this._sendOrEdit(chunk);
            if (this._fallbackFinalSend || !ok) break;
            this._accumulated = this._accumulated.slice(chunk.length).replace(/^\n+/, '');
            this._messageId = null;
            this._lastSentText = '';
          }

          // Intermediate edits: plain text + cursor (no parse_mode)
          const displayText = this._accumulated + this.cursor;
          await this._sendOrEdit(displayText);
          this._lastEditTime = Date.now();
        }

        // Wait for more data or completion
        if (!this._done && this._queue.length === 0) {
          await Promise.race([
            this._drainPromise,
            new Promise((resolve) => setTimeout(resolve, 50)),
          ]);
        }
      }

      // ═══════════════════════════════════════
      // FINAL EDIT — with HTML formatting
      // ═══════════════════════════════════════
      if (this._accumulated.trim()) {
        if (this._fallbackFinalSend) {
          await this._sendFallbackFinal(this._accumulated);
        } else {
          // Replaces the draft if there is one, otherwise sends a new message.
          await this._sendFinalFormatted(this._accumulated);
        }
      }
    } catch (e) {
      logger.error('StreamConsumer error:', e);
    }
  }

  /**
   * Send the final message with HTML formatting.
   * Falls back to stripped plain text if HTML parse fails (via sendFormatted).
   *
   * @param {string} text - Full Markdown text of the final message.
   * @returns {Promise<void>}
   */
  async _sendFinalFormatted(text) {
    // Delete streaming draft (plain text with raw ** markers)
    // Editing with parse_mode switch (none→HTML) is unreliable — always delete+resend
    if (this._messageId) {
      try {
        await this.ctx.api.deleteMessage(this._chatId, this._messageId);
        logger.info(`Deleted streaming draft msg ${this._messageId}`);
      } catch (e) {
        // Best-effort: the draft may already be gone.
        logger.debug(`Could not delete streaming draft: ${e.message}`);
      }
      this._messageId = null;
    }

    // Send fresh formatted HTML
    try {
      await sendFormatted(this.ctx, text);
      this._alreadySent = true;
      this._finalResponseSent = true;
      logger.info(`Sent formatted final (${text.length} chars)`);
    } catch (e) {
      logger.error(`Formatted send failed: ${e.message}`);
    }
  }

  /**
   * Send the first draft message or edit the existing one with `text`.
   *
   * @param {string} text - Plain text to display (may end with the cursor).
   * @returns {Promise<boolean>} true if the text was delivered or deliberately
   *   skipped; false if the send/edit failed or editing is disabled.
   */
  async _sendOrEdit(text) {
    if (!text.trim()) return true;

    // Skip cursor-only messages for first send
    const visibleWithoutCursor = text.replace(this.cursor, '').trim();
    if (!visibleWithoutCursor) return true;

    // Don't create tiny first messages with just cursor
    if (
      this._messageId === null &&
      this.cursor &&
      text.includes(this.cursor) &&
      visibleWithoutCursor.length < 4
    ) {
      return true;
    }

    try {
      if (this._messageId !== null) {
        if (!this._editSupported) return false;

        // Skip if identical
        if (text === this._lastSentText) return true;

        const edited = await this._editMessage(this._messageId, this._chatId, text);
        if (edited) {
          this._alreadySent = true;
          this._lastSentText = text;
          this._floodStrikes = 0;
          return true;
        }

        // Flood control — adaptive backoff
        this._floodStrikes++;
        this._currentEditInterval = Math.min(this._currentEditInterval * 2, 10000);
        logger.debug(
          `Flood control on edit (strike ${this._floodStrikes}/${MAX_FLOOD_STRIKES}), ` +
            `backoff → ${this._currentEditInterval}ms`
        );

        if (this._floodStrikes >= MAX_FLOOD_STRIKES) {
          // Enter fallback mode: stop editing, deliver the rest at the end.
          logger.debug(`Edit failed (strikes=${this._floodStrikes}), entering fallback mode`);
          this._fallbackPrefix = this._visiblePrefix();
          this._fallbackFinalSend = true;
          this._editSupported = false;
          this._alreadySent = true;
          await this._tryStripCursor();
          return false;
        }

        this._lastEditTime = Date.now();
        return false;
      }

      // First message — send as plain text (no formatting during streaming)
      try {
        const msg = await this.ctx.api.sendMessage(this.ctx.chat.id, text, {
          parse_mode: undefined,
        });
        if (msg && msg.message_id) {
          this._messageId = msg.message_id;
          this._chatId = msg.chat.id;
          this._alreadySent = true;
          this._lastSentText = text;
          return true;
        }
        this._editSupported = false;
        return false;
      } catch (sendErr) {
        logger.warn('Initial send failed:', sendErr.message);
        this._editSupported = false;
        return false;
      }
    } catch (e) {
      logger.error('Stream send/edit error:', e);
      return false;
    }
  }

  /**
   * Edit a message as plain text, truncating once if Telegram says it is too long.
   *
   * @param {number} messageId - Message to edit.
   * @param {number} chatId - Chat containing the message.
   * @param {string} text - New plain text.
   * @returns {Promise<boolean>} true on success or "not modified"; false otherwise
   *   (the caller treats false as a flood-control strike).
   */
  async _editMessage(messageId, chatId, text) {
    try {
      await this.ctx.api.editMessageText(chatId, messageId, text, { parse_mode: undefined });
      return true;
    } catch (err) {
      const msg = err.message || '';
      if (msg.includes('not modified')) return true;

      if (msg.includes('too long') || msg.includes('message_too_long')) {
        // Truncate
        const truncated = text.slice(0, MAX_MSG_LENGTH - 20) + '…';
        try {
          await this.ctx.api.editMessageText(chatId, messageId, truncated, {
            parse_mode: undefined,
          });
          return true;
        } catch {
          return false;
        }
      }

      if (
        msg.includes('Flood') ||
        msg.includes('flood') ||
        msg.includes('retry after') ||
        msg.includes('rate')
      ) {
        return false; // Caller handles backoff
      }

      logger.warn(`Edit error: ${msg}`);
      return false;
    }
  }

  /**
   * @returns {string} The last text shown in the draft, without a trailing cursor.
   */
  _visiblePrefix() {
    let prefix = this._lastSentText || '';
    if (this.cursor && prefix.endsWith(this.cursor)) {
      prefix = prefix.slice(0, -this.cursor.length);
    }
    return prefix;
  }

  /**
   * Best-effort edit that removes the cursor from the current draft.
   *
   * @returns {Promise<void>}
   */
  async _tryStripCursor() {
    if (!this._messageId) return;
    const prefix = this._visiblePrefix();
    if (!prefix.trim()) return;
    try {
      await this.ctx.api.editMessageText(this._chatId, this._messageId, prefix, {
        parse_mode: undefined,
      });
      this._lastSentText = prefix;
    } catch {
      /* best-effort */
    }
  }

  /**
   * Fallback-mode final delivery: the draft can no longer be edited, so send
   * whatever follows the text already visible in it as new message(s).
   *
   * @param {string} text - Full accumulated Markdown text.
   * @returns {Promise<void>}
   */
  async _sendFallbackFinal(text) {
    const prefix = this._fallbackPrefix || this._visiblePrefix();
    let continuation = text;
    if (prefix && text.startsWith(prefix)) {
      continuation = text.slice(prefix.length).replace(/^\s+/, '');
    }
    this._fallbackFinalSend = false;

    if (!continuation.trim()) {
      this._alreadySent = true;
      this._finalResponseSent = true;
      return;
    }

    // Try to strip cursor from last partial
    await this._tryStripCursor();

    // Send remaining content with HTML formatting
    const chunks = splitMessage(markdownToHtml(continuation));
    let sentAny = false;
    for (const chunk of chunks) {
      try {
        await this.ctx.reply(chunk, { parse_mode: 'HTML' });
        sentAny = true;
      } catch {
        // Fallback to plain. The chunk is HTML at this point, so remove tags and
        // decode entities rather than stripping Markdown symbols.
        try {
          await this.ctx.reply(htmlToPlain(chunk), { parse_mode: undefined });
          sentAny = true;
        } catch (e) {
          logger.warn('Fallback send chunk error:', e.message);
        }
      }
    }
    this._alreadySent = sentAny;
    this._finalResponseSent = sentAny;
  }
}

/**
 * Send a complete message for command handlers (/start, /tools, etc.).
 * Despite the name, the text is delivered in one pass (no progressive edits):
 * HTML-formatted, split into chunks when it exceeds Telegram's limit, with a
 * plain-text fallback. Never throws; failures are logged.
 *
 * @param {object} ctx - grammy context
 * @param {string} text - Full text to send
 * @param {object} [options] - { editInterval, cursor } — accepted for
 *   compatibility, currently unused
 * @returns {Promise<void>}
 */
export async function sendStreamingMessage(ctx, text, options = {}) {
  if (!text) {
    logger.warn('sendStreamingMessage called with empty text');
    return;
  }

  logger.info(`📤 Sending message (${text.length} chars)`);

  try {
    // sendFormatted handles chunking and the HTML → plain-text fallback, so
    // messages longer than one Telegram message are no longer dropped.
    await sendFormatted(ctx, text);
    logger.info('✅ Message sent');
  } catch (e) {
    logger.error(`sendStreamingMessage plain also failed: ${e.message}`);
  }
}