Files
SuperCharged-Claude-Code-Up…/SSE_REFACTOR_PLAN.md
uroma 55aafbae9a Fix project isolation: Make loadChatHistory respect active project sessions
- Modified loadChatHistory() to check for active project before fetching all sessions
- When active project exists, use project.sessions instead of fetching from API
- Added detailed console logging to debug session filtering
- This prevents ALL sessions from appearing in every project's sidebar

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 14:43:05 +00:00

48 KiB

Backend Refactor Plan: Route-Based Session Context with SSE

Executive Summary

This document outlines a comprehensive plan to refactor the Obsidian Web Interface from a WebSocket-based architecture to a hybrid approach using route-based session context with Server-Sent Events (SSE) streaming. This eliminates session context ambiguity and provides clearer, more maintainable code.

Current State:

  • WebSocket at /claude/api/claude/chat (session via query/subscription message)
  • Session IDs passed via query parameters or WebSocket messages
  • Context ambiguity when multiple tabs/sessions are open
  • Complex client-side session management

Target State:

  • Route-based URLs with explicit session context
  • SSE streaming at /api/session/:sessionId/events
  • Clear separation: UI routes vs API endpoints
  • No session context ambiguity

1. New URL Architecture

1.1 Route Structure

# UI Routes (serve HTML/JS)
GET  /terminal/:sessionId              → Terminal UI for specific session
GET  /session/:sessionId               → Session detail page
GET  /sessions                         → Session list page

# API Endpoints (session-scoped)
GET  /api/session/:sessionId/events    → SSE event stream
POST /api/session/:sessionId/prompt    → Send command/prompt
GET  /api/session/:sessionId/status    → Session status/health
GET  /api/session/:sessionId/context   → Session context files
POST /api/session/:sessionId/operations/preview  → Preview operations
POST /api/session/:sessionId/operations/execute  → Execute operations
DELETE /api/session/:sessionId         → Delete/terminate session

# Management Endpoints
GET  /api/sessions                     → List all sessions
POST /api/sessions                     → Create new session
POST /api/sessions/:id/duplicate       → Duplicate session
POST /api/sessions/:id/fork            → Fork session
POST /api/sessions/:id/move            → Move to project

# Legacy Routes (deprecated, redirect to new structure)
/claude/api/claude/chat                → WebSocket (deprecated)
/claude/api/claude/sessions            → Use /api/sessions instead

1.2 URL Validation

// Session ID validation
const SESSION_ID_REGEX = /^[a-zA-Z0-9_-]{8,32}$/;

function validateSessionId(sessionId: string): boolean {
  return SESSION_ID_REGEX.test(sessionId);
}

// Middleware for session validation
function validateSession(req, res, next) {
  const { sessionId } = req.params;

  if (!validateSessionId(sessionId)) {
    return res.status(400).json({
      error: 'Invalid session ID format',
      sessionId
    });
  }

  // Check session exists
  const session = claudeService.getSession(sessionId);
  if (!session) {
    return res.status(404).json({
      error: 'Session not found',
      sessionId
    });
  }

  req.sessionContext = session;
  next();
}

2. EventBus Implementation

2.1 File Structure

/home/uroma/obsidian-web-interface/
├── services/
│   ├── event-bus.js           ← NEW: EventBus implementation
│   ├── sse-manager.js         ← NEW: SSE connection management
│   ├── claude-service.js      ← MODIFIED: Emit events instead of callbacks
│   ├── terminal-service.js    ← MODIFIED: Use EventBus
│   └── database.js
├── routes/
│   ├── session-routes.js      ← NEW: Session-specific routes
│   ├── terminal-routes.js     ← NEW: Terminal-specific routes
│   └── sse-routes.js          ← NEW: SSE endpoint
└── server.js                  ← MODIFIED: Route registration

2.2 EventBus API

// /home/uroma/obsidian-web-interface/services/event-bus.js

const EventEmitter = require('events');

class EventBus extends EventEmitter {
  constructor() {
    super();
    this.setMaxListeners(0); // Unlimited listeners
    this.metrics = {
      eventsEmitted: 0,
      eventsByType: new Map(),
      listenerCounts: new Map()
    };
  }

