/**
 * SSEManager - Manages Server-Sent Events connections
 *
 * Handles SSE connection lifecycle, heartbeat, and event delivery
 * for real-time streaming of session events to clients.
 */

const eventBus = require('./event-bus');

class SSEManager {
  constructor() {
    // Map: sessionId -> Set of response objects
    this.connections = new Map();

    // Heartbeat configuration
    this.heartbeatInterval = 30000; // 30 seconds

    // Map: response object -> interval timer id.
    // Keyed by the response itself so the lookup never depends on
    // `res.socket.remotePort`, which may be undefined once the socket is
    // destroyed (i.e. exactly when the connection is being removed).
    this.heartbeatTimers = new Map();

    // Map: response object -> EventBus unsubscribe function, so every
    // removal path can release the subscription, not only the request's
    // 'close' handler.
    this.unsubscribers = new Map();

    // Connection tracking
    this.totalConnectionsCreated = 0;
    this.totalConnectionsClosed = 0;
  }

  /**
   * Check whether a response can no longer be written to.
   * @param {Object} res - Response object
   * @returns {boolean} True if the response is destroyed, ended or closed
   */
  isConnectionClosed(res) {
    return Boolean(res.destroyed || res.writableEnded || res.closed);
  }

  /**
   * Add SSE connection for a session
   *
   * Writes the SSE headers, registers the response under the session,
   * sends the initial `connected` event, starts the heartbeat, subscribes
   * to the session's EventBus events and emits a `session-status` event.
   *
   * @param {string} sessionId - Session ID
   * @param {Object} res - Express response object
   * @param {Object} req - Express request object
   */
  addConnection(sessionId, res, req) {
    // Setup SSE headers
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache, no-transform');
    res.setHeader('X-Accel-Buffering', 'no'); // Disable nginx buffering
    res.setHeader('Connection', 'keep-alive');

    // Flush headers to establish connection
    res.flushHeaders();

    // Add connection to set
    let connectionSet = this.connections.get(sessionId);
    if (!connectionSet) {
      connectionSet = new Set();
      this.connections.set(sessionId, connectionSet);
    }
    connectionSet.add(res);
    this.totalConnectionsCreated++;

    console.log(`[SSEManager] ✓ Connection added for session ${sessionId}`);
    console.log(`[SSEManager] Total connections for ${sessionId}: ${connectionSet.size}`);
    console.log(`[SSEManager] Total active sessions: ${this.connections.size}`);
    console.log(`[SSEManager] Total connections created: ${this.totalConnectionsCreated}`);

    // Get client info
    const clientIp = req.ip || req.socket?.remoteAddress;
    const userAgent = req.get('User-Agent') || 'unknown';

    // Send initial connection event
    this.sendEvent(sessionId, res, {
      type: 'connected',
      sessionId,
      timestamp: Date.now(),
      message: 'SSE connection established',
      client: {
        ip: clientIp,
        userAgent: userAgent.substring(0, 100), // Truncate very long user agents
      },
    });

    // Start heartbeat for this connection
    this.startHeartbeat(sessionId, res);

    // Subscribe to all session events from EventBus
    const unsubscribe = eventBus.subscribeToSession(sessionId, (eventData) => {
      console.log(
        `[SSEManager] Received event for session ${sessionId}:`,
        eventData.type,
        eventData._eventType,
      );
      this.sendEvent(sessionId, res, eventData);
    });

    // Several paths can remove this connection (client disconnect, heartbeat
    // failure, broadcast cleanup, shutdown). The guard makes sure the
    // EventBus unsubscribe runs at most once whichever path gets there first.
    let released = false;
    const unsubscribeOnce = () => {
      if (released) {
        return;
      }
      released = true;
      if (typeof unsubscribe === 'function') {
        unsubscribe();
      }
    };
    this.unsubscribers.set(res, unsubscribeOnce);

    // Handle client disconnect
    req.on('close', () => {
      this.removeConnection(sessionId, res, unsubscribeOnce);
    });

    // Log initial session status
    eventBus.emit('session-status', {
      sessionId,
      status: 'sse-connected',
      connections: connectionSet.size,
    });
  }

