Files
admin c629646b9f Complete Agent Pipeline System with Claude Code & OpenClaw Integration
- Added Claude Code integration with full context compaction support
- Added OpenClaw integration with deterministic pipeline support
- Implemented parallel agent execution (4 projects x 3 roles pattern)
- Added workspace isolation with permissions and quotas
- Implemented Lobster-compatible YAML workflow parser
- Added persistent memory store for cross-session context
- Created comprehensive README with hero section

This project was 100% autonomously built by Z.AI GLM-5
2026-03-03 13:12:14 +00:00

533 lines
13 KiB
TypeScript

/**
* Agent Orchestration Module
*
* Manages agent lifecycle, task routing, inter-agent communication,
* and coordinated execution of complex multi-agent workflows.
*/
import { randomUUID } from 'crypto';
export type AgentStatus = 'idle' | 'working' | 'waiting' | 'completed' | 'failed';
export type TaskPriority = 'low' | 'medium' | 'high' | 'critical';
export type TaskStatus = 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';
export interface AgentConfig {
id: string;
name: string;
type: string;
capabilities: string[];
maxConcurrentTasks: number;
timeout: number;
metadata?: Record<string, unknown>;
}
export interface Task {
id: string;
type: string;
description: string;
priority: TaskPriority;
status: TaskStatus;
assignedAgent?: string;
input: unknown;
output?: unknown;
error?: string;
createdAt: Date;
startedAt?: Date;
completedAt?: Date;
dependencies: string[];
metadata?: Record<string, unknown>;
}
export interface AgentState {
config: AgentConfig;
status: AgentStatus;
currentTasks: string[];
completedTasks: number;
failedTasks: number;
lastActivity?: Date;
}
export interface OrchestratorEvent {
type: 'task_created' | 'task_assigned' | 'task_completed' | 'task_failed' |
'agent_registered' | 'agent_status_changed';
timestamp: Date;
data: unknown;
}
export type EventHandler = (event: OrchestratorEvent) => void | Promise<void>;
interface TaskQueue {
pending: Task[];
running: Map<string, Task>;
completed: Task[];
failed: Task[];
}
/**
* AgentOrchestrator - Central coordinator for multi-agent systems
*/
export class AgentOrchestrator {
private agents: Map<string, AgentState> = new Map();
private tasks: TaskQueue = {
pending: [],
running: new Map(),
completed: [],
failed: []
};
private eventHandlers: Map<string, EventHandler[]> = new Map();
private taskProcessors: Map<string, (task: Task) => Promise<unknown>> = new Map();
private running = false;
private processInterval?: ReturnType<typeof setInterval>;
constructor() {
this.registerDefaultProcessors();
}
/**
* Register a new agent
*/
registerAgent(config: AgentConfig): AgentState {
const state: AgentState = {
config,
status: 'idle',
currentTasks: [],
completedTasks: 0,
failedTasks: 0
};
this.agents.set(config.id, state);
this.emit('agent_registered', { agent: state });
return state;
}
/**
* Unregister an agent
*/
unregisterAgent(agentId: string): boolean {
const agent = this.agents.get(agentId);
if (!agent) return false;
// Reassign any tasks the agent was working on
for (const taskId of agent.currentTasks) {
const task = this.tasks.running.get(taskId);
if (task) {
task.status = 'pending';
task.assignedAgent = undefined;
this.tasks.pending.push(task);
this.tasks.running.delete(taskId);
}
}
this.agents.delete(agentId);
return true;
}
/**
* Get agent state
*/
getAgent(agentId: string): AgentState | undefined {
return this.agents.get(agentId);
}
/**
* Get all agents
*/
getAllAgents(): AgentState[] {
return Array.from(this.agents.values());
}
/**
* Create a new task
*/
createTask(
type: string,
description: string,
input: unknown,
options: {
priority?: TaskPriority;
dependencies?: string[];
assignedAgent?: string;
metadata?: Record<string, unknown>;
} = {}
): Task {
const task: Task = {
id: randomUUID(),
type,
description,
priority: options.priority || 'medium',
status: 'pending',
input,
createdAt: new Date(),
dependencies: options.dependencies || [],
assignedAgent: options.assignedAgent,
metadata: options.metadata
};
this.tasks.pending.push(task);
this.emit('task_created', { task });
// Auto-assign if agent specified
if (options.assignedAgent) {
this.assignTask(task.id, options.assignedAgent);
}
return task;
}
/**
* Assign a task to a specific agent
*/
assignTask(taskId: string, agentId: string): boolean {
const agent = this.agents.get(agentId);
if (!agent) return false;
if (agent.currentTasks.length >= agent.config.maxConcurrentTasks) {
return false;
}
const taskIndex = this.tasks.pending.findIndex(t => t.id === taskId);
if (taskIndex === -1) return false;
const task = this.tasks.pending[taskIndex];
task.assignedAgent = agentId;
this.emit('task_assigned', { task, agent });
return true;
}
/**
* Get task by ID
*/
getTask(taskId: string): Task | undefined {
return (
this.tasks.pending.find(t => t.id === taskId) ||
this.tasks.running.get(taskId) ||
this.tasks.completed.find(t => t.id === taskId) ||
this.tasks.failed.find(t => t.id === taskId)
);
}
/**
* Get all tasks by status
*/
getTasksByStatus(status: TaskStatus): Task[] {
switch (status) {
case 'pending':
return [...this.tasks.pending];
case 'running':
return Array.from(this.tasks.running.values());
case 'completed':
return [...this.tasks.completed];
case 'failed':
return [...this.tasks.failed];
case 'cancelled':
return [...this.tasks.failed.filter(t => t.error === 'Cancelled')];
}
}
/**
* Register a task processor
*/
registerProcessor(
taskType: string,
processor: (task: Task) => Promise<unknown>
): void {
this.taskProcessors.set(taskType, processor);
}
/**
* Start the orchestrator
*/
start(): void {
if (this.running) return;
this.running = true;
this.processInterval = setInterval(() => this.process(), 100);
}
/**
* Stop the orchestrator
*/
stop(): void {
this.running = false;
if (this.processInterval) {
clearInterval(this.processInterval);
}
}
/**
* Process pending tasks
*/
private async process(): Promise<void> {
if (!this.running) return;
// Get tasks ready to run (dependencies satisfied)
const readyTasks = this.getReadyTasks();
for (const task of readyTasks) {
// Find available agent
const agent = this.findAvailableAgent(task);
if (!agent) continue;
// Move task to running
const taskIndex = this.tasks.pending.indexOf(task);
if (taskIndex > -1) {
this.tasks.pending.splice(taskIndex, 1);
}
task.status = 'running';
task.startedAt = new Date();
task.assignedAgent = agent.config.id;
this.tasks.running.set(task.id, task);
agent.currentTasks.push(task.id);
agent.status = 'working';
agent.lastActivity = new Date();
this.updateAgentStatus(agent.config.id, 'working');
// Execute task
this.executeTask(task, agent);
}
}
/**
* Get tasks that are ready to run
*/
private getReadyTasks(): Task[] {
return this.tasks.pending
.filter(task => {
// Check dependencies
for (const depId of task.dependencies) {
const depTask = this.getTask(depId);
if (!depTask || depTask.status !== 'completed') {
return false;
}
}
return true;
})
.sort((a, b) => {
// Sort by priority
const priorityOrder = { critical: 0, high: 1, medium: 2, low: 3 };
return priorityOrder[a.priority] - priorityOrder[b.priority];
});
}
/**
* Find an available agent for a task
*/
private findAvailableAgent(task: Task): AgentState | undefined {
// If task is pre-assigned, use that agent
if (task.assignedAgent) {
const agent = this.agents.get(task.assignedAgent);
if (agent && agent.currentTasks.length < agent.config.maxConcurrentTasks) {
return agent;
}
}
// Find best available agent
const availableAgents = Array.from(this.agents.values())
.filter(a =>
a.currentTasks.length < a.config.maxConcurrentTasks &&
a.config.capabilities.includes(task.type)
)
.sort((a, b) => {
// Prefer agents with fewer current tasks
return a.currentTasks.length - b.currentTasks.length;
});
return availableAgents[0];
}
/**
* Execute a task
*/
private async executeTask(task: Task, agent: AgentState): Promise<void> {
const processor = this.taskProcessors.get(task.type);
try {
if (!processor) {
throw new Error(`No processor registered for task type: ${task.type}`);
}
const output = await Promise.race([
processor(task),
this.createTimeout(task.id, agent.config.timeout)
]);
task.output = output;
task.status = 'completed';
task.completedAt = new Date();
this.tasks.running.delete(task.id);
this.tasks.completed.push(task);
agent.completedTasks++;
agent.lastActivity = new Date();
this.emit('task_completed', { task, agent });
} catch (error) {
task.status = 'failed';
task.error = error instanceof Error ? error.message : String(error);
task.completedAt = new Date();
this.tasks.running.delete(task.id);
this.tasks.failed.push(task);
agent.failedTasks++;
agent.lastActivity = new Date();
this.emit('task_failed', { task, agent, error: task.error });
}
// Remove from agent's current tasks
const taskIdx = agent.currentTasks.indexOf(task.id);
if (taskIdx > -1) {
agent.currentTasks.splice(taskIdx, 1);
}
// Update agent status
if (agent.currentTasks.length === 0) {
this.updateAgentStatus(agent.config.id, 'idle');
}
}
/**
* Create a timeout promise
*/
private createTimeout(taskId: string, timeout: number): Promise<never> {
return new Promise((_, reject) => {
setTimeout(() => reject(new Error(`Task ${taskId} timed out`)), timeout);
});
}
/**
* Update agent status
*/
private updateAgentStatus(agentId: string, status: AgentStatus): void {
const agent = this.agents.get(agentId);
if (agent) {
agent.status = status;
this.emit('agent_status_changed', { agent });
}
}
/**
* Register default task processors
*/
private registerDefaultProcessors(): void {
// Default processors can be registered here
}
/**
* Subscribe to orchestrator events
*/
on(event: OrchestratorEvent['type'], handler: EventHandler): () => void {
if (!this.eventHandlers.has(event)) {
this.eventHandlers.set(event, []);
}
this.eventHandlers.get(event)!.push(handler);
// Return unsubscribe function
return () => {
const handlers = this.eventHandlers.get(event);
if (handlers) {
const idx = handlers.indexOf(handler);
if (idx > -1) handlers.splice(idx, 1);
}
};
}
/**
* Emit an event
*/
private emit(type: OrchestratorEvent['type'], data: unknown): void {
const event: OrchestratorEvent = {
type,
timestamp: new Date(),
data
};
const handlers = this.eventHandlers.get(type) || [];
for (const handler of handlers) {
try {
handler(event);
} catch (error) {
console.error(`Error in event handler for ${type}:`, error);
}
}
}
/**
* Get orchestrator statistics
*/
getStats(): {
agents: { total: number; idle: number; working: number };
tasks: { pending: number; running: number; completed: number; failed: number };
} {
const agentStates = Array.from(this.agents.values());
return {
agents: {
total: agentStates.length,
idle: agentStates.filter(a => a.status === 'idle').length,
working: agentStates.filter(a => a.status === 'working').length
},
tasks: {
pending: this.tasks.pending.length,
running: this.tasks.running.size,
completed: this.tasks.completed.length,
failed: this.tasks.failed.length
}
};
}
/**
* Cancel a task
*/
cancelTask(taskId: string): boolean {
const task = this.tasks.running.get(taskId) ||
this.tasks.pending.find(t => t.id === taskId);
if (!task) return false;
task.status = 'cancelled';
task.error = 'Cancelled';
task.completedAt = new Date();
if (this.tasks.running.has(taskId)) {
this.tasks.running.delete(taskId);
this.tasks.failed.push(task);
} else {
const idx = this.tasks.pending.indexOf(task);
if (idx > -1) this.tasks.pending.splice(idx, 1);
this.tasks.failed.push(task);
}
return true;
}
/**
* Retry a failed task
*/
retryTask(taskId: string): boolean {
const taskIndex = this.tasks.failed.findIndex(t => t.id === taskId);
if (taskIndex === -1) return false;
const task = this.tasks.failed[taskIndex];
task.status = 'pending';
task.error = undefined;
task.startedAt = undefined;
task.completedAt = undefined;
this.tasks.failed.splice(taskIndex, 1);
this.tasks.pending.push(task);
return true;
}
}
// Singleton instance
export const defaultOrchestrator = new AgentOrchestrator();