feat(gateway): add websocket pong timeout and channel auto-reconnect watchdog (#569)
This commit is contained in:
committed by
GitHub
Unverified
parent
554f894493
commit
1eda50ef44
@@ -4,18 +4,45 @@ type HealthResult = { ok: boolean; error?: string };
|
|||||||
|
|
||||||
export class GatewayConnectionMonitor {
|
export class GatewayConnectionMonitor {
|
||||||
private pingInterval: NodeJS.Timeout | null = null;
|
private pingInterval: NodeJS.Timeout | null = null;
|
||||||
|
private pongTimeout: NodeJS.Timeout | null = null;
|
||||||
private healthCheckInterval: NodeJS.Timeout | null = null;
|
private healthCheckInterval: NodeJS.Timeout | null = null;
|
||||||
|
|
||||||
startPing(sendPing: () => void, intervalMs = 30000): void {
|
startPing(
|
||||||
|
sendPing: () => void,
|
||||||
|
onPongTimeout?: () => void,
|
||||||
|
intervalMs = 30000,
|
||||||
|
timeoutMs = 15000,
|
||||||
|
): void {
|
||||||
if (this.pingInterval) {
|
if (this.pingInterval) {
|
||||||
clearInterval(this.pingInterval);
|
clearInterval(this.pingInterval);
|
||||||
}
|
}
|
||||||
|
if (this.pongTimeout) {
|
||||||
|
clearTimeout(this.pongTimeout);
|
||||||
|
this.pongTimeout = null;
|
||||||
|
}
|
||||||
|
|
||||||
this.pingInterval = setInterval(() => {
|
this.pingInterval = setInterval(() => {
|
||||||
sendPing();
|
sendPing();
|
||||||
|
|
||||||
|
if (onPongTimeout) {
|
||||||
|
if (this.pongTimeout) {
|
||||||
|
clearTimeout(this.pongTimeout);
|
||||||
|
}
|
||||||
|
this.pongTimeout = setTimeout(() => {
|
||||||
|
this.pongTimeout = null;
|
||||||
|
onPongTimeout();
|
||||||
|
}, timeoutMs);
|
||||||
|
}
|
||||||
}, intervalMs);
|
}, intervalMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
handlePong(): void {
|
||||||
|
if (this.pongTimeout) {
|
||||||
|
clearTimeout(this.pongTimeout);
|
||||||
|
this.pongTimeout = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
startHealthCheck(options: {
|
startHealthCheck(options: {
|
||||||
shouldCheck: () => boolean;
|
shouldCheck: () => boolean;
|
||||||
checkHealth: () => Promise<HealthResult>;
|
checkHealth: () => Promise<HealthResult>;
|
||||||
@@ -51,6 +78,10 @@ export class GatewayConnectionMonitor {
|
|||||||
clearInterval(this.pingInterval);
|
clearInterval(this.pingInterval);
|
||||||
this.pingInterval = null;
|
this.pingInterval = null;
|
||||||
}
|
}
|
||||||
|
if (this.pongTimeout) {
|
||||||
|
clearTimeout(this.pongTimeout);
|
||||||
|
this.pongTimeout = null;
|
||||||
|
}
|
||||||
if (this.healthCheckInterval) {
|
if (this.healthCheckInterval) {
|
||||||
clearInterval(this.healthCheckInterval);
|
clearInterval(this.healthCheckInterval);
|
||||||
this.healthCheckInterval = null;
|
this.healthCheckInterval = null;
|
||||||
|
|||||||
@@ -109,6 +109,8 @@ export class GatewayManager extends EventEmitter {
|
|||||||
private reconnectAttemptsTotal = 0;
|
private reconnectAttemptsTotal = 0;
|
||||||
private reconnectSuccessTotal = 0;
|
private reconnectSuccessTotal = 0;
|
||||||
private static readonly RELOAD_POLICY_REFRESH_MS = 15_000;
|
private static readonly RELOAD_POLICY_REFRESH_MS = 15_000;
|
||||||
|
public static readonly RESTART_COOLDOWN_MS = 5_000;
|
||||||
|
private lastRestartAt = 0;
|
||||||
|
|
||||||
constructor(config?: Partial<ReconnectConfig>) {
|
constructor(config?: Partial<ReconnectConfig>) {
|
||||||
super();
|
super();
|
||||||
@@ -727,6 +729,9 @@ export class GatewayManager extends EventEmitter {
|
|||||||
getToken: async () => await import('../utils/store').then(({ getSetting }) => getSetting('gatewayToken')),
|
getToken: async () => await import('../utils/store').then(({ getSetting }) => getSetting('gatewayToken')),
|
||||||
onHandshakeComplete: (ws) => {
|
onHandshakeComplete: (ws) => {
|
||||||
this.ws = ws;
|
this.ws = ws;
|
||||||
|
this.ws.on('pong', () => {
|
||||||
|
this.connectionMonitor.handlePong();
|
||||||
|
});
|
||||||
this.setStatus({
|
this.setStatus({
|
||||||
state: 'running',
|
state: 'running',
|
||||||
port,
|
port,
|
||||||
@@ -802,11 +807,24 @@ export class GatewayManager extends EventEmitter {
|
|||||||
* Start ping interval to keep connection alive
|
* Start ping interval to keep connection alive
|
||||||
*/
|
*/
|
||||||
private startPing(): void {
|
private startPing(): void {
|
||||||
this.connectionMonitor.startPing(() => {
|
this.connectionMonitor.startPing(
|
||||||
|
() => {
|
||||||
if (this.ws?.readyState === WebSocket.OPEN) {
|
if (this.ws?.readyState === WebSocket.OPEN) {
|
||||||
this.ws.ping();
|
this.ws.ping();
|
||||||
}
|
}
|
||||||
});
|
},
|
||||||
|
() => {
|
||||||
|
logger.error('Gateway WebSocket dead connection detected (pong timeout)');
|
||||||
|
if (this.ws) {
|
||||||
|
this.ws.terminate(); // Force close the dead connection immediately
|
||||||
|
this.ws = null;
|
||||||
|
}
|
||||||
|
if (this.status.state === 'running') {
|
||||||
|
this.setStatus({ state: 'error', error: 'WebSocket ping timeout' });
|
||||||
|
this.scheduleReconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -33,8 +33,13 @@ interface ChannelsState {
|
|||||||
setChannels: (channels: Channel[]) => void;
|
setChannels: (channels: Channel[]) => void;
|
||||||
updateChannel: (channelId: string, updates: Partial<Channel>) => void;
|
updateChannel: (channelId: string, updates: Partial<Channel>) => void;
|
||||||
clearError: () => void;
|
clearError: () => void;
|
||||||
|
scheduleAutoReconnect: (channelId: string) => void;
|
||||||
|
clearAutoReconnect: (channelId: string) => void;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const reconnectTimers = new Map<string, NodeJS.Timeout>();
|
||||||
|
const reconnectAttempts = new Map<string, number>();
|
||||||
|
|
||||||
export const useChannelsStore = create<ChannelsState>((set, get) => ({
|
export const useChannelsStore = create<ChannelsState>((set, get) => ({
|
||||||
channels: [],
|
channels: [],
|
||||||
loading: false,
|
loading: false,
|
||||||
@@ -194,7 +199,8 @@ export const useChannelsStore = create<ChannelsState>((set, get) => ({
|
|||||||
},
|
},
|
||||||
|
|
||||||
disconnectChannel: async (channelId) => {
|
disconnectChannel: async (channelId) => {
|
||||||
const { updateChannel } = get();
|
const { updateChannel, clearAutoReconnect } = get();
|
||||||
|
clearAutoReconnect(channelId);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await useGatewayStore.getState().rpc('channels.disconnect', { channelId });
|
await useGatewayStore.getState().rpc('channels.disconnect', { channelId });
|
||||||
@@ -223,4 +229,37 @@ export const useChannelsStore = create<ChannelsState>((set, get) => ({
|
|||||||
},
|
},
|
||||||
|
|
||||||
clearError: () => set({ error: null }),
|
clearError: () => set({ error: null }),
|
||||||
|
|
||||||
|
scheduleAutoReconnect: (channelId) => {
|
||||||
|
if (reconnectTimers.has(channelId)) return;
|
||||||
|
|
||||||
|
const attempts = reconnectAttempts.get(channelId) || 0;
|
||||||
|
// Exponential backoff capped at 2 minutes
|
||||||
|
const delay = Math.min(5000 * Math.pow(2, attempts), 120000);
|
||||||
|
|
||||||
|
console.log(`[Watchdog] Scheduling auto-reconnect for ${channelId} in ${delay}ms (attempt ${attempts + 1})`);
|
||||||
|
|
||||||
|
const timer = setTimeout(() => {
|
||||||
|
reconnectTimers.delete(channelId);
|
||||||
|
const state = get();
|
||||||
|
const channel = state.channels.find((c) => c.id === channelId);
|
||||||
|
|
||||||
|
if (channel && (channel.status === 'disconnected' || channel.status === 'error')) {
|
||||||
|
reconnectAttempts.set(channelId, attempts + 1);
|
||||||
|
console.log(`[Watchdog] Executing auto-reconnect for ${channelId} (attempt ${attempts + 1})`);
|
||||||
|
state.connectChannel(channelId).catch(() => {});
|
||||||
|
}
|
||||||
|
}, delay);
|
||||||
|
|
||||||
|
reconnectTimers.set(channelId, timer);
|
||||||
|
},
|
||||||
|
|
||||||
|
clearAutoReconnect: (channelId) => {
|
||||||
|
const timer = reconnectTimers.get(channelId);
|
||||||
|
if (timer) {
|
||||||
|
clearTimeout(timer);
|
||||||
|
reconnectTimers.delete(channelId);
|
||||||
|
}
|
||||||
|
reconnectAttempts.delete(channelId);
|
||||||
|
},
|
||||||
}));
|
}));
|
||||||
|
|||||||
@@ -269,7 +269,14 @@ export const useGatewayStore = create<GatewayState>((set, get) => ({
|
|||||||
const state = useChannelsStore.getState();
|
const state = useChannelsStore.getState();
|
||||||
const channel = state.channels.find((item) => item.type === update.channelId);
|
const channel = state.channels.find((item) => item.type === update.channelId);
|
||||||
if (channel) {
|
if (channel) {
|
||||||
state.updateChannel(channel.id, { status: mapChannelStatus(update.status) });
|
const newStatus = mapChannelStatus(update.status);
|
||||||
|
state.updateChannel(channel.id, { status: newStatus });
|
||||||
|
|
||||||
|
if (newStatus === 'disconnected' || newStatus === 'error') {
|
||||||
|
state.scheduleAutoReconnect(channel.id);
|
||||||
|
} else if (newStatus === 'connected' || newStatus === 'connecting') {
|
||||||
|
state.clearAutoReconnect(channel.id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.catch(() => {});
|
.catch(() => {});
|
||||||
|
|||||||
Reference in New Issue
Block a user