/**
 * Progress Streamer
 *
 * Tracks per-request progress for delegation tasks and fans events out to
 * subscribers in real time. The class itself is transport-agnostic: consumers
 * (for example WebSocket or SSE handlers) attach through `subscribe()` /
 * `subscribeAll()` and forward the events they receive.
 */

import type {
  ProgressEvent,
  ProgressEventType,
  ProgressConfig,
  AgentType
} from '../core/types';

// ============================================================================
// Default Configuration
// ============================================================================

const DEFAULT_CONFIG: ProgressConfig = {
  enabled: true,
  updateInterval: 500, // 500ms between updates (not consulted by this class)
  includePartialResults: true,
  maxQueueSize: 100
};

/** Delay between `complete()` and the automatic `stopTracking()` cleanup. */
const COMPLETION_CLEANUP_DELAY_MS = 5000;

/** Number of cells in the text progress bar; each cell represents 5%. */
const PROGRESS_BAR_CELLS = 20;

/** Callback signature shared by per-request and global subscribers. */
type ProgressListener = (event: ProgressEvent) => void;

/** Step counters kept per request. */
interface StepProgress {
  current: number;
  total: number;
}

// ============================================================================
// Progress Streamer Class
// ============================================================================

export class ProgressStreamer {
  private config: ProgressConfig;

  /** Bounded event history per request id (oldest events dropped first). */
  private progressQueues: Map<string, ProgressEvent[]> = new Map();

  /** Listeners registered for one specific request id. */
  private subscribers: Map<string, Set<ProgressListener>> = new Map();

  /**
   * Listeners that receive every event. Kept apart from `subscribers` so that
   * no request id can collide with them.
   */
  private globalSubscribers: Set<ProgressListener> = new Set();

  /** Last progress percentage per request id. */
  private progress: Map<string, number> = new Map();

  /** Step counters per request id. */
  private steps: Map<string, StepProgress> = new Map();

  /** Pending post-completion cleanup timers, keyed by request id. */
  private cleanupTimers: Map<string, ReturnType<typeof setTimeout>> = new Map();

  /**
   * @param config Overrides merged on top of the default configuration.
   */
  constructor(config: Partial<ProgressConfig> = {}) {
    this.config = { ...DEFAULT_CONFIG, ...config };
  }

  // ==========================================================================
  // Progress Tracking
  // ==========================================================================

  /**
   * Start tracking progress for a request.
   *
   * Resets the event queue, progress and step counters for `requestId`, and
   * cancels any cleanup still pending from an earlier `complete()` so that it
   * cannot wipe the freshly started tracking.
   */
  startTracking(requestId: string): void {
    this.cancelCleanup(requestId);
    this.progressQueues.set(requestId, []);
    this.progress.set(requestId, 0);
    this.steps.set(requestId, { current: 0, total: 0 });
  }

  /**
   * Stop tracking and clean up all state held for a request, including its
   * request-specific subscribers and any pending cleanup timer.
   */
  stopTracking(requestId: string): void {
    this.cancelCleanup(requestId);
    this.progressQueues.delete(requestId);
    this.progress.delete(requestId);
    this.steps.delete(requestId);
    this.subscribers.delete(requestId);
  }

  /**
   * Emit a progress event.
   *
   * The event is stamped with the request's current progress value and the
   * current time, appended to the request's queue, and delivered to the
   * request's subscribers followed by the global subscribers. Does nothing
   * when `config.enabled` is false.
   *
   * Note: a queue is created on demand, so emitting for a request that was
   * never started (or was already stopped) begins queueing events for it.
   */
  emit(
    requestId: string,
    type: ProgressEventType,
    message: string,
    data?: ProgressEvent['data']
  ): void {
    if (!this.config.enabled) return;

    const event: ProgressEvent = {
      type,
      requestId,
      message,
      progress: this.progress.get(requestId) || 0,
      timestamp: new Date(),
      data
    };

    // Add to queue (creating it on first use).
    const queue = this.progressQueues.get(requestId) || [];
    queue.push(event);

    // Enforce max size: one event is added per call, so dropping one suffices.
    if (queue.length > this.config.maxQueueSize) {
      queue.shift();
    }
    this.progressQueues.set(requestId, queue);

    this.notifySubscribers(requestId, event);
  }