  /**
   * Subscribe to an event type
   * @param {string} eventType - Event type (e.g., 'session-output', 'session-error')
   * @param {string} sessionId - Session ID to filter events (optional, null for all)
   * @param {Function} handler - Event handler function
   * @returns {Function} Unsubscribe function
   */
  subscribe(eventType, sessionId, handler) {
    const listenerId = `${eventType}-${sessionId || 'global'}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;

    const wrappedHandler = (data) => {
      // Filter by session ID if specified
      if (sessionId !== null && data.sessionId !== sessionId) {
        return;
      }
      try {
        handler(data);
      } catch (error) {
        console.error(`[EventBus] Error in handler for ${eventType}:`, error);
      }
    };

    this.on(eventType, wrappedHandler);

    // Track listener
    const key = `${eventType}-${sessionId || 'global'}`;
    this.metrics.listenerCounts.set(key, (this.metrics.listenerCounts.get(key) || 0) + 1);

    // Return unsubscribe function
    return () => {
      this.off(eventType, wrappedHandler);
      this.metrics.listenerCounts.set(key, Math.max(0, (this.metrics.listenerCounts.get(key) || 0) - 1));
    };
  }

  /**
   * Emit an event
   * @param {string} eventType - Event type
   * @param {Object} data - Event data (must include sessionId for session-scoped events)
   */
  emit(eventType, data = {}) {
    this.metrics.eventsEmitted++;
    this.metrics.eventsByType.set(eventType, (this.metrics.eventsByType.get(eventType) || 0) + 1);

    // Add timestamp to all events
    const eventData = {
      ...data,
      _timestamp: Date.now(),
      _eventType: eventType
    };

    super.emit(eventType, eventData);
  }

  /**
   * Subscribe to all events for a session
   * @param {string} sessionId - Session ID
   * @param {Function} handler - Handler for all session events
   * @returns {Function} Unsubscribe function
   */
  subscribeToSession(sessionId, handler) {
    const eventTypes = [
      'session-output',
      'session-error',
      'session-status',
      'operations-detected',
      'operations-executed',
      'operations-error',
      'approval-request',
      'approval-confirmed',
      'approval-expired',
      'session-created',
      'session-deleted'
    ];

    const unsubscribers = eventTypes.map(type =>
      this.subscribe(type, sessionId, handler)
    );

    // Return combined unsubscribe function
    return () => unsubscribers.forEach(unsub => unsub());
  }

  /**
   * Get metrics
   */
  getMetrics() {
    return {
      eventsEmitted: this.metrics.eventsEmitted,
      eventsByType: Object.fromEntries(this.metrics.eventsByType),
      listenerCounts: Object.fromEntries(this.metrics.listenerCounts),
      activeListeners: this.listenerCount('session-output') +
                       this.listenerCount('session-error') +
                       this.listenerCount('session-status')
    };
  }

  /**
   * Clear all listeners (useful for testing)
   */
  clear() {
    this.removeAllListeners();
    this.metrics = {
      eventsEmitted: 0,
      eventsByType: new Map(),
      listenerCounts: new Map()
    };
  }
}

// Singleton instance
const eventBus = new EventBus();

module.exports = eventBus;

2.3 Event Types

// Event type definitions

// Session lifecycle events
eventBus.emit('session-created', { sessionId, mode, workingDir });
eventBus.emit('session-deleted', { sessionId });

// Session output events
eventBus.emit('session-output', {
  sessionId,
  type: 'stdout' | 'stderr' | 'exit' | 'ready',
  content: 'output text',
  timestamp: Date.now()
});

eventBus.emit('session-error', {
  sessionId,
  error: 'Error message',
  code: 'ERR_CODE',
  recoverable: true
});

eventBus.emit('session-status', {
  sessionId,
  status: 'running' | 'idle' | 'exited' | 'error',
  pid: 12345,
  uptime: 45000
});

// Operation events
eventBus.emit('operations-detected', {
  sessionId,
  operations: [{ type: 'write', path: '/path', content: '...' }],
  response: 'full response text'
});

eventBus.emit('operations-executed', {
  sessionId,
  results: [{ success: true, path: '/path', operation: 'write' }]
});

eventBus.emit('operations-error', {
  sessionId,
  error: 'Error message',
  operations: []
});

// Approval events
eventBus.emit('approval-request', {
  sessionId,
  approvalId: 'apr-123',
  command: 'rm -rf /',
  explanation: 'Remove files',
  expiresAt: Date.now() + 300000
});

eventBus.emit('approval-confirmed', {
  sessionId,
  approvalId: 'apr-123',
  approved: true,
  customCommand: null
});

eventBus.emit('approval-expired', {
  sessionId,
  approvalId: 'apr-123'
});

3. SSE Endpoint Implementation

3.1 SSE Manager

// /home/uroma/obsidian-web-interface/services/sse-manager.js

const eventBus = require('./event-bus');

class SSEManager {
  constructor() {
    // Map: sessionId -> Set of response objects
    this.connections = new Map();
    this.heartbeatInterval = 30000; // 30 seconds
    this.heartbeatTimers = new Map();
  }

  /**
   * Add SSE connection for a session
   * @param {string} sessionId - Session ID
   * @param {Object} res - Express response object
   * @param {Object} req - Express request object
   */
  addConnection(sessionId, res, req) {
    // Setup SSE headers
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache, no-transform');
    res.setHeader('X-Accel-Buffering', 'no'); // Disable nginx buffering
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders();

    // Add connection to set
    if (!this.connections.has(sessionId)) {
      this.connections.set(sessionId, new Set());
    }
    const connectionSet = this.connections.get(sessionId);
    connectionSet.add(res);

    console.log(`[SSEManager] Connection added for session ${sessionId}. Total connections: ${connectionSet.size}`);

    // Send initial connection event
    this.sendEvent(sessionId, res, {
      type: 'connected',
      sessionId,
      timestamp: Date.now(),
      message: 'SSE connection established'
    });

    // Start heartbeat for this connection
    this.startHeartbeat(sessionId, res);

    // Subscribe to all session events
    const unsubscribe = eventBus.subscribeToSession(sessionId, (eventData) => {
      this.sendEvent(sessionId, res, eventData);
    });

    // Handle client disconnect
    req.on('close', () => {
      this.removeConnection(sessionId, res, unsubscribe);
    });

    // Send immediate status update
    eventBus.emit('session-status', {
      sessionId,
      status: 'connected'
    });
  }

  /**
   * Send SSE event to a specific connection
   * @param {string} sessionId - Session ID
   * @param {Object} res - Response object
   * @param {Object} data - Event data
   */
  sendEvent(sessionId, res, data) {
    if (res.destroyed || res.writableEnded) {
      return false;
    }

    try {
      const eventName = data.type || 'message';
      const eventData = JSON.stringify(data);

      res.write(`event: ${eventName}\n`);
      res.write(`data: ${eventData}\n`);
      res.write(`id: ${Date.now()}\n`);
      res.write('\n');

      return true;
    } catch (error) {
      console.error(`[SSEManager] Error sending event to ${sessionId}:`, error);
      return false;
    }
  }

  /**
   * Send event to all connections for a session
   * @param {string} sessionId - Session ID
   * @param {Object} data - Event data
   */
  broadcastToSession(sessionId, data) {
    const connectionSet = this.connections.get(sessionId);
    if (!connectionSet) {
      return 0;
    }

    let sentCount = 0;
    const deadConnections = [];

    for (const res of connectionSet) {
      if (!this.sendEvent(sessionId, res, data)) {
        deadConnections.push(res);
      } else {
        sentCount++;
      }
    }

    // Clean up dead connections
    deadConnections.forEach(res => {
      connectionSet.delete(res);
    });

    return sentCount;
  }

  /**
   * Start heartbeat for a connection
   * @param {string} sessionId - Session ID
   * @param {Object} res - Response object
   */
  startHeartbeat(sessionId, res) {
    const timerId = setInterval(() => {
      if (res.destroyed || res.writableEnded) {
        clearInterval(timerId);
        this.heartbeatTimers.delete(`${sessionId}-${res.socket?.remotePort}`);
        return;
      }

      // Send heartbeat comment
      try {
        res.write(': heartbeat\n\n');
      } catch (error) {
        clearInterval(timerId);
        this.removeConnection(sessionId, res, () => {});
      }
    }, this.heartbeatInterval);

    this.heartbeatTimers.set(`${sessionId}-${res.socket?.remotePort}`, timerId);
  }

  /**
   * Remove SSE connection
   * @param {string} sessionId - Session ID
   * @param {Object} res - Response object
   * @param {Function} unsubscribe - Unsubscribe function from EventBus
   */
  removeConnection(sessionId, res, unsubscribe) {
    const connectionSet = this.connections.get(sessionId);
    if (connectionSet) {
      connectionSet.delete(res);

      if (connectionSet.size === 0) {
        this.connections.delete(sessionId);
      }

      console.log(`[SSEManager] Connection removed for session ${sessionId}. Remaining: ${connectionSet.size || 0}`);
    }

    // Stop heartbeat
    const timerId = this.heartbeatTimers.get(`${sessionId}-${res.socket?.remotePort}`);
    if (timerId) {
      clearInterval(timerId);
      this.heartbeatTimers.delete(`${sessionId}-${res.socket?.remotePort}`);
    }

    // Unsubscribe from events
    if (unsubscribe) {
      unsubscribe();
    }
  }

  /**
   * Get connection count for a session
   * @param {string} sessionId - Session ID
   */
  getConnectionCount(sessionId) {
    return this.connections.get(sessionId)?.size || 0;
  }

  /**
   * Get all active sessions
   */
  getActiveSessions() {
    return Array.from(this.connections.keys());
  }

  /**
   * Clean up all connections (for shutdown)
   */
  cleanup() {
    console.log('[SSEManager] Cleaning up all connections...');

    for (const [sessionId, connectionSet] of this.connections.entries()) {
      for (const res of connectionSet) {
        try {
          res.end();
        } catch (error) {
          // Ignore errors during cleanup
        }
      }
    }

    // Clear all heartbeat timers
    for (const timerId of this.heartbeatTimers.values()) {
      clearInterval(timerId);
    }

    this.connections.clear();
    this.heartbeatTimers.clear();

    console.log('[SSEManager] Cleanup complete');
  }
}

// Singleton instance
const sseManager = new SSEManager();

module.exports = sseManager;

3.2 SSE Route

// /home/uroma/obsidian-web-interface/routes/sse-routes.js

const express = require('express');
const sseManager = require('../services/sse-manager');
const { validateSessionId } = require('../middleware/validation');

const router = express.Router();

/**
 * SSE endpoint for session events
 * GET /api/session/:sessionId/events
 */
router.get('/session/:sessionId/events', validateSessionId, (req, res) => {
  const { sessionId } = req.params;

  console.log(`[SSE] New connection for session ${sessionId}`);

  // Add SSE connection
  sseManager.addConnection(sessionId, res, req);

  // Note: Response is kept open for SSE streaming
});

/**
 * Get connection status for a session
 * GET /api/session/:sessionId/events/status
 */
router.get('/session/:sessionId/events/status', validateSessionId, (req, res) => {
  const { sessionId } = req.params;

  res.json({
    sessionId,
    activeConnections: sseManager.getConnectionCount(sessionId),
    timestamp: Date.now()
  });
});

module.exports = router;

4. Route Implementation

4.1 Session Routes

// /home/uroma/obsidian-web-interface/routes/session-routes.js

const express = require('express');
const claudeService = require('../services/claude-service');
const eventBus = require('../services/event-bus');
const { validateSessionId } = require('../middleware/validation');

const router = express.Router();

/**
 * Send command/prompt to session
 * POST /api/session/:sessionId/prompt
 */
router.post('/session/:sessionId/prompt', validateSessionId, async (req, res) => {
  const { sessionId } = req.params;
  const { command, context } = req.body;

  if (!command) {
    return res.status(400).json({
      error: 'Command is required'
    });
  }

  try {
    // Send command to Claude service
    await claudeService.sendCommand(sessionId, command);

    // Response will come via SSE
    res.json({
      success: true,
      sessionId,
      message: 'Command sent',
      timestamp: Date.now()
    });

    // Emit command-sent event
    eventBus.emit('command-sent', {
      sessionId,
      command: command.substring(0, 100) + (command.length > 100 ? '...' : ''),
      timestamp: Date.now()
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error sending command:`, error);

