Source code for src.modes.adaptive.basic_adaptive

"""
Basic Adaptive Mode for Marcus Hybrid Approach.

Implements Adaptive Mode that respects dependencies and prevents illogical
task assignments like "Deploy to production" before development is complete.
"""

import logging
from typing import Any, Dict, List, Optional, Tuple

from src.core.assignment_persistence import AssignmentPersistence
from src.core.models import Priority, Task, TaskStatus

logger = logging.getLogger(__name__)


[docs] class BasicAdaptiveMode: """ Basic Adaptive Mode that coordinates within existing structure. This mode implements task assignment logic that respects dependencies and prevents illogical task assignments. """
[docs] def __init__(self) -> None: self.state: Dict[str, Any] = {"assignment_preferences": {}, "blocked_tasks": []} self.assignment_persistence = AssignmentPersistence() # Common dependency patterns to prevent illogical assignments self.LOGICAL_DEPENDENCY_PATTERNS = [ # Setup must come before everything { "pattern": r"(setup|init|configure|install)", "blocks_until_complete": ( r"(implement|build|create|develop|test|deploy)" ), }, # Design comes before implementation { "pattern": r"(design|architect|plan|schema)", "blocks_until_complete": r"(implement|build|create|code)", }, # Models/Backend before Frontend { "pattern": r"(model|backend|api|server)", "blocks_until_complete": r"(frontend|ui|client|interface)", }, # Implementation before testing { "pattern": r"(implement|build|create|develop)", "blocks_until_complete": r"(test|qa|verify)", }, # Testing before deployment { "pattern": r"(test|qa|quality|verify)", "blocks_until_complete": r"(deploy|release|launch|production)", }, # Basic features before advanced features { "pattern": r"(authentication|login|auth)", "blocks_until_complete": r"(permissions|roles|admin)", }, ]
[docs] async def initialize(self, saved_state: Dict[str, Any]) -> None: """ Initialize mode with saved state. Parameters ---------- saved_state : Dict[str, Any] Previously saved state to restore, or empty dict for fresh start. """ if saved_state: self.state.update(saved_state) logger.info("Adaptive mode initialized with saved state") else: logger.info("Adaptive mode initialized with default state")
[docs] async def get_state(self) -> Dict[str, Any]: """ Get current mode state for saving. Returns ------- Dict[str, Any] Current state dictionary containing preferences and blocked tasks. """ return self.state.copy()
[docs] async def get_status(self) -> Dict[str, Any]: """ Get current mode status. Returns ------- Dict[str, Any] Status dictionary with mode name, preference count, and blocked task count. """ return { "mode": "adaptive", "assignment_preferences": len(self.state.get("assignment_preferences", {})), "blocked_tasks": len(self.state.get("blocked_tasks", [])), }
[docs] async def find_optimal_task_for_agent( self, agent_id: str, agent_skills: List[str], available_tasks: List[Task], assigned_tasks: Dict[str, Task], ) -> Optional[Task]: """ Find optimal task respecting dependencies and preventing illogical assignments. Parameters ---------- agent_id : str ID of the agent requesting work. agent_skills : List[str] Skills/capabilities of the agent. available_tasks : List[Task] Tasks available for assignment. assigned_tasks : Dict[str, Task] Currently assigned tasks (agent_id -> task). Returns ------- Optional[Task] Best task for the agent or None. """ logger.info( f"Finding optimal task for agent {agent_id} with " f"{len(available_tasks)} available tasks" ) # Filter out tasks that are blocked by dependencies unblocked_tasks = await self._filter_unblocked_tasks( available_tasks, assigned_tasks ) if not unblocked_tasks: logger.info("No unblocked tasks available") return None # Score tasks based on multiple factors scored_tasks: List[Tuple[Task, float]] = [] for task in unblocked_tasks: score = await self._calculate_task_score( task=task, agent_id=agent_id, agent_skills=agent_skills, available_tasks=available_tasks, ) scored_tasks.append((task, score)) # Sort by score (highest first) scored_tasks.sort(key=lambda x: x[1], reverse=True) if scored_tasks: best_task, best_score = scored_tasks[0] logger.info(f"Selected task '{best_task.name}' with score {best_score:.2f}") return best_task return None
async def _filter_unblocked_tasks( self, tasks: List[Task], assigned_tasks: Dict[str, Task] ) -> List[Task]: """ Filter tasks to only include those not blocked by dependencies. This is the core logic that prevents "Deploy to production" from being assigned before development is complete. Parameters ---------- tasks : List[Task] List of tasks to filter. assigned_tasks : Dict[str, Task] Currently assigned tasks (agent_id -> task). Returns ------- List[Task] List of tasks that are not blocked by dependencies. """ unblocked_tasks: List[Task] = [] for task in tasks: if await self._is_task_unblocked(task, tasks, assigned_tasks): unblocked_tasks.append(task) else: logger.debug(f"Task '{task.name}' is blocked by dependencies") return unblocked_tasks async def _is_task_unblocked( self, task: Task, all_tasks: List[Task], assigned_tasks: Dict[str, Task] ) -> bool: """ Check if a task is unblocked and ready for assignment. Returns False for illogical assignments like deployment before development. Parameters ---------- task : Task The task to check. all_tasks : List[Task] All available tasks to check for dependencies. assigned_tasks : Dict[str, Task] Currently assigned tasks (agent_id -> task). Returns ------- bool True if task is unblocked and ready, False otherwise. """ # Check explicit dependencies first if task.dependencies: for dep_id in task.dependencies: dep_task = next((t for t in all_tasks if t.id == dep_id), None) if dep_task and dep_task.status != TaskStatus.DONE: logger.debug( f"Task '{task.name}' blocked by incomplete " f"dependency '{dep_task.name}'" ) return False # Check logical dependency patterns task_text = f"{task.name} {task.description or ''}".lower() for pattern in self.LOGICAL_DEPENDENCY_PATTERNS: blocking_pattern = pattern["pattern"] blocked_pattern = pattern["blocks_until_complete"] # If this task matches a blocked pattern import re if re.search(blocked_pattern, task_text): # Check if any blocking tasks are incomplete for other_task in all_tasks: other_text = ( f"{other_task.name} {other_task.description or ''}".lower() ) if ( re.search(blocking_pattern, other_text) and other_task.status != TaskStatus.DONE and other_task.id != task.id ): logger.info( f"Task '{task.name}' blocked by logical dependency: " f"'{other_task.name}' must complete first" ) return False # Check for obvious illogical patterns if await self._is_obviously_illogical(task, all_tasks): return False return True async def _is_obviously_illogical(self, task: Task, all_tasks: List[Task]) -> bool: """ Check for obviously illogical task assignments. This prevents the core problem: deploying before building. Parameters ---------- task : Task The task to check. all_tasks : List[Task] All available tasks to check against. Returns ------- bool True if assignment would be illogical, False otherwise. """ task_lower = task.name.lower() # Deployment tasks if any( word in task_lower for word in ["deploy", "production", "release", "launch"] ): # Check if there are any incomplete implementation tasks for other_task in all_tasks: other_lower = other_task.name.lower() if ( any( word in other_lower for word in ["implement", "build", "create", "develop"] ) and other_task.status != TaskStatus.DONE and other_task.id != task.id ): logger.warning( f"Blocking deployment task '{task.name}' - " f"implementation task '{other_task.name}' is not complete" ) return True # Testing tasks if any(word in task_lower for word in ["test", "qa", "quality"]): # Check if there are any incomplete implementation tasks for # the same component for other_task in all_tasks: other_lower = other_task.name.lower() if ( any( word in other_lower for word in ["implement", "build", "create"] ) and other_task.status != TaskStatus.DONE and self._tasks_related(task, other_task) ): logger.info( f"Blocking test task '{task.name}' - related " f"implementation '{other_task.name}' is not complete" ) return True return False def _tasks_related(self, task1: Task, task2: Task) -> bool: """ Check if two tasks are related (same component/feature). Parameters ---------- task1 : Task First task to compare. task2 : Task Second task to compare. Returns ------- bool True if tasks share significant common words, False otherwise. """ # Simple heuristic: check for common words in task names words1 = set(task1.name.lower().split()) words2 = set(task2.name.lower().split()) # Remove common words common_stopwords = { "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by", } words1 -= common_stopwords words2 -= common_stopwords # If they share significant words, they're probably related intersection = words1 & words2 return ( len(intersection) >= 1 and len(intersection) >= min(len(words1), len(words2)) * 0.3 ) async def _calculate_task_score( self, task: Task, agent_id: str, agent_skills: List[str], available_tasks: List[Task], ) -> float: """ Calculate a score for how well a task matches an agent. Higher score = better match. Parameters ---------- task : Task The task to score. agent_id : str ID of the agent. agent_skills : List[str] Skills/capabilities of the agent. available_tasks : List[Task] All available tasks for context. Returns ------- float Score between 0 and 1, where higher is better match. """ score = 0.0 # Skill matching (40% of score) skill_score = self._calculate_skill_match(task, agent_skills) score += skill_score * 0.4 # Priority weight (30% of score) priority_scores = { Priority.LOW: 0.2, Priority.MEDIUM: 0.5, Priority.HIGH: 0.8, Priority.URGENT: 1.0, } score += priority_scores.get(task.priority, 0.5) * 0.3 # Prefer tasks that unblock others (20% of score) unblocking_score = self._calculate_unblocking_value(task, available_tasks) score += unblocking_score * 0.2 # Agent preference (10% of score) preference_score = self._get_agent_preference_score(agent_id, task) score += preference_score * 0.1 return score def _calculate_skill_match(self, task: Task, agent_skills: List[str]) -> float: """ Calculate how well agent skills match task requirements. Parameters ---------- task : Task The task to evaluate. agent_skills : List[str] Skills/capabilities of the agent. Returns ------- float Skill match score between 0 and 1. """ if not agent_skills: return 0.5 # Neutral score if no skills known # Extract skill indicators from task task_text = ( f"{task.name} {task.description or ''} {' '.join(task.labels)}".lower() ) skill_keywords = { "python": ["python", "django", "flask", "fastapi"], "javascript": ["javascript", "js", "node", "react", "vue", "angular"], "frontend": ["frontend", "ui", "css", "html", "react", "vue", "angular"], "backend": ["backend", "api", "server", "database", "db"], "devops": ["devops", "docker", "ci", "cd", "deploy", "infrastructure"], "testing": ["test", "qa", "quality", "junit", "pytest"], "database": ["database", "sql", "mysql", "postgresql", "mongodb"], "mobile": ["mobile", "ios", "android", "react-native", "flutter"], } # Count matching skills matches = 0 total_possible = 0 for skill in agent_skills: skill_lower = skill.lower() total_possible += 1 # Direct skill match if skill_lower in task_text: matches += 1 continue # Keyword match if skill_lower in skill_keywords: for keyword in skill_keywords[skill_lower]: if keyword in task_text: matches += 1 break return matches / max(total_possible, 1) def _calculate_unblocking_value( self, task: Task, available_tasks: List[Task] ) -> float: """ Calculate how many other tasks this task would unblock. Parameters ---------- task : Task The task to evaluate. available_tasks : List[Task] All available tasks to check for dependencies. Returns ------- float Normalized unblocking value between 0 and 1. """ if not task.id: return 0.0 # Count tasks that depend on this one dependent_count = 0 for other_task in available_tasks: if task.id in other_task.dependencies: dependent_count += 1 # Normalize by total tasks if available_tasks: return dependent_count / len(available_tasks) return 0.0 def _get_agent_preference_score(self, agent_id: str, task: Task) -> float: """ Get preference score based on agent's history. Parameters ---------- agent_id : str ID of the agent. task : Task The task to score. Returns ------- float Preference score between 0 and 1. """ assignment_prefs = self.state.get("assignment_preferences", {}) if isinstance(assignment_prefs, dict): preferences = assignment_prefs.get(agent_id, {}) else: preferences = {} # Simple preference based on task labels score = 0.0 for label in task.labels: if label in preferences: score += preferences[label] * 0.1 return min(score, 1.0) # Cap at 1.0
[docs] async def record_assignment_outcome( self, agent_id: str, task: Task, outcome: str, feedback: Optional[str] = None ) -> None: """ Record the outcome of a task assignment for learning. Parameters ---------- agent_id : str Agent who worked on the task. task : Task Task that was assigned. outcome : str 'completed', 'blocked', 'abandoned'. feedback : Optional[str] Optional feedback from agent. """ # Update agent preferences based on outcome assignment_prefs = self.state.get("assignment_preferences", {}) if not isinstance(assignment_prefs, dict): assignment_prefs = {} self.state["assignment_preferences"] = assignment_prefs if agent_id not in assignment_prefs: assignment_prefs[agent_id] = {} preferences = assignment_prefs[agent_id] if not isinstance(preferences, dict): preferences = {} assignment_prefs[agent_id] = preferences # Adjust preferences based on outcome weight_change = {"completed": 0.1, "blocked": -0.05, "abandoned": -0.1}.get( outcome, 0 ) for label in task.labels: if label not in preferences: preferences[label] = 0.5 preferences[label] = max(0, min(1, preferences[label] + weight_change)) logger.info( f"Recorded {outcome} outcome for agent {agent_id} on task '{task.name}'" )
[docs] async def get_blocking_analysis(self, tasks: List[Task]) -> Dict[str, Any]: """ Analyze what tasks are blocking others. Parameters ---------- tasks : List[Task] List of tasks to analyze. Returns ------- Dict[str, Any] Analysis of blocking relationships. """ blocking_analysis: Dict[str, Any] = { "blocked_tasks": [], "blocking_tasks": [], "dependency_chains": [], "ready_tasks": [], } # Group tasks by status todo_tasks = [t for t in tasks if t.status == TaskStatus.TODO] _done_tasks = [ # noqa: F841 # Reserved for future analysis t for t in tasks if t.status == TaskStatus.DONE ] for task in todo_tasks: if not await self._is_task_unblocked(task, tasks, {}): # Find what's blocking it blockers: List[Dict[str, Any]] = [] # Check explicit dependencies for dep_id in task.dependencies: dep_task = next((t for t in tasks if t.id == dep_id), None) if dep_task and dep_task.status != TaskStatus.DONE: blockers.append( { "type": "explicit_dependency", "blocking_task": dep_task.name, "blocking_task_id": dep_task.id, } ) # Check logical dependencies task_text = f"{task.name} {task.description or ''}".lower() import re for pattern in self.LOGICAL_DEPENDENCY_PATTERNS: if re.search(pattern["blocks_until_complete"], task_text): for other_task in tasks: other_text = ( f"{other_task.name} " f"{other_task.description or ''}".lower() ) if ( re.search(pattern["pattern"], other_text) and other_task.status != TaskStatus.DONE ): blockers.append( { "type": "logical_dependency", "blocking_task": other_task.name, "blocking_task_id": other_task.id, "reason": ( f"Must complete {pattern['pattern']} " f"before {pattern['blocks_until_complete']}" ), } ) if blockers: blocking_analysis["blocked_tasks"].append( {"task": task.name, "task_id": task.id, "blocked_by": blockers} ) else: blocking_analysis["ready_tasks"].append( { "task": task.name, "task_id": task.id, "priority": task.priority.value, } ) return blocking_analysis