From 1eda50ef44c9c2f4a93d6c852f39d2a3d9ff2af8 Mon Sep 17 00:00:00 2001 From: paisley <8197966+su8su@users.noreply.github.com> Date: Wed, 18 Mar 2026 13:25:30 +0800 Subject: [PATCH] feat(gateway): add websocket pong timeout and channel auto-reconnect watchdog (#569) --- electron/gateway/connection-monitor.ts | 33 ++++++++++++++++++++- electron/gateway/manager.ts | 26 +++++++++++++--- src/stores/channels.ts | 41 +++++++++++++++++++++++++- src/stores/gateway.ts | 9 +++++- 4 files changed, 102 insertions(+), 7 deletions(-) diff --git a/electron/gateway/connection-monitor.ts b/electron/gateway/connection-monitor.ts index 56afe75c4..4f3010e22 100644 --- a/electron/gateway/connection-monitor.ts +++ b/electron/gateway/connection-monitor.ts @@ -4,18 +4,45 @@ type HealthResult = { ok: boolean; error?: string }; export class GatewayConnectionMonitor { private pingInterval: NodeJS.Timeout | null = null; + private pongTimeout: NodeJS.Timeout | null = null; private healthCheckInterval: NodeJS.Timeout | null = null; - startPing(sendPing: () => void, intervalMs = 30000): void { + startPing( + sendPing: () => void, + onPongTimeout?: () => void, + intervalMs = 30000, + timeoutMs = 15000, + ): void { if (this.pingInterval) { clearInterval(this.pingInterval); } + if (this.pongTimeout) { + clearTimeout(this.pongTimeout); + this.pongTimeout = null; + } this.pingInterval = setInterval(() => { sendPing(); + + if (onPongTimeout) { + if (this.pongTimeout) { + clearTimeout(this.pongTimeout); + } + this.pongTimeout = setTimeout(() => { + this.pongTimeout = null; + onPongTimeout(); + }, timeoutMs); + } }, intervalMs); } + handlePong(): void { + if (this.pongTimeout) { + clearTimeout(this.pongTimeout); + this.pongTimeout = null; + } + } + startHealthCheck(options: { shouldCheck: () => boolean; checkHealth: () => Promise; @@ -51,6 +78,10 @@ export class GatewayConnectionMonitor { clearInterval(this.pingInterval); this.pingInterval = null; } + if (this.pongTimeout) { + clearTimeout(this.pongTimeout); + this.pongTimeout = null; + } if (this.healthCheckInterval) { clearInterval(this.healthCheckInterval); this.healthCheckInterval = null; diff --git a/electron/gateway/manager.ts b/electron/gateway/manager.ts index fdf79a195..c0de90b17 100644 --- a/electron/gateway/manager.ts +++ b/electron/gateway/manager.ts @@ -109,6 +109,8 @@ export class GatewayManager extends EventEmitter { private reconnectAttemptsTotal = 0; private reconnectSuccessTotal = 0; private static readonly RELOAD_POLICY_REFRESH_MS = 15_000; + public static readonly RESTART_COOLDOWN_MS = 5_000; + private lastRestartAt = 0; constructor(config?: Partial) { super(); @@ -727,6 +729,9 @@ export class GatewayManager extends EventEmitter { getToken: async () => await import('../utils/store').then(({ getSetting }) => getSetting('gatewayToken')), onHandshakeComplete: (ws) => { this.ws = ws; + this.ws.on('pong', () => { + this.connectionMonitor.handlePong(); + }); this.setStatus({ state: 'running', port, @@ -802,11 +807,24 @@ export class GatewayManager extends EventEmitter { * Start ping interval to keep connection alive */ private startPing(): void { - this.connectionMonitor.startPing(() => { - if (this.ws?.readyState === WebSocket.OPEN) { - this.ws.ping(); + this.connectionMonitor.startPing( + () => { + if (this.ws?.readyState === WebSocket.OPEN) { + this.ws.ping(); + } + }, + () => { + logger.error('Gateway WebSocket dead connection detected (pong timeout)'); + if (this.ws) { + this.ws.terminate(); // Force close the dead connection immediately + this.ws = null; + } + if (this.status.state === 'running') { + this.setStatus({ state: 'error', error: 'WebSocket ping timeout' }); + this.scheduleReconnect(); + } } - }); + ); } /** diff --git a/src/stores/channels.ts b/src/stores/channels.ts index f1aa68cdf..58d2db897 100644 --- a/src/stores/channels.ts +++ b/src/stores/channels.ts @@ -33,8 +33,13 @@ interface ChannelsState { setChannels: (channels: Channel[]) => void; updateChannel: (channelId: string, updates: Partial) => void; clearError: () => void; + scheduleAutoReconnect: (channelId: string) => void; + clearAutoReconnect: (channelId: string) => void; } +const reconnectTimers = new Map(); +const reconnectAttempts = new Map(); + export const useChannelsStore = create((set, get) => ({ channels: [], loading: false, @@ -194,7 +199,8 @@ export const useChannelsStore = create((set, get) => ({ }, disconnectChannel: async (channelId) => { - const { updateChannel } = get(); + const { updateChannel, clearAutoReconnect } = get(); + clearAutoReconnect(channelId); try { await useGatewayStore.getState().rpc('channels.disconnect', { channelId }); @@ -223,4 +229,37 @@ export const useChannelsStore = create((set, get) => ({ }, clearError: () => set({ error: null }), + + scheduleAutoReconnect: (channelId) => { + if (reconnectTimers.has(channelId)) return; + + const attempts = reconnectAttempts.get(channelId) || 0; + // Exponential backoff capped at 2 minutes + const delay = Math.min(5000 * Math.pow(2, attempts), 120000); + + console.log(`[Watchdog] Scheduling auto-reconnect for ${channelId} in ${delay}ms (attempt ${attempts + 1})`); + + const timer = setTimeout(() => { + reconnectTimers.delete(channelId); + const state = get(); + const channel = state.channels.find((c) => c.id === channelId); + + if (channel && (channel.status === 'disconnected' || channel.status === 'error')) { + reconnectAttempts.set(channelId, attempts + 1); + console.log(`[Watchdog] Executing auto-reconnect for ${channelId} (attempt ${attempts + 1})`); + state.connectChannel(channelId).catch(() => {}); + } + }, delay); + + reconnectTimers.set(channelId, timer); + }, + + clearAutoReconnect: (channelId) => { + const timer = reconnectTimers.get(channelId); + if (timer) { + clearTimeout(timer); + reconnectTimers.delete(channelId); + } + reconnectAttempts.delete(channelId); + }, })); diff --git a/src/stores/gateway.ts b/src/stores/gateway.ts index f07322836..21d644dd1 100644 --- a/src/stores/gateway.ts +++ b/src/stores/gateway.ts @@ -269,7 +269,14 @@ export const useGatewayStore = create((set, get) => ({ const state = useChannelsStore.getState(); const channel = state.channels.find((item) => item.type === update.channelId); if (channel) { - state.updateChannel(channel.id, { status: mapChannelStatus(update.status) }); + const newStatus = mapChannelStatus(update.status); + state.updateChannel(channel.id, { status: newStatus }); + + if (newStatus === 'disconnected' || newStatus === 'error') { + state.scheduleAutoReconnect(channel.id); + } else if (newStatus === 'connected' || newStatus === 'connecting') { + state.clearAutoReconnect(channel.id); + } } }) .catch(() => {});