#!/usr/bin/env python3
"""
Agno Orchestrator - Multi-Agent Orchestration Engine
Implements A2A (Agent-to-Agent) communication, parallel execution, and workflow composition
"""

import json
import subprocess
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Callable, Union
from dataclasses import dataclass, field
from enum import Enum
import asyncio
from datetime import datetime


class ExecutionMode(Enum):
    """How a workflow step is meant to be scheduled relative to the others."""

    SEQUENTIAL = "sequential"
    PARALLEL = "parallel"
    CONDITIONAL = "conditional"


@dataclass
class Agent:
    """Agent definition.

    ``command`` is an optional path to an executable; agents without one are
    treated as declarative by ``AgnoOrchestrator.execute_step``. A lower
    ``priority`` value sorts first in ``AgentRegistry.find_agents``.
    """

    name: str
    type: str
    capabilities: List[str]
    triggers: List[str]
    command: str = ""
    priority: int = 0


@dataclass
class WorkflowStep:
    """A single step in a workflow.

    ``depends_on`` lists agent names that must have completed successfully
    before this step is executed.
    """

    agent: str
    task: str
    depends_on: List[str] = field(default_factory=list)
    mode: ExecutionMode = ExecutionMode.SEQUENTIAL
    condition: Optional[str] = None


@dataclass
class WorkflowResult:
    """Result from a workflow execution."""

    agent: str
    task: str
    success: bool
    output: Any = None
    error: Optional[str] = None
    duration: float = 0.0
    timestamp: str = ""


class AgentRegistry:
    """Registry of available agents, keyed by agent name."""

    def __init__(self):
        self.agents: Dict[str, Agent] = {}
        self._load_builtin_agents()

    def _load_builtin_agents(self):
        """Load built-in agents."""
        self.register(Agent(
            name="plan-executor",
            type="workflow",
            capabilities=["planning", "approval", "execution", "validation"],
            triggers=["complex_feature", "multi_file_change", "refactoring"],
            command="~/.claude/agents/plan-executor/execute.sh",
            priority=1,
        ))
        self.register(Agent(
            name="codebase-indexer",
            type="skill",
            capabilities=["semantic_search", "navigation", "indexing"],
            triggers=["find_file", "codebase_question", "search"],
            command="~/.claude/skills/codebase-indexer/search.sh",
            priority=1,
        ))
        self.register(Agent(
            name="mcp-client",
            type="integration",
            capabilities=["external_tools", "api_integration"],
            triggers=["web_search", "image_analysis", "fetch_content"],
            command="~/.claude/skills/mcp-client/mcp-client.py",
            priority=2,
        ))
        self.register(Agent(
            name="document-generator",
            type="generator",
            capabilities=["documentation", "reports", "markdown"],
            triggers=["create_docs", "generate_report", "document"],
            priority=3,
        ))
        self.register(Agent(
            name="self-learner",
            type="learning",
            capabilities=["pattern_detection", "knowledge_extraction", "improvement"],
            triggers=["task_complete", "learn", "improve"],
            priority=4,
        ))

    def register(self, agent: Agent):
        """Register a new agent, replacing any existing agent of the same name."""
        self.agents[agent.name] = agent

    def find_agents(
        self,
        capability: Optional[str] = None,
        trigger: Optional[str] = None,
    ) -> List[Agent]:
        """Find agents by capability or trigger.

        Args:
            capability: Exact capability name an agent must list.
            trigger: Free text; an agent matches when any of its trigger
                keywords occurs as a substring of this text.

        Returns:
            Matching agents sorted by ascending ``priority``. An agent that
            matches on both criteria is returned once.
        """
        results = []
        for agent in self.agents.values():
            by_capability = bool(capability) and capability in agent.capabilities
            by_trigger = bool(trigger) and any(t in trigger for t in agent.triggers)
            # A single membership test so a double match is not listed twice.
            if by_capability or by_trigger:
                results.append(agent)
        return sorted(results, key=lambda a: a.priority)


