#!/usr/bin/env python3
"""
Ralph Meta-Agent Orchestrator

Manages multi-agent orchestration for Ralph, including:
- Task breakdown and dependency management
- Worker agent spawning and coordination
- File locking and conflict resolution
- Progress tracking and observability
"""

import json
import os
import redis
import subprocess
import sys
import time
from typing import List, Dict, Optional, Set
from dataclasses import dataclass, asdict
from enum import Enum
import hashlib
import logging

# Create the log directory up front: logging.FileHandler raises
# FileNotFoundError at import time if '.ralph/' does not exist yet.
os.makedirs('.ralph', exist_ok=True)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('.ralph/multi-agent.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('ralph.orchestrator')


def _redis_mapping(data: Dict) -> Dict:
    """Flatten a dict into a form accepted by Redis HSET.

    Redis hash values must be scalars (str/int/float/bytes): ``None``
    values are dropped and list/dict values are JSON-encoded.  Passing
    the raw ``asdict()`` output (which contains ``None`` and list
    fields) straight to ``hset(mapping=...)`` raises
    ``redis.exceptions.DataError``.

    Args:
        data: Arbitrary field dict (e.g. Task.to_dict()).

    Returns:
        A new dict safe to use as an HSET mapping.
    """
    flat = {}
    for key, value in data.items():
        if value is None:
            continue
        if isinstance(value, (list, dict)):
            flat[key] = json.dumps(value)
        else:
            flat[key] = value
    return flat


class TaskType(Enum):
    """Types of tasks that can be executed"""
    ANALYSIS = "analysis"
    FRONTEND = "frontend"
    BACKEND = "backend"
    TESTING = "testing"
    DOCS = "docs"
    REFACTOR = "refactor"
    DEPLOYMENT = "deployment"


class TaskStatus(Enum):
    """Task execution status"""
    PENDING = "pending"
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETE = "complete"
    FAILED = "failed"
    CANCELLED = "cancelled"


class AgentStatus(Enum):
    """Worker agent status"""
    IDLE = "idle"
    BUSY = "busy"
    ERROR = "error"
    OFFLINE = "offline"


@dataclass
class Task:
    """Represents a unit of work handed to a worker agent."""
    id: str
    type: TaskType
    description: str
    dependencies: List[str]
    files: List[str]
    priority: int = 5
    specialization: Optional[str] = None
    timeout: int = 300
    retry_count: int = 0
    max_retries: int = 3
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[str] = None
    error: Optional[str] = None
    # created_at is stamped in __post_init__ when omitted; Optional so
    # from_dict() can round-trip a stored timestamp unchanged.
    created_at: Optional[float] = None
    started_at: Optional[float] = None
    completed_at: Optional[float] = None

    def __post_init__(self):
        # Stamp creation time lazily so freshly-built tasks get "now"
        # while deserialized tasks keep their original timestamp.
        if self.created_at is None:
            self.created_at = time.time()

    def to_dict(self) -> Dict:
        """Convert to dictionary, handling enums"""
        data = asdict(self)
        data['type'] = self.type.value
        data['status'] = self.status.value
        return data

    @classmethod
    def from_dict(cls, data: Dict) -> 'Task':
        """Create from dictionary, handling enums.

        Operates on a shallow copy so the caller's dict is never
        mutated (the original implementation rewrote 'type'/'status'
        in place).
        """
        data = dict(data)
        if isinstance(data.get('type'), str):
            data['type'] = TaskType(data['type'])
        if isinstance(data.get('status'), str):
            data['status'] = TaskStatus(data['status'])
        return cls(**data)


@dataclass
class AgentInfo:
    """Information about a worker agent"""
    id: str
    specialization: str
    status: AgentStatus
    current_task: Optional[str] = None
    # Optional sentinels are normalized in __post_init__ (a bare list
    # default would be a shared mutable default).
    working_files: Optional[List[str]] = None
    progress: float = 0.0
    completed_count: int = 0
    last_heartbeat: Optional[float] = None

    def __post_init__(self):
        if self.working_files is None:
            self.working_files = []
        if self.last_heartbeat is None:
            self.last_heartbeat = time.time()

    def to_dict(self) -> Dict:
        """Convert to dictionary, handling enums"""
        data = asdict(self)
        data['status'] = self.status.value
        return data


class MetaAgent:
    """
    Meta-Agent Orchestrator for Ralph Multi-Agent System

    Coordinates multiple Claude worker agents to execute complex tasks
    in parallel with intelligent conflict resolution and observability.
    """

    def __init__(self, config: Optional[Dict] = None):
        """Initialize the meta-agent orchestrator.

        Args:
            config: Optional configuration dict; falls back to
                environment variables via _load_config().
        """
        self.config = config or self._load_config()

        # Redis connection (decode_responses=True so queue payloads and
        # hash fields come back as str, not bytes).
        self.redis = redis.Redis(
            host=self.config.get('task_queue_host', 'localhost'),
            port=self.config.get('task_queue_port', 6379),
            db=self.config.get('task_queue_db', 0),
            password=self.config.get('task_queue_password'),
            decode_responses=True
        )

        # Configuration
        self.max_workers = self.config.get('max_workers', 12)
        self.min_workers = self.config.get('min_workers', 2)
        self.agent_timeout = self.config.get('agent_timeout', 300)
        self.file_lock_timeout = self.config.get('file_lock_timeout', 300)
        self.max_retries = self.config.get('max_retries', 3)

        # Queue names
        self.task_queue = 'claude_tasks'
        self.pending_queue = 'claude_tasks:pending'
        self.complete_queue = 'claude_tasks:complete'
        self.failed_queue = 'claude_tasks:failed'

        # Worker agents, keyed by agent id
        self.workers: Dict[str, AgentInfo] = {}

        # Tasks, keyed by task id
        self.tasks: Dict[str, Task] = {}

        logger.info("Meta-Agent Orchestrator initialized")

    def _load_config(self) -> Dict:
        """Load configuration from environment variables"""
        return {
            'max_workers': int(os.getenv('RALPH_MAX_WORKERS', 12)),
            'min_workers': int(os.getenv('RALPH_MIN_WORKERS', 2)),
            'task_queue_host': os.getenv('RALPH_TASK_QUEUE_HOST', 'localhost'),
            'task_queue_port': int(os.getenv('RALPH_TASK_QUEUE_PORT', 6379)),
            'task_queue_db': int(os.getenv('RALPH_TASK_QUEUE_DB', 0)),
            'task_queue_password': os.getenv('RALPH_TASK_QUEUE_PASSWORD'),
            'agent_timeout': int(os.getenv('RALPH_AGENT_TIMEOUT', 300)),
            'file_lock_timeout': int(os.getenv('RALPH_FILE_LOCK_TIMEOUT', 300)),
            'max_retries': int(os.getenv('RALPH_MAX_RETRIES', 3)),
            'observability_enabled': os.getenv('RALPH_OBSERVABILITY_ENABLED', 'true').lower() == 'true',
            'observability_port': int(os.getenv('RALPH_OBSERVABILITY_PORT', 3001)),
            'observability_host': os.getenv('RALPH_OBSERVABILITY_HOST', 'localhost'),
        }

    def analyze_project(self, requirements: str,
                        project_context: Optional[Dict] = None) -> List[Task]:
        """
        Analyze requirements and break into parallelizable tasks

        Args:
            requirements: User requirements/task description
            project_context: Optional project context (files, structure, etc.)

        Returns:
            List of tasks with dependencies
        """
        logger.info(f"Analyzing requirements: {requirements[:100]}...")

        # Build analysis prompt
        prompt = self._build_analysis_prompt(requirements, project_context)

        # Call Claude to analyze
        task_data = self._call_claude_analysis(prompt)

        # Parse and create tasks; also register each one in self.tasks
        # so dependency checks can resolve them later.
        tasks = []
        for item in task_data:
            task = Task(
                id=item['id'],
                type=TaskType(item.get('type', 'analysis')),
                description=item['description'],
                dependencies=item.get('dependencies', []),
                files=item.get('files', []),
                priority=item.get('priority', 5),
                specialization=item.get('specialization'),
                timeout=item.get('timeout', 300)
            )
            tasks.append(task)
            self.tasks[task.id] = task

        logger.info(f"Created {len(tasks)} tasks from requirements")
        return tasks

    def _build_analysis_prompt(self, requirements: str,
                               project_context: Optional[Dict]) -> str:
        """Build prompt for Claude analysis"""
        prompt = f"""You are a task orchestration expert. Analyze these requirements and break them into independent tasks that can be executed in parallel by specialized AI agents.

REQUIREMENTS:
{requirements}

"""

        if project_context:
            prompt += f"""
PROJECT CONTEXT:
{json.dumps(project_context, indent=2)}

"""

        prompt += """
Return a JSON array of tasks. Each task must have:
- id: unique identifier (e.g., "task-001", "task-002")
- type: one of [analysis, frontend, backend, testing, docs, refactor, deployment]
- description: clear description of what needs to be done
- dependencies: array of task IDs that must complete first (empty if no dependencies)
- files: array of files this task will modify (empty if analysis task)
- priority: 1-10 (higher = more important, default 5)
- specialization: optional specific agent type if needed
- timeout: estimated seconds to complete (default 300)

IMPORTANT:
- Maximize parallelization - minimize dependencies
- Group related file modifications in single tasks
- Consider file access conflicts when creating tasks
- Include testing tasks for implementation tasks
- Include documentation tasks for user-facing features

Example output format:
[
  {
    "id": "analyze-1",
    "type": "analysis",
    "description": "Analyze codebase structure and identify components",
    "dependencies": [],
    "files": [],
    "priority": 10,
    "timeout": 120
  },
  {
    "id": "refactor-auth",
    "type": "refactor",
    "description": "Refactor authentication components",
    "dependencies": ["analyze-1"],
    "files": ["src/auth/**/*.ts"],
    "priority": 8,
    "specialization": "frontend"
  }
]
"""
        return prompt

    def _call_claude_analysis(self, prompt: str) -> List[Dict]:
        """Call Claude for task analysis"""
        # This would integrate with Claude Code API
        # For now, return a mock response
        logger.warning("Using mock Claude analysis - implement actual API call")

        # Example mock response
        return [
            {
                "id": "analyze-1",
                "type": "analysis",
                "description": "Analyze project structure",
                "dependencies": [],
                "files": [],
                "priority": 10,
                "timeout": 120
            }
        ]

    def distribute_tasks(self, tasks: List[Task]):
        """
        Distribute tasks to worker agents, respecting dependencies

        Args:
            tasks: List of tasks to distribute
        """
        logger.info(f"Distributing {len(tasks)} tasks")

        # Sort tasks by dependencies (topological sort)
        sorted_tasks = self._topological_sort(tasks)

        # Queue tasks
        for task in sorted_tasks:
            self._queue_task(task)

        logger.info(f"Queued {len(sorted_tasks)} tasks")

    def _topological_sort(self, tasks: List[Task]) -> List[Task]:
        """
        Sort tasks by dependencies using topological sort

        Args:
            tasks: List of tasks with dependencies

        Returns:
            Tasks sorted in dependency order
        """
        # Build dependency graph (Kahn's algorithm)
        task_map = {task.id: task for task in tasks}
        in_degree = {task.id: len(task.dependencies) for task in tasks}
        queue = [task_id for task_id, degree in in_degree.items() if degree == 0]
        result = []

        while queue:
            # Sort by priority so high-priority tasks come first among
            # currently-ready tasks
            queue.sort(key=lambda tid: task_map[tid].priority, reverse=True)
            task_id = queue.pop(0)
            result.append(task_map[task_id])

            # Update in-degree for dependent tasks
            for task in tasks:
                if task_id in task.dependencies:
                    in_degree[task.id] -= 1
                    if in_degree[task.id] == 0:
                        queue.append(task.id)

        # Check for circular dependencies
        if len(result) != len(tasks):
            logger.warning("Circular dependencies detected, returning partial sort")

        return result

    def _queue_task(self, task: Task):
        """
        Queue a task for execution

        Args:
            task: Task to queue
        """
        # Check if dependencies are complete
        if self._dependencies_complete(task):
            # Queue for immediate execution
            self.redis.lpush(self.task_queue, json.dumps(task.to_dict()))
            task.status = TaskStatus.QUEUED
        else:
            # Queue for later
            self.redis.lpush(self.pending_queue, json.dumps(task.to_dict()))

        # Store task status; the raw dict contains None/list values which
        # HSET rejects, so flatten it first.
        self.redis.hset(f"task:{task.id}", mapping=_redis_mapping(task.to_dict()))

    def _dependencies_complete(self, task: Task) -> bool:
        """
        Check if task dependencies are complete

        Args:
            task: Task to check

        Returns:
            True if all dependencies are complete
        """
        for dep_id in task.dependencies:
            if dep_id not in self.tasks:
                logger.warning(f"Unknown dependency: {dep_id}")
                return False
            dep_task = self.tasks[dep_id]
            if dep_task.status != TaskStatus.COMPLETE:
                return False
        return True

    def spawn_worker_agents(self, count: Optional[int] = None):
        """
        Spawn worker agents for parallel execution

        Args:
            count: Number of agents to spawn (default from config)
        """
        if count is None:
            count = self.max_workers

        logger.info(f"Spawning {count} worker agents")

        specializations = ['frontend', 'backend', 'testing', 'docs',
                           'refactor', 'analysis']

        for i in range(count):
            agent_id = f"agent-{i}"
            # Round-robin specializations across the pool
            specialization = specializations[i % len(specializations)]
            agent = AgentInfo(
                id=agent_id,
                specialization=specialization,
                status=AgentStatus.IDLE
            )
            self.workers[agent_id] = agent
            self._start_worker_process(agent)

            # Store agent info in Redis (flattened: HSET rejects
            # None/list values such as current_task/working_files)
            self.redis.hset(f"agent:{agent_id}",
                            mapping=_redis_mapping(agent.to_dict()))

        logger.info(f"Spawned {len(self.workers)} worker agents")

    def _start_worker_process(self, agent: AgentInfo):
        """
        Start a worker agent process

        Args:
            agent: Agent info
        """
        # This would start the actual worker process
        # For now, just log
        logger.info(f"Starting worker process: {agent.id} ({agent.specialization})")

        # Example: Would use subprocess to start worker
        # subprocess.Popen([
        #     'claude-code',
        #     '--mode', 'worker',
        #     '--id', agent.id,
        #     '--specialization', agent.specialization
        # ])

    def monitor_tasks(self):
        """
        Monitor task execution and handle failures

        Runs continuously until all tasks complete.

        NOTE(review): nothing in this module marks in-memory tasks
        COMPLETE — presumably worker processes update Redis and some
        other component syncs self.tasks; confirm, otherwise this loop
        only exits via KeyboardInterrupt.
        """
        logger.info("Starting task monitoring")

        try:
            while True:
                # Check if all tasks complete
                if self._all_tasks_complete():
                    logger.info("All tasks completed")
                    break

                # Check for failed tasks
                self._handle_failed_tasks()

                # Check for pending tasks ready to execute
                self._check_pending_tasks()

                # Update agent heartbeats
                self._update_heartbeats()

                # Check for stale agents
                self._check_stale_agents()

                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Monitoring interrupted by user")

    def _all_tasks_complete(self) -> bool:
        """Check if all tasks are complete"""
        return all(
            task.status in [TaskStatus.COMPLETE, TaskStatus.CANCELLED]
            for task in self.tasks.values()
        )

    def _handle_failed_tasks(self):
        """Handle failed tasks with retry logic"""
        for task in self.tasks.values():
            if task.status == TaskStatus.FAILED:
                if task.retry_count < task.max_retries:
                    logger.info(f"Retrying task {task.id} (attempt {task.retry_count + 1})")
                    task.retry_count += 1
                    task.status = TaskStatus.PENDING
                    self._queue_task(task)
                else:
                    logger.error(f"Task {task.id} failed after {task.max_retries} retries")

    def _check_pending_tasks(self):
        """Check if pending tasks can now be executed"""
        pending = self.redis.lrange(self.pending_queue, 0, -1)

        for task_json in pending:
            task = Task.from_dict(json.loads(task_json))
            if self._dependencies_complete(task):
                # Move to main queue
                self.redis.lrem(self.pending_queue, 1, task_json)
                self.redis.lpush(self.task_queue, task_json)
                logger.info(f"Task {task.id} dependencies complete, queued")

    def _update_heartbeats(self):
        """Update agent heartbeats from Redis"""
        for agent_id in self.workers.keys():
            agent_data = self.redis.hgetall(f"agent:{agent_id}")
            if agent_data:
                agent = self.workers[agent_id]
                agent.last_heartbeat = float(agent_data.get('last_heartbeat', time.time()))

    def _check_stale_agents(self):
        """Check for agents that haven't sent heartbeat"""
        timeout = self.config.get('agent_timeout', 300)
        now = time.time()

        for agent in self.workers.values():
            if agent.status != AgentStatus.OFFLINE:
                age = now - agent.last_heartbeat
                if age > timeout:
                    # Report the actual heartbeat age (the original
                    # message printed the timeout instead)
                    logger.warning(f"Agent {agent.id} appears stale (last heartbeat {age:.0f}s ago)")
                    agent.status = AgentStatus.OFFLINE

    def generate_report(self) -> Dict:
        """
        Generate execution report

        Returns:
            Dictionary with execution statistics
        """
        total_tasks = len(self.tasks)
        complete_tasks = sum(1 for t in self.tasks.values()
                             if t.status == TaskStatus.COMPLETE)
        failed_tasks = sum(1 for t in self.tasks.values()
                           if t.status == TaskStatus.FAILED)

        # Wall-clock span from earliest creation to latest completion
        # (the original reported only the single longest task duration
        # under this key).
        finished = [t for t in self.tasks.values() if t.completed_at]
        if finished:
            total_duration = (max(t.completed_at for t in finished)
                              - min(t.created_at for t in finished))
        else:
            total_duration = 0

        report = {
            'total_tasks': total_tasks,
            'complete_tasks': complete_tasks,
            'failed_tasks': failed_tasks,
            'success_rate': complete_tasks / total_tasks if total_tasks > 0 else 0,
            'total_duration_seconds': total_duration,
            'worker_count': len(self.workers),
            'tasks_by_type': self._count_tasks_by_type(),
            'tasks_by_status': self._count_tasks_by_status(),
        }

        return report

    def _count_tasks_by_type(self) -> Dict[str, int]:
        """Count tasks by type"""
        counts = {}
        for task in self.tasks.values():
            type_name = task.type.value
            counts[type_name] = counts.get(type_name, 0) + 1
        return counts

    def _count_tasks_by_status(self) -> Dict[str, int]:
        """Count tasks by status"""
        counts = {}
        for task in self.tasks.values():
            status_name = task.status.value
            counts[status_name] = counts.get(status_name, 0) + 1
        return counts


def main():
    """Main entry point for CLI usage"""
    import argparse

    parser = argparse.ArgumentParser(description='Ralph Meta-Agent Orchestrator')
    parser.add_argument('requirements', help='Task requirements')
    parser.add_argument('--workers', type=int, help='Number of worker agents')
    parser.add_argument('--config', help='Config file path')
    args = parser.parse_args()

    # Load config
    config = {}
    if args.config:
        with open(args.config) as f:
            config = json.load(f)

    # Create orchestrator
    orchestrator = MetaAgent(config)

    # Analyze requirements
    tasks = orchestrator.analyze_project(args.requirements)

    # Distribute tasks
    orchestrator.distribute_tasks(tasks)

    # Spawn workers
    orchestrator.spawn_worker_agents(args.workers)

    # Monitor execution
    orchestrator.monitor_tasks()

    # Generate report
    report = orchestrator.generate_report()
    print("\n=== EXECUTION REPORT ===")
    print(json.dumps(report, indent=2))


if __name__ == '__main__':
    main()