fix(gateway): harden heartbeat timeout recovery to avoid reconnect flapping (#588)
Co-authored-by: zuolingxuan <zuolingxuan@bytedance.com>
This commit is contained in:
committed by
GitHub
Unverified
parent
8cca9af773
commit
8029b507ba
@@ -1,46 +1,76 @@
|
||||
import { logger } from '../utils/logger';
|
||||
|
||||
type HealthResult = { ok: boolean; error?: string };
|
||||
type HeartbeatAliveReason = 'pong' | 'message';
|
||||
|
||||
type PingOptions = {
|
||||
sendPing: () => void;
|
||||
onHeartbeatTimeout: (context: { consecutiveMisses: number; timeoutMs: number }) => void;
|
||||
intervalMs?: number;
|
||||
timeoutMs?: number;
|
||||
maxConsecutiveMisses?: number;
|
||||
};
|
||||
|
||||
export class GatewayConnectionMonitor {
|
||||
private pingInterval: NodeJS.Timeout | null = null;
|
||||
private pongTimeout: NodeJS.Timeout | null = null;
|
||||
private healthCheckInterval: NodeJS.Timeout | null = null;
|
||||
private lastPingAt = 0;
|
||||
private waitingForAlive = false;
|
||||
private consecutiveMisses = 0;
|
||||
private timeoutTriggered = false;
|
||||
|
||||
startPing(options: PingOptions): void {
|
||||
const intervalMs = options.intervalMs ?? 30000;
|
||||
const timeoutMs = options.timeoutMs ?? 10000;
|
||||
const maxConsecutiveMisses = Math.max(1, options.maxConsecutiveMisses ?? 3);
|
||||
this.resetHeartbeatState();
|
||||
|
||||
startPing(
|
||||
sendPing: () => void,
|
||||
onPongTimeout?: () => void,
|
||||
intervalMs = 30000,
|
||||
timeoutMs = 15000,
|
||||
): void {
|
||||
if (this.pingInterval) {
|
||||
clearInterval(this.pingInterval);
|
||||
}
|
||||
if (this.pongTimeout) {
|
||||
clearTimeout(this.pongTimeout);
|
||||
this.pongTimeout = null;
|
||||
}
|
||||
|
||||
this.pingInterval = setInterval(() => {
|
||||
sendPing();
|
||||
const now = Date.now();
|
||||
|
||||
if (onPongTimeout) {
|
||||
if (this.pongTimeout) {
|
||||
clearTimeout(this.pongTimeout);
|
||||
if (this.waitingForAlive && now - this.lastPingAt >= timeoutMs) {
|
||||
this.waitingForAlive = false;
|
||||
this.consecutiveMisses += 1;
|
||||
logger.warn(
|
||||
`Gateway heartbeat missed (${this.consecutiveMisses}/${maxConsecutiveMisses}, timeout=${timeoutMs}ms)`,
|
||||
);
|
||||
if (this.consecutiveMisses >= maxConsecutiveMisses && !this.timeoutTriggered) {
|
||||
this.timeoutTriggered = true;
|
||||
options.onHeartbeatTimeout({
|
||||
consecutiveMisses: this.consecutiveMisses,
|
||||
timeoutMs,
|
||||
});
|
||||
return;
|
||||
}
|
||||
this.pongTimeout = setTimeout(() => {
|
||||
this.pongTimeout = null;
|
||||
onPongTimeout();
|
||||
}, timeoutMs);
|
||||
}
|
||||
|
||||
options.sendPing();
|
||||
this.waitingForAlive = true;
|
||||
this.lastPingAt = now;
|
||||
}, intervalMs);
|
||||
}
|
||||
|
||||
handlePong(): void {
|
||||
if (this.pongTimeout) {
|
||||
clearTimeout(this.pongTimeout);
|
||||
this.pongTimeout = null;
|
||||
markAlive(reason: HeartbeatAliveReason): void {
|
||||
// Only log true recovery cases to avoid steady-state heartbeat log spam.
|
||||
if (this.consecutiveMisses > 0) {
|
||||
logger.debug(`Gateway heartbeat recovered via ${reason} (misses=${this.consecutiveMisses})`);
|
||||
}
|
||||
this.waitingForAlive = false;
|
||||
this.consecutiveMisses = 0;
|
||||
this.timeoutTriggered = false;
|
||||
}
|
||||
|
||||
// Backward-compatible alias for old callers.
|
||||
handlePong(): void {
|
||||
this.markAlive('pong');
|
||||
}
|
||||
|
||||
getConsecutiveMisses(): number {
|
||||
return this.consecutiveMisses;
|
||||
}
|
||||
|
||||
startHealthCheck(options: {
|
||||
@@ -78,13 +108,17 @@ export class GatewayConnectionMonitor {
|
||||
clearInterval(this.pingInterval);
|
||||
this.pingInterval = null;
|
||||
}
|
||||
if (this.pongTimeout) {
|
||||
clearTimeout(this.pongTimeout);
|
||||
this.pongTimeout = null;
|
||||
}
|
||||
if (this.healthCheckInterval) {
|
||||
clearInterval(this.healthCheckInterval);
|
||||
this.healthCheckInterval = null;
|
||||
}
|
||||
this.resetHeartbeatState();
|
||||
}
|
||||
|
||||
private resetHeartbeatState(): void {
|
||||
this.lastPingAt = 0;
|
||||
this.waitingForAlive = false;
|
||||
this.consecutiveMisses = 0;
|
||||
this.timeoutTriggered = false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,6 +109,9 @@ export class GatewayManager extends EventEmitter {
|
||||
private reconnectAttemptsTotal = 0;
|
||||
private reconnectSuccessTotal = 0;
|
||||
private static readonly RELOAD_POLICY_REFRESH_MS = 15_000;
|
||||
private static readonly HEARTBEAT_INTERVAL_MS = 30_000;
|
||||
private static readonly HEARTBEAT_TIMEOUT_MS = 12_000;
|
||||
private static readonly HEARTBEAT_MAX_MISSES = 3;
|
||||
public static readonly RESTART_COOLDOWN_MS = 5_000;
|
||||
private lastRestartAt = 0;
|
||||
|
||||
@@ -694,6 +697,7 @@ export class GatewayManager extends EventEmitter {
|
||||
onExit: (exitedChild, code) => {
|
||||
this.processExitCode = code;
|
||||
this.ownsProcess = false;
|
||||
this.connectionMonitor.clear();
|
||||
if (this.process === exitedChild) {
|
||||
this.process = null;
|
||||
}
|
||||
@@ -729,8 +733,8 @@ export class GatewayManager extends EventEmitter {
|
||||
getToken: async () => await import('../utils/store').then(({ getSetting }) => getSetting('gatewayToken')),
|
||||
onHandshakeComplete: (ws) => {
|
||||
this.ws = ws;
|
||||
this.ws.on('pong', () => {
|
||||
this.connectionMonitor.handlePong();
|
||||
ws.on('pong', () => {
|
||||
this.connectionMonitor.markAlive('pong');
|
||||
});
|
||||
this.setStatus({
|
||||
state: 'running',
|
||||
@@ -743,6 +747,7 @@ export class GatewayManager extends EventEmitter {
|
||||
this.handleMessage(message);
|
||||
},
|
||||
onCloseAfterHandshake: () => {
|
||||
this.connectionMonitor.clear();
|
||||
if (this.status.state === 'running') {
|
||||
this.setStatus({ state: 'stopped' });
|
||||
this.scheduleReconnect();
|
||||
@@ -755,6 +760,8 @@ export class GatewayManager extends EventEmitter {
|
||||
* Handle incoming WebSocket message
|
||||
*/
|
||||
private handleMessage(message: unknown): void {
|
||||
this.connectionMonitor.markAlive('message');
|
||||
|
||||
if (typeof message !== 'object' || message === null) {
|
||||
logger.debug('Received non-object Gateway message');
|
||||
return;
|
||||
@@ -807,24 +814,34 @@ export class GatewayManager extends EventEmitter {
|
||||
* Start ping interval to keep connection alive
|
||||
*/
|
||||
private startPing(): void {
|
||||
this.connectionMonitor.startPing(
|
||||
() => {
|
||||
this.connectionMonitor.startPing({
|
||||
intervalMs: GatewayManager.HEARTBEAT_INTERVAL_MS,
|
||||
timeoutMs: GatewayManager.HEARTBEAT_TIMEOUT_MS,
|
||||
maxConsecutiveMisses: GatewayManager.HEARTBEAT_MAX_MISSES,
|
||||
sendPing: () => {
|
||||
if (this.ws?.readyState === WebSocket.OPEN) {
|
||||
this.ws.ping();
|
||||
}
|
||||
},
|
||||
() => {
|
||||
logger.error('Gateway WebSocket dead connection detected (pong timeout)');
|
||||
if (this.ws) {
|
||||
this.ws.terminate(); // Force close the dead connection immediately
|
||||
this.ws = null;
|
||||
onHeartbeatTimeout: ({ consecutiveMisses, timeoutMs }) => {
|
||||
if (this.status.state !== 'running' || !this.shouldReconnect) {
|
||||
return;
|
||||
}
|
||||
if (this.status.state === 'running') {
|
||||
this.setStatus({ state: 'error', error: 'WebSocket ping timeout' });
|
||||
this.scheduleReconnect();
|
||||
const ws = this.ws;
|
||||
if (!ws || ws.readyState !== WebSocket.OPEN) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
logger.warn(
|
||||
`Gateway heartbeat timed out after ${consecutiveMisses} consecutive misses (timeout=${timeoutMs}ms); terminating stale socket`,
|
||||
);
|
||||
try {
|
||||
ws.terminate();
|
||||
} catch (error) {
|
||||
logger.warn('Failed to terminate stale Gateway socket after heartbeat timeout:', error);
|
||||
}
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
70
tests/unit/gateway-connection-monitor.test.ts
Normal file
70
tests/unit/gateway-connection-monitor.test.ts
Normal file
@@ -0,0 +1,70 @@
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
import { GatewayConnectionMonitor } from '@electron/gateway/connection-monitor';
|
||||
|
||||
vi.mock('electron', () => ({
|
||||
app: {
|
||||
getPath: () => '/tmp',
|
||||
getVersion: () => '0.0.0-test',
|
||||
isPackaged: false,
|
||||
},
|
||||
utilityProcess: {
|
||||
fork: vi.fn(),
|
||||
},
|
||||
}));
|
||||
|
||||
describe('GatewayConnectionMonitor heartbeat', () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date('2026-03-19T00:00:00.000Z'));
|
||||
});
|
||||
|
||||
it('terminates only after consecutive heartbeat misses reach threshold', () => {
|
||||
const monitor = new GatewayConnectionMonitor();
|
||||
const sendPing = vi.fn();
|
||||
const onHeartbeatTimeout = vi.fn();
|
||||
|
||||
monitor.startPing({
|
||||
sendPing,
|
||||
onHeartbeatTimeout,
|
||||
intervalMs: 100,
|
||||
timeoutMs: 50,
|
||||
maxConsecutiveMisses: 3,
|
||||
});
|
||||
|
||||
vi.advanceTimersByTime(100); // send ping #1
|
||||
vi.advanceTimersByTime(100); // miss #1, send ping #2
|
||||
vi.advanceTimersByTime(100); // miss #2, send ping #3
|
||||
expect(onHeartbeatTimeout).not.toHaveBeenCalled();
|
||||
|
||||
vi.advanceTimersByTime(100); // miss #3 -> timeout callback
|
||||
expect(onHeartbeatTimeout).toHaveBeenCalledTimes(1);
|
||||
expect(onHeartbeatTimeout).toHaveBeenCalledWith({ consecutiveMisses: 3, timeoutMs: 50 });
|
||||
expect(sendPing).toHaveBeenCalledTimes(3);
|
||||
});
|
||||
|
||||
it('resets miss counter when alive signal is received', () => {
|
||||
const monitor = new GatewayConnectionMonitor();
|
||||
const sendPing = vi.fn();
|
||||
const onHeartbeatTimeout = vi.fn();
|
||||
|
||||
monitor.startPing({
|
||||
sendPing,
|
||||
onHeartbeatTimeout,
|
||||
intervalMs: 100,
|
||||
timeoutMs: 50,
|
||||
maxConsecutiveMisses: 2,
|
||||
});
|
||||
|
||||
vi.advanceTimersByTime(100); // send ping #1
|
||||
vi.advanceTimersByTime(100); // miss #1, send ping #2
|
||||
expect(monitor.getConsecutiveMisses()).toBe(1);
|
||||
|
||||
monitor.markAlive('pong');
|
||||
expect(monitor.getConsecutiveMisses()).toBe(0);
|
||||
|
||||
vi.advanceTimersByTime(100); // send ping #3
|
||||
vi.advanceTimersByTime(100); // miss #1 again (reset confirmed)
|
||||
expect(monitor.getConsecutiveMisses()).toBe(1);
|
||||
expect(onHeartbeatTimeout).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
79
tests/unit/gateway-manager-heartbeat.test.ts
Normal file
79
tests/unit/gateway-manager-heartbeat.test.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
vi.mock('electron', () => ({
|
||||
app: {
|
||||
getPath: () => '/tmp',
|
||||
isPackaged: false,
|
||||
},
|
||||
utilityProcess: {
|
||||
fork: vi.fn(),
|
||||
},
|
||||
}));
|
||||
|
||||
describe('GatewayManager heartbeat recovery', () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date('2026-03-19T00:00:00.000Z'));
|
||||
});
|
||||
|
||||
it('terminates stale socket only after 3 consecutive heartbeat misses', async () => {
|
||||
const { GatewayManager } = await import('@electron/gateway/manager');
|
||||
const manager = new GatewayManager();
|
||||
|
||||
const ws = {
|
||||
readyState: 1, // WebSocket.OPEN
|
||||
ping: vi.fn(),
|
||||
terminate: vi.fn(),
|
||||
on: vi.fn(),
|
||||
};
|
||||
|
||||
(manager as unknown as { ws: typeof ws }).ws = ws;
|
||||
(manager as unknown as { shouldReconnect: boolean }).shouldReconnect = true;
|
||||
(manager as unknown as { status: { state: string; port: number } }).status = {
|
||||
state: 'running',
|
||||
port: 18789,
|
||||
};
|
||||
|
||||
(manager as unknown as { startPing: () => void }).startPing();
|
||||
|
||||
vi.advanceTimersByTime(120_000);
|
||||
|
||||
expect(ws.ping).toHaveBeenCalledTimes(3);
|
||||
expect(ws.terminate).toHaveBeenCalledTimes(1);
|
||||
|
||||
(manager as unknown as { connectionMonitor: { clear: () => void } }).connectionMonitor.clear();
|
||||
});
|
||||
|
||||
it('does not terminate when heartbeat is recovered by incoming messages', async () => {
|
||||
const { GatewayManager } = await import('@electron/gateway/manager');
|
||||
const manager = new GatewayManager();
|
||||
|
||||
const ws = {
|
||||
readyState: 1, // WebSocket.OPEN
|
||||
ping: vi.fn(),
|
||||
terminate: vi.fn(),
|
||||
on: vi.fn(),
|
||||
};
|
||||
|
||||
(manager as unknown as { ws: typeof ws }).ws = ws;
|
||||
(manager as unknown as { shouldReconnect: boolean }).shouldReconnect = true;
|
||||
(manager as unknown as { status: { state: string; port: number } }).status = {
|
||||
state: 'running',
|
||||
port: 18789,
|
||||
};
|
||||
|
||||
(manager as unknown as { startPing: () => void }).startPing();
|
||||
|
||||
vi.advanceTimersByTime(30_000); // ping #1
|
||||
vi.advanceTimersByTime(30_000); // miss #1 + ping #2
|
||||
(manager as unknown as { handleMessage: (message: unknown) => void }).handleMessage('alive');
|
||||
|
||||
vi.advanceTimersByTime(30_000); // recovered, ping #3
|
||||
vi.advanceTimersByTime(30_000); // miss #1 + ping #4
|
||||
vi.advanceTimersByTime(30_000); // miss #2 + ping #5
|
||||
|
||||
expect(ws.terminate).not.toHaveBeenCalled();
|
||||
|
||||
(manager as unknown as { connectionMonitor: { clear: () => void } }).connectionMonitor.clear();
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user