    // Emit error event
    eventBus.emit('session-error', {
      sessionId,
      error: error.message,
      code: 'COMMAND_SEND_ERROR',
      recoverable: false
    });

    res.status(500).json({
      error: 'Failed to send command',
      message: error.message
    });
  }
});

/**
 * Get session status
 * GET /api/session/:sessionId/status
 */
router.get('/session/:sessionId/status', validateSessionId, (req, res) => {
  const { sessionId } = req.params;

  try {
    const session = claudeService.getSession(sessionId);

    if (!session) {
      return res.status(404).json({
        error: 'Session not found',
        sessionId
      });
    }

    res.json({
      sessionId: session.id,
      status: session.status,
      mode: session.mode,
      createdAt: session.createdAt,
      lastActivity: session.lastActivity,
      pid: process.pid,
      uptime: Date.now() - new Date(session.createdAt).getTime()
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error getting status:`, error);
    res.status(500).json({
      error: 'Failed to get session status',
      message: error.message
    });
  }
});

/**
 * Get session context files
 * GET /api/session/:sessionId/context
 */
router.get('/session/:sessionId/context', validateSessionId, async (req, res) => {
  const { sessionId } = req.params;

  try {
    const context = await claudeService.getSessionContext(sessionId);

    res.json({
      sessionId,
      context: context.files,
      timestamp: Date.now()
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error getting context:`, error);
    res.status(500).json({
      error: 'Failed to get session context',
      message: error.message
    });
  }
});

/**
 * Preview operations for session
 * POST /api/session/:sessionId/operations/preview
 */
router.post('/session/:sessionId/operations/preview', validateSessionId, async (req, res) => {
  const { sessionId } = req.params;
  const { response } = req.body;

  if (!response) {
    return res.status(400).json({
      error: 'Response is required'
    });
  }

  try {
    const operations = await claudeService.previewOperations(sessionId, response);

    // Emit operations-detected event
    eventBus.emit('operations-detected', {
      sessionId,
      operations,
      response
    });

    res.json({
      success: true,
      operations,
      count: operations.length
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error previewing operations:`, error);

    eventBus.emit('session-error', {
      sessionId,
      error: error.message,
      code: 'PREVIEW_ERROR',
      recoverable: true
    });

    res.status(500).json({
      error: 'Failed to preview operations',
      message: error.message
    });
  }
});

/**
 * Execute operations for session
 * POST /api/session/:sessionId/operations/execute
 */
router.post('/session/:sessionId/operations/execute', validateSessionId, async (req, res) => {
  const { sessionId } = req.params;
  const { operations } = req.body;

  if (!operations || !Array.isArray(operations)) {
    return res.status(400).json({
      error: 'Operations array is required'
    });
  }

  try {
    const results = await claudeService.executeOperations(sessionId, operations);

    // Emit operations-executed event
    eventBus.emit('operations-executed', {
      sessionId,
      results
    });

    res.json({
      success: true,
      results,
      executed: results.length
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error executing operations:`, error);

    eventBus.emit('operations-error', {
      sessionId,
      error: error.message,
      operations
    });

    res.status(500).json({
      error: 'Failed to execute operations',
      message: error.message
    });
  }
});

/**
 * Delete/terminate session
 * DELETE /api/session/:sessionId
 */
router.delete('/session/:sessionId', validateSessionId, async (req, res) => {
  const { sessionId } = req.params;

  try {
    await claudeService.deleteSession(sessionId);

    // Emit session-deleted event
    eventBus.emit('session-deleted', {
      sessionId,
      timestamp: Date.now()
    });

    res.json({
      success: true,
      message: 'Session deleted',
      sessionId
    });
  } catch (error) {
    console.error(`[SessionRoutes] Error deleting session:`, error);
    res.status(500).json({
      error: 'Failed to delete session',
      message: error.message
    });
  }
});

module.exports = router;

4.2 Terminal Routes

// /home/uroma/obsidian-web-interface/routes/terminal-routes.js

const express = require('express');
const terminalService = require('../services/terminal-service');
const eventBus = require('../services/event-bus');

const router = express.Router();

/**
 * Create new terminal
 * POST /api/terminal
 */
router.post('/terminal', async (req, res) => {
  const { workingDir, mode, sessionId } = req.body;

  try {
    const result = terminalService.createTerminal({
      workingDir,
      mode,
      sessionId
    });

    if (result.success) {
      // Emit terminal-created event
      eventBus.emit('terminal-created', {
        terminalId: result.terminalId,
        sessionId,
        workingDir,
        mode
      });

      res.json({
        success: true,
        terminalId: result.terminalId
      });
    } else {
      res.status(500).json({
        error: result.error
      });
    }
  } catch (error) {
    console.error('[TerminalRoutes] Error creating terminal:', error);
    res.status(500).json({
      error: 'Failed to create terminal',
      message: error.message
    });
  }
});

/**
 * Get terminal output (polling endpoint, kept for compatibility)
 * GET /api/terminal/:terminalId/output
 */
router.get('/terminal/:terminalId/output', (req, res) => {
  const { terminalId } = req.params;
  const sinceIndex = parseInt(req.query.since) || 0;

  const result = terminalService.getTerminalOutput(terminalId, sinceIndex);

  if (result.success) {
    res.json(result);
  } else {
    res.status(404).json({
      error: result.error
    });
  }
});

/**
 * Send input to terminal
 * POST /api/terminal/:terminalId/input
 */
router.post('/terminal/:terminalId/input', (req, res) => {
  const { terminalId } = req.params;
  const { data } = req.body;

  if (!data) {
    return res.status(400).json({
      error: 'Input data is required'
    });
  }

  const result = terminalService.sendTerminalInput(terminalId, data);

  if (result.success) {
    res.json({ success: true });
  } else {
    res.status(500).json({
      error: result.error
    });
  }
});

/**
 * Close terminal
 * DELETE /api/terminal/:terminalId
 */
router.delete('/terminal/:terminalId', (req, res) => {
  const { terminalId } = req.params;

  const result = terminalService.closeTerminal(terminalId);

  if (result.success) {
    eventBus.emit('terminal-closed', {
      terminalId
    });

    res.json({ success: true });
  } else {
    res.status(500).json({
      error: result.error
    });
  }
});

module.exports = router;

5. Integration with Existing Code

5.1 Modify ClaudeService to use EventBus

// In services/claude-service.js

const eventBus = require('./event-bus');

class ClaudeCodeService {
  constructor() {
    // ... existing code ...
  }

  // Replace callback-based approach with EventBus
  sendCommand(sessionId, command) {
    const session = this.sessions.get(sessionId);
    if (!session) {
      throw new Error(`Session ${sessionId} not found`);
    }

    // ... send command to process ...

    // Instead of callback, emit event
    eventBus.emit('command-sent', {
      sessionId,
      command,
      timestamp: Date.now()
    });
  }

  // When output is received from Claude Code process
  handleOutput(sessionId, output) {
    // Emit event instead of callback
    eventBus.emit('session-output', {
      sessionId,
      type: output.type || 'stdout',
      content: output.content || output,
      timestamp: Date.now()
    });
  }

  // When operations are detected
  handleOperationsDetected(sessionId, operations, response) {
    eventBus.emit('operations-detected', {
      sessionId,
      operations,
      response
    });
  }

  // When operations are executed
  handleOperationsExecuted(sessionId, results) {
    eventBus.emit('operations-executed', {
      sessionId,
      results
    });
  }
}

5.2 Update server.js

// In server.js

const eventBus = require('./services/event-bus');
const sseManager = require('./services/sse-manager');
const sessionRoutes = require('./routes/session-routes');
const terminalRoutes = require('./routes/terminal-routes');
const sseRoutes = require('./routes/sse-routes');

// ... existing middleware ...

// Register new routes
app.use('/api', sessionRoutes);
app.use('/api', terminalRoutes);
app.use('/api', sseRoutes);

// Keep legacy routes for backward compatibility (with deprecation warnings)
app.use('/claude/api/claude', (req, res, next) => {
  console.warn('[DEPRECATION] /claude/api/claude/* routes are deprecated. Use /api/session/:sessionId/* instead.');
  next();
});

// ... existing WebSocket code (deprecated) ...

// Graceful shutdown
process.on('SIGTERM', async () => {
  console.log('[Server] SIGTERM received, shutting down gracefully...');

  // Cleanup SSE connections
  sseManager.cleanup();

  // ... existing cleanup ...

  process.exit(0);
});

6. Client-Side Reconnection Logic

6.1 SSE Client Implementation

// Client-side SSE connection manager

class SessionEventStream {
  constructor(sessionId, options = {}) {
    this.sessionId = sessionId;
    this.options = {
      reconnectInterval: 1000, // Initial reconnect delay
      maxReconnectInterval: 30000, // Max reconnect delay
      reconnectDecay: 1.5, // Exponential backoff multiplier
      maxReconnectAttempts: 10,
      heartbeatTimeout: 60000, // Timeout for heartbeat
      ...options
    };

    this.eventSource = null;
    this.reconnectAttempts = 0;
    this.isConnected = false;
    this.listeners = new Map();
    this.lastHeartbeat = Date.now();
    this.heartbeatCheckInterval = null;

    // Connect immediately
    this.connect();
  }

  connect() {
    if (this.eventSource) {
      this.eventSource.close();
    }

    const url = `/api/session/${this.sessionId}/events`;

    console.log(`[SSE] Connecting to ${url}`);

    this.eventSource = new EventSource(url);

    // Connection opened
    this.eventSource.onopen = () => {
      console.log(`[SSE] Connected to session ${this.sessionId}`);
      this.isConnected = true;
      this.reconnectAttempts = 0;

      // Start heartbeat check
      this.startHeartbeatCheck();

      // Emit connected event
      this.emit('connected', {
        sessionId: this.sessionId,
        timestamp: Date.now()
      });
    };

    // Handle named events
    this.eventSource.addEventListener('connected', (e) => {
      const data = JSON.parse(e.data);
      this.emit('connected', data);
      this.lastHeartbeat = Date.now();
    });

    this.eventSource.addEventListener('session-output', (e) => {
      const data = JSON.parse(e.data);
      this.emit('output', data);
      this.lastHeartbeat = Date.now();
    });

    this.eventSource.addEventListener('session-error', (e) => {
      const data = JSON.parse(e.data);
      this.emit('error', data);
      this.lastHeartbeat = Date.now();
    });

    this.eventSource.addEventListener('session-status', (e) => {
      const data = JSON.parse(e.data);
      this.emit('status', data);
      this.lastHeartbeat = Date.now();
    });

    this.eventSource.addEventListener('operations-detected', (e) => {
      const data = JSON.parse(e.data);
      this.emit('operations-detected', data);
      this.lastHeartbeat = Date.now();
    });

    this.eventSource.addEventListener('operations-executed', (e) => {
      const data = JSON.parse(e.data);
      this.emit('operations-executed', data);
      this.lastHeartbeat = Date.now();
    });

    this.eventSource.addEventListener('approval-request', (e) => {
      const data = JSON.parse(e.data);
      this.emit('approval-request', data);
      this.lastHeartbeat = Date.now();
    });

    // Default message handler
    this.eventSource.onmessage = (e) => {
      // Heartbeat comments are handled automatically
      if (e.data.startsWith(':')) {
        this.lastHeartbeat = Date.now();
        return;
      }

      try {
        const data = JSON.parse(e.data);
        this.emit('message', data);
      } catch (error) {
        console.error('[SSE] Error parsing message:', error);
      }
    };

    // Handle errors
    this.eventSource.onerror = (e) => {
      console.error(`[SSE] Error:`, e);

      this.isConnected = false;

      if (this.heartbeatCheckInterval) {
        clearInterval(this.heartbeatCheckInterval);
        this.heartbeatCheckInterval = null;
      }

      // Don't reconnect if it's a 404 (session not found)
      if (e.target && e.target.readyState === 2) {
        console.error(`[SSE] Connection closed permanently`);
        this.emit('disconnected', {
          sessionId: this.sessionId,
          permanent: true
        });
        return;
      }

      // Attempt reconnect
      if (this.reconnectAttempts < this.options.maxReconnectAttempts) {
        const delay = Math.min(
          this.options.reconnectInterval * Math.pow(this.options.reconnectDecay, this.reconnectAttempts),
          this.options.maxReconnectInterval
        );

        this.reconnectAttempts++;

        console.log(`[SSE] Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.options.maxReconnectAttempts})`);

        this.emit('reconnecting', {
          attempt: this.reconnectAttempts,
          delay,
          maxAttempts: this.options.maxReconnectAttempts
        });

        setTimeout(() => this.connect(), delay);
      } else {
        console.error(`[SSE] Max reconnect attempts reached`);
        this.emit('disconnected', {
          sessionId: this.sessionId,
          permanent: true,
          reason: 'max_reconnect_attempts'
        });
      }
    };
  }

  startHeartbeatCheck() {
    if (this.heartbeatCheckInterval) {
      clearInterval(this.heartbeatCheckInterval);
    }

    this.heartbeatCheckInterval = setInterval(() => {
      const timeSinceLastHeartbeat = Date.now() - this.lastHeartbeat;

      if (timeSinceLastHeartbeat > this.options.heartbeatTimeout) {
        console.warn(`[SSE] Heartbeat timeout (${timeSinceLastHeartbeat}ms)`);
        this.eventSource.close();
        this.connect(); // Force reconnect
      }
    }, this.options.heartbeatTimeout / 2);
  }

  on(event, handler) {
    if (!this.listeners.has(event)) {
      this.listeners.set(event, []);
    }
    this.listeners.get(event).push(handler);
  }

  off(event, handler) {
    if (!this.listeners.has(event)) {
      return;
    }
    const handlers = this.listeners.get(event);
    const index = handlers.indexOf(handler);
    if (index > -1) {
      handlers.splice(index, 1);
    }
  }

  emit(event, data) {
    if (!this.listeners.has(event)) {
      return;
    }
    for (const handler of this.listeners.get(event)) {
      try {
        handler(data);
      } catch (error) {
        console.error(`[SSE] Error in handler for ${event}:`, error);
      }
    }
  }

  disconnect() {
    console.log(`[SSE] Disconnecting from session ${this.sessionId}`);

    if (this.heartbeatCheckInterval) {
      clearInterval(this.heartbeatCheckInterval);
    }

    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }

    this.isConnected = false;
    this.listeners.clear();
  }
}

// Export for use in UI
window.SessionEventStream = SessionEventStream;

6.2 Usage Example

// In the terminal UI or session page

const sessionId = 'session-123'; // From URL parameter
const eventStream = new SessionEventStream(sessionId, {
  reconnectInterval: 2000,
  maxReconnectAttempts: 20
});

// Listen for output
eventStream.on('output', (data) => {
  console.log('Output:', data.content);
  // Update UI
  appendToTerminal(data.content);
});

// Listen for errors
eventStream.on('error', (data) => {
  console.error('Session error:', data.error);
  showError(data.error);
});

// Listen for connection status
eventStream.on('connected', () => {
  showConnectionStatus('Connected');
});

eventStream.on('reconnecting', (data) => {
  showConnectionStatus(`Reconnecting... (${data.attempt}/${data.maxAttempts})`);
});

// Send command
async function sendCommand(command) {
  const response = await fetch(`/api/session/${sessionId}/prompt`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({ command })
  });

  const result = await response.json();

  if (!result.success) {
    console.error('Failed to send command:', result.error);
  }
}

