diff --git a/electron/api/routes/channels.ts b/electron/api/routes/channels.ts index 4405be241..0b3c06d0d 100644 --- a/electron/api/routes/channels.ts +++ b/electron/api/routes/channels.ts @@ -64,6 +64,7 @@ import { normalizeSlackMessagingTarget, normalizeWhatsAppMessagingTarget, } from '../../utils/openclaw-sdk'; +import { logger } from '../../utils/logger'; // listWhatsAppDirectory*FromConfig were removed from openclaw's public exports // in 2026.3.23-1. No-op stubs; WhatsApp target picker uses session discovery. @@ -263,11 +264,11 @@ function scheduleGatewayChannelSaveRefresh( return; } if (FORCE_RESTART_CHANNELS.has(storedChannelType)) { - ctx.gatewayManager.debouncedRestart(); + ctx.gatewayManager.debouncedRestart(150); void reason; return; } - ctx.gatewayManager.debouncedReload(); + ctx.gatewayManager.debouncedReload(150); void reason; } @@ -416,6 +417,28 @@ interface ChannelAccountsView { accounts: ChannelAccountView[]; } +function buildGatewayStatusSnapshot(status: GatewayChannelStatusPayload | null): string { + if (!status?.channelAccounts) return 'none'; + const entries = Object.entries(status.channelAccounts); + if (entries.length === 0) return 'empty'; + return entries + .slice(0, 12) + .map(([channelType, accounts]) => { + const channelStatus = pickChannelRuntimeStatus(accounts); + const flags = accounts.slice(0, 4).map((account) => { + const accountId = typeof account.accountId === 'string' ? account.accountId : 'default'; + const connected = account.connected === true ? '1' : '0'; + const running = account.running === true ? '1' : '0'; + const linked = account.linked === true ? '1' : '0'; + const probeOk = account.probe?.ok === true ? '1' : '0'; + const hasErr = typeof account.lastError === 'string' && account.lastError.trim().length > 0 ? '1' : '0'; + return `${accountId}[c${connected}r${running}l${linked}p${probeOk}e${hasErr}]`; + }).join('|'); + return `${channelType}:${channelStatus}{${flags}}`; + }) + .join(', '); +} + function shouldIncludeRuntimeAccountId( accountId: string, configuredAccountIds: Set, @@ -458,7 +481,11 @@ const CHANNEL_TARGET_CACHE_TTL_MS = 60_000; const CHANNEL_TARGET_CACHE_ENABLED = process.env.VITEST !== 'true'; const channelTargetCache = new Map(); -async function buildChannelAccountsView(ctx: HostApiContext): Promise { +async function buildChannelAccountsView( + ctx: HostApiContext, + options?: { probe?: boolean }, +): Promise { + const startedAt = Date.now(); // Read config once and share across all sub-calls (was 5 readFile calls before). const openClawConfig = await readOpenClawConfig(); @@ -470,11 +497,24 @@ async function buildChannelAccountsView(ctx: HostApiContext): Promise('channels.status', { probe: false }, 8000); + const rpcStartedAt = Date.now(); + gatewayStatus = await ctx.gatewayManager.rpc( + 'channels.status', + { probe }, + probe ? 5000 : 8000, + ); + logger.info( + `[channels.accounts] channels.status probe=${probe ? '1' : '0'} elapsedMs=${Date.now() - rpcStartedAt} snapshot=${buildGatewayStatusSnapshot(gatewayStatus)}` + ); } catch { + const probe = options?.probe === true; + logger.warn( + `[channels.accounts] channels.status probe=${probe ? '1' : '0'} failed after ${Date.now() - startedAt}ms` + ); gatewayStatus = null; } @@ -553,7 +593,11 @@ async function buildChannelAccountsView(ctx: HostApiContext): Promise left.channelType.localeCompare(right.channelType)); + const sorted = channels.sort((left, right) => left.channelType.localeCompare(right.channelType)); + logger.info( + `[channels.accounts] response probe=${options?.probe === true ? '1' : '0'} elapsedMs=${Date.now() - startedAt} view=${sorted.map((item) => `${item.channelType}:${item.status}`).join(',')}` + ); + return sorted; } function buildChannelTargetLabel(baseLabel: string, value: string): string { @@ -1147,7 +1191,9 @@ export async function handleChannelRoutes( if (url.pathname === '/api/channels/accounts' && req.method === 'GET') { try { - const channels = await buildChannelAccountsView(ctx); + const probe = url.searchParams.get('probe') === '1'; + logger.info(`[channels.accounts] request probe=${probe ? '1' : '0'}`); + const channels = await buildChannelAccountsView(ctx, { probe }); sendJson(res, 200, { success: true, channels }); } catch (error) { sendJson(res, 500, { success: false, error: String(error) }); diff --git a/electron/gateway/config-sync.ts b/electron/gateway/config-sync.ts index 8d1b1428c..e6bdc574f 100644 --- a/electron/gateway/config-sync.ts +++ b/electron/gateway/config-sync.ts @@ -20,8 +20,8 @@ import { getApiKey, getDefaultProvider, getProvider } from '../utils/secure-stor import { getProviderEnvVar, getKeyableProviderTypes } from '../utils/provider-registry'; import { getOpenClawDir, getOpenClawEntryPath, isOpenClawPresent } from '../utils/paths'; import { getUvMirrorEnv } from '../utils/uv-env'; -import { cleanupDanglingWeChatPluginState, listConfiguredChannels, readOpenClawConfig } from '../utils/channel-config'; -import { syncGatewayTokenToConfig, syncBrowserConfigToOpenClaw, syncSessionIdleMinutesToOpenClaw, sanitizeOpenClawConfig } from '../utils/openclaw-auth'; +import { cleanupDanglingWeChatPluginState, listConfiguredChannelsFromConfig, readOpenClawConfig } from '../utils/channel-config'; +import { sanitizeOpenClawConfig, batchSyncConfigFields } from '../utils/openclaw-auth'; import { buildProxyEnv, resolveProxySettings } from '../utils/proxy'; import { syncProxyConfigToOpenClaw } from '../utils/openclaw-proxy'; import { logger } from '../utils/logger'; @@ -180,7 +180,20 @@ function ensureConfiguredPluginsUpgraded(configuredChannels: string[]): void { * resolution algorithm find them. Skip-if-exists avoids overwriting * openclaw's own deps (they take priority). */ +let _extensionDepsLinked = false; + +/** + * Reset the extension-deps-linked cache so the next + * ensureExtensionDepsResolvable() call re-scans and links. + * Called before each Gateway launch to pick up newly installed extensions. + */ +export function resetExtensionDepsLinked(): void { + _extensionDepsLinked = false; +} + function ensureExtensionDepsResolvable(openclawDir: string): void { + if (_extensionDepsLinked) return; + const extDir = join(openclawDir, 'dist', 'extensions'); const topNM = join(openclawDir, 'node_modules'); let linkedCount = 0; @@ -229,6 +242,8 @@ function ensureExtensionDepsResolvable(openclawDir: string): void { if (linkedCount > 0) { logger.info(`[extension-deps] Linked ${linkedCount} extension packages into ${topNM}`); } + + _extensionDepsLinked = true; } // ── Pre-launch sync ────────────────────────────────────────────── @@ -236,6 +251,11 @@ function ensureExtensionDepsResolvable(openclawDir: string): void { export async function syncGatewayConfigBeforeLaunch( appSettings: Awaited>, ): Promise { + // Reset the extension-deps cache so that newly installed extensions + // (e.g. user added a channel while the app was running) get their + // node_modules linked on the next Gateway spawn. + resetExtensionDepsLinked(); + await syncProxyConfigToOpenClaw(appSettings, { preserveExistingWhenDisabled: true }); try { @@ -260,21 +280,20 @@ export async function syncGatewayConfigBeforeLaunch( // Auto-upgrade installed plugins before Gateway starts so that // the plugin manifest ID matches what sanitize wrote to the config. + // Read config once and reuse for both listConfiguredChannels and plugins.allow. try { - const configuredChannels = await listConfiguredChannels(); + const rawCfg = await readOpenClawConfig(); + const configuredChannels = await listConfiguredChannelsFromConfig(rawCfg); // Also ensure plugins referenced in plugins.allow are installed even if // they have no channels.X section yet (e.g. qqbot added via plugins.allow // but never fully saved through ClawX UI). try { - const rawCfg = await readOpenClawConfig(); const allowList = Array.isArray(rawCfg.plugins?.allow) ? (rawCfg.plugins!.allow as string[]) : []; - // Build reverse maps: dirName → channelType AND known manifest IDs → channelType const pluginIdToChannel: Record = {}; for (const [channelType, info] of Object.entries(CHANNEL_PLUGIN_MAP)) { pluginIdToChannel[info.dirName] = channelType; } - // Known manifest IDs that differ from their dirName/channelType pluginIdToChannel['openclaw-lark'] = 'feishu'; pluginIdToChannel['feishu-openclaw-plugin'] = 'feishu'; @@ -295,22 +314,11 @@ export async function syncGatewayConfigBeforeLaunch( logger.warn('Failed to auto-upgrade plugins:', err); } + // Batch gateway token, browser config, and session idle into one read+write cycle. try { - await syncGatewayTokenToConfig(appSettings.gatewayToken); + await batchSyncConfigFields(appSettings.gatewayToken); } catch (err) { - logger.warn('Failed to sync gateway token to openclaw.json:', err); - } - - try { - await syncBrowserConfigToOpenClaw(); - } catch (err) { - logger.warn('Failed to sync browser config to openclaw.json:', err); - } - - try { - await syncSessionIdleMinutesToOpenClaw(); - } catch (err) { - logger.warn('Failed to sync session idle minutes to openclaw.json:', err); + logger.warn('Failed to batch-sync config fields to openclaw.json:', err); } } @@ -360,7 +368,8 @@ async function resolveChannelStartupPolicy(): Promise<{ channelStartupSummary: string; }> { try { - const configuredChannels = await listConfiguredChannels(); + const rawCfg = await readOpenClawConfig(); + const configuredChannels = await listConfiguredChannelsFromConfig(rawCfg); if (configuredChannels.length === 0) { return { skipChannels: true, diff --git a/electron/gateway/event-dispatch.ts b/electron/gateway/event-dispatch.ts index 6e9427265..fedd34c64 100644 --- a/electron/gateway/event-dispatch.ts +++ b/electron/gateway/event-dispatch.ts @@ -23,8 +23,13 @@ export function dispatchProtocolEvent( break; } case 'channel.status': + case 'channel.status_changed': emitter.emit('channel:status', payload as { channelId: string; status: string }); break; + case 'gateway.ready': + case 'ready': + emitter.emit('gateway:ready', payload); + break; default: emitter.emit('notification', { method: event, params: payload }); } diff --git a/electron/gateway/manager.ts b/electron/gateway/manager.ts index 9ca7384ef..918813d98 100644 --- a/electron/gateway/manager.ts +++ b/electron/gateway/manager.ts @@ -61,6 +61,8 @@ export interface GatewayStatus { connectedAt?: number; version?: string; reconnectAttempts?: number; + /** True once the gateway's internal subsystems (skills, plugins) are ready for RPC calls. */ + gatewayReady?: boolean; } /** @@ -119,9 +121,11 @@ export class GatewayManager extends EventEmitter { private static readonly HEARTBEAT_TIMEOUT_MS_WIN = 25_000; private static readonly HEARTBEAT_MAX_MISSES_WIN = 5; public static readonly RESTART_COOLDOWN_MS = 5_000; + private static readonly GATEWAY_READY_FALLBACK_MS = 30_000; private lastRestartAt = 0; /** Set by scheduleReconnect() before calling start() to signal auto-reconnect. */ private isAutoReconnectStart = false; + private gatewayReadyFallbackTimer: NodeJS.Timeout | null = null; constructor(config?: Partial) { super(); @@ -152,6 +156,14 @@ export class GatewayManager extends EventEmitter { this.reconnectConfig = { ...DEFAULT_RECONNECT_CONFIG, ...config }; // Device identity is loaded lazily in start() — not in the constructor — // so that async file I/O and key generation don't block module loading. + + this.on('gateway:ready', () => { + this.clearGatewayReadyFallback(); + if (this.status.state === 'running' && !this.status.gatewayReady) { + logger.info('Gateway subsystems ready (event received)'); + this.setStatus({ gatewayReady: true }); + } + }); } private async initDeviceIdentity(): Promise { @@ -231,12 +243,16 @@ export class GatewayManager extends EventEmitter { this.reconnectAttempts = 0; } this.isAutoReconnectStart = false; // consume the flag - this.setStatus({ state: 'starting', reconnectAttempts: this.reconnectAttempts }); + this.setStatus({ state: 'starting', reconnectAttempts: this.reconnectAttempts, gatewayReady: false }); // Check if Python environment is ready (self-healing) asynchronously. // Fire-and-forget: only needs to run once, not on every retry. warmupManagedPythonReadiness(); + const t0 = Date.now(); + let tSpawned = 0; + let tReady = 0; + try { await runGatewayStartupSequence({ port: this.status.port, @@ -262,7 +278,6 @@ export class GatewayManager extends EventEmitter { await this.connect(port, externalToken); }, onConnectedToExistingGateway: () => { - // If the existing gateway is actually our own spawned UtilityProcess // (e.g. after a self-restart code=1012), keep ownership so that // stop() can still terminate the process during a restart() cycle. @@ -288,16 +303,24 @@ export class GatewayManager extends EventEmitter { }, startProcess: async () => { await this.startProcess(); + tSpawned = Date.now(); }, waitForReady: async (port) => { await waitForGatewayReady({ port, getProcessExitCode: () => this.processExitCode, }); + tReady = Date.now(); }, onConnectedToManagedGateway: () => { this.startHealthCheck(); - logger.debug('Gateway started successfully'); + const tConnected = Date.now(); + logger.info('[metric] gateway.startup', { + configSyncMs: tSpawned ? tSpawned - t0 : undefined, + spawnToReadyMs: tReady && tSpawned ? tReady - tSpawned : undefined, + readyToConnectMs: tReady ? tConnected - tReady : undefined, + totalMs: tConnected - t0, + }); }, runDoctorRepair: async () => await runOpenClawDoctorRepair(), onDoctorRepairSuccess: () => { @@ -390,7 +413,7 @@ export class GatewayManager extends EventEmitter { this.restartController.resetDeferredRestart(); this.isAutoReconnectStart = false; - this.setStatus({ state: 'stopped', error: undefined, pid: undefined, connectedAt: undefined, uptime: undefined }); + this.setStatus({ state: 'stopped', error: undefined, pid: undefined, connectedAt: undefined, uptime: undefined, gatewayReady: undefined }); } /** @@ -663,6 +686,25 @@ export class GatewayManager extends EventEmitter { clearTimeout(this.reloadDebounceTimer); this.reloadDebounceTimer = null; } + this.clearGatewayReadyFallback(); + } + + private clearGatewayReadyFallback(): void { + if (this.gatewayReadyFallbackTimer) { + clearTimeout(this.gatewayReadyFallbackTimer); + this.gatewayReadyFallbackTimer = null; + } + } + + private scheduleGatewayReadyFallback(): void { + this.clearGatewayReadyFallback(); + this.gatewayReadyFallbackTimer = setTimeout(() => { + this.gatewayReadyFallbackTimer = null; + if (this.status.state === 'running' && !this.status.gatewayReady) { + logger.info('Gateway ready fallback triggered (no gateway.ready event within timeout)'); + this.setStatus({ gatewayReady: true }); + } + }, GatewayManager.GATEWAY_READY_FALLBACK_MS); } /** @@ -843,6 +885,7 @@ export class GatewayManager extends EventEmitter { connectedAt: Date.now(), }); this.startPing(); + this.scheduleGatewayReadyFallback(); }, onMessage: (message) => { this.handleMessage(message); diff --git a/electron/main/ipc-handlers.ts b/electron/main/ipc-handlers.ts index e988ab5e7..00937d4ce 100644 --- a/electron/main/ipc-handlers.ts +++ b/electron/main/ipc-handlers.ts @@ -1452,7 +1452,7 @@ function registerOpenClawHandlers(gatewayManager: GatewayManager): void { const scheduleGatewayChannelRestart = (reason: string): void => { if (gatewayManager.getStatus().state !== 'stopped') { logger.info(`Scheduling Gateway restart after ${reason}`); - gatewayManager.debouncedRestart(); + gatewayManager.debouncedRestart(150); } else { logger.info(`Gateway is stopped; skip immediate restart after ${reason}`); } @@ -1465,11 +1465,11 @@ function registerOpenClawHandlers(gatewayManager: GatewayManager): void { } if (forceRestartChannels.has(channelType)) { logger.info(`Scheduling Gateway restart after ${reason}`); - gatewayManager.debouncedRestart(); + gatewayManager.debouncedRestart(150); return; } logger.info(`Scheduling Gateway reload after ${reason}`); - gatewayManager.debouncedReload(); + gatewayManager.debouncedReload(150); }; // Get OpenClaw package status diff --git a/electron/utils/openclaw-auth.ts b/electron/utils/openclaw-auth.ts index b8f087c52..237144a19 100644 --- a/electron/utils/openclaw-auth.ts +++ b/electron/utils/openclaw-auth.ts @@ -1235,6 +1235,89 @@ export async function syncSessionIdleMinutesToOpenClaw(): Promise { }); } +/** + * Batch-apply gateway token, browser config, and session idle minutes in a + * single config lock + read + write cycle. Replaces three separate + * withConfigLock calls during pre-launch sync. + */ +export async function batchSyncConfigFields(token: string): Promise { + const DEFAULT_IDLE_MINUTES = 10_080; // 7 days + + return withConfigLock(async () => { + const config = await readOpenClawJson(); + let modified = true; + + // ── Gateway token + controlUi ── + const gateway = ( + config.gateway && typeof config.gateway === 'object' + ? { ...(config.gateway as Record) } + : {} + ) as Record; + + const auth = ( + gateway.auth && typeof gateway.auth === 'object' + ? { ...(gateway.auth as Record) } + : {} + ) as Record; + auth.mode = 'token'; + auth.token = token; + gateway.auth = auth; + + const controlUi = ( + gateway.controlUi && typeof gateway.controlUi === 'object' + ? { ...(gateway.controlUi as Record) } + : {} + ) as Record; + const allowedOrigins = Array.isArray(controlUi.allowedOrigins) + ? (controlUi.allowedOrigins as unknown[]).filter((v): v is string => typeof v === 'string') + : []; + if (!allowedOrigins.includes('file://')) { + controlUi.allowedOrigins = [...allowedOrigins, 'file://']; + } + gateway.controlUi = controlUi; + if (!gateway.mode) gateway.mode = 'local'; + config.gateway = gateway; + + // ── Browser config ── + const browser = ( + config.browser && typeof config.browser === 'object' + ? { ...(config.browser as Record) } + : {} + ) as Record; + if (browser.enabled === undefined) { + browser.enabled = true; + config.browser = browser; + modified = true; + } + if (browser.defaultProfile === undefined) { + browser.defaultProfile = 'openclaw'; + config.browser = browser; + modified = true; + } + + // ── Session idle minutes ── + const session = ( + config.session && typeof config.session === 'object' + ? { ...(config.session as Record) } + : {} + ) as Record; + const hasExplicitSessionConfig = session.idleMinutes !== undefined + || session.reset !== undefined + || session.resetByType !== undefined + || session.resetByChannel !== undefined; + if (!hasExplicitSessionConfig) { + session.idleMinutes = DEFAULT_IDLE_MINUTES; + config.session = session; + modified = true; + } + + if (modified) { + await writeOpenClawJson(config); + console.log('Synced gateway token, browser config, and session idle to openclaw.json'); + } + }); +} + /** * Update a provider entry in every discovered agent's models.json. */ @@ -1670,9 +1753,10 @@ export async function sanitizeOpenClawConfig(): Promise { // that conflicts with the official @larksuite/openclaw-lark plugin // (id: 'openclaw-lark'). When the canonical feishu plugin is NOT the // built-in 'feishu' itself, we must: - // 1. Remove bare 'feishu' from plugins.allow - // 2. Always set plugins.entries.feishu = { enabled: false } to explicitly - // disable the built-in — it loads automatically unless disabled. + // 1. Remove bare 'feishu' from plugins.allow (already done above at line ~1648) + // 2. Delete plugins.entries.feishu entirely — keeping it with enabled:false + // causes the Gateway to report the feishu channel as "disabled". + // Since 'feishu' is not in plugins.allow, the built-in won't load. const allowArr2 = Array.isArray(pluginsObj.allow) ? pluginsObj.allow as string[] : []; const hasCanonicalFeishu = allowArr2.includes(canonicalFeishuId) || !!pEntries[canonicalFeishuId]; if (hasCanonicalFeishu && canonicalFeishuId !== 'feishu') { @@ -1683,11 +1767,13 @@ export async function sanitizeOpenClawConfig(): Promise { console.log('[sanitize] Removed bare "feishu" from plugins.allow (openclaw-lark plugin is configured)'); modified = true; } - // Always ensure the built-in feishu plugin is explicitly disabled. - // Built-in extensions load automatically unless plugins.entries..enabled = false. - if (!pEntries.feishu || pEntries.feishu.enabled !== false) { - pEntries.feishu = { ...(pEntries.feishu || {}), enabled: false }; - console.log('[sanitize] Disabled built-in feishu plugin (openclaw-lark plugin is configured)'); + // Delete the built-in feishu entry entirely instead of setting enabled:false. + // Setting enabled:false causes the Gateway to report the channel as "disabled" + // which shows as an error in the UI. Since 'feishu' is removed from + // plugins.allow above, the built-in extension won't auto-load. + if (pEntries.feishu) { + delete pEntries.feishu; + console.log('[sanitize] Removed built-in feishu plugin entry (openclaw-lark plugin is configured)'); modified = true; } } diff --git a/src/components/layout/Sidebar.tsx b/src/components/layout/Sidebar.tsx index 70cc81535..9df1d3739 100644 --- a/src/components/layout/Sidebar.tsx +++ b/src/components/layout/Sidebar.tsx @@ -128,9 +128,10 @@ export function Sidebar() { const gatewayStatus = useGatewayStore((s) => s.status); const isGatewayRunning = gatewayStatus.state === 'running'; + const isGatewayReady = isGatewayRunning && gatewayStatus.gatewayReady !== false; useEffect(() => { - if (!isGatewayRunning) return; + if (!isGatewayReady) return; let cancelled = false; const hasExistingMessages = useChatStore.getState().messages.length > 0; (async () => { @@ -141,7 +142,7 @@ export function Sidebar() { return () => { cancelled = true; }; - }, [isGatewayRunning, loadHistory, loadSessions]); + }, [isGatewayReady, loadHistory, loadSessions]); const agents = useAgentsStore((s) => s.agents); const fetchAgents = useAgentsStore((s) => s.fetchAgents); diff --git a/src/pages/Channels/index.tsx b/src/pages/Channels/index.tsx index 1f4176815..fd3b80026 100644 --- a/src/pages/Channels/index.tsx +++ b/src/pages/Channels/index.tsx @@ -89,6 +89,9 @@ export function Channels() { const [existingAccountIdsForModal, setExistingAccountIdsForModal] = useState([]); const [initialConfigValuesForModal, setInitialConfigValuesForModal] = useState | undefined>(undefined); const [deleteTarget, setDeleteTarget] = useState(null); + const convergenceRefreshTimersRef = useRef([]); + const fetchInFlightRef = useRef(false); + const queuedFetchOptionsRef = useRef<{ probe?: boolean } | null>(null); const displayedChannelTypes = getPrimaryChannels(); const visibleChannelGroups = channelGroups; @@ -104,7 +107,24 @@ export function Channels() { const agentsRef = useRef(agents); agentsRef.current = agents; - const fetchPageData = useCallback(async () => { + const mergeFetchOptions = ( + base: { probe?: boolean } | null, + incoming: { probe?: boolean } | undefined, + ): { probe?: boolean } => { + return { + probe: Boolean(base?.probe) || Boolean(incoming?.probe), + }; + }; + + const fetchPageData = useCallback(async (options?: { probe?: boolean }) => { + if (fetchInFlightRef.current) { + queuedFetchOptionsRef.current = mergeFetchOptions(queuedFetchOptionsRef.current, options); + return; + } + fetchInFlightRef.current = true; + const startedAt = Date.now(); + const probe = options?.probe === true; + console.info(`[channels-ui] fetch start probe=${probe ? '1' : '0'}`); // Only show loading spinner on first load (stale-while-revalidate). const hasData = channelGroupsRef.current.length > 0 || agentsRef.current.length > 0; if (!hasData) { @@ -113,7 +133,9 @@ export function Channels() { setError(null); try { const [channelsRes, agentsRes] = await Promise.all([ - hostApiFetch<{ success: boolean; channels?: ChannelGroupItem[]; error?: string }>('/api/channels/accounts'), + hostApiFetch<{ success: boolean; channels?: ChannelGroupItem[]; error?: string }>( + options?.probe ? '/api/channels/accounts?probe=1' : '/api/channels/accounts' + ), hostApiFetch<{ success: boolean; agents?: AgentItem[]; error?: string }>('/api/agents'), ]); @@ -127,20 +149,64 @@ export function Channels() { setChannelGroups(channelsRes.channels || []); setAgents(agentsRes.agents || []); + console.info( + `[channels-ui] fetch ok probe=${probe ? '1' : '0'} elapsedMs=${Date.now() - startedAt} view=${(channelsRes.channels || []).map((item) => `${item.channelType}:${item.status}`).join(',')}` + ); } catch (fetchError) { // Preserve previous data on error — don't clear channelGroups/agents. setError(String(fetchError)); + console.warn( + `[channels-ui] fetch fail probe=${probe ? '1' : '0'} elapsedMs=${Date.now() - startedAt} error=${String(fetchError)}` + ); } finally { + fetchInFlightRef.current = false; setLoading(false); + const queued = queuedFetchOptionsRef.current; + if (queued) { + queuedFetchOptionsRef.current = null; + void fetchPageData(queued); + } } // Stable reference — reads state via refs, no deps needed. }, []); + const clearConvergenceRefreshTimers = useCallback(() => { + convergenceRefreshTimersRef.current.forEach((timerId) => { + window.clearTimeout(timerId); + }); + convergenceRefreshTimersRef.current = []; + }, []); + + const scheduleConvergenceRefresh = useCallback(() => { + clearConvergenceRefreshTimers(); + // Channel adapters can take time to reconnect after gateway restart. + // First few rounds use probe=true to force runtime connectivity checks, + // then fall back to cached pulls to reduce load. + [ + { delay: 1200, probe: true }, + { delay: 2600, probe: false }, + { delay: 4500, probe: false }, + { delay: 7000, probe: false }, + { delay: 10500, probe: false }, + ].forEach(({ delay, probe }) => { + const timerId = window.setTimeout(() => { + void fetchPageData({ probe }); + }, delay); + convergenceRefreshTimersRef.current.push(timerId); + }); + }, [clearConvergenceRefreshTimers, fetchPageData]); + useEffect(() => { void fetchPageData(); }, [fetchPageData]); + useEffect(() => { + return () => { + clearConvergenceRefreshTimers(); + }; + }, [clearConvergenceRefreshTimers]); + useEffect(() => { // Throttle channel-status events to avoid flooding fetchPageData during AI tasks. let throttleTimer: ReturnType | null = null; @@ -176,8 +242,9 @@ export function Channels() { if (previousGatewayState !== 'running' && gatewayStatus.state === 'running') { void fetchPageData(); + scheduleConvergenceRefresh(); } - }, [fetchPageData, gatewayStatus.state]); + }, [fetchPageData, gatewayStatus.state, scheduleConvergenceRefresh]); const configuredTypes = useMemo( () => visibleChannelGroups.map((group) => group.channelType), @@ -199,7 +266,7 @@ export function Channels() { const unsupportedGroups = displayedChannelTypes.filter((type) => !configuredTypes.includes(type)); const handleRefresh = () => { - void fetchPageData(); + void fetchPageData({ probe: true }); }; const handleBindAgent = async (channelType: string, accountId: string, agentId: string) => { @@ -525,7 +592,8 @@ export function Channels() { setInitialConfigValuesForModal(undefined); }} onChannelSaved={async () => { - await fetchPageData(); + await fetchPageData({ probe: true }); + scheduleConvergenceRefresh(); setShowConfigModal(false); setSelectedChannelType(null); setSelectedAccountId(undefined); diff --git a/src/stores/chat/session-actions.ts b/src/stores/chat/session-actions.ts index 40f42071f..9e994fe52 100644 --- a/src/stores/chat/session-actions.ts +++ b/src/stores/chat/session-actions.ts @@ -1,9 +1,10 @@ import { invokeIpc } from '@/lib/api-client'; import { getCanonicalPrefixFromSessions, getMessageText, toMs } from './helpers'; -import { classifyHistoryStartupRetryError, sleep } from './history-startup-retry'; import { DEFAULT_CANONICAL_PREFIX, DEFAULT_SESSION_KEY, type ChatSession, type RawMessage } from './types'; import type { ChatGet, ChatSet, SessionHistoryActions } from './store-api'; +const LABEL_FETCH_CONCURRENCY = 5; + function getAgentIdFromSessionKey(sessionKey: string): string { if (!sessionKey.startsWith('agent:')) return 'main'; const [, agentId] = sessionKey.split(':'); @@ -111,30 +112,24 @@ export function createSessionActions( get().loadHistory(); } - // Background: fetch first user message for every non-main session to populate labels upfront. - // Retries on "gateway startup" errors since the gateway may still be initializing. + // Background: fetch first user message for every non-main session to populate labels. + // Concurrency-limited to avoid flooding the gateway with parallel RPCs. + // By the time this runs, the gateway should already be fully ready (Sidebar + // gates on gatewayReady), so no startup-retry loop is needed. const sessionsToLabel = sessionsWithCurrent.filter((s) => !s.key.endsWith(':main')); if (sessionsToLabel.length > 0) { - const LABEL_RETRY_DELAYS = [2_000, 5_000, 10_000]; void (async () => { - let pending = sessionsToLabel; - for (let attempt = 0; attempt <= LABEL_RETRY_DELAYS.length; attempt += 1) { - const failed: typeof pending = []; + for (let i = 0; i < sessionsToLabel.length; i += LABEL_FETCH_CONCURRENCY) { + const batch = sessionsToLabel.slice(i, i + LABEL_FETCH_CONCURRENCY); await Promise.all( - pending.map(async (session) => { + batch.map(async (session) => { try { const r = await invokeIpc( 'gateway:rpc', 'chat.history', { sessionKey: session.key, limit: 1000 }, ) as { success: boolean; result?: Record; error?: string }; - if (!r.success) { - if (classifyHistoryStartupRetryError(r.error) === 'gateway_startup') { - failed.push(session); - } - return; - } - if (!r.result) return; + if (!r.success || !r.result) return; const msgs = Array.isArray(r.result.messages) ? r.result.messages as RawMessage[] : []; const firstUser = msgs.find((m) => m.role === 'user'); const lastMsg = msgs[msgs.length - 1]; @@ -155,9 +150,6 @@ export function createSessionActions( } catch { /* ignore per-session errors */ } }), ); - if (failed.length === 0 || attempt >= LABEL_RETRY_DELAYS.length) break; - await sleep(LABEL_RETRY_DELAYS[attempt]!); - pending = failed; } })(); } diff --git a/src/types/gateway.ts b/src/types/gateway.ts index 3decd24a1..368d0f193 100644 --- a/src/types/gateway.ts +++ b/src/types/gateway.ts @@ -15,6 +15,8 @@ export interface GatewayStatus { connectedAt?: number; version?: string; reconnectAttempts?: number; + /** True once the gateway's internal subsystems (skills, plugins) are ready for RPC calls. */ + gatewayReady?: boolean; } /** diff --git a/tests/unit/gateway-event-dispatch.test.ts b/tests/unit/gateway-event-dispatch.test.ts new file mode 100644 index 000000000..42ecf5577 --- /dev/null +++ b/tests/unit/gateway-event-dispatch.test.ts @@ -0,0 +1,51 @@ +import { describe, expect, it, vi } from 'vitest'; +import { dispatchProtocolEvent } from '@electron/gateway/event-dispatch'; + +function createMockEmitter() { + const emitted: Array<{ event: string; payload: unknown }> = []; + return { + emit: vi.fn((event: string, payload: unknown) => { + emitted.push({ event, payload }); + return true; + }), + emitted, + }; +} + +describe('dispatchProtocolEvent', () => { + it('dispatches gateway.ready event to gateway:ready', () => { + const emitter = createMockEmitter(); + dispatchProtocolEvent(emitter, 'gateway.ready', { version: '4.11' }); + expect(emitter.emit).toHaveBeenCalledWith('gateway:ready', { version: '4.11' }); + }); + + it('dispatches ready event to gateway:ready', () => { + const emitter = createMockEmitter(); + dispatchProtocolEvent(emitter, 'ready', { skills: 31 }); + expect(emitter.emit).toHaveBeenCalledWith('gateway:ready', { skills: 31 }); + }); + + it('dispatches channel.status to channel:status', () => { + const emitter = createMockEmitter(); + dispatchProtocolEvent(emitter, 'channel.status', { channelId: 'telegram', status: 'connected' }); + expect(emitter.emit).toHaveBeenCalledWith('channel:status', { channelId: 'telegram', status: 'connected' }); + }); + + it('dispatches chat to chat:message', () => { + const emitter = createMockEmitter(); + dispatchProtocolEvent(emitter, 'chat', { text: 'hello' }); + expect(emitter.emit).toHaveBeenCalledWith('chat:message', { message: { text: 'hello' } }); + }); + + it('suppresses tick events', () => { + const emitter = createMockEmitter(); + dispatchProtocolEvent(emitter, 'tick', {}); + expect(emitter.emit).not.toHaveBeenCalled(); + }); + + it('dispatches unknown events as notifications', () => { + const emitter = createMockEmitter(); + dispatchProtocolEvent(emitter, 'some.custom.event', { data: 1 }); + expect(emitter.emit).toHaveBeenCalledWith('notification', { method: 'some.custom.event', params: { data: 1 } }); + }); +}); diff --git a/tests/unit/gateway-events.test.ts b/tests/unit/gateway-events.test.ts index 8e10c5c42..d33273154 100644 --- a/tests/unit/gateway-events.test.ts +++ b/tests/unit/gateway-events.test.ts @@ -38,4 +38,42 @@ describe('gateway store event wiring', () => { handlers.get('gateway:status')?.({ state: 'stopped', port: 18789 }); expect(useGatewayStore.getState().status.state).toBe('stopped'); }); + + it('propagates gatewayReady field from status events', async () => { + hostApiFetchMock.mockResolvedValueOnce({ state: 'running', port: 18789, gatewayReady: false }); + + const handlers = new Map void>(); + subscribeHostEventMock.mockImplementation((eventName: string, handler: (payload: unknown) => void) => { + handlers.set(eventName, handler); + return () => {}; + }); + + const { useGatewayStore } = await import('@/stores/gateway'); + await useGatewayStore.getState().init(); + + // Initially gatewayReady=false from the status fetch + expect(useGatewayStore.getState().status.gatewayReady).toBe(false); + + // Simulate gateway.ready event setting gatewayReady=true + handlers.get('gateway:status')?.({ state: 'running', port: 18789, gatewayReady: true }); + expect(useGatewayStore.getState().status.gatewayReady).toBe(true); + }); + + it('treats undefined gatewayReady as ready for backwards compatibility', async () => { + hostApiFetchMock.mockResolvedValueOnce({ state: 'running', port: 18789 }); + + const handlers = new Map void>(); + subscribeHostEventMock.mockImplementation((eventName: string, handler: (payload: unknown) => void) => { + handlers.set(eventName, handler); + return () => {}; + }); + + const { useGatewayStore } = await import('@/stores/gateway'); + await useGatewayStore.getState().init(); + + const status = useGatewayStore.getState().status; + // gatewayReady is undefined (old gateway version) — should be treated as ready + expect(status.gatewayReady).toBeUndefined(); + expect(status.state === 'running' && status.gatewayReady !== false).toBe(true); + }); }); diff --git a/tests/unit/gateway-ready-fallback.test.ts b/tests/unit/gateway-ready-fallback.test.ts new file mode 100644 index 000000000..8cd9596ad --- /dev/null +++ b/tests/unit/gateway-ready-fallback.test.ts @@ -0,0 +1,127 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +vi.mock('electron', () => ({ + app: { + getPath: () => '/tmp', + isPackaged: false, + }, + utilityProcess: {}, +})); + +vi.mock('@electron/utils/logger', () => ({ + logger: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }, +})); + +vi.mock('@electron/utils/config', () => ({ + PORTS: { OPENCLAW_GATEWAY: 18789 }, +})); + +describe('GatewayManager gatewayReady fallback', () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.clearAllMocks(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('sets gatewayReady=false when entering starting state', async () => { + vi.resetModules(); + const { GatewayManager } = await import('@electron/gateway/manager'); + const manager = new GatewayManager(); + + const statusUpdates: Array<{ gatewayReady?: boolean }> = []; + manager.on('status', (status: { gatewayReady?: boolean }) => { + statusUpdates.push({ gatewayReady: status.gatewayReady }); + }); + + // Simulate start attempt (will fail but we can check the initial status) + try { + await manager.start(); + } catch { + // expected to fail — no actual gateway process + } + + const startingUpdate = statusUpdates.find((u) => u.gatewayReady === false); + expect(startingUpdate).toBeDefined(); + }); + + it('emits gatewayReady=true when gateway:ready event is received', async () => { + vi.resetModules(); + const { GatewayManager } = await import('@electron/gateway/manager'); + const manager = new GatewayManager(); + + // Force internal state to 'running' for the test + const stateController = (manager as unknown as { stateController: { setStatus: (u: Record) => void } }).stateController; + stateController.setStatus({ state: 'running', connectedAt: Date.now() }); + + const statusUpdates: Array<{ gatewayReady?: boolean; state: string }> = []; + manager.on('status', (status: { gatewayReady?: boolean; state: string }) => { + statusUpdates.push({ gatewayReady: status.gatewayReady, state: status.state }); + }); + + manager.emit('gateway:ready', {}); + + const readyUpdate = statusUpdates.find((u) => u.gatewayReady === true); + expect(readyUpdate).toBeDefined(); + }); + + it('auto-sets gatewayReady=true after fallback timeout if no event received', async () => { + vi.resetModules(); + const { GatewayManager } = await import('@electron/gateway/manager'); + const manager = new GatewayManager(); + + // Force internal state to 'running' without gatewayReady + const stateController = (manager as unknown as { stateController: { setStatus: (u: Record) => void } }).stateController; + stateController.setStatus({ state: 'running', connectedAt: Date.now() }); + + const statusUpdates: Array<{ gatewayReady?: boolean }> = []; + manager.on('status', (status: { gatewayReady?: boolean }) => { + statusUpdates.push({ gatewayReady: status.gatewayReady }); + }); + + // Call the private scheduleGatewayReadyFallback method + (manager as unknown as { scheduleGatewayReadyFallback: () => void }).scheduleGatewayReadyFallback(); + + // Before timeout, no gatewayReady update + vi.advanceTimersByTime(29_000); + expect(statusUpdates.find((u) => u.gatewayReady === true)).toBeUndefined(); + + // After 30s fallback timeout + vi.advanceTimersByTime(2_000); + const readyUpdate = statusUpdates.find((u) => u.gatewayReady === true); + expect(readyUpdate).toBeDefined(); + }); + + it('cancels fallback timer when gateway:ready event arrives first', async () => { + vi.resetModules(); + const { GatewayManager } = await import('@electron/gateway/manager'); + const manager = new GatewayManager(); + + const stateController = (manager as unknown as { stateController: { setStatus: (u: Record) => void } }).stateController; + stateController.setStatus({ state: 'running', connectedAt: Date.now() }); + + const statusUpdates: Array<{ gatewayReady?: boolean }> = []; + manager.on('status', (status: { gatewayReady?: boolean }) => { + statusUpdates.push({ gatewayReady: status.gatewayReady }); + }); + + // Schedule fallback + (manager as unknown as { scheduleGatewayReadyFallback: () => void }).scheduleGatewayReadyFallback(); + + // gateway:ready event arrives at 5s + vi.advanceTimersByTime(5_000); + manager.emit('gateway:ready', {}); + expect(statusUpdates.filter((u) => u.gatewayReady === true)).toHaveLength(1); + + // After 30s, no duplicate gatewayReady=true + vi.advanceTimersByTime(30_000); + expect(statusUpdates.filter((u) => u.gatewayReady === true)).toHaveLength(1); + }); +}); diff --git a/tests/unit/session-label-fetch.test.ts b/tests/unit/session-label-fetch.test.ts new file mode 100644 index 000000000..55d6296ff --- /dev/null +++ b/tests/unit/session-label-fetch.test.ts @@ -0,0 +1,89 @@ +import { describe, expect, it, vi, beforeEach } from 'vitest'; + +const invokeIpcMock = vi.fn(); + +vi.mock('@/lib/api-client', () => ({ + invokeIpc: (...args: unknown[]) => invokeIpcMock(...args), +})); + +vi.mock('@/stores/chat/helpers', () => ({ + getCanonicalPrefixFromSessions: () => 'agent:main', + getMessageText: (content: unknown) => typeof content === 'string' ? content : '', + toMs: (v: unknown) => typeof v === 'number' ? v : 0, +})); + +describe('session label fetch concurrency', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it('limits concurrent chat.history RPCs during label fetches', async () => { + // Track max concurrent RPCs + let currentConcurrency = 0; + let maxConcurrency = 0; + const resolvers: Array<() => void> = []; + + invokeIpcMock.mockImplementation(async (channel: string, method: string) => { + if (method === 'sessions.list') { + return { + success: true, + result: { + sessions: Array.from({ length: 12 }, (_, i) => ({ + key: `agent:main:session-${i}`, + label: `Session ${i}`, + })), + }, + }; + } + if (method === 'chat.history') { + currentConcurrency++; + maxConcurrency = Math.max(maxConcurrency, currentConcurrency); + await new Promise((resolve) => resolvers.push(resolve)); + currentConcurrency--; + return { + success: true, + result: { + messages: [{ role: 'user', content: 'hello', timestamp: Date.now() }], + }, + }; + } + return { success: false }; + }); + + vi.resetModules(); + const { createSessionActions } = await import('@/stores/chat/session-actions'); + const state = { + currentSessionKey: 'agent:main:main', + messages: [], + sessions: [], + sessionLabels: {}, + sessionLastActivity: {}, + }; + const set = vi.fn(); + const get = vi.fn().mockReturnValue({ + ...state, + loadHistory: vi.fn(), + }); + + const actions = createSessionActions(set as never, get as never); + await actions.loadSessions(); + + // Wait for the label-fetch loop to start its batches + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Resolve first batch (up to 5 concurrent) + while (resolvers.length > 0 && resolvers.length <= 5) { + resolvers.shift()?.(); + await new Promise((resolve) => setTimeout(resolve, 0)); + } + + // Resolve remaining + while (resolvers.length > 0) { + resolvers.shift()?.(); + await new Promise((resolve) => setTimeout(resolve, 0)); + } + + // maxConcurrency should be capped at 5 (LABEL_FETCH_CONCURRENCY) + expect(maxConcurrency).toBeLessThanOrEqual(5); + }); +});