"""
Project Pattern Learner.
This module extracts patterns from completed projects to improve future recommendations.
It analyzes project outcomes, team performance, and quality metrics to identify
successful patterns and common pitfalls.
"""
import json
import statistics
from collections import defaultdict
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from src.core.code_analyzer import CodeAnalyzer
from src.core.models import ProjectState, Task, TaskStatus, WorkerStatus
from src.core.types import ProjectOutcome
from src.integrations.ai_analysis_engine import AIAnalysisEngine
from src.quality.board_quality_validator import BoardQualityValidator, QualityReport
[docs]
@dataclass
class ProjectPattern:
    """
    Comprehensive pattern extracted from a project.

    Instances are created by ``ProjectPatternLearner.learn_from_project`` and
    are written to / read back from ``learned_patterns.json``.
    """

    # Identity (filled from ``project_state.board_id`` / ``project_name``).
    project_id: str
    project_name: str
    # Outcome object supplied by the caller of ``learn_from_project``.
    outcome: ProjectOutcome
    # Output of ``_extract_quality_metrics``.
    quality_metrics: Dict[str, float]
    # Output of ``_analyze_team_composition``.
    team_composition: Dict[str, Any]
    # Output of ``_analyze_velocity_pattern`` (phase name -> tasks per day).
    velocity_pattern: Dict[str, float]
    # Output of ``_analyze_task_patterns``.
    task_patterns: Dict[str, Any]
    # Output of ``_analyze_blocker_patterns``.
    blocker_patterns: Dict[str, Any]
    # Empty unless a code analyzer and GitHub owner/repo were supplied.
    technology_stack: List[str]
    implementation_patterns: Dict[str, Any]
    # Factor lists from ``_identify_key_factors`` (AI or rule-based fallback).
    success_factors: List[str]
    risk_factors: List[str]
    # Creation timestamp; stored as an ISO-8601 string on disk.
    extracted_at: datetime
    # Mean of four inputs, see ``_calculate_confidence_score``.
    confidence_score: float
[docs]
class ProjectPatternLearner:
    """
    Extracts and learns patterns from completed projects.

    This class analyzes completed projects to identify patterns that lead to
    success or failure, helping Marcus make better decisions for future
    projects. Learned patterns are kept in ``self.learned_patterns`` and
    persisted to a ``learned_patterns.json`` file.
    """
[docs]
def __init__(
    self,
    pattern_db: Optional[Any] = None,
    ai_engine: Optional[AIAnalysisEngine] = None,
    code_analyzer: Optional[CodeAnalyzer] = None,
) -> None:
    """
    Wire up collaborators and restore previously learned patterns.

    Parameters
    ----------
    pattern_db : Optional[Any]
        Legacy pattern database (deprecated). Only stored and, when it
        exposes a ``pattern_learner`` attribute, pointed back at this object.
    ai_engine : Optional[AIAnalysisEngine]
        AI engine used for factor analysis. A new ``AIAnalysisEngine`` is
        created when a falsy value is given.
    code_analyzer : Optional[CodeAnalyzer]
        Code analyzer for GitHub integration; may be ``None``.
    """
    # Collaborators.
    self.pattern_db = pattern_db  # deprecated, kept for backward compatibility
    self.ai_engine = ai_engine if ai_engine else AIAnalysisEngine()
    self.code_analyzer = code_analyzer
    self.quality_validator = BoardQualityValidator()

    # In-memory store, seeded from the JSON file if one exists.
    self.learned_patterns: List[ProjectPattern] = []
    self._load_existing_patterns()

    # Deprecated back-reference: set only if the db already has the attribute.
    legacy_db = self.pattern_db
    if legacy_db and hasattr(legacy_db, "pattern_learner"):
        legacy_db.pattern_learner = self
def _load_existing_patterns(self) -> None:
    """
    Populate ``self.learned_patterns`` from ``data/learned_patterns.json``.

    The file is looked up three directory levels above this module. Nothing
    happens when the file does not exist.
    """
    store = Path(__file__).parent.parent.parent / "data" / "learned_patterns.json"
    if not store.exists():
        return

    with open(store, "r") as handle:
        payload = json.load(handle)

    for raw in payload.get("patterns", []):
        # Undo the JSON encoding: ISO string -> datetime, dict -> ProjectOutcome.
        raw["extracted_at"] = datetime.fromisoformat(raw["extracted_at"])
        raw["outcome"] = ProjectOutcome(**raw["outcome"])
        self.learned_patterns.append(ProjectPattern(**raw))
