""" Prometheus MCP Server Implements the MCP server for Prometheus, exposing knowledge graph queries, file operations, and agent capabilities as MCP tools. """ import asyncio import json import os import sys from pathlib import Path from typing import Any, Optional from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import Tool, TextContent from pydantic import BaseModel, Field # Simple knowledge graph stub (would connect to real Prometheus/Neo4j) class SimpleKnowledgeGraph: """Simplified knowledge graph for MCP integration.""" def __init__(self, repo_path: str): self.repo_path = Path(repo_path) self._files = {} self._build_index() def _build_index(self): """Build a simple file index.""" if not self.repo_path.exists(): return for root, dirs, files in os.walk(self.repo_path): # Skip common directories to ignore dirs[:] = [d for d in dirs if d not in { '.git', '__pycache__', 'node_modules', '.next', 'venv', '.venv', 'dist', 'build' }] for file in files: file_path = Path(root) / file rel_path = file_path.relative_to(self.repo_path) self._files[str(rel_path)] = file_path def find_files_by_basename(self, basename: str) -> list[dict]: """Find files by basename.""" results = [] for rel_path, full_path in self._files.items(): if Path(rel_path).name == basename or Path(rel_path).name.startswith(f"{basename}."): results.append({ "relative_path": rel_path, "basename": Path(rel_path).name, }) return results[:5] # Limit results def search_code_by_text(self, text: str, file_pattern: Optional[str] = None) -> list[dict]: """Search code files for text.""" results = [] code_extensions = {'.py', '.js', '.ts', '.tsx', '.jsx', '.java', '.go', '.rs'} for rel_path, full_path in self._files.items(): if file_pattern and file_pattern not in rel_path: continue if Path(rel_path).suffix not in code_extensions: continue try: content = full_path.read_text() if text in content: # Find line numbers lines = content.split('\n') for i, line in enumerate(lines, 1): if text in line: results.append({ "relative_path": rel_path, "line_number": i, "text": line.strip(), }) break # Only show first match per file except Exception: pass return results[:10] def read_file_lines(self, rel_path: str, start_line: int = 1, end_line: Optional[int] = None) -> Optional[dict]: """Read specific lines from a file.""" if rel_path not in self._files: return None file_path = self._files[rel_path] try: content = file_path.read_text() lines = content.split('\n') if end_line is None: end_line = len(lines) + 1 selected_lines = lines[start_line - 1:end_line] return { "relative_path": rel_path, "start_line": start_line, "end_line": end_line, "content": '\n'.join(selected_lines), "total_lines": len(lines), } except Exception as e: return {"error": str(e)} def search_docs(self, text: str) -> list[dict]: """Search documentation files.""" results = [] doc_extensions = {'.md', '.txt', '.rst', '.adoc'} for rel_path, full_path in self._files.items(): if Path(rel_path).suffix not in doc_extensions: continue try: content = full_path.read_text() if text.lower() in content.lower(): # Find surrounding context lines = content.split('\n') for i, line in enumerate(lines, 1): if text.lower() in line.lower(): context_start = max(0, i - 2) context_end = min(len(lines), i + 3) results.append({ "relative_path": rel_path, "line_number": i, "context": '\n'.join(lines[context_start:context_end]), }) break except Exception: pass return results[:10] # Global knowledge graph instance _kg: Optional[SimpleKnowledgeGraph] = None def get_kg() -> SimpleKnowledgeGraph: """Get or create the knowledge graph instance.""" global _kg if _kg is None: repo_path = os.environ.get('PROMETHEUS_REPO_PATH', os.getcwd()) _kg = SimpleKnowledgeGraph(repo_path) return _kg # Tool input schemas class FindFileInput(BaseModel): basename: str = Field(description="The basename of the file to find (e.g., 'main.py', 'index.js')") class SearchCodeInput(BaseModel): text: str = Field(description="The text to search for in code files") file_pattern: Optional[str] = Field(None, description="Optional file pattern to filter (e.g., 'src/', '.py')") class ReadFileInput(BaseModel): path: str = Field(description="Relative path to the file from repo root") start_line: Optional[int] = Field(1, description="Starting line number (1-indexed)") end_line: Optional[int] = Field(None, description="Ending line number (exclusive)") class SearchDocsInput(BaseModel): text: str = Field(description="The text to search for in documentation") class CreateFileInput(BaseModel): path: str = Field(description="Relative path where to create the file") content: str = Field(description="Content to write to the file") class EditFileInput(BaseModel): path: str = Field(description="Relative path to the file to edit") old_text: str = Field(description="The exact text to replace") new_text: str = Field(description="The replacement text") class ClassifyIssueInput(BaseModel): issue_title: str = Field(description="The title of the GitHub issue") issue_body: str = Field(description="The body/description of the issue") class AnswerQuestionInput(BaseModel): question: str = Field(description="The question to answer about the codebase") context: Optional[str] = Field(None, description="Optional additional context") # Create MCP server server = Server("prometheus-mcp") @server.list_tools() async def list_tools() -> list[Tool]: """List all available MCP tools.""" return [ # Knowledge Graph Tools Tool( name="prometheus_find_file", description="Find files in the codebase by basename. Returns matching files with their relative paths.", inputSchema=FindFileInput.model_json_schema(), ), Tool( name="prometheus_search_code", description="Search for text in code files. Returns matching lines with file paths and line numbers.", inputSchema=SearchCodeInput.model_json_schema(), ), Tool( name="prometheus_read_file", description="Read a file with optional line number range. Returns file content with line numbers.", inputSchema=ReadFileInput.model_json_schema(), ), Tool( name="prometheus_search_docs", description="Search documentation files for text. Returns matching sections with context.", inputSchema=SearchDocsInput.model_json_schema(), ), # File Operation Tools Tool( name="prometheus_create_file", description="Create a new file with the given content at the specified path.", inputSchema=CreateFileInput.model_json_schema(), ), Tool( name="prometheus_edit_file", description="Edit a file by replacing exact text. Useful for making precise edits.", inputSchema=EditFileInput.model_json_schema(), ), # Agent Tools Tool( name="prometheus_classify_issue", description="Classify a GitHub issue into categories: bug, feature, question, or documentation.", inputSchema=ClassifyIssueInput.model_json_schema(), ), Tool( name="prometheus_answer_question", description="Answer questions about the codebase using semantic search and context retrieval.", inputSchema=AnswerQuestionInput.model_json_schema(), ), ] @server.call_tool() async def call_tool(name: str, arguments: Any) -> list[TextContent]: """Handle tool calls.""" kg = get_kg() try: if name == "prometheus_find_file": input_data = FindFileInput(**arguments) results = kg.find_files_by_basename(input_data.basename) return [TextContent( type="text", text=json.dumps({"files": results}, indent=2) )] elif name == "prometheus_search_code": input_data = SearchCodeInput(**arguments) results = kg.search_code_by_text(input_data.text, input_data.file_pattern) return [TextContent( type="text", text=json.dumps({"matches": results}, indent=2) )] elif name == "prometheus_read_file": input_data = ReadFileInput(**arguments) result = kg.read_file_lines(input_data.path, input_data.start_line, input_data.end_line) return [TextContent( type="text", text=json.dumps(result, indent=2) )] elif name == "prometheus_search_docs": input_data = SearchDocsInput(**arguments) results = kg.search_docs(input_data.text) return [TextContent( type="text", text=json.dumps({"matches": results}, indent=2) )] elif name == "prometheus_create_file": input_data = CreateFileInput(**arguments) repo_path = os.environ.get('PROMETHEUS_REPO_PATH', os.getcwd()) file_path = Path(repo_path) / input_data.path file_path.parent.mkdir(parents=True, exist_ok=True) file_path.write_text(input_data.content) return [TextContent( type="text", text=json.dumps({"success": True, "path": input_data.path}, indent=2) )] elif name == "prometheus_edit_file": input_data = EditFileInput(**arguments) repo_path = os.environ.get('PROMETHEUS_REPO_PATH', os.getcwd()) file_path = Path(repo_path) / input_data.path if file_path.exists(): content = file_path.read_text() if input_data.old_text in content: new_content = content.replace(input_data.old_text, input_data.new_text) file_path.write_text(new_content) return [TextContent( type="text", text=json.dumps({"success": True, "path": input_data.path}, indent=2) )] else: return [TextContent( type="text", text=json.dumps({"error": "Old text not found in file"}, indent=2) )] else: return [TextContent( type="text", text=json.dumps({"error": "File not found"}, indent=2) )] elif name == "prometheus_classify_issue": input_data = ClassifyIssueInput(**arguments) # Simple classification heuristic text = (input_data.issue_title + " " + input_data.issue_body).lower() category = "question" if any(word in text for word in ["bug", "fix", "error", "issue", "broken", "crash"]): category = "bug" elif any(word in text for word in ["feature", "add", "implement", "enhancement", "request"]): category = "feature" elif any(word in text for word in ["doc", "documentation", "readme", "guide"]): category = "documentation" return [TextContent( type="text", text=json.dumps({ "category": category, "confidence": "medium", "reasoning": f"Classified based on keyword analysis" }, indent=2) )] elif name == "prometheus_answer_question": input_data = AnswerQuestionInput(**arguments) # Search for relevant context code_results = kg.search_code_by_text(input_data.question[:50]) doc_results = kg.search_docs(input_data.question[:50]) answer = f"Based on the codebase search for '{input_data.question}':\n\n" if doc_results: answer += "**Relevant Documentation:**\n" for match in doc_results[:3]: answer += f"- {match['relative_path']}:{match['line_number']}\n" if code_results: answer += "\n**Relevant Code:**\n" for match in code_results[:5]: answer += f"- {match['relative_path']}:{match['line_number']}\n" return [TextContent( type="text", text=answer )] else: return [TextContent( type="text", text=json.dumps({"error": f"Unknown tool: {name}"}, indent=2) )] except Exception as e: return [TextContent( type="text", text=json.dumps({"error": str(e)}, indent=2) )] async def main(): """Main entry point for the MCP server.""" # Parse command line arguments repo_path = None for i, arg in enumerate(sys.argv): if arg in ["--repo", "-r"] and i + 1 < len(sys.argv): repo_path = sys.argv[i + 1] break if repo_path: os.environ["PROMETHEUS_REPO_PATH"] = repo_path async with stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, server.create_initialization_options(), ) if __name__ == "__main__": asyncio.run(main())