System Health & Diagnostics: Marcus Monitoring Intelligence#

Internal Systems Architecture Deep Dive#

Marcus’s System Health & Diagnostics tools are sophisticated monitoring and diagnostic systems that provide real-time visibility into system health, assignment integrity, and operational effectiveness. These aren’t simple ping checks - they’re comprehensive health intelligence systems that monitor system connectivity, validate assignment consistency, assess Kanban board health, and provide deep diagnostic insights for maintaining optimal Marcus coordination effectiveness.


🎯 System Overview#

System Health & Diagnostics Architecture
        ↓
Multi-Layer Health Intelligence
        ↓
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ System          β”‚ Assignment      β”‚ Board Health    β”‚
β”‚ Connectivity    β”‚ Health          β”‚ Monitoring      β”‚
β”‚ & Status        β”‚ Monitoring      β”‚ & Validation    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
        ↓                ↓                ↓
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Health Metrics  β”‚ Consistency     β”‚ Integration     β”‚
β”‚ & Analytics     β”‚ Validation      β”‚ Diagnostics     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
        ↓
Proactive Health Management & System Optimization

Core Purpose: Transform system monitoring from reactive problem detection to proactive health intelligence that prevents coordination failures and optimizes system performance.


πŸ—οΈ Core Diagnostic Tools#

Tool 1: System Connectivity & Status (ping)#

File: src/marcus_mcp/tools/system.py - ping function Purpose: Advanced system connectivity verification with health diagnostics

async def ping(echo: str, state: Any) -> Dict[str, Any]:
    """
    Check Marcus status and connectivity with enhanced health diagnostics.

    Extended health check endpoint that verifies the Marcus system
    is online and responsive. Can echo back a message and provide
    detailed system health information.

    Special echo commands:
    - "health": Return detailed health information
    - "cleanup": Force cleanup of stuck task assignments
    - "reset": Clear all pending assignments (use with caution)
    """

What Happens During a Ping:#

Stage 1: Client Type Detection & Context Analysis

# Intelligent client identification
client_type = "unknown"
if echo:
    echo_lower = echo.lower()
    if "cato" in echo_lower:
        client_type = "cato"
    elif "claude" in echo_lower or "desktop" in echo_lower:
        client_type = "claude_desktop"

# Context-aware response customization
client_context = {
    "cato": {
        "capabilities": ["advanced_ai_analysis", "workflow_optimization"],
        "preferred_response_format": "detailed_technical",
        "monitoring_needs": "performance_metrics"
    },
    "claude_desktop": {
        "capabilities": ["task_management", "project_coordination"],
        "preferred_response_format": "user_friendly",
        "monitoring_needs": "status_overview"
    }
}

Stage 2: System Health Assessment

if echo and echo.lower() == "health":
    health_data = await _get_comprehensive_health_diagnostics(state)

    system_health = {
        "core_systems": {
            "marcus_core": "operational",
            "memory_system": await _check_memory_system_health(state),
            "ai_engine": await _check_ai_engine_health(state),
            "communication_hub": await _check_communication_health(state)
        },
        "integration_health": {
            "kanban_connectivity": await _check_kanban_health(state),
            "database_connectivity": await _check_database_health(state),
            "external_apis": await _check_external_api_health(state)
        },
        "performance_metrics": {
            "response_time": await _measure_system_response_time(),
            "memory_usage": await _get_memory_utilization(),
            "active_connections": await _count_active_connections(),
            "queue_depths": await _analyze_queue_depths(state)
        }
    }

Stage 3: Advanced Diagnostic Commands

if echo and echo.lower() == "cleanup":
    # Force cleanup of stuck assignments
    cleanup_results = await _force_assignment_cleanup(state)

    return {
        "status": "cleanup_completed",
        "assignments_cleared": cleanup_results.cleared_count,
        "stuck_tasks_resolved": cleanup_results.resolved_tasks,
        "system_health": "restored",
        "cleanup_summary": cleanup_results.summary
    }

elif echo and echo.lower() == "reset":
    # Emergency reset (use with caution)
    reset_results = await _emergency_system_reset(state)

    return {
        "status": "system_reset_completed",
        "warning": "All pending assignments cleared",
        "assignments_cleared": reset_results.total_cleared,
        "system_state": "reset_to_clean_slate",
        "requires_reinitialization": True
    }

Tool 2: Assignment Health Monitoring (check_assignment_health)#

Purpose: Comprehensive assignment system integrity verification

async def check_assignment_health(state: Any) -> Dict[str, Any]:
    """
    Monitor assignment system health and detect integrity issues.

    Comprehensive health check that validates:
    - Assignment-lease consistency
    - Task status synchronization
    - Agent assignment conflicts
    - Orphaned assignments detection
    - Performance health metrics
    """

