Files
SuperCharged-Claude-Code-Up…/skills/ralph/worker_agent.py
uroma 87748afb75 feat: Complete sync of all Claude Code CLI upgrades
- Add all 21 commands (clawd, ralph, prometheus*, dexto*)
- Add all hooks (intelligent-router, clawd-*, prometheus-wrapper, unified-integration-v2)
- Add skills (ralph, prometheus master)
- Add MCP servers (registry.json, manager.sh)
- Add plugins directory with marketplaces
- Add health-check.sh and aliases.sh scripts
- Complete repository synchronization with local ~/.claude/

Total changes: 100+ new files added
All integrations now fully backed up in repository

🤖 Generated with Claude Code
Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-27 20:39:25 +00:00

477 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Ralph Worker Agent
Implements specialized worker agents that execute tasks from the task queue.
Each agent has a specific specialization and handles file locking, task execution,
and progress reporting.
"""
import json
import os
import redis
import subprocess
import sys
import time
import hashlib
import logging
from typing import List, Dict, Optional, Set
from dataclasses import dataclass, asdict
from enum import Enum
# Configure logging once at import time: timestamped records in a uniform
# format for every worker process.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Shared module-level logger used by WorkerAgent and main().
logger = logging.getLogger('ralph.worker')
class AgentSpecialization(Enum):
    """Worker agent specializations.

    Each value matches the 'specialization'/'type' strings carried in the
    task dicts on the Redis queue (compared in WorkerAgent._can_handle).
    """
    FRONTEND = "frontend"
    BACKEND = "backend"
    TESTING = "testing"
    DOCS = "docs"
    REFACTOR = "refactor"
    ANALYSIS = "analysis"
    DEPLOYMENT = "deployment"
class TaskStatus(Enum):
    """Task execution status values.

    NOTE(review): the worker methods in this module currently write raw
    status strings ('running', 'complete', 'failed', ...) to Redis rather
    than referencing these members — consider unifying on this enum.
    """
    PENDING = "pending"
    RUNNING = "running"
    COMPLETE = "complete"
    FAILED = "failed"
@dataclass
class WorkerConfig:
    """Worker agent configuration.

    Field defaults are the operational knobs for a single worker process.
    """
    agent_id: str  # unique worker id; also stored as the Redis lock owner value
    specialization: AgentSpecialization  # which task types this worker accepts
    max_concurrent_tasks: int = 1  # not read by WorkerAgent (one task at a time)
    file_lock_timeout: int = 300  # seconds before a Redis file lock auto-expires
    heartbeat_interval: int = 10  # seconds between heartbeats / BRPOP timeout
    task_timeout: int = 300  # per-task timeout budget (not enforced in this module)
    max_retries: int = 3  # retry budget for transient failures
    log_level: str = "info"  # not read by WorkerAgent (logging configured at module level)
class WorkerAgent:
    """
    Specialized worker agent for the Ralph Multi-Agent System.

    Pulls tasks from a shared Redis list and executes them with:
    - File locking (Redis SET NX EX) to prevent conflicts between agents
    - Progress tracking and reporting via per-agent Redis hashes
    - Heartbeat timestamps so a monitor can detect dead workers
    - Error handling that records failures and always releases locks
    """

    def __init__(self, config: WorkerConfig, redis_config: Dict):
        """
        Initialize the worker agent.

        Args:
            config: Static worker configuration (id, specialization, timeouts).
            redis_config: Connection settings; recognized keys are 'host',
                'port', 'db' and 'password', each optional.
        """
        self.config = config
        self.redis = redis.Redis(
            host=redis_config.get('host', 'localhost'),
            port=redis_config.get('port', 6379),
            db=redis_config.get('db', 0),
            password=redis_config.get('password'),
            decode_responses=True  # work with str values, not bytes
        )
        # Queue names shared with the orchestrator.
        self.task_queue = 'claude_tasks'
        self.pending_queue = 'claude_tasks:pending'
        self.complete_queue = 'claude_tasks:complete'
        self.failed_queue = 'claude_tasks:failed'
        # Runtime state.
        self.current_task: Optional[str] = None
        # Files this agent currently holds locks on. Kept in sync by
        # _acquire_locks/_release_locks so _cleanup can release everything
        # even if the worker is interrupted mid-task (the original code
        # never populated this set, making the cleanup a no-op).
        self.locked_files: Set[str] = set()
        self.running = True
        logger.info(f"Worker {config.agent_id} initialized ({config.specialization.value})")

    def run(self):
        """Main worker loop: heartbeat, pop a task, execute or re-queue it."""
        logger.info(f"Worker {self.config.agent_id} starting main loop")
        # Register worker so monitors can see us.
        self._register_worker()
        try:
            while self.running:
                self._send_heartbeat()
                task = self._get_task()
                if task:
                    if self._can_handle(task):
                        logger.info(f"Worker {self.config.agent_id} accepted task {task['id']}")
                        self._execute_task(task)
                    else:
                        # Not our specialization: put it back for another agent.
                        logger.info(f"Worker {self.config.agent_id} skipped task {task['id']} (not our specialization)")
                        self.redis.rpush(self.task_queue, json.dumps(task))
                        # Brief pause so we don't immediately pop the same task again.
                        time.sleep(1)
                else:
                    # Queue was empty for the whole BRPOP timeout; idle a bit.
                    time.sleep(self.config.heartbeat_interval)
        except KeyboardInterrupt:
            logger.info(f"Worker {self.config.agent_id} interrupted by user")
        finally:
            self._cleanup()

    def _register_worker(self):
        """Register this worker's status hash in Redis."""
        worker_data = {
            'id': self.config.agent_id,
            'specialization': self.config.specialization.value,
            'status': 'idle',
            'current_task': '',
            'working_files': json.dumps([]),
            'progress': '0',
            'completed_count': '0',
            'last_heartbeat': str(time.time())
        }
        self.redis.hset(f"agent:{self.config.agent_id}", mapping=worker_data)
        logger.info(f"Worker {self.config.agent_id} registered")

    def _send_heartbeat(self):
        """Refresh the heartbeat timestamp to indicate the worker is alive."""
        self.redis.hset(
            f"agent:{self.config.agent_id}",
            'last_heartbeat',
            str(time.time())
        )

    def _get_task(self) -> Optional[Dict]:
        """
        Pop a task from the main queue, blocking up to heartbeat_interval.

        Returns:
            Task dict, or None if the queue stayed empty for the timeout.
        """
        result = self.redis.brpop(self.task_queue, timeout=self.config.heartbeat_interval)
        if result:
            _, task_json = result
            return json.loads(task_json)
        return None

    def _can_handle(self, task: Dict) -> bool:
        """
        Check whether this agent should execute the given task.

        Args:
            task: Task dict; may carry an explicit 'specialization' or a 'type'.

        Returns:
            True if the task matches this agent's specialization.
        """
        # An explicit specialization on the task wins.
        if task.get('specialization'):
            return task['specialization'] == self.config.specialization.value
        # Otherwise fall back to mapping the task type onto a specialization.
        task_type = task.get('type', '')
        specialization = self.config.specialization.value
        type_mapping = {
            'frontend': 'frontend',
            'backend': 'backend',
            'testing': 'testing',
            'docs': 'docs',
            'refactor': 'refactor',
            'analysis': 'analysis',
            'deployment': 'deployment'
        }
        return type_mapping.get(task_type) == specialization

    def _execute_task(self, task: Dict):
        """
        Execute a task with file locking, progress updates and error handling.

        Args:
            task: Task dict (requires 'id'; 'files' optional).
        """
        task_id = task['id']
        files = task.get('files', [])
        logger.info(f"Worker {self.config.agent_id} executing task {task_id}")
        self._update_status(task_id, 'running', 0, files)
        self.current_task = task_id
        # Acquire all file locks up front (all-or-nothing).
        locked_files = self._acquire_locks(files)
        if not locked_files and files:
            # Could not lock everything: re-queue the task and go back to
            # idle (the original code returned while still advertising
            # status 'running' with a stale current_task).
            logger.warning(f"Could not acquire locks for task {task_id}, re-queuing")
            self.redis.rpush(self.task_queue, json.dumps(task))
            self.current_task = None
            self._update_status('', 'idle', 0, [])
            return
        try:
            prompt = self._build_prompt(task)
            # Tag the Claude session with agent and task for traceability.
            os.environ['CLAUDE_SESSION_ID'] = f"{self.config.agent_id}-{task_id}"
            self._update_status(task_id, 'running', 25, files)
            result = self._run_claude(prompt, task)
            self._update_status(task_id, 'running', 90, files)
            # Record completion on the task hash and the completion queue.
            self.redis.hset(f"task:{task_id}", 'status', 'complete')
            self.redis.hset(f"task:{task_id}", 'result', result)
            self.redis.lpush(self.complete_queue, json.dumps({
                'task_id': task_id,
                'agent_id': self.config.agent_id,
                'result': result
            }))
            self._update_status(task_id, 'complete', 100, files)
            # Increment this agent's completed-task counter.
            current_count = int(self.redis.hget(f"agent:{self.config.agent_id}", 'completed_count') or 0)
            self.redis.hset(f"agent:{self.config.agent_id}", 'completed_count', str(current_count + 1))
            logger.info(f"Worker {self.config.agent_id} completed task {task_id}")
            # Release any now-unblocked dependent tasks.
            self._trigger_dependencies(task_id)
        except Exception as e:
            logger.error(f"Worker {self.config.agent_id} failed task {task_id}: {e}")
            # Record the failure on the task hash and the failure queue.
            self.redis.hset(f"task:{task_id}", 'status', 'failed')
            self.redis.hset(f"task:{task_id}", 'error', str(e))
            self.redis.lpush(self.failed_queue, json.dumps({
                'task_id': task_id,
                'agent_id': self.config.agent_id,
                'error': str(e)
            }))
            self._update_status(task_id, 'failed', 0, files)
        finally:
            # Always release locks and return the agent to idle.
            self._release_locks(locked_files)
            self.current_task = None
            self._update_status('', 'idle', 0, [])

    def _acquire_locks(self, files: List[str]) -> List[str]:
        """
        Acquire exclusive locks on all requested files (all-or-nothing).

        Retries a bounded number of times (config.max_retries) instead of
        recursing without limit as the original implementation did, which
        could overflow the stack if another agent held a lock long-term.
        On exhaustion every partially-acquired lock is released and an
        empty list is returned so the caller can re-queue the task.

        Args:
            files: List of file paths to lock.

        Returns:
            The list of locked files on success, or [] on failure.
        """
        if not files:
            return []
        for attempt in range(self.config.max_retries + 1):
            locked = []
            contended = False
            for file_path in files:
                lock_key = f"lock:{file_path}"
                # SET NX EX is an atomic "acquire with expiry", so a crashed
                # agent's locks expire on their own after file_lock_timeout.
                acquired = self.redis.set(
                    lock_key,
                    self.config.agent_id,
                    nx=True,
                    ex=self.config.file_lock_timeout
                )
                if acquired:
                    locked.append(file_path)
                else:
                    # All-or-nothing: release the partial set and retry.
                    logger.warning(f"Could not acquire lock for {file_path}")
                    self._release_locks(locked)
                    contended = True
                    break
            if not contended:
                # Track held locks so _cleanup can release them later.
                self.locked_files.update(locked)
                logger.info(f"Acquired locks for {len(locked)} files")
                return locked
            time.sleep(2)
        logger.warning(f"Giving up on locks for {len(files)} files after {self.config.max_retries + 1} attempts")
        return []

    def _release_locks(self, files: List[str]):
        """
        Release file locks held by this agent.

        Args:
            files: List of file paths to unlock.
        """
        for file_path in files:
            lock_key = f"lock:{file_path}"
            # Only delete the lock if we still own it — it may have expired
            # and been re-acquired by another agent in the meantime.
            owner = self.redis.get(lock_key)
            if owner == self.config.agent_id:
                self.redis.delete(lock_key)
            self.locked_files.discard(file_path)
        if files:
            logger.info(f"Released locks for {len(files)} files")

    def _build_prompt(self, task: Dict) -> str:
        """
        Build the Claude prompt text for a task.

        Args:
            task: Task dict (requires 'id' and 'description').

        Returns:
            Prompt string for Claude.
        """
        prompt = f"""You are a specialized AI agent working on task: {task['id']}
TASK DESCRIPTION:
{task['description']}
TASK TYPE: {task.get('type', 'unknown')}
SPECIALIZATION: {task.get('specialization', 'none')}
FILES TO MODIFY:
{chr(10).join(task.get('files', ['No files specified']))}
CONTEXT:
- This is part of a multi-agent orchestration system
- Other agents may be working on related tasks
- Focus only on your specific task
- Report progress clearly
Execute this task efficiently and report your results.
"""
        return prompt

    def _run_claude(self, prompt: str, task: Dict) -> str:
        """
        Execute a task using Claude.

        NOTE(review): this is still a stub — it sleeps briefly and returns
        a mock result; real Claude Code integration is a TODO.

        Args:
            prompt: Prompt for Claude.
            task: Task dict.

        Returns:
            Result string.
        """
        logger.info(f"Executing Claude task: {task['id']}")
        # Simulate work.
        time.sleep(2)
        return f"Task {task['id']} completed successfully by {self.config.agent_id}"

    def _update_status(self, task_id: str, status: str, progress: float, files: List[str]):
        """
        Update this agent's status hash in Redis.

        Args:
            task_id: Current task ID ('' when idle).
            status: Agent status string.
            progress: Task progress (0-100).
            files: Files currently being worked on.
        """
        update_data = {
            'status': status,
            'current_task': task_id,
            'progress': str(progress),
            'working_files': json.dumps(files),
            'last_heartbeat': str(time.time())
        }
        self.redis.hset(f"agent:{self.config.agent_id}", mapping=update_data)

    def _trigger_dependencies(self, task_id: str):
        """
        Move pending tasks whose dependencies are now all complete to the
        main queue.

        Args:
            task_id: Just-completed task ID.
        """
        pending = self.redis.lrange(self.pending_queue, 0, -1)
        for task_json in pending:
            task = json.loads(task_json)
            # Only tasks that actually depend on the completed one.
            if task_id in task.get('dependencies', []):
                deps_complete = all(
                    self.redis.hget(f"task:{dep}", 'status') == 'complete'
                    for dep in task.get('dependencies', [])
                )
                if deps_complete:
                    # Promote from pending to the main queue.
                    self.redis.lrem(self.pending_queue, 1, task_json)
                    self.redis.lpush(self.task_queue, task_json)
                    logger.info(f"Triggered task {task['id']} (dependencies complete)")

    def _cleanup(self):
        """Release held locks and mark the agent offline."""
        self._release_locks(list(self.locked_files))
        self.redis.hset(f"agent:{self.config.agent_id}", 'status', 'offline')
        logger.info(f"Worker {self.config.agent_id} cleaned up")
