/** * Chat State Store * Manages chat messages, sessions, streaming, and thinking state. * Communicates with OpenClaw Gateway via renderer WebSocket RPC. */ import { create } from 'zustand'; import { hostApiFetch } from '@/lib/host-api'; import { useGatewayStore } from './gateway'; import { useAgentsStore } from './agents'; import { buildCronSessionHistoryPath, isCronSessionKey } from './chat/cron-session-utils'; import { DEFAULT_CANONICAL_PREFIX, DEFAULT_SESSION_KEY, type AttachedFileMeta, type ChatSession, type ChatState, type ContentBlock, type RawMessage, type ToolStatus, } from './chat/types'; export type { AttachedFileMeta, ChatSession, ContentBlock, RawMessage, ToolStatus, } from './chat/types'; // Module-level timestamp tracking the last chat event received. // Used by the safety timeout to avoid false-positive "no response" errors // during tool-use conversations where streamingMessage is temporarily cleared // between tool-result finals and the next delta. let _lastChatEventAt = 0; /** Normalize a timestamp to milliseconds. Handles both seconds and ms. */ function toMs(ts: number): number { // Timestamps < 1e12 are in seconds (before ~2033); >= 1e12 are milliseconds return ts < 1e12 ? ts * 1000 : ts; } // Timer for fallback history polling during active sends. // If no streaming events arrive within a few seconds, we periodically // poll chat.history to surface intermediate tool-call turns. let _historyPollTimer: ReturnType | null = null; // Timer for delayed error finalization. When the Gateway reports a mid-stream // error (e.g. "terminated"), it may retry internally and recover. We wait // before committing the error to give the recovery path a chance. let _errorRecoveryTimer: ReturnType | null = null; let _loadSessionsInFlight: Promise | null = null; let _lastLoadSessionsAt = 0; const _historyLoadInFlight = new Map>(); const _lastHistoryLoadAtBySession = new Map(); const SESSION_LOAD_MIN_INTERVAL_MS = 1_200; const HISTORY_LOAD_MIN_INTERVAL_MS = 800; const HISTORY_POLL_SILENCE_WINDOW_MS = 2_500; const CHAT_EVENT_DEDUPE_TTL_MS = 30_000; const _chatEventDedupe = new Map(); function clearErrorRecoveryTimer(): void { if (_errorRecoveryTimer) { clearTimeout(_errorRecoveryTimer); _errorRecoveryTimer = null; } } function clearHistoryPoll(): void { if (_historyPollTimer) { clearTimeout(_historyPollTimer); _historyPollTimer = null; } } function pruneChatEventDedupe(now: number): void { for (const [key, ts] of _chatEventDedupe.entries()) { if (now - ts > CHAT_EVENT_DEDUPE_TTL_MS) { _chatEventDedupe.delete(key); } } } function buildChatEventDedupeKey(eventState: string, event: Record): string | null { const runId = event.runId != null ? String(event.runId) : ''; const sessionKey = event.sessionKey != null ? String(event.sessionKey) : ''; const seq = event.seq != null ? String(event.seq) : ''; if (runId || sessionKey || seq || eventState) { return [runId, sessionKey, seq, eventState].join('|'); } const msg = (event.message && typeof event.message === 'object') ? event.message as Record : null; if (msg) { const messageId = msg.id != null ? String(msg.id) : ''; const stopReason = msg.stopReason ?? msg.stop_reason; if (messageId || stopReason) { return `msg|${messageId}|${String(stopReason ?? '')}|${eventState}`; } } return null; } function isDuplicateChatEvent(eventState: string, event: Record): boolean { const key = buildChatEventDedupeKey(eventState, event); if (!key) return false; const now = Date.now(); pruneChatEventDedupe(now); if (_chatEventDedupe.has(key)) { return true; } _chatEventDedupe.set(key, now); return false; } // ── Local image cache ───────────────────────────────────────── // The Gateway doesn't store image attachments in session content blocks, // so we cache them locally keyed by staged file path (which appears in the // [media attached: ...] reference in the Gateway's user message text). // Keying by path avoids the race condition of keying by runId (which is only // available after the RPC returns, but history may load before that). const IMAGE_CACHE_KEY = 'clawx:image-cache'; const IMAGE_CACHE_MAX = 100; // max entries to prevent unbounded growth function loadImageCache(): Map { try { const raw = localStorage.getItem(IMAGE_CACHE_KEY); if (raw) { const entries = JSON.parse(raw) as Array<[string, AttachedFileMeta]>; return new Map(entries); } } catch { /* ignore parse errors */ } return new Map(); } function saveImageCache(cache: Map): void { try { // Evict oldest entries if over limit const entries = Array.from(cache.entries()); const trimmed = entries.length > IMAGE_CACHE_MAX ? entries.slice(entries.length - IMAGE_CACHE_MAX) : entries; localStorage.setItem(IMAGE_CACHE_KEY, JSON.stringify(trimmed)); } catch { /* ignore quota errors */ } } const _imageCache = loadImageCache(); /** Extract plain text from message content (string or content blocks) */ function getMessageText(content: unknown): string { if (typeof content === 'string') return content; if (Array.isArray(content)) { return (content as Array<{ type?: string; text?: string }>) .filter(b => b.type === 'text' && b.text) .map(b => b.text!) .join('\n'); } return ''; } /** Extract media file refs from [media attached: () | ...] patterns */ function extractMediaRefs(text: string): Array<{ filePath: string; mimeType: string }> { const refs: Array<{ filePath: string; mimeType: string }> = []; const regex = /\[media attached:\s*([^\s(]+)\s*\(([^)]+)\)\s*\|[^\]]*\]/g; let match; while ((match = regex.exec(text)) !== null) { refs.push({ filePath: match[1], mimeType: match[2] }); } return refs; } /** Map common file extensions to MIME types */ function mimeFromExtension(filePath: string): string { const ext = filePath.split('.').pop()?.toLowerCase() || ''; const map: Record = { // Images 'png': 'image/png', 'jpg': 'image/jpeg', 'jpeg': 'image/jpeg', 'gif': 'image/gif', 'webp': 'image/webp', 'bmp': 'image/bmp', 'avif': 'image/avif', 'svg': 'image/svg+xml', // Documents 'pdf': 'application/pdf', 'doc': 'application/msword', 'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', 'xls': 'application/vnd.ms-excel', 'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', 'ppt': 'application/vnd.ms-powerpoint', 'pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation', 'txt': 'text/plain', 'csv': 'text/csv', 'md': 'text/markdown', 'rtf': 'application/rtf', 'epub': 'application/epub+zip', // Archives 'zip': 'application/zip', 'tar': 'application/x-tar', 'gz': 'application/gzip', 'rar': 'application/vnd.rar', '7z': 'application/x-7z-compressed', // Audio 'mp3': 'audio/mpeg', 'wav': 'audio/wav', 'ogg': 'audio/ogg', 'aac': 'audio/aac', 'flac': 'audio/flac', 'm4a': 'audio/mp4', // Video 'mp4': 'video/mp4', 'mov': 'video/quicktime', 'avi': 'video/x-msvideo', 'mkv': 'video/x-matroska', 'webm': 'video/webm', 'm4v': 'video/mp4', }; return map[ext] || 'application/octet-stream'; } /** * Extract raw file paths from message text. * Detects absolute paths (Unix: / or ~/, Windows: C:\ etc.) ending with common file extensions. * Handles both image and non-image files, consistent with channel push message behavior. */ function extractRawFilePaths(text: string): Array<{ filePath: string; mimeType: string }> { const refs: Array<{ filePath: string; mimeType: string }> = []; const seen = new Set(); const exts = 'png|jpe?g|gif|webp|bmp|avif|svg|pdf|docx?|xlsx?|pptx?|txt|csv|md|rtf|epub|zip|tar|gz|rar|7z|mp3|wav|ogg|aac|flac|m4a|mp4|mov|avi|mkv|webm|m4v'; // Unix absolute paths (/... or ~/...) — lookbehind rejects mid-token slashes // (e.g. "path/to/file.mp4", "https://example.com/file.mp4") const unixRegex = new RegExp(`(?]*?\\.(?:${exts}))`, 'gi'); // Windows absolute paths (C:\... D:\...) — lookbehind rejects drive letter glued to a word const winRegex = new RegExp(`(?]*?\\.(?:${exts}))`, 'gi'); for (const regex of [unixRegex, winRegex]) { let match; while ((match = regex.exec(text)) !== null) { const p = match[1]; if (p && !seen.has(p)) { seen.add(p); refs.push({ filePath: p, mimeType: mimeFromExtension(p) }); } } } return refs; } /** * Extract images from a content array (including nested tool_result content). * Converts them to AttachedFileMeta entries with preview set to data URL or remote URL. */ function extractImagesAsAttachedFiles(content: unknown): AttachedFileMeta[] { if (!Array.isArray(content)) return []; const files: AttachedFileMeta[] = []; for (const block of content as ContentBlock[]) { if (block.type === 'image') { // Path 1: Anthropic source-wrapped format {source: {type, media_type, data}} if (block.source) { const src = block.source; const mimeType = src.media_type || 'image/jpeg'; if (src.type === 'base64' && src.data) { files.push({ fileName: 'image', mimeType, fileSize: 0, preview: `data:${mimeType};base64,${src.data}`, }); } else if (src.type === 'url' && src.url) { files.push({ fileName: 'image', mimeType, fileSize: 0, preview: src.url, }); } } // Path 2: Flat format from Gateway tool results {data, mimeType} else if (block.data) { const mimeType = block.mimeType || 'image/jpeg'; files.push({ fileName: 'image', mimeType, fileSize: 0, preview: `data:${mimeType};base64,${block.data}`, }); } } // Recurse into tool_result content blocks if ((block.type === 'tool_result' || block.type === 'toolResult') && block.content) { files.push(...extractImagesAsAttachedFiles(block.content)); } } return files; } /** * Build an AttachedFileMeta entry for a file ref, using cache if available. */ function makeAttachedFile(ref: { filePath: string; mimeType: string }): AttachedFileMeta { const cached = _imageCache.get(ref.filePath); if (cached) return { ...cached, filePath: ref.filePath }; const fileName = ref.filePath.split(/[\\/]/).pop() || 'file'; return { fileName, mimeType: ref.mimeType, fileSize: 0, preview: null, filePath: ref.filePath }; } /** * Extract file path from a tool call's arguments by toolCallId. * Searches common argument names: file_path, filePath, path, file. */ function getToolCallFilePath(msg: RawMessage, toolCallId: string): string | undefined { if (!toolCallId) return undefined; // Anthropic/normalized format — toolCall blocks in content array const content = msg.content; if (Array.isArray(content)) { for (const block of content as ContentBlock[]) { if ((block.type === 'tool_use' || block.type === 'toolCall') && block.id === toolCallId) { const args = (block.input ?? block.arguments) as Record | undefined; if (args) { const fp = args.file_path ?? args.filePath ?? args.path ?? args.file; if (typeof fp === 'string') return fp; } } } } // OpenAI format — tool_calls array on the message itself const msgAny = msg as unknown as Record; const toolCalls = msgAny.tool_calls ?? msgAny.toolCalls; if (Array.isArray(toolCalls)) { for (const tc of toolCalls as Array>) { if (tc.id !== toolCallId) continue; const fn = (tc.function ?? tc) as Record; let args: Record | undefined; try { args = typeof fn.arguments === 'string' ? JSON.parse(fn.arguments) : (fn.arguments ?? fn.input) as Record; } catch { /* ignore */ } if (args) { const fp = args.file_path ?? args.filePath ?? args.path ?? args.file; if (typeof fp === 'string') return fp; } } } return undefined; } /** * Collect all tool call file paths from a message into a Map. */ function collectToolCallPaths(msg: RawMessage, paths: Map): void { const content = msg.content; if (Array.isArray(content)) { for (const block of content as ContentBlock[]) { if ((block.type === 'tool_use' || block.type === 'toolCall') && block.id) { const args = (block.input ?? block.arguments) as Record | undefined; if (args) { const fp = args.file_path ?? args.filePath ?? args.path ?? args.file; if (typeof fp === 'string') paths.set(block.id, fp); } } } } const msgAny = msg as unknown as Record; const toolCalls = msgAny.tool_calls ?? msgAny.toolCalls; if (Array.isArray(toolCalls)) { for (const tc of toolCalls as Array>) { const id = typeof tc.id === 'string' ? tc.id : ''; if (!id) continue; const fn = (tc.function ?? tc) as Record; let args: Record | undefined; try { args = typeof fn.arguments === 'string' ? JSON.parse(fn.arguments) : (fn.arguments ?? fn.input) as Record; } catch { /* ignore */ } if (args) { const fp = args.file_path ?? args.filePath ?? args.path ?? args.file; if (typeof fp === 'string') paths.set(id, fp); } } } } /** * Before filtering tool_result messages from history, scan them for any file/image * content and attach those to the immediately following assistant message. * This mirrors channel push message behavior where tool outputs surface files to the UI. * Handles: * - Image content blocks (base64 / url) * - [media attached: path (mime) | path] text patterns in tool result output * - Raw file paths in tool result text */ function enrichWithToolResultFiles(messages: RawMessage[]): RawMessage[] { const pending: AttachedFileMeta[] = []; const toolCallPaths = new Map(); return messages.map((msg) => { // Track file paths from assistant tool call arguments for later matching if (msg.role === 'assistant') { collectToolCallPaths(msg, toolCallPaths); } if (isToolResultRole(msg.role)) { // Resolve file path from the matching tool call const matchedPath = msg.toolCallId ? toolCallPaths.get(msg.toolCallId) : undefined; // 1. Image/file content blocks in the structured content array const imageFiles = extractImagesAsAttachedFiles(msg.content); if (matchedPath) { for (const f of imageFiles) { if (!f.filePath) { f.filePath = matchedPath; f.fileName = matchedPath.split(/[\\/]/).pop() || 'image'; } } } pending.push(...imageFiles); // 2. [media attached: ...] patterns in tool result text output const text = getMessageText(msg.content); if (text) { const mediaRefs = extractMediaRefs(text); const mediaRefPaths = new Set(mediaRefs.map(r => r.filePath)); for (const ref of mediaRefs) { pending.push(makeAttachedFile(ref)); } // 3. Raw file paths in tool result text (documents, audio, video, etc.) for (const ref of extractRawFilePaths(text)) { if (!mediaRefPaths.has(ref.filePath)) { pending.push(makeAttachedFile(ref)); } } } return msg; // will be filtered later } if (msg.role === 'assistant' && pending.length > 0) { const toAttach = pending.splice(0); // Deduplicate against files already on the assistant message const existingPaths = new Set( (msg._attachedFiles || []).map(f => f.filePath).filter(Boolean), ); const newFiles = toAttach.filter(f => !f.filePath || !existingPaths.has(f.filePath)); if (newFiles.length === 0) return msg; return { ...msg, _attachedFiles: [...(msg._attachedFiles || []), ...newFiles], }; } return msg; }); } /** * Restore _attachedFiles for messages loaded from history. * Handles: * 1. [media attached: path (mime) | path] patterns (attachment-button flow) * 2. Raw image file paths typed in message text (e.g. /Users/.../image.png) * Uses local cache for previews when available; missing previews are loaded async. */ function enrichWithCachedImages(messages: RawMessage[]): RawMessage[] { return messages.map((msg, idx) => { // Only process user and assistant messages; skip if already enriched if ((msg.role !== 'user' && msg.role !== 'assistant') || msg._attachedFiles) return msg; const text = getMessageText(msg.content); // Path 1: [media attached: path (mime) | path] — guaranteed format from attachment button const mediaRefs = extractMediaRefs(text); const mediaRefPaths = new Set(mediaRefs.map(r => r.filePath)); // Path 2: Raw file paths. // For assistant messages: scan own text AND the nearest preceding user message text, // but only for non-tool-only assistant messages (i.e. the final answer turn). // Tool-only messages (thinking + tool calls) should not show file previews — those // belong to the final answer message that comes after the tool results. // User messages never get raw-path previews so the image is not shown twice. let rawRefs: Array<{ filePath: string; mimeType: string }> = []; if (msg.role === 'assistant' && !isToolOnlyMessage(msg)) { // Own text rawRefs = extractRawFilePaths(text).filter(r => !mediaRefPaths.has(r.filePath)); // Nearest preceding user message text (look back up to 5 messages) const seenPaths = new Set(rawRefs.map(r => r.filePath)); for (let i = idx - 1; i >= Math.max(0, idx - 5); i--) { const prev = messages[i]; if (!prev) break; if (prev.role === 'user') { const prevText = getMessageText(prev.content); for (const ref of extractRawFilePaths(prevText)) { if (!mediaRefPaths.has(ref.filePath) && !seenPaths.has(ref.filePath)) { seenPaths.add(ref.filePath); rawRefs.push(ref); } } break; // only use the nearest user message } } } const allRefs = [...mediaRefs, ...rawRefs]; if (allRefs.length === 0) return msg; const files: AttachedFileMeta[] = allRefs.map(ref => { const cached = _imageCache.get(ref.filePath); if (cached) return { ...cached, filePath: ref.filePath }; const fileName = ref.filePath.split(/[\\/]/).pop() || 'file'; return { fileName, mimeType: ref.mimeType, fileSize: 0, preview: null, filePath: ref.filePath }; }); return { ...msg, _attachedFiles: files }; }); } /** * Async: load missing previews from disk via IPC for messages that have * _attachedFiles with null previews. Updates messages in-place and triggers re-render. * Handles both [media attached: ...] patterns and raw filePath entries. */ async function loadMissingPreviews(messages: RawMessage[]): Promise { // Collect all image paths that need previews const needPreview: Array<{ filePath: string; mimeType: string }> = []; const seenPaths = new Set(); for (const msg of messages) { if (!msg._attachedFiles) continue; // Path 1: files with explicit filePath field (raw path detection or enriched refs) for (const file of msg._attachedFiles) { const fp = file.filePath; if (!fp || seenPaths.has(fp)) continue; // Images: need preview. Non-images: need file size (for FileCard display). const needsLoad = file.mimeType.startsWith('image/') ? !file.preview : file.fileSize === 0; if (needsLoad) { seenPaths.add(fp); needPreview.push({ filePath: fp, mimeType: file.mimeType }); } } // Path 2: [media attached: ...] patterns (legacy — in case filePath wasn't stored) if (msg.role === 'user') { const text = getMessageText(msg.content); const refs = extractMediaRefs(text); for (let i = 0; i < refs.length; i++) { const file = msg._attachedFiles[i]; const ref = refs[i]; if (!file || !ref || seenPaths.has(ref.filePath)) continue; const needsLoad = ref.mimeType.startsWith('image/') ? !file.preview : file.fileSize === 0; if (needsLoad) { seenPaths.add(ref.filePath); needPreview.push(ref); } } } } if (needPreview.length === 0) return false; try { const thumbnails = await hostApiFetch>( '/api/files/thumbnails', { method: 'POST', body: JSON.stringify({ paths: needPreview }), }, ); let updated = false; for (const msg of messages) { if (!msg._attachedFiles) continue; // Update files that have filePath for (const file of msg._attachedFiles) { const fp = file.filePath; if (!fp) continue; const thumb = thumbnails[fp]; if (thumb && (thumb.preview || thumb.fileSize)) { if (thumb.preview) file.preview = thumb.preview; if (thumb.fileSize) file.fileSize = thumb.fileSize; _imageCache.set(fp, { ...file }); updated = true; } } // Legacy: update by index for [media attached: ...] refs if (msg.role === 'user') { const text = getMessageText(msg.content); const refs = extractMediaRefs(text); for (let i = 0; i < refs.length; i++) { const file = msg._attachedFiles[i]; const ref = refs[i]; if (!file || !ref || file.filePath) continue; // skip if already handled via filePath const thumb = thumbnails[ref.filePath]; if (thumb && (thumb.preview || thumb.fileSize)) { if (thumb.preview) file.preview = thumb.preview; if (thumb.fileSize) file.fileSize = thumb.fileSize; _imageCache.set(ref.filePath, { ...file }); updated = true; } } } } if (updated) saveImageCache(_imageCache); return updated; } catch (err) { console.warn('[loadMissingPreviews] Failed:', err); return false; } } function getCanonicalPrefixFromSessions(sessions: ChatSession[]): string | null { const canonical = sessions.find((s) => s.key.startsWith('agent:'))?.key; if (!canonical) return null; const parts = canonical.split(':'); if (parts.length < 2) return null; return `${parts[0]}:${parts[1]}`; } function getAgentIdFromSessionKey(sessionKey: string): string { if (!sessionKey.startsWith('agent:')) return 'main'; const parts = sessionKey.split(':'); return parts[1] || 'main'; } function parseSessionUpdatedAtMs(value: unknown): number | undefined { if (typeof value === 'number' && Number.isFinite(value)) { return toMs(value); } if (typeof value === 'string' && value.trim()) { const parsed = Date.parse(value); if (Number.isFinite(parsed)) { return parsed; } } return undefined; } async function loadCronFallbackMessages(sessionKey: string, limit = 200): Promise { if (!isCronSessionKey(sessionKey)) return []; try { const response = await hostApiFetch<{ messages?: RawMessage[] }>( buildCronSessionHistoryPath(sessionKey, limit), ); return Array.isArray(response.messages) ? response.messages : []; } catch (error) { console.warn('Failed to load cron fallback history:', error); return []; } } function normalizeAgentId(value: string | undefined | null): string { return (value ?? '').trim().toLowerCase() || 'main'; } function buildFallbackMainSessionKey(agentId: string): string { return `agent:${normalizeAgentId(agentId)}:main`; } function resolveMainSessionKeyForAgent(agentId: string | undefined | null): string | null { if (!agentId) return null; const normalizedAgentId = normalizeAgentId(agentId); const summary = useAgentsStore.getState().agents.find((agent) => agent.id === normalizedAgentId); return summary?.mainSessionKey || buildFallbackMainSessionKey(normalizedAgentId); } function ensureSessionEntry(sessions: ChatSession[], sessionKey: string): ChatSession[] { if (sessions.some((session) => session.key === sessionKey)) { return sessions; } return [...sessions, { key: sessionKey, displayName: sessionKey }]; } function clearSessionEntryFromMap>(entries: T, sessionKey: string): T { return Object.fromEntries(Object.entries(entries).filter(([key]) => key !== sessionKey)) as T; } function buildSessionSwitchPatch( state: Pick< ChatState, 'currentSessionKey' | 'messages' | 'sessions' | 'sessionLabels' | 'sessionLastActivity' >, nextSessionKey: string, ): Partial { // 仅将没有任何历史记录且无活动时间的会话视为空会话。 // 单纯依赖 messages.length 是不可靠的,因为 switchSession 会在真正调用 loadHistory 前抢先清空当前 messages, // 造成竞争条件,使得带有真实历史的会话被判定为空并从侧边栏移除。 const leavingEmpty = !state.currentSessionKey.endsWith(':main') && state.messages.length === 0 && !state.sessionLastActivity[state.currentSessionKey] && !state.sessionLabels[state.currentSessionKey]; const nextSessions = leavingEmpty ? state.sessions.filter((session) => session.key !== state.currentSessionKey) : state.sessions; return { currentSessionKey: nextSessionKey, currentAgentId: getAgentIdFromSessionKey(nextSessionKey), sessions: ensureSessionEntry(nextSessions, nextSessionKey), sessionLabels: leavingEmpty ? clearSessionEntryFromMap(state.sessionLabels, state.currentSessionKey) : state.sessionLabels, sessionLastActivity: leavingEmpty ? clearSessionEntryFromMap(state.sessionLastActivity, state.currentSessionKey) : state.sessionLastActivity, messages: [], streamingText: '', streamingMessage: null, streamingTools: [], activeRunId: null, error: null, pendingFinal: false, lastUserMessageAt: null, pendingToolImages: [], }; } function getCanonicalPrefixFromSessionKey(sessionKey: string): string | null { if (!sessionKey.startsWith('agent:')) return null; const parts = sessionKey.split(':'); if (parts.length < 2) return null; return `${parts[0]}:${parts[1]}`; } function isToolOnlyMessage(message: RawMessage | undefined): boolean { if (!message) return false; if (isToolResultRole(message.role)) return true; const msg = message as unknown as Record; const content = message.content; // Check OpenAI-format tool_calls field (real-time streaming from OpenAI-compatible models) const toolCalls = msg.tool_calls ?? msg.toolCalls; const hasOpenAITools = Array.isArray(toolCalls) && toolCalls.length > 0; if (!Array.isArray(content)) { // Content is not an array — check if there's OpenAI-format tool_calls if (hasOpenAITools) { // Has tool calls but content might be empty/string — treat as tool-only // if there's no meaningful text content const textContent = typeof content === 'string' ? content.trim() : ''; return textContent.length === 0; } return false; } let hasTool = hasOpenAITools; let hasText = false; let hasNonToolContent = false; for (const block of content as ContentBlock[]) { if (block.type === 'tool_use' || block.type === 'tool_result' || block.type === 'toolCall' || block.type === 'toolResult') { hasTool = true; continue; } if (block.type === 'text' && block.text && block.text.trim()) { hasText = true; continue; } // Only actual image output disqualifies a tool-only message. // Thinking blocks are internal reasoning that can accompany tool_use — they // should NOT prevent the message from being treated as an intermediate tool step. if (block.type === 'image') { hasNonToolContent = true; } } return hasTool && !hasText && !hasNonToolContent; } function isToolResultRole(role: unknown): boolean { if (!role) return false; const normalized = String(role).toLowerCase(); return normalized === 'toolresult' || normalized === 'tool_result'; } function extractTextFromContent(content: unknown): string { if (typeof content === 'string') return content; if (!Array.isArray(content)) return ''; const parts: string[] = []; for (const block of content as ContentBlock[]) { if (block.type === 'text' && block.text) { parts.push(block.text); } } return parts.join('\n'); } function summarizeToolOutput(text: string): string | undefined { const trimmed = text.trim(); if (!trimmed) return undefined; const lines = trimmed.split(/\r?\n/).map((line) => line.trim()).filter(Boolean); if (lines.length === 0) return undefined; const summaryLines = lines.slice(0, 2); let summary = summaryLines.join(' / '); if (summary.length > 160) { summary = `${summary.slice(0, 157)}...`; } return summary; } function normalizeToolStatus(rawStatus: unknown, fallback: 'running' | 'completed'): ToolStatus['status'] { const status = typeof rawStatus === 'string' ? rawStatus.toLowerCase() : ''; if (status === 'error' || status === 'failed') return 'error'; if (status === 'completed' || status === 'success' || status === 'done') return 'completed'; return fallback; } function parseDurationMs(value: unknown): number | undefined { if (typeof value === 'number' && Number.isFinite(value)) return value; const parsed = typeof value === 'string' ? Number(value) : NaN; return Number.isFinite(parsed) ? parsed : undefined; } function extractToolUseUpdates(message: unknown): ToolStatus[] { if (!message || typeof message !== 'object') return []; const msg = message as Record; const updates: ToolStatus[] = []; // Path 1: Anthropic/normalized format — tool blocks inside content array const content = msg.content; if (Array.isArray(content)) { for (const block of content as ContentBlock[]) { if ((block.type !== 'tool_use' && block.type !== 'toolCall') || !block.name) continue; updates.push({ id: block.id || block.name, toolCallId: block.id, name: block.name, status: 'running', updatedAt: Date.now(), }); } } // Path 2: OpenAI format — tool_calls array on the message itself if (updates.length === 0) { const toolCalls = msg.tool_calls ?? msg.toolCalls; if (Array.isArray(toolCalls)) { for (const tc of toolCalls as Array>) { const fn = (tc.function ?? tc) as Record; const name = typeof fn.name === 'string' ? fn.name : ''; if (!name) continue; const id = typeof tc.id === 'string' ? tc.id : name; updates.push({ id, toolCallId: typeof tc.id === 'string' ? tc.id : undefined, name, status: 'running', updatedAt: Date.now(), }); } } } return updates; } function extractToolResultBlocks(message: unknown, eventState: string): ToolStatus[] { if (!message || typeof message !== 'object') return []; const msg = message as Record; const content = msg.content; if (!Array.isArray(content)) return []; const updates: ToolStatus[] = []; for (const block of content as ContentBlock[]) { if (block.type !== 'tool_result' && block.type !== 'toolResult') continue; const outputText = extractTextFromContent(block.content ?? block.text ?? ''); const summary = summarizeToolOutput(outputText); updates.push({ id: block.id || block.name || 'tool', toolCallId: block.id, name: block.name || block.id || 'tool', status: normalizeToolStatus(undefined, eventState === 'delta' ? 'running' : 'completed'), summary, updatedAt: Date.now(), }); } return updates; } function extractToolResultUpdate(message: unknown, eventState: string): ToolStatus | null { if (!message || typeof message !== 'object') return null; const msg = message as Record; const role = typeof msg.role === 'string' ? msg.role.toLowerCase() : ''; if (!isToolResultRole(role)) return null; const toolName = typeof msg.toolName === 'string' ? msg.toolName : (typeof msg.name === 'string' ? msg.name : ''); const toolCallId = typeof msg.toolCallId === 'string' ? msg.toolCallId : undefined; const details = (msg.details && typeof msg.details === 'object') ? msg.details as Record : undefined; const rawStatus = (msg.status ?? details?.status); const fallback = eventState === 'delta' ? 'running' : 'completed'; const status = normalizeToolStatus(rawStatus, fallback); const durationMs = parseDurationMs(details?.durationMs ?? details?.duration ?? (msg as Record).durationMs); const outputText = (details && typeof details.aggregated === 'string') ? details.aggregated : extractTextFromContent(msg.content); const summary = summarizeToolOutput(outputText) ?? summarizeToolOutput(String(details?.error ?? msg.error ?? '')); const name = toolName || toolCallId || 'tool'; const id = toolCallId || name; return { id, toolCallId, name, status, durationMs, summary, updatedAt: Date.now(), }; } function mergeToolStatus(existing: ToolStatus['status'], incoming: ToolStatus['status']): ToolStatus['status'] { const order: Record = { running: 0, completed: 1, error: 2 }; return order[incoming] >= order[existing] ? incoming : existing; } function upsertToolStatuses(current: ToolStatus[], updates: ToolStatus[]): ToolStatus[] { if (updates.length === 0) return current; const next = [...current]; for (const update of updates) { const key = update.toolCallId || update.id || update.name; if (!key) continue; const index = next.findIndex((tool) => (tool.toolCallId || tool.id || tool.name) === key); if (index === -1) { next.push(update); continue; } const existing = next[index]; next[index] = { ...existing, ...update, name: update.name || existing.name, status: mergeToolStatus(existing.status, update.status), durationMs: update.durationMs ?? existing.durationMs, summary: update.summary ?? existing.summary, updatedAt: update.updatedAt || existing.updatedAt, }; } return next; } function collectToolUpdates(message: unknown, eventState: string): ToolStatus[] { const updates: ToolStatus[] = []; const toolResultUpdate = extractToolResultUpdate(message, eventState); if (toolResultUpdate) updates.push(toolResultUpdate); updates.push(...extractToolResultBlocks(message, eventState)); updates.push(...extractToolUseUpdates(message)); return updates; } function hasNonToolAssistantContent(message: RawMessage | undefined): boolean { if (!message) return false; if (typeof message.content === 'string' && message.content.trim()) return true; const content = message.content; if (Array.isArray(content)) { for (const block of content as ContentBlock[]) { if (block.type === 'text' && block.text && block.text.trim()) return true; if (block.type === 'thinking' && block.thinking && block.thinking.trim()) return true; if (block.type === 'image') return true; } } const msg = message as unknown as Record; if (typeof msg.text === 'string' && msg.text.trim()) return true; return false; } // ── Store ──────────────────────────────────────────────────────── export const useChatStore = create((set, get) => ({ messages: [], loading: false, error: null, sending: false, activeRunId: null, streamingText: '', streamingMessage: null, streamingTools: [], pendingFinal: false, lastUserMessageAt: null, pendingToolImages: [], sessions: [], currentSessionKey: DEFAULT_SESSION_KEY, currentAgentId: 'main', sessionLabels: {}, sessionLastActivity: {}, showThinking: true, thinkingLevel: null, // ── Load sessions via sessions.list ── loadSessions: async () => { const now = Date.now(); if (_loadSessionsInFlight) { await _loadSessionsInFlight; return; } if (now - _lastLoadSessionsAt < SESSION_LOAD_MIN_INTERVAL_MS) { return; } _loadSessionsInFlight = (async () => { try { const data = await useGatewayStore.getState().rpc>('sessions.list', {}); if (data) { const rawSessions = Array.isArray(data.sessions) ? data.sessions : []; const sessions: ChatSession[] = rawSessions.map((s: Record) => ({ key: String(s.key || ''), label: s.label ? String(s.label) : undefined, displayName: s.displayName ? String(s.displayName) : undefined, thinkingLevel: s.thinkingLevel ? String(s.thinkingLevel) : undefined, model: s.model ? String(s.model) : undefined, updatedAt: parseSessionUpdatedAtMs(s.updatedAt), })).filter((s: ChatSession) => s.key); const canonicalBySuffix = new Map(); for (const session of sessions) { if (!session.key.startsWith('agent:')) continue; const parts = session.key.split(':'); if (parts.length < 3) continue; const suffix = parts.slice(2).join(':'); if (suffix && !canonicalBySuffix.has(suffix)) { canonicalBySuffix.set(suffix, session.key); } } // Deduplicate: if both short and canonical existed, keep canonical only const seen = new Set(); const dedupedSessions = sessions.filter((s) => { if (!s.key.startsWith('agent:') && canonicalBySuffix.has(s.key)) return false; if (seen.has(s.key)) return false; seen.add(s.key); return true; }); const { currentSessionKey, sessions: localSessions } = get(); let nextSessionKey = currentSessionKey || DEFAULT_SESSION_KEY; if (!nextSessionKey.startsWith('agent:')) { const canonicalMatch = canonicalBySuffix.get(nextSessionKey); if (canonicalMatch) { nextSessionKey = canonicalMatch; } } if (!dedupedSessions.find((s) => s.key === nextSessionKey) && dedupedSessions.length > 0) { // Preserve only locally-created pending sessions. On initial boot the // default ghost key (`agent:main:main`) should yield to real history. const hasLocalPendingSession = localSessions.some((session) => session.key === nextSessionKey); if (!hasLocalPendingSession) { nextSessionKey = dedupedSessions[0].key; } } const sessionsWithCurrent = !dedupedSessions.find((s) => s.key === nextSessionKey) && nextSessionKey ? [ ...dedupedSessions, { key: nextSessionKey, displayName: nextSessionKey }, ] : dedupedSessions; const discoveredActivity = Object.fromEntries( sessionsWithCurrent .filter((session) => typeof session.updatedAt === 'number' && Number.isFinite(session.updatedAt)) .map((session) => [session.key, session.updatedAt!]), ); set((state) => ({ sessions: sessionsWithCurrent, currentSessionKey: nextSessionKey, currentAgentId: getAgentIdFromSessionKey(nextSessionKey), sessionLastActivity: { ...state.sessionLastActivity, ...discoveredActivity, }, })); if (currentSessionKey !== nextSessionKey) { void get().loadHistory(); } // Background: fetch first user message for every non-main session to populate labels upfront. // Uses a small limit so it's cheap; runs in parallel and doesn't block anything. const sessionsToLabel = sessionsWithCurrent.filter((s) => !s.key.endsWith(':main')); if (sessionsToLabel.length > 0) { void Promise.all( sessionsToLabel.map(async (session) => { try { const r = await useGatewayStore.getState().rpc>( 'chat.history', { sessionKey: session.key, limit: 1000 }, ); const msgs = Array.isArray(r.messages) ? r.messages as RawMessage[] : []; const firstUser = msgs.find((m) => m.role === 'user'); const lastMsg = msgs[msgs.length - 1]; set((s) => { const next: Partial = {}; if (firstUser) { const labelText = getMessageText(firstUser.content).trim(); if (labelText) { const truncated = labelText.length > 50 ? `${labelText.slice(0, 50)}…` : labelText; next.sessionLabels = { ...s.sessionLabels, [session.key]: truncated }; } } if (lastMsg?.timestamp) { next.sessionLastActivity = { ...s.sessionLastActivity, [session.key]: toMs(lastMsg.timestamp) }; } return next; }); } catch { // ignore per-session errors } }), ); } } } catch (err) { console.warn('Failed to load sessions:', err); } finally { _lastLoadSessionsAt = Date.now(); } })(); try { await _loadSessionsInFlight; } finally { _loadSessionsInFlight = null; } }, // ── Switch session ── switchSession: (key: string) => { if (key === get().currentSessionKey) return; // Stop any background polling for the old session before switching. // This prevents the poll timer from firing after the switch and loading // the wrong session's history into the new session's view. clearHistoryPoll(); set((s) => buildSessionSwitchPatch(s, key)); get().loadHistory(); }, // ── Delete session ── // // NOTE: The OpenClaw Gateway does NOT expose a sessions.delete (or equivalent) // RPC — confirmed by inspecting client.ts, protocol.ts and the full codebase. // Deletion is therefore a local-only UI operation: the session is removed from // the sidebar list and its labels/activity maps are cleared. The underlying // JSONL history file on disk is intentionally left intact, consistent with the // newSession() design that avoids sessions.reset to preserve history. deleteSession: async (key: string) => { // Soft-delete the session's JSONL transcript on disk. // The main process renames .jsonl → .deleted.jsonl so that // sessions.list skips it automatically. try { const result = await hostApiFetch<{ success: boolean; error?: string; }>('/api/sessions/delete', { method: 'POST', body: JSON.stringify({ sessionKey: key }), }); if (!result.success) { console.warn(`[deleteSession] IPC reported failure for ${key}:`, result.error); } } catch (err) { console.warn(`[deleteSession] IPC call failed for ${key}:`, err); } const { currentSessionKey, sessions } = get(); const remaining = sessions.filter((s) => s.key !== key); if (currentSessionKey === key) { // Switched away from deleted session — pick the first remaining or create new const next = remaining[0]; set((s) => ({ sessions: remaining, sessionLabels: Object.fromEntries(Object.entries(s.sessionLabels).filter(([k]) => k !== key)), sessionLastActivity: Object.fromEntries(Object.entries(s.sessionLastActivity).filter(([k]) => k !== key)), messages: [], streamingText: '', streamingMessage: null, streamingTools: [], activeRunId: null, error: null, pendingFinal: false, lastUserMessageAt: null, pendingToolImages: [], currentSessionKey: next?.key ?? DEFAULT_SESSION_KEY, currentAgentId: getAgentIdFromSessionKey(next?.key ?? DEFAULT_SESSION_KEY), })); if (next) { get().loadHistory(); } } else { set((s) => ({ sessions: remaining, sessionLabels: Object.fromEntries(Object.entries(s.sessionLabels).filter(([k]) => k !== key)), sessionLastActivity: Object.fromEntries(Object.entries(s.sessionLastActivity).filter(([k]) => k !== key)), })); } }, // ── New session ── newSession: () => { // Generate a new unique session key and switch to it. // NOTE: We intentionally do NOT call sessions.reset on the old session. // sessions.reset archives (renames) the session JSONL file, making old // conversation history inaccessible when the user switches back to it. const { currentSessionKey, messages, sessions, sessionLastActivity, sessionLabels } = get(); // 仅将没有任何历史记录且无活动时间的会话视为空会话 const leavingEmpty = !currentSessionKey.endsWith(':main') && messages.length === 0 && !sessionLastActivity[currentSessionKey] && !sessionLabels[currentSessionKey]; const prefix = getCanonicalPrefixFromSessionKey(currentSessionKey) ?? getCanonicalPrefixFromSessions(sessions) ?? DEFAULT_CANONICAL_PREFIX; const newKey = `${prefix}:session-${Date.now()}`; const newSessionEntry: ChatSession = { key: newKey, displayName: newKey }; set((s) => ({ currentSessionKey: newKey, currentAgentId: getAgentIdFromSessionKey(newKey), sessions: [ ...(leavingEmpty ? s.sessions.filter((sess) => sess.key !== currentSessionKey) : s.sessions), newSessionEntry, ], sessionLabels: leavingEmpty ? Object.fromEntries(Object.entries(s.sessionLabels).filter(([k]) => k !== currentSessionKey)) : s.sessionLabels, sessionLastActivity: leavingEmpty ? Object.fromEntries(Object.entries(s.sessionLastActivity).filter(([k]) => k !== currentSessionKey)) : s.sessionLastActivity, messages: [], streamingText: '', streamingMessage: null, streamingTools: [], activeRunId: null, error: null, pendingFinal: false, lastUserMessageAt: null, pendingToolImages: [], })); }, // ── Cleanup empty session on navigate away ── cleanupEmptySession: () => { const { currentSessionKey, messages, sessionLastActivity, sessionLabels } = get(); // Only remove non-main sessions that were never used (no messages sent). // This mirrors the "leavingEmpty" logic in switchSession so that creating // a new session and immediately navigating away doesn't leave a ghost entry // in the sidebar. // 同样需要综合检查 sessionLastActivity 和 sessionLabels, // 防止因为 switchSession 抢先清空 messages 而误判有历史的会话为空。 const isEmptyNonMain = !currentSessionKey.endsWith(':main') && messages.length === 0 && !sessionLastActivity[currentSessionKey] && !sessionLabels[currentSessionKey]; if (!isEmptyNonMain) return; set((s) => ({ sessions: s.sessions.filter((sess) => sess.key !== currentSessionKey), sessionLabels: Object.fromEntries( Object.entries(s.sessionLabels).filter(([k]) => k !== currentSessionKey), ), sessionLastActivity: Object.fromEntries( Object.entries(s.sessionLastActivity).filter(([k]) => k !== currentSessionKey), ), })); }, // ── Load chat history ── loadHistory: async (quiet = false) => { const { currentSessionKey } = get(); const existingLoad = _historyLoadInFlight.get(currentSessionKey); if (existingLoad) { await existingLoad; return; } const lastLoadAt = _lastHistoryLoadAtBySession.get(currentSessionKey) || 0; if (quiet && Date.now() - lastLoadAt < HISTORY_LOAD_MIN_INTERVAL_MS) { return; } if (!quiet) set({ loading: true, error: null }); // 安全保护:如果历史记录加载花费太多时间,则强制将 loading 设置为 false // 防止 UI 永远卡在转圈状态。 let loadingTimedOut = false; const loadingSafetyTimer = quiet ? null : setTimeout(() => { loadingTimedOut = true; set({ loading: false }); }, 15_000); const loadPromise = (async () => { const applyLoadedMessages = (rawMessages: RawMessage[], thinkingLevel: string | null) => { // Guard: if the user switched sessions while this async load was in // flight, discard the result to prevent overwriting the new session's // messages with stale data from the old session. if (get().currentSessionKey !== currentSessionKey) return; // Before filtering: attach images/files from tool_result messages to the next assistant message const messagesWithToolImages = enrichWithToolResultFiles(rawMessages); const filteredMessages = messagesWithToolImages.filter((msg) => !isToolResultRole(msg.role)); // Restore file attachments for user/assistant messages (from cache + text patterns) const enrichedMessages = enrichWithCachedImages(filteredMessages); // Preserve the optimistic user message during an active send. // The Gateway may not include the user's message in chat.history // until the run completes, causing it to flash out of the UI. let finalMessages = enrichedMessages; const userMsgAt = get().lastUserMessageAt; if (get().sending && userMsgAt) { const userMsMs = toMs(userMsgAt); const hasRecentUser = enrichedMessages.some( (m) => m.role === 'user' && m.timestamp && Math.abs(toMs(m.timestamp) - userMsMs) < 5000, ); if (!hasRecentUser) { const currentMsgs = get().messages; const optimistic = [...currentMsgs].reverse().find( (m) => m.role === 'user' && m.timestamp && Math.abs(toMs(m.timestamp) - userMsMs) < 5000, ); if (optimistic) { finalMessages = [...enrichedMessages, optimistic]; } } } set({ messages: finalMessages, thinkingLevel, loading: false }); // Extract first user message text as a session label for display in the toolbar. // Skip main sessions (key ends with ":main") — they rely on the Gateway-provided // displayName (e.g. the configured agent name "ClawX") instead. const isMainSession = currentSessionKey.endsWith(':main'); if (!isMainSession) { const firstUserMsg = finalMessages.find((m) => m.role === 'user'); if (firstUserMsg) { const labelText = getMessageText(firstUserMsg.content).trim(); if (labelText) { const truncated = labelText.length > 50 ? `${labelText.slice(0, 50)}…` : labelText; set((s) => ({ sessionLabels: { ...s.sessionLabels, [currentSessionKey]: truncated }, })); } } } // Record last activity time from the last message in history const lastMsg = finalMessages[finalMessages.length - 1]; if (lastMsg?.timestamp) { const lastAt = toMs(lastMsg.timestamp); set((s) => ({ sessionLastActivity: { ...s.sessionLastActivity, [currentSessionKey]: lastAt }, })); } // Async: load missing image previews from disk (updates in background) loadMissingPreviews(finalMessages).then((updated) => { if (updated) { // Create new object references so React.memo detects changes. // loadMissingPreviews mutates AttachedFileMeta in place, so we // must produce fresh message + file references for each affected msg. set({ messages: finalMessages.map(msg => msg._attachedFiles ? { ...msg, _attachedFiles: msg._attachedFiles.map(f => ({ ...f })) } : msg ), }); } }); const { pendingFinal, lastUserMessageAt, sending: isSendingNow } = get(); // If we're sending but haven't received streaming events, check // whether the loaded history reveals intermediate tool-call activity. // This surfaces progress via the pendingFinal → ActivityIndicator path. const userMsTs = lastUserMessageAt ? toMs(lastUserMessageAt) : 0; const isAfterUserMsg = (msg: RawMessage): boolean => { if (!userMsTs || !msg.timestamp) return true; return toMs(msg.timestamp) >= userMsTs; }; if (isSendingNow && !pendingFinal) { const hasRecentAssistantActivity = [...filteredMessages].reverse().some((msg) => { if (msg.role !== 'assistant') return false; return isAfterUserMsg(msg); }); if (hasRecentAssistantActivity) { set({ pendingFinal: true }); } } // If pendingFinal, check whether the AI produced a final text response. if (pendingFinal || get().pendingFinal) { const recentAssistant = [...filteredMessages].reverse().find((msg) => { if (msg.role !== 'assistant') return false; if (!hasNonToolAssistantContent(msg)) return false; return isAfterUserMsg(msg); }); if (recentAssistant) { clearHistoryPoll(); set({ sending: false, activeRunId: null, pendingFinal: false }); } } }; try { const data = await useGatewayStore.getState().rpc>( 'chat.history', { sessionKey: currentSessionKey, limit: 200 }, ); if (data) { let rawMessages = Array.isArray(data.messages) ? data.messages as RawMessage[] : []; const thinkingLevel = data.thinkingLevel ? String(data.thinkingLevel) : null; if (rawMessages.length === 0 && isCronSessionKey(currentSessionKey)) { rawMessages = await loadCronFallbackMessages(currentSessionKey, 200); } applyLoadedMessages(rawMessages, thinkingLevel); } else { const fallbackMessages = await loadCronFallbackMessages(currentSessionKey, 200); if (fallbackMessages.length > 0) { applyLoadedMessages(fallbackMessages, null); } else { set({ messages: [], loading: false }); } } } catch (err) { console.warn('Failed to load chat history:', err); const fallbackMessages = await loadCronFallbackMessages(currentSessionKey, 200); if (fallbackMessages.length > 0) { applyLoadedMessages(fallbackMessages, null); } else { set({ messages: [], loading: false }); } } })(); _historyLoadInFlight.set(currentSessionKey, loadPromise); try { await loadPromise; } finally { // 正常完成时清除安全定时器 if (loadingSafetyTimer) clearTimeout(loadingSafetyTimer); if (!loadingTimedOut) { // Only update load time if we actually didn't time out _lastHistoryLoadAtBySession.set(currentSessionKey, Date.now()); } const active = _historyLoadInFlight.get(currentSessionKey); if (active === loadPromise) { _historyLoadInFlight.delete(currentSessionKey); } } }, // ── Send message ── sendMessage: async ( text: string, attachments?: Array<{ fileName: string; mimeType: string; fileSize: number; stagedPath: string; preview: string | null }>, targetAgentId?: string | null, ) => { const trimmed = text.trim(); if (!trimmed && (!attachments || attachments.length === 0)) return; const targetSessionKey = resolveMainSessionKeyForAgent(targetAgentId) ?? get().currentSessionKey; if (targetSessionKey !== get().currentSessionKey) { set((s) => buildSessionSwitchPatch(s, targetSessionKey)); await get().loadHistory(true); } const currentSessionKey = targetSessionKey; // Add user message optimistically (with local file metadata for UI display) const nowMs = Date.now(); const userMsg: RawMessage = { role: 'user', content: trimmed || (attachments?.length ? '(file attached)' : ''), timestamp: nowMs / 1000, id: crypto.randomUUID(), _attachedFiles: attachments?.map(a => ({ fileName: a.fileName, mimeType: a.mimeType, fileSize: a.fileSize, preview: a.preview, filePath: a.stagedPath, })), }; set((s) => ({ messages: [...s.messages, userMsg], sending: true, error: null, streamingText: '', streamingMessage: null, streamingTools: [], pendingFinal: false, lastUserMessageAt: nowMs, })); // Update session label with first user message text as soon as it's sent const { sessionLabels, messages } = get(); const isFirstMessage = !messages.slice(0, -1).some((m) => m.role === 'user'); if (!currentSessionKey.endsWith(':main') && isFirstMessage && !sessionLabels[currentSessionKey] && trimmed) { const truncated = trimmed.length > 50 ? `${trimmed.slice(0, 50)}…` : trimmed; set((s) => ({ sessionLabels: { ...s.sessionLabels, [currentSessionKey]: truncated } })); } // Mark this session as most recently active set((s) => ({ sessionLastActivity: { ...s.sessionLastActivity, [currentSessionKey]: nowMs } })); // Start the history poll and safety timeout IMMEDIATELY (before the // RPC await) because the gateway's chat.send RPC may block until the // entire agentic conversation finishes — the poll must run in parallel. _lastChatEventAt = Date.now(); clearHistoryPoll(); clearErrorRecoveryTimer(); const POLL_START_DELAY = 3_000; const POLL_INTERVAL = 4_000; const pollHistory = () => { const state = get(); if (!state.sending) { clearHistoryPoll(); return; } if (state.streamingMessage) { _historyPollTimer = setTimeout(pollHistory, POLL_INTERVAL); return; } if (Date.now() - _lastChatEventAt < HISTORY_POLL_SILENCE_WINDOW_MS) { _historyPollTimer = setTimeout(pollHistory, POLL_INTERVAL); return; } state.loadHistory(true); _historyPollTimer = setTimeout(pollHistory, POLL_INTERVAL); }; _historyPollTimer = setTimeout(pollHistory, POLL_START_DELAY); const SAFETY_TIMEOUT_MS = 90_000; const checkStuck = () => { const state = get(); if (!state.sending) return; if (state.streamingMessage || state.streamingText) return; if (state.pendingFinal) { setTimeout(checkStuck, 10_000); return; } if (Date.now() - _lastChatEventAt < SAFETY_TIMEOUT_MS) { setTimeout(checkStuck, 10_000); return; } clearHistoryPoll(); set({ error: 'No response received from the model. The provider may be unavailable or the API key may have insufficient quota. Please check your provider settings.', sending: false, activeRunId: null, lastUserMessageAt: null, }); }; setTimeout(checkStuck, 30_000); try { const idempotencyKey = crypto.randomUUID(); const hasMedia = attachments && attachments.length > 0; if (hasMedia) { console.log('[sendMessage] Media paths:', attachments!.map(a => a.stagedPath)); } // Cache image attachments BEFORE the IPC call to avoid race condition: // history may reload (via Gateway event) before the RPC returns. // Keyed by staged file path which appears in [media attached: ...]. if (hasMedia && attachments) { for (const a of attachments) { _imageCache.set(a.stagedPath, { fileName: a.fileName, mimeType: a.mimeType, fileSize: a.fileSize, preview: a.preview, }); } saveImageCache(_imageCache); } let result: { success: boolean; result?: { runId?: string }; error?: string }; // Longer timeout for chat sends to tolerate high-latency networks (avoids connect error) const CHAT_SEND_TIMEOUT_MS = 120_000; if (hasMedia) { result = await hostApiFetch<{ success: boolean; result?: { runId?: string }; error?: string }>( '/api/chat/send-with-media', { method: 'POST', body: JSON.stringify({ sessionKey: currentSessionKey, message: trimmed || 'Process the attached file(s).', deliver: false, idempotencyKey, media: attachments.map((a) => ({ filePath: a.stagedPath, mimeType: a.mimeType, fileName: a.fileName, })), }), }, ); } else { const rpcResult = await useGatewayStore.getState().rpc<{ runId?: string }>( 'chat.send', { sessionKey: currentSessionKey, message: trimmed, deliver: false, idempotencyKey, }, CHAT_SEND_TIMEOUT_MS, ); result = { success: true, result: rpcResult }; } console.log(`[sendMessage] RPC result: success=${result.success}, runId=${result.result?.runId || 'none'}`); if (!result.success) { clearHistoryPoll(); set({ error: result.error || 'Failed to send message', sending: false }); } else if (result.result?.runId) { set({ activeRunId: result.result.runId }); } } catch (err) { clearHistoryPoll(); set({ error: String(err), sending: false }); } }, // ── Abort active run ── abortRun: async () => { clearHistoryPoll(); clearErrorRecoveryTimer(); const { currentSessionKey } = get(); set({ sending: false, streamingText: '', streamingMessage: null, pendingFinal: false, lastUserMessageAt: null, pendingToolImages: [] }); set({ streamingTools: [] }); try { await useGatewayStore.getState().rpc( 'chat.abort', { sessionKey: currentSessionKey }, ); } catch (err) { set({ error: String(err) }); } }, // ── Handle incoming chat events from Gateway ── handleChatEvent: (event: Record) => { const runId = String(event.runId || ''); const eventState = String(event.state || ''); const eventSessionKey = event.sessionKey != null ? String(event.sessionKey) : null; const { activeRunId, currentSessionKey } = get(); // Only process events for the current session (when sessionKey is present) if (eventSessionKey != null && eventSessionKey !== currentSessionKey) return; // Only process events for the active run (or if no active run set) if (activeRunId && runId && runId !== activeRunId) return; if (isDuplicateChatEvent(eventState, event)) return; _lastChatEventAt = Date.now(); // Defensive: if state is missing but we have a message, try to infer state. let resolvedState = eventState; if (!resolvedState && event.message && typeof event.message === 'object') { const msg = event.message as Record; const stopReason = msg.stopReason ?? msg.stop_reason; if (stopReason) { resolvedState = 'final'; } else if (msg.role || msg.content) { resolvedState = 'delta'; } } // Only pause the history poll when we receive actual streaming data. // The gateway sends "agent" events with { phase, startedAt } that carry // no message — these must NOT kill the poll, since the poll is our only // way to track progress when the gateway doesn't stream intermediate turns. const hasUsefulData = resolvedState === 'delta' || resolvedState === 'final' || resolvedState === 'error' || resolvedState === 'aborted'; if (hasUsefulData) { clearHistoryPoll(); // Adopt run started from another client (e.g. console at 127.0.0.1:18789): // show loading/streaming in the app when this session has an active run. const { sending } = get(); if (!sending && runId) { set({ sending: true, activeRunId: runId, error: null }); } } switch (resolvedState) { case 'started': { // Run just started (e.g. from console); show loading immediately. const { sending: currentSending } = get(); if (!currentSending && runId) { set({ sending: true, activeRunId: runId, error: null }); } break; } case 'delta': { // If we're receiving new deltas, the Gateway has recovered from any // prior error — cancel the error finalization timer and clear the // stale error banner so the user sees the live stream again. if (_errorRecoveryTimer) { clearErrorRecoveryTimer(); set({ error: null }); } const updates = collectToolUpdates(event.message, resolvedState); set((s) => ({ streamingMessage: (() => { if (event.message && typeof event.message === 'object') { const msgRole = (event.message as RawMessage).role; if (isToolResultRole(msgRole)) return s.streamingMessage; } return event.message ?? s.streamingMessage; })(), streamingTools: updates.length > 0 ? upsertToolStatuses(s.streamingTools, updates) : s.streamingTools, })); break; } case 'final': { clearErrorRecoveryTimer(); if (get().error) set({ error: null }); // Message complete - add to history and clear streaming const finalMsg = event.message as RawMessage | undefined; if (finalMsg) { const updates = collectToolUpdates(finalMsg, resolvedState); if (isToolResultRole(finalMsg.role)) { // Resolve file path from the streaming assistant message's matching tool call const currentStreamForPath = get().streamingMessage as RawMessage | null; const matchedPath = (currentStreamForPath && finalMsg.toolCallId) ? getToolCallFilePath(currentStreamForPath, finalMsg.toolCallId) : undefined; // Mirror enrichWithToolResultFiles: collect images + file refs for next assistant msg const toolFiles: AttachedFileMeta[] = [ ...extractImagesAsAttachedFiles(finalMsg.content), ]; if (matchedPath) { for (const f of toolFiles) { if (!f.filePath) { f.filePath = matchedPath; f.fileName = matchedPath.split(/[\\/]/).pop() || 'image'; } } } const text = getMessageText(finalMsg.content); if (text) { const mediaRefs = extractMediaRefs(text); const mediaRefPaths = new Set(mediaRefs.map(r => r.filePath)); for (const ref of mediaRefs) toolFiles.push(makeAttachedFile(ref)); for (const ref of extractRawFilePaths(text)) { if (!mediaRefPaths.has(ref.filePath)) toolFiles.push(makeAttachedFile(ref)); } } set((s) => { // Snapshot the current streaming assistant message (thinking + tool_use) into // messages[] before clearing it. The Gateway does NOT send separate 'final' // events for intermediate tool-use turns — it only sends deltas and then the // tool result. Without snapshotting here, the intermediate thinking+tool steps // would be overwritten by the next turn's deltas and never appear in the UI. const currentStream = s.streamingMessage as RawMessage | null; const snapshotMsgs: RawMessage[] = []; if (currentStream) { const streamRole = currentStream.role; if (streamRole === 'assistant' || streamRole === undefined) { // Use message's own id if available, otherwise derive a stable one from runId const snapId = currentStream.id || `${runId || 'run'}-turn-${s.messages.length}`; if (!s.messages.some(m => m.id === snapId)) { snapshotMsgs.push({ ...(currentStream as RawMessage), role: 'assistant', id: snapId, }); } } } return { messages: snapshotMsgs.length > 0 ? [...s.messages, ...snapshotMsgs] : s.messages, streamingText: '', streamingMessage: null, pendingFinal: true, pendingToolImages: toolFiles.length > 0 ? [...s.pendingToolImages, ...toolFiles] : s.pendingToolImages, streamingTools: updates.length > 0 ? upsertToolStatuses(s.streamingTools, updates) : s.streamingTools, }; }); break; } const toolOnly = isToolOnlyMessage(finalMsg); const hasOutput = hasNonToolAssistantContent(finalMsg); const msgId = finalMsg.id || (toolOnly ? `run-${runId}-tool-${Date.now()}` : `run-${runId}`); set((s) => { const nextTools = updates.length > 0 ? upsertToolStatuses(s.streamingTools, updates) : s.streamingTools; const streamingTools = hasOutput ? [] : nextTools; // Attach any images collected from preceding tool results const pendingImgs = s.pendingToolImages; const msgWithImages: RawMessage = pendingImgs.length > 0 ? { ...finalMsg, role: (finalMsg.role || 'assistant') as RawMessage['role'], id: msgId, _attachedFiles: [...(finalMsg._attachedFiles || []), ...pendingImgs], } : { ...finalMsg, role: (finalMsg.role || 'assistant') as RawMessage['role'], id: msgId }; const clearPendingImages = { pendingToolImages: [] as AttachedFileMeta[] }; // Check if message already exists (prevent duplicates) const alreadyExists = s.messages.some(m => m.id === msgId); if (alreadyExists) { return toolOnly ? { streamingText: '', streamingMessage: null, pendingFinal: true, streamingTools, ...clearPendingImages, } : { streamingText: '', streamingMessage: null, sending: hasOutput ? false : s.sending, activeRunId: hasOutput ? null : s.activeRunId, pendingFinal: hasOutput ? false : true, streamingTools, ...clearPendingImages, }; } return toolOnly ? { messages: [...s.messages, msgWithImages], streamingText: '', streamingMessage: null, pendingFinal: true, streamingTools, ...clearPendingImages, } : { messages: [...s.messages, msgWithImages], streamingText: '', streamingMessage: null, sending: hasOutput ? false : s.sending, activeRunId: hasOutput ? null : s.activeRunId, pendingFinal: hasOutput ? false : true, streamingTools, ...clearPendingImages, }; }); // After the final response, quietly reload history to surface all intermediate // tool-use turns (thinking + tool blocks) from the Gateway's authoritative record. if (hasOutput && !toolOnly) { clearHistoryPoll(); void get().loadHistory(true); } } else { // No message in final event - reload history to get complete data set({ streamingText: '', streamingMessage: null, pendingFinal: true }); get().loadHistory(); } break; } case 'error': { const errorMsg = String(event.errorMessage || 'An error occurred'); const wasSending = get().sending; // Snapshot the current streaming message into messages[] so partial // content ("Let me get that written down...") is preserved in the UI // rather than being silently discarded. const currentStream = get().streamingMessage as RawMessage | null; if (currentStream && (currentStream.role === 'assistant' || currentStream.role === undefined)) { const snapId = (currentStream as RawMessage).id || `error-snap-${Date.now()}`; const alreadyExists = get().messages.some(m => m.id === snapId); if (!alreadyExists) { set((s) => ({ messages: [...s.messages, { ...currentStream, role: 'assistant' as const, id: snapId }], })); } } set({ error: errorMsg, streamingText: '', streamingMessage: null, streamingTools: [], pendingFinal: false, pendingToolImages: [], }); // Don't immediately give up: the Gateway often retries internally // after transient API failures (e.g. "terminated"). Keep `sending` // true for a grace period so that recovery events are processed and // the agent-phase-completion handler can still trigger loadHistory. if (wasSending) { clearErrorRecoveryTimer(); const ERROR_RECOVERY_GRACE_MS = 15_000; _errorRecoveryTimer = setTimeout(() => { _errorRecoveryTimer = null; const state = get(); if (state.sending && !state.streamingMessage) { clearHistoryPoll(); // Grace period expired with no recovery — finalize the error set({ sending: false, activeRunId: null, lastUserMessageAt: null, }); // One final history reload in case the Gateway completed in the // background and we just missed the event. state.loadHistory(true); } }, ERROR_RECOVERY_GRACE_MS); } else { clearHistoryPoll(); set({ sending: false, activeRunId: null, lastUserMessageAt: null }); } break; } case 'aborted': { clearHistoryPoll(); clearErrorRecoveryTimer(); set({ sending: false, activeRunId: null, streamingText: '', streamingMessage: null, streamingTools: [], pendingFinal: false, lastUserMessageAt: null, pendingToolImages: [], }); break; } default: { // Unknown or empty state — if we're currently sending and receive an event // with a message, attempt to process it as streaming data. This handles // edge cases where the Gateway sends events without a state field. const { sending } = get(); if (sending && event.message && typeof event.message === 'object') { console.warn(`[handleChatEvent] Unknown event state "${resolvedState}", treating message as streaming delta. Event keys:`, Object.keys(event)); const updates = collectToolUpdates(event.message, 'delta'); set((s) => ({ streamingMessage: event.message ?? s.streamingMessage, streamingTools: updates.length > 0 ? upsertToolStatuses(s.streamingTools, updates) : s.streamingTools, })); } break; } } }, // ── Toggle thinking visibility ── toggleThinking: () => set((s) => ({ showThinking: !s.showThinking })), // ── Refresh: reload history + sessions ── refresh: async () => { const { loadHistory, loadSessions } = get(); await Promise.all([loadHistory(), loadSessions()]); }, clearError: () => set({ error: null }), }));