"""
Contextual Learning System for Marcus Phase 3.
Learns patterns specific to teams, technologies, and project types
to provide intelligent, context-aware recommendations.
"""
import logging
import statistics
from collections import Counter, defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple
logger = logging.getLogger(__name__)
[docs]
@dataclass
class TeamLearnings:
    """
    Learnings specific to a team.

    Produced by ``ContextualLearningSystem.learn_team_patterns`` from completed
    project data, or by ``_create_default_team_learnings`` when too few
    projects are available to learn from.
    """

    # Identifier of the team these learnings belong to.
    team_id: str
    # task_type -> mean actual_hours / estimated_hours ratio
    # (a value above 1.0 means tasks took longer than estimated).
    velocity_patterns: Dict[str, float]
    # skill -> proficiency score; computed as the mean project
    # "completion_rate" over projects whose tech_stack lists the skill.
    skill_strengths: Dict[str, float]
    # task_type -> preference score; computed as the mean of
    # (quality_score + (2.0 - completion_time_ratio)) / 2 per task.
    preferred_task_types: Dict[str, float]
    # Free-form collaboration summary, e.g. "avg_team_size" and
    # "parallel_task_preference".
    collaboration_patterns: Dict[str, Any]
    # Aggregated quality figures, e.g. "average_quality",
    # "average_bug_rate", "review_coverage".
    quality_metrics: Dict[str, float]
    # When this record was built (the builders in this module use UTC "now").
    last_updated: datetime
[docs]
@dataclass
class TechnologyLearnings:
    """
    Learnings specific to technology stacks.

    Produced by ``ContextualLearningSystem.learn_technology_patterns`` from
    project outcomes, or by ``_create_default_tech_learnings`` when too few
    outcomes are available.
    """

    # Technology stack identifier (e.g. "react-python-postgres").
    tech_stack: str
    # Free-form pattern summary, e.g. "common_structures" and
    # "typical_duration" (mean / median / std_dev of project durations).
    typical_patterns: Dict[str, Any]
    # task_type -> multiplier (mean actual_hours / estimated_hours ratio)
    estimation_multipliers: Dict[str, float]
    # Recurring (prerequisite, dependent) pairs.
    common_dependencies: List[Tuple[str, str]]
    # risk_type -> probability (share of analysed projects that hit the risk)
    risk_factors: Dict[str, float]
    # Practices that recurred in successful projects.
    best_practices: List[str]
    # When this record was built (the builders in this module use UTC "now").
    last_updated: datetime
[docs]
@dataclass
class ProjectTypeLearnings:
    """
    Learnings specific to project types.

    NOTE(review): nothing in this module constructs this record; instances
    are only read from ``ContextualLearningSystem.project_type_learnings``.
    Field meanings below that are not exercised by the readers are
    assumptions taken from the field names -- confirm against the producer.
    """

    # Project type key; shown verbatim in process recommendations.
    project_type: str
    # Phase names; joined with ", " in process recommendations and copied
    # into the adapted task-generation template.
    typical_phases: List[str]
    # Presumably phase -> phases it depends on; TODO confirm (not read here).
    phase_dependencies: Dict[str, List[str]]
    # Presumably patterns seen in successful projects; TODO confirm (not read here).
    success_patterns: Dict[str, Any]
    # Pitfall descriptions; only the first two are surfaced as warnings.
    common_pitfalls: List[str]
    # Presumably resource name -> required amount; TODO confirm (not read here).
    resource_requirements: Dict[str, float]
    # Presumably the time this record was built; TODO confirm.
    last_updated: datetime
[docs]
@dataclass
class AdaptedTemplate:
    """
    Template adapted based on learnings.

    Built by the ``_adapt_*_template`` helpers of ``ContextualLearningSystem``,
    which pair a baseline template with the overrides derived from team,
    technology and project-type learnings.
    """

    # Template identifier, e.g. "estimation_adapted".
    template_id: str
    # The unadapted baseline the adaptations apply to.
    original_template: Dict[str, Any]
    # Learned overrides / additions for the baseline template.
    adaptations: Dict[str, Any]
    # Human-readable explanation of why the adaptations were made.
    adaptation_reasoning: str
    # Confidence in the adaptation; the builders here use fixed values
    # between 0.7 and 0.8.
    confidence: float
    # Usage counter; builders initialise it to 0.
    usage_count: int
    # Observed success rate; builders initialise it to 0.0.
    success_rate: float
    # Timestamp; builders initialise it to the creation time in UTC.
    last_used: datetime
