Source code for src.intelligence.dependency_inferer

"""
Dependency Inference Engine for Marcus Phase 2.

Smart dependency detection to prevent illogical task assignments like
"Deploy to production" before development is complete.
"""

import logging
import re
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Set

from src.core.models import Task, TaskStatus

logger = logging.getLogger(__name__)

# Setup-tier keyword matcher (issue #455 / Codex P2 on PR #466).
#
# Compiled once at import time and applied with ``.search`` to the
# lower-cased task name in ``DependencyInferer._classify_task_type``.
#
# Word-boundary anchored prefix match.  Each alternative is a prefix that
# may be followed by any run of word characters, so common inflections
# match without being listed one by one:
#
#   ``\binit\w*``       → init, initial, initialize, initialization
#   ``\bconfig\w*``     → config, configure, configured, configuration
#   ``\binstall\w*``    → install, installs, installer, installation
#   ``\bscaffold\w*``   → scaffold, scaffolds, scaffolding
#   ``\bsetup\w*``      → setup, setups
#   ``\bfoundation\w*`` → foundation, foundations, foundational
#
# The leading ``\b`` rejects un-/re-/pre-/post-prefixed false positives,
# because the character before the keyword is itself a word character and
# therefore no boundary exists there: ``uninstall`` (preceded by 'n'),
# ``reconfigure`` (preceded by 'e'), ``presetup``, ``postinit``.
#
# ``re.IGNORECASE`` makes the match case-insensitive.
_SETUP_PATTERN: re.Pattern[str] = re.compile(
    r"\b(setup|init|config|install|scaffold|foundation)\w*",
    re.IGNORECASE,
)


@dataclass
class DependencyPattern:
    """
    Regex rule describing when one task should depend on another.

    Attributes
    ----------
    name : str
        Short identifier of the rule.
    description : str
        Human-readable explanation of the rule.
    condition_pattern : str
        Regex pattern to match the dependent task.
    dependency_pattern : str
        Regex pattern to match the dependency task.
    confidence : float
        Confidence assigned to dependencies produced by this rule.
    mandatory : bool
        Whether this dependency is strictly required.
    """

    name: str
    description: str
    condition_pattern: str
    dependency_pattern: str
    confidence: float
    mandatory: bool
@dataclass
class InferredDependency:
    """
    A single inferred "must come after" relation between two tasks.

    Attributes
    ----------
    dependent_task_id : str
        Id of the task that has to wait.
    dependency_task_id : str
        Id of the task that has to come first.
    dependency_type : str
        One of ``'hard'``, ``'soft'`` or ``'logical'``.
    confidence : float
        Confidence attached to this inference.
    reasoning : str
        Free-text explanation of why the dependency was inferred.
    source : str
        Origin of the inference, e.g. ``pattern_matching``,
        ``prd_bundled_design`` or ``manual``.  Defaults to ``"unknown"``.
    """

    dependent_task_id: str
    dependency_task_id: str
    dependency_type: str
    confidence: float
    reasoning: str
    source: str = "unknown"
