Source code for src.core.ai_powered_task_assignment

"""
AI-Powered Task Assignment for Marcus.

This module upgrades the basic task assignment logic to use
Phase 1-4 AI capabilities for intelligent task selection.
"""

import logging
from typing import Any, Dict, List, Optional, Set

from src.ai.core.ai_engine import MarcusAIEngine
from src.ai.types import AssignmentContext
from src.core.models import Priority, Task, TaskStatus
from src.integrations.ai_analysis_engine import AIAnalysisEngine
from src.intelligence.dependency_inferer_hybrid import HybridDependencyInferer

logger = logging.getLogger(__name__)


[docs] class AITaskAssignmentEngine: """ Intelligent task assignment using Phase 1-4 capabilities. Features: - Phase 1: Safety checks (no deploy before implement) - Phase 2: Dependency analysis (prioritize unblocking tasks) - Phase 3: AI-powered agent matching - Phase 4: Predictive impact analysis """
[docs] def __init__( self, ai_engine: MarcusAIEngine, project_tasks: List[Task], ai_analysis_engine: Optional[AIAnalysisEngine] = None, ): self.ai_engine = ai_engine self.project_tasks = project_tasks # Use hybrid dependency inferer for better accuracy with fewer API calls self.dependency_inferer = HybridDependencyInferer(ai_analysis_engine)
[docs] async def find_optimal_task_for_agent( self, agent_id: str, agent_info: Dict[str, Any], available_tasks: List[Task], assigned_task_ids: Set[str], ) -> Optional[Task]: """ Find the best task for an agent using AI capabilities. This replaces the basic skill/priority matching with intelligent analysis. """ if not available_tasks: return None logger.info( f"Finding optimal task for agent {agent_id} " f"from {len(available_tasks)} tasks" ) # Step 1: Safety filtering (Phase 1) safe_tasks = await self._filter_safe_tasks(available_tasks) logger.info(f"After safety filtering: {len(safe_tasks)} tasks remain") if not safe_tasks: return None # Step 2: Dependency analysis (Phase 2) dependency_scores = await self._analyze_dependencies(safe_tasks) # Step 3: AI-powered analysis (Phase 3) ai_scores = await self._get_ai_recommendations(safe_tasks, agent_info) # Step 4: Predictive impact (Phase 4) impact_scores = await self._predict_task_impact(safe_tasks) # Step 5: Combine scores intelligently best_task = await self._select_best_task( safe_tasks, dependency_scores, ai_scores, impact_scores, agent_info ) if best_task: logger.info(f"Selected task '{best_task.name}' for agent {agent_id}") else: logger.info(f"No suitable task found for agent {agent_id}") return best_task
async def _filter_safe_tasks(self, tasks: List[Task]) -> List[Task]: """Phase 1: Filter out unsafe tasks (e.g., deployment before implementation).""" safe_tasks = [] for task in tasks: # Check if this is a deployment/release task if self._is_deployment_task(task): # Check if dependencies are complete if not await self._are_dependencies_complete(task): logger.warning( f"Filtering out unsafe task: {task.name} - " f"dependencies incomplete" ) continue # Additional safety check with AI (if method exists) if hasattr(self.ai_engine, "check_deployment_safety"): safety_check = await self.ai_engine.check_deployment_safety( task, self.project_tasks ) if not safety_check.get("safe", False): logger.warning( f"AI safety check failed for: {task.name} - " f"{safety_check.get('reason')}" ) continue safe_tasks.append(task) return safe_tasks async def _analyze_dependencies(self, tasks: List[Task]) -> Dict[str, float]: """Phase 2: Analyze task dependencies and prioritize unblocking tasks.""" dependency_scores = {} # Build dependency graph dependency_graph = await self.dependency_inferer.infer_dependencies( self.project_tasks ) for task in tasks: # Count how many tasks this would unblock unblocked_count = 0 for other_task in self.project_tasks: if ( task.id in other_task.dependencies and other_task.status == TaskStatus.TODO ): unblocked_count += 1 # Check if task is on critical path critical_path = dependency_graph.get_critical_path() is_critical = task.id in critical_path # Calculate dependency score score = unblocked_count * 0.5 if is_critical: score += 0.5 dependency_scores[task.id] = score logger.debug( f"Task {task.name}: unblocks {unblocked_count}, " f"critical: {is_critical}, score: {score}" ) return dependency_scores async def _get_ai_recommendations( self, tasks: List[Task], agent_info: Dict[str, Any] ) -> Dict[str, float]: """Phase 3: Get AI-powered recommendations for agent-task matching.""" ai_scores = {} # Get AI recommendations for each task for task in tasks: # Prepare context for AI analysis context = AssignmentContext( task=task, agent_id=agent_info["worker_id"], agent_status=agent_info, available_tasks=tasks, project_context={ "total_tasks": len(self.project_tasks), "completed_tasks": len( [t for t in self.project_tasks if t.status == TaskStatus.DONE] ), "project_phase": self._detect_project_phase(), }, team_status={}, # Could include other agents' status ) # Use hybrid AI decision framework (if method exists) if hasattr(self.ai_engine, "analyze_task_assignment"): ai_analysis = await self.ai_engine.analyze_task_assignment(context) else: # Fallback to default scoring ai_analysis = {"suitability_score": 0.5, "confidence": 1.0} # Extract score from AI analysis score = ai_analysis.get("suitability_score", 0.5) score *= ai_analysis.get("confidence", 1.0) ai_scores[task.id] = score logger.debug( f"AI score for {task.name}: {score} " f"(confidence: {ai_analysis.get('confidence')})" ) return ai_scores async def _predict_task_impact(self, tasks: List[Task]) -> Dict[str, float]: """Phase 4: Predict the impact of completing each task.""" impact_scores = {} for task in tasks: # Predict how completing this task affects project timeline # (if method exists) if hasattr(self.ai_engine, "predict_task_impact"): impact_analysis = await self.ai_engine.predict_task_impact( task, self.project_tasks, { "current_velocity": self._calculate_velocity(), "team_size": 3, # Could be dynamic }, ) else: # Fallback to default impact scoring impact_analysis = {"impact_score": 0.5, "confidence": 1.0} # Score based on timeline reduction and risk mitigation timeline_impact = ( impact_analysis.get("timeline_reduction_days", 0) / 10 ) # Normalize risk_reduction = impact_analysis.get("risk_reduction", 0) score = (timeline_impact * 0.6) + (risk_reduction * 0.4) impact_scores[task.id] = min(score, 1.0) # Cap at 1.0 logger.debug( f"Impact score for {task.name}: {score} " f"(timeline: {timeline_impact}, risk: {risk_reduction})" ) return impact_scores async def _select_best_task( self, tasks: List[Task], dependency_scores: Dict[str, float], ai_scores: Dict[str, float], impact_scores: Dict[str, float], agent_info: Dict[str, Any], ) -> Optional[Task]: """Combine all scores to select the best task.""" best_task = None best_combined_score = -1.0 # Weights for different factors weights = { "skill_match": 0.15, # Basic skill matching (reduced) "priority": 0.15, # Task priority (reduced) "dependencies": 0.25, # Unblocking other tasks (important) "ai_recommendation": 0.30, # AI suitability analysis (most important) "impact": 0.15, # Project impact prediction } for task in tasks: # Basic skill match score skill_score = self._calculate_skill_match( task, agent_info.get("skills", []) ) # Priority score priority_score = { Priority.URGENT: 1.0, Priority.HIGH: 0.8, Priority.MEDIUM: 0.5, Priority.LOW: 0.2, }.get(task.priority, 0.5) # Get scores from our analysis dep_score = dependency_scores.get(task.id, 0) ai_score = ai_scores.get(task.id, 0.5) impact_score = impact_scores.get(task.id, 0.5) # Calculate combined score combined_score = ( skill_score * weights["skill_match"] + priority_score * weights["priority"] + dep_score * weights["dependencies"] + ai_score * weights["ai_recommendation"] + impact_score * weights["impact"] ) logger.debug( f"Task {task.name} scores - " f"skill: {skill_score:.2f}, priority: {priority_score:.2f}, " f"deps: {dep_score:.2f}, ai: {ai_score:.2f}, " f"impact: {impact_score:.2f}, combined: {combined_score:.2f}" ) if combined_score > best_combined_score: best_combined_score = combined_score best_task = task return best_task def _is_deployment_task(self, task: Task) -> bool: """Check if task is deployment-related.""" keywords = ["deploy", "release", "production", "launch", "rollout"] task_lower = task.name.lower() return any(keyword in task_lower for keyword in keywords) async def _are_dependencies_complete(self, task: Task) -> bool: """Check if all task dependencies are complete.""" if not task.dependencies: return True for dep_id in task.dependencies: dep_task = next((t for t in self.project_tasks if t.id == dep_id), None) if dep_task and dep_task.status != TaskStatus.DONE: return False return True def _detect_project_phase(self) -> str: """Detect current project phase based on task completion.""" total_tasks = len(self.project_tasks) if total_tasks == 0: return "initialization" completed = len([t for t in self.project_tasks if t.status == TaskStatus.DONE]) completion_ratio = completed / total_tasks if completion_ratio < 0.1: return "foundation" elif completion_ratio < 0.3: return "early_development" elif completion_ratio < 0.6: return "active_development" elif completion_ratio < 0.8: return "integration" elif completion_ratio < 0.95: return "testing" else: return "deployment" def _calculate_velocity(self) -> float: """Calculate team velocity (tasks per day).""" # Simplified - in reality would track completion times completed_tasks = [t for t in self.project_tasks if t.status == TaskStatus.DONE] if not completed_tasks: return 2.0 # Default estimate # Assume project started 30 days ago (would track actual dates) days_elapsed = 30 return len(completed_tasks) / days_elapsed def _calculate_skill_match(self, task: Task, agent_skills: List[str]) -> float: """Calculate skill match between agent and task.""" if not agent_skills or not task.labels: return 0.5 # Neutral score matching_skills = set(agent_skills) & set(task.labels) return len(matching_skills) / len(task.labels)
# Function to integrate into marcus_mcp_server.py
[docs] async def find_optimal_task_for_agent_ai_powered( agent_id: str, agent_status: Dict[str, Any], project_tasks: List[Task], available_tasks: List[Task], assigned_task_ids: Set[str], ai_engine: MarcusAIEngine, ) -> Optional[Task]: """ AI-powered task assignment to replace the basic version. This should be called from request_next_task in marcus_mcp_server.py """ # Initialize AI assignment engine assignment_engine = AITaskAssignmentEngine(ai_engine, project_tasks) # Find optimal task optimal_task = await assignment_engine.find_optimal_task_for_agent( agent_id=agent_id, agent_info=agent_status, available_tasks=available_tasks, assigned_task_ids=assigned_task_ids, ) return optimal_task