[docs]
async def learn_from_project(
    self,
    project_state: ProjectState,
    tasks: List[Task],
    team_members: List[WorkerStatus],
    outcome: ProjectOutcome,
    github_owner: Optional[str] = None,
    github_repo: Optional[str] = None,
) -> ProjectPattern:
    """
    Build, store and return the pattern for one completed project.

    Parameters
    ----------
    project_state : ProjectState
        Final state of the project; supplies ``board_id`` and ``project_name``.
    tasks : List[Task]
        All tasks from the project.
    team_members : List[WorkerStatus]
        Team members who worked on the project.
    outcome : ProjectOutcome
        Actual project outcome metrics.
    github_owner : Optional[str]
        GitHub repository owner for code analysis.
    github_repo : Optional[str]
        GitHub repository name for code analysis.

    Returns
    -------
    ProjectPattern
        The extracted pattern (also appended to ``self.learned_patterns``
        and written to disk by ``_store_pattern``).
    """
    # Board quality.
    board_report = self.quality_validator.validate_board(tasks)
    quality = self._extract_quality_metrics(board_report, tasks)

    # Team.
    performance = self._analyze_team_performance(tasks, team_members)
    composition = self._analyze_team_composition(team_members)

    # Task structure and pace.
    task_shape = self._analyze_task_patterns(tasks)
    velocity = self._analyze_velocity_pattern(tasks)

    # Blockers.
    blockers = self._analyze_blocker_patterns(tasks)

    # Code-level analysis is optional: it needs an analyzer and a repository.
    implementation = {}
    tech_stack = []
    if self.code_analyzer and github_owner and github_repo:
        implementation = await self._analyze_implementation(
            tasks, github_owner, github_repo
        )
        tech_stack = await self._detect_technology_stack(github_owner, github_repo)

    # Success / risk factors (AI with rule-based fallback).
    strengths, risks = await self._identify_key_factors(
        project_state, quality, performance, outcome
    )

    confidence = self._calculate_confidence_score(
        board_report.score, outcome.quality_score, len(tasks), len(team_members)
    )

    pattern = ProjectPattern(
        project_id=project_state.board_id,
        project_name=project_state.project_name,
        outcome=outcome,
        quality_metrics=quality,
        team_composition=composition,
        velocity_pattern=velocity,
        task_patterns=task_shape,
        blocker_patterns=blockers,
        technology_stack=tech_stack,
        implementation_patterns=implementation,
        success_factors=strengths,
        risk_factors=risks,
        extracted_at=datetime.now(timezone.utc),
        confidence_score=confidence,
    )
    self._store_pattern(pattern)

    # Deprecated pattern database hook, kept for backward compatibility.
    legacy_db = self.pattern_db
    if legacy_db and hasattr(legacy_db, "add_success_pattern"):
        flow_data = self._pattern_to_flow_data(pattern)
        if outcome.successful:
            legacy_db.add_success_pattern(flow_data)
        else:
            legacy_db.add_failure_pattern(flow_data, outcome.failure_reasons or [])

    return pattern
def _extract_quality_metrics(
    self, quality_report: QualityReport, tasks: List[Task]
) -> Dict[str, float]:
    """
    Combine the board quality report with task-derived ratios.

    Parameters
    ----------
    quality_report : QualityReport
        Report whose ``score`` and ``metrics`` mapping are read.
    tasks : List[Task]
        All project tasks.

    Returns
    -------
    Dict[str, float]
        Eight named metrics; the ratio entries are ``0`` for an empty task
        list.
    """
    done = [t for t in tasks if t.status == TaskStatus.DONE]
    total = len(tasks)
    blocked = sum(1 for t in tasks if t.status == TaskStatus.BLOCKED)
    report_metrics = quality_report.metrics

    return {
        "board_quality_score": quality_report.score,
        "description_quality": report_metrics.get("description_coverage", 0),
        "label_quality": report_metrics.get("label_coverage", 0),
        "estimate_accuracy": self._calculate_estimate_accuracy(done),
        "completion_rate": len(done) / total if tasks else 0,
        "on_time_delivery": self._calculate_on_time_delivery(done),
        "rework_rate": self._calculate_rework_rate(tasks),
        "blocker_rate": blocked / total if tasks else 0,
    }