@dataclass
class DependencyGraph:
    """
    Graph representation of task dependencies.

    Attributes
    ----------
    nodes : Dict[str, Task]
        Tasks keyed by task id.
    edges : List[InferredDependency]
        The dependency records the graph was built from.
    adjacency_list : Dict[str, List[str]]
        Maps a task id to the ids of the tasks that depend on it
        (prerequisite -> dependents).
    reverse_adjacency : Dict[str, List[str]]
        Maps a task id to the ids of its prerequisites
        (dependent -> prerequisites).
    """

    nodes: Dict[str, Task]
    edges: List[InferredDependency]
    adjacency_list: Dict[str, List[str]]
    reverse_adjacency: Dict[str, List[str]]

    def has_cycle(self) -> bool:
        """
        Check if the dependency graph has cycles.

        Depth-first search over ``adjacency_list`` starting from every id
        in ``nodes``.  The search uses an explicit stack instead of
        recursion so that very long dependency chains cannot exhaust the
        interpreter's recursion limit.

        Returns
        -------
        bool
            ``True`` if some node can reach itself by following edges.
        """
        in_progress, finished = 1, 2
        state: Dict[str, int] = {}

        for root in self.nodes:
            if root in state:
                continue

            state[root] = in_progress
            stack = [(root, iter(self.adjacency_list.get(root, [])))]
            while stack:
                node_id, successors = stack[-1]
                for successor in successors:
                    successor_state = state.get(successor)
                    if successor_state == in_progress:
                        # Back edge to a node on the current DFS path.
                        return True
                    if successor_state is None:
                        state[successor] = in_progress
                        stack.append(
                            (successor, iter(self.adjacency_list.get(successor, [])))
                        )
                        break
                else:
                    # Every successor explored: node leaves the DFS path.
                    state[node_id] = finished
                    stack.pop()

        return False

    def get_critical_path(self) -> List[str]:
        """
        Get the critical path (longest dependency chain).

        Nodes are processed in topological order (Kahn's algorithm).  The
        distance of a node is the largest sum of ``estimated_hours`` over
        any chain of prerequisites leading to it; a task without an
        estimate (``None`` or ``0``) counts as one hour.  The node's own
        hours are not part of its distance.

        The path ends at the node with the greatest distance and is rebuilt
        by following, for each node, the prerequisite that actually produced
        that distance.  Because those links are only recorded while nodes
        are processed in topological order, the reconstruction always
        terminates, even if the graph contains a cycle (nodes on a cycle are
        never processed and keep distance ``0``).

        Returns
        -------
        List[str]
            Task ids from the start of the chain to its end, or an empty
            list when the graph has no nodes.
        """
        if not self.nodes:
            return []

        in_degree: Dict[str, int] = defaultdict(int)
        for node_id in self.nodes:
            for successor in self.adjacency_list.get(node_id, []):
                in_degree[successor] += 1

        distances: Dict[str, float] = {node_id: 0.0 for node_id in self.nodes}
        # successor -> prerequisite that set the successor's current distance
        best_predecessor: Dict[str, str] = {}

        queue = deque(node_id for node_id in self.nodes if in_degree[node_id] == 0)
        while queue:
            current = queue.popleft()
            reach = distances[current] + (self.nodes[current].estimated_hours or 1)
            for successor in self.adjacency_list.get(current, []):
                if reach > distances[successor]:
                    distances[successor] = reach
                    best_predecessor[successor] = current
                in_degree[successor] -= 1
                if in_degree[successor] == 0:
                    queue.append(successor)

        end_node = max(distances.items(), key=lambda item: item[1])[0]

        path = [end_node]
        while path[-1] in best_predecessor:
            path.append(best_predecessor[path[-1]])
        path.reverse()
        return path
