Files
SuperCharged-Claude-Code-Up…/skills/ralph/observability_server.py
uroma 87748afb75 feat: Complete sync of all Claude Code CLI upgrades
- Add all 21 commands (clawd, ralph, prometheus*, dexto*)
- Add all hooks (intelligent-router, clawd-*, prometheus-wrapper, unified-integration-v2)
- Add skills (ralph, prometheus master)
- Add MCP servers (registry.json, manager.sh)
- Add plugins directory with marketplaces
- Add health-check.sh and aliases.sh scripts
- Complete repository synchronization with local ~/.claude/

Total changes: 100+ new files added
All integrations now fully backed up in repository

🤖 Generated with Claude Code
Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-27 20:39:25 +00:00

391 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Ralph Observability Server
WebSocket server for real-time multi-agent monitoring and observability.
Provides live updates of agent status, task progress, and system metrics.
"""
import asyncio
import json
import logging
import os
from dataclasses import dataclass
from typing import Dict, Optional, Set

import redis
import websockets
# Logging setup: INFO and above go to the root handler using a
# "timestamp - logger name - level - message" layout. Everything in this
# module logs through the named ``ralph.observability`` logger below.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger('ralph.observability')
@dataclass(eq=False)
class ConnectedClient:
    """Represents a connected WebSocket client.

    ``eq=False`` is deliberate: a plain ``@dataclass`` generates ``__eq__``
    and, as a consequence, sets ``__hash__`` to ``None``. Instances would then
    be unhashable and could not be stored in a ``set``, which is how the
    server tracks its clients. With ``eq=False`` the default identity-based
    equality and hash are kept, so two clients are equal only if they are the
    same object, and changing ``agent_filter`` never affects set membership.
    """

    # The client's WebSocket connection. The annotation is quoted so it is
    # not evaluated when the class is created.
    websocket: "websockets.WebSocketServerProtocol"
    # Optional filter for a specific agent; ``None`` means "all agents".
    agent_filter: Optional[str] = None
class ObservabilityServer:
    """
    WebSocket server for real-time Ralph multi-agent observability.

    Provides:
    - Live agent status updates
    - Task progress tracking
    - Conflict detection and alerts
    - Performance metrics
    - Activity streaming

    State is read from Redis (``agent:*`` hashes, ``task:*`` hashes and
    ``lock:*`` keys) and pushed to connected WebSocket clients. The same
    snapshot is served over HTTP at ``/api/status`` on ``port + 1``.

    NOTE(review): the Redis client is the synchronous one, so every Redis
    call made from a coroutine blocks the event loop for its duration.
    ``KEYS`` is also O(keyspace). Both are kept as in the original design;
    consider ``redis.asyncio`` / ``SCAN`` if the keyspace grows.
    """

    def __init__(self, host: str = 'localhost', port: int = 3001,
                 redis_config: Optional[Dict] = None):
        """Initialize the observability server.

        Args:
            host: Interface the WebSocket and HTTP servers bind to.
            port: WebSocket port; the HTTP API uses ``port + 1``.
            redis_config: Optional mapping with any of ``host``, ``port``,
                ``db`` and ``password``; missing keys fall back to
                ``localhost``, ``6379``, ``0`` and no password.
        """
        self.host = host
        self.port = port

        # Redis connection (decode_responses=True so values are str, not bytes)
        redis_config = redis_config or {}
        self.redis = redis.Redis(
            host=redis_config.get('host', 'localhost'),
            port=redis_config.get('port', 6379),
            db=redis_config.get('db', 0),
            password=redis_config.get('password'),
            decode_responses=True,
        )

        # Connected clients
        self.clients: Set["ConnectedClient"] = set()

        # Tracking state
        self.known_agents: Dict[str, dict] = {}
        self.last_conflicts: list = []

        # aiohttp runner for the HTTP API; kept so run() can clean it up.
        self._http_runner = None

        logger.info(f"Observability server initialized on {host}:{port}")

    async def handle_client(self, websocket: "websockets.WebSocketServerProtocol",
                            path: Optional[str] = None):
        """Handle one WebSocket client connection for its whole lifetime.

        Args:
            websocket: The client connection.
            path: Request path. Optional because some websockets releases
                call the handler with ``(websocket, path)`` and others with
                only the connection; the value is not used.
        """
        client = ConnectedClient(websocket=websocket)
        self.clients.add(client)
        logger.info(f"Client connected: {websocket.remote_address}")
        try:
            # Send initial state
            await self.send_initial_state(client)
            # Handle incoming messages
            async for message in websocket:
                await self.handle_message(client, message)
        except websockets.exceptions.ConnectionClosed:
            # Abnormal close. A clean close just ends the loop above, so the
            # disconnect is logged in ``finally`` to cover both cases.
            pass
        finally:
            # discard(), not remove(): broadcast_update() may already have
            # dropped this client after a failed send.
            self.clients.discard(client)
            logger.info(f"Client disconnected: {websocket.remote_address}")

    def _snapshot(self) -> dict:
        """Return the agents/stats/conflicts payload shared by WebSocket and HTTP."""
        agents = self.get_all_agents()
        return {
            'agents': agents,
            # Reuse the list just fetched instead of reading Redis twice.
            'stats': self.get_system_stats(agents),
            'conflicts': self.last_conflicts,
        }

    async def send_initial_state(self, client: "ConnectedClient"):
        """Send the current system state to a newly connected client.

        Failures (Redis unavailable, socket already closed) are logged and
        swallowed so that they do not take down the connection handler.
        """
        try:
            initial_message = {'type': 'initial_state', **self._snapshot()}
            await client.websocket.send(json.dumps(initial_message))
        except asyncio.CancelledError:
            # CancelledError is an Exception subclass on Python 3.7.
            raise
        except Exception as e:
            logger.error(f"Error sending initial state: {e}")

    async def handle_message(self, client: "ConnectedClient", message: str):
        """Handle an incoming message from a client.

        Recognised messages are JSON objects with ``type`` equal to
        ``subscribe_agent`` (sets the client's filter to ``agent_id``) or
        ``unsubscribe`` (clears it). Anything else is ignored; invalid JSON
        and non-object payloads are logged.
        """
        try:
            data = json.loads(message)
        except (json.JSONDecodeError, UnicodeDecodeError):
            logger.warning(f"Invalid JSON from client: {message}")
            return

        # Client input is untrusted: valid JSON may still be a list/number,
        # on which .get() would raise and kill the connection handler.
        if not isinstance(data, dict):
            logger.warning(f"Ignoring non-object message from client: {message}")
            return

        message_type = data.get('type')
        if message_type == 'subscribe_agent':
            # Subscribe to specific agent updates
            client.agent_filter = data.get('agent_id')
            logger.info(f"Client {client.websocket.remote_address} subscribed to {client.agent_filter}")
        elif message_type == 'unsubscribe':
            client.agent_filter = None

    @staticmethod
    def _coerce(value, caster, default):
        """Return ``caster(value)``, or ``default`` if the value is malformed."""
        try:
            return caster(value)
        except (TypeError, ValueError):
            return default

    def get_all_agents(self) -> list:
        """Get all agent information from Redis.

        Returns:
            One dict per non-empty ``agent:*`` hash. ``working_files`` is
            JSON-decoded (``[]`` if invalid), ``progress`` becomes a float and
            ``completed_count`` is mirrored as an int under ``completedCount``.
            Malformed numbers fall back to 0 so that one bad record cannot
            abort the whole listing.
        """
        agents = []
        for key in self.redis.keys('agent:*'):
            agent_data = self.redis.hgetall(key)
            if not agent_data:
                continue

            # Parse JSON fields
            if 'working_files' in agent_data:
                try:
                    agent_data['working_files'] = json.loads(agent_data['working_files'])
                except json.JSONDecodeError:
                    agent_data['working_files'] = []
            if 'progress' in agent_data:
                agent_data['progress'] = self._coerce(agent_data['progress'], float, 0.0)
            if 'completed_count' in agent_data:
                agent_data['completedCount'] = self._coerce(agent_data['completed_count'], int, 0)
            agents.append(agent_data)
        return agents

    def get_system_stats(self, agents: Optional[list] = None) -> dict:
        """Get system-wide statistics.

        Args:
            agents: Already-fetched result of ``get_all_agents()``; when
                omitted the agents are read from Redis here.

        Returns:
            Dict with ``total_tasks``, ``completed_tasks`` (tasks whose
            ``status`` field is ``'complete'``), ``active_locks`` and
            ``agent_count``.
        """
        # Get task counts
        task_keys = self.redis.keys('task:*')
        completed_tasks = sum(
            1 for key in task_keys
            if self.redis.hget(key, 'status') == 'complete'
        )

        # Get file locks
        lock_keys = self.redis.keys('lock:*')

        if agents is None:
            agents = self.get_all_agents()

        return {
            'total_tasks': len(task_keys),
            'completed_tasks': completed_tasks,
            'active_locks': len(lock_keys),
            'agent_count': len(agents),
        }

    @staticmethod
    def _message_agent_id(message: dict):
        """Return the agent id a message refers to (``agent.id`` or ``agentId``)."""
        agent = message.get('agent')
        nested_id = agent.get('id') if isinstance(agent, dict) else None
        return nested_id or message.get('agentId')

    async def broadcast_update(self, message: dict):
        """Broadcast an update to all connected clients.

        Clients with an ``agent_filter`` only receive messages whose agent id
        matches it, so messages carrying no agent id are not delivered to
        filtered clients. Clients whose send fails are dropped.
        """
        if not self.clients:
            return

        message_str = json.dumps(message)
        agent_id = self._message_agent_id(message)

        disconnected = set()
        # Iterate over a snapshot: each send awaits, and other coroutines may
        # add or remove clients meanwhile, which would otherwise raise
        # "Set changed size during iteration".
        for client in list(self.clients):
            # Apply agent filter if set
            if client.agent_filter and agent_id != client.agent_filter:
                continue
            try:
                await client.websocket.send(message_str)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                logger.warning(f"Error sending to client: {e}")
                disconnected.add(client)

        # Clean up disconnected clients
        self.clients -= disconnected

    async def monitor_redis(self):
        """Relay Redis pub/sub messages to WebSocket clients.

        The Redis client is synchronous, so its ``listen()`` generator cannot
        be used with ``async for``. Messages are instead polled without
        blocking via ``get_message()``, yielding to the event loop between
        polls. On any error the subscription is rebuilt after a short pause.
        """
        channels = [
            'ralph:agent_updates',
            'ralph:task_updates',
            'ralph:conflicts',
            'ralph:events'
        ]

        while True:
            # Subscribe confirmations are filtered out; only published
            # messages are returned by get_message().
            pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
            try:
                pubsub.subscribe(*channels)
                logger.info(f"Subscribed to Redis channels: {channels}")

                while True:
                    message = pubsub.get_message()
                    if message is None:
                        # Nothing pending; let other coroutines run.
                        await asyncio.sleep(0.1)
                        continue
                    if message['type'] != 'message':
                        continue
                    try:
                        data = json.loads(message['data'])
                    except json.JSONDecodeError:
                        logger.warning(f"Invalid JSON in Redis message: {message['data']}")
                        continue
                    if not isinstance(data, dict):
                        logger.warning(f"Ignoring non-object Redis message: {message['data']}")
                        continue
                    await self.broadcast_update(data)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                logger.error(f"Error monitoring Redis pub/sub: {e}")
                await asyncio.sleep(5)
            finally:
                pubsub.close()

    async def poll_agent_updates(self):
        """Poll for agent updates (fallback if pubsub not available).

        Every second the agent hashes are compared with the last known state:
        new or changed agents are broadcast as ``agent_update``, vanished
        ones as ``agent_removed``.
        """
        while True:
            try:
                agents = self.get_all_agents()

                # New agents and changed agents produce the same message.
                for agent_data in agents:
                    agent_id = agent_data.get('id')
                    if agent_id in self.known_agents and self.known_agents[agent_id] == agent_data:
                        continue
                    self.known_agents[agent_id] = agent_data
                    await self.broadcast_update({
                        'type': 'agent_update',
                        'agent': agent_data
                    })

                # Check for removed agents
                current_ids = {a.get('id') for a in agents}
                for removed_id in set(self.known_agents) - current_ids:
                    del self.known_agents[removed_id]
                    await self.broadcast_update({
                        'type': 'agent_removed',
                        'agentId': removed_id
                    })

                await asyncio.sleep(1)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                logger.error(f"Error polling agent updates: {e}")
                await asyncio.sleep(5)

    async def monitor_conflicts(self):
        """Monitor for file access conflicts.

        Every two seconds the ``lock:*`` keys are grouped by file path; a file
        with more than one agent is a conflict. New conflicts are broadcast
        and resolved ones are dropped from ``last_conflicts``.

        NOTE(review): each ``lock:<path>`` key is read with GET and so yields
        at most one agent per path, which means ``len(agents) > 1`` cannot
        become true with this key layout. Confirm how the lock writers encode
        multiple holders before relying on these alerts.
        """
        prefix = 'lock:'
        while True:
            try:
                # Build file-to-agent mapping from all active locks
                file_agents = {}
                for lock_key in self.redis.keys(prefix + '*'):
                    agent_id = self.redis.get(lock_key)
                    if agent_id:
                        # Strip only the leading prefix; str.replace() would
                        # also mangle paths that contain "lock:" themselves.
                        file_path = lock_key[len(prefix):]
                        file_agents.setdefault(file_path, []).append(agent_id)

                # Check for conflicts (multiple agents on same file)
                conflicts = [
                    {'file': f, 'agents': agents}
                    for f, agents in file_agents.items()
                    if len(agents) > 1
                ]

                # Detect new conflicts
                for conflict in conflicts:
                    if conflict not in self.last_conflicts:
                        self.last_conflicts.append(conflict)
                        await self.broadcast_update({
                            'type': 'conflict',
                            'conflict': conflict
                        })

                # Detect resolved conflicts
                self.last_conflicts = [
                    c for c in self.last_conflicts
                    if c in conflicts
                ]

                await asyncio.sleep(2)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                logger.error(f"Error monitoring conflicts: {e}")
                await asyncio.sleep(5)

    async def start_http_api(self):
        """Start a simple HTTP API for status polling on ``port + 1``."""
        # Imported here so aiohttp is only required when the API is started.
        from aiohttp import web

        app = web.Application()

        async def status_handler(request):
            """Handler for /api/status endpoint"""
            return web.json_response(self._snapshot())

        app.router.add_get('/api/status', status_handler)

        runner = web.AppRunner(app)
        await runner.setup()
        # Keep a reference so run() can release the listening socket.
        self._http_runner = runner
        site = web.TCPSite(runner, self.host, self.port + 1)  # HTTP on port+1
        await site.start()
        logger.info(f"HTTP API started on {self.host}:{self.port + 1}")

    async def run(self):
        """Start the observability server and serve until cancelled.

        Starts the HTTP API, the three background monitors and the WebSocket
        server. On exit the monitors are cancelled and the HTTP runner is
        cleaned up.
        """
        logger.info(f"Starting observability server on {self.host}:{self.port}")

        # Start HTTP API
        await self.start_http_api()

        # Start monitoring tasks
        tasks = [
            asyncio.create_task(self.monitor_redis()),
            asyncio.create_task(self.poll_agent_updates()),
            asyncio.create_task(self.monitor_conflicts()),
        ]

        try:
            # Start WebSocket server
            async with websockets.serve(self.handle_client, self.host, self.port):
                logger.info(f"WebSocket server listening on {self.host}:{self.port}")
                # Keep running
                await asyncio.Future()
        finally:
            for task in tasks:
                task.cancel()
            # Wait for the cancellations to be processed.
            await asyncio.gather(*tasks, return_exceptions=True)
            if self._http_runner is not None:
                await self._http_runner.cleanup()
                self._http_runner = None
def main():
    """Command-line entry point: parse options, build the server, run it."""
    import argparse

    # (flag, add_argument keyword arguments) in the order they are registered.
    option_specs = (
        ('--host', {'default': 'localhost', 'help': 'Host to bind to'}),
        ('--port', {'type': int, 'default': 3001, 'help': 'WebSocket port'}),
        ('--redis-host', {'default': 'localhost', 'help': 'Redis host'}),
        ('--redis-port', {'type': int, 'default': 6379, 'help': 'Redis port'}),
    )
    cli = argparse.ArgumentParser(description='Ralph Observability Server')
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    options = cli.parse_args()

    # Only host and port are configurable from the command line; the
    # remaining Redis settings use the server's defaults.
    redis_settings = {
        'host': options.redis_host,
        'port': options.redis_port
    }
    observability = ObservabilityServer(
        host=options.host,
        port=options.port,
        redis_config=redis_settings,
    )

    # Blocks until the server coroutine finishes or is interrupted.
    asyncio.run(observability.run())


if __name__ == '__main__':
    main()