Stabilize gateway reload/restart behavior and remove doctor --json dependency (#504)
This commit is contained in:
committed by
GitHub
Unverified
parent
89bda3c7af
commit
7f3408559d
@@ -43,6 +43,12 @@ import { GatewayConnectionMonitor } from './connection-monitor';
|
||||
import { GatewayLifecycleController, LifecycleSupersededError } from './lifecycle-controller';
|
||||
import { launchGatewayProcess } from './process-launcher';
|
||||
import { GatewayRestartController } from './restart-controller';
|
||||
import { GatewayRestartGovernor } from './restart-governor';
|
||||
import {
|
||||
DEFAULT_GATEWAY_RELOAD_POLICY,
|
||||
loadGatewayReloadPolicy,
|
||||
type GatewayReloadPolicy,
|
||||
} from './reload-policy';
|
||||
import { classifyGatewayStderrMessage, recordGatewayStartupStderrLine } from './startup-stderr';
|
||||
import { runGatewayStartupSequence } from './startup-orchestrator';
|
||||
|
||||
@@ -94,12 +100,15 @@ export class GatewayManager extends EventEmitter {
|
||||
private readonly connectionMonitor = new GatewayConnectionMonitor();
|
||||
private readonly lifecycleController = new GatewayLifecycleController();
|
||||
private readonly restartController = new GatewayRestartController();
|
||||
private readonly restartGovernor = new GatewayRestartGovernor();
|
||||
private reloadDebounceTimer: NodeJS.Timeout | null = null;
|
||||
private reloadPolicy: GatewayReloadPolicy = { ...DEFAULT_GATEWAY_RELOAD_POLICY };
|
||||
private reloadPolicyLoadedAt = 0;
|
||||
private reloadPolicyRefreshPromise: Promise<void> | null = null;
|
||||
private externalShutdownSupported: boolean | null = null;
|
||||
private lastRestartAt = 0;
|
||||
private reconnectAttemptsTotal = 0;
|
||||
private reconnectSuccessTotal = 0;
|
||||
private static readonly RESTART_COOLDOWN_MS = 2500;
|
||||
private static readonly RELOAD_POLICY_REFRESH_MS = 15_000;
|
||||
|
||||
constructor(config?: Partial<ReconnectConfig>) {
|
||||
super();
|
||||
@@ -109,6 +118,9 @@ export class GatewayManager extends EventEmitter {
|
||||
this.emit('status', status);
|
||||
},
|
||||
onTransition: (previousState, nextState) => {
|
||||
if (nextState === 'running') {
|
||||
this.restartGovernor.onRunning();
|
||||
}
|
||||
this.restartController.flushDeferredRestart(
|
||||
`status:${previousState}->${nextState}`,
|
||||
{
|
||||
@@ -186,6 +198,7 @@ export class GatewayManager extends EventEmitter {
|
||||
logger.info(`Gateway start requested (port=${this.status.port})`);
|
||||
this.lastSpawnSummary = null;
|
||||
this.shouldReconnect = true;
|
||||
await this.refreshReloadPolicy(true);
|
||||
|
||||
// Lazily load device identity (async file I/O + key generation).
|
||||
// Must happen before connect() which uses the identity for the handshake.
|
||||
@@ -353,18 +366,27 @@ export class GatewayManager extends EventEmitter {
|
||||
return;
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
const sinceLastRestart = now - this.lastRestartAt;
|
||||
if (sinceLastRestart < GatewayManager.RESTART_COOLDOWN_MS) {
|
||||
logger.info(
|
||||
`Gateway restart skipped due to cooldown (${sinceLastRestart}ms < ${GatewayManager.RESTART_COOLDOWN_MS}ms)`,
|
||||
const decision = this.restartGovernor.decide();
|
||||
if (!decision.allow) {
|
||||
const observability = this.restartGovernor.getObservability();
|
||||
logger.warn(
|
||||
`[gateway-restart-governor] restart suppressed reason=${decision.reason} retryAfterMs=${decision.retryAfterMs} ` +
|
||||
`suppressed=${observability.suppressed_total} executed=${observability.executed_total} circuitOpenUntil=${observability.circuit_open_until}`,
|
||||
);
|
||||
const props = {
|
||||
reason: decision.reason,
|
||||
retry_after_ms: decision.retryAfterMs,
|
||||
gateway_restart_suppressed_total: observability.suppressed_total,
|
||||
gateway_restart_executed_total: observability.executed_total,
|
||||
gateway_restart_circuit_open_until: observability.circuit_open_until,
|
||||
};
|
||||
trackMetric('gateway.restart.suppressed', props);
|
||||
captureTelemetryEvent('gateway_restart_suppressed', props);
|
||||
return;
|
||||
}
|
||||
|
||||
const pidBefore = this.status.pid;
|
||||
logger.info(`[gateway-refresh] mode=restart requested pidBefore=${pidBefore ?? 'n/a'}`);
|
||||
this.lastRestartAt = now;
|
||||
this.restartInFlight = (async () => {
|
||||
await this.stop();
|
||||
await this.start();
|
||||
@@ -372,8 +394,18 @@ export class GatewayManager extends EventEmitter {
|
||||
|
||||
try {
|
||||
await this.restartInFlight;
|
||||
this.restartGovernor.recordExecuted();
|
||||
const observability = this.restartGovernor.getObservability();
|
||||
const props = {
|
||||
gateway_restart_executed_total: observability.executed_total,
|
||||
gateway_restart_suppressed_total: observability.suppressed_total,
|
||||
gateway_restart_circuit_open_until: observability.circuit_open_until,
|
||||
};
|
||||
trackMetric('gateway.restart.executed', props);
|
||||
captureTelemetryEvent('gateway_restart_executed', props);
|
||||
logger.info(
|
||||
`[gateway-refresh] mode=restart result=applied pidBefore=${pidBefore ?? 'n/a'} pidAfter=${this.status.pid ?? 'n/a'}`,
|
||||
`[gateway-refresh] mode=restart result=applied pidBefore=${pidBefore ?? 'n/a'} pidAfter=${this.status.pid ?? 'n/a'} ` +
|
||||
`suppressed=${observability.suppressed_total} executed=${observability.executed_total} circuitOpenUntil=${observability.circuit_open_until}`,
|
||||
);
|
||||
} finally {
|
||||
this.restartInFlight = null;
|
||||
@@ -413,6 +445,16 @@ export class GatewayManager extends EventEmitter {
|
||||
* Falls back to restart on unsupported platforms or signaling failures.
|
||||
*/
|
||||
async reload(): Promise<void> {
|
||||
await this.refreshReloadPolicy();
|
||||
|
||||
if (this.reloadPolicy.mode === 'off' || this.reloadPolicy.mode === 'restart') {
|
||||
logger.info(
|
||||
`[gateway-refresh] mode=reload result=policy_forced_restart policy=${this.reloadPolicy.mode}`,
|
||||
);
|
||||
await this.restart();
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.restartController.isRestartDeferred({
|
||||
state: this.status.state,
|
||||
startLock: this.startLock,
|
||||
@@ -481,17 +523,51 @@ export class GatewayManager extends EventEmitter {
|
||||
/**
 * Debounced reload — coalesces multiple rapid config-change events into one
 * in-process reload when possible.
 *
 * When the cached reload policy mode is `off` or `restart`, the request is
 * routed to `debouncedRestart` instead of scheduling a reload.
 *
 * @param delayMs Optional debounce delay in milliseconds. When omitted, the
 *   `debounceMs` value of the currently cached reload policy is used.
 */
debouncedReload(delayMs?: number): void {
  // Refresh the policy in the background; this call deliberately acts on the
  // cached policy so that it stays synchronous for callers.
  void this.refreshReloadPolicy().catch((err) => {
    logger.warn('Gateway reload policy refresh failed:', err);
  });
  const effectiveDelay = delayMs ?? this.reloadPolicy.debounceMs;

  // Cancel any pending reload first. This also covers the case where the
  // request is routed to a restart below, so a stale reload timer cannot fire
  // afterwards and ask for a second, redundant restart.
  if (this.reloadDebounceTimer) {
    clearTimeout(this.reloadDebounceTimer);
    this.reloadDebounceTimer = null;
  }

  if (this.reloadPolicy.mode === 'off' || this.reloadPolicy.mode === 'restart') {
    logger.debug(
      `Gateway reload policy=${this.reloadPolicy.mode}; routing debouncedReload to debouncedRestart (${effectiveDelay}ms)`,
    );
    this.debouncedRestart(effectiveDelay);
    return;
  }

  logger.debug(`Gateway reload debounced (will fire in ${effectiveDelay}ms)`);
  this.reloadDebounceTimer = setTimeout(() => {
    this.reloadDebounceTimer = null;
    void this.reload().catch((err) => {
      logger.warn('Debounced Gateway reload failed:', err);
    });
  }, effectiveDelay);
}
|
||||
|
||||
/**
 * Refreshes the cached gateway reload policy by calling
 * `loadGatewayReloadPolicy()`.
 *
 * The cached value is reused while it is younger than
 * `GatewayManager.RELOAD_POLICY_REFRESH_MS`, unless `force` is set. Concurrent
 * callers share a single in-flight load instead of starting their own.
 *
 * @param force When true, skip the freshness check and load again (an
 *   already running load is still awaited rather than duplicated).
 */
private async refreshReloadPolicy(force = false): Promise<void> {
  const cacheAgeMs = Date.now() - this.reloadPolicyLoadedAt;
  // A negative age means the wall clock moved backwards since the last load;
  // treat the cache as stale instead of pinning it until the clock catches up.
  const cacheIsFresh =
    cacheAgeMs >= 0 && cacheAgeMs < GatewayManager.RELOAD_POLICY_REFRESH_MS;
  if (!force && cacheIsFresh) {
    return;
  }

  // Join a load that is already running.
  if (this.reloadPolicyRefreshPromise) {
    await this.reloadPolicyRefreshPromise;
    return;
  }

  this.reloadPolicyRefreshPromise = (async () => {
    this.reloadPolicy = await loadGatewayReloadPolicy();
    this.reloadPolicyLoadedAt = Date.now();
  })();

  try {
    await this.reloadPolicyRefreshPromise;
  } finally {
    // Always release the slot so a failed load can be retried.
    this.reloadPolicyRefreshPromise = null;
  }
}
|
||||
|
||||
/**
|
||||
|
||||
63
electron/gateway/reload-policy.ts
Normal file
63
electron/gateway/reload-policy.ts
Normal file
@@ -0,0 +1,63 @@
|
||||
import { readFile } from 'node:fs/promises';
|
||||
import { homedir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
|
||||
/** How configuration changes are applied to the gateway. */
export type GatewayReloadMode = 'hybrid' | 'reload' | 'restart' | 'off';

/** Reload behaviour read from the `gateway.reload` section of the config. */
export type GatewayReloadPolicy = {
  mode: GatewayReloadMode;
  /** Debounce delay in milliseconds, always within `[0, MAX_DEBOUNCE_MS]`. */
  debounceMs: number;
};

/** Policy used when the config is missing, unreadable or invalid. */
export const DEFAULT_GATEWAY_RELOAD_POLICY: GatewayReloadPolicy = {
  mode: 'hybrid',
  debounceMs: 1200,
};

const OPENCLAW_CONFIG_PATH = join(homedir(), '.openclaw', 'openclaw.json');
const MAX_DEBOUNCE_MS = 60_000;

/** Returns `value` if it is a known reload mode, otherwise the default mode. */
function normalizeMode(value: unknown): GatewayReloadMode {
  if (value === 'off' || value === 'reload' || value === 'restart' || value === 'hybrid') {
    return value;
  }
  return DEFAULT_GATEWAY_RELOAD_POLICY.mode;
}

/**
 * Rounds `value` to a whole number of milliseconds and clamps it to
 * `[0, MAX_DEBOUNCE_MS]`. Non-numeric or non-finite input yields the default.
 */
function normalizeDebounceMs(value: unknown): number {
  if (typeof value !== 'number' || !Number.isFinite(value)) {
    return DEFAULT_GATEWAY_RELOAD_POLICY.debounceMs;
  }
  // Math.max(-0, 0) is +0, so tiny negative inputs that round to -0 come out
  // as a plain 0 rather than leaking a negative zero to callers.
  return Math.min(Math.max(Math.round(value), 0), MAX_DEBOUNCE_MS);
}

/** Narrows an unknown value to a string-keyed record, or `{}` if it is not an object. */
function asRecord(value: unknown): Record<string, unknown> {
  return value !== null && typeof value === 'object'
    ? (value as Record<string, unknown>)
    : {};
}

/**
 * Extracts the reload policy from an already parsed config object.
 *
 * Reads `config.gateway.reload.mode` and `config.gateway.reload.debounceMs`;
 * any missing or invalid part falls back to the corresponding default.
 *
 * @param config Parsed configuration of unknown shape.
 * @returns A fresh policy object (never the shared default instance).
 */
export function parseGatewayReloadPolicy(config: unknown): GatewayReloadPolicy {
  const reload = asRecord(asRecord(asRecord(config).gateway).reload);
  return {
    mode: normalizeMode(reload.mode),
    debounceMs: normalizeDebounceMs(reload.debounceMs),
  };
}

/**
 * Reads the config file at `~/.openclaw/openclaw.json` and returns its reload
 * policy.
 *
 * This is deliberately best-effort: a missing file, unreadable file or invalid
 * JSON all resolve to a copy of the default policy, so the returned promise
 * does not reject.
 */
export async function loadGatewayReloadPolicy(): Promise<GatewayReloadPolicy> {
  try {
    const raw = await readFile(OPENCLAW_CONFIG_PATH, 'utf-8');
    return parseGatewayReloadPolicy(JSON.parse(raw));
  } catch {
    return { ...DEFAULT_GATEWAY_RELOAD_POLICY };
  }
}
|
||||
|
||||
145
electron/gateway/restart-governor.ts
Normal file
145
electron/gateway/restart-governor.ts
Normal file
@@ -0,0 +1,145 @@
|
||||
/** Outcome of asking the governor whether a restart may run now. */
export type RestartDecision =
  | { allow: true }
  | {
      allow: false;
      reason: 'circuit_open' | 'budget_exceeded' | 'cooldown_active';
      /** Milliseconds until a new attempt may be allowed. */
      retryAfterMs: number;
    };

type RestartGovernorOptions = {
  /** Restarts tolerated inside `windowMs` before the circuit opens. */
  maxRestartsPerWindow: number;
  /** Length of the sliding window used for the restart budget. */
  windowMs: number;
  /** Cooldown before exponential growth is applied. */
  baseCooldownMs: number;
  /** Upper bound for the exponentially growing cooldown. */
  maxCooldownMs: number;
  /** How long the circuit stays open once the budget is exceeded. */
  circuitOpenMs: number;
  /** Time since the last `onRunning` after which the backoff resets. */
  stableResetMs: number;
};

const DEFAULT_OPTIONS: RestartGovernorOptions = {
  maxRestartsPerWindow: 4,
  windowMs: 10 * 60 * 1000,
  baseCooldownMs: 2500,
  maxCooldownMs: 2 * 60 * 1000,
  circuitOpenMs: 10 * 60 * 1000,
  stableResetMs: 2 * 60 * 1000,
};

/**
 * Rate limiter for gateway restarts.
 *
 * Combines three guards, checked in this order by `decide`:
 * an open circuit, a restart budget per sliding window, and a cooldown that
 * doubles with every consecutive restart (capped at `maxCooldownMs`).
 * All methods accept an explicit `now` (epoch milliseconds) so callers can
 * drive the governor with a fixed clock.
 */
export class GatewayRestartGovernor {
  private readonly options: RestartGovernorOptions;
  private restartTimestamps: number[] = [];
  private circuitOpenUntil = 0;
  private consecutiveRestarts = 0;
  private lastRestartAt = 0;
  private lastRunningAt = 0;
  private suppressedTotal = 0;
  private executedTotal = 0;
  private static readonly MAX_COUNTER = Number.MAX_SAFE_INTEGER;

  /** @param options Overrides merged over the built-in defaults. */
  constructor(options?: Partial<RestartGovernorOptions>) {
    this.options = { ...DEFAULT_OPTIONS, ...options };
  }

  /** Records the moment the gateway was observed in the running state. */
  onRunning(now = Date.now()): void {
    this.lastRunningAt = now;
  }

  /**
   * Decides whether a restart may be executed at `now`.
   *
   * A refusal increments the suppressed counter. Exceeding the budget also
   * opens the circuit for `circuitOpenMs`.
   */
  decide(now = Date.now()): RestartDecision {
    this.pruneOld(now);
    this.maybeResetConsecutive(now);

    if (now < this.circuitOpenUntil) {
      this.suppressedTotal = this.incrementCounter(this.suppressedTotal);
      return {
        allow: false,
        reason: 'circuit_open',
        retryAfterMs: this.circuitOpenUntil - now,
      };
    }

    if (this.restartTimestamps.length >= this.options.maxRestartsPerWindow) {
      this.circuitOpenUntil = now + this.options.circuitOpenMs;
      this.suppressedTotal = this.incrementCounter(this.suppressedTotal);
      return {
        allow: false,
        reason: 'budget_exceeded',
        retryAfterMs: this.options.circuitOpenMs,
      };
    }

    // The cooldown only applies once at least one restart has been recorded.
    if (this.lastRestartAt > 0) {
      const requiredCooldown = this.getCooldownMs();
      const sinceLast = now - this.lastRestartAt;
      if (sinceLast < requiredCooldown) {
        this.suppressedTotal = this.incrementCounter(this.suppressedTotal);
        return {
          allow: false,
          reason: 'cooldown_active',
          retryAfterMs: requiredCooldown - sinceLast,
        };
      }
    }

    return { allow: true };
  }

  /** Records that a restart was executed at `now`. */
  recordExecuted(now = Date.now()): void {
    // Prune (and detect a clock rewind) before recording, so a timestamp that
    // is older than the stored ones is never appended to the list. The front
    // pruning in pruneOld relies on the list being in ascending order.
    this.pruneOld(now);
    this.executedTotal = this.incrementCounter(this.executedTotal);
    this.lastRestartAt = now;
    this.consecutiveRestarts += 1;
    this.restartTimestamps.push(now);
  }

  /** Returns the executed/suppressed counters in camelCase form. */
  getCounters(): { executedTotal: number; suppressedTotal: number } {
    return {
      executedTotal: this.executedTotal,
      suppressedTotal: this.suppressedTotal,
    };
  }

  /** Returns counters and circuit state using snake_case telemetry keys. */
  getObservability(): {
    suppressed_total: number;
    executed_total: number;
    circuit_open_until: number;
  } {
    return {
      suppressed_total: this.suppressedTotal,
      executed_total: this.executedTotal,
      circuit_open_until: this.circuitOpenUntil,
    };
  }

  /** Cooldown for the current streak: base * 2^consecutive, capped at the max. */
  private getCooldownMs(): number {
    const factor = Math.pow(2, Math.max(0, this.consecutiveRestarts));
    return Math.min(this.options.baseCooldownMs * factor, this.options.maxCooldownMs);
  }

  /** Clears the backoff streak once `stableResetMs` has passed since `onRunning`. */
  private maybeResetConsecutive(now: number): void {
    if (this.lastRunningAt <= 0) return;
    if (now - this.lastRunningAt >= this.options.stableResetMs) {
      this.consecutiveRestarts = 0;
    }
  }

  /**
   * Drops restart timestamps that fell out of the window.
   *
   * Also detects a time rewind (system clock moved backwards) and clears all
   * time-based guard state to avoid stale lockouts. The rewind check looks at
   * `lastRestartAt` as well as the newest retained timestamp, because the
   * timestamp list may already have been pruned empty while `lastRestartAt`
   * still lies in the (new) future.
   */
  private pruneOld(now: number): void {
    const newest = this.restartTimestamps[this.restartTimestamps.length - 1];
    const clockRewound =
      (newest !== undefined && now < newest) ||
      (this.lastRestartAt > 0 && now < this.lastRestartAt);
    if (clockRewound) {
      this.restartTimestamps = [];
      this.circuitOpenUntil = 0;
      this.lastRestartAt = 0;
      this.lastRunningAt = 0;
      this.consecutiveRestarts = 0;
      return;
    }

    const threshold = now - this.options.windowMs;
    const firstFresh = this.restartTimestamps.findIndex((stamp) => stamp >= threshold);
    if (firstFresh === -1) {
      this.restartTimestamps = [];
    } else if (firstFresh > 0) {
      this.restartTimestamps.splice(0, firstFresh);
    }
  }

  /** Increments a counter, wrapping to 0 at `Number.MAX_SAFE_INTEGER`. */
  private incrementCounter(current: number): number {
    if (current >= GatewayRestartGovernor.MAX_COUNTER) return 0;
    return current + 1;
  }
}
|
||||
Reference in New Issue
Block a user