#!/usr/bin/env python3
"""
Ralph Dynamic Agent Selector

Intelligently selects and routes to the most appropriate agent based on:
- User request analysis
- Project context
- File types being modified
- Current task state
- Agent capabilities and performance history
"""

import json
import os
import re
import time
from typing import Dict, List, Optional, Tuple, Set
from dataclasses import dataclass, field
from enum import Enum
import logging
from collections import defaultdict

logger = logging.getLogger('ralph.selector')


class TaskPhase(Enum):
    """Phases of a task lifecycle"""
    PLANNING = "planning"
    DESIGN = "design"
    IMPLEMENTATION = "implementation"
    TESTING = "testing"
    DEPLOYMENT = "deployment"
    MAINTENANCE = "maintenance"


class IntentType(Enum):
    """Types of user intents"""
    CREATE = "create"
    MODIFY = "modify"
    FIX = "fix"
    ANALYZE = "analyze"
    DEPLOY = "deploy"
    TEST = "test"
    DESIGN = "design"
    RESEARCH = "research"
    OPTIMIZE = "optimize"


@dataclass
class AgentSelectionScore:
    """Score for an agent selection decision"""
    agent_name: str
    score: float
    reasons: List[str] = field(default_factory=list)
    confidence: float = 0.0
    estimated_duration: int = 300  # seconds


@dataclass
class TaskContext:
    """Context about the current task"""
    phase: TaskPhase
    intent: IntentType
    files_modified: List[str] = field(default_factory=list)
    files_touched: List[str] = field(default_factory=list)
    previous_agents: Set[str] = field(default_factory=set)
    user_history: List[str] = field(default_factory=list)
    project_type: Optional[str] = None
    complexity_score: float = 5.0
    time_constraint: Optional[int] = None  # seconds


@dataclass
class SelectionRequest:
    """Request for agent selection"""
    user_message: str
    context: TaskContext
    available_agents: Dict[str, dict]
    performance_history: Dict[str, dict] = field(default_factory=dict)


