/**
 * Parallel Execution Engine
 *
 * Manages concurrent agent sessions with resource pooling.
 * Supports: 4 projects × 3 roles = up to 12 concurrent sessions.
 *
 * Key features:
 * - Worker pool with configurable concurrency limits
 * - Resource isolation per agent session
 * - Automatic scaling based on load
 * - Task queuing with priority support
 */

import { randomUUID } from 'crypto';
import { EventEmitter } from 'events';

// ============================================================================
// Types
// ============================================================================

export type AgentRole = 'programmer' | 'reviewer' | 'tester' | 'planner' | 'analyst' | 'custom';

export type TaskStatus = 'pending' | 'queued' | 'running' | 'completed' | 'failed' | 'cancelled';

export type WorkerStatus = 'idle' | 'busy' | 'draining' | 'terminated';

export interface AgentSession {
  id: string;
  projectId: string;
  role: AgentRole;
  model?: string; // e.g., 'opus', 'sonnet' for cost optimization
  workspace: string;
  tools: string[];
  memory: Record<string, unknown>;
  identity: AgentIdentity;
  status: 'active' | 'idle' | 'terminated';
  createdAt: Date;
  lastActivity: Date;
}

export interface AgentIdentity {
  name: string;
  description: string;
  personality?: string;
  systemPrompt?: string;
}

export interface PipelineTask {
  id: string;
  projectId: string;
  role: AgentRole;
  type: string;
  description: string;
  priority: 'low' | 'medium' | 'high' | 'critical';
  input: unknown;
  dependencies: string[];
  timeout: number;
  retryCount: number;
  maxRetries: number;
  status: TaskStatus;
  assignedWorker?: string;
  result?: unknown;
  error?: string;
  createdAt: Date;
  startedAt?: Date;
  completedAt?: Date;
  metadata?: Record<string, unknown>;
}

export interface Worker {
  id: string;
  status: WorkerStatus;
  currentTask?: string;
  sessions: Map<string, AgentSession>;
  completedTasks: number;
  failedTasks: number;
  createdAt: Date;
  lastActivity: Date;
}

export interface ExecutionConfig {
  maxWorkers: number;
  maxConcurrentPerWorker: number;
  taskTimeout: number;
  retryAttempts: number;
  retryDelay: number;
  drainTimeout: number;
}

export interface ExecutionResult {
  taskId: string;
  success: boolean;
  output?: unknown;
  error?: string;
  duration: number;
  workerId: string;
  sessionId: string;
}

/** Fields a caller supplies when submitting a task; the engine fills in the rest. */
export type PipelineTaskInput = Omit<PipelineTask, 'id' | 'status' | 'retryCount' | 'createdAt'>;

/** Function that performs the work of one task inside an agent session. */
export type TaskHandler = (task: PipelineTask, session: AgentSession) => Promise<unknown>;

/** Bookkeeping record for a task that is currently executing. */
interface RunningTask {
  task: PipelineTask;
  worker: Worker;
  session: AgentSession;
}

/** Sort rank per priority; a lower rank is scheduled first. */
const PRIORITY_RANK: Record<PipelineTask['priority'], number> = {
  critical: 0,
  high: 1,
  medium: 2,
  low: 3
};

/** Default model per role (cost optimization). */
const DEFAULT_MODEL_BY_ROLE: Record<AgentRole, string> = {
  programmer: 'opus', // Best for complex coding
  reviewer: 'sonnet', // Cost-effective for review
  tester: 'sonnet', // Good for test generation
  planner: 'opus', // Complex planning
  analyst: 'sonnet',
  custom: 'sonnet'
};

/** Tools granted to a session of each role. */
const TOOLS_BY_ROLE: Record<AgentRole, readonly string[]> = {
  programmer: ['read', 'write', 'execute', 'git', 'test', 'lint', 'build'],
  reviewer: ['read', 'diff', 'comment', 'lint', 'test'],
  tester: ['read', 'execute', 'test', 'mock'],
  planner: ['read', 'write', 'diagram'],
  analyst: ['read', 'query', 'report'],
  custom: ['read']
};

