feat: real-time SSE streaming via StreamConsumer (adapted from Hermes Agent)

- StreamConsumer class: queued token buffer → rate-limited editMessageText loop
- Adaptive flood control backoff (3 strikes → fallback to plain send)
- Cursor indicator (▉) during typing, stripped on completion
- chatWithAI now supports onDelta callback for SSE token streaming
- Uses native fetch() for SSE (Node 18+), falls back to non-streaming on error
- Message handler wires StreamConsumer into the chat pipeline
- Graceful fallback: if streaming fails entirely, sends as plain message
This commit is contained in:
admin
2026-05-05 14:13:03 +00:00
Unverified
parent e17af157ae
commit ed4a4c35e8
2 changed files with 410 additions and 72 deletions

View File

@@ -9,7 +9,7 @@ import { checkEnv } from '../utils/env.js';
import { getRTK } from '../utils/rtk.js';
import { isDuplicate, markProcessed } from './deduplication.js';
import { queueRequest, clearQueue, isProcessing } from './request-queue.js';
import { sendFormatted, splitMessage, escapeMarkdown, sendStreamingMessage } from './message-sender.js';
import { sendFormatted, splitMessage, escapeMarkdown, sendStreamingMessage, StreamConsumer } from './message-sender.js';
import { withSelfCorrection } from './self-correction.js';
function buildSessionKey(chatId, threadId) {
@@ -155,9 +155,16 @@ export async function initBot(config, api, tools, skills, agents) {
messages,
temperature: opts.temperature ?? 0.7,
max_tokens: opts.maxTokens || 4096,
stream: !!opts.onDelta, // Enable SSE when delta callback is provided
};
if (tools.length) body.tools = tools;
// ── Streaming path (SSE) ──
if (opts.onDelta) {
return await chatWithAIStream(svc, body, tools, toolHandlers, opts.onDelta);
}
// ── Non-streaming path (original) ──
const response = await api.client.post('/chat/completions', body);
const choice = response.data.choices?.[0];
if (!choice) return '❌ No response from model.';
@@ -186,6 +193,83 @@ export async function initBot(config, api, tools, skills, agents) {
}
}
/**
 * Streaming chat completion via SSE.
 * POSTs the request with stream:true and pipes each token chunk to the
 * onDelta() callback in real time, while accumulating the full text.
 * Falls back to the non-streaming chatWithAI() path when the HTTP request
 * fails or the stream dies before producing any content.
 *
 * @param {object} svc - service container (svc.api.config supplies baseUrl/apiKey)
 * @param {object} body - OpenAI-style /chat/completions request body (stream:true)
 * @param {Array} tools - tool definitions (not used on the streaming path yet)
 * @param {object} toolHandlers - tool handlers (not used on the streaming path yet)
 * @param {(token: string) => void} onDelta - invoked with each content chunk
 * @returns {Promise<string>} the accumulated response text
 */