Assignment Health Analysis Workflow:#

Stage 1: Assignment-Lease Consistency Validation

async def _validate_assignment_lease_consistency(state: Any) -> Dict[str, Any]:
    """
    Check consistency between assignments and their leases

    Identifies:
    - Assignments without active leases
    - Expired leases with active assignments
    - Lease renewal failures
    - Assignment state mismatches
    """

    consistency_issues = []

    # Get all active assignments
    active_assignments = state.assignment_persistence.get_all_assignments()

    # Check each assignment's lease status
    for agent_id, assignment in active_assignments.items():
        lease_status = await state.assignment_lease_manager.get_lease_status(
            assignment.task_id
        )

        # Detect consistency issues
        if not lease_status:
            consistency_issues.append({
                "type": "missing_lease",
                "agent_id": agent_id,
                "task_id": assignment.task_id,
                "severity": "high",
                "impact": "Assignment may become stuck without lease management"
            })

        elif lease_status.status == "expired" and assignment.status == "active":
            consistency_issues.append({
                "type": "expired_lease_active_assignment",
                "agent_id": agent_id,
                "task_id": assignment.task_id,
                "lease_expired": lease_status.expired_at,
                "severity": "critical",
                "impact": "Task may be stuck with unresponsive agent"
            })

    return {
        "consistency_score": 1.0 - (len(consistency_issues) / max(len(active_assignments), 1)),
        "issues_detected": len(consistency_issues),
        "issues": consistency_issues,
        "health_status": "healthy" if len(consistency_issues) == 0 else "issues_detected"
    }

Stage 2: Task Status Synchronization Check

async def _validate_task_status_sync(state: Any) -> Dict[str, Any]:
    """
    Ensure task status consistency between Marcus and Kanban systems

    Validates:
    - Marcus internal task status vs Kanban board status
    - Assignment records vs actual task assignments
    - Task completion status synchronization
    - Dependency status accuracy
    """

    sync_issues = []

    # Compare Marcus state with Kanban state
    marcus_tasks = {task.id: task for task in state.project_tasks}

    for task_id, marcus_task in marcus_tasks.items():
        try:
            kanban_task = await state.kanban_client.get_task_by_id(task_id)

            # Check status synchronization
            if marcus_task.status != kanban_task.status:
                sync_issues.append({
                    "type": "status_mismatch",
                    "task_id": task_id,
                    "marcus_status": marcus_task.status,
                    "kanban_status": kanban_task.status,
                    "severity": "medium",
                    "auto_fixable": True
                })

            # Check assignment synchronization
            marcus_assigned = marcus_task.assigned_to
            kanban_assigned = kanban_task.assigned_to

            if marcus_assigned != kanban_assigned:
                sync_issues.append({
                    "type": "assignment_mismatch",
                    "task_id": task_id,
                    "marcus_assigned": marcus_assigned,
                    "kanban_assigned": kanban_assigned,
                    "severity": "high",
                    "requires_resolution": True
                })

        except Exception as e:
            sync_issues.append({
                "type": "sync_error",
                "task_id": task_id,
                "error": str(e),
                "severity": "critical",
                "requires_investigation": True
            })

    return {
        "sync_health_score": 1.0 - (len(sync_issues) / max(len(marcus_tasks), 1)),
        "sync_issues": sync_issues,
        "auto_fixable_issues": len([i for i in sync_issues if i.get("auto_fixable")]),
        "critical_issues": len([i for i in sync_issues if i.get("severity") == "critical"])
    }

Stage 3: Orphaned Assignment Detection