/** Identity assigned to a session of each role. */
const IDENTITY_BY_ROLE: Record<AgentRole, AgentIdentity> = {
  programmer: {
    name: 'Code Architect',
    description: 'Expert developer who writes clean, efficient code',
    personality: 'Methodical, detail-oriented, focuses on best practices'
  },
  reviewer: {
    name: 'Code Reviewer',
    description: 'Experienced engineer who catches bugs and improves code quality',
    personality: 'Thorough, constructive, focuses on maintainability'
  },
  tester: {
    name: 'QA Engineer',
    description: 'Test specialist who ensures code correctness',
    personality: 'Systematic, edge-case focused, quality-driven'
  },
  planner: {
    name: 'Technical Architect',
    description: 'Strategic thinker who plans implementation',
    personality: 'Analytical, systematic, big-picture focused'
  },
  analyst: {
    name: 'Data Analyst',
    description: 'Data specialist who extracts insights',
    personality: 'Curious, methodical, detail-oriented'
  },
  custom: {
    name: 'Custom Agent',
    description: 'Generic agent for custom tasks',
    personality: 'Adaptable'
  }
};

/** Period of the queue-processing loop, in milliseconds. */
const PROCESS_INTERVAL_MS = 100;

// ============================================================================
// Parallel Executor
// ============================================================================

/**
 * ParallelExecutionEngine - Manages concurrent agent sessions.
 *
 * Tasks are submitted to a priority queue and dispatched by a periodic loop to
 * workers. A worker runs at most `maxConcurrentPerWorker` tasks at once, and
 * each task runs inside a session keyed by (project, role) on that worker.
 *
 * Events emitted: `started`, `stopped`, `workerCreated`, `sessionCreated`,
 * `sessionTerminated`, `taskSubmitted`, `taskStarted`, `taskCompleted`,
 * `taskRetrying`, `taskFailed`.
 *
 * Known limitations: `retryDelay` and `retryAttempts` from the configuration
 * are not consulted; a failed attempt is re-queued immediately and the retry
 * budget comes from each task's own `maxRetries`.
 */
export class ParallelExecutionEngine extends EventEmitter {
  private readonly config: ExecutionConfig;
  private readonly workers = new Map<string, Worker>();
  private readonly taskQueue: PipelineTask[] = [];
  private readonly runningTasks = new Map<string, RunningTask>();
  private readonly completedTasks: PipelineTask[] = [];
  private readonly failedTasks: PipelineTask[] = [];
  private readonly sessions = new Map<string, AgentSession>();
  private readonly taskHandlers = new Map<string, TaskHandler>();
  private processing = false;
  private processInterval: ReturnType<typeof setInterval> | undefined;

  /**
   * @param config Optional overrides. A field that is omitted or explicitly
   *   `undefined` keeps its default; any other value (including 0) is used as given.
   */
  constructor(config?: Partial<ExecutionConfig>) {
    super();
    this.config = {
      maxWorkers: config?.maxWorkers ?? 4,
      maxConcurrentPerWorker: config?.maxConcurrentPerWorker ?? 3,
      taskTimeout: config?.taskTimeout ?? 300000, // 5 minutes
      retryAttempts: config?.retryAttempts ?? 3,
      retryDelay: config?.retryDelay ?? 5000,
      drainTimeout: config?.drainTimeout ?? 60000
    };
  }

  /**
   * Start the execution engine: fill the worker pool up to `maxWorkers` and
   * begin the periodic queue-processing loop. Calling it while the engine is
   * already running is a no-op.
   */
  start(): void {
    if (this.processInterval !== undefined) return;

    // Workers terminated by a previous stop() are replaced, not counted.
    for (const [workerId, worker] of this.workers) {
      if (worker.status === 'terminated') this.workers.delete(workerId);
    }
    while (this.workers.size < this.config.maxWorkers) {
      this.createWorker();
    }

    this.processing = true;
    this.processInterval = setInterval(() => this.processQueue(), PROCESS_INTERVAL_MS);

    this.emit('started', { workerCount: this.workers.size });
  }

  /**
   * Stop the execution engine: halt dispatching, wait (up to `drainTimeout`)
   * for running tasks to finish, then mark every worker terminated.
   */
  async stop(): Promise<void> {
    this.processing = false;

    if (this.processInterval !== undefined) {
      clearInterval(this.processInterval);
      this.processInterval = undefined;
    }

    await this.drain();

    for (const worker of this.workers.values()) {
      worker.status = 'terminated';
    }

    this.emit('stopped');
  }

  /**
   * Create a new idle worker and register it in the pool.
   */
  private createWorker(): Worker {
    const now = new Date();
    const worker: Worker = {
      id: `worker-${randomUUID().substring(0, 8)}`,
      status: 'idle',
      sessions: new Map(),
      completedTasks: 0,
      failedTasks: 0,
      createdAt: now,
      lastActivity: now
    };

    this.workers.set(worker.id, worker);
    this.emit('workerCreated', { worker });

    return worker;
  }

