Files
SuperCharged-Claude-Code-Up…/skills/ralph/meta_agent_orchestrator.py
uroma 87748afb75 feat: Complete sync of all Claude Code CLI upgrades
- Add all 21 commands (clawd, ralph, prometheus*, dexto*)
- Add all hooks (intelligent-router, clawd-*, prometheus-wrapper, unified-integration-v2)
- Add skills (ralph, prometheus master)
- Add MCP servers (registry.json, manager.sh)
- Add plugins directory with marketplaces
- Add health-check.sh and aliases.sh scripts
- Complete repository synchronization with local ~/.claude/

Total changes: 100+ new files added
All integrations now fully backed up in repository

🤖 Generated with Claude Code
Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-27 20:39:25 +00:00

614 lines
19 KiB
Python

#!/usr/bin/env python3
"""
Ralph Meta-Agent Orchestrator
Manages multi-agent orchestration for Ralph, including:
- Task breakdown and dependency management
- Worker agent spawning and coordination
- File locking and conflict resolution
- Progress tracking and observability
"""
import json
import os
import redis
import subprocess
import sys
import time
from typing import List, Dict, Optional, Set
from dataclasses import dataclass, asdict
from enum import Enum
import hashlib
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('.ralph/multi-agent.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('ralph.orchestrator')
class TaskType(Enum):
"""Types of tasks that can be executed"""
ANALYSIS = "analysis"
FRONTEND = "frontend"
BACKEND = "backend"
TESTING = "testing"
DOCS = "docs"
REFACTOR = "refactor"
DEPLOYMENT = "deployment"
class TaskStatus(Enum):
"""Task execution status"""
PENDING = "pending"
QUEUED = "queued"
RUNNING = "running"
COMPLETE = "complete"
FAILED = "failed"
CANCELLED = "cancelled"
class AgentStatus(Enum):
"""Worker agent status"""
IDLE = "idle"
BUSY = "busy"
ERROR = "error"
OFFLINE = "offline"
@dataclass
class Task:
"""Represents a unit of work"""
id: str
type: TaskType
description: str
dependencies: List[str]
files: List[str]
priority: int = 5
specialization: Optional[str] = None
timeout: int = 300
retry_count: int = 0
max_retries: int = 3
status: TaskStatus = TaskStatus.PENDING
result: Optional[str] = None
error: Optional[str] = None
created_at: float = None
started_at: Optional[float] = None
completed_at: Optional[float] = None
def __post_init__(self):
if self.created_at is None:
self.created_at = time.time()
def to_dict(self) -> Dict:
"""Convert to dictionary, handling enums"""
data = asdict(self)
data['type'] = self.type.value
data['status'] = self.status.value
return data
@classmethod
def from_dict(cls, data: Dict) -> 'Task':
"""Create from dictionary, handling enums"""
if isinstance(data.get('type'), str):
data['type'] = TaskType(data['type'])
if isinstance(data.get('status'), str):
data['status'] = TaskStatus(data['status'])
return cls(**data)
@dataclass
class AgentInfo:
"""Information about a worker agent"""
id: str
specialization: str
status: AgentStatus
current_task: Optional[str] = None
working_files: List[str] = None
progress: float = 0.0
completed_count: int = 0
last_heartbeat: float = None
def __post_init__(self):
if self.working_files is None:
self.working_files = []
if self.last_heartbeat is None:
self.last_heartbeat = time.time()
def to_dict(self) -> Dict:
"""Convert to dictionary, handling enums"""
data = asdict(self)
data['status'] = self.status.value
return data
class MetaAgent:
"""
Meta-Agent Orchestrator for Ralph Multi-Agent System
Coordinates multiple Claude worker agents to execute complex tasks
in parallel with intelligent conflict resolution and observability.
"""
def __init__(self, config: Optional[Dict] = None):
"""Initialize the meta-agent orchestrator"""
self.config = config or self._load_config()
# Redis connection
self.redis = redis.Redis(
host=self.config.get('task_queue_host', 'localhost'),
port=self.config.get('task_queue_port', 6379),
db=self.config.get('task_queue_db', 0),
password=self.config.get('task_queue_password'),
decode_responses=True
)
# Configuration
self.max_workers = self.config.get('max_workers', 12)
self.min_workers = self.config.get('min_workers', 2)
self.agent_timeout = self.config.get('agent_timeout', 300)
self.file_lock_timeout = self.config.get('file_lock_timeout', 300)
self.max_retries = self.config.get('max_retries', 3)
# Queue names
self.task_queue = 'claude_tasks'
self.pending_queue = 'claude_tasks:pending'
self.complete_queue = 'claude_tasks:complete'
self.failed_queue = 'claude_tasks:failed'
# Worker agents
self.workers: Dict[str, AgentInfo] = {}
# Tasks
self.tasks: Dict[str, Task] = {}
logger.info("Meta-Agent Orchestrator initialized")
def _load_config(self) -> Dict:
"""Load configuration from environment variables"""
return {
'max_workers': int(os.getenv('RALPH_MAX_WORKERS', 12)),
'min_workers': int(os.getenv('RALPH_MIN_WORKERS', 2)),
'task_queue_host': os.getenv('RALPH_TASK_QUEUE_HOST', 'localhost'),
'task_queue_port': int(os.getenv('RALPH_TASK_QUEUE_PORT', 6379)),
'task_queue_db': int(os.getenv('RALPH_TASK_QUEUE_DB', 0)),
'task_queue_password': os.getenv('RALPH_TASK_QUEUE_PASSWORD'),
'agent_timeout': int(os.getenv('RALPH_AGENT_TIMEOUT', 300)),
'file_lock_timeout': int(os.getenv('RALPH_FILE_LOCK_TIMEOUT', 300)),
'max_retries': int(os.getenv('RALPH_MAX_RETRIES', 3)),
'observability_enabled': os.getenv('RALPH_OBSERVABILITY_ENABLED', 'true').lower() == 'true',
'observability_port': int(os.getenv('RALPH_OBSERVABILITY_PORT', 3001)),
'observability_host': os.getenv('RALPH_OBSERVABILITY_HOST', 'localhost'),
}
def analyze_project(self, requirements: str, project_context: Optional[Dict] = None) -> List[Task]:
"""
Analyze requirements and break into parallelizable tasks
Args:
requirements: User requirements/task description
project_context: Optional project context (files, structure, etc.)
Returns:
List of tasks with dependencies
"""
logger.info(f"Analyzing requirements: {requirements[:100]}...")
# Build analysis prompt
prompt = self._build_analysis_prompt(requirements, project_context)
# Call Claude to analyze
task_data = self._call_claude_analysis(prompt)
# Parse and create tasks
tasks = []
for item in task_data:
task = Task(
id=item['id'],
type=TaskType(item.get('type', 'analysis')),
description=item['description'],
dependencies=item.get('dependencies', []),
files=item.get('files', []),
priority=item.get('priority', 5),
specialization=item.get('specialization'),
timeout=item.get('timeout', 300)
)
tasks.append(task)
self.tasks[task.id] = task
logger.info(f"Created {len(tasks)} tasks from requirements")
return tasks
def _build_analysis_prompt(self, requirements: str, project_context: Optional[Dict]) -> str:
"""Build prompt for Claude analysis"""
prompt = f"""You are a task orchestration expert. Analyze these requirements and break them into independent tasks that can be executed in parallel by specialized AI agents.
REQUIREMENTS:
{requirements}
"""
if project_context:
prompt += f"""
PROJECT CONTEXT:
{json.dumps(project_context, indent=2)}
"""
prompt += """
Return a JSON array of tasks. Each task must have:
- id: unique identifier (e.g., "task-001", "task-002")
- type: one of [analysis, frontend, backend, testing, docs, refactor, deployment]
- description: clear description of what needs to be done
- dependencies: array of task IDs that must complete first (empty if no dependencies)
- files: array of files this task will modify (empty if analysis task)
- priority: 1-10 (higher = more important, default 5)
- specialization: optional specific agent type if needed
- timeout: estimated seconds to complete (default 300)
IMPORTANT:
- Maximize parallelization - minimize dependencies
- Group related file modifications in single tasks
- Consider file access conflicts when creating tasks
- Include testing tasks for implementation tasks
- Include documentation tasks for user-facing features
Example output format:
[
{
"id": "analyze-1",
"type": "analysis",
"description": "Analyze codebase structure and identify components",
"dependencies": [],
"files": [],
"priority": 10,
"timeout": 120
},
{
"id": "refactor-auth",
"type": "refactor",
"description": "Refactor authentication components",
"dependencies": ["analyze-1"],
"files": ["src/auth/**/*.ts"],
"priority": 8,
"specialization": "frontend"
}
]
"""
return prompt
def _call_claude_analysis(self, prompt: str) -> List[Dict]:
"""Call Claude for task analysis"""
# This would integrate with Claude Code API
# For now, return a mock response
logger.warning("Using mock Claude analysis - implement actual API call")
# Example mock response
return [
{
"id": "analyze-1",
"type": "analysis",
"description": "Analyze project structure",
"dependencies": [],
"files": [],
"priority": 10,
"timeout": 120
}
]
def distribute_tasks(self, tasks: List[Task]):
"""
Distribute tasks to worker agents, respecting dependencies
Args:
tasks: List of tasks to distribute
"""
logger.info(f"Distributing {len(tasks)} tasks")
# Sort tasks by dependencies (topological sort)
sorted_tasks = self._topological_sort(tasks)
# Queue tasks
for task in sorted_tasks:
self._queue_task(task)
logger.info(f"Queued {len(sorted_tasks)} tasks")
def _topological_sort(self, tasks: List[Task]) -> List[Task]:
"""
Sort tasks by dependencies using topological sort
Args:
tasks: List of tasks with dependencies
Returns:
Tasks sorted in dependency order
"""
# Build dependency graph
task_map = {task.id: task for task in tasks}
in_degree = {task.id: len(task.dependencies) for task in tasks}
queue = [task_id for task_id, degree in in_degree.items() if degree == 0]
result = []
while queue:
# Sort by priority
queue.sort(key=lambda tid: task_map[tid].priority, reverse=True)
task_id = queue.pop(0)
result.append(task_map[task_id])
# Update in-degree for dependent tasks
for task in tasks:
if task_id in task.dependencies:
in_degree[task.id] -= 1
if in_degree[task.id] == 0:
queue.append(task.id)
# Check for circular dependencies
if len(result) != len(tasks):
logger.warning("Circular dependencies detected, returning partial sort")
return result
def _queue_task(self, task: Task):
"""
Queue a task for execution
Args:
task: Task to queue
"""
# Check if dependencies are complete
if self._dependencies_complete(task):
# Queue for immediate execution
self.redis.lpush(self.task_queue, json.dumps(task.to_dict()))
task.status = TaskStatus.QUEUED
else:
# Queue for later
self.redis.lpush(self.pending_queue, json.dumps(task.to_dict()))
# Store task status
self.redis.hset(f"task:{task.id}", mapping=task.to_dict())
def _dependencies_complete(self, task: Task) -> bool:
"""
Check if task dependencies are complete
Args:
task: Task to check
Returns:
True if all dependencies are complete
"""
for dep_id in task.dependencies:
if dep_id not in self.tasks:
logger.warning(f"Unknown dependency: {dep_id}")
return False
dep_task = self.tasks[dep_id]
if dep_task.status != TaskStatus.COMPLETE:
return False
return True
def spawn_worker_agents(self, count: Optional[int] = None):
"""
Spawn worker agents for parallel execution
Args:
count: Number of agents to spawn (default from config)
"""
if count is None:
count = self.max_workers
logger.info(f"Spawning {count} worker agents")
specializations = ['frontend', 'backend', 'testing', 'docs', 'refactor', 'analysis']
for i in range(count):
agent_id = f"agent-{i}"
specialization = specializations[i % len(specializations)]
agent = AgentInfo(
id=agent_id,
specialization=specialization,
status=AgentStatus.IDLE
)
self.workers[agent_id] = agent
self._start_worker_process(agent)
# Store agent info in Redis
self.redis.hset(f"agent:{agent_id}", mapping=agent.to_dict())
logger.info(f"Spawned {len(self.workers)} worker agents")
def _start_worker_process(self, agent: AgentInfo):
"""
Start a worker agent process
Args:
agent: Agent info
"""
# This would start the actual worker process
# For now, just log
logger.info(f"Starting worker process: {agent.id} ({agent.specialization})")
# Example: Would use subprocess to start worker
# subprocess.Popen([
# 'claude-code',
# '--mode', 'worker',
# '--id', agent.id,
# '--specialization', agent.specialization
# ])
def monitor_tasks(self):
"""
Monitor task execution and handle failures
Runs continuously until all tasks complete
"""
logger.info("Starting task monitoring")
try:
while True:
# Check if all tasks complete
if self._all_tasks_complete():
logger.info("All tasks completed")
break
# Check for failed tasks
self._handle_failed_tasks()
# Check for pending tasks ready to execute
self._check_pending_tasks()
# Update agent heartbeats
self._update_heartbeats()
# Check for stale agents
self._check_stale_agents()
time.sleep(1)
except KeyboardInterrupt:
logger.info("Monitoring interrupted by user")
def _all_tasks_complete(self) -> bool:
"""Check if all tasks are complete"""
return all(
task.status in [TaskStatus.COMPLETE, TaskStatus.CANCELLED]
for task in self.tasks.values()
)
def _handle_failed_tasks(self):
"""Handle failed tasks with retry logic"""
for task in self.tasks.values():
if task.status == TaskStatus.FAILED:
if task.retry_count < task.max_retries:
logger.info(f"Retrying task {task.id} (attempt {task.retry_count + 1})")
task.retry_count += 1
task.status = TaskStatus.PENDING
self._queue_task(task)
else:
logger.error(f"Task {task.id} failed after {task.max_retries} retries")
def _check_pending_tasks(self):
"""Check if pending tasks can now be executed"""
pending = self.redis.lrange(self.pending_queue, 0, -1)
for task_json in pending:
task = Task.from_dict(json.loads(task_json))
if self._dependencies_complete(task):
# Move to main queue
self.redis.lrem(self.pending_queue, 1, task_json)
self.redis.lpush(self.task_queue, task_json)
logger.info(f"Task {task.id} dependencies complete, queued")
def _update_heartbeats(self):
"""Update agent heartbeats from Redis"""
for agent_id in self.workers.keys():
agent_data = self.redis.hgetall(f"agent:{agent_id}")
if agent_data:
agent = self.workers[agent_id]
agent.last_heartbeat = float(agent_data.get('last_heartbeat', time.time()))
def _check_stale_agents(self):
"""Check for agents that haven't sent heartbeat"""
timeout = self.config.get('agent_timeout', 300)
now = time.time()
for agent in self.workers.values():
if agent.status != AgentStatus.OFFLINE:
if now - agent.last_heartbeat > timeout:
logger.warning(f"Agent {agent.id} appears stale (last heartbeat {timeout}s ago)")
agent.status = AgentStatus.OFFLINE
def generate_report(self) -> Dict:
"""
Generate execution report
Returns:
Dictionary with execution statistics
"""
total_tasks = len(self.tasks)
complete_tasks = sum(1 for t in self.tasks.values() if t.status == TaskStatus.COMPLETE)
failed_tasks = sum(1 for t in self.tasks.values() if t.status == TaskStatus.FAILED)
total_duration = max(
(t.completed_at - t.created_at for t in self.tasks.values() if t.completed_at),
default=0
)
report = {
'total_tasks': total_tasks,
'complete_tasks': complete_tasks,
'failed_tasks': failed_tasks,
'success_rate': complete_tasks / total_tasks if total_tasks > 0 else 0,
'total_duration_seconds': total_duration,
'worker_count': len(self.workers),
'tasks_by_type': self._count_tasks_by_type(),
'tasks_by_status': self._count_tasks_by_status(),
}
return report
def _count_tasks_by_type(self) -> Dict[str, int]:
"""Count tasks by type"""
counts = {}
for task in self.tasks.values():
type_name = task.type.value
counts[type_name] = counts.get(type_name, 0) + 1
return counts
def _count_tasks_by_status(self) -> Dict[str, int]:
"""Count tasks by status"""
counts = {}
for task in self.tasks.values():
status_name = task.status.value
counts[status_name] = counts.get(status_name, 0) + 1
return counts
def main():
"""Main entry point for CLI usage"""
import argparse
parser = argparse.ArgumentParser(description='Ralph Meta-Agent Orchestrator')
parser.add_argument('requirements', help='Task requirements')
parser.add_argument('--workers', type=int, help='Number of worker agents')
parser.add_argument('--config', help='Config file path')
args = parser.parse_args()
# Load config
config = {}
if args.config:
with open(args.config) as f:
config = json.load(f)
# Create orchestrator
orchestrator = MetaAgent(config)
# Analyze requirements
tasks = orchestrator.analyze_project(args.requirements)
# Distribute tasks
orchestrator.distribute_tasks(tasks)
# Spawn workers
orchestrator.spawn_worker_agents(args.workers)
# Monitor execution
orchestrator.monitor_tasks()
# Generate report
report = orchestrator.generate_report()
print("\n=== EXECUTION REPORT ===")
print(json.dumps(report, indent=2))
if __name__ == '__main__':
main()