def _analyze_team_performance(
    self, tasks: List[Task], team_members: List[WorkerStatus]
) -> TeamPerformanceMetrics:
    """
    Analyze team performance metrics.

    Parameters
    ----------
    tasks : List[Task]
        All project tasks.
    team_members : List[WorkerStatus]
        Members whose per-agent statistics are reported.

    Returns
    -------
    TeamPerformanceMetrics
        Average weekly velocity, completion rate, mean blocker duration,
        collaboration score, skill utilization and per-agent performance.
        Members with no completed tasks are omitted from
        ``agent_performance``.
    """
    completed = [t for t in tasks if t.status == TaskStatus.DONE]

    # Velocity: bucket by (ISO year, ISO week). Using the week number alone
    # would merge e.g. week 5 of two different years into one bucket.
    per_week: Dict[Tuple[int, int], int] = defaultdict(int)
    for task in completed:
        if task.updated_at:
            iso_year, iso_week = task.updated_at.isocalendar()[:2]
            per_week[(iso_year, iso_week)] += 1
    weekly_velocities = list(per_week.values())
    avg_velocity = statistics.mean(weekly_velocities) if weekly_velocities else 0

    completion_rate = len(completed) / len(tasks) if tasks else 0

    # Blocker resolution time (simplified): the attribute is optional on
    # tasks, and a ``None`` value is skipped so the mean cannot fail on it.
    blocker_times = [
        t.blocker_duration
        for t in completed
        if getattr(t, "blocker_duration", None) is not None
    ]
    avg_blocker_time = statistics.mean(blocker_times) if blocker_times else 0

    collaboration_score = self._calculate_collaboration_score(tasks, team_members)
    skill_utilization = self._calculate_skill_utilization(tasks, team_members)

    # Group once by assignee instead of rescanning all tasks per member.
    done_by_worker: Dict[Any, List[Task]] = defaultdict(list)
    for task in completed:
        done_by_worker[task.assigned_to].append(task)

    agent_performance = {}
    for member in team_members:
        member_tasks = done_by_worker.get(member.worker_id)
        if not member_tasks:
            continue
        durations = [
            (t.updated_at - t.created_at).days
            for t in member_tasks
            if t.updated_at and t.created_at
        ]
        agent_performance[member.worker_id] = {
            "completed_tasks": len(member_tasks),
            # Guard on the durations actually available: a member whose
            # tasks all lack timestamps must not raise StatisticsError.
            "avg_completion_time": statistics.mean(durations) if durations else 0,
            "quality_score": member.performance_score,
        }

    return TeamPerformanceMetrics(
        average_velocity=avg_velocity,
        task_completion_rate=completion_rate,
        blocker_resolution_time=avg_blocker_time,
        collaboration_score=collaboration_score,
        skill_utilization=skill_utilization,
        agent_performance=agent_performance,
    )
def _analyze_team_composition(
self, team_members: List[WorkerStatus]
) -> Dict[str, Any]:
"""Analyze team composition and skills."""
roles: Dict[str, int] = defaultdict(int)
skill_coverage: Dict[str, int] = defaultdict(int)
experience_distribution: Dict[str, int] = {"senior": 0, "mid": 0, "junior": 0}
composition = {
"team_size": len(team_members),
"roles": roles,
"skill_coverage": skill_coverage,
"experience_distribution": experience_distribution,
}
for member in team_members:
# Count roles
roles[member.role] += 1
# Count skills
for skill in member.skills:
skill_coverage[skill] += 1
# Estimate experience level based on completed tasks
if member.completed_tasks_count > 50:
experience_distribution["senior"] += 1
elif member.completed_tasks_count > 20:
experience_distribution["mid"] += 1
else:
experience_distribution["junior"] += 1
# Convert defaultdicts to regular dicts for JSON serialization
composition["roles"] = dict(roles)
composition["skill_coverage"] = dict(skill_coverage)
return composition
def _analyze_task_patterns(self, tasks: List[Task]) -> Dict[str, Any]:
    """
    Collect the structural task analyses into one mapping.

    Returns
    -------
    Dict[str, Any]
        Size, type and priority distributions, dependency depth, parallel
        work ratio and phase structure, each produced by its helper method.
    """
    return {
        "task_size_distribution": self._get_task_size_distribution(tasks),
        "dependency_depth": self._calculate_dependency_depth(tasks),
        "parallel_work_ratio": self._calculate_parallel_work_ratio(tasks),
        "task_type_distribution": self._get_task_type_distribution(tasks),
        "priority_distribution": self._get_priority_distribution(tasks),
        "phase_structure": self._analyze_phase_structure(tasks),
    }
def _analyze_velocity_pattern(self, tasks: List[Task]) -> Dict[str, float]:
    """
    Analyze velocity (tasks per day) across four project phases.

    Completed tasks are ordered by ``updated_at`` and split into quartiles
    named ``start``, ``early``, ``middle`` and ``end``; the last quartile
    absorbs the remainder. Phases that receive no tasks are omitted.

    Parameters
    ----------
    tasks : List[Task]
        All project tasks.

    Returns
    -------
    Dict[str, float]
        Phase name -> tasks per day, or ``{}`` when no completed task has a
        completion timestamp.
    """
    # Only tasks with a timestamp can be placed on the timeline. Previously a
    # missing ``updated_at`` was sorted as "now" but then broke the duration
    # subtraction below with a TypeError.
    finished = [t for t in tasks if t.status == TaskStatus.DONE and t.updated_at]
    if not finished:
        return {}

    finished.sort(key=lambda t: t.updated_at)

    total = len(finished)
    quartile = total // 4
    phase_names = ["start", "early", "middle", "end"]

    velocity_pattern = {}
    for index, phase in enumerate(phase_names):
        begin = index * quartile
        stop = (index + 1) * quartile if index < 3 else total
        window = finished[begin:stop]
        if not window:
            continue
        # Whole days between first and last completion; clamp to one day so
        # same-day phases do not divide by zero.
        span_days = (window[-1].updated_at - window[0].updated_at).days
        velocity_pattern[phase] = len(window) / max(span_days, 1)

    return velocity_pattern
