Add 260+ Claude Code skills from skills.sh
Complete collection of AI agent skills including: - Frontend Development (Vue, React, Next.js, Three.js) - Backend Development (NestJS, FastAPI, Node.js) - Mobile Development (React Native, Expo) - Testing (E2E, frontend, webapp) - DevOps (GitHub Actions, CI/CD) - Marketing (SEO, copywriting, analytics) - Security (binary analysis, vulnerability scanning) - And many more... Synchronized from: https://skills.sh/ Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
350
agent-pipeline-builder/ralph-pipeline.py
Normal file
350
agent-pipeline-builder/ralph-pipeline.py
Normal file
@@ -0,0 +1,350 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Ralph Integration for Agent Pipeline Builder
|
||||
|
||||
Generates pipeline manifests for Ralph Orchestrator to autonomously build and test multi-agent pipelines.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Optional, Dict, Any, List
|
||||
|
||||
# Configuration
|
||||
RALPHLOOP_CMD = Path(__file__).parent.parent.parent.parent / "obsidian-web-interface" / "bin" / "ralphloop"
|
||||
PIPELINE_THRESHOLD = 3 # Minimum number of stages to trigger Ralph
|
||||
|
||||
|
||||
def analyze_pipeline_complexity(stages: List[Dict[str, str]]) -> int:
|
||||
"""
|
||||
Analyze pipeline complexity and return estimated difficulty.
|
||||
|
||||
Returns: 1-10 scale
|
||||
"""
|
||||
complexity = len(stages) # Base: one point per stage
|
||||
|
||||
# Check for complex patterns
|
||||
for stage in stages:
|
||||
description = stage.get("description", "").lower()
|
||||
|
||||
# External data sources (+1)
|
||||
if any(word in description for word in ["fetch", "api", "database", "web", "search"]):
|
||||
complexity += 1
|
||||
|
||||
# Complex processing (+1)
|
||||
if any(word in description for word in ["analyze", "transform", "aggregate", "compute"]):
|
||||
complexity += 1
|
||||
|
||||
# Conditional logic (+1)
|
||||
if any(word in description for word in ["filter", "validate", "check", "select"]):
|
||||
complexity += 1
|
||||
|
||||
# Parallel stages add complexity
|
||||
stage_names = [s.get("name", "") for s in stages]
|
||||
if "parallel" in str(stage_names).lower():
|
||||
complexity += 2
|
||||
|
||||
return min(10, complexity)
|
||||
|
||||
|
||||
def create_pipeline_manifest(stages: List[Dict[str, str]], manifest_path: str = ".ralph/PIPELINE.md") -> str:
|
||||
"""
|
||||
Create a Ralph-formatted pipeline manifest.
|
||||
|
||||
Returns the path to the created manifest file.
|
||||
"""
|
||||
ralph_dir = Path(".ralph")
|
||||
ralph_dir.mkdir(exist_ok=True)
|
||||
|
||||
manifest_file = ralph_dir / "PIPELINE.md"
|
||||
|
||||
# Format the pipeline for Ralph
|
||||
manifest_content = f"""# Pipeline: Multi-Agent Workflow
|
||||
|
||||
## Stages
|
||||
|
||||
"""
|
||||
for i, stage in enumerate(stages, 1):
|
||||
manifest_content += f"{i}. **{stage['name']}** - {stage['description']}\n"
|
||||
|
||||
manifest_content += f"""
|
||||
## Data Format
|
||||
|
||||
All stages use JSON with markers: `<<<stage-name>>>...<<<end-stage-name>>>`
|
||||
|
||||
## Task
|
||||
|
||||
Build a complete multi-agent pipeline with the following stages:
|
||||
|
||||
"""
|
||||
for stage in stages:
|
||||
manifest_content += f"""
|
||||
### {stage['name']}
|
||||
|
||||
**Purpose:** {stage['description']}
|
||||
|
||||
**Agent Configuration:**
|
||||
- Model: {stage.get('model', 'sonnet')}
|
||||
- Allowed Tools: {', '.join(stage.get('tools', ['Read', 'Write', 'Bash']))}
|
||||
|
||||
**Output Format:**
|
||||
<<<{stage['name']}>>>
|
||||
```json
|
||||
{{
|
||||
"result": "...",
|
||||
"metadata": {{...}}
|
||||
}}
|
||||
```
|
||||
<<<end-{stage['name']}>>>
|
||||
|
||||
"""
|
||||
|
||||
manifest_content += """
|
||||
## Success Criteria
|
||||
|
||||
The pipeline is complete when:
|
||||
- [ ] All agent definitions are created in `.claude/agents/`
|
||||
- [ ] Pipeline orchestration script is implemented
|
||||
- [ ] Each stage is tested independently
|
||||
- [ ] End-to-end pipeline test passes
|
||||
- [ ] Error handling is verified
|
||||
- [ ] Documentation is complete
|
||||
|
||||
## Instructions
|
||||
|
||||
1. Create agent definition files for each stage
|
||||
2. Implement the pipeline orchestration script
|
||||
3. Test each stage independently with mock data
|
||||
4. Run the full end-to-end pipeline
|
||||
5. Verify error handling and edge cases
|
||||
6. Document usage and testing procedures
|
||||
|
||||
When complete, add <!-- COMPLETE --> marker to this file.
|
||||
Output the final pipeline to `.ralph/iterations/pipeline.md`.
|
||||
"""
|
||||
|
||||
manifest_file.write_text(manifest_content)
|
||||
|
||||
return str(manifest_file)
|
||||
|
||||
|
||||
def should_use_ralph(stages: List[Dict[str, str]]) -> bool:
|
||||
"""
|
||||
Determine if pipeline is complex enough to warrant RalphLoop.
|
||||
"""
|
||||
# Check for explicit opt-in via environment
|
||||
if os.getenv("RALPH_AUTO", "").lower() in ("true", "1", "yes"):
|
||||
return True
|
||||
|
||||
if os.getenv("PIPELINE_USE_RALPH", "").lower() in ("true", "1", "yes"):
|
||||
return True
|
||||
|
||||
# Check stage count
|
||||
if len(stages) >= PIPELINE_THRESHOLD:
|
||||
return True
|
||||
|
||||
# Check complexity
|
||||
complexity = analyze_pipeline_complexity(stages)
|
||||
return complexity >= 5
|
||||
|
||||
|
||||
def run_ralphloop_for_pipeline(stages: List[Dict[str, str]],
|
||||
pipeline_name: str = "multi-agent-pipeline",
|
||||
max_iterations: Optional[int] = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Run RalphLoop for autonomous pipeline construction.
|
||||
|
||||
Returns a dict with:
|
||||
- success: bool
|
||||
- iterations: int
|
||||
- pipeline_path: str (path to generated pipeline)
|
||||
- state: dict (Ralph's final state)
|
||||
- error: str (if failed)
|
||||
"""
|
||||
print("🔄 Delegating to RalphLoop 'Tackle Until Solved' for autonomous pipeline construction...")
|
||||
print(f" Stages: {len(stages)}")
|
||||
print(f" Complexity: {analyze_pipeline_complexity(stages)}/10")
|
||||
print()
|
||||
|
||||
# Create pipeline manifest
|
||||
manifest_path = create_pipeline_manifest(stages)
|
||||
print(f"✅ Pipeline manifest created: {manifest_path}")
|
||||
print()
|
||||
|
||||
# Check if ralphloop exists
|
||||
if not RALPHLOOP_CMD.exists():
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"RalphLoop not found at {RALPHLOOP_CMD}",
|
||||
"iterations": 0,
|
||||
"pipeline_path": "",
|
||||
"state": {}
|
||||
}
|
||||
|
||||
# Build command - use the manifest file as input
|
||||
cmd = [str(RALPHLOOP_CMD), "-i", manifest_path]
|
||||
|
||||
# Add optional parameters
|
||||
if max_iterations:
|
||||
cmd.extend(["--max-iterations", str(max_iterations)])
|
||||
|
||||
# Environment variables
|
||||
env = os.environ.copy()
|
||||
env.setdefault("RALPH_AGENT", "claude")
|
||||
env.setdefault("RALPH_MAX_ITERATIONS", str(max_iterations or 100))
|
||||
|
||||
print(f"Command: {' '.join(cmd)}")
|
||||
print("=" * 60)
|
||||
print()
|
||||
|
||||
# Run RalphLoop
|
||||
try:
|
||||
process = subprocess.Popen(
|
||||
cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True,
|
||||
bufsize=1,
|
||||
env=env
|
||||
)
|
||||
|
||||
# Stream output
|
||||
output_lines = []
|
||||
for line in process.stdout:
|
||||
print(line, end='', flush=True)
|
||||
output_lines.append(line)
|
||||
|
||||
process.wait()
|
||||
returncode = process.returncode
|
||||
|
||||
print()
|
||||
print("=" * 60)
|
||||
|
||||
if returncode == 0:
|
||||
# Read final state
|
||||
state_file = Path(".ralph/state.json")
|
||||
pipeline_file = Path(".ralph/iterations/pipeline.md")
|
||||
|
||||
state = {}
|
||||
if state_file.exists():
|
||||
state = json.loads(state_file.read_text())
|
||||
|
||||
pipeline_path = ""
|
||||
if pipeline_file.exists():
|
||||
pipeline_path = str(pipeline_file)
|
||||
|
||||
iterations = state.get("iteration", 0)
|
||||
|
||||
print(f"✅ Pipeline construction completed in {iterations} iterations")
|
||||
if pipeline_path:
|
||||
print(f" Pipeline: {pipeline_path}")
|
||||
print()
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"iterations": iterations,
|
||||
"pipeline_path": pipeline_path,
|
||||
"state": state,
|
||||
"error": None
|
||||
}
|
||||
else:
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"RalphLoop exited with code {returncode}",
|
||||
"iterations": 0,
|
||||
"pipeline_path": "",
|
||||
"state": {}
|
||||
}
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print()
|
||||
print("⚠️ RalphLoop interrupted by user")
|
||||
return {
|
||||
"success": False,
|
||||
"error": "Interrupted by user",
|
||||
"iterations": 0,
|
||||
"pipeline_path": "",
|
||||
"state": {}
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e),
|
||||
"iterations": 0,
|
||||
"pipeline_path": "",
|
||||
"state": {}
|
||||
}
|
||||
|
||||
|
||||
def delegate_pipeline_to_ralph(stages: List[Dict[str, str]],
|
||||
pipeline_name: str = "multi-agent-pipeline") -> Optional[str]:
|
||||
"""
|
||||
Main entry point: Delegate pipeline construction to Ralph if complex.
|
||||
|
||||
If Ralph is used, returns the path to the generated pipeline.
|
||||
If pipeline is simple, returns None (caller should build directly).
|
||||
"""
|
||||
if not should_use_ralph(stages):
|
||||
return None
|
||||
|
||||
result = run_ralphloop_for_pipeline(stages, pipeline_name)
|
||||
|
||||
if result["success"]:
|
||||
return result.get("pipeline_path", "")
|
||||
else:
|
||||
print(f"❌ RalphLoop failed: {result.get('error', 'Unknown error')}")
|
||||
print("Falling back to direct pipeline construction...")
|
||||
return None
|
||||
|
||||
|
||||
# Example pipeline stages for testing
|
||||
EXAMPLE_PIPELINE = [
|
||||
{
|
||||
"name": "researcher",
|
||||
"description": "Finds and fetches raw data from various sources",
|
||||
"model": "haiku",
|
||||
"tools": ["WebSearch", "WebFetch", "Read"]
|
||||
},
|
||||
{
|
||||
"name": "analyzer",
|
||||
"description": "Processes data and selects best options",
|
||||
"model": "sonnet",
|
||||
"tools": ["Read", "Write", "Bash"]
|
||||
},
|
||||
{
|
||||
"name": "writer",
|
||||
"description": "Creates final output from analyzed data",
|
||||
"model": "sonnet",
|
||||
"tools": ["Write", "Edit"]
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="Test Ralph pipeline integration")
|
||||
parser.add_argument("--test-complexity", action="store_true", help="Only test complexity")
|
||||
parser.add_argument("--force", action="store_true", help="Force Ralph mode")
|
||||
parser.add_argument("--example", action="store_true", help="Run with example pipeline")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.test_complexity:
|
||||
complexity = analyze_pipeline_complexity(EXAMPLE_PIPELINE)
|
||||
print(f"Pipeline complexity: {complexity}/10")
|
||||
print(f"Should use Ralph: {should_use_ralph(EXAMPLE_PIPELINE)}")
|
||||
elif args.example:
|
||||
if args.force:
|
||||
os.environ["PIPELINE_USE_RALPH"] = "true"
|
||||
|
||||
result = delegate_pipeline_to_ralph(EXAMPLE_PIPELINE, "example-pipeline")
|
||||
|
||||
if result:
|
||||
print("\n" + "=" * 60)
|
||||
print(f"PIPELINE GENERATED: {result}")
|
||||
print("=" * 60)
|
||||
else:
|
||||
print("\nPipeline not complex enough for Ralph. Building directly...")
|
||||
Reference in New Issue
Block a user