/**
 * SessionEventStream - Client-side SSE connection manager
 *
 * Manages a Server-Sent Events connection for real-time session updates.
 * Server events are re-emitted to listeners registered with `on()`.
 * Lost connections are re-established with exponential backoff, and a
 * periodic heartbeat check forces a reconnect when the stream goes silent.
 */
class SessionEventStream {
  /**
   * Create a new SSE connection and start connecting immediately.
   *
   * @param {string} sessionId - Session ID to connect to
   * @param {Object} [options] - Configuration overrides
   * @param {number} [options.reconnectInterval=1000] - Initial reconnect delay (ms)
   * @param {number} [options.maxReconnectInterval=30000] - Upper bound for the reconnect delay (ms)
   * @param {number} [options.reconnectDecay=1.5] - Exponential backoff multiplier
   * @param {number} [options.maxReconnectAttempts=10] - Maximum reconnection attempts
   * @param {number} [options.heartbeatTimeout=60000] - Silence tolerated before forcing a reconnect (ms)
   */
  constructor(sessionId, options = {}) {
    this.sessionId = sessionId;
    this.options = {
      reconnectInterval: 1000,
      maxReconnectInterval: 30000,
      reconnectDecay: 1.5,
      maxReconnectAttempts: 10,
      heartbeatTimeout: 60000,
      ...options
    };

    this.eventSource = null;
    this.reconnectAttempts = 0;
    this.isConnected = false;
    this.isReconnecting = false;
    this.listeners = new Map();
    this.lastHeartbeat = Date.now();
    this.heartbeatCheckInterval = null;
    // Handle of the pending backoff timer, kept so it can be cancelled.
    this.reconnectTimer = null;

    console.log(`[SSE] Creating SessionEventStream for ${sessionId}`);

    // Connect immediately
    this.connect();
  }

  /**
   * Establish the SSE connection, replacing any existing one.
   * A pending backoff reconnect is cancelled so only one attempt is in flight.
   */
  connect() {
    this._clearReconnectTimer();
    this.isReconnecting = false;

    if (this.eventSource) {
      this.eventSource.close();
    }
    // The previous source (if any) is closed now, so we are not connected
    // until the new source reports `open`.
    this.isConnected = false;

    // Encode the id so characters such as '/', '?' or spaces cannot alter the
    // request path; the timestamp is a cache-buster.
    const url = `/api/session/${encodeURIComponent(this.sessionId)}/events?t=${Date.now()}`;

    console.log(`[SSE] Connecting to ${url}`);

    try {
      this.eventSource = new EventSource(url);
      this._setupEventHandlers();
    } catch (error) {
      console.error(`[SSE] Error creating EventSource:`, error);
      this._handleReconnect();
    }
  }