// Cleanup on page unload
window.addEventListener('beforeunload', () => {
  eventStream.disconnect();
});

7. Migration Path

7.1 Phase 1: Add New Infrastructure (Non-Breaking)

Duration: 1-2 weeks

Tasks:

  1. Implement EventBus service
  2. Implement SSE Manager service
  3. Create new route modules (session-routes, terminal-routes, sse-routes)
  4. Add new SSE endpoint alongside existing WebSocket
  5. Deploy new code without removing old functionality

Risk: Low - No breaking changes

Testing:

  • Unit tests for EventBus
  • Integration tests for SSE connections
  • Load testing for SSE streaming
  • Verify WebSocket still works

7.2 Phase 2: Migrate Client-Side (Gradual)

Duration: 2-3 weeks

Tasks:

  1. Update IDE UI to use SSE for new sessions
  2. Keep WebSocket for existing sessions (feature flag)
  3. Add telemetry to track usage patterns
  4. Fix any issues discovered during migration

Risk: Medium - Client-side changes

Testing:

  • A/B testing: SSE vs WebSocket
  • Monitor error rates
  • Performance comparison
  • User feedback

7.3 Phase 3: Deprecate WebSocket

Duration: 1-2 weeks

Tasks:

  1. Add deprecation warnings to WebSocket endpoint
  2. Redirect WebSocket connections to SSE with message
  3. Update all documentation
  4. Add migration guide for any custom clients

