"""
Every Code MCP Server

Implements the MCP server for Every Code, exposing Auto Drive, planning,
and code operations as MCP tools.
"""

import asyncio
import json
import os
import shlex
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Any, Optional

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from pydantic import BaseModel, Field

# Default wall-clock limit for a single Every Code CLI invocation (5 minutes).
DEFAULT_COMMAND_TIMEOUT = 300


class EveryCodeCLI:
    """Interface to Every Code CLI."""

    def __init__(self, repo_path: str):
        # Directory used as the working directory for every CLI invocation.
        self.repo_path = Path(repo_path)
        # Command line prefix (possibly several words) used to start the CLI.
        self.code_command = self._find_code_command()

    def _find_code_command(self) -> str:
        """Find the code or coder command.

        Returns the first of ``code`` / ``coder`` found on PATH, otherwise an
        ``npx`` invocation of the published package.
        """
        # NOTE(review): `code` is also the usual name of the VS Code launcher;
        # confirm that preferring it over `coder` is intended.
        for cmd in ("code", "coder"):
            if shutil.which(cmd):
                return cmd
        # Fallback to npx
        return "npx -y @just-every/code"

    async def run_command(
        self, args: list[str], timeout: float = DEFAULT_COMMAND_TIMEOUT
    ) -> dict:
        """Run Every Code command and return result.

        Args:
            args: Arguments appended to the CLI command. Each element is passed
                to the process as exactly one argument (no shell is involved),
                so prompts containing spaces or shell metacharacters are safe.
            timeout: Seconds to wait before the process is killed.

        Returns:
            On completion: ``success``, ``stdout``, ``stderr``, ``returncode``.
            On timeout or launch failure: ``success`` (False) and ``error``.
        """
        argv = shlex.split(self.code_command) + [str(arg) for arg in args]
        # Resolve the executable explicitly so PATH lookups (including
        # PATHEXT wrappers such as `npx.cmd` on Windows) work without a shell.
        argv[0] = shutil.which(argv[0]) or argv[0]

        try:
            # subprocess.run blocks, so run it in a worker thread to keep the
            # MCP event loop responsive while the CLI is working.
            result = await asyncio.to_thread(
                subprocess.run,
                argv,
                cwd=self.repo_path,
                # The server's own stdin carries the MCP protocol stream; the
                # child must not inherit (and consume) it.
                stdin=subprocess.DEVNULL,
                capture_output=True,
                text=True,
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            return {"success": False, "error": "Command timed out"}
        except (OSError, ValueError) as e:
            # Launch failures (missing executable, bad cwd) and output
            # decoding errors are reported rather than raised.
            return {"success": False, "error": str(e)}

        return {
            "success": result.returncode == 0,
            "stdout": result.stdout,
            "stderr": result.stderr,
            "returncode": result.returncode,
        }


# Global CLI instance
_cli: Optional[EveryCodeCLI] = None


def get_cli() -> EveryCodeCLI:
    """Get or create the CLI instance.

    The repository path is taken from ``EVERYCODE_REPO_PATH`` the first time
    this is called, defaulting to the current working directory.
    """
    global _cli
    if _cli is None:
        repo_path = os.environ.get("EVERYCODE_REPO_PATH", os.getcwd())
        _cli = EveryCodeCLI(repo_path)
    return _cli


# Tool input schemas
class PlanInput(BaseModel):
    prompt: str = Field(description="The feature or task to plan")
    scope: Optional[str] = Field(None, description="Optional scope or constraint")


class SolveInput(BaseModel):
    problem: str = Field(description="The problem to solve")
    context: Optional[str] = Field(None, description="Additional context")


class AutoDriveInput(BaseModel):
    task: str = Field(description="The task to automate")
    mode: Optional[str] = Field(
        "continuous",
        description="Execution mode: continuous, single, approval",
    )


class ReviewInput(BaseModel):
    files: Optional[list[str]] = Field(
        None,
        description="Specific files to review, or None for all changes",
    )


class BrowserInput(BaseModel):
    action: str = Field(description="Action: goto, click, type, screenshot, etc.")
    url: Optional[str] = Field(None, description="URL for goto action")
    selector: Optional[str] = Field(None, description="CSS selector")
    text: Optional[str] = Field(None, description="Text to type")


class CreateFileInput(BaseModel):
    path: str = Field(description="Relative path where to create the file")
    content: str = Field(description="Content to write to the file")


class EditFileInput(BaseModel):
    path: str = Field(description="Relative path to the file to edit")
    old_text: str = Field(description="The exact text to replace")
    new_text: str = Field(description="The replacement text")


class SearchCodeInput(BaseModel):
    query: str = Field(description="Search query for code")
    file_pattern: Optional[str] = Field(
        None, description="Optional file pattern filter"
    )


# Create MCP server
server = Server("everycode-mcp")


@server.list_tools()
async def list_tools() -> list[Tool]:
    """List all available MCP tools."""
    return [
        # Core Every Code commands
        Tool(
            name="everycode_plan",
            description="Generate an implementation plan using Every Code's planning capabilities.",
            inputSchema=PlanInput.model_json_schema(),
        ),
        Tool(
            name="everycode_solve",
            description="Solve complex problems by coordinating multiple agents and approaches.",
            inputSchema=SolveInput.model_json_schema(),
        ),
        Tool(
            name="everycode_auto_drive",
            description="Run automated multi-agent task execution with Auto Drive.",
            inputSchema=AutoDriveInput.model_json_schema(),
        ),
        Tool(
            name="everycode_review",
            description="Run code review with Auto Review (background quality checks).",
            inputSchema=ReviewInput.model_json_schema(),
        ),
        # Browser automation
        Tool(
            name="everycode_browser",
            description="Automate browser interactions (goto, click, type, screenshot).",
            inputSchema=BrowserInput.model_json_schema(),
        ),
        # File operations
        Tool(
            name="everycode_create_file",
            description="Create a new file with the given content.",
            inputSchema=CreateFileInput.model_json_schema(),
        ),
        Tool(
            name="everycode_edit_file",
            description="Edit a file by replacing exact text.",
            inputSchema=EditFileInput.model_json_schema(),
        ),
        Tool(
            name="everycode_search_code",
            description="Search code using Every Code's search capabilities.",
            inputSchema=SearchCodeInput.model_json_schema(),
        ),
    ]


def _text_content(text: str) -> list[TextContent]:
    """Wrap a string in the single-element response list MCP expects."""
    return [TextContent(type="text", text=text)]


def _json_content(payload: dict) -> list[TextContent]:
    """Serialize a status/error payload as an indented JSON text response."""
    return _text_content(json.dumps(payload, indent=2))


def _format_cli_result(title: str, result: dict) -> list[TextContent]:
    """Render a ``run_command`` result under a Markdown heading.

    Successful runs produce the heading followed by stdout. Failed runs also
    append the reason, so the caller does not receive a silently empty result.
    """
    body = f"# {title}\n\n{result.get('stdout', '')}"
    if not result.get("success"):
        detail = (
            result.get("error")
            or (result.get("stderr") or "").strip()
            or f"exit code {result.get('returncode')}"
        )
        body += f"\n\n**Command failed:** {detail}"
    return _text_content(body)


def _resolve_in_repo(repo_root: Path, relative_path: str) -> Path:
    """Resolve a tool-supplied path, refusing anything outside the repository.

    Raises:
        ValueError: If the path is absolute or uses ``..`` (or symlinks) to
            point outside ``repo_root``.
    """
    root = repo_root.resolve()
    candidate = (root / relative_path).resolve()
    if not candidate.is_relative_to(root):
        raise ValueError(f"Path escapes the repository: {relative_path}")
    return candidate


async def _handle_plan(cli: EveryCodeCLI, arguments: dict) -> list[TextContent]:
    """Run Every Code's /plan command."""
    input_data = PlanInput(**arguments)
    prompt = input_data.prompt
    if input_data.scope:
        prompt += f" (scope: {input_data.scope})"
    result = await cli.run_command(["/plan", prompt])
    return _format_cli_result("Plan Generated", result)


async def _handle_solve(cli: EveryCodeCLI, arguments: dict) -> list[TextContent]:
    """Run Every Code's /solve command."""
    input_data = SolveInput(**arguments)
    task = input_data.problem
    if input_data.context:
        task += f"\n\nContext: {input_data.context}"
    result = await cli.run_command(["/solve", task])
    return _format_cli_result("Solution", result)


async def _handle_auto_drive(cli: EveryCodeCLI, arguments: dict) -> list[TextContent]:
    """Run Every Code's /auto command with the flags for the requested mode."""
    input_data = AutoDriveInput(**arguments)
    mode_flags = {
        "continuous": [],
        "single": ["--single"],
        "approval": ["--approval"],
    }
    # Unrecognised (or missing) modes fall back to no extra flags.
    args = ["/auto"] + mode_flags.get(input_data.mode, []) + [input_data.task]
    result = await cli.run_command(args)
    return _format_cli_result("Auto Drive Results", result)


async def _handle_review(cli: EveryCodeCLI, arguments: dict) -> list[TextContent]:
    """Run Every Code's review feature, optionally limited to given files."""
    input_data = ReviewInput(**arguments)
    result = await cli.run_command(["--review"] + (input_data.files or []))
    return _format_cli_result("Code Review", result)


async def _handle_browser(cli: EveryCodeCLI, arguments: dict) -> list[TextContent]:
    """Return a placeholder; browser automation is not wired up here."""
    input_data = BrowserInput(**arguments)
    return _text_content(
        f"# Browser Action: {input_data.action}\n\nBrowser automation requires Every Code's full integration. Action queued: {input_data.action}"
    )


async def _handle_create_file(cli: EveryCodeCLI, arguments: dict) -> list[TextContent]:
    """Create (or overwrite) a file inside the repository."""
    input_data = CreateFileInput(**arguments)
    file_path = _resolve_in_repo(cli.repo_path, input_data.path)
    file_path.parent.mkdir(parents=True, exist_ok=True)
    file_path.write_text(input_data.content, encoding="utf-8")
    return _json_content({"success": True, "path": input_data.path})


async def _handle_edit_file(cli: EveryCodeCLI, arguments: dict) -> list[TextContent]:
    """Replace every occurrence of ``old_text`` in a repository file."""
    input_data = EditFileInput(**arguments)
    if not input_data.old_text:
        # An empty needle matches everywhere; str.replace would then insert
        # new_text between every character and corrupt the file.
        return _json_content({"error": "Old text must not be empty"})

    file_path = _resolve_in_repo(cli.repo_path, input_data.path)
    if not file_path.is_file():
        return _json_content({"error": "File not found"})

    content = file_path.read_text(encoding="utf-8")
    if input_data.old_text not in content:
        return _json_content({"error": "Old text not found in file"})

    file_path.write_text(
        content.replace(input_data.old_text, input_data.new_text),
        encoding="utf-8",
    )
    return _json_content({"success": True, "path": input_data.path})


async def _handle_search_code(cli: EveryCodeCLI, arguments: dict) -> list[TextContent]:
    """Run Every Code's search command for the given query."""
    input_data = SearchCodeInput(**arguments)
    # NOTE(review): `file_pattern` is accepted by the schema but not forwarded;
    # confirm which CLI flag (if any) should carry it.
    result = await cli.run_command(["search", input_data.query])
    return _format_cli_result("Search Results", result)


# Maps each advertised tool name to the coroutine that implements it.
_TOOL_HANDLERS = {
    "everycode_plan": _handle_plan,
    "everycode_solve": _handle_solve,
    "everycode_auto_drive": _handle_auto_drive,
    "everycode_review": _handle_review,
    "everycode_browser": _handle_browser,
    "everycode_create_file": _handle_create_file,
    "everycode_edit_file": _handle_edit_file,
    "everycode_search_code": _handle_search_code,
}


@server.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    """Handle tool calls.

    Dispatches ``name`` to its handler. Any exception (including input
    validation errors) is reported to the client as a JSON ``error`` payload
    instead of propagating into the server loop.
    """
    try:
        handler = _TOOL_HANDLERS.get(name)
        if handler is None:
            return _json_content({"error": f"Unknown tool: {name}"})
        # Tools whose fields are all optional may be called without arguments.
        return await handler(get_cli(), arguments or {})
    except Exception as e:
        return _json_content({"error": str(e)})


async def main():
    """Main entry point for the MCP server.

    Accepts ``--repo PATH`` / ``-r PATH`` to select the repository; the value
    is published through ``EVERYCODE_REPO_PATH`` before the server starts.
    """
    # Parse command line arguments
    repo_path = None
    for i, arg in enumerate(sys.argv):
        if arg in ("--repo", "-r") and i + 1 < len(sys.argv):
            repo_path = sys.argv[i + 1]
            break

    if repo_path:
        os.environ["EVERYCODE_REPO_PATH"] = repo_path

    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options(),
        )


if __name__ == "__main__":
    asyncio.run(main())