#!/usr/bin/env python3
"""
Ralph Observability Server

WebSocket server for real-time multi-agent monitoring and observability.
Provides live updates of agent status, task progress, and system metrics.
"""

import json
import asyncio
import websockets
import redis
import logging
from typing import Dict, Optional, Set
from dataclasses import dataclass
import os

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('ralph.observability')


@dataclass(eq=False)
class ConnectedClient:
    """Represents a connected WebSocket client.

    ``eq=False`` keeps object identity for ``__eq__``/``__hash__``. A plain
    ``@dataclass`` sets ``__hash__`` to ``None``, which would make instances
    impossible to store in ``ObservabilityServer.clients`` (a ``set``).
    """
    # Annotation is quoted so it is not evaluated at import time; the concrete
    # connection type handed to the handler depends on the websockets version.
    websocket: 'websockets.WebSocketServerProtocol'
    agent_filter: Optional[str] = None  # Optional filter for specific agent


class ObservabilityServer:
    """
    WebSocket server for real-time Ralph multi-agent observability

    Provides:
    - Live agent status updates
    - Task progress tracking
    - Conflict detection and alerts
    - Performance metrics
    - Activity streaming

    NOTE: ``self.redis`` is the synchronous redis-py client, so every Redis
    call made from the coroutines below briefly blocks the event loop.
    """

    def __init__(self, host: str = 'localhost', port: int = 3001,
                 redis_config: Optional[Dict] = None):
        """Initialize the observability server.

        Args:
            host: Interface to bind the WebSocket server and HTTP API to.
            port: WebSocket port; the HTTP status API listens on ``port + 1``.
            redis_config: Optional mapping with ``host``, ``port``, ``db`` and
                ``password`` keys; missing keys default to a local Redis
                (``localhost:6379``, db 0, no password).
        """
        self.host = host
        self.port = port

        # Redis connection
        redis_config = redis_config or {}
        self.redis = redis.Redis(
            host=redis_config.get('host', 'localhost'),
            port=redis_config.get('port', 6379),
            db=redis_config.get('db', 0),
            password=redis_config.get('password'),
            decode_responses=True
        )

        # Connected clients
        self.clients: Set[ConnectedClient] = set()

        # Tracking state
        self.known_agents: Dict[str, dict] = {}
        self.last_conflicts: list = []

        # aiohttp runner for the HTTP API; kept so it can be cleaned up in run()
        self._http_runner = None

        logger.info(f"Observability server initialized on {host}:{port}")

    async def handle_client(self, websocket: 'websockets.WebSocketServerProtocol',
                            path: Optional[str] = None):
        """Serve one WebSocket client for the lifetime of its connection.

        Sends an initial state snapshot, then processes client control
        messages until the connection closes.

        Args:
            websocket: The client connection.
            path: Request path. Older websockets releases pass it as a second
                positional argument; newer ones call the handler with the
                connection only, so it is optional. It is not used.
        """
        client = ConnectedClient(websocket=websocket)
        # Capture once: the address may not be retrievable after close.
        address = websocket.remote_address
        self.clients.add(client)
        logger.info(f"Client connected: {address}")

        try:
            # Send initial state
            await self.send_initial_state(client)

            # Handle incoming messages; the loop ends normally on a clean close
            async for message in websocket:
                await self.handle_message(client, message)
        except websockets.exceptions.ConnectionClosed:
            # Abnormal close; logged in `finally` like a clean close.
            pass
        finally:
            logger.info(f"Client disconnected: {address}")
            # discard, not remove: broadcast_update may already have dropped
            # this client after a failed send.
            self.clients.discard(client)

    async def send_initial_state(self, client: ConnectedClient):
        """Send the current agents, stats and known conflicts to one client.

        Send failures are logged, not raised. Errors reading Redis propagate
        to the caller.
        """
        # Get all agents
        agents = self.get_all_agents()

        # Get stats
        stats = self.get_system_stats()

        # Send initial state
        initial_message = {
            'type': 'initial_state',
            'agents': agents,
            'stats': stats,
            'conflicts': self.last_conflicts
        }

        try:
            await client.websocket.send(json.dumps(initial_message))
        except Exception as e:
            logger.error(f"Error sending initial state: {e}")

    async def handle_message(self, client: ConnectedClient, message: str):
        """Apply a control message received from a client.

        Supported message types:
        - ``{"type": "subscribe_agent", "agent_id": ...}``: only forward
          broadcasts about that agent to this client.
        - ``{"type": "unsubscribe"}``: clear the agent filter.

        Invalid JSON and non-object payloads are logged and ignored rather
        than raising, which would tear down the client's connection.
        """
        try:
            data = json.loads(message)
        except json.JSONDecodeError:
            logger.warning(f"Invalid JSON from client: {message}")
            return

        if not isinstance(data, dict):
            logger.warning(f"Ignoring non-object message from client: {message}")
            return

        msg_type = data.get('type')
        if msg_type == 'subscribe_agent':
            # Subscribe to specific agent updates
            client.agent_filter = data.get('agent_id')
            logger.info(f"Client {client.websocket.remote_address} subscribed to {client.agent_filter}")
        elif msg_type == 'unsubscribe':
            client.agent_filter = None

    def get_all_agents(self) -> list:
        """Return every ``agent:*`` hash from Redis as a list of dicts.

        ``working_files`` is JSON-decoded (``[]`` if malformed). ``progress``
        is converted to float, and ``completed_count`` is copied to an int
        ``completedCount`` key. Malformed numbers fall back to 0 instead of
        aborting the whole snapshot.
        """
        agents = []

        # Get all agent keys
        agent_keys = self.redis.keys('agent:*')

        for key in agent_keys:
            agent_data = self.redis.hgetall(key)
            if not agent_data:
                # Key vanished between KEYS and HGETALL
                continue

            # Parse JSON fields
            if 'working_files' in agent_data:
                try:
                    agent_data['working_files'] = json.loads(agent_data['working_files'])
                except json.JSONDecodeError:
                    agent_data['working_files'] = []

            if 'progress' in agent_data:
                try:
                    agent_data['progress'] = float(agent_data['progress'])
                except (TypeError, ValueError):
                    agent_data['progress'] = 0.0

            if 'completed_count' in agent_data:
                try:
                    agent_data['completedCount'] = int(agent_data['completed_count'])
                except (TypeError, ValueError):
                    agent_data['completedCount'] = 0

            agents.append(agent_data)

        return agents

    def get_system_stats(self) -> dict:
        """Return task totals, active lock count and agent count from Redis."""
        # Get task counts
        total_tasks = 0
        completed_tasks = 0

        task_keys = self.redis.keys('task:*')
        for key in task_keys:
            status = self.redis.hget(key, 'status')
            total_tasks += 1
            if status == 'complete':
                completed_tasks += 1

        # Get file locks
        lock_keys = self.redis.keys('lock:*')

        return {
            'total_tasks': total_tasks,
            'completed_tasks': completed_tasks,
            'active_locks': len(lock_keys),
            'agent_count': len(self.get_all_agents())
        }

    @staticmethod
    def _message_agent_id(message) -> Optional[str]:
        """Return the agent id a broadcast message refers to, if any.

        Looks at ``message['agent']['id']`` first, then ``message['agentId']``.
        Tolerates non-dict messages and a non-dict ``agent`` value.
        """
        if not isinstance(message, dict):
            return None
        agent = message.get('agent')
        agent_id = agent.get('id') if isinstance(agent, dict) else None
        return agent_id or message.get('agentId')

    async def broadcast_update(self, message: dict):
        """Send ``message`` (JSON-encoded) to every interested client.

        A client with an ``agent_filter`` only receives messages about that
        agent. Clients whose send fails are dropped from ``self.clients``.
        """
        if not self.clients:
            return

        message_str = json.dumps(message)
        agent_id = self._message_agent_id(message)

        disconnected = set()
        # Iterate over a snapshot: each `await send()` yields to the loop, and
        # handle_client may add/remove clients meanwhile, which would break
        # iteration over the live set.
        for client in list(self.clients):
            # Apply agent filter if set
            if client.agent_filter and agent_id != client.agent_filter:
                continue

            try:
                await client.websocket.send(message_str)
            except Exception as e:
                logger.warning(f"Error sending to client: {e}")
                disconnected.add(client)

        # Clean up disconnected clients
        self.clients -= disconnected

    async def monitor_redis(self, poll_interval: float = 0.1):
        """Relay JSON messages published on Ralph's Redis channels to clients.

        redis-py's synchronous ``PubSub.listen()`` is a blocking plain
        generator and cannot be consumed with ``async for`` (that raised
        ``TypeError`` at once). Instead, poll ``get_message()`` without
        blocking and yield to the event loop when nothing is pending.

        NOTE: poll_agent_updates also broadcasts agent changes, so agents
        published on ``ralph:agent_updates`` may be announced twice.

        Args:
            poll_interval: Seconds to sleep when no message is pending.
        """
        pubsub = self.redis.pubsub()

        # Subscribe to channels
        channels = [
            'ralph:agent_updates',
            'ralph:task_updates',
            'ralph:conflicts',
            'ralph:events'
        ]
        pubsub.subscribe(*channels)
        logger.info(f"Subscribed to Redis channels: {channels}")

        try:
            while True:
                try:
                    message = pubsub.get_message(ignore_subscribe_messages=True)
                except Exception as e:
                    logger.error(f"Error reading Redis pubsub: {e}")
                    await asyncio.sleep(5)
                    continue

                if message is None:
                    await asyncio.sleep(poll_interval)
                    continue

                if message['type'] != 'message':
                    continue

                try:
                    data = json.loads(message['data'])
                except json.JSONDecodeError:
                    logger.warning(f"Invalid JSON in Redis message: {message['data']}")
                    continue

                await self.broadcast_update(data)
        finally:
            pubsub.close()

    async def poll_agent_updates(self):
        """Poll for agent updates (fallback if pubsub not available).

        Every second, diff the Redis agent hashes against ``known_agents``.
        Broadcast ``agent_update`` for new or changed agents and
        ``agent_removed`` for agents that disappeared.
        """
        while True:
            try:
                agents = self.get_all_agents()

                # New or changed agents. get_all_agents never returns an empty
                # dict, so an unknown id (get() -> None) always compares unequal.
                for agent_data in agents:
                    agent_id = agent_data.get('id')
                    if self.known_agents.get(agent_id) != agent_data:
                        self.known_agents[agent_id] = agent_data
                        await self.broadcast_update({
                            'type': 'agent_update',
                            'agent': agent_data
                        })

                # Check for removed agents
                current_ids = {a.get('id') for a in agents}
                for removed_id in set(self.known_agents) - current_ids:
                    del self.known_agents[removed_id]
                    await self.broadcast_update({
                        'type': 'agent_removed',
                        'agentId': removed_id
                    })

                await asyncio.sleep(1)

            except Exception as e:
                logger.error(f"Error polling agent updates: {e}")
                await asyncio.sleep(5)

    async def monitor_conflicts(self):
        """Detect files held by more than one agent and broadcast new conflicts.

        Every 2 seconds, group ``lock:<file path>`` keys by file path. Any file
        with more than one holder is broadcast once as a ``conflict``.
        Conflicts no longer present are dropped from ``last_conflicts``.

        NOTE(review): each ``lock:<path>`` key is a single string value, so one
        path can only map to one agent here. As written, a conflict is only
        reported if the lock scheme changes (e.g. per-agent lock keys or
        multi-valued locks) — confirm the intended key layout.
        """
        prefix = 'lock:'
        while True:
            try:
                # Get all active locks
                lock_keys = self.redis.keys(prefix + '*')

                # Build file-to-agent mapping
                file_agents = {}
                for lock_key in lock_keys:
                    # Strip only the leading prefix; str.replace would also
                    # remove 'lock:' occurring inside the path (e.g. 'block:').
                    file_path = lock_key[len(prefix):]
                    agent_id = self.redis.get(lock_key)
                    if agent_id:
                        file_agents.setdefault(file_path, []).append(agent_id)

                # Check for conflicts (multiple agents on same file)
                conflicts = [
                    {'file': f, 'agents': agents}
                    for f, agents in file_agents.items()
                    if len(agents) > 1
                ]

                # Detect new conflicts
                for conflict in conflicts:
                    if conflict not in self.last_conflicts:
                        self.last_conflicts.append(conflict)
                        await self.broadcast_update({
                            'type': 'conflict',
                            'conflict': conflict
                        })

                # Detect resolved conflicts
                self.last_conflicts = [
                    c for c in self.last_conflicts if c in conflicts
                ]

                await asyncio.sleep(2)

            except Exception as e:
                logger.error(f"Error monitoring conflicts: {e}")
                await asyncio.sleep(5)

    async def start_http_api(self):
        """Start a simple HTTP API for status polling on ``port + 1``.

        ``GET /api/status`` returns the same agents/stats/conflicts snapshot
        that WebSocket clients receive on connect.
        """
        from aiohttp import web

        app = web.Application()

        async def status_handler(request):
            """Handler for /api/status endpoint"""
            agents = self.get_all_agents()
            stats = self.get_system_stats()

            return web.json_response({
                'agents': agents,
                'stats': stats,
                'conflicts': self.last_conflicts
            })

        app.router.add_get('/api/status', status_handler)

        runner = web.AppRunner(app)
        await runner.setup()
        # Keep a reference so run() can release the listening socket on exit
        self._http_runner = runner

        site = web.TCPSite(runner, self.host, self.port + 1)  # HTTP on port+1
        await site.start()

        logger.info(f"HTTP API started on {self.host}:{self.port + 1}")

    @staticmethod
    def _log_task_failure(task):
        """Done-callback: log a background task that ended with an exception.

        Without this, an exception in an un-awaited task is silently lost.
        """
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error("Background monitoring task failed", exc_info=exc)

    async def run(self):
        """Start the HTTP API, background monitors and WebSocket server.

        Runs until cancelled. On exit, the monitor tasks are cancelled and the
        HTTP API is shut down.
        """
        logger.info(f"Starting observability server on {self.host}:{self.port}")

        # Start HTTP API
        await self.start_http_api()

        # Start monitoring tasks
        tasks = [
            asyncio.create_task(self.monitor_redis()),
            asyncio.create_task(self.poll_agent_updates()),
            asyncio.create_task(self.monitor_conflicts()),
        ]
        for task in tasks:
            task.add_done_callback(self._log_task_failure)

        try:
            # Start WebSocket server
            async with websockets.serve(self.handle_client, self.host, self.port):
                logger.info(f"WebSocket server listening on {self.host}:{self.port}")

                # Keep running
                await asyncio.Future()
        finally:
            for task in tasks:
                task.cancel()
            if self._http_runner is not None:
                await self._http_runner.cleanup()


def main():
    """Main entry point: parse CLI arguments and run the server until Ctrl-C."""
    import argparse

    parser = argparse.ArgumentParser(description='Ralph Observability Server')
    parser.add_argument('--host', default='localhost', help='Host to bind to')
    parser.add_argument('--port', type=int, default=3001, help='WebSocket port')
    parser.add_argument('--redis-host', default='localhost', help='Redis host')
    parser.add_argument('--redis-port', type=int, default=6379, help='Redis port')
    parser.add_argument('--redis-db', type=int, default=0, help='Redis database number')

    args = parser.parse_args()

    # Create server
    server = ObservabilityServer(
        host=args.host,
        port=args.port,
        redis_config={
            'host': args.redis_host,
            'port': args.redis_port,
            'db': args.redis_db
        }
    )

    # Run server
    try:
        asyncio.run(server.run())
    except KeyboardInterrupt:
        logger.info("Observability server stopped")


if __name__ == '__main__':
    main()