  /**
   * Create an agent session and register it with the engine.
   *
   * @param config Session parameters; when `model` is missing or empty the
   *   role's default model is used.
   * @returns The newly created session, in `idle` state.
   */
  createSession(config: {
    projectId: string;
    role: AgentRole;
    model?: string;
    workspace: string;
    tools: string[];
    identity: AgentIdentity;
  }): AgentSession {
    const now = new Date();
    const session: AgentSession = {
      id: `session-${config.projectId}-${config.role}-${randomUUID().substring(0, 8)}`,
      projectId: config.projectId,
      role: config.role,
      model: config.model || this.getDefaultModelForRole(config.role),
      workspace: config.workspace,
      tools: config.tools,
      memory: {},
      identity: config.identity,
      status: 'idle',
      createdAt: now,
      lastActivity: now
    };

    this.sessions.set(session.id, session);
    this.emit('sessionCreated', { session });

    return session;
  }

  /**
   * Get default model for a role (cost optimization).
   */
  private getDefaultModelForRole(role: AgentRole): string {
    return DEFAULT_MODEL_BY_ROLE[role] ?? 'sonnet';
  }

  /**
   * Submit a task for execution.
   *
   * @param task Task description; `id`, `status`, `retryCount` and `createdAt`
   *   are assigned by the engine.
   * @returns The queued task, in `pending` state.
   */
  submitTask(task: PipelineTaskInput): PipelineTask {
    const fullTask: PipelineTask = {
      ...task,
      id: `task-${randomUUID().substring(0, 8)}`,
      status: 'pending',
      retryCount: 0,
      createdAt: new Date()
    };

    this.taskQueue.push(fullTask);
    this.prioritizeQueue();
    this.emit('taskSubmitted', { task: fullTask });

    return fullTask;
  }

  /**
   * Submit multiple tasks for parallel execution.
   *
   * @returns The queued tasks, in the same order as `tasks`.
   */
  submitBatch(tasks: PipelineTaskInput[]): PipelineTask[] {
    return tasks.map(task => this.submitTask(task));
  }

  /**
   * Sort the queue by priority, then by creation time (FIFO within a priority).
   */
  private prioritizeQueue(): void {
    this.taskQueue.sort(
      (a, b) =>
        PRIORITY_RANK[a.priority] - PRIORITY_RANK[b.priority] ||
        a.createdAt.getTime() - b.createdAt.getTime()
    );
  }

  /**
   * Dispatch every ready task for which a worker has spare capacity.
   *
   * Dispatch is synchronous and tasks are NOT awaited here: awaiting would run
   * them one at a time, and because the interval re-enters this method while a
   * task is in flight, a stale snapshot of ready tasks could start a task twice.
   */
  private processQueue(): void {
    if (!this.processing) return;

    for (const task of this.getReadyTasks()) {
      if (task.status !== 'pending') continue;

      const worker = this.findAvailableWorker(task);
      if (!worker) break; // No capacity left anywhere

      void this.executeTask(task, worker);
    }
  }

  /**
   * Get queued tasks that are pending and whose dependencies have all completed.
   *
   * NOTE(review): a task whose dependency failed or is unknown stays queued
   * indefinitely; nothing here fails or cancels it.
   */
  private getReadyTasks(): PipelineTask[] {
    return this.taskQueue.filter(
      task =>
        task.status === 'pending' &&
        task.dependencies.every(depId => this.getTask(depId)?.status === 'completed')
    );
  }

  /**
   * Number of tasks currently running on a worker.
   */
  private getWorkerLoad(worker: Worker): number {
    let load = 0;
    for (const running of this.runningTasks.values()) {
      if (running.worker === worker) load += 1;
    }
    return load;
  }

  /**
   * Find a worker for a task. Preference order: a worker that already hosts a
   * session for the task's project, then any worker with spare capacity, then a
   * newly created worker if the pool is below `maxWorkers`.
   *
   * Capacity is measured in running tasks. Counting sessions instead would make
   * a worker permanently unavailable once it had hosted
   * `maxConcurrentPerWorker` sessions, because sessions outlive their tasks.
   */
  private findAvailableWorker(task: PipelineTask): Worker | undefined {
    const candidates = Array.from(this.workers.values()).filter(
      worker =>
        (worker.status === 'idle' || worker.status === 'busy') &&
        this.getWorkerLoad(worker) < this.config.maxConcurrentPerWorker
    );

    const sameProject = candidates.find(worker =>
      Array.from(worker.sessions.values()).some(
        session => session.projectId === task.projectId && session.status !== 'terminated'
      )
    );
    if (sameProject) return sameProject;

    if (candidates.length > 0) return candidates[0];

    if (this.workers.size < this.config.maxWorkers) {
      return this.createWorker();
    }

    return undefined;
  }