def _analyze_blocker_patterns(self, tasks: List[Task]) -> Dict[str, Any]:
    """
    Summarise how often and in what way tasks were blocked.

    A task counts as blocked when it has a truthy ``was_blocked`` attribute.

    Returns
    -------
    Dict[str, Any]
        Blocker frequency (``0`` for an empty task list), categories, timing
        and recurring blockers.
    """
    blocked = [t for t in tasks if getattr(t, "was_blocked", False)]
    frequency = len(blocked) / len(tasks) if tasks else 0
    return {
        "blocker_frequency": frequency,
        "blocker_categories": self._categorize_blockers(blocked),
        "blocker_timing": self._analyze_blocker_timing(blocked, tasks),
        "recurring_blockers": self._identify_recurring_blockers(blocked),
    }
async def _analyze_implementation(
    self, tasks: List[Task], owner: str, repo: str
) -> Dict[str, Any]:
    """
    Gather implementation details for completed, assigned tasks.

    Parameters
    ----------
    tasks : List[Task]
        All project tasks; only ``DONE`` tasks with an assignee are analysed.
    owner : str
        GitHub repository owner.
    repo : str
        GitHub repository name.

    Returns
    -------
    Dict[str, Any]
        ``{}`` when no code analyzer is configured. Otherwise the collected
        endpoints and models plus ``test_coverage``, ``code_review_metrics``
        and ``refactoring_rate``, which this method leaves at their initial
        values.
    """
    analyzer = self.code_analyzer
    if not analyzer:
        return {}

    endpoints: List[str] = []
    models: List[str] = []

    for task in tasks:
        if task.status != TaskStatus.DONE or not task.assigned_to:
            continue

        # The analyzer expects a worker object; build a stand-in from the
        # assignee id.
        assignee = task.assigned_to
        worker = WorkerStatus(
            worker_id=assignee,
            name=assignee,
            role="developer",
            email=f"{assignee}@example.com",
            current_tasks=[],
            completed_tasks_count=0,
            capacity=40,
            skills=[],
            availability={},
        )
        analysis = await analyzer.analyze_task_completion(task, worker, owner, repo)

        implementations = analysis.get("findings", {}).get("implementations")
        if implementations:
            endpoints.extend(implementations.get("endpoints", []))
            models.extend(implementations.get("models", []))

    return {
        "endpoints_created": endpoints,
        "models_created": models,
        "test_coverage": 0,
        "code_review_metrics": {},
        "refactoring_rate": 0,
    }
async def _detect_technology_stack(self, owner: str, repo: str) -> List[str]:
"""Detect technology stack from repository."""
# This would analyze package files, imports, etc.
# Simplified for now
return ["python", "fastapi", "postgresql", "react"]
async def _identify_key_factors(
    self,
    project_state: ProjectState,
    quality_metrics: Dict[str, float],
    team_metrics: TeamPerformanceMetrics,
    outcome: ProjectOutcome,
) -> Tuple[List[str], List[str]]:
    """
    Use AI to identify key success and risk factors.

    Falls back to ``_identify_key_factors_fallback`` when the AI engine has
    no client, or when the AI call or the JSON parsing of its response
    raises.

    Parameters
    ----------
    project_state : ProjectState
        Accepted for interface symmetry; not read by this method.
    quality_metrics : Dict[str, float]
        Quality metrics, embedded in the prompt as JSON.
    team_metrics : TeamPerformanceMetrics
        Velocity, completion rate and collaboration score are embedded in
        the prompt.
    outcome : ProjectOutcome
        Success flag, duration, quality score and cost are embedded in the
        prompt.

    Returns
    -------
    Tuple[List[str], List[str]]
        ``(success_factors, risk_factors)``; a key missing from the AI
        response yields an empty list.
    """
    # Without a configured client there is nothing to call.
    if not self.ai_engine.client:
        # Fallback analysis
        return self._identify_key_factors_fallback(
            quality_metrics, team_metrics, outcome
        )
    # The prompt is runtime text: its exact wording and line layout are sent
    # to the model, so it is deliberately kept flush-left. Doubled braces
    # produce literal braces in the JSON example.
    prompt = f"""Analyze this completed project and identify key success \
factors and risk factors.
Project Outcome:
- Success: {outcome.successful}
- Duration: {outcome.completion_time_days} days
- Quality Score: {outcome.quality_score}
- Cost: ${outcome.cost}
Quality Metrics:
{json.dumps(quality_metrics, indent=2)}
Team Performance:
- Average Velocity: {team_metrics.average_velocity} tasks/week
- Completion Rate: {team_metrics.task_completion_rate:.2%}
- Collaboration Score: {team_metrics.collaboration_score:.2f}
Identify:
1. 3-5 key factors that contributed to the outcome
2. 3-5 risk factors or issues that impacted the project
Return JSON:
{{
"success_factors": ["factor1", "factor2", ...],
"risk_factors": ["risk1", "risk2", ...]
}}"""
    try:
        response = await self.ai_engine._call_claude(prompt)
        result = json.loads(response)
        return (result.get("success_factors", []), result.get("risk_factors", []))
    except Exception:
        # Best effort: any failure in the AI path degrades to the rule-based
        # analysis instead of failing pattern extraction.
        return self._identify_key_factors_fallback(
            quality_metrics, team_metrics, outcome
        )