  /**
   * Attach open/message/error handlers and named-event listeners to the
   * current EventSource.
   * @private
   */
  _setupEventHandlers() {
    // Capture the source so handlers never act on a newer (or nulled)
    // `this.eventSource`.
    const source = this.eventSource;

    // Connection opened
    source.onopen = () => {
      if (this.eventSource !== source) {
        return;
      }
      console.log(`[SSE] ✓ Connected to session ${this.sessionId}`);
      this.isConnected = true;
      this.reconnectAttempts = 0;

      // The source may have recovered on its own while a backoff reconnect
      // was pending; cancel it so the healthy connection is not torn down.
      this._clearReconnectTimer();
      this.isReconnecting = false;

      // Restart the silence clock, otherwise a stale timestamp from before
      // the reconnect would trip the heartbeat check immediately.
      this.lastHeartbeat = Date.now();
      this.startHeartbeatCheck();

      this.emit('connected', {
        sessionId: this.sessionId,
        timestamp: Date.now()
      });
    };

    // Named server events: `type` is the SSE event name, `emitAs` the local
    // event name (defaults to `type`), `log` an optional logging hook.
    // `data?.` keeps logging safe when the payload parses to null.
    const namedEvents = [
      {
        type: 'connected',
        log: (data) => console.log(`[SSE] Connected event received:`, data)
      },
      { type: 'session-output', emitAs: 'output' },
      {
        type: 'session-error',
        emitAs: 'error',
        log: (data) => console.error(`[SSE] Session error:`, data)
      },
      { type: 'session-status', emitAs: 'status' },
      {
        type: 'operations-detected',
        log: (data) => console.log(`[SSE] Operations detected:`, data?.operations?.length || 0)
      },
      {
        type: 'operations-executed',
        log: (data) => console.log(`[SSE] Operations executed:`, data?.results?.length || 0)
      },
      {
        type: 'operations-error',
        log: (data) => console.error(`[SSE] Operations error:`, data)
      },
      {
        type: 'approval-request',
        log: (data) => console.log(`[SSE] Approval request:`, data?.command)
      },
      {
        type: 'approval-confirmed',
        log: (data) => console.log(`[SSE] Approval confirmed:`, data?.approved)
      },
      {
        type: 'approval-expired',
        log: (data) => console.log(`[SSE] Approval expired:`, data?.approvalId)
      },
      { type: 'command-sent' }
    ];

    for (const { type, emitAs = type, log } of namedEvents) {
      source.addEventListener(type, (e) => {
        const data = this._parseEvent(e);
        // Any server traffic counts as proof of liveness.
        this.lastHeartbeat = Date.now();
        if (log) {
          log(data);
        }
        this.emit(emitAs, data);
      });
    }

    // Server announced shutdown: notify listeners, then stop for good.
    source.addEventListener('shutdown', (e) => {
      const data = this._parseEvent(e);
      console.warn(`[SSE] Server shutdown`);
      this.emit('shutdown', data);
      this.disconnect();
    });

    // Default message handler (events without an explicit `event:` name)
    source.onmessage = (e) => {
      this.lastHeartbeat = Date.now();

      // Per the SSE spec, comment lines (starting with ':') are consumed by
      // the EventSource and should not reach this handler; this guard only
      // matters when the server sends a data payload that begins with ':'.
      if (typeof e.data === 'string' && e.data.startsWith(':')) {
        return;
      }

      this.emit('message', this._parseEvent(e));
    };

    // Handle errors
    source.onerror = (e) => {
      if (this.eventSource !== source) {
        return; // Stale source that has since been replaced or dropped
      }
      console.error(`[SSE] Connection error:`, e);
      this.isConnected = false;
      this.stopHeartbeatCheck();

      // A CLOSED source will not retry on its own; treat it as permanent.
      if (source.readyState === EventSource.CLOSED) {
        console.error(`[SSE] Connection closed permanently`);
        this.emit('disconnected', {
          sessionId: this.sessionId,
          permanent: true,
          reason: 'connection_closed'
        });
        return;
      }

      // Attempt reconnect
      this._handleReconnect();
    };
  }

  /**
   * Parse SSE event data as JSON.
   * @param {{data: string}} e - Event carrying the payload in `data`
   * @returns {*} Parsed JSON value, or `{ raw: e.data }` when parsing fails
   * @private
   */
  _parseEvent(e) {
    try {
      return JSON.parse(e.data);
    } catch (error) {
      console.error('[SSE] Error parsing event data:', e.data);
      return { raw: e.data };
    }
  }