Risk: Low - Only affects legacy clients

Testing:

  • Verify deprecation warnings appear
  • Test redirect logic
  • Confirm new clients use SSE

7.4 Phase 4: Remove Legacy Code

Duration: 1 week

Tasks:

  1. Remove WebSocket server code
  2. Remove old client-side WebSocket code
  3. Clean up unused middleware
  4. Update API documentation

Risk: Low - All clients migrated

Testing:

  • Full regression test
  • Load testing
  • Security audit

8. Risk Assessment & Mitigation

8.1 High-Risk Areas

Risk Impact Probability Mitigation
SSE connection drops High Medium Implement exponential backoff reconnection, heartbeat monitoring
nginx buffering issues High Medium Add X-Accel-Buffering: no header, configure nginx properly
EventBus memory leaks High Low Implement listener cleanup, add metrics/monitoring
Session context confusion during migration High Medium Use feature flags, clear versioning, extensive logging
Client compatibility Medium Low Test on multiple browsers, provide polyfills if needed
Performance degradation Medium Low Load testing, optimize event filtering, use compression

8.2 nginx Configuration

# nginx configuration for SSE

location /api/session/ {
  # Disable buffering for SSE
  proxy_buffering off;
  proxy_cache off;

  # SSE-specific headers
  proxy_pass http://localhost:3010;
  proxy_http_version 1.1;
  proxy_set_header Connection '';
  proxy_set_header Cache-Control no-cache;
  proxy_set_header X-Accel-Buffering no;

  # Timeouts
  proxy_read_timeout 86400s;  # 24 hours
  proxy_send_timeout 86400s;

  # Disable buffering
  proxy_buffering off;
}

