#!/usr/bin/env python3
"""
Ralph Worker Agent

Implements specialized worker agents that execute tasks from the task queue.
Each agent has a specific specialization and handles file locking, task
execution, and progress reporting.
"""

import json
import os
import redis
import subprocess
import sys
import time
import hashlib
import logging
from typing import List, Dict, Optional, Set
from dataclasses import dataclass, asdict
from enum import Enum

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('ralph.worker')


class AgentSpecialization(Enum):
    """Worker agent specializations"""
    FRONTEND = "frontend"
    BACKEND = "backend"
    TESTING = "testing"
    DOCS = "docs"
    REFACTOR = "refactor"
    ANALYSIS = "analysis"
    DEPLOYMENT = "deployment"


class TaskStatus(Enum):
    """Task execution status"""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETE = "complete"
    FAILED = "failed"


@dataclass
class WorkerConfig:
    """Worker agent configuration"""
    # Unique identifier for this worker (used as Redis key suffix and lock owner).
    agent_id: str
    # Which category of tasks this worker accepts.
    specialization: AgentSpecialization
    # NOTE(review): the following fields are currently not consumed anywhere in
    # this module; kept for interface compatibility with external callers.
    max_concurrent_tasks: int = 1
    # TTL (seconds) placed on each Redis file lock, so a crashed worker's
    # locks expire automatically.
    file_lock_timeout: int = 300
    heartbeat_interval: int = 10
    task_timeout: int = 300
    max_retries: int = 3
    log_level: str = "info"


