# Backend Refactor Plan: Route-Based Session Context with SSE

## Executive Summary

This document outlines a comprehensive plan to refactor the Obsidian Web Interface from a WebSocket-based architecture to a hybrid approach using **route-based session context** with **Server-Sent Events (SSE)** streaming. This eliminates session context ambiguity and provides clearer, more maintainable code.

**Current State:**
- WebSocket at `/claude/api/claude/chat` (session via query/subscription message)
- Session IDs passed via query parameters or WebSocket messages
- Context ambiguity when multiple tabs/sessions are open
- Complex client-side session management

**Target State:**
- Route-based URLs with explicit session context
- SSE streaming at `/api/session/:sessionId/events`
- Clear separation: UI routes vs API endpoints
- No session context ambiguity

---

## 1. New URL Architecture

### 1.1 Route Structure

```
# UI Routes (serve HTML/JS)
GET    /terminal/:sessionId                          → Terminal UI for specific session
GET    /session/:sessionId                           → Session detail page
GET    /sessions                                     → Session list page

# API Endpoints (session-scoped)
GET    /api/session/:sessionId/events                → SSE event stream
POST   /api/session/:sessionId/prompt                → Send command/prompt
GET    /api/session/:sessionId/status                → Session status/health
GET    /api/session/:sessionId/context               → Session context files
POST   /api/session/:sessionId/operations/preview    → Preview operations
POST   /api/session/:sessionId/operations/execute    → Execute operations
DELETE /api/session/:sessionId                       → Delete/terminate session

# Management Endpoints
GET    /api/sessions                                 → List all sessions
POST   /api/sessions                                 → Create new session
POST   /api/sessions/:id/duplicate                   → Duplicate session
POST   /api/sessions/:id/fork                        → Fork session
POST   /api/sessions/:id/move                        → Move to project

# Legacy Routes (deprecated, redirect to new structure)
/claude/api/claude/chat                              → WebSocket (deprecated)
/claude/api/claude/sessions                          → Use /api/sessions instead
```

### 1.2 URL Validation

```typescript
// Session ID
// validation
const SESSION_ID_REGEX = /^[a-zA-Z0-9_-]{8,32}$/;

function validateSessionId(sessionId: string): boolean {
  return SESSION_ID_REGEX.test(sessionId);
}

// Middleware for session validation
// (claudeService is the existing service from services/claude-service.js)
function validateSession(req, res, next) {
  const { sessionId } = req.params;

  if (!validateSessionId(sessionId)) {
    return res.status(400).json({
      error: 'Invalid session ID format',
      sessionId
    });
  }

  // Check session exists
  const session = claudeService.getSession(sessionId);
  if (!session) {
    return res.status(404).json({
      error: 'Session not found',
      sessionId
    });
  }

  req.sessionContext = session;
  next();
}
```

---

## 2. EventBus Implementation

### 2.1 File Structure

```
/home/uroma/obsidian-web-interface/
├── services/
│   ├── event-bus.js          ← NEW: EventBus implementation
│   ├── sse-manager.js        ← NEW: SSE connection management
│   ├── claude-service.js     ← MODIFIED: Emit events instead of callbacks
│   ├── terminal-service.js   ← MODIFIED: Use EventBus
│   └── database.js
├── routes/
│   ├── session-routes.js     ← NEW: Session-specific routes
│   ├── terminal-routes.js    ← NEW: Terminal-specific routes
│   └── sse-routes.js         ← NEW: SSE endpoint
└── server.js                 ← MODIFIED: Route registration
```

### 2.2 EventBus API

```javascript
// /home/uroma/obsidian-web-interface/services/event-bus.js
const EventEmitter = require('events');

class EventBus extends EventEmitter {
  constructor() {
    super();
    this.setMaxListeners(0); // Unlimited listeners
    this.metrics = {
      eventsEmitted: 0,
      eventsByType: new Map(),
      listenerCounts: new Map()
    };
  }

  /**
   * Subscribe to an event type
   * @param {string} eventType - Event type (e.g., 'session-output', 'session-error')
   * @param {string} sessionId - Session ID to filter events (optional, null for all)
   * @param {Function} handler - Event handler function
   * @returns {Function} Unsubscribe function
   */
  subscribe(eventType, sessionId, handler) {
    const wrappedHandler = (data) =>
    {
      // Filter by session ID if specified (null or undefined means "all sessions")
      if (sessionId != null && data.sessionId !== sessionId) {
        return;
      }

      try {
        handler(data);
      } catch (error) {
        console.error(`[EventBus] Error in handler for ${eventType}:`, error);
      }
    };

    this.on(eventType, wrappedHandler);

    // Track listener
    const key = `${eventType}-${sessionId || 'global'}`;
    this.metrics.listenerCounts.set(key, (this.metrics.listenerCounts.get(key) || 0) + 1);

    // Return unsubscribe function
    return () => {
      this.off(eventType, wrappedHandler);
      this.metrics.listenerCounts.set(key, Math.max(0, (this.metrics.listenerCounts.get(key) || 0) - 1));
    };
  }

  /**
   * Emit an event
   * @param {string} eventType - Event type
   * @param {Object} data - Event data (must include sessionId for session-scoped events)
   */
  emit(eventType, data = {}, ...rest) {
    // EventEmitter emits 'newListener' and 'removeListener' internally with
    // (eventName, listener) arguments; pass those through unchanged so that
    // listeners for them still receive the event name.
    if (eventType === 'newListener' || eventType === 'removeListener') {
      return super.emit(eventType, data, ...rest);
    }

    this.metrics.eventsEmitted++;
    this.metrics.eventsByType.set(eventType, (this.metrics.eventsByType.get(eventType) || 0) + 1);

    // Add timestamp to all events
    const eventData = {
      ...data,
      _timestamp: Date.now(),
      _eventType: eventType
    };

    return super.emit(eventType, eventData);
  }

  /**
   * Subscribe to all events for a session
   * @param {string} sessionId - Session ID
   * @param {Function} handler - Handler for all session events
   * @returns {Function} Unsubscribe function
   */
  subscribeToSession(sessionId, handler) {
    const eventTypes = [
      'session-output',
      'session-error',
      'session-status',
      'operations-detected',
      'operations-executed',
      'operations-error',
      'approval-request',
      'approval-confirmed',
      'approval-expired',
      'session-created',
      'session-deleted'
    ];

    const unsubscribers = eventTypes.map(type =>
      this.subscribe(type, sessionId, handler)
    );

    // Return combined unsubscribe function
    return () => unsubscribers.forEach(unsub => unsub());
  }

  /**
   * Get metrics
   */
  getMetrics() {
    return {
      eventsEmitted: this.metrics.eventsEmitted,
      eventsByType: Object.fromEntries(this.metrics.eventsByType),
      listenerCounts: Object.fromEntries(this.metrics.listenerCounts),
      activeListeners: this.listenerCount('session-output') +
        this.listenerCount('session-error') +
        this.listenerCount('session-status')
    };
  }

  /**
   * Clear all listeners (useful for testing)
   */
  clear() {
    this.removeAllListeners();
    this.metrics = {
      eventsEmitted: 0,
      eventsByType: new Map(),
      listenerCounts: new Map()
    };
  }
}

// Singleton instance
const eventBus = new EventBus();

module.exports = eventBus;
```

### 2.3 Event Types

```javascript
// Event type definitions

// Session lifecycle events
eventBus.emit('session-created', { sessionId, mode, workingDir });
eventBus.emit('session-deleted', { sessionId });

// Session output events
eventBus.emit('session-output', {
  sessionId,
  type: 'stdout', // one of 'stdout' | 'stderr' | 'exit' | 'ready'
  content: 'output text',
  timestamp: Date.now()
});

eventBus.emit('session-error', {
  sessionId,
  error: 'Error message',
  code: 'ERR_CODE',
  recoverable: true
});

eventBus.emit('session-status', {
  sessionId,
  status: 'running', // one of 'running' | 'idle' | 'exited' | 'error'
  pid: 12345,
  uptime: 45000
});

// Operation events
eventBus.emit('operations-detected', {
  sessionId,
  operations: [{ type: 'write', path: '/path', content: '...' }],
  response: 'full response text'
});

eventBus.emit('operations-executed', {
  sessionId,
  results: [{ success: true, path: '/path', operation: 'write' }]
});

eventBus.emit('operations-error', {
  sessionId,
  error: 'Error message',
  operations: []
});

// Approval events
eventBus.emit('approval-request', {
  sessionId,
  approvalId: 'apr-123',
  command: 'rm -rf /',
  explanation: 'Remove files',
  expiresAt: Date.now() + 300000
});

eventBus.emit('approval-confirmed', {
  sessionId,
  approvalId: 'apr-123',
  approved: true,
  customCommand: null
});

eventBus.emit('approval-expired', {
  sessionId,
  approvalId: 'apr-123'
});
```

---

## 3. SSE Endpoint Implementation

### 3.1 SSE Manager

```javascript
// /home/uroma/obsidian-web-interface/services/sse-manager.js
const eventBus = require('./event-bus');

class SSEManager {
  constructor() {
    // Map: sessionId -> Set of response objects
    this.connections = new Map();
    this.heartbeatInterval = 30000; // 30 seconds
    this.heartbeatTimers = new Map();
  }

  /**
   * Add SSE connection for a session
   * @param {string} sessionId - Session ID
   * @param {Object} res - Express response object
   * @param {Object} req - Express request object
   */
  addConnection(sessionId, res, req) {
    // Setup SSE headers
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache, no-transform');
    res.setHeader('X-Accel-Buffering', 'no'); // Disable nginx buffering
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders();

    // Add connection to set
    if (!this.connections.has(sessionId)) {
      this.connections.set(sessionId, new Set());
    }
    const connectionSet = this.connections.get(sessionId);
    connectionSet.add(res);

    console.log(`[SSEManager] Connection added for session ${sessionId}. 
Total connections: ${connectionSet.size}`);

    // Send initial connection event
    this.sendEvent(sessionId, res, {
      type: 'connected',
      sessionId,
      timestamp: Date.now(),
      message: 'SSE connection established'
    });

    // Start heartbeat for this connection
    this.startHeartbeat(sessionId, res);

    // Subscribe to all session events
    const unsubscribe = eventBus.subscribeToSession(sessionId, (eventData) => {
      this.sendEvent(sessionId, res, eventData);
    });

    // Handle client disconnect
    req.on('close', () => {
      this.removeConnection(sessionId, res, unsubscribe);
    });

    // Send immediate status update
    eventBus.emit('session-status', {
      sessionId,
      status: 'connected'
    });
  }

  /**
   * Send SSE event to a specific connection
   * @param {string} sessionId - Session ID
   * @param {Object} res - Response object
   * @param {Object} data - Event data
   */
  sendEvent(sessionId, res, data) {
    if (res.destroyed || res.writableEnded) {
      return false;
    }

    try {
      // EventBus events carry their name in _eventType; their `type` field
      // holds a payload value such as 'stdout', so prefer _eventType here.
      // Events built locally (e.g. 'connected') only have `type`.
      const eventName = data._eventType || data.type || 'message';
      const eventData = JSON.stringify(data);

      res.write(`event: ${eventName}\n`);
      res.write(`data: ${eventData}\n`);
      res.write(`id: ${Date.now()}\n`);
      res.write('\n');

      return true;
    } catch (error) {
      console.error(`[SSEManager] Error sending event to ${sessionId}:`, error);
      return false;
    }
  }

  /**
   * Send event to all connections for a session
   * @param {string} sessionId - Session ID
   * @param {Object} data - Event data
   */
  broadcastToSession(sessionId, data) {
    const connectionSet = this.connections.get(sessionId);
    if (!connectionSet) {
      return 0;
    }

    let sentCount = 0;
    const deadConnections = [];

    for (const res of connectionSet) {
      if (!this.sendEvent(sessionId, res, data)) {
        deadConnections.push(res);
      } else {
        sentCount++;
      }
    }

    // Clean up dead connections
    deadConnections.forEach(res => {
      connectionSet.delete(res);
    });

    return sentCount;
  }

  /**
   * Start heartbeat for a connection
   * @param {string} sessionId - Session ID
   * @param {Object} res - Response object
   */
  startHeartbeat(sessionId, res) {
    const timerId = setInterval(() => {
      if
      (res.destroyed || res.writableEnded) {
        clearInterval(timerId);
        this.heartbeatTimers.delete(res);
        return;
      }

      // Send the heartbeat as a named event. EventSource discards SSE comment
      // lines (": ..."), so a comment would be invisible to client code.
      try {
        res.write('event: heartbeat\ndata: {}\n\n');
      } catch (error) {
        clearInterval(timerId);
        this.removeConnection(sessionId, res, () => {});
      }
    }, this.heartbeatInterval);

    // Key timers by the response object itself. A key built from
    // res.socket?.remotePort can collide or change once the socket is gone.
    this.heartbeatTimers.set(res, timerId);
  }

  /**
   * Remove SSE connection
   * @param {string} sessionId - Session ID
   * @param {Object} res - Response object
   * @param {Function} unsubscribe - Unsubscribe function from EventBus
   */
  removeConnection(sessionId, res, unsubscribe) {
    const connectionSet = this.connections.get(sessionId);
    if (connectionSet) {
      connectionSet.delete(res);
      if (connectionSet.size === 0) {
        this.connections.delete(sessionId);
      }
      console.log(`[SSEManager] Connection removed for session ${sessionId}. Remaining: ${connectionSet.size || 0}`);
    }

    // Stop heartbeat
    const timerId = this.heartbeatTimers.get(res);
    if (timerId) {
      clearInterval(timerId);
      this.heartbeatTimers.delete(res);
    }

    // Unsubscribe from events
    if (unsubscribe) {
      unsubscribe();
    }
  }

  /**
   * Get connection count for a session
   * @param {string} sessionId - Session ID
   */
  getConnectionCount(sessionId) {
    return this.connections.get(sessionId)?.size || 0;
  }

  /**
   * Get all active sessions
   */
  getActiveSessions() {
    return Array.from(this.connections.keys());
  }

  /**
   * Clean up all connections (for shutdown)
   */
  cleanup() {
    console.log('[SSEManager] Cleaning up all connections...');

    for (const connectionSet of this.connections.values()) {
      for (const res of connectionSet) {
        try {
          res.end();
        } catch (error) {
          // Ignore errors during cleanup
        }
      }
    }

    // Clear all heartbeat timers
    for (const timerId of this.heartbeatTimers.values()) {
      clearInterval(timerId);
    }

    this.connections.clear();
    this.heartbeatTimers.clear();

    console.log('[SSEManager] Cleanup complete');
  }
}

//
// Singleton instance
const sseManager = new SSEManager();

module.exports = sseManager;
```

### 3.2 SSE Route

```javascript
// /home/uroma/obsidian-web-interface/routes/sse-routes.js
const express = require('express');
const sseManager = require('../services/sse-manager');
// The routes need the Express middleware from section 1.2 (validateSession),
// not the boolean helper, so it is imported under the name used below.
// This assumes middleware/validation exports validateSession.
const { validateSession: validateSessionId } = require('../middleware/validation');

const router = express.Router();

/**
 * SSE endpoint for session events
 * GET /api/session/:sessionId/events
 */
router.get('/session/:sessionId/events', validateSessionId, (req, res) => {
  const { sessionId } = req.params;

  console.log(`[SSE] New connection for session ${sessionId}`);

  // Add SSE connection
  sseManager.addConnection(sessionId, res, req);

  // Note: Response is kept open for SSE streaming
});

/**
 * Get connection status for a session
 * GET /api/session/:sessionId/events/status
 */
router.get('/session/:sessionId/events/status', validateSessionId, (req, res) => {
  const { sessionId } = req.params;

  res.json({
    sessionId,
    activeConnections: sseManager.getConnectionCount(sessionId),
    timestamp: Date.now()
  });
});

module.exports = router;
```

---

## 4. Route Implementation

### 4.1 Session Routes

```javascript
// /home/uroma/obsidian-web-interface/routes/session-routes.js
const express = require('express');
const claudeService = require('../services/claude-service');
const eventBus = require('../services/event-bus');
// As in sse-routes.js: the middleware from section 1.2 (validateSession),
// imported under the name used by the routes below. This assumes
// middleware/validation exports validateSession.
const { validateSession: validateSessionId } = require('../middleware/validation');

const router = express.Router();

/**
 * Send command/prompt to session
 * POST /api/session/:sessionId/prompt
 */
router.post('/session/:sessionId/prompt', validateSessionId, async (req, res) => {
  const { sessionId } = req.params;
  const { command, context } = req.body;

  if (!command) {
    return res.status(400).json({
      error: 'Command is required'
    });
  }

  try {
    // Send command to Claude service
    await claudeService.sendCommand(sessionId, command);

    // Response will come via SSE
    res.json({
      success: true,
      sessionId,
      message: 'Command sent',
      timestamp: Date.now()
    });

    // Emit command-sent event
    eventBus.emit('command-sent', {
      sessionId,
      command: command.substring(0, 100) + (command.length > 100 ? '...'
        : ''),
      timestamp: Date.now()
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error sending command:`, error);

    // Emit error event
    eventBus.emit('session-error', {
      sessionId,
      error: error.message,
      code: 'COMMAND_SEND_ERROR',
      recoverable: false
    });

    res.status(500).json({
      error: 'Failed to send command',
      message: error.message
    });
  }
});

/**
 * Get session status
 * GET /api/session/:sessionId/status
 */
router.get('/session/:sessionId/status', validateSessionId, (req, res) => {
  const { sessionId } = req.params;

  try {
    const session = claudeService.getSession(sessionId);

    if (!session) {
      return res.status(404).json({
        error: 'Session not found',
        sessionId
      });
    }

    res.json({
      sessionId: session.id,
      status: session.status,
      mode: session.mode,
      createdAt: session.createdAt,
      lastActivity: session.lastActivity,
      // NOTE: process.pid is the PID of this server process, not of the
      // session's child process.
      pid: process.pid,
      uptime: Date.now() - new Date(session.createdAt).getTime()
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error getting status:`, error);
    res.status(500).json({
      error: 'Failed to get session status',
      message: error.message
    });
  }
});

/**
 * Get session context files
 * GET /api/session/:sessionId/context
 */
router.get('/session/:sessionId/context', validateSessionId, async (req, res) => {
  const { sessionId } = req.params;

  try {
    const context = await claudeService.getSessionContext(sessionId);

    res.json({
      sessionId,
      context: context.files,
      timestamp: Date.now()
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error getting context:`, error);
    res.status(500).json({
      error: 'Failed to get session context',
      message: error.message
    });
  }
});

/**
 * Preview operations for session
 * POST /api/session/:sessionId/operations/preview
 */
router.post('/session/:sessionId/operations/preview', validateSessionId, async (req, res) => {
  const { sessionId } = req.params;
  const { response } = req.body;

  if (!response) {
    return res.status(400).json({
      error: 'Response is required'
    });
  }

  try {
    const operations = await claudeService.previewOperations(sessionId, response);

    //
    // Emit operations-detected event
    eventBus.emit('operations-detected', {
      sessionId,
      operations,
      response
    });

    res.json({
      success: true,
      operations,
      count: operations.length
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error previewing operations:`, error);

    eventBus.emit('session-error', {
      sessionId,
      error: error.message,
      code: 'PREVIEW_ERROR',
      recoverable: true
    });

    res.status(500).json({
      error: 'Failed to preview operations',
      message: error.message
    });
  }
});

/**
 * Execute operations for session
 * POST /api/session/:sessionId/operations/execute
 */
router.post('/session/:sessionId/operations/execute', validateSessionId, async (req, res) => {
  const { sessionId } = req.params;
  const { operations } = req.body;

  if (!operations || !Array.isArray(operations)) {
    return res.status(400).json({
      error: 'Operations array is required'
    });
  }

  try {
    const results = await claudeService.executeOperations(sessionId, operations);

    // Emit operations-executed event
    eventBus.emit('operations-executed', {
      sessionId,
      results
    });

    res.json({
      success: true,
      results,
      executed: results.length
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error executing operations:`, error);

    eventBus.emit('operations-error', {
      sessionId,
      error: error.message,
      operations
    });

    res.status(500).json({
      error: 'Failed to execute operations',
      message: error.message
    });
  }
});

/**
 * Delete/terminate session
 * DELETE /api/session/:sessionId
 */
router.delete('/session/:sessionId', validateSessionId, async (req, res) => {
  const { sessionId } = req.params;

  try {
    await claudeService.deleteSession(sessionId);

    // Emit session-deleted event
    eventBus.emit('session-deleted', {
      sessionId,
      timestamp: Date.now()
    });

    res.json({
      success: true,
      message: 'Session deleted',
      sessionId
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error deleting session:`, error);
    res.status(500).json({
      error: 'Failed to delete session',
      message: error.message
    });
  }
});

module.exports = router;
```

### 4.2 Terminal Routes

```javascript
//
// /home/uroma/obsidian-web-interface/routes/terminal-routes.js
const express = require('express');
const terminalService = require('../services/terminal-service');
const eventBus = require('../services/event-bus');

const router = express.Router();

/**
 * Create new terminal
 * POST /api/terminal
 */
router.post('/terminal', async (req, res) => {
  const { workingDir, mode, sessionId } = req.body;

  try {
    const result = terminalService.createTerminal({
      workingDir,
      mode,
      sessionId
    });

    if (result.success) {
      // Emit terminal-created event
      eventBus.emit('terminal-created', {
        terminalId: result.terminalId,
        sessionId,
        workingDir,
        mode
      });

      res.json({
        success: true,
        terminalId: result.terminalId
      });
    } else {
      res.status(500).json({
        error: result.error
      });
    }
  } catch (error) {
    console.error('[TerminalRoutes] Error creating terminal:', error);
    res.status(500).json({
      error: 'Failed to create terminal',
      message: error.message
    });
  }
});

/**
 * Get terminal output (polling endpoint, kept for compatibility)
 * GET /api/terminal/:terminalId/output
 */
router.get('/terminal/:terminalId/output', (req, res) => {
  const { terminalId } = req.params;
  // Parse as base 10; a missing or non-numeric value falls back to 0
  const sinceIndex = parseInt(req.query.since, 10) || 0;

  const result = terminalService.getTerminalOutput(terminalId, sinceIndex);

  if (result.success) {
    res.json(result);
  } else {
    res.status(404).json({
      error: result.error
    });
  }
});

/**
 * Send input to terminal
 * POST /api/terminal/:terminalId/input
 */
router.post('/terminal/:terminalId/input', (req, res) => {
  const { terminalId } = req.params;
  const { data } = req.body;

  if (!data) {
    return res.status(400).json({
      error: 'Input data is required'
    });
  }

  const result = terminalService.sendTerminalInput(terminalId, data);

  if (result.success) {
    res.json({ success: true });
  } else {
    res.status(500).json({
      error: result.error
    });
  }
});

/**
 * Close terminal
 * DELETE /api/terminal/:terminalId
 */
router.delete('/terminal/:terminalId', (req, res) => {
  const { terminalId } = req.params;

  const result =
    terminalService.closeTerminal(terminalId);

  if (result.success) {
    eventBus.emit('terminal-closed', { terminalId });
    res.json({ success: true });
  } else {
    res.status(500).json({
      error: result.error
    });
  }
});

module.exports = router;
```

---

## 5. Integration with Existing Code

### 5.1 Modify ClaudeService to use EventBus

```javascript
// In services/claude-service.js
const eventBus = require('./event-bus');

class ClaudeCodeService {
  constructor() {
    // ... existing code ...
  }

  // Replace callback-based approach with EventBus
  sendCommand(sessionId, command) {
    const session = this.sessions.get(sessionId);

    if (!session) {
      throw new Error(`Session ${sessionId} not found`);
    }

    // ... send command to process ...

    // Instead of callback, emit event
    eventBus.emit('command-sent', {
      sessionId,
      command,
      timestamp: Date.now()
    });
  }

  // When output is received from Claude Code process
  handleOutput(sessionId, output) {
    // Emit event instead of callback
    eventBus.emit('session-output', {
      sessionId,
      type: output.type || 'stdout',
      content: output.content || output,
      timestamp: Date.now()
    });
  }

  // When operations are detected
  handleOperationsDetected(sessionId, operations, response) {
    eventBus.emit('operations-detected', {
      sessionId,
      operations,
      response
    });
  }

  // When operations are executed
  handleOperationsExecuted(sessionId, results) {
    eventBus.emit('operations-executed', {
      sessionId,
      results
    });
  }
}
```

### 5.2 Update server.js

```javascript
// In server.js
const eventBus = require('./services/event-bus');
const sseManager = require('./services/sse-manager');
const sessionRoutes = require('./routes/session-routes');
const terminalRoutes = require('./routes/terminal-routes');
const sseRoutes = require('./routes/sse-routes');

// ... existing middleware ...
// Register new routes
// Note: the POST handlers in these modules read req.body, so a JSON body
// parser such as express.json() must already be registered above.
app.use('/api', sessionRoutes);
app.use('/api', terminalRoutes);
app.use('/api', sseRoutes);

// Keep legacy routes for backward compatibility (with deprecation warnings)
app.use('/claude/api/claude', (req, res, next) => {
  console.warn('[DEPRECATION] /claude/api/claude/* routes are deprecated. Use /api/session/:sessionId/* instead.');
  next();
});

// ... existing WebSocket code (deprecated) ...

// Graceful shutdown
process.on('SIGTERM', async () => {
  console.log('[Server] SIGTERM received, shutting down gracefully...');

  // Cleanup SSE connections
  sseManager.cleanup();

  // ... existing cleanup ...

  process.exit(0);
});
```

---

## 6. Client-Side Reconnection Logic

### 6.1 SSE Client Implementation

```javascript
// Client-side SSE connection manager
class SessionEventStream {
  constructor(sessionId, options = {}) {
    this.sessionId = sessionId;
    this.options = {
      reconnectInterval: 1000,      // Initial reconnect delay
      maxReconnectInterval: 30000,  // Max reconnect delay
      reconnectDecay: 1.5,          // Exponential backoff multiplier
      maxReconnectAttempts: 10,
      heartbeatTimeout: 60000,      // Timeout for heartbeat
      ...options
    };

    this.eventSource = null;
    this.reconnectAttempts = 0;
    this.isConnected = false;
    this.listeners = new Map();
    this.lastHeartbeat = Date.now();
    this.heartbeatCheckInterval = null;

    // Connect immediately
    this.connect();
  }

  connect() {
    if (this.eventSource) {
      this.eventSource.close();
    }

    const url = `/api/session/${this.sessionId}/events`;
    console.log(`[SSE] Connecting to ${url}`);

    this.eventSource = new EventSource(url);

    // Connection opened
    this.eventSource.onopen = () => {
      console.log(`[SSE] Connected to session ${this.sessionId}`);
      this.isConnected = true;
      this.reconnectAttempts = 0;

      // Start heartbeat check
      this.startHeartbeatCheck();

      // Emit connected event
      this.emit('connected', { sessionId: this.sessionId, timestamp: Date.now() });
    };

    // Handle named events
    this.eventSource.addEventListener('connected', (e) => {
      const data = JSON.parse(e.data);
      this.emit('connected', data);
      this.lastHeartbeat = Date.now();
    });

    this.eventSource.addEventListener('session-output', (e) => {
      const data = JSON.parse(e.data);
      this.emit('output', data);
      this.lastHeartbeat = Date.now();
    });

    this.eventSource.addEventListener('session-error', (e) => {
      const data = JSON.parse(e.data);
      this.emit('error', data);
      this.lastHeartbeat = Date.now();
    });

    this.eventSource.addEventListener('session-status', (e) => {
      const data = JSON.parse(e.data);
      this.emit('status', data);
      this.lastHeartbeat = Date.now();
    });

    this.eventSource.addEventListener('operations-detected', (e) => {
      const data = JSON.parse(e.data);
      this.emit('operations-detected', data);
      this.lastHeartbeat = Date.now();
    });

    this.eventSource.addEventListener('operations-executed', (e) => {
      const data = JSON.parse(e.data);
      this.emit('operations-executed', data);
      this.lastHeartbeat = Date.now();
    });

    this.eventSource.addEventListener('approval-request', (e) => {
      const data = JSON.parse(e.data);
      this.emit('approval-request', data);
      this.lastHeartbeat = Date.now();
    });

    // Heartbeats only count if the server sends them as a named 'heartbeat'
    // event. SSE comment lines (": ...") are discarded by EventSource and
    // never reach any handler.
    this.eventSource.addEventListener('heartbeat', () => {
      this.lastHeartbeat = Date.now();
    });

    // Default handler for unnamed events
    this.eventSource.onmessage = (e) => {
      this.lastHeartbeat = Date.now();

      try {
        const data = JSON.parse(e.data);
        this.emit('message', data);
      } catch (error) {
        console.error('[SSE] Error parsing message:', error);
      }
    };

    // Handle errors
    this.eventSource.onerror = (e) => {
      console.error(`[SSE] Error:`, e);

      this.isConnected = false;

      if (this.heartbeatCheckInterval) {
        clearInterval(this.heartbeatCheckInterval);
        this.heartbeatCheckInterval = null;
      }

      // readyState 2 (CLOSED) means the browser will not retry, e.g. after a
      // non-200 response such as 404 (session not found). Don't reconnect.
      if (e.target && e.target.readyState === 2) {
        console.error(`[SSE] Connection closed permanently`);
        this.emit('disconnected', { sessionId: this.sessionId, permanent: true });
        return;
      }

      // Attempt reconnect
      if (this.reconnectAttempts < this.options.maxReconnectAttempts) {
        const delay = Math.min(
          this.options.reconnectInterval
            * Math.pow(this.options.reconnectDecay, this.reconnectAttempts),
          this.options.maxReconnectInterval
        );

        this.reconnectAttempts++;
        console.log(`[SSE] Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.options.maxReconnectAttempts})`);

        this.emit('reconnecting', {
          attempt: this.reconnectAttempts,
          delay,
          maxAttempts: this.options.maxReconnectAttempts
        });

        // Keep the timer handle so disconnect() can cancel a pending reconnect
        clearTimeout(this.reconnectTimer);
        this.reconnectTimer = setTimeout(() => this.connect(), delay);
      } else {
        console.error(`[SSE] Max reconnect attempts reached`);
        this.emit('disconnected', {
          sessionId: this.sessionId,
          permanent: true,
          reason: 'max_reconnect_attempts'
        });
      }
    };
  }

  startHeartbeatCheck() {
    if (this.heartbeatCheckInterval) {
      clearInterval(this.heartbeatCheckInterval);
    }

    this.heartbeatCheckInterval = setInterval(() => {
      const timeSinceLastHeartbeat = Date.now() - this.lastHeartbeat;

      if (timeSinceLastHeartbeat > this.options.heartbeatTimeout) {
        console.warn(`[SSE] Heartbeat timeout (${timeSinceLastHeartbeat}ms)`);
        this.eventSource.close();
        this.connect(); // Force reconnect
      }
    }, this.options.heartbeatTimeout / 2);
  }

  on(event, handler) {
    if (!this.listeners.has(event)) {
      this.listeners.set(event, []);
    }
    this.listeners.get(event).push(handler);
  }

  off(event, handler) {
    if (!this.listeners.has(event)) {
      return;
    }

    const handlers = this.listeners.get(event);
    const index = handlers.indexOf(handler);

    if (index > -1) {
      handlers.splice(index, 1);
    }
  }

  emit(event, data) {
    if (!this.listeners.has(event)) {
      return;
    }

    for (const handler of this.listeners.get(event)) {
      try {
        handler(data);
      } catch (error) {
        console.error(`[SSE] Error in handler for ${event}:`, error);
      }
    }
  }

  disconnect() {
    console.log(`[SSE] Disconnecting from session ${this.sessionId}`);

    // Cancel any reconnect that is still scheduled
    clearTimeout(this.reconnectTimer);

    if (this.heartbeatCheckInterval) {
      clearInterval(this.heartbeatCheckInterval);
    }

    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }

    this.isConnected = false;
    this.listeners.clear();
  }
}

// Export for use in UI
window.SessionEventStream = SessionEventStream;
```

### 6.2 Usage Example
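
Before choosing `reconnectInterval` and `maxReconnectAttempts` for a page, it can help to see what delays those options produce. The sketch below reproduces the delay formula from the `onerror` handler in section 6.1 as a standalone function. `reconnectDelay` is a hypothetical helper name used only for illustration; it is not part of the plan, and the defaults are copied from the `SessionEventStream` constructor.

```javascript
// Hypothetical helper (not part of the plan): mirrors the delay formula in
// SessionEventStream's onerror handler. `attempt` is the number of reconnect
// attempts already made, so the first reconnect uses attempt = 0.
function reconnectDelay(attempt, options = {}) {
  const {
    reconnectInterval = 1000,
    maxReconnectInterval = 30000,
    reconnectDecay = 1.5
  } = options;

  return Math.min(
    reconnectInterval * Math.pow(reconnectDecay, attempt),
    maxReconnectInterval
  );
}

// Delay in milliseconds before each of the 10 default reconnect attempts.
const schedule = Array.from({ length: 10 }, (_, attempt) => reconnectDelay(attempt));
console.log(schedule);
```

With the default options the delay starts at 1 second, grows by a factor of 1.5 per attempt, and is capped at 30 seconds from the tenth attempt onward. Raising `reconnectInterval` shifts the whole schedule up, so the cap is reached after fewer attempts.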
```javascript
// In the terminal UI or session page
const sessionId = 'session-123'; // From URL parameter

const eventStream = new SessionEventStream(sessionId, {
  reconnectInterval: 2000,
  maxReconnectAttempts: 20
});

// Listen for output
eventStream.on('output', (data) => {
  console.log('Output:', data.content);
  // Update UI
  appendToTerminal(data.content);
});

// Listen for errors
eventStream.on('error', (data) => {
  console.error('Session error:', data.error);
  showError(data.error);
});

// Listen for connection status
eventStream.on('connected', () => {
  showConnectionStatus('Connected');
});

eventStream.on('reconnecting', (data) => {
  showConnectionStatus(`Reconnecting... (${data.attempt}/${data.maxAttempts})`);
});

// Send command
async function sendCommand(command) {
  const response = await fetch(`/api/session/${sessionId}/prompt`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ command })
  });

  const result = await response.json();

  if (!result.success) {
    console.error('Failed to send command:', result.error);
  }
}

// Cleanup on page unload
window.addEventListener('beforeunload', () => {
  eventStream.disconnect();
});
```

---

## 7. Migration Path

### 7.1 Phase 1: Add New Infrastructure (Non-Breaking)

**Duration:** 1-2 weeks

**Tasks:**
1. Implement EventBus service
2. Implement SSE Manager service
3. Create new route modules (session-routes, terminal-routes, sse-routes)
4. Add new SSE endpoint alongside existing WebSocket
5. Deploy new code without removing old functionality

**Risk:** Low - No breaking changes

**Testing:**
- Unit tests for EventBus
- Integration tests for SSE connections
- Load testing for SSE streaming
- Verify WebSocket still works

### 7.2 Phase 2: Migrate Client-Side (Gradual)

**Duration:** 2-3 weeks

**Tasks:**
1. Update IDE UI to use SSE for new sessions
2. Keep WebSocket for existing sessions (feature flag)
3. Add telemetry to track usage patterns
4. Fix any issues discovered during migration

**Risk:** Medium - Client-side changes

**Testing:**
- A/B testing: SSE vs WebSocket
- Monitor error rates
- Performance comparison
- User feedback

### 7.3 Phase 3: Deprecate WebSocket

**Duration:** 1-2 weeks

**Tasks:**
1. Add deprecation warnings to WebSocket endpoint
2. Redirect WebSocket connections to SSE with message
3. Update all documentation
4. Add migration guide for any custom clients

**Risk:** Low - Only affects legacy clients

**Testing:**
- Verify deprecation warnings appear
- Test redirect logic
- Confirm new clients use SSE

### 7.4 Phase 4: Remove Legacy Code

**Duration:** 1 week

**Tasks:**
1. Remove WebSocket server code
2. Remove old client-side WebSocket code
3. Clean up unused middleware
4. Update API documentation

**Risk:** Low - All clients migrated

**Testing:**
- Full regression test
- Load testing
- Security audit

---

## 8. Risk Assessment & Mitigation

### 8.1 High-Risk Areas

| Risk | Impact | Probability | Mitigation |
|------|--------|-------------|------------|
| **SSE connection drops** | High | Medium | Implement exponential backoff reconnection, heartbeat monitoring |
| **nginx buffering issues** | High | Medium | Add `X-Accel-Buffering: no` header, configure nginx properly |
| **EventBus memory leaks** | High | Low | Implement listener cleanup, add metrics/monitoring |
| **Session context confusion during migration** | High | Medium | Use feature flags, clear versioning, extensive logging |
| **Client compatibility** | Medium | Low | Test on multiple browsers, provide polyfills if needed |
| **Performance degradation** | Medium | Low | Load testing, optimize event filtering, use compression |

### 8.2 nginx Configuration

```nginx
# nginx configuration for SSE
location /api/session/ {
    # Disable buffering for SSE
    proxy_buffering off;
    proxy_cache off;

    # SSE-specific headers
    proxy_pass http://localhost:3010;
    proxy_http_version 1.1;
    proxy_set_header Connection '';
    proxy_set_header Cache-Control
no-cache; proxy_set_header X-Accel-Buffering no; # Timeouts proxy_read_timeout 86400s; # 24 hours proxy_send_timeout 86400s; # Disable buffering proxy_buffering off; } ``` ### 8.3 Monitoring & Observability ```javascript // Add monitoring hooks eventBus.on('newListener', (event) => { console.log(`[EventBus] New listener for ${event}`); metrics.increment('eventbus.listeners.added', { event }); }); eventBus.on('removeListener', (event) => { console.log(`[EventBus] Removed listener for ${event}`); metrics.increment('eventbus.listeners.removed', { event }); }); // SSE connection monitoring setInterval(() => { const activeSessions = sseManager.getActiveSessions(); const metrics = eventBus.getMetrics(); console.log('[Metrics]', { activeSSEConnections: activeSessions.length, sessions: activeSessions, eventsEmitted: metrics.eventsEmitted, activeListeners: metrics.activeListeners }); // Send to monitoring service monitoring.gauge('sse.connections', activeSessions.length); monitoring.gauge('eventbus.listeners', metrics.activeListeners); }, 60000); // Every minute ``` --- ## 9. 
Testing Strategy ### 9.1 Unit Tests ```javascript // tests/event-bus.test.js describe('EventBus', () => { it('should emit and receive events', (done) => { eventBus.subscribe('test-event', null, (data) => { assert.equal(data.test, 'value'); done(); }); eventBus.emit('test-event', { test: 'value' }); }); it('should filter events by session ID', (done) => { const handler = jest.fn(); eventBus.subscribe('test-event', 'session-1', handler); eventBus.emit('test-event', { sessionId: 'session-1' }); eventBus.emit('test-event', { sessionId: 'session-2' }); expect(handler).toHaveBeenCalledTimes(1); expect(handler).toHaveBeenCalledWith(expect.objectContaining({ sessionId: 'session-1' })); done(); }); it('should unsubscribe correctly', (done) => { const handler = jest.fn(); const unsubscribe = eventBus.subscribe('test-event', null, handler); unsubscribe(); eventBus.emit('test-event', { test: 'value' }); expect(handler).not.toHaveBeenCalled(); done(); }); }); ``` ### 9.2 Integration Tests ```javascript // tests/sse-integration.test.js describe('SSE Integration', () => { it('should establish SSE connection and receive events', async () => { const sessionId = 'test-session-' + Date.now(); // Create session const session = await createSession(sessionId); // Connect to SSE const events = []; const eventSource = new EventSource(`/api/session/${sessionId}/events`); eventSource.addEventListener('connected', (e) => { events.push(JSON.parse(e.data)); // Send command fetch(`/api/session/${sessionId}/prompt`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ command: 'echo test' }) }); }); eventSource.addEventListener('session-output', (e) => { events.push(JSON.parse(e.data)); eventSource.close(); }); // Wait for events await waitFor(() => events.length >= 2); assert.equal(events[0].type, 'connected'); assert.equal(events[1].type, 'session-output'); }); it('should handle reconnection after connection drop', async () => { // Test reconnection logic 
}); }); ``` ### 9.3 Load Testing ```javascript // tests/load-test.js // Simulate 100 concurrent SSE connections const connections = []; for (let i = 0; i < 100; i++) { const sessionId = `load-test-${i}`; const eventSource = new EventSource(`/api/session/${sessionId}/events`); connections.push({ sessionId, eventSource }); } // Monitor memory usage, CPU, response times // ... // Cleanup connections.forEach(conn => conn.eventSource.close()); ``` --- ## 10. File Structure Summary ``` /home/uroma/obsidian-web-interface/ ├── services/ │ ├── event-bus.js ← NEW │ ├── sse-manager.js ← NEW │ ├── claude-service.js ← MODIFY │ ├── terminal-service.js ← MODIFY │ └── database.js ← NO CHANGE ├── routes/ │ ├── session-routes.js ← NEW │ ├── terminal-routes.js ← NEW │ └── sse-routes.js ← NEW ├── middleware/ │ └── validation.js ← NEW (session ID validation) ├── public/ │ ├── js/ │ │ ├── sse-client.js ← NEW │ │ └── session-ui.js ← MODIFY │ └── ... ├── tests/ │ ├── event-bus.test.js ← NEW │ ├── sse-manager.test.js ← NEW │ └── integration.test.js ← NEW ├── server.js ← MODIFY (add route registration) └── package.json ← MODIFY (no new deps needed) ``` --- ## 11. Dependencies No new npm dependencies required! SSE is built into Node.js/Express: - **EventEmitter**: Built into Node.js - **Express**: Already installed - **EventSource**: Built into modern browsers Optional additions: - `@types/node` - For TypeScript support (if migrating to TS) - `supertest` - For testing HTTP endpoints --- ## 12. 
Success Criteria ### 12.1 Functional Requirements - [ ] SSE endpoint accepts connections for valid session IDs - [ ] SSE endpoint returns 404 for invalid session IDs - [ ] Session events are streamed to connected clients - [ ] Multiple clients can connect to same session - [ ] Reconnection works with exponential backoff - [ ] Heartbeat prevents connection drops - [ ] No session context ambiguity ### 12.2 Non-Functional Requirements - [ ] Response time < 100ms for event delivery - [ ] Support 100+ concurrent SSE connections - [ ] Memory usage stable over 24 hours - [ ] No memory leaks in EventBus - [ ] Error rate < 0.1% - [ ] WebSocket deprecated with clear migration path ### 12.3 Migration Success - [ ] All clients using SSE (WebSocket usage < 5%) - [ ] No increase in error rates during migration - [ ] User feedback positive - [ ] Documentation complete and accurate --- ## 13. Rollback Plan If issues arise during deployment: 1. **Immediate Rollback (Phase 1-2):** - Disable new SSE routes via feature flag - Continue using WebSocket - Zero downtime 2. **Partial Rollback (Phase 3):** - Keep SSE available - Re-enable WebSocket as fallback - Investigate issues 3. **Code Rollback:** - Revert to previous commit - Restore database if schema changed - Restart services 4. **Data Migration:** - No data migration required (stateless design) --- ## 14. Documentation Updates ### 14.1 API Documentation Update README.md and API docs with: - New SSE endpoint documentation - Event type reference - Code examples for client-side usage - Migration guide from WebSocket to SSE ### 14.2 Architecture Diagrams Create/update diagrams showing: - Event flow from Claude Service to clients via EventBus - SSE connection lifecycle - Reconnection logic flow --- ## 15. Next Steps 1. **Review this plan** with team and stakeholders 2. **Create GitHub issues** for each phase 3. **Set up feature flags** for gradual rollout 4. **Implement EventBus** (Phase 1) 5. **Write tests** for EventBus 6. 
**Implement SSE Manager** (Phase 1) 7. **Deploy Phase 1** to staging environment 8. **Monitor metrics** and gather feedback 9. **Proceed to Phase 2** when confident --- ## Appendix A: Code Snippets ### A.1 Session ID Validation Middleware ```javascript // middleware/validation.js const SESSION_ID_PATTERN = /^[a-zA-Z0-9_-]{8,32}$/; function validateSessionId(req, res, next) { const { sessionId } = req.params; if (!sessionId) { return res.status(400).json({ error: 'Session ID is required' }); } if (!SESSION_ID_PATTERN.test(sessionId)) { return res.status(400).json({ error: 'Invalid session ID format', pattern: '8-32 characters, alphanumeric, underscore, hyphen' }); } // Check session exists (optional, can be done in route handler) const claudeService = require('../services/claude-service'); const session = claudeService.getSession(sessionId); if (!session) { return res.status(404).json({ error: 'Session not found', sessionId }); } req.sessionContext = session; next(); } module.exports = { validateSessionId }; ``` ### A.2 Complete server.js Integration ```javascript // In server.js - add these sections // After existing imports const eventBus = require('./services/event-bus'); const sseManager = require('./services/sse-manager'); const sessionRoutes = require('./routes/session-routes'); const terminalRoutes = require('./routes/terminal-routes'); const sseRoutes = require('./routes/sse-routes'); // After existing middleware // Register new API routes app.use('/api', sessionRoutes); app.use('/api', terminalRoutes); app.use('/api', sseRoutes); // Deprecated routes (with warnings) app.use('/claude/api/claude', (req, res, next) => { console.warn('[DEPRECATED] /claude/api/claude/* is deprecated. 
Use /api/session/:sessionId/* instead.'); console.warn('[DEPRECATED] WebSocket connections should migrate to SSE: /api/session/:sessionId/events'); next(); }); // Monitoring endpoint app.get('/api/debug/metrics', (req, res) => { res.json({ eventBus: eventBus.getMetrics(), sse: { activeSessions: sseManager.getActiveSessions(), totalConnections: Array.from(sseManager.connections.values()) .reduce((sum, set) => sum + set.size, 0) } }); }); // Graceful shutdown const cleanup = async () => { console.log('[Server] Starting graceful shutdown...'); // Cleanup SSE connections sseManager.cleanup(); // ... existing cleanup code ... process.exit(0); }; process.on('SIGTERM', cleanup); process.on('SIGINT', cleanup); ``` --- **End of Refactor Plan** Last Updated: 2025-01-21 Version: 1.0 Author: Backend Architecture Team
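
**Supplementary sketch: reconnect backoff.** Sections 8.1 and 12.1 call for reconnection with exponential backoff but do not spell out how the delay is computed. The snippet below is a minimal sketch of one common approach, not the plan's actual `SessionEventStream` logic. The helper name `computeReconnectDelay`, the 2000 ms base (borrowed from the `reconnectInterval` used in the client example) and the 30000 ms cap are all assumptions made for illustration.

```javascript
// Hypothetical helper: not part of the SessionEventStream API in this plan.
// Returns the wait in milliseconds before reconnect attempt `attempt` (1-based).
// The delay doubles with every attempt and is capped at maxMs.
function computeReconnectDelay(attempt, baseMs = 2000, maxMs = 30000) {
  const exponent = Math.max(0, attempt - 1);
  return Math.min(maxMs, baseMs * 2 ** exponent);
}

// Delays for the first six attempts with the assumed defaults
const delays = [1, 2, 3, 4, 5, 6].map((n) => computeReconnectDelay(n));
console.log(delays); // [ 2000, 4000, 8000, 16000, 30000, 30000 ]
```

A client could pass the result to `setTimeout` before opening a new `EventSource`. Capping the delay keeps long outages from pushing the retry interval to impractical lengths, and adding random jitter on top would help avoid many tabs reconnecting at the same instant.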