feat(cron): implement cron session management and logging features, including session key parsing and fallback message handling (#429)
This commit is contained in:
@@ -1,6 +1,9 @@
|
||||
import { readFile } from 'node:fs/promises';
|
||||
import type { IncomingMessage, ServerResponse } from 'http';
|
||||
import { join } from 'node:path';
|
||||
import type { HostApiContext } from '../context';
|
||||
import { parseJsonBody, sendJson } from '../route-utils';
|
||||
import { getOpenClawConfigDir } from '../../utils/paths';
|
||||
|
||||
interface GatewayCronJob {
|
||||
id: string;
|
||||
@@ -15,6 +18,7 @@ interface GatewayCronJob {
|
||||
sessionTarget?: string;
|
||||
state: {
|
||||
nextRunAtMs?: number;
|
||||
runningAtMs?: number;
|
||||
lastRunAtMs?: number;
|
||||
lastStatus?: string;
|
||||
lastError?: string;
|
||||
@@ -22,6 +26,241 @@ interface GatewayCronJob {
|
||||
};
|
||||
}
|
||||
|
||||
interface CronRunLogEntry {
|
||||
jobId?: string;
|
||||
action?: string;
|
||||
status?: string;
|
||||
error?: string;
|
||||
summary?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
ts?: number;
|
||||
runAtMs?: number;
|
||||
durationMs?: number;
|
||||
model?: string;
|
||||
provider?: string;
|
||||
}
|
||||
|
||||
interface CronSessionKeyParts {
|
||||
agentId: string;
|
||||
jobId: string;
|
||||
runSessionId?: string;
|
||||
}
|
||||
|
||||
interface CronSessionFallbackMessage {
|
||||
id: string;
|
||||
role: 'assistant' | 'system';
|
||||
content: string;
|
||||
timestamp: number;
|
||||
isError?: boolean;
|
||||
}
|
||||
|
||||
function parseCronSessionKey(sessionKey: string): CronSessionKeyParts | null {
|
||||
if (!sessionKey.startsWith('agent:')) return null;
|
||||
const parts = sessionKey.split(':');
|
||||
if (parts.length < 4 || parts[2] !== 'cron') return null;
|
||||
|
||||
const agentId = parts[1] || 'main';
|
||||
const jobId = parts[3];
|
||||
if (!jobId) return null;
|
||||
|
||||
if (parts.length === 4) {
|
||||
return { agentId, jobId };
|
||||
}
|
||||
|
||||
if (parts.length === 6 && parts[4] === 'run' && parts[5]) {
|
||||
return { agentId, jobId, runSessionId: parts[5] };
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
function normalizeTimestampMs(value: unknown): number | undefined {
|
||||
if (typeof value === 'number' && Number.isFinite(value)) {
|
||||
return value < 1e12 ? value * 1000 : value;
|
||||
}
|
||||
if (typeof value === 'string' && value.trim()) {
|
||||
const parsed = Date.parse(value);
|
||||
if (Number.isFinite(parsed)) {
|
||||
return parsed;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function formatDuration(durationMs: number | undefined): string | null {
|
||||
if (!durationMs || !Number.isFinite(durationMs)) return null;
|
||||
if (durationMs < 1000) return `${Math.round(durationMs)}ms`;
|
||||
if (durationMs < 10_000) return `${(durationMs / 1000).toFixed(1)}s`;
|
||||
return `${Math.round(durationMs / 1000)}s`;
|
||||
}
|
||||
|
||||
function buildCronRunMessage(entry: CronRunLogEntry, index: number): CronSessionFallbackMessage | null {
|
||||
const timestamp = normalizeTimestampMs(entry.ts) ?? normalizeTimestampMs(entry.runAtMs);
|
||||
if (!timestamp) return null;
|
||||
|
||||
const status = typeof entry.status === 'string' ? entry.status.toLowerCase() : '';
|
||||
const summary = typeof entry.summary === 'string' ? entry.summary.trim() : '';
|
||||
const error = typeof entry.error === 'string' ? entry.error.trim() : '';
|
||||
let content = summary || error;
|
||||
|
||||
if (!content) {
|
||||
content = status === 'error'
|
||||
? 'Scheduled task failed.'
|
||||
: 'Scheduled task completed.';
|
||||
}
|
||||
|
||||
if (status === 'error' && !content.toLowerCase().startsWith('run failed:')) {
|
||||
content = `Run failed: ${content}`;
|
||||
}
|
||||
|
||||
const meta: string[] = [];
|
||||
const duration = formatDuration(entry.durationMs);
|
||||
if (duration) meta.push(`Duration: ${duration}`);
|
||||
if (entry.provider && entry.model) {
|
||||
meta.push(`Model: ${entry.provider}/${entry.model}`);
|
||||
} else if (entry.model) {
|
||||
meta.push(`Model: ${entry.model}`);
|
||||
}
|
||||
if (meta.length > 0) {
|
||||
content = `${content}\n\n${meta.join(' | ')}`;
|
||||
}
|
||||
|
||||
return {
|
||||
id: `cron-run-${entry.sessionId ?? entry.ts ?? index}`,
|
||||
role: status === 'error' ? 'system' : 'assistant',
|
||||
content,
|
||||
timestamp,
|
||||
...(status === 'error' ? { isError: true } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
async function readCronRunLog(jobId: string): Promise<CronRunLogEntry[]> {
|
||||
const logPath = join(getOpenClawConfigDir(), 'cron', 'runs', `${jobId}.jsonl`);
|
||||
const raw = await readFile(logPath, 'utf8').catch(() => '');
|
||||
if (!raw.trim()) return [];
|
||||
|
||||
const entries: CronRunLogEntry[] = [];
|
||||
for (const line of raw.split(/\r?\n/)) {
|
||||
const trimmed = line.trim();
|
||||
if (!trimmed) continue;
|
||||
try {
|
||||
const entry = JSON.parse(trimmed) as CronRunLogEntry;
|
||||
if (!entry || entry.jobId !== jobId) continue;
|
||||
if (entry.action && entry.action !== 'finished') continue;
|
||||
entries.push(entry);
|
||||
} catch {
|
||||
// Ignore malformed log lines so one bad entry does not hide the rest.
|
||||
}
|
||||
}
|
||||
return entries;
|
||||
}
|
||||
|
||||
async function readSessionStoreEntry(
|
||||
agentId: string,
|
||||
sessionKey: string,
|
||||
): Promise<Record<string, unknown> | undefined> {
|
||||
const storePath = join(getOpenClawConfigDir(), 'agents', agentId, 'sessions', 'sessions.json');
|
||||
const raw = await readFile(storePath, 'utf8').catch(() => '');
|
||||
if (!raw.trim()) return undefined;
|
||||
|
||||
try {
|
||||
const store = JSON.parse(raw) as Record<string, unknown>;
|
||||
const directEntry = store[sessionKey];
|
||||
if (directEntry && typeof directEntry === 'object') {
|
||||
return directEntry as Record<string, unknown>;
|
||||
}
|
||||
|
||||
const sessions = (store as { sessions?: unknown }).sessions;
|
||||
if (Array.isArray(sessions)) {
|
||||
const arrayEntry = sessions.find((entry) => {
|
||||
if (!entry || typeof entry !== 'object') return false;
|
||||
const record = entry as Record<string, unknown>;
|
||||
return record.key === sessionKey || record.sessionKey === sessionKey;
|
||||
});
|
||||
if (arrayEntry && typeof arrayEntry === 'object') {
|
||||
return arrayEntry as Record<string, unknown>;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
export function buildCronSessionFallbackMessages(params: {
|
||||
sessionKey: string;
|
||||
job?: Pick<GatewayCronJob, 'name' | 'payload' | 'state'>;
|
||||
runs: CronRunLogEntry[];
|
||||
sessionEntry?: { label?: string; updatedAt?: number };
|
||||
limit?: number;
|
||||
}): CronSessionFallbackMessage[] {
|
||||
const parsed = parseCronSessionKey(params.sessionKey);
|
||||
if (!parsed) return [];
|
||||
|
||||
const matchingRuns = params.runs
|
||||
.filter((entry) => {
|
||||
if (!parsed.runSessionId) return true;
|
||||
return entry.sessionId === parsed.runSessionId
|
||||
|| entry.sessionKey === `${params.sessionKey}`;
|
||||
})
|
||||
.sort((a, b) => {
|
||||
const left = normalizeTimestampMs(a.ts) ?? normalizeTimestampMs(a.runAtMs) ?? 0;
|
||||
const right = normalizeTimestampMs(b.ts) ?? normalizeTimestampMs(b.runAtMs) ?? 0;
|
||||
return left - right;
|
||||
});
|
||||
|
||||
const messages: CronSessionFallbackMessage[] = [];
|
||||
const prompt = params.job?.payload?.message || params.job?.payload?.text || '';
|
||||
const taskName = params.job?.name?.trim()
|
||||
|| params.sessionEntry?.label?.replace(/^Cron:\s*/, '').trim()
|
||||
|| '';
|
||||
const firstRelevantTimestamp = matchingRuns.length > 0
|
||||
? (normalizeTimestampMs(matchingRuns[0]?.runAtMs) ?? normalizeTimestampMs(matchingRuns[0]?.ts))
|
||||
: (normalizeTimestampMs(params.job?.state?.runningAtMs) ?? params.sessionEntry?.updatedAt);
|
||||
|
||||
if (taskName || prompt) {
|
||||
const lines = [taskName ? `Scheduled task: ${taskName}` : 'Scheduled task'];
|
||||
if (prompt) lines.push(`Prompt: ${prompt}`);
|
||||
messages.push({
|
||||
id: `cron-meta-${parsed.jobId}`,
|
||||
role: 'system',
|
||||
content: lines.join('\n'),
|
||||
timestamp: Math.max(0, (firstRelevantTimestamp ?? Date.now()) - 1),
|
||||
});
|
||||
}
|
||||
|
||||
matchingRuns.forEach((entry, index) => {
|
||||
const message = buildCronRunMessage(entry, index);
|
||||
if (message) messages.push(message);
|
||||
});
|
||||
|
||||
if (matchingRuns.length === 0) {
|
||||
const runningAt = normalizeTimestampMs(params.job?.state?.runningAtMs);
|
||||
if (runningAt) {
|
||||
messages.push({
|
||||
id: `cron-running-${parsed.jobId}`,
|
||||
role: 'system',
|
||||
content: 'This scheduled task is still running in OpenClaw, but no chat transcript is available yet.',
|
||||
timestamp: runningAt,
|
||||
});
|
||||
} else if (messages.length === 0) {
|
||||
messages.push({
|
||||
id: `cron-empty-${parsed.jobId}`,
|
||||
role: 'system',
|
||||
content: 'No chat transcript is available for this scheduled task yet.',
|
||||
timestamp: params.sessionEntry?.updatedAt ?? Date.now(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const limit = typeof params.limit === 'number' && Number.isFinite(params.limit)
|
||||
? Math.max(1, Math.floor(params.limit))
|
||||
: messages.length;
|
||||
return messages.slice(-limit);
|
||||
}
|
||||
|
||||
function transformCronJob(job: GatewayCronJob) {
|
||||
const message = job.payload?.message || job.payload?.text || '';
|
||||
const channelType = job.delivery?.channel;
|
||||
@@ -60,6 +299,47 @@ export async function handleCronRoutes(
|
||||
url: URL,
|
||||
ctx: HostApiContext,
|
||||
): Promise<boolean> {
|
||||
if (url.pathname === '/api/cron/session-history' && req.method === 'GET') {
|
||||
const sessionKey = url.searchParams.get('sessionKey')?.trim() || '';
|
||||
const parsedSession = parseCronSessionKey(sessionKey);
|
||||
if (!parsedSession) {
|
||||
sendJson(res, 400, { success: false, error: `Invalid cron sessionKey: ${sessionKey}` });
|
||||
return true;
|
||||
}
|
||||
|
||||
const rawLimit = Number(url.searchParams.get('limit') || '200');
|
||||
const limit = Number.isFinite(rawLimit)
|
||||
? Math.min(Math.max(Math.floor(rawLimit), 1), 200)
|
||||
: 200;
|
||||
|
||||
try {
|
||||
const [jobsResult, runs, sessionEntry] = await Promise.all([
|
||||
ctx.gatewayManager.rpc('cron.list', { includeDisabled: true })
|
||||
.catch(() => ({ jobs: [] as GatewayCronJob[] })),
|
||||
readCronRunLog(parsedSession.jobId),
|
||||
readSessionStoreEntry(parsedSession.agentId, sessionKey),
|
||||
]);
|
||||
|
||||
const jobs = (jobsResult as { jobs?: GatewayCronJob[] }).jobs ?? [];
|
||||
const job = jobs.find((item) => item.id === parsedSession.jobId);
|
||||
const messages = buildCronSessionFallbackMessages({
|
||||
sessionKey,
|
||||
job,
|
||||
runs,
|
||||
sessionEntry: sessionEntry ? {
|
||||
label: typeof sessionEntry.label === 'string' ? sessionEntry.label : undefined,
|
||||
updatedAt: normalizeTimestampMs(sessionEntry.updatedAt),
|
||||
} : undefined,
|
||||
limit,
|
||||
});
|
||||
|
||||
sendJson(res, 200, { messages });
|
||||
} catch (error) {
|
||||
sendJson(res, 500, { success: false, error: String(error) });
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
if (url.pathname === '/api/cron/jobs' && req.method === 'GET') {
|
||||
try {
|
||||
const result = await ctx.gatewayManager.rpc('cron.list', { includeDisabled: true });
|
||||
|
||||
Reference in New Issue
Block a user