Files
DeskClaw/src/stores/chat.ts
2026-03-25 21:11:20 +08:00

1985 lines
76 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* Chat State Store
* Manages chat messages, sessions, streaming, and thinking state.
* Communicates with OpenClaw Gateway via renderer WebSocket RPC.
*/
import { create } from 'zustand';
import { hostApiFetch } from '@/lib/host-api';
import { useGatewayStore } from './gateway';
import { useAgentsStore } from './agents';
import { buildCronSessionHistoryPath, isCronSessionKey } from './chat/cron-session-utils';
import {
DEFAULT_CANONICAL_PREFIX,
DEFAULT_SESSION_KEY,
type AttachedFileMeta,
type ChatSession,
type ChatState,
type ContentBlock,
type RawMessage,
type ToolStatus,
} from './chat/types';
export type {
AttachedFileMeta,
ChatSession,
ContentBlock,
RawMessage,
ToolStatus,
} from './chat/types';
// Module-level timestamp tracking the last chat event received.
// Used by the safety timeout to avoid false-positive "no response" errors
// during tool-use conversations where streamingMessage is temporarily cleared
// between tool-result finals and the next delta.
// Starts at 0, i.e. "no event seen yet". NOTE(review): writers live outside
// this section — presumably they store Date.now() (epoch ms); confirm.
let _lastChatEventAt = 0;
/**
 * Coerce a Unix timestamp to milliseconds.
 *
 * Values below 1e12 are taken to be seconds and scaled by 1000; values at or
 * above 1e12 are returned unchanged. (1e12 ms is September 2001, so any
 * current millisecond timestamp is above the cut-off while second-based
 * timestamps stay far below it.)
 *
 * @param ts - Timestamp in either seconds or milliseconds.
 * @returns The timestamp expressed in milliseconds.
 */
function toMs(ts: number): number {
  const MILLISECOND_FLOOR = 1e12;
  if (ts < MILLISECOND_FLOOR) {
    return ts * 1000;
  }
  return ts;
}
// Timer for fallback history polling during active sends.
// If no streaming events arrive within a few seconds, we periodically
// poll chat.history to surface intermediate tool-call turns.
let _historyPollTimer: ReturnType<typeof setTimeout> | null = null;
// Timer for delayed error finalization. When the Gateway reports a mid-stream
// error (e.g. "terminated"), it may retry internally and recover. We wait
// before committing the error to give the recovery path a chance.
let _errorRecoveryTimer: ReturnType<typeof setTimeout> | null = null;
// Promise of the sessions.list load currently running, if any. loadSessions
// awaits it instead of starting a second concurrent load.
let _loadSessionsInFlight: Promise<void> | null = null;
// Timestamp compared against Date.now() in loadSessions to throttle reloads
// (see SESSION_LOAD_MIN_INTERVAL_MS). 0 until set.
let _lastLoadSessionsAt = 0;
// Per-key in-flight promises and last-load timestamps for history loading.
// NOTE(review): used outside this section — presumably keyed by session key
// and mirroring the sessions throttle above; confirm against loadHistory.
const _historyLoadInFlight = new Map<string, Promise<void>>();
const _lastHistoryLoadAtBySession = new Map<string, number>();
// Minimum gap between two sessions.list loads; calls inside the window return early.
const SESSION_LOAD_MIN_INTERVAL_MS = 1_200;
// NOTE(review): presumably the equivalent minimum gap for history loads — confirm.
const HISTORY_LOAD_MIN_INTERVAL_MS = 800;
// NOTE(review): presumably how long the stream must stay silent before the
// fallback history poll kicks in — confirm against the poll scheduler.
const HISTORY_POLL_SILENCE_WINDOW_MS = 2_500;
// Dedupe entries older than this are dropped by pruneChatEventDedupe.
const CHAT_EVENT_DEDUPE_TTL_MS = 30_000;
// Dedupe key -> Date.now() at which the event was first seen.
const _chatEventDedupe = new Map<string, number>();
/** Cancel the delayed error-finalization timer, if one is currently scheduled. */
function clearErrorRecoveryTimer(): void {
  if (!_errorRecoveryTimer) return;
  clearTimeout(_errorRecoveryTimer);
  _errorRecoveryTimer = null;
}
/** Cancel the fallback history-poll timer, if one is currently scheduled. */
function clearHistoryPoll(): void {
  if (!_historyPollTimer) return;
  clearTimeout(_historyPollTimer);
  _historyPollTimer = null;
}
/**
 * Drop dedupe entries whose age exceeds CHAT_EVENT_DEDUPE_TTL_MS.
 *
 * @param now - Current time in milliseconds, used as the reference for entry age.
 */
function pruneChatEventDedupe(now: number): void {
  // Deleting the entry being visited is safe during Map iteration.
  _chatEventDedupe.forEach((seenAt, dedupeKey) => {
    const age = now - seenAt;
    if (age > CHAT_EVENT_DEDUPE_TTL_MS) {
      _chatEventDedupe.delete(dedupeKey);
    }
  });
}
/**
 * Build a key that identifies a chat event for duplicate suppression.
 *
 * Preference order:
 *   1. `runId | sessionKey | seq | eventState` when the event carries at least
 *      one of runId / sessionKey / seq.
 *   2. `msg|<message.id>|<stopReason>|<eventState>` when the embedded message
 *      carries an id or a stop reason.
 *   3. `null` when nothing identifies the event — such events are never deduped.
 *
 * `eventState` on its own is deliberately NOT treated as an identifier: it is
 * shared by every event of the same kind, so keying on it alone would make all
 * identifier-less "delta" (or "final") events collide and be dropped as
 * duplicates, and would make the message-based fallback unreachable.
 *
 * @param eventState - Event state label (e.g. "delta", "final").
 * @param event - Raw event payload.
 * @returns The dedupe key, or null when the event cannot be identified.
 */
function buildChatEventDedupeKey(eventState: string, event: Record<string, unknown>): string | null {
  const asKeyPart = (value: unknown): string => (value != null ? String(value) : '');

  const runId = asKeyPart(event.runId);
  const sessionKey = asKeyPart(event.sessionKey);
  const seq = asKeyPart(event.seq);
  if (runId || sessionKey || seq) {
    return [runId, sessionKey, seq, eventState].join('|');
  }

  const msg = (event.message && typeof event.message === 'object')
    ? event.message as Record<string, unknown>
    : null;
  if (msg) {
    const messageId = asKeyPart(msg.id);
    // Accept both camelCase and snake_case spellings of the stop reason.
    const stopReason = msg.stopReason ?? msg.stop_reason;
    if (messageId || stopReason) {
      return `msg|${messageId}|${String(stopReason ?? '')}|${eventState}`;
    }
  }
  return null;
}
/**
 * Report whether an equivalent chat event was already seen inside the dedupe
 * window, recording the event as seen when it was not.
 *
 * Events for which no dedupe key can be built are never considered duplicates.
 *
 * @param eventState - Event state label.
 * @param event - Raw event payload.
 * @returns true when the event should be ignored as a duplicate.
 */
function isDuplicateChatEvent(eventState: string, event: Record<string, unknown>): boolean {
  const dedupeKey = buildChatEventDedupeKey(eventState, event);
  if (!dedupeKey) return false;

  const seenAt = Date.now();
  // Expire stale entries first so an old sighting cannot mask a fresh event.
  pruneChatEventDedupe(seenAt);

  const alreadySeen = _chatEventDedupe.has(dedupeKey);
  if (!alreadySeen) {
    _chatEventDedupe.set(dedupeKey, seenAt);
  }
  return alreadySeen;
}
// ── Local image cache ─────────────────────────────────────────
// The Gateway doesn't store image attachments in session content blocks,
// so we cache them locally keyed by staged file path (which appears in the
// [media attached: <path> ...] reference in the Gateway's user message text).
// Keying by path avoids the race condition of keying by runId (which is only
// available after the RPC returns, but history may load before that).
// localStorage key under which the cache is persisted as a JSON array of
// [filePath, AttachedFileMeta] pairs.
const IMAGE_CACHE_KEY = 'clawx:image-cache';
// Cap applied when persisting: saveImageCache writes only the last
// IMAGE_CACHE_MAX entries in Map insertion order.
const IMAGE_CACHE_MAX = 100; // max entries to prevent unbounded growth
/**
 * Rehydrate the attachment cache from localStorage.
 *
 * Best effort by design: missing data, unavailable storage, or a payload that
 * cannot be parsed into Map entries all yield an empty cache.
 *
 * @returns Map from staged file path to the cached attachment metadata.
 */
function loadImageCache(): Map<string, AttachedFileMeta> {
  let restored: Map<string, AttachedFileMeta> | undefined;
  try {
    const serialized = localStorage.getItem(IMAGE_CACHE_KEY);
    if (serialized) {
      restored = new Map(JSON.parse(serialized) as Array<[string, AttachedFileMeta]>);
    }
  } catch {
    /* ignore parse errors */
  }
  return restored ?? new Map();
}
/**
 * Persist the attachment cache to localStorage.
 *
 * Only the last IMAGE_CACHE_MAX entries (Map insertion order) are written; the
 * in-memory map passed in is not modified. Storage failures are ignored.
 *
 * @param cache - The cache to persist.
 */
function saveImageCache(cache: Map<string, AttachedFileMeta>): void {
  try {
    let persisted = Array.from(cache.entries());
    const overflow = persisted.length - IMAGE_CACHE_MAX;
    if (overflow > 0) {
      // Drop the oldest entries from the front of the list.
      persisted = persisted.slice(overflow);
    }
    localStorage.setItem(IMAGE_CACHE_KEY, JSON.stringify(persisted));
  } catch {
    /* ignore quota errors */
  }
}
// In-memory attachment cache, seeded once at module load from localStorage.
// Keyed by staged file path; written back via saveImageCache().
const _imageCache = loadImageCache();
/**
 * Flatten message content to plain text.
 *
 * A string is returned as-is. For an array of content blocks, the non-empty
 * `text` of every `type: 'text'` block is joined with newlines. Any other
 * content shape yields an empty string.
 */
function getMessageText(content: unknown): string {
  if (typeof content === 'string') return content;
  if (!Array.isArray(content)) return '';

  const pieces: string[] = [];
  for (const block of content as Array<{ type?: string; text?: string }>) {
    if (block.type === 'text' && block.text) {
      pieces.push(block.text);
    }
  }
  return pieces.join('\n');
}
/**
 * Find every `[media attached: <path> (<mime>) | ...]` marker in the text.
 *
 * The path is the run of characters up to the first whitespace or "(", so
 * paths containing spaces are not matched by this pattern.
 *
 * @returns One `{ filePath, mimeType }` per marker, in order of appearance.
 */
function extractMediaRefs(text: string): Array<{ filePath: string; mimeType: string }> {
  const markerPattern = /\[media attached:\s*([^\s(]+)\s*\(([^)]+)\)\s*\|[^\]]*\]/g;
  const found: Array<{ filePath: string; mimeType: string }> = [];
  for (let hit = markerPattern.exec(text); hit !== null; hit = markerPattern.exec(text)) {
    const [, filePath, mimeType] = hit;
    found.push({ filePath, mimeType });
  }
  return found;
}
/**
 * Map a file path's extension to a MIME type.
 *
 * The extension is whatever follows the last "." of the final path segment
 * (both "/" and "\" separate segments), compared case-insensitively. Unknown
 * extensions — and names without any "." — yield "application/octet-stream".
 *
 * The table is consulted with an own-property check so that extensions which
 * happen to match inherited Object.prototype members (e.g. "constructor")
 * cannot leak a non-string value to callers.
 *
 * @param filePath - Absolute or relative file path, or a bare file name.
 * @returns The MIME type string for the extension.
 */
