# Source code for src.learning.pattern_learner

"""
Pattern Learner for Marcus Phase 2.

Learns patterns from completed projects to improve future recommendations.
"""

import logging
import re
from collections import defaultdict
from dataclasses import asdict, dataclass, replace
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from src.core.models import Task, TaskStatus

# Module-level logger; PatternLearner reports learning, pruning and import
# progress through it.
logger = logging.getLogger(__name__)


@dataclass
class CompletedProject:
    """Data from a completed project.

    This is the input record consumed by ``PatternLearner.learn_from_project``.
    """

    project_id: str
    name: str
    # Every task of the project; learners filter by status themselves.
    tasks: List[Task]
    completion_date: datetime
    # Free-form metrics; the learner reads the "planned_duration" key.
    success_metrics: Dict[str, Any]
    team_size: int
    duration_days: int
    project_type: str
@dataclass
class ProjectLearnings:
    """Extracted learnings from a project.

    One instance bundles the results of all per-project analyses so they can
    be folded into the pattern library in a single update.
    """

    # task_type -> accuracy ratio
    estimation_accuracy: Dict[str, float]
    # one dict per observed "<type>_before_<type>" ordering
    dependency_patterns: List[Dict[str, Any]]
    workflow_patterns: Dict[str, Any]
    success_factors: List[str]
    failure_points: List[str]
    team_performance: Dict[str, Any]
@dataclass
class Pattern:
    """A learned pattern.

    Patterns are created and reinforced by ``PatternLearner``; each carries the
    conditions under which it applies and the recommendations it implies.
    """

    pattern_id: str
    # 'estimation', 'dependency', 'workflow', etc.
    pattern_type: str
    description: str
    # key/value pairs matched against a caller-supplied context
    conditions: Dict[str, Any]
    recommendations: Dict[str, Any]
    confidence: float
    # number of observations that produced or reinforced this pattern
    evidence_count: int
    last_updated: datetime
