#!/usr/bin/env python3 """ Ralph Command Entry Point Main entry point for the /ralph command in Claude Code. This script is invoked when users run /ralph in the CLI. """ import os import sys import json import argparse from pathlib import Path # Add current directory to path sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from ralph_agent_integration import RalphAgentIntegration from meta_agent_orchestrator import MetaAgent def main(): """Main entry point for /ralph command""" parser = argparse.ArgumentParser( description='RalphLoop - Autonomous agent iteration and orchestration', prog='ralph' ) parser.add_argument( 'task', nargs='*', help='Task description or requirements' ) parser.add_argument( '--mode', choices=['single', 'multi', 'auto'], default='auto', help='Execution mode: single agent, multi-agent, or auto-detect' ) parser.add_argument( '--workers', type=int, help='Number of worker agents (for multi-agent mode)' ) parser.add_argument( '--delegate', action='store_true', help='Enable automatic agent delegation' ) parser.add_argument( '--no-delegate', action='store_true', help='Disable automatic agent delegation' ) parser.add_argument( '--proactive', action='store_true', help='Enable proactive agents' ) parser.add_argument( '--status', action='store_true', help='Show Ralph status and exit' ) parser.add_argument( '--list-agents', action='store_true', help='List all available agents and exit' ) parser.add_argument( '--observability', action='store_true', help='Enable observability dashboard' ) args = parser.parse_args() # Check environment variables for defaults multi_agent = os.getenv('RALPH_MULTI_AGENT', '').lower() == 'true' auto_delegate = os.getenv('RALPH_AUTO_DELEGATE', '').lower() == 'true' proactive_agents = os.getenv('RALPH_PROACTIVE_AGENTS', '').lower() == 'true' observability = os.getenv('RALPH_OBSERVABILITY_ENABLED', '').lower() == 'true' # Override with command line flags if args.delegate: auto_delegate = True elif args.no_delegate: auto_delegate = False if args.proactive: proactive_agents = True if args.observability: observability = True # Initialize Ralph integration integration = RalphAgentIntegration() # Handle special commands if args.status: status = integration.get_agent_status() print(json.dumps(status, indent=2)) return 0 if args.list_agents: agents = integration.registry.get_all_agents() print(f"\n=== Ralph Agents ({len(agents)} total) ===\n") by_category = {} for name, agent in agents.items(): cat = agent.category.value if cat not in by_category: by_category[cat] = [] by_category[cat].append((name, agent)) for category, agent_list in sorted(by_category.items()): print(f"\n{category.upper()}:") for name, agent in agent_list: print(f" - {name}: {agent.description[:80]}...") return 0 # Get task from arguments or stdin if args.task: task = ' '.join(args.task) else: # Read from stdin if no task provided print("Enter your task (press Ctrl+D when done):") task = sys.stdin.read().strip() if not task: parser.print_help() return 1 # Determine execution mode mode = args.mode if mode == 'auto': # Auto-detect based on task complexity complexity = integration.analyzer.estimate_complexity(task, []) if complexity >= 7.0 or multi_agent: mode = 'multi' else: mode = 'single' print(f"\n{'='*60}") print(f"RalphLoop: 'Tackle Until Solved'") print(f"{'='*60}") print(f"\nTask: {task[:100]}") print(f"Mode: {mode}") print(f"Auto-Delegate: {auto_delegate}") print(f"Proactive Agents: {proactive_agents}") print(f"\n{'='*60}\n") # Execute task try: if mode == 'multi': # Multi-agent orchestration print("šŸš€ Starting multi-agent orchestration...") orchestrator = MetaAgent() tasks = orchestrator.analyze_project(task) orchestrator.distribute_tasks(tasks) orchestrator.spawn_worker_agents(args.workers or int(os.getenv('RALPH_MAX_WORKERS', 12))) orchestrator.monitor_tasks() report = orchestrator.generate_report() print("\n=== EXECUTION REPORT ===") print(json.dumps(report, indent=2)) else: # Single agent with optional delegation if auto_delegate: print("šŸ” Analyzing task for agent delegation...\n") response = integration.process_user_message(task) print(f"\nAction: {response['action'].upper()}") if 'agent' in response: agent_info = response['agent'] print(f"Agent: {agent_info['name']}") print(f"Confidence: {agent_info.get('confidence', 0):.2%}") if agent_info.get('reasons'): print(f"Reasons:") for reason in agent_info['reasons']: print(f" - {reason}") # Handle multi-agent workflow if appropriate workflow = integration.suggest_multi_agent_workflow(task) if len(workflow) > 1: print(f"\nšŸ“‹ Suggested Multi-Agent Workflow ({len(workflow)} phases):") for i, step in enumerate(workflow, 1): print(f" {i}. [{step['phase']}] {step['agent']}: {step['task']}") # Ask if user wants to proceed print("\nWould you like to execute this workflow? (Requires multi-agent mode)") else: # Direct handling without delegation print("šŸŽÆ Processing task directly (no delegation)\n") print("Task would be processed by Claude directly.") print(f"\n{'='*60}") print(f"āœ… Ralph execution complete") print(f"{'='*60}\n") return 0 except KeyboardInterrupt: print("\n\nāš ļø Ralph interrupted by user") return 130 except Exception as e: print(f"\n\nāŒ Error: {e}") import traceback traceback.print_exc() return 1 if __name__ == '__main__': sys.exit(main())