feat: unify cron delivery account and target selection (#642)

This commit is contained in:
cedric
2026-03-25 10:12:49 +08:00
committed by GitHub
Unverified
parent 9aea3c9441
commit 9d40e1fa05
20 changed files with 2073 additions and 88 deletions

View File

@@ -32,6 +32,7 @@ import {
validateChannelConfig,
validateChannelCredentials,
} from '../utils/channel-config';
import { toOpenClawChannelType, toUiChannelType } from '../utils/channel-alias';
import { checkUvInstalled, installUv, setupManagedPython } from '../utils/uv-setup';
import {
ensureDingTalkPluginInstalled,
@@ -494,7 +495,13 @@ function registerUnifiedRequestHandlers(gatewayManager: GatewayManager): void {
break;
}
if (request.action === 'create') {
type CronCreateInput = { name: string; message: string; schedule: string; enabled?: boolean };
type CronCreateInput = {
name: string;
message: string;
schedule: string;
delivery?: { mode: string; channel?: string; to?: string };
enabled?: boolean;
};
const payload = request.payload as
| { input?: CronCreateInput }
| [CronCreateInput]
@@ -516,8 +523,12 @@ function registerUnifiedRequestHandlers(gatewayManager: GatewayManager): void {
enabled: input.enabled ?? true,
wakeMode: 'next-heartbeat',
sessionTarget: 'isolated',
delivery: { mode: 'none' },
delivery: normalizeCronDelivery(input.delivery),
};
const unsupportedDeliveryError = getUnsupportedCronDeliveryError(gatewayInput.delivery.channel);
if (gatewayInput.delivery.mode === 'announce' && unsupportedDeliveryError) {
throw new Error(unsupportedDeliveryError);
}
const created = await gatewayManager.rpc('cron.add', gatewayInput);
data = created && typeof created === 'object' ? transformCronJob(created as GatewayCronJob) : created;
break;
@@ -530,11 +541,19 @@ function registerUnifiedRequestHandlers(gatewayManager: GatewayManager): void {
const id = Array.isArray(payload) ? payload[0] : payload?.id;
const input = Array.isArray(payload) ? payload[1] : payload?.input;
if (!id || !input) throw new Error('Invalid cron.update payload');
const patch = { ...input };
if (typeof patch.schedule === 'string') patch.schedule = { kind: 'cron', expr: patch.schedule };
if (typeof patch.message === 'string') {
patch.payload = { kind: 'agentTurn', message: patch.message };
delete patch.message;
const patch = buildCronUpdatePatch(input);
const deliveryPatch = patch.delivery && typeof patch.delivery === 'object'
? patch.delivery as Record<string, unknown>
: undefined;
const deliveryChannel = typeof deliveryPatch?.channel === 'string' && deliveryPatch.channel.trim()
? deliveryPatch.channel.trim()
: undefined;
const deliveryMode = typeof deliveryPatch?.mode === 'string' && deliveryPatch.mode.trim()
? deliveryPatch.mode.trim()
: undefined;
const unsupportedDeliveryError = getUnsupportedCronDeliveryError(deliveryChannel);
if (unsupportedDeliveryError && deliveryMode !== 'none') {
throw new Error(unsupportedDeliveryError);
}
data = await gatewayManager.rpc('cron.update', { id, patch });
break;
@@ -716,7 +735,7 @@ interface GatewayCronJob {
updatedAtMs: number;
schedule: { kind: string; expr?: string; everyMs?: number; at?: string; tz?: string };
payload: { kind: string; message?: string; text?: string };
delivery?: { mode: string; channel?: string; to?: string };
delivery?: { mode: string; channel?: string; to?: string; accountId?: string };
sessionTarget?: string;
state: {
nextRunAtMs?: number;
@@ -727,17 +746,109 @@ interface GatewayCronJob {
};
}
type GatewayCronDelivery = NonNullable<GatewayCronJob['delivery']>;
function getUnsupportedCronDeliveryError(channel: string | undefined): string | null {
if (!channel) return null;
return toUiChannelType(channel) === 'wechat'
? 'WeChat scheduled delivery is not supported because the plugin requires a live conversation context token.'
: null;
}
function normalizeCronDelivery(
rawDelivery: unknown,
fallbackMode: GatewayCronDelivery['mode'] = 'none',
): GatewayCronDelivery {
if (!rawDelivery || typeof rawDelivery !== 'object') {
return { mode: fallbackMode };
}
const delivery = rawDelivery as Record<string, unknown>;
const mode = typeof delivery.mode === 'string' && delivery.mode.trim()
? delivery.mode.trim()
: fallbackMode;
const channel = typeof delivery.channel === 'string' && delivery.channel.trim()
? toOpenClawChannelType(delivery.channel.trim())
: undefined;
const to = typeof delivery.to === 'string' && delivery.to.trim()
? delivery.to.trim()
: undefined;
const accountId = typeof delivery.accountId === 'string' && delivery.accountId.trim()
? delivery.accountId.trim()
: undefined;
if (mode === 'announce' && !channel) {
return { mode: 'none' };
}
return {
mode,
...(channel ? { channel } : {}),
...(to ? { to } : {}),
...(accountId ? { accountId } : {}),
};
}
function normalizeCronDeliveryPatch(rawDelivery: unknown): Record<string, unknown> {
if (!rawDelivery || typeof rawDelivery !== 'object') {
return {};
}
const delivery = rawDelivery as Record<string, unknown>;
const patch: Record<string, unknown> = {};
if ('mode' in delivery) {
patch.mode = typeof delivery.mode === 'string' && delivery.mode.trim()
? delivery.mode.trim()
: 'none';
}
if ('channel' in delivery) {
patch.channel = typeof delivery.channel === 'string' && delivery.channel.trim()
? toOpenClawChannelType(delivery.channel.trim())
: '';
}
if ('to' in delivery) {
patch.to = typeof delivery.to === 'string' ? delivery.to : '';
}
if ('accountId' in delivery) {
patch.accountId = typeof delivery.accountId === 'string' ? delivery.accountId : '';
}
return patch;
}
function buildCronUpdatePatch(input: Record<string, unknown>): Record<string, unknown> {
const patch = { ...input };
if (typeof patch.schedule === 'string') {
patch.schedule = { kind: 'cron', expr: patch.schedule };
}
if (typeof patch.message === 'string') {
patch.payload = { kind: 'agentTurn', message: patch.message };
delete patch.message;
}
if ('delivery' in patch) {
patch.delivery = normalizeCronDeliveryPatch(patch.delivery);
}
return patch;
}
/**
* Transform a Gateway CronJob to the frontend CronJob format
*/
function transformCronJob(job: GatewayCronJob) {
// Extract message from payload
const message = job.payload?.message || job.payload?.text || '';
const gatewayDelivery = normalizeCronDelivery(job.delivery);
const channelType = gatewayDelivery.channel ? toUiChannelType(gatewayDelivery.channel) : undefined;
const delivery = channelType
? { ...gatewayDelivery, channel: channelType }
: gatewayDelivery;
// Build target from delivery info — only if a delivery channel is specified
const channelType = job.delivery?.channel;
const target = channelType
? { channelType, channelId: channelType, channelName: channelType }
? { channelType, channelId: delivery.accountId || gatewayDelivery.channel, channelName: channelType, recipient: delivery.to }
: undefined;
// Build lastRun from state
@@ -760,6 +871,7 @@ function transformCronJob(job: GatewayCronJob) {
name: job.name,
message,
schedule: job.schedule, // Pass the object through; frontend parseCronSchedule handles it
delivery,
target,
enabled: job.enabled,
createdAt: new Date(job.createdAtMs).toISOString(),
@@ -831,6 +943,7 @@ function registerCronHandlers(gatewayManager: GatewayManager): void {
name: string;
message: string;
schedule: string;
delivery?: GatewayCronDelivery;
enabled?: boolean;
}) => {
try {
@@ -845,8 +958,12 @@ function registerCronHandlers(gatewayManager: GatewayManager): void {
// not external messaging channels. Setting mode='none' prevents
// the Gateway from attempting channel delivery (which would fail
// with "Channel is required" when no channels are configured).
delivery: { mode: 'none' },
delivery: normalizeCronDelivery(input.delivery),
};
const unsupportedDeliveryError = getUnsupportedCronDeliveryError(gatewayInput.delivery.channel);
if (gatewayInput.delivery.mode === 'announce' && unsupportedDeliveryError) {
throw new Error(unsupportedDeliveryError);
}
const result = await gatewayManager.rpc('cron.add', gatewayInput);
// Transform the returned job to frontend format
if (result && typeof result === 'object') {
@@ -862,18 +979,22 @@ function registerCronHandlers(gatewayManager: GatewayManager): void {
// Update an existing cron job
ipcMain.handle('cron:update', async (_, id: string, input: Record<string, unknown>) => {
try {
// Transform schedule string to CronSchedule object if present
const patch = { ...input };
if (typeof patch.schedule === 'string') {
patch.schedule = { kind: 'cron', expr: patch.schedule };
}
// Transform message to payload format if present
if (typeof patch.message === 'string') {
patch.payload = { kind: 'agentTurn', message: patch.message };
delete patch.message;
const patch = buildCronUpdatePatch(input);
const deliveryPatch = patch.delivery && typeof patch.delivery === 'object'
? patch.delivery as Record<string, unknown>
: undefined;
const deliveryChannel = typeof deliveryPatch?.channel === 'string' && deliveryPatch.channel.trim()
? deliveryPatch.channel.trim()
: undefined;
const deliveryMode = typeof deliveryPatch?.mode === 'string' && deliveryPatch.mode.trim()
? deliveryPatch.mode.trim()
: undefined;
const unsupportedDeliveryError = getUnsupportedCronDeliveryError(deliveryChannel);
if (unsupportedDeliveryError && deliveryMode !== 'none') {
throw new Error(unsupportedDeliveryError);
}
const result = await gatewayManager.rpc('cron.update', { id, patch });
return result;
return result && typeof result === 'object' ? transformCronJob(result as GatewayCronJob) : result;
} catch (error) {
console.error('Failed to update cron job:', error);
throw error;