class PatternLearner:
    """Learns patterns from completed projects.

    Learned patterns live in ``self.patterns`` keyed by pattern id, e.g.
    ``"estimation_<task_type>"``, ``"dependency_<a>_before_<b>"``,
    ``"workflow_characteristics"``, ``"success_<factor>"`` and
    ``"failure_<point>"``. Every project passed to ``learn_from_project`` is
    also appended to ``self.project_history``.
    """

    def __init__(self) -> None:
        self.patterns: Dict[str, Pattern] = {}
        self.project_history: List[CompletedProject] = []

        # Task type mappings for learning. Checked in insertion order; the
        # first match wins. The leading ``\b`` anchors each keyword to the
        # start of a word. Without it, "ui" matched inside "guide", "build",
        # "require" and "quality" (so QA tasks were classified as frontend),
        # "test" matched "latest" and "fix" matched "prefix".
        self.task_type_patterns = {
            "setup": r"\b(setup|init|configure|install)",
            "design": r"\b(design|architect|plan|wireframe)",
            "backend": r"\b(backend|api|server|endpoint)",
            "frontend": r"\b(frontend|ui|client|interface)",
            "testing": r"\b(test|qa|quality|verify)",
            "deployment": r"\b(deploy|release|launch|production)",
            "documentation": r"\b(document|docs|readme|guide)",
            "bugfix": r"\b(fix|bug|issue|error)",
        }

    # ------------------------------------------------------------------ #
    # Public API
    # ------------------------------------------------------------------ #

    async def learn_from_project(self, project: CompletedProject) -> None:
        """
        Extract learnings from a completed project.

        The project is recorded in ``project_history``, analysed along several
        axes, and the resulting learnings are merged into ``self.patterns``.

        Parameters
        ----------
        project : CompletedProject
            Completed project data
        """
        logger.info("Learning from completed project: %s", project.name)

        # Add to history
        self.project_history.append(project)

        # Extract various types of learnings
        learnings = ProjectLearnings(
            estimation_accuracy=await self._analyze_estimation_accuracy(project),
            dependency_patterns=await self._analyze_dependency_patterns(project),
            workflow_patterns=await self._analyze_workflow_patterns(project),
            success_factors=await self._identify_success_factors(project),
            failure_points=await self._identify_failure_points(project),
            team_performance=await self._analyze_team_performance(project),
        )

        # Update patterns based on learnings
        await self.update_patterns(learnings)

        logger.info("Updated %d patterns from project learnings", len(self.patterns))

    async def update_patterns(self, learnings: ProjectLearnings) -> None:
        """
        Update pattern library based on new learnings.

        Note that ``learnings.team_performance`` is not folded into any
        pattern. After merging, stale or low-confidence patterns are pruned.

        Parameters
        ----------
        learnings : ProjectLearnings
            Extracted learnings from a project
        """
        await self._update_estimation_patterns(learnings.estimation_accuracy)
        await self._update_dependency_patterns(learnings.dependency_patterns)
        await self._update_workflow_patterns(learnings.workflow_patterns)
        await self._update_outcome_patterns(
            learnings.success_factors, learnings.failure_points
        )
        await self._prune_patterns()

    async def calculate_confidence(self, pattern: Pattern) -> float:
        """
        Calculate pattern confidence based on evidence.

        The stored confidence gets a bonus of 0.02 per piece of evidence
        (capped at 0.2) and a penalty of 0.001 per day since the last update
        (capped at 0.3). The result is clamped to ``[0.1, 0.95]``. The pattern
        itself is not modified.

        Parameters
        ----------
        pattern : Pattern
            Pattern to calculate confidence for

        Returns
        -------
        float
            Confidence score between 0 and 1
        """
        base_confidence = pattern.confidence

        # Increase confidence with more evidence
        evidence_bonus = min(0.2, pattern.evidence_count * 0.02)

        # Decrease confidence if pattern is old. Clamp at 0 so a timestamp in
        # the future (clock skew) cannot turn the penalty into a bonus.
        last_updated = self._ensure_utc(pattern.last_updated)
        days_old = max(0, (datetime.now(timezone.utc) - last_updated).days)
        age_penalty = min(0.3, days_old * 0.001)

        final_confidence = base_confidence + evidence_bonus - age_penalty
        return max(0.1, min(0.95, final_confidence))

    async def get_patterns_for_context(self, context: Dict[str, Any]) -> List[Pattern]:
        """
        Get patterns relevant to a specific context.

        A pattern is relevant unless the context contains one of the
        pattern's condition keys with a different value. Keys absent from the
        context do not exclude a pattern.

        Parameters
        ----------
        context : Dict[str, Any]
            Context information (project type, team size, etc.)

        Returns
        -------
        List[Pattern]
            Relevant patterns, sorted by effective confidence (highest first).
            Each entry is a shallow copy whose ``confidence`` is the value
            from :meth:`calculate_confidence`. The stored patterns are left
            untouched; writing the effective value back would make the
            evidence bonus and age penalty compound on every query.
        """
        relevant_patterns: List[Pattern] = []
        for pattern in self.patterns.values():
            matches = all(
                key not in context or context[key] == value
                for key, value in pattern.conditions.items()
            )
            if not matches:
                continue
            effective = await self.calculate_confidence(pattern)
            relevant_patterns.append(replace(pattern, confidence=effective))

        relevant_patterns.sort(key=lambda p: p.confidence, reverse=True)
        return relevant_patterns

    async def export_patterns(self) -> Dict[str, Any]:
        """Export patterns for persistence.

        ``last_updated`` is serialised as an ISO-8601 string. This keeps the
        export JSON-serialisable and in the format ``import_patterns`` parses.
        """
        exported: Dict[str, Dict[str, Any]] = {}
        for pattern_id, pattern in self.patterns.items():
            record = asdict(pattern)
            record["last_updated"] = pattern.last_updated.isoformat()
            exported[pattern_id] = record
        return {
            "patterns": exported,
            "export_timestamp": datetime.now(timezone.utc).isoformat(),
            "pattern_count": len(self.patterns),
        }

    async def import_patterns(self, pattern_data: Dict[str, Any]) -> None:
        """Import patterns from persistence.

        Accepts the structure produced by ``export_patterns``. ``last_updated``
        may be an ISO-8601 string or already a ``datetime``. Naive timestamps
        are treated as UTC so they compare cleanly with the aware timestamps
        used elsewhere. The caller's dictionaries are not modified. Patterns
        with an existing id are overwritten.
        """
        imported = 0
        for pattern_id, raw in pattern_data.get("patterns", {}).items():
            # Work on a copy: mutating the caller's dict made a second import
            # of the same data fail (fromisoformat on a datetime).
            fields = dict(raw)
            last_updated = fields.get("last_updated")
            if isinstance(last_updated, str):
                last_updated = datetime.fromisoformat(last_updated)
            if isinstance(last_updated, datetime):
                fields["last_updated"] = self._ensure_utc(last_updated)
            self.patterns[pattern_id] = Pattern(**fields)
            imported += 1

        logger.info("Imported %d patterns", imported)

    # ------------------------------------------------------------------ #
    # Small helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _actual_hours(task: Task) -> Optional[float]:
        """Return the task's recorded actual hours, or None if missing/unset."""
        return getattr(task, "actual_hours", None)

    @staticmethod
    def _ensure_utc(moment: datetime) -> datetime:
        """Return *moment* as an aware datetime, treating naive values as UTC."""
        if moment.tzinfo is None:
            return moment.replace(tzinfo=timezone.utc)
        return moment

    def _classify_task_type(self, task: Task) -> str:
        """Classify task type for pattern learning.

        Searches the lower-cased name and description against
        ``self.task_type_patterns`` in order. Returns ``"general"`` when
        nothing matches.
        """
        task_text = f"{task.name} {task.description or ''}".lower()
        for task_type, pattern in self.task_type_patterns.items():
            if re.search(pattern, task_text):
                return task_type
        return "general"

    # ------------------------------------------------------------------ #
    # Per-project analyses
    # ------------------------------------------------------------------ #

    async def _analyze_estimation_accuracy(
        self, project: CompletedProject
    ) -> Dict[str, float]:
        """Analyze how accurate task estimates were.

        Per-task accuracy is ``min(est, actual) / max(est, actual)``. It is 1.0
        for a perfect estimate and falls towards 0 for misses in either
        direction. Returns the mean accuracy per task type. Tasks without an
        estimate, without actual hours (missing or None), or with non-positive
        actual hours are skipped.
        """
        accuracy_by_type: Dict[str, List[float]] = defaultdict(list)

        for task in project.tasks:
            estimated = task.estimated_hours
            actual = self._actual_hours(task)
            # A None value would make the comparison below raise TypeError.
            if not estimated or actual is None or actual <= 0:
                continue
            accuracy = min(estimated, actual) / max(estimated, actual)
            accuracy_by_type[self._classify_task_type(task)].append(accuracy)

        return {
            task_type: sum(accuracies) / len(accuracies)
            for task_type, accuracies in accuracy_by_type.items()
        }

    async def _analyze_dependency_patterns(
        self, project: CompletedProject
    ) -> List[Dict[str, Any]]:
        """Analyze dependency patterns that worked well.

        Completed tasks are ordered by ``updated_at``. For every pair of
        consecutive tasks with different types, one
        ``"<type>_before_<type>"`` observation is emitted. Tasks without a
        timestamp are skipped, because None cannot be ordered.
        """
        completed_tasks = sorted(
            (
                t
                for t in project.tasks
                if t.status == TaskStatus.DONE and t.updated_at is not None
            ),
            key=lambda t: t.updated_at,
        )

        patterns: List[Dict[str, Any]] = []
        for current_task, next_task in zip(completed_tasks, completed_tasks[1:]):
            current_type = self._classify_task_type(current_task)
            next_type = self._classify_task_type(next_task)
            if current_type == next_type:
                continue
            patterns.append(
                {
                    "pattern": f"{current_type}_before_{next_type}",
                    "description": (
                        f"{current_type} tasks typically complete "
                        f"before {next_type} tasks"
                    ),
                    "confidence": 0.7,
                    "evidence": {
                        "current_task": current_task.name,
                        "next_task": next_task.name,
                    },
                }
            )
        return patterns

    async def _analyze_workflow_patterns(
        self, project: CompletedProject
    ) -> Dict[str, Any]:
        """Analyze workflow patterns.

        ``max_parallelism`` is the largest number of tasks completed on a
        single calendar day. ``avg_daily_completions`` averages over days with
        at least one completion. ``phase_distribution`` counts all tasks by
        type.
        """
        patterns: Dict[str, Any] = {}

        daily_progress: Dict[Any, int] = defaultdict(int)
        for task in project.tasks:
            if task.status == TaskStatus.DONE and task.updated_at:
                daily_progress[task.updated_at.date()] += 1

        patterns["max_parallelism"] = max(daily_progress.values(), default=0)
        patterns["avg_daily_completions"] = (
            sum(daily_progress.values()) / len(daily_progress) if daily_progress else 0
        )

        phase_distribution: Dict[str, int] = defaultdict(int)
        for task in project.tasks:
            phase_distribution[self._classify_task_type(task)] += 1
        patterns["phase_distribution"] = dict(phase_distribution)

        return patterns

    async def _identify_success_factors(self, project: CompletedProject) -> List[str]:
        """Identify factors that contributed to project success."""
        factors = []

        # On time: a missing planned duration counts as "no deadline".
        if project.duration_days <= project.success_metrics.get(
            "planned_duration", float("inf")
        ):
            factors.append("completed_on_time")

        completed_tasks = len([t for t in project.tasks if t.status == TaskStatus.DONE])
        completion_rate = completed_tasks / len(project.tasks) if project.tasks else 0
        if completion_rate > 0.9:
            factors.append("high_completion_rate")

        # Aggregate estimation accuracy. Tasks without recorded actual hours
        # (missing or None) fall back to their estimate.
        total_estimated = sum(t.estimated_hours or 0 for t in project.tasks)
        total_actual = 0.0
        for task in project.tasks:
            actual = self._actual_hours(task)
            total_actual += actual if actual is not None else (task.estimated_hours or 0)
        if total_estimated > 0:
            estimation_accuracy = min(total_estimated, total_actual) / max(
                total_estimated, total_actual
            )
            if estimation_accuracy > 0.8:
                factors.append("accurate_estimation")

        # Only team size is checked here; workload balance is not measured.
        if project.team_size >= 2:
            factors.append("team_collaboration")

        return factors

    async def _identify_failure_points(self, project: CompletedProject) -> List[str]:
        """Identify points where the project struggled.

        - ``high_blocked_task_rate``: more than 10% of tasks are blocked or
          cancelled. The status value is compared case-insensitively.
        - ``frequent_estimation_overruns``: more than 20% of tasks took over
          1.5x their estimate.
        - ``project_delay``: the duration exceeds 120% of
          ``success_metrics["planned_duration"]``. Only checked when a planned
          duration is present.
        """
        failure_points = []
        task_count = len(project.tasks)

        blocked_tasks = [
            t
            for t in project.tasks
            if str(t.status.value).upper() in ("BLOCKED", "CANCELLED")
        ]
        if len(blocked_tasks) > task_count * 0.1:
            failure_points.append("high_blocked_task_rate")

        overrun_count = 0
        for task in project.tasks:
            actual = self._actual_hours(task)
            if (
                task.estimated_hours
                and actual is not None
                and actual > task.estimated_hours * 1.5
            ):
                overrun_count += 1
        if overrun_count > task_count * 0.2:
            failure_points.append("frequent_estimation_overruns")

        # Previously a missing planned_duration defaulted to 0, which flagged
        # every project as delayed (and contradicted "completed_on_time").
        planned_duration = project.success_metrics.get("planned_duration")
        if planned_duration is not None and project.duration_days > planned_duration * 1.2:
            failure_points.append("project_delay")

        return failure_points

    async def _analyze_team_performance(
        self, project: CompletedProject
    ) -> Dict[str, Any]:
        """Analyze team performance metrics (velocity, load, type mix)."""
        performance: Dict[str, Any] = {}

        # Velocity (tasks per day)
        if project.duration_days > 0:
            performance["velocity"] = len(project.tasks) / project.duration_days

        performance["team_size"] = project.team_size
        performance["tasks_per_person"] = (
            len(project.tasks) / project.team_size if project.team_size > 0 else 0
        )

        task_type_counts: Dict[str, int] = defaultdict(int)
        for task in project.tasks:
            task_type_counts[self._classify_task_type(task)] += 1
        performance["task_type_distribution"] = dict(task_type_counts)

        return performance

    # ------------------------------------------------------------------ #
    # Pattern library updates
    # ------------------------------------------------------------------ #

    async def _update_estimation_patterns(
        self, accuracy_data: Dict[str, float]
    ) -> None:
        """Update estimation accuracy patterns.

        Existing patterns blend the new accuracy in with weight 0.3 and gain
        +0.05 confidence (capped at 0.95). New patterns start at 0.6.
        """
        for task_type, accuracy in accuracy_data.items():
            pattern_id = f"estimation_{task_type}"
            now = datetime.now(timezone.utc)

            pattern = self.patterns.get(pattern_id)
            if pattern is not None:
                weight = 0.3  # New data weight
                old_accuracy = pattern.recommendations.get("accuracy_multiplier", 1.0)
                pattern.recommendations["accuracy_multiplier"] = (
                    old_accuracy * (1 - weight) + accuracy * weight
                )
                pattern.evidence_count += 1
                pattern.confidence = min(0.95, pattern.confidence + 0.05)
                pattern.last_updated = now
            else:
                self.patterns[pattern_id] = Pattern(
                    pattern_id=pattern_id,
                    pattern_type="estimation",
                    description=f"Estimation accuracy for {task_type} tasks",
                    conditions={"task_type": task_type},
                    recommendations={"accuracy_multiplier": accuracy},
                    confidence=0.6,
                    evidence_count=1,
                    last_updated=now,
                )

    async def _update_dependency_patterns(
        self, dependency_patterns: List[Dict[str, Any]]
    ) -> None:
        """Update dependency patterns (+0.02 confidence per repeat observation)."""
        for pattern_data in dependency_patterns:
            pattern_id = f"dependency_{pattern_data['pattern']}"
            now = datetime.now(timezone.utc)

            pattern = self.patterns.get(pattern_id)
            if pattern is not None:
                pattern.evidence_count += 1
                pattern.confidence = min(0.95, pattern.confidence + 0.02)
                pattern.last_updated = now
            else:
                self.patterns[pattern_id] = Pattern(
                    pattern_id=pattern_id,
                    pattern_type="dependency",
                    description=pattern_data["description"],
                    conditions=pattern_data.get("conditions", {}),
                    recommendations={"pattern": pattern_data["pattern"]},
                    confidence=pattern_data.get("confidence", 0.7),
                    evidence_count=1,
                    last_updated=now,
                )

    async def _update_workflow_patterns(self, workflow_data: Dict[str, Any]) -> None:
        """Update workflow patterns.

        Numeric values are blended into the existing pattern with weight 0.2;
        non-numeric values (e.g. ``phase_distribution``) are only stored when
        the pattern is first created.
        """
        pattern_id = "workflow_characteristics"
        now = datetime.now(timezone.utc)

        pattern = self.patterns.get(pattern_id)
        if pattern is not None:
            weight = 0.2
            for key, value in workflow_data.items():
                if isinstance(value, (int, float)):
                    old_value = pattern.recommendations.get(key, value)
                    pattern.recommendations[key] = (
                        old_value * (1 - weight) + value * weight
                    )
            pattern.evidence_count += 1
            pattern.last_updated = now
        else:
            self.patterns[pattern_id] = Pattern(
                pattern_id=pattern_id,
                pattern_type="workflow",
                description="Workflow characteristics and patterns",
                conditions={},
                # Copy so later in-place blending does not mutate the caller's dict.
                recommendations=dict(workflow_data),
                confidence=0.7,
                evidence_count=1,
                last_updated=now,
            )

    async def _update_outcome_patterns(
        self, success_factors: List[str], failure_points: List[str]
    ) -> None:
        """Update success and failure patterns."""
        for factor in success_factors:
            self._record_outcome(
                pattern_id=f"success_{factor}",
                pattern_type="success_factor",
                description=f"Success factor: {factor.replace('_', ' ')}",
                recommendations={"factor": factor, "positive_impact": True},
            )

        for failure in failure_points:
            self._record_outcome(
                pattern_id=f"failure_{failure}",
                pattern_type="failure_point",
                description=f"Failure point: {failure.replace('_', ' ')}",
                recommendations={"factor": failure, "negative_impact": True},
            )

    def _record_outcome(
        self,
        pattern_id: str,
        pattern_type: str,
        description: str,
        recommendations: Dict[str, Any],
    ) -> None:
        """Reinforce an outcome pattern (+0.03, cap 0.95) or create it at 0.6."""
        now = datetime.now(timezone.utc)
        pattern = self.patterns.get(pattern_id)
        if pattern is not None:
            pattern.evidence_count += 1
            pattern.confidence = min(0.95, pattern.confidence + 0.03)
            pattern.last_updated = now
            return

        self.patterns[pattern_id] = Pattern(
            pattern_id=pattern_id,
            pattern_type=pattern_type,
            description=description,
            conditions={},
            recommendations=recommendations,
            confidence=0.6,
            evidence_count=1,
            last_updated=now,
        )

    async def _prune_patterns(self) -> None:
        """Remove old or low-confidence patterns.

        A pattern is removed when it was last updated more than 180 days ago
        with fewer than 2 pieces of evidence, or when its confidence is below
        0.3.
        """
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=180)  # 6 months old
        min_confidence = 0.3
        min_evidence = 2

        patterns_to_remove = [
            pattern_id
            for pattern_id, pattern in self.patterns.items()
            if (
                self._ensure_utc(pattern.last_updated) < cutoff_date
                and pattern.evidence_count < min_evidence
            )
            or pattern.confidence < min_confidence
        ]

        for pattern_id in patterns_to_remove:
            del self.patterns[pattern_id]
            logger.info("Pruned low-confidence pattern: %s", pattern_id)