class DynamicAgentSelector:
    """
    Dynamically selects the best agent for each task

    Uses multiple signals:
    - Semantic similarity to agent descriptions
    - Keyword matching
    - File type analysis
    - Task phase awareness
    - Historical performance
    - Collaborative filtering
    """

    def __init__(self, registry):
        """Initialize the selector.

        Args:
            registry: Agent registry providing get_agent / find_agents_by_*
                lookup methods (project type — not validated here).
        """
        self.registry = registry
        # Chronological log of selection decisions (capped at 1000 entries).
        self.selection_history: List[Dict] = []
        # agent_name -> list of satisfaction scores (capped at 100 per agent).
        self.performance_cache: Dict[str, List[float]] = defaultdict(list)

    def select_agent(self, request: SelectionRequest) -> AgentSelectionScore:
        """
        Select the best agent for the given request

        Args:
            request: Selection request with context

        Returns:
            AgentSelectionScore with selected agent and reasoning
        """
        logger.info(f"Selecting agent for: {request.user_message[:100]}...")

        # Get candidate agents
        candidates = self._get_candidates(request)

        if not candidates:
            # Fallback to general purpose
            return AgentSelectionScore(
                agent_name="claude",
                score=0.5,
                reasons=["No specialized agent found, using general purpose"],
                confidence=0.3
            )

        # Score each candidate
        scores = []
        for agent_name in candidates:
            score = self._score_agent(agent_name, request)
            scores.append(score)

        # Sort by score
        scores.sort(key=lambda x: x.score, reverse=True)

        # Get best match
        best = scores[0]

        # Log selection
        self._log_selection(request, best)

        return best

    def _get_candidates(self, request: SelectionRequest) -> List[str]:
        """Get candidate agents for the request.

        Unions candidates from keyword, file, phase, intent, and context
        signals; duplicates are collapsed via the set.
        """
        candidates = set()

        # Keyword matching
        keyword_matches = self.registry.find_agents_by_keywords(request.user_message)
        for agent_name, score, agent in keyword_matches[:5]:  # Top 5
            candidates.add(agent_name)

        # File-based matching
        if request.context.files_modified:
            file_matches = self.registry.find_agents_by_files(request.context.files_modified)
            for agent_name, score, agent in file_matches[:3]:
                candidates.add(agent_name)

        # Phase-based candidates
        phase_candidates = self._get_phase_candidates(request.context.phase)
        candidates.update(phase_candidates)

        # Intent-based candidates
        intent_candidates = self._get_intent_candidates(request.context.intent)
        candidates.update(intent_candidates)

        # Context-aware candidates
        context_candidates = self._get_context_candidates(request.context)
        candidates.update(context_candidates)

        return list(candidates)

    def _score_agent(self, agent_name: str, request: SelectionRequest) -> AgentSelectionScore:
        """Score an agent for the request.

        Combines five weighted signals (max ~100 points); confidence is the
        score normalized against 50 and clamped to 1.0.
        """
        agent = self.registry.get_agent(agent_name)
        if not agent:
            return AgentSelectionScore(agent_name=agent_name, score=0.0)

        score = 0.0
        reasons = []

        # 1. Keyword matching (0-40 points)
        keyword_score = self._score_keywords(agent, request.user_message)
        score += keyword_score
        if keyword_score > 0:
            reasons.append(f"Keyword match: {keyword_score:.1f}")

        # 2. Semantic similarity (0-25 points)
        semantic_score = self._score_semantic(agent, request)
        score += semantic_score
        if semantic_score > 0:
            reasons.append(f"Semantic fit: {semantic_score:.1f}")

        # 3. File type matching (0-20 points)
        file_score = self._score_files(agent, request.context)
        score += file_score
        if file_score > 0:
            reasons.append(f"File match: {file_score:.1f}")

        # 4. Phase appropriateness (0-10 points)
        phase_score = self._score_phase(agent, request.context.phase)
        score += phase_score
        if phase_score > 0:
            reasons.append(f"Phase fit: {phase_score:.1f}")

        # 5. Historical performance (0-5 points)
        perf_score = self._score_performance(agent_name)
        score += perf_score
        if perf_score > 0:
            reasons.append(f"Performance bonus: {perf_score:.1f}")

        # Calculate confidence
        confidence = min(score / 50.0, 1.0)

        # Estimate duration based on agent and complexity
        duration = self._estimate_duration(agent, request.context)

        return AgentSelectionScore(
            agent_name=agent_name,
            score=score,
            reasons=reasons,
            confidence=confidence,
            estimated_duration=duration
        )

    def _score_keywords(self, agent, message: str) -> float:
        """Score keyword matching (capped at 40)."""
        message_lower = message.lower()
        score = 0.0

        for keyword in agent.keywords:
            if keyword.lower() in message_lower:
                # Each match weighs inversely to the agent's keyword-list
                # size, so agents with fewer, more specific keywords score
                # higher per hit.
                weight = 10.0 / len(agent.keywords)
                score += weight

        # Direct name mention
        if agent.name.lower() in message_lower:
            score += 20.0

        return min(score, 40.0)

    def _score_semantic(self, agent, request: SelectionRequest) -> float:
        """Score semantic similarity via Jaccard word overlap with the
        agent's stored examples (capped at 25)."""
        score = 0.0

        # Check against examples
        for example in agent.examples:
            example_text = example['user_request'].lower()
            request_text = request.user_message.lower()

            # Simple word overlap
            example_words = set(example_text.split())
            request_words = set(request_text.split())

            if example_words and request_words:
                overlap = len(example_words & request_words)
                total = len(example_words | request_words)
                similarity = overlap / total if total > 0 else 0
                score += similarity * 15.0

        return min(score, 25.0)

    def _score_files(self, agent, context: TaskContext) -> float:
        """Score file type matching: 5 points per (file, pattern) substring
        hit, capped at 20."""
        if not context.files_modified and not context.files_touched:
            return 0.0

        all_files = context.files_modified + context.files_touched
        score = 0.0

        for file_path in all_files:
            file_lower = file_path.lower()
            for pattern in agent.file_patterns:
                if pattern.lower() in file_lower:
                    score += 5.0

        return min(score, 20.0)

    def _score_phase(self, agent, phase: TaskPhase) -> float:
        """Score phase appropriateness: flat 10-point bonus if the agent is
        in the phase's recommended list, else 0."""
        phase_mappings = {
            TaskPhase.PLANNING: ['sprint-prioritizer', 'studio-producer'],
            TaskPhase.DESIGN: ['ui-designer', 'ux-researcher', 'brand-guardian'],
            TaskPhase.IMPLEMENTATION: ['frontend-developer', 'backend-architect', 'ai-engineer'],
            TaskPhase.TESTING: ['test-writer-fixer', 'api-tester'],
            TaskPhase.DEPLOYMENT: ['devops-automator', 'project-shipper'],
            TaskPhase.MAINTENANCE: ['infrastructure-maintainer', 'support-responder']
        }

        recommended = phase_mappings.get(phase, [])
        if agent.name in recommended:
            return 10.0

        return 0.0

    def _score_performance(self, agent_name: str) -> float:
        """Score based on historical performance.

        Unknown agents get a neutral 2.5; known agents get a bonus/penalty
        in [-2.5, +2.5] derived from the mean of their last 10 satisfaction
        scores (so the labeled "0-5" range can go slightly negative for
        poorly performing agents).
        """
        if agent_name not in self.performance_cache:
            return 2.5  # Neutral score for unknown

        scores = self.performance_cache[agent_name]
        if not scores:
            return 2.5

        # Average recent performance (last 10)
        recent = scores[-10:]
        avg = sum(recent) / len(recent)

        # Convert to bonus
        return (avg - 0.5) * 5.0  # Range: -2.5 to +2.5

    def _estimate_duration(self, agent, context: TaskContext) -> int:
        """Estimate task duration in seconds"""
        base_duration = 300  # 5 minutes

        # Adjust by complexity
        complexity_multiplier = 1.0 + (context.complexity_score / 10.0)

        # Adjust by agent speed (from category)
        category_speeds = {
            'engineering': 1.2,
            'design': 1.0,
            'testing': 0.8,
            'product': 1.0
        }
        speed = category_speeds.get(agent.category.value, 1.0)

        duration = base_duration * complexity_multiplier * speed
        return int(duration)

    def _get_phase_candidates(self, phase: TaskPhase) -> List[str]:
        """Get agents appropriate for current phase"""
        phase_mappings = {
            TaskPhase.PLANNING: ['sprint-prioritizer', 'studio-producer', 'rapid-prototyper'],
            TaskPhase.DESIGN: ['ui-designer', 'ux-researcher', 'brand-guardian', 'visual-storyteller'],
            TaskPhase.IMPLEMENTATION: ['frontend-developer', 'backend-architect', 'ai-engineer',
                                       'mobile-app-builder', 'rapid-prototyper'],
            TaskPhase.TESTING: ['test-writer-fixer', 'api-tester', 'performance-benchmarker'],
            TaskPhase.DEPLOYMENT: ['devops-automator', 'project-shipper'],
            TaskPhase.MAINTENANCE: ['infrastructure-maintainer', 'support-responder']
        }
        return phase_mappings.get(phase, [])

    def _get_intent_candidates(self, intent: IntentType) -> List[str]:
        """Get agents for specific intent"""
        intent_mappings = {
            IntentType.CREATE: ['rapid-prototyper', 'frontend-developer', 'backend-architect'],
            IntentType.MODIFY: ['frontend-developer', 'backend-architect', 'ui-designer'],
            IntentType.FIX: ['test-writer-fixer', 'backend-architect', 'frontend-developer'],
            IntentType.ANALYZE: ['analytics-reporter', 'feedback-synthesizer', 'test-results-analyzer'],
            IntentType.DEPLOY: ['devops-automator', 'project-shipper'],
            IntentType.TEST: ['test-writer-fixer', 'api-tester', 'performance-benchmarker'],
            IntentType.DESIGN: ['ui-designer', 'ux-researcher', 'brand-guardian'],
            IntentType.RESEARCH: ['trend-researcher', 'ux-researcher'],
            IntentType.OPTIMIZE: ['performance-benchmarker', 'backend-architect']
        }
        return intent_mappings.get(intent, [])

    def _get_context_candidates(self, context: TaskContext) -> List[str]:
        """Get agents based on context"""
        candidates = []

        # Proactive agents
        proactive = self.registry.find_proactive_agents({
            'code_modified': len(context.files_modified) > 0,
            'ui_modified': any(f.endswith(('.tsx', '.jsx', '.vue', '.svelte'))
                               for f in context.files_modified),
            'complexity': context.complexity_score
        })
        candidates.extend(proactive)

        # Project type specific
        if context.project_type:
            type_candidates = self._get_project_type_candidates(context.project_type)
            candidates.extend(type_candidates)

        return candidates

    def _get_project_type_candidates(self, project_type: str) -> List[str]:
        """Get agents for specific project types"""
        mappings = {
            'mobile': ['mobile-app-builder'],
            'web': ['frontend-developer', 'ui-designer'],
            'api': ['backend-architect', 'api-tester'],
            'ml': ['ai-engineer'],
            'game': ['frontend-developer', 'ui-designer']
        }
        return mappings.get(project_type.lower(), [])

    def record_performance(self, agent_name: str, satisfaction: float):
        """Record agent performance for future selections"""
        self.performance_cache[agent_name].append(satisfaction)

        # Keep only last 100
        if len(self.performance_cache[agent_name]) > 100:
            self.performance_cache[agent_name] = self.performance_cache[agent_name][-100:]

    def _log_selection(self, request: SelectionRequest, selection: AgentSelectionScore):
        """Log selection for analysis"""
        log_entry = {
            'timestamp': time.time(),
            'user_message': request.user_message,
            'context': {
                'phase': request.context.phase.value,
                'intent': request.context.intent.value,
                'files': request.context.files_modified
            },
            'selected_agent': selection.agent_name,
            'score': selection.score,
            'confidence': selection.confidence,
            'reasons': selection.reasons
        }

        self.selection_history.append(log_entry)

        # Keep history manageable
        if len(self.selection_history) > 1000:
            self.selection_history = self.selection_history[-1000:]

        logger.info(f"Selected {selection.agent_name} (score: {selection.score:.1f}, confidence: {selection.confidence:.2f})")