class CultureMemory:
    """Shared memory/culture across agents, persisted as a JSON file."""

    def __init__(self, path: Optional[Union[str, Path]] = None):
        """Load memory from ``path``.

        Args:
            path: Location of the JSON memory file. Falls back to
                ``~/.claude/orchestrator-memory.json`` when omitted or empty.
        """
        # Normalise to Path: the rest of the class relies on Path methods,
        # which a plain string argument does not provide.
        self.path = Path(path) if path else Path.home() / ".claude" / "orchestrator-memory.json"
        self.memory = self._load()

    @staticmethod
    def _empty_memory() -> Dict[str, Any]:
        """Return a fresh memory structure with every expected section."""
        return {
            "patterns": [],
            "solutions": [],
            "agent_performance": {},
            "workflows": [],
        }

    def _load(self) -> Dict[str, Any]:
        """Load memory from disk.

        Loading is best-effort: a missing, unreadable or malformed file yields
        an empty memory instead of raising.
        """
        memory = self._empty_memory()
        if self.path.exists():
            try:
                with open(self.path, encoding="utf-8") as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as exc:
                # ValueError covers json.JSONDecodeError and decoding errors.
                print(f"Warning: could not load memory from {self.path}: {exc}", file=sys.stderr)
            else:
                if isinstance(loaded, dict):
                    # Keep defaults for sections an older/partial file lacks so
                    # later lookups such as memory["agent_performance"] succeed.
                    memory.update(loaded)
        return memory

    def save(self):
        """Save memory to disk, creating the parent directory if needed."""
        self.path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.path, 'w', encoding="utf-8") as f:
            json.dump(self.memory, f, indent=2)

    def add_pattern(self, pattern: str, context: str):
        """Add a learned pattern, timestamped with the current local time."""
        self.memory["patterns"].append({
            "pattern": pattern,
            "context": context,
            "timestamp": datetime.now().isoformat(),
        })

    def add_solution(self, problem: str, solution: str):
        """Add a learned solution, timestamped with the current local time."""
        self.memory["solutions"].append({
            "problem": problem,
            "solution": solution,
            "timestamp": datetime.now().isoformat(),
        })

    def record_performance(self, agent: str, task: str, success: bool, duration: float):
        """Record agent performance.

        Updates the per-agent success/failure counters and accumulated
        duration. ``task`` is accepted for interface symmetry but not stored.
        """
        perf = self.memory["agent_performance"].setdefault(agent, {
            "tasks_completed": 0,
            "tasks_failed": 0,
            "total_duration": 0.0,
        })
        if success:
            perf["tasks_completed"] += 1
        else:
            perf["tasks_failed"] += 1
        perf["total_duration"] += duration