8.3 Monitoring & Observability

// Add monitoring hooks

eventBus.on('newListener', (event) => {
  console.log(`[EventBus] New listener for ${event}`);
  metrics.increment('eventbus.listeners.added', { event });
});

eventBus.on('removeListener', (event) => {
  console.log(`[EventBus] Removed listener for ${event}`);
  metrics.increment('eventbus.listeners.removed', { event });
});

// SSE connection monitoring
setInterval(() => {
  const activeSessions = sseManager.getActiveSessions();
  const metrics = eventBus.getMetrics();

  console.log('[Metrics]', {
    activeSSEConnections: activeSessions.length,
    sessions: activeSessions,
    eventsEmitted: metrics.eventsEmitted,
    activeListeners: metrics.activeListeners
  });

  // Send to monitoring service
  monitoring.gauge('sse.connections', activeSessions.length);
  monitoring.gauge('eventbus.listeners', metrics.activeListeners);
}, 60000); // Every minute

9. Testing Strategy

9.1 Unit Tests

// tests/event-bus.test.js

describe('EventBus', () => {
  it('should emit and receive events', (done) => {
    eventBus.subscribe('test-event', null, (data) => {
      assert.equal(data.test, 'value');
      done();
    });

    eventBus.emit('test-event', { test: 'value' });
  });

  it('should filter events by session ID', (done) => {
    const handler = jest.fn();

    eventBus.subscribe('test-event', 'session-1', handler);
    eventBus.emit('test-event', { sessionId: 'session-1' });
    eventBus.emit('test-event', { sessionId: 'session-2' });

    expect(handler).toHaveBeenCalledTimes(1);
    expect(handler).toHaveBeenCalledWith(expect.objectContaining({
      sessionId: 'session-1'
    }));

    done();
  });

  it('should unsubscribe correctly', (done) => {
    const handler = jest.fn();

    const unsubscribe = eventBus.subscribe('test-event', null, handler);
    unsubscribe();

    eventBus.emit('test-event', { test: 'value' });

    expect(handler).not.toHaveBeenCalled();
    done();
  });
});

