#!/usr/bin/env python3
"""
Ralph Integration for Agent Pipeline Builder

Generates pipeline manifests for Ralph Orchestrator to autonomously build
and test multi-agent pipelines.
"""

import os
import sys
import json
import subprocess
from pathlib import Path
from typing import Optional, Dict, Any, List

# Configuration
RALPHLOOP_CMD = (
    Path(__file__).parent.parent.parent.parent
    / "obsidian-web-interface"
    / "bin"
    / "ralphloop"
)
PIPELINE_THRESHOLD = 3  # Minimum number of stages to trigger Ralph

# Keyword groups used by analyze_pipeline_complexity(); each group that
# matches a stage description adds one point for that stage.
_EXTERNAL_DATA_WORDS = ("fetch", "api", "database", "web", "search")
_PROCESSING_WORDS = ("analyze", "transform", "aggregate", "compute")
_CONDITIONAL_WORDS = ("filter", "validate", "check", "select")
_KEYWORD_GROUPS = (_EXTERNAL_DATA_WORDS, _PROCESSING_WORDS, _CONDITIONAL_WORDS)

# Environment variables that force Ralph mode when set to a truthy string.
_OPT_IN_ENV_VARS = ("RALPH_AUTO", "PIPELINE_USE_RALPH")
_TRUTHY = ("true", "1", "yes")


def analyze_pipeline_complexity(stages: List[Dict[str, Any]]) -> int:
    """
    Analyze pipeline complexity and return estimated difficulty.

    Scoring: one point per stage, plus one point per stage for each keyword
    group (external data, complex processing, conditional logic) found in
    the stage's lower-cased ``description``, plus two points if any stage
    ``name`` contains "parallel". The total is capped at 10.

    Returns: integer score, at most 10 (0 for an empty stage list)
    """
    complexity = len(stages)  # Base: one point per stage

    for stage in stages:
        # Tolerate a missing or None description instead of crashing.
        description = str(stage.get("description") or "").lower()
        # Substring match on purpose: "fetches" counts as "fetch", etc.
        complexity += sum(
            1
            for words in _KEYWORD_GROUPS
            if any(word in description for word in words)
        )

    # Parallel stages add complexity (counted once, not per stage).
    if any("parallel" in str(stage.get("name", "")).lower() for stage in stages):
        complexity += 2

    return min(10, complexity)


def _render_stage_section(stage: Dict[str, Any]) -> str:
    """Render the detailed Markdown section for one pipeline stage."""
    tools = ", ".join(stage.get("tools", ["Read", "Write", "Bash"]))
    # NOTE(review): the closing marker below is the literal `<<>>`; it may
    # have lost a label at some point -- confirm against what Ralph expects.
    return f"""
### {stage['name']}

**Purpose:** {stage['description']}

**Agent Configuration:**
- Model: {stage.get('model', 'sonnet')}
- Allowed Tools: {tools}

**Output Format:**
<<<{stage['name']}>>>
```json
{{
  "result": "...",
  "metadata": {{...}}
}}
```
<<>>

"""


def create_pipeline_manifest(
    stages: List[Dict[str, Any]],
    manifest_path: str = ".ralph/PIPELINE.md",
) -> str:
    """
    Create a Ralph-formatted pipeline manifest.

    Args:
        stages: Stage dicts. Each must have ``name`` and ``description``;
            ``model`` (default "sonnet") and ``tools`` (default
            Read/Write/Bash) are optional.
        manifest_path: Where to write the manifest. Missing parent
            directories are created.

    Returns the path to the created manifest file.

    Raises:
        KeyError: if a stage lacks ``name`` or ``description``.
    """
    # Honour the caller-supplied path (previously this parameter was ignored
    # and the file was always written to .ralph/PIPELINE.md).
    manifest_file = Path(manifest_path)
    manifest_file.parent.mkdir(parents=True, exist_ok=True)

    parts = ["""# Pipeline: Multi-Agent Workflow

## Stages

"""]

    # Numbered overview list, one line per stage.
    parts.extend(
        f"{i}. **{stage['name']}** - {stage['description']}\n"
        for i, stage in enumerate(stages, 1)
    )

    # NOTE(review): the marker example is reproduced as found (`<<>>...<<>>`);
    # confirm the intended marker syntax.
    parts.append("""
## Data Format

All stages use JSON with markers: `<<>>...<<>>`

## Task

Build a complete multi-agent pipeline with the following stages:

""")

    parts.extend(_render_stage_section(stage) for stage in stages)

    parts.append("""
## Success Criteria

The pipeline is complete when:
- [ ] All agent definitions are created in `.claude/agents/`
- [ ] Pipeline orchestration script is implemented
- [ ] Each stage is tested independently
- [ ] End-to-end pipeline test passes
- [ ] Error handling is verified
- [ ] Documentation is complete

## Instructions

1. Create agent definition files for each stage
2. Implement the pipeline orchestration script
3. Test each stage independently with mock data
4. Run the full end-to-end pipeline
5. Verify error handling and edge cases
6. Document usage and testing procedures

When complete, add marker to this file.
Output the final pipeline to `.ralph/iterations/pipeline.md`.
""")

    # Explicit encoding: stage text may contain non-ASCII characters and the
    # platform default encoding is not guaranteed to be UTF-8.
    manifest_file.write_text("".join(parts), encoding="utf-8")

    return str(manifest_file)