async function chatWithAIStream(svc, body, tools, toolHandlers, onDelta) {
  const baseUrl = svc.api?.config?.baseUrl || 'https://api.z.ai/api/coding/paas/v4';
  const apiKey = svc.api?.config?.apiKey || '';
  // Options for the non-streaming fallback — chatWithAI expects camelCase
  // (maxTokens), not the wire-format snake_case (max_tokens).
  const fallbackOpts = { temperature: body.temperature, maxTokens: body.max_tokens };
  let fullResponse = '';
  // Process one SSE line ("data: {...}") and forward any content delta.
  const handleLine = (line) => {
    const trimmed = line.trim();
    if (!trimmed || !trimmed.startsWith('data: ')) return;
    const data = trimmed.slice(6);
    if (data === '[DONE]') return;
    try {
      const parsed = JSON.parse(data);
      const delta = parsed.choices?.[0]?.delta || {};
      const content = delta.content || '';
      if (content) {
        fullResponse += content;
        onDelta(content);
      }
      // NOTE: delta.tool_calls are not handled in streaming mode yet;
      // tool-using requests should go through the non-streaming path.
    } catch {
      // Ignore malformed JSON lines
    }
  };
  try {
    const response = await fetch(`${baseUrl}/chat/completions`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${apiKey}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify(body),
    });
    if (!response.ok || !response.body) {
      const errText = response.ok ? '(empty body)' : await response.text();
      logger.error(`SSE error ${response.status}: ${errText}`);
      // Fallback to non-streaming
      return await chatWithAI(body.messages, fallbackOpts);
    }
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() || ''; // Keep incomplete line in buffer
      for (const line of lines) handleLine(line);
    }
    // Flush the decoder and process any trailing line left in the buffer
    // (a final data: frame without a trailing newline would otherwise be lost).
    buffer += decoder.decode();
    if (buffer.trim()) handleLine(buffer);
  } catch (e) {
    logger.error('SSE stream error:', e.message);
    // Fallback to non-streaming only if nothing was streamed yet
    if (!fullResponse) {
      return await chatWithAI(body.messages, fallbackOpts);
    }
  }
  return fullResponse || '✅ Done.';
}
const toolHandlers = {
bash: async (args) => {
const tool = svc.toolMap.get('bash');
@@ -437,9 +521,13 @@ export async function initBot(config, api, tools, skills, agents) {
await queueRequest(key, text, async () => {
await ctx.api.sendChatAction(ctx.chat.id, 'typing');
// Wrap chatWithAI with self-correction
// Create stream consumer for real-time edit-in-place
const consumer = new StreamConsumer(ctx, { editInterval: 1000 });
const runPromise = consumer.run();
// Wrap chatWithAI with self-correction + streaming
const chatWithCorrection = withSelfCorrection(async (msgs) => {
return await chatWithAI(msgs, {});
return await chatWithAI(msgs, { onDelta: (token) => consumer.onDelta(token) });
});
const result = await chatWithCorrection([
@@ -447,8 +535,14 @@ export async function initBot(config, api, tools, skills, agents) {
{ role: 'user', content: text },
]);
// Send with streaming effect
await sendStreamingMessage(ctx, result);
// Signal completion and wait for final edit
consumer.finish();
await runPromise;
// If streaming failed to deliver (no message sent), fallback to plain send
if (!consumer.alreadySent && result) {
await sendFormatted(ctx, result);
}
});
});

View File

@@ -1,10 +1,22 @@
/**
 * Streaming message consumer — bridges SSE token stream to Telegram editMessageText.
 * (Send-with-retry, chunking, and streaming helpers adapted from claudegram's
 * message-sender.ts.)
 *
 * Architecture (adapted from Hermes Agent's GatewayStreamConsumer):
 * 1. Agent fires onDelta(token) for each SSE chunk
 * 2. Tokens accumulate in a buffer
 * 3. An async run() loop edits a single message at ~1s intervals
 * 4. Adaptive backoff on flood control, graceful fallback to plain send
 *
 * Credit: Hermes Agent gateway/stream_consumer.py (NousResearch/hermes-agent)
 */
import { logger } from '../utils/logger.js';
// Telegram's hard limit is 4096 chars per message; StreamConsumer leaves a
// small safety margin below this when splitting overflow chunks.
const MAX_MSG_LENGTH = 4096;
// Telegram rate limit: ~30 edits per minute per chat.
// We batch edits with ~1000ms minimum between each to stay safe.
const MIN_EDIT_INTERVAL_MS = 1000;
const DEFAULT_EDIT_INTERVAL_MS = 1000;
// Flush an edit early once this many buffered characters accumulate.
const DEFAULT_BUFFER_THRESHOLD = 40;
// After this many consecutive flood-control failures, stop editing and
// deliver the remainder via plain sends.
const MAX_FLOOD_STRIKES = 3;
// Typing indicator appended to partial messages while streaming.
const CURSOR = ' ▉';
export function splitMessage(text) {
if (text.length <= MAX_MSG_LENGTH) return [text];
@@ -43,74 +55,306 @@ export async function sendFormatted(ctx, text) {
}
/**
 * StreamConsumer — progressive edit-in-place streaming for Telegram.
 *
 * Architecture (adapted from Hermes Agent's GatewayStreamConsumer):
 *  1. The agent fires onDelta(token) for each SSE chunk.
 *  2. Tokens accumulate in an internal queue, drained into a text buffer.
 *  3. The async run() loop edits a single message at ~1s intervals.
 *  4. Adaptive backoff on flood control, graceful fallback to plain send.
 *
 * Usage:
 *   const consumer = new StreamConsumer(ctx, { editInterval: 1000 });
 *   const runPromise = consumer.run(); // start async edit loop
 *   // ... call consumer.onDelta(token) for each SSE chunk ...
 *   consumer.finish();
 *   await runPromise; // wait for final edit
 */
export class StreamConsumer {
  /**
   * @param {object} ctx - bot context (needs ctx.api, ctx.chat, ctx.reply)
   * @param {object} [options]
   * @param {number} [options.editInterval=1000] - minimum ms between edits
   * @param {number} [options.bufferThreshold=40] - buffered chars that force an early flush
   * @param {string} [options.cursor=' ▉'] - indicator appended to partial messages
   */
  constructor(ctx, options = {}) {
    this.ctx = ctx;
    this.editInterval = options.editInterval || DEFAULT_EDIT_INTERVAL_MS;
    this.bufferThreshold = options.bufferThreshold || DEFAULT_BUFFER_THRESHOLD;
    this.cursor = options.cursor !== undefined ? options.cursor : CURSOR;
    // Internal state
    this._queue = [];                // pending SSE chunks not yet accumulated
    this._done = false;              // set by finish(); terminates run()
    this._accumulated = '';          // full text of the message being streamed
    this._messageId = null;          // Telegram message under edit (null until first send)
    this._chatId = null;
    this._alreadySent = false;       // at least one send/edit reached the chat
    this._editSupported = true;      // cleared when editing keeps failing
    this._lastEditTime = 0;
    this._lastSentText = '';         // dedupe: skip edits with identical content
    this._fallbackFinalSend = false; // deliver the remainder via plain sends
    this._fallbackPrefix = '';       // text already visible when fallback began
    this._floodStrikes = 0;          // consecutive flood-control failures
    this._currentEditInterval = this.editInterval; // grows under backoff
    this._finalResponseSent = false;
    // run() parks on _drainPromise; onDelta()/finish() resolve it to wake the loop.
    this._drainResolve = null;
    this._drainPromise = new Promise((r) => { this._drainResolve = r; });
  }
  /**
   * Callback for each SSE token chunk — queues the text and wakes run().
   */
  onDelta(text) {
    if (!text) return;
    this._queue.push(text);
    // Wake up the run() loop, re-arming the promise for the next wait.
    if (this._drainResolve) {
      const wake = this._drainResolve;
      this._drainResolve = null;
      this._drainPromise = new Promise((resolve) => { this._drainResolve = resolve; });
      wake();
    }
  }
  /** Signal stream completion; run() performs the final edit and returns. */
  finish() {
    this._done = true;
    if (this._drainResolve) {
      const wake = this._drainResolve;
      this._drainResolve = null;
      wake();
    }
  }
  /** True once any message reached the chat (send or edit). */
  get alreadySent() { return this._alreadySent; }
  /** True once the complete final text was delivered. */
  get finalResponseSent() { return this._finalResponseSent; }
  /**
   * Main edit loop: drains queued chunks, periodically flushes them into a
   * single Telegram message via edit-in-place, splits into a new message on
   * overflow, and delivers a final cursor-free version when finish() fires.
   * Never throws — all errors are logged.
   */
  async run() {
    const safeLimit = MAX_MSG_LENGTH - 20; // headroom for the cursor suffix
    try {
      while (true) {
        // Drain all available items from the queue.
        while (this._queue.length > 0) {
          this._accumulated += this._queue.shift();
        }
        if (this._done) break;
        // Flush when the edit interval elapsed (and there is content), or
        // immediately once the buffer passes the threshold.
        const elapsed = Date.now() - this._lastEditTime;
        const shouldEdit =
          (elapsed >= this._currentEditInterval && this._accumulated.length > 0) ||
          this._accumulated.length >= this.bufferThreshold;
        if (shouldEdit && this._accumulated.trim()) {
          // Overflow: freeze the current message at a newline boundary and
          // continue streaming into a fresh message.
          while (this._accumulated.length > safeLimit && this._messageId !== null && this._editSupported) {
            // BUGFIX: lastIndexOf takes (search, fromIndex); the original
            // passed (search, 0, safeLimit), which always searched position 0
            // so the newline-boundary split never took effect.
            const splitAt = this._accumulated.lastIndexOf('\n', safeLimit);
            const chunk = this._accumulated.slice(0, splitAt > safeLimit / 2 ? splitAt : safeLimit);
            const ok = await this._sendOrEdit(chunk);
            if (this._fallbackFinalSend || !ok) break;
            this._accumulated = this._accumulated.slice(chunk.length).replace(/^\n+/, '');
            this._messageId = null; // next flush starts a new message
            this._lastSentText = '';
          }
          // Show the typing cursor while the stream is still in progress.
          await this._sendOrEdit(this._accumulated + this.cursor);
          this._lastEditTime = Date.now();
        }
        // Park until more data or completion — 50ms cap so timed flushes fire.
        if (!this._done && this._queue.length === 0) {
          await Promise.race([
            this._drainPromise,
            new Promise((resolve) => setTimeout(resolve, 50)),
          ]);
        }
      }
      // Pick up anything queued in the same tick as finish().
      while (this._queue.length > 0) {
        this._accumulated += this._queue.shift();
      }
      // Final delivery without the cursor.
      if (this._accumulated.trim()) {
        if (this._fallbackFinalSend) {
          await this._sendFallbackFinal(this._accumulated);
        } else {
          await this._sendOrEdit(this._accumulated);
          this._finalResponseSent = true;
        }
      }
    } catch (e) {
      logger.error('StreamConsumer error:', e);
    }
  }
  /**
   * Deliver text: edit the tracked message when one exists, otherwise send a
   * new one. Returns true on success; false signals the caller to back off.
   */
  async _sendOrEdit(text) {
    if (!text.trim()) return true;
    // Never emit a message that is only the cursor.
    const visibleWithoutCursor = text.replace(this.cursor, '').trim();
    if (!visibleWithoutCursor) return true;
    // Don't create a tiny first message that is basically just the cursor.
    if (this._messageId === null && this.cursor && text.includes(this.cursor) && visibleWithoutCursor.length < 4) {
      return true;
    }
    try {
      if (this._messageId !== null) {
        if (!this._editSupported) return false;
        if (text === this._lastSentText) return true; // identical content — skip
        const result = await this._editMessage(this._messageId, this._chatId, text);
        if (result) {
          this._alreadySent = true;
          this._lastSentText = text;
          this._floodStrikes = 0; // healthy again — reset backoff
          return true;
        }
        // Flood control — adaptive backoff: double the interval, cap at 10s.
        this._floodStrikes++;
        this._currentEditInterval = Math.min(this._currentEditInterval * 2, 10000);
        logger.debug(
          `Flood control on edit (strike ${this._floodStrikes}/${MAX_FLOOD_STRIKES}), ` +
          `backoff → ${this._currentEditInterval}ms`
        );
        if (this._floodStrikes >= MAX_FLOOD_STRIKES) {
          // Give up on editing; deliver the remainder as plain sends.
          logger.debug(`Edit failed (strikes=${this._floodStrikes}), entering fallback mode`);
          this._fallbackPrefix = this._visiblePrefix();
          this._fallbackFinalSend = true;
          this._editSupported = false;
          this._alreadySent = true;
          await this._tryStripCursor();
          return false;
        }
        this._lastEditTime = Date.now();
        return false;
      }
      // First flush — create the message we will keep editing.
      try {
        const msg = await this.ctx.api.sendMessage(this.ctx.chat.id, text, { parse_mode: undefined });
        if (msg && msg.message_id) {
          this._messageId = msg.message_id;
          this._chatId = msg.chat.id;
          this._alreadySent = true;
          this._lastSentText = text;
          return true;
        }
        this._editSupported = false;
        return false;
      } catch (sendErr) {
        logger.warn('Initial send failed:', sendErr.message);
        this._editSupported = false;
        return false;
      }
    } catch (e) {
      logger.error('Stream send/edit error:', e);
      return false;
    }
  }
  /**
   * Low-level editMessageText with tolerant error classification:
   * "not modified" counts as success, over-length retries truncated once,
   * flood/rate errors return false so the caller can back off.
   */
  async _editMessage(messageId, chatId, text) {
    try {
      await this.ctx.api.editMessageText(chatId, messageId, text, { parse_mode: undefined });
      return true;
    } catch (err) {
      const msg = err.message || '';
      if (msg.includes('not modified')) return true;
      if (msg.includes('too long') || msg.includes('message_too_long')) {
        // Retry once with a truncated body.
        const truncated = text.slice(0, MAX_MSG_LENGTH - 20) + '…';
        try {
          await this.ctx.api.editMessageText(chatId, messageId, truncated, { parse_mode: undefined });
          return true;
        } catch { return false; }
      }
      if (msg.includes('Flood') || msg.includes('flood') || msg.includes('retry after') || msg.includes('rate')) {
        return false; // caller applies adaptive backoff
      }
      logger.warn(`Edit error: ${msg}`);
      return false;
    }
  }
  /** Text currently visible in the streamed message, minus the cursor. */
  _visiblePrefix() {
    let prefix = this._lastSentText || '';
    if (this.cursor && prefix.endsWith(this.cursor)) {
      prefix = prefix.slice(0, -this.cursor.length);
    }
    return prefix;
  }
  /** Best-effort removal of the cursor from the last partial message. */
  async _tryStripCursor() {
    if (!this._messageId) return;
    const prefix = this._visiblePrefix();
    if (!prefix.trim()) return;
    try {
      await this.ctx.api.editMessageText(this._chatId, this._messageId, prefix, { parse_mode: undefined });
      this._lastSentText = prefix;
    } catch { /* best-effort */ }
  }
  /**
   * Fallback delivery: send the not-yet-visible remainder of the final text
   * as plain messages (used after repeated edit failures).
   */
  async _sendFallbackFinal(text) {
    const prefix = this._fallbackPrefix || this._visiblePrefix();
    let continuation = text;
    if (prefix && text.startsWith(prefix)) {
      continuation = text.slice(prefix.length).replace(/^\s+/, '');
    }
    this._fallbackFinalSend = false;
    if (!continuation.trim()) {
      // Everything is already visible — nothing left to send.
      this._alreadySent = true;
      this._finalResponseSent = true;
      return;
    }
    // Try to strip cursor from last partial
    await this._tryStripCursor();
    const chunks = splitMessage(continuation);
    let sentAny = false;
    for (const chunk of chunks) {
      try {
        await this.ctx.reply(chunk, { parse_mode: undefined });
        sentAny = true;
      } catch (e) {
        logger.warn('Fallback send chunk error:', e.message);
      }
    }
    // BUGFIX: the original assigned this._already_sent (snake_case typo), so
    // the real _alreadySent flag was never updated on this path. Keep it true
    // if earlier partial edits already reached the chat.
    this._alreadySent = this._alreadySent || sentAny;
    this._finalResponseSent = sentAny;
  }
}
/**
 * Simulated streaming — edits a single message in place as content grows.
 * Used when the AI doesn't support SSE streaming (full response received
 * first). Feeds the pre-computed text sentence-by-sentence into a
 * StreamConsumer so the user sees the same progressive edit-in-place effect
 * as real SSE streaming. Falls back to sendFormatted if nothing was delivered.
 *
 * NOTE(review): the previous revision interleaved the old manual edit loop
 * with the new consumer-based flow, leaving `runPromise` awaited outside its
 * declaring scope and the function's braces unbalanced — reconstructed here
 * around the consumer-based flow.
 *
 * @param {object} ctx - bot context
 * @param {string} text - the full response text to "stream"
 * @param {object} [options] - reserved for future tuning (currently unused)
 * @deprecated Prefer StreamConsumer with real SSE streaming
 */
export async function sendStreamingMessage(ctx, text, options = {}) {
  if (!text) return;
  const consumer = new StreamConsumer(ctx, {
    editInterval: 800,
    bufferThreshold: 20,
    cursor: '', // full text is already known — no typing cursor needed
  });
  const runPromise = consumer.run();
  try {
    // Split into sentence-level chunks to keep the edit rate low
    // (Telegram allows roughly 30 edits per minute per chat).
    const segments = text.match(/[^.!?]+[.!?]+[\s]*/g) || [text];
    for (const segment of segments) {
      consumer.onDelta(segment);
      await new Promise((resolve) => setTimeout(resolve, 600));
    }
  } finally {
    // Always signal completion so the run() loop terminates.
    consumer.finish();
    await runPromise;
  }
  // If streaming delivery never reached the chat, fall back to a plain send.
  if (!consumer.alreadySent) {
    await sendFormatted(ctx, text);
  }
}