import { Bot } from 'grammy'; import { autoRetry } from '@grammyjs/auto-retry'; // import { sequentialize } from '@grammyjs/runner'; // Temporarily disabled for systemd compatibility import express from 'express'; import { createServer } from 'http'; import { WebSocketServer } from 'ws'; import fs from 'fs'; import path from 'path'; import { execSync } from 'child_process'; import { logger } from '../utils/logger.js'; import { checkEnv } from '../utils/env.js'; import { getRTK } from '../utils/rtk.js'; import { isDuplicate, markProcessed } from './deduplication.js'; import { queueRequest, clearQueue, isProcessing } from './request-queue.js'; import { sendFormatted, splitMessage, escapeMarkdown, sendStreamingMessage, StreamConsumer, markdownToHtml } from './message-sender.js'; import { withSelfCorrection } from './self-correction.js'; import { getMemory, getConversation } from './memory.js'; import { createSessionState } from './session-state.js'; import { detectIntent } from './intent-detector.js'; import { streamChatWithRetry } from './stream-handler.js'; // ── Ruflo-inspired systems: plugins, hooks, swarm, enhanced memory ── import { PluginManager } from '../plugins/PluginManager.js'; import { PluginLoader } from '../plugins/PluginLoader.js'; import { BasePlugin } from '../plugins/Plugin.js'; import { EXTENSION_POINTS } from '../plugins/ExtensionPoints.js'; import { hookManager, HOOK_TYPES } from './hooks.js'; import { initAgents, AgentOrchestrator } from '../agents/index.js'; import { Agent } from '../agents/Agent.js'; import { Task } from '../agents/Task.js'; import { SwarmCoordinator } from '../agents/SwarmCoordinator.js'; import { JSONBackend, InMemoryBackend, MEMORY_TYPES } from './memory-backend.js'; import { PortManager } from './port-manager.js'; // ── PortManager handles pidfile + port lifecycle (see port-manager.js) ── function buildSessionKey(chatId, threadId) { return threadId ? `${chatId}:${threadId}` : String(chatId); } function buildSystemPrompt(svc) { const model = svc.config?.api?.models?.default || 'glm-5.1'; const memory = svc.memory; const memoryContext = memory ? memory.buildContextSummary() : ''; const cwd = process.cwd(); const toolList = svc.tools.map(t => `${t.name}`).join(', '); const agentList = svc.agents.map(a => `${a.id} (${a.name})`).join(', '); const skillList = svc.skills.map(s => `${s.name} (${s.category})`).join(', '); const lines = [ `You are zCode CLI X — an elite agentic coding assistant powered by Z.AI (${model}).`, '', '## CORE RULES (FOLLOW STRICTLY)', '', '1. **Read your context first.** Your tools, agents, skills, and project info are listed below.', ' NEVER use tools to re-discover information already in this prompt. This wastes turns and time.', '2. **Use the RIGHT tool.** AVOID using bash with these commands (OpenCode rule):', ' - File search: Use `glob` (NOT find or ls)', ' - Content search: Use `grep` (NOT grep/rg)', ' - Read files: Use `file_read` (NOT cat/head/tail)', ' - Edit files: Use `file_edit` (NOT sed/awk)', ' - Write files: Use `file_write` (NOT echo/cat heredoc)', ' - Fetch URLs: Use `browser` or `web_fetch` (NOT curl/wget)', ' Use bash ONLY for: tests, installs, git, systemctl, and commands no tool covers.', ' Violating this rule wastes turns and bypasses caching.', '3. **BATCH independent calls.** ALWAYS emit multiple independent tool calls in a single turn.', ' Reading 3 files = 3 parallel calls in 1 turn, NOT 3 sequential turns.', ' Searching 2 patterns + reading 1 file = 3 calls in 1 turn. This is the #1 speed optimization.', ' NEVER serialize independent operations — the runtime executes them concurrently via Promise.all.', '4. **No ghost chasing.** If a command fails (wrong path, file not found), do NOT retry the', ' same command. Use `glob` or `ls` to find the correct path, then proceed.', '5. **Plan before acting.** For tasks requiring 3+ tool calls, think through the optimal sequence', ' first. Minimize total turns. Each turn costs an API round-trip.', '', '## IDENTITY', 'You run 24/7 as a Telegram bot. You are NOT a generic GLM model — you are zCode CLI X,', 'a specialized autonomous coding agent. Identify ONLY as zCode CLI X.', '', '## CONTEXT AWARENESS (CRITICAL)', 'When the user\'s message starts with [Replying to previous message:], the quoted text is', 'the message they are responding to. You MUST use that context to understand what they want.', 'Example: if they reply to a page design with "make hero more exciting", they mean THAT page\'s hero.', 'Never ask "which page?" or "what are you referring to?" — the reply context tells you.', '', '## SELF-LEARNING', 'You have PERSISTENT MEMORY across sessions. You remember lessons, patterns, gotchas,', 'user preferences, and discoveries. When you learn something useful, save it to memory.', 'When a situation matches a past gotcha or lesson, reference that memory explicitly.', '', '## YOUR CAPABILITIES (already loaded — do NOT re-discover via tools)', `**Tools (${svc.tools.length}):** ${toolList}`, `**Agent Roles (${svc.agents.length}):** ${agentList}`, `**Skills (${svc.skills.length}):** ${skillList}`, '', '## INFRASTRUCTURE', `- Working directory: ${cwd}`, `- Source repo: https://github.rommark.dev/admin/zCode-CLI-X.git (Gitea)`, `- Running on VPS as systemd service: zcode.service`, `- Deploy: git push → pull on VPS → npm install → systemctl restart zcode`, `- RTK (Rust Token Killer) active`, `- Telegram webhook + WebSocket`, `- Persistent memory (${memory ? memory.memories.length : 0} memories stored)`, ]; if (memoryContext) { lines.push('', '## RELEVANT MEMORIES', memoryContext); } return lines.join('\n'); } // ─────────────────────────────────────────── // SELF-LEARNING & CURIOSITY ENGINE // ─────────────────────────────────────────── /** * Analyze every interaction for self-learnable patterns. * Runs AFTER the response is delivered — zero latency impact. * * Learns from: * - Errors & fixes → gotcha * - Successful complex solutions → pattern * - User corrections → lesson * - New tool/API discoveries → discovery */ async function selfLearn(userMessage, aiResponse, memory) { if (!aiResponse || aiResponse.length < 50) return; const msg = userMessage.toLowerCase(); const resp = aiResponse.toLowerCase(); // 1. Detect error patterns → gotcha if (resp.includes('error') && resp.includes('fix')) { const errorMatch = aiResponse.match(/error[:\s]+([^\n]{10,120})/i); const fixMatch = aiResponse.match(/fix[:\s]+([^\n]{10,120})/i); if (errorMatch && fixMatch) { await memory.remember('gotcha', `${errorMatch[1].trim()} → ${fixMatch[1].trim()}`, { trigger: userMessage.substring(0, 100), }); logger.info('🧠 Self-learned gotcha from interaction'); return; } } // 2. Detect user corrections → lesson if (msg.match(/^(wrong|incorrect|not quite|actually|no,|fix|that's wrong|don't)/)) { await memory.remember('lesson', `Correction on "${userMessage.substring(0, 80)}": ${aiResponse.substring(0, 150)}`, { trigger: userMessage.substring(0, 100), }); logger.info('🧠 Self-learned lesson from correction'); return; } // 3. Detect successful complex solution → pattern if (resp.includes('✅') && aiResponse.length > 300) { // Check if this was a non-trivial interaction if (msg.length > 20 && !msg.match(/^(hi|hello|hey|thanks|ok)/)) { const existing = memory.recall({ category: 'pattern', query: msg.substring(0, 30), limit: 1 }); if (existing.length === 0) { await memory.remember('pattern', `Solution for "${userMessage.substring(0, 60)}": produced ${aiResponse.length} chars with tool usage`, { trigger: userMessage.substring(0, 100), }); logger.info('🧠 Self-learned pattern from successful interaction'); } } return; } // 4. Curiosity: detect new discoveries // - First-time tool usage if (resp.includes('bash') && resp.includes('success') && !memory.recall({ query: 'bash tool', limit: 1 }).length) { await memory.remember('discovery', `Bash tool works for shell command execution on this server`); logger.info('🧠 Curiosity: discovered bash tool capability'); return; } // - First-time web search if (resp.includes('search') && resp.includes('result') && !memory.recall({ query: 'web search', limit: 1 }).length) { await memory.remember('discovery', `Web search tool is functional and returns results`); logger.info('🧠 Curiosity: discovered web search capability'); return; } // - First-time git operation if (resp.includes('git') && resp.includes('commit') && !memory.recall({ query: 'git operation', limit: 1 }).length) { await memory.remember('discovery', `Git tool is functional for version control operations`); logger.info('🧠 Curiosity: discovered git capability'); } } // ─────────────────────────────────────────── // MAIN — called from zcode.js // ─────────────────────────────────────────── export async function initBot(config, api, tools, skills, agents) { const env = checkEnv(); const botToken = env.TELEGRAM_BOT_TOKEN; if (!botToken) { logger.warn('⚠ Telegram bot token not configured'); return null; } logger.info('🤖 Initializing zCode bot (grammy + claudegram patterns)…'); const rtk = getRTK(); await rtk.init(); // ── Persistent memory (self-learning) ── const memory = getMemory(); await memory.init(); // ── Conversation history (cross-session, cross-model) ── const conversation = getConversation(); await conversation.init(); // ── Session state: LRU file read cache + read-once dedup ── const sessionState = createSessionState(); // ── Service registry ── const svc = { config, api, tools: tools || [], skills: skills || [], agents: agents || [], rtk, memory, conversation, sessionState, toolMap: new Map((tools || []).map(t => [t.name, t])), }; // ── Ruflo-inspired Plugin System ── const pluginManager = new PluginManager({ coreVersion: '3.0.0' }); const pluginLoader = new PluginLoader(pluginManager); await pluginManager.initialize(); svc.pluginManager = pluginManager; svc.pluginLoader = pluginLoader; // ── Ruflo-inspired Hook System ── await hookManager.initialize?.(); svc.hooks = hookManager; // ── Ruflo-inspired Swarm Coordinator ── const swarm = new SwarmCoordinator({ topology: 'simple', maxAgents: 10 }); await swarm.initialize(); svc.swarm = swarm; // ── Enhanced Memory Backend (JSON-based with typed entries + search) ── const memBackend = new JSONBackend(path.join(process.cwd(), 'data', 'memory.json'), 500); await memBackend.initialize(); svc.memBackend = memBackend; // ── Ephemeral Agent Context (RAM-only, auto-evict) ── const ephemeralMem = new InMemoryBackend(200, 30 * 60 * 1000); svc.ephemeralMem = ephemeralMem; // ── Agent Orchestrator (replaces simple agent map) ── const agentOrchestrator = new AgentOrchestrator(agents || [], { topology: 'simple', maxAgents: 10 }); svc.agentOrchestrator = agentOrchestrator; // ── Register default plugin hooks ── // Pre-tool hook: log tool execution, check permissions hookManager.register(HOOK_TYPES.PRE_TOOL, 'pre-tool-logger', async (ctx) => { logger.info(`🔧 Hook: pre-tool ${ctx.toolName}`); return true; }, { priority: 10 }); // Post-tool hook: cache results, update metrics hookManager.register(HOOK_TYPES.POST_TOOL, 'post-tool-cache', async (ctx) => { if (ctx.toolName === 'file_read' && ctx.result) { sessionState.cacheRead(ctx.toolName, ctx.result); } return true; }, { priority: 5 }); // Pre-AI hook: check memory context hookManager.register(HOOK_TYPES.PRE_AI, 'pre-ai-memory', async (ctx) => { // Could inject memory context into AI prompt here return true; }, { priority: 5 }); // Post-AI hook: self-learning trigger hookManager.register(HOOK_TYPES.POST_AI, 'post-ai-selflearn', async (ctx) => { if (ctx.response && ctx.userMessage) { selfLearn(ctx.userMessage, ctx.response, memory); } return true; }, { priority: 1 }); // ── Tool definitions for the AI API (OpenAI function-calling format) ── // Defined at startBot scope so delegate handler can access them const TOOL_DEFS = { bash: { description: 'Execute a shell command. LAST RESORT — only use when no specialized tool fits (e.g. running tests, npm install, systemctl). For file reading use file_read, for finding files use glob, for searching code use grep, for editing files use file_edit.', parameters: { type: 'object', properties: { command: { type: 'string', description: 'Shell command to execute' }, timeout: { type: 'number', description: 'Timeout in ms (default 300000)' }, }, required: ['command'] }, }, file_edit: { description: 'Edit files — read, write, append, or find-and-replace', parameters: { type: 'object', properties: { action: { type: 'string', enum: ['read', 'write', 'append', 'edit'], description: 'Operation' }, file_path: { type: 'string', description: 'File path' }, content: { type: 'string', description: 'Content to write/append (for write/append)' }, old_text: { type: 'string', description: 'Text to find (for edit)' }, new_text: { type: 'string', description: 'Replacement text (for edit)' }, }, required: ['action', 'file_path'] }, }, file_read: { description: 'Read file contents with line numbers and pagination', parameters: { type: 'object', properties: { file_path: { type: 'string', description: 'File path to read' }, offset: { type: 'number', description: 'Start line (1-indexed, default 1)' }, limit: { type: 'number', description: 'Max lines (default 500)' }, }, required: ['file_path'] }, }, file_write: { description: 'Write content to a file (overwrites entire file)', parameters: { type: 'object', properties: { file_path: { type: 'string', description: 'File path' }, content: { type: 'string', description: 'Content to write' }, }, required: ['file_path', 'content'] }, }, glob: { description: 'Find files matching a glob pattern', parameters: { type: 'object', properties: { pattern: { type: 'string', description: 'Glob pattern (e.g. "**/*.js")' }, cwd: { type: 'string', description: 'Working directory (default: current)' }, }, required: ['pattern'] }, }, grep: { description: 'Search file contents using regex (ripgrep)', parameters: { type: 'object', properties: { pattern: { type: 'string', description: 'Regex pattern' }, path: { type: 'string', description: 'Directory to search (default: .)' }, file_glob: { type: 'string', description: 'File filter (e.g. "*.py")' }, max_results: { type: 'number', description: 'Max matches (default 20)' }, context: { type: 'number', description: 'Context lines before/after (default 0)' }, }, required: ['pattern'] }, }, web_search: { description: 'Search the web for information', parameters: { type: 'object', properties: { query: { type: 'string', description: 'Search query' }, num_results: { type: 'number', description: 'Results count (default 5)' }, }, required: ['query'] }, }, web_fetch: { description: 'Fetch content from a URL and return text', parameters: { type: 'object', properties: { url: { type: 'string', description: 'URL to fetch' }, max_length: { type: 'number', description: 'Max chars to return (default 15000)' }, }, required: ['url'] }, }, git: { description: 'Git operations: status, log, diff, commit, push, pull', parameters: { type: 'object', properties: { action: { type: 'string', enum: ['status', 'log', 'diff', 'commit', 'push', 'pull'] }, params: { type: 'array', items: { type: 'string' } }, }, required: ['action'] }, }, task_create: { description: 'Create a new task', parameters: { type: 'object', properties: { description: { type: 'string', description: 'Task description' }, }, required: ['description'] }, }, task_update: { description: 'Update task status', parameters: { type: 'object', properties: { task_id: { type: 'string', description: 'Task ID' }, status: { type: 'string', enum: ['pending', 'in_progress', 'completed', 'cancelled'] }, }, required: ['task_id', 'status'] }, }, task_list: { description: 'List all tasks with status', parameters: { type: 'object', properties: { status: { type: 'string', description: 'Filter by status (optional)' }, } }, }, send_message: { description: 'Send a message to Telegram chat or channel', parameters: { type: 'object', properties: { chat_id: { type: 'string', description: 'Target chat ID (optional, uses default)' }, message: { type: 'string', description: 'Message text to send' }, }, required: ['message'] }, }, schedule_cron: { description: 'Manage cron jobs (create/list/remove)', parameters: { type: 'object', properties: { action: { type: 'string', enum: ['create', 'list', 'remove'] }, name: { type: 'string', description: 'Job name' }, schedule: { type: 'string', description: 'Cron schedule (e.g. "0 9 * * *")' }, command: { type: 'string', description: 'Command to run' }, }, required: ['action'] }, }, vision: { description: 'Analyze an image from URL or file path. Returns detailed description and answers questions about the image.', parameters: { type: 'object', properties: { image_url: { type: 'string', description: 'Image URL (http/https) or local file path to analyze' }, question: { type: 'string', description: 'Specific question about the image (optional, defaults to full description)' }, }, required: ['image_url'] }, }, tts: { description: 'Convert text to speech audio. Generates an MP3 file using Edge TTS (free, no API key needed).', parameters: { type: 'object', properties: { text: { type: 'string', description: 'Text to convert to speech (max 5000 chars)' }, voice: { type: 'string', description: 'Voice name (default: en-US-AvaNeural)' }, output_path: { type: 'string', description: 'Output file path (optional)' }, }, required: ['text'] }, }, browser: { description: 'Fetch and extract readable content from a web page URL. Returns title, description, and main text content.', parameters: { type: 'object', properties: { url: { type: 'string', description: 'URL to fetch and extract content from' }, selector: { type: 'string', description: 'CSS selector for content extraction (optional, auto-detects article/main)' }, }, required: ['url'] }, }, delegate: { description: 'Spawn a sub-agent to autonomously complete a complex multi-step task. The sub-agent runs in isolation with its own conversation history and has access to tools. It will use tools, reason through problems, and return a final answer. Use for tasks that require multiple tool calls in sequence.', parameters: { type: 'object', properties: { goal: { type: 'string', description: 'The task for the sub-agent to accomplish' }, context: { type: 'string', description: 'Additional context or background information' }, tools: { type: 'array', items: { type: 'string' }, description: 'Specific tools to enable (optional, defaults to all available tools)' }, role: { type: 'string', description: 'Role description for the sub-agent (optional)' }, }, required: ['goal'] }, }, delegate_agent: { description: 'Delegate to a specialized agent role', parameters: { type: 'object', properties: { agent_id: { type: 'string', enum: svc.agents.map(a => a.id) }, task: { type: 'string', description: 'Task description' }, }, required: ['agent_id', 'task'] }, }, run_skill: { description: 'Run a skill by name', parameters: { type: 'object', properties: { skill: { type: 'string', enum: svc.skills.map(s => s.name) }, input: { type: 'string' }, }, required: ['skill'] }, }, swarm_spawn: { description: 'Spawn a new agent in the swarm for parallel task execution', parameters: { type: 'object', properties: { type: { type: 'string', enum: ['coder', 'reviewer', 'tester', 'architect', 'devops', 'security', 'researcher', 'designer', 'coordinator'], description: 'Agent type' }, name: { type: 'string', description: 'Agent name' }, capabilities: { type: 'array', items: { type: 'string' }, description: 'Agent capabilities' }, }, required: ['type'] }, }, swarm_execute: { description: 'Execute a task with a specific swarm agent', parameters: { type: 'object', properties: { agent_id: { type: 'string', description: 'Agent ID' }, description: { type: 'string', description: 'Task description' }, type: { type: 'string', description: 'Task type' }, priority: { type: 'string', enum: ['high', 'medium', 'low'], description: 'Task priority' }, dependencies: { type: 'array', items: { type: 'string' }, description: 'Task dependencies' }, }, required: ['agent_id', 'description'] }, }, swarm_distribute: { description: 'Distribute multiple tasks across swarm agents', parameters: { type: 'object', properties: { tasks: { type: 'array', items: { type: 'object' }, description: 'Array of {agent_id, description, type, priority, dependencies}' }, }, required: ['tasks'] }, }, swarm_state: { description: 'Get current swarm state and metrics', parameters: { type: 'object', properties: {} }, }, swarm_terminate: { description: 'Terminate a swarm agent', parameters: { type: 'object', properties: { agent_id: { type: 'string', description: 'Agent ID to terminate' }, }, required: ['agent_id'] }, }, }; // ── AI chat with agentic tool loop ── // Enterprise-grade: high turn limit, stuck detection, progress feedback, // context compaction, auto-continue, and robust error recovery. // Inspired by Claude Code, Aider, and OpenCode patterns. const MAX_TOOL_TURNS = 50; const TOOL_RESULT_MAX = 16000; // chars — enough for large outputs const STUCK_THRESHOLD = 3; // same tool+args pattern = stuck const COMPACT_EVERY = 15; // compact context every N turns const CONTEXT_WINDOW = 120000; // estimated char budget async function chatWithAI(messages, opts = {}) { const model = opts.model || svc.config?.api?.models?.default || 'glm-5.1'; const toolMap = svc.toolMap; const onDelta = opts.onDelta || null; // Build tool schemas (only once, reused across turns) const toolSchemas = []; for (const [name, def] of Object.entries(TOOL_DEFS)) { if (name === 'delegate_agent' && !svc.agents.length) continue; if (name === 'run_skill' && !svc.skills.length) continue; if (!toolMap.has(name) && name !== 'delegate_agent' && name !== 'run_skill' && name !== 'delegate') continue; toolSchemas.push({ type: 'function', function: { name, ...def } }); } // Working copy of messages — tool results get appended here const loopMessages = [...messages]; let turns = 0; const callHistory = []; // for stuck detection: [{name, args_sig}] let lastProgressSent = 0; // Progress ticker — sends user-facing status during long tool loops const sendProgress = (msg) => { const now = Date.now(); if (now - lastProgressSent < 8000) return; // max 1 progress msg per 8s lastProgressSent = now; if (onDelta) onDelta(`\n_${msg}_\n`); }; // Stuck detection: track tool call patterns const callSig = (tc) => { const fn = tc.function; const args = fn.arguments || ''; // Hash: tool name + first 80 chars of args (enough to detect repeated patterns) return `${fn.name}:${args.slice(0, 80)}`; }; const isStuck = () => { if (callHistory.length < STUCK_THRESHOLD) return false; const recent = callHistory.slice(-STUCK_THRESHOLD); // Flexible: detect stuck even if arguments vary slightly // Extract tool name from signature (everything before first colon) const toolNames = recent.map(s => s.split(':')[0]); const uniqueToolNames = [...new Set(toolNames)]; // If all calls use the same tool, check if they differ by arguments if (uniqueToolNames.length === 1) { // Same tool, different arguments → still stuck return true; } // Different tools → not stuck return false; }; // Context compaction: trim old tool results to keep context manageable const compactContext = () => { let totalChars = 0; for (const m of loopMessages) { totalChars += JSON.stringify(m).length; } if (totalChars < CONTEXT_WINDOW) return false; logger.info(`📦 Context at ${Math.round(totalChars / 1000)}K chars — compacting old tool results`); let trimmed = 0; for (const m of loopMessages) { if (m.role === 'tool' && typeof m.content === 'string' && m.content.length > 2000) { const original = m.content; m.content = m.content.slice(0, 500) + `\n... [trimmed ${original.length - 500} chars]`; trimmed += original.length - m.content.length; } } logger.info(`📦 Compacted ${Math.round(trimmed / 1000)}K chars`); return trimmed > 0; }; while (turns < MAX_TOOL_TURNS) { // Context compaction every N turns if (turns > 0 && turns % COMPACT_EVERY === 0) compactContext(); const body = { model, messages: loopMessages, temperature: opts.temperature ?? 0.7, max_tokens: opts.maxTokens || 8192, parallel_tool_calls: true, }; if (toolSchemas.length) body.tools = toolSchemas; let response; // { content: string, tool_calls: array|null } if (onDelta) { response = await streamChatWithRetry(svc, body, onDelta); } else { response = await nonStreamChat(body); } if (response.error) { if (turns === 0) return `❌ ${response.error}`; logger.error(`AI error on turn ${turns}: ${response.error}`); // Don't give up — retry once more if (turns < MAX_TOOL_TURNS - 1) { loopMessages.push({ role: 'user', content: `Previous call failed: ${response.error}. Try a different approach.` }); continue; } return response.content || `❌ ${response.error}`; } // No tool calls → final text answer if (!response.tool_calls?.length) { return response.content || '✅ Done.'; } // ── Stuck detection: track ALL tool calls (including failed ones) ── // Failed tool calls don't appear in response.tool_calls, so we track them separately const currentSigs = response.tool_calls.map(callSig); for (const sig of currentSigs) callHistory.push(sig); if (isStuck()) { logger.warn(`⚠ Stuck detected — same tool call pattern ${STUCK_THRESHOLD}x`); loopMessages.push({ role: 'user', content: 'You are repeating the same action and getting the same result. Try a completely different approach.' }); callHistory.length = 0; // reset history after intervention continue; } // ── Execute tool calls ── // IMPORTANT: Increment turns for failed tool calls too (not just successful ones) // This ensures stuck detection works even when tools fail repeatedly turns++; logger.info(`🔧 Tool turn ${turns}/${MAX_TOOL_TURNS} — ${response.tool_calls.length} call(s)`); sendProgress(`⚙️ Step ${turns} — executing ${response.tool_calls.length} tool(s)...`); // Append assistant message with tool_calls to conversation loopMessages.push({ role: 'assistant', tool_calls: response.tool_calls }); // ── Execute tool calls (PARALLEL for independent calls) ── // Inspired by Claude Code, Cursor, and OpenHands: run independent tool calls // concurrently to minimize per-turn latency. // ── Execute tool calls (PARALLEL + Hermes guardrail lifecycle) ── // Inspired by Hermes ToolCallGuardrailController + Cursor parallel execution: // 1. beforeCall() — check if call should be blocked/halted // 2. Execute (or serve from cache if blocked) // 3. afterCall() — track failures/no-progress, append guidance // 4. All independent calls run via Promise.all (parallel) const toolPromises = response.tool_calls.map(async (tc) => { const fn = tc.function; try { const handler = toolHandlers[fn.name]; if (!handler) { return { id: tc.id, result: `❌ Unknown tool: ${fn.name}` }; } let args; try { args = JSON.parse(fn.arguments || '{}'); } catch (parseErr) { const argLen = (fn.arguments || '').length; const hint = fn.name === 'file_write' ? 'Use bash with heredoc for large files.' : 'Retry with shorter arguments.'; logger.error(` → ${fn.name} parse failed: ${parseErr.message} (${argLen} chars)`); // Track failed tool call in stuck detection history callHistory.push(`${fn.name}:${fn.arguments?.slice(0, 80)}`); return { id: tc.id, result: `❌ ${fn.name} args truncated (${argLen} chars). ${hint}` }; } // ── Hermes guardrail: beforeCall ── const beforeDecision = sessionState.guardrail.beforeCall(fn.name, args); if (beforeDecision.action === 'halt' || beforeDecision.action === 'block') { logger.warn(`⚠ Guardrail ${beforeDecision.action}: ${fn.name} — ${beforeDecision.message}`); return { id: tc.id, result: `🛑 ${beforeDecision.message}` }; } // ── File read dedup: serve from cache ── if (fn.name === 'file_read' && args?.file_path && sessionState.wasRead(args.file_path)) { const cached = sessionState.getCachedRead(args.file_path, args.offset || 1, args.limit || 500); if (cached) { logger.info(` → ${fn.name} cache hit: ${args.file_path}`); return { id: tc.id, result: cached }; } } logger.info(` → ${fn.name}(${fn.arguments?.slice(0, 100)})`); const result = String(await handler(args)).slice(0, TOOL_RESULT_MAX); // ── Hermes guardrail: afterCall ── const afterDecision = sessionState.guardrail.afterCall(fn.name, args, result); let finalResult = result; if (afterDecision.action === 'warn' && afterDecision.guidance) { logger.warn(afterDecision.message); finalResult = result + '\n\n' + afterDecision.guidance; } return { id: tc.id, result: finalResult }; } catch (e) { logger.error(` → ${fn.name} failed: ${e.message}`); // Track failed tool call in stuck detection history callHistory.push(`${fn.name}:${JSON.stringify(args || {}).slice(0, 80)}`); // Track failure in guardrail const afterDecision = sessionState.guardrail.afterCall(fn.name, null, `Error: ${e.message}`); let errResult = `❌ ${fn.name} error: ${e.message}`; if (afterDecision.guidance) { errResult += '\n\n' + afterDecision.guidance; } return { id: tc.id, result: errResult }; } }); const toolResults = await Promise.all(toolPromises); for (const { id, result } of toolResults) { loopMessages.push({ role: 'tool', tool_call_id: id, content: result }); } } // Exhausted turns — tell the AI to summarize what was accomplished and what remains logger.warn(`⚠ Max tool turns (${MAX_TOOL_TURNS}) reached, requesting summary`); try { const final = await nonStreamChat({ model, messages: [ ...loopMessages, { role: 'user', content: 'You have reached the maximum number of tool calls. Please provide a clear summary of:\n1. What you accomplished\n2. What still needs to be done\n3. The exact next steps to continue (with specific commands/code)\n\nBe specific so the user can continue where you left off.' }, ], temperature: 0.3, max_tokens: 4096, }); return final.content || '⚠ Max tool turns reached. Some work may be incomplete — ask me to continue.'; } catch (e) { return `⚠ Max tool turns reached. Last error: ${e.message}`; } } // ── Non-streaming API call — returns { content, tool_calls, error } ── async function nonStreamChat(body) { try { const res = await api.client.post('/chat/completions', { ...body, stream: false }); const choice = res.data.choices?.[0]; if (!choice) return { content: '', tool_calls: null, error: 'No response from model' }; const msg = choice.message || {}; return { content: msg.content || '', tool_calls: msg.tool_calls || null, error: null, }; } catch (e) { return { content: '', tool_calls: null, error: e.response?.data?.error?.message || e.message }; } } // ── Tool handlers: route API tool_calls to tool class methods ── const toolHandlers = { bash: async (args) => { const tool = svc.toolMap.get('bash'); if (!tool) return '❌ Bash tool unavailable.'; try { const r = await tool.execute(args.command, { timeout: args.timeout || 300000 }); const out = (r.stdout || '').slice(0, 3000); const err = (r.stderr || '').slice(0, 3000); if (r.success) return `✅ \`\`\`\n${out || '(no output)'}\n\`\`\``; return `❌ Exit ${r.code}\n\`\`\`\n${err || out}\n\`\`\``; } catch (e) { return `❌ Bash error: ${e.message}`; } }, file_edit: async (args) => { const tool = svc.toolMap.get('file_edit'); if (!tool) return '❌ File edit tool unavailable.'; try { const r = await tool[args.action](args.file_path, args.content, args.old_text, args.new_text); return typeof r === 'string' ? r : (r.success ? `✅ ${JSON.stringify(r)}` : `❌ ${r.error}`); } catch (e) { return `❌ File edit error: ${e.message}`; } }, file_read: async (args) => { const tool = svc.toolMap.get('file_read'); if (!tool) return '❌ File read tool unavailable.'; try { return await tool.execute(args); } catch (e) { return `❌ File read error: ${e.message}`; } }, file_write: async (args) => { const tool = svc.toolMap.get('file_write'); if (!tool) return '❌ File write tool unavailable.'; try { return await tool.execute(args); } catch (e) { return `❌ File write error: ${e.message}`; } }, glob: async (args) => { const tool = svc.toolMap.get('glob'); if (!tool) return '❌ Glob tool unavailable.'; try { return await tool.execute(args); } catch (e) { return `❌ Glob error: ${e.message}`; } }, grep: async (args) => { const tool = svc.toolMap.get('grep'); if (!tool) return '❌ Grep tool unavailable.'; try { return await tool.execute(args); } catch (e) { return `❌ Grep error: ${e.message}`; } }, web_search: async (args) => { const tool = svc.toolMap.get('web_search'); if (!tool) return '❌ Web search unavailable.'; try { const r = await tool.search(args.query, { numResults: args.num_results || 5 }); if (r?.success && r.results?.length) { return `🔍 *${args.query}*\n\n${r.results.slice(0, 5).map((x, i) => `${i + 1}. [${x.title}](${x.url})\n${x.snippet || ''}`).join('\n\n')}`; } return `🔍 *${args.query}*\n\nNo results.`; } catch (e) { return `❌ Search error: ${e.message}`; } }, web_fetch: async (args) => { const tool = svc.toolMap.get('web_fetch'); if (!tool) return '❌ Web fetch tool unavailable.'; try { return await tool.execute(args); } catch (e) { return `❌ Fetch error: ${e.message}`; } }, git: async (args) => { const tool = svc.toolMap.get('git'); if (!tool) return '❌ Git tool unavailable.'; try { const method = tool[args.action]; if (!method) return `❌ Unknown git action: ${args.action}`; const r = await method.call(tool, ...(args.params || [])); return r.success ? `✅ ${r.status || JSON.stringify(r)}` : `❌ ${r.stderr || r.error}`; } catch (e) { return `❌ Git error: ${e.message}`; } }, task_create: async (args) => { const tool = svc.toolMap.get('task_create'); if (!tool) return '❌ Task tool unavailable.'; try { return await tool.execute(args); } catch (e) { return `❌ ${e.message}`; } }, task_update: async (args) => { const tool = svc.toolMap.get('task_update'); if (!tool) return '❌ Task tool unavailable.'; try { return await tool.execute(args); } catch (e) { return `❌ ${e.message}`; } }, task_list: async (args) => { const tool = svc.toolMap.get('task_list'); if (!tool) return '❌ Task tool unavailable.'; try { return await tool.execute(args); } catch (e) { return `❌ ${e.message}`; } }, send_message: async (args) => { const tool = svc.toolMap.get('send_message'); if (!tool) return '❌ Send message tool unavailable.'; try { return await tool.execute(args); } catch (e) { return `❌ ${e.message}`; } }, schedule_cron: async (args) => { const tool = svc.toolMap.get('schedule_cron'); if (!tool) return '❌ Cron tool unavailable.'; try { return await tool.execute(args); } catch (e) { return `❌ ${e.message}`; } }, vision: async (args) => { const tool = svc.toolMap.get('vision'); if (!tool) return '❌ Vision tool unavailable.'; try { return await tool.execute(args); } catch (e) { return `❌ ${e.message}`; } }, tts: async (args) => { const tool = svc.toolMap.get('tts'); if (!tool) return '❌ TTS tool unavailable.'; try { const result = await tool.execute(args); // If audio was generated, send it as a voice message if (result.startsWith('✅')) { const filePath = result.match(/saved:\s*(.+)/)?.[1]?.trim(); if (filePath) { try { await svc.bot.api.sendAudio(svc.currentChatId, { source: filePath }, { caption: '🔊 TTS', performer: 'zCode', }); return '✅ Audio sent as voice message.'; } catch (sendErr) { return `${result}\n⚠ Could not auto-send audio: ${sendErr.message}`; } } } return result; } catch (e) { return `❌ ${e.message}`; } }, browser: async (args) => { const tool = svc.toolMap.get('browser'); if (!tool) return '❌ Browser tool unavailable.'; try { return await tool.execute(args); } catch (e) { return `❌ ${e.message}`; } }, delegate: async (args) => { // Dynamically create a DelegateTool with current context try { const { DelegateTool } = await import('../tools/DelegateTool.js'); // Build tool defs from the currently available toolHandlers const subToolDefs = {}; for (const [name, handler] of Object.entries(toolHandlers)) { subToolDefs[name] = TOOL_DEFS[name] || { description: `Tool: ${name}` }; } const subAgent = new DelegateTool({ apiClient: svc.api.client, model: svc.config?.api?.models?.default || 'glm-5.1', toolHandlers, // pass all current tool handlers toolDefs: subToolDefs, // pass tool definitions }); return await subAgent.execute(args); } catch (e) { return `❌ Delegate error: ${e.message}`; } }, delegate_agent: async (args) => { const agent = svc.agents.find(a => a.id === args.agent_id); if (!agent) return `❌ Agent not found: ${args.agent_id}`; // Actually execute the task with an agent-specific system prompt const agentPrompts = { coder: 'You are an expert code reviewer. Analyze the code for bugs, security issues, performance problems, and style improvements. Be specific and actionable.', architect: 'You are a system architect. Analyze the request from an architecture perspective — patterns, scalability, maintainability, trade-offs. Provide clear recommendations.', devops: 'You are a DevOps engineer. Focus on deployment, CI/CD, infrastructure, monitoring, and reliability. Provide practical, copy-paste-ready solutions.', }; try { const systemMsg = agentPrompts[agent.id] || `You are ${agent.name}. ${agent.description}`; const agentMessages = [ { role: 'system', content: systemMsg }, { role: 'user', content: args.task }, ]; const result = await chatWithAI(agentMessages, { maxTokens: 4096 }); return `🤖 **${agent.name}**:\n${result}`; } catch (e) { return `❌ Agent ${agent.name} error: ${e.message}`; } }, run_skill: async (args) => { const skill = svc.skills.find(s => s.name === args.skill); if (!skill) return `❌ Skill not found: ${args.skill}`; // Execute skill with a specialized system prompt const skillPrompts = { code_review: 'You are a code review expert. Analyze the provided code for: bugs, security vulnerabilities, performance issues, style problems, and improvements. Structure your review with severity levels (critical/warning/info).', bug_fix: 'You are a debugging expert. Analyze the bug report and code, identify root cause, and provide a specific fix with explanation. Include edge cases.', refactor: 'You are a refactoring specialist. Improve code quality while preserving behavior. Focus on: readability, DRY, SOLID principles, naming, and structure. Show before/after.', documentation: 'You are a technical writer. Generate clear, comprehensive documentation from the provided code. Include: purpose, parameters, return values, examples, and edge cases.', testing: 'You are a testing expert. Generate thorough tests for the provided code. Include: unit tests, edge cases, error cases. Use assertions with clear descriptions.', }; try { const systemMsg = skillPrompts[skill.name] || `You are a ${skill.name} expert. ${skill.description}`; const skillMessages = [ { role: 'system', content: systemMsg }, { role: 'user', content: args.input || 'Please analyze the code and provide your expert review.' }, ]; const result = await chatWithAI(skillMessages, { maxTokens: 4096 }); return `📚 **${skill.name}**:\\n${result}`; } catch (e) { return `❌ Skill ${skill.name} error: ${e.message}`; } }, swarm_spawn: async (args) => { try { const agent = await svc.swarm.spawnAgent({ type: args.type, name: args.name || args.type, capabilities: args.capabilities || [], }); return `✅ Spawned agent: **${agent.name}** (id: ${agent.id}, type: ${agent.type})`; } catch (e) { return `❌ Swarm spawn error: ${e.message}`; } }, swarm_execute: async (args) => { try { const task = new Task({ type: args.type || 'generic', description: args.description, priority: args.priority || 'medium', dependencies: args.dependencies || [], }); const result = await svc.swarm.executeTask(args.agent_id, task); return `✅ Task completed on agent ${args.agent_id}: ${JSON.stringify(result)}`; } catch (e) { return `❌ Swarm execute error: ${e.message}`; } }, swarm_distribute: async (args) => { try { const taskObjs = (args.tasks || []).map((t, i) => new Task({ id: t.id || `task_${i}`, type: t.type || 'generic', description: t.description || '', priority: t.priority || 'medium', dependencies: t.dependencies || [], assignedTo: t.agent_id, })); const assignments = await svc.swarm.distributeTasks(taskObjs); return `✅ Distributed ${assignments.length} tasks:\\n${assignments.map(a => ` - ${a.taskId} → ${a.agentId || 'no agent'}${a.error ? ' (' + a.error + ')' : ''}` ).join('\\n')}`; } catch (e) { return `❌ Swarm distribute error: ${e.message}`; } }, swarm_state: async () => { try { const state = svc.swarm.getSwarmState(); return `🤖 **Swarm State**\\n\\nTopology: ${state.topology}\\nAgents: ${state.agents}\\nBy status: ${JSON.stringify(state.byStatus)}\\nBy type: ${JSON.stringify(state.byType)}`; } catch (e) { return `❌ Swarm state error: ${e.message}`; } }, swarm_terminate: async (args) => { try { await svc.swarm.terminateAgent(args.agent_id); return `✅ Agent ${args.agent_id} terminated`; } catch (e) { return `❌ Swarm terminate error: ${e.message}`; } }, }; // ── Create grammy bot ── const bot = new Bot(botToken); bot.api.config.use(autoRetry({ maxRetryAttempts: 5, maxDelaySeconds: 60 })); // Initialize the bot to get bot info try { await bot.init(); logger.info('✓ Bot initialized'); } catch (e) { logger.warn('⚠ Bot init failed (webhook will still work):', e.message); } // Register bot command menu const cmdList = [ { command: 'start', description: '🚀 Show help' }, { command: 'tools', description: '🔧 List available tools' }, { command: 'skills', description: '📚 List loaded skills' }, { command: 'agents', description: '🤖 List agent roles' }, { command: 'model', description: '🤖 Switch AI model' }, { command: 'stats', description: '📊 System & RTK stats' }, { command: 'voice', description: '🎤 Voice I/O info' }, { command: 'mcp', description: '🔌 MCP info' }, { command: 'memory', description: '🧠 Persistent memory stats' }, { command: 'remember', description: '📝 Save to memory' }, { command: 'recall', description: '🔍 Search memory' }, { command: 'cron', description: '⏰ Scheduled tasks' }, { command: 'bash', description: '💻 Execute shell command' }, { command: 'web', description: '🔍 Search the web' }, { command: 'git', description: '🔀 Git operations' }, ]; bot.api.setMyCommands(cmdList).catch(e => logger.warn('⚠ Failed to register commands:', e.message)); // ── Auth middleware (claudegram pattern) ── const allowedUsers = env.TELEGRAM_ALLOWED_USERS ? env.TELEGRAM_ALLOWED_USERS.split(',').map(s => s.trim()) : null; bot.use(async (ctx, next) => { if (!allowedUsers) return next(); // allow all const userId = String(ctx.from?.id || ''); if (!allowedUsers.includes(userId)) { await ctx.reply('⛔ Unauthorized.'); return; } return next(); }); // ── Sequentialize per-chat (claudegram pattern) ── // Temporarily disabled sequentialize for systemd compatibility /* bot.use(sequentialize((ctx) => { const chatId = ctx.chat?.id; if (!chatId) return undefined; const msg = ctx.message ?? ctx.callbackQuery?.message; const threadId = msg?.is_topic_message ? msg.message_thread_id : undefined; return buildSessionKey(chatId, threadId); })); */ // Simple middleware — pass through (sequentialize disabled for systemd) bot.use((ctx, next) => { // No session key needed; request queue handles per-chat ordering return next(); }); // ── /cancel bypasses queue ── bot.command('cancel', async (ctx) => { const key = buildSessionKey(ctx.chat.id, ctx.message?.message_thread_id); clearQueue(key); await ctx.reply('⏹️ Cancelled and queue cleared.'); }); // ── COMMAND HANDLERS ── bot.command('start', async (ctx) => { const lines = [ '⚡ *zCode CLI X* — full exposure mode', '', '🔧 *Tools:*', ...svc.tools.map(t => ` /${t.name} — ${t.description}`), '', '📚 *Skills:* ' + svc.skills.length + ' loaded', '🤖 *Agents:* ' + svc.agents.length + ' available', '', '🔄 *Self-Correction*: 2 retries + auto-simplification', '⚡ *Streaming*: Real-time text delivery', '', '📋 *Commands:* /tools /skills /agents /model /stats /voice /mcp /memory /cron /cancel /selfcorrection', '', 'Or just chat — I will use tools when needed.', `Model: \`${svc.config?.api?.models?.default || 'glm-5.1'}\``, ]; await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('tools', async (ctx) => { const lines = ['🔧 *Tools:*\n']; for (const t of svc.tools) lines.push(`• \`${t.name}\` — ${t.description}`); await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('skills', async (ctx) => { if (!svc.skills.length) return ctx.reply('📚 No skills loaded.'); const groups = {}; for (const s of svc.skills) { (groups[s.category] ||= []).push(s); } const lines = ['📚 *Skills:*\n']; for (const [cat, items] of Object.entries(groups)) { lines.push(`*${cat}:*`); for (const s of items) lines.push(` • \`${s.name}\` — ${s.description}`); lines.push(''); } await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('agents', async (ctx) => { const lines = ['🤖 *Agent Roles:*\n']; for (const a of svc.agents) { lines.push(`• *${a.name}* (\`${a.id}\`)`); lines.push(` ${a.description}`); } await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('selfcorrection', async (ctx) => { await sendStreamingMessage(ctx, `🔄 *Self-Correction Loops* — FULLY ENABLED\n\nzCode CLI X now uses automatic self-correction:\n\n• **Max Retries**: 2 attempts\n• **Retry Delay**: 500ms → 1s → 1.5s (exponential backoff)\n• **Triggers**: \n - ❌ Error responses\n - Rate limits\n - Timeouts\n - 5xx server errors\n\n• **Auto-Simplification**: On retry, prompts are simplified to avoid recurring errors\n\n• **Logging**: All retries are logged with retry count and reason\n\nThis ensures robust responses even when the AI initially fails.`); }); bot.command('model', async (ctx) => { const text = ctx.match?.trim(); if (!text) { return ctx.reply(`Current: \`${svc.config?.api?.models?.default || 'glm-5.1'}\`\nUsage: /model `); } svc.config.api.models = svc.config.api.models || {}; svc.config.api.models.default = text; await ctx.reply(`✅ Switched to \`${text}\``, { parse_mode: 'Markdown' }); }); bot.command('stats', async (ctx) => { const rtkStats = svc.rtk.enabled ? svc.rtk.getTrackingStats() : null; const lines = [ '📊 *Stats:*\n', `Tools: ${svc.tools.length}`, `Skills: ${svc.skills.length}`, `Agents: ${svc.agents.length}`, `Model: \`${svc.config?.api?.models?.default || 'glm-5.1'}\``, `RTK: ${svc.rtk.enabled ? '✅' : '❌'}`, ]; if (rtkStats) { lines.push(`Optimized: ${rtkStats.totalOptimized || 0}`); lines.push(`Saved: ${rtkStats.totalTokensSaved || 0} tok`); } await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('voice', async (ctx) => { await sendStreamingMessage(ctx, `🎤 *Voice I/O*\n\n🎤→📝 *Speech-to-Text*: Send a voice message — transcribed via Vosk (offline, no API key, ~200ms).\n📝→🎤 *Text-to-Speech*: Ask the AI to use the \`tts\` tool — generates voice via Edge TTS (free).\n\nNo API keys needed. Runs fully on the server.`); }); bot.command('mcp', async (ctx) => { await ctx.reply('🔌 *MCP:* Available via TS services at `src/services/mcp/`. Connect MCP servers for extended capabilities.'); }); bot.command('memory', async (ctx) => { const stats = memory.getStats(); const recent = memory.recall({ limit: 5 }); const lines = [ `🧠 *Persistent Memory* — ${stats.total} memories stored`, '', `📅 From ${stats.oldest || 'today'} to ${stats.newest || 'now'}`, '', '*Categories:*', ]; for (const [cat, count] of Object.entries(stats.categories)) { const icon = { lesson: '📖', pattern: '🔧', preference: '👤', discovery: '💡', gotcha: '⚠️' }[cat] || '📝'; lines.push(` ${icon} ${cat}: ${count}`); } if (recent.length > 0) { lines.push('', '*Recent:*'); for (const m of recent.slice(0, 5)) { lines.push(` • [${m.category}] ${m.content.substring(0, 60)}`); } } lines.push('', '_Use /remember to save, /recall to search, /forget to delete_'); await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('remember', async (ctx) => { const text = ctx.match?.trim(); if (!text) return ctx.reply('Usage: /remember \n\nCategories: lesson, pattern, preference, discovery, gotcha\n\nExamples:\n/remember User prefers TypeScript over JavaScript\n/remember gotcha: Z.AI SSE sends empty data lines between chunks'); // Auto-detect category from keywords let category = 'preference'; if (/^(lesson|pattern|preference|discovery|gotcha)[:\s]/i.test(text)) { const match = text.match(/^(lesson|pattern|preference|discovery|gotcha)[:\s]\s*(.*)/i); if (match) { category = match[1].toLowerCase(); await memory.remember(category, match[2]); } } else { await memory.remember(category, text); } await ctx.reply(`🧠 Saved to memory [${category}]. I will remember this across sessions.`); }); bot.command('recall', async (ctx) => { const query = ctx.match?.trim(); if (!query) return ctx.reply('Usage: /recall \n\nExample: /recall ZAI API timeout'); const results = memory.recall({ query, limit: 10 }); if (results.length === 0) return ctx.reply('🔍 No memories match that query.'); const lines = [`🔍 *Recall* (${results.length} results):`, '']; for (const m of results) { const date = new Date(m.created).toLocaleDateString(); lines.push(`[${m.category}] (${date}) ${m.content.substring(0, 80)}`); if (m.meta?.resolution) lines.push(` → ${m.meta.resolution.substring(0, 60)}`); } await sendStreamingMessage(ctx, lines.join('\n')); }); bot.command('forget', async (ctx) => { const id = ctx.match?.trim(); if (!id) return ctx.reply('Usage: /forget \n\nFind IDs with /recall or /memory'); const ok = await memory.forget(id); await ctx.reply(ok ? '🗑 Memory deleted.' : '❌ Memory not found.'); }); bot.command('reset', async (ctx) => { const chatKey = conversation._key(ctx.chat.id, ctx.message?.message_thread_id); await conversation.clear(chatKey); await ctx.reply('🧹 Chat history cleared. Fresh start — I won\'t remember our previous conversation in this chat.'); }); bot.command('cron', async (ctx) => { await ctx.reply('⏰ *Cron:* CronScheduler at `src/utils/cronScheduler.ts` — 1s interval, task locking, auto-recovery.'); }); // ── Dynamic tool commands ── for (const tool of svc.tools) { const tname = tool.name; if (bot.command(tname)) continue; // skip if registered bot.command(tname, async (ctx) => { const args = ctx.match?.trim(); const handler = toolHandlers[tname]; if (!handler) return ctx.reply(`❌ No handler for ${tname}`); if (!args) return ctx.reply(`Usage: /${tname} \n${tool.description}`); const result = await handler(tname === 'web_search' ? { query: args, num_results: 5 } : tname === 'bash' ? { command: args, timeout: 300000 } : tname === 'git' ? (() => { const p = args.split(/\s+/); return { action: p[0], params: p.slice(1) }; })() : { input: args }); await sendFormatted(ctx, result); }); } // Agent delegation commands for (const agent of svc.agents) { const cmdName = `agent_${agent.id}`; bot.command(cmdName, async (ctx) => { const task = ctx.match?.trim(); if (!task) return ctx.reply(`🤖 *${agent.name}*\n${agent.description}\n\nUsage: /${cmdName} `); const result = await chatWithAI([ { role: 'system', content: `You are the **${agent.name}** (${agent.id}). Capabilities: ${agent.capabilities.join(', ')}. Respond as this specialist within zCode CLI X.` }, { role: 'user', content: task }, ]); await sendFormatted(ctx, result); }); } // ── Message text handler (with dedup + queue + self-correction) ── // ── Text message handler (shared by text & voice) ── async function handleTextMessage(ctx, text, isVoice = false) { if (isDuplicate(ctx.message.message_id)) return; markProcessed(ctx.message.message_id); const key = buildSessionKey(ctx.chat.id, ctx.message?.message_thread_id); const user = ctx.from?.username || ctx.from?.first_name || 'Unknown'; const prefix = isVoice ? '🎤' : '💬'; // ── Reply context: inject replied-to message as context (Ruflo pattern) ── // When user replies/tags a previous message, Telegram sends reply_to_message. // Without this, the bot sees "make hero more exciting" with ZERO context about which page. const replyTo = ctx.message?.reply_to_message; if (replyTo) { const repliedText = replyTo.text || replyTo.caption || ''; if (repliedText) { // Truncate long replies to avoid context bloat const snippet = repliedText.length > 500 ? repliedText.substring(0, 500) + '…' : repliedText; text = `[Replying to previous message:\n${snippet}]\n\n${text}`; logger.info(`📎 Reply context injected (${repliedText.length} chars)`); } } logger.info(`${prefix} ${user}: ${text.substring(0, 80)}…`); await queueRequest(key, text, async () => { // ── Typing indicator: keep pinging until response is fully complete ── const typingInterval = setInterval(async () => { try { await ctx.api.sendChatAction(ctx.chat.id, 'typing'); } catch (e) { logger.warn(`⌨️ typing error: ${e.message}`); } }, 4000); // Telegram typing expires after 5s, refresh every 4s try { await ctx.api.sendChatAction(ctx.chat.id, 'typing'); logger.info('⌨️ typing started'); } catch (e) { logger.warn(`⌨️ initial typing error: ${e.message}`); } try { // ── Intent detection: bypass AI for simple messages ── const intent = detectIntent(text); if (intent && intent.bypassAI) { logger.info(`🎯 Intent: ${intent.type} — bypassing AI`); clearInterval(typingInterval); const reply = intent.response || 'Got it.'; await sendFormatted(ctx, reply); return; } // ── Load conversation history for this chat ── const chatKey = conversation._key(ctx.chat.id, ctx.message?.message_thread_id); svc.currentChatId = ctx.chat.id; // Track for TTS auto-send const history = await conversation.getContext(chatKey, text); // Create stream consumer for real-time edit-in-place const consumer = new StreamConsumer(ctx, { editInterval: 1000 }); const runPromise = consumer.run(); // Build messages: system + history + current const messages = [ { role: 'system', content: buildSystemPrompt(svc) }, ...history, { role: 'user', content: text }, { role: 'user', content: '[PLANNING] Before using ANY tool, check: is the answer already in your context (system prompt above)? Can you answer directly? If you need tools, plan the minimum number of turns. Batch independent calls together. Use specialized tools (file_read, glob, grep) over bash.' }, ]; // Wrap chatWithAI with self-correction + streaming const chatWithCorrection = withSelfCorrection(async (msgs) => { return await chatWithAI(msgs, { onDelta: (token) => { consumer.onDelta(token); }}); }); const result = await chatWithCorrection(messages); // ── Save this exchange to conversation history ── await conversation.add(chatKey, 'user', text); if (result) await conversation.add(chatKey, 'assistant', result); // Stop typing indicator clearInterval(typingInterval); logger.info('⌨️ typing stopped (response complete)'); // Signal completion and wait for final edit consumer.finish(); await runPromise; // If streaming failed to deliver (no message sent), fallback to plain send if (!consumer.alreadySent && result) { logger.info(`📤 Streaming failed, fallback send (${result.length} chars)`); await sendFormatted(ctx, result); } else { logger.info('✅ Stream delivered'); } // ── Self-learning: extract patterns from this interaction ── await selfLearn(text, result, memory); } finally { // Always stop typing indicator, even on error clearInterval(typingInterval); } }); } bot.on('message:text', async (ctx) => { await handleTextMessage(ctx, ctx.message.text, false); }); // ── Voice handler (Vosk STT) ── bot.on('message:voice', async (ctx) => { const fileId = ctx.message.voice.file_id; const user = ctx.from?.username || ctx.from?.first_name || 'Unknown'; logger.info(`🎤 Voice from ${user}`); const statusMsg = await ctx.reply('🎤 Transcribing…'); try { const file = await ctx.api.getFile(fileId); const url = `https://api.telegram.org/file/bot${botToken}/${file.file_path}`; const oggPath = `/tmp/zcode-voice-${Date.now()}.ogg`; // Download voice file via fetch (faster than curl subprocess) const { execSync } = await import('child_process'); const voiceResp = await fetch(url); if (!voiceResp.ok) throw new Error(`Download failed: ${voiceResp.status}`); const { writeFileSync, unlinkSync } = await import('fs'); writeFileSync(oggPath, Buffer.from(await voiceResp.arrayBuffer())); logger.info(`Voice downloaded: ${oggPath}`); // Run Vosk STT — path is ../../scripts/stt.py from src/bot/ const sttScript = new URL('../../scripts/stt.py', import.meta.url).pathname; let parsed; try { const result = execSync( `python3 "${sttScript}" "${oggPath}"`, { timeout: 30000, encoding: 'utf-8', stdio: ['pipe', 'pipe', 'pipe'] } ); parsed = JSON.parse(result.trim()); } catch (e) { // exit code 1 = no speech detected, stdout still has JSON const stdout = e.stdout?.trim(); if (stdout) { try { parsed = JSON.parse(stdout); } catch { parsed = null; } } if (!parsed || !parsed.text) { await ctx.api.editMessageText(ctx.chat.id, statusMsg.message_id, '🎤 Could not detect speech in the voice message.'); return; } } unlinkSync(oggPath); if (parsed.error) { logger.error(`STT error: ${parsed.error}`); await ctx.api.editMessageText(ctx.chat.id, statusMsg.message_id, `❌ STT error: ${parsed.error}`); return; } if (!parsed.text) { await ctx.api.editMessageText(ctx.chat.id, statusMsg.message_id, '🎤 Could not detect speech in the voice message.'); return; } logger.info(`🎤 STT (${parsed.confidence || '?'}): ${parsed.text}`); await ctx.api.deleteMessage(ctx.chat.id, statusMsg.message_id); // Feed transcribed text into the main chat pipeline await handleTextMessage(ctx, parsed.text, true); } catch (err) { logger.error(`Voice handler error: ${err.message}`); try { await ctx.api.editMessageText(ctx.chat.id, statusMsg.message_id, `❌ Voice processing failed: ${err.message.slice(0, 100)}`); } catch {} } }); // ── Photo handler ── bot.on('message:photo', async (ctx) => { logger.info(`📸 Photo from ${ctx.from?.username}`); // TODO: vision analysis await ctx.reply('📸 Image received. Vision analysis TBD.'); }); // ── Error handler ── bot.catch((err) => { logger.error('Bot error:', err.message || err); }); // ── (unhandled rejection handler registered below with gracefulShutdown) ── // ── Graceful shutdown is defined at end of initBot (requires full `svc`) ── // ── Express + WebSocket server (keep for webhook compatibility) ── const app = express(); app.use(express.json()); const httpServer = createServer(app); const wss = new WebSocketServer({ server: httpServer }); const wsClients = new Map(); wss.on('connection', (ws, req) => { const chatId = req.url?.startsWith('/?chatId=') ? req.url.slice(8) : 'ws'; wsClients.set(chatId, ws); ws.on('close', () => wsClients.delete(chatId)); logger.info(`🔌 WS: ${chatId}`); }); // Webhook handler — feeds into grammy app.get('/telegram/webhook', (req, res) => { const { 'hub.mode': mode, 'hub.challenge': challenge } = req.query; if (mode === 'subscribe' && challenge) return res.status(200).send(challenge); res.status(200).json({ ok: true, message: 'zCode webhook active' }); }); app.post('/telegram/webhook', async (req, res) => { res.json({ ok: true }); // ack immediately try { logger.info('📥 Webhook received:', { update_id: req.body?.update_id, has_message: !!req.body?.message, type: req.body?.message?.text ? 'text' : 'other' }); await bot.handleUpdate(req.body); } catch (e) { logger.error('Webhook update error:', { message: e.message, name: e.name, stack: e.stack, body: req.body?.update_id, full: e.toString(), body_preview: req.body ? JSON.stringify(req.body).substring(0, 200) : 'no body' }); } }); // Health check app.get('/health', (req, res) => { res.json({ ok: true, uptime: process.uptime(), tools: svc.tools.length, skills: svc.skills.length, agents: svc.agents.length, wsClients: wsClients.size, }); }); const PORT = process.env.ZCODE_PORT || 3000; // ── PortManager: smart port lifecycle (claim, retry, recover) ── const PIDFILE = path.join(process.env.HOME || '/tmp', '.zcode-bot.pid'); const portManager = new PortManager({ port: PORT, pidfile: PIDFILE, maxAttempts: 5, baseDelayMs: 500, maxDelayMs: 5000, }); // ── Claim port via PortManager (retry + stale recovery + backoff) ── try { await portManager.claim(httpServer); logger.info(`✓ HTTP on :${PORT} · WS ready · grammy bot online`); logger.info(`✓ ${svc.tools.length} tools · ${svc.skills.length} skills · ${svc.agents.length} agents`); } catch (err) { logger.error(`❌ Port ${PORT} unavailable after retries: ${err.message}`); process.exit(1); } // Set webhook const wu = process.env.ZCODE_WEBHOOK_URL; if (wu) { try { await bot.api.setWebhook(wu, { allowed_updates: ['message', 'callback_query'] }); logger.info('✓ Webhook set'); } catch (e) { logger.warn('⚠ Webhook failed:', e.message); } } // ── Graceful shutdown: cleanup all systems ── const gracefulShutdown = async (signal) => { logger.info(`🛑 Shutting down (${signal})...`); // Flush conversation history try { await conversation.flush(); } catch (e) { logger.warn(`Conversation flush: ${e.message}`); } // Cleanup swarm if (svc.swarm && typeof svc.swarm.shutdown === 'function') { try { await svc.swarm.shutdown(); } catch (e) { logger.warn(`Swarm shutdown: ${e.message}`); } } // Cleanup plugin manager if (svc.pluginManager && typeof svc.pluginManager.shutdown === 'function') { try { await svc.pluginManager.shutdown(); } catch (e) { logger.warn(`Plugin shutdown: ${e.message}`); } } // Cleanup memory backends if (svc.memBackend && typeof svc.memBackend.shutdown === 'function') { try { await svc.memBackend.shutdown(); } catch (e) { logger.warn(`Memory shutdown: ${e.message}`); } } // Cleanup hooks if (svc.hooks && typeof svc.hooks.shutdown === 'function') { try { await svc.hooks.shutdown(); } catch (e) { logger.warn(`Hooks shutdown: ${e.message}`); } } // Don't release pidfile — the next process needs it to detect us. // It will overwrite it on startup. This prevents the race condition // where the new process can't identify the stale process. // Stop webhook polling try { await bot.stop(); } catch {} // Close HTTP server try { await new Promise(r => httpServer.close(r)); } catch {} portManager.release(); logger.info('✓ Shutdown complete'); process.exit(0); }; process.on('SIGINT', () => { const stack = new Error().stack; logger.info(`SIGINT trace (${process.pid}, PPID=${process.ppid}): ${stack}`); gracefulShutdown('SIGINT'); }); process.on('SIGTERM', () => { const stack = new Error().stack; logger.info(`SIGTERM trace (${process.pid}, PPID=${process.ppid}): ${stack}`); gracefulShutdown('SIGTERM'); }); // ── Resilient error handlers (Hermes/OpenCode pattern) ── // LOG errors but DON'T kill the process. Only SIGINT/SIGTERM trigger gracefulShutdown. // uncaughtException: log and continue. Fatal errors will crash anyway — no need to force it. process.on('uncaughtException', (e) => { logger.error('💥 Uncaught exception (non-fatal, continuing):', e.message, String(e.stack).slice(0, 300)); }); process.on('unhandledRejection', (e) => { logger.error('💥 Unhandled rejection (non-fatal, continuing):', e?.message || e); }); return { send: (chatId, text) => bot.api.sendMessage(chatId, markdownToHtml(text), { parse_mode: 'HTML' }), ws: (chatId, msg) => wsClients.get(chatId)?.send(JSON.stringify(msg)), waitForMessages: async () => { // Robust keepalive: setInterval prevents Node.js from exiting // even if the Promise executor is optimized away by V8 await new Promise((resolve) => { const keepalive = setInterval(() => {}, 60000); // 1-min tick keeps event loop alive // If SIGINT/SIGTERM fires, clearInterval is handled by gracefulShutdown's process.exit // This promise intentionally never resolves }); }, getConnections: () => wsClients.size, // Expose new systems for external use pluginManager: svc.pluginManager, swarm: svc.swarm, hookManager: svc.hooks, memBackend: svc.memBackend, agentOrchestrator: svc.agentOrchestrator, portManager, getState: () => ({ tools: svc.tools.length, skills: svc.skills.length, agents: svc.agents.length, plugins: svc.pluginManager?.getPlugins()?.length || 0, wsClients: wsClients.size }), }; }