async def _detect_orphaned_assignments(state: Any) -> Dict[str, Any]:
    """
    Identify assignments that have become orphaned or stuck

    Detects:
    - Assignments to offline/unresponsive agents
    - Tasks assigned but not in agent's active task list
    - Long-running assignments without progress
    - Circular assignment dependencies
    """

    orphaned_assignments = []

    # Check for assignments to inactive agents
    active_assignments = state.assignment_persistence.get_all_assignments()

    for agent_id, assignment in active_assignments.items():
        # Check agent responsiveness
        agent_status = state.agent_status.get(agent_id)

        if not agent_status:
            orphaned_assignments.append({
                "type": "unknown_agent",
                "agent_id": agent_id,
                "task_id": assignment.task_id,
                "assigned_at": assignment.assigned_at,
                "severity": "high",
                "recovery_action": "reassign_to_available_agent"
            })

        elif agent_status.last_activity:
            time_since_activity = datetime.now() - agent_status.last_activity

            if time_since_activity > timedelta(hours=6):
                orphaned_assignments.append({
                    "type": "unresponsive_agent",
                    "agent_id": agent_id,
                    "task_id": assignment.task_id,
                    "last_activity": agent_status.last_activity,
                    "hours_since_activity": time_since_activity.total_seconds() / 3600,
                    "severity": "medium",
                    "recovery_action": "check_agent_status_or_reassign"
                })

        # Check for assignments without progress
        if assignment.progress_percentage == 0:
            assignment_age = datetime.now() - assignment.assigned_at

            if assignment_age > timedelta(hours=4):
                orphaned_assignments.append({
                    "type": "stalled_assignment",
                    "agent_id": agent_id,
                    "task_id": assignment.task_id,
                    "assigned_duration": assignment_age.total_seconds() / 3600,
                    "severity": "medium",
                    "recovery_action": "contact_agent_or_provide_support"
                })

    return {
        "orphaned_count": len(orphaned_assignments),
        "orphaned_assignments": orphaned_assignments,
        "recovery_actions_needed": len([a for a in orphaned_assignments if "recovery_action" in a]),
        "immediate_attention_required": len([a for a in orphaned_assignments if a.get("severity") == "high"])
    }

Tool 3: Board Health Monitoring (check_board_health)#

Purpose: Kanban board health and integration diagnostics

async def check_board_health(state: Any) -> Dict[str, Any]:
    """
    Monitor Kanban board health and integration status.

    Comprehensive board health analysis including:
    - Board connectivity and responsiveness
    - Data consistency and synchronization
    - Performance metrics and bottlenecks
    - Integration health with Marcus systems
    """

Board Health Analysis Workflow:#

Stage 1: Connectivity & Performance Assessment

async def _assess_board_connectivity(state: Any) -> Dict[str, Any]:
    """
    Test board connectivity and measure performance metrics

    Tests:
    - API endpoint responsiveness
    - Authentication status
    - Request/response latency
    - Rate limiting status
    - Error rates and patterns
    """

    connectivity_results = {
        "connection_status": "unknown",
        "response_times": {},
        "error_rates": {},
        "authentication_valid": False
    }

    try:
        # Test basic connectivity
        start_time = datetime.now()
        board_info = await state.kanban_client.get_board_info()
        response_time = (datetime.now() - start_time).total_seconds()

        connectivity_results.update({
            "connection_status": "connected",
            "board_info": board_info,
            "basic_response_time": response_time,
            "authentication_valid": True
        })

        # Test various operations for performance profiling
        operations = {
            "get_tasks": lambda: state.kanban_client.get_tasks(),
            "get_columns": lambda: state.kanban_client.get_columns(),
            "get_labels": lambda: state.kanban_client.get_labels()
        }

        for op_name, operation in operations.items():
            try:
                start_time = datetime.now()
                await operation()
                op_response_time = (datetime.now() - start_time).total_seconds()
                connectivity_results["response_times"][op_name] = op_response_time
                connectivity_results["error_rates"][op_name] = 0.0

            except Exception as e:
                connectivity_results["response_times"][op_name] = "timeout"
                connectivity_results["error_rates"][op_name] = 1.0
                connectivity_results[f"{op_name}_error"] = str(e)

    except Exception as e:
        connectivity_results.update({
            "connection_status": "failed",
            "connection_error": str(e),
            "authentication_valid": False
        })

    return connectivity_results

Stage 2: Data Consistency Validation

