Source code for src.monitoring.project_monitor

"""Project monitoring and health tracking system.

This module implements comprehensive project monitoring capabilities including
real-time health tracking, risk assessment, performance metrics collection,
and automated alerting for the Marcus system.

Classes
-------
ProjectMonitor
    Core monitoring system that provides continuous project health tracking,
    AI-powered risk analysis, and automated issue detection.

Key Features
------------
- Continuous monitoring loop with configurable intervals
- Real-time project state tracking and metrics collection
- AI-powered project health analysis and risk assessment
- Automated detection of stalled tasks, capacity issues, and bottlenecks
- Historical data tracking for trend analysis
- Integration with MCP Kanban system for task data

Examples
--------
Basic monitoring setup:

>>> import asyncio
>>> from src.monitoring.project_monitor import ProjectMonitor
>>>
>>> async def main():
...     monitor = ProjectMonitor()
...
...     # Start continuous monitoring
...     await monitor.start_monitoring()
>>>
>>> asyncio.run(main())

Manual project state check:

>>> monitor = ProjectMonitor()
>>> state = await monitor.get_project_state()
>>> print(f"Progress: {state.progress_percent}%")
>>> print(f"Risk Level: {state.risk_level}")
>>> print(f"Velocity: {state.team_velocity} tasks/week")

Risk and blocker management:

>>> # Get current risks
>>> risks = monitor.get_current_risks()
>>> for risk in risks:
...     print(f"{risk.risk_type}: {risk.description}")
>>>
>>> # Record a blocker
>>> blocker = await monitor.record_blocker(
...     agent_id="agent-123",
...     task_id="task-456",
...     description="Database connection failed"
... )
"""

import asyncio
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from src.config.settings import Settings
from src.core.models import (
    BlockerReport,
    ProjectRisk,
    ProjectState,
    RiskLevel,
    Task,
    TaskStatus,
)
from src.core.types import ProjectOutcome
from src.integrations.ai_analysis_engine import AIAnalysisEngine
from src.integrations.github_mcp_interface import GitHubMCPInterface
from src.integrations.kanban_client import KanbanClient
from src.learning.project_pattern_learner import ProjectPatternLearner
from src.quality.project_quality_assessor import ProjectQualityAssessor

logger = logging.getLogger(__name__)


class ProjectMonitor:
    """Continuous monitoring system for project health and performance tracking.

    The ProjectMonitor class provides comprehensive project oversight through
    real-time monitoring, AI-powered analysis, and automated issue detection.
    It integrates with the MCP Kanban system to track project progress and
    identifies potential risks before they impact project timelines.

    Attributes
    ----------
    settings : Settings
        Configuration settings for the monitoring system
    kanban_client : KanbanClient
        Client for interacting with the MCP Kanban board
    ai_engine : AIAnalysisEngine
        AI engine for project health analysis and risk assessment
    pattern_learner : ProjectPatternLearner
        Pattern learner constructed with the shared ``ai_engine``
    quality_assessor : ProjectQualityAssessor
        Quality assessor constructed with the shared ``ai_engine``
    current_state : Optional[ProjectState]
        Current project state with metrics and health indicators
    blockers : List[BlockerReport]
        List of reported blockers across all tasks
    risks : List[ProjectRisk]
        List of identified project risks with mitigation strategies
    historical_data : List[Dict]
        Historical metrics data for trend analysis
    check_interval : int
        Interval in seconds between monitoring checks, read from the
        ``monitoring_interval`` setting (default: 900s/15min)
    is_monitoring : bool
        Flag indicating if continuous monitoring is active

    Methods
    -------
    start_monitoring()
        Start the continuous monitoring loop
    stop_monitoring()
        Stop the continuous monitoring loop
    get_project_state()
        Get current project state with latest metrics
    record_blocker(agent_id, task_id, description)
        Record a new blocker report
    get_current_risks()
        Get list of current project risks
    get_active_blockers()
        Get list of unresolved blockers

    Examples
    --------
    Initialize and start monitoring:

    >>> monitor = ProjectMonitor()
    >>> await monitor.start_monitoring()

    Get project health status:

    >>> state = await monitor.get_project_state()
    >>> print(f"Progress: {state.progress_percent}%")
    >>> print(f"Risk: {state.risk_level.value}")

    Record a blocker:

    >>> blocker = await monitor.record_blocker(
    ...     agent_id="agent-001",
    ...     task_id="task-123",
    ...     description="API endpoint not responding"
    ... )
    """
def __init__(self) -> None:
    """Build the monitor's collaborators and start with empty tracking state.

    Creates the settings object, the kanban client and the AI analysis
    engine, then hands that same engine to the pattern learner and the
    quality assessor. No project data is fetched here; ``current_state``
    stays ``None`` until the first data collection.
    """
    # Core collaborators (constructed in the same order as before).
    self.settings = Settings()
    self.kanban_client = KanbanClient()
    engine = AIAnalysisEngine()
    self.ai_engine = engine

    # Pattern learning and quality assessment share the one AI engine.
    # (Recommendation engine removed: pipeline infrastructure dependency.)
    self.pattern_learner = ProjectPatternLearner(ai_engine=engine)
    self.quality_assessor = ProjectQualityAssessor(ai_engine=engine)

    # Monitoring configuration: seconds between cycles, 900 s = 15 minutes
    # unless the "monitoring_interval" setting says otherwise.
    self.check_interval = self.settings.get("monitoring_interval", 900)
    self.is_monitoring = False
    self._last_completion_check: Optional[datetime] = None

    # State tracking
    self.historical_data: List[Dict[str, Any]] = []
    self.risks: List[ProjectRisk] = []
    self.blockers: List[BlockerReport] = []
    self.current_state: Optional[ProjectState] = None