def should_use_ralph(stages: List[Dict[str, Any]]) -> bool:
    """
    Determine if pipeline is complex enough to warrant RalphLoop.

    Returns True when either opt-in environment variable (``RALPH_AUTO`` or
    ``PIPELINE_USE_RALPH``) is "true"/"1"/"yes" (case-insensitive), when the
    pipeline has at least ``PIPELINE_THRESHOLD`` stages, or when its
    complexity score is 5 or more.
    """
    # Check for explicit opt-in via environment
    if any(os.getenv(var, "").lower() in _TRUTHY for var in _OPT_IN_ENV_VARS):
        return True

    # Check stage count
    if len(stages) >= PIPELINE_THRESHOLD:
        return True

    # Check complexity
    return analyze_pipeline_complexity(stages) >= 5


def _failure_result(error: str) -> Dict[str, Any]:
    """Build the result dict returned for every unsuccessful Ralph run."""
    return {
        "success": False,
        "error": error,
        "iterations": 0,
        "pipeline_path": "",
        "state": {},
    }


def _stop_process(process: Optional[subprocess.Popen]) -> None:
    """Terminate a still-running child process, killing it if it lingers."""
    if process is None or process.poll() is not None:
        return
    process.terminate()
    try:
        process.wait(timeout=5)
    except subprocess.TimeoutExpired:
        process.kill()
        process.wait()


