// Change note: when streaming produces a long essay that exceeds Telegram's
// message-length limit, the final HTML edit fails silently, leaving the user with
// raw ** markers. The streaming draft is now replaced instead: fresh formatted
// message(s) are sent via sendFormatted (which handles HTML conversion, splitting,
// and plain-text fallback) and the draft message is deleted.
/**
 * Streaming message consumer — bridges an SSE token stream to Telegram editMessageText.
 *
 * Architecture (adapted from Hermes Agent's GatewayStreamConsumer):
 *   1. The agent fires onDelta(token) for each SSE chunk.
 *   2. Tokens accumulate in a buffer.
 *   3. An async run() loop edits a single message in place at ~1 s intervals.
 *   4. Adaptive backoff on flood control, with graceful fallback to a plain send.
 *   5. The final message is delivered with HTML formatting.
 *
 * Credit: Hermes Agent gateway/stream_consumer.py (NousResearch/hermes-agent)
 */
import { logger } from '../utils/logger.js';
/** Longest text splitMessage() will put into one Telegram message. */
const MAX_MSG_LENGTH = 4_096;

/** StreamConsumer pacing defaults: ms between edits, and the early-send size. */
const DEFAULT_EDIT_INTERVAL_MS = 1_000;
const DEFAULT_BUFFER_THRESHOLD = 40;

/** Consecutive failed edits tolerated before StreamConsumer enters fallback mode. */
const MAX_FLOOD_STRIKES = 3;

/** Suffix appended to in-progress streaming drafts. */
const CURSOR = ' ▉';
// ───────────────────────────────────────────
// Markdown → Telegram HTML converter
// ───────────────────────────────────────────

/**
 * Escape the characters Telegram's HTML parse mode treats as markup.
 * Telegram requires every <, > and & that is not part of a tag or entity to be
 * written as an entity.
 *
 * @param {string} s - Raw text.
 * @returns {string} Text safe to embed in Telegram HTML.
 */
function escapeHtml(s) {
  return s
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;');
}

/**
 * Convert common Markdown to Telegram-compatible HTML.
 *
 * Handles: **bold**, *italic*, `code`, ```blocks``` (with optional language),
 * [links](url), ~~strike~~, #–#### headings (rendered bold), > quotes, and
 * "- " / "* " list bullets.
 *
 * Code is extracted first and escaped verbatim, so no Markdown rule touches it.
 * The remaining text is HTML-escaped before tags are inserted. As a result every
 * raw <, > and & in the output belongs to a generated tag.
 *
 * @param {string} text - Markdown source; falsy input yields ''.
 * @returns {string} HTML for Telegram's parse_mode: 'HTML'.
 */
