Feat/upgrade openclaw (#729)
This commit is contained in:
committed by
GitHub
Unverified
parent
bf5b089158
commit
d34a88e629
@@ -235,8 +235,13 @@ export class GatewayManager extends EventEmitter {
|
||||
assertLifecycle: (phase) => {
|
||||
this.lifecycleController.assert(startEpoch, phase);
|
||||
},
|
||||
findExistingGateway: async (port, ownedPid) => {
|
||||
return await findExistingGatewayProcess({ port, ownedPid });
|
||||
findExistingGateway: async (port) => {
|
||||
// Always read the current process pid dynamically so that retries
|
||||
// don't treat a just-spawned gateway as an orphan. The ownedPid
|
||||
// snapshot captured at start() entry is stale after startProcess()
|
||||
// replaces this.process — leading to the just-started pid being
|
||||
// immediately killed as a false orphan on the next retry iteration.
|
||||
return await findExistingGatewayProcess({ port, ownedPid: this.process?.pid });
|
||||
},
|
||||
connect: async (port, externalToken) => {
|
||||
await this.connect(port, externalToken);
|
||||
@@ -335,9 +340,14 @@ export class GatewayManager extends EventEmitter {
|
||||
}
|
||||
}
|
||||
|
||||
// Close WebSocket
|
||||
// Close WebSocket — use terminate() to force-close the TCP connection
|
||||
// immediately without waiting for the WebSocket close handshake.
|
||||
// ws.close() sends a close frame and waits for the server to respond;
|
||||
// if the gateway process is being killed concurrently, the handshake
|
||||
// never completes and the connection stays ESTABLISHED indefinitely,
|
||||
// accumulating leaked connections on every restart cycle.
|
||||
if (this.ws) {
|
||||
this.ws.close(1000, 'Gateway stopped by user');
|
||||
try { this.ws.terminate(); } catch { /* ignore */ }
|
||||
this.ws = null;
|
||||
}
|
||||
|
||||
@@ -792,7 +802,7 @@ export class GatewayManager extends EventEmitter {
|
||||
onMessage: (message) => {
|
||||
this.handleMessage(message);
|
||||
},
|
||||
onCloseAfterHandshake: () => {
|
||||
onCloseAfterHandshake: (closeCode) => {
|
||||
this.connectionMonitor.clear();
|
||||
if (this.status.state === 'running') {
|
||||
this.setStatus({ state: 'stopped' });
|
||||
@@ -801,7 +811,11 @@ export class GatewayManager extends EventEmitter {
|
||||
// handler (`onExit`) which calls scheduleReconnect(). Triggering
|
||||
// reconnect from WS close as well races with the exit handler and can
|
||||
// cause double start() attempts or port conflicts during TCP TIME_WAIT.
|
||||
if (process.platform !== 'win32') {
|
||||
//
|
||||
// Exception: code=1012 means the Gateway is performing an in-process
|
||||
// restart (e.g. config reload). The UtilityProcess stays alive, so
|
||||
// `onExit` will never fire — we MUST reconnect from the WS close path.
|
||||
if (process.platform !== 'win32' || closeCode === 1012) {
|
||||
this.scheduleReconnect();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,6 +117,10 @@ export async function launchGatewayProcess(options: {
|
||||
const lastSpawnSummary = `mode=${mode}, entry="${entryScript}", args="${options.sanitizeSpawnArgs(gatewayArgs).join(' ')}", cwd="${openclawDir}"`;
|
||||
|
||||
const runtimeEnv = { ...forkEnv };
|
||||
// Only apply the fetch/child_process preload in dev mode.
|
||||
// In packaged builds Electron's UtilityProcess rejects NODE_OPTIONS
|
||||
// with --require, logging "Most NODE_OPTIONs are not supported in
|
||||
// packaged apps" and the preload never loads.
|
||||
if (!app.isPackaged) {
|
||||
try {
|
||||
const preloadPath = ensureGatewayFetchPreload();
|
||||
|
||||
@@ -2,79 +2,60 @@ export type RestartDecision =
|
||||
| { allow: true }
|
||||
| {
|
||||
allow: false;
|
||||
reason: 'circuit_open' | 'budget_exceeded' | 'cooldown_active';
|
||||
reason: 'cooldown_active';
|
||||
retryAfterMs: number;
|
||||
};
|
||||
|
||||
type RestartGovernorOptions = {
|
||||
maxRestartsPerWindow: number;
|
||||
windowMs: number;
|
||||
baseCooldownMs: number;
|
||||
maxCooldownMs: number;
|
||||
circuitOpenMs: number;
|
||||
stableResetMs: number;
|
||||
/** Minimum interval between consecutive restarts (ms). */
|
||||
cooldownMs: number;
|
||||
};
|
||||
|
||||
const DEFAULT_OPTIONS: RestartGovernorOptions = {
|
||||
maxRestartsPerWindow: 4,
|
||||
windowMs: 10 * 60 * 1000,
|
||||
baseCooldownMs: 2500,
|
||||
maxCooldownMs: 2 * 60 * 1000,
|
||||
circuitOpenMs: 10 * 60 * 1000,
|
||||
stableResetMs: 2 * 60 * 1000,
|
||||
cooldownMs: 2500,
|
||||
};
|
||||
|
||||
/**
|
||||
* Lightweight restart rate-limiter.
|
||||
*
|
||||
* Prevents rapid-fire restarts by enforcing a simple cooldown between
|
||||
* consecutive restart executions. Nothing more — no circuit breakers,
|
||||
* no sliding-window budgets, no exponential back-off. Those mechanisms
|
||||
* were previously present but removed because:
|
||||
*
|
||||
* 1. The root causes of infinite restart loops (stale ownedPid, port
|
||||
* contention, leaked WebSocket connections) have been fixed at their
|
||||
* source.
|
||||
* 2. A 10-minute circuit-breaker lockout actively hurt the user
|
||||
* experience: legitimate config changes were silently dropped.
|
||||
* 3. The complexity made the restart path harder to reason about during
|
||||
* debugging.
|
||||
*/
|
||||
export class GatewayRestartGovernor {
|
||||
private readonly options: RestartGovernorOptions;
|
||||
private restartTimestamps: number[] = [];
|
||||
private circuitOpenUntil = 0;
|
||||
private consecutiveRestarts = 0;
|
||||
private lastRestartAt = 0;
|
||||
private lastRunningAt = 0;
|
||||
private suppressedTotal = 0;
|
||||
private executedTotal = 0;
|
||||
private static readonly MAX_COUNTER = Number.MAX_SAFE_INTEGER;
|
||||
|
||||
constructor(options?: Partial<RestartGovernorOptions>) {
|
||||
this.options = { ...DEFAULT_OPTIONS, ...options };
|
||||
}
|
||||
|
||||
onRunning(now = Date.now()): void {
|
||||
this.lastRunningAt = now;
|
||||
/** No-op kept for interface compatibility with callers. */
|
||||
onRunning(_now = Date.now()): void {
|
||||
// Previously used to track "stable running" for exponential back-off
|
||||
// reset. No longer needed with the simplified cooldown model.
|
||||
}
|
||||
|
||||
decide(now = Date.now()): RestartDecision {
|
||||
this.pruneOld(now);
|
||||
this.maybeResetConsecutive(now);
|
||||
|
||||
if (now < this.circuitOpenUntil) {
|
||||
this.suppressedTotal = this.incrementCounter(this.suppressedTotal);
|
||||
return {
|
||||
allow: false,
|
||||
reason: 'circuit_open',
|
||||
retryAfterMs: this.circuitOpenUntil - now,
|
||||
};
|
||||
}
|
||||
|
||||
if (this.restartTimestamps.length >= this.options.maxRestartsPerWindow) {
|
||||
this.circuitOpenUntil = now + this.options.circuitOpenMs;
|
||||
this.suppressedTotal = this.incrementCounter(this.suppressedTotal);
|
||||
return {
|
||||
allow: false,
|
||||
reason: 'budget_exceeded',
|
||||
retryAfterMs: this.options.circuitOpenMs,
|
||||
};
|
||||
}
|
||||
|
||||
const requiredCooldown = this.getCooldownMs();
|
||||
if (this.lastRestartAt > 0) {
|
||||
const sinceLast = now - this.lastRestartAt;
|
||||
if (sinceLast < requiredCooldown) {
|
||||
this.suppressedTotal = this.incrementCounter(this.suppressedTotal);
|
||||
if (sinceLast < this.options.cooldownMs) {
|
||||
this.suppressedTotal = this.safeIncrement(this.suppressedTotal);
|
||||
return {
|
||||
allow: false,
|
||||
reason: 'cooldown_active',
|
||||
retryAfterMs: requiredCooldown - sinceLast,
|
||||
retryAfterMs: this.options.cooldownMs - sinceLast,
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -83,11 +64,8 @@ export class GatewayRestartGovernor {
|
||||
}
|
||||
|
||||
recordExecuted(now = Date.now()): void {
|
||||
this.executedTotal = this.incrementCounter(this.executedTotal);
|
||||
this.executedTotal = this.safeIncrement(this.executedTotal);
|
||||
this.lastRestartAt = now;
|
||||
this.consecutiveRestarts += 1;
|
||||
this.restartTimestamps.push(now);
|
||||
this.pruneOld(now);
|
||||
}
|
||||
|
||||
getCounters(): { executedTotal: number; suppressedTotal: number } {
|
||||
@@ -105,41 +83,12 @@ export class GatewayRestartGovernor {
|
||||
return {
|
||||
suppressed_total: this.suppressedTotal,
|
||||
executed_total: this.executedTotal,
|
||||
circuit_open_until: this.circuitOpenUntil,
|
||||
circuit_open_until: 0, // Always 0 — no circuit breaker
|
||||
};
|
||||
}
|
||||
|
||||
private getCooldownMs(): number {
|
||||
const factor = Math.pow(2, Math.max(0, this.consecutiveRestarts));
|
||||
return Math.min(this.options.baseCooldownMs * factor, this.options.maxCooldownMs);
|
||||
}
|
||||
|
||||
private maybeResetConsecutive(now: number): void {
|
||||
if (this.lastRunningAt <= 0) return;
|
||||
if (now - this.lastRunningAt >= this.options.stableResetMs) {
|
||||
this.consecutiveRestarts = 0;
|
||||
}
|
||||
}
|
||||
|
||||
private pruneOld(now: number): void {
|
||||
// Detect time rewind (system clock moved backwards) and clear all
|
||||
// time-based guard state to avoid stale lockouts.
|
||||
if (this.restartTimestamps.length > 0 && now < this.restartTimestamps[this.restartTimestamps.length - 1]) {
|
||||
this.restartTimestamps = [];
|
||||
this.circuitOpenUntil = 0;
|
||||
this.lastRestartAt = 0;
|
||||
this.lastRunningAt = 0;
|
||||
this.consecutiveRestarts = 0;
|
||||
return;
|
||||
}
|
||||
const threshold = now - this.options.windowMs;
|
||||
while (this.restartTimestamps.length > 0 && this.restartTimestamps[0] < threshold) {
|
||||
this.restartTimestamps.shift();
|
||||
}
|
||||
}
|
||||
|
||||
private incrementCounter(current: number): number {
|
||||
if (current >= GatewayRestartGovernor.MAX_COUNTER) return 0;
|
||||
private safeIncrement(current: number): number {
|
||||
if (current >= Number.MAX_SAFE_INTEGER) return 0;
|
||||
return current + 1;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,13 +9,13 @@ export interface ExistingGatewayInfo {
|
||||
|
||||
type StartupHooks = {
|
||||
port: number;
|
||||
ownedPid?: number;
|
||||
ownedPid?: never; // Removed: pid is now read dynamically in findExistingGateway to avoid stale-snapshot bug
|
||||
shouldWaitForPortFree: boolean;
|
||||
maxStartAttempts?: number;
|
||||
resetStartupStderrLines: () => void;
|
||||
getStartupStderrLines: () => string[];
|
||||
assertLifecycle: (phase: string) => void;
|
||||
findExistingGateway: (port: number, ownedPid?: number) => Promise<ExistingGatewayInfo | null>;
|
||||
findExistingGateway: (port: number) => Promise<ExistingGatewayInfo | null>;
|
||||
connect: (port: number, externalToken?: string) => Promise<void>;
|
||||
onConnectedToExistingGateway: () => void;
|
||||
waitForPortFree: (port: number) => Promise<void>;
|
||||
@@ -39,7 +39,7 @@ export async function runGatewayStartupSequence(hooks: StartupHooks): Promise<vo
|
||||
|
||||
try {
|
||||
logger.debug('Checking for existing Gateway...');
|
||||
const existing = await hooks.findExistingGateway(hooks.port, hooks.ownedPid);
|
||||
const existing = await hooks.findExistingGateway(hooks.port);
|
||||
hooks.assertLifecycle('start/find-existing');
|
||||
if (existing) {
|
||||
logger.debug(`Found existing Gateway on port ${existing.port}`);
|
||||
|
||||
@@ -18,6 +18,8 @@ const TRANSIENT_START_ERROR_PATTERNS: RegExp[] = [
|
||||
/Gateway process exited before becoming ready/i,
|
||||
/Timed out waiting for connect\.challenge/i,
|
||||
/Connect handshake timeout/i,
|
||||
// Port occupied after orphan kill: transient, worth retrying with backoff
|
||||
/Port \d+ still occupied after \d+ms/i,
|
||||
];
|
||||
|
||||
function normalizeLogLine(value: string): string {
|
||||
|
||||
@@ -156,7 +156,8 @@ export async function waitForPortFree(port: number, timeoutMs = 30000): Promise<
|
||||
await new Promise((resolve) => setTimeout(resolve, pollInterval));
|
||||
}
|
||||
|
||||
logger.warn(`Port ${port} still occupied after ${timeoutMs}ms, proceeding anyway`);
|
||||
logger.error(`Port ${port} still occupied after ${timeoutMs}ms; aborting startup to avoid port conflict`);
|
||||
throw new Error(`Port ${port} still occupied after ${timeoutMs}ms`);
|
||||
}
|
||||
|
||||
async function getListeningProcessIds(port: number): Promise<string[]> {
|
||||
@@ -256,15 +257,18 @@ export async function findExistingGatewayProcess(options: {
|
||||
|
||||
return await new Promise<{ port: number; externalToken?: string } | null>((resolve) => {
|
||||
const testWs = new WebSocket(`ws://localhost:${port}/ws`);
|
||||
const terminateAndResolve = (result: { port: number; externalToken?: string } | null) => {
|
||||
// terminate() avoids TIME_WAIT on Windows (vs close() which does WS handshake)
|
||||
try { testWs.terminate(); } catch { /* ignore */ }
|
||||
resolve(result);
|
||||
};
|
||||
const timeout = setTimeout(() => {
|
||||
testWs.close();
|
||||
resolve(null);
|
||||
terminateAndResolve(null);
|
||||
}, 2000);
|
||||
|
||||
testWs.on('open', () => {
|
||||
clearTimeout(timeout);
|
||||
testWs.close();
|
||||
resolve({ port });
|
||||
terminateAndResolve({ port });
|
||||
});
|
||||
|
||||
testWs.on('error', () => {
|
||||
|
||||
@@ -24,7 +24,10 @@ export async function probeGatewayReady(
|
||||
settled = true;
|
||||
clearTimeout(timeout);
|
||||
try {
|
||||
testWs.close();
|
||||
// Use terminate() (TCP RST) instead of close() (WS close handshake)
|
||||
// to avoid leaving TIME_WAIT connections on Windows. These probe
|
||||
// WebSockets are short-lived and don't need a graceful close.
|
||||
testWs.terminate();
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
@@ -171,7 +174,7 @@ export async function connectGatewaySocket(options: {
|
||||
getToken: () => Promise<string>;
|
||||
onHandshakeComplete: (ws: WebSocket) => void;
|
||||
onMessage: (message: unknown) => void;
|
||||
onCloseAfterHandshake: () => void;
|
||||
onCloseAfterHandshake: (code: number) => void;
|
||||
challengeTimeoutMs?: number;
|
||||
connectTimeoutMs?: number;
|
||||
}): Promise<WebSocket> {
|
||||
@@ -308,7 +311,7 @@ export async function connectGatewaySocket(options: {
|
||||
return;
|
||||
}
|
||||
cleanupHandshakeRequest();
|
||||
options.onCloseAfterHandshake();
|
||||
options.onCloseAfterHandshake(code);
|
||||
});
|
||||
|
||||
ws.on('error', (error) => {
|
||||
|
||||
Reference in New Issue
Block a user