feat(gateway): enhance gateway process management with auto-reconnection
Improve Gateway lifecycle management with the following features:

- Add exponential backoff reconnection (1s-30s delay, max 10 attempts)
- Add health check monitoring every 30 seconds
- Add proper restart method with graceful shutdown
- Handle server-initiated notifications (channel status, chat messages)
- Add 'reconnecting' state for better UI feedback
- Enhance IPC handlers with isConnected and health check endpoints
- Update preload script with new event channels
- Improve type safety and error handling throughout

Also fixes several TypeScript errors and unused variable warnings.
This commit is contained in:
@@ -2,24 +2,24 @@
|
||||
* Gateway Process Manager
|
||||
* Manages the OpenClaw Gateway process lifecycle
|
||||
*/
|
||||
import { spawn, ChildProcess, exec } from 'child_process';
|
||||
import { spawn, ChildProcess } from 'child_process';
|
||||
import { EventEmitter } from 'events';
|
||||
import WebSocket from 'ws';
|
||||
import { promisify } from 'util';
|
||||
import { PORTS } from '../utils/config';
|
||||
|
||||
const execAsync = promisify(exec);
|
||||
import { GatewayEventType, JsonRpcNotification, isNotification, isResponse } from './protocol';
|
||||
|
||||
/**
|
||||
* Gateway connection status
|
||||
*/
|
||||
export interface GatewayStatus {
|
||||
state: 'stopped' | 'starting' | 'running' | 'error';
|
||||
state: 'stopped' | 'starting' | 'running' | 'error' | 'reconnecting';
|
||||
port: number;
|
||||
pid?: number;
|
||||
uptime?: number;
|
||||
error?: string;
|
||||
connectedAt?: number;
|
||||
version?: string;
|
||||
reconnectAttempts?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -28,10 +28,28 @@ export interface GatewayStatus {
|
||||
export interface GatewayManagerEvents {
  /** Fired with the current status object each time the manager updates its status. */
  status: (status: GatewayStatus) => void;

  /** Fired for an incoming message that is neither a reply to a pending RPC nor a notification. */
  message: (message: unknown) => void;

  /** Fired for every server-initiated JSON-RPC notification, before any specific routing. */
  notification: (notification: JsonRpcNotification) => void;

  /** Carries an exit code, or null. NOTE(review): the emitter is not visible in this hunk; presumably the Gateway child process exit. */
  exit: (code: number | null) => void;

  /** Fired when a health check fails or the Gateway sends an error notification. */
  error: (error: Error) => void;

  /** Routed from a channel-status-changed notification. */
  'channel:status': (data: { channelId: string; status: string }) => void;

  /** Routed from a message-received notification. */
  'chat:message': (data: { message: unknown }) => void;
}
|
||||
|
||||
/**
 * Tunables for the exponential-backoff reconnection logic.
 */
interface ReconnectConfig {
  /** Number of reconnect attempts allowed before the manager gives up. */
  maxAttempts: number;
  /** Delay before the first attempt, in milliseconds; doubled on each subsequent attempt. */
  baseDelay: number;
  /** Upper bound for the backoff delay, in milliseconds. */
  maxDelay: number;
}

/** Defaults: up to 10 attempts, backing off from 1 second to at most 30 seconds. */
const DEFAULT_RECONNECT_CONFIG: ReconnectConfig = {
  maxAttempts: 10,
  baseDelay: 1_000,
  maxDelay: 30_000,
};
|
||||
|
||||
/**
|
||||
* Gateway Manager
|
||||
* Handles starting, stopping, and communicating with the OpenClaw Gateway
|
||||
@@ -42,14 +60,19 @@ export class GatewayManager extends EventEmitter {
|
||||
private status: GatewayStatus = { state: 'stopped', port: PORTS.OPENCLAW_GATEWAY };
|
||||
private reconnectTimer: NodeJS.Timeout | null = null;
|
||||
private pingInterval: NodeJS.Timeout | null = null;
|
||||
private healthCheckInterval: NodeJS.Timeout | null = null;
|
||||
private reconnectAttempts = 0;
|
||||
private reconnectConfig: ReconnectConfig;
|
||||
private shouldReconnect = true;
|
||||
private pendingRequests: Map<string, {
|
||||
resolve: (value: unknown) => void;
|
||||
reject: (error: Error) => void;
|
||||
timeout: NodeJS.Timeout;
|
||||
}> = new Map();
|
||||
|
||||
constructor() {
|
||||
constructor(config?: Partial<ReconnectConfig>) {
|
||||
super();
|
||||
this.reconnectConfig = { ...DEFAULT_RECONNECT_CONFIG, ...config };
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -59,6 +82,13 @@ export class GatewayManager extends EventEmitter {
|
||||
return { ...this.status };
|
||||
}
|
||||
|
||||
/**
 * Report whether the Gateway is usable right now.
 *
 * @returns true only when the tracked state is 'running' and the WebSocket
 *          exists and is in the OPEN ready state.
 */
isConnected(): boolean {
  if (this.status.state !== 'running') {
    return false;
  }
  return this.ws?.readyState === WebSocket.OPEN;
}
|
||||
|
||||
/**
|
||||
* Start Gateway process
|
||||
*/
|
||||
@@ -67,7 +97,9 @@ export class GatewayManager extends EventEmitter {
|
||||
return;
|
||||
}
|
||||
|
||||
this.setStatus({ state: 'starting' });
|
||||
this.shouldReconnect = true;
|
||||
this.reconnectAttempts = 0;
|
||||
this.setStatus({ state: 'starting', reconnectAttempts: 0 });
|
||||
|
||||
try {
|
||||
// Check if Gateway is already running
|
||||
@@ -75,6 +107,7 @@ export class GatewayManager extends EventEmitter {
|
||||
if (existing) {
|
||||
console.log('Found existing Gateway on port', existing.port);
|
||||
await this.connect(existing.port);
|
||||
this.startHealthCheck();
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -87,6 +120,9 @@ export class GatewayManager extends EventEmitter {
|
||||
// Connect WebSocket
|
||||
await this.connect(this.status.port);
|
||||
|
||||
// Start health monitoring
|
||||
this.startHealthCheck();
|
||||
|
||||
} catch (error) {
|
||||
this.setStatus({ state: 'error', error: String(error) });
|
||||
throw error;
|
||||
@@ -97,7 +133,56 @@ export class GatewayManager extends EventEmitter {
|
||||
* Stop Gateway process
|
||||
*/
|
||||
async stop(): Promise<void> {
  // Disable auto-reconnect first: scheduleReconnect() returns early while
  // this flag is false, so the teardown below cannot start a new attempt.
  this.shouldReconnect = false;

  // Clear all timers
  this.clearAllTimers();

  // Close WebSocket with a normal-closure code
  if (this.ws) {
    this.ws.close(1000, 'Gateway stopped by user');
    this.ws = null;
  }

  // Kill process
  const child = this.process;
  if (child) {
    // Release the field right away, but keep a local handle: the force-kill
    // timer below must still be able to reach the child after the field is
    // nulled (checking `this.process` in the timer would always see null).
    this.process = null;
    child.kill('SIGTERM');

    // Force kill after timeout, only if the child has not exited by then
    const forceKillTimer = setTimeout(() => {
      if (child.exitCode === null && child.signalCode === null) {
        child.kill('SIGKILL');
      }
    }, 5000);

    // No need to keep the timer pending once the child is gone
    child.once('exit', () => clearTimeout(forceKillTimer));
  }

  // Reject all pending requests so their callers do not wait for a timeout
  for (const [, request] of this.pendingRequests) {
    clearTimeout(request.timeout);
    request.reject(new Error('Gateway stopped'));
  }
  this.pendingRequests.clear();

  this.setStatus({ state: 'stopped', error: undefined });
}
|
||||
|
||||
/**
 * Restart the Gateway: stop it, pause for one second, then start it again.
 *
 * Rejects if start() rejects.
 */
async restart(): Promise<void> {
  console.log('Restarting Gateway...');

  await this.stop();

  // Brief delay before restart
  await new Promise((done) => {
    setTimeout(done, 1000);
  });

  await this.start();
}
|
||||
|
||||
/**
|
||||
* Clear all active timers
|
||||
*/
|
||||
private clearAllTimers(): void {
|
||||
if (this.reconnectTimer) {
|
||||
clearTimeout(this.reconnectTimer);
|
||||
this.reconnectTimer = null;
|
||||
@@ -106,33 +191,16 @@ export class GatewayManager extends EventEmitter {
|
||||
clearInterval(this.pingInterval);
|
||||
this.pingInterval = null;
|
||||
}
|
||||
|
||||
// Close WebSocket
|
||||
if (this.ws) {
|
||||
this.ws.close();
|
||||
this.ws = null;
|
||||
if (this.healthCheckInterval) {
|
||||
clearInterval(this.healthCheckInterval);
|
||||
this.healthCheckInterval = null;
|
||||
}
|
||||
|
||||
// Kill process
|
||||
if (this.process) {
|
||||
this.process.kill();
|
||||
this.process = null;
|
||||
}
|
||||
|
||||
// Reject all pending requests
|
||||
for (const [id, request] of this.pendingRequests) {
|
||||
clearTimeout(request.timeout);
|
||||
request.reject(new Error('Gateway stopped'));
|
||||
}
|
||||
this.pendingRequests.clear();
|
||||
|
||||
this.setStatus({ state: 'stopped' });
|
||||
}
|
||||
|
||||
/**
|
||||
* Make an RPC call to the Gateway
|
||||
*/
|
||||
async rpc<T>(method: string, params?: unknown): Promise<T> {
|
||||
async rpc<T>(method: string, params?: unknown, timeoutMs = 30000): Promise<T> {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
|
||||
reject(new Error('Gateway not connected'));
|
||||
@@ -145,7 +213,7 @@ export class GatewayManager extends EventEmitter {
|
||||
const timeout = setTimeout(() => {
|
||||
this.pendingRequests.delete(id);
|
||||
reject(new Error(`RPC timeout: ${method}`));
|
||||
}, 30000);
|
||||
}, timeoutMs);
|
||||
|
||||
// Store pending request
|
||||
this.pendingRequests.set(id, {
|
||||
@@ -162,10 +230,61 @@ export class GatewayManager extends EventEmitter {
|
||||
params,
|
||||
};
|
||||
|
||||
this.ws.send(JSON.stringify(request));
|
||||
try {
|
||||
this.ws.send(JSON.stringify(request));
|
||||
} catch (error) {
|
||||
this.pendingRequests.delete(id);
|
||||
clearTimeout(timeout);
|
||||
reject(new Error(`Failed to send RPC request: ${error}`));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Begin periodic health monitoring.
 *
 * Replaces any previously running monitor. Every 30 seconds, while the
 * status is 'running', the health endpoint is probed; a failed probe is
 * logged and reported through the 'error' event.
 */
private startHealthCheck(): void {
  // Only one monitor may run at a time
  if (this.healthCheckInterval) {
    clearInterval(this.healthCheckInterval);
  }

  const probe = async () => {
    // Skip the probe unless the Gateway is reported as running
    if (this.status.state === 'running') {
      try {
        const health = await this.checkHealth();
        if (health.ok) {
          return;
        }
        console.warn('Gateway health check failed:', health.error);
        this.emit('error', new Error(health.error || 'Health check failed'));
      } catch (error) {
        // Also reached when emitting 'error' throws (e.g. no listener attached)
        console.error('Health check error:', error);
      }
    }
  };

  this.healthCheckInterval = setInterval(probe, 30000); // Check every 30 seconds
}
|
||||
|
||||
/**
 * Probe the Gateway's HTTP /health endpoint on the current status port.
 *
 * Never rejects: network errors, the 5 second timeout and body parsing
 * failures are all reported as `{ ok: false, error }`.
 *
 * @returns `{ ok: true, uptime }` for a successful response, otherwise
 *          `{ ok: false, error }` describing the failure.
 */
async checkHealth(): Promise<{ ok: boolean; error?: string; uptime?: number }> {
  const healthUrl = `http://localhost:${this.status.port}/health`;

  try {
    const response = await fetch(healthUrl, { signal: AbortSignal.timeout(5000) });

    if (!response.ok) {
      return { ok: false, error: `Health check returned ${response.status}` };
    }

    const payload = await response.json() as { uptime?: number };
    return { ok: true, uptime: payload.uptime };
  } catch (error) {
    return { ok: false, error: String(error) };
  }
}
|
||||
|
||||
/**
|
||||
* Find existing Gateway process
|
||||
*/
|
||||
@@ -307,25 +426,61 @@ export class GatewayManager extends EventEmitter {
|
||||
/**
|
||||
* Handle incoming WebSocket message
|
||||
*/
|
||||
private handleMessage(message: { id?: string; result?: unknown; error?: unknown }): void {
|
||||
// Check if this is a response to a pending request
|
||||
if (message.id && this.pendingRequests.has(message.id)) {
|
||||
const request = this.pendingRequests.get(message.id)!;
|
||||
private handleMessage(message: unknown): void {
|
||||
// Check if this is a JSON-RPC response
|
||||
if (isResponse(message) && message.id && this.pendingRequests.has(String(message.id))) {
|
||||
const request = this.pendingRequests.get(String(message.id))!;
|
||||
clearTimeout(request.timeout);
|
||||
this.pendingRequests.delete(message.id);
|
||||
this.pendingRequests.delete(String(message.id));
|
||||
|
||||
if (message.error) {
|
||||
request.reject(new Error(String(message.error)));
|
||||
const errorMsg = typeof message.error === 'object'
|
||||
? (message.error as { message?: string }).message || JSON.stringify(message.error)
|
||||
: String(message.error);
|
||||
request.reject(new Error(errorMsg));
|
||||
} else {
|
||||
request.resolve(message.result);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Emit message for other handlers
|
||||
// Check if this is a notification (server-initiated event)
|
||||
if (isNotification(message)) {
|
||||
this.handleNotification(message);
|
||||
return;
|
||||
}
|
||||
|
||||
// Emit generic message for other handlers
|
||||
this.emit('message', message);
|
||||
}
|
||||
|
||||
/**
 * Handle server-initiated notifications.
 *
 * Every notification is re-emitted as 'notification'. Known methods are
 * additionally routed to a dedicated event: 'channel:status', 'chat:message'
 * or 'error'. Unknown methods are only logged.
 *
 * @param notification - JSON-RPC notification received from the Gateway.
 */
private handleNotification(notification: JsonRpcNotification): void {
  this.emit('notification', notification);

  // Route specific events
  switch (notification.method) {
    case GatewayEventType.CHANNEL_STATUS_CHANGED:
      this.emit('channel:status', notification.params as { channelId: string; status: string });
      break;

    case GatewayEventType.MESSAGE_RECEIVED:
      this.emit('chat:message', notification.params as { message: unknown });
      break;

    case GatewayEventType.ERROR: {
      // Braces keep the declaration local to this clause. Params may be
      // absent on a notification, so read the message defensively.
      const errorData = notification.params as { message?: string } | undefined;
      this.emit('error', new Error(errorData?.message || 'Gateway error'));
      break;
    }

    default:
      // Unknown notification type, just log it
      console.log('Unknown Gateway notification:', notification.method);
  }
}
|
||||
|
||||
/**
|
||||
* Start ping interval to keep connection alive
|
||||
*/
|
||||
@@ -342,29 +497,84 @@ export class GatewayManager extends EventEmitter {
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule reconnection attempt
|
||||
* Schedule reconnection attempt with exponential backoff
|
||||
*/
|
||||
private scheduleReconnect(): void {
|
||||
if (!this.shouldReconnect) {
|
||||
console.log('Auto-reconnect disabled, not scheduling reconnect');
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.reconnectTimer) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.reconnectAttempts >= this.reconnectConfig.maxAttempts) {
|
||||
console.error(`Max reconnection attempts (${this.reconnectConfig.maxAttempts}) reached`);
|
||||
this.setStatus({
|
||||
state: 'error',
|
||||
error: 'Failed to reconnect after maximum attempts',
|
||||
reconnectAttempts: this.reconnectAttempts
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Calculate delay with exponential backoff
|
||||
const delay = Math.min(
|
||||
this.reconnectConfig.baseDelay * Math.pow(2, this.reconnectAttempts),
|
||||
this.reconnectConfig.maxDelay
|
||||
);
|
||||
|
||||
this.reconnectAttempts++;
|
||||
console.log(`Scheduling reconnect attempt ${this.reconnectAttempts} in ${delay}ms`);
|
||||
|
||||
this.setStatus({
|
||||
state: 'reconnecting',
|
||||
reconnectAttempts: this.reconnectAttempts
|
||||
});
|
||||
|
||||
this.reconnectTimer = setTimeout(async () => {
|
||||
this.reconnectTimer = null;
|
||||
try {
|
||||
await this.start();
|
||||
// Try to find existing Gateway first
|
||||
const existing = await this.findExistingGateway();
|
||||
if (existing) {
|
||||
await this.connect(existing.port);
|
||||
this.reconnectAttempts = 0;
|
||||
this.startHealthCheck();
|
||||
return;
|
||||
}
|
||||
|
||||
// Otherwise restart the process
|
||||
await this.startProcess();
|
||||
await this.waitForReady();
|
||||
await this.connect(this.status.port);
|
||||
this.reconnectAttempts = 0;
|
||||
this.startHealthCheck();
|
||||
} catch (error) {
|
||||
console.error('Reconnection failed:', error);
|
||||
this.scheduleReconnect();
|
||||
}
|
||||
}, 5000);
|
||||
}, delay);
|
||||
}
|
||||
|
||||
/**
 * Merge a partial update into the current status and broadcast it.
 *
 * While the resulting state is 'running' and a connectedAt timestamp is
 * present, uptime is recomputed from it. The 'status' event is emitted on
 * every call; a log line is written only when the state actually changed.
 *
 * @param update - Fields to overwrite on the current status.
 */
private setStatus(update: Partial<GatewayStatus>): void {
  const stateBefore = this.status.state;

  const merged: GatewayStatus = Object.assign({}, this.status, update);
  this.status = merged;

  // Calculate uptime if connected
  const isRunning = merged.state === 'running';
  if (isRunning && merged.connectedAt) {
    merged.uptime = Date.now() - merged.connectedAt;
  }

  this.emit('status', this.status);

  // Log state transitions
  if (this.status.state !== stateBefore) {
    console.log(`Gateway state: ${stateBefore} -> ${this.status.state}`);
  }
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user