  /**
   * Send SSE event to a specific connection
   * @param {string} sessionId - Session ID
   * @param {Object} res - Response object
   * @param {Object} data - Event data
   * @returns {boolean} True if sent successfully; false if the connection
   *   is closed or the event could not be serialized/written
   */
  sendEvent(sessionId, res, data) {
    // Check if connection is still alive
    if (this.isConnectionClosed(res)) {
      console.log(`[SSEManager] Skipping send to ${sessionId} - connection closed`);
      return false;
    }

    try {
      // Use the EventBus event type (_eventType) for the SSE event name
      // This ensures frontend receives events like 'session-output' not 'stdout'
      const rawName = data._eventType || data.type || 'message';
      // A line terminator inside the name would end the `event:` field early
      // and corrupt the frame, so strip CR/LF before writing.
      const eventName = String(rawName).replace(/[\r\n]/g, '') || 'message';
      const eventData = JSON.stringify(data);

      console.log(
        `[SSEManager] SENDING to ${sessionId}: event=${eventName}, _eventType=${data._eventType}, type=${data.type}`,
      );

      // SSE format: event: <name>\ndata: <json>\nid: <id>\n\n
      // Written in one call so the frame is handed to the stream as a unit.
      res.write(`event: ${eventName}\ndata: ${eventData}\nid: ${Date.now()}\n\n`);

      console.log(`[SSEManager] ✓ Sent to ${sessionId}: ${eventName}`);
      return true;
    } catch (error) {
      console.error(`[SSEManager] ✗ Error sending event to ${sessionId}:`, error.message);
      return false;
    }
  }

  /**
   * Send event to all connections for a session
   * @param {string} sessionId - Session ID
   * @param {Object} data - Event data
   * @returns {number} Number of clients the event was sent to
   */
  broadcastToSession(sessionId, data) {
    const connectionSet = this.connections.get(sessionId);
    if (!connectionSet) {
      return 0;
    }

    let sentCount = 0;
    const deadConnections = [];

    for (const res of connectionSet) {
      if (this.sendEvent(sessionId, res, data)) {
        sentCount++;
      } else if (this.isConnectionClosed(res)) {
        // Only evict responses that are really closed. A failed send can also
        // mean the payload could not be serialized, which must not drop
        // healthy clients.
        deadConnections.push(res);
      }
    }

    // Clean up dead connections through removeConnection so the heartbeat
    // timer, the EventBus subscription and the counters stay consistent.
    for (const res of deadConnections) {
      console.warn(`[SSEManager] Removing dead connection for session ${sessionId}`);
      this.removeConnection(sessionId, res);
    }

    if (deadConnections.length > 0) {
      console.log(
        `[SSEManager] Cleaned up ${deadConnections.length} dead connection(s) for ${sessionId}`,
      );
    }

    return sentCount;
  }

  /**
   * Start heartbeat for a connection
   *
   * Periodically writes an SSE comment line to keep the connection alive.
   * The timer stops itself once the response is closed and removes the
   * connection if the write throws.
   *
   * @param {string} sessionId - Session ID
   * @param {Object} res - Response object
   */
  startHeartbeat(sessionId, res) {
    // Never leave an earlier timer for the same response running.
    this.stopHeartbeat(res);

    const timerId = setInterval(() => {
      if (this.isConnectionClosed(res)) {
        this.stopHeartbeat(res);
        return;
      }

      // Send heartbeat comment (keeps connection alive)
      try {
        res.write(': heartbeat\n\n');
      } catch (error) {
        console.error(`[SSEManager] Heartbeat failed for ${sessionId}:`, error.message);
        this.removeConnection(sessionId, res);
      }
    }, this.heartbeatInterval);

    this.heartbeatTimers.set(res, timerId);

    console.log(
      `[SSEManager] Heartbeat started for ${sessionId} (${this.heartbeatInterval}ms interval)`,
    );
  }

  /**
   * Stop the heartbeat timer of a connection, if one is running.
   * @param {Object} res - Response object
   * @returns {boolean} True if a timer was found and cleared
   */
  stopHeartbeat(res) {
    const timerId = this.heartbeatTimers.get(res);
    if (timerId === undefined) {
      return false;
    }
    clearInterval(timerId);
    this.heartbeatTimers.delete(res);
    return true;
  }