def _identify_key_factors_fallback(
self,
quality_metrics: Dict[str, float],
team_metrics: TeamPerformanceMetrics,
outcome: ProjectOutcome,
) -> Tuple[List[str], List[str]]:
"""Fallback method to identify key factors."""
success_factors = []
risk_factors = []
# Analyze quality metrics
if quality_metrics["board_quality_score"] > 0.8:
success_factors.append("High quality task definitions and organization")
elif quality_metrics["board_quality_score"] < 0.5:
risk_factors.append("Poor task definition and organization")
if quality_metrics["on_time_delivery"] > 0.8:
success_factors.append("Excellent time estimation and delivery")
elif quality_metrics["on_time_delivery"] < 0.5:
risk_factors.append("Poor time estimation leading to delays")
# Analyze team metrics
if team_metrics.average_velocity > 10:
success_factors.append("High team velocity and productivity")
elif team_metrics.average_velocity < 3:
risk_factors.append("Low team velocity impacting progress")
if team_metrics.collaboration_score > 0.7:
success_factors.append("Strong team collaboration")
# Analyze outcome
if outcome.successful and outcome.quality_score > 0.8:
success_factors.append("Focus on quality throughout development")
if outcome.failure_reasons:
risk_factors.extend(outcome.failure_reasons[:3])
return success_factors[:5], risk_factors[:5]
def _calculate_confidence_score(
self,
board_quality: float,
outcome_quality: float,
task_count: int,
team_size: int,
) -> float:
"""Calculate confidence score for the extracted pattern."""
# Base confidence on data quality and completeness
scores = []
# Board quality contribution
scores.append(board_quality)
# Outcome quality contribution
scores.append(outcome_quality)
# Task count contribution (more tasks = more data)
task_score = min(task_count / 50, 1.0) # Normalize to 50 tasks
scores.append(task_score)
# Team size contribution
team_score = min(team_size / 5, 1.0) # Normalize to 5 team members
scores.append(team_score)
return statistics.mean(scores)
def _store_pattern(self, pattern: ProjectPattern) -> None:
    """
    Remember ``pattern`` and rewrite the JSON store with all patterns.

    The store is ``data/learned_patterns.json`` three directory levels above
    this module; the ``data`` directory is created when missing.
    """
    self.learned_patterns.append(pattern)

    target = Path(__file__).parent.parent.parent / "data" / "learned_patterns.json"
    target.parent.mkdir(exist_ok=True)

    def to_record(item: ProjectPattern) -> Dict[str, Any]:
        """Turn one pattern into a JSON-serializable dict."""
        record = asdict(item)
        record["extracted_at"] = item.extracted_at.isoformat()
        record["outcome"] = asdict(item.outcome)
        return record

    serialised = [to_record(item) for item in self.learned_patterns]

    with open(target, "w") as handle:
        json.dump({"patterns": serialised}, handle, indent=2)