  /**
   * Execute a task on a worker and record the outcome.
   *
   * Everything up to the first `await` runs synchronously, so by the time this
   * call returns its promise the task has left the queue and counts against the
   * worker's capacity.
   */
  private async executeTask(task: PipelineTask, worker: Worker): Promise<void> {
    // Move task from queue to running
    const queueIndex = this.taskQueue.indexOf(task);
    if (queueIndex > -1) {
      this.taskQueue.splice(queueIndex, 1);
    }

    task.status = 'running';
    task.startedAt = new Date();
    task.assignedWorker = worker.id;

    const session = this.getOrCreateSession(task, worker);
    session.status = 'active';

    this.runningTasks.set(task.id, { task, worker, session });

    worker.status = 'busy';
    worker.currentTask = task.id;
    worker.lastActivity = new Date();

    this.emit('taskStarted', { task, worker, session });

    const handler: TaskHandler =
      this.taskHandlers.get(task.type) ?? ((t, s) => this.defaultTaskHandler(t, s));

    let succeeded = false;
    let result: unknown;
    let errorMessage = '';
    try {
      result = await this.runWithTimeout(task, session, handler);
      succeeded = true;
    } catch (error: unknown) {
      errorMessage = error instanceof Error ? error.message : String(error);
    }

    // Release capacity before notifying listeners so they observe consistent stats.
    this.releaseTask(task, worker, session);

    if (succeeded) {
      task.result = result;
      task.error = undefined; // drop the message left by an earlier failed attempt
      task.status = 'completed';
      task.completedAt = new Date();
      worker.completedTasks++;
      this.completedTasks.push(task);
      this.emit('taskCompleted', { task, worker, session, result });
      return;
    }

    task.error = errorMessage;
    task.retryCount++;

    if (task.retryCount < task.maxRetries) {
      // Retry: back into the queue at the position its priority dictates.
      task.status = 'pending';
      this.taskQueue.push(task);
      this.prioritizeQueue();
      this.emit('taskRetrying', { task, attempt: task.retryCount });
    } else {
      task.status = 'failed';
      task.completedAt = new Date();
      worker.failedTasks++;
      this.failedTasks.push(task);
      this.emit('taskFailed', { task, worker, error: errorMessage });
    }
  }

  /**
   * Run a handler, rejecting if it does not settle within the task's timeout.
   * A non-positive or non-finite `task.timeout` falls back to `config.taskTimeout`.
   * The timer is always cleared so a finished task never keeps the event loop alive.
   *
   * @throws Error with a "timed out" message when the timeout elapses first, or
   *   whatever the handler throws/rejects with.
   */
  private async runWithTimeout(
    task: PipelineTask,
    session: AgentSession,
    handler: TaskHandler
  ): Promise<unknown> {
    const timeoutMs =
      Number.isFinite(task.timeout) && task.timeout > 0 ? task.timeout : this.config.taskTimeout;

    let rejectTimeout: (reason: Error) => void = () => undefined;
    const timeout = new Promise<never>((_resolve, reject) => {
      rejectTimeout = reject;
    });
    const timer = setTimeout(() => {
      rejectTimeout(new Error(`Task ${task.id} timed out after ${timeoutMs}ms`));
    }, timeoutMs);

    try {
      return await Promise.race([handler(task, session), timeout]);
    } finally {
      clearTimeout(timer);
    }
  }

  /**
   * Remove a finished task from the running set and refresh the worker's and
   * session's status from what is still running on them.
   */
  private releaseTask(task: PipelineTask, worker: Worker, session: AgentSession): void {
    this.runningTasks.delete(task.id);

    let remainingTaskId: string | undefined;
    let sessionInUse = false;
    for (const running of this.runningTasks.values()) {
      if (running.worker === worker && remainingTaskId === undefined) {
        remainingTaskId = running.task.id;
      }
      if (running.session === session) sessionInUse = true;
    }

    const now = new Date();
    worker.currentTask = remainingTaskId;
    worker.lastActivity = now;
    // Only a busy worker goes back to idle; a terminated one stays terminated.
    if (worker.status === 'busy' && remainingTaskId === undefined) {
      worker.status = 'idle';
    }

    session.lastActivity = now;
    if (session.status !== 'terminated') {
      session.status = sessionInUse ? 'active' : 'idle';
    }
  }