async def start_monitoring(self) -> None:
    """Run the continuous monitoring loop until ``stop_monitoring()`` is called.

    Each cycle performs, in order:

    1. Collect current project data from the kanban board
    2. Analyze project health using the AI engine
    3. Check for issues (stalled tasks, capacity problems, bottlenecks)
    4. Check whether the project has reached its completion criteria
    5. Record metrics for historical tracking

    Any ``Exception`` raised inside a cycle is logged with its traceback and
    the loop carries on with the next cycle, so one failing step never stops
    monitoring. Between cycles the loop sleeps for ``check_interval``
    seconds.

    If ``stop_monitoring()`` is called while a cycle is running, the loop
    returns as soon as that cycle finishes instead of sleeping one more
    interval first. If it is called while the loop is already sleeping, the
    loop returns when that sleep ends.

    Examples
    --------
    >>> monitor = ProjectMonitor()
    >>> await monitor.start_monitoring()  # Runs until stopped

    Run monitoring in the background:

    >>> import asyncio
    >>> monitor = ProjectMonitor()
    >>> monitor_task = asyncio.create_task(monitor.start_monitoring())
    >>> await asyncio.sleep(3600)  # Do other work for an hour
    >>> await monitor.stop_monitoring()
    >>> await monitor_task

    Notes
    -----
    The interval comes from the 'monitoring_interval' setting (default 900
    seconds). Shorter intervals give more responsive monitoring but increase
    system load. Task cancellation is not swallowed: only ``Exception``
    subclasses are caught here.
    """
    self.is_monitoring = True

    while self.is_monitoring:
        try:
            await self._collect_project_data()
            await self._analyze_project_health()
            await self._check_for_issues()
            await self._check_for_project_completion()
            self._record_metrics()
        except Exception:
            # Keep the loop alive, but keep the traceback too: a bare
            # message is rarely enough to diagnose a failing cycle.
            logger.exception("Error in monitoring loop")

        # A stop request that arrived during the cycle should take effect
        # now, not after another full check_interval of sleeping.
        if not self.is_monitoring:
            break

        await asyncio.sleep(self.check_interval)
async def stop_monitoring(self) -> None:
    """Request that the continuous monitoring loop stop.

    Only clears the ``is_monitoring`` flag; nothing is cancelled or awaited
    here. The loop in ``start_monitoring()`` reads the flag between cycles,
    so a cycle that is already running completes first, and a loop that is
    currently sleeping between cycles exits only once that sleep ends
    (up to ``check_interval`` seconds later).

    Examples
    --------
    >>> monitor = ProjectMonitor()
    >>> # Start monitoring in background
    >>> monitor_task = asyncio.create_task(monitor.start_monitoring())
    >>>
    >>> # Stop monitoring after some time
    >>> await monitor.stop_monitoring()
    >>> await monitor_task  # Wait for loop to exit
    """
    self.is_monitoring = False
async def get_project_state(self) -> ProjectState:
    """Return the current project state, collecting it first if needed.

    When no state has been stored yet, one data collection is run against
    the kanban board and its result is returned. An already stored state is
    returned as is, without refreshing it.

    Returns
    -------
    ProjectState
        The stored project state (board id, project name, task counts,
        progress percentage, overdue tasks, team velocity, risk level and
        last-updated timestamp).

    Raises
    ------
    RuntimeError
        If the state is still ``None`` after data collection.

    Examples
    --------
    >>> monitor = ProjectMonitor()
    >>> state = await monitor.get_project_state()
    >>> print(f"Project: {state.project_name}")
    >>> print(f"Progress: {state.progress_percent:.1f}%")

    Check for overdue tasks:

    >>> if state.overdue_tasks:
    ...     print(f"Warning: {len(state.overdue_tasks)} overdue tasks")
    """
    state = self.current_state
    if not state:
        # Nothing cached yet: collect once and re-read the result.
        await self._collect_project_data()
        state = self.current_state

    if state is not None:
        return state

    logger.error("Failed to collect project data: current_state is still None")
    raise RuntimeError("Unable to retrieve project state from data collection")
async def _collect_project_data(self) -> None:
    """Refresh ``current_state`` from the kanban board.

    Fetches the board summary and all tasks, derives the task counts,
    overdue list, progress percentage, velocity and risk level, and stores
    them in a new ``ProjectState``. Velocity trend, numeric risk score and
    projected completion are not ``ProjectState`` fields, so they are kept
    on the private attributes ``_velocity_trend``, ``_risk_score`` and
    ``_projected_completion``.

    Raises
    ------
    Exception
        Whatever the kanban client or the metric helpers raise; nothing is
        caught here.
    """
    summary = await self.kanban_client.get_board_summary()
    all_tasks = await self._get_all_tasks()

    # Task distribution across statuses.
    total_tasks = len(all_tasks)
    completed_tasks = sum(1 for t in all_tasks if t.status == TaskStatus.DONE)
    in_progress_tasks = sum(
        1 for t in all_tasks if t.status == TaskStatus.IN_PROGRESS
    )
    blocked_tasks = sum(1 for t in all_tasks if t.status == TaskStatus.BLOCKED)

    # Overdue = has a due date in the past and is not done yet.
    now = datetime.now(timezone.utc)
    overdue_tasks = [
        t
        for t in all_tasks
        if t.due_date and t.due_date < now and t.status != TaskStatus.DONE
    ]

    # Guard the division: an empty board is 0% complete.
    progress_percent = (
        (completed_tasks / total_tasks * 100) if total_tasks > 0 else 0.0
    )

    velocity = await self._calculate_velocity(all_tasks)
    risk_level = self._assess_risk_level(
        progress_percent, len(overdue_tasks), blocked_tasks, velocity
    )

    # Extra metrics kept outside ProjectState.
    velocity_trend = self._calculate_velocity_trend(velocity)
    risk_score = self._calculate_risk_score(
        progress_percent, len(overdue_tasks), blocked_tasks, velocity
    )
    projected_completion = self._calculate_projected_completion(
        completed_tasks, total_tasks, velocity
    )

    # Only fields defined on ProjectState go into the state object.
    self.current_state = ProjectState(
        board_id=self.kanban_client.board_id or "unknown",
        project_name=summary.get("name", "Unknown Project"),
        total_tasks=total_tasks,
        completed_tasks=completed_tasks,
        in_progress_tasks=in_progress_tasks,
        blocked_tasks=blocked_tasks,
        progress_percent=progress_percent,
        overdue_tasks=overdue_tasks,
        team_velocity=velocity,
        risk_level=risk_level,
        last_updated=datetime.now(timezone.utc),
    )

    self._velocity_trend = velocity_trend
    self._risk_score = risk_score
    self._projected_completion = projected_completion