class AgnoOrchestrator:
    """Main orchestrator for multi-agent workflows."""

    def __init__(self):
        self.registry = AgentRegistry()
        self.memory = CultureMemory()
        self.active_workflows: Dict[str, List[WorkflowStep]] = {}

    def plan_workflow(self, request: str) -> List[WorkflowStep]:
        """Plan a workflow based on a request.

        Steps are chosen by case-insensitive keyword (substring) matching on
        the request text. A final ``self-learner`` step is always appended and
        depends on every step planned before it.
        """
        steps = []
        request_lower = request.lower()

        # Detect if complex planning needed
        if any(word in request_lower for word in ["implement", "create", "build", "add"]):
            steps.append(WorkflowStep(
                agent="plan-executor",
                task=f"Plan: {request}",
                mode=ExecutionMode.SEQUENTIAL,
            ))

        # Detect if codebase search needed
        if any(word in request_lower for word in ["find", "search", "where", "locate"]):
            steps.append(WorkflowStep(
                agent="codebase-indexer",
                task=request,
                mode=ExecutionMode.SEQUENTIAL,
            ))

        # Detect if external tools needed
        if any(word in request_lower for word in ["search web", "fetch", "api", "image"]):
            steps.append(WorkflowStep(
                agent="mcp-client",
                task=request,
                mode=ExecutionMode.PARALLEL,
            ))

        # Always add self-learner at the end. The dependency list is built
        # before the learner itself is appended, so it must cover *all* of
        # `steps`; slicing off the last element would drop a real step.
        steps.append(WorkflowStep(
            agent="self-learner",
            task="Learn from this execution",
            mode=ExecutionMode.SEQUENTIAL,
            depends_on=[s.agent for s in steps],
        ))

        return steps

    def execute_step(self, step: WorkflowStep) -> WorkflowResult:
        """Execute a single workflow step.

        If the agent has a command that exists on disk it is run as a
        subprocess with the task as its only argument (300 s timeout);
        otherwise the agent is treated as declarative and the step succeeds
        immediately. Every executed step, including one that raised, is
        recorded in performance memory.

        Returns:
            A ``WorkflowResult``; failures are reported through ``success``
            and ``error``/``output`` rather than by raising.
        """
        agent = self.registry.agents.get(step.agent)
        if agent is None:
            return WorkflowResult(
                agent=step.agent,
                task=step.task,
                success=False,
                error=f"Agent {step.agent} not found",
            )

        start_time = datetime.now()
        output = None
        error = None
        try:
            command_path = Path(agent.command).expanduser() if agent.command else None
            if command_path is not None and command_path.exists():
                # List form (no shell) keeps the task text from being
                # interpreted by a shell.
                completed = subprocess.run(
                    [str(command_path), step.task],
                    capture_output=True,
                    text=True,
                    timeout=300,
                )
                success = completed.returncode == 0
                output = completed.stdout if success else completed.stderr
            else:
                # Declarative agent - just record the task
                success = True
                output = f"Agent {agent.name} processed: {step.task}"
        except Exception as e:
            # Boundary handler: timeouts, permission errors etc. become a
            # failed result instead of aborting the whole workflow.
            success = False
            error = str(e)

        duration = (datetime.now() - start_time).total_seconds()

        # Record in memory (failures caused by exceptions included).
        self.memory.record_performance(step.agent, step.task, success, duration)

        return WorkflowResult(
            agent=step.agent,
            task=step.task,
            success=success,
            output=output,
            error=error,
            duration=duration,
            timestamp=datetime.now().isoformat(),
        )

    def execute_workflow(self, steps: List[WorkflowStep]) -> List[WorkflowResult]:
        """Execute a complete workflow.

        Steps run in list order. A step whose dependencies have not all
        completed successfully is skipped and produces no result entry.
        Memory is saved to disk once at the end.
        """
        results = []
        completed = set()

        for step in steps:
            # Check dependencies
            if not all(dep in completed for dep in step.depends_on):
                continue

            result = self.execute_step(step)
            results.append(result)

            if result.success:
                completed.add(step.agent)

        # Save memory
        self.memory.save()

        return results

    def orchestrate(self, request: str) -> Dict[str, Any]:
        """Main orchestration entry point.

        Plans a workflow for ``request``, executes it, prints progress to
        stdout and returns a JSON-serialisable summary.
        """
        print("🤖 Agno Orchestrator")
        print(f"Request: {request}")
        print()

        # Plan workflow
        steps = self.plan_workflow(request)
        print(f"Planned {len(steps)} steps:")
        for i, step in enumerate(steps, 1):
            print(f" {i}. {step.agent}: {step.task[:50]}...")
        print()

        # Execute workflow
        results = self.execute_workflow(steps)

        # Summarize
        successful = sum(1 for r in results if r.success)
        print()
        print(f"Results: {successful}/{len(results)} steps completed")

        return {
            "request": request,
            "steps_planned": len(steps),
            "steps_completed": successful,
            "results": [r.__dict__ for r in results],
        }


def main():
    """Main entry point."""
    if len(sys.argv) < 2:
        # Usage needs no orchestrator, so nothing is read from disk here.
        print("Agno Orchestrator - Multi-Agent Orchestration Engine")
        print("")
        print("Usage:")
        print(" orchestrator.py 'request' - Orchestrate agents for request")
        print(" orchestrator.py list-agents - List all registered agents")
        print(" orchestrator.py list-memory - Show culture memory")
        print("")
        return

    orchestrator = AgnoOrchestrator()

    if sys.argv[1] == "list-agents":
        print("Registered Agents:")
        for name, agent in orchestrator.registry.agents.items():
            print(f" • {name}")
            print(f" Type: {agent.type}")
            print(f" Capabilities: {', '.join(agent.capabilities)}")
            print()
    elif sys.argv[1] == "list-memory":
        print("Culture Memory:")
        print(json.dumps(orchestrator.memory.memory, indent=2))
    else:
        request = " ".join(sys.argv[1:])
        result = orchestrator.orchestrate(request)
        print()
        print(json.dumps(result, indent=2))


if __name__ == "__main__":
    main()