- Add all 21 commands (clawd, ralph, prometheus*, dexto*) - Add all hooks (intelligent-router, clawd-*, prometheus-wrapper, unified-integration-v2) - Add skills (ralph, prometheus master) - Add MCP servers (registry.json, manager.sh) - Add plugins directory with marketplaces - Add health-check.sh and aliases.sh scripts - Complete repository synchronization with local ~/.claude/ Total changes: 100+ new files added All integrations now fully backed up in repository 🤖 Generated with Claude Code Co-Authored-By: Claude <noreply@anthropic.com>
477 lines
14 KiB
Python
477 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Ralph Worker Agent
|
|
|
|
Implements specialized worker agents that execute tasks from the task queue.
|
|
Each agent has a specific specialization and handles file locking, task execution,
|
|
and progress reporting.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import redis
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import hashlib
|
|
import logging
|
|
from typing import List, Dict, Optional, Set
|
|
from dataclasses import dataclass, asdict
|
|
from enum import Enum
|
|
|
|
# Configure logging once at import time; all classes in this module share
# the 'ralph.worker' logger below.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger used throughout this file.
logger = logging.getLogger('ralph.worker')
|
|
|
|
|
|
class AgentSpecialization(Enum):
    """Worker agent specializations.

    Each value doubles as the wire format: it is matched against the
    ``specialization`` / ``type`` fields of task dicts pulled from the
    Redis queue (see ``WorkerAgent._can_handle``) and stored verbatim in
    the agent's Redis hash.
    """
    FRONTEND = "frontend"
    BACKEND = "backend"
    TESTING = "testing"
    DOCS = "docs"
    REFACTOR = "refactor"
    ANALYSIS = "analysis"
    DEPLOYMENT = "deployment"
|
|
|
|
|
|
class TaskStatus(Enum):
    """Task execution status.

    NOTE(review): the visible code writes these states as raw strings
    ('running', 'complete', 'failed', ...) rather than through this enum;
    presumably other modules use it — confirm before removing.
    """
    PENDING = "pending"
    RUNNING = "running"
    COMPLETE = "complete"
    FAILED = "failed"
|
|
|
|
|
|
@dataclass
class WorkerConfig:
    """Worker agent configuration.

    Timeouts and intervals are in seconds.
    """
    agent_id: str                                 # unique id, also used as Redis lock owner value
    specialization: AgentSpecialization           # which task types this worker accepts
    max_concurrent_tasks: int = 1                 # NOTE(review): not enforced in WorkerAgent yet
    file_lock_timeout: int = 300                  # TTL on per-file Redis locks (auto-expiry)
    heartbeat_interval: int = 10                  # heartbeat period / queue-poll timeout
    task_timeout: int = 300                       # NOTE(review): not enforced in WorkerAgent yet
    max_retries: int = 3                          # NOTE(review): not enforced in WorkerAgent yet
    log_level: str = "info"                       # desired log verbosity