def _load_ralph_state(state_file: Path) -> Dict[str, Any]:
    """Read Ralph's state JSON; return {} if missing, unreadable or not a dict."""
    try:
        loaded = json.loads(state_file.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        # A bad state file should not turn a successful run into a failure.
        print(f"⚠️ Could not read Ralph state from {state_file}: {exc}")
        return {}
    return loaded if isinstance(loaded, dict) else {}


def run_ralphloop_for_pipeline(
    stages: List[Dict[str, Any]],
    pipeline_name: str = "multi-agent-pipeline",
    max_iterations: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Run RalphLoop for autonomous pipeline construction.

    Writes the manifest (to the default ``.ralph/PIPELINE.md``), launches
    ``RALPHLOOP_CMD -i <manifest>`` and streams its combined stdout/stderr
    to this process's stdout. On exit code 0 the final state is read from
    ``.ralph/state.json`` and the pipeline from
    ``.ralph/iterations/pipeline.md`` (both relative to the current
    working directory).

    Args:
        stages: Stage dicts, as accepted by create_pipeline_manifest().
        pipeline_name: Accepted for interface compatibility; not currently
            used by this function.
        max_iterations: If truthy, passed as ``--max-iterations`` and used as
            the default for the ``RALPH_MAX_ITERATIONS`` environment variable
            (otherwise that default is 100).

    Returns a dict with:
    - success: bool
    - iterations: int
    - pipeline_path: str (path to generated pipeline, "" if none was found)
    - state: dict (Ralph's final state)
    - error: str (if failed; None on success)
    """
    print("🔄 Delegating to RalphLoop 'Tackle Until Solved' for autonomous pipeline construction...")
    print(f" Stages: {len(stages)}")
    print(f" Complexity: {analyze_pipeline_complexity(stages)}/10")
    print()

    # Create pipeline manifest
    manifest_path = create_pipeline_manifest(stages)
    print(f"✅ Pipeline manifest created: {manifest_path}")
    print()

    # Check if ralphloop exists
    if not RALPHLOOP_CMD.exists():
        return _failure_result(f"RalphLoop not found at {RALPHLOOP_CMD}")

    # Build command - use the manifest file as input
    cmd = [str(RALPHLOOP_CMD), "-i", manifest_path]

    # Add optional parameters
    if max_iterations:
        cmd.extend(["--max-iterations", str(max_iterations)])

    # Environment variables; setdefault keeps any value the caller exported.
    env = os.environ.copy()
    env.setdefault("RALPH_AGENT", "claude")
    env.setdefault("RALPH_MAX_ITERATIONS", str(max_iterations or 100))

    print(f"Command: {' '.join(cmd)}")
    print("=" * 60)
    print()

    # Run RalphLoop
    process: Optional[subprocess.Popen] = None
    try:
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,  # line-buffered so output streams as it is produced
            env=env,
        )

        # Stream output
        for line in process.stdout:
            print(line, end="", flush=True)

        returncode = process.wait()
    except KeyboardInterrupt:
        # Do not leave the child running after the user aborts.
        _stop_process(process)
        print()
        print("⚠️ RalphLoop interrupted by user")
        return _failure_result("Interrupted by user")
    except Exception as e:
        # Boundary handler: callers expect a result dict, not an exception.
        _stop_process(process)
        return _failure_result(str(e))
    finally:
        if process is not None and process.stdout is not None:
            process.stdout.close()

    print()
    print("=" * 60)

    if returncode != 0:
        return _failure_result(f"RalphLoop exited with code {returncode}")

    # Read final state
    state = _load_ralph_state(Path(".ralph/state.json"))
    pipeline_file = Path(".ralph/iterations/pipeline.md")
    pipeline_path = str(pipeline_file) if pipeline_file.exists() else ""

    iterations = state.get("iteration", 0)

    print(f"✅ Pipeline construction completed in {iterations} iterations")
    if pipeline_path:
        print(f" Pipeline: {pipeline_path}")
    print()

    return {
        "success": True,
        "iterations": iterations,
        "pipeline_path": pipeline_path,
        "state": state,
        "error": None,
    }


def delegate_pipeline_to_ralph(
    stages: List[Dict[str, Any]],
    pipeline_name: str = "multi-agent-pipeline",
) -> Optional[str]:
    """
    Main entry point: Delegate pipeline construction to Ralph if complex.

    If Ralph is used and succeeds, returns the path to the generated
    pipeline (an empty string if Ralph succeeded but the pipeline file was
    not found).
    If pipeline is simple, or Ralph fails, returns None (caller should
    build directly).
    """
    if not should_use_ralph(stages):
        return None

    result = run_ralphloop_for_pipeline(stages, pipeline_name)

    if result["success"]:
        return result.get("pipeline_path", "")

    print(f"❌ RalphLoop failed: {result.get('error', 'Unknown error')}")
    print("Falling back to direct pipeline construction...")
    return None


# Example pipeline stages for testing
EXAMPLE_PIPELINE = [
    {
        "name": "researcher",
        "description": "Finds and fetches raw data from various sources",
        "model": "haiku",
        "tools": ["WebSearch", "WebFetch", "Read"],
    },
    {
        "name": "analyzer",
        "description": "Processes data and selects best options",
        "model": "sonnet",
        "tools": ["Read", "Write", "Bash"],
    },
    {
        "name": "writer",
        "description": "Creates final output from analyzed data",
        "model": "sonnet",
        "tools": ["Write", "Edit"],
    },
]


def main() -> None:
    """Command-line entry point for manually exercising the integration."""
    import argparse

    parser = argparse.ArgumentParser(description="Test Ralph pipeline integration")
    parser.add_argument("--test-complexity", action="store_true", help="Only test complexity")
    parser.add_argument("--force", action="store_true", help="Force Ralph mode")
    parser.add_argument("--example", action="store_true", help="Run with example pipeline")

    args = parser.parse_args()

    if args.force:
        os.environ["PIPELINE_USE_RALPH"] = "true"

    if args.test_complexity:
        complexity = analyze_pipeline_complexity(EXAMPLE_PIPELINE)
        print(f"Pipeline complexity: {complexity}/10")
        print(f"Should use Ralph: {should_use_ralph(EXAMPLE_PIPELINE)}")
    elif args.example:
        result = delegate_pipeline_to_ralph(EXAMPLE_PIPELINE, "example-pipeline")
        if result:
            print("\n" + "=" * 60)
            print(f"PIPELINE GENERATED: {result}")
            print("=" * 60)
        elif result is None and not should_use_ralph(EXAMPLE_PIPELINE):
            print("\nPipeline not complex enough for Ralph. Building directly...")
        else:
            # Ralph was selected but failed or produced no pipeline file.
            print("\nRalph did not produce a pipeline. Building directly...")
    else:
        # No action flag given: show usage instead of exiting silently.
        parser.print_help()


if __name__ == "__main__":
    main()