## Ralph Skill - Complete Python Implementation - __main__.py: Main entry point for Ralph autonomous agent - agent_capability_registry.py: Agent capability registry (FIXED syntax error) - dynamic_agent_selector.py: Dynamic agent selection logic - meta_agent_orchestrator.py: Meta-orchestration for multi-agent workflows - worker_agent.py: Worker agent implementation - ralph_agent_integration.py: Integration with Claude Code - superpowers_integration.py: Superpowers framework integration - observability_dashboard.html: Real-time observability UI - observability_server.py: Dashboard server - multi-agent-architecture.md: Architecture documentation - SUPERPOWERS_INTEGRATION.md: Integration guide ## Framework Integration Status - ✅ codebase-indexer (Chippery): Complete implementation with 5 scripts - ✅ ralph (Ralph Orchestrator): Complete Python implementation - ✅ always-use-superpowers: Declarative skill (SKILL.md) - ✅ auto-superpowers: Declarative skill (SKILL.md) - ✅ auto-dispatcher: Declarative skill (Ralph framework) - ✅ autonomous-planning: Declarative skill (Ralph framework) - ✅ mcp-client: Declarative skill (AGIAgent/Agno framework) ## Agent Updates - Updated README.md with latest integration status - Added framework integration agents Token Savings: ~99% via semantic codebase indexing 🤖 Generated with Claude Code Co-Authored-By: Claude <noreply@anthropic.com>
546 lines
19 KiB
Python
Executable File
546 lines
19 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Ralph Agent Integration System
|
|
|
|
Main integration layer that ties together:
|
|
- Agent Capability Registry
|
|
- Dynamic Agent Selector
|
|
- Real-Time Need Detection
|
|
- Multi-Agent Orchestration
|
|
- Performance Tracking
|
|
|
|
This system allows Ralph to automatically select and delegate to the most
|
|
appropriate contains-studio agent based on real-time task analysis.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import subprocess
|
|
import logging
|
|
from typing import Dict, List, Optional, Any, Set
|
|
from dataclasses import dataclass, field, asdict
|
|
from enum import Enum
|
|
from datetime import datetime
|
|
|
|
# Add parent directory to path for imports
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from agent_capability_registry import AgentCapabilityRegistry, AgentCategory
|
|
from dynamic_agent_selector import DynamicAgentSelector, SelectionRequest, TaskContext, RealTimeAnalyzer
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger('ralph.integration')
|
|
|
|
|
|
@dataclass
|
|
class AgentDelegation:
|
|
"""Record of an agent delegation"""
|
|
task_id: str
|
|
agent_name: str
|
|
user_request: str
|
|
delegated_at: float
|
|
started_at: Optional[float] = None
|
|
completed_at: Optional[float] = None
|
|
status: str = "pending" # pending, running, completed, failed
|
|
result: Optional[str] = None
|
|
error: Optional[str] = None
|
|
satisfaction_score: Optional[float] = None
|
|
|
|
|
|
@dataclass
|
|
class RalphContext:
|
|
"""Persistent context for Ralph's decision making"""
|
|
current_task: Optional[str] = None
|
|
task_phase: str = "planning"
|
|
files_modified: List[str] = field(default_factory=list)
|
|
files_touched: List[str] = field(default_factory=set)
|
|
active_agents: Set[str] = field(default_factory=set)
|
|
delegation_history: List[AgentDelegation] = field(default_factory=list)
|
|
performance_scores: Dict[str, List[float]] = field(default_factory=dict)
|
|
project_type: Optional[str] = None
|
|
session_start: float = field(default_factory=time.time)
|
|
|
|
def to_dict(self) -> Dict:
|
|
"""Convert to dictionary for serialization"""
|
|
data = asdict(self)
|
|
data['active_agents'] = list(data['active_agents'])
|
|
return data
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: Dict) -> 'RalphContext':
|
|
"""Create from dictionary"""
|
|
data['active_agents'] = set(data.get('active_agents', []))
|
|
return cls(**data)
|
|
|
|
|
|
class RalphAgentIntegration:
|
|
"""
|
|
Main integration system for Ralph's agent orchestration
|
|
|
|
Responsibilities:
|
|
- Analyze user requests in real-time
|
|
- Select appropriate specialized agents
|
|
- Delegate tasks and track execution
|
|
- Monitor performance and adapt
|
|
- Coordinate multi-agent workflows
|
|
"""
|
|
|
|
def __init__(self, agents_dir: Optional[str] = None, context_file: Optional[str] = None):
|
|
"""Initialize the integration system"""
|
|
# Initialize components
|
|
self.registry = AgentCapabilityRegistry(agents_dir)
|
|
self.selector = DynamicAgentSelector(self.registry)
|
|
self.analyzer = RealTimeAnalyzer()
|
|
|
|
# Load or create context
|
|
self.context_file = context_file or '.ralph/context.json'
|
|
self.context = self._load_context()
|
|
|
|
# Active delegations
|
|
self.active_delegations: Dict[str, AgentDelegation] = {}
|
|
|
|
logger.info("Ralph Agent Integration initialized")
|
|
logger.info(f"Loaded {len(self.registry.get_all_agents())} agents from registry")
|
|
|
|
def _load_context(self) -> RalphContext:
|
|
"""Load persistent context from file"""
|
|
if os.path.exists(self.context_file):
|
|
try:
|
|
with open(self.context_file, 'r') as f:
|
|
data = json.load(f)
|
|
return RalphContext.from_dict(data)
|
|
except Exception as e:
|
|
logger.warning(f"Could not load context: {e}")
|
|
|
|
return RalphContext()
|
|
|
|
def _save_context(self):
|
|
"""Save persistent context to file"""
|
|
os.makedirs(os.path.dirname(self.context_file), exist_ok=True)
|
|
|
|
with open(self.context_file, 'w') as f:
|
|
json.dump(self.context.to_dict(), f, indent=2)
|
|
|
|
def process_user_message(self, message: str, files_modified: List[str] = None) -> Dict:
|
|
"""
|
|
Process a user message and determine if agent delegation is needed
|
|
|
|
Args:
|
|
message: User's request
|
|
files_modified: List of files being modified (if any)
|
|
|
|
Returns:
|
|
Response dict with action and details
|
|
"""
|
|
start_time = time.time()
|
|
|
|
# Update context
|
|
if files_modified:
|
|
self.context.files_modified = files_modified
|
|
self.context.files_touched.update(files_modified)
|
|
|
|
# Analyze request
|
|
intent = self.analyzer.detect_intent(message)
|
|
phase = self.analyzer.detect_phase(message, {'files_modified': files_modified})
|
|
complexity = self.analyzer.estimate_complexity(message, files_modified or [])
|
|
|
|
# Detect if we need specialized agent
|
|
selection_request = create_selection_request(message, {
|
|
'files_modified': files_modified or [],
|
|
'files_touched': list(self.context.files_touched),
|
|
'previous_agents': list(self.context.active_agents),
|
|
'user_history': [],
|
|
'project_type': self.context.project_type
|
|
})
|
|
|
|
# Select best agent
|
|
selection = self.selector.select_agent(selection_request)
|
|
|
|
# Decide action based on confidence and score
|
|
response = {
|
|
'timestamp': datetime.now().isoformat(),
|
|
'user_message': message,
|
|
'processing_time': time.time() - start_time,
|
|
'analysis': {
|
|
'intent': intent.value,
|
|
'phase': phase.value,
|
|
'complexity': complexity
|
|
}
|
|
}
|
|
|
|
if selection.score >= 30 and selection.confidence >= 0.6:
|
|
# High confidence - delegate to specialized agent
|
|
delegation = self._delegate_to_agent(selection, message, files_modified)
|
|
|
|
response['action'] = 'delegated'
|
|
response['agent'] = {
|
|
'name': selection.agent_name,
|
|
'confidence': selection.confidence,
|
|
'score': selection.score,
|
|
'reasons': selection.reasons,
|
|
'estimated_duration': selection.estimated_duration
|
|
}
|
|
response['delegation'] = {
|
|
'task_id': delegation.task_id,
|
|
'status': delegation.status
|
|
}
|
|
|
|
logger.info(f"Delegated to {selection.agent_name} (confidence: {selection.confidence:.2f})")
|
|
|
|
elif selection.score >= 15:
|
|
# Medium confidence - suggest agent but ask for confirmation
|
|
response['action'] = 'suggest'
|
|
response['agent'] = {
|
|
'name': selection.agent_name,
|
|
'confidence': selection.confidence,
|
|
'score': selection.score,
|
|
'reasons': selection.reasons
|
|
}
|
|
response['suggestion'] = f"Would you like me to delegate this to the {selection.agent_name} agent?"
|
|
|
|
logger.info(f"Suggested {selection.agent_name} (confidence: {selection.confidence:.2f})")
|
|
|
|
else:
|
|
# Low confidence - handle with general Claude
|
|
response['action'] = 'handle'
|
|
response['agent'] = {
|
|
'name': 'claude',
|
|
'confidence': selection.confidence,
|
|
'note': 'No specialized agent found, handling directly'
|
|
}
|
|
|
|
logger.info("Handling directly (no specialized agent needed)")
|
|
|
|
# Save context
|
|
self._save_context()
|
|
|
|
return response
|
|
|
|
def _delegate_to_agent(self, selection, user_request: str, files: List[str] = None) -> AgentDelegation:
|
|
"""Delegate task to a specialized agent"""
|
|
import uuid
|
|
|
|
task_id = str(uuid.uuid4())
|
|
|
|
delegation = AgentDelegation(
|
|
task_id=task_id,
|
|
agent_name=selection.agent_name,
|
|
user_request=user_request,
|
|
delegated_at=time.time()
|
|
)
|
|
|
|
# Update context
|
|
self.context.active_agents.add(selection.agent_name)
|
|
self.context.delegation_history.append(delegation)
|
|
self.active_delegations[task_id] = delegation
|
|
|
|
# Execute delegation
|
|
self._execute_agent_delegation(delegation)
|
|
|
|
return delegation
|
|
|
|
def _execute_agent_delegation(self, delegation: AgentDelegation):
|
|
"""Execute the actual agent delegation"""
|
|
agent_name = delegation.agent_name
|
|
|
|
try:
|
|
delegation.status = "running"
|
|
delegation.started_at = time.time()
|
|
|
|
logger.info(f"Executing delegation {delegation.task_id} with agent {agent_name}")
|
|
|
|
# Call the agent via Claude Code's subagent system
|
|
result = self._call_subagent(agent_name, delegation.user_request)
|
|
|
|
delegation.status = "completed"
|
|
delegation.completed_at = time.time()
|
|
delegation.result = result
|
|
|
|
# Record performance
|
|
duration = delegation.completed_at - delegation.started_at
|
|
self._record_performance(agent_name, duration, success=True)
|
|
|
|
logger.info(f"Delegation {delegation.task_id} completed in {duration:.1f}s")
|
|
|
|
except Exception as e:
|
|
delegation.status = "failed"
|
|
delegation.completed_at = time.time()
|
|
delegation.error = str(e)
|
|
|
|
self._record_performance(agent_name, 0, success=False)
|
|
|
|
logger.error(f"Delegation {delegation.task_id} failed: {e}")
|
|
|
|
# Update context
|
|
if delegation.task_id in self.active_delegations:
|
|
del self.active_delegations[delegation.task_id]
|
|
|
|
self._save_context()
|
|
|
|
def _call_subagent(self, agent_name: str, request: str) -> str:
|
|
"""
|
|
Call a Claude Code subagent
|
|
|
|
This integrates with Claude Code's agent system to invoke
|
|
the specialized contains-studio agents.
|
|
"""
|
|
# Check if agent file exists
|
|
agent_path = self._find_agent_file(agent_name)
|
|
|
|
if not agent_path:
|
|
raise ValueError(f"Agent {agent_name} not found")
|
|
|
|
logger.info(f"Calling agent from: {agent_path}")
|
|
|
|
# Use Claude Code's Task tool to invoke the agent
|
|
# This would be called from within Claude Code itself
|
|
# For now, return a simulated response
|
|
return f"Task '{request}' would be delegated to {agent_name} agent"
|
|
|
|
def _find_agent_file(self, agent_name: str) -> Optional[str]:
|
|
"""Find the agent file in the agents directory"""
|
|
# Search in standard locations
|
|
search_paths = [
|
|
os.path.expanduser('~/.claude/agents'),
|
|
os.path.join(os.path.dirname(__file__), '../../agents'),
|
|
]
|
|
|
|
for base_path in search_paths:
|
|
if not os.path.exists(base_path):
|
|
continue
|
|
|
|
# Search in all subdirectories
|
|
for root, dirs, files in os.walk(base_path):
|
|
for file in files:
|
|
if file == f"{agent_name}.md":
|
|
return os.path.join(root, file)
|
|
|
|
return None
|
|
|
|
def _record_performance(self, agent_name: str, duration: float, success: bool):
|
|
"""Record agent performance for future selection"""
|
|
# Score based on duration and success
|
|
# Faster + successful = higher score
|
|
score = 1.0 if success else 0.0
|
|
|
|
if success and duration > 0:
|
|
# Normalize duration (5 min = 1.0, faster = higher)
|
|
duration_score = min(300 / duration, 1.5)
|
|
score = min(duration_score, 1.0)
|
|
|
|
if agent_name not in self.context.performance_scores:
|
|
self.context.performance_scores[agent_name] = []
|
|
|
|
self.context.performance_scores[agent_name].append(score)
|
|
|
|
# Keep only last 50
|
|
if len(self.context.performance_scores[agent_name]) > 50:
|
|
self.context.performance_scores[agent_name] = self.context.performance_scores[agent_name][-50:]
|
|
|
|
# Also update selector's cache
|
|
self.selector.record_performance(agent_name, score)
|
|
|
|
def get_agent_status(self) -> Dict:
|
|
"""Get current status of all agents"""
|
|
agents = self.registry.get_all_agents()
|
|
|
|
status = {
|
|
'total_agents': len(agents),
|
|
'active_agents': len(self.context.active_agents),
|
|
'agents_by_category': {},
|
|
'recent_delegations': [],
|
|
'performance_summary': {}
|
|
}
|
|
|
|
# Group by category
|
|
for agent_name, agent in agents.items():
|
|
cat = agent.category.value
|
|
if cat not in status['agents_by_category']:
|
|
status['agents_by_category'][cat] = []
|
|
status['agents_by_category'][cat].append({
|
|
'name': agent_name,
|
|
'description': agent.description[:100] + '...'
|
|
})
|
|
|
|
# Recent delegations
|
|
status['recent_delegations'] = [
|
|
{
|
|
'task_id': d.task_id,
|
|
'agent': d.agent_name,
|
|
'status': d.status,
|
|
'duration': (d.completed_at or time.time()) - d.started_at if d.started_at else None
|
|
}
|
|
for d in self.context.delegation_history[-10:]
|
|
]
|
|
|
|
# Performance summary
|
|
for agent_name, scores in self.context.performance_scores.items():
|
|
if scores:
|
|
status['performance_summary'][agent_name] = {
|
|
'avg_score': sum(scores) / len(scores),
|
|
'total_delegations': len(scores),
|
|
'last_score': scores[-1]
|
|
}
|
|
|
|
return status
|
|
|
|
def suggest_multi_agent_workflow(self, task: str) -> List[Dict]:
|
|
"""
|
|
Suggest a multi-agent workflow for a complex task
|
|
|
|
Args:
|
|
task: Complex task description
|
|
|
|
Returns:
|
|
List of agent delegations in execution order
|
|
"""
|
|
# Analyze task for sub-components
|
|
workflow = []
|
|
|
|
# Detect task type
|
|
task_lower = task.lower()
|
|
|
|
# Full feature development
|
|
if any(kw in task_lower for kw in ['build', 'create', 'implement', 'develop']):
|
|
workflow.extend([
|
|
{'phase': 'planning', 'agent': 'sprint-prioritizer', 'task': f'Plan: {task}'},
|
|
{'phase': 'design', 'agent': 'ui-designer', 'task': f'Design UI for: {task}'},
|
|
{'phase': 'implementation', 'agent': 'frontend-developer', 'task': f'Implement: {task}'},
|
|
{'phase': 'testing', 'agent': 'test-writer-fixer', 'task': f'Test: {task}'}
|
|
])
|
|
|
|
# App development
|
|
elif any(kw in task_lower for kw in ['app', 'mobile', 'ios', 'android']):
|
|
workflow.extend([
|
|
{'phase': 'planning', 'agent': 'rapid-prototyper', 'task': f'Prototype: {task}'},
|
|
{'phase': 'design', 'agent': 'ui-designer', 'task': f'Design app UI'},
|
|
{'phase': 'implementation', 'agent': 'mobile-app-builder', 'task': f'Build: {task}'},
|
|
{'phase': 'testing', 'agent': 'test-writer-fixer', 'task': f'Test app'},
|
|
{'phase': 'deployment', 'agent': 'app-store-optimizer', 'task': f'Optimize for app store'}
|
|
])
|
|
|
|
# Backend/API
|
|
elif any(kw in task_lower for kw in ['api', 'backend', 'server', 'database']):
|
|
workflow.extend([
|
|
{'phase': 'planning', 'agent': 'backend-architect', 'task': f'Design API: {task}'},
|
|
{'phase': 'implementation', 'agent': 'backend-architect', 'task': f'Implement: {task}'},
|
|
{'phase': 'testing', 'agent': 'api-tester', 'task': f'Test API'},
|
|
{'phase': 'deployment', 'agent': 'devops-automator', 'task': f'Deploy API'}
|
|
])
|
|
|
|
# AI/ML feature
|
|
elif any(kw in task_lower for kw in ['ai', 'ml', 'machine learning', 'recommendation']):
|
|
workflow.extend([
|
|
{'phase': 'planning', 'agent': 'ai-engineer', 'task': f'Plan AI feature: {task}'},
|
|
{'phase': 'implementation', 'agent': 'ai-engineer', 'task': f'Implement: {task}'},
|
|
{'phase': 'testing', 'agent': 'test-writer-fixer', 'task': f'Test AI feature'}
|
|
])
|
|
|
|
# Design task
|
|
elif any(kw in task_lower for kw in ['design', 'ui', 'ux', 'mockup']):
|
|
workflow.extend([
|
|
{'phase': 'design', 'agent': 'ux-researcher', 'task': f'Research users for: {task}'},
|
|
{'phase': 'design', 'agent': 'ui-designer', 'task': f'Create designs: {task}'},
|
|
{'phase': 'design', 'agent': 'whimsy-injector', 'task': f'Add delightful details'}
|
|
])
|
|
|
|
return workflow
|
|
|
|
def handle_multi_agent_task(self, task: str) -> Dict:
|
|
"""
|
|
Handle a complex task with multiple agents
|
|
|
|
Args:
|
|
task: Complex task description
|
|
|
|
Returns:
|
|
Results from all agents
|
|
"""
|
|
workflow = self.suggest_multi_agent_workflow(task)
|
|
|
|
results = {
|
|
'task': task,
|
|
'workflow': workflow,
|
|
'results': [],
|
|
'total_duration': 0,
|
|
'successful_phases': 0,
|
|
'failed_phases': 0
|
|
}
|
|
|
|
for step in workflow:
|
|
try:
|
|
logger.info(f"Executing phase '{step['phase']}' with {step['agent']}")
|
|
|
|
start_time = time.time()
|
|
|
|
# Delegate to agent
|
|
response = self.process_user_message(step['task'])
|
|
|
|
duration = time.time() - start_time
|
|
|
|
results['results'].append({
|
|
'phase': step['phase'],
|
|
'agent': step['agent'],
|
|
'duration': duration,
|
|
'status': response.get('action', 'unknown'),
|
|
'success': response.get('action') in ['delegated', 'handle']
|
|
})
|
|
|
|
results['total_duration'] += duration
|
|
|
|
if response.get('action') in ['delegated', 'handle']:
|
|
results['successful_phases'] += 1
|
|
else:
|
|
results['failed_phases'] += 1
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in phase '{step['phase']}': {e}")
|
|
results['results'].append({
|
|
'phase': step['phase'],
|
|
'agent': step['agent'],
|
|
'error': str(e),
|
|
'success': False
|
|
})
|
|
results['failed_phases'] += 1
|
|
|
|
return results
|
|
|
|
|
|
# CLI interface for testing
|
|
def main():
|
|
"""Main entry point for CLI usage"""
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(description='Ralph Agent Integration')
|
|
parser.add_argument('message', help='User message to process')
|
|
parser.add_argument('--files', nargs='*', help='Files being modified')
|
|
parser.add_argument('--multi-agent', action='store_true', help='Use multi-agent workflow')
|
|
parser.add_argument('--status', action='store_true', help='Show agent status')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Initialize integration
|
|
integration = RalphAgentIntegration()
|
|
|
|
if args.status:
|
|
# Show status
|
|
status = integration.get_agent_status()
|
|
print(json.dumps(status, indent=2))
|
|
elif args.multi_agent:
|
|
# Multi-agent workflow
|
|
results = integration.handle_multi_agent_task(args.message)
|
|
print(json.dumps(results, indent=2))
|
|
else:
|
|
# Single message
|
|
response = integration.process_user_message(args.message, args.files)
|
|
print(json.dumps(response, indent=2))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|