Files
Agentic-Compaction-and-Pipl…/pipeline-system/engine/parallel-executor.ts
admin c629646b9f Complete Agent Pipeline System with Claude Code & OpenClaw Integration
- Added Claude Code integration with full context compaction support
- Added OpenClaw integration with deterministic pipeline support
- Implemented parallel agent execution (4 projects x 3 roles pattern)
- Added workspace isolation with permissions and quotas
- Implemented Lobster-compatible YAML workflow parser
- Added persistent memory store for cross-session context
- Created comprehensive README with hero section

This project was 100% autonomously built by Z.AI GLM-5
2026-03-03 13:12:14 +00:00

625 lines
17 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* Parallel Execution Engine
*
* Manages concurrent agent sessions with resource pooling.
* Supports: 4 projects × 3 roles = up to 12 concurrent sessions.
*
* Key features:
* - Worker pool with configurable concurrency limits
* - Resource isolation per agent session
* - Automatic scaling based on load
* - Task queuing with priority support
*/
import { randomUUID } from 'crypto';
import { EventEmitter } from 'events';
// ============================================================================
// Types
// ============================================================================
export type AgentRole = 'programmer' | 'reviewer' | 'tester' | 'planner' | 'analyst' | 'custom';
export type TaskStatus = 'pending' | 'queued' | 'running' | 'completed' | 'failed' | 'cancelled';
export type WorkerStatus = 'idle' | 'busy' | 'draining' | 'terminated';
export interface AgentSession {
id: string;
projectId: string;
role: AgentRole;
model?: string; // e.g., 'opus', 'sonnet' for cost optimization
workspace: string;
tools: string[];
memory: Record<string, unknown>;
identity: AgentIdentity;
status: 'active' | 'idle' | 'terminated';
createdAt: Date;
lastActivity: Date;
}
export interface AgentIdentity {
name: string;
description: string;
personality?: string;
systemPrompt?: string;
}
export interface PipelineTask {
id: string;
projectId: string;
role: AgentRole;
type: string;
description: string;
priority: 'low' | 'medium' | 'high' | 'critical';
input: unknown;
dependencies: string[];
timeout: number;
retryCount: number;
maxRetries: number;
status: TaskStatus;
assignedWorker?: string;
result?: unknown;
error?: string;
createdAt: Date;
startedAt?: Date;
completedAt?: Date;
metadata?: Record<string, unknown>;
}
export interface Worker {
id: string;
status: WorkerStatus;
currentTask?: string;
sessions: Map<string, AgentSession>;
completedTasks: number;
failedTasks: number;
createdAt: Date;
lastActivity: Date;
}
export interface ExecutionConfig {
maxWorkers: number;
maxConcurrentPerWorker: number;
taskTimeout: number;
retryAttempts: number;
retryDelay: number;
drainTimeout: number;
}
export interface ExecutionResult {
taskId: string;
success: boolean;
output?: unknown;
error?: string;
duration: number;
workerId: string;
sessionId: string;
}
// ============================================================================
// Parallel Executor
// ============================================================================
/**
* ParallelExecutionEngine - Manages concurrent agent sessions
*/
export class ParallelExecutionEngine extends EventEmitter {
private config: ExecutionConfig;
private workers: Map<string, Worker> = new Map();
private taskQueue: PipelineTask[] = [];
private runningTasks: Map<string, { task: PipelineTask; worker: Worker; session: AgentSession }> = new Map();
private completedTasks: PipelineTask[] = [];
private failedTasks: PipelineTask[] = [];
private sessions: Map<string, AgentSession> = new Map();
private processing = false;
private processInterval?: ReturnType<typeof setInterval>;
private taskHandlers: Map<string, (task: PipelineTask, session: AgentSession) => Promise<unknown>> = new Map();
constructor(config?: Partial<ExecutionConfig>) {
super();
this.config = {
maxWorkers: config?.maxWorkers || 4,
maxConcurrentPerWorker: config?.maxConcurrentPerWorker || 3,
taskTimeout: config?.taskTimeout || 300000, // 5 minutes
retryAttempts: config?.retryAttempts || 3,
retryDelay: config?.retryDelay || 5000,
drainTimeout: config?.drainTimeout || 60000,
...config
};
}
/**
* Start the execution engine
*/
start(): void {
// Initialize workers
for (let i = 0; i < this.config.maxWorkers; i++) {
this.createWorker();
}
// Start processing loop
this.processing = true;
this.processInterval = setInterval(() => this.processQueue(), 100);
this.emit('started', { workerCount: this.workers.size });
}
/**
* Stop the execution engine
*/
async stop(): Promise<void> {
this.processing = false;
if (this.processInterval) {
clearInterval(this.processInterval);
}
// Wait for running tasks to complete or drain
await this.drain();
// Terminate workers
for (const worker of this.workers.values()) {
worker.status = 'terminated';
}
this.emit('stopped');
}
/**
* Create a new worker
*/
private createWorker(): Worker {
const worker: Worker = {
id: `worker-${randomUUID().substring(0, 8)}`,
status: 'idle',
sessions: new Map(),
completedTasks: 0,
failedTasks: 0,
createdAt: new Date(),
lastActivity: new Date()
};
this.workers.set(worker.id, worker);
this.emit('workerCreated', { worker });
return worker;
}
/**
* Create an agent session
*/
createSession(config: {
projectId: string;
role: AgentRole;
model?: string;
workspace: string;
tools: string[];
identity: AgentIdentity;
}): AgentSession {
const session: AgentSession = {
id: `session-${config.projectId}-${config.role}-${randomUUID().substring(0, 8)}`,
projectId: config.projectId,
role: config.role,
model: config.model || this.getDefaultModelForRole(config.role),
workspace: config.workspace,
tools: config.tools,
memory: {},
identity: config.identity,
status: 'idle',
createdAt: new Date(),
lastActivity: new Date()
};
this.sessions.set(session.id, session);
this.emit('sessionCreated', { session });
return session;
}
/**
* Get default model for a role (cost optimization)
*/
private getDefaultModelForRole(role: AgentRole): string {
switch (role) {
case 'programmer':
return 'opus'; // Best for complex coding
case 'reviewer':
return 'sonnet'; // Cost-effective for review
case 'tester':
return 'sonnet'; // Good for test generation
case 'planner':
return 'opus'; // Complex planning
case 'analyst':
return 'sonnet';
default:
return 'sonnet';
}
}
/**
* Submit a task for execution
*/
submitTask(task: Omit<PipelineTask, 'id' | 'status' | 'retryCount' | 'createdAt'>): PipelineTask {
const fullTask: PipelineTask = {
...task,
id: `task-${randomUUID().substring(0, 8)}`,
status: 'pending',
retryCount: 0,
createdAt: new Date()
};
this.taskQueue.push(fullTask);
this.emit('taskSubmitted', { task: fullTask });
// Sort by priority
this.prioritizeQueue();
return fullTask;
}
/**
* Submit multiple tasks for parallel execution
*/
submitBatch(tasks: Array<Omit<PipelineTask, 'id' | 'status' | 'retryCount' | 'createdAt'>>): PipelineTask[] {
return tasks.map(task => this.submitTask(task));
}
/**
* Prioritize the task queue
*/
private prioritizeQueue(): void {
const priorityOrder = { critical: 0, high: 1, medium: 2, low: 3 };
this.taskQueue.sort((a, b) => {
// First by priority
const priorityDiff = priorityOrder[a.priority] - priorityOrder[b.priority];
if (priorityDiff !== 0) return priorityDiff;
// Then by creation time (FIFO within priority)
return a.createdAt.getTime() - b.createdAt.getTime();
});
}
/**
* Process the task queue
*/
private async processQueue(): Promise<void> {
if (!this.processing) return;
// Find tasks ready to run (dependencies met)
const readyTasks = this.getReadyTasks();
for (const task of readyTasks) {
const worker = this.findAvailableWorker(task);
if (!worker) break; // No workers available
await this.executeTask(task, worker);
}
}
/**
* Get tasks that are ready to execute
*/
private getReadyTasks(): PipelineTask[] {
return this.taskQueue.filter(task => {
if (task.status !== 'pending') return false;
// Check dependencies
for (const depId of task.dependencies) {
const depTask = this.getTask(depId);
if (!depTask || depTask.status !== 'completed') {
return false;
}
}
return true;
});
}
/**
* Find an available worker for a task
*/
private findAvailableWorker(task: PipelineTask): Worker | undefined {
// First, try to find a worker already handling the project
for (const worker of this.workers.values()) {
if (worker.status !== 'idle' && worker.status !== 'busy') continue;
const hasProject = Array.from(worker.sessions.values())
.some(s => s.projectId === task.projectId);
if (hasProject && worker.sessions.size < this.config.maxConcurrentPerWorker) {
return worker;
}
}
// Then, find any available worker
for (const worker of this.workers.values()) {
if (worker.status !== 'idle' && worker.status !== 'busy') continue;
if (worker.sessions.size < this.config.maxConcurrentPerWorker) {
return worker;
}
}
// Create new worker if under limit
if (this.workers.size < this.config.maxWorkers) {
return this.createWorker();
}
return undefined;
}
/**
* Execute a task
*/
private async executeTask(task: PipelineTask, worker: Worker): Promise<void> {
// Move task from queue to running
const taskIndex = this.taskQueue.indexOf(task);
if (taskIndex > -1) {
this.taskQueue.splice(taskIndex, 1);
}
task.status = 'running';
task.startedAt = new Date();
task.assignedWorker = worker.id;
// Create or get session
const session = this.getOrCreateSession(task, worker);
// Track running task
this.runningTasks.set(task.id, { task, worker, session });
// Update worker status
worker.status = 'busy';
worker.currentTask = task.id;
worker.lastActivity = new Date();
this.emit('taskStarted', { task, worker, session });
// Get task handler
const handler = this.taskHandlers.get(task.type) || this.defaultTaskHandler;
try {
// Execute with timeout
const result = await Promise.race([
handler(task, session),
this.createTimeout(task)
]);
task.result = result;
task.status = 'completed';
task.completedAt = new Date();
worker.completedTasks++;
this.completedTasks.push(task);
this.emit('taskCompleted', { task, worker, session, result });
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
task.error = errorMessage;
task.retryCount++;
if (task.retryCount < task.maxRetries) {
// Retry
task.status = 'pending';
this.taskQueue.push(task);
this.emit('taskRetrying', { task, attempt: task.retryCount });
} else {
// Failed
task.status = 'failed';
task.completedAt = new Date();
worker.failedTasks++;
this.failedTasks.push(task);
this.emit('taskFailed', { task, worker, error: errorMessage });
}
}
// Cleanup
this.runningTasks.delete(task.id);
worker.currentTask = undefined;
worker.lastActivity = new Date();
// Update worker status
if (worker.sessions.size === 0 || this.runningTasks.size === 0) {
worker.status = 'idle';
}
session.lastActivity = new Date();
}
/**
* Get or create session for a task
*/
private getOrCreateSession(task: PipelineTask, worker: Worker): AgentSession {
// Look for existing session for this project/role
for (const session of worker.sessions.values()) {
if (session.projectId === task.projectId && session.role === task.role) {
return session;
}
}
// Create new session
const session = this.createSession({
projectId: task.projectId,
role: task.role,
workspace: `workspace/${task.projectId}/${task.role}`,
tools: this.getToolsForRole(task.role),
identity: this.getIdentityForRole(task.role)
});
worker.sessions.set(session.id, session);
return session;
}
/**
* Get tools available for a role
*/
private getToolsForRole(role: AgentRole): string[] {
const toolMap: Record<AgentRole, string[]> = {
programmer: ['read', 'write', 'execute', 'git', 'test', 'lint', 'build'],
reviewer: ['read', 'diff', 'comment', 'lint', 'test'],
tester: ['read', 'execute', 'test', 'mock'],
planner: ['read', 'write', 'diagram'],
analyst: ['read', 'query', 'report'],
custom: ['read']
};
return toolMap[role] || ['read'];
}
/**
* Get identity for a role
*/
private getIdentityForRole(role: AgentRole): AgentIdentity {
const identityMap: Record<AgentRole, AgentIdentity> = {
programmer: {
name: 'Code Architect',
description: 'Expert developer who writes clean, efficient code',
personality: 'Methodical, detail-oriented, focuses on best practices'
},
reviewer: {
name: 'Code Reviewer',
description: 'Experienced engineer who catches bugs and improves code quality',
personality: 'Thorough, constructive, focuses on maintainability'
},
tester: {
name: 'QA Engineer',
description: 'Test specialist who ensures code correctness',
personality: 'Systematic, edge-case focused, quality-driven'
},
planner: {
name: 'Technical Architect',
description: 'Strategic thinker who plans implementation',
personality: 'Analytical, systematic, big-picture focused'
},
analyst: {
name: 'Data Analyst',
description: 'Data specialist who extracts insights',
personality: 'Curious, methodical, detail-oriented'
},
custom: {
name: 'Custom Agent',
description: 'Generic agent for custom tasks',
personality: 'Adaptable'
}
};
return identityMap[role] || identityMap.custom;
}
/**
* Default task handler
*/
private async defaultTaskHandler(task: PipelineTask, session: AgentSession): Promise<unknown> {
// This would be replaced by actual LLM invocation
return {
message: `Task ${task.type} completed by ${session.identity.name}`,
projectId: task.projectId,
role: task.role
};
}
/**
* Create timeout promise
*/
private createTimeout(task: PipelineTask): Promise<never> {
return new Promise((_, reject) => {
setTimeout(() => {
reject(new Error(`Task ${task.id} timed out after ${task.timeout}ms`));
}, task.timeout);
});
}
/**
* Get task by ID
*/
getTask(taskId: string): PipelineTask | undefined {
return (
this.taskQueue.find(t => t.id === taskId) ||
this.runningTasks.get(taskId)?.task ||
this.completedTasks.find(t => t.id === taskId) ||
this.failedTasks.find(t => t.id === taskId)
);
}
/**
* Register a task handler
*/
registerHandler(taskType: string, handler: (task: PipelineTask, session: AgentSession) => Promise<unknown>): void {
this.taskHandlers.set(taskType, handler);
}
/**
* Drain - wait for running tasks to complete
*/
private async drain(): Promise<void> {
while (this.runningTasks.size > 0) {
await new Promise(resolve => setTimeout(resolve, 100));
}
}
/**
* Get engine statistics
*/
getStats(): {
workers: { total: number; idle: number; busy: number };
tasks: { pending: number; running: number; completed: number; failed: number };
sessions: number;
} {
let idleWorkers = 0;
let busyWorkers = 0;
for (const worker of this.workers.values()) {
if (worker.status === 'idle') idleWorkers++;
else if (worker.status === 'busy') busyWorkers++;
}
return {
workers: {
total: this.workers.size,
idle: idleWorkers,
busy: busyWorkers
},
tasks: {
pending: this.taskQueue.length,
running: this.runningTasks.size,
completed: this.completedTasks.length,
failed: this.failedTasks.length
},
sessions: this.sessions.size
};
}
/**
* Get sessions by project
*/
getSessionsByProject(projectId: string): AgentSession[] {
return Array.from(this.sessions.values()).filter(s => s.projectId === projectId);
}
/**
* Get all sessions
*/
getAllSessions(): AgentSession[] {
return Array.from(this.sessions.values());
}
/**
* Terminate a session
*/
terminateSession(sessionId: string): boolean {
const session = this.sessions.get(sessionId);
if (session) {
session.status = 'terminated';
this.emit('sessionTerminated', { session });
return true;
}
return false;
}
}
// Default instance
export const defaultExecutor = new ParallelExecutionEngine();