[docs]
class ContextualLearningSystem:
    """
    Learns patterns specific to teams, technologies, and project types.

    Provides intelligent adaptation based on context-specific learnings
    rather than generic patterns. Learnings are kept in memory on the
    instance, keyed by team id, tech stack identifier and project type.
    """
[docs]
def __init__(self) -> None:
    """Create an empty learning system with its default tuning parameters."""
    # Learning parameters.
    self.min_samples_for_learning = 3
    # Learning becomes less relevant after 90 days.
    self.learning_decay_days = 90
    self.confidence_threshold = 0.7

    # Context tracking: unseen context keys start out as an empty list.
    self.context_performance: Dict[str, List[Dict[str, Any]]] = defaultdict(list)

    # Learning storage, all empty until the learn_* methods populate them.
    self.adapted_templates: Dict[str, AdaptedTemplate] = {}
    self.project_type_learnings: Dict[str, ProjectTypeLearnings] = {}
    self.technology_learnings: Dict[str, TechnologyLearnings] = {}  # by tech stack
    self.team_learnings: Dict[str, TeamLearnings] = {}  # by team id

    logger.info("Contextual learning system initialized")
[docs]
async def learn_team_patterns(
    self, team_id: str, completed_projects: List[Dict[str, Any]]
) -> TeamLearnings:
    """
    Derive team-specific learnings from the team's completed projects.

    With fewer than ``self.min_samples_for_learning`` projects a default
    record is returned instead; note that this default is NOT stored in
    ``self.team_learnings``. Otherwise the learned record is stored under
    ``team_id`` and returned.

    Args
    ----
    team_id: Unique team identifier
    completed_projects: List of completed project data

    Returns
    -------
    Team learnings with patterns and preferences
    """
    logger.info(f"Learning patterns for team: {team_id}")

    enough_data = len(completed_projects) >= self.min_samples_for_learning
    if not enough_data:
        logger.warning(
            f"Insufficient data for team {team_id} learning: "
            f"{len(completed_projects)} projects"
        )
        return self._create_default_team_learnings(team_id)

    # Each analysis is independent; they run in the same order as the
    # fields they populate.
    velocity_patterns = self._analyze_team_velocity(completed_projects)
    learned = TeamLearnings(
        team_id=team_id,
        velocity_patterns=velocity_patterns,
        skill_strengths=self._analyze_team_skills(completed_projects),
        preferred_task_types=self._analyze_task_preferences(completed_projects),
        collaboration_patterns=self._analyze_collaboration_patterns(
            completed_projects
        ),
        quality_metrics=self._analyze_team_quality(completed_projects),
        last_updated=datetime.now(timezone.utc),
    )
    self.team_learnings[team_id] = learned

    logger.info(
        f"Learned {len(velocity_patterns)} velocity patterns for team {team_id}"
    )
    return learned
[docs]
async def learn_technology_patterns(
    self, tech_stack: str, project_outcomes: List[Dict[str, Any]]
) -> TechnologyLearnings:
    """
    Learn technology-specific patterns from project outcomes.

    The resulting record is always stored in ``self.technology_learnings``
    under ``tech_stack``. With fewer than ``self.min_samples_for_learning``
    outcomes, a default record is stored and returned, and a warning is
    logged so the fallback is visible (mirroring ``learn_team_patterns``).

    Args
    ----
    tech_stack: Technology stack identifier (e.g., "react-python-postgres")
    project_outcomes: List of project outcome data

    Returns
    -------
    Technology learnings with patterns and multipliers
    """
    logger.info(f"Learning patterns for tech stack: {tech_stack}")

    if len(project_outcomes) < self.min_samples_for_learning:
        # Previously this fallback was silent, which made it impossible to
        # tell learned multipliers from the generic defaults in the logs.
        logger.warning(
            f"Insufficient data for tech stack {tech_stack} learning: "
            f"{len(project_outcomes)} projects; using defaults"
        )
        tech_learnings = self._create_default_tech_learnings(tech_stack)
        self.technology_learnings[tech_stack] = tech_learnings
        return tech_learnings

    tech_learnings = TechnologyLearnings(
        tech_stack=tech_stack,
        typical_patterns=self._analyze_tech_patterns(project_outcomes),
        estimation_multipliers=self._calculate_estimation_multipliers(
            project_outcomes
        ),
        common_dependencies=self._identify_tech_dependencies(project_outcomes),
        risk_factors=self._analyze_tech_risks(project_outcomes),
        best_practices=self._extract_best_practices(project_outcomes),
        last_updated=datetime.now(timezone.utc),
    )
    self.technology_learnings[tech_stack] = tech_learnings

    logger.info(
        f"Learned patterns for {tech_stack} from {len(project_outcomes)} projects"
    )
    return tech_learnings
