Source code for src.core.code_analyzer

"""
Code Analyzer for GitHub Integration.

This module analyzes code changes, PRs, and repository state to provide
context-aware information to workers about implemented features.
"""

import re
import sys
from typing import Any, Callable, Dict, List, Optional

from src.core.models import Task, WorkerStatus


class CodeAnalyzer:
    """
    Analyzes code changes and provides insights for task coordination.

    This class analyzes GitHub repositories, PRs, and code changes to help
    coordinate tasks between workers by understanding what features have
    been implemented. All GitHub access goes through ``mcp_caller``; when no
    caller is configured every lookup returns an empty result.

    Attributes
    ----------
    mcp_caller : Optional[callable]
        Awaitable function used to call GitHub MCP tools. It is invoked as
        ``await mcp_caller(tool_name, params_dict)``.
    endpoint_patterns : List[str]
        Regular expression patterns for detecting API endpoints.

    Examples
    --------
    >>> analyzer = CodeAnalyzer(mcp_caller=github_client)
    >>> details = await analyzer.get_implementation_details(
    ...     "owner", "repo", "endpoints"
    ... )
    """

    def __init__(self, mcp_caller: Optional[Callable[..., Any]] = None) -> None:
        """
        Initialize the code analyzer.

        Parameters
        ----------
        mcp_caller : Optional[callable], default=None
            Function to call GitHub MCP tools. Should accept tool name and
            parameters dict.
        """
        self.mcp_caller = mcp_caller
        # The app/router patterns accept either ")" or "," after the path
        # literal so that routes declared with extra arguments, e.g.
        # ``@app.get("/x", response_model=X)`` or ``app.get('/x', handler)``,
        # are detected as well.
        self.endpoint_patterns = [
            # FastAPI/Flask style
            r'@app\.(get|post|put|delete|patch)\(["\']([^"\']+)["\']\s*[,)]',
            r'@router\.(get|post|put|delete|patch)\(["\']([^"\']+)["\']\s*[,)]',
            # Express.js style
            r'app\.(get|post|put|delete|patch)\(["\']([^"\']+)["\']\s*[,)]',
            r'router\.(get|post|put|delete|patch)\(["\']([^"\']+)["\']\s*[,)]',
            # Django style
            r'path\(["\']([^"\']+)["\']\s*,',
            # Spring Boot style
            r'@(Get|Post|Put|Delete|Patch)Mapping\(["\']([^"\']+)["\']\)',
        ]

    async def analyze_task_completion(
        self, task: Task, worker: WorkerStatus, owner: str, repo: str
    ) -> Dict[str, Any]:
        """
        Analyze what was accomplished when a task is completed.

        Examines commits, PRs, and code changes to understand what was
        implemented and generate recommendations for subsequent workers.

        Parameters
        ----------
        task : Task
            The completed task to analyze
        worker : WorkerStatus
            The worker who completed the task
        owner : str
            GitHub repository owner username or organization
        repo : str
            GitHub repository name

        Returns
        -------
        Dict[str, Any]
            Analysis containing:
            - task_id: ID of the analyzed task
            - task_name: Name of the task
            - worker_id: ID of the worker
            - findings: Dict with commits, PRs, and implementations
            - recommendations: List of recommendations for next workers

        Examples
        --------
        >>> analysis = await analyzer.analyze_task_completion(
        ...     task, worker, "myorg", "myrepo"
        ... )
        >>> print(analysis["recommendations"])
        """
        findings: Dict[str, Any] = {}
        analysis: Dict[str, Any] = {
            "task_id": task.id,
            "task_name": task.name,
            "worker_id": worker.worker_id,
            "findings": findings,
            "recommendations": [],
        }

        # Check for recent commits by the worker
        commits = await self._get_recent_commits(owner, repo, worker.worker_id)
        if commits:
            findings["commits"] = commits

        # Check for PRs
        prs = await self._get_worker_prs(owner, repo, worker.worker_id)
        if prs:
            findings["pull_requests"] = prs

            # Merge the per-PR analyses category by category so that a
            # worker with several PRs does not lose all but the last one.
            implementations: Dict[str, List[Any]] = {}
            for pr in prs:
                pr_analysis = await self._analyze_pr_changes(
                    owner, repo, pr["number"]
                )
                for category, items in pr_analysis.items():
                    implementations.setdefault(category, []).extend(items)
            if implementations:
                findings["implementations"] = implementations

        # Generate recommendations based on findings
        analysis["recommendations"] = self._generate_recommendations(task, findings)
        return analysis

    async def get_implementation_details(
        self, owner: str, repo: str, feature_type: str
    ) -> Dict[str, Any]:
        """
        Get details about existing implementations in the repository.

        Parameters
        ----------
        owner : str
            GitHub repository owner
        repo : str
            GitHub repository name
        feature_type : str
            Type of feature to search for. Supported values:
            - "endpoints": API endpoints
            - "models": Data models (finder is currently a stub)
            - "schemas": Schemas (finder is currently a stub)
            Any other value yields an empty implementation list.

        Returns
        -------
        Dict[str, Any]
            Dictionary containing:
            - feature_type: The requested feature type
            - implementations: List of found implementations

        Examples
        --------
        >>> details = await analyzer.get_implementation_details(
        ...     "owner", "repo", "endpoints"
        ... )
        >>> for endpoint in details["implementations"]:
        ...     print(f"{endpoint['method']} {endpoint['path']}")
        """
        details: Dict[str, Any] = {"feature_type": feature_type, "implementations": []}

        # Route every supported type through its finder so the stubs start
        # contributing as soon as they are implemented.
        finders = {
            "endpoints": self._find_endpoints,
            "models": self._find_models,
            "schemas": self._find_schemas,
        }
        finder = finders.get(feature_type)
        if finder is not None:
            details["implementations"] = await finder(owner, repo)
        return details

    async def _get_recent_commits(
        self, owner: str, repo: str, author: str
    ) -> List[Dict[str, Any]]:
        """
        Get recent commits by a specific author.

        Parameters
        ----------
        owner : str
            Repository owner
        repo : str
            Repository name
        author : str
            Author name to filter commits (case-insensitive substring match
            against the stringified ``author`` field of each commit)

        Returns
        -------
        List[Dict[str, Any]]
            List of commit dictionaries containing:
            - sha: Commit SHA
            - message: Commit message
            - date: Commit date
            - files_changed: Number of files changed
            Empty when no caller is configured or the lookup fails.
        """
        if not self.mcp_caller:
            return []

        try:
            result = await self.mcp_caller(
                "github.list_commits", {"owner": owner, "repo": repo, "perPage": 10}
            )
            needle = author.lower()
            commits = []
            for commit in result.get("commits") or []:
                # Check if commit author matches worker
                if needle not in str(commit.get("author", {})).lower():
                    continue
                # Nested fields may be null in API payloads; treat as empty.
                commit_info = commit.get("commit") or {}
                commit_author = commit_info.get("author") or {}
                commits.append(
                    {
                        "sha": commit.get("sha"),
                        "message": commit_info.get("message", ""),
                        "date": commit_author.get("date", ""),
                        "files_changed": len(commit.get("files") or []),
                    }
                )
            return commits
        except Exception as e:
            # Best-effort lookup: report the failure and carry on.
            print(f"Error getting commits: {e}", file=sys.stderr)
            return []

    async def _get_worker_prs(
        self, owner: str, repo: str, author: str
    ) -> List[Dict[str, Any]]:
        """
        Get pull requests created by a specific worker.

        Parameters
        ----------
        owner : str
            Repository owner
        repo : str
            Repository name
        author : str
            PR author name to filter (case-insensitive substring match
            against the stringified ``user`` field of each PR)

        Returns
        -------
        List[Dict[str, Any]]
            List of PR dictionaries containing:
            - number: PR number
            - title: PR title
            - state: PR state (open/closed)
            - merged: Whether PR was merged
            - branch: Source branch name
            - created_at: Creation timestamp
            Empty when no caller is configured or the lookup fails.
        """
        if not self.mcp_caller:
            return []

        try:
            result = await self.mcp_caller(
                "github.list_pull_requests",
                {"owner": owner, "repo": repo, "state": "all", "perPage": 10},
            )
            needle = author.lower()
            prs = []
            for pr in result.get("pull_requests") or []:
                # Check if PR author matches worker
                if needle not in str(pr.get("user", {})).lower():
                    continue
                prs.append(
                    {
                        "number": pr.get("number"),
                        "title": pr.get("title"),
                        "state": pr.get("state"),
                        "merged": pr.get("merged", False),
                        "branch": (pr.get("head") or {}).get("ref", ""),
                        "created_at": pr.get("created_at"),
                    }
                )
            return prs
        except Exception as e:
            # Best-effort lookup: report the failure and carry on.
            print(f"Error getting PRs: {e}", file=sys.stderr)
            return []

    async def _analyze_pr_changes(
        self, owner: str, repo: str, pr_number: int
    ) -> Dict[str, Any]:
        """
        Analyze changes in a PR to understand implementations.

        Examines files changed in a PR to extract information about
        endpoints, models, configurations, and tests that were added.

        Parameters
        ----------
        owner : str
            Repository owner
        repo : str
            Repository name
        pr_number : int
            Pull request number to analyze

        Returns
        -------
        Dict[str, Any]
            Analysis results containing:
            - endpoints: List of API endpoints found
            - models: List of data models found
            - configurations: List of config changes
            - tests: List of test files added
            An empty dict when no caller is configured or the lookup fails.
        """
        if not self.mcp_caller:
            return {}

        try:
            # Get PR files
            result = await self.mcp_caller(
                "github.get_pull_request_files",
                {"owner": owner, "repo": repo, "pullNumber": pr_number},
            )

            analysis: Dict[str, Any] = {
                "endpoints": [],
                "models": [],
                "configurations": [],
                "tests": [],
            }

            for file in result.get("files") or []:
                filename = file.get("filename", "")
                patch = file.get("patch", "")

                # A file is classified by the first matching category only;
                # the order of these checks is therefore the precedence.
                if self._is_api_file(filename):
                    analysis["endpoints"].extend(self._extract_endpoints(patch))
                elif self._is_model_file(filename):
                    analysis["models"].extend(self._extract_models(patch))
                elif self._is_config_file(filename):
                    analysis["configurations"].append(
                        {
                            "file": filename,
                            "changes": self._summarize_config_changes(patch),
                        }
                    )
                elif self._is_test_file(filename):
                    analysis["tests"].append(
                        {
                            "file": filename,
                            "type": "unit" if "unit" in filename else "integration",
                        }
                    )

            return analysis
        except Exception as e:
            # Best-effort lookup: report the failure and carry on.
            print(f"Error analyzing PR: {e}", file=sys.stderr)
            return {}

    async def _find_models(self, owner: str, repo: str) -> List[Dict[str, Any]]:
        """
        Find database models in the repository.

        Parameters
        ----------
        owner : str
            Repository owner
        repo : str
            Repository name

        Returns
        -------
        List[Dict[str, Any]]
            List of model dictionaries (always empty for now)
        """
        # Stub method - not yet implemented
        return []

    async def _find_schemas(self, owner: str, repo: str) -> List[Dict[str, Any]]:
        """
        Find API schemas in the repository.

        Parameters
        ----------
        owner : str
            Repository owner
        repo : str
            Repository name

        Returns
        -------
        List[Dict[str, Any]]
            List of schema dictionaries (always empty for now)
        """
        # Stub method - not yet implemented
        return []

    async def _find_endpoints(self, owner: str, repo: str) -> List[Dict[str, Any]]:
        """
        Find API endpoints in the repository.

        Searches common API file patterns and extracts endpoint definitions
        using regex patterns.

        Parameters
        ----------
        owner : str
            Repository owner
        repo : str
            Repository name

        Returns
        -------
        List[Dict[str, Any]]
            List of endpoint dictionaries with method, path, and
            implementation. Empty when no caller is configured or any
            lookup fails.
        """
        if not self.mcp_caller:
            return []

        try:
            # Search for common API file patterns
            search_queries = [
                f"repo:{owner}/{repo} path:api extension:py",
                f"repo:{owner}/{repo} path:routes extension:js",
                f"repo:{owner}/{repo} path:controllers extension:java",
            ]

            endpoints = []
            for query in search_queries:
                result = await self.mcp_caller(
                    "github.search_code", {"query": query, "perPage": 20}
                )
                for item in result.get("items") or []:
                    # Get file content to extract endpoints
                    file_result = await self.mcp_caller(
                        "github.get_file_contents",
                        {"owner": owner, "repo": repo, "path": item.get("path")},
                    )
                    if file_result.get("content"):
                        content = self._decode_content(file_result["content"])
                        endpoints.extend(self._extract_endpoints(content))

            return endpoints
        except Exception as e:
            # Best-effort lookup: report the failure and carry on.
            print(f"Error finding endpoints: {e}", file=sys.stderr)
            return []

    def _extract_endpoints(self, content: str) -> List[Dict[str, Any]]:
        """
        Extract API endpoints from code content.

        Applies every pattern in ``endpoint_patterns`` and reports each
        distinct (method, path) pair once, in order of first discovery.

        Parameters
        ----------
        content : str
            Source code content to analyze

        Returns
        -------
        List[Dict[str, Any]]
            List of endpoints with:
            - method: HTTP method (GET, POST, etc.)
            - path: Endpoint path
            - implementation: Function name if found
        """
        if not content:
            return []

        endpoints = []
        # The un-prefixed ``app.``/``router.`` patterns also match inside
        # ``@app.``/``@router.`` decorators, so the same route can be found
        # by two patterns; track what has already been reported.
        seen = set()
        for pattern in self.endpoint_patterns:
            for match in re.findall(pattern, content, re.MULTILINE):
                if isinstance(match, tuple):
                    # Two capture groups: (method, path)
                    method = match[0].upper()
                    path = match[1] if len(match) > 1 else match[0]
                else:
                    # Single capture group (e.g. Django): path only
                    method = "GET"  # Default
                    path = match

                key = (method, path)
                if key in seen:
                    continue
                seen.add(key)
                endpoints.append(
                    {
                        "method": method,
                        "path": path,
                        "implementation": self._extract_function_name(content, path),
                    }
                )

        return endpoints

    def _extract_models(self, content: str) -> List[Dict[str, Any]]:
        """
        Extract data models from code.

        Identifies Python classes whose base list mentions Model/Base and
        TypeScript/JavaScript interface declarations.

        Parameters
        ----------
        content : str
            Source code content to analyze

        Returns
        -------
        List[Dict[str, Any]]
            List of models with:
            - name: Model/interface name
            - type: Type of model (database_model, interface)
            - language: Programming language
        """
        # Python/SQLAlchemy models
        python_model_pattern = (
            r"class\s+(\w+)\s*\([^)]*(?:Model|Base|BaseModel)[^)]*\):"
        )
        # TypeScript/JavaScript interfaces
        ts_interface_pattern = r"(?:export\s+)?interface\s+(\w+)\s*{"

        models = [
            {"name": name, "type": "database_model", "language": "python"}
            for name in re.findall(python_model_pattern, content)
        ]
        models.extend(
            {"name": name, "type": "interface", "language": "typescript"}
            for name in re.findall(ts_interface_pattern, content)
        )
        return models

    def _generate_recommendations(
        self, task: Task, findings: Dict[str, Any]
    ) -> List[str]:
        """
        Generate recommendations for next workers based on findings.

        Parameters
        ----------
        task : Task
            The completed task
        findings : Dict[str, Any]
            Analysis findings from commits and PRs

        Returns
        -------
        List[str]
            List of recommendation strings for next workers
        """
        recommendations = []
        implementations = findings.get("implementations", {})

        # If endpoints were created
        endpoints = implementations.get("endpoints")
        if endpoints:
            recommendations.append(
                "The following API endpoints were implemented:\n"
                + "".join(f" - {ep['method']} {ep['path']}\n" for ep in endpoints)
            )

        # If models were created
        models = implementations.get("models")
        if models:
            recommendations.append(
                "The following data models were created:\n"
                + "".join(
                    f" - {model['name']} ({model['type']})\n" for model in models
                )
            )

        # If tests were added
        tests = implementations.get("tests")
        if tests:
            recommendations.append(
                f"Tests were added in {len(tests)} files. "
                f"Ensure new features include tests."
            )

        # Based on task type
        task_name = task.name.lower()
        if "api" in task_name or "endpoint" in task_name:
            recommendations.append(
                "Frontend developers should use the implemented endpoints. "
                "Check the PR for request/response formats."
            )

        if "model" in task_name or "schema" in task_name:
            recommendations.append(
                "Database migrations may be needed. "
                "API developers should use these models for consistency."
            )

        return recommendations

    def _is_api_file(self, filename: str) -> bool:
        """
        Check if file is likely an API file.

        Parameters
        ----------
        filename : str
            File path to check

        Returns
        -------
        bool
            True if filename contains API-related keywords
        """
        api_indicators = ("api", "route", "controller", "endpoint", "view")
        lowered = filename.lower()
        return any(indicator in lowered for indicator in api_indicators)

    def _is_model_file(self, filename: str) -> bool:
        """
        Check if file is likely a model file.

        Parameters
        ----------
        filename : str
            File path to check

        Returns
        -------
        bool
            True if filename contains model-related keywords
        """
        model_indicators = ("model", "schema", "entity", "domain")
        lowered = filename.lower()
        return any(indicator in lowered for indicator in model_indicators)

    def _is_config_file(self, filename: str) -> bool:
        """
        Check if file is a configuration file.

        Parameters
        ----------
        filename : str
            File path to check

        Returns
        -------
        bool
            True if filename has a config-related extension (case-sensitive)
        """
        config_extensions = (".json", ".yaml", ".yml", ".env", ".config")
        return filename.endswith(config_extensions)

    def _is_test_file(self, filename: str) -> bool:
        """
        Check if file is a test file.

        Parameters
        ----------
        filename : str
            File path to check

        Returns
        -------
        bool
            True if filename contains 'test' or 'spec'
        """
        lowered = filename.lower()
        return "test" in lowered or "spec" in lowered

    def _extract_function_name(self, content: str, path: str) -> Optional[str]:
        """
        Try to extract the function name handling an endpoint.

        For each line mentioning ``path``, looks for a ``def`` on that line
        and the following four lines first, because a handler normally
        follows its route declaration. Only if none is found there are the
        five preceding lines checked, nearest first.

        Parameters
        ----------
        content : str
            Source code content
        path : str
            Endpoint path to search near

        Returns
        -------
        Optional[str]
            Function name if found, None otherwise
        """
        lines = content.split("\n")
        for i, line in enumerate(lines):
            if path not in line:
                continue
            forward = range(i, min(len(lines), i + 5))
            backward = range(i - 1, max(0, i - 5) - 1, -1)
            for window in (forward, backward):
                for j in window:
                    func_match = re.search(r"def\s+(\w+)\s*\(", lines[j])
                    if func_match:
                        return func_match.group(1)
        return None

    def _decode_content(self, content: str) -> str:
        """
        Decode base64 content from GitHub API.

        Parameters
        ----------
        content : str
            Base64 encoded content from GitHub

        Returns
        -------
        str
            Decoded UTF-8 string content, or ``content`` unchanged when it
            cannot be decoded as base64-encoded UTF-8
        """
        import base64

        try:
            return base64.b64decode(content).decode("utf-8")
        except (ValueError, TypeError):
            # binascii.Error and UnicodeDecodeError are both ValueError
            # subclasses; TypeError covers non-string input. In every case
            # fall back to the raw value.
            return content

    def _summarize_config_changes(self, patch: str) -> str:
        """
        Summarize configuration changes from a patch.

        Counts lines starting with a single ``+`` or ``-``; lines starting
        with ``++`` or ``--`` (such as diff file headers) are not counted.

        Parameters
        ----------
        patch : str
            Git diff patch content

        Returns
        -------
        str
            Summary string like "5 additions, 3 deletions"
        """
        added = len(re.findall(r"^\+[^+]", patch, re.MULTILINE))
        removed = len(re.findall(r"^-[^-]", patch, re.MULTILINE))
        return f"{added} additions, {removed} deletions"