#!/usr/bin/env python3
"""
Ralph Agent Integration System

Main integration layer that ties together:
- Agent Capability Registry
- Dynamic Agent Selector
- Real-Time Need Detection
- Multi-Agent Orchestration
- Performance Tracking

This system allows Ralph to automatically select and delegate to the most
appropriate contains-studio agent based on real-time task analysis.
"""

import json
import logging
import os
import re
import subprocess
import sys
import time
import uuid
from dataclasses import dataclass, field, asdict
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional, Any, Set

# Put this file's own directory on sys.path so the sibling modules below can
# be imported when the file is run directly as a script.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from agent_capability_registry import AgentCapabilityRegistry, AgentCategory
from dynamic_agent_selector import DynamicAgentSelector, SelectionRequest, TaskContext, RealTimeAnalyzer

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('ralph.integration')


def _keyword_pattern(*keywords: str) -> "re.Pattern":
    """Compile a case-sensitive matcher for a group of workflow keywords.

    Keywords of three characters or fewer ('ai', 'ui', 'ml', 'api', ...) are
    matched as whole words (with an optional plural 's') because as raw
    substrings they fire on unrelated words such as "maintain", "build",
    "html" or "rapid". Longer keywords keep plain substring matching.
    """
    alternatives = []
    for keyword in keywords:
        escaped = re.escape(keyword)
        if len(keyword) <= 3:
            alternatives.append(rf'\b{escaped}s?\b')
        else:
            alternatives.append(escaped)
    return re.compile('|'.join(alternatives))


# Keyword groups used by RalphAgentIntegration.suggest_multi_agent_workflow.
# They are applied to the lower-cased task text.
_FEATURE_KEYWORDS = _keyword_pattern('build', 'create', 'implement', 'develop')
_APP_KEYWORDS = _keyword_pattern('app', 'mobile', 'ios', 'android')
_BACKEND_KEYWORDS = _keyword_pattern('api', 'backend', 'server', 'database')
_AI_KEYWORDS = _keyword_pattern('ai', 'ml', 'machine learning', 'recommendation')
_DESIGN_KEYWORDS = _keyword_pattern('design', 'ui', 'ux', 'mockup')


@dataclass
class AgentDelegation:
    """Record of an agent delegation"""
    task_id: str
    agent_name: str
    user_request: str
    delegated_at: float                      # time.time() when the record was created
    started_at: Optional[float] = None       # time.time() when execution began
    completed_at: Optional[float] = None     # time.time() when execution ended (either outcome)
    status: str = "pending"  # pending, running, completed, failed
    result: Optional[str] = None
    error: Optional[str] = None
    satisfaction_score: Optional[float] = None


@dataclass
class RalphContext:
    """Persistent context for Ralph's decision making"""
    current_task: Optional[str] = None
    task_phase: str = "planning"
    files_modified: List[str] = field(default_factory=list)
    # A set, not a list: process_user_message calls .update() on it.
    files_touched: Set[str] = field(default_factory=set)
    # Agents are added on delegation; nothing in this file removes them again.
    active_agents: Set[str] = field(default_factory=set)
    delegation_history: List[AgentDelegation] = field(default_factory=list)
    performance_scores: Dict[str, List[float]] = field(default_factory=dict)
    project_type: Optional[str] = None
    session_start: float = field(default_factory=time.time)

    def to_dict(self) -> Dict:
        """Convert to a JSON-serializable dictionary.

        ``asdict`` turns the nested AgentDelegation records into dicts but
        leaves sets untouched, and ``json`` cannot encode sets, so both set
        fields are converted to sorted lists here.
        """
        data = asdict(self)
        data['active_agents'] = sorted(data['active_agents'])
        data['files_touched'] = sorted(data['files_touched'])
        return data

    @classmethod
    def from_dict(cls, data: Dict) -> 'RalphContext':
        """Create a context from a dictionary produced by ``to_dict``.

        The input dictionary is not modified. Set fields are restored from
        their list form and delegation history entries are rebuilt into
        AgentDelegation records.

        Raises:
            TypeError: if ``data`` (or a history entry) contains keys that
                are not fields of the corresponding dataclass.
        """
        payload = dict(data)  # shallow copy: do not mutate the caller's dict
        payload['active_agents'] = set(payload.get('active_agents', []))
        payload['files_touched'] = set(payload.get('files_touched', []))
        payload['delegation_history'] = [
            entry if isinstance(entry, AgentDelegation) else AgentDelegation(**entry)
            for entry in payload.get('delegation_history', [])
        ]
        return cls(**payload)