[docs]
async def adapt_templates_intelligently(
    self, project_context: Dict[str, Any]
) -> Dict[str, AdaptedTemplate]:
    """
    Build adapted templates from whatever learnings match the context.

    The context keys read are ``team_id``, ``tech_stack_key`` and
    ``project_type`` (defaulting to "general"). A template is only produced
    when all learnings it needs are available:

    * "estimation" needs team and technology learnings,
    * "task_generation" needs project-type and team learnings,
    * "dependencies" needs technology learnings.

    Args
    ----
    project_context: Current project context

    Returns
    -------
    Dictionary of adapted templates
    """
    team_key = project_context.get("team_id")
    stack_key = project_context.get("tech_stack_key")
    type_key = project_context.get("project_type", "general")

    # A missing/empty key means "no learnings" rather than a lookup.
    team_learning = None
    if team_key:
        team_learning = self.team_learnings.get(team_key)

    tech_learning = None
    if stack_key:
        tech_learning = self.technology_learnings.get(stack_key)

    project_learning = self.project_type_learnings.get(type_key)

    adapted_templates = {}

    if team_learning and tech_learning:
        adapted_templates["estimation"] = await self._adapt_estimation_template(
            team_learning, tech_learning, project_context
        )

    if project_learning and team_learning:
        adapted_templates[
            "task_generation"
        ] = await self._adapt_task_generation_template(
            project_learning, team_learning, project_context
        )

    if tech_learning:
        adapted_templates["dependencies"] = await self._adapt_dependency_template(
            tech_learning, project_context
        )

    logger.info(f"Adapted {len(adapted_templates)} templates for project context")
    return adapted_templates
