## Ralph Skill - Complete Python Implementation - __main__.py: Main entry point for Ralph autonomous agent - agent_capability_registry.py: Agent capability registry (FIXED syntax error) - dynamic_agent_selector.py: Dynamic agent selection logic - meta_agent_orchestrator.py: Meta-orchestration for multi-agent workflows - worker_agent.py: Worker agent implementation - ralph_agent_integration.py: Integration with Claude Code - superpowers_integration.py: Superpowers framework integration - observability_dashboard.html: Real-time observability UI - observability_server.py: Dashboard server - multi-agent-architecture.md: Architecture documentation - SUPERPOWERS_INTEGRATION.md: Integration guide ## Framework Integration Status - ✅ codebase-indexer (Chippery): Complete implementation with 5 scripts - ✅ ralph (Ralph Orchestrator): Complete Python implementation - ✅ always-use-superpowers: Declarative skill (SKILL.md) - ✅ auto-superpowers: Declarative skill (SKILL.md) - ✅ auto-dispatcher: Declarative skill (Ralph framework) - ✅ autonomous-planning: Declarative skill (Ralph framework) - ✅ mcp-client: Declarative skill (AGIAgent/Agno framework) ## Agent Updates - Updated README.md with latest integration status - Added framework integration agents Token Savings: ~99% via semantic codebase indexing 🤖 Generated with Claude Code Co-Authored-By: Claude <noreply@anthropic.com>
548 lines
19 KiB
Python
Executable File
548 lines
19 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Ralph Dynamic Agent Selector
|
|
|
|
Intelligently selects and routes to the most appropriate agent based on:
|
|
- User request analysis
|
|
- Project context
|
|
- File types being modified
|
|
- Current task state
|
|
- Agent capabilities and performance history
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import time
|
|
from typing import Dict, List, Optional, Tuple, Set
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
import logging
|
|
from collections import defaultdict
|
|
|
|
logger = logging.getLogger('ralph.selector')
|
|
|
|
|
|
class TaskPhase(Enum):
|
|
"""Phases of a task lifecycle"""
|
|
PLANNING = "planning"
|
|
DESIGN = "design"
|
|
IMPLEMENTATION = "implementation"
|
|
TESTING = "testing"
|
|
DEPLOYMENT = "deployment"
|
|
MAINTENANCE = "maintenance"
|
|
|
|
|
|
class IntentType(Enum):
|
|
"""Types of user intents"""
|
|
CREATE = "create"
|
|
MODIFY = "modify"
|
|
FIX = "fix"
|
|
ANALYZE = "analyze"
|
|
DEPLOY = "deploy"
|
|
TEST = "test"
|
|
DESIGN = "design"
|
|
RESEARCH = "research"
|
|
OPTIMIZE = "optimize"
|
|
|
|
|
|
@dataclass
|
|
class AgentSelectionScore:
|
|
"""Score for an agent selection decision"""
|
|
agent_name: str
|
|
score: float
|
|
reasons: List[str] = field(default_factory=list)
|
|
confidence: float = 0.0
|
|
estimated_duration: int = 300 # seconds
|
|
|
|
|
|
@dataclass
|
|
class TaskContext:
|
|
"""Context about the current task"""
|
|
phase: TaskPhase
|
|
intent: IntentType
|
|
files_modified: List[str] = field(default_factory=list)
|
|
files_touched: List[str] = field(default_factory=list)
|
|
previous_agents: Set[str] = field(default_factory=set)
|
|
user_history: List[str] = field(default_factory=list)
|
|
project_type: Optional[str] = None
|
|
complexity_score: float = 5.0
|
|
time_constraint: Optional[int] = None # seconds
|
|
|
|
|
|
@dataclass
|
|
class SelectionRequest:
|
|
"""Request for agent selection"""
|
|
user_message: str
|
|
context: TaskContext
|
|
available_agents: Dict[str, dict]
|
|
performance_history: Dict[str, dict] = field(default_factory=dict)
|
|
|
|
|
|
class DynamicAgentSelector:
|
|
"""
|
|
Dynamically selects the best agent for each task
|
|
|
|
Uses multiple signals:
|
|
- Semantic similarity to agent descriptions
|
|
- Keyword matching
|
|
- File type analysis
|
|
- Task phase awareness
|
|
- Historical performance
|
|
- Collaborative filtering
|
|
"""
|
|
|
|
def __init__(self, registry):
|
|
"""Initialize the selector"""
|
|
self.registry = registry
|
|
self.selection_history: List[Dict] = []
|
|
self.performance_cache: Dict[str, List[float]] = defaultdict(list)
|
|
|
|
def select_agent(self, request: SelectionRequest) -> AgentSelectionScore:
|
|
"""
|
|
Select the best agent for the given request
|
|
|
|
Args:
|
|
request: Selection request with context
|
|
|
|
Returns:
|
|
AgentSelectionScore with selected agent and reasoning
|
|
"""
|
|
logger.info(f"Selecting agent for: {request.user_message[:100]}...")
|
|
|
|
# Get candidate agents
|
|
candidates = self._get_candidates(request)
|
|
|
|
if not candidates:
|
|
# Fallback to general purpose
|
|
return AgentSelectionScore(
|
|
agent_name="claude",
|
|
score=0.5,
|
|
reasons=["No specialized agent found, using general purpose"],
|
|
confidence=0.3
|
|
)
|
|
|
|
# Score each candidate
|
|
scores = []
|
|
for agent_name in candidates:
|
|
score = self._score_agent(agent_name, request)
|
|
scores.append(score)
|
|
|
|
# Sort by score
|
|
scores.sort(key=lambda x: x.score, reverse=True)
|
|
|
|
# Get best match
|
|
best = scores[0]
|
|
|
|
# Log selection
|
|
self._log_selection(request, best)
|
|
|
|
return best
|
|
|
|
def _get_candidates(self, request: SelectionRequest) -> List[str]:
|
|
"""Get candidate agents for the request"""
|
|
candidates = set()
|
|
|
|
# Keyword matching
|
|
keyword_matches = self.registry.find_agents_by_keywords(request.user_message)
|
|
for agent_name, score, agent in keyword_matches[:5]: # Top 5
|
|
candidates.add(agent_name)
|
|
|
|
# File-based matching
|
|
if request.context.files_modified:
|
|
file_matches = self.registry.find_agents_by_files(request.context.files_modified)
|
|
for agent_name, score, agent in file_matches[:3]:
|
|
candidates.add(agent_name)
|
|
|
|
# Phase-based candidates
|
|
phase_candidates = self._get_phase_candidates(request.context.phase)
|
|
candidates.update(phase_candidates)
|
|
|
|
# Intent-based candidates
|
|
intent_candidates = self._get_intent_candidates(request.context.intent)
|
|
candidates.update(intent_candidates)
|
|
|
|
# Context-aware candidates
|
|
context_candidates = self._get_context_candidates(request.context)
|
|
candidates.update(context_candidates)
|
|
|
|
return list(candidates)
|
|
|
|
def _score_agent(self, agent_name: str, request: SelectionRequest) -> AgentSelectionScore:
|
|
"""Score an agent for the request"""
|
|
agent = self.registry.get_agent(agent_name)
|
|
if not agent:
|
|
return AgentSelectionScore(agent_name=agent_name, score=0.0)
|
|
|
|
score = 0.0
|
|
reasons = []
|
|
|
|
# 1. Keyword matching (0-40 points)
|
|
keyword_score = self._score_keywords(agent, request.user_message)
|
|
score += keyword_score
|
|
if keyword_score > 0:
|
|
reasons.append(f"Keyword match: {keyword_score:.1f}")
|
|
|
|
# 2. Semantic similarity (0-25 points)
|
|
semantic_score = self._score_semantic(agent, request)
|
|
score += semantic_score
|
|
if semantic_score > 0:
|
|
reasons.append(f"Semantic fit: {semantic_score:.1f}")
|
|
|
|
# 3. File type matching (0-20 points)
|
|
file_score = self._score_files(agent, request.context)
|
|
score += file_score
|
|
if file_score > 0:
|
|
reasons.append(f"File match: {file_score:.1f}")
|
|
|
|
# 4. Phase appropriateness (0-10 points)
|
|
phase_score = self._score_phase(agent, request.context.phase)
|
|
score += phase_score
|
|
if phase_score > 0:
|
|
reasons.append(f"Phase fit: {phase_score:.1f}")
|
|
|
|
# 5. Historical performance (0-5 points)
|
|
perf_score = self._score_performance(agent_name)
|
|
score += perf_score
|
|
if perf_score > 0:
|
|
reasons.append(f"Performance bonus: {perf_score:.1f}")
|
|
|
|
# Calculate confidence
|
|
confidence = min(score / 50.0, 1.0)
|
|
|
|
# Estimate duration based on agent and complexity
|
|
duration = self._estimate_duration(agent, request.context)
|
|
|
|
return AgentSelectionScore(
|
|
agent_name=agent_name,
|
|
score=score,
|
|
reasons=reasons,
|
|
confidence=confidence,
|
|
estimated_duration=duration
|
|
)
|
|
|
|
def _score_keywords(self, agent, message: str) -> float:
|
|
"""Score keyword matching"""
|
|
message_lower = message.lower()
|
|
score = 0.0
|
|
|
|
for keyword in agent.keywords:
|
|
if keyword.lower() in message_lower:
|
|
# Rare keywords get more points
|
|
weight = 10.0 / len(agent.keywords)
|
|
score += weight
|
|
|
|
# Direct name mention
|
|
if agent.name.lower() in message_lower:
|
|
score += 20.0
|
|
|
|
return min(score, 40.0)
|
|
|
|
def _score_semantic(self, agent, request: SelectionRequest) -> float:
|
|
"""Score semantic similarity"""
|
|
score = 0.0
|
|
|
|
# Check against examples
|
|
for example in agent.examples:
|
|
example_text = example['user_request'].lower()
|
|
request_text = request.user_message.lower()
|
|
|
|
# Simple word overlap
|
|
example_words = set(example_text.split())
|
|
request_words = set(request_text.split())
|
|
|
|
if example_words and request_words:
|
|
overlap = len(example_words & request_words)
|
|
total = len(example_words | request_words)
|
|
similarity = overlap / total if total > 0 else 0
|
|
|
|
score += similarity * 15.0
|
|
|
|
return min(score, 25.0)
|
|
|
|
def _score_files(self, agent, context: TaskContext) -> float:
|
|
"""Score file type matching"""
|
|
if not context.files_modified and not context.files_touched:
|
|
return 0.0
|
|
|
|
all_files = context.files_modified + context.files_touched
|
|
score = 0.0
|
|
|
|
for file_path in all_files:
|
|
file_lower = file_path.lower()
|
|
|
|
for pattern in agent.file_patterns:
|
|
if pattern.lower() in file_lower:
|
|
score += 5.0
|
|
|
|
return min(score, 20.0)
|
|
|
|
def _score_phase(self, agent, phase: TaskPhase) -> float:
|
|
"""Score phase appropriateness"""
|
|
phase_mappings = {
|
|
TaskPhase.PLANNING: ['sprint-prioritizer', 'studio-producer'],
|
|
TaskPhase.DESIGN: ['ui-designer', 'ux-researcher', 'brand-guardian'],
|
|
TaskPhase.IMPLEMENTATION: ['frontend-developer', 'backend-architect', 'ai-engineer'],
|
|
TaskPhase.TESTING: ['test-writer-fixer', 'api-tester'],
|
|
TaskPhase.DEPLOYMENT: ['devops-automator', 'project-shipper'],
|
|
TaskPhase.MAINTENANCE: ['infrastructure-maintainer', 'support-responder']
|
|
}
|
|
|
|
recommended = phase_mappings.get(phase, [])
|
|
if agent.name in recommended:
|
|
return 10.0
|
|
|
|
return 0.0
|
|
|
|
def _score_performance(self, agent_name: str) -> float:
|
|
"""Score based on historical performance"""
|
|
if agent_name not in self.performance_cache:
|
|
return 2.5 # Neutral score for unknown
|
|
|
|
scores = self.performance_cache[agent_name]
|
|
if not scores:
|
|
return 2.5
|
|
|
|
# Average recent performance (last 10)
|
|
recent = scores[-10:]
|
|
avg = sum(recent) / len(recent)
|
|
|
|
# Convert to bonus
|
|
return (avg - 0.5) * 5.0 # Range: -2.5 to +2.5
|
|
|
|
def _estimate_duration(self, agent, context: TaskContext) -> int:
|
|
"""Estimate task duration in seconds"""
|
|
base_duration = 300 # 5 minutes
|
|
|
|
# Adjust by complexity
|
|
complexity_multiplier = 1.0 + (context.complexity_score / 10.0)
|
|
|
|
# Adjust by agent speed (from category)
|
|
category_speeds = {
|
|
'engineering': 1.2,
|
|
'design': 1.0,
|
|
'testing': 0.8,
|
|
'product': 1.0
|
|
}
|
|
|
|
speed = category_speeds.get(agent.category.value, 1.0)
|
|
|
|
duration = base_duration * complexity_multiplier * speed
|
|
|
|
return int(duration)
|
|
|
|
def _get_phase_candidates(self, phase: TaskPhase) -> List[str]:
|
|
"""Get agents appropriate for current phase"""
|
|
phase_mappings = {
|
|
TaskPhase.PLANNING: ['sprint-prioritizer', 'studio-producer', 'rapid-prototyper'],
|
|
TaskPhase.DESIGN: ['ui-designer', 'ux-researcher', 'brand-guardian', 'visual-storyteller'],
|
|
TaskPhase.IMPLEMENTATION: ['frontend-developer', 'backend-architect', 'ai-engineer',
|
|
'mobile-app-builder', 'rapid-prototyper'],
|
|
TaskPhase.TESTING: ['test-writer-fixer', 'api-tester', 'performance-benchmarker'],
|
|
TaskPhase.DEPLOYMENT: ['devops-automator', 'project-shipper'],
|
|
TaskPhase.MAINTENANCE: ['infrastructure-maintainer', 'support-responder']
|
|
}
|
|
|
|
return phase_mappings.get(phase, [])
|
|
|
|
def _get_intent_candidates(self, intent: IntentType) -> List[str]:
|
|
"""Get agents for specific intent"""
|
|
intent_mappings = {
|
|
IntentType.CREATE: ['rapid-prototyper', 'frontend-developer', 'backend-architect'],
|
|
IntentType.MODIFY: ['frontend-developer', 'backend-architect', 'ui-designer'],
|
|
IntentType.FIX: ['test-writer-fixer', 'backend-architect', 'frontend-developer'],
|
|
IntentType.ANALYZE: ['analytics-reporter', 'feedback-synthesizer', 'test-results-analyzer'],
|
|
IntentType.DEPLOY: ['devops-automator', 'project-shipper'],
|
|
IntentType.TEST: ['test-writer-fixer', 'api-tester', 'performance-benchmarker'],
|
|
IntentType.DESIGN: ['ui-designer', 'ux-researcher', 'brand-guardian'],
|
|
IntentType.RESEARCH: ['trend-researcher', 'ux-researcher'],
|
|
IntentType.OPTIMIZE: ['performance-benchmarker', 'backend-architect']
|
|
}
|
|
|
|
return intent_mappings.get(intent, [])
|
|
|
|
def _get_context_candidates(self, context: TaskContext) -> List[str]:
|
|
"""Get agents based on context"""
|
|
candidates = []
|
|
|
|
# Proactive agents
|
|
proactive = self.registry.find_proactive_agents({
|
|
'code_modified': len(context.files_modified) > 0,
|
|
'ui_modified': any(f.endswith(('.tsx', '.jsx', '.vue', '.svelte'))
|
|
for f in context.files_modified),
|
|
'complexity': context.complexity_score
|
|
})
|
|
candidates.extend(proactive)
|
|
|
|
# Project type specific
|
|
if context.project_type:
|
|
type_candidates = self._get_project_type_candidates(context.project_type)
|
|
candidates.extend(type_candidates)
|
|
|
|
return candidates
|
|
|
|
def _get_project_type_candidates(self, project_type: str) -> List[str]:
|
|
"""Get agents for specific project types"""
|
|
mappings = {
|
|
'mobile': ['mobile-app-builder'],
|
|
'web': ['frontend-developer', 'ui-designer'],
|
|
'api': ['backend-architect', 'api-tester'],
|
|
'ml': ['ai-engineer'],
|
|
'game': ['frontend-developer', 'ui-designer']
|
|
}
|
|
|
|
return mappings.get(project_type.lower(), [])
|
|
|
|
def record_performance(self, agent_name: str, satisfaction: float):
|
|
"""Record agent performance for future selections"""
|
|
self.performance_cache[agent_name].append(satisfaction)
|
|
|
|
# Keep only last 100
|
|
if len(self.performance_cache[agent_name]) > 100:
|
|
self.performance_cache[agent_name] = self.performance_cache[agent_name][-100:]
|
|
|
|
def _log_selection(self, request: SelectionRequest, selection: AgentSelectionScore):
|
|
"""Log selection for analysis"""
|
|
log_entry = {
|
|
'timestamp': time.time(),
|
|
'user_message': request.user_message,
|
|
'context': {
|
|
'phase': request.context.phase.value,
|
|
'intent': request.context.intent.value,
|
|
'files': request.context.files_modified
|
|
},
|
|
'selected_agent': selection.agent_name,
|
|
'score': selection.score,
|
|
'confidence': selection.confidence,
|
|
'reasons': selection.reasons
|
|
}
|
|
|
|
self.selection_history.append(log_entry)
|
|
|
|
# Keep history manageable
|
|
if len(self.selection_history) > 1000:
|
|
self.selection_history = self.selection_history[-1000:]
|
|
|
|
logger.info(f"Selected {selection.agent_name} (score: {selection.score:.1f}, confidence: {selection.confidence:.2f})")
|
|
|
|
|
|
class RealTimeAnalyzer:
|
|
"""Analyzes user input in real-time to determine task characteristics"""
|
|
|
|
@staticmethod
|
|
def detect_intent(message: str) -> IntentType:
|
|
"""Detect the user's intent from their message"""
|
|
message_lower = message.lower()
|
|
|
|
intent_patterns = {
|
|
IntentType.CREATE: ['create', 'build', 'make', 'add', 'implement', 'develop', 'scaffold'],
|
|
IntentType.MODIFY: ['modify', 'change', 'update', 'refactor', 'edit', 'improve'],
|
|
IntentType.FIX: ['fix', 'bug', 'error', 'issue', 'problem', 'broken', 'not working'],
|
|
IntentType.ANALYZE: ['analyze', 'check', 'review', 'audit', 'examine', 'investigate'],
|
|
IntentType.DEPLOY: ['deploy', 'release', 'ship', 'publish', 'launch'],
|
|
IntentType.TEST: ['test', 'testing', 'verify', 'validate'],
|
|
IntentType.DESIGN: ['design', 'ui', 'ux', 'mockup', 'wireframe'],
|
|
IntentType.RESEARCH: ['research', 'find', 'look into', 'investigate', 'explore'],
|
|
IntentType.OPTIMIZE: ['optimize', 'improve performance', 'speed up', 'faster']
|
|
}
|
|
|
|
best_intent = IntentType.CREATE
|
|
best_score = 0
|
|
|
|
for intent, patterns in intent_patterns.items():
|
|
score = sum(1 for pattern in patterns if pattern in message_lower)
|
|
if score > best_score:
|
|
best_score = score
|
|
best_intent = intent
|
|
|
|
return best_intent
|
|
|
|
@staticmethod
|
|
def detect_phase(message: str, context: Dict) -> TaskPhase:
|
|
"""Detect the current task phase"""
|
|
message_lower = message.lower()
|
|
|
|
phase_patterns = {
|
|
TaskPhase.PLANNING: ['plan', 'roadmap', 'sprint', 'backlog', 'priority'],
|
|
TaskPhase.DESIGN: ['design', 'mockup', 'wireframe', 'ui', 'ux'],
|
|
TaskPhase.IMPLEMENTATION: ['implement', 'code', 'develop', 'build', 'create'],
|
|
TaskPhase.TESTING: ['test', 'testing', 'verify', 'coverage'],
|
|
TaskPhase.DEPLOYMENT: ['deploy', 'release', 'ship', 'launch'],
|
|
TaskPhase.MAINTENANCE: ['monitor', 'maintain', 'update', 'fix']
|
|
}
|
|
|
|
# Check message first
|
|
for phase, patterns in phase_patterns.items():
|
|
if any(pattern in message_lower for pattern in patterns):
|
|
return phase
|
|
|
|
# Fall back to context
|
|
files = context.get('files_modified', [])
|
|
if any(f.endswith('.test.') for f in files):
|
|
return TaskPhase.TESTING
|
|
if any(f.endswith(('.tsx', '.jsx', '.vue')) for f in files):
|
|
return TaskPhase.IMPLEMENTATION
|
|
|
|
return TaskPhase.IMPLEMENTATION
|
|
|
|
@staticmethod
|
|
def estimate_complexity(message: str, files: List[str]) -> float:
|
|
"""Estimate task complexity (1-10)"""
|
|
complexity = 5.0 # Base complexity
|
|
|
|
# Message complexity
|
|
words = message.split()
|
|
complexity += min(len(words) / 50, 2.0)
|
|
|
|
# File complexity
|
|
complexity += min(len(files) / 5, 2.0)
|
|
|
|
# Keyword complexity
|
|
complex_keywords = ['architecture', 'integration', 'migration', 'refactor', 'system']
|
|
complexity += sum(0.5 for kw in complex_keywords if kw in message.lower())
|
|
|
|
return min(complexity, 10.0)
|
|
|
|
@staticmethod
|
|
def detect_project_type(files: List[str]) -> Optional[str]:
|
|
"""Detect project type from files"""
|
|
if not files:
|
|
return None
|
|
|
|
file_exts = [os.path.splitext(f)[1] for f in files]
|
|
|
|
if '.swift' in file_exts or '.kt' in file_exts:
|
|
return 'mobile'
|
|
elif '.tsx' in file_exts or '.jsx' in file_exts:
|
|
return 'web'
|
|
elif any(f.endswith('api.py') or f.endswith('controller.py') for f in files):
|
|
return 'api'
|
|
elif any('model' in f for f in files):
|
|
return 'ml'
|
|
|
|
return 'web' # Default
|
|
|
|
|
|
def create_selection_request(user_message: str, context: Dict) -> SelectionRequest:
|
|
"""Create a selection request from raw data"""
|
|
analyzer = RealTimeAnalyzer()
|
|
|
|
return SelectionRequest(
|
|
user_message=user_message,
|
|
context=TaskContext(
|
|
phase=analyzer.detect_phase(user_message, context),
|
|
intent=analyzer.detect_intent(user_message),
|
|
files_modified=context.get('files_modified', []),
|
|
files_touched=context.get('files_touched', []),
|
|
previous_agents=set(context.get('previous_agents', [])),
|
|
user_history=context.get('user_history', []),
|
|
project_type=analyzer.detect_project_type(context.get('files_modified', [])),
|
|
complexity_score=analyzer.estimate_complexity(
|
|
user_message,
|
|
context.get('files_modified', [])
|
|
)
|
|
),
|
|
available_agents=context.get('available_agents', {}),
|
|
performance_history=context.get('performance_history', {})
|
|
)
|