  /**
   * Schedule a reconnect with exponential backoff, or give up once
   * `maxReconnectAttempts` has been reached.
   * @private
   */
  _handleReconnect() {
    if (this.isReconnecting) {
      return; // A reconnect is already scheduled
    }

    const {
      reconnectInterval,
      reconnectDecay,
      maxReconnectInterval,
      maxReconnectAttempts
    } = this.options;

    if (this.reconnectAttempts >= maxReconnectAttempts) {
      console.error(`[SSE] Max reconnect attempts reached (${this.options.maxReconnectAttempts})`);

      // Giving up must be final: close the source so it cannot keep retrying
      // by itself and report the same "permanent" disconnect repeatedly.
      this.stopHeartbeatCheck();
      if (this.eventSource) {
        this.eventSource.close();
      }
      this.isConnected = false;

      this.emit('disconnected', {
        sessionId: this.sessionId,
        permanent: true,
        reason: 'max_reconnect_attempts',
        attempts: this.reconnectAttempts
      });
      return;
    }

    this.isReconnecting = true;

    // Calculate delay with exponential backoff, capped at the maximum
    const delay = Math.min(
      reconnectInterval * Math.pow(reconnectDecay, this.reconnectAttempts),
      maxReconnectInterval
    );

    this.reconnectAttempts++;

    console.log(`[SSE] Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.options.maxReconnectAttempts})`);

    // Arm the timer before notifying listeners so that a listener calling
    // disconnect() actually cancels this attempt.
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.isReconnecting = false;
      this.connect();
    }, delay);

    this.emit('reconnecting', {
      attempt: this.reconnectAttempts,
      delay,
      maxAttempts: maxReconnectAttempts
    });
  }

  /**
   * Cancel a pending backoff reconnect, if any.
   * @private
   */
  _clearReconnectTimer() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /**
   * Start heartbeat monitoring. Every `heartbeatTimeout / 2` ms the time
   * since the last received event is checked; exceeding `heartbeatTimeout`
   * closes the connection and schedules a reconnect.
   */
  startHeartbeatCheck() {
    this.stopHeartbeatCheck();

    this.heartbeatCheckInterval = setInterval(() => {
      const timeSinceLastHeartbeat = Date.now() - this.lastHeartbeat;
      if (timeSinceLastHeartbeat <= this.options.heartbeatTimeout) {
        return;
      }

      console.warn(`[SSE] Heartbeat timeout (${timeSinceLastHeartbeat}ms), forcing reconnect`);

      // Monitoring resumes from onopen once the new connection is up.
      this.stopHeartbeatCheck();
      this.isConnected = false;
      if (this.eventSource) {
        this.eventSource.close();
      }
      this._handleReconnect();
    }, this.options.heartbeatTimeout / 2);
  }

  /**
   * Stop heartbeat monitoring (no-op when it is not running).
   */
  stopHeartbeatCheck() {
    if (this.heartbeatCheckInterval) {
      clearInterval(this.heartbeatCheckInterval);
      this.heartbeatCheckInterval = null;
    }
  }

  /**
   * Register event listener
   * @param {string} event - Event name
   * @param {Function} handler - Event handler function
   */
  on(event, handler) {
    if (!this.listeners.has(event)) {
      this.listeners.set(event, []);
    }
    this.listeners.get(event).push(handler);
  }

  /**
   * Remove event listener
   * @param {string} event - Event name
   * @param {Function} handler - Event handler function
   */
  off(event, handler) {
    const handlers = this.listeners.get(event);
    if (!handlers) {
      return;
    }

    const index = handlers.indexOf(handler);
    if (index > -1) {
      handlers.splice(index, 1);
    }
  }

  /**
   * Emit event to all registered listeners. An exception thrown by one
   * listener is logged and does not prevent the others from running.
   * @param {string} event - Event name
   * @param {*} data - Payload passed to each listener
   * @private
   */
  emit(event, data) {
    const handlers = this.listeners.get(event);
    if (!handlers) {
      return;
    }

    // Iterate over a snapshot: listeners may call on()/off() while running,
    // and mutating the live array mid-iteration would skip handlers.
    for (const handler of [...handlers]) {
      try {
        handler(data);
      } catch (error) {
        console.error(`[SSE] Error in ${event} handler:`, error);
      }
    }
  }

  /**
   * Disconnect and cleanup: stops heartbeat monitoring, cancels any pending
   * reconnect, closes the connection and removes all listeners.
   */
  disconnect() {
    console.log(`[SSE] Disconnecting from session ${this.sessionId}`);

    this.stopHeartbeatCheck();
    // Without this a scheduled backoff attempt would silently reconnect
    // after the caller asked to disconnect.
    this._clearReconnectTimer();

    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }

    this.isConnected = false;
    this.isReconnecting = false;
    this.listeners.clear();
  }

  /**
   * Get connection status
   * @returns {Object} Snapshot of the connection state; `readyState` is -1
   *   when there is no EventSource instance
   */
  getStatus() {
    return {
      sessionId: this.sessionId,
      isConnected: this.isConnected,
      isReconnecting: this.isReconnecting,
      reconnectAttempts: this.reconnectAttempts,
      lastHeartbeat: this.lastHeartbeat,
      readyState: this.eventSource ? this.eventSource.readyState : -1
    };
  }
}

// Export for use in UI
if (typeof module !== 'undefined' && module.exports) {
  module.exports = SessionEventStream;
} else {
  // `globalThis` is `window` in browser pages and also exists where `window`
  // does not (workers, ES modules outside the browser).
  globalThis.SessionEventStream = SessionEventStream;
}