- Modified loadChatHistory() to check for active project before fetching all sessions - When active project exists, use project.sessions instead of fetching from API - Added detailed console logging to debug session filtering - This prevents ALL sessions from appearing in every project's sidebar Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
48 KiB
Backend Refactor Plan: Route-Based Session Context with SSE
Executive Summary
This document outlines a comprehensive plan to refactor the Obsidian Web Interface from a WebSocket-based architecture to a hybrid approach using route-based session context with Server-Sent Events (SSE) streaming. This eliminates session context ambiguity and provides clearer, more maintainable code.
Current State:
- WebSocket at `/claude/api/claude/chat` (session via query/subscription message)
- Session IDs passed via query parameters or WebSocket messages
- Context ambiguity when multiple tabs/sessions are open
- Complex client-side session management
Target State:
- Route-based URLs with explicit session context
- SSE streaming at `/api/session/:sessionId/events`
- Clear separation: UI routes vs API endpoints
- No session context ambiguity
1. New URL Architecture
1.1 Route Structure
# UI Routes (serve HTML/JS)
GET /terminal/:sessionId → Terminal UI for specific session
GET /session/:sessionId → Session detail page
GET /sessions → Session list page
# API Endpoints (session-scoped)
GET /api/session/:sessionId/events → SSE event stream
POST /api/session/:sessionId/prompt → Send command/prompt
GET /api/session/:sessionId/status → Session status/health
GET /api/session/:sessionId/context → Session context files
POST /api/session/:sessionId/operations/preview → Preview operations
POST /api/session/:sessionId/operations/execute → Execute operations
DELETE /api/session/:sessionId → Delete/terminate session
# Management Endpoints
GET /api/sessions → List all sessions
POST /api/sessions → Create new session
POST /api/sessions/:id/duplicate → Duplicate session
POST /api/sessions/:id/fork → Fork session
POST /api/sessions/:id/move → Move to project
# Legacy Routes (deprecated, redirect to new structure)
/claude/api/claude/chat → WebSocket (deprecated)
/claude/api/claude/sessions → Use /api/sessions instead
1.2 URL Validation
// Session ID validation
const SESSION_ID_REGEX = /^[a-zA-Z0-9_-]{8,32}$/;

/**
 * Check whether a value is a well-formed session ID: 8 to 32 characters
 * drawn from ASCII letters, digits, underscore and hyphen.
 *
 * @param sessionId - Candidate session ID (typically a route parameter).
 * @returns `true` only for strings matching the session ID format.
 */
function validateSessionId(sessionId: string): boolean {
  // RegExp.prototype.test() coerces its argument to a string, so a missing
  // parameter would be tested as "undefined" (9 letters) and wrongly pass.
  // Reject anything that is not already a string.
  return typeof sessionId === 'string' && SESSION_ID_REGEX.test(sessionId);
}
/**
 * Express middleware guarding session-scoped routes.
 *
 * Responds 400 when the `:sessionId` route parameter is malformed and 404
 * when no such session is known to claudeService. Otherwise the session is
 * attached to the request as `req.sessionContext` and the chain continues.
 */
function validateSession(req, res, next) {
  const sessionId = req.params.sessionId;

  // Reject malformed IDs before consulting the session store.
  if (!validateSessionId(sessionId)) {
    return res.status(400).json({
      error: 'Invalid session ID format',
      sessionId
    });
  }

  const session = claudeService.getSession(sessionId);
  if (session) {
    req.sessionContext = session;
    next();
    return;
  }

  return res.status(404).json({
    error: 'Session not found',
    sessionId
  });
}
2. EventBus Implementation
2.1 File Structure
/home/uroma/obsidian-web-interface/
├── services/
│ ├── event-bus.js ← NEW: EventBus implementation
│ ├── sse-manager.js ← NEW: SSE connection management
│ ├── claude-service.js ← MODIFIED: Emit events instead of callbacks
│ ├── terminal-service.js ← MODIFIED: Use EventBus
│ └── database.js
├── routes/
│ ├── session-routes.js ← NEW: Session-specific routes
│ ├── terminal-routes.js ← NEW: Terminal-specific routes
│ └── sse-routes.js ← NEW: SSE endpoint
└── server.js ← MODIFIED: Route registration
2.2 EventBus API
// /home/uroma/obsidian-web-interface/services/event-bus.js
const EventEmitter = require('events');
class EventBus extends EventEmitter {
constructor() {
super();
this.setMaxListeners(0); // Unlimited listeners
this.metrics = {
eventsEmitted: 0,
eventsByType: new Map(),
listenerCounts: new Map()
};
}
/**
* Subscribe to an event type
* @param {string} eventType - Event type (e.g., 'session-output', 'session-error')
* @param {string} sessionId - Session ID to filter events (optional, null for all)
* @param {Function} handler - Event handler function
* @returns {Function} Unsubscribe function
*/
subscribe(eventType, sessionId, handler) {
const listenerId = `${eventType}-${sessionId || 'global'}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
const wrappedHandler = (data) => {
// Filter by session ID if specified
if (sessionId !== null && data.sessionId !== sessionId) {
return;
}
try {
handler(data);
} catch (error) {
console.error(`[EventBus] Error in handler for ${eventType}:`, error);
}
};
this.on(eventType, wrappedHandler);
// Track listener
const key = `${eventType}-${sessionId || 'global'}`;
this.metrics.listenerCounts.set(key, (this.metrics.listenerCounts.get(key) || 0) + 1);
// Return unsubscribe function
return () => {
this.off(eventType, wrappedHandler);
this.metrics.listenerCounts.set(key, Math.max(0, (this.metrics.listenerCounts.get(key) || 0) - 1));
};
}
/**
* Emit an event
* @param {string} eventType - Event type
* @param {Object} data - Event data (must include sessionId for session-scoped events)
*/
emit(eventType, data = {}) {
this.metrics.eventsEmitted++;
this.metrics.eventsByType.set(eventType, (this.metrics.eventsByType.get(eventType) || 0) + 1);
// Add timestamp to all events
const eventData = {
...data,
_timestamp: Date.now(),
_eventType: eventType
};
super.emit(eventType, eventData);
}
/**
* Subscribe to all events for a session
* @param {string} sessionId - Session ID
* @param {Function} handler - Handler for all session events
* @returns {Function} Unsubscribe function
*/
subscribeToSession(sessionId, handler) {
const eventTypes = [
'session-output',
'session-error',
'session-status',
'operations-detected',
'operations-executed',
'operations-error',
'approval-request',
'approval-confirmed',
'approval-expired',
'session-created',
'session-deleted'
];
const unsubscribers = eventTypes.map(type =>
this.subscribe(type, sessionId, handler)
);
// Return combined unsubscribe function
return () => unsubscribers.forEach(unsub => unsub());
}
/**
* Get metrics
*/
getMetrics() {
return {
eventsEmitted: this.metrics.eventsEmitted,
eventsByType: Object.fromEntries(this.metrics.eventsByType),
listenerCounts: Object.fromEntries(this.metrics.listenerCounts),
activeListeners: this.listenerCount('session-output') +
this.listenerCount('session-error') +
this.listenerCount('session-status')
};
}
/**
* Clear all listeners (useful for testing)
*/
clear() {
this.removeAllListeners();
this.metrics = {
eventsEmitted: 0,
eventsByType: new Map(),
listenerCounts: new Map()
};
}
}
// Singleton instance
const eventBus = new EventBus();
module.exports = eventBus;
2.3 Event Types
// Event type definitions
// Session lifecycle events
eventBus.emit('session-created', { sessionId, mode, workingDir });
eventBus.emit('session-deleted', { sessionId });
// Session output events
eventBus.emit('session-output', {
sessionId,
type: 'stdout' | 'stderr' | 'exit' | 'ready',
content: 'output text',
timestamp: Date.now()
});
eventBus.emit('session-error', {
sessionId,
error: 'Error message',
code: 'ERR_CODE',
recoverable: true
});
eventBus.emit('session-status', {
sessionId,
status: 'running' | 'idle' | 'exited' | 'error',
pid: 12345,
uptime: 45000
});
// Operation events
eventBus.emit('operations-detected', {
sessionId,
operations: [{ type: 'write', path: '/path', content: '...' }],
response: 'full response text'
});
eventBus.emit('operations-executed', {
sessionId,
results: [{ success: true, path: '/path', operation: 'write' }]
});
eventBus.emit('operations-error', {
sessionId,
error: 'Error message',
operations: []
});
// Approval events
eventBus.emit('approval-request', {
sessionId,
approvalId: 'apr-123',
command: 'rm -rf /',
explanation: 'Remove files',
expiresAt: Date.now() + 300000
});
eventBus.emit('approval-confirmed', {
sessionId,
approvalId: 'apr-123',
approved: true,
customCommand: null
});
eventBus.emit('approval-expired', {
sessionId,
approvalId: 'apr-123'
});
3. SSE Endpoint Implementation
3.1 SSE Manager
// /home/uroma/obsidian-web-interface/services/sse-manager.js
const eventBus = require('./event-bus');
class SSEManager {
constructor() {
// Map: sessionId -> Set of response objects
this.connections = new Map();
this.heartbeatInterval = 30000; // 30 seconds
this.heartbeatTimers = new Map();
}
/**
* Add SSE connection for a session
* @param {string} sessionId - Session ID
* @param {Object} res - Express response object
* @param {Object} req - Express request object
*/
addConnection(sessionId, res, req) {
// Setup SSE headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache, no-transform');
res.setHeader('X-Accel-Buffering', 'no'); // Disable nginx buffering
res.setHeader('Connection', 'keep-alive');
res.flushHeaders();
// Add connection to set
if (!this.connections.has(sessionId)) {
this.connections.set(sessionId, new Set());
}
const connectionSet = this.connections.get(sessionId);
connectionSet.add(res);
console.log(`[SSEManager] Connection added for session ${sessionId}. Total connections: ${connectionSet.size}`);
// Send initial connection event
this.sendEvent(sessionId, res, {
type: 'connected',
sessionId,
timestamp: Date.now(),
message: 'SSE connection established'
});
// Start heartbeat for this connection
this.startHeartbeat(sessionId, res);
// Subscribe to all session events
const unsubscribe = eventBus.subscribeToSession(sessionId, (eventData) => {
this.sendEvent(sessionId, res, eventData);
});
// Handle client disconnect
req.on('close', () => {
this.removeConnection(sessionId, res, unsubscribe);
});
// Send immediate status update
eventBus.emit('session-status', {
sessionId,
status: 'connected'
});
}
/**
* Send SSE event to a specific connection
* @param {string} sessionId - Session ID
* @param {Object} res - Response object
* @param {Object} data - Event data
*/
sendEvent(sessionId, res, data) {
if (res.destroyed || res.writableEnded) {
return false;
}
try {
const eventName = data.type || 'message';
const eventData = JSON.stringify(data);
res.write(`event: ${eventName}\n`);
res.write(`data: ${eventData}\n`);
res.write(`id: ${Date.now()}\n`);
res.write('\n');
return true;
} catch (error) {
console.error(`[SSEManager] Error sending event to ${sessionId}:`, error);
return false;
}
}
/**
* Send event to all connections for a session
* @param {string} sessionId - Session ID
* @param {Object} data - Event data
*/
broadcastToSession(sessionId, data) {
const connectionSet = this.connections.get(sessionId);
if (!connectionSet) {
return 0;
}
let sentCount = 0;
const deadConnections = [];
for (const res of connectionSet) {
if (!this.sendEvent(sessionId, res, data)) {
deadConnections.push(res);
} else {
sentCount++;
}
}
// Clean up dead connections
deadConnections.forEach(res => {
connectionSet.delete(res);
});
return sentCount;
}
/**
* Start heartbeat for a connection
* @param {string} sessionId - Session ID
* @param {Object} res - Response object
*/
startHeartbeat(sessionId, res) {
const timerId = setInterval(() => {
if (res.destroyed || res.writableEnded) {
clearInterval(timerId);
this.heartbeatTimers.delete(`${sessionId}-${res.socket?.remotePort}`);
return;
}
// Send heartbeat comment
try {
res.write(': heartbeat\n\n');
} catch (error) {
clearInterval(timerId);
this.removeConnection(sessionId, res, () => {});
}
}, this.heartbeatInterval);
this.heartbeatTimers.set(`${sessionId}-${res.socket?.remotePort}`, timerId);
}
/**
* Remove SSE connection
* @param {string} sessionId - Session ID
* @param {Object} res - Response object
* @param {Function} unsubscribe - Unsubscribe function from EventBus
*/
removeConnection(sessionId, res, unsubscribe) {
const connectionSet = this.connections.get(sessionId);
if (connectionSet) {
connectionSet.delete(res);
if (connectionSet.size === 0) {
this.connections.delete(sessionId);
}
console.log(`[SSEManager] Connection removed for session ${sessionId}. Remaining: ${connectionSet.size || 0}`);
}
// Stop heartbeat
const timerId = this.heartbeatTimers.get(`${sessionId}-${res.socket?.remotePort}`);
if (timerId) {
clearInterval(timerId);
this.heartbeatTimers.delete(`${sessionId}-${res.socket?.remotePort}`);
}
// Unsubscribe from events
if (unsubscribe) {
unsubscribe();
}
}
/**
* Get connection count for a session
* @param {string} sessionId - Session ID
*/
getConnectionCount(sessionId) {
return this.connections.get(sessionId)?.size || 0;
}
/**
* Get all active sessions
*/
getActiveSessions() {
return Array.from(this.connections.keys());
}
/**
* Clean up all connections (for shutdown)
*/
cleanup() {
console.log('[SSEManager] Cleaning up all connections...');
for (const [sessionId, connectionSet] of this.connections.entries()) {
for (const res of connectionSet) {
try {
res.end();
} catch (error) {
// Ignore errors during cleanup
}
}
}
// Clear all heartbeat timers
for (const timerId of this.heartbeatTimers.values()) {
clearInterval(timerId);
}
this.connections.clear();
this.heartbeatTimers.clear();
console.log('[SSEManager] Cleanup complete');
}
}
// Singleton instance
const sseManager = new SSEManager();
module.exports = sseManager;
3.2 SSE Route
// /home/uroma/obsidian-web-interface/routes/sse-routes.js
const express = require('express');
const sseManager = require('../services/sse-manager');
const { validateSessionId } = require('../middleware/validation');
const router = express.Router();
/**
 * GET /api/session/:sessionId/events
 *
 * Opens the Server-Sent Events stream for one session. The handler never
 * ends the response itself; it hands it to the SSE manager.
 */
router.get('/session/:sessionId/events', validateSessionId, (req, res) => {
  const sessionId = req.params.sessionId;
  console.log(`[SSE] New connection for session ${sessionId}`);

  // From here on the SSE manager owns the response and keeps it open.
  sseManager.addConnection(sessionId, res, req);
});
/**
 * GET /api/session/:sessionId/events/status
 *
 * Reports how many SSE connections are currently open for the session.
 */
router.get('/session/:sessionId/events/status', validateSessionId, (req, res) => {
  const sessionId = req.params.sessionId;
  const activeConnections = sseManager.getConnectionCount(sessionId);

  res.json({
    sessionId,
    activeConnections,
    timestamp: Date.now()
  });
});
module.exports = router;
4. Route Implementation
4.1 Session Routes
// /home/uroma/obsidian-web-interface/routes/session-routes.js
const express = require('express');
const claudeService = require('../services/claude-service');
const eventBus = require('../services/event-bus');
const { validateSessionId } = require('../middleware/validation');
const router = express.Router();
/**
 * Send command/prompt to session
 * POST /api/session/:sessionId/prompt
 *
 * Body: `{ command: string }`. Responds 400 when `command` is missing or not
 * a string, 500 when the service rejects the command, and 200 once the
 * command has been handed to the service. The command's output is not part
 * of this response; it is delivered through the session's event stream.
 */
router.post('/session/:sessionId/prompt', validateSessionId, async (req, res) => {
  const { sessionId } = req.params;
  // req.body is undefined when no body parser handled the request.
  const { command } = req.body || {};

  // Require a non-empty string: the preview below calls string methods, and
  // a non-string value would otherwise throw after the response was sent.
  if (typeof command !== 'string' || command.length === 0) {
    return res.status(400).json({
      error: 'Command is required'
    });
  }

  try {
    // Send command to Claude service
    await claudeService.sendCommand(sessionId, command);
  } catch (error) {
    console.error(`[SessionRoutes] Error sending command:`, error);
    // Emit error event
    eventBus.emit('session-error', {
      sessionId,
      error: error.message,
      code: 'COMMAND_SEND_ERROR',
      recoverable: false
    });
    return res.status(500).json({
      error: 'Failed to send command',
      message: error.message
    });
  }

  // Response will come via SSE
  res.json({
    success: true,
    sessionId,
    message: 'Command sent',
    timestamp: Date.now()
  });

  // Emit command-sent event with a preview capped at 100 characters. This
  // runs outside the try block so a failure here cannot trigger a second
  // response on an already-answered request.
  const preview = command.length > 100 ? `${command.substring(0, 100)}...` : command;
  eventBus.emit('command-sent', {
    sessionId,
    command: preview,
    timestamp: Date.now()
  });
});
/**
 * GET /api/session/:sessionId/status
 *
 * Returns a status snapshot for one session, 404 if the service does not
 * know the session, or 500 if the lookup throws.
 */
router.get('/session/:sessionId/status', validateSessionId, (req, res) => {
  const sessionId = req.params.sessionId;

  let session;
  try {
    session = claudeService.getSession(sessionId);

    if (session) {
      res.json({
        sessionId: session.id,
        status: session.status,
        mode: session.mode,
        createdAt: session.createdAt,
        lastActivity: session.lastActivity,
        // NOTE(review): process.pid is the PID of this Node process, not of a
        // per-session child process — confirm that is what clients expect.
        pid: process.pid,
        // Milliseconds elapsed since the session's createdAt.
        uptime: Date.now() - new Date(session.createdAt).getTime()
      });
      return;
    }

    return res.status(404).json({
      error: 'Session not found',
      sessionId
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error getting status:`, error);
    res.status(500).json({
      error: 'Failed to get session status',
      message: error.message
    });
  }
});
/**
 * GET /api/session/:sessionId/context
 *
 * Returns the `files` of the session context provided by claudeService,
 * or 500 if the lookup fails.
 */
router.get('/session/:sessionId/context', validateSessionId, async (req, res) => {
  const sessionId = req.params.sessionId;

  try {
    const { files } = await claudeService.getSessionContext(sessionId);
    res.json({
      sessionId,
      context: files,
      timestamp: Date.now()
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error getting context:`, error);
    res.status(500).json({
      error: 'Failed to get session context',
      message: error.message
    });
  }
});
/**
 * POST /api/session/:sessionId/operations/preview
 *
 * Body: `{ response }`. Asks claudeService which operations the given
 * response text contains, publishes them as 'operations-detected', and
 * returns them. Failures are published as 'session-error' and answered
 * with 500.
 */
router.post('/session/:sessionId/operations/preview', validateSessionId, async (req, res) => {
  const sessionId = req.params.sessionId;
  const response = req.body.response;

  if (!response) {
    return res.status(400).json({
      error: 'Response is required'
    });
  }

  try {
    const operations = await claudeService.previewOperations(sessionId, response);

    // Publish to stream subscribers before answering the HTTP request.
    eventBus.emit('operations-detected', { sessionId, operations, response });

    res.json({
      success: true,
      operations,
      count: operations.length
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error previewing operations:`, error);

    eventBus.emit('session-error', {
      sessionId,
      error: error.message,
      code: 'PREVIEW_ERROR',
      recoverable: true
    });

    res.status(500).json({
      error: 'Failed to preview operations',
      message: error.message
    });
  }
});
/**
 * POST /api/session/:sessionId/operations/execute
 *
 * Body: `{ operations: [...] }`. Runs the operations through claudeService,
 * publishes the results as 'operations-executed', and returns them.
 * Failures are published as 'operations-error' and answered with 500.
 */
router.post('/session/:sessionId/operations/execute', validateSessionId, async (req, res) => {
  const sessionId = req.params.sessionId;
  const operations = req.body.operations;

  // Anything that is not an array (including a missing field) is rejected.
  if (Array.isArray(operations) === false) {
    return res.status(400).json({
      error: 'Operations array is required'
    });
  }

  try {
    const results = await claudeService.executeOperations(sessionId, operations);

    eventBus.emit('operations-executed', { sessionId, results });

    res.json({
      success: true,
      results,
      executed: results.length
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error executing operations:`, error);

    eventBus.emit('operations-error', {
      sessionId,
      error: error.message,
      operations
    });

    res.status(500).json({
      error: 'Failed to execute operations',
      message: error.message
    });
  }
});
/**
 * DELETE /api/session/:sessionId
 *
 * Deletes the session through claudeService and publishes
 * 'session-deleted'. Answers 500 if the deletion fails.
 */
router.delete('/session/:sessionId', validateSessionId, async (req, res) => {
  const sessionId = req.params.sessionId;

  try {
    await claudeService.deleteSession(sessionId);

    // Tell stream subscribers the session is gone.
    const deletedAt = Date.now();
    eventBus.emit('session-deleted', { sessionId, timestamp: deletedAt });

    res.json({
      success: true,
      message: 'Session deleted',
      sessionId
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error deleting session:`, error);
    res.status(500).json({
      error: 'Failed to delete session',
      message: error.message
    });
  }
});
module.exports = router;
4.2 Terminal Routes
// /home/uroma/obsidian-web-interface/routes/terminal-routes.js
const express = require('express');
const terminalService = require('../services/terminal-service');
const eventBus = require('../services/event-bus');
const router = express.Router();
/**
 * POST /api/terminal
 *
 * Body: `{ workingDir, mode, sessionId }`. Creates a terminal through
 * terminalService and publishes 'terminal-created' on success. A result
 * without `success` or a thrown error is answered with 500.
 */
router.post('/terminal', async (req, res) => {
  const { workingDir, mode, sessionId } = req.body;

  try {
    const result = terminalService.createTerminal({ workingDir, mode, sessionId });

    if (!result.success) {
      res.status(500).json({
        error: result.error
      });
      return;
    }

    const terminalId = result.terminalId;
    eventBus.emit('terminal-created', {
      terminalId,
      sessionId,
      workingDir,
      mode
    });

    res.json({
      success: true,
      terminalId
    });
  } catch (error) {
    console.error('[TerminalRoutes] Error creating terminal:', error);
    res.status(500).json({
      error: 'Failed to create terminal',
      message: error.message
    });
  }
});
/**
 * Get terminal output (polling endpoint, kept for compatibility)
 * GET /api/terminal/:terminalId/output
 *
 * Query: `since` — index passed to terminalService.getTerminalOutput as the
 * starting point; defaults to 0 when absent or not a number. Answers 404
 * when the service reports failure.
 */
router.get('/terminal/:terminalId/output', (req, res) => {
  const { terminalId } = req.params;
  // Parse as base 10 explicitly; without a radix, input such as "0x10" is
  // read as hexadecimal.
  const sinceIndex = Number.parseInt(req.query.since, 10) || 0;
  const result = terminalService.getTerminalOutput(terminalId, sinceIndex);
  if (result.success) {
    res.json(result);
  } else {
    res.status(404).json({
      error: result.error
    });
  }
});
/**
 * POST /api/terminal/:terminalId/input
 *
 * Body: `{ data }`. Forwards the input to the terminal via terminalService.
 * Answers 400 when `data` is missing or falsy, and 500 when the service
 * reports failure.
 */
router.post('/terminal/:terminalId/input', (req, res) => {
  const terminalId = req.params.terminalId;
  const data = req.body.data;

  if (!data) {
    return res.status(400).json({
      error: 'Input data is required'
    });
  }

  const result = terminalService.sendTerminalInput(terminalId, data);
  if (!result.success) {
    res.status(500).json({
      error: result.error
    });
    return;
  }

  res.json({ success: true });
});
/**
 * DELETE /api/terminal/:terminalId
 *
 * Closes the terminal via terminalService and publishes 'terminal-closed'
 * on success. Answers 500 when the service reports failure.
 */
router.delete('/terminal/:terminalId', (req, res) => {
  const terminalId = req.params.terminalId;
  const result = terminalService.closeTerminal(terminalId);

  if (!result.success) {
    res.status(500).json({
      error: result.error
    });
    return;
  }

  eventBus.emit('terminal-closed', { terminalId });
  res.json({ success: true });
});
module.exports = router;
5. Integration with Existing Code
5.1 Modify ClaudeService to use EventBus
// In services/claude-service.js
const eventBus = require('./event-bus');
class ClaudeCodeService {
constructor() {
// ... existing code ...
}
// Replace callback-based approach with EventBus
sendCommand(sessionId, command) {
const session = this.sessions.get(sessionId);
if (!session) {
throw new Error(`Session ${sessionId} not found`);
}
// ... send command to process ...
// Instead of callback, emit event
eventBus.emit('command-sent', {
sessionId,
command,
timestamp: Date.now()
});
}
// When output is received from Claude Code process
handleOutput(sessionId, output) {
// Emit event instead of callback
eventBus.emit('session-output', {
sessionId,
type: output.type || 'stdout',
content: output.content || output,
timestamp: Date.now()
});
}
// When operations are detected
handleOperationsDetected(sessionId, operations, response) {
eventBus.emit('operations-detected', {
sessionId,
operations,
response
});
}
// When operations are executed
handleOperationsExecuted(sessionId, results) {
eventBus.emit('operations-executed', {
sessionId,
results
});
}
}
5.2 Update server.js
// In server.js
const eventBus = require('./services/event-bus');
const sseManager = require('./services/sse-manager');
const sessionRoutes = require('./routes/session-routes');
const terminalRoutes = require('./routes/terminal-routes');
const sseRoutes = require('./routes/sse-routes');
// ... existing middleware ...
// Register new routes
app.use('/api', sessionRoutes);
app.use('/api', terminalRoutes);
app.use('/api', sseRoutes);
// Legacy /claude/api/claude/* routes stay mounted for backward compatibility.
// This middleware only logs a deprecation warning and passes the request on.
app.use('/claude/api/claude', function warnLegacyRoute(req, res, next) {
  console.warn('[DEPRECATION] /claude/api/claude/* routes are deprecated. Use /api/session/:sessionId/* instead.');
  return next();
});
// ... existing WebSocket code (deprecated) ...
// Graceful shutdown: on SIGTERM, close SSE connections, run the remaining
// cleanup steps, then exit with status 0.
process.on('SIGTERM', async () => {
  console.log('[Server] SIGTERM received, shutting down gracefully...');
  // Cleanup SSE connections (ends every open response and stops heartbeats)
  sseManager.cleanup();
  // ... existing cleanup ...
  // Exit explicitly once cleanup has run.
  process.exit(0);
});
6. Client-Side Reconnection Logic
6.1 SSE Client Implementation
// Client-side SSE connection manager
class SessionEventStream {
constructor(sessionId, options = {}) {
this.sessionId = sessionId;
this.options = {
reconnectInterval: 1000, // Initial reconnect delay
maxReconnectInterval: 30000, // Max reconnect delay
reconnectDecay: 1.5, // Exponential backoff multiplier
maxReconnectAttempts: 10,
heartbeatTimeout: 60000, // Timeout for heartbeat
...options
};
this.eventSource = null;
this.reconnectAttempts = 0;
this.isConnected = false;
this.listeners = new Map();
this.lastHeartbeat = Date.now();
this.heartbeatCheckInterval = null;
// Connect immediately
this.connect();
}
connect() {
if (this.eventSource) {
this.eventSource.close();
}
const url = `/api/session/${this.sessionId}/events`;
console.log(`[SSE] Connecting to ${url}`);
this.eventSource = new EventSource(url);
// Connection opened
this.eventSource.onopen = () => {
console.log(`[SSE] Connected to session ${this.sessionId}`);
this.isConnected = true;
this.reconnectAttempts = 0;
// Start heartbeat check
this.startHeartbeatCheck();
// Emit connected event
this.emit('connected', {
sessionId: this.sessionId,
timestamp: Date.now()
});
};
// Handle named events
this.eventSource.addEventListener('connected', (e) => {
const data = JSON.parse(e.data);
this.emit('connected', data);
this.lastHeartbeat = Date.now();
});
this.eventSource.addEventListener('session-output', (e) => {
const data = JSON.parse(e.data);
this.emit('output', data);
this.lastHeartbeat = Date.now();
});
this.eventSource.addEventListener('session-error', (e) => {
const data = JSON.parse(e.data);
this.emit('error', data);
this.lastHeartbeat = Date.now();
});
this.eventSource.addEventListener('session-status', (e) => {
const data = JSON.parse(e.data);
this.emit('status', data);
this.lastHeartbeat = Date.now();
});
this.eventSource.addEventListener('operations-detected', (e) => {
const data = JSON.parse(e.data);
this.emit('operations-detected', data);
this.lastHeartbeat = Date.now();
});
this.eventSource.addEventListener('operations-executed', (e) => {
const data = JSON.parse(e.data);
this.emit('operations-executed', data);
this.lastHeartbeat = Date.now();
});
this.eventSource.addEventListener('approval-request', (e) => {
const data = JSON.parse(e.data);
this.emit('approval-request', data);
this.lastHeartbeat = Date.now();
});
// Default message handler
this.eventSource.onmessage = (e) => {
// Heartbeat comments are handled automatically
if (e.data.startsWith(':')) {
this.lastHeartbeat = Date.now();
return;
}
try {
const data = JSON.parse(e.data);
this.emit('message', data);
} catch (error) {
console.error('[SSE] Error parsing message:', error);
}
};
// Handle errors
this.eventSource.onerror = (e) => {
console.error(`[SSE] Error:`, e);
this.isConnected = false;
if (this.heartbeatCheckInterval) {
clearInterval(this.heartbeatCheckInterval);
this.heartbeatCheckInterval = null;
}
// Don't reconnect if it's a 404 (session not found)
if (e.target && e.target.readyState === 2) {
console.error(`[SSE] Connection closed permanently`);
this.emit('disconnected', {
sessionId: this.sessionId,
permanent: true
});
return;
}
// Attempt reconnect
if (this.reconnectAttempts < this.options.maxReconnectAttempts) {
const delay = Math.min(
this.options.reconnectInterval * Math.pow(this.options.reconnectDecay, this.reconnectAttempts),
this.options.maxReconnectInterval
);
this.reconnectAttempts++;
console.log(`[SSE] Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.options.maxReconnectAttempts})`);
this.emit('reconnecting', {
attempt: this.reconnectAttempts,
delay,
maxAttempts: this.options.maxReconnectAttempts
});
setTimeout(() => this.connect(), delay);
} else {
console.error(`[SSE] Max reconnect attempts reached`);
this.emit('disconnected', {
sessionId: this.sessionId,
permanent: true,
reason: 'max_reconnect_attempts'
});
}
};
}
startHeartbeatCheck() {
if (this.heartbeatCheckInterval) {
clearInterval(this.heartbeatCheckInterval);
}
this.heartbeatCheckInterval = setInterval(() => {
const timeSinceLastHeartbeat = Date.now() - this.lastHeartbeat;
if (timeSinceLastHeartbeat > this.options.heartbeatTimeout) {
console.warn(`[SSE] Heartbeat timeout (${timeSinceLastHeartbeat}ms)`);
this.eventSource.close();
this.connect(); // Force reconnect
}
}, this.options.heartbeatTimeout / 2);
}
on(event, handler) {
if (!this.listeners.has(event)) {
this.listeners.set(event, []);
}
this.listeners.get(event).push(handler);
}
off(event, handler) {
if (!this.listeners.has(event)) {
return;
}
const handlers = this.listeners.get(event);
const index = handlers.indexOf(handler);
if (index > -1) {
handlers.splice(index, 1);
}
}
emit(event, data) {
if (!this.listeners.has(event)) {
return;
}
for (const handler of this.listeners.get(event)) {
try {
handler(data);
} catch (error) {
console.error(`[SSE] Error in handler for ${event}:`, error);
}
}
}
disconnect() {
console.log(`[SSE] Disconnecting from session ${this.sessionId}`);
if (this.heartbeatCheckInterval) {
clearInterval(this.heartbeatCheckInterval);
}
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
this.isConnected = false;
this.listeners.clear();
}
}
// Export for use in UI
window.SessionEventStream = SessionEventStream;
6.2 Usage Example
// In the terminal UI or session page
const sessionId = 'session-123'; // From URL parameter
const eventStream = new SessionEventStream(sessionId, {
reconnectInterval: 2000,
maxReconnectAttempts: 20
});
// Listen for output
eventStream.on('output', (data) => {
console.log('Output:', data.content);
// Update UI
appendToTerminal(data.content);
});
// Listen for errors
eventStream.on('error', (data) => {
console.error('Session error:', data.error);
showError(data.error);
});
// Listen for connection status
eventStream.on('connected', () => {
showConnectionStatus('Connected');
});
eventStream.on('reconnecting', (data) => {
showConnectionStatus(`Reconnecting... (${data.attempt}/${data.maxAttempts})`);
});
/**
 * Send a command to the current session via the prompt endpoint.
 *
 * Failures are logged, not thrown: an HTTP error status, a body that is not
 * JSON, or a body without `success` all count as failure.
 *
 * @param {string} command - Command text to send
 * @returns {Promise<Object|null>} Parsed response body, or null if the body
 *   was not valid JSON
 */
async function sendCommand(command) {
  const response = await fetch(`/api/session/${sessionId}/prompt`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({ command })
  });

  // Error pages produced by a proxy are usually not JSON; do not let the
  // parse failure escape as an unhandled rejection.
  let result = null;
  try {
    result = await response.json();
  } catch (parseError) {
    result = null;
  }

  if (!response.ok || !result || !result.success) {
    const reason = (result && (result.error || result.message)) || `HTTP ${response.status}`;
    console.error('Failed to send command:', reason);
  }
  return result;
}
// Cleanup on page unload
window.addEventListener('beforeunload', () => {
eventStream.disconnect();
});
7. Migration Path
7.1 Phase 1: Add New Infrastructure (Non-Breaking)
Duration: 1-2 weeks
Tasks:
- Implement EventBus service
- Implement SSE Manager service
- Create new route modules (session-routes, terminal-routes, sse-routes)
- Add new SSE endpoint alongside existing WebSocket
- Deploy new code without removing old functionality
Risk: Low - No breaking changes
Testing:
- Unit tests for EventBus
- Integration tests for SSE connections
- Load testing for SSE streaming
- Verify WebSocket still works
7.2 Phase 2: Migrate Client-Side (Gradual)
Duration: 2-3 weeks
Tasks:
- Update IDE UI to use SSE for new sessions
- Keep WebSocket for existing sessions (feature flag)
- Add telemetry to track usage patterns
- Fix any issues discovered during migration
Risk: Medium - Client-side changes
Testing:
- A/B testing: SSE vs WebSocket
- Monitor error rates
- Performance comparison
- User feedback
7.3 Phase 3: Deprecate WebSocket
Duration: 1-2 weeks
Tasks:
- Add deprecation warnings to WebSocket endpoint
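- Answer WebSocket connection attempts with a message pointing clients to the SSE endpoint (a WebSocket connection cannot be transparently redirected to SSE)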
- Update all documentation
- Add migration guide for any custom clients
Risk: Low - Only affects legacy clients
Testing:
- Verify deprecation warnings appear
- Test redirect logic
- Confirm new clients use SSE
7.4 Phase 4: Remove Legacy Code
Duration: 1 week
Tasks:
- Remove WebSocket server code
- Remove old client-side WebSocket code
- Clean up unused middleware
- Update API documentation
Risk: Low - All clients migrated
Testing:
- Full regression test
- Load testing
- Security audit
8. Risk Assessment & Mitigation
8.1 High-Risk Areas
| Risk | Impact | Probability | Mitigation |
|---|---|---|---|
| SSE connection drops | High | Medium | Implement exponential backoff reconnection, heartbeat monitoring |
| nginx buffering issues | High | Medium | Add X-Accel-Buffering: no header, configure nginx properly |
| EventBus memory leaks | High | Low | Implement listener cleanup, add metrics/monitoring |
| Session context confusion during migration | High | Medium | Use feature flags, clear versioning, extensive logging |
| Client compatibility | Medium | Low | Test on multiple browsers, provide polyfills if needed |
| Performance degradation | Medium | Low | Load testing, optimize event filtering, use compression |
8.2 nginx Configuration
# nginx configuration for SSE
location /api/session/ {
    proxy_pass http://localhost:3010;

    # HTTP/1.1 with an empty Connection header keeps the upstream connection
    # open for the lifetime of the stream.
    proxy_http_version 1.1;
    proxy_set_header Connection '';
    proxy_set_header Cache-Control no-cache;
    # NOTE: X-Accel-Buffering is honoured by nginx only as a *response*
    # header from the upstream (the application sets it on the SSE response).
    # Sent as a request header it has no effect on buffering.
    proxy_set_header X-Accel-Buffering no;

    # Disable buffering and caching so events are flushed to the client
    # immediately. Each directive is declared once: nginx refuses to load a
    # configuration that repeats proxy_buffering in the same block.
    proxy_buffering off;
    proxy_cache off;

    # Timeouts
    proxy_read_timeout 86400s; # 24 hours
    proxy_send_timeout 86400s;
}
8.3 Monitoring & Observability
// Add monitoring hooks
// Log and count every listener registration/removal on the event bus.
// If eventBus is a Node.js EventEmitter, event names may be Symbols, and
// interpolating a Symbol into a template literal throws a TypeError, so the
// name is converted explicitly with String() before logging.
eventBus.on('newListener', (event) => {
  console.log(`[EventBus] New listener for ${String(event)}`);
  metrics.increment('eventbus.listeners.added', { event });
});
eventBus.on('removeListener', (event) => {
  console.log(`[EventBus] Removed listener for ${String(event)}`);
  metrics.increment('eventbus.listeners.removed', { event });
});
// SSE connection monitoring
// Once a minute, log a snapshot of SSE/EventBus activity and publish gauges.
setInterval(() => {
  const activeSessions = sseManager.getActiveSessions();
  // Named busMetrics so it does not shadow the outer `metrics` object used by
  // the listener hooks above.
  const busMetrics = eventBus.getMetrics();
  console.log('[Metrics]', {
    activeSSEConnections: activeSessions.length,
    sessions: activeSessions,
    eventsEmitted: busMetrics.eventsEmitted,
    activeListeners: busMetrics.activeListeners
  });
  // Send to monitoring service
  monitoring.gauge('sse.connections', activeSessions.length);
  monitoring.gauge('eventbus.listeners', busMetrics.activeListeners);
}, 60000); // Every minute
9. Testing Strategy
9.1 Unit Tests
// tests/event-bus.test.js
// Unit tests for the EventBus subscribe/emit/unsubscribe contract.
// Every test removes its own subscription: all tests share the 'test-event'
// name, so a listener left behind would also fire during later tests.
describe('EventBus', () => {
  it('should emit and receive events', (done) => {
    const unsubscribe = eventBus.subscribe('test-event', null, (data) => {
      // Detach first so emits from later tests cannot re-enter this handler.
      unsubscribe();
      try {
        assert.equal(data.test, 'value');
        done();
      } catch (err) {
        // Report the assertion failure instead of letting the test time out.
        done(err);
      }
    });
    eventBus.emit('test-event', { test: 'value' });
  });
  it('should filter events by session ID', () => {
    const handler = jest.fn();
    const unsubscribe = eventBus.subscribe('test-event', 'session-1', handler);
    try {
      eventBus.emit('test-event', { sessionId: 'session-1' });
      eventBus.emit('test-event', { sessionId: 'session-2' });
      // Only the event tagged with the subscribed session may reach the handler.
      expect(handler).toHaveBeenCalledTimes(1);
      expect(handler).toHaveBeenCalledWith(expect.objectContaining({
        sessionId: 'session-1'
      }));
    } finally {
      unsubscribe();
    }
  });
  it('should unsubscribe correctly', () => {
    const handler = jest.fn();
    const unsubscribe = eventBus.subscribe('test-event', null, handler);
    unsubscribe();
    eventBus.emit('test-event', { test: 'value' });
    expect(handler).not.toHaveBeenCalled();
  });
});
9.2 Integration Tests
// tests/sse-integration.test.js
// End-to-end check: open an SSE stream for a fresh session, send a prompt once
// the stream reports 'connected', and expect a 'session-output' event back.
describe('SSE Integration', () => {
  it('should establish SSE connection and receive events', async () => {
    const sessionId = 'test-session-' + Date.now();
    // Create session
    await createSession(sessionId);
    // Connect to SSE
    const events = [];
    let promptError = null;
    const eventSource = new EventSource(`/api/session/${sessionId}/events`);
    try {
      eventSource.addEventListener('connected', (e) => {
        events.push(JSON.parse(e.data));
        // Send command; capture a failed request so the test fails with the
        // real cause instead of an unhandled rejection plus a waitFor timeout.
        fetch(`/api/session/${sessionId}/prompt`, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ command: 'echo test' })
        }).catch((err) => {
          promptError = err;
        });
      });
      eventSource.addEventListener('session-output', (e) => {
        events.push(JSON.parse(e.data));
        eventSource.close();
      });
      // Wait for events (or for the prompt request to fail)
      await waitFor(() => promptError !== null || events.length >= 2);
      if (promptError) {
        throw promptError;
      }
      assert.equal(events[0].type, 'connected');
      assert.equal(events[1].type, 'session-output');
    } finally {
      // Always release the connection, even when waiting or an assertion fails.
      eventSource.close();
    }
  });
  it('should handle reconnection after connection drop', async () => {
    // Test reconnection logic
    // TODO: not implemented yet; this test currently passes without checking anything.
  });
});
9.3 Load Testing
// tests/load-test.js
// Open 100 SSE connections at once, one per synthetic session id
// (load-test-0 ... load-test-99), keeping each id next to its stream.
const connections = Array.from({ length: 100 }, (_unused, index) => {
  const sessionId = `load-test-${index}`;
  return {
    sessionId,
    eventSource: new EventSource(`/api/session/${sessionId}/events`)
  };
});
// Monitor memory usage, CPU, response times
// ...
// Cleanup: close every stream that was opened above
for (const { eventSource } of connections) {
  eventSource.close();
}
10. File Structure Summary
/home/uroma/obsidian-web-interface/
├── services/
│ ├── event-bus.js ← NEW
│ ├── sse-manager.js ← NEW
│ ├── claude-service.js ← MODIFY
│ ├── terminal-service.js ← MODIFY
│ └── database.js ← NO CHANGE
├── routes/
│ ├── session-routes.js ← NEW
│ ├── terminal-routes.js ← NEW
│ └── sse-routes.js ← NEW
├── middleware/
│ └── validation.js ← NEW (session ID validation)
├── public/
│ ├── js/
│ │ ├── sse-client.js ← NEW
│ │ └── session-ui.js ← MODIFY
│ └── ...
├── tests/
│ ├── event-bus.test.js ← NEW
│ ├── sse-manager.test.js ← NEW
│ ├── sse-integration.test.js ← NEW
│ └── load-test.js ← NEW
├── server.js ← MODIFY (add route registration)
└── package.json ← MODIFY (no new deps needed)
11. Dependencies
No new npm dependencies required! SSE is built into Node.js/Express:
- EventEmitter: Built into Node.js
- Express: Already installed
- EventSource: Built into modern browsers
Optional additions:
- @types/node - For TypeScript support (if migrating to TS)
- supertest - For testing HTTP endpoints
12. Success Criteria
12.1 Functional Requirements
- SSE endpoint accepts connections for valid session IDs
- SSE endpoint returns 400 for malformed session IDs and 404 for unknown sessions
- Session events are streamed to connected clients
- Multiple clients can connect to same session
- Reconnection works with exponential backoff
- Heartbeat prevents connection drops
- No session context ambiguity
12.2 Non-Functional Requirements
- Response time < 100ms for event delivery
- Support 100+ concurrent SSE connections
- Memory usage stable over 24 hours
- No memory leaks in EventBus
- Error rate < 0.1%
- WebSocket deprecated with clear migration path
12.3 Migration Success
- At least 95% of client connections use SSE (WebSocket usage < 5%)
- No increase in error rates during migration
- User feedback positive
- Documentation complete and accurate
13. Rollback Plan
If issues arise during deployment:
1. Immediate Rollback (Phase 1-2):
   - Disable new SSE routes via feature flag
   - Continue using WebSocket
   - Zero downtime
2. Partial Rollback (Phase 3):
   - Keep SSE available
   - Re-enable WebSocket as fallback
   - Investigate issues
3. Code Rollback:
   - Revert to previous commit
   - Restore database if schema changed
   - Restart services
4. Data Migration:
   - No data migration required (stateless design)
14. Documentation Updates
14.1 API Documentation
Update README.md and API docs with:
- New SSE endpoint documentation
- Event type reference
- Code examples for client-side usage
- Migration guide from WebSocket to SSE
14.2 Architecture Diagrams
Create/update diagrams showing:
- Event flow from Claude Service to clients via EventBus
- SSE connection lifecycle
- Reconnection logic flow
15. Next Steps
- Review this plan with team and stakeholders
- Create GitHub issues for each phase
- Set up feature flags for gradual rollout
- Implement EventBus (Phase 1)
- Write tests for EventBus
- Implement SSE Manager (Phase 1)
- Deploy Phase 1 to staging environment
- Monitor metrics and gather feedback
- Proceed to Phase 2 when confident
Appendix A: Code Snippets
A.1 Session ID Validation Middleware
// middleware/validation.js
const SESSION_ID_PATTERN = /^[a-zA-Z0-9_-]{8,32}$/;
function validateSessionId(req, res, next) {
const { sessionId } = req.params;
if (!sessionId) {
return res.status(400).json({
error: 'Session ID is required'
});
}
if (!SESSION_ID_PATTERN.test(sessionId)) {
return res.status(400).json({
error: 'Invalid session ID format',
pattern: '8-32 characters, alphanumeric, underscore, hyphen'
});
}
// Check session exists (optional, can be done in route handler)
const claudeService = require('../services/claude-service');
const session = claudeService.getSession(sessionId);
if (!session) {
return res.status(404).json({
error: 'Session not found',
sessionId
});
}
req.sessionContext = session;
next();
}
module.exports = { validateSessionId };
A.2 Complete server.js Integration
// In server.js - add these sections

// --- Imports (place after the existing imports) ---
const eventBus = require('./services/event-bus');
const sseManager = require('./services/sse-manager');
const sessionRoutes = require('./routes/session-routes');
const terminalRoutes = require('./routes/terminal-routes');
const sseRoutes = require('./routes/sse-routes');

// --- Routes (place after the existing middleware) ---
// Register new API routes: every router is mounted under /api, in this order.
for (const router of [sessionRoutes, terminalRoutes, sseRoutes]) {
  app.use('/api', router);
}

// Deprecated routes (with warnings): log both notices, then let the request
// continue to whatever handler is registered for the legacy path.
app.use('/claude/api/claude', (_req, _res, next) => {
  [
    '[DEPRECATED] /claude/api/claude/* is deprecated. Use /api/session/:sessionId/* instead.',
    '[DEPRECATED] WebSocket connections should migrate to SSE: /api/session/:sessionId/events'
  ].forEach((notice) => console.warn(notice));
  next();
});

// Monitoring endpoint: EventBus metrics plus a summary of SSE connections.
app.get('/api/debug/metrics', (_req, res) => {
  const busMetrics = eventBus.getMetrics();
  const activeSessions = sseManager.getActiveSessions();

  // Sum the sizes of all collections held in sseManager.connections.
  let totalConnections = 0;
  for (const clients of sseManager.connections.values()) {
    totalConnections += clients.size;
  }

  res.json({
    eventBus: busMetrics,
    sse: { activeSessions, totalConnections }
  });
});
// Graceful shutdown
// Set once shutdown has begun so a second SIGTERM/SIGINT (e.g. Ctrl+C pressed
// twice) does not run the cleanup steps again while they are in progress.
let shutdownStarted = false;

/**
 * Signal handler: closes SSE connections, runs the remaining cleanup, then
 * exits. Exits with code 0 on success and 1 if any cleanup step throws.
 * Errors are caught here because this async function is invoked by
 * process.on, which would otherwise leave a rejection unhandled.
 */
const cleanup = async () => {
  if (shutdownStarted) {
    return;
  }
  shutdownStarted = true;
  console.log('[Server] Starting graceful shutdown...');
  let exitCode = 0;
  try {
    // Cleanup SSE connections
    sseManager.cleanup();
    // ... existing cleanup code ...
  } catch (err) {
    console.error('[Server] Error during graceful shutdown:', err);
    exitCode = 1;
  } finally {
    process.exit(exitCode);
  }
};
process.on('SIGTERM', cleanup);
process.on('SIGINT', cleanup);
End of Refactor Plan
Last Updated: 2025-01-21 | Version: 1.0 | Author: Backend Architecture Team