def main():
    """Main entry point for CLI usage.

    Parses command-line arguments, builds the worker configuration and
    runs the worker loop until interrupted.
    """
    import argparse
    parser = argparse.ArgumentParser(description='Ralph Worker Agent')
    parser.add_argument('--id', required=True, help='Agent ID')
    parser.add_argument('--specialization', required=True,
                        choices=[s.value for s in AgentSpecialization],
                        help='Agent specialization')
    parser.add_argument('--redis-host', default='localhost', help='Redis host')
    parser.add_argument('--redis-port', type=int, default=6379, help='Redis port')
    # Backward-compatible additions: defaults match the fallbacks already
    # applied inside WorkerAgent.__init__ (db=0, password=None).
    parser.add_argument('--redis-db', type=int, default=0, help='Redis database number')
    parser.add_argument('--redis-password', default=None, help='Redis password')
    parser.add_argument('--max-concurrent', type=int, default=1, help='Max concurrent tasks')
    args = parser.parse_args()

    # Build worker configuration from CLI arguments.
    config = WorkerConfig(
        agent_id=args.id,
        specialization=AgentSpecialization(args.specialization),
        max_concurrent_tasks=args.max_concurrent
    )
    redis_config = {
        'host': args.redis_host,
        'port': args.redis_port,
        'db': args.redis_db,
        'password': args.redis_password
    }
    # Create and run the worker (blocks until interrupted).
    worker = WorkerAgent(config, redis_config)
    worker.run()


if __name__ == '__main__':
    main()