async def _get_all_tasks(self) -> List[Task]:
    """Return every task on the board.

    Thin wrapper around ``self.kanban_client.get_all_tasks()``; it exists
    so the monitor has a single place through which tasks are fetched.

    Returns
    -------
    List[Task]
        Whatever the kanban client returns.
    """
    return await self.kanban_client.get_all_tasks()


async def _calculate_velocity(self, tasks: List[Task]) -> float:
    """Count the tasks completed during the last seven days.

    Parameters
    ----------
    tasks : List[Task]
        All project tasks.

    Returns
    -------
    float
        Number of tasks whose status is DONE and whose ``updated_at`` falls
        within the last 7 days (a rolling one-week window).

    Examples
    --------
    >>> tasks = await monitor._get_all_tasks()
    >>> velocity = await monitor._calculate_velocity(tasks)
    >>> print(f"Team velocity: {velocity} tasks/week")
    """
    one_week_ago = datetime.now(timezone.utc) - timedelta(days=7)
    completed_this_week = [
        t
        for t in tasks
        if t.status == TaskStatus.DONE and t.updated_at > one_week_ago
    ]
    return len(completed_this_week)


def _assess_risk_level(
    self, progress: float, overdue_count: int, blocked_count: int, velocity: float
) -> RiskLevel:
    """Map key project metrics onto an overall risk level.

    Parameters
    ----------
    progress : float
        Project progress as a percentage (0-100)
    overdue_count : int
        Number of tasks past their due date
    blocked_count : int
        Number of tasks currently blocked
    velocity : float
        Team velocity (tasks completed per week)

    Returns
    -------
    RiskLevel
        LOW, MEDIUM, HIGH or CRITICAL

    Examples
    --------
    >>> risk = monitor._assess_risk_level(
    ...     progress=75.0,      # +0
    ...     overdue_count=2,    # +1
    ...     blocked_count=1,    # +1
    ...     velocity=8.0,       # +0
    ... )                       # Total: 2 points = MEDIUM

    >>> risk = monitor._assess_risk_level(
    ...     progress=15.0,      # Low progress: +2 points
    ...     overdue_count=8,    # Many overdue: +3 points
    ...     blocked_count=5,    # Many blocked: +2 points
    ...     velocity=1.0,       # Low velocity: +2 points
    ... )                       # Total: 9 points = CRITICAL

    Notes
    -----
    Scoring:

    - Progress < 25%: +2 points, < 50%: +1 point
    - Overdue tasks: >5: +3, >2: +2, >0: +1 point
    - Blocked tasks: >3: +2, >0: +1 point
    - Velocity: <2: +2, <5: +1 point

    Levels by total score: 0-1 LOW, 2-3 MEDIUM, 4-5 HIGH, 6+ CRITICAL.
    """
    risk_score = 0

    # Progress-based risk
    if progress < 25:
        risk_score += 2
    elif progress < 50:
        risk_score += 1

    # Overdue tasks risk
    if overdue_count > 5:
        risk_score += 3
    elif overdue_count > 2:
        risk_score += 2
    elif overdue_count > 0:
        risk_score += 1

    # Blocked tasks risk
    if blocked_count > 3:
        risk_score += 2
    elif blocked_count > 0:
        risk_score += 1

    # Velocity risk
    if velocity < 2:
        risk_score += 2
    elif velocity < 5:
        risk_score += 1

    # Map score to risk level
    if risk_score >= 6:
        return RiskLevel.CRITICAL
    if risk_score >= 4:
        return RiskLevel.HIGH
    if risk_score >= 2:
        return RiskLevel.MEDIUM
    return RiskLevel.LOW


