src.monitoring.project_monitor module

Project monitoring and health tracking system.

This module implements project monitoring for the Marcus system: real-time health tracking, risk assessment, performance metrics collection, and automated alerting.

Classes

ProjectMonitor

Core monitoring system that provides continuous project health tracking, AI-powered risk analysis, and automated issue detection.

Key Features

  • Continuous monitoring loop with configurable intervals

  • Real-time project state tracking and metrics collection

  • AI-powered project health analysis and risk assessment

  • Automated detection of stalled tasks, capacity issues, and bottlenecks

  • Historical data tracking for trend analysis

  • Integration with MCP Kanban system for task data

Examples

Basic monitoring setup:

>>> import asyncio
>>> from src.monitoring.project_monitor import ProjectMonitor
>>>
>>> async def main():
...     monitor = ProjectMonitor()
...
...     # Start continuous monitoring
...     await monitor.start_monitoring()
>>>
>>> asyncio.run(main())

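Manual project state check (the bare await calls here and in later examples assume an enclosing coroutine or an asyncio-aware REPL such as python -m asyncio):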

>>> monitor = ProjectMonitor()
>>> state = await monitor.get_project_state()
>>> print(f"Progress: {state.progress_percent}%")
>>> print(f"Risk Level: {state.risk_level.value}")
>>> print(f"Velocity: {state.team_velocity} tasks/week")

Risk and blocker management:

>>> # Get current risks
>>> risks = monitor.get_current_risks()
>>> for risk in risks:
...     print(f"{risk.risk_type}: {risk.description}")
>>>
>>> # Record a blocker
>>> blocker = await monitor.record_blocker(
...     agent_id="agent-123",
...     task_id="task-456",
...     description="Database connection failed"
... )

class src.monitoring.project_monitor.ProjectMonitor

Bases: object

Continuous monitoring system for project health and performance tracking.

The ProjectMonitor class provides comprehensive project oversight through real-time monitoring, AI-powered analysis, and automated issue detection. It integrates with the MCP Kanban system to track project progress and identifies potential risks before they impact project timelines.

settings

Configuration settings for the monitoring system

Type:

Settings

kanban_client

Client for interacting with the MCP Kanban board

Type:

KanbanClient

ai_engine

AI engine for project health analysis and risk assessment

Type:

AIAnalysisEngine

current_state

Current project state with metrics and health indicators

Type:

Optional[ProjectState]

blockers

List of reported blockers across all tasks

Type:

List[BlockerReport]

risks

List of identified project risks with mitigation strategies

Type:

List[ProjectRisk]

historical_data

Historical metrics data for trend analysis

Type:

List[Dict]

check_interval

Interval between monitoring checks, in seconds (default: 900, i.e. 15 minutes)

Type:

int

is_monitoring

Flag indicating if continuous monitoring is active

Type:

bool

start_monitoring()

Start the continuous monitoring loop

Return type:

None

stop_monitoring()

Stop the continuous monitoring loop

Return type:

None

get_project_state()

Get current project state with latest metrics

Return type:

ProjectState

record_blocker(agent_id, task_id, description)

Record a new blocker report

Parameters:
  • agent_id (str)

  • task_id (str)

  • description (str)

Return type:

BlockerReport

get_current_risks()

Get list of current project risks

Return type:

List[ProjectRisk]

get_active_blockers()

Get list of unresolved blockers

Return type:

List[BlockerReport]

Examples

Initialize and start monitoring:

>>> monitor = ProjectMonitor()
>>> await monitor.start_monitoring()

Get project health status:

>>> state = await monitor.get_project_state()
>>> print(f"Progress: {state.progress_percent}%")
>>> print(f"Risk: {state.risk_level.value}")

Record a blocker:

>>> blocker = await monitor.record_blocker(
...     agent_id="agent-001",
...     task_id="task-123",
...     description="API endpoint not responding"
... )

__init__()

Return type:

None

async start_monitoring()

Start the continuous monitoring loop.

Initiates the main monitoring loop that continuously collects project data, analyzes health metrics, checks for issues, and records historical data at regular intervals defined by check_interval.

The monitoring loop performs the following operations:

  1. Collect current project data from the kanban board

  2. Analyze project health using the AI engine

  3. Check for various issues (stalled tasks, capacity problems, etc.)

  4. Record metrics for historical tracking

The loop continues until stop_monitoring() is called or an unrecoverable error occurs. Individual monitoring cycles that encounter errors will log the error and continue with the next cycle.
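The loop behaviour described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual Marcus implementation: LoopSketchMonitor, _run_cycle, and the cycles and errors counters are hypothetical names, and the four per-cycle operations are collapsed into one placeholder coroutine that fails once on purpose.

```python
import asyncio
import logging

logger = logging.getLogger("monitor_sketch")


class LoopSketchMonitor:
    """Hypothetical stand-in for ProjectMonitor, showing only the loop shape."""

    def __init__(self, check_interval: float = 900.0) -> None:
        self.check_interval = check_interval
        self.is_monitoring = False
        self.cycles = 0  # cycles attempted (illustrative counter)
        self.errors = 0  # cycles that raised (illustrative counter)

    async def _run_cycle(self) -> None:
        # Placeholder for: collect data, analyze health, check issues, record metrics.
        self.cycles += 1
        if self.cycles == 2:
            raise RuntimeError("simulated failure in one cycle")

    async def start_monitoring(self) -> None:
        self.is_monitoring = True
        while self.is_monitoring:
            try:
                await self._run_cycle()
            except Exception:
                # A failing cycle is logged and the loop moves on to the next one.
                self.errors += 1
                logger.exception("monitoring cycle failed")
            await asyncio.sleep(self.check_interval)

    async def stop_monitoring(self) -> None:
        # Only clears the flag; the loop exits once the current iteration ends.
        self.is_monitoring = False


async def demo() -> LoopSketchMonitor:
    monitor = LoopSketchMonitor(check_interval=0.01)
    task = asyncio.create_task(monitor.start_monitoring())
    await asyncio.sleep(0.1)
    await monitor.stop_monitoring()
    await task
    return monitor
```

In this sketch the flag is checked only at the top of each iteration, so a stop request takes effect after the current cycle and its sleep have finished. Whether the real loop sleeps in one block or in shorter steps is not stated in this reference.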

Raises:

Exception – If there are critical errors in the monitoring setup that prevent the loop from starting

Return type:

None

Examples

>>> monitor = ProjectMonitor()
>>> await monitor.start_monitoring()  # Runs indefinitely

Run monitoring for a specific duration:

>>> import asyncio
>>> monitor = ProjectMonitor()
>>>
>>> # Start monitoring in background
>>> monitor_task = asyncio.create_task(monitor.start_monitoring())
>>>
>>> # Do other work...
>>> await asyncio.sleep(3600)  # Run for 1 hour
>>>
>>> # Stop monitoring
>>> await monitor.stop_monitoring()
>>> await monitor_task

Notes

The monitoring interval can be configured via the ‘monitoring_interval’ setting. Default is 900 seconds (15 minutes). Shorter intervals provide more responsive monitoring but increase system load.
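As a rough illustration of how such a setting might be resolved, the sketch below reads the interval from a plain mapping and falls back to the documented default. The helper name resolve_check_interval and the use of a dict-like settings object are assumptions; the real Settings class may expose the value differently.

```python
from typing import Any, Mapping

DEFAULT_CHECK_INTERVAL = 900  # seconds (15 minutes), the documented default


def resolve_check_interval(settings: Mapping[str, Any]) -> int:
    """Read 'monitoring_interval' from a mapping, falling back to the default."""
    interval = int(settings.get("monitoring_interval", DEFAULT_CHECK_INTERVAL))
    if interval <= 0:
        # Rejecting non-positive values is a choice made for this sketch only.
        raise ValueError("monitoring_interval must be a positive number of seconds")
    return interval
```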

async stop_monitoring()

Stop the continuous monitoring loop.

Gracefully stops the monitoring loop by setting the is_monitoring flag to False. The current monitoring cycle will complete before the loop exits.

Examples

>>> import asyncio
>>> monitor = ProjectMonitor()
>>> # Start monitoring in background
>>> monitor_task = asyncio.create_task(monitor.start_monitoring())
>>>
>>> # Stop monitoring after some time
>>> await monitor.stop_monitoring()
>>> await monitor_task  # Wait for loop to exit

Return type:

None

async get_project_state()

Get current project state with latest metrics.

Retrieves the current project state including progress metrics, task counts, velocity data, and risk assessment. If no current state exists, it will collect fresh data from the kanban board.
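The "collect only if no state exists" behaviour can be sketched as a lazy lookup. Everything here is hypothetical: LazyStateMonitor, SketchState, _collect_project_data, and fetch_count are illustrative names, and the progress formula (completed tasks divided by total tasks) is an assumption rather than something this reference confirms.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class SketchState:
    """Hypothetical, reduced version of ProjectState."""

    total_tasks: int
    completed_tasks: int

    @property
    def progress_percent(self) -> float:
        # Assumed formula; an empty board counts as 0% progress.
        if self.total_tasks == 0:
            return 0.0
        return 100.0 * self.completed_tasks / self.total_tasks


class LazyStateMonitor:
    def __init__(self) -> None:
        self.current_state: Optional[SketchState] = None
        self.fetch_count = 0  # how many times the board was queried

    async def _collect_project_data(self) -> None:
        # Stand-in for a kanban board query.
        self.fetch_count += 1
        self.current_state = SketchState(total_tasks=8, completed_tasks=2)

    async def get_project_state(self) -> SketchState:
        # Fetch only when nothing is cached yet.
        if self.current_state is None:
            await self._collect_project_data()
        assert self.current_state is not None
        return self.current_state


async def demo() -> Tuple[LazyStateMonitor, SketchState, SketchState]:
    monitor = LazyStateMonitor()
    first = await monitor.get_project_state()
    second = await monitor.get_project_state()
    return monitor, first, second
```

In this sketch the second call reuses the cached state, so callers that need fresh numbers would have to rely on the monitoring loop (or some other refresh) having updated it.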

Returns:

Current project state containing:

  • board_id: Kanban board identifier

  • project_name: Name of the project

  • total_tasks: Total number of tasks

  • completed_tasks: Number of completed tasks

  • in_progress_tasks: Number of tasks in progress

  • blocked_tasks: Number of blocked tasks

  • progress_percent: Overall progress percentage

  • overdue_tasks: List of overdue tasks

  • team_velocity: Tasks completed per week

  • risk_level: Overall project risk level

  • last_updated: Timestamp of last update

Return type:

ProjectState

Examples

>>> monitor = ProjectMonitor()
>>> state = await monitor.get_project_state()
>>> print(f"Project: {state.project_name}")
>>> print(f"Progress: {state.progress_percent:.1f}%")
>>> print(f"Velocity: {state.team_velocity} tasks/week")
>>> print(f"Risk Level: {state.risk_level.value}")

Check for overdue tasks:

>>> state = await monitor.get_project_state()
>>> if state.overdue_tasks:
...     print(f"Warning: {len(state.overdue_tasks)} overdue tasks")
...     for task in state.overdue_tasks:
...         print(f"  - {task.name} (due: {task.due_date})")

async record_blocker(agent_id, task_id, description)

Record a new blocker report from an agent.

Creates a new blocker report when an agent encounters an issue that prevents task progress. The blocker is tracked for resolution and included in project health assessments.

Parameters:
  • agent_id (str) – Unique identifier of the agent reporting the blocker

  • task_id (str) – Unique identifier of the blocked task

  • description (str) – Detailed description of the blocking issue

Returns:

Created blocker report containing:

  • task_id: ID of the blocked task

  • reporter_id: ID of the reporting agent

  • description: Description of the blocking issue

  • severity: Risk level of the blocker (default: MEDIUM)

  • reported_at: Timestamp when blocker was reported

  • resolved: Resolution status (default: False)

Return type:

BlockerReport

Examples

>>> blocker = await monitor.record_blocker(
...     agent_id="agent-backend-001",
...     task_id="task-user-auth-123",
...     description="Database connection timeout after 30 seconds"
... )
>>> print(f"Blocker recorded: {blocker.task_id}")

Record multiple related blockers:

>>> failing_tasks = ["task-101", "task-102"]  # hypothetical task IDs
>>> blockers = []
>>> for task_id in failing_tasks:
...     blocker = await monitor.record_blocker(
...         agent_id="agent-001",
...         task_id=task_id,
...         description="External API rate limit exceeded"
...     )
...     blockers.append(blocker)

Notes

Blockers are automatically assigned MEDIUM severity. For more precise severity assessment, consider extending this method to accept severity as a parameter or implement automatic severity classification based on the description content.
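One way the suggested description-based classification could look is a simple keyword lookup. This is a sketch only: classify_severity and the keyword table are hypothetical, the RiskLevel enum is redefined locally with assumed string values, and a real classifier would likely need more than substring matching.

```python
from enum import Enum


class RiskLevel(Enum):
    # Mirrors the levels named in this reference; the string values are assumed.
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


# Hypothetical keyword table, checked from most to least severe.
SEVERITY_KEYWORDS = [
    (RiskLevel.CRITICAL, ("data loss", "security", "outage")),
    (RiskLevel.HIGH, ("timeout", "connection", "not responding")),
    (RiskLevel.LOW, ("typo", "cosmetic")),
]


def classify_severity(description: str) -> RiskLevel:
    """Return the first level whose keywords appear in the description."""
    text = description.lower()
    for level, keywords in SEVERITY_KEYWORDS:
        if any(keyword in text for keyword in keywords):
            return level
    # No keyword matched: keep the documented default.
    return RiskLevel.MEDIUM
```

Because the table is scanned in order, a description that matches several levels gets the most severe one.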


get_current_risks()

Get current project risks identified by monitoring and analysis.

Returns the list of all currently identified project risks, including those from AI analysis, automated issue detection, and manual risk assessments.

Returns:

List of current project risks, each containing:

  • risk_type: Category of risk (e.g., ‘stalled_task’, ‘capacity’)

  • description: Detailed description of the risk

  • severity: Risk severity level (LOW, MEDIUM, HIGH, CRITICAL)

  • probability: Likelihood of risk occurrence (0.0-1.0)

  • impact: Description of potential impact

  • mitigation_strategy: Recommended mitigation approach

  • identified_at: Timestamp when risk was identified

Return type:

List[ProjectRisk]
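Since each risk carries both a severity and a probability, callers may want to combine the two when deciding what to look at first. The sketch below ranks risks by a severity weight multiplied by probability. The weights, the rank_risks helper, and the reduced SketchRisk type are assumptions made for illustration; the module itself does not define a ranking.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List


class RiskLevel(Enum):
    # Local stand-in; the string values are assumed.
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass
class SketchRisk:
    """Hypothetical, reduced version of ProjectRisk."""

    risk_type: str
    severity: RiskLevel
    probability: float  # 0.0-1.0


# Assumed numeric weights per severity level.
SEVERITY_WEIGHT = {
    RiskLevel.LOW: 1,
    RiskLevel.MEDIUM: 2,
    RiskLevel.HIGH: 3,
    RiskLevel.CRITICAL: 4,
}


def rank_risks(risks: List[SketchRisk]) -> List[SketchRisk]:
    """Sort risks by weight * probability, highest score first."""
    return sorted(
        risks,
        key=lambda risk: SEVERITY_WEIGHT[risk.severity] * risk.probability,
        reverse=True,
    )


sample = [
    SketchRisk("capacity", RiskLevel.MEDIUM, 0.9),
    SketchRisk("stalled_task", RiskLevel.CRITICAL, 0.5),
    SketchRisk("dependency", RiskLevel.LOW, 1.0),
]
ranked = rank_risks(sample)
```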

Examples

>>> risks = monitor.get_current_risks()
>>> print(f"Total risks: {len(risks)}")
>>>
>>> # Filter by severity
>>> critical_risks = [
...     risk for risk in risks
...     if risk.severity == RiskLevel.CRITICAL
... ]
>>>
>>> # Display risk summary
>>> for risk in risks:
...     print(f"{risk.risk_type}: {risk.description}")
...     print(f"  Severity: {risk.severity.value}")
...     print(f"  Mitigation: {risk.mitigation_strategy}")

get_active_blockers()

Get active (unresolved) blocker reports.

Returns all blocker reports that have not yet been resolved, providing visibility into current impediments affecting project progress.

Returns:

List of unresolved blocker reports, each containing:

  • task_id: ID of the blocked task

  • reporter_id: ID of the agent that reported the blocker

  • description: Description of the blocking issue

  • severity: Severity level of the blocker

  • reported_at: When the blocker was first reported

  • resolved: Resolution status (False for active blockers)

Return type:

List[BlockerReport]

Examples

>>> from datetime import datetime, timezone
>>>
>>> blockers = monitor.get_active_blockers()
>>> print(f"Active blockers: {len(blockers)}")
>>>
>>> # Group by task
>>> from collections import defaultdict
>>> by_task = defaultdict(list)
>>> for blocker in blockers:
...     by_task[blocker.task_id].append(blocker)
>>>
>>> # Display blocker summary
>>> for blocker in blockers:
...     print(f"Task {blocker.task_id}: {blocker.description}")
...     print(f"  Reported by: {blocker.reporter_id}")
...     print(f"  Severity: {blocker.severity.value}")
...     print(f"  Age: {datetime.now(timezone.utc) - blocker.reported_at}")

Notes

Active blockers represent ongoing impediments that require attention. Regular review of active blockers helps identify patterns and prioritize resolution efforts.
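A regular review might start from the blockers that have been open longest. The sketch below filters unresolved blockers older than a threshold and orders them oldest first. SketchBlocker and stale_blockers are hypothetical helpers built only from the fields listed above, and the one-day threshold is an arbitrary example.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List


@dataclass
class SketchBlocker:
    """Hypothetical stand-in carrying only the fields used here."""

    task_id: str
    reported_at: datetime
    resolved: bool = False


def stale_blockers(
    blockers: List[SketchBlocker], now: datetime, max_age: timedelta
) -> List[SketchBlocker]:
    """Return unresolved blockers older than max_age, oldest first."""
    stale = [
        blocker
        for blocker in blockers
        if not blocker.resolved and now - blocker.reported_at > max_age
    ]
    return sorted(stale, key=lambda blocker: blocker.reported_at)


# Fixed reference time so the example is reproducible.
now = datetime(2024, 1, 10, 12, 0, tzinfo=timezone.utc)
sample = [
    SketchBlocker("task-a", now - timedelta(hours=2)),
    SketchBlocker("task-b", now - timedelta(days=3)),
    SketchBlocker("task-c", now - timedelta(days=5), resolved=True),
    SketchBlocker("task-d", now - timedelta(days=1, hours=1)),
]
overdue = stale_blockers(sample, now, max_age=timedelta(days=1))
```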

async trigger_project_completion_learning(team_members, outcome, github_owner=None, github_repo=None)

Trigger pattern learning and quality assessment when a project completes.

This method should be called when a project reaches completion to:

  1. Extract patterns from the completed project

  2. Run comprehensive quality assessment

  3. Store insights for future projects

Parameters:
  • team_members (List[Any]) – List of team members who worked on the project

  • outcome (ProjectOutcome) – The actual outcome of the project

  • github_owner (Optional[str]) – GitHub repository owner for code analysis

  • github_repo (Optional[str]) – GitHub repository name for code analysis

Returns:

Results containing pattern extraction and quality assessment

Return type:

Dict[str, Any]

Examples

>>> outcome = ProjectOutcome(
...     successful=True,
...     completion_time_days=35,
...     quality_score=0.85,
...     cost=50000
... )
>>> results = await monitor.trigger_project_completion_learning(
...     team_members, outcome, "owner", "repo"
... )
>>> print(f"Quality Grade: {results['quality_assessment']['grade']}")

Notes

This method integrates pattern learning with quality assessment to provide comprehensive insights that help improve future projects.

async get_pattern_based_recommendations()

Get recommendations based on learned patterns for the current project.

Returns:

List of recommendations with type, message, confidence, and impact

Return type:

List[Dict[str, Any]]

Examples

>>> recommendations = await monitor.get_pattern_based_recommendations()
>>> for rec in recommendations:
...     print(f"{rec['message']} (confidence: {rec['confidence']:.0%})")
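
Because each recommendation carries a confidence value, a caller may want to act only on the stronger ones. The sketch below keeps recommendations at or above a threshold and sorts them by confidence. The helper name confident_recommendations, the 0.7 threshold, and the sample dictionaries (including their type and impact values) are invented for illustration; only the key names come from this reference.

```python
from typing import Any, Dict, List


def confident_recommendations(
    recommendations: List[Dict[str, Any]], min_confidence: float = 0.7
) -> List[Dict[str, Any]]:
    """Keep recommendations at or above min_confidence, strongest first."""
    kept = [rec for rec in recommendations if rec["confidence"] >= min_confidence]
    return sorted(kept, key=lambda rec: rec["confidence"], reverse=True)


# Invented sample data using the documented keys.
sample = [
    {"type": "staffing", "message": "Add a reviewer", "confidence": 0.65, "impact": "medium"},
    {"type": "scope", "message": "Split large tasks", "confidence": 0.9, "impact": "high"},
    {"type": "process", "message": "Shorten review cycles", "confidence": 0.75, "impact": "low"},
]
shortlist = confident_recommendations(sample)
```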