- Modified loadChatHistory() to check for an active project before fetching all sessions
- When an active project exists, use project.sessions instead of fetching from the API
- Added detailed console logging to debug session filtering
- This prevents ALL sessions from appearing in every project's sidebar

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
186 lines
6.0 KiB
JavaScript
186 lines
6.0 KiB
JavaScript
/**
 * EventBus - Central event pub/sub system for session events
 *
 * Replaces callback-based event handling with a clean publish/subscribe pattern.
 * All session events flow through the EventBus, allowing multiple subscribers
 * to listen to the same events.
 */
|
|
|
|
const EventEmitter = require('events');
|
|
|
|
/**
 * Event types that together make up the full event stream of one session.
 * Used by EventBus#subscribeToSession.
 */
const SESSION_EVENT_TYPES = Object.freeze([
  'session-output',
  'session-error',
  'session-status',
  'operations-detected',
  'operations-executed',
  'operations-error',
  'approval-request',
  'approval-confirmed',
  'approval-expired',
  'session-created',
  'session-deleted',
  'command-sent',
  'command-complete',
  'terminal-created',
  'terminal-closed'
]);

/**
 * Build a zeroed metrics record.
 * Shared by the constructor and clear() so both always produce the same shape.
 * @returns {{eventsEmitted: number, eventsByType: Map, listenerCounts: Map}}
 */
function createEmptyMetrics() {
  return {
    eventsEmitted: 0,
    eventsByType: new Map(),
    listenerCounts: new Map()
  };
}

/**
 * Publish/subscribe hub built on Node's EventEmitter.
 *
 * Adds per-session filtering of subscribers, metadata stamping of every
 * emitted payload (`_timestamp`, `_eventType`) and simple usage metrics.
 */
class EventBus extends EventEmitter {
  constructor() {
    super();
    this.setMaxListeners(0); // Unlimited listeners for scalability

    // Metrics for monitoring
    this.metrics = createEmptyMetrics();
  }

  /**
   * Subscribe to an event type
   * @param {string} eventType - Event type (e.g., 'session-output', 'session-error')
   * @param {string|null} sessionId - Session ID to filter events (null or undefined for all sessions)
   * @param {Function} handler - Event handler function; exceptions it throws are logged, not propagated
   * @returns {Function} Unsubscribe function (safe to call more than once)
   */
  subscribe(eventType, sessionId, handler) {
    // Treat undefined like null so filtering agrees with the metrics key and
    // the log messages below, which already report a missing id as "global"/"all".
    const isGlobal = sessionId === null || sessionId === undefined;

    const wrappedHandler = (data) => {
      // Filter by session ID if specified
      if (!isGlobal && data.sessionId !== sessionId) {
        console.log(`[EventBus] Filtered event ${eventType}: subscribed=${sessionId}, data.sessionId=${data.sessionId}`);
        return;
      }

      console.log(`[EventBus] Calling handler for ${eventType}, session ${sessionId}`);

      // A failing subscriber must not prevent the remaining subscribers from running.
      try {
        handler(data);
      } catch (error) {
        console.error(`[EventBus] Error in handler for ${eventType}:`, error);
        console.error(`[EventBus] Handler:`, handler.name || 'anonymous');
        console.error(`[EventBus] Data:`, data);
      }
    };

    this.on(eventType, wrappedHandler);

    // Track listener count for metrics
    const key = `${eventType}-${sessionId || 'global'}`;
    this.metrics.listenerCounts.set(key, (this.metrics.listenerCounts.get(key) || 0) + 1);

    console.log(`[EventBus] Subscribed to ${eventType} for session ${sessionId || 'all'}. Total: ${this.metrics.listenerCounts.get(key)}`);

    // Guard so a repeated unsubscribe call cannot decrement the shared
    // per-key counter on behalf of other, still-active subscribers.
    let active = true;

    // Return unsubscribe function
    return () => {
      if (!active) {
        return;
      }
      active = false;

      this.off(eventType, wrappedHandler);
      const currentCount = this.metrics.listenerCounts.get(key) || 0;
      this.metrics.listenerCounts.set(key, Math.max(0, currentCount - 1));
      console.log(`[EventBus] Unsubscribed from ${eventType} for session ${sessionId || 'all'}. Remaining: ${this.metrics.listenerCounts.get(key)}`);
    };
  }

