Add channel health diagnostics and gateway recovery fixes (#855)
This commit is contained in: (branch listing unavailable)
Committed via GitHub (signature unverified)
Parent: 6acd8acf5a
Commit: 1f39d1a8a7
@@ -65,6 +65,40 @@ export interface GatewayStatus {
|
||||
gatewayReady?: boolean;
|
||||
}
|
||||
|
||||
/** Coarse gateway health verdict; carried in GatewayHealthSummary.state. */
export type GatewayHealthState = 'healthy' | 'degraded' | 'unresponsive';
|
||||
|
||||
/**
 * Aggregated gateway health report: an overall verdict plus the
 * observations and timestamps that justify it. All timestamps are
 * epoch milliseconds.
 */
export interface GatewayHealthSummary {
  /** Overall health verdict. */
  state: GatewayHealthState;
  /** Human-readable explanations for the chosen state. */
  reasons: string[];
  /** Heartbeat (pong) misses observed in a row. */
  consecutiveHeartbeatMisses: number;
  /** Last time any liveness signal (pong or inbound message) was seen. */
  lastAliveAt?: number;
  /** Last time an RPC call completed successfully. */
  lastRpcSuccessAt?: number;
  /** Last time an RPC call failed at the transport level. */
  lastRpcFailureAt?: number;
  /** Method name of the most recent transport-level RPC failure. */
  lastRpcFailureMethod?: string;
  // NOTE(review): the channels-status fields below have no visible writer in
  // this view — presumably set by a channels.status health probe; confirm at
  // the call sites that populate this summary.
  lastChannelsStatusOkAt?: number;
  lastChannelsStatusFailureAt?: number;
}
|
||||
|
||||
/**
 * Raw diagnostics counters maintained by GatewayManager and returned
 * (as a copy) from getDiagnostics(). All timestamps are epoch
 * milliseconds, written via Date.now() in the record* helpers.
 */
export interface GatewayDiagnosticsSnapshot {
  /** Last pong or inbound message; reset point for heartbeat misses. */
  lastAliveAt?: number;
  /** Last successful RPC completion. */
  lastRpcSuccessAt?: number;
  /** Last transport-level RPC failure. */
  lastRpcFailureAt?: number;
  /** Method name of the most recent transport-level RPC failure. */
  lastRpcFailureMethod?: string;
  /** Last time the heartbeat monitor reported a pong timeout. */
  lastHeartbeatTimeoutAt?: number;
  /** Misses in a row; set on heartbeat timeout, zeroed on any liveness signal. */
  consecutiveHeartbeatMisses: number;
  /** Last WebSocket close observed after handshake. */
  lastSocketCloseAt?: number;
  /** Close code from that WebSocket close event. */
  lastSocketCloseCode?: number;
  /** Transport-level RPC failures in a row; zeroed on any RPC success. */
  consecutiveRpcFailures: number;
}
|
||||
|
||||
function isTransportRpcFailure(error: unknown): boolean {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
return message.includes('RPC timeout:')
|
||||
|| message.includes('Gateway not connected')
|
||||
|| message.includes('Gateway stopped')
|
||||
|| message.includes('Failed to send RPC request:');
|
||||
}
|
||||
|
||||
/**
|
||||
* Gateway Manager Events
|
||||
*/
|
||||
@@ -126,6 +160,10 @@ export class GatewayManager extends EventEmitter {
|
||||
/** Set by scheduleReconnect() before calling start() to signal auto-reconnect. */
|
||||
private isAutoReconnectStart = false;
|
||||
private gatewayReadyFallbackTimer: NodeJS.Timeout | null = null;
|
||||
private diagnostics: GatewayDiagnosticsSnapshot = {
|
||||
consecutiveHeartbeatMisses: 0,
|
||||
consecutiveRpcFailures: 0,
|
||||
};
|
||||
|
||||
constructor(config?: Partial<ReconnectConfig>) {
|
||||
super();
|
||||
@@ -197,6 +235,10 @@ export class GatewayManager extends EventEmitter {
|
||||
return this.stateController.getStatus();
|
||||
}
|
||||
|
||||
/**
 * Return a defensive copy of the diagnostics counters so callers
 * cannot mutate the manager's internal state.
 */
getDiagnostics(): GatewayDiagnosticsSnapshot {
  return Object.assign({}, this.diagnostics);
}
|
||||
|
||||
/**
|
||||
* Check if Gateway is connected and ready
|
||||
*/
|
||||
@@ -413,6 +455,7 @@ export class GatewayManager extends EventEmitter {
|
||||
|
||||
this.restartController.resetDeferredRestart();
|
||||
this.isAutoReconnectStart = false;
|
||||
this.diagnostics.consecutiveHeartbeatMisses = 0;
|
||||
this.setStatus({ state: 'stopped', error: undefined, pid: undefined, connectedAt: undefined, uptime: undefined, gatewayReady: undefined });
|
||||
}
|
||||
|
||||
@@ -712,7 +755,7 @@ export class GatewayManager extends EventEmitter {
|
||||
* Uses OpenClaw protocol format: { type: "req", id: "...", method: "...", params: {...} }
|
||||
*/
|
||||
async rpc<T>(method: string, params?: unknown, timeoutMs = 30000): Promise<T> {
|
||||
return new Promise((resolve, reject) => {
|
||||
return await new Promise<T>((resolve, reject) => {
|
||||
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
|
||||
reject(new Error('Gateway not connected'));
|
||||
return;
|
||||
@@ -745,6 +788,14 @@ export class GatewayManager extends EventEmitter {
|
||||
} catch (error) {
|
||||
rejectPendingGatewayRequest(this.pendingRequests, id, new Error(`Failed to send RPC request: ${error}`));
|
||||
}
|
||||
}).then((result) => {
|
||||
this.recordRpcSuccess();
|
||||
return result;
|
||||
}).catch((error) => {
|
||||
if (isTransportRpcFailure(error)) {
|
||||
this.recordRpcFailure(method);
|
||||
}
|
||||
throw error;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -782,6 +833,32 @@ export class GatewayManager extends EventEmitter {
|
||||
}
|
||||
}
|
||||
|
||||
private recordGatewayAlive(): void {
|
||||
this.diagnostics.lastAliveAt = Date.now();
|
||||
this.diagnostics.consecutiveHeartbeatMisses = 0;
|
||||
}
|
||||
|
||||
private recordRpcSuccess(): void {
|
||||
this.diagnostics.lastRpcSuccessAt = Date.now();
|
||||
this.diagnostics.consecutiveRpcFailures = 0;
|
||||
}
|
||||
|
||||
private recordRpcFailure(method: string): void {
|
||||
this.diagnostics.lastRpcFailureAt = Date.now();
|
||||
this.diagnostics.lastRpcFailureMethod = method;
|
||||
this.diagnostics.consecutiveRpcFailures += 1;
|
||||
}
|
||||
|
||||
private recordHeartbeatTimeout(consecutiveMisses: number): void {
|
||||
this.diagnostics.lastHeartbeatTimeoutAt = Date.now();
|
||||
this.diagnostics.consecutiveHeartbeatMisses = consecutiveMisses;
|
||||
}
|
||||
|
||||
private recordSocketClose(code: number): void {
|
||||
this.diagnostics.lastSocketCloseAt = Date.now();
|
||||
this.diagnostics.lastSocketCloseCode = code;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start Gateway process
|
||||
* Uses OpenClaw npm package from node_modules (dev) or resources (production)
|
||||
@@ -878,7 +955,9 @@ export class GatewayManager extends EventEmitter {
|
||||
this.ws = ws;
|
||||
ws.on('pong', () => {
|
||||
this.connectionMonitor.markAlive('pong');
|
||||
this.recordGatewayAlive();
|
||||
});
|
||||
this.recordGatewayAlive();
|
||||
this.setStatus({
|
||||
state: 'running',
|
||||
port,
|
||||
@@ -892,6 +971,8 @@ export class GatewayManager extends EventEmitter {
|
||||
},
|
||||
onCloseAfterHandshake: (closeCode) => {
|
||||
this.connectionMonitor.clear();
|
||||
this.recordSocketClose(closeCode);
|
||||
this.diagnostics.consecutiveHeartbeatMisses = 0;
|
||||
if (this.status.state === 'running') {
|
||||
this.setStatus({ state: 'stopped' });
|
||||
// On Windows, skip reconnect from WS close. The Gateway is a local
|
||||
@@ -916,6 +997,7 @@ export class GatewayManager extends EventEmitter {
|
||||
*/
|
||||
private handleMessage(message: unknown): void {
|
||||
this.connectionMonitor.markAlive('message');
|
||||
this.recordGatewayAlive();
|
||||
|
||||
if (typeof message !== 'object' || message === null) {
|
||||
logger.debug('Received non-object Gateway message');
|
||||
@@ -986,24 +1068,25 @@ export class GatewayManager extends EventEmitter {
|
||||
}
|
||||
},
|
||||
onHeartbeatTimeout: ({ consecutiveMisses, timeoutMs }) => {
|
||||
// Heartbeat timeout is observability-only. We intentionally do NOT
|
||||
// terminate the socket or trigger reconnection here because:
|
||||
//
|
||||
// 1. If the gateway process dies → child.on('exit') fires reliably.
|
||||
// 2. If the socket disconnects → ws.on('close') fires reliably.
|
||||
// 3. If the gateway event loop is blocked (skills scanning, GC,
|
||||
// antivirus) → pong is delayed but the process and connection
|
||||
// are still valid. Terminating the socket would cause a
|
||||
// cascading restart loop for no reason.
|
||||
//
|
||||
// The only scenario ping/pong could catch (silent half-open TCP on
|
||||
// localhost) is practically impossible. So we just log.
|
||||
this.recordHeartbeatTimeout(consecutiveMisses);
|
||||
const pid = this.process?.pid ?? 'unknown';
|
||||
const isWindows = process.platform === 'win32';
|
||||
const shouldAttemptRecovery = !isWindows && this.shouldReconnect && this.status.state === 'running';
|
||||
logger.warn(
|
||||
`Gateway heartbeat: ${consecutiveMisses} consecutive pong misses ` +
|
||||
`(timeout=${timeoutMs}ms, pid=${pid}, state=${this.status.state}). ` +
|
||||
`No action taken — relying on process exit and socket close events.`,
|
||||
`(timeout=${timeoutMs}ms, pid=${pid}, state=${this.status.state}, autoReconnect=${this.shouldReconnect}).`,
|
||||
);
|
||||
if (!shouldAttemptRecovery) {
|
||||
const reason = isWindows
|
||||
? 'platform=win32'
|
||||
: 'lifecycle is not in auto-recoverable running state';
|
||||
logger.warn(`Gateway heartbeat recovery skipped (${reason})`);
|
||||
return;
|
||||
}
|
||||
logger.warn('Gateway heartbeat recovery: restarting unresponsive gateway process');
|
||||
void this.restart().catch((error) => {
|
||||
logger.warn('Gateway heartbeat recovery failed:', error);
|
||||
});
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user