Files
SuperCharged-Claude-Code-Up…/SSE_REFACTOR_PLAN.md
uroma 55aafbae9a Fix project isolation: Make loadChatHistory respect active project sessions
- Modified loadChatHistory() to check for active project before fetching all sessions
- When active project exists, use project.sessions instead of fetching from API
- Added detailed console logging to debug session filtering
- This prevents ALL sessions from appearing in every project's sidebar

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 14:43:05 +00:00

1854 lines
48 KiB
Markdown

# Backend Refactor Plan: Route-Based Session Context with SSE
## Executive Summary
This document outlines a comprehensive plan to refactor the Obsidian Web Interface from a WebSocket-based architecture to a hybrid approach using **route-based session context** with **Server-Sent Events (SSE)** streaming. This eliminates session context ambiguity and provides clearer, more maintainable code.
**Current State:**
- WebSocket at `/claude/api/claude/chat` (session via query/subscription message)
- Session IDs passed via query parameters or WebSocket messages
- Context ambiguity when multiple tabs/sessions are open
- Complex client-side session management
**Target State:**
- Route-based URLs with explicit session context
- SSE streaming at `/api/session/:sessionId/events`
- Clear separation: UI routes vs API endpoints
- No session context ambiguity
---
## 1. New URL Architecture
### 1.1 Route Structure
```
# UI Routes (serve HTML/JS)
GET /terminal/:sessionId → Terminal UI for specific session
GET /session/:sessionId → Session detail page
GET /sessions → Session list page
# API Endpoints (session-scoped)
GET /api/session/:sessionId/events → SSE event stream
POST /api/session/:sessionId/prompt → Send command/prompt
GET /api/session/:sessionId/status → Session status/health
GET /api/session/:sessionId/context → Session context files
POST /api/session/:sessionId/operations/preview → Preview operations
POST /api/session/:sessionId/operations/execute → Execute operations
DELETE /api/session/:sessionId → Delete/terminate session
# Management Endpoints
GET /api/sessions → List all sessions
POST /api/sessions → Create new session
POST /api/sessions/:id/duplicate → Duplicate session
POST /api/sessions/:id/fork → Fork session
POST /api/sessions/:id/move → Move to project
# Legacy Routes (deprecated, redirect to new structure)
/claude/api/claude/chat → WebSocket (deprecated)
/claude/api/claude/sessions → Use /api/sessions instead
```
### 1.2 URL Validation
```typescript
// Session ID validation: 8-32 characters drawn from letters, digits, "_" and "-"
const SESSION_ID_REGEX = /^[a-zA-Z0-9_-]{8,32}$/;

/**
 * Check whether a value is a well-formed session ID.
 * @param sessionId - Candidate ID (typically taken from a route parameter)
 * @returns true only for strings matching SESSION_ID_REGEX
 */
function validateSessionId(sessionId: string): boolean {
  // RegExp.prototype.test() stringifies its argument, so without the type guard
  // a missing ID would be tested as the 9-character string "undefined" and pass.
  return typeof sessionId === 'string' && SESSION_ID_REGEX.test(sessionId);
}
/**
 * Express middleware guarding session-scoped routes.
 * Responds 400 when the :sessionId parameter is malformed and 404 when
 * claudeService has no such session; otherwise stores the session on
 * req.sessionContext and hands control to the next handler.
 */
function validateSession(req, res, next) {
  const candidateId = req.params.sessionId;

  if (!validateSessionId(candidateId)) {
    const formatProblem = {
      error: 'Invalid session ID format',
      sessionId: candidateId
    };
    return res.status(400).json(formatProblem);
  }

  // Check session exists
  const existing = claudeService.getSession(candidateId);
  if (!existing) {
    const notFound = {
      error: 'Session not found',
      sessionId: candidateId
    };
    return res.status(404).json(notFound);
  }

  req.sessionContext = existing;
  next();
}
```
---
## 2. EventBus Implementation
### 2.1 File Structure
```
/home/uroma/obsidian-web-interface/
├── services/
│ ├── event-bus.js ← NEW: EventBus implementation
│ ├── sse-manager.js ← NEW: SSE connection management
│ ├── claude-service.js ← MODIFIED: Emit events instead of callbacks
│ ├── terminal-service.js ← MODIFIED: Use EventBus
│ └── database.js
├── routes/
│ ├── session-routes.js ← NEW: Session-specific routes
│ ├── terminal-routes.js ← NEW: Terminal-specific routes
│ └── sse-routes.js ← NEW: SSE endpoint
└── server.js ← MODIFIED: Route registration
```
### 2.2 EventBus API
```javascript
// /home/uroma/obsidian-web-interface/services/event-bus.js
const EventEmitter = require('events');
/**
 * Process-wide event hub built on Node's EventEmitter.
 * Adds per-session filtering on subscribe, stamps every payload with
 * `_timestamp` and `_eventType`, and keeps simple usage metrics.
 */
class EventBus extends EventEmitter {
  constructor() {
    super();
    this.setMaxListeners(0); // Unlimited listeners
    this.metrics = {
      eventsEmitted: 0,
      eventsByType: new Map(),
      listenerCounts: new Map()
    };
  }

  /**
   * Subscribe to an event type
   * @param {string} eventType - Event type (e.g., 'session-output', 'session-error')
   * @param {string} sessionId - Session ID to filter events (optional; null or undefined for all)
   * @param {Function} handler - Event handler function
   * @returns {Function} Unsubscribe function (safe to call more than once)
   */
  subscribe(eventType, sessionId, handler) {
    const wrappedHandler = (data) => {
      // Filter by session ID if specified. `!= null` treats an omitted
      // (undefined) sessionId like null instead of filtering everything out.
      if (sessionId != null && data?.sessionId !== sessionId) {
        return;
      }
      try {
        handler(data);
      } catch (error) {
        // A failing subscriber must not stop delivery to the other subscribers.
        console.error(`[EventBus] Error in handler for ${eventType}:`, error);
      }
    };
    this.on(eventType, wrappedHandler);

    // Track listener
    const key = `${eventType}-${sessionId || 'global'}`;
    this.metrics.listenerCounts.set(key, (this.metrics.listenerCounts.get(key) || 0) + 1);

    // Return unsubscribe function
    let active = true;
    return () => {
      if (!active) {
        return;
      }
      active = false;
      this.off(eventType, wrappedHandler);
      const remaining = (this.metrics.listenerCounts.get(key) || 0) - 1;
      if (remaining > 0) {
        this.metrics.listenerCounts.set(key, remaining);
      } else {
        // Drop exhausted keys so the map does not grow with every session ever seen.
        this.metrics.listenerCounts.delete(key);
      }
    };
  }

  /**
   * Emit an event
   * @param {string} eventType - Event type
   * @param {Object} data - Event data (must include sessionId for session-scoped events)
   * @returns {boolean} true if the event had listeners, as with EventEmitter#emit
   */
  emit(eventType, data = {}, ...rest) {
    // EventEmitter itself calls this.emit('newListener' | 'removeListener',
    // eventName, listener). Those calls must reach listeners untouched and
    // must not be counted as application events.
    if (eventType === 'newListener' || eventType === 'removeListener') {
      return super.emit(eventType, data, ...rest);
    }

    this.metrics.eventsEmitted++;
    this.metrics.eventsByType.set(eventType, (this.metrics.eventsByType.get(eventType) || 0) + 1);

    // Add timestamp to all events
    const eventData = {
      ...data,
      _timestamp: Date.now(),
      _eventType: eventType
    };
    return super.emit(eventType, eventData);
  }

  /**
   * Subscribe to all events for a session
   * @param {string} sessionId - Session ID
   * @param {Function} handler - Handler for all session events
   * @returns {Function} Unsubscribe function
   */
  subscribeToSession(sessionId, handler) {
    const eventTypes = [
      'session-output',
      'session-error',
      'session-status',
      'operations-detected',
      'operations-executed',
      'operations-error',
      'approval-request',
      'approval-confirmed',
      'approval-expired',
      'session-created',
      'session-deleted'
    ];
    const unsubscribers = eventTypes.map((type) => this.subscribe(type, sessionId, handler));

    // Return combined unsubscribe function
    return () => unsubscribers.forEach((unsub) => unsub());
  }

  /**
   * Get metrics
   * @returns {Object} Plain-object snapshot; activeListeners is the number of
   *   listeners currently registered across all application event types
   */
  getMetrics() {
    const activeListeners = this.eventNames()
      .filter((name) => name !== 'newListener' && name !== 'removeListener')
      .reduce((total, name) => total + this.listenerCount(name), 0);
    return {
      eventsEmitted: this.metrics.eventsEmitted,
      eventsByType: Object.fromEntries(this.metrics.eventsByType),
      listenerCounts: Object.fromEntries(this.metrics.listenerCounts),
      activeListeners
    };
  }

  /**
   * Clear all listeners and reset metrics (useful for testing)
   */
  clear() {
    this.removeAllListeners();
    this.metrics = {
      eventsEmitted: 0,
      eventsByType: new Map(),
      listenerCounts: new Map()
    };
  }
}
// Singleton instance
const eventBus = new EventBus();
module.exports = eventBus;
```
### 2.3 Event Types
```javascript
// Event type definitions
// Illustrative catalogue of the payload each event carries; identifiers such as
// sessionId, mode and workingDir stand for values supplied by the caller.
// NOTE(review): `'a' | 'b' | ...` appears to enumerate the allowed values for a
// field (type-union shorthand); real callers would pass exactly one of them.
// EventBus.emit() additionally stamps every payload with _timestamp and _eventType.
// Session lifecycle events
eventBus.emit('session-created', { sessionId, mode, workingDir });
eventBus.emit('session-deleted', { sessionId });
// Session output events
eventBus.emit('session-output', {
sessionId,
type: 'stdout' | 'stderr' | 'exit' | 'ready',
content: 'output text',
timestamp: Date.now()
});
// Error raised while handling a session; `recoverable` is a boolean flag on the payload
eventBus.emit('session-error', {
sessionId,
error: 'Error message',
code: 'ERR_CODE',
recoverable: true
});
// Status snapshot for a session (example values shown for pid and uptime)
eventBus.emit('session-status', {
sessionId,
status: 'running' | 'idle' | 'exited' | 'error',
pid: 12345,
uptime: 45000
});
// Operation events
eventBus.emit('operations-detected', {
sessionId,
operations: [{ type: 'write', path: '/path', content: '...' }],
response: 'full response text'
});
eventBus.emit('operations-executed', {
sessionId,
results: [{ success: true, path: '/path', operation: 'write' }]
});
eventBus.emit('operations-error', {
sessionId,
error: 'Error message',
operations: []
});
// Approval events
// expiresAt in this example is 300000 ms (5 minutes) after emission
eventBus.emit('approval-request', {
sessionId,
approvalId: 'apr-123',
command: 'rm -rf /',
explanation: 'Remove files',
expiresAt: Date.now() + 300000
});
eventBus.emit('approval-confirmed', {
sessionId,
approvalId: 'apr-123',
approved: true,
customCommand: null
});
eventBus.emit('approval-expired', {
sessionId,
approvalId: 'apr-123'
});
```
---
## 3. SSE Endpoint Implementation
### 3.1 SSE Manager
```javascript
// /home/uroma/obsidian-web-interface/services/sse-manager.js
const eventBus = require('./event-bus');
/**
 * Tracks open Server-Sent Events responses per session, forwards EventBus
 * events to them, and keeps each connection alive with a periodic heartbeat.
 */
class SSEManager {
  constructor() {
    // Map: sessionId -> Set of response objects
    this.connections = new Map();
    this.heartbeatInterval = 30000; // 30 seconds
    // Map: response object -> heartbeat interval handle. Keyed by the response
    // itself because res.socket may already be detached when cleanup runs.
    this.heartbeatTimers = new Map();
    // Map: response object -> EventBus unsubscribe function, so every removal
    // path can release the subscription.
    this.subscriptions = new Map();
  }

  /**
   * Add SSE connection for a session
   * @param {string} sessionId - Session ID
   * @param {Object} res - Express response object
   * @param {Object} req - Express request object
   */
  addConnection(sessionId, res, req) {
    // Setup SSE headers
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache, no-transform');
    res.setHeader('X-Accel-Buffering', 'no'); // Disable nginx buffering
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders();

    // Add connection to set
    if (!this.connections.has(sessionId)) {
      this.connections.set(sessionId, new Set());
    }
    const connectionSet = this.connections.get(sessionId);
    connectionSet.add(res);
    console.log(`[SSEManager] Connection added for session ${sessionId}. Total connections: ${connectionSet.size}`);

    // Send initial connection event
    this.sendEvent(sessionId, res, {
      type: 'connected',
      sessionId,
      timestamp: Date.now(),
      message: 'SSE connection established'
    });

    // Start heartbeat for this connection
    this.startHeartbeat(sessionId, res);

    // Subscribe to all session events
    const unsubscribe = eventBus.subscribeToSession(sessionId, (eventData) => {
      this.sendEvent(sessionId, res, eventData);
    });
    this.subscriptions.set(res, unsubscribe);

    // Handle client disconnect
    req.on('close', () => {
      this.removeConnection(sessionId, res, unsubscribe);
    });

    // Send immediate status update
    eventBus.emit('session-status', {
      sessionId,
      status: 'connected'
    });
  }

  /**
   * Send SSE event to a specific connection
   * @param {string} sessionId - Session ID
   * @param {Object} res - Response object
   * @param {Object} data - Event data
   * @returns {boolean} true if the event was written, false if the connection
   *   is closed or the write failed
   */
  sendEvent(sessionId, res, data) {
    if (res.destroyed || res.writableEnded) {
      return false;
    }
    try {
      const payload = data ?? {};
      // Prefer the bus event name (_eventType) so that e.g. a 'session-output'
      // event is not published under its payload type ('stdout'); fall back to
      // `type` for locally built events such as 'connected'.
      const eventName = payload._eventType || payload.type || 'message';
      res.write(`event: ${eventName}\ndata: ${JSON.stringify(payload)}\nid: ${Date.now()}\n\n`);
      return true;
    } catch (error) {
      console.error(`[SSEManager] Error sending event to ${sessionId}:`, error);
      return false;
    }
  }

  /**
   * Send event to all connections for a session
   * @param {string} sessionId - Session ID
   * @param {Object} data - Event data
   * @returns {number} Number of connections the event was written to
   */
  broadcastToSession(sessionId, data) {
    const connectionSet = this.connections.get(sessionId);
    if (!connectionSet) {
      return 0;
    }
    let sentCount = 0;
    const deadConnections = [];
    for (const res of connectionSet) {
      if (this.sendEvent(sessionId, res, data)) {
        sentCount++;
      } else {
        deadConnections.push(res);
      }
    }
    // Clean up dead connections, including their timers and subscriptions
    deadConnections.forEach((res) => this.removeConnection(sessionId, res));
    return sentCount;
  }

  /**
   * Start heartbeat for a connection
   * @param {string} sessionId - Session ID
   * @param {Object} res - Response object
   */
  startHeartbeat(sessionId, res) {
    const timerId = setInterval(() => {
      if (res.destroyed || res.writableEnded) {
        this.removeConnection(sessionId, res);
        return;
      }
      try {
        // The comment line keeps intermediaries from timing the stream out; the
        // named event is added because EventSource clients cannot observe
        // comment lines and would otherwise never see a heartbeat.
        res.write(`: heartbeat\nevent: heartbeat\ndata: ${JSON.stringify({ timestamp: Date.now() })}\n\n`);
      } catch (error) {
        this.removeConnection(sessionId, res);
      }
    }, this.heartbeatInterval);
    this.heartbeatTimers.set(res, timerId);
  }

  /**
   * Remove SSE connection
   * @param {string} sessionId - Session ID
   * @param {Object} res - Response object
   * @param {Function} [unsubscribe] - Unsubscribe function from EventBus; when
   *   omitted, the one recorded by addConnection is used
   */
  removeConnection(sessionId, res, unsubscribe) {
    const connectionSet = this.connections.get(sessionId);
    if (connectionSet) {
      connectionSet.delete(res);
      if (connectionSet.size === 0) {
        this.connections.delete(sessionId);
      }
      console.log(`[SSEManager] Connection removed for session ${sessionId}. Remaining: ${connectionSet.size || 0}`);
    }

    // Stop heartbeat
    const timerId = this.heartbeatTimers.get(res);
    if (timerId) {
      clearInterval(timerId);
      this.heartbeatTimers.delete(res);
    }

    // Unsubscribe from events (deduplicated: the caller usually passes the
    // same function that addConnection stored)
    const stored = this.subscriptions.get(res);
    this.subscriptions.delete(res);
    for (const release of new Set([stored, unsubscribe])) {
      if (typeof release === 'function') {
        release();
      }
    }
  }

  /**
   * Get connection count for a session
   * @param {string} sessionId - Session ID
   * @returns {number} Open connections for the session (0 if none)
   */
  getConnectionCount(sessionId) {
    return this.connections.get(sessionId)?.size || 0;
  }

  /**
   * Get all active sessions
   * @returns {string[]} IDs of sessions that currently have a connection set
   */
  getActiveSessions() {
    return Array.from(this.connections.keys());
  }

  /**
   * Clean up all connections (for shutdown)
   */
  cleanup() {
    console.log('[SSEManager] Cleaning up all connections...');
    for (const connectionSet of this.connections.values()) {
      for (const res of connectionSet) {
        try {
          res.end();
        } catch (error) {
          // Ignore errors during cleanup
        }
      }
    }
    // Clear all heartbeat timers
    for (const timerId of this.heartbeatTimers.values()) {
      clearInterval(timerId);
    }
    // Release every EventBus subscription so no listener outlives its response
    for (const release of this.subscriptions.values()) {
      release();
    }
    this.connections.clear();
    this.heartbeatTimers.clear();
    this.subscriptions.clear();
    console.log('[SSEManager] Cleanup complete');
  }
}
// Singleton instance
const sseManager = new SSEManager();
module.exports = sseManager;
```
### 3.2 SSE Route
```javascript
// /home/uroma/obsidian-web-interface/routes/sse-routes.js
const express = require('express');
const sseManager = require('../services/sse-manager');
// validateSession is the Express middleware (400 on malformed ID, 404 on unknown
// session). validateSessionId is a boolean predicate: mounted as middleware it
// would never call next() and the request would hang.
const { validateSession } = require('../middleware/validation');
const router = express.Router();

/**
 * SSE endpoint for session events
 * GET /api/session/:sessionId/events
 */
router.get('/session/:sessionId/events', validateSession, (req, res) => {
  const { sessionId } = req.params;
  console.log(`[SSE] New connection for session ${sessionId}`);
  // Add SSE connection
  sseManager.addConnection(sessionId, res, req);
  // Note: Response is kept open for SSE streaming
});

/**
 * Get connection status for a session
 * GET /api/session/:sessionId/events/status
 */
router.get('/session/:sessionId/events/status', validateSession, (req, res) => {
  const { sessionId } = req.params;
  res.json({
    sessionId,
    activeConnections: sseManager.getConnectionCount(sessionId),
    timestamp: Date.now()
  });
});

module.exports = router;
```
---
## 4. Route Implementation
### 4.1 Session Routes
```javascript
// /home/uroma/obsidian-web-interface/routes/session-routes.js
const express = require('express');
const claudeService = require('../services/claude-service');
const eventBus = require('../services/event-bus');
// validateSession is the Express middleware (400 on malformed ID, 404 on unknown
// session). validateSessionId is a boolean predicate: mounted as middleware it
// would never call next() and the request would hang.
const { validateSession } = require('../middleware/validation');
const router = express.Router();

/**
 * Send command/prompt to session
 * POST /api/session/:sessionId/prompt
 * Body: { command: string }. The HTTP response only acknowledges receipt;
 * the session's output is delivered over the SSE stream.
 */
router.post('/session/:sessionId/prompt', validateSession, async (req, res) => {
  const { sessionId } = req.params;
  const { command } = req.body ?? {};
  // Must be a non-empty string: the preview below calls String methods on it.
  if (typeof command !== 'string' || command.length === 0) {
    return res.status(400).json({
      error: 'Command is required'
    });
  }
  try {
    // Send command to Claude service
    await claudeService.sendCommand(sessionId, command);
    // Response will come via SSE
    res.json({
      success: true,
      sessionId,
      message: 'Command sent',
      timestamp: Date.now()
    });
    // Emit command-sent event (command truncated to 100 characters)
    eventBus.emit('command-sent', {
      sessionId,
      command: command.substring(0, 100) + (command.length > 100 ? '...' : ''),
      timestamp: Date.now()
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error sending command:`, error);
    // Emit error event
    eventBus.emit('session-error', {
      sessionId,
      error: error.message,
      code: 'COMMAND_SEND_ERROR',
      recoverable: false
    });
    // The success response may already be out if the failure happened after it;
    // writing a second response would throw ERR_HTTP_HEADERS_SENT.
    if (!res.headersSent) {
      res.status(500).json({
        error: 'Failed to send command',
        message: error.message
      });
    }
  }
});

/**
 * Get session status
 * GET /api/session/:sessionId/status
 */
router.get('/session/:sessionId/status', validateSession, (req, res) => {
  const { sessionId } = req.params;
  try {
    const session = claudeService.getSession(sessionId);
    if (!session) {
      return res.status(404).json({
        error: 'Session not found',
        sessionId
      });
    }
    res.json({
      sessionId: session.id,
      status: session.status,
      mode: session.mode,
      createdAt: session.createdAt,
      lastActivity: session.lastActivity,
      // NOTE(review): this is the PID of the Node server process, not of the
      // session's child process - confirm which one clients expect.
      pid: process.pid,
      uptime: Date.now() - new Date(session.createdAt).getTime()
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error getting status:`, error);
    res.status(500).json({
      error: 'Failed to get session status',
      message: error.message
    });
  }
});

/**
 * Get session context files
 * GET /api/session/:sessionId/context
 */
router.get('/session/:sessionId/context', validateSession, async (req, res) => {
  const { sessionId } = req.params;
  try {
    const context = await claudeService.getSessionContext(sessionId);
    res.json({
      sessionId,
      context: context.files,
      timestamp: Date.now()
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error getting context:`, error);
    res.status(500).json({
      error: 'Failed to get session context',
      message: error.message
    });
  }
});

/**
 * Preview operations for session
 * POST /api/session/:sessionId/operations/preview
 * Body: { response: string }
 */
router.post('/session/:sessionId/operations/preview', validateSession, async (req, res) => {
  const { sessionId } = req.params;
  const { response } = req.body ?? {};
  if (!response) {
    return res.status(400).json({
      error: 'Response is required'
    });
  }
  try {
    const operations = await claudeService.previewOperations(sessionId, response);
    // Emit operations-detected event
    eventBus.emit('operations-detected', {
      sessionId,
      operations,
      response
    });
    res.json({
      success: true,
      operations,
      count: operations.length
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error previewing operations:`, error);
    eventBus.emit('session-error', {
      sessionId,
      error: error.message,
      code: 'PREVIEW_ERROR',
      recoverable: true
    });
    if (!res.headersSent) {
      res.status(500).json({
        error: 'Failed to preview operations',
        message: error.message
      });
    }
  }
});

/**
 * Execute operations for session
 * POST /api/session/:sessionId/operations/execute
 * Body: { operations: Array }
 */
router.post('/session/:sessionId/operations/execute', validateSession, async (req, res) => {
  const { sessionId } = req.params;
  const { operations } = req.body ?? {};
  if (!Array.isArray(operations)) {
    return res.status(400).json({
      error: 'Operations array is required'
    });
  }
  try {
    const results = await claudeService.executeOperations(sessionId, operations);
    // Emit operations-executed event
    eventBus.emit('operations-executed', {
      sessionId,
      results
    });
    res.json({
      success: true,
      results,
      executed: results.length
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error executing operations:`, error);
    eventBus.emit('operations-error', {
      sessionId,
      error: error.message,
      operations
    });
    if (!res.headersSent) {
      res.status(500).json({
        error: 'Failed to execute operations',
        message: error.message
      });
    }
  }
});

/**
 * Delete/terminate session
 * DELETE /api/session/:sessionId
 */
router.delete('/session/:sessionId', validateSession, async (req, res) => {
  const { sessionId } = req.params;
  try {
    await claudeService.deleteSession(sessionId);
    // Emit session-deleted event
    eventBus.emit('session-deleted', {
      sessionId,
      timestamp: Date.now()
    });
    res.json({
      success: true,
      message: 'Session deleted',
      sessionId
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error deleting session:`, error);
    if (!res.headersSent) {
      res.status(500).json({
        error: 'Failed to delete session',
        message: error.message
      });
    }
  }
});

module.exports = router;
```
### 4.2 Terminal Routes
```javascript
// /home/uroma/obsidian-web-interface/routes/terminal-routes.js
const express = require('express');
const terminalService = require('../services/terminal-service');
const eventBus = require('../services/event-bus');
const router = express.Router();
/**
 * Create new terminal
 * POST /api/terminal
 * Responds with the new terminalId, or 500 when the service reports failure
 * or throws; a 'terminal-created' event is emitted on success.
 */
router.post('/terminal', async (req, res) => {
  const { workingDir, mode, sessionId } = req.body;
  try {
    const outcome = terminalService.createTerminal({ workingDir, mode, sessionId });
    if (!outcome.success) {
      res.status(500).json({ error: outcome.error });
      return;
    }
    // Emit terminal-created event
    eventBus.emit('terminal-created', {
      terminalId: outcome.terminalId,
      sessionId,
      workingDir,
      mode
    });
    res.json({ success: true, terminalId: outcome.terminalId });
  } catch (error) {
    console.error('[TerminalRoutes] Error creating terminal:', error);
    const failure = { error: 'Failed to create terminal', message: error.message };
    res.status(500).json(failure);
  }
});
/**
 * Get terminal output (polling endpoint, kept for compatibility)
 * GET /api/terminal/:terminalId/output
 * Query: since - index to read output from; defaults to 0 when missing,
 * non-numeric or negative.
 */
router.get('/terminal/:terminalId/output', (req, res) => {
  const { terminalId } = req.params;
  // Explicit radix so values like "0x10" are not read as hexadecimal; anything
  // that is not a positive integer falls back to the start of the buffer.
  const parsedSince = Number.parseInt(req.query.since, 10);
  const sinceIndex = Number.isInteger(parsedSince) && parsedSince > 0 ? parsedSince : 0;
  const result = terminalService.getTerminalOutput(terminalId, sinceIndex);
  if (result.success) {
    res.json(result);
  } else {
    res.status(404).json({
      error: result.error
    });
  }
});
/**
 * Send input to terminal
 * POST /api/terminal/:terminalId/input
 * Body: { data } - rejected with 400 when missing or falsy.
 */
router.post('/terminal/:terminalId/input', (req, res) => {
  const terminalId = req.params.terminalId;
  const input = req.body.data;
  if (!input) {
    return res.status(400).json({ error: 'Input data is required' });
  }
  const outcome = terminalService.sendTerminalInput(terminalId, input);
  if (!outcome.success) {
    res.status(500).json({ error: outcome.error });
    return;
  }
  res.json({ success: true });
});
/**
 * Close terminal
 * DELETE /api/terminal/:terminalId
 * Emits 'terminal-closed' once the service confirms the terminal was closed.
 */
router.delete('/terminal/:terminalId', (req, res) => {
  const terminalId = req.params.terminalId;
  const outcome = terminalService.closeTerminal(terminalId);
  if (!outcome.success) {
    res.status(500).json({ error: outcome.error });
    return;
  }
  eventBus.emit('terminal-closed', { terminalId });
  res.json({ success: true });
});
module.exports = router;
```
---
## 5. Integration with Existing Code
### 5.1 Modify ClaudeService to use EventBus
```javascript
// In services/claude-service.js
const eventBus = require('./event-bus');
/**
 * Sketch of the ClaudeCodeService changes: results are published on the
 * EventBus instead of being passed to per-call callbacks.
 */
class ClaudeCodeService {
  constructor() {
    // ... existing code ...
  }

  /**
   * Send a command to a session's process.
   * Replaces the callback-based approach with an EventBus notification.
   * @param {string} sessionId - Target session
   * @param {string} command - Command text
   * @throws {Error} If the session is not registered in this.sessions
   */
  sendCommand(sessionId, command) {
    const session = this.sessions.get(sessionId);
    if (!session) {
      throw new Error(`Session ${sessionId} not found`);
    }
    // ... send command to process ...
    // Instead of callback, emit event
    eventBus.emit('command-sent', {
      sessionId,
      command,
      timestamp: Date.now()
    });
  }

  /**
   * Publish output received from the Claude Code process.
   * @param {string} sessionId - Session that produced the output
   * @param {string|Object} output - Raw text, or an object with `type` and `content`
   */
  handleOutput(sessionId, output) {
    // Emit event instead of callback
    eventBus.emit('session-output', {
      sessionId,
      type: output?.type || 'stdout',
      // `??` rather than `||`: an empty-string chunk is valid content and must
      // not be replaced by the whole output object.
      content: output?.content ?? output,
      timestamp: Date.now()
    });
  }

  /**
   * Publish operations detected in a response.
   * @param {string} sessionId - Session the response belongs to
   * @param {Array} operations - Detected operations
   * @param {string} response - Response text the operations were found in
   */
  handleOperationsDetected(sessionId, operations, response) {
    eventBus.emit('operations-detected', {
      sessionId,
      operations,
      response
    });
  }

  /**
   * Publish the results of executed operations.
   * @param {string} sessionId - Session the operations ran for
   * @param {Array} results - Per-operation results
   */
  handleOperationsExecuted(sessionId, results) {
    eventBus.emit('operations-executed', {
      sessionId,
      results
    });
  }
}
```
### 5.2 Update server.js
```javascript
// In server.js
// Fragment showing only the lines to add; "// ... existing ..." markers stand
// for code that stays as it is.
const eventBus = require('./services/event-bus');
const sseManager = require('./services/sse-manager');
const sessionRoutes = require('./routes/session-routes');
const terminalRoutes = require('./routes/terminal-routes');
const sseRoutes = require('./routes/sse-routes');
// ... existing middleware ...
// Register new routes
// All three routers share the /api prefix, so the paths declared inside them
// (e.g. /session/:sessionId/events) are served under /api/...
app.use('/api', sessionRoutes);
app.use('/api', terminalRoutes);
app.use('/api', sseRoutes);
// Keep legacy routes for backward compatibility (with deprecation warnings)
// This middleware only logs and then calls next(); the legacy handlers that
// follow still serve the request. It logs once per request under the prefix.
app.use('/claude/api/claude', (req, res, next) => {
console.warn('[DEPRECATION] /claude/api/claude/* routes are deprecated. Use /api/session/:sessionId/* instead.');
next();
});
// ... existing WebSocket code (deprecated) ...
// Graceful shutdown
// sseManager.cleanup() is called synchronously before the process exits.
// NOTE(review): only SIGTERM is handled here - confirm whether SIGINT needs the
// same treatment.
process.on('SIGTERM', async () => {
console.log('[Server] SIGTERM received, shutting down gracefully...');
// Cleanup SSE connections
sseManager.cleanup();
// ... existing cleanup ...
process.exit(0);
});
```
---
## 6. Client-Side Reconnection Logic
### 6.1 SSE Client Implementation
```javascript
// Client-side SSE connection manager
/**
 * Browser-side wrapper around EventSource for one session.
 * Re-emits server events under short local names, reconnects with exponential
 * backoff, and forces a reconnect when the stream has been silent for longer
 * than `heartbeatTimeout`.
 */
class SessionEventStream {
  /**
   * @param {string} sessionId - Session whose event stream to open
   * @param {Object} [options] - Overrides for the reconnect/heartbeat defaults below
   */
  constructor(sessionId, options = {}) {
    this.sessionId = sessionId;
    this.options = {
      reconnectInterval: 1000, // Initial reconnect delay
      maxReconnectInterval: 30000, // Max reconnect delay
      reconnectDecay: 1.5, // Exponential backoff multiplier
      maxReconnectAttempts: 10,
      heartbeatTimeout: 60000, // Timeout for heartbeat
      ...options
    };
    this.eventSource = null;
    this.reconnectAttempts = 0;
    this.isConnected = false;
    this.listeners = new Map();
    this.lastHeartbeat = Date.now();
    this.heartbeatCheckInterval = null;
    this.reconnectTimer = null; // Pending backoff timer, if any
    // Connect immediately
    this.connect();
  }

  /**
   * Open (or re-open) the EventSource and wire up all handlers.
   */
  connect() {
    // A manual connect supersedes any reconnect that is still scheduled.
    this.clearReconnectTimer();
    if (this.eventSource) {
      this.eventSource.close();
    }
    const url = `/api/session/${encodeURIComponent(this.sessionId)}/events`;
    console.log(`[SSE] Connecting to ${url}`);
    const source = new EventSource(url);
    this.eventSource = source;
    // Start the silence clock afresh so a forced reconnect is not immediately
    // followed by another heartbeat timeout.
    this.lastHeartbeat = Date.now();

    // Connection opened
    source.onopen = () => {
      console.log(`[SSE] Connected to session ${this.sessionId}`);
      this.isConnected = true;
      this.reconnectAttempts = 0;
      // Start heartbeat check
      this.startHeartbeatCheck();
      // Emit connected event
      this.emit('connected', {
        sessionId: this.sessionId,
        timestamp: Date.now()
      });
    };

    // Handle named events: server event name -> local event name
    const forwardedEvents = {
      'connected': 'connected',
      'session-output': 'output',
      'session-error': 'error',
      'session-status': 'status',
      'operations-detected': 'operations-detected',
      'operations-executed': 'operations-executed',
      'operations-error': 'operations-error',
      'approval-request': 'approval-request',
      'approval-confirmed': 'approval-confirmed',
      'approval-expired': 'approval-expired',
      'session-created': 'session-created',
      'session-deleted': 'session-deleted'
    };
    for (const [serverEvent, localEvent] of Object.entries(forwardedEvents)) {
      source.addEventListener(serverEvent, (e) => {
        this.lastHeartbeat = Date.now();
        const data = this.parseEventData(e.data);
        if (data !== undefined) {
          this.emit(localEvent, data);
        }
      });
    }

    // SSE comment lines (": heartbeat") are discarded by EventSource and never
    // reach script, so liveness is tracked through a named 'heartbeat' event
    // and, more generally, through any event that arrives.
    source.addEventListener('heartbeat', () => {
      this.lastHeartbeat = Date.now();
    });

    // Default message handler (events sent without an explicit name)
    source.onmessage = (e) => {
      this.lastHeartbeat = Date.now();
      const data = this.parseEventData(e.data);
      if (data !== undefined) {
        this.emit('message', data);
      }
    };

    // Handle errors
    source.onerror = (e) => {
      if (this.eventSource !== source) {
        return; // Event from a source that has already been replaced
      }
      console.error(`[SSE] Error:`, e);
      this.isConnected = false;
      this.stopHeartbeatCheck();
      // Don't reconnect if it's a 404 (session not found)
      if (e.target && e.target.readyState === 2) {
        console.error(`[SSE] Connection closed permanently`);
        this.emit('disconnected', {
          sessionId: this.sessionId,
          permanent: true
        });
        return;
      }
      // Stop the browser's built-in retry; reconnection is driven by the
      // backoff below, and running both would open duplicate streams.
      source.close();
      // Attempt reconnect
      if (this.reconnectAttempts < this.options.maxReconnectAttempts) {
        const delay = Math.min(
          this.options.reconnectInterval * Math.pow(this.options.reconnectDecay, this.reconnectAttempts),
          this.options.maxReconnectInterval
        );
        this.reconnectAttempts++;
        console.log(`[SSE] Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.options.maxReconnectAttempts})`);
        this.emit('reconnecting', {
          attempt: this.reconnectAttempts,
          delay,
          maxAttempts: this.options.maxReconnectAttempts
        });
        this.clearReconnectTimer();
        this.reconnectTimer = setTimeout(() => {
          this.reconnectTimer = null;
          this.connect();
        }, delay);
      } else {
        console.error(`[SSE] Max reconnect attempts reached`);
        this.emit('disconnected', {
          sessionId: this.sessionId,
          permanent: true,
          reason: 'max_reconnect_attempts'
        });
      }
    };
  }

  /**
   * Parse an event's JSON payload.
   * @param {string} raw - The event's data field
   * @returns {*} Parsed value, or undefined when the payload is not valid JSON
   */
  parseEventData(raw) {
    try {
      return JSON.parse(raw);
    } catch (error) {
      console.error('[SSE] Error parsing message:', error);
      return undefined;
    }
  }

  /** Cancel a scheduled reconnect, if any. */
  clearReconnectTimer() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /** Stop the periodic silence check, if running. */
  stopHeartbeatCheck() {
    if (this.heartbeatCheckInterval) {
      clearInterval(this.heartbeatCheckInterval);
      this.heartbeatCheckInterval = null;
    }
  }

  /**
   * Periodically check how long the stream has been silent and force a
   * reconnect once `heartbeatTimeout` is exceeded.
   */
  startHeartbeatCheck() {
    this.stopHeartbeatCheck();
    this.heartbeatCheckInterval = setInterval(() => {
      const timeSinceLastHeartbeat = Date.now() - this.lastHeartbeat;
      if (timeSinceLastHeartbeat > this.options.heartbeatTimeout) {
        console.warn(`[SSE] Heartbeat timeout (${timeSinceLastHeartbeat}ms)`);
        this.connect(); // Force reconnect (closes the current source first)
      }
    }, this.options.heartbeatTimeout / 2);
  }

  /**
   * Register a handler for a local event name.
   * @param {string} event - e.g. 'output', 'error', 'status', 'connected'
   * @param {Function} handler - Called with the event payload
   */
  on(event, handler) {
    if (!this.listeners.has(event)) {
      this.listeners.set(event, []);
    }
    this.listeners.get(event).push(handler);
  }

  /**
   * Remove a previously registered handler.
   * @param {string} event - Local event name
   * @param {Function} handler - The same function reference passed to on()
   */
  off(event, handler) {
    if (!this.listeners.has(event)) {
      return;
    }
    const handlers = this.listeners.get(event);
    const index = handlers.indexOf(handler);
    if (index > -1) {
      handlers.splice(index, 1);
    }
  }

  /**
   * Deliver an event to its handlers; a throwing handler is logged and does
   * not prevent the remaining handlers from running.
   * @param {string} event - Local event name
   * @param {*} data - Payload passed to each handler
   */
  emit(event, data) {
    if (!this.listeners.has(event)) {
      return;
    }
    // Iterate over a copy so handlers may call off() while being dispatched.
    for (const handler of [...this.listeners.get(event)]) {
      try {
        handler(data);
      } catch (error) {
        console.error(`[SSE] Error in handler for ${event}:`, error);
      }
    }
  }

  /**
   * Close the stream for good: cancels pending reconnects and the silence
   * check, closes the EventSource and drops all handlers.
   */
  disconnect() {
    console.log(`[SSE] Disconnecting from session ${this.sessionId}`);
    this.clearReconnectTimer();
    this.stopHeartbeatCheck();
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
    this.isConnected = false;
    this.listeners.clear();
  }
}
// Export for use in UI
window.SessionEventStream = SessionEventStream;
```
### 6.2 Usage Example
```javascript
// In the terminal UI or session page
const sessionId = 'session-123'; // From URL parameter
const eventStream = new SessionEventStream(sessionId, {
  reconnectInterval: 2000,
  maxReconnectAttempts: 20
});

// Listen for output
eventStream.on('output', (data) => {
  console.log('Output:', data.content);
  // Update UI
  appendToTerminal(data.content);
});

// Listen for errors
eventStream.on('error', (data) => {
  console.error('Session error:', data.error);
  showError(data.error);
});

// Listen for connection status
eventStream.on('connected', () => {
  showConnectionStatus('Connected');
});
eventStream.on('reconnecting', (data) => {
  showConnectionStatus(`Reconnecting... (${data.attempt}/${data.maxAttempts})`);
});

/**
 * Send a command to the session. The HTTP response only acknowledges receipt;
 * the command's output arrives through the 'output' listener above.
 * @param {string} command - Command text to send
 */
async function sendCommand(command) {
  try {
    const response = await fetch(`/api/session/${encodeURIComponent(sessionId)}/prompt`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ command })
    });
    const result = await response.json();
    if (!response.ok || !result.success) {
      console.error('Failed to send command:', result.error || `HTTP ${response.status}`);
    }
  } catch (error) {
    // Network failure or a non-JSON response body
    console.error('Failed to send command:', error);
  }
}

// Cleanup on page unload
window.addEventListener('beforeunload', () => {
  eventStream.disconnect();
});
```
---
## 7. Migration Path
### 7.1 Phase 1: Add New Infrastructure (Non-Breaking)
**Duration:** 1-2 weeks
**Tasks:**
1. Implement EventBus service
2. Implement SSE Manager service
3. Create new route modules (session-routes, terminal-routes, sse-routes)
4. Add new SSE endpoint alongside existing WebSocket
5. Deploy new code without removing old functionality
**Risk:** Low - No breaking changes
**Testing:**
- Unit tests for EventBus
- Integration tests for SSE connections
- Load testing for SSE streaming
- Verify WebSocket still works
### 7.2 Phase 2: Migrate Client-Side (Gradual)
**Duration:** 2-3 weeks
**Tasks:**
1. Update IDE UI to use SSE for new sessions
2. Keep WebSocket for existing sessions (feature flag)
3. Add telemetry to track usage patterns
4. Fix any issues discovered during migration
**Risk:** Medium - Client-side changes
**Testing:**
- A/B testing: SSE vs WebSocket
- Monitor error rates
- Performance comparison
- User feedback
### 7.3 Phase 3: Deprecate WebSocket
**Duration:** 1-2 weeks
**Tasks:**
1. Add deprecation warnings to WebSocket endpoint
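2. Reject new WebSocket connections with a message that points clients to the SSE endpoint (a WebSocket connection cannot be transparently redirected to SSE)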
3. Update all documentation
4. Add migration guide for any custom clients
**Risk:** Low - Only affects legacy clients
**Testing:**
- Verify deprecation warnings appear
- Test redirect logic
- Confirm new clients use SSE
### 7.4 Phase 4: Remove Legacy Code
**Duration:** 1 week
**Tasks:**
1. Remove WebSocket server code
2. Remove old client-side WebSocket code
3. Clean up unused middleware
4. Update API documentation
**Risk:** Low - All clients migrated
**Testing:**
- Full regression test
- Load testing
- Security audit
---
## 8. Risk Assessment & Mitigation
### 8.1 High-Risk Areas
| Risk | Impact | Probability | Mitigation |
|------|--------|-------------|------------|
| **SSE connection drops** | High | Medium | Implement exponential backoff reconnection, heartbeat monitoring |
| **nginx buffering issues** | High | Medium | Add `X-Accel-Buffering: no` header, configure nginx properly |
| **EventBus memory leaks** | High | Low | Implement listener cleanup, add metrics/monitoring |
| **Session context confusion during migration** | High | Medium | Use feature flags, clear versioning, extensive logging |
| **Client compatibility** | Medium | Low | Test on multiple browsers, provide polyfills if needed |
| **Performance degradation** | Medium | Low | Load testing, optimize event filtering, use compression |
### 8.2 nginx Configuration
```nginx
# nginx configuration for SSE
location /api/session/ {
    # Disable buffering and caching so events are flushed to the client as they
    # are produced. Each directive may appear only once per context; a repeated
    # "proxy_buffering" makes nginx refuse the configuration as a duplicate.
    proxy_buffering off;
    proxy_cache off;

    # SSE-specific headers
    proxy_pass http://localhost:3010;
    proxy_http_version 1.1;
    # Clear the Connection header so upstream connections can be kept alive
    proxy_set_header Connection '';
    proxy_set_header Cache-Control no-cache;
    # NOTE(review): X-Accel-Buffering is honored by nginx as an upstream
    # *response* header; sending it to the upstream as a request header is
    # probably a no-op - confirm it is still wanted.
    proxy_set_header X-Accel-Buffering no;

    # Timeouts
    proxy_read_timeout 86400s; # 24 hours
    proxy_send_timeout 86400s;
}
```
### 8.3 Monitoring & Observability
```javascript
// Add monitoring hooks
// Track listener churn on the event bus so leaked subscriptions show up in
// logs and metrics. Event names may be Symbols, which throw a TypeError when
// interpolated into a template literal, so they are converted with String().
eventBus.on('newListener', (event) => {
  console.log(`[EventBus] New listener for ${String(event)}`);
  metrics.increment('eventbus.listeners.added', { event });
});
eventBus.on('removeListener', (event) => {
  console.log(`[EventBus] Removed listener for ${String(event)}`);
  metrics.increment('eventbus.listeners.removed', { event });
});
// SSE connection monitoring
setInterval(() => {
  const activeSessions = sseManager.getActiveSessions();
  // Named `busMetrics` so it does not shadow the `metrics` client used by the
  // listener hooks above.
  const busMetrics = eventBus.getMetrics();
  console.log('[Metrics]', {
    activeSSEConnections: activeSessions.length,
    sessions: activeSessions,
    eventsEmitted: busMetrics.eventsEmitted,
    activeListeners: busMetrics.activeListeners
  });
  // Send to monitoring service
  monitoring.gauge('sse.connections', activeSessions.length);
  monitoring.gauge('eventbus.listeners', busMetrics.activeListeners);
}, 60000); // Every minute
```
---
## 9. Testing Strategy
### 9.1 Unit Tests
```javascript
// tests/event-bus.test.js
// Every test removes the subscription it creates. All tests share the
// 'test-event' name, so a handler left behind by one test would also fire
// (and fail its assertions) when a later test emits.
describe('EventBus', () => {
  it('should emit and receive events', () => {
    const handler = jest.fn();
    const unsubscribe = eventBus.subscribe('test-event', null, handler);
    eventBus.emit('test-event', { test: 'value' });
    unsubscribe();
    expect(handler).toHaveBeenCalledTimes(1);
    expect(handler).toHaveBeenCalledWith(expect.objectContaining({
      test: 'value'
    }));
  });

  it('should filter events by session ID', () => {
    const handler = jest.fn();
    const unsubscribe = eventBus.subscribe('test-event', 'session-1', handler);
    eventBus.emit('test-event', { sessionId: 'session-1' });
    eventBus.emit('test-event', { sessionId: 'session-2' });
    unsubscribe();
    // Only the event for the subscribed session may reach the handler.
    expect(handler).toHaveBeenCalledTimes(1);
    expect(handler).toHaveBeenCalledWith(expect.objectContaining({
      sessionId: 'session-1'
    }));
  });

  it('should unsubscribe correctly', () => {
    const handler = jest.fn();
    const unsubscribe = eventBus.subscribe('test-event', null, handler);
    unsubscribe();
    eventBus.emit('test-event', { test: 'value' });
    expect(handler).not.toHaveBeenCalled();
  });
});
```
### 9.2 Integration Tests
```javascript
// tests/sse-integration.test.js
describe('SSE Integration', () => {
  it('should establish SSE connection and receive events', async () => {
    const sessionId = `test-session-${Date.now()}`;
    // Create session
    await createSession(sessionId);
    // Connect to SSE
    const events = [];
    // Holds a failure of the prompt request so the test fails with the real
    // cause instead of timing out while waiting for events.
    let promptError = null;
    const eventSource = new EventSource(`/api/session/${sessionId}/events`);
    try {
      eventSource.addEventListener('connected', (e) => {
        events.push(JSON.parse(e.data));
        // Send command once the stream is open so its output is not missed
        fetch(`/api/session/${sessionId}/prompt`, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ command: 'echo test' })
        })
          .then((response) => {
            if (!response.ok) {
              promptError = new Error(`Prompt request failed with status ${response.status}`);
            }
          })
          .catch((err) => {
            promptError = err;
          });
      });
      eventSource.addEventListener('session-output', (e) => {
        events.push(JSON.parse(e.data));
        // One output event is enough; stop receiving further events.
        eventSource.close();
      });
      // Wait for events (or for the prompt request to fail)
      await waitFor(() => promptError !== null || events.length >= 2);
      if (promptError) {
        throw promptError;
      }
      assert.equal(events[0].type, 'connected');
      assert.equal(events[1].type, 'session-output');
    } finally {
      // Always release the connection, even when an assertion or waitFor fails.
      eventSource.close();
    }
  });

  it('should handle reconnection after connection drop', async () => {
    // Test reconnection logic
  });
});
```
### 9.3 Load Testing
```javascript
// tests/load-test.js
// Open 100 SSE streams at once, one per synthetic session ID
// (load-test-0 ... load-test-99), keeping each handle for cleanup.
const connections = Array.from({ length: 100 }, (_, index) => {
  const sessionId = `load-test-${index}`;
  return {
    sessionId,
    eventSource: new EventSource(`/api/session/${sessionId}/events`)
  };
});
// Monitor memory usage, CPU, response times
// ...
// Cleanup: close every stream opened above
for (const { eventSource } of connections) {
  eventSource.close();
}
```
---
## 10. File Structure Summary
```
/home/uroma/obsidian-web-interface/
├── services/
│ ├── event-bus.js ← NEW
│ ├── sse-manager.js ← NEW
│ ├── claude-service.js ← MODIFY
│ ├── terminal-service.js ← MODIFY
│ └── database.js ← NO CHANGE
├── routes/
│ ├── session-routes.js ← NEW
│ ├── terminal-routes.js ← NEW
│ └── sse-routes.js ← NEW
├── middleware/
│ └── validation.js ← NEW (session ID validation)
├── public/
│ ├── js/
│ │ ├── sse-client.js ← NEW
│ │ └── session-ui.js ← MODIFY
│ └── ...
├── tests/
│ ├── event-bus.test.js ← NEW
│ ├── sse-manager.test.js ← NEW
│ └── sse-integration.test.js ← NEW
├── server.js ← MODIFY (add route registration)
└── package.json ← MODIFY (no new deps needed)
```
---
## 11. Dependencies
No new runtime npm dependencies are required. SSE is plain HTTP streaming, so it can be implemented with what is already available:
- **EventEmitter**: Built into Node.js
- **Express**: Already installed
- **EventSource**: Built into modern browsers
Optional additions:
- `@types/node` - For TypeScript support (if migrating to TS)
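- `supertest` - For testing HTTP endpoints
- `eventsource` - EventSource implementation for Node-based tests (the tests in section 9 call `new EventSource(...)`, a browser API that Node.js does not provide by default)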
---
## 12. Success Criteria
### 12.1 Functional Requirements
- [ ] SSE endpoint accepts connections for valid session IDs
- [ ] SSE endpoint returns 400 for malformed session IDs and 404 for unknown sessions
- [ ] Session events are streamed to connected clients
- [ ] Multiple clients can connect to same session
- [ ] Reconnection works with exponential backoff
- [ ] Heartbeat prevents connection drops
- [ ] No session context ambiguity
### 12.2 Non-Functional Requirements
- [ ] Response time < 100ms for event delivery
- [ ] Support 100+ concurrent SSE connections
- [ ] Memory usage stable over 24 hours
- [ ] No memory leaks in EventBus
- [ ] Error rate < 0.1%
- [ ] WebSocket deprecated with clear migration path
### 12.3 Migration Success
- [ ] Nearly all clients using SSE (WebSocket usage < 5%)
- [ ] No increase in error rates during migration
- [ ] User feedback positive
- [ ] Documentation complete and accurate
---
## 13. Rollback Plan
If issues arise during deployment:
1. **Immediate Rollback (Phase 1-2):**
- Disable new SSE routes via feature flag
- Continue using WebSocket
- Zero downtime
2. **Partial Rollback (Phase 3):**
- Keep SSE available
- Re-enable WebSocket as fallback
- Investigate issues
3. **Code Rollback:**
- Revert to previous commit
- Restore database if schema changed
- Restart services
4. **Data Migration:**
- No data migration required (stateless design)
---
## 14. Documentation Updates
### 14.1 API Documentation
Update README.md and API docs with:
- New SSE endpoint documentation
- Event type reference
- Code examples for client-side usage
- Migration guide from WebSocket to SSE
### 14.2 Architecture Diagrams
Create/update diagrams showing:
- Event flow from Claude Service to clients via EventBus
- SSE connection lifecycle
- Reconnection logic flow
---
## 15. Next Steps
1. **Review this plan** with team and stakeholders
2. **Create GitHub issues** for each phase
3. **Set up feature flags** for gradual rollout
4. **Implement EventBus** (Phase 1)
5. **Write tests** for EventBus
6. **Implement SSE Manager** (Phase 1)
7. **Deploy Phase 1** to staging environment
8. **Monitor metrics** and gather feedback
9. **Proceed to Phase 2** when confident
---
## Appendix A: Code Snippets
### A.1 Session ID Validation Middleware
```javascript
// middleware/validation.js
const SESSION_ID_PATTERN = /^[a-zA-Z0-9_-]{8,32}$/;

/**
 * Express middleware that validates the `:sessionId` route parameter and
 * attaches the matching session to the request.
 *
 * Responds with 400 when the ID is missing or does not match
 * SESSION_ID_PATTERN, and with 404 when claude-service has no such session.
 * On success the session is stored on `req.sessionContext` and `next()` runs.
 *
 * @param {object} req - Express request; reads `req.params.sessionId`.
 * @param {object} res - Express response; used only for error replies.
 * @param {Function} next - Called once the session has been attached.
 */
function validateSessionId(req, res, next) {
  const { sessionId } = req.params;
  const reject = (status, payload) => res.status(status).json(payload);

  if (!sessionId) {
    return reject(400, { error: 'Session ID is required' });
  }

  const isWellFormed = SESSION_ID_PATTERN.test(sessionId);
  if (!isWellFormed) {
    return reject(400, {
      error: 'Invalid session ID format',
      pattern: '8-32 characters, alphanumeric, underscore, hyphen'
    });
  }

  // Check session exists (optional, can be done in route handler).
  // The service is required at call time, not at module load.
  const session = require('../services/claude-service').getSession(sessionId);
  if (!session) {
    return reject(404, { error: 'Session not found', sessionId });
  }

  req.sessionContext = session;
  next();
}

module.exports = { validateSessionId };
```
### A.2 Complete server.js Integration
```javascript
// In server.js - add these sections
// After existing imports
const eventBus = require('./services/event-bus');
const sseManager = require('./services/sse-manager');
const sessionRoutes = require('./routes/session-routes');
const terminalRoutes = require('./routes/terminal-routes');
const sseRoutes = require('./routes/sse-routes');
// After existing middleware
// Register new API routes: every router is mounted under the same /api
// prefix, in this order.
for (const router of [sessionRoutes, terminalRoutes, sseRoutes]) {
  app.use('/api', router);
}
// Deprecated routes (with warnings)
// Logs both deprecation notices for each request under the legacy prefix,
// then passes the request on unchanged.
app.use('/claude/api/claude', (req, res, next) => {
  [
    '[DEPRECATED] /claude/api/claude/* is deprecated. Use /api/session/:sessionId/* instead.',
    '[DEPRECATED] WebSocket connections should migrate to SSE: /api/session/:sessionId/events'
  ].forEach((warning) => {
    console.warn(warning);
  });
  next();
});
// Monitoring endpoint
// Returns EventBus metrics together with the active SSE sessions and the
// total number of open SSE connections across all sessions.
// NOTE(review): no auth check is visible on this route — confirm it is
// protected before exposing it outside development.
app.get('/api/debug/metrics', (req, res) => {
  const busMetrics = eventBus.getMetrics();
  const activeSessions = sseManager.getActiveSessions();

  // Each value in sseManager.connections exposes `.size`; add them up.
  let totalConnections = 0;
  for (const clients of sseManager.connections.values()) {
    totalConnections += clients.size;
  }

  res.json({
    eventBus: busMetrics,
    sse: { activeSessions, totalConnections }
  });
});
// Graceful shutdown
// Set on the first signal so a second SIGTERM/SIGINT (e.g. Ctrl-C pressed
// twice) does not run the cleanup steps again while they are in progress.
let shuttingDown = false;

/**
 * Closes SSE connections, runs the remaining cleanup steps and exits.
 * Exits with code 0 on success, or 1 if any cleanup step throws.
 */
const cleanup = async () => {
  if (shuttingDown) {
    return;
  }
  shuttingDown = true;
  console.log('[Server] Starting graceful shutdown...');
  let exitCode = 0;
  try {
    // Cleanup SSE connections
    sseManager.cleanup();
    // ... existing cleanup code ...
  } catch (err) {
    // Report the failure instead of leaving it as an unhandled rejection.
    console.error('[Server] Error during graceful shutdown:', err);
    exitCode = 1;
  }
  process.exit(exitCode);
};
process.on('SIGTERM', cleanup);
process.on('SIGINT', cleanup);
```
---
**End of Refactor Plan**
Last Updated: 2025-01-21
Version: 1.0
Author: Backend Architecture Team