async def _validate_board_data_consistency(state: Any) -> Dict[str, Any]:
    """
    Validate data consistency between board and Marcus expectations

    Validates:
    - Task count consistency
    - Column structure matches expectations
    - Label system completeness
    - Assignment data accuracy
    """

    consistency_results = {
        "data_consistency_score": 1.0,
        "issues_detected": [],
        "validation_summary": {}
    }

    # Validate task count consistency
    marcus_task_count = len(state.project_tasks)
    try:
        board_tasks = await state.kanban_client.get_tasks()
        board_task_count = len(board_tasks)

        task_count_variance = abs(marcus_task_count - board_task_count)

        if task_count_variance > 0:
            consistency_results["issues_detected"].append({
                "type": "task_count_mismatch",
                "marcus_tasks": marcus_task_count,
                "board_tasks": board_task_count,
                "variance": task_count_variance,
                "severity": "medium" if task_count_variance <= 3 else "high"
            })

        consistency_results["validation_summary"]["task_count_check"] = {
            "passed": task_count_variance == 0,
            "marcus_count": marcus_task_count,
            "board_count": board_task_count
        }

    except Exception as e:
        consistency_results["issues_detected"].append({
            "type": "task_validation_error",
            "error": str(e),
            "severity": "critical"
        })

    # Validate column structure
    try:
        expected_columns = ["TODO", "IN_PROGRESS", "TESTING", "DONE", "BLOCKED"]
        board_columns = await state.kanban_client.get_columns()
        board_column_names = [col.name for col in board_columns]

        missing_columns = set(expected_columns) - set(board_column_names)
        extra_columns = set(board_column_names) - set(expected_columns)

        if missing_columns or extra_columns:
            consistency_results["issues_detected"].append({
                "type": "column_structure_mismatch",
                "missing_columns": list(missing_columns),
                "extra_columns": list(extra_columns),
                "severity": "medium"
            })

        consistency_results["validation_summary"]["column_structure_check"] = {
            "passed": len(missing_columns) == 0 and len(extra_columns) == 0,
            "expected": expected_columns,
            "actual": board_column_names
        }

    except Exception as e:
        consistency_results["issues_detected"].append({
            "type": "column_validation_error",
            "error": str(e),
            "severity": "high"
        })

    # Calculate overall consistency score
    total_issues = len(consistency_results["issues_detected"])
    critical_issues = len([i for i in consistency_results["issues_detected"] if i.get("severity") == "critical"])

    if critical_issues > 0:
        consistency_results["data_consistency_score"] = 0.3
    elif total_issues > 0:
        consistency_results["data_consistency_score"] = max(0.5, 1.0 - (total_issues * 0.1))

    return consistency_results

πŸ“Š Advanced Health Intelligence#

Health Metrics Aggregation#

class SystemHealthAggregator:
    """Aggregates health metrics across all diagnostic tools"""

    async def generate_comprehensive_health_report(
        self,
        state: Any
    ) -> Dict[str, Any]:
        """
        Generate comprehensive system health report

        Combines:
        - System connectivity and performance
        - Assignment system integrity
        - Board health and synchronization
        - Predictive health indicators
        """

        # Gather health data from all diagnostic tools
        ping_health = await self._get_system_health_metrics(state)
        assignment_health = await check_assignment_health(state)
        board_health = await check_board_health(state)

        # Calculate overall system health score
        overall_health_score = self._calculate_overall_health_score(
            ping_health, assignment_health, board_health
        )

        # Generate health recommendations
        recommendations = self._generate_health_recommendations(
            ping_health, assignment_health, board_health
        )

        return {
            "overall_health_score": overall_health_score,
            "health_grade": self._score_to_grade(overall_health_score),
            "system_components": {
                "core_system": ping_health,
                "assignment_system": assignment_health,
                "board_integration": board_health
            },
            "critical_issues": self._identify_critical_issues(
                ping_health, assignment_health, board_health
            ),
            "performance_metrics": self._aggregate_performance_metrics(
                ping_health, assignment_health, board_health
            ),
            "recommendations": recommendations,
            "next_health_check": datetime.now() + timedelta(hours=1),
            "health_trend": self._analyze_health_trend(state)
        }

    def _calculate_overall_health_score(
        self,
        ping_health: Dict[str, Any],
        assignment_health: Dict[str, Any],
        board_health: Dict[str, Any]
    ) -> float:
        """Calculate weighted overall health score"""

        weights = {
            "system_connectivity": 0.25,
            "assignment_consistency": 0.35,  # Critical for coordination
            "board_integration": 0.25,
            "performance_metrics": 0.15
        }

        scores = {
            "system_connectivity": ping_health.get("connectivity_score", 1.0),
            "assignment_consistency": assignment_health.get("consistency_score", 1.0),
            "board_integration": board_health.get("integration_score", 1.0),
            "performance_metrics": self._calculate_performance_score(ping_health, board_health)
        }

        weighted_score = sum(
            scores[component] * weights[component]
            for component in weights
        )

        return round(weighted_score, 3)

Proactive Health Management#