  // ==========================================================================
  // Convenience Methods
  // ==========================================================================

  /** Emit an `acknowledgment` event. */
  acknowledge(requestId: string, message: string = 'Got it! Processing...'): void {
    this.emit(requestId, 'acknowledgment', message);
  }

  /** Emit a `delegation` event carrying the target agent's type and id. */
  notifyDelegation(requestId: string, agentType: AgentType, agentId: string): void {
    this.emit(requestId, 'delegation', `Delegating to ${agentType} agent...`, {
      agentType,
      agentId
    });
  }

  /**
   * Update progress and emit a `progress` event.
   *
   * @param progress    Percentage; clamped to the range 0-100 before storing.
   * @param currentStep Used as the event message (defaults to
   *                    `'Processing...'`) and included in the event data.
   * @param data        Extra event data. When it carries `completedSteps` and
   *                    the request is tracked, the current step counter is
   *                    updated to that value.
   */
  updateProgress(
    requestId: string,
    progress: number,
    currentStep?: string,
    data?: Partial<ProgressEvent['data']>
  ): void {
    this.progress.set(requestId, Math.min(100, Math.max(0, progress)));

    const steps = this.steps.get(requestId);
    if (steps && data?.completedSteps !== undefined) {
      steps.current = data.completedSteps;
    }

    this.emit(requestId, 'progress', currentStep || 'Processing...', {
      ...data,
      currentStep
    });
  }

  /** Set the total step count; ignored when the request is not tracked. */
  setTotalSteps(requestId: string, total: number): void {
    const steps = this.steps.get(requestId);
    if (steps) {
      steps.total = total;
    }
  }

  /**
   * Emit a `partial-result` event. Does nothing when
   * `config.includePartialResults` is false.
   */
  partialResult(requestId: string, results: any[]): void {
    if (!this.config.includePartialResults) return;

    this.emit(requestId, 'partial-result', 'Found intermediate results...', {
      partialResults: results
    });
  }

  /**
   * Mark the request as 100% done, emit a `completion` event, and schedule
   * `stopTracking()` after a short delay so late readers can still fetch the
   * final state.
   *
   * The scheduled cleanup is cancelled if the request is started again,
   * stopped explicitly, or `clearAll()` is called in the meantime.
   */
  complete(requestId: string, message: string = 'Complete!'): void {
    this.progress.set(requestId, 100);
    this.emit(requestId, 'completion', message);

    // Replace any earlier pending cleanup rather than stacking timers.
    this.cancelCleanup(requestId);
    const timer = setTimeout(() => {
      this.cleanupTimers.delete(requestId);
      this.stopTracking(requestId);
    }, COMPLETION_CLEANUP_DELAY_MS);
    this.cleanupTimers.set(requestId, timer);
  }

  /** Emit an `error` event. Tracking state is left in place. */
  error(requestId: string, message: string): void {
    this.emit(requestId, 'error', message);
  }

  /** Emit an `escalation` event describing why the task is escalated. */
  escalate(requestId: string, reason: string): void {
    this.emit(requestId, 'escalation', `Escalating to main agent: ${reason}`);
  }

  // ==========================================================================
  // Subscription Management
  // ==========================================================================

  /**
   * Subscribe to progress updates for a request.
   *
   * Events already queued for the request are replayed to `callback`
   * immediately, using the same error isolation as live delivery: an exception
   * thrown by the callback is logged and does not prevent the unsubscribe
   * handle from being returned.
   *
   * @returns A function that removes this subscription.
   */
  subscribe(requestId: string, callback: ProgressListener): () => void {
    const existing = this.subscribers.get(requestId);
    const listeners = existing ?? new Set<ProgressListener>();
    if (!existing) {
      this.subscribers.set(requestId, listeners);
    }
    listeners.add(callback);

    // Replay from a snapshot so events emitted by the callback itself cannot
    // disturb the iteration.
    const backlog = [...(this.progressQueues.get(requestId) ?? [])];
    for (const event of backlog) {
      this.deliver(callback, event, 'Progress subscriber error:');
    }

    return () => {
      listeners.delete(callback);
      // Only drop the map entry if it is still *this* set. After
      // stopTracking() a newer set may be registered under the same id, and a
      // stale handle must not remove it.
      if (listeners.size === 0 && this.subscribers.get(requestId) === listeners) {
        this.subscribers.delete(requestId);
      }
    };
  }