[docs]
async def get_contextual_recommendations(
    self, project_context: Dict[str, Any], current_state: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Collect recommendations from the learnings that match the context.

    Each category stays an empty list when no learnings are stored for the
    corresponding context key. "risk_mitigations" is always returned empty
    by this method.

    Args
    ----
    project_context: Project context information
    current_state: Current project state

    Returns
    -------
    Context-specific recommendations
    """
    team_recs: List[str] = []
    tech_recs: List[str] = []
    process_recs: List[str] = []

    # Team-specific recommendations
    team_key = project_context.get("team_id")
    if team_key in self.team_learnings:
        team_recs = await self._get_team_recommendations(
            self.team_learnings[team_key], current_state
        )

    # Technology-specific recommendations
    stack_key = project_context.get("tech_stack_key")
    if stack_key in self.technology_learnings:
        tech_recs = await self._get_technology_recommendations(
            self.technology_learnings[stack_key], current_state
        )

    # Process recommendations based on project type
    type_key = project_context.get("project_type")
    if type_key in self.project_type_learnings:
        process_recs = await self._get_process_recommendations(
            self.project_type_learnings[type_key], current_state
        )

    return {
        "team_recommendations": team_recs,
        "technology_recommendations": tech_recs,
        "process_recommendations": process_recs,
        "risk_mitigations": [],
    }
def _analyze_team_velocity(
self, projects: List[Dict[str, Any]]
) -> Dict[str, float]:
"""Analyze team velocity by task type."""
velocity_data = defaultdict(list)
for project in projects:
for task_data in project.get("tasks", []):
task_type = task_data.get("type", "general")
estimated_hours = task_data.get("estimated_hours", 0)
actual_hours = task_data.get("actual_hours", 0)
# Ensure we have numeric values
try:
estimated_hours = float(estimated_hours)
actual_hours = float(actual_hours)
except (ValueError, TypeError):
continue
if estimated_hours > 0 and actual_hours > 0:
velocity_ratio = actual_hours / estimated_hours
velocity_data[task_type].append(velocity_ratio)
# Calculate average velocity per task type
velocity_patterns = {}
for task_type, ratios in list(velocity_data.items()):
if len(ratios) >= 2: # Need at least 2 samples
velocity_patterns[task_type] = statistics.mean(ratios)
return velocity_patterns
def _analyze_team_skills(self, projects: List[Dict[str, Any]]) -> Dict[str, float]:
"""Analyze team skill strengths."""
skill_performance = defaultdict(list)
for project in projects:
tech_stack = project.get("tech_stack", [])
success_rate = project.get("success_metrics", {}).get(
"completion_rate", 0.8
)
for tech in tech_stack:
skill_performance[tech].append(success_rate)
# Calculate average performance per skill
skill_strengths = {}
for skill, performances in list(skill_performance.items()):
if len(performances) >= 2:
skill_strengths[skill] = statistics.mean(performances)
return skill_strengths
def _analyze_task_preferences(
self, projects: List[Dict[str, Any]]
) -> Dict[str, float]:
"""Analyze team preferences for task types."""
task_type_performance = defaultdict(list)
for project in projects:
for task_data in project.get("tasks", []):
task_type = task_data.get("type", "general")
quality_score = task_data.get("quality_score", 0.8)
completion_time_ratio = task_data.get("completion_time_ratio", 1.0)
# Preference score based on quality and efficiency
preference_score = (quality_score + (2.0 - completion_time_ratio)) / 2
task_type_performance[task_type].append(preference_score)
# Calculate preferences
preferences = {}
for task_type, scores in list(task_type_performance.items()):
if len(scores) >= 2:
preferences[task_type] = statistics.mean(scores)
return preferences
def _analyze_collaboration_patterns(
self, projects: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""Analyze team collaboration patterns."""
patterns = {
"avg_team_size": 0,
"parallel_task_preference": 0.5,
"communication_frequency": "medium",
"review_thoroughness": 0.8,
}
team_sizes = []
parallel_tasks = []
for project in projects:
team_size = project.get("team_size", 3)
team_sizes.append(team_size)
# Analyze parallel vs sequential work
tasks = project.get("tasks", [])
if tasks:
overlapping_tasks = sum(
1 for task in tasks if task.get("status") == "IN_PROGRESS"
)
parallel_ratio = overlapping_tasks / len(tasks)
parallel_tasks.append(parallel_ratio)
if team_sizes:
patterns["avg_team_size"] = statistics.mean(team_sizes)
if parallel_tasks:
patterns["parallel_task_preference"] = statistics.mean(parallel_tasks)
return patterns
def _analyze_team_quality(self, projects: List[Dict[str, Any]]) -> Dict[str, float]:
"""Analyze team quality metrics."""
quality_metrics = {}
all_quality_scores = []
bug_rates = []
review_coverage = []
for project in projects:
success_metrics = project.get("success_metrics", {})
quality_score = success_metrics.get("quality_score", 0.8)
all_quality_scores.append(quality_score)
bug_rate = success_metrics.get("bug_rate", 0.1)
bug_rates.append(bug_rate)
review_cov = success_metrics.get("review_coverage", 0.8)
review_coverage.append(review_cov)
if all_quality_scores:
quality_metrics["average_quality"] = statistics.mean(all_quality_scores)
if bug_rates:
quality_metrics["average_bug_rate"] = statistics.mean(bug_rates)
if review_coverage:
quality_metrics["review_coverage"] = statistics.mean(review_coverage)
return quality_metrics
def _analyze_tech_patterns(self, outcomes: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze technology-specific patterns."""
patterns = {}
# Analyze common project structures
structures: Counter[str] = Counter()
for outcome in outcomes:
structure = outcome.get("project_structure", "standard")
structures[structure] += 1
patterns["common_structures"] = dict(structures.most_common(3))
# Analyze typical timeline patterns
durations = [
o.get("duration_days", 30) for o in outcomes if o.get("duration_days")
]
if durations:
patterns["typical_duration"] = {
"mean": statistics.mean(durations),
"median": statistics.median(durations),
"std_dev": statistics.stdev(durations) if len(durations) > 1 else 0,
}
return patterns
def _calculate_estimation_multipliers(
self, outcomes: List[Dict[str, Any]]
) -> Dict[str, float]:
"""Calculate estimation multipliers for different task types."""
multipliers = {}
task_type_ratios = defaultdict(list)
for outcome in outcomes:
for task_data in outcome.get("tasks", []):
task_type = task_data.get("type", "general")
estimated = task_data.get("estimated_hours", 0)
actual = task_data.get("actual_hours", 0)
if estimated > 0 and actual > 0:
ratio = actual / estimated
task_type_ratios[task_type].append(ratio)
# Calculate multipliers
for task_type, ratios in list(task_type_ratios.items()):
if len(ratios) >= 2:
multipliers[task_type] = statistics.mean(ratios)
return multipliers
def _identify_tech_dependencies(
self, outcomes: List[Dict[str, Any]]
) -> List[Tuple[str, str]]:
"""Identify common technology dependencies."""
dependencies = []
dependency_counter: Counter[Tuple[str, str]] = Counter()
for outcome in outcomes:
project_deps = outcome.get("dependencies", [])
for dep in project_deps:
if (
isinstance(dep, dict)
and "prerequisite" in dep
and "dependent" in dep
):
dep_pair = (dep["prerequisite"], dep["dependent"])
dependency_counter[dep_pair] += 1
# Return most common dependencies
for (prereq, dep), count in dependency_counter.most_common(10):
if count >= 2: # Appeared in at least 2 projects
dependencies.append((prereq, dep))
return dependencies
def _analyze_tech_risks(self, outcomes: List[Dict[str, Any]]) -> Dict[str, float]:
"""Analyze technology-specific risks."""
risks: defaultdict[str, int] = defaultdict(int)
total_projects = len(outcomes)
for outcome in outcomes:
encountered_risks = outcome.get("risks_encountered", [])
for risk in encountered_risks:
risks[risk] += 1
# Convert to probabilities
risk_probabilities = {}
for risk, count in list(risks.items()):
if count >= 2: # Risk appeared in at least 2 projects
risk_probabilities[risk] = count / total_projects
return risk_probabilities
def _extract_best_practices(self, outcomes: List[Dict[str, Any]]) -> List[str]:
"""Extract best practices from successful projects."""
practices: Counter[str] = Counter()
for outcome in outcomes:
success_rate = outcome.get("success_metrics", {}).get(
"completion_rate", 0.8
)
if success_rate > 0.85: # Only from successful projects
project_practices = outcome.get("practices_used", [])
for practice in project_practices:
practices[practice] += 1
# Return practices used in multiple successful projects
return [practice for practice, count in practices.most_common(10) if count >= 2]
def _create_default_team_learnings(self, team_id: str) -> TeamLearnings:
    """
    Build the fallback team record used when there is too little data.

    The defaults assume a 20% overrun on "general" tasks, a team of three
    with an even parallel/sequential split, and an average quality of 0.8.
    """
    default_velocity = {"general": 1.2}  # Assume 20% overrun by default
    default_collaboration = {
        "avg_team_size": 3,
        "parallel_task_preference": 0.5,
    }
    default_quality = {"average_quality": 0.8}

    return TeamLearnings(
        team_id=team_id,
        velocity_patterns=default_velocity,
        skill_strengths={},
        preferred_task_types={},
        collaboration_patterns=default_collaboration,
        quality_metrics=default_quality,
        last_updated=datetime.now(timezone.utc),
    )
def _create_default_tech_learnings(self, tech_stack: str) -> TechnologyLearnings:
    """
    Build the fallback technology record used when there is too little data.

    The defaults assume a typical duration of about 30 days, a 1.3x
    multiplier on "general" tasks, no known dependencies or risks, and
    three generic best practices.
    """
    generic_practices = ["Follow coding standards", "Write tests", "Code review"]
    default_patterns = {"typical_duration": {"mean": 30, "median": 28}}

    return TechnologyLearnings(
        tech_stack=tech_stack,
        typical_patterns=default_patterns,
        estimation_multipliers={"general": 1.3},
        common_dependencies=[],
        risk_factors={},
        best_practices=generic_practices,
        last_updated=datetime.now(timezone.utc),
    )
async def _adapt_estimation_template(
    self,
    team_learning: TeamLearnings,
    tech_learning: TechnologyLearnings,
    context: Dict[str, Any],
) -> AdaptedTemplate:
    """
    Blend team velocity with technology multipliers into one template.

    For every task type the team has a velocity for, the adaptation
    "<task_type>_multiplier" is 60% team velocity plus 40% technology
    multiplier (1.0 when the technology has none for that type).
    ``context`` is currently unused.
    """
    tech_multipliers = tech_learning.estimation_multipliers

    # Weighted combination of the two signals, keyed per task type.
    adaptations = {
        f"{task_type}_multiplier": (team_velocity * 0.6)
        + (tech_multipliers.get(task_type, 1.0) * 0.4)
        for task_type, team_velocity in team_learning.velocity_patterns.items()
    }

    reasoning = (
        "Combined team velocity patterns with "
        "technology-specific multipliers"
    )
    return AdaptedTemplate(
        template_id="estimation_adapted",
        original_template={"base_multiplier": 1.0},
        adaptations=adaptations,
        adaptation_reasoning=reasoning,
        confidence=0.8,
        usage_count=0,
        success_rate=0.0,
        last_used=datetime.now(timezone.utc),
    )
async def _adapt_task_generation_template(
    self,
    project_learning: ProjectTypeLearnings,
    team_learning: TeamLearnings,
    context: Dict[str, Any],
) -> AdaptedTemplate:
    """
    Tailor the task-generation template to the team and project type.

    The adaptations carry the team's preferred task types (names and
    scores) and the project type's typical phases. ``context`` is
    currently unused.
    """
    team_preferences = team_learning.preferred_task_types
    baseline = {"standard_phases": ["design", "implement", "test", "deploy"]}

    return AdaptedTemplate(
        template_id="task_generation_adapted",
        original_template=baseline,
        adaptations={
            "preferred_task_types": list(team_preferences.keys()),
            "typical_phases": project_learning.typical_phases,
            "team_preferences": team_preferences,
        },
        adaptation_reasoning=(
            "Adapted based on team preferences and project type patterns"
        ),
        confidence=0.75,
        usage_count=0,
        success_rate=0.0,
        last_used=datetime.now(timezone.utc),
    )
async def _adapt_dependency_template(
    self, tech_learning: TechnologyLearnings, context: Dict[str, Any]
) -> AdaptedTemplate:
    """
    Extend the dependency template with technology-specific knowledge.

    The adaptations expose the technology's common dependency pairs and
    its risk factors. ``context`` is currently unused.
    """
    learned_overrides = {
        "common_dependencies": tech_learning.common_dependencies,
        "risk_factors": tech_learning.risk_factors,
    }
    created_at = datetime.now(timezone.utc)

    return AdaptedTemplate(
        template_id="dependencies_adapted",
        original_template={"basic_dependencies": []},
        adaptations=learned_overrides,
        adaptation_reasoning="Added technology-specific dependency patterns",
        confidence=0.7,
        usage_count=0,
        success_rate=0.0,
        last_used=created_at,
    )
async def _get_team_recommendations(
self, team_learning: TeamLearnings, current_state: Dict[str, Any]
) -> List[str]:
"""Get team-specific recommendations."""
recommendations = []
# Velocity recommendations
for task_type, velocity in team_learning.velocity_patterns.items():
if velocity > 1.5:
percent = (velocity - 1) * 100
recommendations.append(
f"Team tends to underestimate {task_type} tasks by "
f"{percent:.0f}% - consider adding buffer"
)
# Skill recommendations
strong_skills = [
skill
for skill, strength in team_learning.skill_strengths.items()
if strength > 0.85
]
if strong_skills:
recommendations.append(
f"Leverage team's strong skills in: {', '.join(strong_skills)}"
)
return recommendations
async def _get_technology_recommendations(
self, tech_learning: TechnologyLearnings, current_state: Dict[str, Any]
) -> List[str]:
"""Get technology-specific recommendations."""
recommendations = []
# Risk mitigation
for risk, probability in tech_learning.risk_factors.items():
if probability > 0.3:
percent = probability * 100
recommendations.append(
f"Monitor for {risk} (occurs in {percent:.0f}% of "
f"{tech_learning.tech_stack} projects)"
)
# Best practices
recommendations.extend(
[
f"Recommended practice: {practice}"
for practice in tech_learning.best_practices[:3]
]
)
return recommendations
async def _get_process_recommendations(
self, project_learning: ProjectTypeLearnings, current_state: Dict[str, Any]
) -> List[str]:
"""Get process recommendations based on project type."""
recommendations = []
# Phase recommendations
if project_learning.typical_phases:
phases = ", ".join(project_learning.typical_phases)
recommendations.append(
f"For {project_learning.project_type} projects, "
f"consider phases: {phases}"
)
# Pitfall warnings
for pitfall in project_learning.common_pitfalls[:2]:
recommendations.append(f"Watch out for: {pitfall}")
return recommendations