  /**
   * Get the worker's live session for the task's project/role, creating one if
   * none exists. Terminated sessions are never reused.
   */
  private getOrCreateSession(task: PipelineTask, worker: Worker): AgentSession {
    for (const existing of worker.sessions.values()) {
      if (
        existing.status !== 'terminated' &&
        existing.projectId === task.projectId &&
        existing.role === task.role
      ) {
        return existing;
      }
    }

    const session = this.createSession({
      projectId: task.projectId,
      role: task.role,
      workspace: `workspace/${task.projectId}/${task.role}`,
      tools: this.getToolsForRole(task.role),
      identity: this.getIdentityForRole(task.role)
    });

    worker.sessions.set(session.id, session);
    return session;
  }

  /**
   * Get tools available for a role. Returns a fresh array on every call.
   */
  private getToolsForRole(role: AgentRole): string[] {
    return [...(TOOLS_BY_ROLE[role] ?? TOOLS_BY_ROLE.custom)];
  }

  /**
   * Get identity for a role. Returns a fresh object on every call.
   */
  private getIdentityForRole(role: AgentRole): AgentIdentity {
    return { ...(IDENTITY_BY_ROLE[role] ?? IDENTITY_BY_ROLE.custom) };
  }

  /**
   * Default task handler, used when no handler is registered for a task type.
   */
  private async defaultTaskHandler(task: PipelineTask, session: AgentSession): Promise<unknown> {
    // This would be replaced by actual LLM invocation
    return {
      message: `Task ${task.type} completed by ${session.identity.name}`,
      projectId: task.projectId,
      role: task.role
    };
  }

  /**
   * Get task by ID, searching queued, running, completed and failed tasks.
   */
  getTask(taskId: string): PipelineTask | undefined {
    const hasId = (task: PipelineTask): boolean => task.id === taskId;
    return (
      this.taskQueue.find(hasId) ??
      this.runningTasks.get(taskId)?.task ??
      this.completedTasks.find(hasId) ??
      this.failedTasks.find(hasId)
    );
  }

  /**
   * Register a task handler for a task type, replacing any previous handler.
   */
  registerHandler(taskType: string, handler: TaskHandler): void {
    this.taskHandlers.set(taskType, handler);
  }

  /**
   * Drain - wait for running tasks to complete, giving up after `drainTimeout` ms.
   */
  private async drain(): Promise<void> {
    const deadline = Date.now() + this.config.drainTimeout;
    while (this.runningTasks.size > 0 && Date.now() < deadline) {
      await new Promise<void>(resolve => setTimeout(resolve, 100));
    }
  }

  /**
   * Get engine statistics: worker counts by status, task counts by state, and
   * the number of sessions ever created (terminated ones included).
   */
  getStats(): {
    workers: { total: number; idle: number; busy: number };
    tasks: { pending: number; running: number; completed: number; failed: number };
    sessions: number;
  } {
    let idle = 0;
    let busy = 0;
    for (const worker of this.workers.values()) {
      if (worker.status === 'idle') idle++;
      else if (worker.status === 'busy') busy++;
    }

    return {
      workers: { total: this.workers.size, idle, busy },
      tasks: {
        pending: this.taskQueue.length,
        running: this.runningTasks.size,
        completed: this.completedTasks.length,
        failed: this.failedTasks.length
      },
      sessions: this.sessions.size
    };
  }

  /**
   * Get sessions by project.
   */
  getSessionsByProject(projectId: string): AgentSession[] {
    return this.getAllSessions().filter(session => session.projectId === projectId);
  }

  /**
   * Get all sessions.
   */
  getAllSessions(): AgentSession[] {
    return Array.from(this.sessions.values());
  }

  /**
   * Terminate a session. The session stays listed (with status `terminated`)
   * but is detached from its worker so no later task is assigned to it.
   *
   * @returns `true` if the session exists, `false` otherwise.
   */
  terminateSession(sessionId: string): boolean {
    const session = this.sessions.get(sessionId);
    if (!session) return false;

    session.status = 'terminated';
    for (const worker of this.workers.values()) {
      worker.sessions.delete(sessionId);
    }

    this.emit('sessionTerminated', { session });
    return true;
  }
}

// Default instance
export const defaultExecutor = new ParallelExecutionEngine();