function mimeFromExtension(filePath: string): string {
  const FALLBACK_MIME = 'application/octet-stream';
  const map: Record<string, string> = {
    // Images
    'png': 'image/png',
    'jpg': 'image/jpeg',
    'jpeg': 'image/jpeg',
    'gif': 'image/gif',
    'webp': 'image/webp',
    'bmp': 'image/bmp',
    'avif': 'image/avif',
    'svg': 'image/svg+xml',
    // Documents
    'pdf': 'application/pdf',
    'doc': 'application/msword',
    'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    'xls': 'application/vnd.ms-excel',
    'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    'ppt': 'application/vnd.ms-powerpoint',
    'pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
    'txt': 'text/plain',
    'csv': 'text/csv',
    'md': 'text/markdown',
    'rtf': 'application/rtf',
    'epub': 'application/epub+zip',
    // Archives
    'zip': 'application/zip',
    'tar': 'application/x-tar',
    'gz': 'application/gzip',
    'rar': 'application/vnd.rar',
    '7z': 'application/x-7z-compressed',
    // Audio
    'mp3': 'audio/mpeg',
    'wav': 'audio/wav',
    'ogg': 'audio/ogg',
    'aac': 'audio/aac',
    'flac': 'audio/flac',
    'm4a': 'audio/mp4',
    // Video
    'mp4': 'video/mp4',
    'mov': 'video/quicktime',
    'avi': 'video/x-msvideo',
    'mkv': 'video/x-matroska',
    'webm': 'video/webm',
    'm4v': 'video/mp4',
  };

  // Only the last path segment can carry the extension.
  const baseName = filePath.split(/[\\/]/).pop() ?? '';
  const dotIndex = baseName.lastIndexOf('.');
  if (dotIndex === -1) return FALLBACK_MIME;

  const ext = baseName.slice(dotIndex + 1).toLowerCase();
  const mime = Object.prototype.hasOwnProperty.call(map, ext) ? map[ext] : undefined;
  return mime ?? FALLBACK_MIME;
}
/**
 * Scan free text for absolute file paths that end in a known extension.
 *
 * Recognizes Unix-style paths (starting with "/" or "~/") and Windows drive
 * paths ("C:\..."). Unix matches are reported first, then Windows matches;
 * each distinct path is reported once, with its MIME type derived from the
 * extension.
 *
 * @param text - Message text to scan.
 * @returns Distinct `{ filePath, mimeType }` refs in discovery order.
 */
function extractRawFilePaths(text: string): Array<{ filePath: string; mimeType: string }> {
  const exts = 'png|jpe?g|gif|webp|bmp|avif|svg|pdf|docx?|xlsx?|pptx?|txt|csv|md|rtf|epub|zip|tar|gz|rar|7z|mp3|wav|ogg|aac|flac|m4a|mp4|mov|avi|mkv|webm|m4v';
  // The lookbehind rejects slashes that sit in the middle of a token, such as
  // relative paths ("path/to/file.mp4") and URLs ("https://example.com/file.mp4").
  const unixRegex = new RegExp(`(?<![\\w./:])((?:\\/|~\\/)[^\\s\\n"'()\\[\\],<>]*?\\.(?:${exts}))`, 'gi');
  // The lookbehind rejects a drive letter that is glued to a preceding word character.
  const winRegex = new RegExp(`(?<![\\w])([A-Za-z]:\\\\[^\\s\\n"'()\\[\\],<>]*?\\.(?:${exts}))`, 'gi');

  // A Map keeps first-seen order while guaranteeing one entry per path.
  const byPath = new Map<string, { filePath: string; mimeType: string }>();
  [unixRegex, winRegex].forEach((pattern) => {
    for (let hit = pattern.exec(text); hit !== null; hit = pattern.exec(text)) {
      const candidate = hit[1];
      if (candidate && !byPath.has(candidate)) {
        byPath.set(candidate, { filePath: candidate, mimeType: mimeFromExtension(candidate) });
      }
    }
  });
  return Array.from(byPath.values());
}
/**
 * Collect every image found in a content-block array as an AttachedFileMeta.
 *
 * Two image encodings are understood:
 *   - source-wrapped: `{ source: { type: 'base64' | 'url', media_type, data | url } }`
 *   - flat: `{ data, mimeType }` (only consulted when `source` is absent)
 * Base64 payloads become data URLs; URL sources are used verbatim as the
 * preview. The MIME type defaults to "image/jpeg" when not supplied.
 * Tool-result blocks are searched recursively through their `content`.
 *
 * @param content - Message content; anything other than an array yields [].
 * @returns Image attachments in document order (fileName "image", fileSize 0).
 */
function extractImagesAsAttachedFiles(content: unknown): AttachedFileMeta[] {
  if (!Array.isArray(content)) return [];

  const asImageFile = (mimeType: string, preview: string): AttachedFileMeta => ({
    fileName: 'image',
    mimeType,
    fileSize: 0,
    preview,
  });

  const collected: AttachedFileMeta[] = [];
  for (const block of content as ContentBlock[]) {
    if (block.type === 'image') {
      const source = block.source;
      if (source) {
        const mime = source.media_type || 'image/jpeg';
        if (source.type === 'base64' && source.data) {
          collected.push(asImageFile(mime, `data:${mime};base64,${source.data}`));
        } else if (source.type === 'url' && source.url) {
          collected.push(asImageFile(mime, source.url));
        }
      } else if (block.data) {
        const mime = block.mimeType || 'image/jpeg';
        collected.push(asImageFile(mime, `data:${mime};base64,${block.data}`));
      }
    }

    const isToolResult = block.type === 'tool_result' || block.type === 'toolResult';
    if (isToolResult && block.content) {
      collected.push(...extractImagesAsAttachedFiles(block.content));
    }
  }
  return collected;
}
/**
 * Turn a file reference into an AttachedFileMeta.
 *
 * A cache hit is returned as a copy with `filePath` set to the ref's path.
 * Otherwise a placeholder is produced: name taken from the last path segment
 * (falling back to "file"), size 0 and no preview.
 */
function makeAttachedFile(ref: { filePath: string; mimeType: string }): AttachedFileMeta {
  const { filePath, mimeType } = ref;

  const fromCache = _imageCache.get(filePath);
  if (fromCache) {
    return { ...fromCache, filePath };
  }

  const segments = filePath.split(/[\\/]/);
  const fileName = segments[segments.length - 1] || 'file';
  return { fileName, mimeType, fileSize: 0, preview: null, filePath };
}
/**
 * Look up the file path argument of the tool call with the given id.
 *
 * Content-array tool blocks (`tool_use` / `toolCall`) are checked first, then
 * a message-level `tool_calls` / `toolCalls` array. In either place the path
 * is the first string found among the arguments `file_path`, `filePath`,
 * `path`, `file` (in that precedence). String-encoded arguments are parsed as
 * JSON; parse failures are ignored.
 *
 * @param msg - Message that may contain the tool call.
 * @param toolCallId - Id of the tool call; an empty id never matches.
 * @returns The path, or undefined when no matching call carries one.
 */
function getToolCallFilePath(msg: RawMessage, toolCallId: string): string | undefined {
  if (!toolCallId) return undefined;

  const pathFromArgs = (args: Record<string, unknown> | undefined): string | undefined => {
    if (!args) return undefined;
    const candidate = args.file_path ?? args.filePath ?? args.path ?? args.file;
    return typeof candidate === 'string' ? candidate : undefined;
  };

  // Tool-call blocks embedded in the content array.
  const blocks = msg.content;
  if (Array.isArray(blocks)) {
    for (const block of blocks as ContentBlock[]) {
      const isToolCall = block.type === 'tool_use' || block.type === 'toolCall';
      if (!isToolCall || block.id !== toolCallId) continue;
      const found = pathFromArgs((block.input ?? block.arguments) as Record<string, unknown> | undefined);
      if (found !== undefined) return found;
    }
  }

  // Tool calls listed on the message itself.
  const record = msg as unknown as Record<string, unknown>;
  const calls = record.tool_calls ?? record.toolCalls;
  if (!Array.isArray(calls)) return undefined;

  for (const call of calls as Array<Record<string, unknown>>) {
    if (call.id !== toolCallId) continue;
    const fn = (call.function ?? call) as Record<string, unknown>;
    let parsedArgs: Record<string, unknown> | undefined;
    try {
      parsedArgs = typeof fn.arguments === 'string'
        ? JSON.parse(fn.arguments)
        : (fn.arguments ?? fn.input) as Record<string, unknown>;
    } catch {
      /* ignore */
    }
    const found = pathFromArgs(parsedArgs);
    if (found !== undefined) return found;
  }
  return undefined;
}
/**
 * Record the file path argument of every tool call in a message.
 *
 * Both content-array tool blocks (`tool_use` / `toolCall`) and a message-level
 * `tool_calls` / `toolCalls` array are scanned. The path is the first string
 * among the arguments `file_path`, `filePath`, `path`, `file`. Calls without
 * an id, or without a string path, are skipped.
 *
 * @param msg - Message to scan.
 * @param paths - Output map (toolCallId -> filePath), mutated in place.
 */
function collectToolCallPaths(msg: RawMessage, paths: Map<string, string>): void {
  const remember = (callId: string, args: Record<string, unknown> | undefined): void => {
    if (!args) return;
    const candidate = args.file_path ?? args.filePath ?? args.path ?? args.file;
    if (typeof candidate === 'string') {
      paths.set(callId, candidate);
    }
  };

  const blocks = msg.content;
  if (Array.isArray(blocks)) {
    for (const block of blocks as ContentBlock[]) {
      const isToolCall = block.type === 'tool_use' || block.type === 'toolCall';
      if (!isToolCall || !block.id) continue;
      remember(block.id, (block.input ?? block.arguments) as Record<string, unknown> | undefined);
    }
  }

  const record = msg as unknown as Record<string, unknown>;
  const calls = record.tool_calls ?? record.toolCalls;
  if (!Array.isArray(calls)) return;

  for (const call of calls as Array<Record<string, unknown>>) {
    const callId = typeof call.id === 'string' ? call.id : '';
    if (!callId) continue;
    const fn = (call.function ?? call) as Record<string, unknown>;
    let parsedArgs: Record<string, unknown> | undefined;
    try {
      parsedArgs = typeof fn.arguments === 'string'
        ? JSON.parse(fn.arguments)
        : (fn.arguments ?? fn.input) as Record<string, unknown>;
    } catch {
      /* ignore */
    }
    remember(callId, parsedArgs);
  }
}
/**
 * Carry files produced by tool results forward onto the next assistant message.
 *
 * Walking the history in order, every tool-result message is scanned for:
 *   - image content blocks (base64 / url),
 *   - `[media attached: path (mime) | path]` markers in its text,
 *   - raw absolute file paths in its text (skipping those already covered by a marker).
 * The files found are queued and attached (as `_attachedFiles`) to the next
 * message whose role is "assistant", skipping any file whose path that message
 * already lists. Tool-result messages themselves are returned untouched so a
 * later step can filter them out.
 *
 * Image blocks without a path inherit the file path of the tool call that
 * produced them, when that call's arguments named one.
 *
 * @param messages - History in chronological order.
 * @returns A new array; only assistant messages that received files are copied.
 */
