Files
SuperCharged-Claude-Code-Up…/services/sse-manager.js
uroma 55aafbae9a Fix project isolation: Make loadChatHistory respect active project sessions
- Modified loadChatHistory() to check for active project before fetching all sessions
- When active project exists, use project.sessions instead of fetching from API
- Added detailed console logging to debug session filtering
- This prevents ALL sessions from appearing in every project's sidebar

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 14:43:05 +00:00

332 lines
11 KiB
JavaScript

/**
* SSEManager - Manages Server-Sent Events connections
*
* Handles SSE connection lifecycle, heartbeat, and event delivery
* for real-time streaming of session events to clients.
*/
const eventBus = require('./event-bus');
class SSEManager {
constructor() {
// Map: sessionId -> Set of response objects
this.connections = new Map();
// Heartbeat configuration
this.heartbeatInterval = 30000; // 30 seconds
this.heartbeatTimers = new Map();
// Connection tracking
this.totalConnectionsCreated = 0;
this.totalConnectionsClosed = 0;
}
/**
* Add SSE connection for a session
* @param {string} sessionId - Session ID
* @param {Object} res - Express response object
* @param {Object} req - Express request object
*/
addConnection(sessionId, res, req) {
// Setup SSE headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache, no-transform');
res.setHeader('X-Accel-Buffering', 'no'); // Disable nginx buffering
res.setHeader('Connection', 'keep-alive');
// Flush headers to establish connection
res.flushHeaders();
// Add connection to set
if (!this.connections.has(sessionId)) {
this.connections.set(sessionId, new Set());
}
const connectionSet = this.connections.get(sessionId);
connectionSet.add(res);
this.totalConnectionsCreated++;
console.log(`[SSEManager] ✓ Connection added for session ${sessionId}`);
console.log(`[SSEManager] Total connections for ${sessionId}: ${connectionSet.size}`);
console.log(`[SSEManager] Total active sessions: ${this.connections.size}`);
console.log(`[SSEManager] Total connections created: ${this.totalConnectionsCreated}`);
// Get client info
const clientIp = req.ip || req.socket.remoteAddress;
const userAgent = req.get('User-Agent') || 'unknown';
// Send initial connection event
this.sendEvent(sessionId, res, {
type: 'connected',
sessionId,
timestamp: Date.now(),
message: 'SSE connection established',
client: {
ip: clientIp,
userAgent: userAgent.substring(0, 100) // Truncate very long user agents
}
});
// Start heartbeat for this connection
this.startHeartbeat(sessionId, res);
// Subscribe to all session events from EventBus
const unsubscribe = eventBus.subscribeToSession(sessionId, (eventData) => {
console.log(`[SSEManager] Received event for session ${sessionId}:`, eventData.type, eventData._eventType);
this.sendEvent(sessionId, res, eventData);
});
// Handle client disconnect
req.on('close', () => {
this.removeConnection(sessionId, res, unsubscribe);
});
// Log initial session status
eventBus.emit('session-status', {
sessionId,
status: 'sse-connected',
connections: connectionSet.size
});
}
/**
* Send SSE event to a specific connection
* @param {string} sessionId - Session ID
* @param {Object} res - Response object
* @param {Object} data - Event data
* @returns {boolean} True if sent successfully
*/
sendEvent(sessionId, res, data) {
// Check if connection is still alive
if (res.destroyed || res.writableEnded || res.closed) {
console.log(`[SSEManager] Skipping send to ${sessionId} - connection closed`);
return false;
}
try {
// Use the EventBus event type (_eventType) for the SSE event name
// This ensures frontend receives events like 'session-output' not 'stdout'
const eventName = data._eventType || data.type || 'message';
const eventData = JSON.stringify(data);
console.log(`[SSEManager] SENDING to ${sessionId}: event=${eventName}, _eventType=${data._eventType}, type=${data.type}`);
// SSE format: event: <name>\ndata: <json>\nid: <id>\n\n
res.write(`event: ${eventName}\n`);
res.write(`data: ${eventData}\n`);
res.write(`id: ${Date.now()}\n`);
res.write('\n');
console.log(`[SSEManager] ✓ Sent to ${sessionId}: ${eventName}`);
return true;
} catch (error) {
console.error(`[SSEManager] ✗ Error sending event to ${sessionId}:`, error.message);
return false;
}
}
/**
* Send event to all connections for a session
* @param {string} sessionId - Session ID
* @param {Object} data - Event data
* @returns {number} Number of clients the event was sent to
*/
broadcastToSession(sessionId, data) {
const connectionSet = this.connections.get(sessionId);
if (!connectionSet) {
return 0;
}
let sentCount = 0;
const deadConnections = [];
for (const res of connectionSet) {
if (!this.sendEvent(sessionId, res, data)) {
deadConnections.push(res);
} else {
sentCount++;
}
}
// Clean up dead connections
deadConnections.forEach(res => {
console.warn(`[SSEManager] Removing dead connection for session ${sessionId}`);
connectionSet.delete(res);
this.totalConnectionsClosed++;
});
if (deadConnections.length > 0) {
console.log(`[SSEManager] Cleaned up ${deadConnections.length} dead connection(s) for ${sessionId}`);
}
return sentCount;
}
/**
* Start heartbeat for a connection
* @param {string} sessionId - Session ID
* @param {Object} res - Response object
*/
startHeartbeat(sessionId, res) {
const connectionId = `${sessionId}-${res.socket?.remotePort || Math.random()}`;
const timerId = setInterval(() => {
if (res.destroyed || res.writableEnded || res.closed) {
clearInterval(timerId);
this.heartbeatTimers.delete(connectionId);
return;
}
// Send heartbeat comment (keeps connection alive)
try {
res.write(': heartbeat\n\n');
} catch (error) {
console.error(`[SSEManager] Heartbeat failed for ${sessionId}:`, error.message);
clearInterval(timerId);
this.heartbeatTimers.delete(connectionId);
this.removeConnection(sessionId, res, () => {});
}
}, this.heartbeatInterval);
this.heartbeatTimers.set(connectionId, timerId);
console.log(`[SSEManager] Heartbeat started for ${sessionId} (${this.heartbeatInterval}ms interval)`);
}
/**
* Remove SSE connection
* @param {string} sessionId - Session ID
* @param {Object} res - Response object
* @param {Function} unsubscribe - Unsubscribe function from EventBus
*/
removeConnection(sessionId, res, unsubscribe) {
const connectionSet = this.connections.get(sessionId);
if (connectionSet) {
connectionSet.delete(res);
if (connectionSet.size === 0) {
this.connections.delete(sessionId);
console.log(`[SSEManager] No more connections for session ${sessionId}, removing from tracking`);
}
this.totalConnectionsClosed++;
}
console.log(`[SSEManager] ✗ Connection removed for session ${sessionId}`);
console.log(`[SSEManager] Remaining connections for ${sessionId}: ${connectionSet?.size || 0}`);
// Stop heartbeat
const connectionId = `${sessionId}-${res.socket?.remotePort || 'unknown'}`;
const timerId = this.heartbeatTimers.get(connectionId);
if (timerId) {
clearInterval(timerId);
this.heartbeatTimers.delete(connectionId);
console.log(`[SSEManager] Heartbeat stopped for ${sessionId}`);
}
// Unsubscribe from events
if (unsubscribe) {
unsubscribe();
}
// Log final status
console.log(`[SSEManager] Total connections created: ${this.totalConnectionsCreated}`);
console.log(`[SSEManager] Total connections closed: ${this.totalConnectionsClosed}`);
console.log(`[SSEManager] Active connections: ${this.totalConnectionsCreated - this.totalConnectionsClosed}`);
}
/**
* Get connection count for a session
* @param {string} sessionId - Session ID
* @returns {number} Number of active connections
*/
getConnectionCount(sessionId) {
return this.connections.get(sessionId)?.size || 0;
}
/**
* Get all active session IDs
* @returns {Array<string>} Array of session IDs with active connections
*/
getActiveSessions() {
return Array.from(this.connections.keys());
}
/**
* Get detailed connection stats
* @returns {Object} Connection statistics
*/
getStats() {
const sessionStats = {};
for (const [sessionId, connectionSet] of this.connections.entries()) {
sessionStats[sessionId] = connectionSet.size;
}
return {
totalSessions: this.connections.size,
totalConnections: Array.from(this.connections.values())
.reduce((sum, set) => sum + set.size, 0),
sessions: sessionStats,
totalCreated: this.totalConnectionsCreated,
totalClosed: this.totalConnectionsClosed,
activeHeartbeats: this.heartbeatTimers.size
};
}
/**
* Clean up all connections (for shutdown)
*/
cleanup() {
console.log('[SSEManager] Cleaning up all connections...');
let closedCount = 0;
for (const [sessionId, connectionSet] of this.connections.entries()) {
for (const res of connectionSet) {
try {
res.write('event: shutdown\ndata: {"message":"Server shutting down"}\n\n');
res.end();
closedCount++;
} catch (error) {
console.error(`[SSEManager] Error closing connection for ${sessionId}:`, error.message);
}
}
}
// Clear all heartbeat timers
for (const timerId of this.heartbeatTimers.values()) {
clearInterval(timerId);
}
this.connections.clear();
this.heartbeatTimers.clear();
console.log(`[SSEManager] Cleanup complete. Closed ${closedCount} connection(s)`);
}
/**
* Log connection stats
*/
logStats() {
const stats = this.getStats();
console.log('[SSEManager] Connection Stats:');
console.log(` Active sessions: ${stats.totalSessions}`);
console.log(` Total connections: ${stats.totalConnections}`);
console.log(` Sessions:`, stats.sessions);
console.log(` Total created: ${stats.totalCreated}`);
console.log(` Total closed: ${stats.totalClosed}`);
console.log(` Active heartbeats: ${stats.activeHeartbeats}`);
}
}
// Singleton instance
const sseManager = new SSEManager();
// Log stats periodically (every 5 minutes in production)
if (process.env.NODE_ENV === 'production') {
setInterval(() => {
sseManager.logStats();
}, 5 * 60 * 1000);
}
module.exports = sseManager;