support reasoning agentid by accountId or session for cron (#847)
This commit is contained in:
committed by
GitHub
Unverified
parent
54ec784545
commit
758a8f8c94
@@ -1,4 +1,5 @@
|
||||
import { readFile, readdir } from 'node:fs/promises';
|
||||
import { extractSessionRecords } from '../../utils/session-util';
|
||||
import type { IncomingMessage, ServerResponse } from 'http';
|
||||
import { join } from 'node:path';
|
||||
import {
|
||||
@@ -679,16 +680,6 @@ function inferTargetKindFromValue(
|
||||
return 'user';
|
||||
}
|
||||
|
||||
function extractSessionRecords(store: JsonRecord): JsonRecord[] {
|
||||
const directEntries = Object.entries(store)
|
||||
.filter(([key, value]) => key !== 'sessions' && value && typeof value === 'object')
|
||||
.map(([, value]) => value as JsonRecord);
|
||||
const arrayEntries = Array.isArray(store.sessions)
|
||||
? store.sessions.filter((entry): entry is JsonRecord => Boolean(entry && typeof entry === 'object'))
|
||||
: [];
|
||||
return [...directEntries, ...arrayEntries];
|
||||
}
|
||||
|
||||
function buildChannelTargetCacheKey(params: {
|
||||
channelType: string;
|
||||
accountId?: string;
|
||||
|
||||
@@ -4,8 +4,14 @@ import { join } from 'node:path';
|
||||
import type { HostApiContext } from '../context';
|
||||
import { parseJsonBody, sendJson } from '../route-utils';
|
||||
import { getOpenClawConfigDir } from '../../utils/paths';
|
||||
import { resolveAccountIdFromSessionHistory } from '../../utils/session-util';
|
||||
import { toOpenClawChannelType, toUiChannelType } from '../../utils/channel-alias';
|
||||
import { resolveAgentIdFromChannel } from '../../utils/agent-config';
|
||||
|
||||
/**
|
||||
* Find agentId from session history by delivery "to" address.
|
||||
* Efficiently searches only agent session directories for matching deliveryContext.to.
|
||||
*/
|
||||
interface GatewayCronJob {
|
||||
id: string;
|
||||
name: string;
|
||||
@@ -461,6 +467,14 @@ export async function handleCronRoutes(
|
||||
const result = await ctx.gatewayManager.rpc('cron.list', { includeDisabled: true }, 8000);
|
||||
const data = result as { jobs?: GatewayCronJob[] };
|
||||
jobs = data?.jobs ?? (Array.isArray(result) ? result as GatewayCronJob[] : []);
|
||||
|
||||
// DEBUG: log name and agentId for each job
|
||||
console.debug('Fetched cron jobs from Gateway:');
|
||||
for (const job of jobs) {
|
||||
const jobAgentId = (job as unknown as { agentId?: string }).agentId;
|
||||
const deliveryInfo = job.delivery ? `delivery={mode:${job.delivery.mode}, channel:${job.delivery.channel || '(none)'}, accountId:${job.delivery.accountId || '(none)'}, to:${job.delivery.to || '(none)'}}` : 'delivery=(none)';
|
||||
console.debug(` - name: "${job.name}", agentId: "${jobAgentId || '(undefined)'}", ${deliveryInfo}, sessionTarget: "${job.sessionTarget || '(none)'}", payload.kind: "${job.payload?.kind || '(none)'}"`);
|
||||
}
|
||||
} catch {
|
||||
// Fallback: read cron.json directly when Gateway RPC fails/times out.
|
||||
try {
|
||||
@@ -477,7 +491,8 @@ export async function handleCronRoutes(
|
||||
|
||||
// Run repair in background — don't block the response.
|
||||
if (!usedFallback && jobs.length > 0) {
|
||||
const jobsToRepair = jobs.filter((job) => {
|
||||
// Repair 1: delivery channel missing
|
||||
const jobsToRepairDelivery = jobs.filter((job) => {
|
||||
const isIsolatedAgent =
|
||||
(job.sessionTarget === 'isolated' || !job.sessionTarget) &&
|
||||
job.payload?.kind === 'agentTurn';
|
||||
@@ -487,10 +502,10 @@ export async function handleCronRoutes(
|
||||
!job.delivery?.channel
|
||||
);
|
||||
});
|
||||
if (jobsToRepair.length > 0) {
|
||||
if (jobsToRepairDelivery.length > 0) {
|
||||
// Fire-and-forget: repair in background
|
||||
void (async () => {
|
||||
for (const job of jobsToRepair) {
|
||||
for (const job of jobsToRepairDelivery) {
|
||||
try {
|
||||
await ctx.gatewayManager.rpc('cron.update', {
|
||||
id: job.id,
|
||||
@@ -502,7 +517,7 @@ export async function handleCronRoutes(
|
||||
}
|
||||
})();
|
||||
// Optimistically fix the response data
|
||||
for (const job of jobsToRepair) {
|
||||
for (const job of jobsToRepairDelivery) {
|
||||
job.delivery = { mode: 'none' };
|
||||
if (job.state?.lastError?.includes('Channel is required')) {
|
||||
job.state.lastError = undefined;
|
||||
@@ -510,6 +525,68 @@ export async function handleCronRoutes(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Repair 2: agentId is undefined for jobs with announce delivery
|
||||
// Only repair undefined -> inferred agent, NOT main -> inferred agent
|
||||
const jobsToRepairAgent = jobs.filter((job) => {
|
||||
const jobAgentId = (job as unknown as { agentId?: string }).agentId;
|
||||
return (
|
||||
(job.sessionTarget === 'isolated' || !job.sessionTarget) &&
|
||||
job.payload?.kind === 'agentTurn' &&
|
||||
job.delivery?.mode === 'announce' &&
|
||||
job.delivery?.channel &&
|
||||
jobAgentId === undefined // Only repair when agentId is completely undefined
|
||||
);
|
||||
});
|
||||
if (jobsToRepairAgent.length > 0) {
|
||||
console.debug(`Found ${jobsToRepairAgent.length} jobs needing agent repair:`);
|
||||
for (const job of jobsToRepairAgent) {
|
||||
console.debug(` - Job "${job.name}" (id: ${job.id}): current agentId="${(job as unknown as { agentId?: string }).agentId || '(undefined)'}", channel="${job.delivery?.channel}", accountId="${job.delivery?.accountId || '(none)'}"`);
|
||||
}
|
||||
// Fire-and-forget: repair in background
|
||||
void (async () => {
|
||||
for (const job of jobsToRepairAgent) {
|
||||
try {
|
||||
const channel = toOpenClawChannelType(job.delivery!.channel!);
|
||||
const accountId = job.delivery!.accountId;
|
||||
const toAddress = job.delivery!.to;
|
||||
|
||||
// Try 1: resolve from channel + accountId binding
|
||||
let correctAgentId = await resolveAgentIdFromChannel(channel, accountId);
|
||||
|
||||
// If no accountId, try to resolve it from session history using "to" address, then get agentId
|
||||
let resolvedAccountId: string | null = null;
|
||||
if (!correctAgentId && !accountId && toAddress) {
|
||||
console.debug(`No binding found for channel="${channel}", accountId="${accountId || '(none)'}", trying session history for to="${toAddress}"`);
|
||||
resolvedAccountId = await resolveAccountIdFromSessionHistory(toAddress, channel);
|
||||
if (resolvedAccountId) {
|
||||
console.debug(`Resolved accountId="${resolvedAccountId}" from session history, now resolving agentId`);
|
||||
correctAgentId = await resolveAgentIdFromChannel(channel, resolvedAccountId);
|
||||
}
|
||||
}
|
||||
|
||||
if (correctAgentId) {
|
||||
console.debug(`Repairing job "${job.name}": agentId "${(job as unknown as { agentId?: string }).agentId || '(undefined)'}" -> "${correctAgentId}"`);
|
||||
// When accountId was resolved via to address, include it in the patch
|
||||
const patch: Record<string, unknown> = { agentId: correctAgentId };
|
||||
if (resolvedAccountId && !accountId) {
|
||||
patch.delivery = { accountId: resolvedAccountId };
|
||||
}
|
||||
await ctx.gatewayManager.rpc('cron.update', { id: job.id, patch });
|
||||
// Update the local job object so response reflects correct agentId
|
||||
(job as unknown as { agentId: string }).agentId = correctAgentId;
|
||||
if (resolvedAccountId && !accountId && job.delivery) {
|
||||
job.delivery.accountId = resolvedAccountId;
|
||||
}
|
||||
} else {
|
||||
console.warn(`Could not resolve agent for job "${job.name}": channel="${channel}", accountId="${accountId || '(none)'}", to="${toAddress || '(none)'}"`);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`Failed to repair agent for job "${job.name}":`, error);
|
||||
}
|
||||
}
|
||||
})();
|
||||
}
|
||||
}
|
||||
|
||||
sendJson(res, 200, jobs.map((job) => ({ ...transformCronJob(job), ...(usedFallback ? { _fromFallback: true } : {}) })));
|
||||
@@ -532,6 +609,8 @@ export async function handleCronRoutes(
|
||||
const agentId = typeof input.agentId === 'string' && input.agentId.trim()
|
||||
? input.agentId.trim()
|
||||
: 'main';
|
||||
// DEBUG: log the input and resolved agentId
|
||||
console.debug(`Creating cron job: name="${input.name}", input.agentId="${input.agentId || '(not provided)'}", resolved agentId="${agentId}"`);
|
||||
const delivery = normalizeCronDelivery(input.delivery);
|
||||
const unsupportedDeliveryError = getUnsupportedCronDeliveryError(delivery.channel);
|
||||
if (delivery.mode === 'announce' && unsupportedDeliveryError) {
|
||||
|
||||
@@ -22,6 +22,8 @@ import {
|
||||
import { syncProxyConfigToOpenClaw } from '../utils/openclaw-proxy';
|
||||
import { buildOpenClawControlUiUrl } from '../utils/openclaw-control-ui';
|
||||
import { logger } from '../utils/logger';
|
||||
import { resolveAgentIdFromChannel } from '../utils/agent-config';
|
||||
import { resolveAccountIdFromSessionHistory } from '../utils/session-util';
|
||||
import {
|
||||
saveChannelConfig,
|
||||
getChannelConfig,
|
||||
@@ -891,8 +893,7 @@ function registerCronHandlers(gatewayManager: GatewayManager): void {
|
||||
ipcMain.handle('cron:list', async () => {
|
||||
try {
|
||||
const result = await gatewayManager.rpc('cron.list', { includeDisabled: true });
|
||||
const data = result as { jobs?: GatewayCronJob[] };
|
||||
const jobs = data?.jobs ?? [];
|
||||
const jobs = Array.isArray(result) ? result : (result as { jobs?: GatewayCronJob[] })?.jobs ?? [];
|
||||
|
||||
// Auto-repair legacy UI-created jobs that were saved without
|
||||
// delivery: { mode: 'none' }. The Gateway auto-normalizes them
|
||||
@@ -1031,6 +1032,65 @@ function registerCronHandlers(gatewayManager: GatewayManager): void {
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
|
||||
// Periodic cron job repair: checks for jobs with undefined agentId and repairs them
|
||||
// This handles cases where cron jobs were created via openclaw CLI without specifying agent
|
||||
const CRON_AGENT_REPAIR_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
|
||||
let _lastRepairErrorLogAt = 0;
|
||||
const REPAIR_ERROR_LOG_INTERVAL_MS = 60 * 60 * 1000; // 1 hour
|
||||
setInterval(async () => {
|
||||
try {
|
||||
const status = gatewayManager.getStatus();
|
||||
if (status.state !== 'running') return;
|
||||
|
||||
const result = await gatewayManager.rpc('cron.list', { includeDisabled: true });
|
||||
const jobs = Array.isArray(result)
|
||||
? result
|
||||
: (result as { jobs?: Array<{ id: string; name: string; sessionTarget?: string; payload?: { kind: string }; delivery?: { mode: string; channel?: string; to?: string; accountId?: string }; state?: Record<string, unknown> }> })?.jobs ?? [];
|
||||
|
||||
for (const job of jobs) {
|
||||
const jobAgentId = (job as unknown as { agentId?: string }).agentId;
|
||||
if (
|
||||
(job.sessionTarget === 'isolated' || !job.sessionTarget) &&
|
||||
job.payload?.kind === 'agentTurn' &&
|
||||
job.delivery?.mode === 'announce' &&
|
||||
job.delivery?.channel &&
|
||||
jobAgentId === undefined
|
||||
) {
|
||||
const channel = job.delivery.channel;
|
||||
const accountId = job.delivery.accountId;
|
||||
const toAddress = job.delivery.to;
|
||||
|
||||
let correctAgentId = await resolveAgentIdFromChannel(channel, accountId);
|
||||
|
||||
// If no accountId, try to resolve it from session history
|
||||
let resolvedAccountId: string | null = null;
|
||||
if (!correctAgentId && !accountId && toAddress) {
|
||||
resolvedAccountId = await resolveAccountIdFromSessionHistory(toAddress, channel);
|
||||
if (resolvedAccountId) {
|
||||
correctAgentId = await resolveAgentIdFromChannel(channel, resolvedAccountId);
|
||||
}
|
||||
}
|
||||
|
||||
if (correctAgentId) {
|
||||
console.debug(`Periodic repair: job "${job.name}" agentId undefined -> "${correctAgentId}"`);
|
||||
// When accountId was resolved via to address, include it in the patch
|
||||
const patch: Record<string, unknown> = { agentId: correctAgentId };
|
||||
if (resolvedAccountId && !accountId) {
|
||||
patch.delivery = { accountId: resolvedAccountId };
|
||||
}
|
||||
await gatewayManager.rpc('cron.update', { id: job.id, patch });
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
const now = Date.now();
|
||||
if (now - _lastRepairErrorLogAt >= REPAIR_ERROR_LOG_INTERVAL_MS) {
|
||||
_lastRepairErrorLogAt = now;
|
||||
console.debug('Periodic cron repair error:', error);
|
||||
}
|
||||
}
|
||||
}, CRON_AGENT_REPAIR_INTERVAL_MS);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -555,6 +555,25 @@ export async function listConfiguredAgentIds(): Promise<string[]> {
|
||||
return ids.length > 0 ? ids : [MAIN_AGENT_ID];
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve agentId from channel and accountId using bindings.
|
||||
* Returns the agentId if found, or null if no binding exists.
|
||||
*/
|
||||
export async function resolveAgentIdFromChannel(channel: string, accountId?: string): Promise<string | null> {
|
||||
const config = await readOpenClawConfig() as AgentConfigDocument;
|
||||
const { channelToAgent, accountToAgent } = getChannelBindingMap(config.bindings);
|
||||
|
||||
// First try account-specific binding
|
||||
if (accountId) {
|
||||
const agentId = accountToAgent.get(`${channel}:${accountId}`);
|
||||
if (agentId) return agentId;
|
||||
}
|
||||
|
||||
// Fallback to channel-only binding
|
||||
const agentId = channelToAgent.get(channel);
|
||||
return agentId ?? null;
|
||||
}
|
||||
|
||||
export async function createAgent(
|
||||
name: string,
|
||||
options?: { inheritWorkspace?: boolean },
|
||||
|
||||
79
electron/utils/session-util.ts
Normal file
79
electron/utils/session-util.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
/**
|
||||
* Shared session utilities
|
||||
*/
|
||||
import { readFile, readdir } from 'node:fs/promises';
|
||||
import { join } from 'node:path';
|
||||
import { getOpenClawConfigDir } from './paths';
|
||||
|
||||
type JsonRecord = Record<string, unknown>;
|
||||
|
||||
/**
|
||||
* Parse sessions.json supporting both formats:
|
||||
* - Object-keyed: { "agent:xxx:yyy": { deliveryContext: {...} } }
|
||||
* - Array format: { sessions: [...] }
|
||||
*/
|
||||
export function extractSessionRecords(store: JsonRecord): JsonRecord[] {
|
||||
const directEntries = Object.entries(store)
|
||||
.filter(([key, value]) => key !== 'sessions' && value && typeof value === 'object')
|
||||
.map(([, value]) => value as JsonRecord);
|
||||
const arrayEntries = Array.isArray(store.sessions)
|
||||
? store.sessions.filter((entry): entry is JsonRecord => Boolean(entry && typeof entry === 'object'))
|
||||
: [];
|
||||
return [...directEntries, ...arrayEntries];
|
||||
}
|
||||
|
||||
/**
|
||||
* Find accountId from session history by "to" address and channel type.
|
||||
* Searches all agent session directories for a matching deliveryContext.
|
||||
*/
|
||||
export async function resolveAccountIdFromSessionHistory(
|
||||
toAddress: string,
|
||||
channelType: string,
|
||||
): Promise<string | null> {
|
||||
const agentsDir = join(getOpenClawConfigDir(), 'agents');
|
||||
|
||||
let agentDirs: Array<{ name: string; isDirectory: () => boolean }>;
|
||||
try {
|
||||
agentDirs = await readdir(agentsDir, { withFileTypes: true });
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
|
||||
for (const entry of agentDirs) {
|
||||
if (!entry.isDirectory()) continue;
|
||||
|
||||
const sessionsPath = join(agentsDir, entry.name, 'sessions', 'sessions.json');
|
||||
let raw: string;
|
||||
try {
|
||||
raw = await readFile(sessionsPath, 'utf8');
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!raw.trim()) continue;
|
||||
|
||||
let parsed: Record<string, unknown>;
|
||||
try {
|
||||
parsed = JSON.parse(raw);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (const session of extractSessionRecords(parsed as JsonRecord)) {
|
||||
const deliveryContext = session.deliveryContext as Record<string, unknown> | undefined;
|
||||
if (
|
||||
deliveryContext &&
|
||||
typeof deliveryContext.to === 'string' &&
|
||||
deliveryContext.to === toAddress &&
|
||||
typeof deliveryContext.channel === 'string' &&
|
||||
deliveryContext.channel === channelType
|
||||
) {
|
||||
if (typeof deliveryContext.accountId === 'string') {
|
||||
return deliveryContext.accountId;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
Reference in New Issue
Block a user