function enrichWithToolResultFiles(messages: RawMessage[]): RawMessage[] {
  const queued: AttachedFileMeta[] = [];
  const pathByToolCallId = new Map<string, string>();

  const harvestToolResult = (msg: RawMessage): void => {
    const callPath = msg.toolCallId ? pathByToolCallId.get(msg.toolCallId) : undefined;

    // Structured image blocks.
    const images = extractImagesAsAttachedFiles(msg.content);
    if (callPath) {
      images
        .filter((image) => !image.filePath)
        .forEach((image) => {
          image.filePath = callPath;
          image.fileName = callPath.split(/[\\/]/).pop() || 'image';
        });
    }
    queued.push(...images);

    const text = getMessageText(msg.content);
    if (!text) return;

    // Explicit [media attached: ...] markers.
    const mediaRefs = extractMediaRefs(text);
    const markerPaths = new Set(mediaRefs.map((ref) => ref.filePath));
    mediaRefs.forEach((ref) => queued.push(makeAttachedFile(ref)));

    // Raw paths (documents, audio, video, ...) not already covered by a marker.
    extractRawFilePaths(text)
      .filter((ref) => !markerPaths.has(ref.filePath))
      .forEach((ref) => queued.push(makeAttachedFile(ref)));
  };

  return messages.map((msg) => {
    if (isToolResultRole(msg.role)) {
      harvestToolResult(msg);
      return msg; // will be filtered later
    }
    if (msg.role !== 'assistant') return msg;

    // Remember this turn's tool-call arguments for matching later results.
    collectToolCallPaths(msg, pathByToolCallId);
    if (queued.length === 0) return msg;

    // Drain the queue: files are handed to this assistant message only.
    const batch = queued.splice(0);
    const knownPaths = new Set(
      (msg._attachedFiles || []).map((file) => file.filePath).filter(Boolean),
    );
    const fresh = batch.filter((file) => !file.filePath || !knownPaths.has(file.filePath));
    if (fresh.length === 0) return msg;

    return {
      ...msg,
      _attachedFiles: [...(msg._attachedFiles || []), ...fresh],
    };
  });
}
/**
 * Rebuild `_attachedFiles` for history messages that do not have it yet.
 *
 * Sources of file references:
 *   1. `[media attached: path (mime) | path]` markers in the message's own
 *      text (user and assistant messages).
 *   2. Raw absolute file paths — assistant messages only, and only when the
 *      message is not tool-only. Paths are taken from the assistant's own text
 *      and from the nearest user message within the previous five messages.
 *      User messages never get raw-path previews, so an image is not shown twice.
 *
 * Each reference is resolved through the local cache when possible; otherwise
 * a preview-less placeholder is produced for later asynchronous loading.
 *
 * @param messages - History in chronological order.
 * @returns A new array; messages without any reference are returned as-is.
 */
function enrichWithCachedImages(messages: RawMessage[]): RawMessage[] {
  const LOOKBACK_LIMIT = 5;

  return messages.map((msg, idx) => {
    const isChatRole = msg.role === 'user' || msg.role === 'assistant';
    // Any existing _attachedFiles (even an empty list) marks the message as done.
    if (!isChatRole || msg._attachedFiles) return msg;

    const text = getMessageText(msg.content);
    const mediaRefs = extractMediaRefs(text);
    const mediaRefPaths = new Set(mediaRefs.map((ref) => ref.filePath));

    const rawRefs: Array<{ filePath: string; mimeType: string }> = [];
    if (msg.role === 'assistant' && !isToolOnlyMessage(msg)) {
      const claimedPaths = new Set<string>();

      // Paths mentioned in the assistant's own text.
      for (const ref of extractRawFilePaths(text)) {
        if (mediaRefPaths.has(ref.filePath)) continue;
        claimedPaths.add(ref.filePath);
        rawRefs.push(ref);
      }

      // Locate the nearest preceding user message inside the look-back window.
      const oldestIndex = Math.max(0, idx - LOOKBACK_LIMIT);
      let nearestUser: RawMessage | undefined;
      for (let i = idx - 1; i >= oldestIndex; i--) {
        const previous = messages[i];
        if (!previous) break;
        if (previous.role === 'user') {
          nearestUser = previous;
          break;
        }
      }

      if (nearestUser) {
        for (const ref of extractRawFilePaths(getMessageText(nearestUser.content))) {
          if (mediaRefPaths.has(ref.filePath) || claimedPaths.has(ref.filePath)) continue;
          claimedPaths.add(ref.filePath);
          rawRefs.push(ref);
        }
      }
    }

    const allRefs = mediaRefs.concat(rawRefs);
    if (allRefs.length === 0) return msg;

    return { ...msg, _attachedFiles: allRefs.map((ref) => makeAttachedFile(ref)) };
  });
}
/**
 * Async: fetch missing previews / file sizes for messages whose _attachedFiles
 * are incomplete, via a single POST to /api/files/thumbnails.
 *
 * A file needs loading when it is an image without a preview, or a non-image
 * whose fileSize is still 0. Handles both entries carrying an explicit
 * filePath and legacy user-message entries that are matched by index against
 * the [media attached: ...] markers in the message text.
 *
 * Side effects: the AttachedFileMeta objects inside `messages` are mutated in
 * place, the in-memory image cache is updated, and the cache is persisted when
 * anything changed. This function does not itself trigger a re-render; it only
 * reports whether anything was updated (NOTE(review): presumably the caller
 * re-sets state when true — confirm at the call site).
 *
 * @param messages - Messages whose _attachedFiles should be completed.
 * @returns true when at least one file was updated; false when nothing needed
 *          loading, nothing usable came back, or the request failed (logged).
 */
async function loadMissingPreviews(messages: RawMessage[]): Promise<boolean> {
// Collect all file paths that need a preview or a size (deduplicated by path)
const needPreview: Array<{ filePath: string; mimeType: string }> = [];
const seenPaths = new Set<string>();
for (const msg of messages) {
if (!msg._attachedFiles) continue;
// Path 1: files with explicit filePath field (raw path detection or enriched refs)
for (const file of msg._attachedFiles) {
const fp = file.filePath;
if (!fp || seenPaths.has(fp)) continue;
// Images: need preview. Non-images: need file size (for FileCard display).
const needsLoad = file.mimeType.startsWith('image/')
? !file.preview
: file.fileSize === 0;
if (needsLoad) {
seenPaths.add(fp);
needPreview.push({ filePath: fp, mimeType: file.mimeType });
}
}
// Path 2: [media attached: ...] patterns (legacy — in case filePath wasn't stored)
if (msg.role === 'user') {
const text = getMessageText(msg.content);
const refs = extractMediaRefs(text);
// The i-th marker is assumed to correspond to the i-th attached file.
for (let i = 0; i < refs.length; i++) {
const file = msg._attachedFiles[i];
const ref = refs[i];
if (!file || !ref || seenPaths.has(ref.filePath)) continue;
const needsLoad = ref.mimeType.startsWith('image/') ? !file.preview : file.fileSize === 0;
if (needsLoad) {
seenPaths.add(ref.filePath);
needPreview.push(ref);
}
}
}
}
// Nothing to load — skip the request entirely.
if (needPreview.length === 0) return false;
try {
// Response is indexed by file path.
const thumbnails = await hostApiFetch<Record<string, { preview: string | null; fileSize: number }>>(
'/api/files/thumbnails',
{
method: 'POST',
body: JSON.stringify({ paths: needPreview }),
},
);
let updated = false;
for (const msg of messages) {
if (!msg._attachedFiles) continue;
// Update files that have filePath
for (const file of msg._attachedFiles) {
const fp = file.filePath;
if (!fp) continue;
const thumb = thumbnails[fp];
// Only apply results that carry at least a preview or a non-zero size.
if (thumb && (thumb.preview || thumb.fileSize)) {
if (thumb.preview) file.preview = thumb.preview;
if (thumb.fileSize) file.fileSize = thumb.fileSize;
// Cache a copy so later mutations of `file` don't alter the cached entry.
_imageCache.set(fp, { ...file });
updated = true;
}
}
// Legacy: update by index for [media attached: ...] refs
if (msg.role === 'user') {
const text = getMessageText(msg.content);
const refs = extractMediaRefs(text);
for (let i = 0; i < refs.length; i++) {
const file = msg._attachedFiles[i];
const ref = refs[i];
if (!file || !ref || file.filePath) continue; // skip if already handled via filePath
const thumb = thumbnails[ref.filePath];
if (thumb && (thumb.preview || thumb.fileSize)) {
if (thumb.preview) file.preview = thumb.preview;
if (thumb.fileSize) file.fileSize = thumb.fileSize;
_imageCache.set(ref.filePath, { ...file });
updated = true;
}
}
}
}
// Persist only when something actually changed.
if (updated) saveImageCache(_imageCache);
return updated;
} catch (err) {
// Best effort: a failed thumbnail request must not break history loading.
console.warn('[loadMissingPreviews] Failed:', err);
return false;
}
}
/**
 * Derive the canonical "agent:<agentId>" prefix from the first session in the
 * list whose key starts with "agent:".
 *
 * @returns The prefix, or null when no session has such a key.
 */
function getCanonicalPrefixFromSessions(sessions: ChatSession[]): string | null {
  for (const session of sessions) {
    if (session.key.startsWith('agent:')) {
      return getCanonicalPrefixFromSessionKey(session.key);
    }
  }
  return null;
}
/**
 * Read the agent id out of a session key shaped like "agent:<agentId>:...".
 *
 * @returns The second ":"-separated segment, or "main" when the key does not
 *          start with "agent:" or the segment is empty.
 */
function getAgentIdFromSessionKey(sessionKey: string): string {
  const [scope, agentId] = sessionKey.split(':');
  const isAgentKey = scope === 'agent';
  return (isAgentKey && agentId) || 'main';
}
/**
 * Interpret a session's "updated at" value as epoch milliseconds.
 *
 * Finite numbers are normalized via toMs (seconds or milliseconds accepted);
 * non-blank strings are parsed with Date.parse.
 *
 * @returns The timestamp in milliseconds, or undefined when the value is of
 *          another type, blank, non-finite or unparseable.
 */
function parseSessionUpdatedAtMs(value: unknown): number | undefined {
  switch (typeof value) {
    case 'number':
      return Number.isFinite(value) ? toMs(value) : undefined;
    case 'string': {
      if (!value.trim()) return undefined;
      const parsed = Date.parse(value);
      return Number.isFinite(parsed) ? parsed : undefined;
    }
    default:
      return undefined;
  }
}
/**
 * Fetch history for a cron session through the host API.
 *
 * @param sessionKey - Session key; non-cron keys short-circuit to [].
 * @param limit - Passed through to the history path builder (default 200).
 * @returns The `messages` array of the response, or [] when it is missing,
 *          not an array, or the request fails (failure is logged as a warning).
 */
async function loadCronFallbackMessages(sessionKey: string, limit = 200): Promise<RawMessage[]> {
  if (!isCronSessionKey(sessionKey)) return [];
  try {
    const historyPath = buildCronSessionHistoryPath(sessionKey, limit);
    const { messages } = await hostApiFetch<{ messages?: RawMessage[] }>(historyPath);
    return Array.isArray(messages) ? messages : [];
  } catch (error) {
    console.warn('Failed to load cron fallback history:', error);
    return [];
  }
}
/**
 * Canonicalize an agent id: trimmed and lower-cased, with "main" standing in
 * for null, undefined, or blank input.
 */
function normalizeAgentId(value: string | undefined | null): string {
  const cleaned = value == null ? '' : value.trim().toLowerCase();
  return cleaned === '' ? 'main' : cleaned;
}
/** Compose the default main-session key, "agent:<normalized id>:main", for an agent. */
function buildFallbackMainSessionKey(agentId: string): string {
  return ['agent', normalizeAgentId(agentId), 'main'].join(':');
}
/**
 * Resolve the main session key for an agent.
 *
 * Uses the `mainSessionKey` of the first agent in the agents store whose id
 * equals the normalized id; when there is no such agent, or its key is empty,
 * the conventional "agent:<id>:main" key is used instead.
 *
 * @returns The session key, or null when no agent id was given.
 */
function resolveMainSessionKeyForAgent(agentId: string | undefined | null): string | null {
  if (!agentId) return null;

  const wantedId = normalizeAgentId(agentId);
  const { agents } = useAgentsStore.getState();
  for (const agent of agents) {
    if (agent.id === wantedId) {
      return agent.mainSessionKey || buildFallbackMainSessionKey(wantedId);
    }
  }
  return buildFallbackMainSessionKey(wantedId);
}
/**
 * Guarantee that the session list contains an entry for `sessionKey`.
 *
 * @returns The same array when the key is already present; otherwise a new
 *          array with a placeholder entry (display name = key) appended.
 */
