/** * Event-Driven Coordination System * * Event bus for inter-agent communication. * Agents finish work → emit event → next step triggers automatically. * * Key features: * - Pub/sub event distribution * - Event correlation and routing * - Event replay for debugging * - Dead letter queue for failed handlers */ import { randomUUID } from 'crypto'; import { EventEmitter } from 'events'; // ============================================================================ // Types // ============================================================================ export type EventPriority = 'low' | 'normal' | 'high' | 'critical'; export interface PipelineEvent { id: string; type: string; source: string; target?: string; payload: unknown; priority: EventPriority; timestamp: Date; correlationId?: string; causationId?: string; // ID of event that caused this event metadata?: Record; retryCount?: number; } export interface EventHandler { id: string; eventType: string | string[] | '*'; filter?: EventFilter; handler: (event: PipelineEvent) => Promise | void; priority?: number; once?: boolean; } export interface EventFilter { source?: string | string[]; target?: string | string[]; payloadPattern?: Record; } export interface Subscription { id: string; eventType: string; handlerId: string; active: boolean; createdAt: Date; eventsReceived: number; } export interface EventBusConfig { maxHistorySize: number; deadLetterQueueSize: number; retryAttempts: number; retryDelay: number; enableReplay: boolean; } export interface EventBusStats { eventsPublished: number; eventsProcessed: number; eventsFailed: number; handlersRegistered: number; queueSize: number; historySize: number; } // ============================================================================ // Event Bus // ============================================================================ /** * EventBus - Central event distribution system */ export class EventBus extends EventEmitter { private config: EventBusConfig; private handlers: Map = new Map(); private eventQueue: PipelineEvent[] = []; private history: PipelineEvent[] = []; private deadLetterQueue: PipelineEvent[] = []; private processing = false; private stats = { eventsPublished: 0, eventsProcessed: 0, eventsFailed: 0 }; private processInterval?: ReturnType; constructor(config?: Partial) { super(); this.config = { maxHistorySize: 1000, deadLetterQueueSize: 100, retryAttempts: 3, retryDelay: 1000, enableReplay: true, ...config }; } /** * Start the event bus */ start(): void { this.processing = true; this.processInterval = setInterval(() => this.processQueue(), 50); this.emit('started'); } /** * Stop the event bus */ stop(): void { this.processing = false; if (this.processInterval) { clearInterval(this.processInterval); } this.emit('stopped'); } /** * Publish an event */ publish(event: Omit): string { const fullEvent: PipelineEvent = { ...event, id: `evt-${randomUUID().substring(0, 8)}`, timestamp: new Date(), retryCount: event.retryCount || 0 }; // Add to queue this.eventQueue.push(fullEvent); this.stats.eventsPublished++; // Add to history if (this.config.enableReplay) { this.history.push(fullEvent); if (this.history.length > this.config.maxHistorySize) { this.history.shift(); } } this.emit('eventPublished', { event: fullEvent }); return fullEvent.id; } /** * Publish a batch of events */ publishBatch(events: Array>): string[] { return events.map(event => this.publish(event)); } /** * Subscribe to events */ subscribe(config: { eventType: string | string[] | '*'; handler: (event: PipelineEvent) => Promise | void; filter?: EventFilter; priority?: number; once?: boolean; }): string { const handlerId = `handler-${randomUUID().substring(0, 8)}`; const handler: EventHandler = { id: handlerId, eventType: config.eventType, filter: config.filter, handler: config.handler, priority: config.priority || 0, once: config.once || false }; this.handlers.set(handlerId, handler); this.emit('handlerRegistered', { handler }); return handlerId; } /** * Unsubscribe from events */ unsubscribe(handlerId: string): boolean { const result = this.handlers.delete(handlerId); if (result) { this.emit('handlerUnregistered', { handlerId }); } return result; } /** * Process the event queue */ private async processQueue(): Promise { if (!this.processing || this.eventQueue.length === 0) return; const event = this.eventQueue.shift()!; // Find matching handlers const matchingHandlers = this.findMatchingHandlers(event); // Sort by priority (higher first) matchingHandlers.sort((a, b) => (b.priority || 0) - (a.priority || 0)); // Execute handlers for (const handler of matchingHandlers) { try { await handler.handler(event); this.stats.eventsProcessed++; // Remove one-time handlers if (handler.once) { this.handlers.delete(handler.id); } } catch (error) { this.stats.eventsFailed++; // Retry logic const retryCount = (event.retryCount || 0) + 1; if (retryCount < this.config.retryAttempts) { // Re-queue with incremented retry count setTimeout(() => { this.publish({ ...event, retryCount }); }, this.config.retryDelay * retryCount); this.emit('eventRetry', { event, error, retryCount }); } else { // Move to dead letter queue this.addToDeadLetterQueue(event, error); } } } this.emit('eventProcessed', { event, handlerCount: matchingHandlers.length }); } /** * Find handlers matching an event */ private findMatchingHandlers(event: PipelineEvent): EventHandler[] { const matching: EventHandler[] = []; for (const handler of this.handlers.values()) { // Check event type match if (handler.eventType !== '*') { const types = Array.isArray(handler.eventType) ? handler.eventType : [handler.eventType]; if (!types.includes(event.type)) continue; } // Check filters if (handler.filter && !this.matchesFilter(event, handler.filter)) { continue; } matching.push(handler); } return matching; } /** * Check if event matches filter */ private matchesFilter(event: PipelineEvent, filter: EventFilter): boolean { // Check source filter if (filter.source) { const sources = Array.isArray(filter.source) ? filter.source : [filter.source]; if (!sources.includes(event.source)) return false; } // Check target filter if (filter.target) { const targets = Array.isArray(filter.target) ? filter.target : [filter.target]; if (event.target && !targets.includes(event.target)) return false; } // Check payload pattern if (filter.payloadPattern) { const payload = event.payload as Record; for (const [key, value] of Object.entries(filter.payloadPattern)) { if (payload[key] !== value) return false; } } return true; } /** * Add event to dead letter queue */ private addToDeadLetterQueue(event: PipelineEvent, error: unknown): void { this.deadLetterQueue.push({ ...event, metadata: { ...event.metadata, error: error instanceof Error ? error.message : String(error), failedAt: new Date().toISOString() } }); // Trim queue if (this.deadLetterQueue.length > this.config.deadLetterQueueSize) { this.deadLetterQueue.shift(); } this.emit('eventDeadLettered', { event, error }); } /** * Replay events from history */ replay(fromTimestamp?: Date, toTimestamp?: Date): void { if (!this.config.enableReplay) { throw new Error('Event replay is disabled'); } const events = this.history.filter(event => { if (fromTimestamp && event.timestamp < fromTimestamp) return false; if (toTimestamp && event.timestamp > toTimestamp) return false; return true; }); for (const event of events) { this.eventQueue.push({ ...event, id: `replay-${event.id}`, metadata: { ...event.metadata, replayed: true } }); } this.emit('replayStarted', { count: events.length }); } /** * Get events from history */ getHistory(filter?: { type?: string; source?: string; from?: Date; to?: Date; }): PipelineEvent[] { let events = [...this.history]; if (filter) { if (filter.type) { events = events.filter(e => e.type === filter.type); } if (filter.source) { events = events.filter(e => e.source === filter.source); } if (filter.from) { events = events.filter(e => e.timestamp >= filter.from!); } if (filter.to) { events = events.filter(e => e.timestamp <= filter.to!); } } return events; } /** * Get dead letter queue */ getDeadLetterQueue(): PipelineEvent[] { return [...this.deadLetterQueue]; } /** * Clear dead letter queue */ clearDeadLetterQueue(): void { this.deadLetterQueue = []; } /** * Get statistics */ getStats(): EventBusStats { return { eventsPublished: this.stats.eventsPublished, eventsProcessed: this.stats.eventsProcessed, eventsFailed: this.stats.eventsFailed, handlersRegistered: this.handlers.size, queueSize: this.eventQueue.length, historySize: this.history.length }; } /** * Request-response pattern */ async request( event: Omit, timeout = 30000 ): Promise { return new Promise((resolve, reject) => { const correlationId = `req-${randomUUID().substring(0, 8)}`; // Subscribe to response const responseHandler = this.subscribe({ eventType: `${event.type}.response`, filter: { payloadPattern: { correlationId } }, once: true, handler: (response) => { clearTimeout(timeoutId); resolve(response.payload as T); } }); // Set timeout const timeoutId = setTimeout(() => { this.unsubscribe(responseHandler); reject(new Error(`Request timeout for event ${event.type}`)); }, timeout); // Publish request with correlation ID this.publish({ ...event, metadata: { ...event.metadata, correlationId } }); }); } /** * Create a correlated event chain */ createChain(firstEvent: Omit): EventChain { const correlationId = `chain-${randomUUID().substring(0, 8)}`; return new EventChain(this, correlationId, firstEvent); } } // ============================================================================ // Event Chain // ============================================================================ /** * EventChain - Builder for correlated event sequences */ export class EventChain { private bus: EventBus; private correlationId: string; private events: PipelineEvent[] = []; private currentEvent?: PipelineEvent; constructor(bus: EventBus, correlationId: string, firstEvent: Omit) { this.bus = bus; this.correlationId = correlationId; this.currentEvent = { ...firstEvent, id: '', timestamp: new Date(), correlationId } as PipelineEvent; } /** * Add next event in chain */ then(event: Omit): this { if (this.currentEvent) { this.events.push(this.currentEvent); this.currentEvent = { ...event, id: '', timestamp: new Date(), correlationId: this.correlationId, causationId: this.currentEvent.id || undefined } as PipelineEvent; } return this; } /** * Execute the chain */ execute(): string[] { if (this.currentEvent) { this.events.push(this.currentEvent); } return this.events.map(event => this.bus.publish({ ...event, correlationId: this.correlationId }) ); } /** * Get correlation ID */ getCorrelationId(): string { return this.correlationId; } } // ============================================================================ // Predefined Pipeline Events // ============================================================================ /** * Standard pipeline event types */ export const PipelineEventTypes = { // Agent lifecycle AGENT_STARTED: 'agent.started', AGENT_COMPLETED: 'agent.completed', AGENT_FAILED: 'agent.failed', AGENT_TIMEOUT: 'agent.timeout', // Task lifecycle TASK_CREATED: 'task.created', TASK_ASSIGNED: 'task.assigned', TASK_STARTED: 'task.started', TASK_COMPLETED: 'task.completed', TASK_FAILED: 'task.failed', // Code pipeline CODE_WRITTEN: 'code.written', CODE_REVIEWED: 'code.reviewed', CODE_APPROVED: 'code.approved', CODE_REJECTED: 'code.rejected', CODE_TESTED: 'code.tested', TESTS_PASSED: 'tests.passed', TESTS_FAILED: 'tests.failed', // State machine STATE_ENTERED: 'state.entered', STATE_EXITED: 'state.exited', TRANSITION: 'state.transition', // Coordination PIPELINE_STARTED: 'pipeline.started', PIPELINE_COMPLETED: 'pipeline.completed', PIPELINE_PAUSED: 'pipeline.paused', PIPELINE_RESUMED: 'pipeline.resumed', // Human interaction HUMAN_INPUT_REQUIRED: 'human.input_required', HUMAN_INPUT_RECEIVED: 'human.input_received', HUMAN_APPROVAL_REQUIRED: 'human.approval_required', HUMAN_APPROVED: 'human.approved', HUMAN_REJECTED: 'human.rejected' } as const; // Default event bus instance export const defaultEventBus = new EventBus();