From 758a8f8c941579eab4ea1faea589f1f93508379c Mon Sep 17 00:00:00 2001 From: Tao Yiping Date: Tue, 14 Apr 2026 14:52:47 +0800 Subject: [PATCH] support reasoning agentid by accountId or session for cron (#847) --- electron/api/routes/channels.ts | 11 +- electron/api/routes/cron.ts | 87 +++++++++- electron/main/ipc-handlers.ts | 64 +++++++- electron/utils/agent-config.ts | 19 +++ electron/utils/session-util.ts | 79 +++++++++ src/pages/Cron/index.tsx | 4 +- src/stores/cron.ts | 113 +++---------- src/stores/gateway.ts | 12 ++ tests/unit/cron-store.test.ts | 276 -------------------------------- 9 files changed, 276 insertions(+), 389 deletions(-) create mode 100644 electron/utils/session-util.ts delete mode 100644 tests/unit/cron-store.test.ts diff --git a/electron/api/routes/channels.ts b/electron/api/routes/channels.ts index 3197d20bc..4405be241 100644 --- a/electron/api/routes/channels.ts +++ b/electron/api/routes/channels.ts @@ -1,4 +1,5 @@ import { readFile, readdir } from 'node:fs/promises'; +import { extractSessionRecords } from '../../utils/session-util'; import type { IncomingMessage, ServerResponse } from 'http'; import { join } from 'node:path'; import { @@ -679,16 +680,6 @@ function inferTargetKindFromValue( return 'user'; } -function extractSessionRecords(store: JsonRecord): JsonRecord[] { - const directEntries = Object.entries(store) - .filter(([key, value]) => key !== 'sessions' && value && typeof value === 'object') - .map(([, value]) => value as JsonRecord); - const arrayEntries = Array.isArray(store.sessions) - ? store.sessions.filter((entry): entry is JsonRecord => Boolean(entry && typeof entry === 'object')) - : []; - return [...directEntries, ...arrayEntries]; -} - function buildChannelTargetCacheKey(params: { channelType: string; accountId?: string; diff --git a/electron/api/routes/cron.ts b/electron/api/routes/cron.ts index 24d85144b..caa1e0143 100644 --- a/electron/api/routes/cron.ts +++ b/electron/api/routes/cron.ts @@ -4,8 +4,14 @@ import { join } from 'node:path'; import type { HostApiContext } from '../context'; import { parseJsonBody, sendJson } from '../route-utils'; import { getOpenClawConfigDir } from '../../utils/paths'; +import { resolveAccountIdFromSessionHistory } from '../../utils/session-util'; import { toOpenClawChannelType, toUiChannelType } from '../../utils/channel-alias'; +import { resolveAgentIdFromChannel } from '../../utils/agent-config'; +/** + * Find agentId from session history by delivery "to" address. + * Efficiently searches only agent session directories for matching deliveryContext.to. + */ interface GatewayCronJob { id: string; name: string; @@ -461,6 +467,14 @@ export async function handleCronRoutes( const result = await ctx.gatewayManager.rpc('cron.list', { includeDisabled: true }, 8000); const data = result as { jobs?: GatewayCronJob[] }; jobs = data?.jobs ?? (Array.isArray(result) ? result as GatewayCronJob[] : []); + + // DEBUG: log name and agentId for each job + console.debug('Fetched cron jobs from Gateway:'); + for (const job of jobs) { + const jobAgentId = (job as unknown as { agentId?: string }).agentId; + const deliveryInfo = job.delivery ? `delivery={mode:${job.delivery.mode}, channel:${job.delivery.channel || '(none)'}, accountId:${job.delivery.accountId || '(none)'}, to:${job.delivery.to || '(none)'}}` : 'delivery=(none)'; + console.debug(` - name: "${job.name}", agentId: "${jobAgentId || '(undefined)'}", ${deliveryInfo}, sessionTarget: "${job.sessionTarget || '(none)'}", payload.kind: "${job.payload?.kind || '(none)'}"`); + } } catch { // Fallback: read cron.json directly when Gateway RPC fails/times out. try { @@ -477,7 +491,8 @@ export async function handleCronRoutes( // Run repair in background — don't block the response. if (!usedFallback && jobs.length > 0) { - const jobsToRepair = jobs.filter((job) => { + // Repair 1: delivery channel missing + const jobsToRepairDelivery = jobs.filter((job) => { const isIsolatedAgent = (job.sessionTarget === 'isolated' || !job.sessionTarget) && job.payload?.kind === 'agentTurn'; @@ -487,10 +502,10 @@ export async function handleCronRoutes( !job.delivery?.channel ); }); - if (jobsToRepair.length > 0) { + if (jobsToRepairDelivery.length > 0) { // Fire-and-forget: repair in background void (async () => { - for (const job of jobsToRepair) { + for (const job of jobsToRepairDelivery) { try { await ctx.gatewayManager.rpc('cron.update', { id: job.id, @@ -502,7 +517,7 @@ export async function handleCronRoutes( } })(); // Optimistically fix the response data - for (const job of jobsToRepair) { + for (const job of jobsToRepairDelivery) { job.delivery = { mode: 'none' }; if (job.state?.lastError?.includes('Channel is required')) { job.state.lastError = undefined; @@ -510,6 +525,68 @@ export async function handleCronRoutes( } } } + + // Repair 2: agentId is undefined for jobs with announce delivery + // Only repair undefined -> inferred agent, NOT main -> inferred agent + const jobsToRepairAgent = jobs.filter((job) => { + const jobAgentId = (job as unknown as { agentId?: string }).agentId; + return ( + (job.sessionTarget === 'isolated' || !job.sessionTarget) && + job.payload?.kind === 'agentTurn' && + job.delivery?.mode === 'announce' && + job.delivery?.channel && + jobAgentId === undefined // Only repair when agentId is completely undefined + ); + }); + if (jobsToRepairAgent.length > 0) { + console.debug(`Found ${jobsToRepairAgent.length} jobs needing agent repair:`); + for (const job of jobsToRepairAgent) { + console.debug(` - Job "${job.name}" (id: ${job.id}): current agentId="${(job as unknown as { agentId?: string }).agentId || '(undefined)'}", channel="${job.delivery?.channel}", accountId="${job.delivery?.accountId || '(none)'}"`); + } + // Fire-and-forget: repair in background + void (async () => { + for (const job of jobsToRepairAgent) { + try { + const channel = toOpenClawChannelType(job.delivery!.channel!); + const accountId = job.delivery!.accountId; + const toAddress = job.delivery!.to; + + // Try 1: resolve from channel + accountId binding + let correctAgentId = await resolveAgentIdFromChannel(channel, accountId); + + // If no accountId, try to resolve it from session history using "to" address, then get agentId + let resolvedAccountId: string | null = null; + if (!correctAgentId && !accountId && toAddress) { + console.debug(`No binding found for channel="${channel}", accountId="${accountId || '(none)'}", trying session history for to="${toAddress}"`); + resolvedAccountId = await resolveAccountIdFromSessionHistory(toAddress, channel); + if (resolvedAccountId) { + console.debug(`Resolved accountId="${resolvedAccountId}" from session history, now resolving agentId`); + correctAgentId = await resolveAgentIdFromChannel(channel, resolvedAccountId); + } + } + + if (correctAgentId) { + console.debug(`Repairing job "${job.name}": agentId "${(job as unknown as { agentId?: string }).agentId || '(undefined)'}" -> "${correctAgentId}"`); + // When accountId was resolved via to address, include it in the patch + const patch: Record = { agentId: correctAgentId }; + if (resolvedAccountId && !accountId) { + patch.delivery = { accountId: resolvedAccountId }; + } + await ctx.gatewayManager.rpc('cron.update', { id: job.id, patch }); + // Update the local job object so response reflects correct agentId + (job as unknown as { agentId: string }).agentId = correctAgentId; + if (resolvedAccountId && !accountId && job.delivery) { + job.delivery.accountId = resolvedAccountId; + } + } else { + console.warn(`Could not resolve agent for job "${job.name}": channel="${channel}", accountId="${accountId || '(none)'}", to="${toAddress || '(none)'}"`); + } + } catch (error) { + console.error(`Failed to repair agent for job "${job.name}":`, error); + } + } + })(); + } } sendJson(res, 200, jobs.map((job) => ({ ...transformCronJob(job), ...(usedFallback ? { _fromFallback: true } : {}) }))); @@ -532,6 +609,8 @@ export async function handleCronRoutes( const agentId = typeof input.agentId === 'string' && input.agentId.trim() ? input.agentId.trim() : 'main'; + // DEBUG: log the input and resolved agentId + console.debug(`Creating cron job: name="${input.name}", input.agentId="${input.agentId || '(not provided)'}", resolved agentId="${agentId}"`); const delivery = normalizeCronDelivery(input.delivery); const unsupportedDeliveryError = getUnsupportedCronDeliveryError(delivery.channel); if (delivery.mode === 'announce' && unsupportedDeliveryError) { diff --git a/electron/main/ipc-handlers.ts b/electron/main/ipc-handlers.ts index 295867878..e988ab5e7 100644 --- a/electron/main/ipc-handlers.ts +++ b/electron/main/ipc-handlers.ts @@ -22,6 +22,8 @@ import { import { syncProxyConfigToOpenClaw } from '../utils/openclaw-proxy'; import { buildOpenClawControlUiUrl } from '../utils/openclaw-control-ui'; import { logger } from '../utils/logger'; +import { resolveAgentIdFromChannel } from '../utils/agent-config'; +import { resolveAccountIdFromSessionHistory } from '../utils/session-util'; import { saveChannelConfig, getChannelConfig, @@ -891,8 +893,7 @@ function registerCronHandlers(gatewayManager: GatewayManager): void { ipcMain.handle('cron:list', async () => { try { const result = await gatewayManager.rpc('cron.list', { includeDisabled: true }); - const data = result as { jobs?: GatewayCronJob[] }; - const jobs = data?.jobs ?? []; + const jobs = Array.isArray(result) ? result : (result as { jobs?: GatewayCronJob[] })?.jobs ?? []; // Auto-repair legacy UI-created jobs that were saved without // delivery: { mode: 'none' }. The Gateway auto-normalizes them @@ -1031,6 +1032,65 @@ function registerCronHandlers(gatewayManager: GatewayManager): void { throw error; } }); + + // Periodic cron job repair: checks for jobs with undefined agentId and repairs them + // This handles cases where cron jobs were created via openclaw CLI without specifying agent + const CRON_AGENT_REPAIR_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes + let _lastRepairErrorLogAt = 0; + const REPAIR_ERROR_LOG_INTERVAL_MS = 60 * 60 * 1000; // 1 hour + setInterval(async () => { + try { + const status = gatewayManager.getStatus(); + if (status.state !== 'running') return; + + const result = await gatewayManager.rpc('cron.list', { includeDisabled: true }); + const jobs = Array.isArray(result) + ? result + : (result as { jobs?: Array<{ id: string; name: string; sessionTarget?: string; payload?: { kind: string }; delivery?: { mode: string; channel?: string; to?: string; accountId?: string }; state?: Record }> })?.jobs ?? []; + + for (const job of jobs) { + const jobAgentId = (job as unknown as { agentId?: string }).agentId; + if ( + (job.sessionTarget === 'isolated' || !job.sessionTarget) && + job.payload?.kind === 'agentTurn' && + job.delivery?.mode === 'announce' && + job.delivery?.channel && + jobAgentId === undefined + ) { + const channel = job.delivery.channel; + const accountId = job.delivery.accountId; + const toAddress = job.delivery.to; + + let correctAgentId = await resolveAgentIdFromChannel(channel, accountId); + + // If no accountId, try to resolve it from session history + let resolvedAccountId: string | null = null; + if (!correctAgentId && !accountId && toAddress) { + resolvedAccountId = await resolveAccountIdFromSessionHistory(toAddress, channel); + if (resolvedAccountId) { + correctAgentId = await resolveAgentIdFromChannel(channel, resolvedAccountId); + } + } + + if (correctAgentId) { + console.debug(`Periodic repair: job "${job.name}" agentId undefined -> "${correctAgentId}"`); + // When accountId was resolved via to address, include it in the patch + const patch: Record = { agentId: correctAgentId }; + if (resolvedAccountId && !accountId) { + patch.delivery = { accountId: resolvedAccountId }; + } + await gatewayManager.rpc('cron.update', { id: job.id, patch }); + } + } + } + } catch (error) { + const now = Date.now(); + if (now - _lastRepairErrorLogAt >= REPAIR_ERROR_LOG_INTERVAL_MS) { + _lastRepairErrorLogAt = now; + console.debug('Periodic cron repair error:', error); + } + } + }, CRON_AGENT_REPAIR_INTERVAL_MS); } /** diff --git a/electron/utils/agent-config.ts b/electron/utils/agent-config.ts index 9dd0309ac..525a5d3c6 100644 --- a/electron/utils/agent-config.ts +++ b/electron/utils/agent-config.ts @@ -555,6 +555,25 @@ export async function listConfiguredAgentIds(): Promise { return ids.length > 0 ? ids : [MAIN_AGENT_ID]; } +/** + * Resolve agentId from channel and accountId using bindings. + * Returns the agentId if found, or null if no binding exists. + */ +export async function resolveAgentIdFromChannel(channel: string, accountId?: string): Promise { + const config = await readOpenClawConfig() as AgentConfigDocument; + const { channelToAgent, accountToAgent } = getChannelBindingMap(config.bindings); + + // First try account-specific binding + if (accountId) { + const agentId = accountToAgent.get(`${channel}:${accountId}`); + if (agentId) return agentId; + } + + // Fallback to channel-only binding + const agentId = channelToAgent.get(channel); + return agentId ?? null; +} + export async function createAgent( name: string, options?: { inheritWorkspace?: boolean }, diff --git a/electron/utils/session-util.ts b/electron/utils/session-util.ts new file mode 100644 index 000000000..c27ad3afa --- /dev/null +++ b/electron/utils/session-util.ts @@ -0,0 +1,79 @@ +/** + * Shared session utilities + */ +import { readFile, readdir } from 'node:fs/promises'; +import { join } from 'node:path'; +import { getOpenClawConfigDir } from './paths'; + +type JsonRecord = Record; + +/** + * Parse sessions.json supporting both formats: + * - Object-keyed: { "agent:xxx:yyy": { deliveryContext: {...} } } + * - Array format: { sessions: [...] } + */ +export function extractSessionRecords(store: JsonRecord): JsonRecord[] { + const directEntries = Object.entries(store) + .filter(([key, value]) => key !== 'sessions' && value && typeof value === 'object') + .map(([, value]) => value as JsonRecord); + const arrayEntries = Array.isArray(store.sessions) + ? store.sessions.filter((entry): entry is JsonRecord => Boolean(entry && typeof entry === 'object')) + : []; + return [...directEntries, ...arrayEntries]; +} + +/** + * Find accountId from session history by "to" address and channel type. + * Searches all agent session directories for a matching deliveryContext. + */ +export async function resolveAccountIdFromSessionHistory( + toAddress: string, + channelType: string, +): Promise { + const agentsDir = join(getOpenClawConfigDir(), 'agents'); + + let agentDirs: Array<{ name: string; isDirectory: () => boolean }>; + try { + agentDirs = await readdir(agentsDir, { withFileTypes: true }); + } catch { + return null; + } + + for (const entry of agentDirs) { + if (!entry.isDirectory()) continue; + + const sessionsPath = join(agentsDir, entry.name, 'sessions', 'sessions.json'); + let raw: string; + try { + raw = await readFile(sessionsPath, 'utf8'); + } catch { + continue; + } + + if (!raw.trim()) continue; + + let parsed: Record; + try { + parsed = JSON.parse(raw); + } catch { + continue; + } + + for (const session of extractSessionRecords(parsed as JsonRecord)) { + const deliveryContext = session.deliveryContext as Record | undefined; + if ( + deliveryContext && + typeof deliveryContext.to === 'string' && + deliveryContext.to === toAddress && + typeof deliveryContext.channel === 'string' && + deliveryContext.channel === channelType + ) { + if (typeof deliveryContext.accountId === 'string') { + return deliveryContext.accountId; + } + } + } + } + + return null; +} diff --git a/src/pages/Cron/index.tsx b/src/pages/Cron/index.tsx index 948214a6e..92ae047e5 100644 --- a/src/pages/Cron/index.tsx +++ b/src/pages/Cron/index.tsx @@ -251,7 +251,6 @@ function TaskDialog({ job, configuredChannels, onClose, onSave }: TaskDialogProp const [name, setName] = useState(job?.name || ''); const [message, setMessage] = useState(job?.message || ''); const [selectedAgentId, setSelectedAgentId] = useState(job?.agentId || useChatStore.getState().currentAgentId); - const [agentIdChanged, setAgentIdChanged] = useState(false); // Extract cron expression string from CronSchedule object or use as-is if string const initialSchedule = (() => { const s = job?.schedule; @@ -411,7 +410,7 @@ function TaskDialog({ job, configuredChannels, onClose, onSave }: TaskDialogProp schedule: finalSchedule, delivery: finalDelivery, enabled, - ...(agentIdChanged ? { agentId: selectedAgentId } : {}), + agentId: selectedAgentId, }); onClose(); toast.success(job ? t('toast.updated') : t('toast.created')); @@ -468,7 +467,6 @@ function TaskDialog({ job, configuredChannels, onClose, onSave }: TaskDialogProp value={selectedAgentId} onChange={(e) => { setSelectedAgentId(e.target.value); - setAgentIdChanged(true); }} className="h-[44px] rounded-xl border-black/10 dark:border-white/10 bg-[#eeece3] dark:bg-muted text-[13px]" > diff --git a/src/stores/cron.ts b/src/stores/cron.ts index 8f2777f2d..0e61a4fed 100644 --- a/src/stores/cron.ts +++ b/src/stores/cron.ts @@ -11,7 +11,7 @@ interface CronState { jobs: CronJob[]; loading: boolean; error: string | null; - + // Actions fetchJobs: () => Promise; createJob: (input: CronJobCreateInput) => Promise; @@ -26,7 +26,7 @@ export const useCronStore = create((set) => ({ jobs: [], loading: false, error: null, - + fetchJobs: async () => { const currentJobs = useCronStore.getState().jobs; // Only show loading spinner when there's no data yet (stale-while-revalidate). @@ -39,49 +39,20 @@ export const useCronStore = create((set) => ({ try { const result = await hostApiFetch('/api/cron/jobs'); - // If Gateway returned fewer jobs than we have, something might be wrong - preserve all known jobs - // and just update agentIds from localStorage for the ones Gateway returned. - // Priority: API agentId (if non-'main') > currentJobs > localStorage > 'main' - const resultIds = new Set(result.map(j => j.id)); - const savedAgentIdMap = JSON.parse(localStorage.getItem('cronAgentIdMap') || '{}') as Record; + // Gateway now correctly returns agentId for all jobs. + // If Gateway returned fewer jobs than we have (e.g. race condition), preserve + // the extra ones from current state to avoid losing data. + const resultIds = new Set(result.map((j) => j.id)); + const extraJobs = currentJobs.filter((j) => !resultIds.has(j.id)); + const allJobs = [...result, ...extraJobs]; - // Update localStorage agentId map with current data - const newAgentIdMap: Record = {}; - - // For jobs returned by Gateway, restore agentId - const jobsWithAgentId = result.map((job) => { - // Priority: API response (if non-'main') > currentJobs > localStorage > default 'main' - const existingJob = currentJobs.find((j) => j.id === job.id); - const savedAgentId = savedAgentIdMap[job.id]; - let agentId = job.agentId; - if (!agentId || agentId === 'main') { - // API returned 'main' or nothing — use cached value - if (existingJob && existingJob.agentId !== 'main') { - agentId = existingJob.agentId; - } else if (savedAgentId && savedAgentId !== 'main') { - agentId = savedAgentId; - } else { - agentId = 'main'; - } - } - if (agentId !== 'main') { - newAgentIdMap[job.id] = agentId; - } - return { ...job, agentId }; - }); - - // If Gateway returned fewer jobs, preserve extra jobs from current state - const extraJobs = currentJobs.filter(j => !resultIds.has(j.id)); - const allJobs = [...jobsWithAgentId, ...extraJobs]; - - localStorage.setItem('cronAgentIdMap', JSON.stringify(newAgentIdMap)); set({ jobs: allJobs, loading: false }); } catch (error) { // Preserve previous jobs on error so the user sees stale data instead of nothing. set({ error: String(error), loading: false }); } }, - + createJob: async (input) => { try { // Auto-capture currentAgentId if not provided @@ -90,59 +61,23 @@ export const useCronStore = create((set) => ({ method: 'POST', body: JSON.stringify({ ...input, agentId }), }); - const jobWithAgentId = { ...job, agentId }; - // Persist agentId to localStorage (since Gateway doesn't return it) - const savedMap = JSON.parse(localStorage.getItem('cronAgentIdMap') || '{}') as Record; - savedMap[jobWithAgentId.id] = agentId; - localStorage.setItem('cronAgentIdMap', JSON.stringify(savedMap)); - set((state) => ({ jobs: [...state.jobs, jobWithAgentId] })); - return jobWithAgentId; + set((state) => ({ jobs: [...state.jobs, job] })); + return job; } catch (error) { console.error('Failed to create cron job:', error); throw error; } }, - + updateJob: async (id, input) => { try { - const currentJob = useCronStore.getState().jobs.find((j) => j.id === id); - const newAgentId = input.agentId; - - // If agentId changed, recreate with new agentId first then delete old one (Gateway doesn't support updating sessionTarget) - if (newAgentId && currentJob && newAgentId !== currentJob.agentId) { - // Create new job with new agentId first (preserves schedule on failure) - const { agentId: _agentId, ...restInput } = input; - const newJob = await hostApiFetch('/api/cron/jobs', { - method: 'POST', - body: JSON.stringify({ ...restInput, agentId: newAgentId }), - }); - const jobWithAgentId = { ...currentJob, ...newJob, agentId: newAgentId }; - // Update localStorage: add new id first, then remove old id - const savedMap = JSON.parse(localStorage.getItem('cronAgentIdMap') || '{}') as Record; - savedMap[jobWithAgentId.id] = newAgentId; - localStorage.setItem('cronAgentIdMap', JSON.stringify(savedMap)); - // Delete old job after new one is created successfully - await hostApiFetch(`/api/cron/jobs/${encodeURIComponent(id)}`, { - method: 'DELETE', - }); - delete savedMap[id]; - localStorage.setItem('cronAgentIdMap', JSON.stringify(savedMap)); - set((state) => ({ - jobs: state.jobs.map((j) => (j.id === id ? jobWithAgentId : j)), - })); - return; - } - - // Normal update for other fields - use currentJob as base, overlay updatedJob to preserve fields const updatedJob = await hostApiFetch(`/api/cron/jobs/${encodeURIComponent(id)}`, { method: 'PUT', body: JSON.stringify(input), }); - // Merge: updatedJob fields override currentJob, but preserve currentJob fields not in updatedJob - const jobWithAgentId = { ...currentJob, ...updatedJob, agentId: currentJob?.agentId ?? updatedJob.agentId }; set((state) => ({ jobs: state.jobs.map((job) => - job.id === id ? jobWithAgentId : job + job.id === id ? updatedJob : job ), })); } catch (error) { @@ -150,16 +85,12 @@ export const useCronStore = create((set) => ({ throw error; } }, - + deleteJob: async (id) => { try { await hostApiFetch(`/api/cron/jobs/${encodeURIComponent(id)}`, { method: 'DELETE', }); - // Remove from localStorage - const savedMap = JSON.parse(localStorage.getItem('cronAgentIdMap') || '{}') as Record; - delete savedMap[id]; - localStorage.setItem('cronAgentIdMap', JSON.stringify(savedMap)); set((state) => ({ jobs: state.jobs.filter((job) => job.id !== id), })); @@ -168,7 +99,7 @@ export const useCronStore = create((set) => ({ throw error; } }, - + toggleJob: async (id, enabled) => { try { await hostApiFetch('/api/cron/toggle', { @@ -185,7 +116,7 @@ export const useCronStore = create((set) => ({ throw error; } }, - + triggerJob: async (id) => { try { await hostApiFetch('/api/cron/trigger', { @@ -194,14 +125,8 @@ export const useCronStore = create((set) => ({ }); // Refresh jobs after trigger to update lastRun/nextRun state try { - const currentJobs = useCronStore.getState().jobs; - const resultJobs = await hostApiFetch('/api/cron/jobs'); - // Preserve agentId from existing jobs - const jobsWithAgentId = resultJobs.map((job) => { - const existing = currentJobs.find((j) => j.id === job.id); - return existing ? { ...job, agentId: existing.agentId } : job; - }); - set({ jobs: jobsWithAgentId }); + const result = await hostApiFetch('/api/cron/jobs'); + set({ jobs: result }); } catch { // Ignore refresh error } @@ -210,6 +135,6 @@ export const useCronStore = create((set) => ({ throw error; } }, - + setJobs: (jobs) => set({ jobs }), })); diff --git a/src/stores/gateway.ts b/src/stores/gateway.ts index 57843753a..dcb2b9bba 100644 --- a/src/stores/gateway.ts +++ b/src/stores/gateway.ts @@ -17,6 +17,7 @@ const LOAD_SESSIONS_MIN_INTERVAL_MS = 1_200; const LOAD_HISTORY_MIN_INTERVAL_MS = 800; let lastLoadSessionsAt = 0; let lastLoadHistoryAt = 0; +let cronRepairTriggeredThisSession = false; interface GatewayHealth { ok: boolean; @@ -262,6 +263,17 @@ export const useGatewayStore = create((set, get) => ({ const unsubscribers: Array<() => void> = []; unsubscribers.push(subscribeHostEvent('gateway:status', (payload) => { set({ status: payload }); + + // Trigger cron repair when gateway becomes ready + if (!cronRepairTriggeredThisSession && payload.state === 'running') { + cronRepairTriggeredThisSession = true; + // Fire-and-forget: fetch cron jobs to trigger repair logic in background + import('./cron') + .then(({ useCronStore }) => { + useCronStore.getState().fetchJobs(); + }) + .catch(() => {}); + } })); unsubscribers.push(subscribeHostEvent<{ message?: string }>('gateway:error', (payload) => { set({ lastError: payload.message || 'Gateway error' }); diff --git a/tests/unit/cron-store.test.ts b/tests/unit/cron-store.test.ts deleted file mode 100644 index 5c96894ad..000000000 --- a/tests/unit/cron-store.test.ts +++ /dev/null @@ -1,276 +0,0 @@ -/** - * Cron Store Tests - */ -import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; -import { useCronStore } from '@/stores/cron'; -import { useChatStore } from '@/stores/chat'; - -// Mock hostApiFetch -const mockHostApiFetch = vi.fn(); -vi.mock('@/lib/host-api', () => ({ - hostApiFetch: (...args: unknown[]) => mockHostApiFetch(...args), -})); - -// Mock localStorage -const localStorageMock = { - data: {} as Record, - getItem: vi.fn((key: string) => localStorageMock.data[key] ?? null), - setItem: vi.fn((key: string, value: string) => { localStorageMock.data[key] = value; }), - removeItem: vi.fn((key: string) => { delete localStorageMock.data[key]; }), - clear: vi.fn(() => { localStorageMock.data = {}; }), -}; -Object.defineProperty(global, 'localStorage', { value: localStorageMock }); - -describe('Cron Store', () => { - beforeEach(() => { - vi.resetAllMocks(); - localStorageMock.data = {}; - // Reset stores to default state - useCronStore.setState({ jobs: [], loading: false, error: null }); - useChatStore.setState({ currentAgentId: 'main', currentSessionKey: 'agent:main:session-1' }); - }); - - afterEach(() => { - localStorageMock.data = {}; - }); - - describe('fetchJobs', () => { - it('preserves agentId from localStorage when Gateway does not return agentId', async () => { - // Pre-populate localStorage with job -> agentId mapping - localStorageMock.data['cronAgentIdMap'] = JSON.stringify({ - 'job-1': 'typ-2', - 'job-2': 'agent-3', - }); - - // Gateway returns jobs WITHOUT agentId field - mockHostApiFetch.mockResolvedValueOnce([ - { id: 'job-1', name: 'Job 1', agentId: 'main', schedule: '0 9 * * *', enabled: true, message: 'Hi', delivery: { mode: 'none' }, createdAt: '', updatedAt: '' }, - { id: 'job-2', name: 'Job 2', agentId: 'main', schedule: '0 10 * * *', enabled: true, message: 'Hi', delivery: { mode: 'none' }, createdAt: '', updatedAt: '' }, - ]); - - await useCronStore.getState().fetchJobs(); - - const jobs = useCronStore.getState().jobs; - expect(jobs.find(j => j.id === 'job-1')?.agentId).toBe('typ-2'); - expect(jobs.find(j => j.id === 'job-2')?.agentId).toBe('agent-3'); - }); - - it('preserves extra jobs not returned by Gateway', async () => { - // Pre-populate localStorage - localStorageMock.data['cronAgentIdMap'] = JSON.stringify({}); - - // Set existing job in store - useCronStore.setState({ - jobs: [ - { id: 'job-extra', name: 'Extra Job', agentId: 'typ-2', schedule: '0 9 * * *', enabled: true, message: 'Hi', delivery: { mode: 'none' }, createdAt: '', updatedAt: '' }, - ], - }); - - // Gateway returns fewer jobs (missing job-extra) - mockHostApiFetch.mockResolvedValueOnce([ - { id: 'job-1', name: 'Job 1', agentId: 'main', schedule: '0 9 * * *', enabled: true, message: 'Hi', delivery: { mode: 'none' }, createdAt: '', updatedAt: '' }, - ]); - - await useCronStore.getState().fetchJobs(); - - const jobs = useCronStore.getState().jobs; - expect(jobs.length).toBe(2); - expect(jobs.find(j => j.id === 'job-extra')).toBeDefined(); - }); - - it('defaults to main agent when localStorage has no mapping', async () => { - mockHostApiFetch.mockResolvedValueOnce([ - { id: 'job-1', name: 'Job 1', agentId: 'main', schedule: '0 9 * * *', enabled: true, message: 'Hi', delivery: { mode: 'none' }, createdAt: '', updatedAt: '' }, - ]); - - await useCronStore.getState().fetchJobs(); - - const jobs = useCronStore.getState().jobs; - expect(jobs[0].agentId).toBe('main'); - }); - }); - - describe('createJob', () => { - it('auto-captures currentAgentId when agentId is not provided', async () => { - mockHostApiFetch.mockResolvedValueOnce({ - id: 'new-job', - name: 'New Job', - schedule: { kind: 'cron', expr: '0 9 * * *' }, - enabled: true, - message: 'Hi', - delivery: { mode: 'none' }, - createdAt: '', - updatedAt: '', - }); - - useChatStore.setState({ currentAgentId: 'typ-2' }); - - await useCronStore.getState().createJob({ - name: 'New Job', - message: 'Hi', - schedule: '0 9 * * *', - }); - - // Verify agentId was sent to API - const [, init] = mockHostApiFetch.mock.calls[0] as [string, Record]; - expect((init as { body: string }).body).toContain('"agentId":"typ-2"'); - - // Verify localStorage was updated - expect(localStorageMock.data['cronAgentIdMap']).toContain('typ-2'); - }); - - it('uses provided agentId when explicitly passed', async () => { - mockHostApiFetch.mockResolvedValueOnce({ - id: 'new-job', - name: 'New Job', - schedule: { kind: 'cron', expr: '0 9 * * *' }, - enabled: true, - message: 'Hi', - delivery: { mode: 'none' }, - createdAt: '', - updatedAt: '', - }); - - await useCronStore.getState().createJob({ - name: 'New Job', - message: 'Hi', - schedule: '0 9 * * *', - agentId: 'agent-5', - }); - - const [, init] = mockHostApiFetch.mock.calls[0] as [string, Record]; - expect((init as { body: string }).body).toContain('"agentId":"agent-5"'); - }); - - it('persists agentId to localStorage', async () => { - mockHostApiFetch.mockResolvedValueOnce({ - id: 'job-xyz', - name: 'Job', - schedule: { kind: 'cron', expr: '0 9 * * *' }, - enabled: true, - message: 'Hi', - delivery: { mode: 'none' }, - createdAt: '', - updatedAt: '', - }); - - useChatStore.setState({ currentAgentId: 'custom-agent' }); - - await useCronStore.getState().createJob({ - name: 'Job', - message: 'Hi', - schedule: '0 9 * * *', - }); - - const savedMap = JSON.parse(localStorageMock.data['cronAgentIdMap'] || '{}'); - expect(savedMap['job-xyz']).toBe('custom-agent'); - }); - }); - - describe('updateJob', () => { - it('preserves agentId from currentJob when updating other fields', async () => { - useCronStore.setState({ - jobs: [ - { id: 'job-1', name: 'Old Name', agentId: 'typ-2', schedule: '0 9 * * *', enabled: true, message: 'Hi', delivery: { mode: 'none' }, createdAt: '', updatedAt: '' }, - ], - }); - - // PUT returns job with updated fields but missing agentId - mockHostApiFetch.mockResolvedValueOnce({ - id: 'job-1', - name: 'New Name', - schedule: { kind: 'cron', expr: '0 9 * * *' }, - enabled: true, - message: 'Updated', - delivery: { mode: 'none' }, - createdAt: '', - updatedAt: '', - }); - - await useCronStore.getState().updateJob('job-1', { - name: 'New Name', - message: 'Updated', - schedule: '0 9 * * *', - }); - - const job = useCronStore.getState().jobs.find(j => j.id === 'job-1'); - expect(job?.agentId).toBe('typ-2'); - expect(job?.name).toBe('New Name'); - }); - - it('deletes and recreates job when agentId changes', async () => { - useCronStore.setState({ - jobs: [ - { id: 'job-1', name: 'Job', agentId: 'main', schedule: '0 9 * * *', enabled: true, message: 'Hi', delivery: { mode: 'none' }, createdAt: '', updatedAt: '' }, - ], - }); - - // POST call first (create new job before deleting old one) - mockHostApiFetch.mockResolvedValueOnce({ - id: 'job-new', - name: 'Job', - schedule: { kind: 'cron', expr: '0 9 * * *' }, - enabled: true, - message: 'Hi', - delivery: { mode: 'none' }, - createdAt: '', - updatedAt: '', - }); - // DELETE call (delete old job after new one is created) - mockHostApiFetch.mockResolvedValueOnce({}); - - await useCronStore.getState().updateJob('job-1', { - name: 'Job', - message: 'Hi', - schedule: '0 9 * * *', - agentId: 'new-agent', - }); - - // Should have POST and DELETE calls - expect(mockHostApiFetch).toHaveBeenCalledTimes(2); - - // Verify localStorage updated with new job id - const savedMap = JSON.parse(localStorageMock.data['cronAgentIdMap'] || '{}'); - expect(savedMap['job-1']).toBeUndefined(); - expect(savedMap['job-new']).toBe('new-agent'); - }); - }); - - describe('deleteJob', () => { - it('removes job from localStorage on delete', async () => { - localStorageMock.data['cronAgentIdMap'] = JSON.stringify({ - 'job-1': 'typ-2', - 'job-2': 'main', - }); - - mockHostApiFetch.mockResolvedValueOnce({}); - - await useCronStore.getState().deleteJob('job-1'); - - const savedMap = JSON.parse(localStorageMock.data['cronAgentIdMap'] || '{}'); - expect(savedMap['job-1']).toBeUndefined(); - expect(savedMap['job-2']).toBe('main'); - }); - }); - - describe('triggerJob', () => { - it('preserves agentId from currentJobs after refresh', async () => { - useCronStore.setState({ - jobs: [ - { id: 'job-trigger', name: 'Triggered', agentId: 'typ-2', schedule: '0 9 * * *', enabled: true, message: 'Hi', delivery: { mode: 'none' }, createdAt: '', updatedAt: '' }, - ], - }); - - mockHostApiFetch.mockResolvedValueOnce({}); // trigger call - // fetchJobs after trigger returns same job but without agentId - mockHostApiFetch.mockResolvedValueOnce([ - { id: 'job-trigger', name: 'Triggered', agentId: 'main', schedule: '0 9 * * *', enabled: true, message: 'Hi', delivery: { mode: 'none' }, createdAt: '', updatedAt: '', lastRun: { time: new Date().toISOString(), success: true } }, - ]); - - await useCronStore.getState().triggerJob('job-trigger'); - - const job = useCronStore.getState().jobs.find(j => j.id === 'job-trigger'); - expect(job?.agentId).toBe('typ-2'); - }); - }); -});