9.2 Integration Tests

// tests/sse-integration.test.js

describe('SSE Integration', () => {
  it('should establish SSE connection and receive events', async () => {
    const sessionId = 'test-session-' + Date.now();

    // Create session
    const session = await createSession(sessionId);

    // Connect to SSE
    const events = [];
    const eventSource = new EventSource(`/api/session/${sessionId}/events`);

    eventSource.addEventListener('connected', (e) => {
      events.push(JSON.parse(e.data));

      // Send command
      fetch(`/api/session/${sessionId}/prompt`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ command: 'echo test' })
      });
    });

    eventSource.addEventListener('session-output', (e) => {
      events.push(JSON.parse(e.data));
      eventSource.close();
    });

    // Wait for events
    await waitFor(() => events.length >= 2);

    assert.equal(events[0].type, 'connected');
    assert.equal(events[1].type, 'session-output');
  });

  it('should handle reconnection after connection drop', async () => {
    // Test reconnection logic
  });
});

9.3 Load Testing

// tests/load-test.js

// Simulate 100 concurrent SSE connections
const connections = [];

for (let i = 0; i < 100; i++) {
  const sessionId = `load-test-${i}`;
  const eventSource = new EventSource(`/api/session/${sessionId}/events`);

  connections.push({ sessionId, eventSource });
}

// Monitor memory usage, CPU, response times
// ...

// Cleanup
connections.forEach(conn => conn.eventSource.close());

10. File Structure Summary

