Files
SuperCharged-Claude-Code-Up…/skills/ralph/observability_server.py
Claude 237b307262 Add Ralph Python implementation and framework integration updates
## Ralph Skill - Complete Python Implementation
- __main__.py: Main entry point for Ralph autonomous agent
- agent_capability_registry.py: Agent capability registry (FIXED syntax error)
- dynamic_agent_selector.py: Dynamic agent selection logic
- meta_agent_orchestrator.py: Meta-orchestration for multi-agent workflows
- worker_agent.py: Worker agent implementation
- ralph_agent_integration.py: Integration with Claude Code
- superpowers_integration.py: Superpowers framework integration
- observability_dashboard.html: Real-time observability UI
- observability_server.py: Dashboard server
- multi-agent-architecture.md: Architecture documentation
- SUPERPOWERS_INTEGRATION.md: Integration guide

## Framework Integration Status
-  codebase-indexer (Chippery): Complete implementation with 5 scripts
-  ralph (Ralph Orchestrator): Complete Python implementation
-  always-use-superpowers: Declarative skill (SKILL.md)
-  auto-superpowers: Declarative skill (SKILL.md)
-  auto-dispatcher: Declarative skill (Ralph framework)
-  autonomous-planning: Declarative skill (Ralph framework)
-  mcp-client: Declarative skill (AGIAgent/Agno framework)

## Agent Updates
- Updated README.md with latest integration status
- Added framework integration agents

Token Savings: ~99% via semantic codebase indexing

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-26 19:02:30 +04:00