def _push_cost_context(self) -> Any:
    """Open a cost-attribution context for an LLM call in this monitor.

    Monitor LLM calls run outside the MCP request lifecycle, so we wrap
    them here to keep their tokens from landing in the 'unassigned' bucket.
    We pull the **real Marcus project_id** from
    ``self.kanban_client.project_id`` (the canonical id, not ``board_id`` —
    those are distinct in Planka/SQLite providers, Codex P2 on PR #515) so
    the dashboard rolls monitor cost into the same project the user sees in
    the regular picker, instead of creating a synthetic ``monitor:*`` ghost
    project.

    When no project_id is available (kanban client not initialized or no
    active board), we skip the push entirely; the call falls through to the
    'unassigned' bucket — that's correct, because without a project_id we
    genuinely don't know what to attribute it to.

    Returns
    -------
    contextlib.ExitStack
        A stack the caller closes (or uses via ``with``). It is empty when
        there is nothing to attribute against or when anything went wrong.

    Notes
    -----
    Fails open on any unexpected error, including the cost-tracking module
    being unimportable: the problem is logged at DEBUG level and an empty
    stack is returned.
    """
    from contextlib import ExitStack

    stack = ExitStack()
    try:
        # Imported inside the guard so an unavailable cost-tracking module
        # degrades to "no attribution" instead of breaking the monitor.
        from src.cost_tracking.cost_recorder import PlannerContext, get_recorder

        kanban = getattr(self, "kanban_client", None)
        project_id = getattr(kanban, "project_id", None) if kanban else None
        if not project_id:
            # Nothing to attribute against — fall through to
            # 'unassigned' so the gap stays visible.
            return stack

        state = self.current_state
        project_name = state.project_name if state is not None else None
        stack.enter_context(
            get_recorder().planner_context(
                PlannerContext(
                    run_id="unassigned",
                    project_id=str(project_id),
                    project_name=project_name,
                    agent_id="monitor",
                )
            )
        )
    except Exception:  # pragma: no cover - never break the monitor
        logger.debug("Cost attribution context unavailable", exc_info=True)
        stack.close()
        return ExitStack()
    return stack


async def _analyze_project_health(self) -> None:
    """Ask the AI engine for a health analysis and rebuild ``self.risks``.

    Sends the current state, the last 10 historical metric snapshots and a
    (currently empty) team-status dict to
    ``ai_engine.analyze_project_health`` under a cost-attribution context,
    then replaces ``self.risks`` with one ``ProjectRisk`` per entry of the
    returned ``"risk_factors"`` list.

    Entries that are not mappings, or that lack ``"type"``,
    ``"description"`` or ``"severity"``, are skipped with a warning so a
    single malformed item from the model does not abort the whole cycle.

    Does nothing when there is no current state.

    Raises
    ------
    Exception
        Whatever the AI engine call itself raises.
    """
    if not self.current_state:
        return

    # Slicing an empty list yields an empty list, so no special case needed.
    recent_activities = self.historical_data[-10:]

    # Would be populated from agent status tracking (simplified for now).
    team_status: Dict[str, Any] = {}

    # Run the LLM call under a cost-attribution context so its tokens land
    # on the project being analyzed, not 'unassigned'.
    with self._push_cost_context():
        analysis = await self.ai_engine.analyze_project_health(
            self.current_state, recent_activities, team_status
        )

    identified: List[ProjectRisk] = []
    for risk_data in analysis.get("risk_factors") or []:
        try:
            risk_type = risk_data["type"]
            description = risk_data["description"]
            severity = self._map_severity(risk_data["severity"])
            impact = risk_data.get("impact", "Medium")
            mitigation = risk_data.get("mitigation", "Monitor closely")
        except (KeyError, TypeError, AttributeError):
            logger.warning(
                "Skipping malformed risk entry from AI analysis: %r", risk_data
            )
            continue

        identified.append(
            ProjectRisk(
                risk_type=risk_type,
                description=description,
                severity=severity,
                probability=0.5,  # Default probability
                impact=impact,
                mitigation_strategy=mitigation,
                identified_at=datetime.now(timezone.utc),
            )
        )

    self.risks = identified


async def _check_for_issues(self) -> None:
    """Run the automated issue checks and append what they find to ``self.risks``.

    Checks, in order: stalled tasks, capacity problems, dependency
    bottlenecks. The task list is fetched once and shared by all three
    checks so one cycle costs one kanban round trip instead of three.

    Does nothing when there is no current state.

    Raises
    ------
    Exception
        Whatever the task fetch or the individual checks raise.
    """
    if not self.current_state:
        return

    tasks = await self._get_all_tasks()

    await self._check_stalled_tasks(tasks)
    await self._check_capacity_issues(tasks)
    await self._check_dependency_bottlenecks(tasks)