def _pattern_to_flow_data(self, pattern: ProjectPattern) -> Dict[str, Any]:
"""Convert pattern to flow data format for pattern database."""
return {
"flow_id": pattern.project_id,
"project_name": pattern.project_name,
"metrics": {
"task_count": pattern.task_patterns.get("task_count", 0),
"complexity_score": pattern.quality_metrics.get(
"board_quality_score", 0
),
"confidence_avg": pattern.confidence_score,
"total_cost": pattern.outcome.cost,
"total_duration_ms": pattern.outcome.completion_time_days
* 24
* 60
* 60
* 1000,
},
"requirements": [], # Would need to extract from tasks
"tasks": [], # Would need to include task details
"decisions": [], # Would need to extract from project history
}
# Helper methods for analysis
def _calculate_estimate_accuracy(self, completed_tasks: List[Task]) -> float:
"""Calculate how accurate time estimates were."""
# This would need actual vs estimated time data
# Placeholder for now
return 0.75
def _calculate_on_time_delivery(self, completed_tasks: List[Task]) -> float:
"""Calculate percentage of tasks delivered on time."""
on_time = 0
total_with_due_date = 0
for task in completed_tasks:
if task.due_date and task.updated_at:
total_with_due_date += 1
if task.updated_at <= task.due_date:
on_time += 1
return on_time / total_with_due_date if total_with_due_date > 0 else 1.0
def _calculate_rework_rate(self, tasks: List[Task]) -> float:
"""Calculate rate of tasks that needed rework."""
# This would need to track task state changes
# Placeholder for now
return 0.1
def _calculate_collaboration_score(
self, tasks: List[Task], team_members: List[WorkerStatus]
) -> float:
"""Calculate collaboration score based on task handoffs."""
if len(team_members) < 2:
return 0.0
# Count tasks that involved multiple team members
collaborative_tasks = 0
for task in tasks:
# This would need task history to track reassignments
# Placeholder logic
if task.labels and any("collaborative" in label for label in task.labels):
collaborative_tasks += 1
return min(collaborative_tasks / (len(tasks) * 0.3), 1.0) if tasks else 0.0
def _calculate_skill_utilization(
self, tasks: List[Task], team_members: List[WorkerStatus]
) -> Dict[str, float]:
"""Calculate how well team skills were utilized."""
skill_usage: Dict[str, int] = defaultdict(int)
skill_availability: Dict[str, int] = defaultdict(int)
# Count available skills
for member in team_members:
for skill in member.skills:
skill_availability[skill] += 1
# Count skill usage in tasks
for task in tasks:
if task.labels:
for label in task.labels:
if label.startswith("skill:"):
skill = label.replace("skill:", "")
skill_usage[skill] += 1
# Calculate utilization
utilization = {}
for skill, available in skill_availability.items():
used = skill_usage.get(skill, 0)
utilization[skill] = min(
used / (available * 5), 1.0
) # Assume 5 tasks per skill
return utilization
def _get_task_size_distribution(self, tasks: List[Task]) -> Dict[str, int]:
"""Get distribution of task sizes."""
distribution = {
"small": 0, # < 4 hours
"medium": 0, # 4-8 hours
"large": 0, # 8-16 hours
"xlarge": 0, # > 16 hours
}
for task in tasks:
if task.estimated_hours:
if task.estimated_hours < 4:
distribution["small"] += 1
elif task.estimated_hours < 8:
distribution["medium"] += 1
elif task.estimated_hours < 16:
distribution["large"] += 1
else:
distribution["xlarge"] += 1
return distribution
def _calculate_dependency_depth(self, tasks: List[Task]) -> int:
"""Calculate maximum dependency chain depth."""
max_depth = 0
def get_depth(task_id: str, current_depth: int = 0) -> int:
task = next((t for t in tasks if t.id == task_id), None)
if not task or not task.dependencies:
return current_depth
max_dep_depth = current_depth
for dep_id in task.dependencies:
dep_depth = get_depth(dep_id, current_depth + 1)
max_dep_depth = max(max_dep_depth, dep_depth)
return max_dep_depth
for task in tasks:
depth = get_depth(task.id)
max_depth = max(max_depth, depth)
return max_depth
def _calculate_parallel_work_ratio(self, tasks: List[Task]) -> float:
"""Calculate ratio of tasks that can be done in parallel."""
if not tasks:
return 0.0
# Tasks without dependencies can be done in parallel
parallel_tasks = len([t for t in tasks if not t.dependencies])
return parallel_tasks / len(tasks)
def _get_task_type_distribution(self, tasks: List[Task]) -> Dict[str, int]:
"""Get distribution of task types."""
distribution: Dict[str, int] = defaultdict(int)
for task in tasks:
if task.labels:
for label in task.labels:
if label.startswith("type:"):
task_type = label.replace("type:", "")
distribution[task_type] += 1
break
else:
distribution["untyped"] += 1
else:
distribution["untyped"] += 1
return dict(distribution)
def _get_priority_distribution(self, tasks: List[Task]) -> Dict[str, int]:
"""Get distribution of priorities."""
distribution: Dict[str, int] = defaultdict(int)
for task in tasks:
# Priority is always present in the Task model, so no need for None check
distribution[task.priority.value] += 1
return dict(distribution)
def _analyze_phase_structure(self, tasks: List[Task]) -> Dict[str, Any]:
    """
    Group tasks by phase and summarise each group.

    A task's phase is taken from its first ``phase:<name>`` label; tasks
    without one are grouped under ``unphased``.

    Returns
    -------
    Dict[str, Any]
        Phase name -> ``task_count``, ``completion_rate`` and
        ``avg_task_size`` (mean of the truthy ``estimated_hours`` values,
        ``0`` when there are none).
    """
    grouped: Dict[str, List[Task]] = {}
    for task in tasks:
        phase_label = next(
            (label for label in task.labels or () if label.startswith("phase:")),
            None,
        )
        if phase_label is None:
            name = "unphased"
        else:
            name = phase_label.replace("phase:", "")
        grouped.setdefault(name, []).append(task)

    summary = {}
    for name, members in grouped.items():
        finished = sum(1 for t in members if t.status == TaskStatus.DONE)
        sized = [t.estimated_hours for t in members if t.estimated_hours]
        summary[name] = {
            "task_count": len(members),
            "completion_rate": finished / len(members) if members else 0,
            "avg_task_size": statistics.mean(sized) if sized else 0,
        }
    return summary