  /**
   * Subscribe to all progress events, regardless of request id.
   *
   * @returns A function that removes this subscription.
   */
  subscribeAll(callback: ProgressListener): () => void {
    this.globalSubscribers.add(callback);

    return () => {
      this.globalSubscribers.delete(callback);
    };
  }

  /** Deliver an event to the request's subscribers, then to global ones. */
  private notifySubscribers(requestId: string, event: ProgressEvent): void {
    const requestSubscribers = this.subscribers.get(requestId);
    if (requestSubscribers) {
      requestSubscribers.forEach(callback => {
        this.deliver(callback, event, 'Progress subscriber error:');
      });
    }

    this.globalSubscribers.forEach(callback => {
      this.deliver(callback, event, 'Global progress subscriber error:');
    });
  }

  /** Invoke one listener, logging (not propagating) anything it throws. */
  private deliver(callback: ProgressListener, event: ProgressEvent, errorLabel: string): void {
    try {
      callback(event);
    } catch (e) {
      console.error(errorLabel, e);
    }
  }

  /** Cancel the pending post-completion cleanup for a request, if any. */
  private cancelCleanup(requestId: string): void {
    const timer = this.cleanupTimers.get(requestId);
    if (timer !== undefined) {
      clearTimeout(timer);
      this.cleanupTimers.delete(requestId);
    }
  }

  // ==========================================================================
  // Progress Retrieval
  // ==========================================================================

  /** Get the current progress for a request (0 when unknown). */
  getProgress(requestId: string): number {
    return this.progress.get(requestId) || 0;
  }

  /**
   * Get the queued events for a request, oldest first.
   *
   * Returns a copy, so mutating the result does not affect the internal queue.
   */
  getEvents(requestId: string): ProgressEvent[] {
    return [...(this.progressQueues.get(requestId) ?? [])];
  }

  /**
   * Get a snapshot of the step counters, or `undefined` when the request is
   * not tracked.
   */
  getStepProgress(requestId: string): { current: number; total: number } | undefined {
    const steps = this.steps.get(requestId);
    return steps ? { ...steps } : undefined;
  }

  // ==========================================================================
  // Utility Methods
  // ==========================================================================

  /**
   * Format progress as a 20-cell text bar followed by the percentage, e.g.
   * `[██████████░░░░░░░░░░] 50%`. When a positive total step count is known,
   * ` (current/total)` is appended.
   */
  formatProgress(requestId: string): string {
    const progress = this.progress.get(requestId) || 0;
    const filled = Math.floor(progress / 5);
    const bar =
      `[${'█'.repeat(filled)}${'░'.repeat(PROGRESS_BAR_CELLS - filled)}] ` +
      `${progress.toFixed(0)}%`;

    const steps = this.steps.get(requestId);
    if (steps && steps.total > 0) {
      return `${bar} (${steps.current}/${steps.total})`;
    }
    return bar;
  }

  /**
   * Clear all tracking data, all subscribers (request-specific and global),
   * and every pending cleanup timer.
   */
  clearAll(): void {
    this.cleanupTimers.forEach(timer => clearTimeout(timer));
    this.cleanupTimers.clear();
    this.progressQueues.clear();
    this.subscribers.clear();
    this.globalSubscribers.clear();
    this.progress.clear();
    this.steps.clear();
  }
}

// ============================================================================
// Factory Function
// ============================================================================

/** Create a `ProgressStreamer`, optionally overriding default configuration. */
export function createProgressStreamer(config?: Partial<ProgressConfig>): ProgressStreamer {
  return new ProgressStreamer(config);
}

// ============================================================================
// Export
// ============================================================================

export default ProgressStreamer;