async def _check_stalled_tasks(self, tasks: Optional[List[Task]] = None) -> None:
    """Flag in-progress tasks that have not been updated recently.

    A task is stalled when its status is IN_PROGRESS and the time since its
    ``updated_at`` exceeds the 'stall_threshold_hours' setting (default 24).
    Each stalled task adds a 'stalled_task' risk of MEDIUM severity whose
    description names the task and the whole hours elapsed since its last
    update.

    Parameters
    ----------
    tasks : Optional[List[Task]]
        Tasks to inspect. When omitted, they are fetched from the board.
    """
    if tasks is None:
        tasks = await self._get_all_tasks()

    stall_threshold = timedelta(
        hours=self.settings.get("stall_threshold_hours", 24)
    )
    now = datetime.now(timezone.utc)

    for task in tasks:
        if task.status != TaskStatus.IN_PROGRESS:
            continue

        idle_time = now - task.updated_at
        if idle_time <= stall_threshold:
            continue

        # Report how long the task has actually been idle (rounded down, so
        # "over N hours" is literally true), not the configured threshold.
        hours_stalled = int(idle_time.total_seconds() // 3600)
        risk = ProjectRisk(
            risk_type="stalled_task",
            description=(
                f"Task '{task.name}' has been in progress for "
                f"over {hours_stalled} hours"
            ),
            severity=RiskLevel.MEDIUM,
            probability=1.0,
            impact="Delays project timeline",
            mitigation_strategy="Check in with assigned agent",
            identified_at=now,
        )
        self.risks.append(risk)


async def _check_capacity_issues(self, tasks: Optional[List[Task]] = None) -> None:
    """Flag the board when too many tasks are in progress at once.

    Adds a single 'capacity' risk of HIGH severity when more than 10 tasks
    have status IN_PROGRESS. The limit is currently hard-coded.

    Parameters
    ----------
    tasks : Optional[List[Task]]
        Tasks to inspect. When omitted, they are fetched from the board.
    """
    # This would integrate with agent status tracking; for now only the
    # task distribution is checked.
    if tasks is None:
        tasks = await self._get_all_tasks()

    max_in_progress = 10  # Hard-coded for now; candidate for a setting.
    in_progress_count = sum(
        1 for t in tasks if t.status == TaskStatus.IN_PROGRESS
    )

    if in_progress_count > max_in_progress:
        risk = ProjectRisk(
            risk_type="capacity",
            description="Too many tasks in progress simultaneously",
            severity=RiskLevel.HIGH,
            probability=0.8,
            impact="Team burnout and quality issues",
            mitigation_strategy="Prioritize and defer lower priority tasks",
            identified_at=datetime.now(timezone.utc),
        )
        self.risks.append(risk)


async def _check_dependency_bottlenecks(
    self, tasks: Optional[List[Task]] = None
) -> None:
    """Flag blocked tasks that hold up more than two other tasks.

    Intended behaviour: for every BLOCKED task, look up its dependents and
    add a 'dependency' risk of HIGH severity when there are more than 2.

    Parameters
    ----------
    tasks : Optional[List[Task]]
        Tasks to inspect. When omitted, they are fetched from the board.

    Notes
    -----
    The dependent lookup is not implemented yet: the dependents list is
    always empty, so this check currently never adds a risk.
    """
    if tasks is None:
        tasks = await self._get_all_tasks()

    for task in tasks:
        if task.status != TaskStatus.BLOCKED:
            continue

        # TODO: Implement dependency analysis when the kanban client
        # offers a way to list dependents, e.g.
        # self.kanban_client.get_dependent_tasks(task.id)
        dependents: List[Task] = []

        if len(dependents) > 2:
            num_blocked = len(dependents)
            risk = ProjectRisk(
                risk_type="dependency",
                description=(
                    f"Task '{task.name}' is blocking "
                    f"{num_blocked} other tasks"
                ),
                severity=RiskLevel.HIGH,
                probability=1.0,
                impact="Multiple tasks cannot proceed",
                mitigation_strategy="Prioritize unblocking this task",
                identified_at=datetime.now(timezone.utc),
            )
            self.risks.append(risk)


def _record_metrics(self) -> None:
    """Append a snapshot of the current metrics to ``historical_data``.

    The snapshot holds an ISO timestamp, progress, velocity, blocked-task
    count, risk level value, and total/completed task counts. Only the most
    recent 100 snapshots are kept. Does nothing when there is no current
    state.

    Notes
    -----
    History lives in memory only; it is lost when the process exits.
    """
    if not self.current_state:
        return

    state = self.current_state
    metrics = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "progress": state.progress_percent,
        "velocity": state.team_velocity,
        "blocked_tasks": state.blocked_tasks,
        "risk_level": state.risk_level.value,
        "total_tasks": state.total_tasks,
        "completed_tasks": state.completed_tasks,
    }
    self.historical_data.append(metrics)

    # Keep only the last 100 entries.
    if len(self.historical_data) > 100:
        self.historical_data = self.historical_data[-100:]
async def record_blocker(
    self, agent_id: str, task_id: str, description: str
) -> BlockerReport:
    """Store a new blocker reported by an agent and return it.

    Parameters
    ----------
    agent_id : str
        Identifier of the agent reporting the blocker
    task_id : str
        Identifier of the blocked task
    description : str
        Description of the blocking issue

    Returns
    -------
    BlockerReport
        The report that was appended to ``self.blockers``. Its
        ``reporter_id`` is the given ``agent_id``, its severity is always
        MEDIUM and its ``reported_at`` is the current UTC time.

    Examples
    --------
    >>> blocker = await monitor.record_blocker(
    ...     agent_id="agent-backend-001",
    ...     task_id="task-user-auth-123",
    ...     description="Database connection timeout after 30 seconds"
    ... )
    >>> print(f"Blocker recorded: {blocker.task_id}")

    Notes
    -----
    Severity is not configurable here; every blocker is recorded as MEDIUM.
    """
    reported_at = datetime.now(timezone.utc)
    report = BlockerReport(
        task_id=task_id,
        reporter_id=agent_id,
        description=description,
        severity=RiskLevel.MEDIUM,
        reported_at=reported_at,
    )
    self.blockers.append(report)
    return report
def _map_severity(self, severity_str: str) -> RiskLevel:
    """Map a severity label to the ``RiskLevel`` enum.

    Severity labels come from external sources such as AI analysis results,
    so the input is treated leniently: matching ignores case and
    surrounding whitespace, a value that already is a ``RiskLevel`` is
    returned unchanged, and anything else (unknown label, ``None``, a
    number) falls back to MEDIUM.

    Parameters
    ----------
    severity_str : str
        Severity label. Recognized values: "low", "medium", "high",
        "critical".

    Returns
    -------
    RiskLevel
        The matching level, or ``RiskLevel.MEDIUM`` as the safe default.

    Examples
    --------
    >>> monitor._map_severity("high")        # RiskLevel.HIGH
    >>> monitor._map_severity(" CRITICAL ")  # RiskLevel.CRITICAL
    >>> monitor._map_severity("unknown")     # RiskLevel.MEDIUM (default)
    >>> monitor._map_severity(None)          # RiskLevel.MEDIUM (default)
    """
    if isinstance(severity_str, RiskLevel):
        return severity_str
    if not isinstance(severity_str, str):
        # Model output is not guaranteed to be a string; do not crash on it.
        return RiskLevel.MEDIUM

    mapping = {
        "low": RiskLevel.LOW,
        "medium": RiskLevel.MEDIUM,
        "high": RiskLevel.HIGH,
        "critical": RiskLevel.CRITICAL,
    }
    return mapping.get(severity_str.strip().lower(), RiskLevel.MEDIUM)