  /**
   * Remove SSE connection
   *
   * Safe to call more than once for the same response: the closed counter
   * only moves when the response was still tracked.
   *
   * @param {string} sessionId - Session ID
   * @param {Object} res - Response object
   * @param {Function} [unsubscribe] - Unsubscribe function from EventBus;
   *   when omitted, the one recorded by addConnection (if any) is used
   */
  removeConnection(sessionId, res, unsubscribe) {
    const connectionSet = this.connections.get(sessionId);
    const wasTracked = connectionSet ? connectionSet.delete(res) : false;

    if (wasTracked) {
      this.totalConnectionsClosed++;

      if (connectionSet.size === 0) {
        this.connections.delete(sessionId);
        console.log(
          `[SSEManager] No more connections for session ${sessionId}, removing from tracking`,
        );
      }

      console.log(`[SSEManager] ✗ Connection removed for session ${sessionId}`);
      console.log(`[SSEManager] Remaining connections for ${sessionId}: ${connectionSet.size}`);
    }

    // Stop heartbeat
    if (this.stopHeartbeat(res)) {
      console.log(`[SSEManager] Heartbeat stopped for ${sessionId}`);
    }

    // Unsubscribe from events
    const storedUnsubscribe = this.unsubscribers.get(res);
    this.unsubscribers.delete(res);
    if (typeof storedUnsubscribe === 'function') {
      storedUnsubscribe();
    }
    if (typeof unsubscribe === 'function' && unsubscribe !== storedUnsubscribe) {
      unsubscribe();
    }

    if (!wasTracked) {
      // Already removed by another path; nothing new to report.
      return;
    }

    // Log final status
    console.log(`[SSEManager] Total connections created: ${this.totalConnectionsCreated}`);
    console.log(`[SSEManager] Total connections closed: ${this.totalConnectionsClosed}`);
    console.log(
      `[SSEManager] Active connections: ${this.totalConnectionsCreated - this.totalConnectionsClosed}`,
    );
  }

  /**
   * Get connection count for a session
   * @param {string} sessionId - Session ID
   * @returns {number} Number of active connections
   */
  getConnectionCount(sessionId) {
    return this.connections.get(sessionId)?.size ?? 0;
  }

  /**
   * Get all active session IDs
   * @returns {Array} Array of session IDs with active connections
   */
  getActiveSessions() {
    return [...this.connections.keys()];
  }

  /**
   * Get detailed connection stats
   * @returns {Object} Connection statistics
   */
  getStats() {
    const sessionStats = {};
    let totalConnections = 0;

    for (const [sessionId, connectionSet] of this.connections) {
      sessionStats[sessionId] = connectionSet.size;
      totalConnections += connectionSet.size;
    }

    return {
      totalSessions: this.connections.size,
      totalConnections,
      sessions: sessionStats,
      totalCreated: this.totalConnectionsCreated,
      totalClosed: this.totalConnectionsClosed,
      activeHeartbeats: this.heartbeatTimers.size,
    };
  }

  /**
   * Clean up all connections (for shutdown)
   *
   * Sends a `shutdown` event to every open connection and ends it, then
   * clears all heartbeat timers, releases all EventBus subscriptions and
   * empties the tracking maps.
   */
  cleanup() {
    console.log('[SSEManager] Cleaning up all connections...');

    let closedCount = 0;

    for (const [sessionId, connectionSet] of this.connections) {
      for (const res of connectionSet) {
        // Every tracked connection stops being tracked here, so count it as
        // closed even if the goodbye message cannot be delivered.
        this.totalConnectionsClosed++;

        if (this.isConnectionClosed(res)) {
          continue;
        }

        try {
          res.write('event: shutdown\ndata: {"message":"Server shutting down"}\n\n');
          res.end();
          closedCount++;
        } catch (error) {
          console.error(
            `[SSEManager] Error closing connection for ${sessionId}:`,
            error.message,
          );
        }
      }
    }

    // Clear all heartbeat timers
    for (const timerId of this.heartbeatTimers.values()) {
      clearInterval(timerId);
    }

    // Release EventBus subscriptions; one failing must not block the rest.
    for (const release of this.unsubscribers.values()) {
      try {
        release();
      } catch (error) {
        console.error('[SSEManager] Error unsubscribing during cleanup:', error.message);
      }
    }

    this.connections.clear();
    this.heartbeatTimers.clear();
    this.unsubscribers.clear();

    console.log(`[SSEManager] Cleanup complete. Closed ${closedCount} connection(s)`);
  }

  /**
   * Log connection stats
   */
  logStats() {
    const stats = this.getStats();
    console.log('[SSEManager] Connection Stats:');
    console.log(`  Active sessions: ${stats.totalSessions}`);
    console.log(`  Total connections: ${stats.totalConnections}`);
    console.log(`  Sessions:`, stats.sessions);
    console.log(`  Total created: ${stats.totalCreated}`);
    console.log(`  Total closed: ${stats.totalClosed}`);
    console.log(`  Active heartbeats: ${stats.activeHeartbeats}`);
  }
}

// Singleton instance
const sseManager = new SSEManager();

// Log stats periodically (every 5 minutes in production)
if (process.env.NODE_ENV === 'production') {
  setInterval(() => {
    sseManager.logStats();
  }, 5 * 60 * 1000);
}

module.exports = sseManager;