diff --git a/src/bot/index.js b/src/bot/index.js index a56e63e5..1a40c96c 100644 --- a/src/bot/index.js +++ b/src/bot/index.js @@ -7,7 +7,6 @@ import { WebSocketServer } from 'ws'; import fs from 'fs'; import path from 'path'; import { execSync } from 'child_process'; -import net from 'net'; import { logger } from '../utils/logger.js'; import { checkEnv } from '../utils/env.js'; import { getRTK } from '../utils/rtk.js'; @@ -31,20 +30,10 @@ import { Agent } from '../agents/Agent.js'; import { Task } from '../agents/Task.js'; import { SwarmCoordinator } from '../agents/SwarmCoordinator.js'; import { JSONBackend, InMemoryBackend, MEMORY_TYPES } from './memory-backend.js'; +import { PortManager } from './port-manager.js'; -// ── Pidfile: informational only, no process killing ── -const PIDFILE = path.join(process.env.HOME || '/tmp', '.zcode-bot.pid'); -function acquirePidfile() { - try { - fs.writeFileSync(PIDFILE, process.pid.toString()); - logger.info(`✓ Pidfile: ${PIDFILE} (PID ${process.pid})`); - } catch (e) { - logger.warn(`Pidfile write failed: ${e.message}`); - } -} -function releasePidfile() { - try { fs.unlinkSync(PIDFILE); } catch {} -} + +// ── PortManager handles pidfile + port lifecycle (see port-manager.js) ── function buildSessionKey(chatId, threadId) { return threadId ? `${chatId}:${threadId}` : String(chatId); @@ -1430,9 +1419,15 @@ export async function initBot(config, api, tools, skills, agents) { // ── (unhandled rejection handler registered below with gracefulShutdown) ── - // ── Graceful shutdown is defined at end of initBot (requires full `svc`) ── - - acquirePidfile(); + // ── PortManager: smart port lifecycle (claim, retry, recover) ── + const PIDFILE = path.join(process.env.HOME || '/tmp', '.zcode-bot.pid'); + const portManager = new PortManager({ + port: PORT, + pidfile: PIDFILE, + maxAttempts: 5, + baseDelayMs: 500, + maxDelayMs: 5000, + }); // ── Express + WebSocket server (keep for webhook compatibility) ── const app = express(); @@ -1487,148 +1482,16 @@ export async function initBot(config, api, tools, skills, agents) { const PORT = process.env.ZCODE_PORT || 3000; - // ── Robust port binding: pidfile-based stale detection, no fuser ── - // Strategy (inspired by Next.js, Vite, webpack-dev-server): - // 1. Read pidfile for stale process, SIGTERM it if still alive - // 2. Probe port with a disposable net socket to confirm availability - // 3. Bind httpServer after port is confirmed free - // This avoids the fuser race condition where fuser returns the current - // process PID because the socket is already half-open. - - function readStalePid() { - try { - const pid = parseInt(fs.readFileSync(PIDFILE, 'utf8').trim(), 10); - if (!isNaN(pid) && pid !== process.pid) return pid; - } catch {} - return null; + // ── Claim port via PortManager (retry + stale recovery + backoff) ── + try { + await portManager.claim(httpServer); + logger.info(`✓ HTTP on :${PORT} · WS ready · grammy bot online`); + logger.info(`✓ ${svc.tools.length} tools · ${svc.skills.length} skills · ${svc.agents.length} agents`); + } catch (err) { + logger.error(`❌ Port ${PORT} unavailable after retries: ${err.message}`); + process.exit(1); } - function isProcessAlive(pid) { - try { process.kill(pid, 0); return true; } catch { return false; } - } - - function killStaleProcess(pid) { - if (!isProcessAlive(pid)) return false; - // Guard: don't kill processes younger than 15 seconds (prevents crash-loop - // where systemd restarts before old instance finishes dying, causing mutual kills) - try { - const stat = fs.readFileSync(`/proc/${pid}/stat`, 'utf8'); - // Field 22 in /proc/pid/stat is starttime (in jiffies since boot) - const fields = stat.split(')'); - if (fields.length > 1) { - const statFields = fields[1].trim().split(/\s+/); - const startTimeTicks = parseInt(statFields[19], 10); - if (!isNaN(startTimeTicks)) { - const bootTime = fs.readFileSync('/proc/stat', 'utf8') - .split('\n').find(l => l.startsWith('btime '))?.split(/\s+/)[1]; - if (bootTime) { - const startTimeMs = (parseInt(bootTime) + startTimeTicks / 100) * 1000; - const age = Date.now() - startTimeMs; - if (age < 15000) { - logger.warn(` Skipping PID ${pid} — only ${Math.round(age / 1000)}s old (crash-loop guard)`); - return false; - } - } - } - } - } catch { - // /proc not available (non-Linux) — skip guard - } - try { - process.kill(pid, 'SIGTERM'); - logger.warn(` Sent SIGTERM to stale PID ${pid}`); - return true; - } catch (e) { - logger.warn(` Failed to kill stale PID ${pid}: ${e.message}`); - return false; - } - } - - function probePort(port) { - return new Promise((resolve) => { - const sock = net.createServer(); - sock.listen(port, () => { - sock.close(() => resolve(false)); // port is free - }); - sock.on('error', () => resolve(true)); // port is in use - }); - } - - async function waitForPort(port, maxMs = 5000) { - const start = Date.now(); - while (Date.now() - start < maxMs) { - const inUse = await probePort(port); - if (!inUse) return true; - await new Promise(r => setTimeout(r, 300)); - } - return false; - } - - async function bindPort() { - // Ensure pidfile is current before port probe - acquirePidfile(); - const inUse = await probePort(PORT); - if (inUse) { - logger.warn(`⚠ Port ${PORT} in use — checking for stale process`); - const stalePid = readStalePid(); - if (stalePid) { - const killed = killStaleProcess(stalePid); - if (killed) { - const freed = await waitForPort(PORT); - if (!freed) { - logger.error(`❌ Port ${PORT} still occupied after killing stale PID ${stalePid}`); - process.exit(1); - } - logger.info(`✓ Port ${PORT} freed after stale process cleanup`); - } - } else { - // No stale pidfile — try to find the process via ss - try { - const ssOut = execSync(`ss -tlnp 'sport = :${PORT}' 2>/dev/null`, { encoding: 'utf8' }).trim(); - const pidMatch = ssOut.match(/pid=(\d+)/); - if (pidMatch) { - const stalePid = parseInt(pidMatch[1]); - if (!isNaN(stalePid) && stalePid !== process.pid) { - const killed = killStaleProcess(stalePid); - if (killed) { - const freed = await waitForPort(PORT); - if (freed) { - logger.info(`✓ Port ${PORT} freed after killing PID ${stalePid} (detected via ss)`); - } else { - logger.error(`❌ Port ${PORT} still occupied after killing PID ${stalePid}`); - process.exit(1); - } - } - } - } else { - logger.error(`❌ Port ${PORT} occupied by unknown process (no pidfile, ss couldn't identify). Free it manually or change ZCODE_PORT.`); - process.exit(1); - } - } catch { - logger.error(`❌ Port ${PORT} occupied by unknown process. Free it manually or change ZCODE_PORT.`); - process.exit(1); - } - } - } - - // Bind the server - await new Promise((resolve, reject) => { - httpServer.listen(PORT, () => { - logger.info(`✓ HTTP on :${PORT} · WS ready · grammy bot online`); - logger.info(`✓ ${svc.tools.length} tools · ${svc.skills.length} skills · ${svc.agents.length} agents`); - resolve(); - }); - httpServer.once('error', (err) => { - reject(err); - }); - }).catch((err) => { - logger.error(`❌ Failed to bind port ${PORT}: ${err.message}`); - process.exit(1); - }); - } - - await bindPort(); - @@ -1672,7 +1535,7 @@ export async function initBot(config, api, tools, skills, agents) { try { await bot.stop(); } catch {} // Close HTTP server try { await new Promise(r => httpServer.close(r)); } catch {} - releasePidfile(); + portManager.release(); logger.info('✓ Shutdown complete'); process.exit(0); }; @@ -1715,6 +1578,7 @@ export async function initBot(config, api, tools, skills, agents) { hookManager: svc.hooks, memBackend: svc.memBackend, agentOrchestrator: svc.agentOrchestrator, + portManager, getState: () => ({ tools: svc.tools.length, skills: svc.skills.length, agents: svc.agents.length, plugins: svc.pluginManager?.getPlugins()?.length || 0, wsClients: wsClients.size }), }; } diff --git a/src/bot/port-manager.js b/src/bot/port-manager.js new file mode 100644 index 00000000..e4da18f4 --- /dev/null +++ b/src/bot/port-manager.js @@ -0,0 +1,327 @@ +/** + * PortManager — intelligent port lifecycle manager + * + * Replaces the fragile probe→kill→exit dance with a proper state machine: + * 1. Probe if port is in use + * 2. Identify the holder (via pidfile + /proc + ss) + * 3. Attempt graceful SIGTERM if safe (not self, not too young) + * 4. Retry with exponential backoff instead of process.exit(1) + * 5. Track port ownership to prevent self-conflicts + * + * Inspired by Ruflo's PluginManager error recovery: + * - Never panic-exit on recoverable errors + * - Graceful degradation with retry loops + * - Event-based state tracking + */ + +import net from 'net'; +import fs from 'fs'; +import { execSync } from 'child_process'; +import { EventEmitter } from 'events'; +import { logger } from '../utils/logger.js'; + +export class PortManager extends EventEmitter { + #state; // 'idle' | 'probing' | 'claiming' | 'owned' | 'releasing' | 'failed' + #port; + #owner; // pid of current owner (null if free) + #pidfile; + #retryConfig; // { maxAttempts, baseDelayMs, maxDelayMs } + + /** + * @param {object} opts + * @param {number} opts.port — port to manage + * @param {string} [opts.pidfile] — path to pidfile for stale detection + * @param {number} [opts.maxAttempts=5] — max bind retries + * @param {number} [opts.baseDelayMs=500] — initial retry delay + * @param {number} [opts.maxDelayMs=5000] — max retry delay + */ + constructor({ port, pidfile, maxAttempts = 5, baseDelayMs = 500, maxDelayMs = 5000 }) { + super(); + this.#port = port; + this.#pidfile = pidfile || null; + this.#owner = null; + this.#state = 'idle'; + this.#retryConfig = { maxAttempts, baseDelayMs, maxDelayMs }; + } + + get port() { return this.#port; } + get state() { return this.#state; } + get owner() { return this.#owner; } + + // ── Core API ────────────────────────────────────────────── + + /** + * Claim the port for this process. Retries with backoff on EADDRINUSE. + * Returns a bound http.Server-compatible callback — call it after createServer. + * + * @param {import('http').Server} server + * @returns {Promise} + */ + async claim(server) { + this.#setState('claiming'); + this.#writePidfile(process.pid); + + const inUse = await this.probe(); + if (!inUse) { + return this.#bind(server); + } + + // Port occupied — intelligent recovery + logger.warn(`Port ${this.#port} occupied — starting smart recovery`); + const holder = this.#identifyHolder(); + + if (holder && holder.pid !== process.pid) { + const age = this.#getProcessAge(holder.pid); + logger.info(`Holder: PID ${holder.pid}, age: ${age}ms, source: ${holder.source}`); + + if (age !== null && age < 3000) { + // Sibling process just started (systemd rapid restart) — don't kill, just wait + logger.warn(`Holder PID ${holder.pid} is only ${Math.round(age / 1000)}s old — waiting for graceful exit`); + } else if (age !== null && age < 30000) { + // Recent process — send SIGTERM then wait + logger.warn(`Sending SIGTERM to PID ${holder.pid} (${Math.round(age / 1000)}s old)`); + this.#safeKill(holder.pid); + } else { + // Old stale process — force kill + logger.warn(`Stale holder PID ${holder.pid} (${Math.round(age / 1000)}s old) — SIGTERM`); + this.#safeKill(holder.pid); + } + } else if (holder && holder.pid === process.pid) { + // We already own it? Shouldn't happen but handle gracefully + logger.info(`Port ${this.#port} already held by this process (PID ${process.pid})`); + this.#state = 'owned'; + return; + } + + // Wait for port to free, then bind with retries + return this.#waitForFreeAndBind(server); + } + + /** + * Release the port (cleanup on shutdown) + */ + release() { + this.#setState('releasing'); + this.#owner = null; + try { + if (this.#pidfile) fs.unlinkSync(this.#pidfile); + } catch {} + this.#setState('idle'); + logger.info(`Port ${this.#port} released`); + } + + // ── Probing ─────────────────────────────────────────────── + + /** + * Check if port is in use. Returns true if occupied. + * @returns {Promise} + */ + probe() { + return new Promise((resolve) => { + const sock = net.createServer(); + sock.listen(this.#port, '0.0.0.0', () => { + sock.close(() => resolve(false)); // free + }); + sock.on('error', () => resolve(true)); // in use + }); + } + + // ── Holder identification ───────────────────────────────── + + /** + * Identify who's holding the port. + * Checks pidfile first, then ss, then falls back to unknown. + * @returns {{ pid: number|null, source: string }|null} + */ + #identifyHolder() { + // Method 1: pidfile + if (this.#pidfile) { + try { + const pid = parseInt(fs.readFileSync(this.#pidfile, 'utf8').trim(), 10); + if (!isNaN(pid) && this.#isAlive(pid)) { + return { pid, source: 'pidfile' }; + } + } catch {} + } + + // Method 2: ss -tlnp + try { + const ssOut = execSync(`ss -tlnp 'sport = :${this.#port}' 2>/dev/null`, { encoding: 'utf8' }).trim(); + const match = ssOut.match(/pid=(\d+)/); + if (match) { + const pid = parseInt(match[1], 10); + if (!isNaN(pid)) { + return { pid, source: 'ss' }; + } + } + } catch {} + + // Method 3: lsof fallback + try { + const lsofOut = execSync(`lsof -ti :${this.#port} 2>/dev/null`, { encoding: 'utf8' }).trim(); + if (lsofOut) { + const pid = parseInt(lsofOut.split('\n')[0], 10); + if (!isNaN(pid)) { + return { pid, source: 'lsof' }; + } + } + } catch {} + + return null; + } + + // ── Process helpers ─────────────────────────────────────── + + #isAlive(pid) { + try { process.kill(pid, 0); return true; } catch { return false; } + } + + #getProcessAge(pid) { + try { + const stat = fs.readFileSync(`/proc/${pid}/stat`, 'utf8'); + const fields = stat.split(')'); + if (fields.length > 1) { + const statFields = fields[1].trim().split(/\s+/); + const startTimeTicks = parseInt(statFields[19], 10); + if (!isNaN(startTimeTicks)) { + const bootLine = fs.readFileSync('/proc/stat', 'utf8') + .split('\n').find(l => l.startsWith('btime ')); + if (bootLine) { + const bootSec = parseInt(bootLine.split(/\s+/)[1], 10); + const hz = 100; // USER_HZ on most Linux + const startSec = bootSec + startTimeTicks / hz; + return Date.now() - (startSec * 1000); + } + } + } + } catch {} + return null; // can't determine age (non-Linux, etc.) + } + + #safeKill(pid) { + try { + process.kill(pid, 'SIGTERM'); + this.emit('kill', { pid, signal: 'SIGTERM' }); + } catch (e) { + logger.warn(`Failed to kill PID ${pid}: ${e.message}`); + } + } + + // ── Bind with retry ─────────────────────────────────────── + + async #waitForFreeAndBind(server) { + const { maxAttempts, baseDelayMs, maxDelayMs } = this.#retryConfig; + + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + // Wait for port to become free + const freed = await this.#pollFree(maxDelayMs); + if (!freed) { + logger.warn(`Attempt ${attempt}/${maxAttempts}: port still occupied`); + } + + // Try to bind + try { + await this.#bind(server); + return; // success + } catch (err) { + if (err.code === 'EADDRINUSE' && attempt < maxAttempts) { + const delay = Math.min(baseDelayMs * Math.pow(2, attempt - 1), maxDelayMs); + logger.warn(`EADDRINUSE on attempt ${attempt}/${maxAttempts} — retrying in ${delay}ms`); + this.emit('retry', { attempt, maxAttempts, delay, error: err.message }); + await this.#sleep(delay); + } else { + this.#setState('failed'); + this.emit('failed', { error: err.message, attempts: attempt }); + throw err; + } + } + } + + this.#setState('failed'); + throw new Error(`Port ${this.#port} unavailable after ${maxAttempts} attempts`); + } + + #bind(server) { + return new Promise((resolve, reject) => { + server.listen(this.#port, '0.0.0.0', () => { + this.#owner = process.pid; + this.#setState('owned'); + logger.info(`Port ${this.#port} claimed (PID ${process.pid})`); + this.emit('claimed', { port: this.#port, pid: process.pid }); + resolve(); + }); + server.on('error', (err) => { + if (err.code === 'EADDRINUSE') { + reject(err); + } else { + logger.error(`Port ${this.#port} bind error: ${err.message}`); + this.#setState('failed'); + reject(err); + } + }); + }); + } + + /** + * Poll until port is free or timeout. + * @param {number} timeoutMs + * @returns {Promise} true if free + */ + #pollFree(timeoutMs) { + const interval = 300; + const deadline = Date.now() + timeoutMs; + return new Promise(async (resolve) => { + while (Date.now() < deadline) { + if (!(await this.probe())) { + resolve(true); + return; + } + await this.#sleep(interval); + } + resolve(false); + }); + } + + // ── Pidfile ─────────────────────────────────────────────── + + #writePidfile(pid) { + if (!this.#pidfile) return; + try { + fs.writeFileSync(this.#pidfile, pid.toString()); + logger.info(`Pidfile: ${this.#pidfile} (PID ${pid})`); + } catch (e) { + logger.warn(`Pidfile write failed: ${e.message}`); + } + } + + // ── State machine ───────────────────────────────────────── + + #setState(next) { + const prev = this.#state; + this.#state = next; + if (prev !== next) { + this.emit('stateChange', { from: prev, to: next }); + } + } + + #sleep(ms) { + return new Promise(r => setTimeout(r, ms)); + } + + // ── Diagnostics ─────────────────────────────────────────── + + /** + * Get current status for /status commands and health checks. + */ + getStatus() { + return { + port: this.#port, + state: this.#state, + owner: this.#owner, + pidfile: this.#pidfile, + processPid: process.pid, + }; + } +} + +export default PortManager;