class RealTimeAnalyzer:
    """Analyzes user input in real-time to determine task characteristics"""

    @staticmethod
    def detect_intent(message: str) -> IntentType:
        """Detect the user's intent from their message.

        Picks the intent whose pattern list has the most substring hits in
        the message; defaults to CREATE when nothing matches.
        """
        message_lower = message.lower()

        intent_patterns = {
            IntentType.CREATE: ['create', 'build', 'make', 'add', 'implement', 'develop', 'scaffold'],
            IntentType.MODIFY: ['modify', 'change', 'update', 'refactor', 'edit', 'improve'],
            IntentType.FIX: ['fix', 'bug', 'error', 'issue', 'problem', 'broken', 'not working'],
            IntentType.ANALYZE: ['analyze', 'check', 'review', 'audit', 'examine', 'investigate'],
            IntentType.DEPLOY: ['deploy', 'release', 'ship', 'publish', 'launch'],
            IntentType.TEST: ['test', 'testing', 'verify', 'validate'],
            IntentType.DESIGN: ['design', 'ui', 'ux', 'mockup', 'wireframe'],
            IntentType.RESEARCH: ['research', 'find', 'look into', 'investigate', 'explore'],
            IntentType.OPTIMIZE: ['optimize', 'improve performance', 'speed up', 'faster']
        }

        best_intent = IntentType.CREATE
        best_score = 0

        for intent, patterns in intent_patterns.items():
            score = sum(1 for pattern in patterns if pattern in message_lower)
            if score > best_score:
                best_score = score
                best_intent = intent

        return best_intent

    @staticmethod
    def detect_phase(message: str, context: Dict) -> TaskPhase:
        """Detect the current task phase.

        Message keywords take priority (first matching phase in declaration
        order wins); otherwise falls back to the modified-file list, and
        finally to IMPLEMENTATION.
        """
        message_lower = message.lower()

        phase_patterns = {
            TaskPhase.PLANNING: ['plan', 'roadmap', 'sprint', 'backlog', 'priority'],
            TaskPhase.DESIGN: ['design', 'mockup', 'wireframe', 'ui', 'ux'],
            TaskPhase.IMPLEMENTATION: ['implement', 'code', 'develop', 'build', 'create'],
            TaskPhase.TESTING: ['test', 'testing', 'verify', 'coverage'],
            TaskPhase.DEPLOYMENT: ['deploy', 'release', 'ship', 'launch'],
            TaskPhase.MAINTENANCE: ['monitor', 'maintain', 'update', 'fix']
        }

        # Check message first
        for phase, patterns in phase_patterns.items():
            if any(pattern in message_lower for pattern in patterns):
                return phase

        # Fall back to context
        files = context.get('files_modified', [])
        # Bug fix: the original used f.endswith('.test.'), which can never
        # match (no filename ends with a trailing dot), so test files like
        # "foo.test.js" were silently missed. Detect the ".test." infix
        # instead.
        if any('.test.' in f for f in files):
            return TaskPhase.TESTING
        if any(f.endswith(('.tsx', '.jsx', '.vue')) for f in files):
            return TaskPhase.IMPLEMENTATION

        return TaskPhase.IMPLEMENTATION

    @staticmethod
    def estimate_complexity(message: str, files: List[str]) -> float:
        """Estimate task complexity (1-10)"""
        complexity = 5.0  # Base complexity

        # Message complexity
        words = message.split()
        complexity += min(len(words) / 50, 2.0)

        # File complexity
        complexity += min(len(files) / 5, 2.0)

        # Keyword complexity
        complex_keywords = ['architecture', 'integration', 'migration', 'refactor', 'system']
        complexity += sum(0.5 for kw in complex_keywords if kw in message.lower())

        return min(complexity, 10.0)

    @staticmethod
    def detect_project_type(files: List[str]) -> Optional[str]:
        """Detect project type from files.

        Returns None for an empty file list; otherwise one of
        'mobile' / 'web' / 'api' / 'ml', defaulting to 'web'.
        """
        if not files:
            return None

        file_exts = [os.path.splitext(f)[1] for f in files]

        if '.swift' in file_exts or '.kt' in file_exts:
            return 'mobile'
        elif '.tsx' in file_exts or '.jsx' in file_exts:
            return 'web'
        elif any(f.endswith('api.py') or f.endswith('controller.py') for f in files):
            return 'api'
        elif any('model' in f for f in files):
            return 'ml'

        return 'web'  # Default


def create_selection_request(user_message: str, context: Dict) -> SelectionRequest:
    """Create a selection request from raw data.

    Args:
        user_message: The raw user message.
        context: Dict with optional keys 'files_modified', 'files_touched',
            'previous_agents', 'user_history', 'available_agents',
            'performance_history'; missing keys default to empty.

    Returns:
        A fully populated SelectionRequest with phase/intent/complexity
        inferred by RealTimeAnalyzer.
    """
    analyzer = RealTimeAnalyzer()

    return SelectionRequest(
        user_message=user_message,
        context=TaskContext(
            phase=analyzer.detect_phase(user_message, context),
            intent=analyzer.detect_intent(user_message),
            files_modified=context.get('files_modified', []),
            files_touched=context.get('files_touched', []),
            previous_agents=set(context.get('previous_agents', [])),
            user_history=context.get('user_history', []),
            project_type=analyzer.detect_project_type(context.get('files_modified', [])),
            complexity_score=analyzer.estimate_complexity(
                user_message, context.get('files_modified', [])
            )
        ),
        available_agents=context.get('available_agents', {}),
        performance_history=context.get('performance_history', {})
    )