Files
admin c629646b9f Complete Agent Pipeline System with Claude Code & OpenClaw Integration
- Added Claude Code integration with full context compaction support
- Added OpenClaw integration with deterministic pipeline support
- Implemented parallel agent execution (4 projects x 3 roles pattern)
- Added workspace isolation with permissions and quotas
- Implemented Lobster-compatible YAML workflow parser
- Added persistent memory store for cross-session context
- Created comprehensive README with hero section

This project was 100% autonomously built by Z.AI GLM-5
2026-03-03 13:12:14 +00:00

571 lines
14 KiB
TypeScript

/**
* Event-Driven Coordination System
*
* Event bus for inter-agent communication.
* Agents finish work → emit event → next step triggers automatically.
*
* Key features:
* - Pub/sub event distribution
* - Event correlation and routing
* - Event replay for debugging
* - Dead letter queue for failed handlers
*/
import { randomUUID } from 'crypto';
import { EventEmitter } from 'events';
// ============================================================================
// Types
// ============================================================================
export type EventPriority = 'low' | 'normal' | 'high' | 'critical';
export interface PipelineEvent {
id: string;
type: string;
source: string;
target?: string;
payload: unknown;
priority: EventPriority;
timestamp: Date;
correlationId?: string;
causationId?: string; // ID of event that caused this event
metadata?: Record<string, unknown>;
retryCount?: number;
}
export interface EventHandler {
id: string;
eventType: string | string[] | '*';
filter?: EventFilter;
handler: (event: PipelineEvent) => Promise<void> | void;
priority?: number;
once?: boolean;
}
export interface EventFilter {
source?: string | string[];
target?: string | string[];
payloadPattern?: Record<string, unknown>;
}
export interface Subscription {
id: string;
eventType: string;
handlerId: string;
active: boolean;
createdAt: Date;
eventsReceived: number;
}
export interface EventBusConfig {
maxHistorySize: number;
deadLetterQueueSize: number;
retryAttempts: number;
retryDelay: number;
enableReplay: boolean;
}
export interface EventBusStats {
eventsPublished: number;
eventsProcessed: number;
eventsFailed: number;
handlersRegistered: number;
queueSize: number;
historySize: number;
}
// ============================================================================
// Event Bus
// ============================================================================
/**
* EventBus - Central event distribution system
*/
export class EventBus extends EventEmitter {
private config: EventBusConfig;
private handlers: Map<string, EventHandler> = new Map();
private eventQueue: PipelineEvent[] = [];
private history: PipelineEvent[] = [];
private deadLetterQueue: PipelineEvent[] = [];
private processing = false;
private stats = {
eventsPublished: 0,
eventsProcessed: 0,
eventsFailed: 0
};
private processInterval?: ReturnType<typeof setInterval>;
constructor(config?: Partial<EventBusConfig>) {
super();
this.config = {
maxHistorySize: 1000,
deadLetterQueueSize: 100,
retryAttempts: 3,
retryDelay: 1000,
enableReplay: true,
...config
};
}
/**
* Start the event bus
*/
start(): void {
this.processing = true;
this.processInterval = setInterval(() => this.processQueue(), 50);
this.emit('started');
}
/**
* Stop the event bus
*/
stop(): void {
this.processing = false;
if (this.processInterval) {
clearInterval(this.processInterval);
}
this.emit('stopped');
}
/**
* Publish an event
*/
publish(event: Omit<PipelineEvent, 'id' | 'timestamp'>): string {
const fullEvent: PipelineEvent = {
...event,
id: `evt-${randomUUID().substring(0, 8)}`,
timestamp: new Date(),
retryCount: event.retryCount || 0
};
// Add to queue
this.eventQueue.push(fullEvent);
this.stats.eventsPublished++;
// Add to history
if (this.config.enableReplay) {
this.history.push(fullEvent);
if (this.history.length > this.config.maxHistorySize) {
this.history.shift();
}
}
this.emit('eventPublished', { event: fullEvent });
return fullEvent.id;
}
/**
* Publish a batch of events
*/
publishBatch(events: Array<Omit<PipelineEvent, 'id' | 'timestamp'>>): string[] {
return events.map(event => this.publish(event));
}
/**
* Subscribe to events
*/
subscribe(config: {
eventType: string | string[] | '*';
handler: (event: PipelineEvent) => Promise<void> | void;
filter?: EventFilter;
priority?: number;
once?: boolean;
}): string {
const handlerId = `handler-${randomUUID().substring(0, 8)}`;
const handler: EventHandler = {
id: handlerId,
eventType: config.eventType,
filter: config.filter,
handler: config.handler,
priority: config.priority || 0,
once: config.once || false
};
this.handlers.set(handlerId, handler);
this.emit('handlerRegistered', { handler });
return handlerId;
}
/**
* Unsubscribe from events
*/
unsubscribe(handlerId: string): boolean {
const result = this.handlers.delete(handlerId);
if (result) {
this.emit('handlerUnregistered', { handlerId });
}
return result;
}
/**
* Process the event queue
*/
private async processQueue(): Promise<void> {
if (!this.processing || this.eventQueue.length === 0) return;
const event = this.eventQueue.shift()!;
// Find matching handlers
const matchingHandlers = this.findMatchingHandlers(event);
// Sort by priority (higher first)
matchingHandlers.sort((a, b) => (b.priority || 0) - (a.priority || 0));
// Execute handlers
for (const handler of matchingHandlers) {
try {
await handler.handler(event);
this.stats.eventsProcessed++;
// Remove one-time handlers
if (handler.once) {
this.handlers.delete(handler.id);
}
} catch (error) {
this.stats.eventsFailed++;
// Retry logic
const retryCount = (event.retryCount || 0) + 1;
if (retryCount < this.config.retryAttempts) {
// Re-queue with incremented retry count
setTimeout(() => {
this.publish({
...event,
retryCount
});
}, this.config.retryDelay * retryCount);
this.emit('eventRetry', { event, error, retryCount });
} else {
// Move to dead letter queue
this.addToDeadLetterQueue(event, error);
}
}
}
this.emit('eventProcessed', { event, handlerCount: matchingHandlers.length });
}
/**
* Find handlers matching an event
*/
private findMatchingHandlers(event: PipelineEvent): EventHandler[] {
const matching: EventHandler[] = [];
for (const handler of this.handlers.values()) {
// Check event type match
if (handler.eventType !== '*') {
const types = Array.isArray(handler.eventType) ? handler.eventType : [handler.eventType];
if (!types.includes(event.type)) continue;
}
// Check filters
if (handler.filter && !this.matchesFilter(event, handler.filter)) {
continue;
}
matching.push(handler);
}
return matching;
}
/**
* Check if event matches filter
*/
private matchesFilter(event: PipelineEvent, filter: EventFilter): boolean {
// Check source filter
if (filter.source) {
const sources = Array.isArray(filter.source) ? filter.source : [filter.source];
if (!sources.includes(event.source)) return false;
}
// Check target filter
if (filter.target) {
const targets = Array.isArray(filter.target) ? filter.target : [filter.target];
if (event.target && !targets.includes(event.target)) return false;
}
// Check payload pattern
if (filter.payloadPattern) {
const payload = event.payload as Record<string, unknown>;
for (const [key, value] of Object.entries(filter.payloadPattern)) {
if (payload[key] !== value) return false;
}
}
return true;
}
/**
* Add event to dead letter queue
*/
private addToDeadLetterQueue(event: PipelineEvent, error: unknown): void {
this.deadLetterQueue.push({
...event,
metadata: {
...event.metadata,
error: error instanceof Error ? error.message : String(error),
failedAt: new Date().toISOString()
}
});
// Trim queue
if (this.deadLetterQueue.length > this.config.deadLetterQueueSize) {
this.deadLetterQueue.shift();
}
this.emit('eventDeadLettered', { event, error });
}
/**
* Replay events from history
*/
replay(fromTimestamp?: Date, toTimestamp?: Date): void {
if (!this.config.enableReplay) {
throw new Error('Event replay is disabled');
}
const events = this.history.filter(event => {
if (fromTimestamp && event.timestamp < fromTimestamp) return false;
if (toTimestamp && event.timestamp > toTimestamp) return false;
return true;
});
for (const event of events) {
this.eventQueue.push({
...event,
id: `replay-${event.id}`,
metadata: { ...event.metadata, replayed: true }
});
}
this.emit('replayStarted', { count: events.length });
}
/**
* Get events from history
*/
getHistory(filter?: {
type?: string;
source?: string;
from?: Date;
to?: Date;
}): PipelineEvent[] {
let events = [...this.history];
if (filter) {
if (filter.type) {
events = events.filter(e => e.type === filter.type);
}
if (filter.source) {
events = events.filter(e => e.source === filter.source);
}
if (filter.from) {
events = events.filter(e => e.timestamp >= filter.from!);
}
if (filter.to) {
events = events.filter(e => e.timestamp <= filter.to!);
}
}
return events;
}
/**
* Get dead letter queue
*/
getDeadLetterQueue(): PipelineEvent[] {
return [...this.deadLetterQueue];
}
/**
* Clear dead letter queue
*/
clearDeadLetterQueue(): void {
this.deadLetterQueue = [];
}
/**
* Get statistics
*/
getStats(): EventBusStats {
return {
eventsPublished: this.stats.eventsPublished,
eventsProcessed: this.stats.eventsProcessed,
eventsFailed: this.stats.eventsFailed,
handlersRegistered: this.handlers.size,
queueSize: this.eventQueue.length,
historySize: this.history.length
};
}
/**
* Request-response pattern
*/
async request<T = unknown>(
event: Omit<PipelineEvent, 'id' | 'timestamp'>,
timeout = 30000
): Promise<T> {
return new Promise((resolve, reject) => {
const correlationId = `req-${randomUUID().substring(0, 8)}`;
// Subscribe to response
const responseHandler = this.subscribe({
eventType: `${event.type}.response`,
filter: { payloadPattern: { correlationId } },
once: true,
handler: (response) => {
clearTimeout(timeoutId);
resolve(response.payload as T);
}
});
// Set timeout
const timeoutId = setTimeout(() => {
this.unsubscribe(responseHandler);
reject(new Error(`Request timeout for event ${event.type}`));
}, timeout);
// Publish request with correlation ID
this.publish({
...event,
metadata: { ...event.metadata, correlationId }
});
});
}
/**
* Create a correlated event chain
*/
createChain(firstEvent: Omit<PipelineEvent, 'id' | 'timestamp' | 'correlationId'>): EventChain {
const correlationId = `chain-${randomUUID().substring(0, 8)}`;
return new EventChain(this, correlationId, firstEvent);
}
}
// ============================================================================
// Event Chain
// ============================================================================
/**
* EventChain - Builder for correlated event sequences
*/
export class EventChain {
private bus: EventBus;
private correlationId: string;
private events: PipelineEvent[] = [];
private currentEvent?: PipelineEvent;
constructor(bus: EventBus, correlationId: string, firstEvent: Omit<PipelineEvent, 'id' | 'timestamp' | 'correlationId'>) {
this.bus = bus;
this.correlationId = correlationId;
this.currentEvent = {
...firstEvent,
id: '',
timestamp: new Date(),
correlationId
} as PipelineEvent;
}
/**
* Add next event in chain
*/
then(event: Omit<PipelineEvent, 'id' | 'timestamp' | 'correlationId' | 'causationId'>): this {
if (this.currentEvent) {
this.events.push(this.currentEvent);
this.currentEvent = {
...event,
id: '',
timestamp: new Date(),
correlationId: this.correlationId,
causationId: this.currentEvent.id || undefined
} as PipelineEvent;
}
return this;
}
/**
* Execute the chain
*/
execute(): string[] {
if (this.currentEvent) {
this.events.push(this.currentEvent);
}
return this.events.map(event =>
this.bus.publish({
...event,
correlationId: this.correlationId
})
);
}
/**
* Get correlation ID
*/
getCorrelationId(): string {
return this.correlationId;
}
}
// ============================================================================
// Predefined Pipeline Events
// ============================================================================
/**
* Standard pipeline event types
*/
export const PipelineEventTypes = {
// Agent lifecycle
AGENT_STARTED: 'agent.started',
AGENT_COMPLETED: 'agent.completed',
AGENT_FAILED: 'agent.failed',
AGENT_TIMEOUT: 'agent.timeout',
// Task lifecycle
TASK_CREATED: 'task.created',
TASK_ASSIGNED: 'task.assigned',
TASK_STARTED: 'task.started',
TASK_COMPLETED: 'task.completed',
TASK_FAILED: 'task.failed',
// Code pipeline
CODE_WRITTEN: 'code.written',
CODE_REVIEWED: 'code.reviewed',
CODE_APPROVED: 'code.approved',
CODE_REJECTED: 'code.rejected',
CODE_TESTED: 'code.tested',
TESTS_PASSED: 'tests.passed',
TESTS_FAILED: 'tests.failed',
// State machine
STATE_ENTERED: 'state.entered',
STATE_EXITED: 'state.exited',
TRANSITION: 'state.transition',
// Coordination
PIPELINE_STARTED: 'pipeline.started',
PIPELINE_COMPLETED: 'pipeline.completed',
PIPELINE_PAUSED: 'pipeline.paused',
PIPELINE_RESUMED: 'pipeline.resumed',
// Human interaction
HUMAN_INPUT_REQUIRED: 'human.input_required',
HUMAN_INPUT_RECEIVED: 'human.input_received',
HUMAN_APPROVAL_REQUIRED: 'human.approval_required',
HUMAN_APPROVED: 'human.approved',
HUMAN_REJECTED: 'human.rejected'
} as const;
// Default event bus instance
export const defaultEventBus = new EventBus();