852 lines
28 KiB
TypeScript
852 lines
28 KiB
TypeScript
/**
|
|
* Gateway Process Manager
|
|
* Manages the OpenClaw Gateway process lifecycle
|
|
*/
|
|
import { app } from 'electron';
|
|
import path from 'path';
|
|
import { EventEmitter } from 'events';
|
|
import WebSocket from 'ws';
|
|
import { PORTS } from '../utils/config';
|
|
import { JsonRpcNotification, isNotification, isResponse } from './protocol';
|
|
import { logger } from '../utils/logger';
|
|
import { captureTelemetryEvent, trackMetric } from '../utils/telemetry';
|
|
import {
|
|
loadOrCreateDeviceIdentity,
|
|
type DeviceIdentity,
|
|
} from '../utils/device-identity';
|
|
import {
|
|
DEFAULT_RECONNECT_CONFIG,
|
|
type ReconnectConfig,
|
|
type GatewayLifecycleState,
|
|
getReconnectScheduleDecision,
|
|
getReconnectSkipReason,
|
|
} from './process-policy';
|
|
import {
|
|
clearPendingGatewayRequests,
|
|
rejectPendingGatewayRequest,
|
|
resolvePendingGatewayRequest,
|
|
type PendingGatewayRequest,
|
|
} from './request-store';
|
|
import { dispatchJsonRpcNotification, dispatchProtocolEvent } from './event-dispatch';
|
|
import { GatewayStateController } from './state';
|
|
import { prepareGatewayLaunchContext } from './config-sync';
|
|
import { connectGatewaySocket, waitForGatewayReady } from './ws-client';
|
|
import {
|
|
findExistingGatewayProcess,
|
|
runOpenClawDoctorRepair,
|
|
terminateOwnedGatewayProcess,
|
|
unloadLaunchctlGatewayService,
|
|
waitForPortFree,
|
|
warmupManagedPythonReadiness,
|
|
} from './supervisor';
|
|
import { GatewayConnectionMonitor } from './connection-monitor';
|
|
import { GatewayLifecycleController, LifecycleSupersededError } from './lifecycle-controller';
|
|
import { launchGatewayProcess } from './process-launcher';
|
|
import { GatewayRestartController } from './restart-controller';
|
|
import { classifyGatewayStderrMessage, recordGatewayStartupStderrLine } from './startup-stderr';
|
|
import { runGatewayStartupSequence } from './startup-orchestrator';
|
|
|
|
export interface GatewayStatus {
|
|
state: GatewayLifecycleState;
|
|
port: number;
|
|
pid?: number;
|
|
uptime?: number;
|
|
error?: string;
|
|
connectedAt?: number;
|
|
version?: string;
|
|
reconnectAttempts?: number;
|
|
}
|
|
|
|
/**
|
|
* Gateway Manager Events
|
|
*/
|
|
export interface GatewayManagerEvents {
|
|
status: (status: GatewayStatus) => void;
|
|
message: (message: unknown) => void;
|
|
notification: (notification: JsonRpcNotification) => void;
|
|
exit: (code: number | null) => void;
|
|
error: (error: Error) => void;
|
|
'channel:status': (data: { channelId: string; status: string }) => void;
|
|
'chat:message': (data: { message: unknown }) => void;
|
|
}
|
|
|
|
/**
|
|
* Gateway Manager
|
|
* Handles starting, stopping, and communicating with the OpenClaw Gateway
|
|
*/
|
|
export class GatewayManager extends EventEmitter {
|
|
private process: Electron.UtilityProcess | null = null;
|
|
private processExitCode: number | null = null; // set by exit event, replaces exitCode/signalCode
|
|
private ownsProcess = false;
|
|
private ws: WebSocket | null = null;
|
|
private status: GatewayStatus = { state: 'stopped', port: PORTS.OPENCLAW_GATEWAY };
|
|
private readonly stateController: GatewayStateController;
|
|
private reconnectTimer: NodeJS.Timeout | null = null;
|
|
private reconnectAttempts = 0;
|
|
private reconnectConfig: ReconnectConfig;
|
|
private shouldReconnect = true;
|
|
private startLock = false;
|
|
private lastSpawnSummary: string | null = null;
|
|
private recentStartupStderrLines: string[] = [];
|
|
private pendingRequests: Map<string, PendingGatewayRequest> = new Map();
|
|
private deviceIdentity: DeviceIdentity | null = null;
|
|
private restartInFlight: Promise<void> | null = null;
|
|
private readonly connectionMonitor = new GatewayConnectionMonitor();
|
|
private readonly lifecycleController = new GatewayLifecycleController();
|
|
private readonly restartController = new GatewayRestartController();
|
|
private reloadDebounceTimer: NodeJS.Timeout | null = null;
|
|
private externalShutdownSupported: boolean | null = null;
|
|
private lastRestartAt = 0;
|
|
private reconnectAttemptsTotal = 0;
|
|
private reconnectSuccessTotal = 0;
|
|
private static readonly RESTART_COOLDOWN_MS = 2500;
|
|
|
|
constructor(config?: Partial<ReconnectConfig>) {
|
|
super();
|
|
this.stateController = new GatewayStateController({
|
|
emitStatus: (status) => {
|
|
this.status = status;
|
|
this.emit('status', status);
|
|
},
|
|
onTransition: (previousState, nextState) => {
|
|
this.restartController.flushDeferredRestart(
|
|
`status:${previousState}->${nextState}`,
|
|
{
|
|
state: this.status.state,
|
|
startLock: this.startLock,
|
|
shouldReconnect: this.shouldReconnect,
|
|
},
|
|
() => {
|
|
void this.restart().catch((error) => {
|
|
logger.warn('Deferred Gateway restart failed:', error);
|
|
});
|
|
},
|
|
);
|
|
},
|
|
});
|
|
this.reconnectConfig = { ...DEFAULT_RECONNECT_CONFIG, ...config };
|
|
// Device identity is loaded lazily in start() — not in the constructor —
|
|
// so that async file I/O and key generation don't block module loading.
|
|
}
|
|
|
|
private async initDeviceIdentity(): Promise<void> {
|
|
if (this.deviceIdentity) return; // already loaded
|
|
try {
|
|
const identityPath = path.join(app.getPath('userData'), 'clawx-device-identity.json');
|
|
this.deviceIdentity = await loadOrCreateDeviceIdentity(identityPath);
|
|
logger.debug(`Device identity loaded (deviceId=${this.deviceIdentity.deviceId})`);
|
|
} catch (err) {
|
|
logger.warn('Failed to load device identity, scopes will be limited:', err);
|
|
}
|
|
}
|
|
|
|
private sanitizeSpawnArgs(args: string[]): string[] {
|
|
const sanitized = [...args];
|
|
const tokenIdx = sanitized.indexOf('--token');
|
|
if (tokenIdx !== -1 && tokenIdx + 1 < sanitized.length) {
|
|
sanitized[tokenIdx + 1] = '[redacted]';
|
|
}
|
|
return sanitized;
|
|
}
|
|
|
|
private isUnsupportedShutdownError(error: unknown): boolean {
|
|
const message = error instanceof Error ? error.message : String(error);
|
|
return /unknown method:\s*shutdown/i.test(message);
|
|
}
|
|
/**
|
|
* Get current Gateway status
|
|
*/
|
|
getStatus(): GatewayStatus {
|
|
return this.stateController.getStatus();
|
|
}
|
|
|
|
/**
|
|
* Check if Gateway is connected and ready
|
|
*/
|
|
isConnected(): boolean {
|
|
return this.stateController.isConnected(this.ws?.readyState === WebSocket.OPEN);
|
|
}
|
|
|
|
/**
|
|
* Start Gateway process
|
|
*/
|
|
async start(): Promise<void> {
|
|
if (this.startLock) {
|
|
logger.debug('Gateway start ignored because a start flow is already in progress');
|
|
return;
|
|
}
|
|
|
|
if (this.status.state === 'running') {
|
|
logger.debug('Gateway already running, skipping start');
|
|
return;
|
|
}
|
|
|
|
this.startLock = true;
|
|
const startEpoch = this.lifecycleController.bump('start');
|
|
logger.info(`Gateway start requested (port=${this.status.port})`);
|
|
this.lastSpawnSummary = null;
|
|
this.shouldReconnect = true;
|
|
|
|
// Lazily load device identity (async file I/O + key generation).
|
|
// Must happen before connect() which uses the identity for the handshake.
|
|
await this.initDeviceIdentity();
|
|
|
|
// Manual start should override and cancel any pending reconnect timer.
|
|
if (this.reconnectTimer) {
|
|
clearTimeout(this.reconnectTimer);
|
|
this.reconnectTimer = null;
|
|
logger.debug('Cleared pending reconnect timer because start was requested manually');
|
|
}
|
|
|
|
this.reconnectAttempts = 0;
|
|
this.setStatus({ state: 'starting', reconnectAttempts: 0 });
|
|
|
|
// Check if Python environment is ready (self-healing) asynchronously.
|
|
// Fire-and-forget: only needs to run once, not on every retry.
|
|
warmupManagedPythonReadiness();
|
|
|
|
try {
|
|
await runGatewayStartupSequence({
|
|
port: this.status.port,
|
|
ownedPid: this.process?.pid,
|
|
shouldWaitForPortFree: process.platform === 'win32',
|
|
resetStartupStderrLines: () => {
|
|
this.recentStartupStderrLines = [];
|
|
},
|
|
getStartupStderrLines: () => this.recentStartupStderrLines,
|
|
assertLifecycle: (phase) => {
|
|
this.lifecycleController.assert(startEpoch, phase);
|
|
},
|
|
findExistingGateway: async (port, ownedPid) => {
|
|
return await findExistingGatewayProcess({ port, ownedPid });
|
|
},
|
|
connect: async (port, externalToken) => {
|
|
await this.connect(port, externalToken);
|
|
},
|
|
onConnectedToExistingGateway: () => {
|
|
this.ownsProcess = false;
|
|
this.setStatus({ pid: undefined });
|
|
this.startHealthCheck();
|
|
},
|
|
waitForPortFree: async (port) => {
|
|
await waitForPortFree(port);
|
|
},
|
|
startProcess: async () => {
|
|
await this.startProcess();
|
|
},
|
|
waitForReady: async (port) => {
|
|
await waitForGatewayReady({
|
|
port,
|
|
getProcessExitCode: () => this.processExitCode,
|
|
});
|
|
},
|
|
onConnectedToManagedGateway: () => {
|
|
this.startHealthCheck();
|
|
logger.debug('Gateway started successfully');
|
|
},
|
|
runDoctorRepair: async () => await runOpenClawDoctorRepair(),
|
|
onDoctorRepairSuccess: () => {
|
|
this.setStatus({ state: 'starting', error: undefined, reconnectAttempts: 0 });
|
|
},
|
|
delay: async (ms) => {
|
|
await new Promise((resolve) => setTimeout(resolve, ms));
|
|
},
|
|
});
|
|
} catch (error) {
|
|
if (error instanceof LifecycleSupersededError) {
|
|
logger.debug(error.message);
|
|
return;
|
|
}
|
|
logger.error(
|
|
`Gateway start failed (port=${this.status.port}, reconnectAttempts=${this.reconnectAttempts}, spawn=${this.lastSpawnSummary ?? 'n/a'})`,
|
|
error
|
|
);
|
|
this.setStatus({ state: 'error', error: String(error) });
|
|
throw error;
|
|
} finally {
|
|
this.startLock = false;
|
|
this.restartController.flushDeferredRestart(
|
|
'start:finally',
|
|
{
|
|
state: this.status.state,
|
|
startLock: this.startLock,
|
|
shouldReconnect: this.shouldReconnect,
|
|
},
|
|
() => {
|
|
void this.restart().catch((error) => {
|
|
logger.warn('Deferred Gateway restart failed:', error);
|
|
});
|
|
},
|
|
);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Stop Gateway process
|
|
*/
|
|
async stop(): Promise<void> {
|
|
logger.info('Gateway stop requested');
|
|
this.lifecycleController.bump('stop');
|
|
// Disable auto-reconnect
|
|
this.shouldReconnect = false;
|
|
|
|
// Clear all timers
|
|
this.clearAllTimers();
|
|
|
|
// If this manager is attached to an external gateway process, ask it to shut down
|
|
// over protocol before closing the socket.
|
|
if (!this.ownsProcess && this.ws?.readyState === WebSocket.OPEN && this.externalShutdownSupported !== false) {
|
|
try {
|
|
await this.rpc('shutdown', undefined, 5000);
|
|
this.externalShutdownSupported = true;
|
|
} catch (error) {
|
|
if (this.isUnsupportedShutdownError(error)) {
|
|
this.externalShutdownSupported = false;
|
|
logger.info('External Gateway does not support "shutdown"; skipping shutdown RPC for future stops');
|
|
} else {
|
|
logger.warn('Failed to request shutdown for externally managed Gateway:', error);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close WebSocket
|
|
if (this.ws) {
|
|
this.ws.close(1000, 'Gateway stopped by user');
|
|
this.ws = null;
|
|
}
|
|
|
|
// Kill process
|
|
if (this.process && this.ownsProcess) {
|
|
const child = this.process;
|
|
await terminateOwnedGatewayProcess(child);
|
|
|
|
if (this.process === child) {
|
|
this.process = null;
|
|
}
|
|
}
|
|
this.ownsProcess = false;
|
|
|
|
clearPendingGatewayRequests(this.pendingRequests, new Error('Gateway stopped'));
|
|
|
|
this.restartController.resetDeferredRestart();
|
|
this.setStatus({ state: 'stopped', error: undefined, pid: undefined, connectedAt: undefined, uptime: undefined });
|
|
}
|
|
|
|
/**
|
|
* Restart Gateway process
|
|
*/
|
|
async restart(): Promise<void> {
|
|
if (this.restartController.isRestartDeferred({
|
|
state: this.status.state,
|
|
startLock: this.startLock,
|
|
})) {
|
|
this.restartController.markDeferredRestart('restart', {
|
|
state: this.status.state,
|
|
startLock: this.startLock,
|
|
});
|
|
return;
|
|
}
|
|
|
|
if (this.restartInFlight) {
|
|
logger.debug('Gateway restart already in progress, joining existing request');
|
|
await this.restartInFlight;
|
|
return;
|
|
}
|
|
|
|
const now = Date.now();
|
|
const sinceLastRestart = now - this.lastRestartAt;
|
|
if (sinceLastRestart < GatewayManager.RESTART_COOLDOWN_MS) {
|
|
logger.info(
|
|
`Gateway restart skipped due to cooldown (${sinceLastRestart}ms < ${GatewayManager.RESTART_COOLDOWN_MS}ms)`,
|
|
);
|
|
return;
|
|
}
|
|
|
|
const pidBefore = this.status.pid;
|
|
logger.info(`[gateway-refresh] mode=restart requested pidBefore=${pidBefore ?? 'n/a'}`);
|
|
this.lastRestartAt = now;
|
|
this.restartInFlight = (async () => {
|
|
await this.stop();
|
|
await this.start();
|
|
})();
|
|
|
|
try {
|
|
await this.restartInFlight;
|
|
logger.info(
|
|
`[gateway-refresh] mode=restart result=applied pidBefore=${pidBefore ?? 'n/a'} pidAfter=${this.status.pid ?? 'n/a'}`,
|
|
);
|
|
} finally {
|
|
this.restartInFlight = null;
|
|
this.restartController.flushDeferredRestart(
|
|
'restart:finally',
|
|
{
|
|
state: this.status.state,
|
|
startLock: this.startLock,
|
|
shouldReconnect: this.shouldReconnect,
|
|
},
|
|
() => {
|
|
void this.restart().catch((error) => {
|
|
logger.warn('Deferred Gateway restart failed:', error);
|
|
});
|
|
},
|
|
);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Debounced restart — coalesces multiple rapid restart requests into a
|
|
* single restart after `delayMs` of inactivity. This prevents the
|
|
* cascading stop/start cycles that occur when provider:save,
|
|
* provider:setDefault and channel:saveConfig all fire within seconds
|
|
* of each other during setup.
|
|
*/
|
|
debouncedRestart(delayMs = 2000): void {
|
|
this.restartController.debouncedRestart(delayMs, () => {
|
|
void this.restart().catch((err) => {
|
|
logger.warn('Debounced Gateway restart failed:', err);
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Ask the Gateway process to reload config in-place when possible.
|
|
* Falls back to restart on unsupported platforms or signaling failures.
|
|
*/
|
|
async reload(): Promise<void> {
|
|
if (this.restartController.isRestartDeferred({
|
|
state: this.status.state,
|
|
startLock: this.startLock,
|
|
})) {
|
|
this.restartController.markDeferredRestart('reload', {
|
|
state: this.status.state,
|
|
startLock: this.startLock,
|
|
});
|
|
return;
|
|
}
|
|
|
|
const pidBefore = this.process?.pid;
|
|
logger.info(`[gateway-refresh] mode=reload requested pid=${pidBefore ?? 'n/a'} state=${this.status.state}`);
|
|
|
|
if (!this.process?.pid || this.status.state !== 'running') {
|
|
logger.warn('[gateway-refresh] mode=reload result=fallback_restart cause=not_running');
|
|
logger.warn('Gateway reload requested while not running; falling back to restart');
|
|
await this.restart();
|
|
return;
|
|
}
|
|
|
|
if (process.platform === 'win32') {
|
|
logger.warn('[gateway-refresh] mode=reload result=fallback_restart cause=windows');
|
|
logger.debug('Windows detected, falling back to Gateway restart for reload');
|
|
await this.restart();
|
|
return;
|
|
}
|
|
|
|
const connectedForMs = this.status.connectedAt
|
|
? Date.now() - this.status.connectedAt
|
|
: Number.POSITIVE_INFINITY;
|
|
|
|
// Avoid signaling a process that just came up; it will already read latest config.
|
|
if (connectedForMs < 8000) {
|
|
logger.info(
|
|
`[gateway-refresh] mode=reload result=skipped_recent_connect connectedForMs=${connectedForMs} pid=${this.process.pid}`,
|
|
);
|
|
logger.info(`Gateway connected ${connectedForMs}ms ago, skipping reload signal`);
|
|
return;
|
|
}
|
|
|
|
try {
|
|
process.kill(this.process.pid, 'SIGUSR1');
|
|
logger.info(`Sent SIGUSR1 to Gateway for config reload (pid=${this.process.pid})`);
|
|
// Some gateway builds do not handle SIGUSR1 as an in-process reload.
|
|
// If process state doesn't recover quickly, fall back to restart.
|
|
await new Promise((resolve) => setTimeout(resolve, 1500));
|
|
if (this.status.state !== 'running' || !this.process?.pid) {
|
|
logger.warn('[gateway-refresh] mode=reload result=fallback_restart cause=post_signal_unhealthy');
|
|
logger.warn('Gateway did not stay running after reload signal, falling back to restart');
|
|
await this.restart();
|
|
} else {
|
|
const pidAfter = this.process.pid;
|
|
logger.info(
|
|
`[gateway-refresh] mode=reload result=applied_in_place pidBefore=${pidBefore} pidAfter=${pidAfter}`,
|
|
);
|
|
}
|
|
} catch (error) {
|
|
logger.warn('[gateway-refresh] mode=reload result=fallback_restart cause=signal_error');
|
|
logger.warn('Gateway reload signal failed, falling back to restart:', error);
|
|
await this.restart();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Debounced reload — coalesces multiple rapid config-change events into one
|
|
* in-process reload when possible.
|
|
*/
|
|
debouncedReload(delayMs = 1200): void {
|
|
if (this.reloadDebounceTimer) {
|
|
clearTimeout(this.reloadDebounceTimer);
|
|
}
|
|
logger.debug(`Gateway reload debounced (will fire in ${delayMs}ms)`);
|
|
this.reloadDebounceTimer = setTimeout(() => {
|
|
this.reloadDebounceTimer = null;
|
|
void this.reload().catch((err) => {
|
|
logger.warn('Debounced Gateway reload failed:', err);
|
|
});
|
|
}, delayMs);
|
|
}
|
|
|
|
/**
|
|
* Clear all active timers
|
|
*/
|
|
private clearAllTimers(): void {
|
|
if (this.reconnectTimer) {
|
|
clearTimeout(this.reconnectTimer);
|
|
this.reconnectTimer = null;
|
|
}
|
|
this.connectionMonitor.clear();
|
|
this.restartController.clearDebounceTimer();
|
|
if (this.reloadDebounceTimer) {
|
|
clearTimeout(this.reloadDebounceTimer);
|
|
this.reloadDebounceTimer = null;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Make an RPC call to the Gateway
|
|
* Uses OpenClaw protocol format: { type: "req", id: "...", method: "...", params: {...} }
|
|
*/
|
|
async rpc<T>(method: string, params?: unknown, timeoutMs = 30000): Promise<T> {
|
|
return new Promise((resolve, reject) => {
|
|
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
|
|
reject(new Error('Gateway not connected'));
|
|
return;
|
|
}
|
|
|
|
const id = crypto.randomUUID();
|
|
|
|
// Set timeout for request
|
|
const timeout = setTimeout(() => {
|
|
rejectPendingGatewayRequest(this.pendingRequests, id, new Error(`RPC timeout: ${method}`));
|
|
}, timeoutMs);
|
|
|
|
// Store pending request
|
|
this.pendingRequests.set(id, {
|
|
resolve: resolve as (value: unknown) => void,
|
|
reject,
|
|
timeout,
|
|
});
|
|
|
|
// Send request using OpenClaw protocol format
|
|
const request = {
|
|
type: 'req',
|
|
id,
|
|
method,
|
|
params,
|
|
};
|
|
|
|
try {
|
|
this.ws.send(JSON.stringify(request));
|
|
} catch (error) {
|
|
rejectPendingGatewayRequest(this.pendingRequests, id, new Error(`Failed to send RPC request: ${error}`));
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Start health check monitoring
|
|
*/
|
|
private startHealthCheck(): void {
|
|
this.connectionMonitor.startHealthCheck({
|
|
shouldCheck: () => this.status.state === 'running',
|
|
checkHealth: () => this.checkHealth(),
|
|
onUnhealthy: (errorMessage) => {
|
|
this.emit('error', new Error(errorMessage));
|
|
},
|
|
onError: () => {
|
|
// The monitor already logged the error; nothing else to do here.
|
|
},
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Check Gateway health via WebSocket ping
|
|
* OpenClaw Gateway doesn't have an HTTP /health endpoint
|
|
*/
|
|
async checkHealth(): Promise<{ ok: boolean; error?: string; uptime?: number }> {
|
|
try {
|
|
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
|
|
const uptime = this.status.connectedAt
|
|
? Math.floor((Date.now() - this.status.connectedAt) / 1000)
|
|
: undefined;
|
|
return { ok: true, uptime };
|
|
}
|
|
return { ok: false, error: 'WebSocket not connected' };
|
|
} catch (error) {
|
|
return { ok: false, error: String(error) };
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Start Gateway process
|
|
* Uses OpenClaw npm package from node_modules (dev) or resources (production)
|
|
*/
|
|
private async startProcess(): Promise<void> {
|
|
const launchContext = await prepareGatewayLaunchContext(this.status.port);
|
|
await unloadLaunchctlGatewayService();
|
|
this.processExitCode = null;
|
|
|
|
const { child, lastSpawnSummary } = await launchGatewayProcess({
|
|
port: this.status.port,
|
|
launchContext,
|
|
sanitizeSpawnArgs: (args) => this.sanitizeSpawnArgs(args),
|
|
getCurrentState: () => this.status.state,
|
|
getShouldReconnect: () => this.shouldReconnect,
|
|
onStderrLine: (line) => {
|
|
recordGatewayStartupStderrLine(this.recentStartupStderrLines, line);
|
|
const classified = classifyGatewayStderrMessage(line);
|
|
if (classified.level === 'drop') return;
|
|
if (classified.level === 'debug') {
|
|
logger.debug(`[Gateway stderr] ${classified.normalized}`);
|
|
return;
|
|
}
|
|
logger.warn(`[Gateway stderr] ${classified.normalized}`);
|
|
},
|
|
onSpawn: (pid) => {
|
|
this.setStatus({ pid });
|
|
},
|
|
onExit: (exitedChild, code) => {
|
|
this.processExitCode = code;
|
|
this.ownsProcess = false;
|
|
if (this.process === exitedChild) {
|
|
this.process = null;
|
|
}
|
|
this.emit('exit', code);
|
|
|
|
if (this.status.state === 'running') {
|
|
this.setStatus({ state: 'stopped' });
|
|
this.scheduleReconnect();
|
|
}
|
|
},
|
|
onError: () => {
|
|
this.ownsProcess = false;
|
|
if (this.process === child) {
|
|
this.process = null;
|
|
}
|
|
},
|
|
});
|
|
|
|
this.process = child;
|
|
this.ownsProcess = true;
|
|
this.lastSpawnSummary = lastSpawnSummary;
|
|
}
|
|
|
|
/**
|
|
* Connect WebSocket to Gateway
|
|
*/
|
|
private async connect(port: number, _externalToken?: string): Promise<void> {
|
|
this.ws = await connectGatewaySocket({
|
|
port,
|
|
deviceIdentity: this.deviceIdentity,
|
|
platform: process.platform,
|
|
pendingRequests: this.pendingRequests,
|
|
getToken: async () => await import('../utils/store').then(({ getSetting }) => getSetting('gatewayToken')),
|
|
onHandshakeComplete: (ws) => {
|
|
this.ws = ws;
|
|
this.setStatus({
|
|
state: 'running',
|
|
port,
|
|
connectedAt: Date.now(),
|
|
});
|
|
this.startPing();
|
|
},
|
|
onMessage: (message) => {
|
|
this.handleMessage(message);
|
|
},
|
|
onCloseAfterHandshake: () => {
|
|
if (this.status.state === 'running') {
|
|
this.setStatus({ state: 'stopped' });
|
|
this.scheduleReconnect();
|
|
}
|
|
},
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Handle incoming WebSocket message
|
|
*/
|
|
private handleMessage(message: unknown): void {
|
|
if (typeof message !== 'object' || message === null) {
|
|
logger.debug('Received non-object Gateway message');
|
|
return;
|
|
}
|
|
|
|
const msg = message as Record<string, unknown>;
|
|
|
|
// Handle OpenClaw protocol response format: { type: "res", id: "...", ok: true/false, ... }
|
|
if (msg.type === 'res' && typeof msg.id === 'string') {
|
|
if (msg.ok === false || msg.error) {
|
|
const errorObj = msg.error as { message?: string; code?: number } | undefined;
|
|
const errorMsg = errorObj?.message || JSON.stringify(msg.error) || 'Unknown error';
|
|
if (rejectPendingGatewayRequest(this.pendingRequests, msg.id, new Error(errorMsg))) {
|
|
return;
|
|
}
|
|
} else if (resolvePendingGatewayRequest(this.pendingRequests, msg.id, msg.payload ?? msg)) {
|
|
return;
|
|
}
|
|
}
|
|
|
|
// Handle OpenClaw protocol event format: { type: "event", event: "...", payload: {...} }
|
|
if (msg.type === 'event' && typeof msg.event === 'string') {
|
|
dispatchProtocolEvent(this, msg.event, msg.payload);
|
|
return;
|
|
}
|
|
|
|
// Fallback: Check if this is a JSON-RPC 2.0 response (legacy support)
|
|
if (isResponse(message) && message.id && this.pendingRequests.has(String(message.id))) {
|
|
if (message.error) {
|
|
const errorMsg = typeof message.error === 'object'
|
|
? (message.error as { message?: string }).message || JSON.stringify(message.error)
|
|
: String(message.error);
|
|
rejectPendingGatewayRequest(this.pendingRequests, String(message.id), new Error(errorMsg));
|
|
} else {
|
|
resolvePendingGatewayRequest(this.pendingRequests, String(message.id), message.result);
|
|
}
|
|
return;
|
|
}
|
|
|
|
// Check if this is a JSON-RPC notification (server-initiated event)
|
|
if (isNotification(message)) {
|
|
dispatchJsonRpcNotification(this, message);
|
|
return;
|
|
}
|
|
|
|
this.emit('message', message);
|
|
}
|
|
|
|
/**
|
|
* Start ping interval to keep connection alive
|
|
*/
|
|
private startPing(): void {
|
|
this.connectionMonitor.startPing(() => {
|
|
if (this.ws?.readyState === WebSocket.OPEN) {
|
|
this.ws.ping();
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Schedule reconnection attempt with exponential backoff
|
|
*/
|
|
private scheduleReconnect(): void {
|
|
const decision = getReconnectScheduleDecision({
|
|
shouldReconnect: this.shouldReconnect,
|
|
hasReconnectTimer: this.reconnectTimer !== null,
|
|
reconnectAttempts: this.reconnectAttempts,
|
|
maxAttempts: this.reconnectConfig.maxAttempts,
|
|
baseDelay: this.reconnectConfig.baseDelay,
|
|
maxDelay: this.reconnectConfig.maxDelay,
|
|
});
|
|
|
|
if (decision.action === 'skip') {
|
|
logger.debug(`Gateway reconnect skipped (${decision.reason})`);
|
|
return;
|
|
}
|
|
|
|
if (decision.action === 'already-scheduled') {
|
|
return;
|
|
}
|
|
|
|
if (decision.action === 'fail') {
|
|
logger.error(`Gateway reconnect failed: max attempts reached (${decision.maxAttempts})`);
|
|
this.setStatus({
|
|
state: 'error',
|
|
error: 'Failed to reconnect after maximum attempts',
|
|
reconnectAttempts: this.reconnectAttempts
|
|
});
|
|
return;
|
|
}
|
|
|
|
const cooldownRemaining = Math.max(0, GatewayManager.RESTART_COOLDOWN_MS - (Date.now() - this.lastRestartAt));
|
|
const { delay, nextAttempt, maxAttempts } = decision;
|
|
const effectiveDelay = Math.max(delay, cooldownRemaining);
|
|
this.reconnectAttempts = nextAttempt;
|
|
logger.warn(`Scheduling Gateway reconnect attempt ${nextAttempt}/${maxAttempts} in ${effectiveDelay}ms`);
|
|
|
|
this.setStatus({
|
|
state: 'reconnecting',
|
|
reconnectAttempts: this.reconnectAttempts
|
|
});
|
|
const scheduledEpoch = this.lifecycleController.getCurrentEpoch();
|
|
|
|
this.reconnectTimer = setTimeout(async () => {
|
|
this.reconnectTimer = null;
|
|
const skipReason = getReconnectSkipReason({
|
|
scheduledEpoch,
|
|
currentEpoch: this.lifecycleController.getCurrentEpoch(),
|
|
shouldReconnect: this.shouldReconnect,
|
|
});
|
|
if (skipReason) {
|
|
logger.debug(`Skipping reconnect attempt: ${skipReason}`);
|
|
return;
|
|
}
|
|
const attemptNo = this.reconnectAttempts;
|
|
this.reconnectAttemptsTotal += 1;
|
|
try {
|
|
// Use the guarded start() flow so reconnect attempts cannot bypass
|
|
// lifecycle locking and accidentally start duplicate Gateway processes.
|
|
await this.start();
|
|
this.reconnectSuccessTotal += 1;
|
|
this.emitReconnectMetric('success', {
|
|
attemptNo,
|
|
maxAttempts,
|
|
delayMs: effectiveDelay,
|
|
});
|
|
this.reconnectAttempts = 0;
|
|
} catch (error) {
|
|
logger.error('Gateway reconnection attempt failed:', error);
|
|
this.emitReconnectMetric('failure', {
|
|
attemptNo,
|
|
maxAttempts,
|
|
delayMs: effectiveDelay,
|
|
error: error instanceof Error ? error.message : String(error),
|
|
});
|
|
this.scheduleReconnect();
|
|
}
|
|
}, effectiveDelay);
|
|
}
|
|
|
|
private emitReconnectMetric(
|
|
outcome: 'success' | 'failure',
|
|
payload: {
|
|
attemptNo: number;
|
|
maxAttempts: number;
|
|
delayMs: number;
|
|
error?: string;
|
|
},
|
|
): void {
|
|
const successRate = this.reconnectAttemptsTotal > 0
|
|
? this.reconnectSuccessTotal / this.reconnectAttemptsTotal
|
|
: 0;
|
|
|
|
const properties = {
|
|
outcome,
|
|
attemptNo: payload.attemptNo,
|
|
maxAttempts: payload.maxAttempts,
|
|
delayMs: payload.delayMs,
|
|
gateway_reconnect_success_count: this.reconnectSuccessTotal,
|
|
gateway_reconnect_attempt_count: this.reconnectAttemptsTotal,
|
|
gateway_reconnect_success_rate: Number(successRate.toFixed(4)),
|
|
...(payload.error ? { error: payload.error } : {}),
|
|
};
|
|
|
|
trackMetric('gateway.reconnect', properties);
|
|
captureTelemetryEvent('gateway_reconnect', properties);
|
|
}
|
|
|
|
/**
|
|
* Update status and emit event
|
|
*/
|
|
private setStatus(update: Partial<GatewayStatus>): void {
|
|
this.stateController.setStatus(update);
|
|
}
|
|
}
|