Files
SuperCharged-Claude-Code-Up…/mcp-servers/everycode-mcp/everycode_mcp/server.py
uroma 3b128ba3bd feat: Add unified agent integration with Prometheus, Every Code, and Dexto
This commit adds comprehensive integration of three major AI agent platforms:

## MCP Servers (3)
- Prometheus MCP: Knowledge graph code reasoning with AST analysis
- Every Code MCP: Fast terminal-based coding agent with Auto Drive
- Dexto MCP: Agent harness with orchestration and session management

## Claude Code Skills (6)
- /agent-plan: Generate implementation plans
- /agent-fix-bug: Fix bugs end-to-end
- /agent-solve: Solve complex problems
- /agent-review: Review code quality
- /agent-context: Get code context
- /agent-orchestrate: Orchestrate workflows

## Ralph Auto-Integration
- Pattern-based auto-trigger for all three platforms
- Intelligent backend selection
- Multi-platform coordination
- Configuration in ralph/ralph.yml

## Documentation
- Complete integration guides
- Ralph auto-integration documentation
- Setup scripts
- Usage examples

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-27 20:23:14 +00:00

334 lines
11 KiB
Python

"""
Every Code MCP Server
Implements the MCP server for Every Code, exposing Auto Drive,
planning, and code operations as MCP tools.
"""
import asyncio
import json
import os
import sys
from pathlib import Path
from typing import Any, Optional
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from pydantic import BaseModel, Field
class EveryCodeCLI:
"""Interface to Every Code CLI."""
def __init__(self, repo_path: str):
self.repo_path = Path(repo_path)
self.code_command = self._find_code_command()
def _find_code_command(self) -> str:
"""Find the code or coder command."""
# Check if code is in PATH
for cmd in ["code", "coder"]:
try:
import shutil
if shutil.which(cmd):
return cmd
except Exception:
pass
# Fallback to npx
return "npx -y @just-every/code"
async def run_command(self, args: list[str]) -> dict:
"""Run Every Code command and return result."""
import subprocess
cmd = f"{self.code_command} {' '.join(args)}"
try:
result = subprocess.run(
cmd,
shell=True,
cwd=self.repo_path,
capture_output=True,
text=True,
timeout=300, # 5 minute timeout
)
return {
"success": result.returncode == 0,
"stdout": result.stdout,
"stderr": result.stderr,
"returncode": result.returncode,
}
except subprocess.TimeoutExpired:
return {"success": False, "error": "Command timed out"}
except Exception as e:
return {"success": False, "error": str(e)}
# Global CLI instance
_cli: Optional[EveryCodeCLI] = None
def get_cli() -> EveryCodeCLI:
"""Get or create the CLI instance."""
global _cli
if _cli is None:
repo_path = os.environ.get('EVERYCODE_REPO_PATH', os.getcwd())
_cli = EveryCodeCLI(repo_path)
return _cli
# Tool input schemas
class PlanInput(BaseModel):
prompt: str = Field(description="The feature or task to plan")
scope: Optional[str] = Field(None, description="Optional scope or constraint")
class SolveInput(BaseModel):
problem: str = Field(description="The problem to solve")
context: Optional[str] = Field(None, description="Additional context")
class AutoDriveInput(BaseModel):
task: str = Field(description="The task to automate")
mode: Optional[str] = Field("continuous", description="Execution mode: continuous, single, approval")
class ReviewInput(BaseModel):
files: Optional[list[str]] = Field(None, description="Specific files to review, or None for all changes")
class BrowserInput(BaseModel):
action: str = Field(description="Action: goto, click, type, screenshot, etc.")
url: Optional[str] = Field(None, description="URL for goto action")
selector: Optional[str] = Field(None, description="CSS selector")
text: Optional[str] = Field(None, description="Text to type")
class CreateFileInput(BaseModel):
path: str = Field(description="Relative path where to create the file")
content: str = Field(description="Content to write to the file")
class EditFileInput(BaseModel):
path: str = Field(description="Relative path to the file to edit")
old_text: str = Field(description="The exact text to replace")
new_text: str = Field(description="The replacement text")
class SearchCodeInput(BaseModel):
query: str = Field(description="Search query for code")
file_pattern: Optional[str] = Field(None, description="Optional file pattern filter")
# Create MCP server
server = Server("everycode-mcp")
@server.list_tools()
async def list_tools() -> list[Tool]:
"""List all available MCP tools."""
return [
# Core Every Code commands
Tool(
name="everycode_plan",
description="Generate an implementation plan using Every Code's planning capabilities.",
inputSchema=PlanInput.model_json_schema(),
),
Tool(
name="everycode_solve",
description="Solve complex problems by coordinating multiple agents and approaches.",
inputSchema=SolveInput.model_json_schema(),
),
Tool(
name="everycode_auto_drive",
description="Run automated multi-agent task execution with Auto Drive.",
inputSchema=AutoDriveInput.model_json_schema(),
),
Tool(
name="everycode_review",
description="Run code review with Auto Review (background quality checks).",
inputSchema=ReviewInput.model_json_schema(),
),
# Browser automation
Tool(
name="everycode_browser",
description="Automate browser interactions (goto, click, type, screenshot).",
inputSchema=BrowserInput.model_json_schema(),
),
# File operations
Tool(
name="everycode_create_file",
description="Create a new file with the given content.",
inputSchema=CreateFileInput.model_json_schema(),
),
Tool(
name="everycode_edit_file",
description="Edit a file by replacing exact text.",
inputSchema=EditFileInput.model_json_schema(),
),
Tool(
name="everycode_search_code",
description="Search code using Every Code's search capabilities.",
inputSchema=SearchCodeInput.model_json_schema(),
),
]
@server.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
"""Handle tool calls."""
cli = get_cli()
repo_path = os.environ.get('EVERYCODE_REPO_PATH', os.getcwd())
try:
if name == "everycode_plan":
input_data = PlanInput(**arguments)
# Use Every Code's /plan command
prompt = input_data.prompt
if input_data.scope:
prompt += f" (scope: {input_data.scope})"
result = await cli.run_command(["/plan", prompt])
return [TextContent(
type="text",
text=f"# Plan Generated\n\n{result.get('stdout', '')}"
)]
elif name == "everycode_solve":
input_data = SolveInput(**arguments)
# Use Every Code's /solve command
task = input_data.problem
if input_data.context:
task += f"\n\nContext: {input_data.context}"
result = await cli.run_command(["/solve", task])
return [TextContent(
type="text",
text=f"# Solution\n\n{result.get('stdout', '')}"
)]
elif name == "everycode_auto_drive":
input_data = AutoDriveInput(**arguments)
# Use Every Code's /auto command
mode_flags = {
"continuous": [],
"single": ["--single"],
"approval": ["--approval"],
}
args = ["/auto"] + mode_flags.get(input_data.mode, []) + [input_data.task]
result = await cli.run_command(args)
return [TextContent(
type="text",
text=f"# Auto Drive Results\n\n{result.get('stdout', '')}"
)]
elif name == "everycode_review":
input_data = ReviewInput(**arguments)
# Use Every Code's review feature
if input_data.files:
result = await cli.run_command(["--review"] + input_data.files)
else:
result = await cli.run_command(["--review"])
return [TextContent(
type="text",
text=f"# Code Review\n\n{result.get('stdout', '')}"
)]
elif name == "everycode_browser":
input_data = BrowserInput(**arguments)
# Browser automation would be done via Every Code's browser integration
# For now, return a placeholder
return [TextContent(
type="text",
text=f"# Browser Action: {input_data.action}\n\nBrowser automation requires Every Code's full integration. Action queued: {input_data.action}"
)]
elif name == "everycode_create_file":
input_data = CreateFileInput(**arguments)
file_path = Path(repo_path) / input_data.path
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(input_data.content)
return [TextContent(
type="text",
text=json.dumps({"success": True, "path": input_data.path}, indent=2)
)]
elif name == "everycode_edit_file":
input_data = EditFileInput(**arguments)
file_path = Path(repo_path) / input_data.path
if file_path.exists():
content = file_path.read_text()
if input_data.old_text in content:
new_content = content.replace(input_data.old_text, input_data.new_text)
file_path.write_text(new_content)
return [TextContent(
type="text",
text=json.dumps({"success": True, "path": input_data.path}, indent=2)
)]
else:
return [TextContent(
type="text",
text=json.dumps({"error": "Old text not found in file"}, indent=2)
)]
else:
return [TextContent(
type="text",
text=json.dumps({"error": "File not found"}, indent=2)
)]
elif name == "everycode_search_code":
input_data = SearchCodeInput(**arguments)
# Use Every Code's search or grep
result = await cli.run_command(["search", input_data.query])
return [TextContent(
type="text",
text=f"# Search Results\n\n{result.get('stdout', '')}"
)]
else:
return [TextContent(
type="text",
text=json.dumps({"error": f"Unknown tool: {name}"}, indent=2)
)]
except Exception as e:
return [TextContent(
type="text",
text=json.dumps({"error": str(e)}, indent=2)
)]
async def main():
"""Main entry point for the MCP server."""
# Parse command line arguments
repo_path = None
for i, arg in enumerate(sys.argv):
if arg in ["--repo", "-r"] and i + 1 < len(sys.argv):
repo_path = sys.argv[i + 1]
break
if repo_path:
os.environ["EVERYCODE_REPO_PATH"] = repo_path
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options(),
)
if __name__ == "__main__":
asyncio.run(main())