class ProactiveHealthManager:
    """Manages proactive health monitoring and issue prevention"""

    async def identify_health_degradation_patterns(
        self,
        state: Any
    ) -> List[HealthDegradationPattern]:
        """
        Identify patterns that indicate health degradation

        Monitors:
        - Increasing response times
        - Growing assignment inconsistencies
        - Declining board synchronization
        - Resource utilization trends
        """

        patterns = []

        # Analyze response time trends
        recent_response_times = await self._get_recent_response_times(state)
        if self._shows_degradation_trend(recent_response_times):
            patterns.append(HealthDegradationPattern(
                type="performance_degradation",
                severity="medium",
                description="System response times showing upward trend",
                predicted_impact="Coordination delays may increase",
                recommended_action="Investigate resource utilization and optimize"
            ))

        # Analyze assignment consistency trends
        assignment_health_history = await self._get_assignment_health_history(state)
        if self._shows_consistency_degradation(assignment_health_history):
            patterns.append(HealthDegradationPattern(
                type="assignment_consistency_degradation",
                severity="high",
                description="Assignment-lease consistency declining",
                predicted_impact="Risk of stuck tasks and coordination failures",
                recommended_action="Run assignment cleanup and validate lease management"
            ))

        return patterns

    async def execute_proactive_maintenance(
        self,
        state: Any,
        maintenance_type: str = "routine"
    ) -> Dict[str, Any]:
        """
        Execute proactive maintenance based on health analysis

        Maintenance types:
        - routine: Regular optimization and cleanup
        - targeted: Address specific identified issues
        - emergency: Respond to critical health degradation
        """

        maintenance_results = {
            "maintenance_type": maintenance_type,
            "actions_taken": [],
            "issues_resolved": [],
            "performance_improvements": {}
        }

        if maintenance_type in ["routine", "targeted"]:
            # Clean up expired assignments
            cleanup_results = await self._cleanup_expired_assignments(state)
            maintenance_results["actions_taken"].append("expired_assignment_cleanup")
            maintenance_results["issues_resolved"].extend(cleanup_results.resolved_issues)

            # Synchronize task states
            sync_results = await self._synchronize_task_states(state)
            maintenance_results["actions_taken"].append("task_state_synchronization")
            maintenance_results["performance_improvements"]["sync_accuracy"] = sync_results.improvement_score

            # Optimize memory usage
            memory_optimization = await self._optimize_memory_usage(state)
            maintenance_results["actions_taken"].append("memory_optimization")
            maintenance_results["performance_improvements"]["memory_efficiency"] = memory_optimization.efficiency_gain

        return maintenance_results

πŸ” Integration Points#

With Assignment System#

async def validate_assignment_system_health(
    assignment_persistence: AssignmentPersistence,
    lease_manager: AssignmentLeaseManager
) -> Dict[str, Any]:
    """
    Deep validation of assignment system health

    Checks:
    - Assignment persistence integrity
    - Lease management effectiveness
    - Assignment-task synchronization
    - Performance bottlenecks
    """

With Monitoring Systems#

async def integrate_with_monitoring(
    health_data: Dict[str, Any],
    monitoring_system: MonitoringSystem
) -> None:
    """
    Feed health data into monitoring systems

    Provides:
    - Real-time health metrics
    - Alert triggers for degradation
    - Performance trend analysis
    - Predictive maintenance signals
    """

🎯 Key Capabilities#

1. Comprehensive Health Visibility#

System health tools provide complete visibility into Marcus coordination health:

  • System Connectivity: Real-time status of all Marcus components

  • Assignment Integrity: Deep validation of task assignment consistency

  • Board Health: Kanban integration health and performance

  • Performance Monitoring: Response times, throughput, and resource utilization

2. Proactive Issue Detection#

Advanced diagnostics identify problems before they impact coordination:

  • Degradation Pattern Recognition: Identifies trends indicating health decline

  • Predictive Maintenance: Proactive optimization before issues occur

  • Consistency Validation: Prevents data synchronization problems

  • Performance Optimization: Maintains optimal system responsiveness

3. Automated Recovery & Maintenance#

Intelligent recovery mechanisms maintain system health:

  • Automatic Cleanup: Removes stuck assignments and expired leases

  • State Synchronization: Maintains consistency between systems

  • Emergency Recovery: Rapid response to critical health issues

  • Performance Tuning: Continuous optimization of system performance


🎯 System Impact#

Without Health & Diagnostics#

  • Coordination failures discovered only after impact

  • Manual detection of system inconsistencies

  • Reactive response to performance degradation

  • Limited visibility into system health trends

  • No proactive maintenance or optimization

With Health & Diagnostics#

  • Proactive Health Management: Issues identified and resolved before impact

  • Comprehensive System Visibility: Complete insight into coordination health

  • Automated Maintenance: Self-healing capabilities maintain optimal performance

  • Predictive Intelligence: Trend analysis prevents future problems

  • Continuous Optimization: System performance continuously improved


🎯 Key Takeaway#

The System Health & Diagnostics tools transform Marcus from a coordination system that fails unpredictably into a self-monitoring, self-healing coordination intelligence with comprehensive health visibility, proactive issue detection, and automated maintenance capabilities.

These tools ensure that Marcus coordination remains reliable, performant, and resilient, providing the foundation for trustworthy multi-agent project management at scale.