/**
 * EventBus - Central event pub/sub system for session events
 *
 * Replaces callback-based event handling with a clean publish/subscribe pattern.
 * All session events flow through the EventBus, allowing multiple subscribers
 * to listen to the same events.
 */

const EventEmitter = require('events');

// Event types that subscribeToSession() attaches a handler to.
const SESSION_EVENT_TYPES = Object.freeze([
  'session-output',
  'session-error',
  'session-status',
  'operations-detected',
  'operations-executed',
  'operations-error',
  'approval-request',
  'approval-confirmed',
  'approval-expired',
  'session-created',
  'session-deleted',
  'command-sent',
  'command-complete',
  'terminal-created',
  'terminal-closed'
]);

class EventBus extends EventEmitter {
  constructor() {
    super();
    this.setMaxListeners(0); // Unlimited listeners for scalability

    // Metrics for monitoring
    this.metrics = {
      eventsEmitted: 0,
      eventsByType: new Map(),
      listenerCounts: new Map()
    };
  }

  /**
   * Subscribe to an event type.
   *
   * The handler is wrapped so that a failure inside it (a synchronous throw
   * or a rejected promise from an async handler) is logged instead of
   * propagating to the emitter or becoming an unhandled rejection.
   *
   * @param {string} eventType - Event type (e.g., 'session-output', 'session-error')
   * @param {string|null} sessionId - Session ID to filter events (null for all sessions).
   *   Only a strict `null` disables filtering.
   * @param {Function} handler - Event handler function
   * @returns {Function} Unsubscribe function (safe to call more than once)
   */
  subscribe(eventType, sessionId, handler) {
    const reportHandlerError = (error, data) => {
      console.error(`[EventBus] Error in handler for ${eventType}:`, error);
      console.error(`[EventBus] Handler:`, handler.name || 'anonymous');
      console.error(`[EventBus] Data:`, data);
    };

    const wrappedHandler = (data) => {
      // Filter by session ID if specified
      if (sessionId !== null && data.sessionId !== sessionId) {
        console.log(`[EventBus] Filtered event ${eventType}: subscribed=${sessionId}, data.sessionId=${data.sessionId}`);
        return;
      }

      console.log(`[EventBus] Calling handler for ${eventType}, session ${sessionId}`);
      try {
        const result = handler(data);
        // An async handler reports failure through its promise, which the
        // surrounding try/catch cannot see; attach a rejection handler.
        if (result && typeof result.then === 'function') {
          result.then(undefined, (error) => reportHandlerError(error, data));
        }
      } catch (error) {
        reportHandlerError(error, data);
      }
    };

    this.on(eventType, wrappedHandler);

    // Track listener count for metrics
    const key = `${eventType}-${sessionId || 'global'}`;
    this.metrics.listenerCounts.set(key, (this.metrics.listenerCounts.get(key) || 0) + 1);

    console.log(`[EventBus] Subscribed to ${eventType} for session ${sessionId || 'all'}. Total: ${this.metrics.listenerCounts.get(key)}`);

    // Return unsubscribe function
    let subscribed = true;
    return () => {
      // Guard against double invocation: a second call must not decrement
      // the counter that other subscribers on the same key share.
      if (!subscribed) {
        return;
      }
      subscribed = false;

      this.off(eventType, wrappedHandler);

      const remaining = Math.max(0, (this.metrics.listenerCounts.get(key) || 0) - 1);
      if (remaining === 0) {
        // Drop empty entries so per-session keys do not accumulate forever.
        this.metrics.listenerCounts.delete(key);
      } else {
        this.metrics.listenerCounts.set(key, remaining);
      }

      console.log(`[EventBus] Unsubscribed from ${eventType} for session ${sessionId || 'all'}. Remaining: ${remaining}`);
    };
  }

  /**
   * Emit an event to all subscribers.
   *
   * Listeners receive a shallow copy of `data` extended with `_timestamp`
   * and `_eventType`; the caller's object is not mutated.
   *
   * @param {string} eventType - Event type
   * @param {Object} data - Event data (must include sessionId for session-scoped events)
   * @returns {boolean} true if the event had listeners, false otherwise
   */
  emit(eventType, data = {}) {
    // The default parameter only covers `undefined`; treat an explicit null
    // payload as empty too so the property reads below cannot throw.
    const payload = data == null ? {} : data;

    this.metrics.eventsEmitted++;
    this.metrics.eventsByType.set(eventType, (this.metrics.eventsByType.get(eventType) || 0) + 1);

    // Add metadata to all events
    const eventData = {
      ...payload,
      _timestamp: Date.now(),
      _eventType: eventType
    };

    // Log important events
    if (eventType.includes('error') || eventType.includes('expired')) {
      console.error(`[EventBus] Emitting ${eventType}:`, payload);
    } else {
      console.log(`[EventBus] Emitting ${eventType} for session ${payload.sessionId || 'unknown'}`);
    }

    return super.emit(eventType, eventData);
  }

  /**
   * Subscribe to all events for a specific session
   * @param {string} sessionId - Session ID
   * @param {Function} handler - Handler for all session events
   * @returns {Function} Unsubscribe function that removes every subscription made here
   */
  subscribeToSession(sessionId, handler) {
    console.log(`[EventBus] Subscribing to all events for session ${sessionId}`);

    const unsubscribers = SESSION_EVENT_TYPES.map((type) =>
      this.subscribe(type, sessionId, handler)
    );

    // Return combined unsubscribe function
    return () => {
      console.log(`[EventBus] Unsubscribing from all events for session ${sessionId}`);
      unsubscribers.forEach((unsub) => unsub());
    };
  }

  /**
   * Get current metrics
   * @returns {Object} Snapshot with `eventsEmitted`, `eventsByType`,
   *   `listenerCounts` (plain objects) and `activeListeners`, the number of
   *   listeners currently registered across all event names.
   */
  getMetrics() {
    const activeListeners = this.eventNames().reduce(
      (total, name) => total + this.listenerCount(name),
      0
    );

    return {
      eventsEmitted: this.metrics.eventsEmitted,
      eventsByType: Object.fromEntries(this.metrics.eventsByType),
      listenerCounts: Object.fromEntries(this.metrics.listenerCounts),
      activeListeners
    };
  }

  /**
   * Log metrics summary
   */
  logMetrics() {
    const metrics = this.getMetrics();
    console.log('[EventBus] Metrics Summary:');
    console.log(`  Total events emitted: ${metrics.eventsEmitted}`);
    console.log(`  Events by type:`, metrics.eventsByType);
    console.log(`  Listener counts:`, metrics.listenerCounts);
    console.log(`  Active listeners: ${metrics.activeListeners}`);
  }

  /**
   * Clear all listeners and reset metrics (useful for testing)
   */
  clear() {
    this.removeAllListeners();
    this.metrics = {
      eventsEmitted: 0,
      eventsByType: new Map(),
      listenerCounts: new Map()
    };
    console.log('[EventBus] Cleared all listeners and reset metrics');
  }
}

// Singleton instance
const eventBus = new EventBus();

// Log metrics periodically (every 5 minutes in production)
if (process.env.NODE_ENV === 'production') {
  const metricsTimer = setInterval(() => {
    eventBus.logMetrics();
  }, 5 * 60 * 1000);
  // Background reporting must not be the only thing keeping the process alive.
  metricsTimer.unref();
}

module.exports = eventBus;