export function markdownToHtml(text) {
  if (!text) return '';

  // 1. Fenced code blocks → placeholders, protected from escaping and Markdown rules.
  const codeBlocks = [];
  let out = text.replace(/```(\w*)\n?([\s\S]*?)```/g, (_, lang, code) => {
    const langAttr = lang ? ` class="language-${lang}"` : '';
    codeBlocks.push(`<pre><code${langAttr}>${escapeHtml(code)}</code></pre>`);
    return `\x00CB${codeBlocks.length - 1}\x00`;
  });

  // 2. Inline code → placeholders.
  const inlineCodes = [];
  out = out.replace(/`([^`\n]+)`/g, (_, code) => {
    inlineCodes.push(`<code>${escapeHtml(code)}</code>`);
    return `\x00IC${inlineCodes.length - 1}\x00`;
  });

  // 3. Escape HTML entities in the remaining text.
  out = escapeHtml(out);

  // 4. Markdown patterns → HTML tags. Line-anchored rules use [ \t]+ rather than
  //    \s+ so they cannot swallow the newline after an empty marker.
  out = out
    // Bullets come first so the "*" of "* item *x*" cannot open an italic span.
    .replace(/^[-*][ \t]+/gm, '• ')
    .replace(/\*\*(.+?)\*\*/g, '<b>$1</b>') // **bold**
    .replace(/(?<!\*)\*(?!\*)(.+?)(?<!\*)\*(?!\*)/g, '<i>$1</i>') // *italic* (not inside **)
    .replace(/~~(.+?)~~/g, '<s>$1</s>') // ~~strike~~
    // [link](url) — quotes in the URL would otherwise terminate the href attribute.
    .replace(/\[(.+?)\]\((.+?)\)/g, (_, label, url) => `<a href="${url.replace(/"/g, '&quot;')}">${label}</a>`)
    .replace(/^#{1,4}[ \t]+(.+)$/gm, '<b>$1</b>') // h1–h4 → bold
    // "> quote" — its ">" was turned into &gt; by step 3.
    .replace(/^&gt;[ \t]+(.+)$/gm, '<blockquote>$1</blockquote>')
    .replace(/<\/blockquote>\n<blockquote>/g, '\n'); // merge consecutive quote lines

  // 5. Restore placeholders. A replacer *function* is required. With a string
  //    replacement, sequences such as $& or $$ inside code would be expanded.
  return out
    .replace(/\x00CB(\d+)\x00/g, (m, i) => codeBlocks[Number(i)] ?? m)
    .replace(/\x00IC(\d+)\x00/g, (m, i) => inlineCodes[Number(i)] ?? m);
}
/**
 * Sanitize text for plain-text Telegram messages (no parse_mode).
 *
 * Strips Markdown formatting symbols so they don't show up as raw text:
 * - fenced code becomes a block framed by "┌──" / "└──" lines;
 * - inline code becomes 「code」;
 * - links become "label (url)";
 * - headings lose their #'s;
 * - "- " / "* " list markers become "• ".
 * Code content is left verbatim: no Markdown rule is applied inside it.
 *
 * @param {string} text - Markdown source; falsy input yields ''.
 * @returns {string} Plain text.
 */
export function stripMarkdown(text) {
  if (!text) return '';

  // Rendered code is parked behind placeholders so `**kwargs`, `# comment`,
  // `a*b*c` etc. survive the emphasis/heading rules untouched.
  const saved = [];
  const protect = (rendered) => {
    saved.push(rendered);
    return `\x00P${saved.length - 1}\x00`;
  };

  return text
    // One match per fenced block: the opening fence (plus optional language) becomes
    // ┌── and the closing fence becomes └──.
    .replace(/```\w*\n?([\s\S]*?)```/g, (_, code) => protect(`┌──\n${code.replace(/\n$/, '')}\n└──`))
    .replace(/`([^`\n]+)`/g, (_, code) => protect(`「${code}」`))
    // Bullets before italics, so the "*" of "* item *x*" isn't taken as emphasis.
    .replace(/^[-*][ \t]+/gm, '• ')
    .replace(/\*\*(.+?)\*\*/g, '$1')
    .replace(/\*(.+?)\*/g, '$1')
    .replace(/~~(.+?)~~/g, '$1')
    .replace(/\[(.+?)\]\((.+?)\)/g, '$1 ($2)')
    .replace(/^#{1,4}[ \t]+/gm, '')
    .replace(/\x00P(\d+)\x00/g, (m, i) => saved[Number(i)] ?? m);
}
/**
 * Split text into chunks of at most `maxLength` UTF-16 code units.
 *
 * Each cut prefers the last newline in the second half of the window, then the
 * last space; the separator character at the cut is dropped. Only when neither
 * exists does it hard-cut, and a hard cut never separates the two halves of a
 * surrogate pair (e.g. an emoji). Text that already fits is returned as
 * [text], including ''.
 *
 * @param {string} text - Text to split.
 * @param {number} [maxLength=MAX_MSG_LENGTH] - Maximum chunk length (positive integer).
 * @returns {string[]} Chunks in order.
 * @throws {RangeError} If maxLength is not a positive integer.
 */
export function splitMessage(text, maxLength = MAX_MSG_LENGTH) {
  if (!Number.isInteger(maxLength) || maxLength < 1) {
    throw new RangeError(`splitMessage: maxLength must be a positive integer, got ${maxLength}`);
  }
  if (text.length <= maxLength) return [text];

  const chunks = [];
  let rest = text;
  while (rest.length > maxLength) {
    const half = maxLength / 2;
    let cut = rest.lastIndexOf('\n', maxLength);
    let gap = 1; // separator character dropped at a newline/space cut
    if (cut <= half) {
      cut = rest.lastIndexOf(' ', maxLength);
    }
    if (cut <= half) {
      cut = maxLength;
      gap = 0;
      const code = rest.charCodeAt(cut - 1);
      // 0xD800–0xDBFF is a high surrogate; keep it with its low half.
      if (cut > 1 && code >= 0xd800 && code <= 0xdbff) cut -= 1;
    }
    chunks.push(rest.slice(0, cut));
    rest = rest.slice(cut + gap);
  }
  if (rest.length > 0) chunks.push(rest);
  return chunks;
}
/**
 * Backslash-escape the four characters that are special in Telegram's legacy
 * Markdown parse mode: _ * [ and `.
 *
 * @param {string} text - Raw text.
 * @returns {string} Escaped text, or '' for falsy input.
 */