function ensureSessionEntry(sessions: ChatSession[], sessionKey: string): ChatSession[] {
  const alreadyListed = sessions.findIndex((entry) => entry.key === sessionKey) !== -1;
  if (alreadyListed) return sessions;

  const placeholder = { key: sessionKey, displayName: sessionKey };
  return [...sessions, placeholder];
}
/**
 * Return a shallow copy of a keyed map without the entry for `sessionKey`.
 * The input is never mutated; a new object is returned even when the key is absent.
 */
function clearSessionEntryFromMap<T extends Record<string, unknown>>(entries: T, sessionKey: string): T {
  const keptPairs: Array<[string, unknown]> = Object.keys(entries)
    .filter((key) => key !== sessionKey)
    .map((key): [string, unknown] => [key, entries[key]]);
  return Object.fromEntries(keptPairs) as T;
}
/**
 * Compute the state patch for switching from the current session to `nextSessionKey`.
 *
 * The patch selects the next session (adding a placeholder entry if it is not
 * listed yet), resets all per-conversation transient state (messages,
 * streaming, run id, error, pending flags), and — when the session being left
 * is empty — removes that session from the list together with its label and
 * last-activity entries.
 *
 * @param state - The slice of chat state the decision depends on.
 * @param nextSessionKey - Key of the session being switched to.
 * @returns Partial state to merge into the store.
 */
function buildSessionSwitchPatch(
  state: Pick<
    ChatState,
    'currentSessionKey' | 'messages' | 'sessions' | 'sessionLabels' | 'sessionLastActivity'
  >,
  nextSessionKey: string,
): Partial<ChatState> {
  const {
    currentSessionKey: leavingKey,
    messages,
    sessions,
    sessionLabels,
    sessionLastActivity,
  } = state;

  // Only treat a session as empty when it has no history at all and no recorded
  // activity time. Relying on messages.length alone is unreliable: switchSession
  // clears the current messages before loadHistory actually runs, which creates a
  // race in which a session with real history would be judged empty and removed
  // from the sidebar. Main sessions (keys ending in ":main") are never removed.
  const isMainSession = leavingKey.endsWith(':main');
  const hasTrace = messages.length > 0
    || Boolean(sessionLastActivity[leavingKey])
    || Boolean(sessionLabels[leavingKey]);
  const leavingEmpty = !isMainSession && !hasTrace;

  const retainedSessions = leavingEmpty
    ? sessions.filter((session) => session.key !== leavingKey)
    : sessions;
  const nextLabels = leavingEmpty
    ? clearSessionEntryFromMap(sessionLabels, leavingKey)
    : sessionLabels;
  const nextLastActivity = leavingEmpty
    ? clearSessionEntryFromMap(sessionLastActivity, leavingKey)
    : sessionLastActivity;

  return {
    currentSessionKey: nextSessionKey,
    currentAgentId: getAgentIdFromSessionKey(nextSessionKey),
    sessions: ensureSessionEntry(retainedSessions, nextSessionKey),
    sessionLabels: nextLabels,
    sessionLastActivity: nextLastActivity,
    messages: [],
    streamingText: '',
    streamingMessage: null,
    streamingTools: [],
    activeRunId: null,
    error: null,
    pendingFinal: false,
    lastUserMessageAt: null,
    pendingToolImages: [],
  };
}
/**
 * Extract the "agent:<agentId>" prefix of a session key.
 *
 * @returns The first two ":"-separated segments joined by ":", or null when
 *          the key does not start with "agent:".
 */
function getCanonicalPrefixFromSessionKey(sessionKey: string): string | null {
  if (!sessionKey.startsWith('agent:')) return null;
  // The prefix check guarantees at least two segments.
  const [scope, agentId] = sessionKey.split(':');
  return `${scope}:${agentId}`;
}
/**
 * Decide whether a message is purely an intermediate tool step.
 *
 * True when:
 *   - the role is a tool-result role, or
 *   - content is not an array, the message carries a non-empty
 *     `tool_calls` / `toolCalls` array, and any string content is blank, or
 *   - content is an array that involves tools (tool blocks or message-level
 *     tool calls) and holds neither non-blank text nor an image block.
 * Thinking blocks are ignored: they are internal reasoning that may accompany
 * a tool call and must not stop the message from counting as tool-only.
 *
 * @returns false for an undefined message.
 */
function isToolOnlyMessage(message: RawMessage | undefined): boolean {
  if (!message) return false;
  if (isToolResultRole(message.role)) return true;

  const record = message as unknown as Record<string, unknown>;
  const messageLevelCalls = record.tool_calls ?? record.toolCalls;
  const hasMessageLevelCalls = Array.isArray(messageLevelCalls) && messageLevelCalls.length > 0;

  const content = message.content;
  if (!Array.isArray(content)) {
    if (!hasMessageLevelCalls) return false;
    // Tool calls with string/empty content: tool-only unless real text is present.
    const plainText = typeof content === 'string' ? content.trim() : '';
    return plainText.length === 0;
  }

  let sawTool = hasMessageLevelCalls;
  let sawText = false;
  let sawImage = false;
  for (const block of content as ContentBlock[]) {
    switch (block.type) {
      case 'tool_use':
      case 'tool_result':
      case 'toolCall':
      case 'toolResult':
        sawTool = true;
        break;
      case 'text':
        if (block.text && block.text.trim()) sawText = true;
        break;
      case 'image':
        // Actual image output disqualifies a tool-only message.
        sawImage = true;
        break;
      default:
        break;
    }
  }
  return sawTool && !sawText && !sawImage;
}
/**
 * Check whether a role value denotes a tool result.
 * Matches "toolresult" and "tool_result" case-insensitively; falsy roles never match.
 */
function isToolResultRole(role: unknown): boolean {
  if (!role) return false;
  const toolResultRoles = ['toolresult', 'tool_result'];
  return toolResultRoles.includes(String(role).toLowerCase());
}
/**
 * Reduce message or tool content to plain text.
 * Strings pass through; for block arrays the non-empty text of each text block
 * is joined with newlines; any other shape yields "".
 */
function extractTextFromContent(content: unknown): string {
  if (typeof content === 'string') return content;
  if (!Array.isArray(content)) return '';
  return (content as ContentBlock[])
    .filter((block) => block.type === 'text' && block.text)
    .map((block) => block.text)
    .join('\n');
}
/**
 * Produce a short one-line summary of tool output.
 *
 * The first two non-blank lines (trimmed) are joined with " / "; a result
 * longer than 160 characters is cut to 157 characters plus "...".
 *
 * @returns The summary, or undefined when the text has no non-blank line.
 */
function summarizeToolOutput(text: string): string | undefined {
  const MAX_SUMMARY_LINES = 2;
  const MAX_SUMMARY_LENGTH = 160;

  const headLines: string[] = [];
  for (const rawLine of text.trim().split(/\r?\n/)) {
    const line = rawLine.trim();
    if (line) headLines.push(line);
    if (headLines.length === MAX_SUMMARY_LINES) break;
  }
  if (headLines.length === 0) return undefined;

  const joined = headLines.join(' / ');
  return joined.length > MAX_SUMMARY_LENGTH ? `${joined.slice(0, 157)}...` : joined;
}
/**
 * Map a raw status value onto the tool status vocabulary.
 *
 * Case-insensitive: "error" / "failed" -> "error"; "completed" / "success" /
 * "done" -> "completed". Anything else, including non-strings, yields `fallback`.
 */
function normalizeToolStatus(rawStatus: unknown, fallback: 'running' | 'completed'): ToolStatus['status'] {
  if (typeof rawStatus !== 'string') return fallback;
  switch (rawStatus.toLowerCase()) {
    case 'error':
    case 'failed':
      return 'error';
    case 'completed':
    case 'success':
    case 'done':
      return 'completed';
    default:
      return fallback;
  }
}
/**
 * Parse a duration that may arrive as a number or a numeric string.
 *
 * Blank strings are treated as "no value": `Number('')` and `Number('  ')`
 * evaluate to 0, which would otherwise be reported as a genuine 0 ms duration.
 *
 * @param value - Raw duration value from an event payload.
 * @returns The finite numeric duration, or undefined when the value is
 *          missing, blank, non-numeric, or non-finite.
 */
function parseDurationMs(value: unknown): number | undefined {
  if (typeof value === 'number') {
    return Number.isFinite(value) ? value : undefined;
  }
  if (typeof value !== 'string') return undefined;

  const trimmed = value.trim();
  if (trimmed === '') return undefined;

  const parsed = Number(trimmed);
  return Number.isFinite(parsed) ? parsed : undefined;
}
/**
 * Derive "running" tool statuses from the tool calls announced in a message.
 *
 * Named `tool_use` / `toolCall` blocks in the content array take precedence.
 * Only when none are found is the message-level `tool_calls` / `toolCalls`
 * array consulted (name read from `function.name`, or from the entry itself
 * when there is no `function`). Entries without a name are skipped.
 *
 * @returns One status per tool call; [] when the message is not an object.
 */
function extractToolUseUpdates(message: unknown): ToolStatus[] {
  if (!message || typeof message !== 'object') return [];
  const record = message as Record<string, unknown>;

  const fromBlocks: ToolStatus[] = [];
  if (Array.isArray(record.content)) {
    for (const block of record.content as ContentBlock[]) {
      const isToolCall = block.type === 'tool_use' || block.type === 'toolCall';
      if (!isToolCall || !block.name) continue;
      fromBlocks.push({
        id: block.id || block.name,
        toolCallId: block.id,
        name: block.name,
        status: 'running',
        updatedAt: Date.now(),
      });
    }
  }
  if (fromBlocks.length > 0) return fromBlocks;

  const fromCalls: ToolStatus[] = [];
  const calls = record.tool_calls ?? record.toolCalls;
  if (Array.isArray(calls)) {
    for (const call of calls as Array<Record<string, unknown>>) {
      const fn = (call.function ?? call) as Record<string, unknown>;
      const name = typeof fn.name === 'string' ? fn.name : '';
      if (!name) continue;
      const callId = typeof call.id === 'string' ? call.id : undefined;
      fromCalls.push({
        id: callId ?? name,
        toolCallId: callId,
        name,
        status: 'running',
        updatedAt: Date.now(),
      });
    }
  }
  return fromCalls;
}
/**
 * Derive tool statuses from `tool_result` / `toolResult` blocks inside a
 * message's content array.
 *
 * Each block becomes one status whose summary is built from the block's
 * content (or text). The status is "running" for delta events and "completed"
 * for every other event state.
 *
 * @returns One status per tool-result block; [] when there is no content array.
 */
function extractToolResultBlocks(message: unknown, eventState: string): ToolStatus[] {
  if (!message || typeof message !== 'object') return [];
  const blocks = (message as Record<string, unknown>).content;
  if (!Array.isArray(blocks)) return [];

  return (blocks as ContentBlock[])
    .filter((block) => block.type === 'tool_result' || block.type === 'toolResult')
    .map((block) => {
      const outputText = extractTextFromContent(block.content ?? block.text ?? '');
      const summary = summarizeToolOutput(outputText);
      return {
        id: block.id || block.name || 'tool',
        toolCallId: block.id,
        name: block.name || block.id || 'tool',
        status: normalizeToolStatus(undefined, eventState === 'delta' ? 'running' : 'completed'),
        summary,
        updatedAt: Date.now(),
      };
    });
}
/**
 * Derive a tool status from a message whose role is a tool-result role.
 *
 * Field sources:
 *   - name: `toolName`, then `name`, then the tool call id, then "tool"
 *   - status: `status` or `details.status`, normalized; defaults to "running"
 *     for delta events and "completed" otherwise
 *   - duration: `details.durationMs`, `details.duration`, or `durationMs`
 *   - summary: `details.aggregated` when it is a string, otherwise the message
 *     text; when that yields nothing, `details.error` / `error` is summarized
 *
 * @returns The status, or null when the message is not a tool-result message.
 */
