fix: sentence-level streaming with proper Telegram rate limit throttling
This commit is contained in:
@@ -1,7 +1,10 @@
|
|||||||
// Adapted from claudegram's message-sender.ts — send with retry, chunking, fallback
|
// Adapted from claudegram's message-sender.ts — send with retry, chunking, streaming
|
||||||
import { logger } from '../utils/logger.js';
|
import { logger } from '../utils/logger.js';
|
||||||
|
|
||||||
const MAX_MSG_LENGTH = 4000;
|
const MAX_MSG_LENGTH = 4000;
|
||||||
|
// Telegram rate limit: ~30 edits per minute per chat.
|
||||||
|
// We batch edits with ~1000ms minimum between each to stay safe.
|
||||||
|
const MIN_EDIT_INTERVAL_MS = 1000;
|
||||||
|
|
||||||
export function splitMessage(text) {
|
export function splitMessage(text) {
|
||||||
if (text.length <= MAX_MSG_LENGTH) return [text];
|
if (text.length <= MAX_MSG_LENGTH) return [text];
|
||||||
@@ -25,15 +28,12 @@ export function escapeMarkdown(text) {
|
|||||||
|
|
||||||
export async function sendFormatted(ctx, text) {
|
export async function sendFormatted(ctx, text) {
|
||||||
if (!text) return;
|
if (!text) return;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Try Markdown first
|
|
||||||
const chunks = splitMessage(text);
|
const chunks = splitMessage(text);
|
||||||
for (const chunk of chunks) {
|
for (const chunk of chunks) {
|
||||||
await ctx.reply(chunk, { parse_mode: 'Markdown' });
|
await ctx.reply(chunk, { parse_mode: 'Markdown' });
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
// Fallback to plain text
|
|
||||||
logger.warn('Markdown send failed, falling back to plain text');
|
logger.warn('Markdown send failed, falling back to plain text');
|
||||||
const chunks = splitMessage(text);
|
const chunks = splitMessage(text);
|
||||||
for (const chunk of chunks) {
|
for (const chunk of chunks) {
|
||||||
@@ -42,101 +42,75 @@ export async function sendFormatted(ctx, text) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Streaming message — edits a single message in place as content grows.
|
||||||
|
* Splits on sentences to keep edit count low (Telegram ~30 edits/min limit).
|
||||||
|
* Falls back to sendFormatted if editing fails.
|
||||||
|
*/
|
||||||
export async function sendStreamingMessage(ctx, text, options = {}) {
|
export async function sendStreamingMessage(ctx, text, options = {}) {
|
||||||
if (!text) return;
|
if (!text) return;
|
||||||
|
|
||||||
const {
|
const { sentenceMode = true } = options;
|
||||||
delay = 50,
|
|
||||||
minDelay = 20,
|
|
||||||
maxDelay = 100,
|
|
||||||
charMode = false
|
|
||||||
} = options;
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
logger.info('📡 Starting streaming for message of length:', text.length);
|
// Send placeholder
|
||||||
|
const msg = await ctx.reply('⌨️ Typing...', { parse_mode: 'Markdown' });
|
||||||
|
const messageId = msg.message_id;
|
||||||
|
const chatId = msg.chat.id;
|
||||||
|
|
||||||
// Send initial placeholder message using ctx.api.sendMessage directly
|
if (sentenceMode) {
|
||||||
const sentMsg = await ctx.api.sendMessage(ctx.chat.id, '⌨️ ⌨️', { parse_mode: 'Markdown' });
|
// Split into sentence-level chunks for safe edit rate
|
||||||
|
const segments = text.match(/[^.!?]+[.!?]+[\s]*/g) || [text];
|
||||||
// Log response structure - try to understand the format
|
let accumulated = '';
|
||||||
logger.info('📡 Response typeof:', typeof sentMsg);
|
|
||||||
logger.info('📡 sentMsg constructor:', sentMsg?.constructor?.name);
|
|
||||||
logger.info('📡 sentMsg has result?:', 'result' in (sentMsg || {}));
|
|
||||||
|
|
||||||
// Force log the response to see what we're getting
|
|
||||||
logger.info('📡 sentMsg raw:', sentMsg);
|
|
||||||
logger.info('📡 sentMsg keys (Object.keys):', Object.keys(sentMsg || {}).join(', '));
|
|
||||||
logger.info('📡 sentMsg ownKeys (Reflect.ownKeys):', Reflect.ownKeys(sentMsg || {}).join(', '));
|
|
||||||
|
|
||||||
// The response might be wrapped in a 'result' property
|
|
||||||
const response = sentMsg?.result || sentMsg;
|
|
||||||
logger.info('📡 Response type:', typeof response);
|
|
||||||
logger.info('📡 Response keys:', Object.keys(response || {}).join(', '));
|
|
||||||
|
|
||||||
// Try different access patterns
|
|
||||||
const messageId = response?.['message_id'] || response?.message_id;
|
|
||||||
const chatId = response?.['chat']?.['id'] || response?.chat?.id;
|
|
||||||
|
|
||||||
logger.info('📡 Extracted Message ID:', messageId);
|
|
||||||
logger.info('📡 Extracted Chat ID:', chatId);
|
|
||||||
|
|
||||||
if (!messageId || !chatId) {
|
|
||||||
logger.error('Failed to extract message ID or chat ID from reply response');
|
|
||||||
await sendFormatted(ctx, text);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info('📡 Using message_id:', messageId, 'chat_id:', chatId);
|
|
||||||
|
|
||||||
let sentText = '';
|
|
||||||
|
|
||||||
if (charMode) {
|
|
||||||
// Character-by-character streaming
|
|
||||||
const chars = text.split('');
|
|
||||||
logger.info('📡 Character mode, total chars:', chars.length);
|
|
||||||
|
|
||||||
for (const char of chars) {
|
|
||||||
sentText += char;
|
|
||||||
|
|
||||||
|
for (const segment of segments) {
|
||||||
|
accumulated += segment;
|
||||||
try {
|
try {
|
||||||
await ctx.api.editMessageText(sentText, {
|
await ctx.api.editMessageText(accumulated.trim(), {
|
||||||
chat_id: chatId,
|
chat_id: chatId,
|
||||||
message_id: messageId,
|
message_id: messageId,
|
||||||
parse_mode: 'Markdown'
|
parse_mode: 'Markdown'
|
||||||
});
|
});
|
||||||
} catch (editErr) {
|
} catch (editErr) {
|
||||||
logger.error('Edit error:', editErr.message, 'payload:', { chat_id: chatId, message_id: messageId });
|
// If edit fails (rate limit, content unchanged), just skip
|
||||||
|
if (!editErr.message?.includes('message is not modified')) {
|
||||||
|
logger.warn('Edit skipped:', editErr.message);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
const delayMs = minDelay + Math.random() * (maxDelay - minDelay);
|
// Throttle: wait at least MIN_EDIT_INTERVAL_MS between edits
|
||||||
await new Promise(resolve => setTimeout(resolve, delayMs));
|
await new Promise(r => setTimeout(r, MIN_EDIT_INTERVAL_MS));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Word-by-word streaming
|
// Word-by-word mode — throttled
|
||||||
const words = text.split(' ');
|
const words = text.split(' ');
|
||||||
logger.info('📡 Word mode, total words:', words.length);
|
let accumulated = '';
|
||||||
|
let lastEdit = 0;
|
||||||
|
|
||||||
for (let i = 0; i < words.length; i++) {
|
for (const word of words) {
|
||||||
sentText += (i > 0 ? ' ' : '') + words[i];
|
accumulated += (accumulated ? ' ' : '') + word;
|
||||||
|
const now = Date.now();
|
||||||
|
const elapsed = now - lastEdit;
|
||||||
|
|
||||||
|
if (elapsed < MIN_EDIT_INTERVAL_MS) {
|
||||||
|
await new Promise(r => setTimeout(r, MIN_EDIT_INTERVAL_MS - elapsed));
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await ctx.api.editMessageText(sentText, {
|
await ctx.api.editMessageText(accumulated, {
|
||||||
chat_id: chatId,
|
chat_id: chatId,
|
||||||
message_id: messageId,
|
message_id: messageId,
|
||||||
parse_mode: 'Markdown'
|
parse_mode: 'Markdown'
|
||||||
});
|
});
|
||||||
|
lastEdit = Date.now();
|
||||||
} catch (editErr) {
|
} catch (editErr) {
|
||||||
logger.error('Edit error:', editErr.message, 'payload:', { chat_id: chatId, message_id: messageId });
|
if (!editErr.message?.includes('message is not modified')) {
|
||||||
}
|
logger.warn('Edit skipped:', editErr.message);
|
||||||
|
}
|
||||||
const wordDelay = Math.max(minDelay, Math.min(maxDelay, delay + words[i].length * 5));
|
}
|
||||||
await new Promise(resolve => setTimeout(resolve, wordDelay));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info('📡 Streaming complete');
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error('Streaming send failed:', error);
|
logger.error('Streaming failed, falling back:', error.message);
|
||||||
await sendFormatted(ctx, text);
|
await sendFormatted(ctx, text);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user