class WorkerAgent:
    """
    Specialized worker agent for Ralph Multi-Agent System

    Executes tasks from the queue with:
    - File locking to prevent conflicts
    - Progress tracking and reporting
    - Heartbeat monitoring
    - Error handling and retry logic
    """

    def __init__(self, config: WorkerConfig, redis_config: Dict):
        """Initialize the worker agent.

        Args:
            config: Worker configuration (id, specialization, timeouts).
            redis_config: Connection settings; keys 'host', 'port', 'db',
                'password' are read with sensible defaults.
        """
        self.config = config
        # decode_responses=True means all Redis replies are str, which the
        # lock-ownership comparison in _release_locks relies on.
        self.redis = redis.Redis(
            host=redis_config.get('host', 'localhost'),
            port=redis_config.get('port', 6379),
            db=redis_config.get('db', 0),
            password=redis_config.get('password'),
            decode_responses=True
        )

        # Queue names
        self.task_queue = 'claude_tasks'
        self.pending_queue = 'claude_tasks:pending'
        self.complete_queue = 'claude_tasks:complete'
        self.failed_queue = 'claude_tasks:failed'

        # State
        self.current_task: Optional[str] = None
        # Files this worker currently holds Redis locks on; kept in sync by
        # _acquire_locks/_release_locks so _cleanup can release them on exit.
        self.locked_files: Set[str] = set()
        self.running = True

        logger.info(f"Worker {config.agent_id} initialized ({config.specialization.value})")

    def run(self):
        """Main worker loop: register, then poll the queue until interrupted."""
        logger.info(f"Worker {self.config.agent_id} starting main loop")

        # Register worker
        self._register_worker()

        try:
            while self.running:
                # Send heartbeat
                self._send_heartbeat()

                # Get task from queue
                task = self._get_task()

                if task:
                    # Check if we can handle this task
                    if self._can_handle(task):
                        logger.info(f"Worker {self.config.agent_id} accepted task {task['id']}")
                        self._execute_task(task)
                    else:
                        # Put it back for another agent. The sleep(1) throttles
                        # the requeue loop when no matching agent is running.
                        logger.info(f"Worker {self.config.agent_id} skipped task {task['id']} (not our specialization)")
                        self.redis.rpush(self.task_queue, json.dumps(task))
                        time.sleep(1)
                else:
                    # No tasks, wait a bit
                    time.sleep(self.config.heartbeat_interval)
        except KeyboardInterrupt:
            logger.info(f"Worker {self.config.agent_id} interrupted by user")
        finally:
            self._cleanup()

    def _register_worker(self):
        """Register worker in Redis under the hash ``agent:<agent_id>``."""
        worker_data = {
            'id': self.config.agent_id,
            'specialization': self.config.specialization.value,
            'status': 'idle',
            'current_task': '',
            'working_files': json.dumps([]),
            'progress': '0',
            'completed_count': '0',
            'last_heartbeat': str(time.time())
        }
        self.redis.hset(f"agent:{self.config.agent_id}", mapping=worker_data)
        logger.info(f"Worker {self.config.agent_id} registered")

    def _send_heartbeat(self):
        """Send heartbeat to indicate worker is alive."""
        self.redis.hset(
            f"agent:{self.config.agent_id}",
            'last_heartbeat',
            str(time.time())
        )

    def _get_task(self) -> Optional[Dict]:
        """
        Get task from queue with timeout.

        Blocks up to ``heartbeat_interval`` seconds so the main loop keeps
        heartbeating even when the queue is empty.

        Returns:
            Task dict or None
        """
        result = self.redis.brpop(self.task_queue, timeout=self.config.heartbeat_interval)
        if result:
            _, task_json = result
            return json.loads(task_json)
        return None

    def _can_handle(self, task: Dict) -> bool:
        """
        Check if this agent can handle the task.

        An explicit 'specialization' field on the task wins; otherwise the
        task 'type' is mapped onto a specialization.

        Args:
            task: Task dict

        Returns:
            True if agent can handle task
        """
        # Check specialization
        if task.get('specialization'):
            return task['specialization'] == self.config.specialization.value

        # Check task type matches our specialization
        task_type = task.get('type', '')
        specialization = self.config.specialization.value

        # Map task types to specializations (currently the identity map,
        # kept explicit so individual entries can diverge later).
        type_mapping = {
            'frontend': 'frontend',
            'backend': 'backend',
            'testing': 'testing',
            'docs': 'docs',
            'refactor': 'refactor',
            'analysis': 'analysis',
            'deployment': 'deployment'
        }

        return type_mapping.get(task_type) == specialization

    def _execute_task(self, task: Dict):
        """
        Execute a task with proper locking and error handling.

        On lock failure the task is re-queued and the agent returns to idle.
        On success/failure the result/error is written to ``task:<id>`` and
        pushed to the complete/failed queue; locks are always released.

        Args:
            task: Task dict
        """
        task_id = task['id']
        files = task.get('files', [])

        logger.info(f"Worker {self.config.agent_id} executing task {task_id}")

        # Update status
        self._update_status(task_id, 'running', 0, files)
        self.current_task = task_id

        # Acquire file locks
        locked_files = self._acquire_locks(files)
        if not locked_files and files:
            logger.warning(f"Could not acquire locks for task {task_id}, re-queuing")
            self.redis.rpush(self.task_queue, json.dumps(task))
            # Fix: previously the agent stayed marked 'running' on this task
            # forever after re-queuing; reset to idle instead.
            self.current_task = None
            self._update_status('', 'idle', 0, [])
            return

        try:
            # Set up Claude context
            prompt = self._build_prompt(task)

            # Execute with Claude
            os.environ['CLAUDE_SESSION_ID'] = f"{self.config.agent_id}-{task_id}"

            # Update progress
            self._update_status(task_id, 'running', 25, files)

            # Execute the task
            result = self._run_claude(prompt, task)

            # Update progress
            self._update_status(task_id, 'running', 90, files)

            # Mark as complete
            self.redis.hset(f"task:{task_id}", 'status', 'complete')
            self.redis.hset(f"task:{task_id}", 'result', result)
            self.redis.lpush(self.complete_queue, json.dumps({
                'task_id': task_id,
                'agent_id': self.config.agent_id,
                'result': result
            }))

            # Update final status
            self._update_status(task_id, 'complete', 100, files)

            # Increment completed count atomically (the previous
            # hget-then-hset was a read-modify-write race between workers
            # sharing an agent id and between this loop and monitors).
            self.redis.hincrby(f"agent:{self.config.agent_id}", 'completed_count', 1)

            logger.info(f"Worker {self.config.agent_id} completed task {task_id}")

            # Trigger dependent tasks
            self._trigger_dependencies(task_id)

        except Exception as e:
            logger.error(f"Worker {self.config.agent_id} failed task {task_id}: {e}")

            # Mark as failed
            self.redis.hset(f"task:{task_id}", 'status', 'failed')
            self.redis.hset(f"task:{task_id}", 'error', str(e))
            self.redis.lpush(self.failed_queue, json.dumps({
                'task_id': task_id,
                'agent_id': self.config.agent_id,
                'error': str(e)
            }))

            # Update status
            self._update_status(task_id, 'failed', 0, files)

        finally:
            # Release locks
            self._release_locks(locked_files)
            self.current_task = None
            self._update_status('', 'idle', 0, [])

    def _acquire_locks(self, files: List[str], max_attempts: int = 30) -> List[str]:
        """
        Acquire exclusive locks on files.

        Fix: the original retried via unbounded recursion, so two agents
        contending on overlapping file sets could retry forever and
        eventually overflow the stack. This version retries iteratively a
        bounded number of times and gives up with [] (the caller re-queues
        the task in that case).

        Args:
            files: List of file paths to lock
            max_attempts: Maximum all-or-nothing acquisition attempts
                (new parameter with default; backward-compatible).

        Returns:
            List of successfully locked files, or [] if the full set could
            not be acquired.
        """
        if not files:
            return []

        for attempt in range(max_attempts):
            locked: List[str] = []
            for file_path in files:
                lock_key = f"lock:{file_path}"

                # SET NX EX is an atomic acquire-with-TTL, so a crashed
                # worker's locks expire after file_lock_timeout seconds.
                acquired = self.redis.set(
                    lock_key,
                    self.config.agent_id,
                    nx=True,
                    ex=self.config.file_lock_timeout
                )

                if acquired:
                    locked.append(file_path)
                else:
                    # Couldn't get the whole set: release the partial set to
                    # avoid deadlocking against the other holder, then retry.
                    logger.warning(f"Could not acquire lock for {file_path}")
                    self._release_locks(locked)
                    locked = []
                    break
            else:
                logger.info(f"Acquired locks for {len(locked)} files")
                self.locked_files.update(locked)
                return locked

            time.sleep(2)

        logger.warning(f"Giving up lock acquisition after {max_attempts} attempts")
        return []

    def _release_locks(self, files: List[str]):
        """
        Release file locks that this agent owns.

        Args:
            files: List of file paths to unlock
        """
        for file_path in files:
            lock_key = f"lock:{file_path}"

            # Only release if we own it.
            # NOTE(review): GET-then-DEL is not atomic — if our lock expires
            # and another agent acquires it between these two calls, we would
            # delete their lock. A Lua compare-and-delete script would close
            # this window; left as-is to avoid changing Redis requirements.
            owner = self.redis.get(lock_key)
            if owner == self.config.agent_id:
                self.redis.delete(lock_key)
            self.locked_files.discard(file_path)

        if files:
            logger.info(f"Released locks for {len(files)} files")

    def _build_prompt(self, task: Dict) -> str:
        """
        Build Claude prompt from task.

        Args:
            task: Task dict (requires 'id' and 'description'; 'type',
                'specialization' and 'files' are optional).

        Returns:
            Prompt string for Claude
        """
        prompt = f"""You are a specialized AI agent working on task: {task['id']}

TASK DESCRIPTION:
{task['description']}

TASK TYPE: {task.get('type', 'unknown')}

SPECIALIZATION: {task.get('specialization', 'none')}

FILES TO MODIFY:
{chr(10).join(task.get('files', ['No files specified']))}

CONTEXT:
- This is part of a multi-agent orchestration system
- Other agents may be working on related tasks
- Focus only on your specific task
- Report progress clearly

Execute this task efficiently and report your results.
"""
        return prompt

    def _run_claude(self, prompt: str, task: Dict) -> str:
        """
        Execute task using Claude.

        Args:
            prompt: Prompt for Claude
            task: Task dict

        Returns:
            Result string
        """
        # This would integrate with Claude Code API
        # For now, simulate execution
        logger.info(f"Executing Claude task: {task['id']}")

        # Simulate work
        time.sleep(2)

        # Return mock result
        return f"Task {task['id']} completed successfully by {self.config.agent_id}"

    def _update_status(self, task_id: str, status: str, progress: float, files: List[str]):
        """
        Update agent status in Redis (hash ``agent:<agent_id>``).

        Args:
            task_id: Current task ID ('' when idle)
            status: Agent status
            progress: Task progress (0-100)
            files: Files being worked on
        """
        update_data = {
            'status': status,
            'current_task': task_id,
            'progress': str(progress),
            'working_files': json.dumps(files),
            'last_heartbeat': str(time.time())
        }
        self.redis.hset(f"agent:{self.config.agent_id}", mapping=update_data)

    def _trigger_dependencies(self, task_id: str):
        """
        Check and trigger tasks that depend on completed task.

        Moves a pending task into the main queue once every one of its
        dependencies is marked 'complete' in Redis.

        Args:
            task_id: Completed task ID
        """
        # Get all pending tasks
        pending = self.redis.lrange(self.pending_queue, 0, -1)

        for task_json in pending:
            task = json.loads(task_json)

            # Check if this task depends on the completed task
            if task_id in task.get('dependencies', []):
                # Check if all dependencies are now complete
                deps_complete = all(
                    self.redis.hget(f"task:{dep}", 'status') == 'complete'
                    for dep in task.get('dependencies', [])
                )

                if deps_complete:
                    # Move to main queue
                    self.redis.lrem(self.pending_queue, 1, task_json)
                    self.redis.lpush(self.task_queue, task_json)
                    logger.info(f"Triggered task {task['id']} (dependencies complete)")

    def _cleanup(self):
        """Clean up resources: release held locks and mark agent offline."""
        # Release all locks still held (self.locked_files is now actually
        # maintained by _acquire_locks/_release_locks; previously it was
        # always empty, making this a no-op).
        self._release_locks(list(self.locked_files))

        # Update status to offline
        self.redis.hset(f"agent:{self.config.agent_id}", 'status', 'offline')

        logger.info(f"Worker {self.config.agent_id} cleaned up")


def main():
    """Main entry point for CLI usage"""
    import argparse

    parser = argparse.ArgumentParser(description='Ralph Worker Agent')
    parser.add_argument('--id', required=True, help='Agent ID')
    parser.add_argument('--specialization', required=True,
                        choices=[s.value for s in AgentSpecialization],
                        help='Agent specialization')
    parser.add_argument('--redis-host', default='localhost', help='Redis host')
    parser.add_argument('--redis-port', type=int, default=6379, help='Redis port')
    parser.add_argument('--max-concurrent', type=int, default=1,
                        help='Max concurrent tasks')

    args = parser.parse_args()

    # Create config
    config = WorkerConfig(
        agent_id=args.id,
        specialization=AgentSpecialization(args.specialization),
        max_concurrent_tasks=args.max_concurrent
    )

    redis_config = {
        'host': args.redis_host,
        'port': args.redis_port
    }

    # Create and run worker
    worker = WorkerAgent(config, redis_config)
    worker.run()


if __name__ == '__main__':
    main()