From 6383e10d630c1670fbea91d141beb2bad60868ff Mon Sep 17 00:00:00 2001 From: Haze <709547807@qq.com> Date: Wed, 25 Feb 2026 23:59:55 +0800 Subject: [PATCH] feat(chat): reformat streaming output (#173) --- electron/gateway/manager.ts | 22 ++++- src/pages/Chat/ChatMessage.tsx | 61 +++++++------ src/pages/Chat/index.tsx | 41 +++++++-- src/stores/chat.ts | 158 ++++++++++++++++++++++----------- src/stores/gateway.ts | 89 ++++++++++++------- 5 files changed, 249 insertions(+), 122 deletions(-) diff --git a/electron/gateway/manager.ts b/electron/gateway/manager.ts index 18340a736..6284d39f3 100644 --- a/electron/gateway/manager.ts +++ b/electron/gateway/manager.ts @@ -1146,7 +1146,6 @@ export class GatewayManager extends EventEmitter { return; } - // Emit generic message for other handlers this.emit('message', message); } @@ -1154,19 +1153,34 @@ export class GatewayManager extends EventEmitter { * Handle OpenClaw protocol events */ private handleProtocolEvent(event: string, payload: unknown): void { - // Map OpenClaw events to our internal event types switch (event) { case 'tick': - // Heartbeat tick, ignore break; case 'chat': this.emit('chat:message', { message: payload }); break; + case 'agent': { + // Agent events may carry chat streaming data inside payload.data, + // or be lifecycle events (phase=started/completed) with no message. + const p = payload as Record; + const data = (p.data && typeof p.data === 'object') ? p.data as Record : {}; + const chatEvent: Record = { + ...data, + runId: p.runId ?? data.runId, + sessionKey: p.sessionKey ?? data.sessionKey, + state: p.state ?? data.state, + message: p.message ?? data.message, + }; + if (chatEvent.state || chatEvent.message) { + this.emit('chat:message', { message: chatEvent }); + } + this.emit('notification', { method: event, params: payload }); + break; + } case 'channel.status': this.emit('channel:status', payload as { channelId: string; status: string }); break; default: - // Forward unknown events as generic notifications this.emit('notification', { method: event, params: payload }); } } diff --git a/src/pages/Chat/ChatMessage.tsx b/src/pages/Chat/ChatMessage.tsx index c1c416396..b0cf19982 100644 --- a/src/pages/Chat/ChatMessage.tsx +++ b/src/pages/Chat/ChatMessage.tsx @@ -4,7 +4,7 @@ * with markdown, thinking sections, images, and tool cards. */ import { useState, useCallback, useEffect, memo } from 'react'; -import { User, Sparkles, Copy, Check, ChevronDown, ChevronRight, Wrench, FileText, Film, Music, FileArchive, File, X, FolderOpen, ZoomIn } from 'lucide-react'; +import { User, Sparkles, Copy, Check, ChevronDown, ChevronRight, Wrench, FileText, Film, Music, FileArchive, File, X, FolderOpen, ZoomIn, Loader2, CheckCircle2, AlertCircle } from 'lucide-react'; import ReactMarkdown from 'react-markdown'; import remarkGfm from 'remark-gfm'; import { createPortal } from 'react-dom'; @@ -51,7 +51,7 @@ export const ChatMessage = memo(function ChatMessage({ const images = extractImages(message); const tools = extractToolUse(message); const visibleThinking = showThinking ? thinking : null; - const visibleTools = showThinking ? tools : []; + const visibleTools = tools; const attachedFiles = message._attachedFiles || []; const [lightboxImg, setLightboxImg] = useState<{ src: string; fileName: string; filePath?: string; base64?: string; mimeType?: string } | null>(null); @@ -59,8 +59,7 @@ export const ChatMessage = memo(function ChatMessage({ // Never render tool result messages in chat UI if (isToolResult) return null; - // Don't render empty messages (also keep messages with streaming tool status) - const hasStreamingToolStatus = showThinking && isStreaming && streamingTools.length > 0; + const hasStreamingToolStatus = isStreaming && streamingTools.length > 0; if (!hasText && !visibleThinking && images.length === 0 && visibleTools.length === 0 && attachedFiles.length === 0 && !hasStreamingToolStatus) return null; return ( @@ -89,7 +88,7 @@ export const ChatMessage = memo(function ChatMessage({ isUser ? 'items-end' : 'items-start', )} > - {showThinking && isStreaming && !isUser && streamingTools.length > 0 && ( + {isStreaming && !isUser && streamingTools.length > 0 && ( )} @@ -266,28 +265,33 @@ function ToolStatusBar({ }>; }) { return ( -
-
- {tools.map((tool) => { - const duration = formatDuration(tool.durationMs); - const statusLabel = tool.status === 'running' ? 'running' : (tool.status === 'error' ? 'error' : 'done'); - return ( -
- - {tool.name} - {statusLabel} - - {duration && {duration}} - {tool.summary && ( - {tool.summary} - )} -
- ); - })} -
+
+ {tools.map((tool) => { + const duration = formatDuration(tool.durationMs); + const isRunning = tool.status === 'running'; + const isError = tool.status === 'error'; + return ( +
+ {isRunning && } + {!isRunning && !isError && } + {isError && } + + {tool.name} + {duration && {duration}} + {tool.summary && ( + {tool.summary} + )} +
+ ); + })}
); } @@ -595,7 +599,8 @@ function ToolCard({ name, input }: { name: string; input: unknown }) { className="flex items-center gap-2 w-full px-3 py-1.5 text-muted-foreground hover:text-foreground transition-colors" onClick={() => setExpanded(!expanded)} > - + + {name} {expanded ? : } diff --git a/src/pages/Chat/index.tsx b/src/pages/Chat/index.tsx index 32a357978..8e976dead 100644 --- a/src/pages/Chat/index.tsx +++ b/src/pages/Chat/index.tsx @@ -5,7 +5,7 @@ * are in the toolbar; messages render with markdown + streaming. */ import { useEffect, useRef, useState } from 'react'; -import { AlertCircle, Bot, MessageSquare, Sparkles } from 'lucide-react'; +import { AlertCircle, Bot, Loader2, MessageSquare, Sparkles } from 'lucide-react'; import { Card, CardContent } from '@/components/ui/card'; import { useChatStore, type RawMessage } from '@/stores/chat'; import { useGatewayStore } from '@/stores/gateway'; @@ -28,6 +28,7 @@ export function Chat() { const showThinking = useChatStore((s) => s.showThinking); const streamingMessage = useChatStore((s) => s.streamingMessage); const streamingTools = useChatStore((s) => s.streamingTools); + const pendingFinal = useChatStore((s) => s.pendingFinal); const loadHistory = useChatStore((s) => s.loadHistory); const loadSessions = useChatStore((s) => s.loadSessions); const sendMessage = useChatStore((s) => s.sendMessage); @@ -51,10 +52,10 @@ export function Chat() { }; }, [isGatewayRunning, loadHistory, loadSessions]); - // Auto-scroll on new messages or streaming + // Auto-scroll on new messages, streaming, or activity changes useEffect(() => { messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' }); - }, [messages, streamingMessage, sending]); + }, [messages, streamingMessage, sending, pendingFinal]); // Update timestamp when sending starts useEffect(() => { @@ -79,7 +80,6 @@ export function Chat() { ); } - // Extract streaming text for display const streamMsg = streamingMessage && typeof streamingMessage === 'object' ? streamingMessage as unknown as { role?: string; content?: unknown; timestamp?: number } : null; @@ -88,11 +88,12 @@ export function Chat() { const streamThinking = streamMsg ? extractThinking(streamMsg) : null; const hasStreamThinking = showThinking && !!streamThinking && streamThinking.trim().length > 0; const streamTools = streamMsg ? extractToolUse(streamMsg) : []; - const hasStreamTools = showThinking && streamTools.length > 0; + const hasStreamTools = streamTools.length > 0; const streamImages = streamMsg ? extractImages(streamMsg) : []; const hasStreamImages = streamImages.length > 0; - const hasStreamToolStatus = showThinking && streamingTools.length > 0; + const hasStreamToolStatus = streamingTools.length > 0; const shouldRenderStreaming = sending && (hasStreamText || hasStreamThinking || hasStreamTools || hasStreamImages || hasStreamToolStatus); + const hasAnyStreamContent = hasStreamText || hasStreamThinking || hasStreamTools || hasStreamImages || hasStreamToolStatus; return (
@@ -141,8 +142,13 @@ export function Chat() { /> )} - {/* Typing indicator when sending but no stream yet */} - {sending && !hasStreamText && !hasStreamThinking && !hasStreamTools && !hasStreamImages && !hasStreamToolStatus && ( + {/* Activity indicator: waiting for next AI turn after tool execution */} + {sending && pendingFinal && !shouldRenderStreaming && ( + + )} + + {/* Typing indicator when sending but no stream content yet */} + {sending && !pendingFinal && !hasAnyStreamContent && ( )} @@ -233,4 +239,23 @@ function TypingIndicator() { ); } +// ── Activity Indicator (shown between tool cycles) ───────────── + +function ActivityIndicator({ phase }: { phase: 'tool_processing' }) { + void phase; + return ( +
+
+ +
+
+
+ + Processing tool results… +
+
+
+ ); +} + export default Chat; diff --git a/src/stores/chat.ts b/src/stores/chat.ts index 0b935e731..aecb6fefa 100644 --- a/src/stores/chat.ts +++ b/src/stores/chat.ts @@ -109,6 +109,24 @@ interface ChatState { // between tool-result finals and the next delta. let _lastChatEventAt = 0; +/** Normalize a timestamp to milliseconds. Handles both seconds and ms. */ +function toMs(ts: number): number { + // Timestamps < 1e12 are in seconds (before ~2033); >= 1e12 are milliseconds + return ts < 1e12 ? ts * 1000 : ts; +} + +// Timer for fallback history polling during active sends. +// If no streaming events arrive within a few seconds, we periodically +// poll chat.history to surface intermediate tool-call turns. +let _historyPollTimer: ReturnType | null = null; + +function clearHistoryPoll(): void { + if (_historyPollTimer) { + clearTimeout(_historyPollTimer); + _historyPollTimer = null; + } +} + const DEFAULT_CANONICAL_PREFIX = 'agent:main'; const DEFAULT_SESSION_KEY = `${DEFAULT_CANONICAL_PREFIX}:main`; @@ -1014,6 +1032,7 @@ export const useChatStore = create((set, get) => ({ if (result.success && result.result) { const data = result.result; const rawMessages = Array.isArray(data.messages) ? data.messages as RawMessage[] : []; + // Before filtering: attach images/files from tool_result messages to the next assistant message const messagesWithToolImages = enrichWithToolResultFiles(rawMessages); const filteredMessages = messagesWithToolImages.filter((msg) => !isToolResultRole(msg.role)); @@ -1037,15 +1056,36 @@ export const useChatStore = create((set, get) => ({ }); } }); - const { pendingFinal, lastUserMessageAt } = get(); - if (pendingFinal) { + const { pendingFinal, lastUserMessageAt, sending: isSendingNow } = get(); + + // If we're sending but haven't received streaming events, check + // whether the loaded history reveals intermediate tool-call activity. + // This surfaces progress via the pendingFinal → ActivityIndicator path. + const userMsTs = lastUserMessageAt ? toMs(lastUserMessageAt) : 0; + const isAfterUserMsg = (msg: RawMessage): boolean => { + if (!userMsTs || !msg.timestamp) return true; + return toMs(msg.timestamp) >= userMsTs; + }; + + if (isSendingNow && !pendingFinal) { + const hasRecentAssistantActivity = [...filteredMessages].reverse().some((msg) => { + if (msg.role !== 'assistant') return false; + return isAfterUserMsg(msg); + }); + if (hasRecentAssistantActivity) { + set({ pendingFinal: true }); + } + } + + // If pendingFinal, check whether the AI produced a final text response. + if (pendingFinal || get().pendingFinal) { const recentAssistant = [...filteredMessages].reverse().find((msg) => { if (msg.role !== 'assistant') return false; if (!hasNonToolAssistantContent(msg)) return false; - if (lastUserMessageAt && msg.timestamp && msg.timestamp < lastUserMessageAt) return false; - return true; + return isAfterUserMsg(msg); }); if (recentAssistant) { + clearHistoryPoll(); set({ sending: false, activeRunId: null, pendingFinal: false }); } } @@ -1067,10 +1107,11 @@ export const useChatStore = create((set, get) => ({ const { currentSessionKey } = get(); // Add user message optimistically (with local file metadata for UI display) + const nowMs = Date.now(); const userMsg: RawMessage = { role: 'user', content: trimmed || (attachments?.length ? '(file attached)' : ''), - timestamp: Date.now() / 1000, + timestamp: nowMs / 1000, id: crypto.randomUUID(), _attachedFiles: attachments?.map(a => ({ fileName: a.fileName, @@ -1088,13 +1129,55 @@ export const useChatStore = create((set, get) => ({ streamingMessage: null, streamingTools: [], pendingFinal: false, - lastUserMessageAt: userMsg.timestamp ?? null, + lastUserMessageAt: nowMs, })); + // Start the history poll and safety timeout IMMEDIATELY (before the + // RPC await) because the gateway's chat.send RPC may block until the + // entire agentic conversation finishes — the poll must run in parallel. + _lastChatEventAt = Date.now(); + clearHistoryPoll(); + + const POLL_START_DELAY = 3_000; + const POLL_INTERVAL = 4_000; + const pollHistory = () => { + const state = get(); + if (!state.sending) { clearHistoryPoll(); return; } + if (state.streamingMessage) { + _historyPollTimer = setTimeout(pollHistory, POLL_INTERVAL); + return; + } + state.loadHistory(true); + _historyPollTimer = setTimeout(pollHistory, POLL_INTERVAL); + }; + _historyPollTimer = setTimeout(pollHistory, POLL_START_DELAY); + + const SAFETY_TIMEOUT_MS = 90_000; + const checkStuck = () => { + const state = get(); + if (!state.sending) return; + if (state.streamingMessage || state.streamingText) return; + if (state.pendingFinal) { + setTimeout(checkStuck, 10_000); + return; + } + if (Date.now() - _lastChatEventAt < SAFETY_TIMEOUT_MS) { + setTimeout(checkStuck, 10_000); + return; + } + clearHistoryPoll(); + set({ + error: 'No response received from the model. The provider may be unavailable or the API key may have insufficient quota. Please check your provider settings.', + sending: false, + activeRunId: null, + lastUserMessageAt: null, + }); + }; + setTimeout(checkStuck, 30_000); + try { const idempotencyKey = crypto.randomUUID(); const hasMedia = attachments && attachments.length > 0; - console.log(`[sendMessage] hasMedia=${hasMedia}, attachmentCount=${attachments?.length ?? 0}`); if (hasMedia) { console.log('[sendMessage] Media paths:', attachments!.map(a => a.stagedPath)); } @@ -1117,8 +1200,6 @@ export const useChatStore = create((set, get) => ({ let result: { success: boolean; result?: { runId?: string }; error?: string }; if (hasMedia) { - // Use dedicated chat:sendWithMedia handler — main process reads staged files - // from disk and builds base64 attachments, avoiding large IPC transfers result = await window.electron.ipcRenderer.invoke( 'chat:sendWithMedia', { @@ -1134,7 +1215,6 @@ export const useChatStore = create((set, get) => ({ }, ) as { success: boolean; result?: { runId?: string }; error?: string }; } else { - // No media — use standard lightweight RPC result = await window.electron.ipcRenderer.invoke( 'gateway:rpc', 'chat.send', @@ -1147,50 +1227,16 @@ export const useChatStore = create((set, get) => ({ ) as { success: boolean; result?: { runId?: string }; error?: string }; } - console.log(`[sendMessage] RPC result: success=${result.success}, error=${result.error || 'none'}, runId=${result.result?.runId || 'none'}`); + console.log(`[sendMessage] RPC result: success=${result.success}, runId=${result.result?.runId || 'none'}`); if (!result.success) { + clearHistoryPoll(); set({ error: result.error || 'Failed to send message', sending: false }); } else if (result.result?.runId) { set({ activeRunId: result.result.runId }); - } else { - // No runId from gateway; keep sending state and wait for events. - } - - // Safety timeout: if we're still in "sending" state without receiving - // any chat event for SAFETY_TIMEOUT_MS, the run likely failed silently - // (e.g. provider error not surfaced as a chat event). Surface the error - // to the user instead of leaving an infinite spinner. - // - // The timeout is based on the last received event (not the original send - // time) so that long-running tool-use conversations don't trigger false - // positives — each received event resets the clock. - if (result.success) { - _lastChatEventAt = Date.now(); - const SAFETY_TIMEOUT_MS = 90_000; - const checkStuck = () => { - const state = get(); - if (!state.sending) return; - if (state.streamingMessage || state.streamingText) return; - // Still between tool cycles (pendingFinal) — the model is working - if (state.pendingFinal) { - setTimeout(checkStuck, 10_000); - return; - } - if (Date.now() - _lastChatEventAt < SAFETY_TIMEOUT_MS) { - setTimeout(checkStuck, 10_000); - return; - } - set({ - error: 'No response received from the model. The provider may be unavailable or the API key may have insufficient quota. Please check your provider settings.', - sending: false, - activeRunId: null, - lastUserMessageAt: null, - }); - }; - setTimeout(checkStuck, 30_000); } } catch (err) { + clearHistoryPoll(); set({ error: String(err), sending: false }); } }, @@ -1198,6 +1244,7 @@ export const useChatStore = create((set, get) => ({ // ── Abort active run ── abortRun: async () => { + clearHistoryPoll(); const { currentSessionKey } = get(); set({ sending: false, streamingText: '', streamingMessage: null, pendingFinal: false, lastUserMessageAt: null, pendingToolImages: [] }); set({ streamingTools: [] }); @@ -1226,21 +1273,27 @@ export const useChatStore = create((set, get) => ({ _lastChatEventAt = Date.now(); // Defensive: if state is missing but we have a message, try to infer state. - // This handles the case where the Gateway sends events without a state wrapper - // (e.g., protocol events where payload is the raw message). let resolvedState = eventState; if (!resolvedState && event.message && typeof event.message === 'object') { const msg = event.message as Record; const stopReason = msg.stopReason ?? msg.stop_reason; if (stopReason) { - // Message has a stopReason → it's a final message resolvedState = 'final'; } else if (msg.role || msg.content) { - // Message has role/content but no stopReason → treat as delta (streaming) resolvedState = 'delta'; } } + // Only pause the history poll when we receive actual streaming data. + // The gateway sends "agent" events with { phase, startedAt } that carry + // no message — these must NOT kill the poll, since the poll is our only + // way to track progress when the gateway doesn't stream intermediate turns. + const hasUsefulData = resolvedState === 'delta' || resolvedState === 'final' + || resolvedState === 'error' || resolvedState === 'aborted'; + if (hasUsefulData) { + clearHistoryPoll(); + } + switch (resolvedState) { case 'delta': { // Streaming update - store the cumulative message @@ -1385,6 +1438,7 @@ export const useChatStore = create((set, get) => ({ // After the final response, quietly reload history to surface all intermediate // tool-use turns (thinking + tool blocks) from the Gateway's authoritative record. if (hasOutput && !toolOnly) { + clearHistoryPoll(); void get().loadHistory(true); } } else { @@ -1395,6 +1449,7 @@ export const useChatStore = create((set, get) => ({ break; } case 'error': { + clearHistoryPoll(); const errorMsg = String(event.errorMessage || 'An error occurred'); set({ error: errorMsg, @@ -1410,6 +1465,7 @@ export const useChatStore = create((set, get) => ({ break; } case 'aborted': { + clearHistoryPoll(); set({ sending: false, activeRunId: null, diff --git a/src/stores/gateway.ts b/src/stores/gateway.ts index 844b75eee..4d90ac3a5 100644 --- a/src/stores/gateway.ts +++ b/src/stores/gateway.ts @@ -74,26 +74,37 @@ export const useGatewayStore = create((set, get) => ({ const p = payload.params; const data = (p.data && typeof p.data === 'object') ? (p.data as Record) : {}; - const normalizedEvent: Record = { - // Spread data sub-object first (nested layout) - ...data, - // Then override with top-level params fields (flat layout takes precedence) - runId: p.runId ?? data.runId, - sessionKey: p.sessionKey ?? data.sessionKey, - stream: p.stream ?? data.stream, - seq: p.seq ?? data.seq, - // Critical: also pick up state and message from params (flat layout) - state: p.state ?? data.state, - message: p.message ?? data.message, - }; + const phase = data.phase ?? p.phase; - import('./chat') - .then(({ useChatStore }) => { - useChatStore.getState().handleChatEvent(normalizedEvent); - }) - .catch((err) => { - console.warn('Failed to forward gateway notification event:', err); - }); + const hasChatData = (p.state ?? data.state) || (p.message ?? data.message); + if (hasChatData) { + const normalizedEvent: Record = { + ...data, + runId: p.runId ?? data.runId, + sessionKey: p.sessionKey ?? data.sessionKey, + stream: p.stream ?? data.stream, + seq: p.seq ?? data.seq, + state: p.state ?? data.state, + message: p.message ?? data.message, + }; + import('./chat') + .then(({ useChatStore }) => { + useChatStore.getState().handleChatEvent(normalizedEvent); + }) + .catch(() => {}); + } + + // When the agent run completes, reload history to get the final response. + if (phase === 'completed' || phase === 'done' || phase === 'finished' || phase === 'end') { + import('./chat') + .then(({ useChatStore }) => { + const state = useChatStore.getState(); + if (state.sending) { + state.loadHistory(true); + } + }) + .catch(() => {}); + } }); // Listen for chat events from the gateway and forward to chat store. @@ -102,32 +113,48 @@ export const useGatewayStore = create((set, get) => ({ // or the raw chat message itself. We need to handle both. window.electron.ipcRenderer.on('gateway:chat-message', (data) => { try { - // Dynamic import to avoid circular dependency import('./chat').then(({ useChatStore }) => { const chatData = data as Record; - // Unwrap the { message: payload } wrapper from handleProtocolEvent const payload = ('message' in chatData && typeof chatData.message === 'object') ? chatData.message as Record : chatData; - // If payload has a 'state' field, it's already a proper event wrapper if (payload.state) { useChatStore.getState().handleChatEvent(payload); return; } - // Otherwise, payload is the raw message — wrap it as a 'final' event - // so handleChatEvent can process it (this happens when the Gateway - // sends protocol events with the message directly as payload). - const syntheticEvent: Record = { + // Raw message without state wrapper — treat as final + useChatStore.getState().handleChatEvent({ state: 'final', message: payload, runId: chatData.runId ?? payload.runId, - }; - useChatStore.getState().handleChatEvent(syntheticEvent); - }); - } catch (err) { - console.warn('Failed to forward chat event:', err); + }); + }).catch(() => {}); + } catch { + // Silently ignore forwarding failures + } + }); + + // Catch-all: handle unmatched gateway messages that fell through + // all protocol/notification handlers in the main process. + // This prevents events from being silently lost. + window.electron.ipcRenderer.on('gateway:message', (data) => { + if (!data || typeof data !== 'object') return; + const msg = data as Record; + + // Try to detect if this is a chat-related event and forward it + if (msg.state && msg.message) { + import('./chat').then(({ useChatStore }) => { + useChatStore.getState().handleChatEvent(msg); + }).catch(() => {}); + } else if (msg.role && msg.content) { + import('./chat').then(({ useChatStore }) => { + useChatStore.getState().handleChatEvent({ + state: 'final', + message: msg, + }); + }).catch(() => {}); } });