class DependencyInferer:
    """
    Infers dependencies between tasks to prevent illogical assignments.

    Every ordered pair of tasks is tested against a list of regex based
    :class:`DependencyPattern` rules.  Matches are filtered by a set of
    plausibility checks, de-duplicated, freed of cycles and (for soft
    dependencies) of transitively implied edges, and returned as a
    :class:`DependencyGraph`.
    """

    def __init__(self) -> None:
        """Set up the built-in dependency and technology patterns."""
        # Core dependency patterns that prevent illogical assignments
        self.dependency_patterns = [
            # Setup/Configuration comes before everything
            DependencyPattern(
                name="setup_blocks_all",
                description="Setup tasks must complete before development",
                condition_pattern=r"(implement|build|create|develop|test|deploy)",
                dependency_pattern=r"(setup|init|configure|install|scaffold)",
                confidence=0.95,
                mandatory=True,
            ),
            # Design comes before implementation
            # (unified pattern with specific task prefixes)
            DependencyPattern(
                name="design_before_implementation",
                description="Design must complete before implementation",
                condition_pattern=r"\b(implement|build|create|code|develop)\b",
                dependency_pattern=r"\b(design|architect|plan|wireframe|spec)\b",
                confidence=0.95,
                mandatory=True,
            ),
            # Models/Backend before Frontend
            DependencyPattern(
                name="backend_before_frontend",
                description="Backend/API must exist before frontend integration",
                condition_pattern=r"\b(frontend|ui|client|interface)\b",
                dependency_pattern=r"\b(backend|api|server|endpoint|service)\b",
                confidence=0.85,
                mandatory=False,
            ),
            # Implementation before testing
            DependencyPattern(
                name="implementation_before_testing",
                description="Implementation must complete before testing",
                condition_pattern=r"\b(test|qa|quality|verify|testing)\b",
                dependency_pattern=r"\b(implement|build|create|develop)\b",
                confidence=0.95,
                mandatory=True,
            ),
            # Testing before deployment
            DependencyPattern(
                name="testing_before_deployment",
                description="Testing must complete before deployment",
                condition_pattern=r"\b(deploy|release|launch|production)\b",
                dependency_pattern=r"\b(test|qa|quality|verify|testing)\b",
                confidence=0.95,
                mandatory=True,
            ),
            # Database schema before models
            DependencyPattern(
                name="schema_before_models",
                description="Database schema must be designed before models",
                condition_pattern=r"(model|entity|orm)",
                dependency_pattern=r"(schema|database.*design)",
                confidence=0.85,
                mandatory=False,
            ),
            # Authentication before authorization
            DependencyPattern(
                name="auth_before_authz",
                description="Authentication must exist before authorization",
                condition_pattern=r"(authorization|permission|role|access)",
                dependency_pattern=r"(authentication|login|signin)",
                confidence=0.90,
                mandatory=True,
            ),
            # Basic features before advanced features
            DependencyPattern(
                name="basic_before_advanced",
                description="Basic CRUD before advanced features",
                condition_pattern=r"(advanced|complex|optimization|caching)",
                dependency_pattern=r"(basic|crud|create|read|update|delete)",
                confidence=0.75,
                mandatory=False,
            ),
            # Component-specific dependencies
            DependencyPattern(
                name="component_implementation_order",
                description="Component implementation before component testing",
                condition_pattern=r"test",
                dependency_pattern=r"(implement|build|create|develop)",
                confidence=0.90,
                mandatory=True,
            ),
        ]

        # Technology-specific patterns
        self.tech_patterns = {
            "react": [
                {
                    "condition": r"component.*test",
                    "dependency": r"component.*implement",
                    "confidence": 0.90,
                }
            ],
            "database": [
                {
                    "condition": r"migration",
                    "dependency": r"schema.*design",
                    "confidence": 0.85,
                }
            ],
        }

    async def infer_dependencies(self, tasks: List[Task]) -> DependencyGraph:
        """
        Infer dependencies from task names and descriptions.

        Parameters
        ----------
        tasks : List[Task]
            List of tasks to analyze

        Returns
        -------
        DependencyGraph
            Dependency graph with inferred dependencies
        """
        logger.info(f"Inferring dependencies for {len(tasks)} tasks")

        # Create nodes mapping
        nodes = {task.id: task for task in tasks}

        # Test every ordered pair of distinct tasks against every pattern
        inferred_dependencies = []
        for dependent_task in tasks:
            for dependency_task in tasks:
                if dependent_task.id == dependency_task.id:
                    continue

                for pattern in self.dependency_patterns:
                    dependency = self._check_pattern(
                        dependent_task, dependency_task, pattern
                    )
                    if dependency:
                        inferred_dependencies.append(dependency)

        # Remove duplicates and conflicts
        cleaned_dependencies = self._clean_dependencies(inferred_dependencies)

        # Build adjacency lists (prerequisite -> dependents and the reverse)
        adjacency_list = defaultdict(list)
        reverse_adjacency = defaultdict(list)
        for dep in cleaned_dependencies:
            adjacency_list[dep.dependency_task_id].append(dep.dependent_task_id)
            reverse_adjacency[dep.dependent_task_id].append(dep.dependency_task_id)

        # Create dependency graph
        graph = DependencyGraph(
            nodes=nodes,
            edges=cleaned_dependencies,
            adjacency_list=dict(adjacency_list),
            reverse_adjacency=dict(reverse_adjacency),
        )

        # Check for cycles and resolve if necessary
        if graph.has_cycle():
            logger.warning("Cycle detected in dependency graph, attempting to resolve")
            graph = self._resolve_cycles(graph)

        # Count the edges of the final graph so that anything removed during
        # cycle resolution is reflected in the log line.
        logger.info(f"Inferred {len(graph.edges)} dependencies")
        return graph

    def _check_pattern(
        self, dependent_task: Task, dependency_task: Task, pattern: DependencyPattern
    ) -> Optional[InferredDependency]:
        """
        Check if a dependency pattern matches between two tasks.

        Parameters
        ----------
        dependent_task : Task
            Candidate task that would have to wait.
        dependency_task : Task
            Candidate task that would have to come first.
        pattern : DependencyPattern
            Rule to test the pair against.

        Returns
        -------
        Optional[InferredDependency]
            The inferred dependency, or ``None`` when either regex does not
            match or the pair fails :meth:`_is_logical_dependency`.
        """
        # Patterns are written in lower case, so compare against lower-cased
        # "name description" text.
        dependent_text = (
            f"{dependent_task.name} {dependent_task.description or ''}".lower()
        )
        dependency_text = (
            f"{dependency_task.name} {dependency_task.description or ''}".lower()
        )

        # Check if dependent task matches the condition pattern
        if not re.search(pattern.condition_pattern, dependent_text):
            return None

        # Check if dependency task matches the dependency pattern
        if not re.search(pattern.dependency_pattern, dependency_text):
            return None

        # Additional logical checks
        if not self._is_logical_dependency(dependent_task, dependency_task, pattern):
            return None

        dependency_type = "hard" if pattern.mandatory else "soft"
        return InferredDependency(
            dependent_task_id=dependent_task.id,
            dependency_task_id=dependency_task.id,
            dependency_type=dependency_type,
            confidence=pattern.confidence,
            reasoning=f"Pattern: {pattern.description}",
            source="pattern_matching",
        )

    def _is_logical_dependency(
        self, dependent_task: Task, dependency_task: Task, pattern: DependencyPattern
    ) -> bool:
        """
        Perform additional logical checks for dependency validity.

        Parameters
        ----------
        dependent_task : Task
            Candidate task that would have to wait.
        dependency_task : Task
            Candidate task that would have to come first.
        pattern : DependencyPattern
            Rule that matched the pair.

        Returns
        -------
        bool
            ``False`` when any plausibility check rejects the pair,
            ``True`` otherwise.
        """
        # Don't create dependencies between completed tasks and new tasks
        if (
            dependency_task.status == TaskStatus.DONE
            and dependent_task.status == TaskStatus.TODO
        ):
            return False

        # Must share at least one meaningful word for component-specific patterns
        if pattern.name == "component_implementation_order":
            stop_words = {
                "the",
                "a",
                "an",
                "and",
                "or",
                "but",
                "in",
                "on",
                "at",
                "to",
                "for",
                "of",
                "with",
                "by",
            }
            dependent_words = set(dependent_task.name.lower().split()) - stop_words
            dependency_words = set(dependency_task.name.lower().split()) - stop_words
            if not dependent_words & dependency_words:
                return False

        # Prevent circular dependencies by enforcing logical task ordering
        dependent_task_type = self._classify_task_type(dependent_task.name)
        dependency_task_type = self._classify_task_type(dependency_task.name)

        # Define logical task ordering:
        #   setup < design < implementation < testing < deployment
        #
        # ``setup`` (issue #455) sits below design so foundation /
        # scaffolding tasks (e.g. "Tech Foundation Setup", "Install
        # Dependencies") are valid prerequisites of design and impl
        # tasks.  Pre-#455 they classified as ``other`` (order 2.5)
        # which blocked them from being prereqs of impl (order 2.0)
        # and produced orphan top-level nodes in the DAG.
        task_order = {
            "setup": 0.5,
            "design": 1,
            "implementation": 2,
            "testing": 3,
            "deployment": 4,
            "other": 2.5,  # Default to implementation level
        }

        dependent_order = task_order.get(dependent_task_type, task_order["other"])
        dependency_order = task_order.get(dependency_task_type, task_order["other"])

        # Dependency task should come before dependent task in logical order
        if dependency_order >= dependent_order:
            return False

        # Check temporal logic - dependency should typically come before dependent
        if (
            hasattr(dependency_task, "created_at")
            and hasattr(dependent_task, "created_at")
            and dependency_task.created_at
            and dependent_task.created_at
        ):
            # If dependency was created much later, it might not be a real dependency
            time_diff = (dependency_task.created_at - dependent_task.created_at).days
            if time_diff > 7:  # More than a week later
                return False

        return True

    def _classify_task_type(self, task_name: str) -> str:
        """
        Classify task into type based on name patterns.

        Parameters
        ----------
        task_name : str
            Name of the task to classify.

        Returns
        -------
        str
            One of ``'setup'``, ``'design'``, ``'testing'``,
            ``'deployment'``, ``'implementation'``, or ``'other'``.

        Notes
        -----
        Tier-checking order is significant.  Design / testing /
        deployment / implementation are checked before setup because
        their keywords are more semantically specific (e.g., "Design
        Initial Setup" is a design task, not a setup task).  Setup is
        checked last among the named tiers so it acts as a catch-all for
        foundation / scaffolding work that doesn't carry an
        implementation verb.

        The first four tiers use plain substring tests; only the setup
        tier uses the word-boundary anchored ``_SETUP_PATTERN``.

        See issue #455 — pre-#455 setup tasks fell to ``'other'`` and
        were blocked from being prerequisites of implementation tasks by
        the order check in :meth:`_is_logical_dependency`.
        """
        name_lower = task_name.lower()

        # Design/planning tasks
        if any(
            word in name_lower
            for word in [
                "design",
                "plan",
                "architect",
                "wireframe",
                "spec",
                "research",
                "analyze",
            ]
        ):
            return "design"

        # Testing tasks
        if any(
            word in name_lower
            for word in ["test", "qa", "quality", "verify", "validation", "check"]
        ):
            return "testing"

        # Deployment tasks
        if any(
            word in name_lower
            for word in ["deploy", "release", "launch", "production", "publish"]
        ):
            return "deployment"

        # Implementation tasks
        if any(
            word in name_lower
            for word in ["implement", "build", "create", "develop", "code", "write"]
        ):
            return "implementation"

        # Setup / foundation / scaffolding tasks (issue #455).  Last
        # among named tiers so design / impl / testing / deployment
        # win when keywords overlap (e.g. "Design initial setup"
        # stays a design task).
        #
        # Word-boundary matching (Codex P2 / Claude bot review on
        # PR #466): a plain ``substring in name`` test would match
        # ``"install"`` inside ``"uninstall"``, classifying teardown
        # tasks as setup and (combined with the setup order 0.5)
        # making them eligible prerequisites of impl tasks via
        # ``setup_blocks_all``.  The anchored prefix match
        # (``\bword\w*``) accepts:
        #
        #   - ``\binit\w*``    → init, initial, initialize, initialization
        #   - ``\binstall\w*`` → install, installs, installation
        #   - ``\bconfig\w*``  → config, configure, configured, configuring
        #
        # And rejects:
        #   - ``uninstall`` (preceded by 'n', no \b boundary at 'i')
        #   - ``reconfigure`` (preceded by 'e', no \b boundary)
        #   - ``presetup`` / ``postsetup`` (preceded by word chars)
        if _SETUP_PATTERN.search(name_lower):
            return "setup"

        return "other"

    def _clean_dependencies(
        self, dependencies: List[InferredDependency]
    ) -> List[InferredDependency]:
        """
        Remove duplicate and conflicting dependencies.

        Parameters
        ----------
        dependencies : List[InferredDependency]
            Raw pattern matches, possibly several per task pair.

        Returns
        -------
        List[InferredDependency]
            At most one dependency per (dependent, dependency) pair, with
            cycles broken and transitively implied soft edges dropped.
        """
        # Group by task pair
        dependency_groups = defaultdict(list)
        for dep in dependencies:
            dependency_groups[(dep.dependent_task_id, dep.dependency_task_id)].append(
                dep
            )

        # Keep the highest confidence dependency for each pair
        cleaned = [
            max(group, key=lambda d: d.confidence)
            for group in dependency_groups.values()
        ]

        # Remove circular dependencies
        cleaned = self._remove_circular_dependencies(cleaned)

        # Remove transitive dependencies that are implied by other dependencies
        # (A -> B -> C implies A -> C, so we can remove direct A -> C)
        cleaned = self._remove_transitive_dependencies(cleaned)

        return cleaned

    def _remove_circular_dependencies(
        self, dependencies: List[InferredDependency]
    ) -> List[InferredDependency]:
        """
        Remove circular dependencies by breaking cycles.

        Strategy: build a graph, detect cycles with one depth-first pass,
        and remove the lowest confidence dependency from each cycle found.
        A single pass does not guarantee an acyclic result; cycles it does
        not reach are left for :meth:`_resolve_cycles`.

        Parameters
        ----------
        dependencies : List[InferredDependency]
            Dependencies to inspect.

        Returns
        -------
        List[InferredDependency]
            The input, in the same order, without the removed edges.
        """
        # Build adjacency list (prerequisite -> dependents)
        graph = defaultdict(list)
        dep_map = {}  # (from_id, to_id) -> dependency
        for dep in dependencies:
            graph[dep.dependency_task_id].append(dep.dependent_task_id)
            dep_map[(dep.dependency_task_id, dep.dependent_task_id)] = dep

        visited: Set[str] = set()
        rec_stack: Set[str] = set()
        cycles_found: List[List[str]] = []

        def detect_cycle(node: str, path: List[str]) -> bool:
            if node in rec_stack:
                # ``path`` already ends with ``node``; slicing from its first
                # occurrence yields the closed cycle node -> ... -> node.
                cycles_found.append(path[path.index(node) :])
                return True

            if node in visited:
                return False

            visited.add(node)
            rec_stack.add(node)

            for neighbor in graph.get(node, ()):
                if detect_cycle(neighbor, path + [neighbor]):
                    break  # Only need to find one cycle per path

            rec_stack.remove(node)
            return False

        for node in list(graph):
            if node not in visited:
                detect_cycle(node, [node])

        # Remove lowest confidence dependency from each cycle
        dependencies_to_remove = set()
        for cycle in cycles_found:
            if len(cycle) < 2:
                continue

            logger.warning(f"Detected circular dependency: {' -> '.join(cycle)}")

            cycle_deps = [
                dep_map[edge] for edge in zip(cycle, cycle[1:]) if edge in dep_map
            ]
            if cycle_deps:
                weakest_dep = min(cycle_deps, key=lambda d: d.confidence)
                dependencies_to_remove.add(
                    (weakest_dep.dependency_task_id, weakest_dep.dependent_task_id)
                )
                logger.info(
                    f"Breaking circular dependency by removing: "
                    f"{weakest_dep.dependency_task_id} -> "
                    f"{weakest_dep.dependent_task_id} "
                    f"(confidence: {weakest_dep.confidence})"
                )

        # Filter out the dependencies we decided to remove
        cleaned = [
            dep
            for dep in dependencies
            if (dep.dependency_task_id, dep.dependent_task_id)
            not in dependencies_to_remove
        ]

        logger.info(
            f"Removed {len(dependencies) - len(cleaned)} dependencies to "
            f"break circular references"
        )

        return cleaned

    def _remove_transitive_dependencies(
        self, dependencies: List[InferredDependency]
    ) -> List[InferredDependency]:
        """
        Remove dependencies that are implied by transitive relationships.

        A dependency ``A -> C`` is dropped when some third task ``B`` is
        reachable from ``A`` and can itself reach ``C``, unless the
        dependency is of type ``"hard"``, in which case it is always kept.

        Reachability is computed once per start node and cached, so the
        cost stays polynomial even on graphs with very many alternative
        paths.

        Parameters
        ----------
        dependencies : List[InferredDependency]
            Dependencies to filter.

        Returns
        -------
        List[InferredDependency]
            The surviving dependencies in their original order.
        """
        # Build adjacency map (prerequisite -> dependents)
        adjacency: Dict[str, Set[str]] = defaultdict(set)
        for dep in dependencies:
            adjacency[dep.dependency_task_id].add(dep.dependent_task_id)

        # Only a node with outgoing edges can act as an intermediate hop.
        candidates = list(adjacency)
        reach_cache: Dict[str, Set[str]] = {}

        def reachable_from(start: str) -> Set[str]:
            """Return every node reachable from ``start``, itself included."""
            known = reach_cache.get(start)
            if known is None:
                known = {start}
                stack = [start]
                while stack:
                    for successor in adjacency.get(stack.pop(), ()):
                        if successor not in known:
                            known.add(successor)
                            stack.append(successor)
                reach_cache[start] = known
            return known

        filtered = []
        for dep in dependencies:
            # Mandatory dependencies are kept regardless of indirect paths.
            if dep.dependency_type == "hard":
                filtered.append(dep)
                continue

            source, target = dep.dependency_task_id, dep.dependent_task_id
            from_source = reachable_from(source)
            has_indirect_path = any(
                intermediate != source
                and intermediate != target
                and intermediate in from_source
                and target in reachable_from(intermediate)
                for intermediate in candidates
            )
            if not has_indirect_path:
                filtered.append(dep)

        return filtered

    def _resolve_cycles(self, graph: DependencyGraph) -> DependencyGraph:
        """
        Resolve cycles in dependency graph by removing lowest confidence edges.

        Repeatedly takes the first cycle reported by :meth:`_find_cycles`,
        removes its lowest confidence edge from ``graph.edges`` and rebuilds
        both adjacency maps, until the graph is acyclic or no removable edge
        can be found.

        Parameters
        ----------
        graph : DependencyGraph
            Graph to repair; it is modified in place.

        Returns
        -------
        DependencyGraph
            The same graph object.
        """
        while graph.has_cycle():
            cycles = self._find_cycles(graph)
            if not cycles:
                break

            # Remove the lowest confidence edge from the first cycle
            cycle = cycles[0]
            min_confidence = float("inf")
            edge_to_remove = None

            # Pair every node with its successor, wrapping around at the end.
            for current, next_node in zip(cycle, cycle[1:] + cycle[:1]):
                edge = next(
                    (
                        candidate
                        for candidate in graph.edges
                        if candidate.dependency_task_id == current
                        and candidate.dependent_task_id == next_node
                    ),
                    None,
                )
                if edge is not None and edge.confidence < min_confidence:
                    min_confidence = edge.confidence
                    edge_to_remove = edge

            if edge_to_remove is None:
                # Nothing removable: stop instead of looping forever.
                break

            graph.edges.remove(edge_to_remove)

            # Rebuild adjacency lists
            adjacency_list = defaultdict(list)
            reverse_adjacency = defaultdict(list)
            for dep in graph.edges:
                adjacency_list[dep.dependency_task_id].append(dep.dependent_task_id)
                reverse_adjacency[dep.dependent_task_id].append(
                    dep.dependency_task_id
                )
            graph.adjacency_list = dict(adjacency_list)
            graph.reverse_adjacency = dict(reverse_adjacency)

            logger.info(
                f"Removed edge to break cycle: "
                f"{edge_to_remove.dependency_task_id} -> "
                f"{edge_to_remove.dependent_task_id}"
            )

        return graph

    def _find_cycles(self, graph: DependencyGraph) -> List[List[str]]:
        """
        Find cycles in the dependency graph.

        Depth-first search from every id in ``graph.nodes``.  Each node is
        expanded at most once, so not every cycle of the graph is
        necessarily reported.

        Parameters
        ----------
        graph : DependencyGraph
            Graph to search.

        Returns
        -------
        List[List[str]]
            Cycles as id lists whose first and last element are the same
            node, e.g. ``["A", "B", "A"]``.
        """
        cycles: List[List[str]] = []
        visited: Set[str] = set()

        def dfs(node: str, path: List[str], visited_in_path: Set[str]) -> None:
            if node in visited_in_path:
                # Found a cycle: close it with the repeated node.
                cycles.append(path[path.index(node) :] + [node])
                return

            if node in visited:
                return

            visited.add(node)
            visited_in_path.add(node)
            path.append(node)

            for neighbor in graph.adjacency_list.get(node, []):
                dfs(neighbor, path.copy(), visited_in_path.copy())

        for node in graph.nodes:
            if node not in visited:
                dfs(node, [], set())

        return cycles

    async def validate_dependencies(self, graph: DependencyGraph) -> Dict[str, Any]:
        """
        Validate dependency graph for correctness.

        Parameters
        ----------
        graph : DependencyGraph
            Graph to validate.

        Returns
        -------
        Dict[str, Any]
            Validation results with any issues found.  Keys: ``valid``,
            ``issues``, ``warnings`` and ``statistics``.  For a graph that
            contains cycles the critical path is not computed and its
            reported length is ``0``.
        """
        issues: List[str] = []
        warnings: List[str] = []

        # Check for cycles (once; the result is reused in the statistics)
        has_cycles = graph.has_cycle()
        if has_cycles:
            issues.append("Dependency graph contains cycles")

        # Check for isolated components
        connected_nodes = set()
        for edge in graph.edges:
            connected_nodes.add(edge.dependency_task_id)
            connected_nodes.add(edge.dependent_task_id)

        isolated_nodes = set(graph.nodes) - connected_nodes
        if isolated_nodes:
            warnings.append(f"{len(isolated_nodes)} tasks have no dependencies")

        # Check for unrealistic dependency chains.  A longest chain is not
        # defined on a cyclic graph, so skip the computation in that case.
        critical_path = [] if has_cycles else graph.get_critical_path()
        if len(critical_path) > 20:
            warnings.append(
                f"Very long dependency chain detected ({len(critical_path)} tasks)"
            )

        # Check for mandatory patterns
        missing_mandatory = []
        for task in graph.nodes.values():
            task_text = f"{task.name} {task.description or ''}".lower()

            # Check if deployment tasks have testing dependencies
            if re.search(r"(deploy|release|launch|production)", task_text):
                has_test_dependency = any(
                    re.search(r"(test|qa|quality)", graph.nodes[dep_id].name.lower())
                    for dep_id in graph.reverse_adjacency.get(task.id, [])
                )
                if not has_test_dependency:
                    missing_mandatory.append(
                        f"Deployment task '{task.name}' missing test dependency"
                    )

        if missing_mandatory:
            issues.extend(missing_mandatory)

        return {
            "valid": len(issues) == 0,
            "issues": issues,
            "warnings": warnings,
            "statistics": {
                "total_tasks": len(graph.nodes),
                "total_dependencies": len(graph.edges),
                "isolated_tasks": len(isolated_nodes),
                "critical_path_length": len(critical_path),
                "has_cycles": has_cycles,
            },
        }