# SSE Refactor: Quick Reference

## File Locations

```
/home/uroma/obsidian-web-interface/
├── services/
│   ├── event-bus.js          ← NEW: Event pub/sub system
│   ├── sse-manager.js        ← NEW: SSE connection management
│   └── claude-service.js     ← MODIFY: Emit events instead of callbacks
├── routes/
│   ├── session-routes.js     ← NEW: Session API endpoints
│   └── sse-routes.js         ← NEW: SSE streaming endpoint
├── middleware/
│   └── validation.js         ← NEW: Request validation middleware
├── public/js/
│   └── sse-client.js         ← NEW: Client-side SSE manager
└── server.js                 ← MODIFY: Register new routes
```

## Key Code Snippets

### 1. EventBus Usage

```javascript
const eventBus = require('./services/event-bus');

// Emit an event
eventBus.emit('session-output', {
  sessionId: 'session-123',
  type: 'stdout',
  content: 'Hello from terminal'
});

// Subscribe to events
const unsubscribe = eventBus.subscribe('session-output', 'session-123', (data) => {
  console.log('Output:', data.content);
});

// Unsubscribe when done
unsubscribe();

// Subscribe to all session events
const unsubAll = eventBus.subscribeToSession('session-123', (data) => {
  console.log('Event:', data._eventType, data);
});
```

### 2. SSE Manager Usage

```javascript
const sseManager = require('./services/sse-manager');

// In route handler
app.get('/api/session/:sessionId/events', (req, res) => {
  const { sessionId } = req.params;
  sseManager.addConnection(sessionId, res, req);
  // Response stays open for streaming
});

// Get connection stats
const stats = sseManager.getStats();
// { totalSessions: 5, totalConnections: 12, ... }

// Get connection count for specific session
const count = sseManager.getConnectionCount('session-123');

// Broadcast to all connections for a session
sseManager.broadcastToSession('session-123', {
  type: 'custom-event',
  data: 'hello'
});
```

### 3. Client-Side Usage

```javascript
// Include script

// Create connection
const eventStream = new SessionEventStream('session-123', {
  reconnectInterval: 2000,
  maxReconnectAttempts: 20
});

// Listen for events
eventStream.on('output', (data) => {
  console.log('Output:', data.content);
});

eventStream.on('error', (data) => {
  console.error('Error:', data.error);
});

eventStream.on('connected', () => {
  console.log('Connected!');
});

// Send command via REST API
async function sendCommand(command) {
  await fetch(`/api/session/session-123/prompt`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ command })
  });
}

// Cleanup
eventStream.disconnect();
```

### 4. Validation Middleware

```javascript
const { validateSessionId, validateCommand } = require('./middleware/validation');

// Validate session ID parameter
router.get('/api/session/:sessionId/status', validateSessionId, (req, res) => {
  // req.sessionContext is available
  res.json(req.sessionContext);
});

// Validate request body
router.post('/api/session/:sessionId/prompt', validateSessionId, validateCommand, (req, res) => {
  // req.body.command is validated
  const { command } = req.body;
});
```

## API Endpoints

### SSE Endpoints

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/api/session/:sessionId/events` | GET | SSE event stream |
| `/api/session/:sessionId/events/status` | GET | Connection status |
| `/api/sse/stats` | GET | Global SSE stats |

### Session Endpoints

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/api/session/:sessionId/prompt` | POST | Send command |
| `/api/session/:sessionId/status` | GET | Session status |
| `/api/session/:sessionId/context` | GET | Session context |
| `/api/session/:sessionId/operations/preview` | POST | Preview operations |
| `/api/session/:sessionId/operations/execute` | POST | Execute operations |
| `/api/session/:sessionId` | DELETE | Delete session |
| `/api/session/:sessionId/duplicate` | POST | Duplicate session |
| `/api/session/:sessionId/fork` | POST | Fork session |
| `/api/session/:sessionId/move` | POST | Move to project |

## Event Types

| Event Type | Description | Fields |
|------------|-------------|--------|
| `connected` | SSE connection established | `sessionId`, `timestamp` |
| `session-output` | Output from Claude | `type`, `content`, `sessionId` |
| `session-error` | Session error | `error`, `code`, `recoverable` |
| `session-status` | Status update | `status`, `pid`, `uptime` |
| `operations-detected` | Operations found | `operations[]`, `response` |
| `operations-executed` | Operations completed | `results[]` |
| `operations-error` | Operations failed | `error`, `operations[]` |
| `approval-request` | Approval needed | `approvalId`, `command`, `explanation` |
| `approval-confirmed` | Approval handled | `approvalId`, `approved` |
| `approval-expired` | Approval timeout | `approvalId` |
| `command-sent` | Command sent | `sessionId`, `command` |
| `session-created` | New session | `sessionId`, `mode` |
| `session-deleted` | Session removed | `sessionId` |

## Migration Checklist

### Phase 1: Infrastructure (1-2 weeks)

- [ ] Implement EventBus service
- [ ] Implement SSE Manager service
- [ ] Create session routes
- [ ] Create SSE routes
- [ ] Create validation middleware
- [ ] Add routes to server.js
- [ ] Write unit tests
- [ ] Deploy to staging

### Phase 2: Client Migration (2-3 weeks)

- [ ] Create SSE client library
- [ ] Update IDE UI for new sessions
- [ ] Add feature flag for SSE vs WebSocket
- [ ] A/B test SSE vs WebSocket
- [ ] Monitor metrics
- [ ] Fix bugs
- [ ] Document findings

### Phase 3: Deprecation (1-2 weeks)

- [ ] Add deprecation warnings to WebSocket
- [ ] Update documentation
- [ ] Redirect old routes
- [ ] Communicate with users

### Phase 4: Cleanup (1 week)

- [ ] Remove WebSocket code
- [ ] Remove legacy client code
- [ ] Clean up unused dependencies
- [ ] Final regression test

## Monitoring Endpoints

```bash
# Event bus metrics
curl http://localhost:3010/api/debug/metrics

# SSE connection stats
curl http://localhost:3010/api/sse/stats

# Session status
curl http://localhost:3010/api/session/SESSION_ID/status

# Connection count for session
curl http://localhost:3010/api/session/SESSION_ID/events/status
```

## Configuration

### Server-Side (no config needed)

EventBus and SSEManager work out of the box with sensible defaults.

### Client-Side Options

```javascript
const options = {
  reconnectInterval: 1000,      // Initial reconnect delay (ms)
  maxReconnectInterval: 30000,  // Max reconnect delay (ms)
  reconnectDecay: 1.5,          // Exponential backoff
  maxReconnectAttempts: 10,     // Max reconnect attempts
  heartbeatTimeout: 60000       // Heartbeat timeout (ms)
};

const stream = new SessionEventStream('session-123', options);
```

## Testing

```bash
# Test SSE connection
curl -N http://localhost:3010/api/session/SESSION_ID/events

# Test with event output
curl -N -H "Accept: text/event-stream" \
  http://localhost:3010/api/session/SESSION_ID/events

# Send command
curl -X POST http://localhost:3010/api/session/SESSION_ID/prompt \
  -H "Content-Type: application/json" \
  -d '{"command":"echo hello"}'

# Check metrics
curl http://localhost:3010/api/debug/metrics | jq
```

## nginx Configuration

```nginx
location /api/session/ {
    proxy_buffering off;
    proxy_cache off;
    proxy_pass http://localhost:3010;
    proxy_http_version 1.1;
    proxy_set_header Connection '';
    proxy_set_header Cache-Control no-cache;
    proxy_set_header X-Accel-Buffering no;
    proxy_read_timeout 86400s;
    proxy_send_timeout 86400s;
}
```

## Common Issues

| Issue | Solution |
|-------|----------|
| Connection closes immediately | Check nginx buffering settings |
| No events received | Verify session ID is valid |
| Frequent reconnects | Increase heartbeat timeout |
| Memory leaks | Ensure listeners are unsubscribed |
| 404 on session endpoint | Check session exists, validate format |

## Rollback Plan

If issues occur:

1. **Phase 1-2:** Disable new routes, keep using WebSocket
2. **Phase 3:** Re-enable WebSocket as fallback
3. **Any phase:** Revert git commit, restart services

## Support

- Full plan: `SSE_REFACTOR_PLAN.md`
- Implementation guide: `SSE_IMPLEMENTATION_GUIDE.md`
- Code: `services/event-bus.js`, `services/sse-manager.js`
- Routes: `routes/session-routes.js`, `routes/sse-routes.js`

---

Last Updated: 2025-01-21

Version: 1.0
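
## Appendix: Reconnect Delay Sketch

The reconnect settings listed under Client-Side Options (`reconnectInterval`, `reconnectDecay`, `maxReconnectInterval`, `maxReconnectAttempts`) read like a capped exponential backoff. The sketch below shows one way those values could combine into a delay schedule. It assumes the delay is `reconnectInterval × reconnectDecay^attempt`, capped at `maxReconnectInterval`; `reconnectDelay` is a hypothetical helper for illustration, and the actual logic in `sse-client.js` may differ.

```javascript
// Defaults copied from the Client-Side Options section.
const defaults = {
  reconnectInterval: 1000,     // Initial reconnect delay (ms)
  maxReconnectInterval: 30000, // Max reconnect delay (ms)
  reconnectDecay: 1.5,         // Backoff multiplier per attempt
  maxReconnectAttempts: 10     // Give up after this many attempts
};

// Hypothetical helper (not part of sse-client.js): returns the delay in ms
// before reconnect attempt number `attempt` (0-based), or null once the
// attempt budget is exhausted. The formula is an assumption.
function reconnectDelay(attempt, options = {}) {
  const {
    reconnectInterval,
    maxReconnectInterval,
    reconnectDecay,
    maxReconnectAttempts
  } = { ...defaults, ...options };

  if (attempt >= maxReconnectAttempts) {
    return null; // Caller should stop reconnecting
  }

  const delay = reconnectInterval * Math.pow(reconnectDecay, attempt);
  return Math.min(Math.round(delay), maxReconnectInterval);
}

// Print the schedule for the default options.
for (let attempt = 0; attempt <= defaults.maxReconnectAttempts; attempt++) {
  console.log(`attempt ${attempt}:`, reconnectDelay(attempt));
}
```

Under these assumptions the delay starts at one second, grows by half on each failure, and flattens out at the 30 second cap, which keeps a restarting server from being hit by tightly spaced retries. If frequent reconnects show up in practice, printing the schedule like this can help decide whether to raise `reconnectInterval` or `heartbeatTimeout`.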