import { Bot } from 'grammy'; import { autoRetry } from '@grammyjs/auto-retry'; import { sequentialize } from '@grammyjs/runner'; import express from 'express'; import { createServer } from 'http'; import { WebSocketServer } from 'ws'; import { logger } from '../utils/logger.js'; import { checkEnv } from '../utils/env.js'; import { getRTK } from '../utils/rtk.js'; import { isDuplicate, markProcessed } from './deduplication.js'; import { queueRequest, clearQueue, isProcessing } from './request-queue.js'; import { sendFormatted, splitMessage, escapeMarkdown, sendStreamingMessage, StreamConsumer, markdownToHtml } from './message-sender.js'; import { withSelfCorrection } from './self-correction.js'; import { getMemory } from './memory.js'; function buildSessionKey(chatId, threadId) { return threadId ? `${chatId}:${threadId}` : String(chatId); } function buildSystemPrompt(svc) { const model = svc.config?.api?.models?.default || 'glm-5.1'; const memory = svc.memory; const memoryContext = memory ? memory.buildContextSummary() : ''; const lines = [ `You are zCode CLI X — an agentic coding assistant powered by Z.AI (${model}) with Telegram integration.`, '', 'You run 24/7 as a Telegram bot. Answer concisely, helpfully, with code examples when relevant.', 'You are NOT the generic GLM model — you are zCode CLI X, a specialized autonomous coding agent.', '', '## Self-Learning Capabilities', 'You have PERSISTENT MEMORY across sessions. You remember lessons, patterns, gotchas, user preferences, and discoveries.', 'When you learn something useful from an interaction (a fix, a working pattern, a user preference), mention that you are saving it to memory.', 'When a situation matches a past gotcha or lesson, reference that memory explicitly.', 'Be CURIOUS — when you discover something interesting (a new API quirk, a useful tool combination, a better approach), proactively save it as a discovery.', '', '## Available Tools', ]; for (const t of svc.tools) { lines.push(`- **${t.name}**: ${t.description || t.name}`); } lines.push('', '## Available Skills'); for (const s of svc.skills) { lines.push(`- **${s.name}** (${s.category}): ${s.description}`); } lines.push('', '## Available Agent Roles'); for (const a of svc.agents) { lines.push(`- **${a.id}** (${a.name}): ${a.description}`); } lines.push('', `## Infrastructure - RTK (Rust Token Killer) active - Z.AI API via ${svc.config?.api?.baseUrl || 'z.ai'} - Telegram webhook + WebSocket - Persistent memory (self-learning, ${memory ? memory.memories.length : 0} memories stored)`); if (memoryContext) { lines.push('', memoryContext); } lines.push('', 'Identify ONLY as zCode CLI X.'); return lines.join('\n'); } // ─────────────────────────────────────────── // SELF-LEARNING & CURIOSITY ENGINE // ─────────────────────────────────────────── /** * Analyze every interaction for self-learnable patterns. * Runs AFTER the response is delivered — zero latency impact. * * Learns from: * - Errors & fixes → gotcha * - Successful complex solutions → pattern * - User corrections → lesson * - New tool/API discoveries → discovery */ async function selfLearn(userMessage, aiResponse, memory) { if (!aiResponse || aiResponse.length < 50) return; const msg = userMessage.toLowerCase(); const resp = aiResponse.toLowerCase(); // 1. Detect error patterns → gotcha if (resp.includes('error') && resp.includes('fix')) { const errorMatch = aiResponse.match(/error[:\s]+([^\n]{10,120})/i); const fixMatch = aiResponse.match(/fix[:\s]+([^\n]{10,120})/i); if (errorMatch && fixMatch) { await memory.remember('gotcha', `${errorMatch[1].trim()} → ${fixMatch[1].trim()}`, { trigger: userMessage.substring(0, 100), }); logger.info('🧠 Self-learned gotcha from interaction'); return; } } // 2. Detect user corrections → lesson if (msg.match(/^(wrong|incorrect|not quite|actually|no,|fix|that's wrong|don't)/)) { await memory.remember('lesson', `Correction on "${userMessage.substring(0, 80)}": ${aiResponse.substring(0, 150)}`, { trigger: userMessage.substring(0, 100), }); logger.info('🧠 Self-learned lesson from correction'); return; } // 3. Detect successful complex solution → pattern if (resp.includes('✅') && aiResponse.length > 300) { // Check if this was a non-trivial interaction if (msg.length > 20 && !msg.match(/^(hi|hello|hey|thanks|ok)/)) { const existing = memory.recall({ category: 'pattern', query: msg.substring(0, 30), limit: 1 }); if (existing.length === 0) { await memory.remember('pattern', `Solution for "${userMessage.substring(0, 60)}": produced ${aiResponse.length} chars with tool usage`, { trigger: userMessage.substring(0, 100), }); logger.info('🧠 Self-learned pattern from successful interaction'); } } return; } // 4. Curiosity: detect new discoveries // - First-time tool usage if (resp.includes('bash') && resp.includes('success') && !memory.recall({ query: 'bash tool', limit: 1 }).length) { await memory.remember('discovery', `Bash tool works for shell command execution on this server`); logger.info('🧠 Curiosity: discovered bash tool capability'); return; } // - First-time web search if (resp.includes('search') && resp.includes('result') && !memory.recall({ query: 'web search', limit: 1 }).length) { await memory.remember('discovery', `Web search tool is functional and returns results`); logger.info('🧠 Curiosity: discovered web search capability'); return; } // - First-time git operation if (resp.includes('git') && resp.includes('commit') && !memory.recall({ query: 'git operation', limit: 1 }).length) { await memory.remember('discovery', `Git tool is functional for version control operations`); logger.info('🧠 Curiosity: discovered git capability'); } } // ─────────────────────────────────────────── // MAIN — called from zcode.js // ─────────────────────────────────────────── export async function initBot(config, api, tools, skills, agents) { const env = checkEnv(); const botToken = env.TELEGRAM_BOT_TOKEN; if (!botToken) { logger.warn('⚠ Telegram bot token not configured'); return null; } logger.info('🤖 Initializing zCode bot (grammy + claudegram patterns)…'); const rtk = getRTK(); await rtk.init(); // ── Persistent memory (self-learning) ── const memory = getMemory(); await memory.init(); // ── Service registry ── const svc = { config, api, tools: tools || [], skills: skills || [], agents: agents || [], rtk, memory, toolMap: new Map((tools || []).map(t => [t.name, t])), }; // ── AI chat with function calling ── async function chatWithAI(messages, opts = {}) { const model = opts.model || svc.config?.api?.models?.default || 'glm-5.1'; const tools = []; const toolMap = svc.toolMap; if (toolMap.has('bash')) { tools.push({ type: 'function', function: { name: 'bash', description: 'Execute a shell command', parameters: { type: 'object', properties: { command: { type: 'string', description: 'Shell command' }, timeout: { type: 'number', description: 'Timeout ms (default 300000)' }, }, required: ['command'], }, }, }); } if (toolMap.has('web_search')) { tools.push({ type: 'function', function: { name: 'web_search', description: 'Search the web', parameters: { type: 'object', properties: { query: { type: 'string', description: 'Search query' }, num_results: { type: 'number', description: 'Results count (default 5)' }, }, required: ['query'], }, }, }); } if (toolMap.has('git')) { tools.push({ type: 'function', function: { name: 'git', description: 'Git operations: status, log, diff, commit, push, pull', parameters: { type: 'object', properties: { action: { type: 'string', enum: ['status', 'log', 'diff', 'commit', 'push', 'pull'] }, params: { type: 'array', items: { type: 'string' } }, }, required: ['action'], }, }, }); } if (svc.agents.length) { tools.push({ type: 'function', function: { name: 'delegate_agent', description: 'Delegate to a specialized agent role', parameters: { type: 'object', properties: { agent_id: { type: 'string', enum: svc.agents.map(a => a.id) }, task: { type: 'string', description: 'Task description' }, }, required: ['agent_id', 'task'], }, }, }); } if (svc.skills.length) { tools.push({ type: 'function', function: { name: 'run_skill', description: 'Run a skill by name', parameters: { type: 'object', properties: { skill: { type: 'string', enum: svc.skills.map(s => s.name) }, input: { type: 'string' }, }, required: ['skill'], }, }, }); } try { const body = { model, messages, temperature: opts.temperature ?? 0.7, max_tokens: opts.maxTokens || 4096, stream: !!opts.onDelta, // Enable SSE when delta callback is provided }; if (tools.length) body.tools = tools; // ── Streaming path (SSE) ── if (opts.onDelta) { return await chatWithAIStream(svc, body, tools, toolHandlers, opts.onDelta); } // ── Non-streaming path (original) ── const response = await api.client.post('/chat/completions', body); const choice = response.data.choices?.[0]; if (!choice) return '❌ No response from model.'; const msg = choice.message; if (msg.tool_calls?.length) { const parts = []; for (const tc of msg.tool_calls) { const fn = tc.function; try { const handler = toolHandlers[fn.name]; if (!handler) { parts.push(`❌ Unknown tool: ${fn.name}`); continue; } const args = JSON.parse(fn.arguments); const result = await handler(args); parts.push(`${result}`); } catch (e) { parts.push(`❌ Tool ${fn.name} error: ${e.message}`); } } return parts.join('\n\n'); } return msg.content || '✅ Done.'; } catch (error) { logger.error('AI error:', error.response?.data || error.message); return `❌ ${error.response?.data?.error?.message || error.message}`; } } /** * Streaming chat completion via SSE. * Pipes each token chunk to onDelta() callback in real-time. * Falls back to non-streaming if SSE fails. */ async function chatWithAIStream(svc, body, tools, toolHandlers, onDelta) { const baseUrl = svc.api?.config?.baseUrl || 'https://api.z.ai/api/coding/paas/v4'; const apiKey = svc.api?.config?.apiKey || ''; let fullResponse = ''; try { const response = await fetch(`${baseUrl}/chat/completions`, { method: 'POST', headers: { 'Authorization': `Bearer ${apiKey}`, 'Content-Type': 'application/json', }, body: JSON.stringify(body), }); if (!response.ok) { const errText = await response.text(); logger.error(`SSE error ${response.status}: ${errText}`); // Fallback to non-streaming return await chatWithAI(body.messages, { ...body, stream: false }); } const reader = response.body.getReader(); const decoder = new TextDecoder(); let buffer = ''; while (true) { const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); const lines = buffer.split('\n'); buffer = lines.pop() || ''; // Keep incomplete line in buffer for (const line of lines) { const trimmed = line.trim(); if (!trimmed || !trimmed.startsWith('data: ')) continue; const data = trimmed.slice(6); if (data === '[DONE]') continue; try { const parsed = JSON.parse(data); const choices = parsed.choices || []; if (choices.length > 0) { const delta = choices[0].delta || {}; const content = delta.content || ''; if (content) { fullResponse += content; onDelta(content); } // Check for tool calls in streaming if (delta.tool_calls) { // Tool calls in streaming mode — accumulate and handle after stream // For now, fall through to non-streaming tool handling } } } catch { // Ignore malformed JSON lines } } } } catch (e) { logger.error('SSE stream error:', e.message); // Fallback to non-streaming if (!fullResponse) { return await chatWithAI(body.messages, { maxTokens: body.max_tokens }); } } return fullResponse || '✅ Done.'; } const toolHandlers = { bash: async (args) => { const tool = svc.toolMap.get('bash'); if (!tool) return '❌ Bash tool unavailable.'; try { const r = await tool.execute(args.command, { timeout: args.timeout || 300000 }); const out = (r.stdout || '').slice(0, 3000); const err = (r.stderr || '').slice(0, 3000); if (r.success) return `✅ \`\`\`\n${out || '(no output)'}\n\`\`\``; return `❌ Exit ${r.code}\n\`\`\`\n${err || out}\n\`\`\``; } catch (e) { return `❌ Bash error: ${e.message}`; } }, web_search: async (args) => { const tool = svc.toolMap.get('web_search'); if (!tool) return '❌ Web search unavailable.'; try { const r = await tool.search(args.query, { numResults: args.num_results || 5 }); if (r?.success && r.results?.length) { return `🔍 *${args.query}*\n\n${r.results.slice(0, 5).map((x, i) => `${i + 1}. [${x.title}](${x.url})\n${x.snippet || ''}`).join('\n\n')}`; } return `🔍 *${args.query}*\n\nNo results.`; } catch (e) { return `❌ Search error: ${e.message}`; } }, git: async (args) => { const tool = svc.toolMap.get('git'); if (!tool) return '❌ Git tool unavailable.'; try { const method = tool[args.action]; if (!method) return `❌ Unknown: ${args.action}`; const r = await method.call(tool, ...(args.params || [])); return r.success ? `✅ ${r.status || JSON.stringify(r)}` : `❌ ${r.stderr || r.error}`; } catch (e) { return `❌ Git error: ${e.message}`; } }, delegate_agent: async (args) => { const agent = svc.agents.find(a => a.id === args.agent_id); if (!agent) return `❌ Agent not found: ${args.agent_id}`; return `✅ Delegated to **${agent.name}**: "${args.task}"`; }, run_skill: async (args) => { const skill = svc.skills.find(s => s.name === args.skill); if (!skill) return `❌ Skill not found: ${args.skill}`; return `✅ Skill **${skill.name}** queued: ${args.input || '(none)'}`; }, }; // ── Create grammy bot ── const bot = new Bot(botToken); bot.api.config.use(autoRetry({ maxRetryAttempts: 5, maxDelaySeconds: 60 })); // Initialize the bot to get bot info try { await bot.init(); logger.info('✓ Bot initialized'); } catch (e) { logger.warn('⚠ Bot init failed (webhook will still work):', e.message); } // Register bot command menu const cmdList = [ { command: 'start', description: '🚀 Show help' }, { command: 'tools', description: '🔧 List available tools' }, { command: 'skills', description: '📚 List loaded skills' }, { command: 'agents', description: '🤖 List agent roles' }, { command: 'model', description: '🤖 Switch AI model' }, { command: 'stats', description: '📊 System & RTK stats' }, { command: 'voice', description: '🎤 Voice I/O info' }, { command: 'mcp', description: '🔌 MCP info' }, { command: 'memory', description: '🧠 Persistent memory stats' }, { command: 'remember', description: '📝 Save to memory' }, { command: 'recall', description: '🔍 Search memory' }, { command: 'cron', description: '⏰ Scheduled tasks' }, { command: 'bash', description: '💻 Execute shell command' }, { command: 'web', description: '🔍 Search the web' }, { command: 'git', description: '🔀 Git operations' }, ]; bot.api.setMyCommands(cmdList).catch(e => logger.warn('⚠ Failed to register commands:', e.message)); // ── Auth middleware (claudegram pattern) ── const allowedUsers = env.TELEGRAM_ALLOWED_USERS ? env.TELEGRAM_ALLOWED_USERS.split(',').map(s => s.trim()) : null; bot.use(async (ctx, next) => { if (!allowedUsers) return next(); // allow all const userId = String(ctx.from?.id || ''); if (!allowedUsers.includes(userId)) { await ctx.reply('⛔ Unauthorized.'); return; } return next(); }); // ── Sequentialize per-chat (claudegram pattern) ── bot.use(sequentialize((ctx) => { const chatId = ctx.chat?.id; if (!chatId) return undefined; const msg = ctx.message ?? ctx.callbackQuery?.message; const threadId = msg?.is_topic_message ? msg.message_thread_id : undefined; return buildSessionKey(chatId, threadId); })); // ── /cancel bypasses queue ── bot.command('cancel', async (ctx) => { const key = buildSessionKey(ctx.chat.id, ctx.message?.message_thread_id); clearQueue(key); await ctx.reply('⏹️ Cancelled and queue cleared.'); }); // ── COMMAND HANDLERS ── bot.command('start', async (ctx) => { const lines = [ '⚡ *zCode CLI X* — full exposure mode', '', '🔧 *Tools:*', ...svc.tools.map(t => ` /${t.name} — ${t.description}`), '', '📚 *Skills:* ' + svc.skills.length + ' loaded', '🤖 *Agents:* ' + svc.agents.length + ' available', '', '🔄 *Self-Correction*: 2 retries + auto-simplification', '⚡ *Streaming*: Real-time text delivery', '', '📋 *Commands:* /tools /skills /agents /model /stats /voice /mcp /memory /cron /cancel /selfcorrection', '', 'Or just chat — I will use tools when needed.', `Model: \`${svc.config?.api?.models?.default || 'glm-5.1'}\``, ]; await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('tools', async (ctx) => { const lines = ['🔧 *Tools:*\n']; for (const t of svc.tools) lines.push(`• \`${t.name}\` — ${t.description}`); await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('skills', async (ctx) => { if (!svc.skills.length) return ctx.reply('📚 No skills loaded.'); const groups = {}; for (const s of svc.skills) { (groups[s.category] ||= []).push(s); } const lines = ['📚 *Skills:*\n']; for (const [cat, items] of Object.entries(groups)) { lines.push(`*${cat}:*`); for (const s of items) lines.push(` • \`${s.name}\` — ${s.description}`); lines.push(''); } await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('agents', async (ctx) => { const lines = ['🤖 *Agent Roles:*\n']; for (const a of svc.agents) { lines.push(`• *${a.name}* (\`${a.id}\`)`); lines.push(` ${a.description}`); } await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('selfcorrection', async (ctx) => { await sendStreamingMessage(ctx, `🔄 *Self-Correction Loops* — FULLY ENABLED\n\nzCode CLI X now uses automatic self-correction:\n\n• **Max Retries**: 2 attempts\n• **Retry Delay**: 500ms → 1s → 1.5s (exponential backoff)\n• **Triggers**: \n - ❌ Error responses\n - Rate limits\n - Timeouts\n - 5xx server errors\n\n• **Auto-Simplification**: On retry, prompts are simplified to avoid recurring errors\n\n• **Logging**: All retries are logged with retry count and reason\n\nThis ensures robust responses even when the AI initially fails.`); }); bot.command('model', async (ctx) => { const text = ctx.match?.trim(); if (!text) { return ctx.reply(`Current: \`${svc.config?.api?.models?.default || 'glm-5.1'}\`\nUsage: /model `); } svc.config.api.models = svc.config.api.models || {}; svc.config.api.models.default = text; await ctx.reply(`✅ Switched to \`${text}\``, { parse_mode: 'Markdown' }); }); bot.command('stats', async (ctx) => { const rtkStats = svc.rtk.enabled ? svc.rtk.getTrackingStats() : null; const lines = [ '📊 *Stats:*\n', `Tools: ${svc.tools.length}`, `Skills: ${svc.skills.length}`, `Agents: ${svc.agents.length}`, `Model: \`${svc.config?.api?.models?.default || 'glm-5.1'}\``, `RTK: ${svc.rtk.enabled ? '✅' : '❌'}`, ]; if (rtkStats) { lines.push(`Optimized: ${rtkStats.totalOptimized || 0}`); lines.push(`Saved: ${rtkStats.totalTokensSaved || 0} tok`); } await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('voice', async (ctx) => { await sendStreamingMessage(ctx, `🎤 *Voice I/O*\n\nVoice recording is available via the TS service layer.\nSend me a voice message and I will transcribe it.`); }); bot.command('mcp', async (ctx) => { await ctx.reply('🔌 *MCP:* Available via TS services at `src/services/mcp/`. Connect MCP servers for extended capabilities.'); }); bot.command('memory', async (ctx) => { const stats = memory.getStats(); const recent = memory.recall({ limit: 5 }); const lines = [ `🧠 *Persistent Memory* — ${stats.total} memories stored`, '', `📅 From ${stats.oldest || 'today'} to ${stats.newest || 'now'}`, '', '*Categories:*', ]; for (const [cat, count] of Object.entries(stats.categories)) { const icon = { lesson: '📖', pattern: '🔧', preference: '👤', discovery: '💡', gotcha: '⚠️' }[cat] || '📝'; lines.push(` ${icon} ${cat}: ${count}`); } if (recent.length > 0) { lines.push('', '*Recent:*'); for (const m of recent.slice(0, 5)) { lines.push(` • [${m.category}] ${m.content.substring(0, 60)}`); } } lines.push('', '_Use /remember to save, /recall to search, /forget to delete_'); await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('remember', async (ctx) => { const text = ctx.match?.trim(); if (!text) return ctx.reply('Usage: /remember \n\nCategories: lesson, pattern, preference, discovery, gotcha\n\nExamples:\n/remember User prefers TypeScript over JavaScript\n/remember gotcha: Z.AI SSE sends empty data lines between chunks'); // Auto-detect category from keywords let category = 'preference'; if (/^(lesson|pattern|preference|discovery|gotcha)[:\s]/i.test(text)) { const match = text.match(/^(lesson|pattern|preference|discovery|gotcha)[:\s]\s*(.*)/i); if (match) { category = match[1].toLowerCase(); await memory.remember(category, match[2]); } } else { await memory.remember(category, text); } await ctx.reply(`🧠 Saved to memory [${category}]. I will remember this across sessions.`); }); bot.command('recall', async (ctx) => { const query = ctx.match?.trim(); if (!query) return ctx.reply('Usage: /recall \n\nExample: /recall ZAI API timeout'); const results = memory.recall({ query, limit: 10 }); if (results.length === 0) return ctx.reply('🔍 No memories match that query.'); const lines = [`🔍 *Recall* (${results.length} results):`, '']; for (const m of results) { const date = new Date(m.created).toLocaleDateString(); lines.push(`[${m.category}] (${date}) ${m.content.substring(0, 80)}`); if (m.meta?.resolution) lines.push(` → ${m.meta.resolution.substring(0, 60)}`); } await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('forget', async (ctx) => { const id = ctx.match?.trim(); if (!id) return ctx.reply('Usage: /forget \n\nFind IDs with /recall or /memory'); const ok = await memory.forget(id); await ctx.reply(ok ? '🗑 Memory deleted.' : '❌ Memory not found.'); }); bot.command('cron', async (ctx) => { await ctx.reply('⏰ *Cron:* CronScheduler at `src/utils/cronScheduler.ts` — 1s interval, task locking, auto-recovery.'); }); // ── Dynamic tool commands ── for (const tool of svc.tools) { const tname = tool.name; if (bot.command(tname)) continue; // skip if registered bot.command(tname, async (ctx) => { const args = ctx.match?.trim(); const handler = toolHandlers[tname]; if (!handler) return ctx.reply(`❌ No handler for ${tname}`); if (!args) return ctx.reply(`Usage: /${tname} \n${tool.description}`); const result = await handler(tname === 'web_search' ? { query: args, num_results: 5 } : tname === 'bash' ? { command: args, timeout: 300000 } : tname === 'git' ? (() => { const p = args.split(/\s+/); return { action: p[0], params: p.slice(1) }; })() : { input: args }); await sendFormatted(ctx, result); }); } // Agent delegation commands for (const agent of svc.agents) { const cmdName = `agent_${agent.id}`; bot.command(cmdName, async (ctx) => { const task = ctx.match?.trim(); if (!task) return ctx.reply(`🤖 *${agent.name}*\n${agent.description}\n\nUsage: /${cmdName} `); const result = await chatWithAI([ { role: 'system', content: `You are the **${agent.name}** (${agent.id}). Capabilities: ${agent.capabilities.join(', ')}. Respond as this specialist within zCode CLI X.` }, { role: 'user', content: task }, ]); await sendFormatted(ctx, result); }); } // ── Message text handler (with dedup + queue + self-correction) ── bot.on('message:text', async (ctx) => { if (isDuplicate(ctx.message.message_id)) return; markProcessed(ctx.message.message_id); const key = buildSessionKey(ctx.chat.id, ctx.message?.message_thread_id); const text = ctx.message.text; const user = ctx.from?.username || ctx.from?.first_name || 'Unknown'; logger.info(`💬 ${user}: ${text.substring(0, 80)}…`); await queueRequest(key, text, async () => { await ctx.api.sendChatAction(ctx.chat.id, 'typing'); // Create stream consumer for real-time edit-in-place const consumer = new StreamConsumer(ctx, { editInterval: 1000 }); const runPromise = consumer.run(); // Wrap chatWithAI with self-correction + streaming const chatWithCorrection = withSelfCorrection(async (msgs) => { return await chatWithAI(msgs, { onDelta: (token) => consumer.onDelta(token) }); }); const result = await chatWithCorrection([ { role: 'system', content: buildSystemPrompt(svc) }, { role: 'user', content: text }, ]); // Signal completion and wait for final edit consumer.finish(); await runPromise; // If streaming failed to deliver (no message sent), fallback to plain send if (!consumer.alreadySent && result) { logger.info(`📤 Streaming failed, fallback send (${result.length} chars)`); await sendFormatted(ctx, result); } else { logger.info('✅ Stream delivered'); } // ── Self-learning: extract patterns from this interaction ── await selfLearn(text, result, memory); }); }); // ── Voice handler ── bot.on('message:voice', async (ctx) => { const fileId = ctx.message.voice.file_id; const user = ctx.from?.username || ctx.from?.first_name || 'Unknown'; logger.info(`🎤 Voice from ${user}`); await ctx.reply('🎤 Voice received! (STT via Whisper TBD)'); const file = await ctx.api.getFile(fileId); const url = `https://api.telegram.org/file/bot${botToken}/${file.file_path}`; logger.info(`Voice file: ${url}`); }); // ── Photo handler ── bot.on('message:photo', async (ctx) => { logger.info(`📸 Photo from ${ctx.from?.username}`); // TODO: vision analysis await ctx.reply('📸 Image received. Vision analysis TBD.'); }); // ── Error handler ── bot.catch((err) => { logger.error('Bot error:', err.message || err); }); // ── Global unhandled rejection guard ── process.on('unhandledRejection', (reason, promise) => { logger.error('Unhandled rejection:', reason?.message || reason); }); // ── Express + WebSocket server (keep for webhook compatibility) ── const app = express(); app.use(express.json()); const httpServer = createServer(app); const wss = new WebSocketServer({ server: httpServer }); const wsClients = new Map(); wss.on('connection', (ws, req) => { const chatId = req.url?.startsWith('/?chatId=') ? req.url.slice(8) : 'ws'; wsClients.set(chatId, ws); ws.on('close', () => wsClients.delete(chatId)); logger.info(`🔌 WS: ${chatId}`); }); // Webhook handler — feeds into grammy app.get('/telegram/webhook', (req, res) => { const { 'hub.mode': mode, 'hub.challenge': challenge } = req.query; if (mode === 'subscribe' && challenge) return res.status(200).send(challenge); res.status(200).json({ ok: true, message: 'zCode webhook active' }); }); app.post('/telegram/webhook', async (req, res) => { res.json({ ok: true }); // ack immediately try { logger.info('📥 Webhook received:', { update_id: req.body?.update_id, has_message: !!req.body?.message, type: req.body?.message?.text ? 'text' : 'other' }); await bot.handleUpdate(req.body); } catch (e) { logger.error('Webhook update error:', { message: e.message, name: e.name, stack: e.stack, body: req.body?.update_id, full: e.toString(), body_preview: req.body ? JSON.stringify(req.body).substring(0, 200) : 'no body' }); } }); // Health check app.get('/health', (req, res) => { res.json({ ok: true, uptime: process.uptime(), tools: svc.tools.length, skills: svc.skills.length, agents: svc.agents.length, wsClients: wsClients.size, }); }); const PORT = process.env.ZCODE_PORT || 3000; httpServer.listen(PORT, () => { logger.info(`✓ HTTP on :${PORT} · WS ready · grammy bot online`); logger.info(`✓ ${svc.tools.length} tools · ${svc.skills.length} skills · ${svc.agents.length} agents`); }); // Set webhook const wu = process.env.ZCODE_WEBHOOK_URL; if (wu) { try { await bot.api.setWebhook(wu, { allowed_updates: ['message', 'callback_query'] }); logger.info('✓ Webhook set'); } catch (e) { logger.warn('⚠ Webhook failed:', e.message); } } return { send: (chatId, text) => bot.api.sendMessage(chatId, markdownToHtml(text), { parse_mode: 'HTML' }), ws: (chatId, msg) => wsClients.get(chatId)?.send(JSON.stringify(msg)), waitForMessages: async () => { await new Promise(() => {}); }, getConnections: () => wsClients.size, }; }