def _categorize_blockers(self, blocked_tasks: List[Task]) -> Dict[str, int]:
"""Categorize blockers by type."""
categories: Dict[str, int] = defaultdict(int)
# This would need actual blocker descriptions
# Placeholder categorization
for task in blocked_tasks:
if task.labels:
if any("technical" in label for label in task.labels):
categories["technical"] += 1
elif any("dependency" in label for label in task.labels):
categories["dependency"] += 1
elif any("external" in label for label in task.labels):
categories["external"] += 1
else:
categories["other"] += 1
else:
categories["unknown"] += 1
return dict(categories)
def _analyze_blocker_timing(
self, blocked_tasks: List[Task], all_tasks: List[Task]
) -> Dict[str, float]:
"""Analyze when blockers tend to occur in project lifecycle."""
if not all_tasks:
return {}
# Divide project into quartiles
quartiles: Dict[str, float] = {"q1": 0.0, "q2": 0.0, "q3": 0.0, "q4": 0.0}
# This would need actual timeline data
# Placeholder distribution
total_blocked = len(blocked_tasks)
if total_blocked > 0:
quartiles["q1"] = 0.2 # 20% in first quarter
quartiles["q2"] = 0.4 # 40% in second quarter
quartiles["q3"] = 0.3 # 30% in third quarter
quartiles["q4"] = 0.1 # 10% in final quarter
return quartiles
def _identify_recurring_blockers(self, blocked_tasks: List[Task]) -> List[str]:
"""Identify patterns in recurring blockers."""
# This would analyze blocker descriptions for patterns
# Placeholder for now
return [
"Waiting for API documentation",
"Database schema changes",
"External service integration",
]
[docs]
def find_similar_projects(
    self, target_pattern: ProjectPattern, min_similarity: float = 0.7
) -> List[Tuple[ProjectPattern, float]]:
    """
    Rank learned patterns by similarity to ``target_pattern``.

    Parameters
    ----------
    target_pattern : ProjectPattern
        Pattern to match against.
    min_similarity : float
        Minimum similarity score (0-1) a pattern needs to be returned.

    Returns
    -------
    List[Tuple[ProjectPattern, float]]
        ``(pattern, similarity_score)`` tuples, most similar first.
    """
    scored = (
        (candidate, self._calculate_pattern_similarity(target_pattern, candidate))
        for candidate in self.learned_patterns
    )
    matches = [pair for pair in scored if pair[1] >= min_similarity]
    return sorted(matches, key=lambda pair: pair[1], reverse=True)
def _calculate_pattern_similarity(
    self, pattern1: ProjectPattern, pattern2: ProjectPattern
) -> float:
    """
    Return the weighted similarity of two project patterns.

    Weights: team composition 20%, task patterns 30%, technology stack 20%,
    quality metrics 30%.
    """
    weighted = [
        self._calculate_team_similarity(
            pattern1.team_composition, pattern2.team_composition
        )
        * 0.2,
        self._calculate_task_pattern_similarity(
            pattern1.task_patterns, pattern2.task_patterns
        )
        * 0.3,
        self._calculate_tech_stack_similarity(
            pattern1.technology_stack, pattern2.technology_stack
        )
        * 0.2,
        self._calculate_quality_similarity(
            pattern1.quality_metrics, pattern2.quality_metrics
        )
        * 0.3,
    ]
    return sum(weighted)
