/** * Gateway Process Manager * Manages the OpenClaw Gateway process lifecycle */ import { app } from 'electron'; import path from 'path'; import { EventEmitter } from 'events'; import WebSocket from 'ws'; import { PORTS } from '../utils/config'; import { JsonRpcNotification, isNotification, isResponse } from './protocol'; import { logger } from '../utils/logger'; import { captureTelemetryEvent, trackMetric } from '../utils/telemetry'; import { loadOrCreateDeviceIdentity, type DeviceIdentity, } from '../utils/device-identity'; import { DEFAULT_RECONNECT_CONFIG, type ReconnectConfig, type GatewayLifecycleState, getReconnectScheduleDecision, getReconnectSkipReason, } from './process-policy'; import { clearPendingGatewayRequests, rejectPendingGatewayRequest, resolvePendingGatewayRequest, type PendingGatewayRequest, } from './request-store'; import { dispatchJsonRpcNotification, dispatchProtocolEvent } from './event-dispatch'; import { GatewayStateController } from './state'; import { prepareGatewayLaunchContext } from './config-sync'; import { connectGatewaySocket, waitForGatewayReady } from './ws-client'; import { findExistingGatewayProcess, runOpenClawDoctorRepair, terminateOwnedGatewayProcess, unloadLaunchctlGatewayService, waitForPortFree, warmupManagedPythonReadiness, } from './supervisor'; import { GatewayConnectionMonitor } from './connection-monitor'; import { GatewayLifecycleController, LifecycleSupersededError } from './lifecycle-controller'; import { launchGatewayProcess } from './process-launcher'; import { GatewayRestartController } from './restart-controller'; import { GatewayRestartGovernor } from './restart-governor'; import { DEFAULT_GATEWAY_RELOAD_POLICY, loadGatewayReloadPolicy, type GatewayReloadPolicy, } from './reload-policy'; import { classifyGatewayStderrMessage, recordGatewayStartupStderrLine } from './startup-stderr'; import { runGatewayStartupSequence } from './startup-orchestrator'; export interface GatewayStatus { state: GatewayLifecycleState; port: number; pid?: number; uptime?: number; error?: string; connectedAt?: number; version?: string; reconnectAttempts?: number; } /** * Gateway Manager Events */ export interface GatewayManagerEvents { status: (status: GatewayStatus) => void; message: (message: unknown) => void; notification: (notification: JsonRpcNotification) => void; exit: (code: number | null) => void; error: (error: Error) => void; 'channel:status': (data: { channelId: string; status: string }) => void; 'chat:message': (data: { message: unknown }) => void; } /** * Gateway Manager * Handles starting, stopping, and communicating with the OpenClaw Gateway */ export class GatewayManager extends EventEmitter { private process: Electron.UtilityProcess | null = null; private processExitCode: number | null = null; // set by exit event, replaces exitCode/signalCode private ownsProcess = false; private ws: WebSocket | null = null; private status: GatewayStatus = { state: 'stopped', port: PORTS.OPENCLAW_GATEWAY }; private readonly stateController: GatewayStateController; private reconnectTimer: NodeJS.Timeout | null = null; private reconnectAttempts = 0; private reconnectConfig: ReconnectConfig; private shouldReconnect = true; private startLock = false; private lastSpawnSummary: string | null = null; private recentStartupStderrLines: string[] = []; private pendingRequests: Map = new Map(); private deviceIdentity: DeviceIdentity | null = null; private restartInFlight: Promise | null = null; private readonly connectionMonitor = new GatewayConnectionMonitor(); private readonly lifecycleController = new GatewayLifecycleController(); private readonly restartController = new GatewayRestartController(); private readonly restartGovernor = new GatewayRestartGovernor(); private reloadDebounceTimer: NodeJS.Timeout | null = null; private reloadPolicy: GatewayReloadPolicy = { ...DEFAULT_GATEWAY_RELOAD_POLICY }; private reloadPolicyLoadedAt = 0; private reloadPolicyRefreshPromise: Promise | null = null; private externalShutdownSupported: boolean | null = null; private reconnectAttemptsTotal = 0; private reconnectSuccessTotal = 0; private static readonly RELOAD_POLICY_REFRESH_MS = 15_000; private static readonly HEARTBEAT_INTERVAL_MS = 30_000; private static readonly HEARTBEAT_TIMEOUT_MS = 12_000; private static readonly HEARTBEAT_MAX_MISSES = 3; public static readonly RESTART_COOLDOWN_MS = 5_000; private lastRestartAt = 0; constructor(config?: Partial) { super(); this.stateController = new GatewayStateController({ emitStatus: (status) => { this.status = status; this.emit('status', status); }, onTransition: (previousState, nextState) => { if (nextState === 'running') { this.restartGovernor.onRunning(); } this.restartController.flushDeferredRestart( `status:${previousState}->${nextState}`, { state: this.status.state, startLock: this.startLock, shouldReconnect: this.shouldReconnect, }, () => { void this.restart().catch((error) => { logger.warn('Deferred Gateway restart failed:', error); }); }, ); }, }); this.reconnectConfig = { ...DEFAULT_RECONNECT_CONFIG, ...config }; // Device identity is loaded lazily in start() — not in the constructor — // so that async file I/O and key generation don't block module loading. } private async initDeviceIdentity(): Promise { if (this.deviceIdentity) return; // already loaded try { const identityPath = path.join(app.getPath('userData'), 'clawx-device-identity.json'); this.deviceIdentity = await loadOrCreateDeviceIdentity(identityPath); logger.debug(`Device identity loaded (deviceId=${this.deviceIdentity.deviceId})`); } catch (err) { logger.warn('Failed to load device identity, scopes will be limited:', err); } } private sanitizeSpawnArgs(args: string[]): string[] { const sanitized = [...args]; const tokenIdx = sanitized.indexOf('--token'); if (tokenIdx !== -1 && tokenIdx + 1 < sanitized.length) { sanitized[tokenIdx + 1] = '[redacted]'; } return sanitized; } private isUnsupportedShutdownError(error: unknown): boolean { const message = error instanceof Error ? error.message : String(error); return /unknown method:\s*shutdown/i.test(message); } /** * Get current Gateway status */ getStatus(): GatewayStatus { return this.stateController.getStatus(); } /** * Check if Gateway is connected and ready */ isConnected(): boolean { return this.stateController.isConnected(this.ws?.readyState === WebSocket.OPEN); } /** * Start Gateway process */ async start(): Promise { if (this.startLock) { logger.debug('Gateway start ignored because a start flow is already in progress'); return; } if (this.status.state === 'running') { logger.debug('Gateway already running, skipping start'); return; } this.startLock = true; const startEpoch = this.lifecycleController.bump('start'); logger.info(`Gateway start requested (port=${this.status.port})`); this.lastSpawnSummary = null; this.shouldReconnect = true; await this.refreshReloadPolicy(true); // Lazily load device identity (async file I/O + key generation). // Must happen before connect() which uses the identity for the handshake. await this.initDeviceIdentity(); // Manual start should override and cancel any pending reconnect timer. if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; logger.debug('Cleared pending reconnect timer because start was requested manually'); } this.reconnectAttempts = 0; this.setStatus({ state: 'starting', reconnectAttempts: 0 }); // Check if Python environment is ready (self-healing) asynchronously. // Fire-and-forget: only needs to run once, not on every retry. warmupManagedPythonReadiness(); try { await runGatewayStartupSequence({ port: this.status.port, ownedPid: this.process?.pid, shouldWaitForPortFree: process.platform === 'win32', resetStartupStderrLines: () => { this.recentStartupStderrLines = []; }, getStartupStderrLines: () => this.recentStartupStderrLines, assertLifecycle: (phase) => { this.lifecycleController.assert(startEpoch, phase); }, findExistingGateway: async (port) => { // Always read the current process pid dynamically so that retries // don't treat a just-spawned gateway as an orphan. The ownedPid // snapshot captured at start() entry is stale after startProcess() // replaces this.process — leading to the just-started pid being // immediately killed as a false orphan on the next retry iteration. return await findExistingGatewayProcess({ port, ownedPid: this.process?.pid }); }, connect: async (port, externalToken) => { await this.connect(port, externalToken); }, onConnectedToExistingGateway: () => { // If the existing gateway is actually our own spawned UtilityProcess // (e.g. after a self-restart code=1012), keep ownership so that // stop() can still terminate the process during a restart() cycle. const isOwnProcess = this.process?.pid != null && this.ownsProcess; if (!isOwnProcess) { this.ownsProcess = false; this.setStatus({ pid: undefined }); } this.startHealthCheck(); }, waitForPortFree: async (port) => { await waitForPortFree(port); }, startProcess: async () => { await this.startProcess(); }, waitForReady: async (port) => { await waitForGatewayReady({ port, getProcessExitCode: () => this.processExitCode, }); }, onConnectedToManagedGateway: () => { this.startHealthCheck(); logger.debug('Gateway started successfully'); }, runDoctorRepair: async () => await runOpenClawDoctorRepair(), onDoctorRepairSuccess: () => { this.setStatus({ state: 'starting', error: undefined, reconnectAttempts: 0 }); }, delay: async (ms) => { await new Promise((resolve) => setTimeout(resolve, ms)); }, }); } catch (error) { if (error instanceof LifecycleSupersededError) { logger.debug(error.message); return; } logger.error( `Gateway start failed (port=${this.status.port}, reconnectAttempts=${this.reconnectAttempts}, spawn=${this.lastSpawnSummary ?? 'n/a'})`, error ); this.setStatus({ state: 'error', error: String(error) }); throw error; } finally { this.startLock = false; this.restartController.flushDeferredRestart( 'start:finally', { state: this.status.state, startLock: this.startLock, shouldReconnect: this.shouldReconnect, }, () => { void this.restart().catch((error) => { logger.warn('Deferred Gateway restart failed:', error); }); }, ); } } /** * Stop Gateway process */ async stop(): Promise { logger.info('Gateway stop requested'); this.lifecycleController.bump('stop'); // Disable auto-reconnect this.shouldReconnect = false; // Clear all timers this.clearAllTimers(); // If this manager is attached to an external gateway process, ask it to shut down // over protocol before closing the socket. if (!this.ownsProcess && this.ws?.readyState === WebSocket.OPEN && this.externalShutdownSupported !== false) { try { await this.rpc('shutdown', undefined, 5000); this.externalShutdownSupported = true; } catch (error) { if (this.isUnsupportedShutdownError(error)) { this.externalShutdownSupported = false; logger.info('External Gateway does not support "shutdown"; skipping shutdown RPC for future stops'); } else { logger.warn('Failed to request shutdown for externally managed Gateway:', error); } } } // Close WebSocket — use terminate() to force-close the TCP connection // immediately without waiting for the WebSocket close handshake. // ws.close() sends a close frame and waits for the server to respond; // if the gateway process is being killed concurrently, the handshake // never completes and the connection stays ESTABLISHED indefinitely, // accumulating leaked connections on every restart cycle. if (this.ws) { try { this.ws.terminate(); } catch { /* ignore */ } this.ws = null; } // Kill process if (this.process && this.ownsProcess) { const child = this.process; await terminateOwnedGatewayProcess(child); if (this.process === child) { this.process = null; } } this.ownsProcess = false; clearPendingGatewayRequests(this.pendingRequests, new Error('Gateway stopped')); this.restartController.resetDeferredRestart(); this.setStatus({ state: 'stopped', error: undefined, pid: undefined, connectedAt: undefined, uptime: undefined }); } /** * Best-effort emergency cleanup for app-quit timeout paths. * Only terminates a process this manager still owns. */ async forceTerminateOwnedProcessForQuit(): Promise { if (!this.process || !this.ownsProcess) { return false; } const child = this.process; await terminateOwnedGatewayProcess(child); if (this.process === child) { this.process = null; } this.ownsProcess = false; this.setStatus({ pid: undefined }); return true; } /** * Restart Gateway process */ async restart(): Promise { if (this.restartController.isRestartDeferred({ state: this.status.state, startLock: this.startLock, })) { this.restartController.markDeferredRestart('restart', { state: this.status.state, startLock: this.startLock, }); return; } if (this.restartInFlight) { logger.debug('Gateway restart already in progress, joining existing request'); await this.restartInFlight; return; } const decision = this.restartGovernor.decide(); if (!decision.allow) { const observability = this.restartGovernor.getObservability(); logger.warn( `[gateway-restart-governor] restart suppressed reason=${decision.reason} retryAfterMs=${decision.retryAfterMs} ` + `suppressed=${observability.suppressed_total} executed=${observability.executed_total} circuitOpenUntil=${observability.circuit_open_until}`, ); const props = { reason: decision.reason, retry_after_ms: decision.retryAfterMs, gateway_restart_suppressed_total: observability.suppressed_total, gateway_restart_executed_total: observability.executed_total, gateway_restart_circuit_open_until: observability.circuit_open_until, }; trackMetric('gateway.restart.suppressed', props); captureTelemetryEvent('gateway_restart_suppressed', props); return; } const pidBefore = this.status.pid; logger.info(`[gateway-refresh] mode=restart requested pidBefore=${pidBefore ?? 'n/a'}`); this.restartInFlight = (async () => { await this.stop(); await this.start(); })(); try { await this.restartInFlight; this.restartGovernor.recordExecuted(); this.restartController.recordRestartCompleted(); const observability = this.restartGovernor.getObservability(); const props = { gateway_restart_executed_total: observability.executed_total, gateway_restart_suppressed_total: observability.suppressed_total, gateway_restart_circuit_open_until: observability.circuit_open_until, }; trackMetric('gateway.restart.executed', props); captureTelemetryEvent('gateway_restart_executed', props); logger.info( `[gateway-refresh] mode=restart result=applied pidBefore=${pidBefore ?? 'n/a'} pidAfter=${this.status.pid ?? 'n/a'} ` + `suppressed=${observability.suppressed_total} executed=${observability.executed_total} circuitOpenUntil=${observability.circuit_open_until}`, ); } finally { this.restartInFlight = null; this.restartController.flushDeferredRestart( 'restart:finally', { state: this.status.state, startLock: this.startLock, shouldReconnect: this.shouldReconnect, }, () => { void this.restart().catch((error) => { logger.warn('Deferred Gateway restart failed:', error); }); }, ); } } /** * Debounced restart — coalesces multiple rapid restart requests into a * single restart after `delayMs` of inactivity. This prevents the * cascading stop/start cycles that occur when provider:save, * provider:setDefault and channel:saveConfig all fire within seconds * of each other during setup. */ debouncedRestart(delayMs = 2000): void { this.restartController.debouncedRestart(delayMs, () => { void this.restart().catch((err) => { logger.warn('Debounced Gateway restart failed:', err); }); }); } /** * Ask the Gateway process to reload config in-place when possible. * Falls back to restart on unsupported platforms or signaling failures. */ async reload(): Promise { await this.refreshReloadPolicy(); if (this.reloadPolicy.mode === 'off' || this.reloadPolicy.mode === 'restart') { logger.info( `[gateway-refresh] mode=reload result=policy_forced_restart policy=${this.reloadPolicy.mode}`, ); await this.restart(); return; } if (this.restartController.isRestartDeferred({ state: this.status.state, startLock: this.startLock, })) { this.restartController.markDeferredRestart('reload', { state: this.status.state, startLock: this.startLock, }); return; } const pidBefore = this.process?.pid; logger.info(`[gateway-refresh] mode=reload requested pid=${pidBefore ?? 'n/a'} state=${this.status.state}`); if (!this.process?.pid || this.status.state !== 'running') { logger.warn('[gateway-refresh] mode=reload result=fallback_restart cause=not_running'); logger.warn('Gateway reload requested while not running; falling back to restart'); await this.restart(); return; } const connectedForMs = this.status.connectedAt ? Date.now() - this.status.connectedAt : Number.POSITIVE_INFINITY; // Avoid signaling a process that just came up; it will already read latest config. if (connectedForMs < 8000) { logger.info( `[gateway-refresh] mode=reload result=skipped_recent_connect connectedForMs=${connectedForMs} pid=${this.process.pid}`, ); logger.info(`Gateway connected ${connectedForMs}ms ago, skipping reload signal`); return; } if (process.platform === 'win32') { // Windows does not support SIGUSR1 for in-process reload. // Fall back to a full restart. The connectedForMs < 8000 guard above // already skips unnecessary restarts for recently-started processes. logger.warn('[gateway-refresh] mode=reload result=fallback_restart cause=windows'); await this.restart(); return; } try { process.kill(this.process.pid, 'SIGUSR1'); logger.info(`Sent SIGUSR1 to Gateway for config reload (pid=${this.process.pid})`); // Some gateway builds do not handle SIGUSR1 as an in-process reload. // If process state doesn't recover quickly, fall back to restart. await new Promise((resolve) => setTimeout(resolve, 1500)); if (this.status.state !== 'running' || !this.process?.pid) { logger.warn('[gateway-refresh] mode=reload result=fallback_restart cause=post_signal_unhealthy'); logger.warn('Gateway did not stay running after reload signal, falling back to restart'); await this.restart(); } else { const pidAfter = this.process.pid; logger.info( `[gateway-refresh] mode=reload result=applied_in_place pidBefore=${pidBefore} pidAfter=${pidAfter}`, ); } } catch (error) { logger.warn('[gateway-refresh] mode=reload result=fallback_restart cause=signal_error'); logger.warn('Gateway reload signal failed, falling back to restart:', error); await this.restart(); } } /** * Debounced reload — coalesces multiple rapid config-change events into one * in-process reload when possible. */ debouncedReload(delayMs?: number): void { void this.refreshReloadPolicy(); const effectiveDelay = delayMs ?? this.reloadPolicy.debounceMs; if (this.reloadPolicy.mode === 'off' || this.reloadPolicy.mode === 'restart') { logger.debug( `Gateway reload policy=${this.reloadPolicy.mode}; routing debouncedReload to debouncedRestart (${effectiveDelay}ms)`, ); this.debouncedRestart(effectiveDelay); return; } if (this.reloadDebounceTimer) { clearTimeout(this.reloadDebounceTimer); } logger.debug(`Gateway reload debounced (will fire in ${effectiveDelay}ms)`); this.reloadDebounceTimer = setTimeout(() => { this.reloadDebounceTimer = null; void this.reload().catch((err) => { logger.warn('Debounced Gateway reload failed:', err); }); }, effectiveDelay); } private async refreshReloadPolicy(force = false): Promise { const now = Date.now(); if (!force && now - this.reloadPolicyLoadedAt < GatewayManager.RELOAD_POLICY_REFRESH_MS) { return; } if (this.reloadPolicyRefreshPromise) { await this.reloadPolicyRefreshPromise; return; } this.reloadPolicyRefreshPromise = (async () => { const nextPolicy = await loadGatewayReloadPolicy(); this.reloadPolicy = nextPolicy; this.reloadPolicyLoadedAt = Date.now(); })(); try { await this.reloadPolicyRefreshPromise; } finally { this.reloadPolicyRefreshPromise = null; } } /** * Clear all active timers */ private clearAllTimers(): void { if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } this.connectionMonitor.clear(); this.restartController.clearDebounceTimer(); if (this.reloadDebounceTimer) { clearTimeout(this.reloadDebounceTimer); this.reloadDebounceTimer = null; } } /** * Make an RPC call to the Gateway * Uses OpenClaw protocol format: { type: "req", id: "...", method: "...", params: {...} } */ async rpc(method: string, params?: unknown, timeoutMs = 30000): Promise { return new Promise((resolve, reject) => { if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { reject(new Error('Gateway not connected')); return; } const id = crypto.randomUUID(); // Set timeout for request const timeout = setTimeout(() => { rejectPendingGatewayRequest(this.pendingRequests, id, new Error(`RPC timeout: ${method}`)); }, timeoutMs); // Store pending request this.pendingRequests.set(id, { resolve: resolve as (value: unknown) => void, reject, timeout, }); // Send request using OpenClaw protocol format const request = { type: 'req', id, method, params, }; try { this.ws.send(JSON.stringify(request)); } catch (error) { rejectPendingGatewayRequest(this.pendingRequests, id, new Error(`Failed to send RPC request: ${error}`)); } }); } /** * Start health check monitoring */ private startHealthCheck(): void { this.connectionMonitor.startHealthCheck({ shouldCheck: () => this.status.state === 'running', checkHealth: () => this.checkHealth(), onUnhealthy: (errorMessage) => { this.emit('error', new Error(errorMessage)); }, onError: () => { // The monitor already logged the error; nothing else to do here. }, }); } /** * Check Gateway health via WebSocket ping * OpenClaw Gateway doesn't have an HTTP /health endpoint */ async checkHealth(): Promise<{ ok: boolean; error?: string; uptime?: number }> { try { if (this.ws && this.ws.readyState === WebSocket.OPEN) { const uptime = this.status.connectedAt ? Math.floor((Date.now() - this.status.connectedAt) / 1000) : undefined; return { ok: true, uptime }; } return { ok: false, error: 'WebSocket not connected' }; } catch (error) { return { ok: false, error: String(error) }; } } /** * Start Gateway process * Uses OpenClaw npm package from node_modules (dev) or resources (production) */ private async startProcess(): Promise { const launchContext = await prepareGatewayLaunchContext(this.status.port); await unloadLaunchctlGatewayService(); this.processExitCode = null; // Per-process dedup map for stderr lines — resets on each new spawn. const stderrDedup = new Map(); const { child, lastSpawnSummary } = await launchGatewayProcess({ port: this.status.port, launchContext, sanitizeSpawnArgs: (args) => this.sanitizeSpawnArgs(args), getCurrentState: () => this.status.state, getShouldReconnect: () => this.shouldReconnect, onStderrLine: (line) => { recordGatewayStartupStderrLine(this.recentStartupStderrLines, line); const classified = classifyGatewayStderrMessage(line); if (classified.level === 'drop') return; // Dedup: suppress identical stderr lines after the first occurrence. const count = (stderrDedup.get(classified.normalized) ?? 0) + 1; stderrDedup.set(classified.normalized, count); if (count > 1) { // Log a summary every 50 duplicates to stay visible without flooding. if (count % 50 === 0) { logger.debug(`[Gateway stderr] (suppressed ${count} repeats) ${classified.normalized}`); } return; } if (classified.level === 'debug') { logger.debug(`[Gateway stderr] ${classified.normalized}`); return; } logger.warn(`[Gateway stderr] ${classified.normalized}`); }, onSpawn: (pid) => { this.setStatus({ pid }); }, onExit: (exitedChild, code) => { this.processExitCode = code; this.ownsProcess = false; this.connectionMonitor.clear(); if (this.process === exitedChild) { this.process = null; } this.emit('exit', code); if (this.status.state === 'running') { this.setStatus({ state: 'stopped' }); this.scheduleReconnect(); } }, onError: () => { this.ownsProcess = false; if (this.process === child) { this.process = null; } }, }); this.process = child; this.ownsProcess = true; logger.debug(`Gateway manager now owns process pid=${child.pid ?? 'unknown'}`); this.lastSpawnSummary = lastSpawnSummary; } /** * Connect WebSocket to Gateway */ private async connect(port: number, _externalToken?: string): Promise { this.ws = await connectGatewaySocket({ port, deviceIdentity: this.deviceIdentity, platform: process.platform, pendingRequests: this.pendingRequests, getToken: async () => await import('../utils/store').then(({ getSetting }) => getSetting('gatewayToken')), onHandshakeComplete: (ws) => { this.ws = ws; ws.on('pong', () => { this.connectionMonitor.markAlive('pong'); }); this.setStatus({ state: 'running', port, connectedAt: Date.now(), }); this.startPing(); }, onMessage: (message) => { this.handleMessage(message); }, onCloseAfterHandshake: (closeCode) => { this.connectionMonitor.clear(); if (this.status.state === 'running') { this.setStatus({ state: 'stopped' }); // On Windows, skip reconnect from WS close. The Gateway is a local // child process; actual crashes are already caught by the process exit // handler (`onExit`) which calls scheduleReconnect(). Triggering // reconnect from WS close as well races with the exit handler and can // cause double start() attempts or port conflicts during TCP TIME_WAIT. // // Exception: code=1012 means the Gateway is performing an in-process // restart (e.g. config reload). The UtilityProcess stays alive, so // `onExit` will never fire — we MUST reconnect from the WS close path. if (process.platform !== 'win32' || closeCode === 1012) { this.scheduleReconnect(); } } }, }); } /** * Handle incoming WebSocket message */ private handleMessage(message: unknown): void { this.connectionMonitor.markAlive('message'); if (typeof message !== 'object' || message === null) { logger.debug('Received non-object Gateway message'); return; } const msg = message as Record; // Handle OpenClaw protocol response format: { type: "res", id: "...", ok: true/false, ... } if (msg.type === 'res' && typeof msg.id === 'string') { if (msg.ok === false || msg.error) { const errorObj = msg.error as { message?: string; code?: number } | undefined; const errorMsg = errorObj?.message || JSON.stringify(msg.error) || 'Unknown error'; if (rejectPendingGatewayRequest(this.pendingRequests, msg.id, new Error(errorMsg))) { return; } } else if (resolvePendingGatewayRequest(this.pendingRequests, msg.id, msg.payload ?? msg)) { return; } } // Handle OpenClaw protocol event format: { type: "event", event: "...", payload: {...} } if (msg.type === 'event' && typeof msg.event === 'string') { dispatchProtocolEvent(this, msg.event, msg.payload); return; } // Fallback: Check if this is a JSON-RPC 2.0 response (legacy support) if (isResponse(message) && message.id && this.pendingRequests.has(String(message.id))) { if (message.error) { const errorMsg = typeof message.error === 'object' ? (message.error as { message?: string }).message || JSON.stringify(message.error) : String(message.error); rejectPendingGatewayRequest(this.pendingRequests, String(message.id), new Error(errorMsg)); } else { resolvePendingGatewayRequest(this.pendingRequests, String(message.id), message.result); } return; } // Check if this is a JSON-RPC notification (server-initiated event) if (isNotification(message)) { dispatchJsonRpcNotification(this, message); return; } this.emit('message', message); } /** * Start ping interval to keep connection alive */ private startPing(): void { this.connectionMonitor.startPing({ intervalMs: GatewayManager.HEARTBEAT_INTERVAL_MS, timeoutMs: GatewayManager.HEARTBEAT_TIMEOUT_MS, maxConsecutiveMisses: GatewayManager.HEARTBEAT_MAX_MISSES, sendPing: () => { if (this.ws?.readyState === WebSocket.OPEN) { this.ws.ping(); } }, onHeartbeatTimeout: ({ consecutiveMisses, timeoutMs }) => { if (this.status.state !== 'running' || !this.shouldReconnect) { return; } const ws = this.ws; if (!ws || ws.readyState !== WebSocket.OPEN) { return; } logger.warn( `Gateway heartbeat timed out after ${consecutiveMisses} consecutive misses (timeout=${timeoutMs}ms); terminating stale socket`, ); try { ws.terminate(); } catch (error) { logger.warn('Failed to terminate stale Gateway socket after heartbeat timeout:', error); } // On Windows, onCloseAfterHandshake intentionally skips scheduleReconnect() // to avoid double-reconnect races with the process exit handler. However, // a heartbeat timeout means the socket is stale while the process may still // be alive (no exit event), so we must explicitly trigger reconnect here. if (process.platform === 'win32') { this.scheduleReconnect(); } }, }); } /** * Schedule reconnection attempt with exponential backoff */ private scheduleReconnect(): void { const decision = getReconnectScheduleDecision({ shouldReconnect: this.shouldReconnect, hasReconnectTimer: this.reconnectTimer !== null, reconnectAttempts: this.reconnectAttempts, maxAttempts: this.reconnectConfig.maxAttempts, baseDelay: this.reconnectConfig.baseDelay, maxDelay: this.reconnectConfig.maxDelay, }); if (decision.action === 'skip') { logger.debug(`Gateway reconnect skipped (${decision.reason})`); return; } if (decision.action === 'already-scheduled') { return; } if (decision.action === 'fail') { logger.error(`Gateway reconnect failed: max attempts reached (${decision.maxAttempts})`); this.setStatus({ state: 'error', error: 'Failed to reconnect after maximum attempts', reconnectAttempts: this.reconnectAttempts }); return; } const cooldownRemaining = Math.max(0, GatewayManager.RESTART_COOLDOWN_MS - (Date.now() - this.lastRestartAt)); const { delay, nextAttempt, maxAttempts } = decision; const effectiveDelay = Math.max(delay, cooldownRemaining); this.reconnectAttempts = nextAttempt; logger.warn(`Scheduling Gateway reconnect attempt ${nextAttempt}/${maxAttempts} in ${effectiveDelay}ms`); this.setStatus({ state: 'reconnecting', reconnectAttempts: this.reconnectAttempts }); const scheduledEpoch = this.lifecycleController.getCurrentEpoch(); this.reconnectTimer = setTimeout(async () => { this.reconnectTimer = null; const skipReason = getReconnectSkipReason({ scheduledEpoch, currentEpoch: this.lifecycleController.getCurrentEpoch(), shouldReconnect: this.shouldReconnect, }); if (skipReason) { logger.debug(`Skipping reconnect attempt: ${skipReason}`); return; } const attemptNo = this.reconnectAttempts; this.reconnectAttemptsTotal += 1; try { // Use the guarded start() flow so reconnect attempts cannot bypass // lifecycle locking and accidentally start duplicate Gateway processes. await this.start(); this.reconnectSuccessTotal += 1; this.emitReconnectMetric('success', { attemptNo, maxAttempts, delayMs: effectiveDelay, }); this.reconnectAttempts = 0; } catch (error) { logger.error('Gateway reconnection attempt failed:', error); this.emitReconnectMetric('failure', { attemptNo, maxAttempts, delayMs: effectiveDelay, error: error instanceof Error ? error.message : String(error), }); this.scheduleReconnect(); } }, effectiveDelay); } private emitReconnectMetric( outcome: 'success' | 'failure', payload: { attemptNo: number; maxAttempts: number; delayMs: number; error?: string; }, ): void { const successRate = this.reconnectAttemptsTotal > 0 ? this.reconnectSuccessTotal / this.reconnectAttemptsTotal : 0; const properties = { outcome, attemptNo: payload.attemptNo, maxAttempts: payload.maxAttempts, delayMs: payload.delayMs, gateway_reconnect_success_count: this.reconnectSuccessTotal, gateway_reconnect_attempt_count: this.reconnectAttemptsTotal, gateway_reconnect_success_rate: Number(successRate.toFixed(4)), ...(payload.error ? { error: payload.error } : {}), }; trackMetric('gateway.reconnect', properties); // Keep local metrics only; do not upload reconnect details to PostHog. } /** * Update status and emit event */ private setStatus(update: Partial): void { this.stateController.setStatus(update); } }