- Added Claude Code integration with full context compaction support - Added OpenClaw integration with deterministic pipeline support - Implemented parallel agent execution (4 projects x 3 roles pattern) - Added workspace isolation with permissions and quotas - Implemented Lobster-compatible YAML workflow parser - Added persistent memory store for cross-session context - Created comprehensive README with hero section This project was 100% autonomously built by Z.AI GLM-5
571 lines
14 KiB
TypeScript
571 lines
14 KiB
TypeScript
/**
|
|
* Event-Driven Coordination System
|
|
*
|
|
* Event bus for inter-agent communication.
|
|
* Agents finish work → emit event → next step triggers automatically.
|
|
*
|
|
* Key features:
|
|
* - Pub/sub event distribution
|
|
* - Event correlation and routing
|
|
* - Event replay for debugging
|
|
* - Dead letter queue for failed handlers
|
|
*/
|
|
|
|
import { randomUUID } from 'crypto';
|
|
import { EventEmitter } from 'events';
|
|
|
|
// ============================================================================
|
|
// Types
|
|
// ============================================================================
|
|
|
|
export type EventPriority = 'low' | 'normal' | 'high' | 'critical';
|
|
|
|
export interface PipelineEvent {
|
|
id: string;
|
|
type: string;
|
|
source: string;
|
|
target?: string;
|
|
payload: unknown;
|
|
priority: EventPriority;
|
|
timestamp: Date;
|
|
correlationId?: string;
|
|
causationId?: string; // ID of event that caused this event
|
|
metadata?: Record<string, unknown>;
|
|
retryCount?: number;
|
|
}
|
|
|
|
export interface EventHandler {
|
|
id: string;
|
|
eventType: string | string[] | '*';
|
|
filter?: EventFilter;
|
|
handler: (event: PipelineEvent) => Promise<void> | void;
|
|
priority?: number;
|
|
once?: boolean;
|
|
}
|
|
|
|
export interface EventFilter {
|
|
source?: string | string[];
|
|
target?: string | string[];
|
|
payloadPattern?: Record<string, unknown>;
|
|
}
|
|
|
|
export interface Subscription {
|
|
id: string;
|
|
eventType: string;
|
|
handlerId: string;
|
|
active: boolean;
|
|
createdAt: Date;
|
|
eventsReceived: number;
|
|
}
|
|
|
|
export interface EventBusConfig {
|
|
maxHistorySize: number;
|
|
deadLetterQueueSize: number;
|
|
retryAttempts: number;
|
|
retryDelay: number;
|
|
enableReplay: boolean;
|
|
}
|
|
|
|
export interface EventBusStats {
|
|
eventsPublished: number;
|
|
eventsProcessed: number;
|
|
eventsFailed: number;
|
|
handlersRegistered: number;
|
|
queueSize: number;
|
|
historySize: number;
|
|
}
|
|
|
|
// ============================================================================
|
|
// Event Bus
|
|
// ============================================================================
|
|
|
|
/**
|
|
* EventBus - Central event distribution system
|
|
*/
|
|
export class EventBus extends EventEmitter {
|
|
private config: EventBusConfig;
|
|
private handlers: Map<string, EventHandler> = new Map();
|
|
private eventQueue: PipelineEvent[] = [];
|
|
private history: PipelineEvent[] = [];
|
|
private deadLetterQueue: PipelineEvent[] = [];
|
|
private processing = false;
|
|
private stats = {
|
|
eventsPublished: 0,
|
|
eventsProcessed: 0,
|
|
eventsFailed: 0
|
|
};
|
|
private processInterval?: ReturnType<typeof setInterval>;
|
|
|
|
constructor(config?: Partial<EventBusConfig>) {
|
|
super();
|
|
this.config = {
|
|
maxHistorySize: 1000,
|
|
deadLetterQueueSize: 100,
|
|
retryAttempts: 3,
|
|
retryDelay: 1000,
|
|
enableReplay: true,
|
|
...config
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Start the event bus
|
|
*/
|
|
start(): void {
|
|
this.processing = true;
|
|
this.processInterval = setInterval(() => this.processQueue(), 50);
|
|
this.emit('started');
|
|
}
|
|
|
|
/**
|
|
* Stop the event bus
|
|
*/
|
|
stop(): void {
|
|
this.processing = false;
|
|
if (this.processInterval) {
|
|
clearInterval(this.processInterval);
|
|
}
|
|
this.emit('stopped');
|
|
}
|
|
|
|
/**
|
|
* Publish an event
|
|
*/
|
|
publish(event: Omit<PipelineEvent, 'id' | 'timestamp'>): string {
|
|
const fullEvent: PipelineEvent = {
|
|
...event,
|
|
id: `evt-${randomUUID().substring(0, 8)}`,
|
|
timestamp: new Date(),
|
|
retryCount: event.retryCount || 0
|
|
};
|
|
|
|
// Add to queue
|
|
this.eventQueue.push(fullEvent);
|
|
this.stats.eventsPublished++;
|
|
|
|
// Add to history
|
|
if (this.config.enableReplay) {
|
|
this.history.push(fullEvent);
|
|
if (this.history.length > this.config.maxHistorySize) {
|
|
this.history.shift();
|
|
}
|
|
}
|
|
|
|
this.emit('eventPublished', { event: fullEvent });
|
|
|
|
return fullEvent.id;
|
|
}
|
|
|
|
/**
|
|
* Publish a batch of events
|
|
*/
|
|
publishBatch(events: Array<Omit<PipelineEvent, 'id' | 'timestamp'>>): string[] {
|
|
return events.map(event => this.publish(event));
|
|
}
|
|
|
|
/**
|
|
* Subscribe to events
|
|
*/
|
|
subscribe(config: {
|
|
eventType: string | string[] | '*';
|
|
handler: (event: PipelineEvent) => Promise<void> | void;
|
|
filter?: EventFilter;
|
|
priority?: number;
|
|
once?: boolean;
|
|
}): string {
|
|
const handlerId = `handler-${randomUUID().substring(0, 8)}`;
|
|
|
|
const handler: EventHandler = {
|
|
id: handlerId,
|
|
eventType: config.eventType,
|
|
filter: config.filter,
|
|
handler: config.handler,
|
|
priority: config.priority || 0,
|
|
once: config.once || false
|
|
};
|
|
|
|
this.handlers.set(handlerId, handler);
|
|
this.emit('handlerRegistered', { handler });
|
|
|
|
return handlerId;
|
|
}
|
|
|
|
/**
|
|
* Unsubscribe from events
|
|
*/
|
|
unsubscribe(handlerId: string): boolean {
|
|
const result = this.handlers.delete(handlerId);
|
|
if (result) {
|
|
this.emit('handlerUnregistered', { handlerId });
|
|
}
|
|
return result;
|
|
}
|
|
|
|
/**
|
|
* Process the event queue
|
|
*/
|
|
private async processQueue(): Promise<void> {
|
|
if (!this.processing || this.eventQueue.length === 0) return;
|
|
|
|
const event = this.eventQueue.shift()!;
|
|
|
|
// Find matching handlers
|
|
const matchingHandlers = this.findMatchingHandlers(event);
|
|
|
|
// Sort by priority (higher first)
|
|
matchingHandlers.sort((a, b) => (b.priority || 0) - (a.priority || 0));
|
|
|
|
// Execute handlers
|
|
for (const handler of matchingHandlers) {
|
|
try {
|
|
await handler.handler(event);
|
|
this.stats.eventsProcessed++;
|
|
|
|
// Remove one-time handlers
|
|
if (handler.once) {
|
|
this.handlers.delete(handler.id);
|
|
}
|
|
|
|
} catch (error) {
|
|
this.stats.eventsFailed++;
|
|
|
|
// Retry logic
|
|
const retryCount = (event.retryCount || 0) + 1;
|
|
|
|
if (retryCount < this.config.retryAttempts) {
|
|
// Re-queue with incremented retry count
|
|
setTimeout(() => {
|
|
this.publish({
|
|
...event,
|
|
retryCount
|
|
});
|
|
}, this.config.retryDelay * retryCount);
|
|
|
|
this.emit('eventRetry', { event, error, retryCount });
|
|
} else {
|
|
// Move to dead letter queue
|
|
this.addToDeadLetterQueue(event, error);
|
|
}
|
|
}
|
|
}
|
|
|
|
this.emit('eventProcessed', { event, handlerCount: matchingHandlers.length });
|
|
}
|
|
|
|
/**
|
|
* Find handlers matching an event
|
|
*/
|
|
private findMatchingHandlers(event: PipelineEvent): EventHandler[] {
|
|
const matching: EventHandler[] = [];
|
|
|
|
for (const handler of this.handlers.values()) {
|
|
// Check event type match
|
|
if (handler.eventType !== '*') {
|
|
const types = Array.isArray(handler.eventType) ? handler.eventType : [handler.eventType];
|
|
if (!types.includes(event.type)) continue;
|
|
}
|
|
|
|
// Check filters
|
|
if (handler.filter && !this.matchesFilter(event, handler.filter)) {
|
|
continue;
|
|
}
|
|
|
|
matching.push(handler);
|
|
}
|
|
|
|
return matching;
|
|
}
|
|
|
|
/**
|
|
* Check if event matches filter
|
|
*/
|
|
private matchesFilter(event: PipelineEvent, filter: EventFilter): boolean {
|
|
// Check source filter
|
|
if (filter.source) {
|
|
const sources = Array.isArray(filter.source) ? filter.source : [filter.source];
|
|
if (!sources.includes(event.source)) return false;
|
|
}
|
|
|
|
// Check target filter
|
|
if (filter.target) {
|
|
const targets = Array.isArray(filter.target) ? filter.target : [filter.target];
|
|
if (event.target && !targets.includes(event.target)) return false;
|
|
}
|
|
|
|
// Check payload pattern
|
|
if (filter.payloadPattern) {
|
|
const payload = event.payload as Record<string, unknown>;
|
|
for (const [key, value] of Object.entries(filter.payloadPattern)) {
|
|
if (payload[key] !== value) return false;
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Add event to dead letter queue
|
|
*/
|
|
private addToDeadLetterQueue(event: PipelineEvent, error: unknown): void {
|
|
this.deadLetterQueue.push({
|
|
...event,
|
|
metadata: {
|
|
...event.metadata,
|
|
error: error instanceof Error ? error.message : String(error),
|
|
failedAt: new Date().toISOString()
|
|
}
|
|
});
|
|
|
|
// Trim queue
|
|
if (this.deadLetterQueue.length > this.config.deadLetterQueueSize) {
|
|
this.deadLetterQueue.shift();
|
|
}
|
|
|
|
this.emit('eventDeadLettered', { event, error });
|
|
}
|
|
|
|
/**
|
|
* Replay events from history
|
|
*/
|
|
replay(fromTimestamp?: Date, toTimestamp?: Date): void {
|
|
if (!this.config.enableReplay) {
|
|
throw new Error('Event replay is disabled');
|
|
}
|
|
|
|
const events = this.history.filter(event => {
|
|
if (fromTimestamp && event.timestamp < fromTimestamp) return false;
|
|
if (toTimestamp && event.timestamp > toTimestamp) return false;
|
|
return true;
|
|
});
|
|
|
|
for (const event of events) {
|
|
this.eventQueue.push({
|
|
...event,
|
|
id: `replay-${event.id}`,
|
|
metadata: { ...event.metadata, replayed: true }
|
|
});
|
|
}
|
|
|
|
this.emit('replayStarted', { count: events.length });
|
|
}
|
|
|
|
/**
|
|
* Get events from history
|
|
*/
|
|
getHistory(filter?: {
|
|
type?: string;
|
|
source?: string;
|
|
from?: Date;
|
|
to?: Date;
|
|
}): PipelineEvent[] {
|
|
let events = [...this.history];
|
|
|
|
if (filter) {
|
|
if (filter.type) {
|
|
events = events.filter(e => e.type === filter.type);
|
|
}
|
|
if (filter.source) {
|
|
events = events.filter(e => e.source === filter.source);
|
|
}
|
|
if (filter.from) {
|
|
events = events.filter(e => e.timestamp >= filter.from!);
|
|
}
|
|
if (filter.to) {
|
|
events = events.filter(e => e.timestamp <= filter.to!);
|
|
}
|
|
}
|
|
|
|
return events;
|
|
}
|
|
|
|
/**
|
|
* Get dead letter queue
|
|
*/
|
|
getDeadLetterQueue(): PipelineEvent[] {
|
|
return [...this.deadLetterQueue];
|
|
}
|
|
|
|
/**
|
|
* Clear dead letter queue
|
|
*/
|
|
clearDeadLetterQueue(): void {
|
|
this.deadLetterQueue = [];
|
|
}
|
|
|
|
/**
|
|
* Get statistics
|
|
*/
|
|
getStats(): EventBusStats {
|
|
return {
|
|
eventsPublished: this.stats.eventsPublished,
|
|
eventsProcessed: this.stats.eventsProcessed,
|
|
eventsFailed: this.stats.eventsFailed,
|
|
handlersRegistered: this.handlers.size,
|
|
queueSize: this.eventQueue.length,
|
|
historySize: this.history.length
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Request-response pattern
|
|
*/
|
|
async request<T = unknown>(
|
|
event: Omit<PipelineEvent, 'id' | 'timestamp'>,
|
|
timeout = 30000
|
|
): Promise<T> {
|
|
return new Promise((resolve, reject) => {
|
|
const correlationId = `req-${randomUUID().substring(0, 8)}`;
|
|
|
|
// Subscribe to response
|
|
const responseHandler = this.subscribe({
|
|
eventType: `${event.type}.response`,
|
|
filter: { payloadPattern: { correlationId } },
|
|
once: true,
|
|
handler: (response) => {
|
|
clearTimeout(timeoutId);
|
|
resolve(response.payload as T);
|
|
}
|
|
});
|
|
|
|
// Set timeout
|
|
const timeoutId = setTimeout(() => {
|
|
this.unsubscribe(responseHandler);
|
|
reject(new Error(`Request timeout for event ${event.type}`));
|
|
}, timeout);
|
|
|
|
// Publish request with correlation ID
|
|
this.publish({
|
|
...event,
|
|
metadata: { ...event.metadata, correlationId }
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Create a correlated event chain
|
|
*/
|
|
createChain(firstEvent: Omit<PipelineEvent, 'id' | 'timestamp' | 'correlationId'>): EventChain {
|
|
const correlationId = `chain-${randomUUID().substring(0, 8)}`;
|
|
|
|
return new EventChain(this, correlationId, firstEvent);
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Event Chain
|
|
// ============================================================================
|
|
|
|
/**
|
|
* EventChain - Builder for correlated event sequences
|
|
*/
|
|
export class EventChain {
|
|
private bus: EventBus;
|
|
private correlationId: string;
|
|
private events: PipelineEvent[] = [];
|
|
private currentEvent?: PipelineEvent;
|
|
|
|
constructor(bus: EventBus, correlationId: string, firstEvent: Omit<PipelineEvent, 'id' | 'timestamp' | 'correlationId'>) {
|
|
this.bus = bus;
|
|
this.correlationId = correlationId;
|
|
this.currentEvent = {
|
|
...firstEvent,
|
|
id: '',
|
|
timestamp: new Date(),
|
|
correlationId
|
|
} as PipelineEvent;
|
|
}
|
|
|
|
/**
|
|
* Add next event in chain
|
|
*/
|
|
then(event: Omit<PipelineEvent, 'id' | 'timestamp' | 'correlationId' | 'causationId'>): this {
|
|
if (this.currentEvent) {
|
|
this.events.push(this.currentEvent);
|
|
|
|
this.currentEvent = {
|
|
...event,
|
|
id: '',
|
|
timestamp: new Date(),
|
|
correlationId: this.correlationId,
|
|
causationId: this.currentEvent.id || undefined
|
|
} as PipelineEvent;
|
|
}
|
|
return this;
|
|
}
|
|
|
|
/**
|
|
* Execute the chain
|
|
*/
|
|
execute(): string[] {
|
|
if (this.currentEvent) {
|
|
this.events.push(this.currentEvent);
|
|
}
|
|
|
|
return this.events.map(event =>
|
|
this.bus.publish({
|
|
...event,
|
|
correlationId: this.correlationId
|
|
})
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Get correlation ID
|
|
*/
|
|
getCorrelationId(): string {
|
|
return this.correlationId;
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Predefined Pipeline Events
|
|
// ============================================================================
|
|
|
|
/**
|
|
* Standard pipeline event types
|
|
*/
|
|
export const PipelineEventTypes = {
|
|
// Agent lifecycle
|
|
AGENT_STARTED: 'agent.started',
|
|
AGENT_COMPLETED: 'agent.completed',
|
|
AGENT_FAILED: 'agent.failed',
|
|
AGENT_TIMEOUT: 'agent.timeout',
|
|
|
|
// Task lifecycle
|
|
TASK_CREATED: 'task.created',
|
|
TASK_ASSIGNED: 'task.assigned',
|
|
TASK_STARTED: 'task.started',
|
|
TASK_COMPLETED: 'task.completed',
|
|
TASK_FAILED: 'task.failed',
|
|
|
|
// Code pipeline
|
|
CODE_WRITTEN: 'code.written',
|
|
CODE_REVIEWED: 'code.reviewed',
|
|
CODE_APPROVED: 'code.approved',
|
|
CODE_REJECTED: 'code.rejected',
|
|
CODE_TESTED: 'code.tested',
|
|
TESTS_PASSED: 'tests.passed',
|
|
TESTS_FAILED: 'tests.failed',
|
|
|
|
// State machine
|
|
STATE_ENTERED: 'state.entered',
|
|
STATE_EXITED: 'state.exited',
|
|
TRANSITION: 'state.transition',
|
|
|
|
// Coordination
|
|
PIPELINE_STARTED: 'pipeline.started',
|
|
PIPELINE_COMPLETED: 'pipeline.completed',
|
|
PIPELINE_PAUSED: 'pipeline.paused',
|
|
PIPELINE_RESUMED: 'pipeline.resumed',
|
|
|
|
// Human interaction
|
|
HUMAN_INPUT_REQUIRED: 'human.input_required',
|
|
HUMAN_INPUT_RECEIVED: 'human.input_received',
|
|
HUMAN_APPROVAL_REQUIRED: 'human.approval_required',
|
|
HUMAN_APPROVED: 'human.approved',
|
|
HUMAN_REJECTED: 'human.rejected'
|
|
} as const;
|
|
|
|
// Default event bus instance
|
|
export const defaultEventBus = new EventBus();
|