fix(gateway): gateway start waiting (#120)
This commit is contained in:
@@ -220,23 +220,26 @@ export class GatewayManager extends EventEmitter {
|
|||||||
this.setStatus({ state: 'starting', reconnectAttempts: 0 });
|
this.setStatus({ state: 'starting', reconnectAttempts: 0 });
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Check if Python environment is ready (self-healing)
|
// Check if Python environment is ready (self-healing) asynchronously
|
||||||
const pythonReady = await isPythonReady();
|
void isPythonReady().then(pythonReady => {
|
||||||
if (!pythonReady) {
|
if (!pythonReady) {
|
||||||
logger.info('Python environment missing or incomplete, attempting background repair...');
|
logger.info('Python environment missing or incomplete, attempting background repair...');
|
||||||
// We don't await this to avoid blocking Gateway startup,
|
// We don't await this to avoid blocking Gateway startup,
|
||||||
// as uv run will handle it if needed, but this pre-warms it.
|
// as uv run will handle it if needed, but this pre-warms it.
|
||||||
void setupManagedPython().catch(err => {
|
void setupManagedPython().catch(err => {
|
||||||
logger.error('Background Python repair failed:', err);
|
logger.error('Background Python repair failed:', err);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
}).catch(err => {
|
||||||
|
logger.error('Failed to check Python environment:', err);
|
||||||
|
});
|
||||||
|
|
||||||
// Check if Gateway is already running
|
// Check if Gateway is already running
|
||||||
logger.debug('Checking for existing Gateway...');
|
logger.debug('Checking for existing Gateway...');
|
||||||
const existing = await this.findExistingGateway();
|
const existing = await this.findExistingGateway();
|
||||||
if (existing) {
|
if (existing) {
|
||||||
logger.debug(`Found existing Gateway on port ${existing.port}`);
|
logger.debug(`Found existing Gateway on port ${existing.port}`);
|
||||||
await this.connect(existing.port);
|
await this.connect(existing.port, existing.externalToken);
|
||||||
this.ownsProcess = false;
|
this.ownsProcess = false;
|
||||||
this.setStatus({ pid: undefined });
|
this.setStatus({ pid: undefined });
|
||||||
this.startHealthCheck();
|
this.startHealthCheck();
|
||||||
@@ -300,19 +303,39 @@ export class GatewayManager extends EventEmitter {
|
|||||||
// Kill process
|
// Kill process
|
||||||
if (this.process && this.ownsProcess) {
|
if (this.process && this.ownsProcess) {
|
||||||
const child = this.process;
|
const child = this.process;
|
||||||
logger.info(`Sending SIGTERM to Gateway (pid=${child.pid ?? 'unknown'})`);
|
|
||||||
child.kill('SIGTERM');
|
await new Promise<void>((resolve) => {
|
||||||
// Force kill after timeout
|
// If process already exited, resolve immediately
|
||||||
setTimeout(() => {
|
if (child.exitCode !== null || child.signalCode !== null) {
|
||||||
if (child.exitCode === null) {
|
return resolve();
|
||||||
logger.warn(`Gateway did not exit in time, sending SIGKILL (pid=${child.pid ?? 'unknown'})`);
|
|
||||||
child.kill('SIGKILL');
|
|
||||||
}
|
}
|
||||||
if (this.process === child) {
|
|
||||||
this.process = null;
|
logger.info(`Sending SIGTERM to Gateway (pid=${child.pid ?? 'unknown'})`);
|
||||||
}
|
child.kill('SIGTERM');
|
||||||
}, 5000);
|
|
||||||
this.process = null;
|
// Force kill after timeout
|
||||||
|
const timeout = setTimeout(() => {
|
||||||
|
if (child.exitCode === null && child.signalCode === null) {
|
||||||
|
logger.warn(`Gateway did not exit in time, sending SIGKILL (pid=${child.pid ?? 'unknown'})`);
|
||||||
|
child.kill('SIGKILL');
|
||||||
|
}
|
||||||
|
resolve();
|
||||||
|
}, 5000);
|
||||||
|
|
||||||
|
child.once('exit', () => {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
|
||||||
|
child.once('error', () => {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
if (this.process === child) {
|
||||||
|
this.process = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
this.ownsProcess = false;
|
this.ownsProcess = false;
|
||||||
|
|
||||||
@@ -332,8 +355,6 @@ export class GatewayManager extends EventEmitter {
|
|||||||
async restart(): Promise<void> {
|
async restart(): Promise<void> {
|
||||||
logger.debug('Gateway restart requested');
|
logger.debug('Gateway restart requested');
|
||||||
await this.stop();
|
await this.stop();
|
||||||
// Brief delay before restart
|
|
||||||
await new Promise(resolve => setTimeout(resolve, 1000));
|
|
||||||
await this.start();
|
await this.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -445,11 +466,46 @@ export class GatewayManager extends EventEmitter {
|
|||||||
/**
|
/**
|
||||||
* Find existing Gateway process by attempting a WebSocket connection
|
* Find existing Gateway process by attempting a WebSocket connection
|
||||||
*/
|
*/
|
||||||
private async findExistingGateway(): Promise<{ port: number } | null> {
|
private async findExistingGateway(): Promise<{ port: number, externalToken?: string } | null> {
|
||||||
try {
|
try {
|
||||||
const port = PORTS.OPENCLAW_GATEWAY;
|
const port = PORTS.OPENCLAW_GATEWAY;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const { stdout } = await new Promise<{ stdout: string }>((resolve) => {
|
||||||
|
import('child_process').then(cp => {
|
||||||
|
cp.exec(`lsof -i :${port} | grep LISTEN`, (err, stdout) => {
|
||||||
|
if (err) resolve({ stdout: '' });
|
||||||
|
else resolve({ stdout });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
if (stdout.trim()) {
|
||||||
|
// A process is listening on the port
|
||||||
|
const pids = stdout.split('\n')
|
||||||
|
.map(line => line.trim().split(/\s+/)[1])
|
||||||
|
.filter(pid => pid && pid !== 'PID');
|
||||||
|
|
||||||
|
if (pids.length > 0) {
|
||||||
|
// Try to kill it if it's not us to avoid connection issues
|
||||||
|
// This happens frequently on HMR / dev reloads
|
||||||
|
if (!this.process || !pids.includes(String(this.process.pid))) {
|
||||||
|
logger.info(`Found orphaned process listening on port ${port} (PID: ${pids[0]}), attempting to kill...`);
|
||||||
|
for (const pid of pids) {
|
||||||
|
try { process.kill(parseInt(pid), 'SIGKILL'); } catch { /* ignore */ }
|
||||||
|
}
|
||||||
|
// Wait a moment for port to be released
|
||||||
|
await new Promise(r => setTimeout(r, 500));
|
||||||
|
return null; // Return null so we start a fresh instance
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
logger.debug('Error checking for existing process on port:', err);
|
||||||
|
}
|
||||||
|
|
||||||
// Try a quick WebSocket connection to check if gateway is listening
|
// Try a quick WebSocket connection to check if gateway is listening
|
||||||
return await new Promise<{ port: number } | null>((resolve) => {
|
return await new Promise<{ port: number, externalToken?: string } | null>((resolve) => {
|
||||||
const testWs = new WebSocket(`ws://localhost:${port}/ws`);
|
const testWs = new WebSocket(`ws://localhost:${port}/ws`);
|
||||||
const timeout = setTimeout(() => {
|
const timeout = setTimeout(() => {
|
||||||
testWs.close();
|
testWs.close();
|
||||||
@@ -675,12 +731,13 @@ export class GatewayManager extends EventEmitter {
|
|||||||
/**
|
/**
|
||||||
* Wait for Gateway to be ready by checking if the port is accepting connections
|
* Wait for Gateway to be ready by checking if the port is accepting connections
|
||||||
*/
|
*/
|
||||||
private async waitForReady(retries = 600, interval = 1000): Promise<void> {
|
private async waitForReady(retries = 2400, interval = 250): Promise<void> {
|
||||||
|
const child = this.process;
|
||||||
for (let i = 0; i < retries; i++) {
|
for (let i = 0; i < retries; i++) {
|
||||||
// Early exit if the gateway process has already exited
|
// Early exit if the gateway process has already exited
|
||||||
if (this.process && (this.process.exitCode !== null || this.process.signalCode !== null)) {
|
if (child && (child.exitCode !== null || child.signalCode !== null)) {
|
||||||
const code = this.process.exitCode;
|
const code = child.exitCode;
|
||||||
const signal = this.process.signalCode;
|
const signal = child.signalCode;
|
||||||
logger.error(`Gateway process exited before ready (${this.formatExit(code, signal)})`);
|
logger.error(`Gateway process exited before ready (${this.formatExit(code, signal)})`);
|
||||||
throw new Error(`Gateway process exited before becoming ready (${this.formatExit(code, signal)})`);
|
throw new Error(`Gateway process exited before becoming ready (${this.formatExit(code, signal)})`);
|
||||||
}
|
}
|
||||||
@@ -727,9 +784,7 @@ export class GatewayManager extends EventEmitter {
|
|||||||
/**
|
/**
|
||||||
* Connect WebSocket to Gateway
|
* Connect WebSocket to Gateway
|
||||||
*/
|
*/
|
||||||
private async connect(port: number): Promise<void> {
|
private async connect(port: number, _externalToken?: string): Promise<void> {
|
||||||
// Get token for WebSocket authentication
|
|
||||||
const gatewayToken = await getSetting('gatewayToken');
|
|
||||||
logger.debug(`Connecting Gateway WebSocket (ws://localhost:${port}/ws)`);
|
logger.debug(`Connecting Gateway WebSocket (ws://localhost:${port}/ws)`);
|
||||||
|
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
@@ -774,6 +829,9 @@ export class GatewayManager extends EventEmitter {
|
|||||||
this.ws.on('open', async () => {
|
this.ws.on('open', async () => {
|
||||||
logger.debug('Gateway WebSocket opened, sending connect handshake');
|
logger.debug('Gateway WebSocket opened, sending connect handshake');
|
||||||
|
|
||||||
|
// Re-fetch token here before generating payload just in case it updated while connecting
|
||||||
|
const currentToken = await getSetting('gatewayToken');
|
||||||
|
|
||||||
// Send proper connect handshake as required by OpenClaw Gateway protocol
|
// Send proper connect handshake as required by OpenClaw Gateway protocol
|
||||||
// The Gateway expects: { type: "req", id: "...", method: "connect", params: ConnectParams }
|
// The Gateway expects: { type: "req", id: "...", method: "connect", params: ConnectParams }
|
||||||
// Since 2026.2.15, scopes are only granted when a signed device identity is included.
|
// Since 2026.2.15, scopes are only granted when a signed device identity is included.
|
||||||
@@ -786,6 +844,7 @@ export class GatewayManager extends EventEmitter {
|
|||||||
|
|
||||||
const device = (() => {
|
const device = (() => {
|
||||||
if (!this.deviceIdentity) return undefined;
|
if (!this.deviceIdentity) return undefined;
|
||||||
|
|
||||||
const payload = buildDeviceAuthPayload({
|
const payload = buildDeviceAuthPayload({
|
||||||
deviceId: this.deviceIdentity.deviceId,
|
deviceId: this.deviceIdentity.deviceId,
|
||||||
clientId,
|
clientId,
|
||||||
@@ -793,7 +852,7 @@ export class GatewayManager extends EventEmitter {
|
|||||||
role,
|
role,
|
||||||
scopes,
|
scopes,
|
||||||
signedAtMs,
|
signedAtMs,
|
||||||
token: gatewayToken ?? null,
|
token: currentToken ?? null,
|
||||||
});
|
});
|
||||||
const signature = signDevicePayload(this.deviceIdentity.privateKeyPem, payload);
|
const signature = signDevicePayload(this.deviceIdentity.privateKeyPem, payload);
|
||||||
return {
|
return {
|
||||||
@@ -819,7 +878,7 @@ export class GatewayManager extends EventEmitter {
|
|||||||
mode: clientMode,
|
mode: clientMode,
|
||||||
},
|
},
|
||||||
auth: {
|
auth: {
|
||||||
token: gatewayToken,
|
token: currentToken,
|
||||||
},
|
},
|
||||||
caps: [],
|
caps: [],
|
||||||
role,
|
role,
|
||||||
@@ -1061,7 +1120,7 @@ export class GatewayManager extends EventEmitter {
|
|||||||
// Try to find existing Gateway first
|
// Try to find existing Gateway first
|
||||||
const existing = await this.findExistingGateway();
|
const existing = await this.findExistingGateway();
|
||||||
if (existing) {
|
if (existing) {
|
||||||
await this.connect(existing.port);
|
await this.connect(existing.port, existing.externalToken);
|
||||||
this.ownsProcess = false;
|
this.ownsProcess = false;
|
||||||
this.setStatus({ pid: undefined });
|
this.setStatus({ pid: undefined });
|
||||||
this.reconnectAttempts = 0;
|
this.reconnectAttempts = 0;
|
||||||
|
|||||||
@@ -172,6 +172,24 @@ export function saveChannelConfig(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Special handling for Feishu: default to open DM policy with wildcard allowlist
|
||||||
|
if (channelType === 'feishu') {
|
||||||
|
const existingConfig = currentConfig.channels[channelType] || {};
|
||||||
|
transformedConfig.dmPolicy = transformedConfig.dmPolicy ?? existingConfig.dmPolicy ?? 'open';
|
||||||
|
|
||||||
|
let allowFrom = transformedConfig.allowFrom ?? existingConfig.allowFrom ?? ['*'];
|
||||||
|
if (!Array.isArray(allowFrom)) {
|
||||||
|
allowFrom = [allowFrom];
|
||||||
|
}
|
||||||
|
|
||||||
|
// If dmPolicy is open, OpenClaw schema requires '*' in allowFrom
|
||||||
|
if (transformedConfig.dmPolicy === 'open' && !allowFrom.includes('*')) {
|
||||||
|
allowFrom = [...allowFrom, '*'];
|
||||||
|
}
|
||||||
|
|
||||||
|
transformedConfig.allowFrom = allowFrom;
|
||||||
|
}
|
||||||
|
|
||||||
// Merge with existing config
|
// Merge with existing config
|
||||||
currentConfig.channels[channelType] = {
|
currentConfig.channels[channelType] = {
|
||||||
...currentConfig.channels[channelType],
|
...currentConfig.channels[channelType],
|
||||||
|
|||||||
Reference in New Issue
Block a user