def get_current_risks(self) -> List[ProjectRisk]:
    """Get the project risks currently held by the monitor.

    Returns ``self.risks``, the list that the AI health analysis rebuilds
    and the automated issue checks (stalled tasks, capacity, dependency
    bottlenecks) append to.

    Returns
    -------
    List[ProjectRisk]
        The monitor's own list object, not a copy, so changes made by the
        caller are visible to the monitor. Each risk carries:

        - risk_type: Category of risk (e.g., 'stalled_task', 'capacity')
        - description: Detailed description of the risk
        - severity: Risk severity level (LOW, MEDIUM, HIGH, CRITICAL)
        - probability: Likelihood of risk occurrence (0.0-1.0)
        - impact: Description of potential impact
        - mitigation_strategy: Recommended mitigation approach
        - identified_at: Timestamp when risk was identified

    Examples
    --------
    >>> risks = monitor.get_current_risks()
    >>> print(f"Total risks: {len(risks)}")
    >>>
    >>> # Filter by severity
    >>> critical_risks = [
    ...     risk for risk in risks
    ...     if risk.severity == RiskLevel.CRITICAL
    ... ]
    >>>
    >>> # Display risk summary
    >>> for risk in risks:
    ...     print(f"{risk.risk_type}: {risk.description}")
    ...     print(f"  Severity: {risk.severity.value}")
    ...     print(f"  Mitigation: {risk.mitigation_strategy}")
    """
    return self.risks
def get_active_blockers(self) -> List[BlockerReport]:
    """Return the blocker reports that are not resolved yet.

    Returns
    -------
    List[BlockerReport]
        A new list holding, in their recorded order, the entries of
        ``self.blockers`` whose ``resolved`` attribute is falsy.

    Examples
    --------
    >>> blockers = monitor.get_active_blockers()
    >>> print(f"Active blockers: {len(blockers)}")
    >>>
    >>> # Group by task
    >>> from collections import defaultdict
    >>> by_task = defaultdict(list)
    >>> for blocker in blockers:
    ...     by_task[blocker.task_id].append(blocker)
    >>>
    >>> # Display blocker summary
    >>> for blocker in blockers:
    ...     print(f"Task {blocker.task_id}: {blocker.description}")
    ...     print(f"  Reported by: {blocker.reporter_id}")
    """
    active: List[BlockerReport] = []
    for report in self.blockers:
        if report.resolved:
            continue
        active.append(report)
    return active
def _calculate_velocity_trend(self, current_velocity: float) -> str:
    """Classify the velocity trend against recent history.

    The current velocity is compared with the mean of the last three
    ``"velocity"`` values stored in ``self.historical_data`` (entries
    without that key count as 0). A deviation of more than 10% in either
    direction is reported as a trend.

    Parameters
    ----------
    current_velocity : float
        Current team velocity in tasks per week

    Returns
    -------
    str
        Velocity trend: "increasing", "stable", or "decreasing"
    """
    # With fewer than three samples there is not enough history for a trend.
    if len(self.historical_data) < 3:
        return "stable"

    # The length check above guarantees exactly three samples here, so the
    # average below can never divide by zero.
    recent_velocities = [
        data.get("velocity", 0) for data in self.historical_data[-3:]
    ]
    avg_recent = sum(recent_velocities) / len(recent_velocities)

    if current_velocity > avg_recent * 1.1:
        return "increasing"
    if current_velocity < avg_recent * 0.9:
        return "decreasing"
    return "stable"


def _calculate_risk_score(
    self, progress: float, overdue_count: int, blocked_count: int, velocity: float
) -> float:
    """Calculate numeric risk score (0-1).

    Four independent contributions are summed and capped at 1.0.

    Parameters
    ----------
    progress : float
        Project progress percentage
    overdue_count : int
        Number of overdue tasks
    blocked_count : int
        Number of blocked tasks
    velocity : float
        Team velocity

    Returns
    -------
    float
        Risk score between 0 (low risk) and 1 (high risk)
    """
    risk_score = 0.0

    # Progress-based risk (0-0.3)
    if progress < 25:
        risk_score += 0.3
    elif progress < 50:
        risk_score += 0.15

    # Overdue tasks risk (0-0.3)
    if overdue_count > 5:
        risk_score += 0.3
    elif overdue_count > 2:
        risk_score += 0.2
    elif overdue_count > 0:
        risk_score += 0.1

    # Blocked tasks risk (0-0.2)
    if blocked_count > 3:
        risk_score += 0.2
    elif blocked_count > 0:
        risk_score += 0.1

    # Velocity risk (0-0.2)
    if velocity < 2:
        risk_score += 0.2
    elif velocity < 5:
        risk_score += 0.1

    return min(risk_score, 1.0)


def _calculate_projected_completion(
    self, completed_tasks: int, total_tasks: int, velocity: float
) -> Optional[datetime]:
    """Calculate projected completion date based on velocity.

    Parameters
    ----------
    completed_tasks : int
        Number of completed tasks
    total_tasks : int
        Total number of tasks
    velocity : float
        Tasks completed per week

    Returns
    -------
    Optional[datetime]
        Timezone-aware (UTC) projected completion date, or None when the
        velocity is not positive or no tasks remain
    """
    if velocity <= 0 or total_tasks <= completed_tasks:
        return None

    remaining_tasks = total_tasks - completed_tasks
    weeks_to_complete = remaining_tasks / velocity

    return datetime.now(timezone.utc) + timedelta(weeks=weeks_to_complete)