/home/uroma/obsidian-web-interface/
├── services/
│   ├── event-bus.js           ← NEW
│   ├── sse-manager.js         ← NEW
│   ├── claude-service.js      ← MODIFY
│   ├── terminal-service.js    ← MODIFY
│   └── database.js            ← NO CHANGE
├── routes/
│   ├── session-routes.js      ← NEW
│   ├── terminal-routes.js     ← NEW
│   └── sse-routes.js          ← NEW
├── middleware/
│   └── validation.js          ← NEW (session ID validation)
├── public/
│   ├── js/
│   │   ├── sse-client.js      ← NEW
│   │   └── session-ui.js      ← MODIFY
│   └── ...
├── tests/
│   ├── event-bus.test.js      ← NEW
│   ├── sse-manager.test.js    ← NEW
│   └── integration.test.js    ← NEW
├── server.js                  ← MODIFY (add route registration)
└── package.json               ← MODIFY (no new deps needed)

11. Dependencies

No new npm dependencies required! SSE is built into Node.js/Express:

  • EventEmitter: Built into Node.js
  • Express: Already installed
  • EventSource: Built into modern browsers

Optional additions:

  • @types/node - For TypeScript support (if migrating to TS)
  • supertest - For testing HTTP endpoints

12. Success Criteria

12.1 Functional Requirements

  • SSE endpoint accepts connections for valid session IDs
  • SSE endpoint returns 404 for invalid session IDs
  • Session events are streamed to connected clients
  • Multiple clients can connect to same session
  • Reconnection works with exponential backoff
  • Heartbeat prevents connection drops
  • No session context ambiguity

12.2 Non-Functional Requirements

  • Response time < 100ms for event delivery
  • Support 100+ concurrent SSE connections
  • Memory usage stable over 24 hours
  • No memory leaks in EventBus
  • Error rate < 0.1%
  • WebSocket deprecated with clear migration path

12.3 Migration Success

  • All clients using SSE (WebSocket usage < 5%)
  • No increase in error rates during migration
  • User feedback positive
  • Documentation complete and accurate

13. Rollback Plan

If issues arise during deployment:

  1. Immediate Rollback (Phase 1-2):

    • Disable new SSE routes via feature flag
    • Continue using WebSocket
    • Zero downtime
  2. Partial Rollback (Phase 3):

    • Keep SSE available
    • Re-enable WebSocket as fallback
    • Investigate issues
  3. Code Rollback:

    • Revert to previous commit
    • Restore database if schema changed
    • Restart services
  4. Data Migration:

    • No data migration required (stateless design)

14. Documentation Updates

14.1 API Documentation

Update README.md and API docs with:

  • New SSE endpoint documentation
  • Event type reference
  • Code examples for client-side usage
  • Migration guide from WebSocket to SSE

14.2 Architecture Diagrams

Create/update diagrams showing:

  • Event flow from Claude Service to clients via EventBus
  • SSE connection lifecycle
  • Reconnection logic flow

15. Next Steps

  1. Review this plan with team and stakeholders
  2. Create GitHub issues for each phase
  3. Set up feature flags for gradual rollout
  4. Implement EventBus (Phase 1)
  5. Write tests for EventBus
  6. Implement SSE Manager (Phase 1)
  7. Deploy Phase 1 to staging environment
  8. Monitor metrics and gather feedback
  9. Proceed to Phase 2 when confident

Appendix A: Code Snippets

A.1 Session ID Validation Middleware

// middleware/validation.js

const SESSION_ID_PATTERN = /^[a-zA-Z0-9_-]{8,32}$/;

function validateSessionId(req, res, next) {
  const { sessionId } = req.params;

  if (!sessionId) {
    return res.status(400).json({
      error: 'Session ID is required'
    });
  }

  if (!SESSION_ID_PATTERN.test(sessionId)) {
    return res.status(400).json({
      error: 'Invalid session ID format',
      pattern: '8-32 characters, alphanumeric, underscore, hyphen'
    });
  }

  // Check session exists (optional, can be done in route handler)
  const claudeService = require('../services/claude-service');
  const session = claudeService.getSession(sessionId);

  if (!session) {
    return res.status(404).json({
      error: 'Session not found',
      sessionId
    });
  }

  req.sessionContext = session;
  next();
}

module.exports = { validateSessionId };

A.2 Complete server.js Integration

// In server.js - add these sections

// After existing imports
const eventBus = require('./services/event-bus');
const sseManager = require('./services/sse-manager');
const sessionRoutes = require('./routes/session-routes');
const terminalRoutes = require('./routes/terminal-routes');
const sseRoutes = require('./routes/sse-routes');

// After existing middleware
// Register new API routes
app.use('/api', sessionRoutes);
app.use('/api', terminalRoutes);
app.use('/api', sseRoutes);

// Deprecated routes (with warnings)
app.use('/claude/api/claude', (req, res, next) => {
  console.warn('[DEPRECATED] /claude/api/claude/* is deprecated. Use /api/session/:sessionId/* instead.');
  console.warn('[DEPRECATED] WebSocket connections should migrate to SSE: /api/session/:sessionId/events');
  next();
});

// Monitoring endpoint
app.get('/api/debug/metrics', (req, res) => {
  res.json({
    eventBus: eventBus.getMetrics(),
    sse: {
      activeSessions: sseManager.getActiveSessions(),
      totalConnections: Array.from(sseManager.connections.values())
        .reduce((sum, set) => sum + set.size, 0)
    }
  });
});

// Graceful shutdown
const cleanup = async () => {
  console.log('[Server] Starting graceful shutdown...');

  // Cleanup SSE connections
  sseManager.cleanup();

  // ... existing cleanup code ...

  process.exit(0);
};

process.on('SIGTERM', cleanup);
process.on('SIGINT', cleanup);

End of Refactor Plan

Last Updated: 2025-01-21 Version: 1.0 Author: Backend Architecture Team