From b01952fba7203d8372677f3ae591d9bf2b8b541e Mon Sep 17 00:00:00 2001 From: Haze <709547807@qq.com> Date: Fri, 6 Feb 2026 02:38:18 +0800 Subject: [PATCH] fix(stores): align RPC methods with OpenClaw protocol and fix chat flow - Fix channels store: use channels.status instead of channels.list - Fix skills store: use skills.status instead of skills.list - Fix chat store: correct chat.history response parsing (messages in payload) - Fix chat store: handle chat.send async flow (ack + event streaming) - Add chat event handling for streaming AI responses (delta/final/error) - Wire gateway:chat-message IPC events to chat store - Fix health check: use WebSocket status instead of nonexistent /health endpoint - Fix waitForReady: probe via WebSocket instead of HTTP - Gracefully degrade when methods are unsupported (no white screen) --- electron/gateway/manager.ts | 69 ++++++++++++++------- src/stores/channels.ts | 17 ++++-- src/stores/chat.ts | 117 +++++++++++++++++++++++++++++++----- src/stores/gateway.ts | 12 +++- src/stores/skills.ts | 19 ++++-- 5 files changed, 186 insertions(+), 48 deletions(-) diff --git a/electron/gateway/manager.ts b/electron/gateway/manager.ts index 0419bc214..e4a064106 100644 --- a/electron/gateway/manager.ts +++ b/electron/gateway/manager.ts @@ -276,39 +276,48 @@ export class GatewayManager extends EventEmitter { } /** - * Check Gateway health via HTTP endpoint + * Check Gateway health via WebSocket ping + * OpenClaw Gateway doesn't have an HTTP /health endpoint */ async checkHealth(): Promise<{ ok: boolean; error?: string; uptime?: number }> { try { - const response = await fetch(`http://localhost:${this.status.port}/health`, { - signal: AbortSignal.timeout(5000), - }); - - if (response.ok) { - const data = await response.json() as { uptime?: number }; - return { ok: true, uptime: data.uptime }; + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + const uptime = this.status.connectedAt + ? Math.floor((Date.now() - this.status.connectedAt) / 1000) + : undefined; + return { ok: true, uptime }; } - - return { ok: false, error: `Health check returned ${response.status}` }; + return { ok: false, error: 'WebSocket not connected' }; } catch (error) { return { ok: false, error: String(error) }; } } /** - * Find existing Gateway process + * Find existing Gateway process by attempting a WebSocket connection */ private async findExistingGateway(): Promise<{ port: number } | null> { try { - // Try to connect to default port const port = PORTS.OPENCLAW_GATEWAY; - const response = await fetch(`http://localhost:${port}/health`, { - signal: AbortSignal.timeout(2000), + // Try a quick WebSocket connection to check if gateway is listening + return await new Promise<{ port: number } | null>((resolve) => { + const testWs = new WebSocket(`ws://localhost:${port}/ws`); + const timeout = setTimeout(() => { + testWs.close(); + resolve(null); + }, 2000); + + testWs.on('open', () => { + clearTimeout(timeout); + testWs.close(); + resolve({ port }); + }); + + testWs.on('error', () => { + clearTimeout(timeout); + resolve(null); + }); }); - - if (response.ok) { - return { port }; - } } catch { // Gateway not running } @@ -413,16 +422,32 @@ export class GatewayManager extends EventEmitter { } /** - * Wait for Gateway to be ready + * Wait for Gateway to be ready by checking if the port is accepting connections */ private async waitForReady(retries = 30, interval = 1000): Promise { for (let i = 0; i < retries; i++) { try { - const response = await fetch(`http://localhost:${this.status.port}/health`, { - signal: AbortSignal.timeout(1000), + // Try a quick WebSocket connection to see if the gateway is listening + const ready = await new Promise((resolve) => { + const testWs = new WebSocket(`ws://localhost:${this.status.port}/ws`); + const timeout = setTimeout(() => { + testWs.close(); + resolve(false); + }, 1000); + + testWs.on('open', () => { + clearTimeout(timeout); + testWs.close(); + resolve(true); + }); + + testWs.on('error', () => { + clearTimeout(timeout); + resolve(false); + }); }); - if (response.ok) { + if (ready) { return; } } catch { diff --git a/src/stores/channels.ts b/src/stores/channels.ts index ed2e41f65..8ec9dd323 100644 --- a/src/stores/channels.ts +++ b/src/stores/channels.ts @@ -37,19 +37,26 @@ export const useChannelsStore = create((set, get) => ({ set({ loading: true, error: null }); try { + // OpenClaw uses channels.status to get channel information const result = await window.electron.ipcRenderer.invoke( 'gateway:rpc', - 'channels.list' - ) as { success: boolean; result?: Channel[]; error?: string }; + 'channels.status', + {} + ) as { success: boolean; result?: { channels?: Channel[] } | Channel[]; error?: string }; if (result.success && result.result) { - set({ channels: result.result, loading: false }); + // Handle both array and object response formats + const channels = Array.isArray(result.result) + ? result.result + : (result.result.channels || []); + set({ channels, loading: false }); } else { - // Gateway might not be running, don't show error for empty channels + // Don't show error for unsupported methods - just use empty list set({ channels: [], loading: false }); } } catch (error) { - // Gateway not running - start with empty channels + // Gateway might not support this method yet - gracefully degrade + console.warn('Failed to fetch channels:', error); set({ channels: [], loading: false }); } }, diff --git a/src/stores/chat.ts b/src/stores/chat.ts index 370e7c5d5..d022a97f6 100644 --- a/src/stores/chat.ts +++ b/src/stores/chat.ts @@ -32,6 +32,8 @@ interface ChatState { loading: boolean; sending: boolean; error: string | null; + // Track active run for streaming + activeRunId: string | null; // Actions fetchHistory: (limit?: number) => Promise; @@ -40,6 +42,7 @@ interface ChatState { addMessage: (message: ChatMessage) => void; updateMessage: (messageId: string, updates: Partial) => void; setMessages: (messages: ChatMessage[]) => void; + handleChatEvent: (event: Record) => void; } export const useChatStore = create((set, get) => ({ @@ -47,28 +50,47 @@ export const useChatStore = create((set, get) => ({ loading: false, sending: false, error: null, + activeRunId: null, fetchHistory: async (limit = 50) => { set({ loading: true, error: null }); try { + // OpenClaw chat.history requires: { sessionKey, limit? } + // Response format: { sessionKey, sessionId, messages, thinkingLevel, verboseLevel } const result = await window.electron.ipcRenderer.invoke( 'gateway:rpc', 'chat.history', - { limit, offset: 0 } - ) as { success: boolean; result?: ChatMessage[]; error?: string }; + { sessionKey: 'main', limit } + ) as { success: boolean; result?: { messages?: unknown[] } | unknown; error?: string }; if (result.success && result.result) { - set({ messages: result.result, loading: false }); + const data = result.result as Record; + const rawMessages = Array.isArray(data.messages) ? data.messages : []; + + // Map OpenClaw messages to our ChatMessage format + const messages: ChatMessage[] = rawMessages.map((msg: unknown, idx: number) => { + const m = msg as Record; + return { + id: String(m.id || `msg-${idx}`), + role: (m.role as 'user' | 'assistant' | 'system') || 'assistant', + content: String(m.content || m.text || ''), + timestamp: String(m.timestamp || new Date().toISOString()), + }; + }); + + set({ messages, loading: false }); } else { - set({ error: result.error || 'Failed to fetch history', loading: false }); + // No history yet or method not available - just show empty + set({ messages: [], loading: false }); } } catch (error) { - set({ error: String(error), loading: false }); + console.warn('Failed to fetch chat history:', error); + set({ messages: [], loading: false }); } }, - sendMessage: async (content, channelId) => { + sendMessage: async (content, _channelId) => { const { addMessage } = get(); // Add user message immediately @@ -77,28 +99,34 @@ export const useChatStore = create((set, get) => ({ role: 'user', content, timestamp: new Date().toISOString(), - channel: channelId, }; addMessage(userMessage); set({ sending: true, error: null }); try { + // OpenClaw chat.send requires: { sessionKey, message, idempotencyKey } + // Response is an acknowledgment: { runId, status: "started" } + // The actual AI response comes via WebSocket chat events + const idempotencyKey = crypto.randomUUID(); const result = await window.electron.ipcRenderer.invoke( 'gateway:rpc', 'chat.send', - { content, channelId } - ) as { success: boolean; result?: ChatMessage; error?: string }; + { sessionKey: 'main', message: content, idempotencyKey } + ) as { success: boolean; result?: { runId?: string; status?: string }; error?: string }; - if (result.success && result.result) { - addMessage(result.result); + if (!result.success) { + set({ error: result.error || 'Failed to send message', sending: false }); } else { - set({ error: result.error || 'Failed to send message' }); + // Store the active run ID - response will come via chat events + const runId = result.result?.runId; + if (runId) { + set({ activeRunId: runId }); + } + // Keep sending=true until we receive the final chat event } } catch (error) { - set({ error: String(error) }); - } finally { - set({ sending: false }); + set({ error: String(error), sending: false }); } }, @@ -126,4 +154,63 @@ export const useChatStore = create((set, get) => ({ }, setMessages: (messages) => set({ messages }), + + /** + * Handle incoming chat event from Gateway WebSocket + * Events: { runId, sessionKey, seq, state, message, errorMessage } + * States: "delta" (streaming), "final" (complete), "aborted", "error" + */ + handleChatEvent: (event) => { + const { addMessage, updateMessage, messages } = get(); + const runId = String(event.runId || ''); + const state = String(event.state || ''); + + if (state === 'delta') { + // Streaming delta - find or create assistant message for this run + const existingMsg = messages.find((m) => m.id === `run-${runId}`); + const messageContent = event.message as Record | undefined; + const content = String(messageContent?.content || messageContent?.text || ''); + + if (existingMsg) { + // Append to existing message + updateMessage(`run-${runId}`, { + content: existingMsg.content + content, + }); + } else if (content) { + // Create new assistant message + addMessage({ + id: `run-${runId}`, + role: 'assistant', + content, + timestamp: new Date().toISOString(), + }); + } + } else if (state === 'final') { + // Final message - replace or add complete response + const messageContent = event.message as Record | undefined; + const content = String( + messageContent?.content + || messageContent?.text + || (typeof messageContent === 'string' ? messageContent : '') + ); + + const existingMsg = messages.find((m) => m.id === `run-${runId}`); + if (existingMsg) { + updateMessage(`run-${runId}`, { content }); + } else if (content) { + addMessage({ + id: `run-${runId}`, + role: 'assistant', + content, + timestamp: new Date().toISOString(), + }); + } + set({ sending: false, activeRunId: null }); + } else if (state === 'error') { + const errorMsg = String(event.errorMessage || 'An error occurred'); + set({ error: errorMsg, sending: false, activeRunId: null }); + } else if (state === 'aborted') { + set({ sending: false, activeRunId: null }); + } + }, })); diff --git a/src/stores/gateway.ts b/src/stores/gateway.ts index 89368f4b5..2d81d6016 100644 --- a/src/stores/gateway.ts +++ b/src/stores/gateway.ts @@ -58,7 +58,17 @@ export const useGatewayStore = create((set, get) => ({ // Listen for notifications window.electron.ipcRenderer.on('gateway:notification', (notification) => { console.log('Gateway notification:', notification); - // Could dispatch to other stores based on notification type + }); + + // Listen for chat events from the gateway and forward to chat store + window.electron.ipcRenderer.on('gateway:chat-message', (data) => { + const { useChatStore } = require('./chat'); + const chatData = data as { message?: Record } | Record; + // The event payload may be nested under 'message' or directly on data + const event = ('message' in chatData && typeof chatData.message === 'object') + ? chatData.message as Record + : chatData as Record; + useChatStore.getState().handleChatEvent(event); }); } catch (error) { diff --git a/src/stores/skills.ts b/src/stores/skills.ts index acbaf97b2..841a74304 100644 --- a/src/stores/skills.ts +++ b/src/stores/skills.ts @@ -27,18 +27,27 @@ export const useSkillsStore = create((set, get) => ({ set({ loading: true, error: null }); try { + // OpenClaw uses skills.status to get skill information const result = await window.electron.ipcRenderer.invoke( 'gateway:rpc', - 'skills.list' - ) as { success: boolean; result?: Skill[]; error?: string }; + 'skills.status', + {} + ) as { success: boolean; result?: { skills?: Skill[] } | Skill[]; error?: string }; if (result.success && result.result) { - set({ skills: result.result, loading: false }); + // Handle both array and object response formats + const skills = Array.isArray(result.result) + ? result.result + : (result.result.skills || []); + set({ skills, loading: false }); } else { - set({ error: result.error || 'Failed to fetch skills', loading: false }); + // Don't show error for unsupported methods - just use empty list + set({ skills: [], loading: false }); } } catch (error) { - set({ error: String(error), loading: false }); + // Gateway might not support this method yet - gracefully degrade + console.warn('Failed to fetch skills:', error); + set({ skills: [], loading: false }); } },