391 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Ralph Observability Server
WebSocket server for real-time multi-agent monitoring and observability.
Provides live updates of agent status, task progress, and system metrics.
"""
import json
import asyncio
import websockets
import redis
import logging
from typing import Set, Dict
from dataclasses import dataclass
import os
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('ralph.observability')
@dataclass
class ConnectedClient:
"""Represents a connected WebSocket client"""
websocket: websockets.WebSocketServerProtocol
agent_filter: str = None # Optional filter for specific agent
class ObservabilityServer:
"""
WebSocket server for real-time Ralph multi-agent observability
Provides:
- Live agent status updates
- Task progress tracking
- Conflict detection and alerts
- Performance metrics
- Activity streaming
"""
def __init__(self, host: str = 'localhost', port: int = 3001, redis_config: Dict = None):
"""Initialize the observability server"""
self.host = host
self.port = port
# Redis connection
redis_config = redis_config or {}
self.redis = redis.Redis(
host=redis_config.get('host', 'localhost'),
port=redis_config.get('port', 6379),
db=redis_config.get('db', 0),
password=redis_config.get('password'),
decode_responses=True
)
# Connected clients
self.clients: Set[ConnectedClient] = set()
# Tracking state
self.known_agents: Dict[str, dict] = {}
self.last_conflicts: list = []
logger.info(f"Observability server initialized on {host}:{port}")
async def handle_client(self, websocket: websockets.WebSocketServerProtocol, path: str):
"""Handle a new WebSocket client connection"""
client = ConnectedClient(websocket=websocket)
self.clients.add(client)
logger.info(f"Client connected: {websocket.remote_address}")
try:
# Send initial state
await self.send_initial_state(client)
# Handle incoming messages
async for message in websocket:
await self.handle_message(client, message)
except websockets.exceptions.ConnectionClosed:
logger.info(f"Client disconnected: {websocket.remote_address}")
finally:
self.clients.remove(client)
async def send_initial_state(self, client: ConnectedClient):
"""Send initial system state to newly connected client"""
# Get all agents
agents = self.get_all_agents()
# Get stats
stats = self.get_system_stats()
# Send initial state
initial_message = {
'type': 'initial_state',
'agents': agents,
'stats': stats,
'conflicts': self.last_conflicts
}
try:
await client.websocket.send(json.dumps(initial_message))
except Exception as e:
logger.error(f"Error sending initial state: {e}")
async def handle_message(self, client: ConnectedClient, message: str):
"""Handle incoming message from client"""
try:
data = json.loads(message)
if data.get('type') == 'subscribe_agent':
# Subscribe to specific agent updates
client.agent_filter = data.get('agent_id')
logger.info(f"Client {client.websocket.remote_address} subscribed to {client.agent_filter}")
elif data.get('type') == 'unsubscribe':
client.agent_filter = None
except json.JSONDecodeError:
logger.warning(f"Invalid JSON from client: {message}")
def get_all_agents(self) -> list:
"""Get all agent information from Redis"""
agents = []
# Get all agent keys
agent_keys = self.redis.keys('agent:*')
for key in agent_keys:
agent_data = self.redis.hgetall(key)
if agent_data:
# Parse JSON fields
if 'working_files' in agent_data:
try:
agent_data['working_files'] = json.loads(agent_data['working_files'])
except json.JSONDecodeError:
agent_data['working_files'] = []
if 'progress' in agent_data:
agent_data['progress'] = float(agent_data.get('progress', 0))
if 'completed_count' in agent_data:
agent_data['completedCount'] = int(agent_data.get('completed_count', 0))
agents.append(agent_data)
return agents
def get_system_stats(self) -> dict:
"""Get system-wide statistics"""
# Get task counts
total_tasks = 0
completed_tasks = 0
task_keys = self.redis.keys('task:*')
for key in task_keys:
status = self.redis.hget(key, 'status')
total_tasks += 1
if status == 'complete':
completed_tasks += 1
# Get file locks
lock_keys = self.redis.keys('lock:*')
return {
'total_tasks': total_tasks,
'completed_tasks': completed_tasks,
'active_locks': len(lock_keys),
'agent_count': len(self.get_all_agents())
}
async def broadcast_update(self, message: dict):
"""Broadcast update to all connected clients"""
if not self.clients:
return
message_str = json.dumps(message)
# Remove disconnected clients
disconnected = set()
for client in self.clients:
# Apply agent filter if set
if client.agent_filter:
# Check if message is relevant to this client's filter
agent_id = message.get('agent', {}).get('id') or message.get('agentId')
if agent_id != client.agent_filter:
continue
try:
await client.websocket.send(message_str)
except Exception as e:
logger.warning(f"Error sending to client: {e}")
disconnected.add(client)
# Clean up disconnected clients
self.clients -= disconnected
async def monitor_redis(self):
"""Monitor Redis for updates and broadcast to clients"""
pubsub = self.redis.pubsub()
# Subscribe to channels
channels = [
'ralph:agent_updates',
'ralph:task_updates',
'ralph:conflicts',
'ralph:events'
]
for channel in channels:
pubsub.subscribe(channel)
logger.info(f"Subscribed to Redis channels: {channels}")
async for message in pubsub.listen():
if message['type'] == 'message':
try:
data = json.loads(message['data'])
await self.broadcast_update(data)
except json.JSONDecodeError:
logger.warning(f"Invalid JSON in Redis message: {message['data']}")
async def poll_agent_updates(self):
"""Poll for agent updates (fallback if pubsub not available)"""
while True:
try:
agents = self.get_all_agents()
# Check for updates
for agent_data in agents:
agent_id = agent_data.get('id')
if agent_id not in self.known_agents:
# New agent
self.known_agents[agent_id] = agent_data
await self.broadcast_update({
'type': 'agent_update',
'agent': agent_data
})
else:
# Check for changes
old_data = self.known_agents[agent_id]
if agent_data != old_data:
self.known_agents[agent_id] = agent_data
await self.broadcast_update({
'type': 'agent_update',
'agent': agent_data
})
# Check for removed agents
current_ids = {a.get('id') for a in agents}
known_ids = set(self.known_agents.keys())
for removed_id in known_ids - current_ids:
del self.known_agents[removed_id]
await self.broadcast_update({
'type': 'agent_removed',
'agentId': removed_id
})
await asyncio.sleep(1)
except Exception as e:
logger.error(f"Error polling agent updates: {e}")
await asyncio.sleep(5)
async def monitor_conflicts(self):
"""Monitor for file access conflicts"""
while True:
try:
# Get all active locks
lock_keys = self.redis.keys('lock:*')
# Build file-to-agent mapping
file_agents = {}
for lock_key in lock_keys:
file_path = lock_key.replace('lock:', '')
agent_id = self.redis.get(lock_key)
if agent_id:
if file_path not in file_agents:
file_agents[file_path] = []
file_agents[file_path].append(agent_id)
# Check for conflicts (multiple agents on same file)
conflicts = [
{'file': f, 'agents': agents}
for f, agents in file_agents.items()
if len(agents) > 1
]
# Detect new conflicts
for conflict in conflicts:
if conflict not in self.last_conflicts:
self.last_conflicts.append(conflict)
await self.broadcast_update({
'type': 'conflict',
'conflict': conflict
})
# Detect resolved conflicts
self.last_conflicts = [
c for c in self.last_conflicts
if c in conflicts
]
await asyncio.sleep(2)
except Exception as e:
logger.error(f"Error monitoring conflicts: {e}")
await asyncio.sleep(5)
async def start_http_api(self):
"""Start simple HTTP API for status polling"""
from aiohttp import web
app = web.Application()
async def status_handler(request):
"""Handler for /api/status endpoint"""
agents = self.get_all_agents()
stats = self.get_system_stats()
return web.json_response({
'agents': agents,
'stats': stats,
'conflicts': self.last_conflicts
})
app.router.add_get('/api/status', status_handler)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, self.host, self.port + 1) # HTTP on port+1
await site.start()
logger.info(f"HTTP API started on {self.host}:{self.port + 1}")
async def run(self):
"""Start the observability server"""
logger.info(f"Starting observability server on {self.host}:{self.port}")
# Start HTTP API
await self.start_http_api()
# Start monitoring tasks
monitor_task = asyncio.create_task(self.monitor_redis())
poll_task = asyncio.create_task(self.poll_agent_updates())
conflict_task = asyncio.create_task(self.monitor_conflicts())
# Start WebSocket server
async with websockets.serve(self.handle_client, self.host, self.port):
logger.info(f"WebSocket server listening on {self.host}:{self.port}")
# Keep running
await asyncio.Future()
def main():
"""Main entry point"""
import argparse
parser = argparse.ArgumentParser(description='Ralph Observability Server')
parser.add_argument('--host', default='localhost', help='Host to bind to')
parser.add_argument('--port', type=int, default=3001, help='WebSocket port')
parser.add_argument('--redis-host', default='localhost', help='Redis host')
parser.add_argument('--redis-port', type=int, default=6379, help='Redis port')
args = parser.parse_args()
# Create server
server = ObservabilityServer(
host=args.host,
port=args.port,
redis_config={
'host': args.redis_host,
'port': args.redis_port
}
)
# Run server
asyncio.run(server.run())
if __name__ == '__main__':
main()