src.monitoring package
Marcus Monitoring Module.
This module provides comprehensive project monitoring capabilities for the Marcus system, including real-time project health tracking, risk assessment, performance metrics collection, and automated alerting systems.
Key Components
- ProjectMonitor class
Core monitoring system that tracks project health, analyzes metrics, and identifies potential risks or bottlenecks in real-time.
Monitoring Capabilities
- Real-time Health Tracking: Continuous monitoring of project progress, task completion rates, and team velocity
- Risk Assessment: AI-powered analysis to identify potential project risks and recommend mitigation strategies
- Performance Metrics: Collection and analysis of key performance indicators including velocity, completion rates, and cycle times
- Automated Alerting: Detection of stalled tasks, capacity issues, and dependency bottlenecks with automated notifications
- Historical Analysis: Trend analysis and historical data tracking for predictive insights
Usage Example
>>> from src.monitoring import ProjectMonitor
>>>
>>> # Initialize the monitoring system
>>> monitor = ProjectMonitor()
>>>
>>> # Start continuous monitoring
>>> await monitor.start_monitoring()
>>>
>>> # Get current project state
>>> state = await monitor.get_project_state()
>>> print(f"Project progress: {state.progress_percent}%")
>>>
>>> # Check for current risks
>>> risks = monitor.get_current_risks()
>>> for risk in risks:
...     print(f"Risk: {risk.description} (Severity: {risk.severity})")
Notes
The monitoring module integrates with the MCP Kanban system and AI analysis engine to provide comprehensive project oversight. It operates asynchronously to avoid blocking project workflows while maintaining real-time monitoring capabilities.
See also
- src.core.models: Project state and risk model definitions
- src.integrations.kanban_client: Kanban board integration
- src.integrations.ai_analysis_engine: AI-powered project analysis
- class src.monitoring.ProjectMonitor
Bases: object
Continuous monitoring system for project health and performance tracking.
The ProjectMonitor class provides comprehensive project oversight through real-time monitoring, AI-powered analysis, and automated issue detection. It integrates with the MCP Kanban system to track project progress and identifies potential risks before they impact project timelines.
- kanban_client
Client for interacting with the MCP Kanban board
- Type:
- ai_engine
AI engine for project health analysis and risk assessment
- Type:
- current_state
Current project state with metrics and health indicators
- Type:
Optional[ProjectState]
- blockers
List of reported blockers across all tasks
- Type:
List[BlockerReport]
- risks
List of identified project risks with mitigation strategies
- Type:
List[ProjectRisk]
- historical_data
Historical metrics data for trend analysis
- Type:
List[Dict]
- record_blocker(agent_id, task_id, description)
Record a new blocker report
- Parameters:
- Return type:
Examples
Initialize and start monitoring:
>>> monitor = ProjectMonitor()
>>> await monitor.start_monitoring()
Get project health status:
>>> state = await monitor.get_project_state()
>>> print(f"Progress: {state.progress_percent}%")
>>> print(f"Risk: {state.risk_level.value}")
Record a blocker:
>>> blocker = await monitor.record_blocker(
...     agent_id="agent-001",
...     task_id="task-123",
...     description="API endpoint not responding"
... )
- async start_monitoring()
Start the continuous monitoring loop.
Initiates the main monitoring loop that continuously collects project data, analyzes health metrics, checks for issues, and records historical data at regular intervals defined by check_interval.
The monitoring loop performs the following operations:
1. Collect current project data from the kanban board
2. Analyze project health using AI engine
3. Check for various issues (stalled tasks, capacity problems, etc.)
4. Record metrics for historical tracking
The loop continues until stop_monitoring() is called or an unrecoverable error occurs. Individual monitoring cycles that encounter errors will log the error and continue with the next cycle.
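The loop behaviour described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the ProjectMonitor implementation: `LoopSketch`, `run_cycle`, the counters, and the very short interval are hypothetical, and only the `is_monitoring` and `check_interval` names come from this page.

```python
import asyncio
import logging

logger = logging.getLogger("monitor_sketch")


class LoopSketch:
    """Hypothetical stand-in for the monitoring loop (not the real class)."""

    def __init__(self, check_interval: float = 0.01) -> None:
        self.check_interval = check_interval  # seconds between cycles
        self.is_monitoring = False
        self.cycles = 0
        self.errors = 0

    async def run_cycle(self) -> None:
        """Placeholder for: collect data, analyze, check issues, record."""
        self.cycles += 1
        if self.cycles == 2:
            raise RuntimeError("simulated failure in one cycle")

    async def start_monitoring(self) -> None:
        self.is_monitoring = True
        while self.is_monitoring:
            try:
                await self.run_cycle()
            except Exception:
                # A failed cycle is logged and the loop carries on.
                self.errors += 1
                logger.exception("monitoring cycle failed")
            await asyncio.sleep(self.check_interval)

    async def stop_monitoring(self) -> None:
        # The loop exits once the cycle in progress has finished.
        self.is_monitoring = False


async def demo() -> LoopSketch:
    sketch = LoopSketch()
    task = asyncio.create_task(sketch.start_monitoring())
    await asyncio.sleep(0.1)  # let a few cycles run
    await sketch.stop_monitoring()
    await task
    return sketch


sketch = asyncio.run(demo())
```

The second cycle fails on purpose, which shows that one bad cycle does not end the loop; only clearing the flag does.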
- Raises:
Exception – If there are critical errors in the monitoring setup that prevent the loop from starting
- Return type:
Examples
>>> monitor = ProjectMonitor()
>>> await monitor.start_monitoring()  # Runs indefinitely
Run monitoring for a specific duration:
>>> import asyncio
>>> monitor = ProjectMonitor()
>>>
>>> # Start monitoring in background
>>> monitor_task = asyncio.create_task(monitor.start_monitoring())
>>>
>>> # Do other work...
>>> await asyncio.sleep(3600)  # Run for 1 hour
>>>
>>> # Stop monitoring
>>> await monitor.stop_monitoring()
>>> await monitor_task
Notes
The monitoring interval can be configured via the ‘monitoring_interval’ setting. Default is 900 seconds (15 minutes). Shorter intervals provide more responsive monitoring but increase system load.
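To make the load trade-off concrete, here is a small sketch that resolves the interval and counts cycles per day. It assumes the settings can be read like a plain dictionary, which this page does not confirm; `resolve_check_interval` and `cycles_per_day` are hypothetical helpers.

```python
SECONDS_PER_DAY = 86_400
DEFAULT_INTERVAL_SECONDS = 900  # 15 minutes, the default stated above


def resolve_check_interval(settings: dict) -> int:
    """Return the interval in seconds, falling back to the default."""
    interval = int(settings.get("monitoring_interval", DEFAULT_INTERVAL_SECONDS))
    if interval <= 0:
        raise ValueError("monitoring_interval must be a positive number of seconds")
    return interval


def cycles_per_day(interval_seconds: int) -> float:
    """How many monitoring cycles fit into one day at this interval."""
    return SECONDS_PER_DAY / interval_seconds


print(cycles_per_day(resolve_check_interval({})))  # 96.0
print(cycles_per_day(resolve_check_interval({"monitoring_interval": 300})))  # 288.0
```

Dropping the interval from 15 minutes to 5 minutes triples the number of cycles, and with it the number of board queries and analysis runs.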
- async stop_monitoring()
Stop the continuous monitoring loop.
Gracefully stops the monitoring loop by setting the is_monitoring flag to False. The current monitoring cycle will complete before the loop exits.
Examples
>>> import asyncio
>>> monitor = ProjectMonitor()
>>> # Start monitoring in background
>>> monitor_task = asyncio.create_task(monitor.start_monitoring())
>>>
>>> # Stop monitoring after some time
>>> await monitor.stop_monitoring()
>>> await monitor_task  # Wait for loop to exit
- Return type:
- async get_project_state()
Get current project state with latest metrics.
Retrieves the current project state including progress metrics, task counts, velocity data, and risk assessment. If no current state exists, it will collect fresh data from the kanban board.
- Returns:
Current project state containing:
  - board_id: Kanban board identifier
  - project_name: Name of the project
  - total_tasks: Total number of tasks
  - completed_tasks: Number of completed tasks
  - in_progress_tasks: Number of tasks in progress
  - blocked_tasks: Number of blocked tasks
  - progress_percent: Overall progress percentage
  - overdue_tasks: List of overdue tasks
  - team_velocity: Tasks completed per week
  - risk_level: Overall project risk level
  - last_updated: Timestamp of last update
- Return type:
Examples
>>> monitor = ProjectMonitor()
>>> state = await monitor.get_project_state()
>>> print(f"Project: {state.project_name}")
>>> print(f"Progress: {state.progress_percent:.1f}%")
>>> print(f"Velocity: {state.team_velocity} tasks/week")
>>> print(f"Risk Level: {state.risk_level.value}")
Check for overdue tasks:
>>> state = await monitor.get_project_state()
>>> if state.overdue_tasks:
...     print(f"Warning: {len(state.overdue_tasks)} overdue tasks")
...     for task in state.overdue_tasks:
...         print(f"  - {task.name} (due: {task.due_date})")
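The "collect fresh data only if no current state exists" behaviour is a lazy-initialization pattern. A minimal sketch follows; `StateSketch`, `StateCache`, and `fetch_counts` are hypothetical, and the progress formula (completed tasks over total tasks) is an assumption rather than the documented calculation.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class StateSketch:
    """Hypothetical, reduced version of a project state."""

    total_tasks: int
    completed_tasks: int

    @property
    def progress_percent(self) -> float:
        # Assumed formula; an empty board counts as 0% progress.
        if self.total_tasks == 0:
            return 0.0
        return 100.0 * self.completed_tasks / self.total_tasks


class StateCache:
    def __init__(self) -> None:
        self.current_state: Optional[StateSketch] = None
        self.fetch_count = 0

    async def fetch_counts(self) -> StateSketch:
        """Stand-in for a kanban board query."""
        self.fetch_count += 1
        return StateSketch(total_tasks=8, completed_tasks=2)

    async def get_project_state(self) -> StateSketch:
        # Only hit the board when nothing has been collected yet.
        if self.current_state is None:
            self.current_state = await self.fetch_counts()
        return self.current_state


async def demo():
    cache = StateCache()
    first = await cache.get_project_state()
    second = await cache.get_project_state()
    return cache, first, second


cache, first, second = asyncio.run(demo())
print(f"{first.progress_percent:.1f}%")  # 25.0%
```

The second call returns the cached object without another board query, so under this pattern callers that need fresh numbers would depend on the monitoring loop refreshing the state.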
- async record_blocker(agent_id, task_id, description)
Record a new blocker report from an agent.
Creates a new blocker report when an agent encounters an issue that prevents task progress. The blocker is tracked for resolution and included in project health assessments.
- Parameters:
- Returns:
Created blocker report containing:
  - task_id: ID of the blocked task
  - reporter_id: ID of the reporting agent
  - description: Description of the blocking issue
  - severity: Risk level of the blocker (default: MEDIUM)
  - reported_at: Timestamp when blocker was reported
  - resolved: Resolution status (default: False)
- Return type:
Examples
>>> blocker = await monitor.record_blocker(
...     agent_id="agent-backend-001",
...     task_id="task-user-auth-123",
...     description="Database connection timeout after 30 seconds"
... )
>>> print(f"Blocker recorded: {blocker.task_id}")
Record multiple related blockers:
>>> blockers = []
>>> for task_id in failing_tasks:
...     blocker = await monitor.record_blocker(
...         agent_id="agent-001",
...         task_id=task_id,
...         description="External API rate limit exceeded"
...     )
...     blockers.append(blocker)
Notes
Blockers are automatically assigned MEDIUM severity. For more precise severity assessment, consider extending this method to accept severity as a parameter or implement automatic severity classification based on the description content.
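As one possible shape for the automatic classification mentioned above, the sketch below maps keywords in the description to a severity. Everything here is hypothetical: the `Severity` enum stands in for the project's own risk level type, and the keyword lists are illustrative, not taken from Marcus.

```python
from enum import Enum


class Severity(Enum):
    """Hypothetical stand-in for the project's risk level enum."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


# Illustrative keyword lists, checked from most to least severe.
KEYWORDS = [
    (Severity.CRITICAL, ("data loss", "security", "outage")),
    (Severity.HIGH, ("timeout", "not responding", "crash")),
    (Severity.LOW, ("typo", "cosmetic")),
]


def classify_severity(description: str) -> Severity:
    """Return the first matching severity, or MEDIUM when nothing matches."""
    text = description.lower()
    for severity, words in KEYWORDS:
        if any(word in text for word in words):
            return severity
    return Severity.MEDIUM


print(classify_severity("Database connection timeout after 30 seconds"))  # Severity.HIGH
print(classify_severity("External API rate limit exceeded"))  # Severity.MEDIUM
```

Falling back to MEDIUM keeps the current default for descriptions that match no keyword, so the classifier only changes the result when it has a reason to.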
- get_current_risks()
Get current project risks identified by monitoring and analysis.
Returns the list of all currently identified project risks, including those from AI analysis, automated issue detection, and manual risk assessments.
- Returns:
List of current project risks, each containing:
  - risk_type: Category of risk (e.g., ‘stalled_task’, ‘capacity’)
  - description: Detailed description of the risk
  - severity: Risk severity level (LOW, MEDIUM, HIGH, CRITICAL)
  - probability: Likelihood of risk occurrence (0.0-1.0)
  - impact: Description of potential impact
  - mitigation_strategy: Recommended mitigation approach
  - identified_at: Timestamp when risk was identified
- Return type:
Examples
>>> risks = monitor.get_current_risks()
>>> print(f"Total risks: {len(risks)}")
>>>
>>> # Filter by severity
>>> critical_risks = [
...     risk for risk in risks
...     if risk.severity == RiskLevel.CRITICAL
... ]
>>>
>>> # Display risk summary
>>> for risk in risks:
...     print(f"{risk.risk_type}: {risk.description}")
...     print(f"  Severity: {risk.severity.value}")
...     print(f"  Mitigation: {risk.mitigation_strategy}")
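Since each risk carries both a severity and a probability, a caller may want to triage the list. The sketch below orders risks by severity first and probability second. `RiskSketch` is a hypothetical, reduced record that uses plain strings for severity, and the `dependency` risk type is made up for the example.

```python
from dataclasses import dataclass

# Assumed ordering of the four severity levels listed above.
SEVERITY_ORDER = {"LOW": 0, "MEDIUM": 1, "HIGH": 2, "CRITICAL": 3}


@dataclass
class RiskSketch:
    """Hypothetical, reduced risk record for illustration."""

    risk_type: str
    severity: str
    probability: float


def rank_risks(risks):
    """Most severe first; ties broken by higher probability."""
    return sorted(
        risks,
        key=lambda r: (SEVERITY_ORDER[r.severity], r.probability),
        reverse=True,
    )


risks = [
    RiskSketch("capacity", "MEDIUM", 0.9),
    RiskSketch("stalled_task", "HIGH", 0.4),
    RiskSketch("dependency", "HIGH", 0.7),
]
ranked = rank_risks(risks)
print([r.risk_type for r in ranked])  # ['dependency', 'stalled_task', 'capacity']
```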
- get_active_blockers()
Get active (unresolved) blocker reports.
Returns all blocker reports that have not yet been resolved, providing visibility into current impediments affecting project progress.
- Returns:
List of unresolved blocker reports, each containing:
  - task_id: ID of the blocked task
  - reporter_id: ID of the agent that reported the blocker
  - description: Description of the blocking issue
  - severity: Severity level of the blocker
  - reported_at: When the blocker was first reported
  - resolved: Resolution status (False for active blockers)
- Return type:
Examples
>>> from collections import defaultdict
>>> from datetime import datetime, timezone
>>>
>>> blockers = monitor.get_active_blockers()
>>> print(f"Active blockers: {len(blockers)}")
>>>
>>> # Group by task
>>> by_task = defaultdict(list)
>>> for blocker in blockers:
...     by_task[blocker.task_id].append(blocker)
>>>
>>> # Display blocker summary
>>> for blocker in blockers:
...     print(f"Task {blocker.task_id}: {blocker.description}")
...     print(f"  Reported by: {blocker.reporter_id}")
...     print(f"  Severity: {blocker.severity.value}")
...     print(f"  Age: {datetime.now(timezone.utc) - blocker.reported_at}")
Notes
Active blockers represent ongoing impediments that require attention. Regular review of active blockers helps identify patterns and prioritize resolution efforts.
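A review queue for active blockers could be built by filtering on the `resolved` flag and sorting by age, as in the sketch below. `BlockerSketch` is a hypothetical, reduced record, and the fixed `now` timestamp is only there to keep the example deterministic.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class BlockerSketch:
    """Hypothetical, reduced blocker record for illustration."""

    task_id: str
    reported_at: datetime
    resolved: bool = False


def active_blockers(blockers):
    """Keep only the reports that are still unresolved."""
    return [b for b in blockers if not b.resolved]


def oldest_first(blockers, now):
    """Order blockers so the longest-standing one is reviewed first."""
    return sorted(blockers, key=lambda b: now - b.reported_at, reverse=True)


now = datetime(2024, 1, 10, tzinfo=timezone.utc)
reports = [
    BlockerSketch("task-1", now - timedelta(hours=2)),
    BlockerSketch("task-2", now - timedelta(days=3)),
    BlockerSketch("task-3", now - timedelta(days=1), resolved=True),
]
queue = oldest_first(active_blockers(reports), now)
print([b.task_id for b in queue])  # ['task-2', 'task-1']
```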
- async trigger_project_completion_learning(team_members, outcome, github_owner=None, github_repo=None)
Trigger pattern learning and quality assessment when a project completes.
This method should be called when a project reaches completion to:
1. Extract patterns from the completed project
2. Run comprehensive quality assessment
3. Store insights for future projects
- Parameters:
- Returns:
Results containing pattern extraction and quality assessment
- Return type:
Examples
>>> outcome = ProjectOutcome(
...     successful=True,
...     completion_time_days=35,
...     quality_score=0.85,
...     cost=50000
... )
>>> results = await monitor.trigger_project_completion_learning(
...     team_members, outcome, "owner", "repo"
... )
>>> print(f"Quality Grade: {results['quality_assessment']['grade']}")
Notes
This method integrates pattern learning with quality assessment to provide comprehensive insights that help improve future projects.
- async get_pattern_based_recommendations()
Get recommendations based on learned patterns for the current project.
- Returns:
List of recommendations with type, message, confidence, and impact
- Return type:
Examples
>>> recommendations = await monitor.get_pattern_based_recommendations()
>>> for rec in recommendations:
...     print(f"{rec['message']} (confidence: {rec['confidence']:.0%})")
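Because each recommendation carries a confidence value, callers may want to act only on the stronger ones. The sketch below filters and sorts a list of recommendation dictionaries using the keys named above (type, message, confidence, impact). The sample entries, the 0.7 threshold, and `top_recommendations` are hypothetical.

```python
def top_recommendations(recommendations, min_confidence=0.7):
    """Keep recommendations at or above the threshold, strongest first."""
    kept = [r for r in recommendations if r["confidence"] >= min_confidence]
    return sorted(kept, key=lambda r: r["confidence"], reverse=True)


# Made-up sample data shaped like the documented return value.
sample = [
    {"type": "velocity", "message": "Reduce work in progress",
     "confidence": 0.65, "impact": "medium"},
    {"type": "risk", "message": "Review stalled tasks",
     "confidence": 0.9, "impact": "high"},
    {"type": "capacity", "message": "Rebalance assignments",
     "confidence": 0.75, "impact": "medium"},
]

for rec in top_recommendations(sample):
    print(f"{rec['message']} (confidence: {rec['confidence']:.0%})")
```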
Submodules
- src.monitoring.assignment_monitor module
- src.monitoring.error_predictor module
- src.monitoring.project_monitor module
- Classes
- Key Features
- ProjectMonitor
- ProjectMonitor.settings
- ProjectMonitor.kanban_client
- ProjectMonitor.ai_engine
- ProjectMonitor.current_state
- ProjectMonitor.blockers
- ProjectMonitor.risks
- ProjectMonitor.historical_data
- ProjectMonitor.check_interval
- ProjectMonitor.is_monitoring
- ProjectMonitor.__init__()
- ProjectMonitor.start_monitoring()
- ProjectMonitor.stop_monitoring()
- ProjectMonitor.get_project_state()
- ProjectMonitor.record_blocker()
- ProjectMonitor.get_current_risks()
- ProjectMonitor.get_active_blockers()
- ProjectMonitor.trigger_project_completion_learning()
- ProjectMonitor.get_pattern_based_recommendations()