export function escapeMarkdown(text) {
  return text ? text.replace(/[_*[`]/g, (special) => `\\${special}`) : '';
}
/**
 * Plain-text rendering of one Telegram-HTML chunk. Drops tags and decodes the
 * basic entities (&lt; &gt; &quot; &amp;). Only tag-shaped text ("<b>", "</a>",
 * "<a href=…>") is removed.
 *
 * @param {string} html - One HTML chunk.
 * @returns {string} Plain text.
 */
function htmlChunkToPlain(html) {
  return html
    .replace(/<\/?[a-zA-Z][^<>]*>/g, '')
    .replace(/&lt;/g, '<')
    .replace(/&gt;/g, '>')
    .replace(/&quot;/g, '"')
    .replace(/&amp;/g, '&');
}

/**
 * Send Markdown text as one or more Telegram HTML messages.
 *
 * The text is converted with markdownToHtml and split with splitMessage. If
 * Telegram rejects an HTML chunk:
 *  - before anything was delivered, the whole text is resent as stripped plain
 *    text (stripMarkdown);
 *  - after some chunks were delivered, only the undelivered chunks are sent, with
 *    tags removed, so the user never receives the same content twice.
 *
 * @param {object} ctx - grammy context (uses ctx.reply).
 * @param {string} text - Markdown text; falsy → nothing is sent.
 * @returns {Promise<void>}
 * @throws Whatever ctx.reply throws during the plain-text fallback.
 */
export async function sendFormatted(ctx, text) {
  if (!text) return;

  const htmlChunks = splitMessage(markdownToHtml(text));
  let delivered = 0;
  try {
    for (const chunk of htmlChunks) {
      await ctx.reply(chunk, { parse_mode: 'HTML' });
      delivered += 1;
    }
    return;
  } catch (err) {
    logger.warn(`HTML send failed (${err?.message}), falling back to stripped plain text`);
  }

  if (delivered === 0) {
    for (const chunk of splitMessage(stripMarkdown(text))) {
      await ctx.reply(chunk, { parse_mode: undefined });
    }
    return;
  }

  // Part of the message is already visible: continue with the rest only.
  for (const chunk of htmlChunks.slice(delivered)) {
    const plain = htmlChunkToPlain(chunk);
    if (plain.trim()) {
      await ctx.reply(plain, { parse_mode: undefined });
    }
  }
}
/**
 * StreamConsumer — progressive edit-in-place streaming for Telegram.
 *
 * - Intermediate edits: plain text (no formatting — partial HTML would break).
 * - Final message: converted to Telegram HTML with full formatting.
 * - Overflow: when the text outgrows one message, the current message is frozen
 *   (cut at a line break when possible) and streaming continues in a new one.
 *   Only the last message receives the final HTML formatting.
 *
 * Usage:
 *   const consumer = new StreamConsumer(ctx, { editInterval: 1000 });
 *   const runPromise = consumer.run();  // start async edit loop
 *   // ... call consumer.onDelta(token) for each SSE chunk ...
 *   consumer.finish();
 *   await runPromise;                   // wait for final edit (HTML formatted)
 */
export class StreamConsumer {
  /**
   * @param {object} ctx - grammy context. This class calls ctx.api.sendMessage,
   *   ctx.api.editMessageText, ctx.api.deleteMessage and ctx.reply, and reads ctx.chat.id.
   * @param {object} [options]
   * @param {number} [options.editInterval] - Base ms between edits (falsy → DEFAULT_EDIT_INTERVAL_MS).
   * @param {number} [options.bufferThreshold] - Buffered characters that let the first
   *   message of a segment go out before the interval elapses (falsy → DEFAULT_BUFFER_THRESHOLD).
   * @param {string} [options.cursor] - Suffix shown while streaming; '' disables it.
   */
  constructor(ctx, options = {}) {
    this.ctx = ctx;
    this.editInterval = options.editInterval || DEFAULT_EDIT_INTERVAL_MS;
    this.bufferThreshold = options.bufferThreshold || DEFAULT_BUFFER_THRESHOLD;
    this.cursor = options.cursor !== undefined ? options.cursor : CURSOR;

    // Internal state
    this._queue = []; // deltas not yet folded into _accumulated
    this._done = false; // set by finish()
    this._accumulated = ''; // text of the message currently being streamed
    this._messageId = null; // id of that message; null until it has been sent
    this._chatId = null;
    this._alreadySent = false;
    this._editSupported = true; // false once sending/editing has given up
    this._lastEditTime = 0;
    this._lastSentText = ''; // exact text of the last successful send/edit
    this._fallbackFinalSend = false; // set after MAX_FLOOD_STRIKES failed edits
    this._fallbackPrefix = ''; // text already visible when fallback mode began
    this._floodStrikes = 0;
    this._currentEditInterval = this.editInterval; // doubled per failed edit, max 10 s
    this._finalResponseSent = false;
    this._drainResolve = null;
    this._drainPromise = new Promise((r) => { this._drainResolve = r; });
  }

  /**
   * Queue one SSE token chunk and wake the run() loop. Empty chunks are ignored.
   * @param {string} text
   */
  onDelta(text) {
    if (text) {
      this._queue.push(text);
      // Wake up the run() loop and arm a fresh promise for the next wait.
      if (this._drainResolve) {
        const r = this._drainResolve;
        this._drainResolve = null;
        this._drainPromise = new Promise((resolve) => { this._drainResolve = resolve; });
        r();
      }
    }
  }

  /** Signal stream completion; run() then delivers the final message. */
  finish() {
    this._done = true;
    if (this._drainResolve) {
      const r = this._drainResolve;
      this._drainResolve = null;
      r();
    }
  }

  get alreadySent() { return this._alreadySent; }

  get finalResponseSent() { return this._finalResponseSent; }

  /**
   * Edit loop. It folds queued deltas into the current message and edits it at
   * most once per (possibly backed-off) interval. After finish() it delivers the
   * final HTML-formatted version. Errors are logged, not thrown.
   * @returns {Promise<void>}
   */
  async run() {
    const safeLimit = MAX_MSG_LENGTH - 20;

    try {
      while (true) {
        // Drain all available items from queue
        while (this._queue.length > 0) {
          this._accumulated += this._queue.shift();
        }

        if (this._done) break;

        // The interval, which grows under flood control, paces every edit. The
        // buffer threshold only lets the first message of a segment go out early.
        // It used to be compared against the total text length. Then every
        // iteration edited once 40 characters had arrived, and the backoff never
        // took effect.
        const elapsed = Date.now() - this._lastEditTime;
        const firstSendReady =
          this._messageId === null &&
          this._editSupported &&
          this._accumulated.length >= this.bufferThreshold;
        const shouldEdit = elapsed >= this._currentEditInterval || firstSendReady;

        if (shouldEdit && this._accumulated.trim()) {
          await this._flushOverflow(safeLimit);

          // Intermediate edits: plain text + cursor (no parse_mode)
          await this._sendOrEdit(this._accumulated + this.cursor);
          this._lastEditTime = Date.now();
        }

        // Wait for more data, completion, or a 50 ms tick
        if (!this._done && this._queue.length === 0) {
          await Promise.race([
            this._drainPromise,
            new Promise((r) => setTimeout(r, 50)),
          ]);
        }
      }

      // ═══════════════════════════════════════
      // FINAL DELIVERY — with HTML formatting
      // ═══════════════════════════════════════
      if (this._accumulated.trim()) {
        if (this._fallbackFinalSend) {
          await this._sendFallbackFinal(this._accumulated);
        } else {
          await this._sendFinalFormatted(this._accumulated);
        }
      }
    } catch (e) {
      logger.error('StreamConsumer error:', e);
    }
  }

  /**
   * Handle a buffer longer than `safeLimit`. The head is frozen into the current
   * message, or into a new one if none is open, and the remainder continues in a
   * fresh message. The cut prefers the last newline in the second half of the
   * window.
   * @param {number} safeLimit - Maximum characters to leave in one message.
   */
  async _flushOverflow(safeLimit) {
    while (this._accumulated.length > safeLimit && this._editSupported) {
      // String#lastIndexOf(search, position) searches backwards from `position`.
      // The old call passed (…, 0, safeLimit), which only ever inspected index 0.
      const newlineAt = this._accumulated.lastIndexOf('\n', safeLimit);
      let cut = safeLimit;
      if (newlineAt > safeLimit / 2) {
        cut = newlineAt;
      } else {
        // Hard cut: never separate the halves of a UTF-16 surrogate pair.
        const code = this._accumulated.charCodeAt(cut - 1);
        if (code >= 0xd800 && code <= 0xdbff) cut -= 1;
      }

      const chunk = this._accumulated.slice(0, cut);
      const ok = await this._sendOrEdit(chunk);
      if (this._fallbackFinalSend || !ok) break;

      this._accumulated = this._accumulated.slice(chunk.length).replace(/^\n+/, '');
      this._messageId = null;
      this._lastSentText = '';
    }
  }

  /**
   * Deliver the final text with HTML formatting.
   *
   * HTML that fits in one message replaces the streaming draft in place, or is
   * sent fresh if no draft exists. Otherwise — HTML longer than one message, or
   * Telegram rejected the edit — the text is sent as new message(s) via
   * sendFormatted, which splits and falls back to plain text. The draft is
   * deleted only after that succeeded.
   * @param {string} text - Markdown source of the current message.
   */
  async _sendFinalFormatted(text) {
    const html = markdownToHtml(text);

    if (html.length <= MAX_MSG_LENGTH) {
      try {
        if (this._messageId) {
          await this.ctx.api.editMessageText(this._chatId, this._messageId, html, { parse_mode: 'HTML' });
        } else {
          const msg = await this.ctx.api.sendMessage(this.ctx.chat.id, html, { parse_mode: 'HTML' });
          if (msg?.message_id) {
            this._messageId = msg.message_id;
            this._chatId = msg.chat.id;
          }
        }
        this._alreadySent = true;
        this._finalResponseSent = true;
        this._lastSentText = html;
        return;
      } catch (e) {
        const reason = e?.message ?? String(e);
        // "message is not modified": the draft already shows exactly this text
        // (e.g. empty cursor, no Markdown), so it is already final.
        if (this._messageId && reason.includes('not modified')) {
          this._alreadySent = true;
          this._finalResponseSent = true;
          return;
        }
        logger.warn(`Final HTML edit failed (${reason}), deleting draft and resending`);
      }
    } else {
      logger.debug(`Final HTML is ${html.length} chars (> ${MAX_MSG_LENGTH}), resending as split message(s)`);
    }

    // Send the replacement first. The raw draft is deleted only once the
    // replacement has been delivered, so a failure never leaves the user empty-handed.
    try {
      await sendFormatted(this.ctx, text);
    } catch (e2) {
      logger.error('Formatted replacement also failed:', e2?.message);
      // Best effort: make the draft show the complete text as plain text.
      if (this._messageId) {
        await this._editMessage(this._messageId, this._chatId, text);
      }
      return;
    }
    this._alreadySent = true;
    this._finalResponseSent = true;
    logger.info(`✅ Sent formatted replacement (${text.length} chars)`);

    // Delete the old streaming draft (it shows raw ** markers).
    if (this._messageId) {
      try {
        await this.ctx.api.deleteMessage(this._chatId, this._messageId);
        this._messageId = null;
      } catch (e3) {
        // Ignore — may already be gone.
        logger.debug(`Draft delete failed: ${e3?.message}`);
      }
    }
  }

  /**
   * Send the first message of a segment, or edit the current one, as plain text.
   * Failed edits update the flood-control state. After MAX_FLOOD_STRIKES
   * consecutive failures the consumer switches to fallback mode.
   * @param {string} text - Full text the message should show.
   * @returns {Promise<boolean>} true when the send/edit succeeded or was deliberately
   *   skipped; false when it did not happen.
   */
  async _sendOrEdit(text) {
    if (!text.trim()) return true;

    // Skip cursor-only messages for first send
    const visibleWithoutCursor = text.replace(this.cursor, '').trim();
    if (!visibleWithoutCursor) return true;

    // Don't create tiny first messages with just cursor
    if (this._messageId === null && this.cursor && text.includes(this.cursor) && visibleWithoutCursor.length < 4) {
      return true;
    }

    try {
      if (this._messageId === null) {
        // First message — send as plain text (no formatting during streaming)
        try {
          const msg = await this.ctx.api.sendMessage(this.ctx.chat.id, text, { parse_mode: undefined });
          if (msg && msg.message_id) {
            this._messageId = msg.message_id;
            this._chatId = msg.chat.id;
            this._alreadySent = true;
            this._lastSentText = text;
            return true;
          }
          this._editSupported = false;
          return false;
        } catch (sendErr) {
          logger.warn('Initial send failed:', sendErr?.message);
          this._editSupported = false;
          return false;
        }
      }

      if (!this._editSupported) return false;

      // Skip if identical
      if (text === this._lastSentText) return true;

      const edited = await this._editMessage(this._messageId, this._chatId, text);
      if (edited) {
        this._alreadySent = true;
        this._lastSentText = text;
        this._floodStrikes = 0;
        return true;
      }

      // Flood control — adaptive backoff
      this._floodStrikes++;
      this._currentEditInterval = Math.min(this._currentEditInterval * 2, 10000);
      logger.debug(
        `Flood control on edit (strike ${this._floodStrikes}/${MAX_FLOOD_STRIKES}), ` +
        `backoff → ${this._currentEditInterval}ms`
      );

      if (this._floodStrikes >= MAX_FLOOD_STRIKES) {
        // Enter fallback mode: stop editing; the rest is sent as new message(s) at the end.
        logger.debug(`Edit failed (strikes=${this._floodStrikes}), entering fallback mode`);
        this._fallbackPrefix = this._visiblePrefix();
        this._fallbackFinalSend = true;
        this._editSupported = false;
        this._alreadySent = true;
        await this._tryStripCursor();
        return false;
      }

      this._lastEditTime = Date.now();
      return false;
    } catch (e) {
      logger.error('Stream send/edit error:', e);
      return false;
    }
  }

  /**
   * Plain-text edit of an existing message.
   * @returns {Promise<boolean>} true on success, on "not modified", or after a
   *   successful truncated retry of a too-long edit. false on flood control or
   *   any other error; the caller applies backoff.
   */
  async _editMessage(messageId, chatId, text) {
    try {
      await this.ctx.api.editMessageText(chatId, messageId, text, { parse_mode: undefined });
      return true;
    } catch (err) {
      const msg = err?.message || '';
      if (msg.includes('not modified')) return true;
      if (msg.includes('too long') || msg.includes('message_too_long')) {
        // Truncate
        const truncated = text.slice(0, MAX_MSG_LENGTH - 20) + '…';
        try {
          await this.ctx.api.editMessageText(chatId, messageId, truncated, { parse_mode: undefined });
          return true;
        } catch {
          return false;
        }
      }
      // Telegram reports flood control as error_code 429; the string checks cover
      // errors that carry no numeric code.
      if (
        err?.error_code === 429 ||
        msg.includes('Flood') || msg.includes('flood') ||
        msg.includes('retry after') || msg.includes('rate')
      ) {
        return false; // Caller handles backoff
      }
      logger.warn(`Edit error: ${msg}`);
      return false;
    }
  }

  /** Last successfully sent text, without the trailing cursor. */
  _visiblePrefix() {
    let prefix = this._lastSentText || '';
    if (this.cursor && prefix.endsWith(this.cursor)) {
      prefix = prefix.slice(0, -this.cursor.length);
    }
    return prefix;
  }

  /** Best-effort edit that removes the cursor from the current message. */
  async _tryStripCursor() {
    if (!this._messageId) return;
    const prefix = this._visiblePrefix();
    if (!prefix.trim()) return;
    try {
      await this.ctx.api.editMessageText(this._chatId, this._messageId, prefix, { parse_mode: undefined });
      this._lastSentText = prefix;
    } catch { /* best-effort */ }
  }

  /**
   * Final delivery in fallback mode, where edits no longer work. The part of
   * `text` not yet visible is sent as new message(s): HTML first, then plain text
   * for any chunk Telegram rejects.
   * @param {string} text - Full text of the current segment.
   */
  async _sendFallbackFinal(text) {
    const prefix = this._fallbackPrefix || this._visiblePrefix();
    let continuation = text;
    if (prefix && text.startsWith(prefix)) {
      continuation = text.slice(prefix.length).replace(/^\s+/, '');
    }
    this._fallbackFinalSend = false;

    if (!continuation.trim()) {
      this._alreadySent = true;
      this._finalResponseSent = true;
      return;
    }

    // Try to strip cursor from last partial
    await this._tryStripCursor();

    // Send remaining content with HTML formatting
    const chunks = splitMessage(markdownToHtml(continuation));
    let sentAny = false;
    for (const chunk of chunks) {
      try {
        await this.ctx.reply(chunk, { parse_mode: 'HTML' });
        sentAny = true;
      } catch {
        // The chunk is HTML, so remove tags/entities. stripMarkdown would leave
        // them visible as raw text.
        try {
          await this.ctx.reply(this._htmlToPlain(chunk), { parse_mode: undefined });
          sentAny = true;
        } catch (e) {
          logger.warn('Fallback send chunk error:', e?.message);
        }
      }
    }
    this._alreadySent = sentAny;
    this._finalResponseSent = sentAny;
  }

  /**
   * Plain-text rendering of a Telegram-HTML chunk. Drops tag-shaped markup and
   * decodes &lt; &gt; &quot; &amp;.
   * @param {string} html
   * @returns {string}
   */
  _htmlToPlain(html) {
    return html
      .replace(/<\/?[a-zA-Z][^<>]*>/g, '')
      .replace(/&lt;/g, '<')
      .replace(/&gt;/g, '>')
      .replace(/&quot;/g, '"')
      .replace(/&amp;/g, '&');
  }
}
/**
 * Send a complete Markdown reply to the chat as Telegram HTML.
 *
 * Intended for command handlers (/start, /tools, etc.). Despite the name, nothing
 * is streamed: the text is converted once and sent as a single HTML message. If
 * Telegram rejects the HTML, the send is retried once as stripped plain text.
 * Text whose HTML exceeds MAX_MSG_LENGTH is delegated to sendFormatted, which
 * splits it into several messages; a single reply of that size would be rejected
 * in both forms. Errors are logged, not thrown.
 *
 * @param {object} ctx - grammy context (uses ctx.reply).
 * @param {string} text - Markdown text; empty → a warning is logged and nothing is sent.
 * @param {object} [options] - { editInterval, cursor }; accepted for backward
 *   compatibility but currently unused.
 * @returns {Promise<void>}
 */
export async function sendStreamingMessage(ctx, text, options = {}) {
  if (!text) {
    logger.warn('sendStreamingMessage called with empty text');
    return;
  }

  const html = markdownToHtml(text);
  logger.info(`📤 Sending message (${text.length} chars, HTML ${html.length} chars)`);

  // Too long for one message: let sendFormatted split it and handle fallback.
  if (html.length > MAX_MSG_LENGTH) {
    try {
      await sendFormatted(ctx, text);
      logger.info(`✅ Long message sent in parts (HTML ${html.length} chars)`);
    } catch (e) {
      logger.error(`sendStreamingMessage split send failed: ${e?.message}`);
    }
    return;
  }

  // Try sending as formatted HTML first — most reliable
  try {
    const msg = await ctx.reply(html, { parse_mode: 'HTML' });
    logger.info(`✅ Message sent (msg_id: ${msg?.message_id})`);
    return;
  } catch (e) {
    logger.warn(`sendStreamingMessage HTML failed (${e?.message}), trying plain`);
  }

  // Fallback: stripped plain text
  try {
    const msg = await ctx.reply(stripMarkdown(text), { parse_mode: undefined });
    logger.info(`✅ Plain message sent (msg_id: ${msg?.message_id})`);
  } catch (e2) {
    logger.error(`sendStreamingMessage plain also failed: ${e2?.message}`);
  }
}