"""
Phase Dependency Enforcer.
Enforces strict development lifecycle phase dependencies within features,
ensuring tasks follow the correct order:
Design → Implementation → Testing → Documentation
"""
import logging
from collections import defaultdict
from dataclasses import dataclass
# Define enums locally to avoid import issues
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
from src.core.models import Task
from src.integrations.enhanced_task_classifier import EnhancedTaskClassifier
from src.integrations.nlp_task_utils import TaskType
[docs]
class TaskPhase(Enum):
"""Development lifecycle phases in execution order."""
DESIGN = 1
INFRASTRUCTURE = 2
IMPLEMENTATION = 3
TESTING = 4
INTEGRATION = 5
DOCUMENTATION = 6
DEPLOYMENT = 7
[docs]
@classmethod
def get_dependencies(cls, phase: "TaskPhase") -> List["TaskPhase"]:
"""Get all phases that must complete before this phase."""
return [p for p in cls if p.value < phase.value]
[docs]
class DependencyType(Enum):
"""Types of task dependencies."""
PHASE = "phase"
FEATURE = "feature"
TECHNICAL = "technical"
DATA = "data"
GLOBAL = "global"
MANUAL = "manual"
# For now, use regular Task model instead of EnhancedTask to avoid circular imports
EnhancedTask = Task
logger = logging.getLogger(__name__)
[docs]
@dataclass
class FeatureGroup:
"""Represents a group of tasks belonging to the same feature."""
feature_name: str
tasks: List[Task]
phase_tasks: Dict[TaskPhase, List[Task]]
[docs]
def get_tasks_by_phase(self, phase: TaskPhase) -> List[Task]:
"""Get all tasks in a specific phase."""
return self.phase_tasks.get(phase, [])
[docs]
def has_phase(self, phase: TaskPhase) -> bool:
"""Check if feature has tasks in a specific phase."""
return phase in self.phase_tasks and len(self.phase_tasks[phase]) > 0
[docs]
class PhaseDependencyEnforcer:
"""
Enforces development lifecycle phase dependencies.
Ensures tasks within the same feature follow proper phase ordering:
Design → Infrastructure → Implementation → Testing → Documentation → Deployment
"""
# Define phase ordering
PHASE_ORDER = [
TaskPhase.DESIGN,
TaskPhase.INFRASTRUCTURE,
TaskPhase.IMPLEMENTATION,
TaskPhase.TESTING,
TaskPhase.INTEGRATION,
TaskPhase.DOCUMENTATION,
TaskPhase.DEPLOYMENT,
]
# Map TaskType to TaskPhase
TYPE_TO_PHASE_MAP = {
TaskType.DESIGN: TaskPhase.DESIGN,
TaskType.INFRASTRUCTURE: TaskPhase.INFRASTRUCTURE,
TaskType.IMPLEMENTATION: TaskPhase.IMPLEMENTATION,
TaskType.TESTING: TaskPhase.TESTING,
TaskType.INTEGRATION: TaskPhase.INTEGRATION,
TaskType.DOCUMENTATION: TaskPhase.DOCUMENTATION,
TaskType.DEPLOYMENT: TaskPhase.DEPLOYMENT,
TaskType.OTHER: TaskPhase.IMPLEMENTATION, # Default to implementation
}
[docs]
def __init__(self) -> None:
"""Initialize the phase dependency enforcer."""
self.task_classifier = EnhancedTaskClassifier()
[docs]
def enforce_phase_dependencies(self, tasks: List[Task]) -> List[Task]:
"""
Apply phase-based dependencies to tasks.
Parameters
----------
tasks
List of tasks to process.
Returns
-------
List of tasks with phase dependencies added
"""
logger.info(f"Enforcing phase dependencies for {len(tasks)} tasks")
# Group tasks by feature
feature_groups = self._group_tasks_by_feature(tasks)
logger.info(f"Identified {len(feature_groups)} feature groups")
# Apply phase dependencies within each feature
for feature_name, feature_group in feature_groups.items():
logger.debug(
f"Processing feature: {feature_name} with "
f"{len(feature_group.tasks)} tasks"
)
self._apply_phase_dependencies_to_feature(feature_group)
# Log summary
total_dependencies_added = sum(
len(task.dependencies)
for task in tasks
if hasattr(task, "_phase_dependencies_added")
and task._phase_dependencies_added
)
logger.info(
f"Phase dependency enforcement complete. "
f"Added {total_dependencies_added} dependencies"
)
return tasks
def _group_tasks_by_feature(self, tasks: List[Task]) -> Dict[str, FeatureGroup]:
"""
Group tasks by their feature/component.
Strategy:
1. Use explicit feature labels if present
2. Extract feature from task name patterns
3. Group by shared keywords/context
"""
feature_groups = defaultdict(list)
for task in tasks:
feature = self._identify_task_feature(task)
feature_groups[feature].append(task)
# Convert to FeatureGroup objects with phase classification
result = {}
for feature_name, feature_tasks in feature_groups.items():
phase_tasks = self._classify_tasks_by_phase(feature_tasks)
result[feature_name] = FeatureGroup(
feature_name=feature_name, tasks=feature_tasks, phase_tasks=phase_tasks
)
return result
def _identify_task_feature(self, task: Task) -> str:
"""
Identify which feature/component a task belongs to.
Uses fine-grained detection to prevent over-grouping independent features.
For example, "List Users" and "Get User by ID" should be separate features,
not grouped together just because they both contain "user".
Priority:
1. Explicit 'feature:' or 'feature-' label (highest priority)
2. Component-specific labels (e.g., 'authentication', 'payment')
3. Full task name after removing phase prefix (prevents over-grouping)
4. Default to task name as-is
"""
# Priority 1: Check for explicit feature label
if task.labels:
for label in task.labels:
if label.startswith("feature:"):
return label.replace("feature:", "")
# Also check for feature-X pattern
if label.startswith("feature-"):
return label
# Priority 2: Check for component labels
component_labels = [
"authentication",
"auth",
"user-management",
"user-mgmt",
"payment",
"dashboard",
"api",
"frontend",
"backend",
"database",
"monitoring",
"logging",
"security",
"notification",
"infra",
"infrastructure",
]
for label in task.labels:
label_lower = label.lower()
# Check direct match
if label_lower in component_labels:
return label_lower
# Check with component: prefix
if label_lower.startswith("component:"):
component = label_lower.replace("component:", "")
if component in component_labels:
return component
# Priority 3: Extract from task name (FINE-GRAINED)
# Use full remaining name after phase prefix to maintain specificity
name_lower = task.name.lower()
# Remove phase prefixes to get the specific feature name
phase_prefixes = [
"design",
"implement",
"test",
"document",
"deploy",
"build",
"create",
]
for prefix in phase_prefixes:
if name_lower.startswith(prefix + " "):
# Use FULL remaining name as feature ID (fine-grained)
# "Design List Users" → "list-users"
# "Design Get User by ID" → "get-user-by-id"
feature_name = name_lower[len(prefix) :].strip()
return feature_name.replace(" ", "-")
# No phase prefix found - use full name as feature ID
return name_lower.replace(" ", "-")
def _classify_tasks_by_phase(
self, tasks: List[Task]
) -> Dict[TaskPhase, List[Task]]:
"""Classify tasks into their development phases."""
phase_tasks = defaultdict(list)
for task in tasks:
# Get task type from classifier
task_type = self.task_classifier.classify(task)
# Map to phase
phase = self.TYPE_TO_PHASE_MAP.get(task_type, TaskPhase.IMPLEMENTATION)
# Store phase info on task for later use if it supports it
if hasattr(task, "phase"):
task.phase = phase
# Note: phase_confidence not available in base Task model
phase_tasks[phase].append(task)
return dict(phase_tasks)
def _apply_phase_dependencies_to_feature(self, feature_group: FeatureGroup) -> None:
"""
Apply phase ordering dependencies within a feature.
Rules:
- Tasks in phase N depend on ALL tasks in phases < N
- Within the same phase, no automatic dependencies
- Preserve existing manual dependencies
"""
phase_tasks = feature_group.phase_tasks
violations_corrected = 0
# Process phases in order
for i, current_phase in enumerate(self.PHASE_ORDER):
if not feature_group.has_phase(current_phase):
continue
current_phase_tasks = phase_tasks[current_phase]
# Find all tasks from previous phases
previous_phase_tasks = []
for prev_phase in self.PHASE_ORDER[:i]:
if feature_group.has_phase(prev_phase):
previous_phase_tasks.extend(phase_tasks[prev_phase])
# Add dependencies from previous phases
if previous_phase_tasks:
for task in current_phase_tasks:
violations = self._add_phase_dependencies(
task, previous_phase_tasks
)
violations_corrected += violations
if violations_corrected > 0:
logger.warning(
f"Corrected {violations_corrected} phase ordering violations "
f"in feature '{feature_group.feature_name}'"
)
def _add_phase_dependencies(
self, dependent_task: Task, dependency_tasks: List[Task]
) -> int:
"""
Add phase-based dependencies to a task.
Parameters
----------
dependent_task
Task that will depend on others.
dependency_tasks
Tasks that must complete first.
Returns
-------
Number of violations corrected
"""
if not dependency_tasks:
return 0
# Track additions for logging
added_count = 0
violations_found = 0
# Ensure dependencies list exists
if (
not hasattr(dependent_task, "dependencies")
or dependent_task.dependencies is None
):
dependent_task.dependencies = []
for dep_task in dependency_tasks:
# Skip self-dependencies
if dep_task.id == dependent_task.id:
continue
# Skip if already has dependency
if dep_task.id in dependent_task.dependencies:
continue
# Add the dependency
# Feature grouping already ensures these are same-feature tasks
dependent_task.dependencies.append(dep_task.id)
added_count += 1
violations_found += 1
# Add metadata if task supports it
if hasattr(dependent_task, "add_dependency"):
dependent_task.add_dependency(dep_task.id, DependencyType.PHASE)
logger.debug(
f"Added phase dependency: {dependent_task.name} "
f"({self._get_task_phase_name(dependent_task)}) depends on "
f"{dep_task.name} ({self._get_task_phase_name(dep_task)})"
)
# Note: _phase_dependencies_added not available in base Task model
# Dependencies are tracked in task.dependencies list instead
if added_count > 0:
logger.debug(
f"Added {added_count} phase dependencies to task: {dependent_task.name}"
)
return violations_found
def _get_task_phase_name(self, task: Task) -> str:
"""Get the phase name for a task."""
if hasattr(task, "phase") and task.phase:
if hasattr(task.phase, "name"):
return str(task.phase.name)
else:
return str(task.phase)
# Fallback to type classification
task_type = self.task_classifier.classify(task)
phase = self.TYPE_TO_PHASE_MAP.get(task_type, TaskPhase.IMPLEMENTATION)
return str(phase.name)
[docs]
def validate_phase_ordering(self, tasks: List[Task]) -> Tuple[bool, List[str]]:
"""
Validate that tasks follow proper phase ordering.
Returns
-------
Tuple of (is_valid, list_of_errors)
"""
errors = []
task_map = {task.id: task for task in tasks}
for task in tasks:
if not task.dependencies:
continue
task_phase = self._get_task_phase_enum(task)
if not task_phase:
continue
for dep_id in task.dependencies:
dep_task = task_map.get(dep_id)
if not dep_task:
continue
dep_phase = self._get_task_phase_enum(dep_task)
if not dep_phase:
continue
# Check if dependency violates phase ordering
if dep_phase.value > task_phase.value:
error = (
f"Phase order violation: {task.name} ({task_phase.name}) "
f"depends on {dep_task.name} ({dep_phase.name})"
)
errors.append(error)
return len(errors) == 0, errors
def _get_task_phase_enum(self, task: Task) -> Optional[TaskPhase]:
"""Get the TaskPhase enum for a task."""
if hasattr(task, "phase") and task.phase:
from typing import cast
# Cast to avoid mypy error - we check isinstance below
phase = cast(TaskPhase, task.phase)
if isinstance(phase, TaskPhase):
return phase
# Fallback to type classification
task_type = self.task_classifier.classify(task)
return self.TYPE_TO_PHASE_MAP.get(task_type, TaskPhase.IMPLEMENTATION)
def _get_task_phase(self, task_type: TaskType) -> TaskPhase:
"""Convert TaskType to TaskPhase."""
return self.TYPE_TO_PHASE_MAP.get(task_type, TaskPhase.IMPLEMENTATION)
def _should_depend_on_phase(
self, task_phase: TaskPhase, other_phase: TaskPhase
) -> bool:
"""Check if task_phase should depend on other_phase based on phase ordering."""
return task_phase.value > other_phase.value
[docs]
def get_phase_statistics(self, tasks: List[Task]) -> Dict[str, Any]:
"""
Get statistics about phase distribution and dependencies.
Returns
-------
Dictionary with phase statistics
"""
stats: Dict[str, Any] = {
"total_tasks": len(tasks),
"phase_distribution": {},
"dependency_count": 0,
"phase_dependency_count": 0,
"features_identified": set(),
}
# Count tasks by phase
phase_counts: defaultdict[str, int] = defaultdict(int)
for task in tasks:
phase = self._get_task_phase_name(task)
phase_counts[phase] += 1
# Count dependencies
if task.dependencies:
stats["dependency_count"] += len(task.dependencies)
# Count phase dependencies if enhanced task
if hasattr(task, "get_dependencies_by_type"):
try:
phase_deps = task.get_dependencies_by_type(DependencyType.PHASE)
stats["phase_dependency_count"] += len(phase_deps)
except Exception as e:
# Skip if method not available or fails, but log the issue
logger.debug(
"Failed to get phase dependencies for task %s: %s",
getattr(task, "name", "unknown"),
str(e),
)
stats["phase_distribution"] = dict(phase_counts)
# Get feature count
feature_groups = self._group_tasks_by_feature(tasks)
stats["features_identified"] = list(feature_groups.keys())
stats["feature_count"] = len(feature_groups)
return stats