Files
zCode-CLI-X/src/bot/index.js

813 lines
31 KiB
JavaScript

import { Bot } from 'grammy';
import { autoRetry } from '@grammyjs/auto-retry';
import { sequentialize } from '@grammyjs/runner';
import express from 'express';
import { createServer } from 'http';
import { WebSocketServer } from 'ws';
import { logger } from '../utils/logger.js';
import { checkEnv } from '../utils/env.js';
import { getRTK } from '../utils/rtk.js';
import { isDuplicate, markProcessed } from './deduplication.js';
import { queueRequest, clearQueue, isProcessing } from './request-queue.js';
import { sendFormatted, splitMessage, escapeMarkdown, sendStreamingMessage, StreamConsumer, markdownToHtml } from './message-sender.js';
import { withSelfCorrection } from './self-correction.js';
import { getMemory } from './memory.js';
function buildSessionKey(chatId, threadId) {
return threadId ? `${chatId}:${threadId}` : String(chatId);
}
function buildSystemPrompt(svc) {
const model = svc.config?.api?.models?.default || 'glm-5.1';
const memory = svc.memory;
const memoryContext = memory ? memory.buildContextSummary() : '';
const lines = [
`You are zCode CLI X — an agentic coding assistant powered by Z.AI (${model}) with Telegram integration.`,
'',
'You run 24/7 as a Telegram bot. Answer concisely, helpfully, with code examples when relevant.',
'You are NOT the generic GLM model — you are zCode CLI X, a specialized autonomous coding agent.',
'',
'## Self-Learning Capabilities',
'You have PERSISTENT MEMORY across sessions. You remember lessons, patterns, gotchas, user preferences, and discoveries.',
'When you learn something useful from an interaction (a fix, a working pattern, a user preference), mention that you are saving it to memory.',
'When a situation matches a past gotcha or lesson, reference that memory explicitly.',
'Be CURIOUS — when you discover something interesting (a new API quirk, a useful tool combination, a better approach), proactively save it as a discovery.',
'',
'## Available Tools',
];
for (const t of svc.tools) {
lines.push(`- **${t.name}**: ${t.description || t.name}`);
}
lines.push('', '## Available Skills');
for (const s of svc.skills) {
lines.push(`- **${s.name}** (${s.category}): ${s.description}`);
}
lines.push('', '## Available Agent Roles');
for (const a of svc.agents) {
lines.push(`- **${a.id}** (${a.name}): ${a.description}`);
}
lines.push('', `## Infrastructure
- RTK (Rust Token Killer) active
- Z.AI API via ${svc.config?.api?.baseUrl || 'z.ai'}
- Telegram webhook + WebSocket
- Persistent memory (self-learning, ${memory ? memory.memories.length : 0} memories stored)`);
if (memoryContext) {
lines.push('', memoryContext);
}
lines.push('', 'Identify ONLY as zCode CLI X.');
return lines.join('\n');
}
// ───────────────────────────────────────────
// SELF-LEARNING & CURIOSITY ENGINE
// ───────────────────────────────────────────
/**
* Analyze every interaction for self-learnable patterns.
* Runs AFTER the response is delivered — zero latency impact.
*
* Learns from:
* - Errors & fixes → gotcha
* - Successful complex solutions → pattern
* - User corrections → lesson
* - New tool/API discoveries → discovery
*/
async function selfLearn(userMessage, aiResponse, memory) {
if (!aiResponse || aiResponse.length < 50) return;
const msg = userMessage.toLowerCase();
const resp = aiResponse.toLowerCase();
// 1. Detect error patterns → gotcha
if (resp.includes('error') && resp.includes('fix')) {
const errorMatch = aiResponse.match(/error[:\s]+([^\n]{10,120})/i);
const fixMatch = aiResponse.match(/fix[:\s]+([^\n]{10,120})/i);
if (errorMatch && fixMatch) {
await memory.remember('gotcha', `${errorMatch[1].trim()}${fixMatch[1].trim()}`, {
trigger: userMessage.substring(0, 100),
});
logger.info('🧠 Self-learned gotcha from interaction');
return;
}
}
// 2. Detect user corrections → lesson
if (msg.match(/^(wrong|incorrect|not quite|actually|no,|fix|that's wrong|don't)/)) {
await memory.remember('lesson', `Correction on "${userMessage.substring(0, 80)}": ${aiResponse.substring(0, 150)}`, {
trigger: userMessage.substring(0, 100),
});
logger.info('🧠 Self-learned lesson from correction');
return;
}
// 3. Detect successful complex solution → pattern
if (resp.includes('✅') && aiResponse.length > 300) {
// Check if this was a non-trivial interaction
if (msg.length > 20 && !msg.match(/^(hi|hello|hey|thanks|ok)/)) {
const existing = memory.recall({ category: 'pattern', query: msg.substring(0, 30), limit: 1 });
if (existing.length === 0) {
await memory.remember('pattern', `Solution for "${userMessage.substring(0, 60)}": produced ${aiResponse.length} chars with tool usage`, {
trigger: userMessage.substring(0, 100),
});
logger.info('🧠 Self-learned pattern from successful interaction');
}
}
return;
}
// 4. Curiosity: detect new discoveries
// - First-time tool usage
if (resp.includes('bash') && resp.includes('success') && !memory.recall({ query: 'bash tool', limit: 1 }).length) {
await memory.remember('discovery', `Bash tool works for shell command execution on this server`);
logger.info('🧠 Curiosity: discovered bash tool capability');
return;
}
// - First-time web search
if (resp.includes('search') && resp.includes('result') && !memory.recall({ query: 'web search', limit: 1 }).length) {
await memory.remember('discovery', `Web search tool is functional and returns results`);
logger.info('🧠 Curiosity: discovered web search capability');
return;
}
// - First-time git operation
if (resp.includes('git') && resp.includes('commit') && !memory.recall({ query: 'git operation', limit: 1 }).length) {
await memory.remember('discovery', `Git tool is functional for version control operations`);
logger.info('🧠 Curiosity: discovered git capability');
}
}
// ───────────────────────────────────────────
// MAIN — called from zcode.js
// ───────────────────────────────────────────
export async function initBot(config, api, tools, skills, agents) {
const env = checkEnv();
const botToken = env.TELEGRAM_BOT_TOKEN;
if (!botToken) {
logger.warn('⚠ Telegram bot token not configured');
return null;
}
logger.info('🤖 Initializing zCode bot (grammy + claudegram patterns)…');
const rtk = getRTK();
await rtk.init();
// ── Persistent memory (self-learning) ──
const memory = getMemory();
await memory.init();
// ── Service registry ──
const svc = { config, api, tools: tools || [], skills: skills || [], agents: agents || [], rtk, memory,
toolMap: new Map((tools || []).map(t => [t.name, t])),
};
// ── AI chat with function calling ──
async function chatWithAI(messages, opts = {}) {
const model = opts.model || svc.config?.api?.models?.default || 'glm-5.1';
const tools = [];
const toolMap = svc.toolMap;
if (toolMap.has('bash')) {
tools.push({
type: 'function',
function: {
name: 'bash',
description: 'Execute a shell command',
parameters: {
type: 'object', properties: {
command: { type: 'string', description: 'Shell command' },
timeout: { type: 'number', description: 'Timeout ms (default 300000)' },
}, required: ['command'],
},
},
});
}
if (toolMap.has('web_search')) {
tools.push({
type: 'function',
function: {
name: 'web_search',
description: 'Search the web',
parameters: {
type: 'object', properties: {
query: { type: 'string', description: 'Search query' },
num_results: { type: 'number', description: 'Results count (default 5)' },
}, required: ['query'],
},
},
});
}
if (toolMap.has('git')) {
tools.push({
type: 'function',
function: {
name: 'git',
description: 'Git operations: status, log, diff, commit, push, pull',
parameters: {
type: 'object', properties: {
action: { type: 'string', enum: ['status', 'log', 'diff', 'commit', 'push', 'pull'] },
params: { type: 'array', items: { type: 'string' } },
}, required: ['action'],
},
},
});
}
if (svc.agents.length) {
tools.push({
type: 'function',
function: {
name: 'delegate_agent',
description: 'Delegate to a specialized agent role',
parameters: {
type: 'object', properties: {
agent_id: { type: 'string', enum: svc.agents.map(a => a.id) },
task: { type: 'string', description: 'Task description' },
}, required: ['agent_id', 'task'],
},
},
});
}
if (svc.skills.length) {
tools.push({
type: 'function',
function: {
name: 'run_skill',
description: 'Run a skill by name',
parameters: {
type: 'object', properties: {
skill: { type: 'string', enum: svc.skills.map(s => s.name) },
input: { type: 'string' },
}, required: ['skill'],
},
},
});
}
try {
const body = {
model,
messages,
temperature: opts.temperature ?? 0.7,
max_tokens: opts.maxTokens || 4096,
stream: !!opts.onDelta, // Enable SSE when delta callback is provided
};
if (tools.length) body.tools = tools;
// ── Streaming path (SSE) ──
if (opts.onDelta) {
return await chatWithAIStream(svc, body, tools, toolHandlers, opts.onDelta);
}
// ── Non-streaming path (original) ──
const response = await api.client.post('/chat/completions', body);
const choice = response.data.choices?.[0];
if (!choice) return '❌ No response from model.';
const msg = choice.message;
if (msg.tool_calls?.length) {
const parts = [];
for (const tc of msg.tool_calls) {
const fn = tc.function;
try {
const handler = toolHandlers[fn.name];
if (!handler) { parts.push(`❌ Unknown tool: ${fn.name}`); continue; }
const args = JSON.parse(fn.arguments);
const result = await handler(args);
parts.push(`${result}`);
} catch (e) {
parts.push(`❌ Tool ${fn.name} error: ${e.message}`);
}
}
return parts.join('\n\n');
}
return msg.content || '✅ Done.';
} catch (error) {
logger.error('AI error:', error.response?.data || error.message);
return `${error.response?.data?.error?.message || error.message}`;
}
}
/**
* Streaming chat completion via SSE.
* Pipes each token chunk to onDelta() callback in real-time.
* Falls back to non-streaming if SSE fails.
*/
async function chatWithAIStream(svc, body, tools, toolHandlers, onDelta) {
const baseUrl = svc.api?.config?.baseUrl || 'https://api.z.ai/api/coding/paas/v4';
const apiKey = svc.api?.config?.apiKey || '';
let fullResponse = '';
try {
const response = await fetch(`${baseUrl}/chat/completions`, {
method: 'POST',
headers: {
'Authorization': `Bearer ${apiKey}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
});
if (!response.ok) {
const errText = await response.text();
logger.error(`SSE error ${response.status}: ${errText}`);
// Fallback to non-streaming
return await chatWithAI(body.messages, { ...body, stream: false });
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || ''; // Keep incomplete line in buffer
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed || !trimmed.startsWith('data: ')) continue;
const data = trimmed.slice(6);
if (data === '[DONE]') continue;
try {
const parsed = JSON.parse(data);
const choices = parsed.choices || [];
if (choices.length > 0) {
const delta = choices[0].delta || {};
const content = delta.content || '';
if (content) {
fullResponse += content;
onDelta(content);
}
// Check for tool calls in streaming
if (delta.tool_calls) {
// Tool calls in streaming mode — accumulate and handle after stream
// For now, fall through to non-streaming tool handling
}
}
} catch {
// Ignore malformed JSON lines
}
}
}
} catch (e) {
logger.error('SSE stream error:', e.message);
// Fallback to non-streaming
if (!fullResponse) {
return await chatWithAI(body.messages, { maxTokens: body.max_tokens });
}
}
return fullResponse || '✅ Done.';
}
const toolHandlers = {
bash: async (args) => {
const tool = svc.toolMap.get('bash');
if (!tool) return '❌ Bash tool unavailable.';
try {
const r = await tool.execute(args.command, { timeout: args.timeout || 300000 });
const out = (r.stdout || '').slice(0, 3000);
const err = (r.stderr || '').slice(0, 3000);
if (r.success) return `\`\`\`\n${out || '(no output)'}\n\`\`\``;
return `❌ Exit ${r.code}\n\`\`\`\n${err || out}\n\`\`\``;
} catch (e) { return `❌ Bash error: ${e.message}`; }
},
web_search: async (args) => {
const tool = svc.toolMap.get('web_search');
if (!tool) return '❌ Web search unavailable.';
try {
const r = await tool.search(args.query, { numResults: args.num_results || 5 });
if (r?.success && r.results?.length) {
return `🔍 *${args.query}*\n\n${r.results.slice(0, 5).map((x, i) =>
`${i + 1}. [${x.title}](${x.url})\n${x.snippet || ''}`).join('\n\n')}`;
}
return `🔍 *${args.query}*\n\nNo results.`;
} catch (e) { return `❌ Search error: ${e.message}`; }
},
git: async (args) => {
const tool = svc.toolMap.get('git');
if (!tool) return '❌ Git tool unavailable.';
try {
const method = tool[args.action];
if (!method) return `❌ Unknown: ${args.action}`;
const r = await method.call(tool, ...(args.params || []));
return r.success ? `${r.status || JSON.stringify(r)}` : `${r.stderr || r.error}`;
} catch (e) { return `❌ Git error: ${e.message}`; }
},
delegate_agent: async (args) => {
const agent = svc.agents.find(a => a.id === args.agent_id);
if (!agent) return `❌ Agent not found: ${args.agent_id}`;
return `✅ Delegated to **${agent.name}**: "${args.task}"`;
},
run_skill: async (args) => {
const skill = svc.skills.find(s => s.name === args.skill);
if (!skill) return `❌ Skill not found: ${args.skill}`;
return `✅ Skill **${skill.name}** queued: ${args.input || '(none)'}`;
},
};
// ── Create grammy bot ──
const bot = new Bot(botToken);
bot.api.config.use(autoRetry({ maxRetryAttempts: 5, maxDelaySeconds: 60 }));
// Initialize the bot to get bot info
try {
await bot.init();
logger.info('✓ Bot initialized');
} catch (e) {
logger.warn('⚠ Bot init failed (webhook will still work):', e.message);
}
// Register bot command menu
const cmdList = [
{ command: 'start', description: '🚀 Show help' },
{ command: 'tools', description: '🔧 List available tools' },
{ command: 'skills', description: '📚 List loaded skills' },
{ command: 'agents', description: '🤖 List agent roles' },
{ command: 'model', description: '🤖 Switch AI model' },
{ command: 'stats', description: '📊 System & RTK stats' },
{ command: 'voice', description: '🎤 Voice I/O info' },
{ command: 'mcp', description: '🔌 MCP info' },
{ command: 'memory', description: '🧠 Persistent memory stats' },
{ command: 'remember', description: '📝 Save to memory' },
{ command: 'recall', description: '🔍 Search memory' },
{ command: 'cron', description: '⏰ Scheduled tasks' },
{ command: 'bash', description: '💻 Execute shell command' },
{ command: 'web', description: '🔍 Search the web' },
{ command: 'git', description: '🔀 Git operations' },
];
bot.api.setMyCommands(cmdList).catch(e =>
logger.warn('⚠ Failed to register commands:', e.message));
// ── Auth middleware (claudegram pattern) ──
const allowedUsers = env.TELEGRAM_ALLOWED_USERS
? env.TELEGRAM_ALLOWED_USERS.split(',').map(s => s.trim())
: null;
bot.use(async (ctx, next) => {
if (!allowedUsers) return next(); // allow all
const userId = String(ctx.from?.id || '');
if (!allowedUsers.includes(userId)) {
await ctx.reply('⛔ Unauthorized.');
return;
}
return next();
});
// ── Sequentialize per-chat (claudegram pattern) ──
bot.use(sequentialize((ctx) => {
const chatId = ctx.chat?.id;
if (!chatId) return undefined;
const msg = ctx.message ?? ctx.callbackQuery?.message;
const threadId = msg?.is_topic_message ? msg.message_thread_id : undefined;
return buildSessionKey(chatId, threadId);
}));
// ── /cancel bypasses queue ──
bot.command('cancel', async (ctx) => {
const key = buildSessionKey(ctx.chat.id, ctx.message?.message_thread_id);
clearQueue(key);
await ctx.reply('⏹️ Cancelled and queue cleared.');
});
// ── COMMAND HANDLERS ──
bot.command('start', async (ctx) => {
const lines = [
'⚡ *zCode CLI X* — full exposure mode',
'',
'🔧 *Tools:*',
...svc.tools.map(t => ` /${t.name}${t.description}`),
'',
'📚 *Skills:* ' + svc.skills.length + ' loaded',
'🤖 *Agents:* ' + svc.agents.length + ' available',
'',
'🔄 *Self-Correction*: 2 retries + auto-simplification',
'⚡ *Streaming*: Real-time text delivery',
'',
'📋 *Commands:* /tools /skills /agents /model /stats /voice /mcp /memory /cron /cancel /selfcorrection',
'',
'Or just chat — I will use tools when needed.',
`Model: \`${svc.config?.api?.models?.default || 'glm-5.1'}\``,
];
await sendStreamingMessage(ctx, lines.join('\n'));
});
bot.command('tools', async (ctx) => {
const lines = ['🔧 *Tools:*\n'];
for (const t of svc.tools) lines.push(`\`${t.name}\`${t.description}`);
await sendStreamingMessage(ctx, lines.join('\n'));
});
bot.command('skills', async (ctx) => {
if (!svc.skills.length) return ctx.reply('📚 No skills loaded.');
const groups = {};
for (const s of svc.skills) { (groups[s.category] ||= []).push(s); }
const lines = ['📚 *Skills:*\n'];
for (const [cat, items] of Object.entries(groups)) {
lines.push(`*${cat}:*`);
for (const s of items) lines.push(`\`${s.name}\`${s.description}`);
lines.push('');
}
await sendStreamingMessage(ctx, lines.join('\n'));
});
bot.command('agents', async (ctx) => {
const lines = ['🤖 *Agent Roles:*\n'];
for (const a of svc.agents) {
lines.push(`• *${a.name}* (\`${a.id}\`)`);
lines.push(` ${a.description}`);
}
await sendStreamingMessage(ctx, lines.join('\n'));
});
bot.command('selfcorrection', async (ctx) => {
await sendStreamingMessage(ctx, `🔄 *Self-Correction Loops* — FULLY ENABLED\n\nzCode CLI X now uses automatic self-correction:\n\n• **Max Retries**: 2 attempts\n• **Retry Delay**: 500ms → 1s → 1.5s (exponential backoff)\n• **Triggers**: \n - ❌ Error responses\n - Rate limits\n - Timeouts\n - 5xx server errors\n\n• **Auto-Simplification**: On retry, prompts are simplified to avoid recurring errors\n\n• **Logging**: All retries are logged with retry count and reason\n\nThis ensures robust responses even when the AI initially fails.`);
});
bot.command('model', async (ctx) => {
const text = ctx.match?.trim();
if (!text) {
return ctx.reply(`Current: \`${svc.config?.api?.models?.default || 'glm-5.1'}\`\nUsage: /model <name>`);
}
svc.config.api.models = svc.config.api.models || {};
svc.config.api.models.default = text;
await ctx.reply(`✅ Switched to \`${text}\``, { parse_mode: 'Markdown' });
});
bot.command('stats', async (ctx) => {
const rtkStats = svc.rtk.enabled ? svc.rtk.getTrackingStats() : null;
const lines = [
'📊 *Stats:*\n',
`Tools: ${svc.tools.length}`,
`Skills: ${svc.skills.length}`,
`Agents: ${svc.agents.length}`,
`Model: \`${svc.config?.api?.models?.default || 'glm-5.1'}\``,
`RTK: ${svc.rtk.enabled ? '✅' : '❌'}`,
];
if (rtkStats) {
lines.push(`Optimized: ${rtkStats.totalOptimized || 0}`);
lines.push(`Saved: ${rtkStats.totalTokensSaved || 0} tok`);
}
await sendStreamingMessage(ctx, lines.join('\n'));
});
bot.command('voice', async (ctx) => {
await sendStreamingMessage(ctx, `🎤 *Voice I/O*\n\nVoice recording is available via the TS service layer.\nSend me a voice message and I will transcribe it.`);
});
bot.command('mcp', async (ctx) => {
await ctx.reply('🔌 *MCP:* Available via TS services at `src/services/mcp/`. Connect MCP servers for extended capabilities.');
});
bot.command('memory', async (ctx) => {
const stats = memory.getStats();
const recent = memory.recall({ limit: 5 });
const lines = [
`🧠 *Persistent Memory* — ${stats.total} memories stored`,
'',
`📅 From ${stats.oldest || 'today'} to ${stats.newest || 'now'}`,
'',
'*Categories:*',
];
for (const [cat, count] of Object.entries(stats.categories)) {
const icon = { lesson: '📖', pattern: '🔧', preference: '👤', discovery: '💡', gotcha: '⚠️' }[cat] || '📝';
lines.push(` ${icon} ${cat}: ${count}`);
}
if (recent.length > 0) {
lines.push('', '*Recent:*');
for (const m of recent.slice(0, 5)) {
lines.push(` • [${m.category}] ${m.content.substring(0, 60)}`);
}
}
lines.push('', '_Use /remember <text> to save, /recall <query> to search, /forget <id> to delete_');
await sendStreamingMessage(ctx, lines.join('\n'));
});
bot.command('remember', async (ctx) => {
const text = ctx.match?.trim();
if (!text) return ctx.reply('Usage: /remember <text>\n\nCategories: lesson, pattern, preference, discovery, gotcha\n\nExamples:\n/remember User prefers TypeScript over JavaScript\n/remember gotcha: Z.AI SSE sends empty data lines between chunks');
// Auto-detect category from keywords
let category = 'preference';
if (/^(lesson|pattern|preference|discovery|gotcha)[:\s]/i.test(text)) {
const match = text.match(/^(lesson|pattern|preference|discovery|gotcha)[:\s]\s*(.*)/i);
if (match) {
category = match[1].toLowerCase();
await memory.remember(category, match[2]);
}
} else {
await memory.remember(category, text);
}
await ctx.reply(`🧠 Saved to memory [${category}]. I will remember this across sessions.`);
});
bot.command('recall', async (ctx) => {
const query = ctx.match?.trim();
if (!query) return ctx.reply('Usage: /recall <search terms>\n\nExample: /recall ZAI API timeout');
const results = memory.recall({ query, limit: 10 });
if (results.length === 0) return ctx.reply('🔍 No memories match that query.');
const lines = [`🔍 *Recall* (${results.length} results):`, ''];
for (const m of results) {
const date = new Date(m.created).toLocaleDateString();
lines.push(`[${m.category}] (${date}) ${m.content.substring(0, 80)}`);
if (m.meta?.resolution) lines.push(`${m.meta.resolution.substring(0, 60)}`);
}
await sendStreamingMessage(ctx, lines.join('\n'));
});
bot.command('forget', async (ctx) => {
const id = ctx.match?.trim();
if (!id) return ctx.reply('Usage: /forget <memory-id>\n\nFind IDs with /recall or /memory');
const ok = await memory.forget(id);
await ctx.reply(ok ? '🗑 Memory deleted.' : '❌ Memory not found.');
});
bot.command('cron', async (ctx) => {
await ctx.reply('⏰ *Cron:* CronScheduler at `src/utils/cronScheduler.ts` — 1s interval, task locking, auto-recovery.');
});
// ── Dynamic tool commands ──
for (const tool of svc.tools) {
const tname = tool.name;
if (bot.command(tname)) continue; // skip if registered
bot.command(tname, async (ctx) => {
const args = ctx.match?.trim();
const handler = toolHandlers[tname];
if (!handler) return ctx.reply(`❌ No handler for ${tname}`);
if (!args) return ctx.reply(`Usage: /${tname} <input>\n${tool.description}`);
const result = await handler(tname === 'web_search'
? { query: args, num_results: 5 }
: tname === 'bash'
? { command: args, timeout: 300000 }
: tname === 'git'
? (() => { const p = args.split(/\s+/); return { action: p[0], params: p.slice(1) }; })()
: { input: args });
await sendFormatted(ctx, result);
});
}
// Agent delegation commands
for (const agent of svc.agents) {
const cmdName = `agent_${agent.id}`;
bot.command(cmdName, async (ctx) => {
const task = ctx.match?.trim();
if (!task) return ctx.reply(`🤖 *${agent.name}*\n${agent.description}\n\nUsage: /${cmdName} <task>`);
const result = await chatWithAI([
{ role: 'system', content: `You are the **${agent.name}** (${agent.id}). Capabilities: ${agent.capabilities.join(', ')}. Respond as this specialist within zCode CLI X.` },
{ role: 'user', content: task },
]);
await sendFormatted(ctx, result);
});
}
// ── Message text handler (with dedup + queue + self-correction) ──
bot.on('message:text', async (ctx) => {
if (isDuplicate(ctx.message.message_id)) return;
markProcessed(ctx.message.message_id);
const key = buildSessionKey(ctx.chat.id, ctx.message?.message_thread_id);
const text = ctx.message.text;
const user = ctx.from?.username || ctx.from?.first_name || 'Unknown';
logger.info(`💬 ${user}: ${text.substring(0, 80)}`);
await queueRequest(key, text, async () => {
await ctx.api.sendChatAction(ctx.chat.id, 'typing');
// Create stream consumer for real-time edit-in-place
const consumer = new StreamConsumer(ctx, { editInterval: 1000 });
const runPromise = consumer.run();
// Wrap chatWithAI with self-correction + streaming
const chatWithCorrection = withSelfCorrection(async (msgs) => {
return await chatWithAI(msgs, { onDelta: (token) => consumer.onDelta(token) });
});
const result = await chatWithCorrection([
{ role: 'system', content: buildSystemPrompt(svc) },
{ role: 'user', content: text },
]);
// Signal completion and wait for final edit
consumer.finish();
await runPromise;
// If streaming failed to deliver (no message sent), fallback to plain send
if (!consumer.alreadySent && result) {
logger.info(`📤 Streaming failed, fallback send (${result.length} chars)`);
await sendFormatted(ctx, result);
} else {
logger.info('✅ Stream delivered');
}
// ── Self-learning: extract patterns from this interaction ──
await selfLearn(text, result, memory);
});
});
// ── Voice handler ──
bot.on('message:voice', async (ctx) => {
const fileId = ctx.message.voice.file_id;
const user = ctx.from?.username || ctx.from?.first_name || 'Unknown';
logger.info(`🎤 Voice from ${user}`);
await ctx.reply('🎤 Voice received! (STT via Whisper TBD)');
const file = await ctx.api.getFile(fileId);
const url = `https://api.telegram.org/file/bot${botToken}/${file.file_path}`;
logger.info(`Voice file: ${url}`);
});
// ── Photo handler ──
bot.on('message:photo', async (ctx) => {
logger.info(`📸 Photo from ${ctx.from?.username}`);
// TODO: vision analysis
await ctx.reply('📸 Image received. Vision analysis TBD.');
});
// ── Error handler ──
bot.catch((err) => {
logger.error('Bot error:', err.message || err);
});
// ── Global unhandled rejection guard ──
process.on('unhandledRejection', (reason, promise) => {
logger.error('Unhandled rejection:', reason?.message || reason);
});
// ── Express + WebSocket server (keep for webhook compatibility) ──
const app = express();
app.use(express.json());
const httpServer = createServer(app);
const wss = new WebSocketServer({ server: httpServer });
const wsClients = new Map();
wss.on('connection', (ws, req) => {
const chatId = req.url?.startsWith('/?chatId=') ? req.url.slice(8) : 'ws';
wsClients.set(chatId, ws);
ws.on('close', () => wsClients.delete(chatId));
logger.info(`🔌 WS: ${chatId}`);
});
// Webhook handler — feeds into grammy
app.get('/telegram/webhook', (req, res) => {
const { 'hub.mode': mode, 'hub.challenge': challenge } = req.query;
if (mode === 'subscribe' && challenge) return res.status(200).send(challenge);
res.status(200).json({ ok: true, message: 'zCode webhook active' });
});
app.post('/telegram/webhook', async (req, res) => {
res.json({ ok: true }); // ack immediately
try {
logger.info('📥 Webhook received:', {
update_id: req.body?.update_id,
has_message: !!req.body?.message,
type: req.body?.message?.text ? 'text' : 'other'
});
await bot.handleUpdate(req.body);
} catch (e) {
logger.error('Webhook update error:', {
message: e.message,
name: e.name,
stack: e.stack,
body: req.body?.update_id,
full: e.toString(),
body_preview: req.body ? JSON.stringify(req.body).substring(0, 200) : 'no body'
});
}
});
// Health check
app.get('/health', (req, res) => {
res.json({
ok: true, uptime: process.uptime(),
tools: svc.tools.length, skills: svc.skills.length, agents: svc.agents.length,
wsClients: wsClients.size,
});
});
const PORT = process.env.ZCODE_PORT || 3000;
httpServer.listen(PORT, () => {
logger.info(`✓ HTTP on :${PORT} · WS ready · grammy bot online`);
logger.info(`${svc.tools.length} tools · ${svc.skills.length} skills · ${svc.agents.length} agents`);
});
// Set webhook
const wu = process.env.ZCODE_WEBHOOK_URL;
if (wu) {
try {
await bot.api.setWebhook(wu, { allowed_updates: ['message', 'callback_query'] });
logger.info('✓ Webhook set');
} catch (e) {
logger.warn('⚠ Webhook failed:', e.message);
}
}
return {
send: (chatId, text) => bot.api.sendMessage(chatId, markdownToHtml(text), { parse_mode: 'HTML' }),
ws: (chatId, msg) => wsClients.get(chatId)?.send(JSON.stringify(msg)),
waitForMessages: async () => { await new Promise(() => {}); },
getConnections: () => wsClients.size,
};
}