From 38391dd093288688d47f45bbc7a91f102837c2d1 Mon Sep 17 00:00:00 2001 From: Haze <709547807@qq.com> Date: Thu, 12 Mar 2026 11:20:56 +0800 Subject: [PATCH] feat(cron): implement cron session management and logging features, including session key parsing and fallback message handling (#429) --- electron/api/routes/cron.ts | 280 ++++++++++++++++++++++++ src/stores/chat.ts | 270 ++++++++++++++--------- src/stores/chat/cron-session-utils.ts | 37 ++++ src/stores/chat/history-actions.ts | 243 +++++++++++--------- src/stores/chat/session-actions.ts | 28 ++- src/stores/chat/types.ts | 1 + src/stores/gateway.ts | 31 ++- tests/unit/chat-history-actions.test.ts | 131 +++++++++++ tests/unit/chat-session-actions.test.ts | 35 ++- 9 files changed, 836 insertions(+), 220 deletions(-) create mode 100644 src/stores/chat/cron-session-utils.ts create mode 100644 tests/unit/chat-history-actions.test.ts diff --git a/electron/api/routes/cron.ts b/electron/api/routes/cron.ts index 0ab658a84..29c708f1e 100644 --- a/electron/api/routes/cron.ts +++ b/electron/api/routes/cron.ts @@ -1,6 +1,9 @@ +import { readFile } from 'node:fs/promises'; import type { IncomingMessage, ServerResponse } from 'http'; +import { join } from 'node:path'; import type { HostApiContext } from '../context'; import { parseJsonBody, sendJson } from '../route-utils'; +import { getOpenClawConfigDir } from '../../utils/paths'; interface GatewayCronJob { id: string; @@ -15,6 +18,7 @@ interface GatewayCronJob { sessionTarget?: string; state: { nextRunAtMs?: number; + runningAtMs?: number; lastRunAtMs?: number; lastStatus?: string; lastError?: string; @@ -22,6 +26,241 @@ interface GatewayCronJob { }; } +interface CronRunLogEntry { + jobId?: string; + action?: string; + status?: string; + error?: string; + summary?: string; + sessionId?: string; + sessionKey?: string; + ts?: number; + runAtMs?: number; + durationMs?: number; + model?: string; + provider?: string; +} + +interface CronSessionKeyParts { + agentId: string; + jobId: string; + runSessionId?: string; +} + +interface CronSessionFallbackMessage { + id: string; + role: 'assistant' | 'system'; + content: string; + timestamp: number; + isError?: boolean; +} + +function parseCronSessionKey(sessionKey: string): CronSessionKeyParts | null { + if (!sessionKey.startsWith('agent:')) return null; + const parts = sessionKey.split(':'); + if (parts.length < 4 || parts[2] !== 'cron') return null; + + const agentId = parts[1] || 'main'; + const jobId = parts[3]; + if (!jobId) return null; + + if (parts.length === 4) { + return { agentId, jobId }; + } + + if (parts.length === 6 && parts[4] === 'run' && parts[5]) { + return { agentId, jobId, runSessionId: parts[5] }; + } + + return null; +} + +function normalizeTimestampMs(value: unknown): number | undefined { + if (typeof value === 'number' && Number.isFinite(value)) { + return value < 1e12 ? value * 1000 : value; + } + if (typeof value === 'string' && value.trim()) { + const parsed = Date.parse(value); + if (Number.isFinite(parsed)) { + return parsed; + } + } + return undefined; +} + +function formatDuration(durationMs: number | undefined): string | null { + if (!durationMs || !Number.isFinite(durationMs)) return null; + if (durationMs < 1000) return `${Math.round(durationMs)}ms`; + if (durationMs < 10_000) return `${(durationMs / 1000).toFixed(1)}s`; + return `${Math.round(durationMs / 1000)}s`; +} + +function buildCronRunMessage(entry: CronRunLogEntry, index: number): CronSessionFallbackMessage | null { + const timestamp = normalizeTimestampMs(entry.ts) ?? normalizeTimestampMs(entry.runAtMs); + if (!timestamp) return null; + + const status = typeof entry.status === 'string' ? entry.status.toLowerCase() : ''; + const summary = typeof entry.summary === 'string' ? entry.summary.trim() : ''; + const error = typeof entry.error === 'string' ? entry.error.trim() : ''; + let content = summary || error; + + if (!content) { + content = status === 'error' + ? 'Scheduled task failed.' + : 'Scheduled task completed.'; + } + + if (status === 'error' && !content.toLowerCase().startsWith('run failed:')) { + content = `Run failed: ${content}`; + } + + const meta: string[] = []; + const duration = formatDuration(entry.durationMs); + if (duration) meta.push(`Duration: ${duration}`); + if (entry.provider && entry.model) { + meta.push(`Model: ${entry.provider}/${entry.model}`); + } else if (entry.model) { + meta.push(`Model: ${entry.model}`); + } + if (meta.length > 0) { + content = `${content}\n\n${meta.join(' | ')}`; + } + + return { + id: `cron-run-${entry.sessionId ?? entry.ts ?? index}`, + role: status === 'error' ? 'system' : 'assistant', + content, + timestamp, + ...(status === 'error' ? { isError: true } : {}), + }; +} + +async function readCronRunLog(jobId: string): Promise { + const logPath = join(getOpenClawConfigDir(), 'cron', 'runs', `${jobId}.jsonl`); + const raw = await readFile(logPath, 'utf8').catch(() => ''); + if (!raw.trim()) return []; + + const entries: CronRunLogEntry[] = []; + for (const line of raw.split(/\r?\n/)) { + const trimmed = line.trim(); + if (!trimmed) continue; + try { + const entry = JSON.parse(trimmed) as CronRunLogEntry; + if (!entry || entry.jobId !== jobId) continue; + if (entry.action && entry.action !== 'finished') continue; + entries.push(entry); + } catch { + // Ignore malformed log lines so one bad entry does not hide the rest. + } + } + return entries; +} + +async function readSessionStoreEntry( + agentId: string, + sessionKey: string, +): Promise | undefined> { + const storePath = join(getOpenClawConfigDir(), 'agents', agentId, 'sessions', 'sessions.json'); + const raw = await readFile(storePath, 'utf8').catch(() => ''); + if (!raw.trim()) return undefined; + + try { + const store = JSON.parse(raw) as Record; + const directEntry = store[sessionKey]; + if (directEntry && typeof directEntry === 'object') { + return directEntry as Record; + } + + const sessions = (store as { sessions?: unknown }).sessions; + if (Array.isArray(sessions)) { + const arrayEntry = sessions.find((entry) => { + if (!entry || typeof entry !== 'object') return false; + const record = entry as Record; + return record.key === sessionKey || record.sessionKey === sessionKey; + }); + if (arrayEntry && typeof arrayEntry === 'object') { + return arrayEntry as Record; + } + } + } catch { + return undefined; + } + + return undefined; +} + +export function buildCronSessionFallbackMessages(params: { + sessionKey: string; + job?: Pick; + runs: CronRunLogEntry[]; + sessionEntry?: { label?: string; updatedAt?: number }; + limit?: number; +}): CronSessionFallbackMessage[] { + const parsed = parseCronSessionKey(params.sessionKey); + if (!parsed) return []; + + const matchingRuns = params.runs + .filter((entry) => { + if (!parsed.runSessionId) return true; + return entry.sessionId === parsed.runSessionId + || entry.sessionKey === `${params.sessionKey}`; + }) + .sort((a, b) => { + const left = normalizeTimestampMs(a.ts) ?? normalizeTimestampMs(a.runAtMs) ?? 0; + const right = normalizeTimestampMs(b.ts) ?? normalizeTimestampMs(b.runAtMs) ?? 0; + return left - right; + }); + + const messages: CronSessionFallbackMessage[] = []; + const prompt = params.job?.payload?.message || params.job?.payload?.text || ''; + const taskName = params.job?.name?.trim() + || params.sessionEntry?.label?.replace(/^Cron:\s*/, '').trim() + || ''; + const firstRelevantTimestamp = matchingRuns.length > 0 + ? (normalizeTimestampMs(matchingRuns[0]?.runAtMs) ?? normalizeTimestampMs(matchingRuns[0]?.ts)) + : (normalizeTimestampMs(params.job?.state?.runningAtMs) ?? params.sessionEntry?.updatedAt); + + if (taskName || prompt) { + const lines = [taskName ? `Scheduled task: ${taskName}` : 'Scheduled task']; + if (prompt) lines.push(`Prompt: ${prompt}`); + messages.push({ + id: `cron-meta-${parsed.jobId}`, + role: 'system', + content: lines.join('\n'), + timestamp: Math.max(0, (firstRelevantTimestamp ?? Date.now()) - 1), + }); + } + + matchingRuns.forEach((entry, index) => { + const message = buildCronRunMessage(entry, index); + if (message) messages.push(message); + }); + + if (matchingRuns.length === 0) { + const runningAt = normalizeTimestampMs(params.job?.state?.runningAtMs); + if (runningAt) { + messages.push({ + id: `cron-running-${parsed.jobId}`, + role: 'system', + content: 'This scheduled task is still running in OpenClaw, but no chat transcript is available yet.', + timestamp: runningAt, + }); + } else if (messages.length === 0) { + messages.push({ + id: `cron-empty-${parsed.jobId}`, + role: 'system', + content: 'No chat transcript is available for this scheduled task yet.', + timestamp: params.sessionEntry?.updatedAt ?? Date.now(), + }); + } + } + + const limit = typeof params.limit === 'number' && Number.isFinite(params.limit) + ? Math.max(1, Math.floor(params.limit)) + : messages.length; + return messages.slice(-limit); +} + function transformCronJob(job: GatewayCronJob) { const message = job.payload?.message || job.payload?.text || ''; const channelType = job.delivery?.channel; @@ -60,6 +299,47 @@ export async function handleCronRoutes( url: URL, ctx: HostApiContext, ): Promise { + if (url.pathname === '/api/cron/session-history' && req.method === 'GET') { + const sessionKey = url.searchParams.get('sessionKey')?.trim() || ''; + const parsedSession = parseCronSessionKey(sessionKey); + if (!parsedSession) { + sendJson(res, 400, { success: false, error: `Invalid cron sessionKey: ${sessionKey}` }); + return true; + } + + const rawLimit = Number(url.searchParams.get('limit') || '200'); + const limit = Number.isFinite(rawLimit) + ? Math.min(Math.max(Math.floor(rawLimit), 1), 200) + : 200; + + try { + const [jobsResult, runs, sessionEntry] = await Promise.all([ + ctx.gatewayManager.rpc('cron.list', { includeDisabled: true }) + .catch(() => ({ jobs: [] as GatewayCronJob[] })), + readCronRunLog(parsedSession.jobId), + readSessionStoreEntry(parsedSession.agentId, sessionKey), + ]); + + const jobs = (jobsResult as { jobs?: GatewayCronJob[] }).jobs ?? []; + const job = jobs.find((item) => item.id === parsedSession.jobId); + const messages = buildCronSessionFallbackMessages({ + sessionKey, + job, + runs, + sessionEntry: sessionEntry ? { + label: typeof sessionEntry.label === 'string' ? sessionEntry.label : undefined, + updatedAt: normalizeTimestampMs(sessionEntry.updatedAt), + } : undefined, + limit, + }); + + sendJson(res, 200, { messages }); + } catch (error) { + sendJson(res, 500, { success: false, error: String(error) }); + } + return true; + } + if (url.pathname === '/api/cron/jobs' && req.method === 'GET') { try { const result = await ctx.gatewayManager.rpc('cron.list', { includeDisabled: true }); diff --git a/src/stores/chat.ts b/src/stores/chat.ts index 9b8ff605c..9f61d97e9 100644 --- a/src/stores/chat.ts +++ b/src/stores/chat.ts @@ -7,6 +7,7 @@ import { create } from 'zustand'; import { hostApiFetch } from '@/lib/host-api'; import { useGatewayStore } from './gateway'; import { useAgentsStore } from './agents'; +import { buildCronSessionHistoryPath, isCronSessionKey } from './chat/cron-session-utils'; // ── Types ──────────────────────────────────────────────────────── @@ -56,6 +57,7 @@ export interface ChatSession { displayName?: string; thinkingLevel?: string; model?: string; + updatedAt?: number; } export interface ToolStatus { @@ -669,6 +671,32 @@ function getAgentIdFromSessionKey(sessionKey: string): string { return parts[1] || 'main'; } +function parseSessionUpdatedAtMs(value: unknown): number | undefined { + if (typeof value === 'number' && Number.isFinite(value)) { + return toMs(value); + } + if (typeof value === 'string' && value.trim()) { + const parsed = Date.parse(value); + if (Number.isFinite(parsed)) { + return parsed; + } + } + return undefined; +} + +async function loadCronFallbackMessages(sessionKey: string, limit = 200): Promise { + if (!isCronSessionKey(sessionKey)) return []; + try { + const response = await hostApiFetch<{ messages?: RawMessage[] }>( + buildCronSessionHistoryPath(sessionKey, limit), + ); + return Array.isArray(response.messages) ? response.messages : []; + } catch (error) { + console.warn('Failed to load cron fallback history:', error); + return []; + } +} + function normalizeAgentId(value: string | undefined | null): string { return (value ?? '').trim().toLowerCase() || 'main'; } @@ -1022,6 +1050,7 @@ export const useChatStore = create((set, get) => ({ displayName: s.displayName ? String(s.displayName) : undefined, thinkingLevel: s.thinkingLevel ? String(s.thinkingLevel) : undefined, model: s.model ? String(s.model) : undefined, + updatedAt: parseSessionUpdatedAtMs(s.updatedAt), })).filter((s: ChatSession) => s.key); const canonicalBySuffix = new Map(); @@ -1068,11 +1097,21 @@ export const useChatStore = create((set, get) => ({ ] : dedupedSessions; - set({ + const discoveredActivity = Object.fromEntries( + sessionsWithCurrent + .filter((session) => typeof session.updatedAt === 'number' && Number.isFinite(session.updatedAt)) + .map((session) => [session.key, session.updatedAt!]), + ); + + set((state) => ({ sessions: sessionsWithCurrent, currentSessionKey: nextSessionKey, currentAgentId: getAgentIdFromSessionKey(nextSessionKey), - }); + sessionLastActivity: { + ...state.sessionLastActivity, + ...discoveredActivity, + }, + })); if (currentSessionKey !== nextSessionKey) { get().loadHistory(); @@ -1251,124 +1290,141 @@ export const useChatStore = create((set, get) => ({ const { currentSessionKey } = get(); if (!quiet) set({ loading: true, error: null }); + const applyLoadedMessages = (rawMessages: RawMessage[], thinkingLevel: string | null) => { + // Before filtering: attach images/files from tool_result messages to the next assistant message + const messagesWithToolImages = enrichWithToolResultFiles(rawMessages); + const filteredMessages = messagesWithToolImages.filter((msg) => !isToolResultRole(msg.role)); + // Restore file attachments for user/assistant messages (from cache + text patterns) + const enrichedMessages = enrichWithCachedImages(filteredMessages); + + // Preserve the optimistic user message during an active send. + // The Gateway may not include the user's message in chat.history + // until the run completes, causing it to flash out of the UI. + let finalMessages = enrichedMessages; + const userMsgAt = get().lastUserMessageAt; + if (get().sending && userMsgAt) { + const userMsMs = toMs(userMsgAt); + const hasRecentUser = enrichedMessages.some( + (m) => m.role === 'user' && m.timestamp && Math.abs(toMs(m.timestamp) - userMsMs) < 5000, + ); + if (!hasRecentUser) { + const currentMsgs = get().messages; + const optimistic = [...currentMsgs].reverse().find( + (m) => m.role === 'user' && m.timestamp && Math.abs(toMs(m.timestamp) - userMsMs) < 5000, + ); + if (optimistic) { + finalMessages = [...enrichedMessages, optimistic]; + } + } + } + + set({ messages: finalMessages, thinkingLevel, loading: false }); + + // Extract first user message text as a session label for display in the toolbar. + // Skip main sessions (key ends with ":main") — they rely on the Gateway-provided + // displayName (e.g. the configured agent name "ClawX") instead. + const isMainSession = currentSessionKey.endsWith(':main'); + if (!isMainSession) { + const firstUserMsg = finalMessages.find((m) => m.role === 'user'); + if (firstUserMsg) { + const labelText = getMessageText(firstUserMsg.content).trim(); + if (labelText) { + const truncated = labelText.length > 50 ? `${labelText.slice(0, 50)}…` : labelText; + set((s) => ({ + sessionLabels: { ...s.sessionLabels, [currentSessionKey]: truncated }, + })); + } + } + } + + // Record last activity time from the last message in history + const lastMsg = finalMessages[finalMessages.length - 1]; + if (lastMsg?.timestamp) { + const lastAt = toMs(lastMsg.timestamp); + set((s) => ({ + sessionLastActivity: { ...s.sessionLastActivity, [currentSessionKey]: lastAt }, + })); + } + + // Async: load missing image previews from disk (updates in background) + loadMissingPreviews(finalMessages).then((updated) => { + if (updated) { + // Create new object references so React.memo detects changes. + // loadMissingPreviews mutates AttachedFileMeta in place, so we + // must produce fresh message + file references for each affected msg. + set({ + messages: finalMessages.map(msg => + msg._attachedFiles + ? { ...msg, _attachedFiles: msg._attachedFiles.map(f => ({ ...f })) } + : msg + ), + }); + } + }); + const { pendingFinal, lastUserMessageAt, sending: isSendingNow } = get(); + + // If we're sending but haven't received streaming events, check + // whether the loaded history reveals intermediate tool-call activity. + // This surfaces progress via the pendingFinal → ActivityIndicator path. + const userMsTs = lastUserMessageAt ? toMs(lastUserMessageAt) : 0; + const isAfterUserMsg = (msg: RawMessage): boolean => { + if (!userMsTs || !msg.timestamp) return true; + return toMs(msg.timestamp) >= userMsTs; + }; + + if (isSendingNow && !pendingFinal) { + const hasRecentAssistantActivity = [...filteredMessages].reverse().some((msg) => { + if (msg.role !== 'assistant') return false; + return isAfterUserMsg(msg); + }); + if (hasRecentAssistantActivity) { + set({ pendingFinal: true }); + } + } + + // If pendingFinal, check whether the AI produced a final text response. + if (pendingFinal || get().pendingFinal) { + const recentAssistant = [...filteredMessages].reverse().find((msg) => { + if (msg.role !== 'assistant') return false; + if (!hasNonToolAssistantContent(msg)) return false; + return isAfterUserMsg(msg); + }); + if (recentAssistant) { + clearHistoryPoll(); + set({ sending: false, activeRunId: null, pendingFinal: false }); + } + } + }; + try { const data = await useGatewayStore.getState().rpc>( 'chat.history', { sessionKey: currentSessionKey, limit: 200 }, ); if (data) { - const rawMessages = Array.isArray(data.messages) ? data.messages as RawMessage[] : []; - - // Before filtering: attach images/files from tool_result messages to the next assistant message - const messagesWithToolImages = enrichWithToolResultFiles(rawMessages); - const filteredMessages = messagesWithToolImages.filter((msg) => !isToolResultRole(msg.role)); - // Restore file attachments for user/assistant messages (from cache + text patterns) - const enrichedMessages = enrichWithCachedImages(filteredMessages); + let rawMessages = Array.isArray(data.messages) ? data.messages as RawMessage[] : []; const thinkingLevel = data.thinkingLevel ? String(data.thinkingLevel) : null; - - // Preserve the optimistic user message during an active send. - // The Gateway may not include the user's message in chat.history - // until the run completes, causing it to flash out of the UI. - let finalMessages = enrichedMessages; - const userMsgAt = get().lastUserMessageAt; - if (get().sending && userMsgAt) { - const userMsMs = toMs(userMsgAt); - const hasRecentUser = enrichedMessages.some( - (m) => m.role === 'user' && m.timestamp && Math.abs(toMs(m.timestamp) - userMsMs) < 5000, - ); - if (!hasRecentUser) { - const currentMsgs = get().messages; - const optimistic = [...currentMsgs].reverse().find( - (m) => m.role === 'user' && m.timestamp && Math.abs(toMs(m.timestamp) - userMsMs) < 5000, - ); - if (optimistic) { - finalMessages = [...enrichedMessages, optimistic]; - } - } + if (rawMessages.length === 0 && isCronSessionKey(currentSessionKey)) { + rawMessages = await loadCronFallbackMessages(currentSessionKey, 200); } - set({ messages: finalMessages, thinkingLevel, loading: false }); - - // Extract first user message text as a session label for display in the toolbar. - // Skip main sessions (key ends with ":main") — they rely on the Gateway-provided - // displayName (e.g. the configured agent name "ClawX") instead. - const isMainSession = currentSessionKey.endsWith(':main'); - if (!isMainSession) { - const firstUserMsg = finalMessages.find((m) => m.role === 'user'); - if (firstUserMsg) { - const labelText = getMessageText(firstUserMsg.content).trim(); - if (labelText) { - const truncated = labelText.length > 50 ? `${labelText.slice(0, 50)}…` : labelText; - set((s) => ({ - sessionLabels: { ...s.sessionLabels, [currentSessionKey]: truncated }, - })); - } - } - } - - // Record last activity time from the last message in history - const lastMsg = finalMessages[finalMessages.length - 1]; - if (lastMsg?.timestamp) { - const lastAt = toMs(lastMsg.timestamp); - set((s) => ({ - sessionLastActivity: { ...s.sessionLastActivity, [currentSessionKey]: lastAt }, - })); - } - - // Async: load missing image previews from disk (updates in background) - loadMissingPreviews(finalMessages).then((updated) => { - if (updated) { - // Create new object references so React.memo detects changes. - // loadMissingPreviews mutates AttachedFileMeta in place, so we - // must produce fresh message + file references for each affected msg. - set({ - messages: finalMessages.map(msg => - msg._attachedFiles - ? { ...msg, _attachedFiles: msg._attachedFiles.map(f => ({ ...f })) } - : msg - ), - }); - } - }); - const { pendingFinal, lastUserMessageAt, sending: isSendingNow } = get(); - - // If we're sending but haven't received streaming events, check - // whether the loaded history reveals intermediate tool-call activity. - // This surfaces progress via the pendingFinal → ActivityIndicator path. - const userMsTs = lastUserMessageAt ? toMs(lastUserMessageAt) : 0; - const isAfterUserMsg = (msg: RawMessage): boolean => { - if (!userMsTs || !msg.timestamp) return true; - return toMs(msg.timestamp) >= userMsTs; - }; - - if (isSendingNow && !pendingFinal) { - const hasRecentAssistantActivity = [...filteredMessages].reverse().some((msg) => { - if (msg.role !== 'assistant') return false; - return isAfterUserMsg(msg); - }); - if (hasRecentAssistantActivity) { - set({ pendingFinal: true }); - } - } - - // If pendingFinal, check whether the AI produced a final text response. - if (pendingFinal || get().pendingFinal) { - const recentAssistant = [...filteredMessages].reverse().find((msg) => { - if (msg.role !== 'assistant') return false; - if (!hasNonToolAssistantContent(msg)) return false; - return isAfterUserMsg(msg); - }); - if (recentAssistant) { - clearHistoryPoll(); - set({ sending: false, activeRunId: null, pendingFinal: false }); - } - } + applyLoadedMessages(rawMessages, thinkingLevel); } else { - set({ messages: [], loading: false }); + const fallbackMessages = await loadCronFallbackMessages(currentSessionKey, 200); + if (fallbackMessages.length > 0) { + applyLoadedMessages(fallbackMessages, null); + } else { + set({ messages: [], loading: false }); + } } } catch (err) { console.warn('Failed to load chat history:', err); - set({ messages: [], loading: false }); + const fallbackMessages = await loadCronFallbackMessages(currentSessionKey, 200); + if (fallbackMessages.length > 0) { + applyLoadedMessages(fallbackMessages, null); + } else { + set({ messages: [], loading: false }); + } } }, diff --git a/src/stores/chat/cron-session-utils.ts b/src/stores/chat/cron-session-utils.ts new file mode 100644 index 000000000..0df394c1b --- /dev/null +++ b/src/stores/chat/cron-session-utils.ts @@ -0,0 +1,37 @@ +export interface CronSessionKeyParts { + agentId: string; + jobId: string; + runSessionId?: string; +} + +export function parseCronSessionKey(sessionKey: string): CronSessionKeyParts | null { + if (!sessionKey.startsWith('agent:')) return null; + const parts = sessionKey.split(':'); + if (parts.length < 4 || parts[2] !== 'cron') return null; + + const agentId = parts[1] || 'main'; + const jobId = parts[3]; + if (!jobId) return null; + + if (parts.length === 4) { + return { agentId, jobId }; + } + + if (parts.length === 6 && parts[4] === 'run' && parts[5]) { + return { agentId, jobId, runSessionId: parts[5] }; + } + + return null; +} + +export function isCronSessionKey(sessionKey: string): boolean { + return parseCronSessionKey(sessionKey) != null; +} + +export function buildCronSessionHistoryPath(sessionKey: string, limit = 200): string { + const params = new URLSearchParams({ sessionKey }); + if (Number.isFinite(limit) && limit > 0) { + params.set('limit', String(Math.floor(limit))); + } + return `/api/cron/session-history?${params.toString()}`; +} diff --git a/src/stores/chat/history-actions.ts b/src/stores/chat/history-actions.ts index 514e2aeed..0dbeca465 100644 --- a/src/stores/chat/history-actions.ts +++ b/src/stores/chat/history-actions.ts @@ -1,4 +1,5 @@ import { invokeIpc } from '@/lib/api-client'; +import { hostApiFetch } from '@/lib/host-api'; import { clearHistoryPoll, enrichWithCachedImages, @@ -9,9 +10,23 @@ import { loadMissingPreviews, toMs, } from './helpers'; +import { buildCronSessionHistoryPath, isCronSessionKey } from './cron-session-utils'; import type { RawMessage } from './types'; import type { ChatGet, ChatSet, SessionHistoryActions } from './store-api'; +async function loadCronFallbackMessages(sessionKey: string, limit = 200): Promise { + if (!isCronSessionKey(sessionKey)) return []; + try { + const response = await hostApiFetch<{ messages?: RawMessage[] }>( + buildCronSessionHistoryPath(sessionKey, limit), + ); + return Array.isArray(response.messages) ? response.messages : []; + } catch (error) { + console.warn('Failed to load cron fallback history:', error); + return []; + } +} + export function createHistoryActions( set: ChatSet, get: ChatGet, @@ -21,6 +36,112 @@ export function createHistoryActions( const { currentSessionKey } = get(); if (!quiet) set({ loading: true, error: null }); + const applyLoadedMessages = (rawMessages: RawMessage[], thinkingLevel: string | null) => { + // Before filtering: attach images/files from tool_result messages to the next assistant message + const messagesWithToolImages = enrichWithToolResultFiles(rawMessages); + const filteredMessages = messagesWithToolImages.filter((msg) => !isToolResultRole(msg.role)); + // Restore file attachments for user/assistant messages (from cache + text patterns) + const enrichedMessages = enrichWithCachedImages(filteredMessages); + + // Preserve the optimistic user message during an active send. + // The Gateway may not include the user's message in chat.history + // until the run completes, causing it to flash out of the UI. + let finalMessages = enrichedMessages; + const userMsgAt = get().lastUserMessageAt; + if (get().sending && userMsgAt) { + const userMsMs = toMs(userMsgAt); + const hasRecentUser = enrichedMessages.some( + (m) => m.role === 'user' && m.timestamp && Math.abs(toMs(m.timestamp) - userMsMs) < 5000, + ); + if (!hasRecentUser) { + const currentMsgs = get().messages; + const optimistic = [...currentMsgs].reverse().find( + (m) => m.role === 'user' && m.timestamp && Math.abs(toMs(m.timestamp) - userMsMs) < 5000, + ); + if (optimistic) { + finalMessages = [...enrichedMessages, optimistic]; + } + } + } + + set({ messages: finalMessages, thinkingLevel, loading: false }); + + // Extract first user message text as a session label for display in the toolbar. + // Skip main sessions (key ends with ":main") — they rely on the Gateway-provided + // displayName (e.g. the configured agent name "ClawX") instead. + const isMainSession = currentSessionKey.endsWith(':main'); + if (!isMainSession) { + const firstUserMsg = finalMessages.find((m) => m.role === 'user'); + if (firstUserMsg) { + const labelText = getMessageText(firstUserMsg.content).trim(); + if (labelText) { + const truncated = labelText.length > 50 ? `${labelText.slice(0, 50)}…` : labelText; + set((s) => ({ + sessionLabels: { ...s.sessionLabels, [currentSessionKey]: truncated }, + })); + } + } + } + + // Record last activity time from the last message in history + const lastMsg = finalMessages[finalMessages.length - 1]; + if (lastMsg?.timestamp) { + const lastAt = toMs(lastMsg.timestamp); + set((s) => ({ + sessionLastActivity: { ...s.sessionLastActivity, [currentSessionKey]: lastAt }, + })); + } + + // Async: load missing image previews from disk (updates in background) + loadMissingPreviews(finalMessages).then((updated) => { + if (updated) { + // Create new object references so React.memo detects changes. + // loadMissingPreviews mutates AttachedFileMeta in place, so we + // must produce fresh message + file references for each affected msg. + set({ + messages: finalMessages.map(msg => + msg._attachedFiles + ? { ...msg, _attachedFiles: msg._attachedFiles.map(f => ({ ...f })) } + : msg + ), + }); + } + }); + const { pendingFinal, lastUserMessageAt, sending: isSendingNow } = get(); + + // If we're sending but haven't received streaming events, check + // whether the loaded history reveals intermediate tool-call activity. + // This surfaces progress via the pendingFinal → ActivityIndicator path. + const userMsTs = lastUserMessageAt ? toMs(lastUserMessageAt) : 0; + const isAfterUserMsg = (msg: RawMessage): boolean => { + if (!userMsTs || !msg.timestamp) return true; + return toMs(msg.timestamp) >= userMsTs; + }; + + if (isSendingNow && !pendingFinal) { + const hasRecentAssistantActivity = [...filteredMessages].reverse().some((msg) => { + if (msg.role !== 'assistant') return false; + return isAfterUserMsg(msg); + }); + if (hasRecentAssistantActivity) { + set({ pendingFinal: true }); + } + } + + // If pendingFinal, check whether the AI produced a final text response. + if (pendingFinal || get().pendingFinal) { + const recentAssistant = [...filteredMessages].reverse().find((msg) => { + if (msg.role !== 'assistant') return false; + if (!hasNonToolAssistantContent(msg)) return false; + return isAfterUserMsg(msg); + }); + if (recentAssistant) { + clearHistoryPoll(); + set({ sending: false, activeRunId: null, pendingFinal: false }); + } + } + }; + try { const result = await invokeIpc( 'gateway:rpc', @@ -30,118 +151,28 @@ export function createHistoryActions( if (result.success && result.result) { const data = result.result; - const rawMessages = Array.isArray(data.messages) ? data.messages as RawMessage[] : []; - - // Before filtering: attach images/files from tool_result messages to the next assistant message - const messagesWithToolImages = enrichWithToolResultFiles(rawMessages); - const filteredMessages = messagesWithToolImages.filter((msg) => !isToolResultRole(msg.role)); - // Restore file attachments for user/assistant messages (from cache + text patterns) - const enrichedMessages = enrichWithCachedImages(filteredMessages); + let rawMessages = Array.isArray(data.messages) ? data.messages as RawMessage[] : []; const thinkingLevel = data.thinkingLevel ? String(data.thinkingLevel) : null; - - // Preserve the optimistic user message during an active send. - // The Gateway may not include the user's message in chat.history - // until the run completes, causing it to flash out of the UI. - let finalMessages = enrichedMessages; - const userMsgAt = get().lastUserMessageAt; - if (get().sending && userMsgAt) { - const userMsMs = toMs(userMsgAt); - const hasRecentUser = enrichedMessages.some( - (m) => m.role === 'user' && m.timestamp && Math.abs(toMs(m.timestamp) - userMsMs) < 5000, - ); - if (!hasRecentUser) { - const currentMsgs = get().messages; - const optimistic = [...currentMsgs].reverse().find( - (m) => m.role === 'user' && m.timestamp && Math.abs(toMs(m.timestamp) - userMsMs) < 5000, - ); - if (optimistic) { - finalMessages = [...enrichedMessages, optimistic]; - } - } - } - - set({ messages: finalMessages, thinkingLevel, loading: false }); - - // Extract first user message text as a session label for display in the toolbar. - // Skip main sessions (key ends with ":main") — they rely on the Gateway-provided - // displayName (e.g. the configured agent name "ClawX") instead. - const isMainSession = currentSessionKey.endsWith(':main'); - if (!isMainSession) { - const firstUserMsg = finalMessages.find((m) => m.role === 'user'); - if (firstUserMsg) { - const labelText = getMessageText(firstUserMsg.content).trim(); - if (labelText) { - const truncated = labelText.length > 50 ? `${labelText.slice(0, 50)}…` : labelText; - set((s) => ({ - sessionLabels: { ...s.sessionLabels, [currentSessionKey]: truncated }, - })); - } - } - } - - // Record last activity time from the last message in history - const lastMsg = finalMessages[finalMessages.length - 1]; - if (lastMsg?.timestamp) { - const lastAt = toMs(lastMsg.timestamp); - set((s) => ({ - sessionLastActivity: { ...s.sessionLastActivity, [currentSessionKey]: lastAt }, - })); - } - - // Async: load missing image previews from disk (updates in background) - loadMissingPreviews(finalMessages).then((updated) => { - if (updated) { - // Create new object references so React.memo detects changes. - // loadMissingPreviews mutates AttachedFileMeta in place, so we - // must produce fresh message + file references for each affected msg. - set({ - messages: finalMessages.map(msg => - msg._attachedFiles - ? { ...msg, _attachedFiles: msg._attachedFiles.map(f => ({ ...f })) } - : msg - ), - }); - } - }); - const { pendingFinal, lastUserMessageAt, sending: isSendingNow } = get(); - - // If we're sending but haven't received streaming events, check - // whether the loaded history reveals intermediate tool-call activity. - // This surfaces progress via the pendingFinal → ActivityIndicator path. - const userMsTs = lastUserMessageAt ? toMs(lastUserMessageAt) : 0; - const isAfterUserMsg = (msg: RawMessage): boolean => { - if (!userMsTs || !msg.timestamp) return true; - return toMs(msg.timestamp) >= userMsTs; - }; - - if (isSendingNow && !pendingFinal) { - const hasRecentAssistantActivity = [...filteredMessages].reverse().some((msg) => { - if (msg.role !== 'assistant') return false; - return isAfterUserMsg(msg); - }); - if (hasRecentAssistantActivity) { - set({ pendingFinal: true }); - } - } - - // If pendingFinal, check whether the AI produced a final text response. - if (pendingFinal || get().pendingFinal) { - const recentAssistant = [...filteredMessages].reverse().find((msg) => { - if (msg.role !== 'assistant') return false; - if (!hasNonToolAssistantContent(msg)) return false; - return isAfterUserMsg(msg); - }); - if (recentAssistant) { - clearHistoryPoll(); - set({ sending: false, activeRunId: null, pendingFinal: false }); - } + if (rawMessages.length === 0 && isCronSessionKey(currentSessionKey)) { + rawMessages = await loadCronFallbackMessages(currentSessionKey, 200); } + applyLoadedMessages(rawMessages, thinkingLevel); } else { - set({ messages: [], loading: false }); + const fallbackMessages = await loadCronFallbackMessages(currentSessionKey, 200); + if (fallbackMessages.length > 0) { + applyLoadedMessages(fallbackMessages, null); + } else { + set({ messages: [], loading: false }); + } } } catch (err) { console.warn('Failed to load chat history:', err); - set({ messages: [], loading: false }); + const fallbackMessages = await loadCronFallbackMessages(currentSessionKey, 200); + if (fallbackMessages.length > 0) { + applyLoadedMessages(fallbackMessages, null); + } else { + set({ messages: [], loading: false }); + } } }, }; diff --git a/src/stores/chat/session-actions.ts b/src/stores/chat/session-actions.ts index 7e1e1341a..bca90d2fc 100644 --- a/src/stores/chat/session-actions.ts +++ b/src/stores/chat/session-actions.ts @@ -9,6 +9,19 @@ function getAgentIdFromSessionKey(sessionKey: string): string { return agentId || 'main'; } +function parseSessionUpdatedAtMs(value: unknown): number | undefined { + if (typeof value === 'number' && Number.isFinite(value)) { + return toMs(value); + } + if (typeof value === 'string' && value.trim()) { + const parsed = Date.parse(value); + if (Number.isFinite(parsed)) { + return parsed; + } + } + return undefined; +} + export function createSessionActions( set: ChatSet, get: ChatGet, @@ -31,6 +44,7 @@ export function createSessionActions( displayName: s.displayName ? String(s.displayName) : undefined, thinkingLevel: s.thinkingLevel ? String(s.thinkingLevel) : undefined, model: s.model ? String(s.model) : undefined, + updatedAt: parseSessionUpdatedAtMs(s.updatedAt), })).filter((s: ChatSession) => s.key); const canonicalBySuffix = new Map(); @@ -76,11 +90,21 @@ export function createSessionActions( ] : dedupedSessions; - set({ + const discoveredActivity = Object.fromEntries( + sessionsWithCurrent + .filter((session) => typeof session.updatedAt === 'number' && Number.isFinite(session.updatedAt)) + .map((session) => [session.key, session.updatedAt!]), + ); + + set((state) => ({ sessions: sessionsWithCurrent, currentSessionKey: nextSessionKey, currentAgentId: getAgentIdFromSessionKey(nextSessionKey), - }); + sessionLastActivity: { + ...state.sessionLastActivity, + ...discoveredActivity, + }, + })); if (currentSessionKey !== nextSessionKey) { get().loadHistory(); diff --git a/src/stores/chat/types.ts b/src/stores/chat/types.ts index c1bf3ebce..3f503ee46 100644 --- a/src/stores/chat/types.ts +++ b/src/stores/chat/types.ts @@ -44,6 +44,7 @@ export interface ChatSession { displayName?: string; thinkingLevel?: string; model?: string; + updatedAt?: number; } export interface ToolStatus { diff --git a/src/stores/gateway.ts b/src/stores/gateway.ts index 61a472ad2..d65e35e65 100644 --- a/src/stores/gateway.ts +++ b/src/stores/gateway.ts @@ -65,10 +65,19 @@ function handleGatewayNotification(notification: { method?: string; params?: Rec if (phase === 'started' && runId != null && sessionKey != null) { import('./chat') .then(({ useChatStore }) => { - useChatStore.getState().handleChatEvent({ + const state = useChatStore.getState(); + const resolvedSessionKey = String(sessionKey); + const shouldRefreshSessions = + resolvedSessionKey !== state.currentSessionKey + || !state.sessions.some((session) => session.key === resolvedSessionKey); + if (shouldRefreshSessions) { + void state.loadSessions(); + } + + state.handleChatEvent({ state: 'started', runId, - sessionKey, + sessionKey: resolvedSessionKey, }); }) .catch(() => {}); @@ -78,8 +87,22 @@ function handleGatewayNotification(notification: { method?: string; params?: Rec import('./chat') .then(({ useChatStore }) => { const state = useChatStore.getState(); - state.loadHistory(true); - if (state.sending) { + const resolvedSessionKey = sessionKey != null ? String(sessionKey) : null; + const shouldRefreshSessions = resolvedSessionKey != null && ( + resolvedSessionKey !== state.currentSessionKey + || !state.sessions.some((session) => session.key === resolvedSessionKey) + ); + if (shouldRefreshSessions) { + void state.loadSessions(); + } + + const matchesCurrentSession = resolvedSessionKey == null || resolvedSessionKey === state.currentSessionKey; + const matchesActiveRun = runId != null && state.activeRunId != null && String(runId) === state.activeRunId; + + if (matchesCurrentSession || matchesActiveRun) { + void state.loadHistory(true); + } + if ((matchesCurrentSession || matchesActiveRun) && state.sending) { useChatStore.setState({ sending: false, activeRunId: null, diff --git a/tests/unit/chat-history-actions.test.ts b/tests/unit/chat-history-actions.test.ts new file mode 100644 index 000000000..eb03c62bb --- /dev/null +++ b/tests/unit/chat-history-actions.test.ts @@ -0,0 +1,131 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +const invokeIpcMock = vi.fn(); +const hostApiFetchMock = vi.fn(); +const clearHistoryPoll = vi.fn(); +const enrichWithCachedImages = vi.fn((messages) => messages); +const enrichWithToolResultFiles = vi.fn((messages) => messages); +const getMessageText = vi.fn((content: unknown) => typeof content === 'string' ? content : ''); +const hasNonToolAssistantContent = vi.fn((message: { content?: unknown } | undefined) => { + if (!message) return false; + return typeof message.content === 'string' ? message.content.trim().length > 0 : true; +}); +const isToolResultRole = vi.fn((role: unknown) => role === 'toolresult' || role === 'tool_result'); +const loadMissingPreviews = vi.fn(async () => false); +const toMs = vi.fn((ts: number) => ts < 1e12 ? ts * 1000 : ts); + +vi.mock('@/lib/api-client', () => ({ + invokeIpc: (...args: unknown[]) => invokeIpcMock(...args), +})); + +vi.mock('@/lib/host-api', () => ({ + hostApiFetch: (...args: unknown[]) => hostApiFetchMock(...args), +})); + +vi.mock('@/stores/chat/helpers', () => ({ + clearHistoryPoll: (...args: unknown[]) => clearHistoryPoll(...args), + enrichWithCachedImages: (...args: unknown[]) => enrichWithCachedImages(...args), + enrichWithToolResultFiles: (...args: unknown[]) => enrichWithToolResultFiles(...args), + getMessageText: (...args: unknown[]) => getMessageText(...args), + hasNonToolAssistantContent: (...args: unknown[]) => hasNonToolAssistantContent(...args), + isToolResultRole: (...args: unknown[]) => isToolResultRole(...args), + loadMissingPreviews: (...args: unknown[]) => loadMissingPreviews(...args), + toMs: (...args: unknown[]) => toMs(...args as Parameters), +})); + +type ChatLikeState = { + currentSessionKey: string; + messages: Array<{ role: string; timestamp?: number; content?: unknown; _attachedFiles?: unknown[] }>; + loading: boolean; + error: string | null; + sending: boolean; + lastUserMessageAt: number | null; + pendingFinal: boolean; + sessionLabels: Record; + sessionLastActivity: Record; + thinkingLevel: string | null; + activeRunId: string | null; +}; + +function makeHarness(initial?: Partial) { + let state: ChatLikeState = { + currentSessionKey: 'agent:main:main', + messages: [], + loading: false, + error: null, + sending: false, + lastUserMessageAt: null, + pendingFinal: false, + sessionLabels: {}, + sessionLastActivity: {}, + thinkingLevel: null, + activeRunId: null, + ...initial, + }; + + const set = (partial: Partial | ((s: ChatLikeState) => Partial)) => { + const patch = typeof partial === 'function' ? partial(state) : partial; + state = { ...state, ...patch }; + }; + const get = () => state; + return { set, get, read: () => state }; +} + +describe('chat history actions', () => { + beforeEach(() => { + vi.resetAllMocks(); + invokeIpcMock.mockResolvedValue({ success: true, result: { messages: [] } }); + hostApiFetchMock.mockResolvedValue({ messages: [] }); + }); + + it('uses cron session fallback when gateway history is empty', async () => { + const { createHistoryActions } = await import('@/stores/chat/history-actions'); + const h = makeHarness({ + currentSessionKey: 'agent:main:cron:job-1', + }); + const actions = createHistoryActions(h.set as never, h.get as never); + + hostApiFetchMock.mockResolvedValueOnce({ + messages: [ + { + id: 'cron-meta-job-1', + role: 'system', + content: 'Scheduled task: Drink water', + timestamp: 1773281731495, + }, + { + id: 'cron-run-1', + role: 'assistant', + content: 'Drink water 💧', + timestamp: 1773281732751, + }, + ], + }); + + await actions.loadHistory(); + + expect(hostApiFetchMock).toHaveBeenCalledWith( + '/api/cron/session-history?sessionKey=agent%3Amain%3Acron%3Ajob-1&limit=200', + ); + expect(h.read().messages.map((message) => message.content)).toEqual([ + 'Scheduled task: Drink water', + 'Drink water 💧', + ]); + expect(h.read().sessionLastActivity['agent:main:cron:job-1']).toBe(1773281732751); + expect(h.read().loading).toBe(false); + }); + + it('does not use cron fallback for normal sessions', async () => { + const { createHistoryActions } = await import('@/stores/chat/history-actions'); + const h = makeHarness({ + currentSessionKey: 'agent:main:main', + }); + const actions = createHistoryActions(h.set as never, h.get as never); + + await actions.loadHistory(); + + expect(hostApiFetchMock).not.toHaveBeenCalled(); + expect(h.read().messages).toEqual([]); + expect(h.read().loading).toBe(false); + }); +}); diff --git a/tests/unit/chat-session-actions.test.ts b/tests/unit/chat-session-actions.test.ts index 79a78b59a..7c1d5574a 100644 --- a/tests/unit/chat-session-actions.test.ts +++ b/tests/unit/chat-session-actions.test.ts @@ -8,7 +8,7 @@ vi.mock('@/lib/api-client', () => ({ type ChatLikeState = { currentSessionKey: string; - sessions: Array<{ key: string; displayName?: string }>; + sessions: Array<{ key: string; displayName?: string; updatedAt?: number }>; messages: Array<{ role: string; timestamp?: number; content?: unknown }>; sessionLabels: Record; sessionLastActivity: Record; @@ -119,5 +119,38 @@ describe('chat session actions', () => { expect(next.pendingFinal).toBe(false); nowSpy.mockRestore(); }); + + it('seeds sessionLastActivity from backend updatedAt metadata', async () => { + const { createSessionActions } = await import('@/stores/chat/session-actions'); + const h = makeHarness({ + currentSessionKey: 'agent:main:main', + sessions: [], + }); + const actions = createSessionActions(h.set as never, h.get as never); + + invokeIpcMock.mockResolvedValueOnce({ + success: true, + result: { + sessions: [ + { + key: 'agent:main:main', + displayName: 'Main', + updatedAt: 1773281700000, + }, + { + key: 'agent:main:cron:job-1', + label: 'Cron: Drink water', + updatedAt: 1773281731621, + }, + ], + }, + }); + + await actions.loadSessions(); + + expect(h.read().sessionLastActivity['agent:main:main']).toBe(1773281700000); + expect(h.read().sessionLastActivity['agent:main:cron:job-1']).toBe(1773281731621); + expect(h.read().sessions.find((session) => session.key === 'agent:main:cron:job-1')?.updatedAt).toBe(1773281731621); + }); });