diff --git a/electron/gateway/connection-monitor.ts b/electron/gateway/connection-monitor.ts index 4f3010e22..3b90dabcb 100644 --- a/electron/gateway/connection-monitor.ts +++ b/electron/gateway/connection-monitor.ts @@ -1,46 +1,76 @@ import { logger } from '../utils/logger'; type HealthResult = { ok: boolean; error?: string }; +type HeartbeatAliveReason = 'pong' | 'message'; + +type PingOptions = { + sendPing: () => void; + onHeartbeatTimeout: (context: { consecutiveMisses: number; timeoutMs: number }) => void; + intervalMs?: number; + timeoutMs?: number; + maxConsecutiveMisses?: number; +}; export class GatewayConnectionMonitor { private pingInterval: NodeJS.Timeout | null = null; - private pongTimeout: NodeJS.Timeout | null = null; private healthCheckInterval: NodeJS.Timeout | null = null; + private lastPingAt = 0; + private waitingForAlive = false; + private consecutiveMisses = 0; + private timeoutTriggered = false; + + startPing(options: PingOptions): void { + const intervalMs = options.intervalMs ?? 30000; + const timeoutMs = options.timeoutMs ?? 10000; + const maxConsecutiveMisses = Math.max(1, options.maxConsecutiveMisses ?? 3); + this.resetHeartbeatState(); - startPing( - sendPing: () => void, - onPongTimeout?: () => void, - intervalMs = 30000, - timeoutMs = 15000, - ): void { if (this.pingInterval) { clearInterval(this.pingInterval); } - if (this.pongTimeout) { - clearTimeout(this.pongTimeout); - this.pongTimeout = null; - } this.pingInterval = setInterval(() => { - sendPing(); + const now = Date.now(); - if (onPongTimeout) { - if (this.pongTimeout) { - clearTimeout(this.pongTimeout); + if (this.waitingForAlive && now - this.lastPingAt >= timeoutMs) { + this.waitingForAlive = false; + this.consecutiveMisses += 1; + logger.warn( + `Gateway heartbeat missed (${this.consecutiveMisses}/${maxConsecutiveMisses}, timeout=${timeoutMs}ms)`, + ); + if (this.consecutiveMisses >= maxConsecutiveMisses && !this.timeoutTriggered) { + this.timeoutTriggered = true; + options.onHeartbeatTimeout({ + consecutiveMisses: this.consecutiveMisses, + timeoutMs, + }); + return; } - this.pongTimeout = setTimeout(() => { - this.pongTimeout = null; - onPongTimeout(); - }, timeoutMs); } + + options.sendPing(); + this.waitingForAlive = true; + this.lastPingAt = now; }, intervalMs); } - handlePong(): void { - if (this.pongTimeout) { - clearTimeout(this.pongTimeout); - this.pongTimeout = null; + markAlive(reason: HeartbeatAliveReason): void { + // Only log true recovery cases to avoid steady-state heartbeat log spam. + if (this.consecutiveMisses > 0) { + logger.debug(`Gateway heartbeat recovered via ${reason} (misses=${this.consecutiveMisses})`); } + this.waitingForAlive = false; + this.consecutiveMisses = 0; + this.timeoutTriggered = false; + } + + // Backward-compatible alias for old callers. + handlePong(): void { + this.markAlive('pong'); + } + + getConsecutiveMisses(): number { + return this.consecutiveMisses; } startHealthCheck(options: { @@ -78,13 +108,17 @@ export class GatewayConnectionMonitor { clearInterval(this.pingInterval); this.pingInterval = null; } - if (this.pongTimeout) { - clearTimeout(this.pongTimeout); - this.pongTimeout = null; - } if (this.healthCheckInterval) { clearInterval(this.healthCheckInterval); this.healthCheckInterval = null; } + this.resetHeartbeatState(); + } + + private resetHeartbeatState(): void { + this.lastPingAt = 0; + this.waitingForAlive = false; + this.consecutiveMisses = 0; + this.timeoutTriggered = false; } } diff --git a/electron/gateway/manager.ts b/electron/gateway/manager.ts index c0de90b17..396f39c15 100644 --- a/electron/gateway/manager.ts +++ b/electron/gateway/manager.ts @@ -109,6 +109,9 @@ export class GatewayManager extends EventEmitter { private reconnectAttemptsTotal = 0; private reconnectSuccessTotal = 0; private static readonly RELOAD_POLICY_REFRESH_MS = 15_000; + private static readonly HEARTBEAT_INTERVAL_MS = 30_000; + private static readonly HEARTBEAT_TIMEOUT_MS = 12_000; + private static readonly HEARTBEAT_MAX_MISSES = 3; public static readonly RESTART_COOLDOWN_MS = 5_000; private lastRestartAt = 0; @@ -694,6 +697,7 @@ export class GatewayManager extends EventEmitter { onExit: (exitedChild, code) => { this.processExitCode = code; this.ownsProcess = false; + this.connectionMonitor.clear(); if (this.process === exitedChild) { this.process = null; } @@ -729,8 +733,8 @@ export class GatewayManager extends EventEmitter { getToken: async () => await import('../utils/store').then(({ getSetting }) => getSetting('gatewayToken')), onHandshakeComplete: (ws) => { this.ws = ws; - this.ws.on('pong', () => { - this.connectionMonitor.handlePong(); + ws.on('pong', () => { + this.connectionMonitor.markAlive('pong'); }); this.setStatus({ state: 'running', @@ -743,6 +747,7 @@ export class GatewayManager extends EventEmitter { this.handleMessage(message); }, onCloseAfterHandshake: () => { + this.connectionMonitor.clear(); if (this.status.state === 'running') { this.setStatus({ state: 'stopped' }); this.scheduleReconnect(); @@ -755,6 +760,8 @@ export class GatewayManager extends EventEmitter { * Handle incoming WebSocket message */ private handleMessage(message: unknown): void { + this.connectionMonitor.markAlive('message'); + if (typeof message !== 'object' || message === null) { logger.debug('Received non-object Gateway message'); return; @@ -807,24 +814,34 @@ export class GatewayManager extends EventEmitter { * Start ping interval to keep connection alive */ private startPing(): void { - this.connectionMonitor.startPing( - () => { + this.connectionMonitor.startPing({ + intervalMs: GatewayManager.HEARTBEAT_INTERVAL_MS, + timeoutMs: GatewayManager.HEARTBEAT_TIMEOUT_MS, + maxConsecutiveMisses: GatewayManager.HEARTBEAT_MAX_MISSES, + sendPing: () => { if (this.ws?.readyState === WebSocket.OPEN) { this.ws.ping(); } }, - () => { - logger.error('Gateway WebSocket dead connection detected (pong timeout)'); - if (this.ws) { - this.ws.terminate(); // Force close the dead connection immediately - this.ws = null; + onHeartbeatTimeout: ({ consecutiveMisses, timeoutMs }) => { + if (this.status.state !== 'running' || !this.shouldReconnect) { + return; } - if (this.status.state === 'running') { - this.setStatus({ state: 'error', error: 'WebSocket ping timeout' }); - this.scheduleReconnect(); + const ws = this.ws; + if (!ws || ws.readyState !== WebSocket.OPEN) { + return; } - } - ); + + logger.warn( + `Gateway heartbeat timed out after ${consecutiveMisses} consecutive misses (timeout=${timeoutMs}ms); terminating stale socket`, + ); + try { + ws.terminate(); + } catch (error) { + logger.warn('Failed to terminate stale Gateway socket after heartbeat timeout:', error); + } + }, + }); } /** diff --git a/tests/unit/gateway-connection-monitor.test.ts b/tests/unit/gateway-connection-monitor.test.ts new file mode 100644 index 000000000..00b4da49c --- /dev/null +++ b/tests/unit/gateway-connection-monitor.test.ts @@ -0,0 +1,70 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { GatewayConnectionMonitor } from '@electron/gateway/connection-monitor'; + +vi.mock('electron', () => ({ + app: { + getPath: () => '/tmp', + getVersion: () => '0.0.0-test', + isPackaged: false, + }, + utilityProcess: { + fork: vi.fn(), + }, +})); + +describe('GatewayConnectionMonitor heartbeat', () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date('2026-03-19T00:00:00.000Z')); + }); + + it('terminates only after consecutive heartbeat misses reach threshold', () => { + const monitor = new GatewayConnectionMonitor(); + const sendPing = vi.fn(); + const onHeartbeatTimeout = vi.fn(); + + monitor.startPing({ + sendPing, + onHeartbeatTimeout, + intervalMs: 100, + timeoutMs: 50, + maxConsecutiveMisses: 3, + }); + + vi.advanceTimersByTime(100); // send ping #1 + vi.advanceTimersByTime(100); // miss #1, send ping #2 + vi.advanceTimersByTime(100); // miss #2, send ping #3 + expect(onHeartbeatTimeout).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(100); // miss #3 -> timeout callback + expect(onHeartbeatTimeout).toHaveBeenCalledTimes(1); + expect(onHeartbeatTimeout).toHaveBeenCalledWith({ consecutiveMisses: 3, timeoutMs: 50 }); + expect(sendPing).toHaveBeenCalledTimes(3); + }); + + it('resets miss counter when alive signal is received', () => { + const monitor = new GatewayConnectionMonitor(); + const sendPing = vi.fn(); + const onHeartbeatTimeout = vi.fn(); + + monitor.startPing({ + sendPing, + onHeartbeatTimeout, + intervalMs: 100, + timeoutMs: 50, + maxConsecutiveMisses: 2, + }); + + vi.advanceTimersByTime(100); // send ping #1 + vi.advanceTimersByTime(100); // miss #1, send ping #2 + expect(monitor.getConsecutiveMisses()).toBe(1); + + monitor.markAlive('pong'); + expect(monitor.getConsecutiveMisses()).toBe(0); + + vi.advanceTimersByTime(100); // send ping #3 + vi.advanceTimersByTime(100); // miss #1 again (reset confirmed) + expect(monitor.getConsecutiveMisses()).toBe(1); + expect(onHeartbeatTimeout).not.toHaveBeenCalled(); + }); +}); diff --git a/tests/unit/gateway-manager-heartbeat.test.ts b/tests/unit/gateway-manager-heartbeat.test.ts new file mode 100644 index 000000000..e7d68949f --- /dev/null +++ b/tests/unit/gateway-manager-heartbeat.test.ts @@ -0,0 +1,79 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +vi.mock('electron', () => ({ + app: { + getPath: () => '/tmp', + isPackaged: false, + }, + utilityProcess: { + fork: vi.fn(), + }, +})); + +describe('GatewayManager heartbeat recovery', () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date('2026-03-19T00:00:00.000Z')); + }); + + it('terminates stale socket only after 3 consecutive heartbeat misses', async () => { + const { GatewayManager } = await import('@electron/gateway/manager'); + const manager = new GatewayManager(); + + const ws = { + readyState: 1, // WebSocket.OPEN + ping: vi.fn(), + terminate: vi.fn(), + on: vi.fn(), + }; + + (manager as unknown as { ws: typeof ws }).ws = ws; + (manager as unknown as { shouldReconnect: boolean }).shouldReconnect = true; + (manager as unknown as { status: { state: string; port: number } }).status = { + state: 'running', + port: 18789, + }; + + (manager as unknown as { startPing: () => void }).startPing(); + + vi.advanceTimersByTime(120_000); + + expect(ws.ping).toHaveBeenCalledTimes(3); + expect(ws.terminate).toHaveBeenCalledTimes(1); + + (manager as unknown as { connectionMonitor: { clear: () => void } }).connectionMonitor.clear(); + }); + + it('does not terminate when heartbeat is recovered by incoming messages', async () => { + const { GatewayManager } = await import('@electron/gateway/manager'); + const manager = new GatewayManager(); + + const ws = { + readyState: 1, // WebSocket.OPEN + ping: vi.fn(), + terminate: vi.fn(), + on: vi.fn(), + }; + + (manager as unknown as { ws: typeof ws }).ws = ws; + (manager as unknown as { shouldReconnect: boolean }).shouldReconnect = true; + (manager as unknown as { status: { state: string; port: number } }).status = { + state: 'running', + port: 18789, + }; + + (manager as unknown as { startPing: () => void }).startPing(); + + vi.advanceTimersByTime(30_000); // ping #1 + vi.advanceTimersByTime(30_000); // miss #1 + ping #2 + (manager as unknown as { handleMessage: (message: unknown) => void }).handleMessage('alive'); + + vi.advanceTimersByTime(30_000); // recovered, ping #3 + vi.advanceTimersByTime(30_000); // miss #1 + ping #4 + vi.advanceTimersByTime(30_000); // miss #2 + ping #5 + + expect(ws.terminate).not.toHaveBeenCalled(); + + (manager as unknown as { connectionMonitor: { clear: () => void } }).connectionMonitor.clear(); + }); +});