def _calculate_team_similarity(
self, team1: Dict[str, Any], team2: Dict[str, Any]
) -> float:
"""Calculate similarity between team compositions."""
scores = []
# Team size similarity
size1 = team1.get("team_size", 0)
size2 = team2.get("team_size", 0)
size_sim = 1 - abs(size1 - size2) / max(size1, size2, 1)
scores.append(size_sim)
# Role overlap
roles1 = set(team1.get("roles", {}).keys())
roles2 = set(team2.get("roles", {}).keys())
if roles1 or roles2:
role_sim = len(roles1 & roles2) / len(roles1 | roles2)
scores.append(role_sim)
# Skill overlap
skills1 = set(team1.get("skill_coverage", {}).keys())
skills2 = set(team2.get("skill_coverage", {}).keys())
if skills1 or skills2:
skill_sim = len(skills1 & skills2) / len(skills1 | skills2)
scores.append(skill_sim)
return statistics.mean(scores) if scores else 0.0
def _calculate_task_pattern_similarity(
self, pattern1: Dict[str, Any], pattern2: Dict[str, Any]
) -> float:
"""Calculate similarity between task patterns."""
scores = []
# Task size distribution similarity
size_dist1 = pattern1.get("task_size_distribution", {})
size_dist2 = pattern2.get("task_size_distribution", {})
for size in ["small", "medium", "large", "xlarge"]:
val1 = size_dist1.get(size, 0)
val2 = size_dist2.get(size, 0)
total = val1 + val2
if total > 0:
scores.append(1 - abs(val1 - val2) / total)
# Dependency depth similarity
depth1 = pattern1.get("dependency_depth", 0)
depth2 = pattern2.get("dependency_depth", 0)
if depth1 or depth2:
depth_sim = 1 - abs(depth1 - depth2) / max(depth1, depth2)
scores.append(depth_sim)
# Parallel work ratio similarity
parallel1 = pattern1.get("parallel_work_ratio", 0)
parallel2 = pattern2.get("parallel_work_ratio", 0)
scores.append(1 - abs(parallel1 - parallel2))
return statistics.mean(scores) if scores else 0.0
def _calculate_tech_stack_similarity(
self, stack1: List[str], stack2: List[str]
) -> float:
"""Calculate similarity between technology stacks."""
if not stack1 and not stack2:
return 1.0
if not stack1 or not stack2:
return 0.0
set1 = set(stack1)
set2 = set(stack2)
return len(set1 & set2) / len(set1 | set2)
def _calculate_quality_similarity(
self, metrics1: Dict[str, float], metrics2: Dict[str, float]
) -> float:
"""Calculate similarity between quality metrics."""
scores = []
# Compare each metric
for metric in ["board_quality_score", "completion_rate", "on_time_delivery"]:
val1 = metrics1.get(metric, 0)
val2 = metrics2.get(metric, 0)
scores.append(1 - abs(val1 - val2))
return statistics.mean(scores) if scores else 0.0
[docs]
def get_recommendations_from_patterns(
    self, current_project: Dict[str, Any], max_recommendations: int = 5
) -> List[Dict[str, Any]]:
    """
    Turn trusted successful patterns into recommendations.

    A pattern is used when its outcome was successful and its confidence
    score exceeds 0.7. Patterns are taken in stored order.

    Parameters
    ----------
    current_project : Dict[str, Any]
        Current project information (currently not consulted).
    max_recommendations : int
        Maximum number of recommendations to return.

    Returns
    -------
    List[Dict[str, Any]]
        One entry per source pattern with its confidence, success factors
        and a list of ``category`` / ``suggestion`` items.
    """
    trusted = [
        candidate
        for candidate in self.learned_patterns
        if candidate.outcome.successful and candidate.confidence_score > 0.7
    ]

    results = []
    for source in trusted[:max_recommendations]:
        suggestions: List[Dict[str, str]] = []

        # Team composition.
        team = source.team_composition
        if team["team_size"] > 0:
            team_size = team["team_size"]
            roles = ", ".join(team["roles"].keys())
            suggestions.append(
                {
                    "category": "team",
                    "suggestion": (
                        f"Consider team size of {team_size} with roles: {roles}"
                    ),
                }
            )

        # Task organization.
        if source.task_patterns.get("parallel_work_ratio", 0) > 0.3:
            parallel_ratio = source.task_patterns["parallel_work_ratio"]
            suggestions.append(
                {
                    "category": "planning",
                    "suggestion": (
                        f"Structure {parallel_ratio:.0%} of tasks "
                        "for parallel execution"
                    ),
                }
            )

        # Quality.
        if source.quality_metrics["board_quality_score"] > 0.8:
            suggestions.append(
                {
                    "category": "quality",
                    "suggestion": (
                        "Maintain high task definition quality with "
                        "detailed descriptions and clear acceptance criteria"
                    ),
                }
            )

        results.append(
            {
                "type": "pattern_based",
                "source_project": source.project_name,
                "confidence": source.confidence_score,
                "success_factors": source.success_factors,
                "recommendations": suggestions,
            }
        )

    return results