class RalphAgentIntegration:
    """
    Main integration system for Ralph's agent orchestration

    Responsibilities:
    - Analyze user requests in real-time
    - Select appropriate specialized agents
    - Delegate tasks and track execution
    - Monitor performance and adapt
    - Coordinate multi-agent workflows
    """

    # Selection thresholds used by process_user_message.
    DELEGATE_MIN_SCORE = 30
    DELEGATE_MIN_CONFIDENCE = 0.6
    SUGGEST_MIN_SCORE = 15

    # Number of most recent performance scores kept per agent.
    MAX_SCORES_PER_AGENT = 50

    def __init__(self, agents_dir: Optional[str] = None, context_file: Optional[str] = None):
        """Initialize the integration system.

        Args:
            agents_dir: Passed through to AgentCapabilityRegistry.
            context_file: Path of the JSON context file; defaults to
                '.ralph/context.json'.
        """
        # Initialize components
        self.registry = AgentCapabilityRegistry(agents_dir)
        self.selector = DynamicAgentSelector(self.registry)
        self.analyzer = RealTimeAnalyzer()

        # Load or create context
        self.context_file = context_file or '.ralph/context.json'
        self.context = self._load_context()

        # Active delegations, keyed by task id
        self.active_delegations: Dict[str, AgentDelegation] = {}

        logger.info("Ralph Agent Integration initialized")
        logger.info(f"Loaded {len(self.registry.get_all_agents())} agents from registry")

    def _load_context(self) -> RalphContext:
        """Load persistent context from file.

        Returns a fresh RalphContext when the file does not exist or cannot
        be read/parsed; in the latter case a warning is logged.
        """
        if os.path.exists(self.context_file):
            try:
                with open(self.context_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                return RalphContext.from_dict(data)
            except Exception as e:
                # Deliberately best-effort: a corrupt or outdated context
                # file must not prevent start-up.
                logger.warning(f"Could not load context: {e}")

        return RalphContext()

    def _save_context(self):
        """Save persistent context to file, creating its directory if needed."""
        directory = os.path.dirname(self.context_file)
        if directory:
            # os.makedirs('') raises, which happens for a bare file name.
            os.makedirs(directory, exist_ok=True)
        with open(self.context_file, 'w', encoding='utf-8') as f:
            json.dump(self.context.to_dict(), f, indent=2)

    def process_user_message(self, message: str, files_modified: Optional[List[str]] = None) -> Dict:
        """
        Process a user message and determine if agent delegation is needed

        Args:
            message: User's request
            files_modified: List of files being modified (if any)

        Returns:
            Response dict with action and details. 'action' is one of
            'delegated', 'suggest' or 'handle'.
        """
        # NOTE(review): this helper was previously called without being
        # imported or defined anywhere in this file. It is assumed to live in
        # dynamic_agent_selector next to SelectionRequest - confirm. The
        # import is local so that code paths which never process a message
        # are unaffected if the assumption is wrong.
        from dynamic_agent_selector import create_selection_request

        start_time = time.time()

        # Update context
        if files_modified:
            self.context.files_modified = files_modified
            self.context.files_touched.update(files_modified)

        # Analyze request
        intent = self.analyzer.detect_intent(message)
        phase = self.analyzer.detect_phase(message, {'files_modified': files_modified})
        complexity = self.analyzer.estimate_complexity(message, files_modified or [])

        # Detect if we need specialized agent
        selection_request = create_selection_request(message, {
            'files_modified': files_modified or [],
            'files_touched': list(self.context.files_touched),
            'previous_agents': list(self.context.active_agents),
            'user_history': [],
            'project_type': self.context.project_type
        })

        # Select best agent
        selection = self.selector.select_agent(selection_request)

        # Decide action based on confidence and score.
        # 'processing_time' covers analysis and selection only; it is taken
        # before any delegation is executed.
        response = {
            'timestamp': datetime.now().isoformat(),
            'user_message': message,
            'processing_time': time.time() - start_time,
            'analysis': {
                'intent': intent.value,
                'phase': phase.value,
                'complexity': complexity
            }
        }

        if (selection.score >= self.DELEGATE_MIN_SCORE
                and selection.confidence >= self.DELEGATE_MIN_CONFIDENCE):
            # High confidence - delegate to specialized agent
            delegation = self._delegate_to_agent(selection, message, files_modified)

            response['action'] = 'delegated'
            response['agent'] = {
                'name': selection.agent_name,
                'confidence': selection.confidence,
                'score': selection.score,
                'reasons': selection.reasons,
                'estimated_duration': selection.estimated_duration
            }
            # The delegation has already run at this point, so 'status' is
            # its final state ('completed' or 'failed').
            response['delegation'] = {
                'task_id': delegation.task_id,
                'status': delegation.status
            }

            logger.info(f"Delegated to {selection.agent_name} (confidence: {selection.confidence:.2f})")

        elif selection.score >= self.SUGGEST_MIN_SCORE:
            # Medium confidence - suggest agent but ask for confirmation
            response['action'] = 'suggest'
            response['agent'] = {
                'name': selection.agent_name,
                'confidence': selection.confidence,
                'score': selection.score,
                'reasons': selection.reasons
            }
            response['suggestion'] = f"Would you like me to delegate this to the {selection.agent_name} agent?"

            logger.info(f"Suggested {selection.agent_name} (confidence: {selection.confidence:.2f})")

        else:
            # Low confidence - handle with general Claude
            response['action'] = 'handle'
            response['agent'] = {
                'name': 'claude',
                'confidence': selection.confidence,
                'note': 'No specialized agent found, handling directly'
            }

            logger.info("Handling directly (no specialized agent needed)")

        # Save context
        self._save_context()

        return response

    def _delegate_to_agent(self, selection, user_request: str,
                           files: Optional[List[str]] = None) -> AgentDelegation:
        """Delegate task to a specialized agent.

        Creates the delegation record, registers it in the context and in
        ``active_delegations``, then executes it synchronously.

        Args:
            selection: Result of ``self.selector.select_agent``; only its
                ``agent_name`` is read here.
            user_request: The user's request text forwarded to the agent.
            files: Currently unused.

        Returns:
            The delegation record in its final state.
        """
        task_id = str(uuid.uuid4())

        delegation = AgentDelegation(
            task_id=task_id,
            agent_name=selection.agent_name,
            user_request=user_request,
            delegated_at=time.time()
        )

        # Update context
        self.context.active_agents.add(selection.agent_name)
        self.context.delegation_history.append(delegation)
        self.active_delegations[task_id] = delegation

        # Execute delegation
        self._execute_agent_delegation(delegation)

        return delegation

    def _execute_agent_delegation(self, delegation: AgentDelegation):
        """Execute the actual agent delegation.

        Updates ``delegation`` in place (status, timestamps, result/error),
        records a performance score, and persists the context. Exceptions
        raised by the agent call are captured on the record, not re-raised.
        """
        agent_name = delegation.agent_name

        try:
            delegation.status = "running"
            delegation.started_at = time.time()

            logger.info(f"Executing delegation {delegation.task_id} with agent {agent_name}")

            # Call the agent via Claude Code's subagent system
            result = self._call_subagent(agent_name, delegation.user_request)

            delegation.status = "completed"
            delegation.completed_at = time.time()
            delegation.result = result

            # Record performance
            duration = delegation.completed_at - delegation.started_at
            self._record_performance(agent_name, duration, success=True)

            logger.info(f"Delegation {delegation.task_id} completed in {duration:.1f}s")

        except Exception as e:
            delegation.status = "failed"
            delegation.completed_at = time.time()
            delegation.error = str(e)

            self._record_performance(agent_name, 0, success=False)

            logger.error(f"Delegation {delegation.task_id} failed: {e}")

        # Update context
        self.active_delegations.pop(delegation.task_id, None)
        self._save_context()

    def _call_subagent(self, agent_name: str, request: str) -> str:
        """
        Call a Claude Code subagent

        This integrates with Claude Code's agent system to invoke
        the specialized contains-studio agents.

        Raises:
            ValueError: if no agent file named ``<agent_name>.md`` is found.
        """
        # Check if agent file exists
        agent_path = self._find_agent_file(agent_name)

        if not agent_path:
            raise ValueError(f"Agent {agent_name} not found")

        logger.info(f"Calling agent from: {agent_path}")

        # Use Claude Code's Task tool to invoke the agent
        # This would be called from within Claude Code itself
        # For now, return a simulated response
        return f"Task '{request}' would be delegated to {agent_name} agent"

    def _find_agent_file(self, agent_name: str) -> Optional[str]:
        """Find the agent file in the agents directory.

        Walks each standard location recursively and returns the path of the
        first ``<agent_name>.md`` found, or None.
        """
        # Search in standard locations
        search_paths = [
            os.path.expanduser('~/.claude/agents'),
            os.path.join(os.path.dirname(__file__), '../../agents'),
        ]
        target = f"{agent_name}.md"

        for base_path in search_paths:
            if not os.path.exists(base_path):
                continue

            # Search in all subdirectories
            for root, _dirs, files in os.walk(base_path):
                if target in files:
                    return os.path.join(root, target)

        return None

    def _record_performance(self, agent_name: str, duration: float, success: bool):
        """Record agent performance for future selection.

        The score is 0.0 on failure. On success it is 1.0 for runs of up to
        300 seconds (or with a non-positive duration) and ``300 / duration``
        for slower runs. Only the most recent scores per agent are kept, and
        each score is also forwarded to the selector.
        """
        score = 1.0 if success else 0.0

        if success and duration > 0:
            # Normalize duration: anything at or under 5 minutes scores 1.0,
            # slower runs decay proportionally.
            score = min(300 / duration, 1.0)

        scores = self.context.performance_scores.setdefault(agent_name, [])
        scores.append(score)

        # Keep only the most recent entries
        if len(scores) > self.MAX_SCORES_PER_AGENT:
            self.context.performance_scores[agent_name] = scores[-self.MAX_SCORES_PER_AGENT:]

        # Also update selector's cache
        self.selector.record_performance(agent_name, score)

    def get_agent_status(self) -> Dict:
        """Get current status of all agents.

        Returns:
            Dict with the registry size, the number of agents in
            ``context.active_agents``, agents grouped by category, the last
            ten delegations and a per-agent performance summary.
        """
        agents = self.registry.get_all_agents()

        status = {
            'total_agents': len(agents),
            'active_agents': len(self.context.active_agents),
            'agents_by_category': {},
            'recent_delegations': [],
            'performance_summary': {}
        }

        # Group by category
        for agent_name, agent in agents.items():
            description = agent.description
            if len(description) > 100:
                # Only mark the description as elided when it really is.
                description = description[:100] + '...'

            status['agents_by_category'].setdefault(agent.category.value, []).append({
                'name': agent_name,
                'description': description
            })

        # Recent delegations; a still-running delegation reports the time
        # elapsed so far, one that never started reports None.
        status['recent_delegations'] = [
            {
                'task_id': d.task_id,
                'agent': d.agent_name,
                'status': d.status,
                'duration': ((d.completed_at or time.time()) - d.started_at) if d.started_at else None
            }
            for d in self.context.delegation_history[-10:]
        ]

        # Performance summary
        for agent_name, scores in self.context.performance_scores.items():
            if scores:
                status['performance_summary'][agent_name] = {
                    'avg_score': sum(scores) / len(scores),
                    'total_delegations': len(scores),
                    'last_score': scores[-1]
                }

        return status

    def suggest_multi_agent_workflow(self, task: str) -> List[Dict]:
        """
        Suggest a multi-agent workflow for a complex task

        Args:
            task: Complex task description

        Returns:
            List of agent delegations in execution order; empty when no
            keyword group matches.
        """
        workflow = []

        task_lower = task.lower()

        # NOTE(review): the first matching branch wins, and the generic
        # "feature" verbs are checked first, so e.g. "build an iOS app"
        # yields the generic feature workflow. Order kept as-is; confirm
        # whether the specific branches should take precedence.

        # Full feature development
        if _FEATURE_KEYWORDS.search(task_lower):
            workflow.extend([
                {'phase': 'planning', 'agent': 'sprint-prioritizer', 'task': f'Plan: {task}'},
                {'phase': 'design', 'agent': 'ui-designer', 'task': f'Design UI for: {task}'},
                {'phase': 'implementation', 'agent': 'frontend-developer', 'task': f'Implement: {task}'},
                {'phase': 'testing', 'agent': 'test-writer-fixer', 'task': f'Test: {task}'}
            ])

        # App development
        elif _APP_KEYWORDS.search(task_lower):
            workflow.extend([
                {'phase': 'planning', 'agent': 'rapid-prototyper', 'task': f'Prototype: {task}'},
                {'phase': 'design', 'agent': 'ui-designer', 'task': 'Design app UI'},
                {'phase': 'implementation', 'agent': 'mobile-app-builder', 'task': f'Build: {task}'},
                {'phase': 'testing', 'agent': 'test-writer-fixer', 'task': 'Test app'},
                {'phase': 'deployment', 'agent': 'app-store-optimizer', 'task': 'Optimize for app store'}
            ])

        # Backend/API
        elif _BACKEND_KEYWORDS.search(task_lower):
            workflow.extend([
                {'phase': 'planning', 'agent': 'backend-architect', 'task': f'Design API: {task}'},
                {'phase': 'implementation', 'agent': 'backend-architect', 'task': f'Implement: {task}'},
                {'phase': 'testing', 'agent': 'api-tester', 'task': 'Test API'},
                {'phase': 'deployment', 'agent': 'devops-automator', 'task': 'Deploy API'}
            ])

        # AI/ML feature
        elif _AI_KEYWORDS.search(task_lower):
            workflow.extend([
                {'phase': 'planning', 'agent': 'ai-engineer', 'task': f'Plan AI feature: {task}'},
                {'phase': 'implementation', 'agent': 'ai-engineer', 'task': f'Implement: {task}'},
                {'phase': 'testing', 'agent': 'test-writer-fixer', 'task': 'Test AI feature'}
            ])

        # Design task
        elif _DESIGN_KEYWORDS.search(task_lower):
            workflow.extend([
                {'phase': 'design', 'agent': 'ux-researcher', 'task': f'Research users for: {task}'},
                {'phase': 'design', 'agent': 'ui-designer', 'task': f'Create designs: {task}'},
                {'phase': 'design', 'agent': 'whimsy-injector', 'task': 'Add delightful details'}
            ])

        return workflow

    def handle_multi_agent_task(self, task: str) -> Dict:
        """
        Handle a complex task with multiple agents

        Each workflow step's task text is run through
        ``process_user_message``, which selects an agent on its own; the
        agent actually chosen is reported as 'selected_agent' next to the
        agent the workflow suggested.

        Args:
            task: Complex task description

        Returns:
            Results from all agents
        """
        workflow = self.suggest_multi_agent_workflow(task)

        results = {
            'task': task,
            'workflow': workflow,
            'results': [],
            'total_duration': 0,
            'successful_phases': 0,
            'failed_phases': 0
        }

        for step in workflow:
            try:
                logger.info(f"Executing phase '{step['phase']}' with {step['agent']}")

                start_time = time.time()

                # Delegate to agent
                response = self.process_user_message(step['task'])

                duration = time.time() - start_time

                action = response.get('action', 'unknown')
                delegation_status = response.get('delegation', {}).get('status')
                # A delegation whose execution failed still comes back with
                # action 'delegated'; it must not be counted as a success.
                succeeded = action == 'handle' or (
                    action == 'delegated' and delegation_status != 'failed'
                )

                results['results'].append({
                    'phase': step['phase'],
                    'agent': step['agent'],
                    'selected_agent': response.get('agent', {}).get('name'),
                    'duration': duration,
                    'status': action,
                    'success': succeeded
                })

                results['total_duration'] += duration

                if succeeded:
                    results['successful_phases'] += 1
                else:
                    results['failed_phases'] += 1

            except Exception as e:
                logger.error(f"Error in phase '{step['phase']}': {e}")
                results['results'].append({
                    'phase': step['phase'],
                    'agent': step['agent'],
                    'error': str(e),
                    'success': False
                })
                results['failed_phases'] += 1

        return results


# CLI interface for testing
def main():
    """Main entry point for CLI usage"""
    import argparse

    parser = argparse.ArgumentParser(description='Ralph Agent Integration')
    # Optional so that `--status` can be used without a message.
    parser.add_argument('message', nargs='?', help='User message to process')
    parser.add_argument('--files', nargs='*', help='Files being modified')
    parser.add_argument('--multi-agent', action='store_true', help='Use multi-agent workflow')
    parser.add_argument('--status', action='store_true', help='Show agent status')

    args = parser.parse_args()

    if not args.status and not args.message:
        parser.error('message is required unless --status is given')

    # Initialize integration
    integration = RalphAgentIntegration()

    if args.status:
        # Show status
        status = integration.get_agent_status()
        print(json.dumps(status, indent=2))

    elif args.multi_agent:
        # Multi-agent workflow
        results = integration.handle_multi_agent_task(args.message)
        print(json.dumps(results, indent=2))

    else:
        # Single message
        response = integration.process_user_message(args.message, args.files)
        print(json.dumps(response, indent=2))


if __name__ == '__main__':
    main()