async def _check_for_project_completion(self) -> None:
    """Check if project has reached completion criteria.

    Triggers pattern learning and quality assessment when:
    - Progress >= the configured threshold
      (``pattern_learning.completion_threshold``, default 95)
    - No tasks in progress
    - Less than 5% of tasks are blocked

    Completion handling runs at most once per day: a check made less than
    one full day after the previous trigger is ignored.
    """
    if not self.current_state:
        return

    completion_threshold = self.settings.get(
        "pattern_learning.completion_threshold", 95
    )

    if (
        self.current_state.progress_percent >= completion_threshold
        and self.current_state.in_progress_tasks == 0
        and (
            self.current_state.blocked_tasks / self.current_state.total_tasks < 0.05
            if self.current_state.total_tasks > 0
            else True
        )
    ):
        # One clock reading serves both the throttle check and the marker.
        now = datetime.now(timezone.utc)

        # Skip if this completion was already processed within the last day.
        if self._last_completion_check is not None:
            if (now - self._last_completion_check).days < 1:
                return

        # The marker is set before the handler runs, so a failing handler
        # is not retried until the one-day window has passed.
        self._last_completion_check = now

        await self._handle_project_completion()


async def _handle_project_completion(self) -> None:
    """Handle project completion by triggering learning and assessment.

    Runs the quality assessment first (its verdict decides whether the
    recorded outcome is successful), then feeds the outcome into pattern
    learning, and writes a human-readable summary to stderr.
    """
    import sys

    # Nothing to assess without a project state.
    if self.current_state is None:
        return

    print(
        f"🎉 Project '{self.current_state.project_name}' appears to be complete!",
        file=sys.stderr,
    )
    print(
        f" Progress: {self.current_state.progress_percent:.1f}%",
        file=sys.stderr,
    )
    completed = self.current_state.completed_tasks
    total = self.current_state.total_tasks
    print(
        f" Completed Tasks: {completed}/{total}",
        file=sys.stderr,
    )

    # Get configuration
    config = self.settings.get("quality_assessment", {})
    github_owner = config.get("github_owner")
    github_repo = config.get("github_repo")

    project_duration = self._calculate_project_duration()

    all_tasks = await self._get_all_tasks()

    # Team members are not tracked here yet; with an empty list the cost
    # estimate falls back to its default team size.
    team_members: List[Any] = []

    # GitHub analysis is only configured when both owner and repo are set.
    github_config = (
        {
            "github_owner": github_owner,
            "github_repo": github_repo,
            "project_start_date": config.get("project_start_date", ""),
        }
        if github_owner and github_repo
        else None
    )

    # Pass GitHub MCP interface if available
    if hasattr(self.kanban_client, "mcp_caller") and github_config:
        github_interface = GitHubMCPInterface(self.kanban_client.mcp_caller)
        self.quality_assessor.github_mcp = github_interface

    assessment = await self.quality_assessor.assess_project_quality(
        project_state=self.current_state,
        tasks=all_tasks,
        team_members=team_members,
        github_config=github_config,
    )

    # Create outcome based on assessment
    outcome = ProjectOutcome(
        successful=assessment.is_successful,
        completion_time_days=project_duration,
        quality_score=assessment.overall_score,
        cost=self._estimate_project_cost(project_duration, len(team_members)),
        failure_reasons=(
            [] if assessment.is_successful else assessment.improvement_areas[:3]
        ),
    )

    # Run pattern learning
    pattern = await self.pattern_learner.learn_from_project(
        project_state=self.current_state,
        tasks=all_tasks,
        team_members=team_members,
        outcome=outcome,
        github_owner=github_owner,
        github_repo=github_repo,
    )

    # Log results
    print("\n📊 Quality Assessment Complete:", file=sys.stderr)
    print(f" Overall Score: {assessment.overall_score:.1%}", file=sys.stderr)
    print(
        f" Success: {'✅ Yes' if assessment.is_successful else '❌ No'}",
        file=sys.stderr,
    )
    print(f" Confidence: {assessment.success_confidence:.1%}", file=sys.stderr)

    print("\n📈 Key Insights:", file=sys.stderr)
    for insight in assessment.quality_insights[:3]:
        print(f" • {insight}", file=sys.stderr)

    print("\n🎯 Improvement Areas:", file=sys.stderr)
    for area in assessment.improvement_areas[:3]:
        print(f" • {area}", file=sys.stderr)

    print("\n🧠 Pattern Learning Complete:", file=sys.stderr)
    print(f" Confidence Score: {pattern.confidence_score:.1%}", file=sys.stderr)
    print(
        f" Patterns in Database: {len(self.pattern_learner.learned_patterns)}",
        file=sys.stderr,
    )


def _calculate_project_duration(self) -> int:
    """Calculate project duration in days from historical data.

    The duration is measured from the ``"timestamp"`` (ISO 8601 string) of
    the first entry in ``self.historical_data`` until now (UTC).

    Returns
    -------
    int
        Whole days elapsed since the first historical entry, or 30 as a
        default estimate when no history has been recorded.
    """
    if not self.historical_data:
        return 30  # Default estimate

    first_entry = self.historical_data[0]
    first_date = datetime.fromisoformat(first_entry["timestamp"])

    # A timestamp stored without an offset parses as a naive datetime, and
    # subtracting that from an aware "now" raises TypeError.
    # NOTE(review): naive timestamps are assumed to be UTC - confirm against
    # the code that writes historical_data.
    if first_date.utcoffset() is None:
        first_date = first_date.replace(tzinfo=timezone.utc)

    return (datetime.now(timezone.utc) - first_date).days


