feat(gateway): enhance gateway readiness handling and batch sync configuration (#851)
Co-authored-by: paisley <8197966+su8su@users.noreply.github.com>
This commit is contained in:
51
tests/unit/gateway-event-dispatch.test.ts
Normal file
51
tests/unit/gateway-event-dispatch.test.ts
Normal file
@@ -0,0 +1,51 @@
|
||||
import { describe, expect, it, vi } from 'vitest';
|
||||
import { dispatchProtocolEvent } from '@electron/gateway/event-dispatch';
|
||||
|
||||
function createMockEmitter() {
|
||||
const emitted: Array<{ event: string; payload: unknown }> = [];
|
||||
return {
|
||||
emit: vi.fn((event: string, payload: unknown) => {
|
||||
emitted.push({ event, payload });
|
||||
return true;
|
||||
}),
|
||||
emitted,
|
||||
};
|
||||
}
|
||||
|
||||
describe('dispatchProtocolEvent', () => {
|
||||
it('dispatches gateway.ready event to gateway:ready', () => {
|
||||
const emitter = createMockEmitter();
|
||||
dispatchProtocolEvent(emitter, 'gateway.ready', { version: '4.11' });
|
||||
expect(emitter.emit).toHaveBeenCalledWith('gateway:ready', { version: '4.11' });
|
||||
});
|
||||
|
||||
it('dispatches ready event to gateway:ready', () => {
|
||||
const emitter = createMockEmitter();
|
||||
dispatchProtocolEvent(emitter, 'ready', { skills: 31 });
|
||||
expect(emitter.emit).toHaveBeenCalledWith('gateway:ready', { skills: 31 });
|
||||
});
|
||||
|
||||
it('dispatches channel.status to channel:status', () => {
|
||||
const emitter = createMockEmitter();
|
||||
dispatchProtocolEvent(emitter, 'channel.status', { channelId: 'telegram', status: 'connected' });
|
||||
expect(emitter.emit).toHaveBeenCalledWith('channel:status', { channelId: 'telegram', status: 'connected' });
|
||||
});
|
||||
|
||||
it('dispatches chat to chat:message', () => {
|
||||
const emitter = createMockEmitter();
|
||||
dispatchProtocolEvent(emitter, 'chat', { text: 'hello' });
|
||||
expect(emitter.emit).toHaveBeenCalledWith('chat:message', { message: { text: 'hello' } });
|
||||
});
|
||||
|
||||
it('suppresses tick events', () => {
|
||||
const emitter = createMockEmitter();
|
||||
dispatchProtocolEvent(emitter, 'tick', {});
|
||||
expect(emitter.emit).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('dispatches unknown events as notifications', () => {
|
||||
const emitter = createMockEmitter();
|
||||
dispatchProtocolEvent(emitter, 'some.custom.event', { data: 1 });
|
||||
expect(emitter.emit).toHaveBeenCalledWith('notification', { method: 'some.custom.event', params: { data: 1 } });
|
||||
});
|
||||
});
|
||||
@@ -38,4 +38,42 @@ describe('gateway store event wiring', () => {
|
||||
handlers.get('gateway:status')?.({ state: 'stopped', port: 18789 });
|
||||
expect(useGatewayStore.getState().status.state).toBe('stopped');
|
||||
});
|
||||
|
||||
it('propagates gatewayReady field from status events', async () => {
|
||||
hostApiFetchMock.mockResolvedValueOnce({ state: 'running', port: 18789, gatewayReady: false });
|
||||
|
||||
const handlers = new Map<string, (payload: unknown) => void>();
|
||||
subscribeHostEventMock.mockImplementation((eventName: string, handler: (payload: unknown) => void) => {
|
||||
handlers.set(eventName, handler);
|
||||
return () => {};
|
||||
});
|
||||
|
||||
const { useGatewayStore } = await import('@/stores/gateway');
|
||||
await useGatewayStore.getState().init();
|
||||
|
||||
// Initially gatewayReady=false from the status fetch
|
||||
expect(useGatewayStore.getState().status.gatewayReady).toBe(false);
|
||||
|
||||
// Simulate gateway.ready event setting gatewayReady=true
|
||||
handlers.get('gateway:status')?.({ state: 'running', port: 18789, gatewayReady: true });
|
||||
expect(useGatewayStore.getState().status.gatewayReady).toBe(true);
|
||||
});
|
||||
|
||||
it('treats undefined gatewayReady as ready for backwards compatibility', async () => {
|
||||
hostApiFetchMock.mockResolvedValueOnce({ state: 'running', port: 18789 });
|
||||
|
||||
const handlers = new Map<string, (payload: unknown) => void>();
|
||||
subscribeHostEventMock.mockImplementation((eventName: string, handler: (payload: unknown) => void) => {
|
||||
handlers.set(eventName, handler);
|
||||
return () => {};
|
||||
});
|
||||
|
||||
const { useGatewayStore } = await import('@/stores/gateway');
|
||||
await useGatewayStore.getState().init();
|
||||
|
||||
const status = useGatewayStore.getState().status;
|
||||
// gatewayReady is undefined (old gateway version) — should be treated as ready
|
||||
expect(status.gatewayReady).toBeUndefined();
|
||||
expect(status.state === 'running' && status.gatewayReady !== false).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
127
tests/unit/gateway-ready-fallback.test.ts
Normal file
127
tests/unit/gateway-ready-fallback.test.ts
Normal file
@@ -0,0 +1,127 @@
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
vi.mock('electron', () => ({
|
||||
app: {
|
||||
getPath: () => '/tmp',
|
||||
isPackaged: false,
|
||||
},
|
||||
utilityProcess: {},
|
||||
}));
|
||||
|
||||
vi.mock('@electron/utils/logger', () => ({
|
||||
logger: {
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
debug: vi.fn(),
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock('@electron/utils/config', () => ({
|
||||
PORTS: { OPENCLAW_GATEWAY: 18789 },
|
||||
}));
|
||||
|
||||
describe('GatewayManager gatewayReady fallback', () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it('sets gatewayReady=false when entering starting state', async () => {
|
||||
vi.resetModules();
|
||||
const { GatewayManager } = await import('@electron/gateway/manager');
|
||||
const manager = new GatewayManager();
|
||||
|
||||
const statusUpdates: Array<{ gatewayReady?: boolean }> = [];
|
||||
manager.on('status', (status: { gatewayReady?: boolean }) => {
|
||||
statusUpdates.push({ gatewayReady: status.gatewayReady });
|
||||
});
|
||||
|
||||
// Simulate start attempt (will fail but we can check the initial status)
|
||||
try {
|
||||
await manager.start();
|
||||
} catch {
|
||||
// expected to fail — no actual gateway process
|
||||
}
|
||||
|
||||
const startingUpdate = statusUpdates.find((u) => u.gatewayReady === false);
|
||||
expect(startingUpdate).toBeDefined();
|
||||
});
|
||||
|
||||
it('emits gatewayReady=true when gateway:ready event is received', async () => {
|
||||
vi.resetModules();
|
||||
const { GatewayManager } = await import('@electron/gateway/manager');
|
||||
const manager = new GatewayManager();
|
||||
|
||||
// Force internal state to 'running' for the test
|
||||
const stateController = (manager as unknown as { stateController: { setStatus: (u: Record<string, unknown>) => void } }).stateController;
|
||||
stateController.setStatus({ state: 'running', connectedAt: Date.now() });
|
||||
|
||||
const statusUpdates: Array<{ gatewayReady?: boolean; state: string }> = [];
|
||||
manager.on('status', (status: { gatewayReady?: boolean; state: string }) => {
|
||||
statusUpdates.push({ gatewayReady: status.gatewayReady, state: status.state });
|
||||
});
|
||||
|
||||
manager.emit('gateway:ready', {});
|
||||
|
||||
const readyUpdate = statusUpdates.find((u) => u.gatewayReady === true);
|
||||
expect(readyUpdate).toBeDefined();
|
||||
});
|
||||
|
||||
it('auto-sets gatewayReady=true after fallback timeout if no event received', async () => {
|
||||
vi.resetModules();
|
||||
const { GatewayManager } = await import('@electron/gateway/manager');
|
||||
const manager = new GatewayManager();
|
||||
|
||||
// Force internal state to 'running' without gatewayReady
|
||||
const stateController = (manager as unknown as { stateController: { setStatus: (u: Record<string, unknown>) => void } }).stateController;
|
||||
stateController.setStatus({ state: 'running', connectedAt: Date.now() });
|
||||
|
||||
const statusUpdates: Array<{ gatewayReady?: boolean }> = [];
|
||||
manager.on('status', (status: { gatewayReady?: boolean }) => {
|
||||
statusUpdates.push({ gatewayReady: status.gatewayReady });
|
||||
});
|
||||
|
||||
// Call the private scheduleGatewayReadyFallback method
|
||||
(manager as unknown as { scheduleGatewayReadyFallback: () => void }).scheduleGatewayReadyFallback();
|
||||
|
||||
// Before timeout, no gatewayReady update
|
||||
vi.advanceTimersByTime(29_000);
|
||||
expect(statusUpdates.find((u) => u.gatewayReady === true)).toBeUndefined();
|
||||
|
||||
// After 30s fallback timeout
|
||||
vi.advanceTimersByTime(2_000);
|
||||
const readyUpdate = statusUpdates.find((u) => u.gatewayReady === true);
|
||||
expect(readyUpdate).toBeDefined();
|
||||
});
|
||||
|
||||
it('cancels fallback timer when gateway:ready event arrives first', async () => {
|
||||
vi.resetModules();
|
||||
const { GatewayManager } = await import('@electron/gateway/manager');
|
||||
const manager = new GatewayManager();
|
||||
|
||||
const stateController = (manager as unknown as { stateController: { setStatus: (u: Record<string, unknown>) => void } }).stateController;
|
||||
stateController.setStatus({ state: 'running', connectedAt: Date.now() });
|
||||
|
||||
const statusUpdates: Array<{ gatewayReady?: boolean }> = [];
|
||||
manager.on('status', (status: { gatewayReady?: boolean }) => {
|
||||
statusUpdates.push({ gatewayReady: status.gatewayReady });
|
||||
});
|
||||
|
||||
// Schedule fallback
|
||||
(manager as unknown as { scheduleGatewayReadyFallback: () => void }).scheduleGatewayReadyFallback();
|
||||
|
||||
// gateway:ready event arrives at 5s
|
||||
vi.advanceTimersByTime(5_000);
|
||||
manager.emit('gateway:ready', {});
|
||||
expect(statusUpdates.filter((u) => u.gatewayReady === true)).toHaveLength(1);
|
||||
|
||||
// After 30s, no duplicate gatewayReady=true
|
||||
vi.advanceTimersByTime(30_000);
|
||||
expect(statusUpdates.filter((u) => u.gatewayReady === true)).toHaveLength(1);
|
||||
});
|
||||
});
|
||||
89
tests/unit/session-label-fetch.test.ts
Normal file
89
tests/unit/session-label-fetch.test.ts
Normal file
@@ -0,0 +1,89 @@
|
||||
import { describe, expect, it, vi, beforeEach } from 'vitest';
|
||||
|
||||
const invokeIpcMock = vi.fn();
|
||||
|
||||
vi.mock('@/lib/api-client', () => ({
|
||||
invokeIpc: (...args: unknown[]) => invokeIpcMock(...args),
|
||||
}));
|
||||
|
||||
vi.mock('@/stores/chat/helpers', () => ({
|
||||
getCanonicalPrefixFromSessions: () => 'agent:main',
|
||||
getMessageText: (content: unknown) => typeof content === 'string' ? content : '',
|
||||
toMs: (v: unknown) => typeof v === 'number' ? v : 0,
|
||||
}));
|
||||
|
||||
describe('session label fetch concurrency', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it('limits concurrent chat.history RPCs during label fetches', async () => {
|
||||
// Track max concurrent RPCs
|
||||
let currentConcurrency = 0;
|
||||
let maxConcurrency = 0;
|
||||
const resolvers: Array<() => void> = [];
|
||||
|
||||
invokeIpcMock.mockImplementation(async (channel: string, method: string) => {
|
||||
if (method === 'sessions.list') {
|
||||
return {
|
||||
success: true,
|
||||
result: {
|
||||
sessions: Array.from({ length: 12 }, (_, i) => ({
|
||||
key: `agent:main:session-${i}`,
|
||||
label: `Session ${i}`,
|
||||
})),
|
||||
},
|
||||
};
|
||||
}
|
||||
if (method === 'chat.history') {
|
||||
currentConcurrency++;
|
||||
maxConcurrency = Math.max(maxConcurrency, currentConcurrency);
|
||||
await new Promise<void>((resolve) => resolvers.push(resolve));
|
||||
currentConcurrency--;
|
||||
return {
|
||||
success: true,
|
||||
result: {
|
||||
messages: [{ role: 'user', content: 'hello', timestamp: Date.now() }],
|
||||
},
|
||||
};
|
||||
}
|
||||
return { success: false };
|
||||
});
|
||||
|
||||
vi.resetModules();
|
||||
const { createSessionActions } = await import('@/stores/chat/session-actions');
|
||||
const state = {
|
||||
currentSessionKey: 'agent:main:main',
|
||||
messages: [],
|
||||
sessions: [],
|
||||
sessionLabels: {},
|
||||
sessionLastActivity: {},
|
||||
};
|
||||
const set = vi.fn();
|
||||
const get = vi.fn().mockReturnValue({
|
||||
...state,
|
||||
loadHistory: vi.fn(),
|
||||
});
|
||||
|
||||
const actions = createSessionActions(set as never, get as never);
|
||||
await actions.loadSessions();
|
||||
|
||||
// Wait for the label-fetch loop to start its batches
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
// Resolve first batch (up to 5 concurrent)
|
||||
while (resolvers.length > 0 && resolvers.length <= 5) {
|
||||
resolvers.shift()?.();
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
}
|
||||
|
||||
// Resolve remaining
|
||||
while (resolvers.length > 0) {
|
||||
resolvers.shift()?.();
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
}
|
||||
|
||||
// maxConcurrency should be capped at 5 (LABEL_FETCH_CONCURRENCY)
|
||||
expect(maxConcurrency).toBeLessThanOrEqual(5);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user