function extractToolResultUpdate(message: unknown, eventState: string): ToolStatus | null {
  if (!message || typeof message !== 'object') return null;
  const record = message as Record<string, unknown>;

  // Only string roles are considered; isToolResultRole compares case-insensitively.
  if (typeof record.role !== 'string' || !isToolResultRole(record.role)) return null;

  const asString = (value: unknown): string | undefined =>
    (typeof value === 'string' ? value : undefined);

  const toolCallId = asString(record.toolCallId);
  const toolName = asString(record.toolName) ?? asString(record.name) ?? '';
  const details = (record.details && typeof record.details === 'object')
    ? record.details as Record<string, unknown>
    : undefined;

  const fallbackStatus = eventState === 'delta' ? 'running' : 'completed';
  const status = normalizeToolStatus(record.status ?? details?.status, fallbackStatus);
  const durationMs = parseDurationMs(details?.durationMs ?? details?.duration ?? record.durationMs);

  const outputText = (details && typeof details.aggregated === 'string')
    ? details.aggregated
    : extractTextFromContent(record.content);
  const summary = summarizeToolOutput(outputText)
    ?? summarizeToolOutput(String(details?.error ?? record.error ?? ''));

  const name = toolName || toolCallId || 'tool';
  return {
    id: toolCallId || name,
    toolCallId,
    name,
    status,
    durationMs,
    summary,
    updatedAt: Date.now(),
  };
}
/**
 * Combine two tool statuses so that progress never moves backwards.
 * Precedence is running < completed < error; the incoming status wins ties.
 */
function mergeToolStatus(existing: ToolStatus['status'], incoming: ToolStatus['status']): ToolStatus['status'] {
  const precedence: Record<ToolStatus['status'], number> = { running: 0, completed: 1, error: 2 };
  if (precedence[incoming] >= precedence[existing]) {
    return incoming;
  }
  return existing;
}
/**
 * Merge tool status updates into the current list without mutating it.
 *
 * Entries are matched by `toolCallId`, falling back to `id`, then `name`.
 * Unmatched updates are appended. For matches, the update's fields override
 * the existing ones, except that the status only advances (see
 * mergeToolStatus) and name, duration, summary and timestamp fall back to the
 * existing values when the update does not provide them.
 *
 * @returns `current` itself when there are no updates; otherwise a new array.
 */
function upsertToolStatuses(current: ToolStatus[], updates: ToolStatus[]): ToolStatus[] {
  if (updates.length === 0) return current;

  const identityOf = (tool: ToolStatus) => tool.toolCallId || tool.id || tool.name;

  const merged = current.slice();
  for (const update of updates) {
    const identity = identityOf(update);
    if (!identity) continue;

    const position = merged.findIndex((tool) => identityOf(tool) === identity);
    if (position === -1) {
      merged.push(update);
      continue;
    }

    const previous = merged[position];
    merged[position] = {
      ...previous,
      ...update,
      name: update.name || previous.name,
      status: mergeToolStatus(previous.status, update.status),
      durationMs: update.durationMs ?? previous.durationMs,
      summary: update.summary ?? previous.summary,
      updatedAt: update.updatedAt || previous.updatedAt,
    };
  }
  return merged;
}
/**
 * Gather every tool status update that can be derived from one chat message.
 *
 * The result lists, in order: the message-level tool-result update (if any),
 * updates from tool-result content blocks, and updates from tool-use blocks.
 *
 * @param message Raw message payload from a chat event.
 * @param eventState Event state forwarded to the tool-result extractors.
 */
function collectToolUpdates(message: unknown, eventState: string): ToolStatus[] {
  const messageLevelUpdate = extractToolResultUpdate(message, eventState);
  return [
    ...(messageLevelUpdate ? [messageLevelUpdate] : []),
    ...extractToolResultBlocks(message, eventState),
    ...extractToolUseUpdates(message),
  ];
}
/**
 * Report whether a message carries user-visible output beyond tool activity.
 *
 * A message qualifies when it has non-blank string content, a non-blank `text`
 * or `thinking` block, any `image` block, or a non-blank top-level `text` field.
 */
function hasNonToolAssistantContent(message: RawMessage | undefined): boolean {
  if (!message) {
    return false;
  }
  const { content } = message;
  if (typeof content === 'string' && content.trim()) {
    return true;
  }
  if (Array.isArray(content)) {
    const hasVisibleBlock = (content as ContentBlock[]).some((block) => {
      switch (block.type) {
        case 'text':
          return Boolean(block.text && block.text.trim());
        case 'thinking':
          return Boolean(block.thinking && block.thinking.trim());
        case 'image':
          return true;
        default:
          return false;
      }
    });
    if (hasVisibleBlock) {
      return true;
    }
  }
  // Some payloads put the text directly on the message instead of in `content`.
  const topLevelText = (message as unknown as Record<string, unknown>).text;
  return typeof topLevelText === 'string' && topLevelText.trim() !== '';
}
// ── Store ────────────────────────────────────────────────────────
/**
 * Zustand store holding chat messages, the session list, and the streaming /
 * run state for the currently selected session.
 */