def _estimate_project_cost(self, duration_days: int, team_size: int) -> float:
    """Estimate project cost based on duration and team size.

    Parameters
    ----------
    duration_days : int
        Project duration in days
    team_size : int
        Number of team members; when not positive, a team of 3 is assumed

    Returns
    -------
    float
        ``duration_days * team_size * daily_rate``, where the daily rate
        comes from the ``cost_estimation.daily_rate`` setting (default 1000)
    """
    # Simple cost model: a flat daily rate per team member
    daily_rate = self.settings.get("cost_estimation.daily_rate", 1000)
    daily_rate_value = float(daily_rate)
    return (
        float(duration_days * team_size * daily_rate_value)
        if team_size > 0
        else float(duration_days * daily_rate_value * 3)
    )
async def trigger_project_completion_learning(
    self,
    team_members: List[Any],
    outcome: ProjectOutcome,
    github_owner: Optional[str] = None,
    github_repo: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Run pattern learning and quality assessment for a finished project.

    Intended to be called once a project has completed. The method:

    1. Makes sure a project state is available (collecting it if needed)
    2. Extracts patterns from the project via the pattern learner
    3. Runs the quality assessment
    4. Prints a short summary to stderr and returns both results

    Parameters
    ----------
    team_members : List[Any]
        List of team members who worked on the project
    outcome : ProjectOutcome
        The actual outcome of the project
    github_owner : Optional[str]
        GitHub repository owner for code analysis
    github_repo : Optional[str]
        GitHub repository name for code analysis

    Returns
    -------
    Dict[str, Any]
        Dictionary with two sections, ``"pattern_learning"`` and
        ``"quality_assessment"``

    Raises
    ------
    RuntimeError
        If no project state is available even after data collection

    Examples
    --------
    >>> outcome = ProjectOutcome(
    ...     successful=True,
    ...     completion_time_days=35,
    ...     quality_score=0.85,
    ...     cost=50000
    ... )
    >>> results = await monitor.trigger_project_completion_learning(
    ...     team_members, outcome, "owner", "repo"
    ... )
    >>> print(f"Quality Score: {results['quality_assessment']['score']}")

    Notes
    -----
    GitHub-based analysis is only configured when both ``github_owner``
    and ``github_repo`` are provided.
    """
    import sys

    def report(line: str) -> None:
        # Summary output goes to stderr, not stdout.
        print(line, file=sys.stderr)

    if not self.current_state:
        await self._collect_project_data()
        # Collection may still have left the state unset.
        if self.current_state is None:
            logger.error("Failed to collect project data for completion learning")
            raise RuntimeError(
                "Unable to retrieve project state for completion learning"
            )

    state = self.current_state
    project_tasks = await self._get_all_tasks()

    # Pattern learning runs first, then the quality assessment.
    learned = await self.pattern_learner.learn_from_project(
        project_state=state,
        tasks=project_tasks,
        team_members=team_members,
        outcome=outcome,
        github_owner=github_owner,
        github_repo=github_repo,
    )

    if github_owner and github_repo:
        github_config = {
            "github_owner": github_owner,
            "github_repo": github_repo,
            "project_start_date": self.settings.get(
                "quality_assessment.project_start_date", ""
            ),
        }
    else:
        github_config = None

    quality = await self.quality_assessor.assess_project_quality(
        project_state=state,
        tasks=project_tasks,
        team_members=team_members,
        github_config=github_config,
    )

    # Completion summary with the headline numbers
    report(f"✅ Project '{self.current_state.project_name}' completed:")
    report(f" - Quality Score: {quality.overall_score:.1%}")
    report(f" - Pattern Confidence: {learned.confidence_score:.1%}")
    report(f" - Key Success Factors: {', '.join(learned.success_factors[:3])}")

    pattern_section = {
        "success": True,
        "confidence_score": learned.confidence_score,
        "success_factors": learned.success_factors,
        "risk_factors": learned.risk_factors,
        "patterns_learned": len(self.pattern_learner.learned_patterns),
    }
    quality_section = {
        "score": quality.overall_score,
        "is_successful": quality.is_successful,
        "insights": quality.quality_insights,
        "improvements": quality.improvement_areas,
        "code_quality_score": quality.code_quality_score,
        "process_quality_score": quality.process_quality_score,
        "delivery_quality_score": quality.delivery_quality_score,
        "team_quality_score": quality.team_quality_score,
    }
    return {
        "pattern_learning": pattern_section,
        "quality_assessment": quality_section,
    }
async def get_pattern_based_recommendations(self) -> List[Dict[str, Any]]:
    """
    Return pattern-based recommendations for the current project.

    The recommendation engine is currently not available, so after making
    sure a project state exists this method logs a warning and returns an
    empty list.

    Returns
    -------
    List[Dict[str, Any]]
        Currently always an empty list

    Raises
    ------
    RuntimeError
        If no project state is available even after data collection

    Examples
    --------
    >>> recommendations = await monitor.get_pattern_based_recommendations()
    >>> for rec in recommendations:
    ...     print(f"{rec['message']} (confidence: {rec['confidence']:.0%})")
    """
    if not self.current_state:
        await self._collect_project_data()
        # Collection may still have left the state unset.
        if self.current_state is None:
            logger.error("Failed to collect project data for pattern recommendations")
            raise RuntimeError(
                "Unable to retrieve project state for pattern recommendations"
            )

    # The recommendation engine was removed together with its pipeline
    # infrastructure dependency; nothing can be recommended until it is
    # reimplemented without that dependency.
    unavailable_notice = (
        "Pattern-based recommendations not available - "
        "recommendation engine removed during pipeline cleanup"
    )
    logger.warning(unavailable_notice)
    recommendations: List[Dict[str, Any]] = []
    return recommendations