  /**
   * Emit an event to all subscribers
   * @param {string} eventType - Event type
   * @param {Object} data - Event data (must include sessionId for session-scoped events)
   * @returns {boolean} true if the event had listeners, false otherwise
   */
  emit(eventType, data = {}) {
    // The default parameter only covers undefined; normalise null as well so
    // the property reads below cannot throw.
    const payload = data === null ? {} : data;
    // Event names may be Symbols, which cannot be interpolated or searched directly.
    const typeLabel = String(eventType);

    this.metrics.eventsEmitted++;
    this.metrics.eventsByType.set(eventType, (this.metrics.eventsByType.get(eventType) || 0) + 1);

    // Add metadata to all events
    const eventData = {
      ...payload,
      _timestamp: Date.now(),
      _eventType: eventType
    };

    // Log important events
    if (typeLabel.includes('error') || typeLabel.includes('expired')) {
      console.error(`[EventBus] Emitting ${typeLabel}:`, payload);
    } else {
      console.log(`[EventBus] Emitting ${typeLabel} for session ${payload.sessionId || 'unknown'}`);
    }

    // Preserve EventEmitter's contract of reporting whether anyone was listening.
    return super.emit(eventType, eventData);
  }

  /**
   * Subscribe to all events for a specific session
   * @param {string} sessionId - Session ID
   * @param {Function} handler - Handler for all session events
   * @returns {Function} Unsubscribe function
   */
  subscribeToSession(sessionId, handler) {
    console.log(`[EventBus] Subscribing to all events for session ${sessionId}`);

    const unsubscribers = SESSION_EVENT_TYPES.map(type =>
      this.subscribe(type, sessionId, handler)
    );

    // Return combined unsubscribe function
    return () => {
      console.log(`[EventBus] Unsubscribing from all events for session ${sessionId}`);
      unsubscribers.forEach(unsub => unsub());
    };
  }

  /**
   * Get current metrics
   * @returns {Object} Snapshot with eventsEmitted, eventsByType, listenerCounts
   *   (Maps converted to plain objects) and activeListeners (listeners currently
   *   registered on this emitter, across every event name)
   */
  getMetrics() {
    // Count every registered event name rather than a hard-coded subset, so
    // listeners on e.g. 'approval-request' or 'terminal-closed' are included.
    const activeListeners = this.eventNames().reduce(
      (total, name) => total + this.listenerCount(name),
      0
    );

    return {
      eventsEmitted: this.metrics.eventsEmitted,
      eventsByType: Object.fromEntries(this.metrics.eventsByType),
      listenerCounts: Object.fromEntries(this.metrics.listenerCounts),
      activeListeners
    };
  }

  /**
   * Log metrics summary
   */
  logMetrics() {
    const metrics = this.getMetrics();
    console.log('[EventBus] Metrics Summary:');
    console.log(`  Total events emitted: ${metrics.eventsEmitted}`);
    console.log(`  Events by type:`, metrics.eventsByType);
    console.log(`  Listener counts:`, metrics.listenerCounts);
    console.log(`  Active listeners: ${metrics.activeListeners}`);
  }

  /**
   * Clear all listeners and reset metrics (useful for testing)
   */
  clear() {
    this.removeAllListeners();
    this.metrics = createEmptyMetrics();
    console.log('[EventBus] Cleared all listeners and reset metrics');
  }
}
|
|
|
|
// Singleton instance: this module exports this one EventBus object.
const eventBus = new EventBus();

// Log metrics periodically (every 5 minutes in production)
if (process.env.NODE_ENV === 'production') {
  const metricsTimer = setInterval(() => {
    eventBus.logMetrics();
  }, 5 * 60 * 1000);

  // The reporting timer is purely diagnostic; unref() it so it cannot, on its
  // own, keep the Node.js process alive once all other work has finished.
  metricsTimer.unref();
}

module.exports = eventBus;
|