export const useChatStore = create<ChatState>((set, get) => ({
// Messages displayed for the current session.
messages: [],
// True while a non-quiet history load is in progress.
loading: false,
error: null,
// True while a run is considered in flight (set on send, cleared on final output, error or abort).
sending: false,
// Run id returned by the send call or adopted from incoming chat events.
activeRunId: null,
streamingText: '',
// Most recent delta message of the in-progress assistant turn.
streamingMessage: null,
// Tool status entries collected from events of the in-progress run.
streamingTools: [],
// True after a tool-result / tool-only final while the run's text response is still awaited.
pendingFinal: false,
// Date.now() (milliseconds) of the last optimistic user message, or null.
lastUserMessageAt: null,
// Files gathered from tool results, attached to the next assistant final message.
pendingToolImages: [],
sessions: [],
currentSessionKey: DEFAULT_SESSION_KEY,
currentAgentId: 'main',
// Session key → display label (first user message text, cut at 50 characters).
sessionLabels: {},
// Session key → last activity timestamp in milliseconds.
sessionLastActivity: {},
showThinking: true,
thinkingLevel: null,
// ── Load sessions via sessions.list ──
//
// Fetches the session list from the Gateway, de-duplicates short vs canonical
// (`agent:…`) keys, reconciles the current session key with the result, and
// then kicks off background history fetches to derive labels / last activity.
// Concurrent callers share one in-flight request, and calls made within
// SESSION_LOAD_MIN_INTERVAL_MS of the previous load are skipped.
// RPC failures are logged with console.warn and never thrown to the caller.
loadSessions: async () => {
const now = Date.now();
// A load is already running: wait for it rather than issuing a second RPC.
if (_loadSessionsInFlight) {
await _loadSessionsInFlight;
return;
}
// Throttle: skip when the previous load finished too recently.
if (now - _lastLoadSessionsAt < SESSION_LOAD_MIN_INTERVAL_MS) {
return;
}
_loadSessionsInFlight = (async () => {
try {
const data = await useGatewayStore.getState().rpc<Record<string, unknown>>('sessions.list', {});
if (data) {
const rawSessions = Array.isArray(data.sessions) ? data.sessions : [];
// Normalize raw entries; entries without a key are dropped.
const sessions: ChatSession[] = rawSessions.map((s: Record<string, unknown>) => ({
key: String(s.key || ''),
label: s.label ? String(s.label) : undefined,
displayName: s.displayName ? String(s.displayName) : undefined,
thinkingLevel: s.thinkingLevel ? String(s.thinkingLevel) : undefined,
model: s.model ? String(s.model) : undefined,
updatedAt: parseSessionUpdatedAtMs(s.updatedAt),
})).filter((s: ChatSession) => s.key);
// Map "everything after the second colon" of each `agent:…` key to the
// first canonical key seen with that suffix.
const canonicalBySuffix = new Map<string, string>();
for (const session of sessions) {
if (!session.key.startsWith('agent:')) continue;
const parts = session.key.split(':');
if (parts.length < 3) continue;
const suffix = parts.slice(2).join(':');
if (suffix && !canonicalBySuffix.has(suffix)) {
canonicalBySuffix.set(suffix, session.key);
}
}
// Deduplicate: if both short and canonical existed, keep canonical only
const seen = new Set<string>();
const dedupedSessions = sessions.filter((s) => {
if (!s.key.startsWith('agent:') && canonicalBySuffix.has(s.key)) return false;
if (seen.has(s.key)) return false;
seen.add(s.key);
return true;
});
const { currentSessionKey, sessions: localSessions } = get();
let nextSessionKey = currentSessionKey || DEFAULT_SESSION_KEY;
// Upgrade a short current key to its canonical form when one is known.
if (!nextSessionKey.startsWith('agent:')) {
const canonicalMatch = canonicalBySuffix.get(nextSessionKey);
if (canonicalMatch) {
nextSessionKey = canonicalMatch;
}
}
if (!dedupedSessions.find((s) => s.key === nextSessionKey) && dedupedSessions.length > 0) {
// Preserve only locally-created pending sessions. On initial boot the
// default ghost key (`agent:main:main`) should yield to real history.
const hasLocalPendingSession = localSessions.some((session) => session.key === nextSessionKey);
if (!hasLocalPendingSession) {
nextSessionKey = dedupedSessions[0].key;
}
}
// Keep the current session visible in the list even if the Gateway did not return it.
const sessionsWithCurrent = !dedupedSessions.find((s) => s.key === nextSessionKey) && nextSessionKey
? [
...dedupedSessions,
{ key: nextSessionKey, displayName: nextSessionKey },
]
: dedupedSessions;
// Seed last-activity times from the finite `updatedAt` values in the list.
const discoveredActivity = Object.fromEntries(
sessionsWithCurrent
.filter((session) => typeof session.updatedAt === 'number' && Number.isFinite(session.updatedAt))
.map((session) => [session.key, session.updatedAt!]),
);
set((state) => ({
sessions: sessionsWithCurrent,
currentSessionKey: nextSessionKey,
currentAgentId: getAgentIdFromSessionKey(nextSessionKey),
sessionLastActivity: {
...state.sessionLastActivity,
...discoveredActivity,
},
}));
// The selected session changed as a result of reconciliation: load its history.
if (currentSessionKey !== nextSessionKey) {
void get().loadHistory();
}
// Background: fetch history for every non-main session to populate labels upfront.
// Requests up to 1000 messages per session (the label comes from the first
// user message found); the requests run in parallel and are not awaited,
// so they don't block the rest of loadSessions.
const sessionsToLabel = sessionsWithCurrent.filter((s) => !s.key.endsWith(':main'));
if (sessionsToLabel.length > 0) {
void Promise.all(
sessionsToLabel.map(async (session) => {
try {
const r = await useGatewayStore.getState().rpc<Record<string, unknown>>(
'chat.history',
{ sessionKey: session.key, limit: 1000 },
);
const msgs = Array.isArray(r.messages) ? r.messages as RawMessage[] : [];
const firstUser = msgs.find((m) => m.role === 'user');
const lastMsg = msgs[msgs.length - 1];
set((s) => {
const next: Partial<typeof s> = {};
if (firstUser) {
const labelText = getMessageText(firstUser.content).trim();
if (labelText) {
// Labels are capped at 50 characters.
const truncated = labelText.length > 50 ? `${labelText.slice(0, 50)}` : labelText;
next.sessionLabels = { ...s.sessionLabels, [session.key]: truncated };
}
}
if (lastMsg?.timestamp) {
next.sessionLastActivity = { ...s.sessionLastActivity, [session.key]: toMs(lastMsg.timestamp) };
}
return next;
});
} catch {
// ignore per-session errors
}
}),
);
}
}
} catch (err) {
console.warn('Failed to load sessions:', err);
} finally {
// Recorded on success and failure alike, so failed loads are throttled too.
_lastLoadSessionsAt = Date.now();
}
})();
try {
await _loadSessionsInFlight;
} finally {
_loadSessionsInFlight = null;
}
},
// ── Switch session ──
//
// Makes `key` the current session and starts loading its history.
// Selecting the session that is already current is a no-op.
switchSession: (key: string) => {
  const alreadyActive = get().currentSessionKey === key;
  if (alreadyActive) {
    return;
  }
  // Cancel background polling that belongs to the session being left, so a
  // pending poll tick cannot fire after the switch and pull the wrong
  // session's history into the new view.
  clearHistoryPoll();
  set((state) => buildSessionSwitchPatch(state, key));
  void get().loadHistory();
},
// ── Delete session ──
//
// Deletion happens in two steps:
//   1. POST /api/sessions/delete on the host API to soft-delete the session's
//      JSONL transcript (the main process renames <suffix>.jsonl →
//      <suffix>.deleted.jsonl so that sessions.list skips it). The Gateway
//      itself exposes no sessions.delete RPC, hence the host API call.
//   2. Remove the session from the local list and from the label / activity
//      maps. This step runs even when step 1 fails; failures are only logged.
// When the deleted session was the current one, the chat view state is reset
// and the first remaining session (or DEFAULT_SESSION_KEY) becomes current.
deleteSession: async (key: string) => {
  try {
    const response = await hostApiFetch<{
      success: boolean;
      error?: string;
    }>('/api/sessions/delete', {
      method: 'POST',
      body: JSON.stringify({ sessionKey: key }),
    });
    if (!response.success) {
      console.warn(`[deleteSession] IPC reported failure for ${key}:`, response.error);
    }
  } catch (failure) {
    console.warn(`[deleteSession] IPC call failed for ${key}:`, failure);
  }

  // Copy of a per-session map with the deleted session's entry removed.
  const withoutDeleted = <T>(map: Record<string, T>): Record<string, T> =>
    Object.fromEntries(Object.entries(map).filter(([entryKey]) => entryKey !== key));

  const snapshot = get();
  const remaining = snapshot.sessions.filter((session) => session.key !== key);

  if (snapshot.currentSessionKey !== key) {
    // A background session was deleted: only the list and maps change.
    set((state) => ({
      sessions: remaining,
      sessionLabels: withoutDeleted(state.sessionLabels),
      sessionLastActivity: withoutDeleted(state.sessionLastActivity),
    }));
    return;
  }

  // The visible session was deleted — move to the first remaining session,
  // or fall back to the default key when none is left.
  const successor = remaining[0];
  const successorKey = successor?.key ?? DEFAULT_SESSION_KEY;
  set((state) => ({
    sessions: remaining,
    sessionLabels: withoutDeleted(state.sessionLabels),
    sessionLastActivity: withoutDeleted(state.sessionLastActivity),
    messages: [],
    streamingText: '',
    streamingMessage: null,
    streamingTools: [],
    activeRunId: null,
    error: null,
    pendingFinal: false,
    lastUserMessageAt: null,
    pendingToolImages: [],
    currentSessionKey: successorKey,
    currentAgentId: getAgentIdFromSessionKey(successorKey),
  }));
  if (successor) {
    void get().loadHistory();
  }
},
// ── New session ──
//
// Creates a fresh session key (`<prefix>:session-<timestamp>`), appends it to
// the session list, makes it current and resets the chat view state.
// NOTE: sessions.reset is intentionally NOT called on the session being left.
// sessions.reset archives (renames) the session JSONL file, which would make
// the old conversation inaccessible when the user switches back to it.
newSession: () => {
  const snapshot = get();
  const leavingKey = snapshot.currentSessionKey;
  // The session being left counts as empty only when it is not a main session
  // and has no messages, no recorded activity time and no label.
  const leavingEmpty = !leavingKey.endsWith(':main')
    && snapshot.messages.length === 0
    && !snapshot.sessionLastActivity[leavingKey]
    && !snapshot.sessionLabels[leavingKey];
  const prefix = getCanonicalPrefixFromSessionKey(leavingKey)
    ?? getCanonicalPrefixFromSessions(snapshot.sessions)
    ?? DEFAULT_CANONICAL_PREFIX;
  const newKey = `${prefix}:session-${Date.now()}`;
  const newSessionEntry: ChatSession = { key: newKey, displayName: newKey };

  // Drops the left session's entry from a per-session map when it was empty;
  // otherwise hands back the map untouched.
  const pruneLeaving = <T>(map: Record<string, T>): Record<string, T> =>
    leavingEmpty
      ? Object.fromEntries(Object.entries(map).filter(([entryKey]) => entryKey !== leavingKey))
      : map;

  set((state) => {
    const keptSessions = leavingEmpty
      ? state.sessions.filter((session) => session.key !== leavingKey)
      : state.sessions;
    return {
      currentSessionKey: newKey,
      currentAgentId: getAgentIdFromSessionKey(newKey),
      sessions: [...keptSessions, newSessionEntry],
      sessionLabels: pruneLeaving(state.sessionLabels),
      sessionLastActivity: pruneLeaving(state.sessionLastActivity),
      messages: [],
      streamingText: '',
      streamingMessage: null,
      streamingTools: [],
      activeRunId: null,
      error: null,
      pendingFinal: false,
      lastUserMessageAt: null,
      pendingToolImages: [],
    };
  });
},
// ── Cleanup empty session on navigate away ──
//
// Removes the current session from the sidebar data when it is a non-main
// session that was never used, so that creating a session and immediately
// navigating away does not leave a ghost entry behind. This is the same
// emptiness test newSession applies to the session it leaves.
cleanupEmptySession: () => {
  const snapshot = get();
  const candidateKey = snapshot.currentSessionKey;
  // `messages` alone is not a reliable signal: a session switch may already
  // have cleared it for a session that does have history. Recorded activity
  // and labels are therefore checked as well.
  const isMain = candidateKey.endsWith(':main');
  const wasUsed = snapshot.messages.length > 0
    || Boolean(snapshot.sessionLastActivity[candidateKey])
    || Boolean(snapshot.sessionLabels[candidateKey]);
  if (isMain || wasUsed) {
    return;
  }
  const isOtherKey = ([entryKey]: [string, unknown]): boolean => entryKey !== candidateKey;
  set((state) => ({
    sessions: state.sessions.filter((session) => session.key !== candidateKey),
    sessionLabels: Object.fromEntries(Object.entries(state.sessionLabels).filter(isOtherKey)),
    sessionLastActivity: Object.fromEntries(Object.entries(state.sessionLastActivity).filter(isOtherKey)),
  }));
},
// ── Load chat history ──
/**
 * Load `chat.history` for the session that is current when the call starts and
 * apply the result to the store.
 *
 * - Concurrent calls for the same session share one in-flight request.
 * - Quiet calls are skipped when that session was loaded less than
 *   `HISTORY_LOAD_MIN_INTERVAL_MS` ago.
 * - Loaded messages are discarded if the user switched sessions while the
 *   request was in flight; the empty/failure paths and the asynchronous
 *   preview refresh honour the same guard, so a stale load can never wipe or
 *   overwrite another session's messages.
 * - When the Gateway returns nothing (or the RPC throws), cron fallback
 *   messages are tried before the message list is cleared.
 *
 * @param quiet When true the load runs in the background: `loading` / `error`
 *   are not touched up front and no loading safety timer is armed.
 */
loadHistory: async (quiet = false) => {
  const { currentSessionKey } = get();

  // Coalesce concurrent loads of the same session onto one request.
  const existingLoad = _historyLoadInFlight.get(currentSessionKey);
  if (existingLoad) {
    await existingLoad;
    return;
  }

  // Throttle background reloads per session.
  const lastLoadAt = _lastHistoryLoadAtBySession.get(currentSessionKey) || 0;
  if (quiet && Date.now() - lastLoadAt < HISTORY_LOAD_MIN_INTERVAL_MS) {
    return;
  }

  if (!quiet) set({ loading: true, error: null });

  // Safety net: if the history load takes too long, force `loading` back to
  // false so the UI cannot stay stuck on a spinner forever.
  let loadingTimedOut = false;
  const loadingSafetyTimer = quiet ? null : setTimeout(() => {
    loadingTimedOut = true;
    set({ loading: false });
  }, 15_000);

  const loadPromise = (async () => {
    const isStillCurrentSession = (): boolean => get().currentSessionKey === currentSessionKey;

    // No history available. `loading` is always cleared, but the message list
    // is only emptied when this load still belongs to the visible session.
    const applyEmptyHistory = (): void => {
      set(isStillCurrentSession() ? { messages: [], loading: false } : { loading: false });
    };

    const applyLoadedMessages = (rawMessages: RawMessage[], thinkingLevel: string | null) => {
      // Guard: if the user switched sessions while this async load was in
      // flight, discard the result to prevent overwriting the new session's
      // messages with stale data from the old session.
      if (!isStillCurrentSession()) return;

      // Before filtering: attach images/files from tool_result messages to the next assistant message
      const messagesWithToolImages = enrichWithToolResultFiles(rawMessages);
      const filteredMessages = messagesWithToolImages.filter((msg) => !isToolResultRole(msg.role));
      // Restore file attachments for user/assistant messages (from cache + text patterns)
      const enrichedMessages = enrichWithCachedImages(filteredMessages);

      // Preserve the optimistic user message during an active send.
      // The Gateway may not include the user's message in chat.history
      // until the run completes, causing it to flash out of the UI.
      let finalMessages = enrichedMessages;
      const userMsgAt = get().lastUserMessageAt;
      if (get().sending && userMsgAt) {
        const userMsMs = toMs(userMsgAt);
        const isRecentUserMessage = (m: RawMessage): boolean =>
          Boolean(m.role === 'user' && m.timestamp && Math.abs(toMs(m.timestamp) - userMsMs) < 5000);
        if (!enrichedMessages.some(isRecentUserMessage)) {
          const optimistic = [...get().messages].reverse().find(isRecentUserMessage);
          if (optimistic) {
            finalMessages = [...enrichedMessages, optimistic];
          }
        }
      }

      set({ messages: finalMessages, thinkingLevel, loading: false });

      // Extract first user message text as a session label for display in the toolbar.
      // Skip main sessions (key ends with ":main") — they rely on the Gateway-provided
      // displayName (e.g. the configured agent name "ClawX") instead.
      const isMainSession = currentSessionKey.endsWith(':main');
      if (!isMainSession) {
        const firstUserMsg = finalMessages.find((m) => m.role === 'user');
        if (firstUserMsg) {
          const labelText = getMessageText(firstUserMsg.content).trim();
          if (labelText) {
            const truncated = labelText.length > 50 ? `${labelText.slice(0, 50)}` : labelText;
            set((s) => ({
              sessionLabels: { ...s.sessionLabels, [currentSessionKey]: truncated },
            }));
          }
        }
      }

      // Record last activity time from the last message in history
      const lastMsg = finalMessages[finalMessages.length - 1];
      if (lastMsg?.timestamp) {
        const lastAt = toMs(lastMsg.timestamp);
        set((s) => ({
          sessionLastActivity: { ...s.sessionLastActivity, [currentSessionKey]: lastAt },
        }));
      }

      // Async: load missing image previews from disk (updates in background)
      void loadMissingPreviews(finalMessages)
        .then((updated) => {
          if (!updated) return;
          // The previews resolve later; by then the user may have switched
          // sessions, in which case there is nothing to refresh here.
          if (!isStillCurrentSession()) return;
          // Create new object references so React.memo detects changes.
          // loadMissingPreviews mutates AttachedFileMeta in place, so we
          // must produce fresh message + file references for each affected msg.
          // The refresh maps over the messages that are in the store NOW rather
          // than over the `finalMessages` snapshot: writing the snapshot back
          // would drop any message appended while the previews were loading.
          set((s) => ({
            messages: s.messages.map((msg) =>
              msg._attachedFiles
                ? { ...msg, _attachedFiles: msg._attachedFiles.map((f) => ({ ...f })) }
                : msg),
          }));
        })
        .catch((err: unknown) => {
          // Previews are best-effort; a failure must not surface as an unhandled rejection.
          console.warn('Failed to load attachment previews:', err);
        });

      const { pendingFinal, lastUserMessageAt, sending: isSendingNow } = get();

      // If we're sending but haven't received streaming events, check
      // whether the loaded history reveals intermediate tool-call activity.
      // This surfaces progress via the pendingFinal → ActivityIndicator path.
      const userMsTs = lastUserMessageAt ? toMs(lastUserMessageAt) : 0;
      const isAfterUserMsg = (msg: RawMessage): boolean => {
        if (!userMsTs || !msg.timestamp) return true;
        return toMs(msg.timestamp) >= userMsTs;
      };

      if (isSendingNow && !pendingFinal) {
        const hasRecentAssistantActivity = [...filteredMessages].reverse().some((msg) => {
          if (msg.role !== 'assistant') return false;
          return isAfterUserMsg(msg);
        });
        if (hasRecentAssistantActivity) {
          set({ pendingFinal: true });
        }
      }

      // If pendingFinal, check whether the AI produced a final text response.
      if (pendingFinal || get().pendingFinal) {
        const recentAssistant = [...filteredMessages].reverse().find((msg) => {
          if (msg.role !== 'assistant') return false;
          if (!hasNonToolAssistantContent(msg)) return false;
          return isAfterUserMsg(msg);
        });
        if (recentAssistant) {
          clearHistoryPoll();
          set({ sending: false, activeRunId: null, pendingFinal: false });
        }
      }
    };

    // Shared by the "no data" and "RPC failed" paths.
    const applyCronFallbackOrEmpty = async (): Promise<void> => {
      const fallbackMessages = await loadCronFallbackMessages(currentSessionKey, 200);
      if (fallbackMessages.length > 0) {
        applyLoadedMessages(fallbackMessages, null);
      } else {
        applyEmptyHistory();
      }
    };

    try {
      const data = await useGatewayStore.getState().rpc<Record<string, unknown>>(
        'chat.history',
        { sessionKey: currentSessionKey, limit: 200 },
      );
      if (data) {
        let rawMessages = Array.isArray(data.messages) ? data.messages as RawMessage[] : [];
        const thinkingLevel = data.thinkingLevel ? String(data.thinkingLevel) : null;
        if (rawMessages.length === 0 && isCronSessionKey(currentSessionKey)) {
          rawMessages = await loadCronFallbackMessages(currentSessionKey, 200);
        }
        applyLoadedMessages(rawMessages, thinkingLevel);
      } else {
        await applyCronFallbackOrEmpty();
      }
    } catch (err) {
      console.warn('Failed to load chat history:', err);
      await applyCronFallbackOrEmpty();
    }
  })();

  _historyLoadInFlight.set(currentSessionKey, loadPromise);
  try {
    await loadPromise;
  } finally {
    // The load settled: the safety timer is no longer needed.
    if (loadingSafetyTimer) clearTimeout(loadingSafetyTimer);
    if (!loadingTimedOut) {
      // Only update load time if we actually didn't time out
      _lastHistoryLoadAtBySession.set(currentSessionKey, Date.now());
    }
    // Only unregister our own promise; a newer load may have replaced it.
    const active = _historyLoadInFlight.get(currentSessionKey);
    if (active === loadPromise) {
      _historyLoadInFlight.delete(currentSessionKey);
    }
  }
},
// ── Send message ──
//
// Sends a user message (optionally with staged attachments) to the Gateway.
//   text          – message text; trimmed before use.
//   attachments   – staged files; when present the message goes through the
//                   host API endpoint /api/chat/send-with-media instead of the
//                   chat.send RPC.
//   targetAgentId – when it resolves to a main session key different from the
//                   current session, the store switches to that session first.
// The user message is added to `messages` optimistically, then a fallback
// history poll and a "no response" safety check are scheduled before the send
// call is awaited. Send failures are reported through the `error` state field;
// nothing is thrown to the caller.
sendMessage: async (
text: string,
attachments?: Array<{ fileName: string; mimeType: string; fileSize: number; stagedPath: string; preview: string | null }>,
targetAgentId?: string | null,
) => {
const trimmed = text.trim();
// Nothing to send: no text and no attachments.
if (!trimmed && (!attachments || attachments.length === 0)) return;
const targetSessionKey = resolveMainSessionKeyForAgent(targetAgentId) ?? get().currentSessionKey;
if (targetSessionKey !== get().currentSessionKey) {
set((s) => buildSessionSwitchPatch(s, targetSessionKey));
await get().loadHistory(true);
}
const currentSessionKey = targetSessionKey;
// Add user message optimistically (with local file metadata for UI display)
const nowMs = Date.now();
const userMsg: RawMessage = {
role: 'user',
content: trimmed || (attachments?.length ? '(file attached)' : ''),
// Message timestamps are stored in seconds here; lastUserMessageAt below is in milliseconds.
timestamp: nowMs / 1000,
id: crypto.randomUUID(),
_attachedFiles: attachments?.map(a => ({
fileName: a.fileName,
mimeType: a.mimeType,
fileSize: a.fileSize,
preview: a.preview,
filePath: a.stagedPath,
})),
};
set((s) => ({
messages: [...s.messages, userMsg],
sending: true,
error: null,
streamingText: '',
streamingMessage: null,
streamingTools: [],
pendingFinal: false,
lastUserMessageAt: nowMs,
}));
// Update session label with first user message text as soon as it's sent
const { sessionLabels, messages } = get();
// `messages` already ends with the optimistic message, so look only at what precedes it.
const isFirstMessage = !messages.slice(0, -1).some((m) => m.role === 'user');
if (!currentSessionKey.endsWith(':main') && isFirstMessage && !sessionLabels[currentSessionKey] && trimmed) {
const truncated = trimmed.length > 50 ? `${trimmed.slice(0, 50)}` : trimmed;
set((s) => ({ sessionLabels: { ...s.sessionLabels, [currentSessionKey]: truncated } }));
}
// Mark this session as most recently active
set((s) => ({ sessionLastActivity: { ...s.sessionLastActivity, [currentSessionKey]: nowMs } }));
// Start the history poll and safety timeout IMMEDIATELY (before the
// RPC await) because the gateway's chat.send RPC may block until the
// entire agentic conversation finishes — the poll must run in parallel.
_lastChatEventAt = Date.now();
clearHistoryPoll();
clearErrorRecoveryTimer();
const POLL_START_DELAY = 3_000;
const POLL_INTERVAL = 4_000;
// Self-rescheduling poll: reloads history quietly only while a send is
// active, nothing is streaming, and no chat event arrived recently.
const pollHistory = () => {
const state = get();
if (!state.sending) { clearHistoryPoll(); return; }
if (state.streamingMessage) {
_historyPollTimer = setTimeout(pollHistory, POLL_INTERVAL);
return;
}
if (Date.now() - _lastChatEventAt < HISTORY_POLL_SILENCE_WINDOW_MS) {
_historyPollTimer = setTimeout(pollHistory, POLL_INTERVAL);
return;
}
state.loadHistory(true);
_historyPollTimer = setTimeout(pollHistory, POLL_INTERVAL);
};
_historyPollTimer = setTimeout(pollHistory, POLL_START_DELAY);
const SAFETY_TIMEOUT_MS = 90_000;
// "No response" watchdog: first runs after 30s, then every 10s while the
// send is active. It stops for good once the send ended or streaming data
// is present, and reports an error when no chat event has arrived for
// SAFETY_TIMEOUT_MS (unless pendingFinal shows the run is making progress).
const checkStuck = () => {
const state = get();
if (!state.sending) return;
if (state.streamingMessage || state.streamingText) return;
if (state.pendingFinal) {
setTimeout(checkStuck, 10_000);
return;
}
if (Date.now() - _lastChatEventAt < SAFETY_TIMEOUT_MS) {
setTimeout(checkStuck, 10_000);
return;
}
clearHistoryPoll();
set({
error: 'No response received from the model. The provider may be unavailable or the API key may have insufficient quota. Please check your provider settings.',
sending: false,
activeRunId: null,
lastUserMessageAt: null,
});
};
setTimeout(checkStuck, 30_000);
try {
const idempotencyKey = crypto.randomUUID();
const hasMedia = attachments && attachments.length > 0;
if (hasMedia) {
console.log('[sendMessage] Media paths:', attachments!.map(a => a.stagedPath));
}
// Cache image attachments BEFORE the IPC call to avoid race condition:
// history may reload (via Gateway event) before the RPC returns.
// Keyed by staged file path which appears in [media attached: <path> ...].
if (hasMedia && attachments) {
for (const a of attachments) {
_imageCache.set(a.stagedPath, {
fileName: a.fileName,
mimeType: a.mimeType,
fileSize: a.fileSize,
preview: a.preview,
});
}
saveImageCache(_imageCache);
}
let result: { success: boolean; result?: { runId?: string }; error?: string };
// Longer timeout for chat sends to tolerate high-latency networks (avoids connect error)
// NOTE(review): this timeout is only passed to the chat.send RPC below, not to the media request.
const CHAT_SEND_TIMEOUT_MS = 120_000;
if (hasMedia) {
result = await hostApiFetch<{ success: boolean; result?: { runId?: string }; error?: string }>(
'/api/chat/send-with-media',
{
method: 'POST',
body: JSON.stringify({
sessionKey: currentSessionKey,
message: trimmed || 'Process the attached file(s).',
deliver: false,
idempotencyKey,
media: attachments.map((a) => ({
filePath: a.stagedPath,
mimeType: a.mimeType,
fileName: a.fileName,
})),
}),
},
);
} else {
const rpcResult = await useGatewayStore.getState().rpc<{ runId?: string }>(
'chat.send',
{
sessionKey: currentSessionKey,
message: trimmed,
deliver: false,
idempotencyKey,
},
CHAT_SEND_TIMEOUT_MS,
);
// The RPC path signals failure by throwing, so reaching this line means success.
result = { success: true, result: rpcResult };
}
console.log(`[sendMessage] RPC result: success=${result.success}, runId=${result.result?.runId || 'none'}`);
if (!result.success) {
clearHistoryPoll();
set({ error: result.error || 'Failed to send message', sending: false });
} else if (result.result?.runId) {
set({ activeRunId: result.result.runId });
}
} catch (err) {
clearHistoryPoll();
set({ error: String(err), sending: false });
}
},
// ── Abort active run ──
//
// Stops local polling / recovery timers, clears the streaming state right
// away, then asks the Gateway to abort the run of the current session.
// A failing abort RPC is reported through the `error` state field.
abortRun: async () => {
  clearHistoryPoll();
  clearErrorRecoveryTimer();
  const sessionKey = get().currentSessionKey;
  set({
    sending: false,
    streamingText: '',
    streamingMessage: null,
    pendingFinal: false,
    lastUserMessageAt: null,
    pendingToolImages: [],
  });
  set({ streamingTools: [] });
  try {
    await useGatewayStore.getState().rpc('chat.abort', { sessionKey });
  } catch (abortError) {
    set({ error: String(abortError) });
  }
},
// ── Handle incoming chat events from Gateway ──
//
// Entry point for chat events pushed by the Gateway. Events for another
// session, for a run other than the active one, or recognized as duplicates
// are ignored. The remaining events are dispatched on their state:
//   started – mark the store as sending (run started elsewhere)
//   delta   – update the streaming message and tool statuses
//   final   – append the finished message (or snapshot the streaming turn for
//             tool results) and decide whether the run is complete
//   error   – keep partial output, show the error, and finalize after a
//             grace period unless the run recovers
//   aborted – reset all run/streaming state
// Events without a state but with a message are treated as deltas while sending.
handleChatEvent: (event: Record<string, unknown>) => {
const runId = String(event.runId || '');
const eventState = String(event.state || '');
const eventSessionKey = event.sessionKey != null ? String(event.sessionKey) : null;
const { activeRunId, currentSessionKey } = get();
// Only process events for the current session (when sessionKey is present)
if (eventSessionKey != null && eventSessionKey !== currentSessionKey) return;
// Only process events for the active run (or if no active run set)
if (activeRunId && runId && runId !== activeRunId) return;
if (isDuplicateChatEvent(eventState, event)) return;
// Record event arrival; the history poll and the send watchdog read this timestamp.
_lastChatEventAt = Date.now();
// Defensive: if state is missing but we have a message, try to infer state.
let resolvedState = eventState;
if (!resolvedState && event.message && typeof event.message === 'object') {
const msg = event.message as Record<string, unknown>;
const stopReason = msg.stopReason ?? msg.stop_reason;
if (stopReason) {
resolvedState = 'final';
} else if (msg.role || msg.content) {
resolvedState = 'delta';
}
}
// Only pause the history poll when we receive actual streaming data.
// The gateway sends "agent" events with { phase, startedAt } that carry
// no message — these must NOT kill the poll, since the poll is our only
// way to track progress when the gateway doesn't stream intermediate turns.
const hasUsefulData = resolvedState === 'delta' || resolvedState === 'final'
|| resolvedState === 'error' || resolvedState === 'aborted';
if (hasUsefulData) {
clearHistoryPoll();
// Adopt run started from another client (e.g. console at 127.0.0.1:18789):
// show loading/streaming in the app when this session has an active run.
const { sending } = get();
if (!sending && runId) {
set({ sending: true, activeRunId: runId, error: null });
}
}
switch (resolvedState) {
case 'started': {
// Run just started (e.g. from console); show loading immediately.
const { sending: currentSending } = get();
if (!currentSending && runId) {
set({ sending: true, activeRunId: runId, error: null });
}
break;
}
case 'delta': {
// If we're receiving new deltas, the Gateway has recovered from any
// prior error — cancel the error finalization timer and clear the
// stale error banner so the user sees the live stream again.
if (_errorRecoveryTimer) {
clearErrorRecoveryTimer();
set({ error: null });
}
const updates = collectToolUpdates(event.message, resolvedState);
set((s) => ({
streamingMessage: (() => {
// Tool-result deltas only feed tool statuses; they never replace the streaming assistant message.
if (event.message && typeof event.message === 'object') {
const msgRole = (event.message as RawMessage).role;
if (isToolResultRole(msgRole)) return s.streamingMessage;
}
return event.message ?? s.streamingMessage;
})(),
streamingTools: updates.length > 0 ? upsertToolStatuses(s.streamingTools, updates) : s.streamingTools,
}));
break;
}
case 'final': {
clearErrorRecoveryTimer();
if (get().error) set({ error: null });
// Message complete - add to history and clear streaming
const finalMsg = event.message as RawMessage | undefined;
if (finalMsg) {
const updates = collectToolUpdates(finalMsg, resolvedState);
// Branch 1: the final is a tool result — the run continues with another turn.
if (isToolResultRole(finalMsg.role)) {
// Resolve file path from the streaming assistant message's matching tool call
const currentStreamForPath = get().streamingMessage as RawMessage | null;
const matchedPath = (currentStreamForPath && finalMsg.toolCallId)
? getToolCallFilePath(currentStreamForPath, finalMsg.toolCallId)
: undefined;
// Mirror enrichWithToolResultFiles: collect images + file refs for next assistant msg
const toolFiles: AttachedFileMeta[] = [
...extractImagesAsAttachedFiles(finalMsg.content),
];
if (matchedPath) {
for (const f of toolFiles) {
if (!f.filePath) {
f.filePath = matchedPath;
f.fileName = matchedPath.split(/[\\/]/).pop() || 'image';
}
}
}
const text = getMessageText(finalMsg.content);
if (text) {
const mediaRefs = extractMediaRefs(text);
const mediaRefPaths = new Set(mediaRefs.map(r => r.filePath));
for (const ref of mediaRefs) toolFiles.push(makeAttachedFile(ref));
// Raw file paths are added only when not already covered by a media ref.
for (const ref of extractRawFilePaths(text)) {
if (!mediaRefPaths.has(ref.filePath)) toolFiles.push(makeAttachedFile(ref));
}
}
set((s) => {
// Snapshot the current streaming assistant message (thinking + tool_use) into
// messages[] before clearing it. The Gateway does NOT send separate 'final'
// events for intermediate tool-use turns — it only sends deltas and then the
// tool result. Without snapshotting here, the intermediate thinking+tool steps
// would be overwritten by the next turn's deltas and never appear in the UI.
const currentStream = s.streamingMessage as RawMessage | null;
const snapshotMsgs: RawMessage[] = [];
if (currentStream) {
const streamRole = currentStream.role;
if (streamRole === 'assistant' || streamRole === undefined) {
// Use message's own id if available, otherwise derive a stable one from runId
const snapId = currentStream.id
|| `${runId || 'run'}-turn-${s.messages.length}`;
if (!s.messages.some(m => m.id === snapId)) {
snapshotMsgs.push({
...(currentStream as RawMessage),
role: 'assistant',
id: snapId,
});
}
}
}
return {
messages: snapshotMsgs.length > 0 ? [...s.messages, ...snapshotMsgs] : s.messages,
streamingText: '',
streamingMessage: null,
pendingFinal: true,
pendingToolImages: toolFiles.length > 0
? [...s.pendingToolImages, ...toolFiles]
: s.pendingToolImages,
streamingTools: updates.length > 0 ? upsertToolStatuses(s.streamingTools, updates) : s.streamingTools,
};
});
break;
}
// Branch 2: an assistant final — either a tool-only turn or the actual response.
const toolOnly = isToolOnlyMessage(finalMsg);
const hasOutput = hasNonToolAssistantContent(finalMsg);
const msgId = finalMsg.id || (toolOnly ? `run-${runId}-tool-${Date.now()}` : `run-${runId}`);
set((s) => {
const nextTools = updates.length > 0 ? upsertToolStatuses(s.streamingTools, updates) : s.streamingTools;
// Tool statuses are cleared once the message carries visible output.
const streamingTools = hasOutput ? [] : nextTools;
// Attach any images collected from preceding tool results
const pendingImgs = s.pendingToolImages;
const msgWithImages: RawMessage = pendingImgs.length > 0
? {
...finalMsg,
role: (finalMsg.role || 'assistant') as RawMessage['role'],
id: msgId,
_attachedFiles: [...(finalMsg._attachedFiles || []), ...pendingImgs],
}
: { ...finalMsg, role: (finalMsg.role || 'assistant') as RawMessage['role'], id: msgId };
const clearPendingImages = { pendingToolImages: [] as AttachedFileMeta[] };
// Check if message already exists (prevent duplicates)
const alreadyExists = s.messages.some(m => m.id === msgId);
if (alreadyExists) {
// Same state transitions as below, minus appending the message again.
return toolOnly ? {
streamingText: '',
streamingMessage: null,
pendingFinal: true,
streamingTools,
...clearPendingImages,
} : {
streamingText: '',
streamingMessage: null,
sending: hasOutput ? false : s.sending,
activeRunId: hasOutput ? null : s.activeRunId,
pendingFinal: hasOutput ? false : true,
streamingTools,
...clearPendingImages,
};
}
return toolOnly ? {
messages: [...s.messages, msgWithImages],
streamingText: '',
streamingMessage: null,
pendingFinal: true,
streamingTools,
...clearPendingImages,
} : {
messages: [...s.messages, msgWithImages],
streamingText: '',
streamingMessage: null,
sending: hasOutput ? false : s.sending,
activeRunId: hasOutput ? null : s.activeRunId,
pendingFinal: hasOutput ? false : true,
streamingTools,
...clearPendingImages,
};
});
// After the final response, quietly reload history to surface all intermediate
// tool-use turns (thinking + tool blocks) from the Gateway's authoritative record.
if (hasOutput && !toolOnly) {
clearHistoryPoll();
void get().loadHistory(true);
}
} else {
// No message in final event - reload history to get complete data
set({ streamingText: '', streamingMessage: null, pendingFinal: true });
get().loadHistory();
}
break;
}
case 'error': {
const errorMsg = String(event.errorMessage || 'An error occurred');
// Read after the run-adoption step above, which may already have set `sending` for an event carrying a runId.
const wasSending = get().sending;
// Snapshot the current streaming message into messages[] so partial
// content ("Let me get that written down...") is preserved in the UI
// rather than being silently discarded.
const currentStream = get().streamingMessage as RawMessage | null;
if (currentStream && (currentStream.role === 'assistant' || currentStream.role === undefined)) {
const snapId = (currentStream as RawMessage).id
|| `error-snap-${Date.now()}`;
const alreadyExists = get().messages.some(m => m.id === snapId);
if (!alreadyExists) {
set((s) => ({
messages: [...s.messages, { ...currentStream, role: 'assistant' as const, id: snapId }],
}));
}
}
set({
error: errorMsg,
streamingText: '',
streamingMessage: null,
streamingTools: [],
pendingFinal: false,
pendingToolImages: [],
});
// Don't immediately give up: the Gateway often retries internally
// after transient API failures (e.g. "terminated"). Keep `sending`
// true for a grace period so that recovery events are processed and
// the agent-phase-completion handler can still trigger loadHistory.
if (wasSending) {
clearErrorRecoveryTimer();
const ERROR_RECOVERY_GRACE_MS = 15_000;
_errorRecoveryTimer = setTimeout(() => {
_errorRecoveryTimer = null;
const state = get();
if (state.sending && !state.streamingMessage) {
clearHistoryPoll();
// Grace period expired with no recovery — finalize the error
set({
sending: false,
activeRunId: null,
lastUserMessageAt: null,
});
// One final history reload in case the Gateway completed in the
// background and we just missed the event.
state.loadHistory(true);
}
}, ERROR_RECOVERY_GRACE_MS);
} else {
clearHistoryPoll();
set({ sending: false, activeRunId: null, lastUserMessageAt: null });
}
break;
}
case 'aborted': {
clearHistoryPoll();
clearErrorRecoveryTimer();
set({
sending: false,
activeRunId: null,
streamingText: '',
streamingMessage: null,
streamingTools: [],
pendingFinal: false,
lastUserMessageAt: null,
pendingToolImages: [],
});
break;
}
default: {
// Unknown or empty state — if we're currently sending and receive an event
// with a message, attempt to process it as streaming data. This handles
// edge cases where the Gateway sends events without a state field.
const { sending } = get();
if (sending && event.message && typeof event.message === 'object') {
console.warn(`[handleChatEvent] Unknown event state "${resolvedState}", treating message as streaming delta. Event keys:`, Object.keys(event));
const updates = collectToolUpdates(event.message, 'delta');
set((s) => ({
streamingMessage: event.message ?? s.streamingMessage,
streamingTools: updates.length > 0 ? upsertToolStatuses(s.streamingTools, updates) : s.streamingTools,
}));
}
break;
}
}
},
// ── Toggle thinking visibility ──
//
// Flips the `showThinking` flag.
toggleThinking: () => {
  set((state) => ({ showThinking: !state.showThinking }));
},
// ── Refresh: reload history + sessions ──
//
// Starts a history load and a session-list load together and resolves once
// both have settled successfully.
refresh: async () => {
  const store = get();
  const pendingHistory = store.loadHistory();
  const pendingSessions = store.loadSessions();
  await Promise.all([pendingHistory, pendingSessions]);
},
// Dismisses the current error by resetting `error` to null.
clearError: () => {
  set({ error: null });
},
}));