diff --git a/.github/workflows/comms-regression.yml b/.github/workflows/comms-regression.yml new file mode 100644 index 000000000..6adf7b0c5 --- /dev/null +++ b/.github/workflows/comms-regression.yml @@ -0,0 +1,52 @@ +name: Comms Regression + +on: + workflow_dispatch: + pull_request: + branches: + - main + paths: + - 'src/lib/api-client.ts' + - 'src/lib/host-api.ts' + - 'src/stores/gateway.ts' + - 'src/stores/chat.ts' + - 'electron/gateway/**' + - 'electron/main/ipc-handlers.ts' + - 'electron/utils/logger.ts' + - 'scripts/comms/**' + - 'tests/unit/gateway-events.test.ts' + - '.github/workflows/comms-regression.yml' + +jobs: + comms-regression: + runs-on: ubuntu-latest + env: + ELECTRON_SKIP_BINARY_DOWNLOAD: '1' + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Setup pnpm + uses: pnpm/action-setup@v4 + + - name: Setup Node.js + uses: actions/setup-node@v6 + with: + node-version: '24' + cache: 'pnpm' + + - name: Install dependencies + run: pnpm install --frozen-lockfile + + - name: Run comms replay + run: pnpm run comms:replay + + - name: Compare with baseline + run: pnpm run comms:compare + + - name: Upload comms artifacts + if: always() + uses: actions/upload-artifact@v4 + with: + name: comms-regression-artifacts + path: artifacts/comms diff --git a/.gitignore b/.gitignore index 06f508a36..a2dcb7eba 100644 --- a/.gitignore +++ b/.gitignore @@ -59,6 +59,8 @@ resources/bin *.key build/ +artifacts/ +docs/pr-session-notes-*.md .cursor/ -.pnpm-store/ \ No newline at end of file +.pnpm-store/ diff --git a/AGENTS.md b/AGENTS.md index 6c1421cf7..ffae5737a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -17,6 +17,9 @@ Standard dev commands are in `package.json` scripts and `README.md`. Key ones: | Lint (ESLint, auto-fix) | `pnpm run lint` | | Type check | `pnpm run typecheck` | | Unit tests (Vitest) | `pnpm test` | +| Comms replay metrics | `pnpm run comms:replay` | +| Comms baseline refresh | `pnpm run comms:baseline` | +| Comms regression compare | `pnpm run comms:compare` | | E2E tests (Playwright) | `pnpm run test:e2e` | | Build frontend only | `pnpm run build:vite` | @@ -38,4 +41,5 @@ Standard dev commands are in `package.json` scripts and `README.md`. Key ones: - Do not add new direct `window.electron.ipcRenderer.invoke(...)` calls in pages/components; expose them through host-api/api-client instead. - Do not call Gateway HTTP endpoints directly from renderer (`fetch('http://127.0.0.1:18789/...')` etc.). Use Main-process proxy channels (`hostapi:fetch`, `gateway:httpProxy`) to avoid CORS/env drift. - Transport policy is Main-owned and fixed as `WS -> HTTP -> IPC fallback`; renderer should not implement protocol switching UI/business logic. +- **Comms-change checklist**: If your change touches communication paths (gateway events, runtime send/receive, delivery, or fallback), run `pnpm run comms:replay` and `pnpm run comms:compare` before pushing. - **Doc sync rule**: After any functional or architecture change, review `README.md`, `README.zh-CN.md`, and `README.ja-JP.md` for required updates; if behavior/flows/interfaces changed, update docs in the same PR/commit. diff --git a/README.ja-JP.md b/README.ja-JP.md index 8578a42fc..0fef32027 100644 --- a/README.ja-JP.md +++ b/README.ja-JP.md @@ -316,6 +316,9 @@ pnpm typecheck # TypeScriptの型チェック # テスト pnpm test # ユニットテストを実行 +pnpm run comms:replay # 通信リプレイ指標を算出 +pnpm run comms:baseline # 通信ベースラインを更新 +pnpm run comms:compare # リプレイ指標をベースライン閾値と比較 # ビルド&パッケージ pnpm run build:vite # フロントエンドのみビルド @@ -325,6 +328,17 @@ pnpm package:mac # macOS向けにパッケージ化 pnpm package:win # Windows向けにパッケージ化 pnpm package:linux # Linux向けにパッケージ化 ``` + +### 通信回帰チェック + +PR が通信経路(Gateway イベント、Chat 送受信フロー、Channel 配信、トランスポートのフォールバック)に触れる場合は、次を実行してください。 + +```bash +pnpm run comms:replay +pnpm run comms:compare +``` + +CI の `comms-regression` が必須シナリオと閾値を検証します。 ### 技術スタック | レイヤー | 技術 | diff --git a/README.md b/README.md index a4bff5276..a77ca132e 100644 --- a/README.md +++ b/README.md @@ -320,6 +320,9 @@ pnpm typecheck # TypeScript validation # Testing pnpm test # Run unit tests +pnpm run comms:replay # Compute communication replay metrics +pnpm run comms:baseline # Refresh communication baseline snapshot +pnpm run comms:compare # Compare replay metrics against baseline thresholds # Build & Package pnpm run build:vite # Build frontend only @@ -329,6 +332,17 @@ pnpm package:mac # Package for macOS pnpm package:win # Package for Windows pnpm package:linux # Package for Linux ``` + +### Communication Regression Checks + +When a PR changes communication paths (gateway events, chat runtime send/receive flow, channel delivery, or transport fallback), run: + +```bash +pnpm run comms:replay +pnpm run comms:compare +``` + +`comms-regression` in CI enforces required scenarios and threshold checks. ### Tech Stack | Layer | Technology | diff --git a/README.zh-CN.md b/README.zh-CN.md index 779890a2d..2bee40711 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -320,6 +320,9 @@ pnpm typecheck # TypeScript 类型检查 # 测试 pnpm test # 运行单元测试 +pnpm run comms:replay # 计算通信回放指标 +pnpm run comms:baseline # 刷新通信基线快照 +pnpm run comms:compare # 将回放指标与基线阈值对比 # 构建与打包 pnpm run build:vite # 仅构建前端 @@ -329,6 +332,17 @@ pnpm package:mac # 为 macOS 打包 pnpm package:win # 为 Windows 打包 pnpm package:linux # 为 Linux 打包 ``` + +### 通信回归检查 + +当 PR 涉及通信链路(Gateway 事件、Chat 收发流程、Channel 投递、传输回退)时,建议执行: + +```bash +pnpm run comms:replay +pnpm run comms:compare +``` + +CI 中的 `comms-regression` 会校验必选场景与阈值。 ### 技术栈 | 层级 | 技术 | diff --git a/electron/api/routes/channels.ts b/electron/api/routes/channels.ts index 7f1ce950a..da0223428 100644 --- a/electron/api/routes/channels.ts +++ b/electron/api/routes/channels.ts @@ -12,7 +12,7 @@ import { validateChannelConfig, validateChannelCredentials, } from '../../utils/channel-config'; -import { clearAllBindingsForChannel } from '../../utils/agent-config'; +import { assignChannelToAgent, clearAllBindingsForChannel } from '../../utils/agent-config'; import { whatsAppLoginManager } from '../../utils/whatsapp-login'; import type { HostApiContext } from '../context'; import { parseJsonBody, sendJson } from '../route-utils'; @@ -25,6 +25,25 @@ function scheduleGatewayChannelRestart(ctx: HostApiContext, reason: string): voi void reason; } +const FORCE_RESTART_CHANNELS = new Set(['dingtalk', 'wecom', 'feishu', 'whatsapp']); + +function scheduleGatewayChannelSaveRefresh( + ctx: HostApiContext, + channelType: string, + reason: string, +): void { + if (ctx.gatewayManager.getStatus().state === 'stopped') { + return; + } + if (FORCE_RESTART_CHANNELS.has(channelType)) { + ctx.gatewayManager.debouncedRestart(); + void reason; + return; + } + ctx.gatewayManager.debouncedReload(); + void reason; +} + // ── Generic plugin installer with version-aware upgrades ───────── function readPluginVersion(pkgJsonPath: string): string | null { @@ -119,6 +138,49 @@ function ensureQQBotPluginInstalled(): { installed: boolean; warning?: string } return ensurePluginInstalled('qqbot', buildCandidateSources('qqbot'), 'QQ Bot'); } +function toComparableConfig(input: Record): Record { + const next: Record = {}; + for (const [key, value] of Object.entries(input)) { + if (value === undefined || value === null) continue; + if (typeof value === 'string') { + next[key] = value.trim(); + continue; + } + if (typeof value === 'number' || typeof value === 'boolean') { + next[key] = String(value); + } + } + return next; +} + +function isSameConfigValues( + existing: Record | undefined, + incoming: Record, +): boolean { + if (!existing) return false; + const next = toComparableConfig(incoming); + const keys = new Set([...Object.keys(existing), ...Object.keys(next)]); + if (keys.size === 0) return false; + for (const key of keys) { + if ((existing[key] ?? '') !== (next[key] ?? '')) { + return false; + } + } + return true; +} + +function inferAgentIdFromAccountId(accountId: string): string { + if (accountId === 'default') return 'main'; + return accountId; +} + +async function ensureScopedChannelBinding(channelType: string, accountId?: string): Promise { + // Multi-agent safety: only bind when the caller explicitly scopes the account. + // Global channel saves (no accountId) must not override routing to "main". + if (!accountId) return; + await assignChannelToAgent(inferAgentIdFromAccountId(accountId), channelType).catch(() => undefined); +} + export async function handleChannelRoutes( req: IncomingMessage, res: ServerResponse, @@ -202,8 +264,15 @@ export async function handleChannelRoutes( return true; } } + const existingValues = await getChannelFormValues(body.channelType, body.accountId); + if (isSameConfigValues(existingValues, body.config)) { + await ensureScopedChannelBinding(body.channelType, body.accountId); + sendJson(res, 200, { success: true, noChange: true }); + return true; + } await saveChannelConfig(body.channelType, body.config, body.accountId); - scheduleGatewayChannelRestart(ctx, `channel:saveConfig:${body.channelType}`); + await ensureScopedChannelBinding(body.channelType, body.accountId); + scheduleGatewayChannelSaveRefresh(ctx, body.channelType, `channel:saveConfig:${body.channelType}`); sendJson(res, 200, { success: true }); } catch (error) { sendJson(res, 500, { success: false, error: String(error) }); diff --git a/electron/api/routes/providers.ts b/electron/api/routes/providers.ts index 78b7945f3..0e7cc78d3 100644 --- a/electron/api/routes/providers.ts +++ b/electron/api/routes/providers.ts @@ -25,6 +25,16 @@ import { logger } from '../../utils/logger'; const legacyProviderRoutesWarned = new Set(); +function hasObjectChanges>( + existing: T, + patch: Partial | undefined, +): boolean { + if (!patch) return false; + const keys = Object.keys(patch) as Array; + if (keys.length === 0) return false; + return keys.some((key) => JSON.stringify(existing[key]) !== JSON.stringify(patch[key])); +} + export async function handleProviderRoutes( req: IncomingMessage, res: ServerResponse, @@ -70,6 +80,11 @@ export async function handleProviderRoutes( if (url.pathname === '/api/provider-accounts/default' && req.method === 'PUT') { try { const body = await parseJsonBody<{ accountId: string }>(req); + const currentDefault = await providerService.getDefaultAccountId(); + if (currentDefault === body.accountId) { + sendJson(res, 200, { success: true, noChange: true }); + return true; + } await providerService.setDefaultAccount(body.accountId); await syncDefaultProviderToRuntime(body.accountId, ctx.gatewayManager); sendJson(res, 200, { success: true }); @@ -94,6 +109,11 @@ export async function handleProviderRoutes( sendJson(res, 404, { success: false, error: 'Provider account not found' }); return true; } + const hasPatchChanges = hasObjectChanges(existing as unknown as Record, body.updates); + if (!hasPatchChanges && body.apiKey === undefined) { + sendJson(res, 200, { success: true, noChange: true, account: existing }); + return true; + } const nextAccount = await providerService.updateAccount(accountId, body.updates, body.apiKey); await syncUpdatedProviderToRuntime(providerAccountToConfig(nextAccount), body.apiKey, ctx.gatewayManager); sendJson(res, 200, { success: true, account: nextAccount }); @@ -152,6 +172,11 @@ export async function handleProviderRoutes( logLegacyProviderRoute('PUT /api/providers/default'); try { const body = await parseJsonBody<{ providerId: string }>(req); + const currentDefault = await providerService.getDefaultLegacyProvider(); + if (currentDefault === body.providerId) { + sendJson(res, 200, { success: true, noChange: true }); + return true; + } await providerService.setDefaultLegacyProvider(body.providerId); await syncDefaultProviderToRuntime(body.providerId, ctx.gatewayManager); sendJson(res, 200, { success: true }); @@ -280,6 +305,11 @@ export async function handleProviderRoutes( sendJson(res, 404, { success: false, error: 'Provider not found' }); return true; } + const hasPatchChanges = hasObjectChanges(existing as unknown as Record, body.updates); + if (!hasPatchChanges && body.apiKey === undefined) { + sendJson(res, 200, { success: true, noChange: true }); + return true; + } const nextConfig: ProviderConfig = { ...existing, ...body.updates, updatedAt: new Date().toISOString() }; await providerService.saveLegacyProvider(nextConfig); if (body.apiKey !== undefined) { diff --git a/electron/gateway/event-dispatch.ts b/electron/gateway/event-dispatch.ts index c5af9da7e..6e9427265 100644 --- a/electron/gateway/event-dispatch.ts +++ b/electron/gateway/event-dispatch.ts @@ -17,18 +17,8 @@ export function dispatchProtocolEvent( emitter.emit('chat:message', { message: payload }); break; case 'agent': { - const p = payload as Record; - const data = (p.data && typeof p.data === 'object') ? p.data as Record : {}; - const chatEvent: Record = { - ...data, - runId: p.runId ?? data.runId, - sessionKey: p.sessionKey ?? data.sessionKey, - state: p.state ?? data.state, - message: p.message ?? data.message, - }; - if (chatEvent.state || chatEvent.message) { - emitter.emit('chat:message', { message: chatEvent }); - } + // Keep "agent" on the canonical notification path to avoid double + // handling in renderer when both notification and chat-message are wired. emitter.emit('notification', { method: event, params: payload }); break; } diff --git a/electron/gateway/manager.ts b/electron/gateway/manager.ts index 82255f894..88eea6f56 100644 --- a/electron/gateway/manager.ts +++ b/electron/gateway/manager.ts @@ -9,6 +9,7 @@ import WebSocket from 'ws'; import { PORTS } from '../utils/config'; import { JsonRpcNotification, isNotification, isResponse } from './protocol'; import { logger } from '../utils/logger'; +import { captureTelemetryEvent, trackMetric } from '../utils/telemetry'; import { loadOrCreateDeviceIdentity, type DeviceIdentity, @@ -95,6 +96,10 @@ export class GatewayManager extends EventEmitter { private readonly restartController = new GatewayRestartController(); private reloadDebounceTimer: NodeJS.Timeout | null = null; private externalShutdownSupported: boolean | null = null; + private lastRestartAt = 0; + private reconnectAttemptsTotal = 0; + private reconnectSuccessTotal = 0; + private static readonly RESTART_COOLDOWN_MS = 2500; constructor(config?: Partial) { super(); @@ -348,7 +353,18 @@ export class GatewayManager extends EventEmitter { return; } - logger.debug('Gateway restart requested'); + const now = Date.now(); + const sinceLastRestart = now - this.lastRestartAt; + if (sinceLastRestart < GatewayManager.RESTART_COOLDOWN_MS) { + logger.info( + `Gateway restart skipped due to cooldown (${sinceLastRestart}ms < ${GatewayManager.RESTART_COOLDOWN_MS}ms)`, + ); + return; + } + + const pidBefore = this.status.pid; + logger.info(`[gateway-refresh] mode=restart requested pidBefore=${pidBefore ?? 'n/a'}`); + this.lastRestartAt = now; this.restartInFlight = (async () => { await this.stop(); await this.start(); @@ -356,6 +372,9 @@ export class GatewayManager extends EventEmitter { try { await this.restartInFlight; + logger.info( + `[gateway-refresh] mode=restart result=applied pidBefore=${pidBefore ?? 'n/a'} pidAfter=${this.status.pid ?? 'n/a'}`, + ); } finally { this.restartInFlight = null; this.restartController.flushDeferredRestart( @@ -405,13 +424,18 @@ export class GatewayManager extends EventEmitter { return; } + const pidBefore = this.process?.pid; + logger.info(`[gateway-refresh] mode=reload requested pid=${pidBefore ?? 'n/a'} state=${this.status.state}`); + if (!this.process?.pid || this.status.state !== 'running') { + logger.warn('[gateway-refresh] mode=reload result=fallback_restart cause=not_running'); logger.warn('Gateway reload requested while not running; falling back to restart'); await this.restart(); return; } if (process.platform === 'win32') { + logger.warn('[gateway-refresh] mode=reload result=fallback_restart cause=windows'); logger.debug('Windows detected, falling back to Gateway restart for reload'); await this.restart(); return; @@ -423,6 +447,9 @@ export class GatewayManager extends EventEmitter { // Avoid signaling a process that just came up; it will already read latest config. if (connectedForMs < 8000) { + logger.info( + `[gateway-refresh] mode=reload result=skipped_recent_connect connectedForMs=${connectedForMs} pid=${this.process.pid}`, + ); logger.info(`Gateway connected ${connectedForMs}ms ago, skipping reload signal`); return; } @@ -434,10 +461,17 @@ export class GatewayManager extends EventEmitter { // If process state doesn't recover quickly, fall back to restart. await new Promise((resolve) => setTimeout(resolve, 1500)); if (this.status.state !== 'running' || !this.process?.pid) { + logger.warn('[gateway-refresh] mode=reload result=fallback_restart cause=post_signal_unhealthy'); logger.warn('Gateway did not stay running after reload signal, falling back to restart'); await this.restart(); + } else { + const pidAfter = this.process.pid; + logger.info( + `[gateway-refresh] mode=reload result=applied_in_place pidBefore=${pidBefore} pidAfter=${pidAfter}`, + ); } } catch (error) { + logger.warn('[gateway-refresh] mode=reload result=fallback_restart cause=signal_error'); logger.warn('Gateway reload signal failed, falling back to restart:', error); await this.restart(); } @@ -731,9 +765,11 @@ export class GatewayManager extends EventEmitter { return; } + const cooldownRemaining = Math.max(0, GatewayManager.RESTART_COOLDOWN_MS - (Date.now() - this.lastRestartAt)); const { delay, nextAttempt, maxAttempts } = decision; + const effectiveDelay = Math.max(delay, cooldownRemaining); this.reconnectAttempts = nextAttempt; - logger.warn(`Scheduling Gateway reconnect attempt ${nextAttempt}/${maxAttempts} in ${delay}ms`); + logger.warn(`Scheduling Gateway reconnect attempt ${nextAttempt}/${maxAttempts} in ${effectiveDelay}ms`); this.setStatus({ state: 'reconnecting', @@ -752,16 +788,58 @@ export class GatewayManager extends EventEmitter { logger.debug(`Skipping reconnect attempt: ${skipReason}`); return; } + const attemptNo = this.reconnectAttempts; + this.reconnectAttemptsTotal += 1; try { // Use the guarded start() flow so reconnect attempts cannot bypass // lifecycle locking and accidentally start duplicate Gateway processes. await this.start(); + this.reconnectSuccessTotal += 1; + this.emitReconnectMetric('success', { + attemptNo, + maxAttempts, + delayMs: effectiveDelay, + }); this.reconnectAttempts = 0; } catch (error) { logger.error('Gateway reconnection attempt failed:', error); + this.emitReconnectMetric('failure', { + attemptNo, + maxAttempts, + delayMs: effectiveDelay, + error: error instanceof Error ? error.message : String(error), + }); this.scheduleReconnect(); } - }, delay); + }, effectiveDelay); + } + + private emitReconnectMetric( + outcome: 'success' | 'failure', + payload: { + attemptNo: number; + maxAttempts: number; + delayMs: number; + error?: string; + }, + ): void { + const successRate = this.reconnectAttemptsTotal > 0 + ? this.reconnectSuccessTotal / this.reconnectAttemptsTotal + : 0; + + const properties = { + outcome, + attemptNo: payload.attemptNo, + maxAttempts: payload.maxAttempts, + delayMs: payload.delayMs, + gateway_reconnect_success_count: this.reconnectSuccessTotal, + gateway_reconnect_attempt_count: this.reconnectAttemptsTotal, + gateway_reconnect_success_rate: Number(successRate.toFixed(4)), + ...(payload.error ? { error: payload.error } : {}), + }; + + trackMetric('gateway.reconnect', properties); + captureTelemetryEvent('gateway_reconnect', properties); } /** @@ -770,4 +848,4 @@ export class GatewayManager extends EventEmitter { private setStatus(update: Partial): void { this.stateController.setStatus(update); } -} \ No newline at end of file +} diff --git a/electron/gateway/startup-stderr.ts b/electron/gateway/startup-stderr.ts index b23f08936..fd7806937 100644 --- a/electron/gateway/startup-stderr.ts +++ b/electron/gateway/startup-stderr.ts @@ -18,6 +18,12 @@ export function classifyGatewayStderrMessage(message: string): GatewayStderrClas if (msg.includes('closed before connect') && msg.includes('token mismatch')) { return { level: 'drop', normalized: msg }; } + if (msg.includes('[ws] closed before connect') && msg.includes('code=1005')) { + return { level: 'debug', normalized: msg }; + } + if (msg.includes('security warning: dangerous config flags enabled')) { + return { level: 'debug', normalized: msg }; + } // Downgrade frequent non-fatal noise. if (msg.includes('ExperimentalWarning')) return { level: 'debug', normalized: msg }; diff --git a/electron/main/ipc-handlers.ts b/electron/main/ipc-handlers.ts index 49664a049..e16164eac 100644 --- a/electron/main/ipc-handlers.ts +++ b/electron/main/ipc-handlers.ts @@ -1159,13 +1159,18 @@ function registerGatewayHandlers( const controller = new AbortController(); const timer = setTimeout(() => controller.abort(), timeoutMs); - const response = await proxyAwareFetch(`http://127.0.0.1:${port}${path}`, { - method, - headers, - body, - signal: controller.signal, - }); - clearTimeout(timer); + const response = await (async () => { + try { + return await proxyAwareFetch(`http://127.0.0.1:${port}${path}`, { + method, + headers, + body, + signal: controller.signal, + }); + } finally { + clearTimeout(timer); + } + })(); const contentType = (response.headers.get('content-type') || '').toLowerCase(); if (contentType.includes('application/json')) { @@ -1350,6 +1355,8 @@ function registerGatewayHandlers( * For checking package status and channel configuration */ function registerOpenClawHandlers(gatewayManager: GatewayManager): void { + const forceRestartChannels = new Set(['dingtalk', 'wecom', 'feishu', 'whatsapp']); + const scheduleGatewayChannelRestart = (reason: string): void => { if (gatewayManager.getStatus().state !== 'stopped') { logger.info(`Scheduling Gateway restart after ${reason}`); @@ -1359,6 +1366,20 @@ function registerOpenClawHandlers(gatewayManager: GatewayManager): void { } }; + const scheduleGatewayChannelSaveRefresh = (channelType: string, reason: string): void => { + if (gatewayManager.getStatus().state === 'stopped') { + logger.info(`Gateway is stopped; skip immediate refresh after ${reason}`); + return; + } + if (forceRestartChannels.has(channelType)) { + logger.info(`Scheduling Gateway restart after ${reason}`); + gatewayManager.debouncedRestart(); + return; + } + logger.info(`Scheduling Gateway reload after ${reason}`); + gatewayManager.debouncedReload(); + }; + // ── Generic plugin installer with version-aware upgrades ───────── function readPluginVersion(pkgJsonPath: string): string | null { @@ -1517,7 +1538,7 @@ function registerOpenClawHandlers(gatewayManager: GatewayManager): void { }; } await saveChannelConfig(channelType, config); - scheduleGatewayChannelRestart(`channel:saveConfig (${channelType})`); + scheduleGatewayChannelSaveRefresh(channelType, `channel:saveConfig (${channelType})`); return { success: true, pluginInstalled: installResult.installed, @@ -1533,7 +1554,7 @@ function registerOpenClawHandlers(gatewayManager: GatewayManager): void { }; } await saveChannelConfig(channelType, config); - scheduleGatewayChannelRestart(`channel:saveConfig (${channelType})`); + scheduleGatewayChannelSaveRefresh(channelType, `channel:saveConfig (${channelType})`); return { success: true, pluginInstalled: installResult.installed, @@ -1549,12 +1570,7 @@ function registerOpenClawHandlers(gatewayManager: GatewayManager): void { }; } await saveChannelConfig(channelType, config); - if (gatewayManager.getStatus().state !== 'stopped') { - logger.info(`Scheduling Gateway reload after channel:saveConfig (${channelType})`); - gatewayManager.debouncedReload(); - } else { - logger.info(`Gateway is stopped; skip immediate reload after channel:saveConfig (${channelType})`); - } + scheduleGatewayChannelSaveRefresh(channelType, `channel:saveConfig (${channelType})`); return { success: true, pluginInstalled: installResult.installed, @@ -1570,7 +1586,7 @@ function registerOpenClawHandlers(gatewayManager: GatewayManager): void { }; } await saveChannelConfig(channelType, config); - scheduleGatewayChannelRestart(`channel:saveConfig (${channelType})`); + scheduleGatewayChannelSaveRefresh(channelType, `channel:saveConfig (${channelType})`); return { success: true, pluginInstalled: installResult.installed, @@ -1578,7 +1594,7 @@ function registerOpenClawHandlers(gatewayManager: GatewayManager): void { }; } await saveChannelConfig(channelType, config); - scheduleGatewayChannelRestart(`channel:saveConfig (${channelType})`); + scheduleGatewayChannelSaveRefresh(channelType, `channel:saveConfig (${channelType})`); return { success: true }; } catch (error) { console.error('Failed to save channel config:', error); @@ -1777,10 +1793,8 @@ function registerProviderHandlers(gatewayManager: GatewayManager): void { }; // Listen for OAuth success to automatically restart the Gateway with new tokens/configs. - // Use a longer debounce (8s) so that provider:setDefault — which writes the full config - // and then calls debouncedRestart(2s) — has time to fire and coalesce into a single - // restart. Without this, the OAuth restart fires first with stale config, and the - // subsequent provider:setDefault restart is deferred and dropped. + // Keep a longer debounce (8s) so provider config writes and OAuth token persistence + // can settle before applying the process-level refresh. deviceOAuthManager.on('oauth:success', ({ provider, accountId }) => { logger.info(`[IPC] Scheduling Gateway restart after ${provider} OAuth success for ${accountId}...`); gatewayManager.debouncedRestart(8000); diff --git a/electron/services/providers/provider-runtime-sync.ts b/electron/services/providers/provider-runtime-sync.ts index 45b3b4150..2ad2d5543 100644 --- a/electron/services/providers/provider-runtime-sync.ts +++ b/electron/services/providers/provider-runtime-sync.ts @@ -160,10 +160,12 @@ export async function getProviderFallbackModelRefs(config: ProviderConfig): Prom return results; } -function scheduleGatewayRestart( +type GatewayRefreshMode = 'reload' | 'restart'; + +function scheduleGatewayRefresh( gatewayManager: GatewayManager | undefined, message: string, - options?: { delayMs?: number; onlyIfRunning?: boolean }, + options?: { delayMs?: number; onlyIfRunning?: boolean; mode?: GatewayRefreshMode }, ): void { if (!gatewayManager) { return; @@ -174,7 +176,11 @@ function scheduleGatewayRestart( } logger.info(message); - gatewayManager.debouncedRestart(options?.delayMs); + if (options?.mode === 'restart') { + gatewayManager.debouncedRestart(options?.delayMs); + return; + } + gatewayManager.debouncedReload(options?.delayMs); } export async function syncProviderApiKeyToRuntime( @@ -340,9 +346,9 @@ export async function syncSavedProviderToRuntime( return; } - scheduleGatewayRestart( + scheduleGatewayRefresh( gatewayManager, - `Scheduling Gateway restart after saving provider "${context.runtimeProviderKey}" config`, + `Scheduling Gateway reload after saving provider "${context.runtimeProviderKey}" config`, ); } @@ -381,9 +387,9 @@ export async function syncUpdatedProviderToRuntime( } } - scheduleGatewayRestart( + scheduleGatewayRefresh( gatewayManager, - `Scheduling Gateway restart after updating provider "${ock}" config`, + `Scheduling Gateway reload after updating provider "${ock}" config`, ); } @@ -400,9 +406,10 @@ export async function syncDeletedProviderToRuntime( const ock = runtimeProviderKey ?? await resolveRuntimeProviderKey({ ...provider, id: providerId }); await removeProviderFromOpenClaw(ock); - scheduleGatewayRestart( + scheduleGatewayRefresh( gatewayManager, `Scheduling Gateway restart after deleting provider "${ock}"`, + { mode: 'restart' }, ); } @@ -487,9 +494,9 @@ export async function syncDefaultProviderToRuntime( await setOpenClawDefaultModel(browserOAuthRuntimeProvider, modelOverride, fallbackModels); logger.info(`Configured openclaw.json for browser OAuth provider "${provider.id}"`); - scheduleGatewayRestart( + scheduleGatewayRefresh( gatewayManager, - `Scheduling Gateway restart after provider switch to "${browserOAuthRuntimeProvider}"`, + `Scheduling Gateway reload after provider switch to "${browserOAuthRuntimeProvider}"`, ); return; } @@ -548,9 +555,9 @@ export async function syncDefaultProviderToRuntime( }); } - scheduleGatewayRestart( + scheduleGatewayRefresh( gatewayManager, - `Scheduling Gateway restart after provider switch to "${ock}"`, + `Scheduling Gateway reload after provider switch to "${ock}"`, { onlyIfRunning: true }, ); } diff --git a/electron/utils/logger.ts b/electron/utils/logger.ts index 576905cce..056f77dad 100644 --- a/electron/utils/logger.ts +++ b/electron/utils/logger.ts @@ -10,7 +10,7 @@ import { app } from 'electron'; import { join } from 'path'; import { existsSync, mkdirSync, appendFileSync } from 'fs'; -import { appendFile, readFile, readdir, stat } from 'fs/promises'; +import { appendFile, open, readdir, stat } from 'fs/promises'; /** * Log levels @@ -230,11 +230,33 @@ export function getRecentLogs(count?: number, minLevel?: LogLevel): string[] { */ export async function readLogFile(tailLines = 200): Promise { if (!logFilePath) return '(No log file found)'; + const safeTailLines = Math.max(1, Math.floor(tailLines)); try { - const content = await readFile(logFilePath, 'utf-8'); - const lines = content.split('\n'); - if (lines.length <= tailLines) return content; - return lines.slice(-tailLines).join('\n'); + const file = await open(logFilePath, 'r'); + try { + const fileStat = await file.stat(); + if (fileStat.size === 0) return ''; + + const chunkSize = 64 * 1024; + let position = fileStat.size; + let content = ''; + let lineCount = 0; + + while (position > 0 && lineCount <= safeTailLines) { + const bytesToRead = Math.min(chunkSize, position); + position -= bytesToRead; + const buffer = Buffer.allocUnsafe(bytesToRead); + await file.read(buffer, 0, bytesToRead, position); + content = `${buffer.toString('utf-8')}${content}`; + lineCount = content.split('\n').length - 1; + } + + const lines = content.split('\n'); + if (lines.length <= safeTailLines) return content; + return lines.slice(-safeTailLines).join('\n'); + } finally { + await file.close(); + } } catch (err) { return `(Failed to read log file: ${err})`; } diff --git a/electron/utils/telemetry.ts b/electron/utils/telemetry.ts index 3405e1157..526d1bc45 100644 --- a/electron/utils/telemetry.ts +++ b/electron/utils/telemetry.ts @@ -11,6 +11,15 @@ const TELEMETRY_SHUTDOWN_TIMEOUT_MS = 1500; let posthogClient: PostHog | null = null; let distinctId: string = ''; +function getCommonProperties(): Record { + return { + $app_version: app.getVersion(), + $os: process.platform, + os_tag: process.platform, + arch: process.arch, + }; +} + function isIgnorablePostHogShutdownError(error: unknown): boolean { if (!(error instanceof Error)) { return false; @@ -54,12 +63,7 @@ export async function initTelemetry(): Promise { logger.debug(`Generated new machine ID for telemetry: ${distinctId}`); } - // Common properties for all events - const properties = { - $app_version: app.getVersion(), - $os: process.platform, - arch: process.arch, - }; + const properties = getCommonProperties(); // Check if this is a new installation const hasReportedInstall = await getSetting('hasReportedInstall'); @@ -86,6 +90,29 @@ export async function initTelemetry(): Promise { } } +export function trackMetric(event: string, properties: Record = {}): void { + logger.info(`[metric] ${event}`, properties); +} + +export function captureTelemetryEvent(event: string, properties: Record = {}): void { + if (!posthogClient || !distinctId) { + return; + } + + try { + posthogClient.capture({ + distinctId, + event, + properties: { + ...getCommonProperties(), + ...properties, + }, + }); + } catch (error) { + logger.debug(`Failed to capture telemetry event "${event}":`, error); + } +} + /** * Best-effort telemetry shutdown that never blocks app exit on network issues. */ diff --git a/package.json b/package.json index a08617f68..5b077d00b 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,9 @@ "lint": "eslint . --fix", "typecheck": "tsc --noEmit", "test": "vitest run", + "comms:replay": "node scripts/comms/replay.mjs", + "comms:baseline": "node scripts/comms/baseline.mjs", + "comms:compare": "node scripts/comms/compare.mjs", "uv:download": "zx scripts/download-bundled-uv.mjs", "uv:download:mac": "zx scripts/download-bundled-uv.mjs --platform=mac", "uv:download:win": "zx scripts/download-bundled-uv.mjs --platform=win", diff --git a/scripts/comms/baseline.mjs b/scripts/comms/baseline.mjs new file mode 100644 index 000000000..3bae24075 --- /dev/null +++ b/scripts/comms/baseline.mjs @@ -0,0 +1,22 @@ +import { mkdir, readFile, writeFile } from 'node:fs/promises'; +import path from 'node:path'; +import process from 'node:process'; + +const ROOT = path.resolve(path.dirname(new URL(import.meta.url).pathname), '..', '..'); +const CURRENT_FILE = path.join(ROOT, 'artifacts/comms/current-metrics.json'); +const BASELINE_DIR = path.join(ROOT, 'scripts/comms/baseline'); +const BASELINE_FILE = path.join(BASELINE_DIR, 'metrics.baseline.json'); + +async function main() { + const raw = await readFile(CURRENT_FILE, 'utf8'); + const current = JSON.parse(raw); + + await mkdir(BASELINE_DIR, { recursive: true }); + await writeFile(BASELINE_FILE, JSON.stringify(current, null, 2)); + console.log(`Updated comms baseline: ${BASELINE_FILE}`); +} + +main().catch((error) => { + console.error('[comms:baseline] failed:', error); + process.exitCode = 1; +}); diff --git a/scripts/comms/baseline/metrics.baseline.json b/scripts/comms/baseline/metrics.baseline.json new file mode 100644 index 000000000..de097a627 --- /dev/null +++ b/scripts/comms/baseline/metrics.baseline.json @@ -0,0 +1,126 @@ +{ + "generated_at": "2026-03-14T15:02:39.817Z", + "scenario": "all", + "scenarios": { + "gateway-restart-during-run": { + "duplicate_event_rate": 0, + "event_fanout_ratio": 1, + "history_inflight_max": 1, + "history_load_qps": 0.4166666666666667, + "rpc_p50_ms": 240, + "rpc_p95_ms": 240, + "rpc_timeout_rate": 0, + "gateway_reconnect_count": 1, + "message_loss_count": 0, + "message_order_violation_count": 0, + "_meta": { + "duration_sec": 2.4, + "total_gateway_events": 4, + "unique_gateway_events": 4, + "total_rpc_calls": 1 + } + }, + "happy-path-chat": { + "duplicate_event_rate": 0, + "event_fanout_ratio": 1, + "history_inflight_max": 1, + "history_load_qps": 0.2857142857142857, + "rpc_p50_ms": 180, + "rpc_p95_ms": 180, + "rpc_timeout_rate": 0, + "gateway_reconnect_count": 0, + "message_loss_count": 0, + "message_order_violation_count": 0, + "_meta": { + "duration_sec": 3.5, + "total_gateway_events": 3, + "unique_gateway_events": 3, + "total_rpc_calls": 1 + } + }, + "history-overlap-guard": { + "duplicate_event_rate": 0, + "event_fanout_ratio": 1, + "history_inflight_max": 1, + "history_load_qps": 1, + "rpc_p50_ms": 95, + "rpc_p95_ms": 95, + "rpc_timeout_rate": 0, + "gateway_reconnect_count": 0, + "message_loss_count": 0, + "message_order_violation_count": 0, + "_meta": { + "duration_sec": 2, + "total_gateway_events": 2, + "unique_gateway_events": 2, + "total_rpc_calls": 1 + } + }, + "invalid-config-patch-recovered": { + "duplicate_event_rate": 0, + "event_fanout_ratio": 1, + "history_inflight_max": 1, + "history_load_qps": 0.4166666666666667, + "rpc_p50_ms": 110, + "rpc_p95_ms": 130, + "rpc_timeout_rate": 0, + "gateway_reconnect_count": 1, + "message_loss_count": 0, + "message_order_violation_count": 0, + "_meta": { + "duration_sec": 2.4, + "total_gateway_events": 3, + "unique_gateway_events": 3, + "total_rpc_calls": 2 + } + }, + "multi-agent-channel-switch": { + "duplicate_event_rate": 0, + "event_fanout_ratio": 1, + "history_inflight_max": 0, + "history_load_qps": 0, + "rpc_p50_ms": 210, + "rpc_p95_ms": 240, + "rpc_timeout_rate": 0, + "gateway_reconnect_count": 0, + "message_loss_count": 0, + "message_order_violation_count": 0, + "_meta": { + "duration_sec": 2.1, + "total_gateway_events": 5, + "unique_gateway_events": 5, + "total_rpc_calls": 2 + } + }, + "network-degraded": { + "duplicate_event_rate": 0, + "event_fanout_ratio": 1, + "history_inflight_max": 1, + "history_load_qps": 0.35714285714285715, + "rpc_p50_ms": 420, + "rpc_p95_ms": 820, + "rpc_timeout_rate": 0, + "gateway_reconnect_count": 1, + "message_loss_count": 0, + "message_order_violation_count": 0, + "_meta": { + "duration_sec": 2.8, + "total_gateway_events": 3, + "unique_gateway_events": 3, + "total_rpc_calls": 2 + } + } + }, + "aggregate": { + "duplicate_event_rate": 0, + "event_fanout_ratio": 1, + "history_inflight_max": 1, + "history_load_qps": 0.41269841269841273, + "rpc_p50_ms": 209.16666666666666, + "rpc_p95_ms": 284.1666666666667, + "rpc_timeout_rate": 0, + "gateway_reconnect_count": 3, + "message_loss_count": 0, + "message_order_violation_count": 0 + } +} \ No newline at end of file diff --git a/scripts/comms/compare.mjs b/scripts/comms/compare.mjs new file mode 100644 index 000000000..5283408e5 --- /dev/null +++ b/scripts/comms/compare.mjs @@ -0,0 +1,122 @@ +import { mkdir, readFile, writeFile } from 'node:fs/promises'; +import path from 'node:path'; +import process from 'node:process'; + +const ROOT = path.resolve(path.dirname(new URL(import.meta.url).pathname), '..', '..'); +const CURRENT_FILE = path.join(ROOT, 'artifacts/comms/current-metrics.json'); +const BASELINE_FILE = path.join(ROOT, 'scripts/comms/baseline/metrics.baseline.json'); +const OUTPUT_DIR = path.join(ROOT, 'artifacts/comms'); +const REPORT_FILE = path.join(OUTPUT_DIR, 'compare-report.md'); + +const HARD_THRESHOLDS = { + duplicate_event_rate: 0.005, + event_fanout_ratio: 1.2, + history_inflight_max: 1, + rpc_timeout_rate: 0.01, + message_loss_count: 0, + message_order_violation_count: 0, +}; + +const RELATIVE_THRESHOLDS = { + history_load_qps: 0.10, + rpc_p95_ms: 0.15, +}; + +const REQUIRED_SCENARIOS = [ + 'gateway-restart-during-run', + 'happy-path-chat', + 'history-overlap-guard', + 'invalid-config-patch-recovered', + 'multi-agent-channel-switch', + 'network-degraded', +]; + +function ratioDelta(current, baseline) { + if (!Number.isFinite(baseline) || baseline === 0) return current === 0 ? 0 : Infinity; + return (current - baseline) / baseline; +} + +function fmtPercent(value) { + return `${(value * 100).toFixed(2)}%`; +} + +function fmtNumber(value) { + return Number.isFinite(value) ? Number(value).toFixed(4) : String(value); +} + +export function evaluateReport(current, baseline) { + const c = current.aggregate ?? {}; + const b = baseline.aggregate ?? {}; + const scenarios = current.scenarios ?? {}; + const failures = []; + const rows = []; + + for (const scenario of REQUIRED_SCENARIOS) { + if (!scenarios[scenario]) { + failures.push(`missing scenario: ${scenario}`); + rows.push(`| scenario:${scenario} | missing | required | FAIL |`); + continue; + } + const scenarioMetrics = scenarios[scenario]; + for (const [metric, threshold] of Object.entries(HARD_THRESHOLDS)) { + const cv = Number(scenarioMetrics[metric] ?? 0); + const pass = cv <= threshold; + if (!pass) failures.push(`scenario:${scenario} ${metric}=${cv} > ${threshold}`); + rows.push(`| ${scenario}.${metric} | ${fmtNumber(cv)} | <= ${threshold} | ${pass ? 'PASS' : 'FAIL'} |`); + } + } + + for (const [metric, threshold] of Object.entries(HARD_THRESHOLDS)) { + const cv = Number(c[metric] ?? 0); + const pass = cv <= threshold; + if (!pass) failures.push(`${metric}=${cv} > ${threshold}`); + rows.push(`| ${metric} | ${fmtNumber(cv)} | <= ${threshold} | ${pass ? 'PASS' : 'FAIL'} |`); + } + + for (const [metric, maxIncrease] of Object.entries(RELATIVE_THRESHOLDS)) { + const cv = Number(c[metric] ?? 0); + const bv = Number(b[metric] ?? 0); + const delta = ratioDelta(cv, bv); + const pass = delta <= maxIncrease; + if (!pass) failures.push(`${metric} delta=${delta} > ${maxIncrease}`); + rows.push(`| ${metric} | ${fmtNumber(cv)} (baseline ${fmtNumber(bv)}) | delta <= ${fmtPercent(maxIncrease)} | ${pass ? 'PASS' : 'FAIL'} (${fmtPercent(delta)}) |`); + } + + return { failures, rows }; +} + +export async function main() { + const current = JSON.parse(await readFile(CURRENT_FILE, 'utf8')); + const baseline = JSON.parse(await readFile(BASELINE_FILE, 'utf8')); + const { failures, rows } = evaluateReport(current, baseline); + + const report = [ + '# Comms Regression Report', + '', + `- Generated at: ${new Date().toISOString()}`, + `- Result: ${failures.length === 0 ? 'PASS' : 'FAIL'}`, + '', + '| Metric | Current | Threshold | Status |', + '|---|---:|---:|---|', + ...rows, + '', + ].join('\n'); + + await mkdir(OUTPUT_DIR, { recursive: true }); + await writeFile(REPORT_FILE, report); + console.log(report); + console.log(`\nWrote comparison report to ${REPORT_FILE}`); + + if (failures.length > 0) { + console.error('\nThreshold failures:\n- ' + failures.join('\n- ')); + process.exitCode = 1; + } +} + +const isEntrypoint = process.argv[1] && path.resolve(process.argv[1]) === path.resolve(new URL(import.meta.url).pathname); +if (isEntrypoint) { + main().catch((error) => { + console.error('[comms:compare] failed:', error); + process.exitCode = 1; + }); +} diff --git a/scripts/comms/datasets/gateway-restart-during-run.jsonl b/scripts/comms/datasets/gateway-restart-during-run.jsonl new file mode 100644 index 000000000..1f5591b05 --- /dev/null +++ b/scripts/comms/datasets/gateway-restart-during-run.jsonl @@ -0,0 +1,9 @@ +{"ts":0,"type":"gateway_event","runId":"run-restart-1","sessionKey":"agent:main:session-restart","seq":1,"state":"started","fanout":1} +{"ts":0.3,"type":"rpc","method":"chat.send","latencyMs":240,"timeout":false} +{"ts":0.6,"type":"gateway_reconnect"} +{"ts":0.8,"type":"gateway_event","runId":"run-restart-1","sessionKey":"agent:main:session-restart","seq":2,"state":"delta","fanout":1} +{"ts":1.1,"type":"gateway_event","runId":"run-restart-1","sessionKey":"agent:main:session-restart","seq":3,"state":"delta","fanout":1} +{"ts":1.4,"type":"history_load","sessionKey":"agent:main:session-restart","action":"start"} +{"ts":1.6,"type":"history_load","sessionKey":"agent:main:session-restart","action":"end"} +{"ts":2.0,"type":"gateway_event","runId":"run-restart-1","sessionKey":"agent:main:session-restart","seq":4,"state":"final","fanout":1} +{"ts":2.4,"type":"message","lost":false,"orderViolation":false} diff --git a/scripts/comms/datasets/happy-path-chat.jsonl b/scripts/comms/datasets/happy-path-chat.jsonl new file mode 100644 index 000000000..a208fde74 --- /dev/null +++ b/scripts/comms/datasets/happy-path-chat.jsonl @@ -0,0 +1,7 @@ +{"ts":0,"type":"gateway_event","runId":"run-happy-1","sessionKey":"agent:main:session-happy","seq":1,"state":"started","fanout":1} +{"ts":1,"type":"gateway_event","runId":"run-happy-1","sessionKey":"agent:main:session-happy","seq":2,"state":"delta","fanout":1} +{"ts":2,"type":"history_load","sessionKey":"agent:main:session-happy","action":"start"} +{"ts":2.2,"type":"history_load","sessionKey":"agent:main:session-happy","action":"end"} +{"ts":2.5,"type":"rpc","method":"chat.send","latencyMs":180,"timeout":false} +{"ts":3.0,"type":"gateway_event","runId":"run-happy-1","sessionKey":"agent:main:session-happy","seq":3,"state":"final","fanout":1} +{"ts":3.5,"type":"message","lost":false,"orderViolation":false} diff --git a/scripts/comms/datasets/history-overlap-guard.jsonl b/scripts/comms/datasets/history-overlap-guard.jsonl new file mode 100644 index 000000000..d90707a28 --- /dev/null +++ b/scripts/comms/datasets/history-overlap-guard.jsonl @@ -0,0 +1,8 @@ +{"ts":0,"type":"gateway_event","runId":"run-history-1","sessionKey":"agent:main:session-history","seq":1,"state":"started","fanout":1} +{"ts":0.2,"type":"history_load","sessionKey":"agent:main:session-history","action":"start"} +{"ts":0.4,"type":"history_load","sessionKey":"agent:main:session-history","action":"end"} +{"ts":0.8,"type":"history_load","sessionKey":"agent:main:session-history","action":"start"} +{"ts":1.0,"type":"history_load","sessionKey":"agent:main:session-history","action":"end"} +{"ts":1.4,"type":"rpc","method":"chat.history","latencyMs":95,"timeout":false} +{"ts":1.8,"type":"gateway_event","runId":"run-history-1","sessionKey":"agent:main:session-history","seq":2,"state":"final","fanout":1} +{"ts":2.0,"type":"message","lost":false,"orderViolation":false} diff --git a/scripts/comms/datasets/invalid-config-patch-recovered.jsonl b/scripts/comms/datasets/invalid-config-patch-recovered.jsonl new file mode 100644 index 000000000..ce513566e --- /dev/null +++ b/scripts/comms/datasets/invalid-config-patch-recovered.jsonl @@ -0,0 +1,9 @@ +{"ts":0,"type":"gateway_event","runId":"run-invalid-1","sessionKey":"agent:main:session-invalid","seq":1,"state":"started","fanout":1} +{"ts":0.2,"type":"rpc","method":"config.patch","latencyMs":110,"timeout":false} +{"ts":0.4,"type":"rpc","method":"config.patch","latencyMs":130,"timeout":false} +{"ts":0.7,"type":"gateway_reconnect"} +{"ts":1.0,"type":"history_load","sessionKey":"agent:main:session-invalid","action":"start"} +{"ts":1.3,"type":"history_load","sessionKey":"agent:main:session-invalid","action":"end"} +{"ts":1.7,"type":"gateway_event","runId":"run-invalid-1","sessionKey":"agent:main:session-invalid","seq":2,"state":"delta","fanout":1} +{"ts":2.1,"type":"gateway_event","runId":"run-invalid-1","sessionKey":"agent:main:session-invalid","seq":3,"state":"final","fanout":1} +{"ts":2.4,"type":"message","lost":false,"orderViolation":false} diff --git a/scripts/comms/datasets/multi-agent-channel-switch.jsonl b/scripts/comms/datasets/multi-agent-channel-switch.jsonl new file mode 100644 index 000000000..1a058e553 --- /dev/null +++ b/scripts/comms/datasets/multi-agent-channel-switch.jsonl @@ -0,0 +1,8 @@ +{"ts":0,"type":"gateway_event","runId":"run-a-main","sessionKey":"agent:main:session-1","seq":1,"state":"started","fanout":1} +{"ts":0.2,"type":"rpc","method":"chat.send","latencyMs":210,"timeout":false} +{"ts":0.5,"type":"gateway_event","runId":"run-a-main","sessionKey":"agent:main:session-1","seq":2,"state":"delta","fanout":1} +{"ts":0.8,"type":"gateway_event","runId":"run-a-team","sessionKey":"agent:team-a:session-2","seq":1,"state":"started","fanout":1} +{"ts":1.1,"type":"rpc","method":"chat.send","latencyMs":240,"timeout":false} +{"ts":1.4,"type":"gateway_event","runId":"run-a-main","sessionKey":"agent:main:session-1","seq":3,"state":"final","fanout":1} +{"ts":1.8,"type":"gateway_event","runId":"run-a-team","sessionKey":"agent:team-a:session-2","seq":2,"state":"final","fanout":1} +{"ts":2.1,"type":"message","lost":false,"orderViolation":false} diff --git a/scripts/comms/datasets/network-degraded.jsonl b/scripts/comms/datasets/network-degraded.jsonl new file mode 100644 index 000000000..9177cb35a --- /dev/null +++ b/scripts/comms/datasets/network-degraded.jsonl @@ -0,0 +1,9 @@ +{"ts":0,"type":"gateway_event","runId":"run-net-1","sessionKey":"agent:main:session-net","seq":1,"state":"started","fanout":1} +{"ts":0.2,"type":"rpc","method":"chat.send","latencyMs":420,"timeout":false} +{"ts":0.8,"type":"rpc","method":"chat.history","latencyMs":820,"timeout":false} +{"ts":1.0,"type":"history_load","sessionKey":"agent:main:session-net","action":"start"} +{"ts":1.5,"type":"history_load","sessionKey":"agent:main:session-net","action":"end"} +{"ts":1.6,"type":"gateway_reconnect"} +{"ts":2.1,"type":"gateway_event","runId":"run-net-1","sessionKey":"agent:main:session-net","seq":2,"state":"delta","fanout":1} +{"ts":2.3,"type":"gateway_event","runId":"run-net-1","sessionKey":"agent:main:session-net","seq":3,"state":"final","fanout":1} +{"ts":2.8,"type":"message","lost":false,"orderViolation":false} diff --git a/scripts/comms/replay.mjs b/scripts/comms/replay.mjs new file mode 100644 index 000000000..c6697b7c4 --- /dev/null +++ b/scripts/comms/replay.mjs @@ -0,0 +1,176 @@ +import { mkdir, readdir, readFile, writeFile } from 'node:fs/promises'; +import path from 'node:path'; +import process from 'node:process'; + +const ROOT = path.resolve(path.dirname(new URL(import.meta.url).pathname), '..', '..'); +const DATASET_DIR = path.join(ROOT, 'scripts/comms/datasets'); +const OUTPUT_DIR = path.join(ROOT, 'artifacts/comms'); +const OUTPUT_FILE = path.join(OUTPUT_DIR, 'current-metrics.json'); + +export function percentile(values, p) { + if (values.length === 0) return 0; + const sorted = [...values].sort((a, b) => a - b); + const idx = Math.min(sorted.length - 1, Math.ceil((p / 100) * sorted.length) - 1); + return sorted[idx]; +} + +export function dedupeKey(event) { + if (event.type !== 'gateway_event') return null; + const runId = event.runId ?? ''; + const sessionKey = event.sessionKey ?? ''; + const seq = event.seq ?? ''; + const state = event.state ?? ''; + if (!runId && !sessionKey && !seq && !state) return null; + return `${runId}|${sessionKey}|${seq}|${state}`; +} + +export function calculateScenarioMetrics(events) { + let totalGatewayEvents = 0; + let uniqueGatewayEvents = 0; + let fanoutTotal = 0; + let duplicateGatewayEvents = 0; + let gatewayReconnectCount = 0; + let messageLossCount = 0; + let messageOrderViolationCount = 0; + let rpcTimeoutCount = 0; + const rpcLatencies = []; + const dedupeSet = new Set(); + const historyInFlight = new Map(); + let historyInflightMax = 0; + let historyLoadCount = 0; + + const sorted = [...events].sort((a, b) => (a.ts ?? 0) - (b.ts ?? 0)); + const startTs = sorted.length > 0 ? (sorted[0].ts ?? 0) : 0; + const endTs = sorted.length > 0 ? (sorted[sorted.length - 1].ts ?? 0) : 0; + const durationSec = Math.max(1, endTs - startTs); + + for (const event of sorted) { + if (event.type === 'gateway_event') { + totalGatewayEvents += 1; + fanoutTotal += Number(event.fanout ?? 1); + const key = dedupeKey(event); + if (!key || !dedupeSet.has(key)) { + uniqueGatewayEvents += 1; + if (key) dedupeSet.add(key); + } else { + duplicateGatewayEvents += 1; + } + continue; + } + + if (event.type === 'history_load') { + const sessionKey = String(event.sessionKey ?? 'unknown'); + if (event.action === 'start') { + const next = (historyInFlight.get(sessionKey) ?? 0) + 1; + historyInFlight.set(sessionKey, next); + historyInflightMax = Math.max(historyInflightMax, next); + historyLoadCount += 1; + } else if (event.action === 'end') { + const current = historyInFlight.get(sessionKey) ?? 0; + historyInFlight.set(sessionKey, Math.max(0, current - 1)); + } + continue; + } + + if (event.type === 'rpc') { + const latency = Number(event.latencyMs ?? 0); + if (latency > 0) rpcLatencies.push(latency); + if (event.timeout === true) rpcTimeoutCount += 1; + continue; + } + + if (event.type === 'gateway_reconnect') { + gatewayReconnectCount += 1; + continue; + } + + if (event.type === 'message') { + if (event.lost === true) messageLossCount += 1; + if (event.orderViolation === true) messageOrderViolationCount += 1; + } + } + + return { + duplicate_event_rate: totalGatewayEvents > 0 ? duplicateGatewayEvents / totalGatewayEvents : 0, + event_fanout_ratio: uniqueGatewayEvents > 0 ? fanoutTotal / uniqueGatewayEvents : 0, + history_inflight_max: historyInflightMax, + history_load_qps: historyLoadCount / durationSec, + rpc_p50_ms: percentile(rpcLatencies, 50), + rpc_p95_ms: percentile(rpcLatencies, 95), + rpc_timeout_rate: rpcLatencies.length > 0 ? rpcTimeoutCount / rpcLatencies.length : 0, + gateway_reconnect_count: gatewayReconnectCount, + message_loss_count: messageLossCount, + message_order_violation_count: messageOrderViolationCount, + _meta: { + duration_sec: durationSec, + total_gateway_events: totalGatewayEvents, + unique_gateway_events: uniqueGatewayEvents, + total_rpc_calls: rpcLatencies.length, + }, + }; +} + +export function aggregateMetrics(metricsList) { + if (metricsList.length === 0) { + return calculateScenarioMetrics([]); + } + const sum = (key) => metricsList.reduce((acc, item) => acc + Number(item[key] ?? 0), 0); + return { + duplicate_event_rate: sum('duplicate_event_rate') / metricsList.length, + event_fanout_ratio: sum('event_fanout_ratio') / metricsList.length, + history_inflight_max: Math.max(...metricsList.map((m) => Number(m.history_inflight_max ?? 0))), + history_load_qps: sum('history_load_qps') / metricsList.length, + rpc_p50_ms: sum('rpc_p50_ms') / metricsList.length, + rpc_p95_ms: sum('rpc_p95_ms') / metricsList.length, + rpc_timeout_rate: sum('rpc_timeout_rate') / metricsList.length, + gateway_reconnect_count: Math.round(sum('gateway_reconnect_count')), + message_loss_count: Math.round(sum('message_loss_count')), + message_order_violation_count: Math.round(sum('message_order_violation_count')), + }; +} + +export async function loadScenario(fileName) { + const fullPath = path.join(DATASET_DIR, fileName); + const raw = await readFile(fullPath, 'utf8'); + const lines = raw.split('\n').map((line) => line.trim()).filter(Boolean); + return lines.map((line) => JSON.parse(line)); +} + +export async function main() { + const argScenario = process.argv.find((arg) => arg.startsWith('--scenario='))?.split('=')[1] ?? 'all'; + const files = (await readdir(DATASET_DIR)).filter((name) => name.endsWith('.jsonl')).sort(); + const selectedFiles = argScenario === 'all' + ? files + : files.filter((name) => name === `${argScenario}.jsonl`); + + if (selectedFiles.length === 0) { + throw new Error(`No dataset found for scenario "${argScenario}"`); + } + + const scenarios = {}; + for (const fileName of selectedFiles) { + const scenarioName = fileName.replace(/\.jsonl$/, ''); + const events = await loadScenario(fileName); + scenarios[scenarioName] = calculateScenarioMetrics(events); + } + + const aggregate = aggregateMetrics(Object.values(scenarios)); + const output = { + generated_at: new Date().toISOString(), + scenario: argScenario, + scenarios, + aggregate, + }; + + await mkdir(OUTPUT_DIR, { recursive: true }); + await writeFile(OUTPUT_FILE, JSON.stringify(output, null, 2)); + console.log(`Wrote comms replay metrics to ${OUTPUT_FILE}`); +} + +const isEntrypoint = process.argv[1] && path.resolve(process.argv[1]) === path.resolve(new URL(import.meta.url).pathname); +if (isEntrypoint) { + main().catch((error) => { + console.error('[comms:replay] failed:', error); + process.exitCode = 1; + }); +} diff --git a/src/lib/api-client.ts b/src/lib/api-client.ts index 31921dba3..516376555 100644 --- a/src/lib/api-client.ts +++ b/src/lib/api-client.ts @@ -551,6 +551,7 @@ export function createGatewayHttpTransportInvoker( if (typeof method !== 'string') { throw new Error('gateway:rpc requires method string'); } + validateGatewayRpcParams(method, params); const timeoutMs = typeof timeoutOverride === 'number' && timeoutOverride > 0 @@ -857,6 +858,7 @@ export function createGatewayWsTransportInvoker(options: GatewayWsTransportOptio if (typeof method !== 'string') { throw new Error('gateway:rpc requires method string'); } + validateGatewayRpcParams(method, params); const requestTimeoutMs = typeof timeoutOverride === 'number' && timeoutOverride > 0 @@ -887,6 +889,17 @@ export function createGatewayWsTransportInvoker(options: GatewayWsTransportOptio }; } +function validateGatewayRpcParams(method: string, params: unknown): void { + if (method !== 'config.patch') return; + if (!params || typeof params !== 'object' || Array.isArray(params)) { + throw new Error('gateway:rpc config.patch requires object params'); + } + const patch = (params as Record).patch; + if (!patch || typeof patch !== 'object' || Array.isArray(patch)) { + throw new Error('gateway:rpc config.patch requires object patch'); + } +} + let defaultTransportsInitialized = false; export function initializeDefaultTransports(): void { diff --git a/src/lib/host-api.ts b/src/lib/host-api.ts index 8c68cdd42..b5a81bc48 100644 --- a/src/lib/host-api.ts +++ b/src/lib/host-api.ts @@ -129,6 +129,14 @@ function shouldFallbackToBrowser(message: string): boolean { || normalized.includes('window is not defined'); } +function allowLocalhostFallback(): boolean { + try { + return window.localStorage.getItem('clawx:allow-localhost-fallback') === '1'; + } catch { + return false; + } +} + export async function hostApiFetch(path: string, init?: RequestInit): Promise { const startedAt = Date.now(); const method = init?.method || 'GET'; @@ -160,6 +168,17 @@ export async function hostApiFetch(path: string, init?: RequestInit): Promise if (!shouldFallbackToBrowser(message)) { throw normalized; } + if (!allowLocalhostFallback()) { + trackUiEvent('hostapi.fetch_error', { + path, + method, + source: 'ipc-proxy', + durationMs: Date.now() - startedAt, + message: 'localhost fallback blocked by policy', + code: 'CHANNEL_UNAVAILABLE', + }); + throw normalized; + } } // Browser-only fallback (non-Electron environments). diff --git a/src/stores/chat.ts b/src/stores/chat.ts index 726a95ed1..c5591e9dc 100644 --- a/src/stores/chat.ts +++ b/src/stores/chat.ts @@ -140,6 +140,15 @@ let _historyPollTimer: ReturnType | null = null; // error (e.g. "terminated"), it may retry internally and recover. We wait // before committing the error to give the recovery path a chance. let _errorRecoveryTimer: ReturnType | null = null; +let _loadSessionsInFlight: Promise | null = null; +let _lastLoadSessionsAt = 0; +const _historyLoadInFlight = new Map>(); +const _lastHistoryLoadAtBySession = new Map(); +const SESSION_LOAD_MIN_INTERVAL_MS = 1_200; +const HISTORY_LOAD_MIN_INTERVAL_MS = 800; +const HISTORY_POLL_SILENCE_WINDOW_MS = 2_500; +const CHAT_EVENT_DEDUPE_TTL_MS = 30_000; +const _chatEventDedupe = new Map(); function clearErrorRecoveryTimer(): void { if (_errorRecoveryTimer) { @@ -155,6 +164,46 @@ function clearHistoryPoll(): void { } } +function pruneChatEventDedupe(now: number): void { + for (const [key, ts] of _chatEventDedupe.entries()) { + if (now - ts > CHAT_EVENT_DEDUPE_TTL_MS) { + _chatEventDedupe.delete(key); + } + } +} + +function buildChatEventDedupeKey(eventState: string, event: Record): string | null { + const runId = event.runId != null ? String(event.runId) : ''; + const sessionKey = event.sessionKey != null ? String(event.sessionKey) : ''; + const seq = event.seq != null ? String(event.seq) : ''; + if (runId || sessionKey || seq || eventState) { + return [runId, sessionKey, seq, eventState].join('|'); + } + const msg = (event.message && typeof event.message === 'object') + ? event.message as Record + : null; + if (msg) { + const messageId = msg.id != null ? String(msg.id) : ''; + const stopReason = msg.stopReason ?? msg.stop_reason; + if (messageId || stopReason) { + return `msg|${messageId}|${String(stopReason ?? '')}|${eventState}`; + } + } + return null; +} + +function isDuplicateChatEvent(eventState: string, event: Record): boolean { + const key = buildChatEventDedupeKey(eventState, event); + if (!key) return false; + const now = Date.now(); + pruneChatEventDedupe(now); + if (_chatEventDedupe.has(key)) { + return true; + } + _chatEventDedupe.set(key, now); + return false; +} + const DEFAULT_CANONICAL_PREFIX = 'agent:main'; const DEFAULT_SESSION_KEY = `${DEFAULT_CANONICAL_PREFIX}:main`; @@ -1040,118 +1089,139 @@ export const useChatStore = create((set, get) => ({ // ── Load sessions via sessions.list ── loadSessions: async () => { - try { - const data = await useGatewayStore.getState().rpc>('sessions.list', {}); - if (data) { - const rawSessions = Array.isArray(data.sessions) ? data.sessions : []; - const sessions: ChatSession[] = rawSessions.map((s: Record) => ({ - key: String(s.key || ''), - label: s.label ? String(s.label) : undefined, - displayName: s.displayName ? String(s.displayName) : undefined, - thinkingLevel: s.thinkingLevel ? String(s.thinkingLevel) : undefined, - model: s.model ? String(s.model) : undefined, - updatedAt: parseSessionUpdatedAtMs(s.updatedAt), - })).filter((s: ChatSession) => s.key); + const now = Date.now(); + if (_loadSessionsInFlight) { + await _loadSessionsInFlight; + return; + } + if (now - _lastLoadSessionsAt < SESSION_LOAD_MIN_INTERVAL_MS) { + return; + } - const canonicalBySuffix = new Map(); - for (const session of sessions) { - if (!session.key.startsWith('agent:')) continue; - const parts = session.key.split(':'); - if (parts.length < 3) continue; - const suffix = parts.slice(2).join(':'); - if (suffix && !canonicalBySuffix.has(suffix)) { - canonicalBySuffix.set(suffix, session.key); + _loadSessionsInFlight = (async () => { + try { + const data = await useGatewayStore.getState().rpc>('sessions.list', {}); + if (data) { + const rawSessions = Array.isArray(data.sessions) ? data.sessions : []; + const sessions: ChatSession[] = rawSessions.map((s: Record) => ({ + key: String(s.key || ''), + label: s.label ? String(s.label) : undefined, + displayName: s.displayName ? String(s.displayName) : undefined, + thinkingLevel: s.thinkingLevel ? String(s.thinkingLevel) : undefined, + model: s.model ? String(s.model) : undefined, + updatedAt: parseSessionUpdatedAtMs(s.updatedAt), + })).filter((s: ChatSession) => s.key); + + const canonicalBySuffix = new Map(); + for (const session of sessions) { + if (!session.key.startsWith('agent:')) continue; + const parts = session.key.split(':'); + if (parts.length < 3) continue; + const suffix = parts.slice(2).join(':'); + if (suffix && !canonicalBySuffix.has(suffix)) { + canonicalBySuffix.set(suffix, session.key); + } } - } - // Deduplicate: if both short and canonical existed, keep canonical only - const seen = new Set(); - const dedupedSessions = sessions.filter((s) => { - if (!s.key.startsWith('agent:') && canonicalBySuffix.has(s.key)) return false; - if (seen.has(s.key)) return false; - seen.add(s.key); - return true; - }); + // Deduplicate: if both short and canonical existed, keep canonical only + const seen = new Set(); + const dedupedSessions = sessions.filter((s) => { + if (!s.key.startsWith('agent:') && canonicalBySuffix.has(s.key)) return false; + if (seen.has(s.key)) return false; + seen.add(s.key); + return true; + }); - const { currentSessionKey, sessions: localSessions } = get(); - let nextSessionKey = currentSessionKey || DEFAULT_SESSION_KEY; - if (!nextSessionKey.startsWith('agent:')) { - const canonicalMatch = canonicalBySuffix.get(nextSessionKey); - if (canonicalMatch) { - nextSessionKey = canonicalMatch; + const { currentSessionKey, sessions: localSessions } = get(); + let nextSessionKey = currentSessionKey || DEFAULT_SESSION_KEY; + if (!nextSessionKey.startsWith('agent:')) { + const canonicalMatch = canonicalBySuffix.get(nextSessionKey); + if (canonicalMatch) { + nextSessionKey = canonicalMatch; + } } - } - if (!dedupedSessions.find((s) => s.key === nextSessionKey) && dedupedSessions.length > 0) { - // Preserve only locally-created pending sessions. On initial boot the - // default ghost key (`agent:main:main`) should yield to real history. - const hasLocalPendingSession = localSessions.some((session) => session.key === nextSessionKey); - if (!hasLocalPendingSession) { - nextSessionKey = dedupedSessions[0].key; + if (!dedupedSessions.find((s) => s.key === nextSessionKey) && dedupedSessions.length > 0) { + // Preserve only locally-created pending sessions. On initial boot the + // default ghost key (`agent:main:main`) should yield to real history. + const hasLocalPendingSession = localSessions.some((session) => session.key === nextSessionKey); + if (!hasLocalPendingSession) { + nextSessionKey = dedupedSessions[0].key; + } } - } - const sessionsWithCurrent = !dedupedSessions.find((s) => s.key === nextSessionKey) && nextSessionKey - ? [ - ...dedupedSessions, - { key: nextSessionKey, displayName: nextSessionKey }, - ] - : dedupedSessions; + const sessionsWithCurrent = !dedupedSessions.find((s) => s.key === nextSessionKey) && nextSessionKey + ? [ + ...dedupedSessions, + { key: nextSessionKey, displayName: nextSessionKey }, + ] + : dedupedSessions; - const discoveredActivity = Object.fromEntries( - sessionsWithCurrent - .filter((session) => typeof session.updatedAt === 'number' && Number.isFinite(session.updatedAt)) - .map((session) => [session.key, session.updatedAt!]), - ); - - set((state) => ({ - sessions: sessionsWithCurrent, - currentSessionKey: nextSessionKey, - currentAgentId: getAgentIdFromSessionKey(nextSessionKey), - sessionLastActivity: { - ...state.sessionLastActivity, - ...discoveredActivity, - }, - })); - - if (currentSessionKey !== nextSessionKey) { - get().loadHistory(); - } - - // Background: fetch first user message for every non-main session to populate labels upfront. - // Uses a small limit so it's cheap; runs in parallel and doesn't block anything. - const sessionsToLabel = sessionsWithCurrent.filter((s) => !s.key.endsWith(':main')); - if (sessionsToLabel.length > 0) { - void Promise.all( - sessionsToLabel.map(async (session) => { - try { - const r = await useGatewayStore.getState().rpc>( - 'chat.history', - { sessionKey: session.key, limit: 1000 }, - ); - const msgs = Array.isArray(r.messages) ? r.messages as RawMessage[] : []; - const firstUser = msgs.find((m) => m.role === 'user'); - const lastMsg = msgs[msgs.length - 1]; - set((s) => { - const next: Partial = {}; - if (firstUser) { - const labelText = getMessageText(firstUser.content).trim(); - if (labelText) { - const truncated = labelText.length > 50 ? `${labelText.slice(0, 50)}…` : labelText; - next.sessionLabels = { ...s.sessionLabels, [session.key]: truncated }; - } - } - if (lastMsg?.timestamp) { - next.sessionLastActivity = { ...s.sessionLastActivity, [session.key]: toMs(lastMsg.timestamp) }; - } - return next; - }); - } catch { /* ignore per-session errors */ } - }), + const discoveredActivity = Object.fromEntries( + sessionsWithCurrent + .filter((session) => typeof session.updatedAt === 'number' && Number.isFinite(session.updatedAt)) + .map((session) => [session.key, session.updatedAt!]), ); + + set((state) => ({ + sessions: sessionsWithCurrent, + currentSessionKey: nextSessionKey, + currentAgentId: getAgentIdFromSessionKey(nextSessionKey), + sessionLastActivity: { + ...state.sessionLastActivity, + ...discoveredActivity, + }, + })); + + if (currentSessionKey !== nextSessionKey) { + void get().loadHistory(); + } + + // Background: fetch first user message for every non-main session to populate labels upfront. + // Uses a small limit so it's cheap; runs in parallel and doesn't block anything. + const sessionsToLabel = sessionsWithCurrent.filter((s) => !s.key.endsWith(':main')); + if (sessionsToLabel.length > 0) { + void Promise.all( + sessionsToLabel.map(async (session) => { + try { + const r = await useGatewayStore.getState().rpc>( + 'chat.history', + { sessionKey: session.key, limit: 1000 }, + ); + const msgs = Array.isArray(r.messages) ? r.messages as RawMessage[] : []; + const firstUser = msgs.find((m) => m.role === 'user'); + const lastMsg = msgs[msgs.length - 1]; + set((s) => { + const next: Partial = {}; + if (firstUser) { + const labelText = getMessageText(firstUser.content).trim(); + if (labelText) { + const truncated = labelText.length > 50 ? `${labelText.slice(0, 50)}…` : labelText; + next.sessionLabels = { ...s.sessionLabels, [session.key]: truncated }; + } + } + if (lastMsg?.timestamp) { + next.sessionLastActivity = { ...s.sessionLastActivity, [session.key]: toMs(lastMsg.timestamp) }; + } + return next; + }); + } catch { + // ignore per-session errors + } + }), + ); + } } + } catch (err) { + console.warn('Failed to load sessions:', err); + } finally { + _lastLoadSessionsAt = Date.now(); } - } catch (err) { - console.warn('Failed to load sessions:', err); + })(); + + try { + await _loadSessionsInFlight; + } finally { + _loadSessionsInFlight = null; } }, @@ -1289,9 +1359,21 @@ export const useChatStore = create((set, get) => ({ loadHistory: async (quiet = false) => { const { currentSessionKey } = get(); + const existingLoad = _historyLoadInFlight.get(currentSessionKey); + if (existingLoad) { + await existingLoad; + return; + } + + const lastLoadAt = _lastHistoryLoadAtBySession.get(currentSessionKey) || 0; + if (quiet && Date.now() - lastLoadAt < HISTORY_LOAD_MIN_INTERVAL_MS) { + return; + } + if (!quiet) set({ loading: true, error: null }); - const applyLoadedMessages = (rawMessages: RawMessage[], thinkingLevel: string | null) => { + const loadPromise = (async () => { + const applyLoadedMessages = (rawMessages: RawMessage[], thinkingLevel: string | null) => { // Before filtering: attach images/files from tool_result messages to the next assistant message const messagesWithToolImages = enrichWithToolResultFiles(rawMessages); const filteredMessages = messagesWithToolImages.filter((msg) => !isToolResultRole(msg.role)); @@ -1395,22 +1477,31 @@ export const useChatStore = create((set, get) => ({ set({ sending: false, activeRunId: null, pendingFinal: false }); } } - }; + }; - try { - const data = await useGatewayStore.getState().rpc>( - 'chat.history', - { sessionKey: currentSessionKey, limit: 200 }, - ); - if (data) { - let rawMessages = Array.isArray(data.messages) ? data.messages as RawMessage[] : []; - const thinkingLevel = data.thinkingLevel ? String(data.thinkingLevel) : null; - if (rawMessages.length === 0 && isCronSessionKey(currentSessionKey)) { - rawMessages = await loadCronFallbackMessages(currentSessionKey, 200); + try { + const data = await useGatewayStore.getState().rpc>( + 'chat.history', + { sessionKey: currentSessionKey, limit: 200 }, + ); + if (data) { + let rawMessages = Array.isArray(data.messages) ? data.messages as RawMessage[] : []; + const thinkingLevel = data.thinkingLevel ? String(data.thinkingLevel) : null; + if (rawMessages.length === 0 && isCronSessionKey(currentSessionKey)) { + rawMessages = await loadCronFallbackMessages(currentSessionKey, 200); + } + + applyLoadedMessages(rawMessages, thinkingLevel); + } else { + const fallbackMessages = await loadCronFallbackMessages(currentSessionKey, 200); + if (fallbackMessages.length > 0) { + applyLoadedMessages(fallbackMessages, null); + } else { + set({ messages: [], loading: false }); + } } - - applyLoadedMessages(rawMessages, thinkingLevel); - } else { + } catch (err) { + console.warn('Failed to load chat history:', err); const fallbackMessages = await loadCronFallbackMessages(currentSessionKey, 200); if (fallbackMessages.length > 0) { applyLoadedMessages(fallbackMessages, null); @@ -1418,13 +1509,16 @@ export const useChatStore = create((set, get) => ({ set({ messages: [], loading: false }); } } - } catch (err) { - console.warn('Failed to load chat history:', err); - const fallbackMessages = await loadCronFallbackMessages(currentSessionKey, 200); - if (fallbackMessages.length > 0) { - applyLoadedMessages(fallbackMessages, null); - } else { - set({ messages: [], loading: false }); + })(); + + _historyLoadInFlight.set(currentSessionKey, loadPromise); + try { + await loadPromise; + } finally { + _lastHistoryLoadAtBySession.set(currentSessionKey, Date.now()); + const active = _historyLoadInFlight.get(currentSessionKey); + if (active === loadPromise) { + _historyLoadInFlight.delete(currentSessionKey); } } }, @@ -1501,6 +1595,10 @@ export const useChatStore = create((set, get) => ({ _historyPollTimer = setTimeout(pollHistory, POLL_INTERVAL); return; } + if (Date.now() - _lastChatEventAt < HISTORY_POLL_SILENCE_WINDOW_MS) { + _historyPollTimer = setTimeout(pollHistory, POLL_INTERVAL); + return; + } state.loadHistory(true); _historyPollTimer = setTimeout(pollHistory, POLL_INTERVAL); }; @@ -1635,6 +1733,8 @@ export const useChatStore = create((set, get) => ({ // Only process events for the active run (or if no active run set) if (activeRunId && runId && runId !== activeRunId) return; + if (isDuplicateChatEvent(eventState, event)) return; + _lastChatEventAt = Date.now(); // Defensive: if state is missing but we have a message, try to infer state. diff --git a/src/stores/gateway.ts b/src/stores/gateway.ts index d65e35e65..f07322836 100644 --- a/src/stores/gateway.ts +++ b/src/stores/gateway.ts @@ -10,6 +10,12 @@ import type { GatewayStatus } from '../types/gateway'; let gatewayInitPromise: Promise | null = null; let gatewayEventUnsubscribers: Array<() => void> | null = null; +const gatewayEventDedupe = new Map(); +const GATEWAY_EVENT_DEDUPE_TTL_MS = 30_000; +const LOAD_SESSIONS_MIN_INTERVAL_MS = 1_200; +const LOAD_HISTORY_MIN_INTERVAL_MS = 800; +let lastLoadSessionsAt = 0; +let lastLoadHistoryAt = 0; interface GatewayHealth { ok: boolean; @@ -32,6 +38,66 @@ interface GatewayState { clearError: () => void; } +function pruneGatewayEventDedupe(now: number): void { + for (const [key, ts] of gatewayEventDedupe) { + if (now - ts > GATEWAY_EVENT_DEDUPE_TTL_MS) { + gatewayEventDedupe.delete(key); + } + } +} + +function buildGatewayEventDedupeKey(event: Record): string | null { + const runId = event.runId != null ? String(event.runId) : ''; + const sessionKey = event.sessionKey != null ? String(event.sessionKey) : ''; + const seq = event.seq != null ? String(event.seq) : ''; + const state = event.state != null ? String(event.state) : ''; + if (runId || sessionKey || seq || state) { + return [runId, sessionKey, seq, state].join('|'); + } + const message = event.message; + if (message && typeof message === 'object') { + const msg = message as Record; + const messageId = msg.id != null ? String(msg.id) : ''; + const stopReason = msg.stopReason ?? msg.stop_reason; + if (messageId || stopReason) { + return `msg|${messageId}|${String(stopReason ?? '')}`; + } + } + return null; +} + +function shouldProcessGatewayEvent(event: Record): boolean { + const key = buildGatewayEventDedupeKey(event); + if (!key) return true; + const now = Date.now(); + pruneGatewayEventDedupe(now); + if (gatewayEventDedupe.has(key)) { + return false; + } + gatewayEventDedupe.set(key, now); + return true; +} + +function maybeLoadSessions( + state: { loadSessions: () => Promise }, + force = false, +): void { + const now = Date.now(); + if (!force && now - lastLoadSessionsAt < LOAD_SESSIONS_MIN_INTERVAL_MS) return; + lastLoadSessionsAt = now; + void state.loadSessions(); +} + +function maybeLoadHistory( + state: { loadHistory: (quiet?: boolean) => Promise }, + force = false, +): void { + const now = Date.now(); + if (!force && now - lastLoadHistoryAt < LOAD_HISTORY_MIN_INTERVAL_MS) return; + lastLoadHistoryAt = now; + void state.loadHistory(true); +} + function handleGatewayNotification(notification: { method?: string; params?: Record } | undefined): void { const payload = notification; if (!payload || payload.method !== 'agent' || !payload.params || typeof payload.params !== 'object') { @@ -53,11 +119,13 @@ function handleGatewayNotification(notification: { method?: string; params?: Rec state: p.state ?? data.state, message: p.message ?? data.message, }; - import('./chat') - .then(({ useChatStore }) => { - useChatStore.getState().handleChatEvent(normalizedEvent); - }) - .catch(() => {}); + if (shouldProcessGatewayEvent(normalizedEvent)) { + import('./chat') + .then(({ useChatStore }) => { + useChatStore.getState().handleChatEvent(normalizedEvent); + }) + .catch(() => {}); + } } const runId = p.runId ?? data.runId; @@ -71,7 +139,7 @@ function handleGatewayNotification(notification: { method?: string; params?: Rec resolvedSessionKey !== state.currentSessionKey || !state.sessions.some((session) => session.key === resolvedSessionKey); if (shouldRefreshSessions) { - void state.loadSessions(); + maybeLoadSessions(state, true); } state.handleChatEvent({ @@ -93,14 +161,14 @@ function handleGatewayNotification(notification: { method?: string; params?: Rec || !state.sessions.some((session) => session.key === resolvedSessionKey) ); if (shouldRefreshSessions) { - void state.loadSessions(); + maybeLoadSessions(state); } const matchesCurrentSession = resolvedSessionKey == null || resolvedSessionKey === state.currentSessionKey; const matchesActiveRun = runId != null && state.activeRunId != null && String(runId) === state.activeRunId; if (matchesCurrentSession || matchesActiveRun) { - void state.loadHistory(true); + maybeLoadHistory(state); } if ((matchesCurrentSession || matchesActiveRun) && state.sending) { useChatStore.setState({ @@ -123,15 +191,18 @@ function handleGatewayChatMessage(data: unknown): void { : chatData; if (payload.state) { + if (!shouldProcessGatewayEvent(payload)) return; useChatStore.getState().handleChatEvent(payload); return; } - useChatStore.getState().handleChatEvent({ + const normalized = { state: 'final', message: payload, runId: chatData.runId ?? payload.runId, - }); + }; + if (!shouldProcessGatewayEvent(normalized)) return; + useChatStore.getState().handleChatEvent(normalized); }).catch(() => {}); } diff --git a/src/stores/settings.ts b/src/stores/settings.ts index bfe9abe1b..d1403d555 100644 --- a/src/stores/settings.ts +++ b/src/stores/settings.ts @@ -161,7 +161,13 @@ export const useSettingsStore = create()( setAutoCheckUpdate: (autoCheckUpdate) => set({ autoCheckUpdate }), setAutoDownloadUpdate: (autoDownloadUpdate) => set({ autoDownloadUpdate }), setSidebarCollapsed: (sidebarCollapsed) => set({ sidebarCollapsed }), - setDevModeUnlocked: (devModeUnlocked) => set({ devModeUnlocked }), + setDevModeUnlocked: (devModeUnlocked) => { + set({ devModeUnlocked }); + void hostApiFetch('/api/settings/devModeUnlocked', { + method: 'PUT', + body: JSON.stringify({ value: devModeUnlocked }), + }).catch(() => { }); + }, markSetupComplete: () => set({ setupComplete: true }), resetSettings: () => set(defaultSettings), }), diff --git a/tests/unit/api-client.test.ts b/tests/unit/api-client.test.ts index aea400ac6..f432e2a18 100644 --- a/tests/unit/api-client.test.ts +++ b/tests/unit/api-client.test.ts @@ -244,4 +244,24 @@ describe('api-client', () => { expect(result.success).toBe(true); expect(result.result.channels[0].id).toBe('telegram-default'); }); + + it('rejects invalid config.patch params before gateway:httpProxy call', async () => { + const invoke = vi.mocked(window.electron.ipcRenderer.invoke); + const invoker = createGatewayHttpTransportInvoker(); + + await expect(invoker('gateway:rpc', ['config.patch', 'abc'])).rejects.toThrow( + 'gateway:rpc config.patch requires object params', + ); + expect(invoke).not.toHaveBeenCalled(); + }); + + it('rejects invalid config.patch.patch before gateway:httpProxy call', async () => { + const invoke = vi.mocked(window.electron.ipcRenderer.invoke); + const invoker = createGatewayHttpTransportInvoker(); + + await expect(invoker('gateway:rpc', ['config.patch', { patch: 'abc' }])).rejects.toThrow( + 'gateway:rpc config.patch requires object patch', + ); + expect(invoke).not.toHaveBeenCalled(); + }); }); diff --git a/tests/unit/comms-scripts.test.ts b/tests/unit/comms-scripts.test.ts new file mode 100644 index 000000000..51287eb68 --- /dev/null +++ b/tests/unit/comms-scripts.test.ts @@ -0,0 +1,62 @@ +import { describe, expect, it } from 'vitest'; + +import { + aggregateMetrics, + calculateScenarioMetrics, +} from '../../scripts/comms/replay.mjs'; +import { evaluateReport } from '../../scripts/comms/compare.mjs'; + +function buildPassingScenarioMetrics() { + return { + duplicate_event_rate: 0, + event_fanout_ratio: 1, + history_inflight_max: 1, + history_load_qps: 0.3, + rpc_p50_ms: 100, + rpc_p95_ms: 150, + rpc_timeout_rate: 0, + gateway_reconnect_count: 0, + message_loss_count: 0, + message_order_violation_count: 0, + }; +} + +describe('comms scripts', () => { + it('computes scenario metrics with dedupe and inflight tracking', () => { + const metrics = calculateScenarioMetrics([ + { ts: 0, type: 'gateway_event', runId: 'r1', sessionKey: 's1', seq: 1, state: 'started', fanout: 1 }, + { ts: 0.2, type: 'gateway_event', runId: 'r1', sessionKey: 's1', seq: 1, state: 'started', fanout: 1 }, + { ts: 0.5, type: 'history_load', sessionKey: 's1', action: 'start' }, + { ts: 0.7, type: 'history_load', sessionKey: 's1', action: 'end' }, + { ts: 1.0, type: 'rpc', latencyMs: 120, timeout: false }, + { ts: 1.5, type: 'message', lost: false, orderViolation: false }, + ]); + + expect(metrics.duplicate_event_rate).toBeCloseTo(0.5, 6); + expect(metrics.history_inflight_max).toBe(1); + expect(metrics.rpc_p95_ms).toBe(120); + }); + + it('aggregates multiple scenario metrics deterministically', () => { + const aggregate = aggregateMetrics([ + { ...buildPassingScenarioMetrics(), rpc_p95_ms: 200 }, + { ...buildPassingScenarioMetrics(), rpc_p95_ms: 400 }, + ]); + expect(aggregate.rpc_p95_ms).toBe(300); + expect(aggregate.history_inflight_max).toBe(1); + }); + + it('fails report evaluation when required scenarios are missing', () => { + const passing = buildPassingScenarioMetrics(); + const current = { + aggregate: passing, + scenarios: { + 'happy-path-chat': passing, + }, + }; + const baseline = { aggregate: passing }; + const result = evaluateReport(current, baseline); + + expect(result.failures.some((f) => f.includes('missing scenario'))).toBe(true); + }); +}); diff --git a/tests/unit/host-api.test.ts b/tests/unit/host-api.test.ts index be703a977..916bbff2d 100644 --- a/tests/unit/host-api.test.ts +++ b/tests/unit/host-api.test.ts @@ -9,6 +9,7 @@ vi.mock('@/lib/api-client', () => ({ describe('host-api', () => { beforeEach(() => { vi.resetAllMocks(); + window.localStorage.removeItem('clawx:allow-localhost-fallback'); }); it('uses IPC proxy and returns unified envelope json', async () => { @@ -51,6 +52,7 @@ describe('host-api', () => { json: async () => ({ fallback: true }), }); vi.stubGlobal('fetch', fetchMock); + window.localStorage.setItem('clawx:allow-localhost-fallback', '1'); invokeIpcMock.mockResolvedValueOnce({ ok: false, @@ -86,6 +88,7 @@ describe('host-api', () => { json: async () => ({ fallback: true }), }); vi.stubGlobal('fetch', fetchMock); + window.localStorage.setItem('clawx:allow-localhost-fallback', '1'); invokeIpcMock.mockRejectedValueOnce(new Error('Invalid IPC channel: hostapi:fetch')); @@ -98,4 +101,19 @@ describe('host-api', () => { expect.objectContaining({ headers: expect.any(Object) }), ); }); + + it('does not use localhost fallback when policy flag is disabled', async () => { + const fetchMock = vi.fn().mockResolvedValue({ + ok: true, + status: 200, + json: async () => ({ fallback: true }), + }); + vi.stubGlobal('fetch', fetchMock); + + invokeIpcMock.mockRejectedValueOnce(new Error('Invalid IPC channel: hostapi:fetch')); + + const { hostApiFetch } = await import('@/lib/host-api'); + await expect(hostApiFetch('/api/test')).rejects.toThrow('Invalid IPC channel: hostapi:fetch'); + expect(fetchMock).not.toHaveBeenCalled(); + }); }); diff --git a/tests/unit/provider-runtime-sync.test.ts b/tests/unit/provider-runtime-sync.test.ts new file mode 100644 index 000000000..1e6fa4094 --- /dev/null +++ b/tests/unit/provider-runtime-sync.test.ts @@ -0,0 +1,145 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import type { GatewayManager } from '@electron/gateway/manager'; +import type { ProviderConfig } from '@electron/utils/secure-storage'; + +const mocks = vi.hoisted(() => ({ + getProviderAccount: vi.fn(), + listProviderAccounts: vi.fn(), + getProviderSecret: vi.fn(), + getAllProviders: vi.fn(), + getApiKey: vi.fn(), + getDefaultProvider: vi.fn(), + getProvider: vi.fn(), + getProviderConfig: vi.fn(), + getProviderDefaultModel: vi.fn(), + removeProviderFromOpenClaw: vi.fn(), + saveOAuthTokenToOpenClaw: vi.fn(), + saveProviderKeyToOpenClaw: vi.fn(), + setOpenClawDefaultModel: vi.fn(), + setOpenClawDefaultModelWithOverride: vi.fn(), + syncProviderConfigToOpenClaw: vi.fn(), + updateAgentModelProvider: vi.fn(), +})); + +vi.mock('@electron/services/providers/provider-store', () => ({ + getProviderAccount: mocks.getProviderAccount, + listProviderAccounts: mocks.listProviderAccounts, +})); + +vi.mock('@electron/services/secrets/secret-store', () => ({ + getProviderSecret: mocks.getProviderSecret, +})); + +vi.mock('@electron/utils/secure-storage', () => ({ + getAllProviders: mocks.getAllProviders, + getApiKey: mocks.getApiKey, + getDefaultProvider: mocks.getDefaultProvider, + getProvider: mocks.getProvider, +})); + +vi.mock('@electron/utils/provider-registry', () => ({ + getProviderConfig: mocks.getProviderConfig, + getProviderDefaultModel: mocks.getProviderDefaultModel, +})); + +vi.mock('@electron/utils/openclaw-auth', () => ({ + removeProviderFromOpenClaw: mocks.removeProviderFromOpenClaw, + saveOAuthTokenToOpenClaw: mocks.saveOAuthTokenToOpenClaw, + saveProviderKeyToOpenClaw: mocks.saveProviderKeyToOpenClaw, + setOpenClawDefaultModel: mocks.setOpenClawDefaultModel, + setOpenClawDefaultModelWithOverride: mocks.setOpenClawDefaultModelWithOverride, + syncProviderConfigToOpenClaw: mocks.syncProviderConfigToOpenClaw, + updateAgentModelProvider: mocks.updateAgentModelProvider, +})); + +vi.mock('@electron/utils/logger', () => ({ + logger: { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, +})); + +import { + syncDefaultProviderToRuntime, + syncDeletedProviderToRuntime, + syncSavedProviderToRuntime, +} from '@electron/services/providers/provider-runtime-sync'; + +function createProvider(overrides: Partial = {}): ProviderConfig { + return { + id: 'moonshot', + name: 'Moonshot', + type: 'moonshot', + model: 'kimi-k2.5', + enabled: true, + createdAt: '2026-03-14T00:00:00.000Z', + updatedAt: '2026-03-14T00:00:00.000Z', + ...overrides, + }; +} + +function createGateway(state: 'running' | 'stopped' = 'running'): Pick { + return { + debouncedReload: vi.fn(), + debouncedRestart: vi.fn(), + getStatus: vi.fn(() => ({ state } as ReturnType)), + }; +} + +describe('provider-runtime-sync refresh strategy', () => { + beforeEach(() => { + vi.clearAllMocks(); + mocks.getProviderAccount.mockResolvedValue(null); + mocks.getProviderSecret.mockResolvedValue(undefined); + mocks.getAllProviders.mockResolvedValue([]); + mocks.getApiKey.mockResolvedValue('sk-test'); + mocks.getDefaultProvider.mockResolvedValue('moonshot'); + mocks.getProvider.mockResolvedValue(createProvider()); + mocks.getProviderDefaultModel.mockReturnValue('kimi-k2.5'); + mocks.getProviderConfig.mockReturnValue({ + api: 'openai-completions', + baseUrl: 'https://api.moonshot.cn/v1', + apiKeyEnv: 'MOONSHOT_API_KEY', + }); + mocks.syncProviderConfigToOpenClaw.mockResolvedValue(undefined); + mocks.setOpenClawDefaultModel.mockResolvedValue(undefined); + mocks.setOpenClawDefaultModelWithOverride.mockResolvedValue(undefined); + mocks.saveProviderKeyToOpenClaw.mockResolvedValue(undefined); + mocks.removeProviderFromOpenClaw.mockResolvedValue(undefined); + mocks.updateAgentModelProvider.mockResolvedValue(undefined); + }); + + it('uses debouncedReload after saving provider config', async () => { + const gateway = createGateway('running'); + await syncSavedProviderToRuntime(createProvider(), undefined, gateway as GatewayManager); + + expect(gateway.debouncedReload).toHaveBeenCalledTimes(1); + expect(gateway.debouncedRestart).not.toHaveBeenCalled(); + }); + + it('uses debouncedRestart after deleting provider config', async () => { + const gateway = createGateway('running'); + await syncDeletedProviderToRuntime(createProvider(), 'moonshot', gateway as GatewayManager); + + expect(gateway.debouncedRestart).toHaveBeenCalledTimes(1); + expect(gateway.debouncedReload).not.toHaveBeenCalled(); + }); + + it('uses debouncedReload after switching default provider when gateway is running', async () => { + const gateway = createGateway('running'); + await syncDefaultProviderToRuntime('moonshot', gateway as GatewayManager); + + expect(gateway.debouncedReload).toHaveBeenCalledTimes(1); + expect(gateway.debouncedRestart).not.toHaveBeenCalled(); + }); + + it('skips refresh after switching default provider when gateway is stopped', async () => { + const gateway = createGateway('stopped'); + await syncDefaultProviderToRuntime('moonshot', gateway as GatewayManager); + + expect(gateway.debouncedReload).not.toHaveBeenCalled(); + expect(gateway.debouncedRestart).not.toHaveBeenCalled(); + }); +}); diff --git a/tests/unit/stores.test.ts b/tests/unit/stores.test.ts index d51dcb3bf..c62588df4 100644 --- a/tests/unit/stores.test.ts +++ b/tests/unit/stores.test.ts @@ -43,9 +43,27 @@ describe('Settings Store', () => { }); it('should unlock dev mode', () => { + const invoke = vi.mocked(window.electron.ipcRenderer.invoke); + invoke.mockResolvedValueOnce({ + ok: true, + data: { + status: 200, + ok: true, + json: { success: true }, + }, + }); + const { setDevModeUnlocked } = useSettingsStore.getState(); setDevModeUnlocked(true); + expect(useSettingsStore.getState().devModeUnlocked).toBe(true); + expect(invoke).toHaveBeenCalledWith( + 'hostapi:fetch', + expect.objectContaining({ + path: '/api/settings/devModeUnlocked', + method: 'PUT', + }), + ); }); it('should persist launch-at-startup setting through host api', () => {