|
|
|
|
|
|
class WorkerAgent:
    """
    Specialized worker agent for the Ralph Multi-Agent System.

    Pulls tasks from a shared Redis list queue and executes those that
    match its specialization, with:
    - File locking (Redis SET NX EX) to prevent conflicts between agents
    - Progress tracking and status reporting via Redis hashes
    - Heartbeat monitoring so a supervisor can detect dead workers
    - Error handling, bounded lock-retry logic and dependency triggering
    """

    # How many times to retry acquiring a contested file-lock set before
    # giving up and re-queuing the task.
    LOCK_RETRY_ATTEMPTS = 5
    # Seconds to wait between lock-acquisition attempts.
    LOCK_RETRY_DELAY = 2

    def __init__(self, config: WorkerConfig, redis_config: Dict):
        """Initialize the worker agent.

        Args:
            config: Worker identity, specialization and timing settings.
            redis_config: Connection settings; recognized keys are
                'host', 'port', 'db' and 'password'.
        """
        self.config = config
        self.redis = redis.Redis(
            host=redis_config.get('host', 'localhost'),
            port=redis_config.get('port', 6379),
            db=redis_config.get('db', 0),
            password=redis_config.get('password'),
            decode_responses=True  # work with str, not bytes, everywhere below
        )

        # Queue names (shared contract with the orchestrator and peers).
        self.task_queue = 'claude_tasks'
        self.pending_queue = 'claude_tasks:pending'
        self.complete_queue = 'claude_tasks:complete'
        self.failed_queue = 'claude_tasks:failed'

        # State
        self.current_task: Optional[str] = None
        # Files this worker currently holds Redis locks for.  Kept in sync
        # by _acquire_locks/_release_locks so _cleanup() can release them
        # on shutdown even if the worker is interrupted mid-task.  (The
        # original never populated this set, so _cleanup() released
        # nothing and held locks leaked until their TTL expired.)
        self.locked_files: Set[str] = set()
        self.running = True

        logger.info(f"Worker {config.agent_id} initialized ({config.specialization.value})")

    def run(self):
        """Main worker loop: register, then heartbeat/fetch/execute until
        interrupted.  Cleanup always runs on exit."""
        logger.info(f"Worker {self.config.agent_id} starting main loop")

        # Register worker so the orchestrator can see us.
        self._register_worker()

        try:
            while self.running:
                # Send heartbeat
                self._send_heartbeat()

                # _get_task() blocks up to heartbeat_interval seconds, so
                # the loop naturally heartbeats on schedule.  No extra
                # sleep is needed on the empty-queue path; the original
                # slept heartbeat_interval *again* after the blocking pop
                # timed out, halving the effective heartbeat rate.
                task = self._get_task()

                if task is None:
                    continue

                if self._can_handle(task):
                    logger.info(f"Worker {self.config.agent_id} accepted task {task['id']}")
                    self._execute_task(task)
                else:
                    # Not our specialization: put it back for another agent
                    # and back off briefly so we don't spin on the same task.
                    logger.info(f"Worker {self.config.agent_id} skipped task {task['id']} (not our specialization)")
                    self.redis.rpush(self.task_queue, json.dumps(task))
                    time.sleep(1)

        except KeyboardInterrupt:
            logger.info(f"Worker {self.config.agent_id} interrupted by user")
        finally:
            self._cleanup()

    def _register_worker(self):
        """Register this worker's status hash in Redis (key agent:<id>)."""
        worker_data = {
            'id': self.config.agent_id,
            'specialization': self.config.specialization.value,
            'status': 'idle',
            'current_task': '',
            'working_files': json.dumps([]),
            'progress': '0',
            'completed_count': '0',
            'last_heartbeat': str(time.time())
        }
        self.redis.hset(f"agent:{self.config.agent_id}", mapping=worker_data)
        logger.info(f"Worker {self.config.agent_id} registered")

    def _send_heartbeat(self):
        """Refresh last_heartbeat so a supervisor knows we are alive."""
        self.redis.hset(
            f"agent:{self.config.agent_id}",
            'last_heartbeat',
            str(time.time())
        )

    def _get_task(self) -> Optional[Dict]:
        """
        Pop one task from the shared queue, blocking up to
        config.heartbeat_interval seconds.

        Returns:
            Parsed task dict, or None if the queue stayed empty.
        """
        result = self.redis.brpop(self.task_queue, timeout=self.config.heartbeat_interval)
        if result:
            _, task_json = result
            return json.loads(task_json)
        return None

    def _can_handle(self, task: Dict) -> bool:
        """
        Check whether this agent should execute the task.

        Args:
            task: Task dict from the queue.

        Returns:
            True if the task's explicit 'specialization' (preferred) or
            its 'type' matches our specialization.
        """
        # An explicit specialization on the task always decides.
        if task.get('specialization'):
            return task['specialization'] == self.config.specialization.value

        # Otherwise the task 'type' must name our specialization.  The
        # original routed this through an identity dict of the seven known
        # types; a direct comparison is equivalent because our
        # specialization value is always one of those seven, and unknown
        # types still yield False.
        return task.get('type', '') == self.config.specialization.value

    def _execute_task(self, task: Dict):
        """
        Execute a task with file locking, progress updates and
        completion/failure bookkeeping.

        Args:
            task: Task dict; must contain 'id' and 'description', may
                contain 'files', 'type', 'specialization', 'dependencies'.
        """
        task_id = task['id']
        files = task.get('files', [])

        logger.info(f"Worker {self.config.agent_id} executing task {task_id}")

        # Mark ourselves busy before touching locks.
        self._update_status(task_id, 'running', 0, files)
        self.current_task = task_id

        # Acquire file locks (all-or-nothing, bounded retries).
        locked_files = self._acquire_locks(files)

        if not locked_files and files:
            # Could not get the full lock set within the retry budget:
            # hand the task back to the queue for a later attempt, and
            # reset our status (the original left it stuck at 'running').
            logger.warning(f"Could not acquire locks for task {task_id}, re-queuing")
            self.redis.rpush(self.task_queue, json.dumps(task))
            self.current_task = None
            self._update_status('', 'idle', 0, [])
            return

        try:
            # Set up Claude context.
            prompt = self._build_prompt(task)

            # Tag the session so external logs can be correlated to us.
            os.environ['CLAUDE_SESSION_ID'] = f"{self.config.agent_id}-{task_id}"

            self._update_status(task_id, 'running', 25, files)

            # Execute the task.
            result = self._run_claude(prompt, task)

            self._update_status(task_id, 'running', 90, files)

            # Record completion in the task hash and the completed queue.
            self.redis.hset(f"task:{task_id}", 'status', 'complete')
            self.redis.hset(f"task:{task_id}", 'result', result)
            self.redis.lpush(self.complete_queue, json.dumps({
                'task_id': task_id,
                'agent_id': self.config.agent_id,
                'result': result
            }))

            self._update_status(task_id, 'complete', 100, files)

            # Increment this worker's lifetime completed-task counter.
            current_count = int(self.redis.hget(f"agent:{self.config.agent_id}", 'completed_count') or 0)
            self.redis.hset(f"agent:{self.config.agent_id}", 'completed_count', str(current_count + 1))

            logger.info(f"Worker {self.config.agent_id} completed task {task_id}")

            # Release any pending tasks that were waiting on this one.
            self._trigger_dependencies(task_id)

        except Exception as e:
            logger.error(f"Worker {self.config.agent_id} failed task {task_id}: {e}")

            # Record failure in the task hash and the failed queue.
            self.redis.hset(f"task:{task_id}", 'status', 'failed')
            self.redis.hset(f"task:{task_id}", 'error', str(e))
            self.redis.lpush(self.failed_queue, json.dumps({
                'task_id': task_id,
                'agent_id': self.config.agent_id,
                'error': str(e)
            }))

            self._update_status(task_id, 'failed', 0, files)

        finally:
            # Always release locks and return to idle.
            self._release_locks(locked_files)
            self.current_task = None
            self._update_status('', 'idle', 0, [])

    def _acquire_locks(self, files: List[str]) -> List[str]:
        """
        Acquire exclusive Redis locks on all given files (all-or-nothing).

        Each lock is a SET NX key valued with our agent id and a TTL of
        config.file_lock_timeout, so a crashed worker cannot hold a file
        forever.  On contention every partially-acquired lock is released
        and the whole set is retried, up to LOCK_RETRY_ATTEMPTS times.
        (The original retried via unbounded recursion: it could never
        give up and risked RecursionError under sustained contention.)

        Args:
            files: File paths to lock.

        Returns:
            The list of locked files on success, or [] if the full set
            could not be acquired within the retry budget (also [] for an
            empty input).
        """
        if not files:
            return []

        for attempt in range(self.LOCK_RETRY_ATTEMPTS):
            locked = []
            for file_path in files:
                acquired = self.redis.set(
                    f"lock:{file_path}",
                    self.config.agent_id,
                    nx=True,                           # only if not already held
                    ex=self.config.file_lock_timeout   # auto-expire stale locks
                )
                if acquired:
                    locked.append(file_path)
                else:
                    # Contention: roll back partial acquisitions and retry.
                    logger.warning(f"Could not acquire lock for {file_path}")
                    self._release_locks(locked)
                    locked = []
                    break
            else:
                # All locks acquired; remember them for _cleanup().
                self.locked_files.update(locked)
                logger.info(f"Acquired locks for {len(locked)} files")
                return locked

            time.sleep(self.LOCK_RETRY_DELAY)

        return []

    def _release_locks(self, files: List[str]):
        """
        Release file locks owned by this worker.

        Only deletes a lock key whose value is our agent id, so we do not
        release a lock another agent acquired after ours expired.
        NOTE(review): the GET-then-DELETE pair is not atomic; a lock that
        expires between the two calls could briefly delete another
        agent's fresh lock.  A Lua script would close that window.

        Args:
            files: File paths to unlock.
        """
        for file_path in files:
            lock_key = f"lock:{file_path}"

            # Only release if we own it.
            owner = self.redis.get(lock_key)
            if owner == self.config.agent_id:
                self.redis.delete(lock_key)

            # Keep the bookkeeping set consistent either way.
            self.locked_files.discard(file_path)

        if files:
            logger.info(f"Released locks for {len(files)} files")

    def _build_prompt(self, task: Dict) -> str:
        """
        Build the Claude prompt text from a task dict.

        Args:
            task: Task dict; 'id' and 'description' are required.

        Returns:
            Prompt string for Claude.
        """
        prompt = f"""You are a specialized AI agent working on task: {task['id']}

TASK DESCRIPTION:
{task['description']}

TASK TYPE: {task.get('type', 'unknown')}
SPECIALIZATION: {task.get('specialization', 'none')}

FILES TO MODIFY:
{chr(10).join(task.get('files', ['No files specified']))}

CONTEXT:
- This is part of a multi-agent orchestration system
- Other agents may be working on related tasks
- Focus only on your specific task
- Report progress clearly

Execute this task efficiently and report your results.
"""
        return prompt

    def _run_claude(self, prompt: str, task: Dict) -> str:
        """
        Execute the task using Claude.

        Args:
            prompt: Prompt for Claude.
            task: Task dict.

        Returns:
            Result string.
        """
        # This would integrate with Claude Code API.
        # For now, simulate execution with a short delay and a mock result.
        logger.info(f"Executing Claude task: {task['id']}")

        # Simulate work
        time.sleep(2)

        # Return mock result
        return f"Task {task['id']} completed successfully by {self.config.agent_id}"

    def _update_status(self, task_id: str, status: str, progress: float, files: List[str]):
        """
        Update this agent's status hash in Redis.

        Args:
            task_id: Current task ID ('' when idle).
            status: Agent status string ('idle', 'running', ...).
            progress: Task progress (0-100), stored as a string.
            files: Files being worked on, stored JSON-encoded.
        """
        update_data = {
            'status': status,
            'current_task': task_id,
            'progress': str(progress),
            'working_files': json.dumps(files),
            'last_heartbeat': str(time.time())
        }
        self.redis.hset(f"agent:{self.config.agent_id}", mapping=update_data)

    def _trigger_dependencies(self, task_id: str):
        """
        Move pending tasks whose dependencies are all complete into the
        main queue.

        Args:
            task_id: Task ID that just completed.
        """
        # Scan all pending tasks for ones that were waiting on task_id.
        pending = self.redis.lrange(self.pending_queue, 0, -1)

        for task_json in pending:
            task = json.loads(task_json)

            if task_id in task.get('dependencies', []):
                # Only promote if *every* dependency is now complete.
                deps_complete = all(
                    self.redis.hget(f"task:{dep}", 'status') == 'complete'
                    for dep in task.get('dependencies', [])
                )

                if deps_complete:
                    # Move from pending to the main queue.
                    self.redis.lrem(self.pending_queue, 1, task_json)
                    self.redis.lpush(self.task_queue, task_json)
                    logger.info(f"Triggered task {task['id']} (dependencies complete)")

    def _cleanup(self):
        """Release any held locks and mark the agent offline."""
        # Release all locks still tracked (copy: _release_locks mutates the set).
        self._release_locks(list(self.locked_files))

        # Update status to offline so the orchestrator stops routing to us.
        self.redis.hset(f"agent:{self.config.agent_id}", 'status', 'offline')

        logger.info(f"Worker {self.config.agent_id} cleaned up")
|
|
|
|
|
|
def main():
    """Command-line entry point: build a WorkerAgent from CLI flags and run it."""
    import argparse

    cli = argparse.ArgumentParser(description='Ralph Worker Agent')
    cli.add_argument('--id', required=True, help='Agent ID')
    cli.add_argument(
        '--specialization',
        required=True,
        choices=[s.value for s in AgentSpecialization],
        help='Agent specialization',
    )
    cli.add_argument('--redis-host', default='localhost', help='Redis host')
    cli.add_argument('--redis-port', type=int, default=6379, help='Redis port')
    cli.add_argument('--max-concurrent', type=int, default=1, help='Max concurrent tasks')
    opts = cli.parse_args()

    # Translate CLI flags into the worker's configuration objects.
    worker_config = WorkerConfig(
        agent_id=opts.id,
        specialization=AgentSpecialization(opts.specialization),
        max_concurrent_tasks=opts.max_concurrent,
    )
    connection = {
        'host': opts.redis_host,
        'port': opts.redis_port,
    }

    # Build the worker and enter its blocking main loop.
    WorkerAgent(worker_config, connection).run()
|
|
|
|
|
|
# Allow running this module directly as a CLI script.
if __name__ == '__main__':
    main()
|