Source code for src.core.error_monitoring

"""
Marcus Error Monitoring and Correlation System.

Provides comprehensive error tracking, pattern analysis, and correlation
capabilities for autonomous agent environments.
"""

import asyncio
import json
import logging
import threading
import time
from asyncio import Task
from collections import defaultdict, deque
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set

from .error_framework import ErrorSeverity, MarcusBaseError

logger = logging.getLogger(__name__)


[docs] class AlertSeverity(Enum): """Alert severity levels.""" INFO = "info" WARNING = "warning" ERROR = "error" CRITICAL = "critical"
[docs] @dataclass class ErrorMetrics: """Error metrics for monitoring.""" total_errors: int = 0 errors_by_type: Dict[str, int] = field(default_factory=dict) errors_by_severity: Dict[str, int] = field(default_factory=dict) errors_by_category: Dict[str, int] = field(default_factory=dict) errors_by_agent: Dict[str, int] = field(default_factory=dict) errors_by_operation: Dict[str, int] = field(default_factory=dict) retryable_errors: int = 0 critical_errors: int = 0 error_rate_per_minute: float = 0.0 last_updated: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
[docs] @dataclass class ErrorPattern: """Detected error pattern.""" pattern_id: str pattern_type: str description: str frequency: int first_seen: datetime last_seen: datetime affected_agents: Set[str] = field(default_factory=set) affected_operations: Set[str] = field(default_factory=set) severity: ErrorSeverity = ErrorSeverity.MEDIUM sample_errors: List[str] = field(default_factory=list)
[docs] @dataclass class CorrelationGroup: """Group of correlated errors.""" group_id: str correlation_key: str errors: List[str] = field(default_factory=list) # Error correlation IDs start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) end_time: Optional[datetime] = None pattern: Optional[str] = None root_cause: Optional[str] = None
[docs] class ErrorMonitor: """ Comprehensive error monitoring system. Tracks error patterns, provides real-time metrics, and enables proactive issue detection for autonomous agents. """
[docs] def __init__( self, storage_path: str = "logs/error_monitoring.json", metrics_window_minutes: int = 60, pattern_detection_enabled: bool = True, correlation_timeout_minutes: int = 30, ): self.storage_path = Path(storage_path) self.metrics_window_minutes = metrics_window_minutes self.pattern_detection_enabled = pattern_detection_enabled self.correlation_timeout_minutes = correlation_timeout_minutes # Error storage self.error_history: deque[Dict[str, Any]] = deque( maxlen=10000 ) # Keep last 10k errors self.error_index: Dict[str, Dict[str, Any]] = {} # correlation_id -> error data # Metrics self.current_metrics = ErrorMetrics() self.metrics_history: List[ErrorMetrics] = [] # Pattern detection self.detected_patterns: Dict[str, ErrorPattern] = {} self.pattern_thresholds = { "frequency_threshold": 5, # Same error 5+ times "burst_threshold": 10, # 10+ errors in short time "agent_error_threshold": 20, # 20+ errors from same agent "cascade_threshold": 3, # 3+ related errors in sequence } # Correlation tracking self.correlation_groups: Dict[str, CorrelationGroup] = {} self.active_correlations: Dict[str, str] = {} # correlation_id -> group_id # Alert callbacks self.alert_callbacks: List[Callable[[ErrorPattern], None]] = [] # Background task management self._monitoring_task: Optional[Task[None]] = None self._lock = threading.Lock() # Initialize storage self._initialize_storage()
def _initialize_storage(self) -> None: """Initialize error monitoring storage.""" self.storage_path.parent.mkdir(parents=True, exist_ok=True) # Load existing data if available if self.storage_path.exists(): try: with open(self.storage_path, "r") as f: data = json.load(f) self._load_from_storage(data) except Exception as e: logger.warning(f"Failed to load error monitoring data: {e}") def _load_from_storage(self, data: Dict[str, Any]) -> None: """Load monitoring data from storage.""" # Load metrics history if "metrics_history" in data: for metrics_data in data["metrics_history"][-100:]: # Keep last 100 # Convert ISO string to datetime if needed if "last_updated" in metrics_data and isinstance( metrics_data["last_updated"], str ): metrics_data["last_updated"] = datetime.fromisoformat( metrics_data["last_updated"] ) metrics = ErrorMetrics(**metrics_data) self.metrics_history.append(metrics) # Load detected patterns if "patterns" in data: for pattern_data in data["patterns"].values(): # Convert ISO strings to datetime objects if "first_seen" in pattern_data and isinstance( pattern_data["first_seen"], str ): pattern_data["first_seen"] = datetime.fromisoformat( pattern_data["first_seen"] ) if "last_seen" in pattern_data and isinstance( pattern_data["last_seen"], str ): pattern_data["last_seen"] = datetime.fromisoformat( pattern_data["last_seen"] ) if "severity" in pattern_data and isinstance( pattern_data["severity"], str ): pattern_data["severity"] = ErrorSeverity(pattern_data["severity"]) pattern = ErrorPattern(**pattern_data) pattern.affected_agents = set(pattern.affected_agents) pattern.affected_operations = set(pattern.affected_operations) self.detected_patterns[pattern.pattern_id] = pattern def _save_to_storage(self) -> None: """Save monitoring data to storage.""" try: # Convert patterns for JSON serialization patterns_data = {} for pattern_id, pattern in self.detected_patterns.items(): pattern_dict = asdict(pattern) pattern_dict["affected_agents"] = list(pattern.affected_agents) pattern_dict["affected_operations"] = list(pattern.affected_operations) pattern_dict["first_seen"] = pattern.first_seen.isoformat() pattern_dict["last_seen"] = pattern.last_seen.isoformat() pattern_dict["severity"] = pattern.severity.value patterns_data[pattern_id] = pattern_dict # Convert metrics for JSON serialization metrics_data = [] for metrics in self.metrics_history[-100:]: # Keep last 100 metrics_dict = asdict(metrics) metrics_dict["last_updated"] = metrics.last_updated.isoformat() metrics_data.append(metrics_dict) data = { "patterns": patterns_data, "metrics_history": metrics_data, "last_updated": datetime.now(timezone.utc).isoformat(), } with open(self.storage_path, "w") as f: json.dump(data, f, indent=2) except Exception as e: logger.error(f"Failed to save error monitoring data: {e}")
[docs] def record_error(self, error: MarcusBaseError) -> None: """Record an error for monitoring and analysis.""" with self._lock: # Create error record error_record = { "correlation_id": error.context.correlation_id, "error_code": error.error_code, "error_type": error.__class__.__name__, "message": error.message, "severity": error.severity.value, "category": error.category.value, "retryable": error.retryable, "timestamp": error.context.timestamp, "operation": error.context.operation, "agent_id": error.context.agent_id, "task_id": error.context.task_id, "integration_name": error.context.integration_name, "custom_context": error.context.custom_context or {}, } # Store error self.error_history.append(error_record) self.error_index[error.context.correlation_id] = error_record # Update metrics self._update_metrics(error_record) # Pattern detection if self.pattern_detection_enabled: self._detect_patterns(error_record) # Correlation tracking self._track_correlations(error_record) # Log for debugging logger.debug( f"Recorded error: {error.error_code} ({error.context.correlation_id})" )
def _update_metrics(self, error_record: Dict[str, Any]) -> None: """Update error metrics.""" metrics = self.current_metrics # Update counters metrics.total_errors += 1 # By type error_type = error_record["error_type"] metrics.errors_by_type[error_type] = ( metrics.errors_by_type.get(error_type, 0) + 1 ) # By severity severity = error_record["severity"] metrics.errors_by_severity[severity] = ( metrics.errors_by_severity.get(severity, 0) + 1 ) # By category category = error_record["category"] metrics.errors_by_category[category] = ( metrics.errors_by_category.get(category, 0) + 1 ) # By agent if error_record["agent_id"]: agent_id = error_record["agent_id"] metrics.errors_by_agent[agent_id] = ( metrics.errors_by_agent.get(agent_id, 0) + 1 ) # By operation if error_record["operation"]: operation = error_record["operation"] metrics.errors_by_operation[operation] = ( metrics.errors_by_operation.get(operation, 0) + 1 ) # Special counters if error_record["retryable"]: metrics.retryable_errors += 1 if error_record["severity"] == "critical": metrics.critical_errors += 1 # Update timestamp metrics.last_updated = datetime.now(timezone.utc) # Calculate error rate self._calculate_error_rate() def _calculate_error_rate(self) -> None: """Calculate current error rate per minute.""" now = datetime.now(timezone.utc) cutoff_time = now - timedelta(minutes=self.metrics_window_minutes) # Count errors in time window recent_errors = sum( 1 for error in self.error_history if error["timestamp"] > cutoff_time ) self.current_metrics.error_rate_per_minute = ( recent_errors / self.metrics_window_minutes ) def _detect_patterns(self, error_record: Dict[str, Any]) -> None: """Detect error patterns for proactive alerting.""" now = datetime.now(timezone.utc) # Pattern 1: Frequency-based (same error type recurring) self._detect_frequency_pattern(error_record, now) # Pattern 2: Burst pattern (many errors in short time) self._detect_burst_pattern(error_record, now) # Pattern 3: Agent-specific error pattern self._detect_agent_pattern(error_record, now) # Pattern 4: Cascade pattern (related errors in sequence) self._detect_cascade_pattern(error_record, now) def _detect_frequency_pattern( self, error_record: Dict[str, Any], now: datetime ) -> None: """Detect frequency-based error patterns.""" error_type = error_record["error_type"] # Count recent occurrences of this error type recent_count = sum( 1 for error in self.error_history if ( error["error_type"] == error_type and now - error["timestamp"] < timedelta(minutes=10) ) ) if recent_count >= self.pattern_thresholds["frequency_threshold"]: pattern_id = f"frequency_{error_type}_{now.strftime('%Y%m%d_%H%M')}" if pattern_id not in self.detected_patterns: pattern = ErrorPattern( pattern_id=pattern_id, pattern_type="frequency", description=( f"High frequency of {error_type} errors " f"({recent_count} in 10 minutes)" ), frequency=recent_count, first_seen=now, last_seen=now, severity=( ErrorSeverity.MEDIUM if recent_count < 10 else ErrorSeverity.HIGH ), ) self.detected_patterns[pattern_id] = pattern self._notify_pattern_detected(pattern) else: # Update existing pattern pattern = self.detected_patterns[pattern_id] pattern.frequency = recent_count pattern.last_seen = now def _detect_burst_pattern( self, error_record: Dict[str, Any], now: datetime ) -> None: """Detect burst error patterns.""" # Count all errors in last 5 minutes burst_count = sum( 1 for error in self.error_history if now - error["timestamp"] < timedelta(minutes=5) ) if burst_count >= self.pattern_thresholds["burst_threshold"]: pattern_id = f"burst_{now.strftime('%Y%m%d_%H%M')}" if pattern_id not in self.detected_patterns: pattern = ErrorPattern( pattern_id=pattern_id, pattern_type="burst", description=( f"Error burst detected " f"({burst_count} errors in 5 minutes)" ), frequency=burst_count, first_seen=now, last_seen=now, severity=( ErrorSeverity.HIGH if burst_count < 20 else ErrorSeverity.CRITICAL ), ) self.detected_patterns[pattern_id] = pattern self._notify_pattern_detected(pattern) def _detect_agent_pattern( self, error_record: Dict[str, Any], now: datetime ) -> None: """Detect agent-specific error patterns.""" agent_id = error_record.get("agent_id") if not agent_id: return # Count errors from this agent in last 30 minutes agent_errors = sum( 1 for error in self.error_history if ( error.get("agent_id") == agent_id and now - error["timestamp"] < timedelta(minutes=30) ) ) if agent_errors >= self.pattern_thresholds["agent_error_threshold"]: pattern_id = f"agent_{agent_id}_{now.strftime('%Y%m%d_%H%M')}" if pattern_id not in self.detected_patterns: pattern = ErrorPattern( pattern_id=pattern_id, pattern_type="agent_specific", description=( f"High error rate from agent {agent_id} " f"({agent_errors} errors in 30 minutes)" ), frequency=agent_errors, first_seen=now, last_seen=now, severity=ErrorSeverity.MEDIUM, affected_agents={agent_id}, ) self.detected_patterns[pattern_id] = pattern self._notify_pattern_detected(pattern) def _detect_cascade_pattern( self, error_record: Dict[str, Any], now: datetime ) -> None: """Detect cascade error patterns (related errors in sequence).""" # Look for errors with similar context that occurred recently similar_errors = [] for error in list(self.error_history)[-50:]: # Check last 50 errors if ( now - error["timestamp"] < timedelta(minutes=5) and error["correlation_id"] != error_record["correlation_id"] ): # Check for similarity similarity_score = self._calculate_error_similarity(error, error_record) if similarity_score > 0.7: # 70% similarity threshold similar_errors.append(error) if len(similar_errors) >= self.pattern_thresholds["cascade_threshold"]: pattern_id = f"cascade_{now.strftime('%Y%m%d_%H%M%S')}" pattern = ErrorPattern( pattern_id=pattern_id, pattern_type="cascade", description=( f"Cascade pattern detected " f"({len(similar_errors)} related errors)" ), frequency=len(similar_errors), first_seen=now, last_seen=now, severity=ErrorSeverity.MEDIUM, ) self.detected_patterns[pattern_id] = pattern self._notify_pattern_detected(pattern) def _calculate_error_similarity( self, error1: Dict[str, Any], error2: Dict[str, Any] ) -> float: """Calculate similarity between two errors (0.0 to 1.0).""" similarity_factors = [] # Same error type if error1["error_type"] == error2["error_type"]: similarity_factors.append(0.4) # Same operation if error1.get("operation") == error2.get("operation"): similarity_factors.append(0.3) # Same integration if error1.get("integration_name") == error2.get("integration_name"): similarity_factors.append(0.2) # Similar timestamp (within 1 minute) time_diff = abs((error1["timestamp"] - error2["timestamp"]).total_seconds()) if time_diff < 60: similarity_factors.append(0.1) return sum(similarity_factors) def _track_correlations(self, error_record: Dict[str, Any]) -> None: """Track error correlations for root cause analysis.""" correlation_id = error_record["correlation_id"] # Find correlation key (operation + agent + integration) correlation_key = ( f"{error_record.get('operation', 'unknown')}_" f"{error_record.get('agent_id', 'unknown')}_" f"{error_record.get('integration_name', 'unknown')}" ) # Find or create correlation group group_id = None for gid, group in self.correlation_groups.items(): if group.correlation_key == correlation_key and datetime.now( timezone.utc ) - group.start_time < timedelta(minutes=self.correlation_timeout_minutes): group_id = gid break if not group_id: group_id = f"corr_{int(time.time())}_{correlation_key[:20]}" self.correlation_groups[group_id] = CorrelationGroup( group_id=group_id, correlation_key=correlation_key ) # Add error to group group = self.correlation_groups[group_id] group.errors.append(correlation_id) group.end_time = datetime.now(timezone.utc) self.active_correlations[correlation_id] = group_id def _notify_pattern_detected(self, pattern: ErrorPattern) -> None: """Notify about detected error pattern.""" logger.warning(f"Error pattern detected: {pattern.description}") # Call registered callbacks for callback in self.alert_callbacks: try: callback(pattern) except Exception as e: logger.error(f"Error in pattern alert callback: {e}")
[docs] def add_alert_callback(self, callback: Callable[[ErrorPattern], None]) -> None: """Add callback for pattern alerts.""" self.alert_callbacks.append(callback)
[docs] def get_current_metrics(self) -> ErrorMetrics: """Get current error metrics.""" return self.current_metrics
[docs] def get_metrics_history(self, hours: int = 24) -> List[ErrorMetrics]: """Get metrics history for specified hours.""" cutoff_time = datetime.now(timezone.utc) - timedelta(hours=hours) return [ metrics for metrics in self.metrics_history if metrics.last_updated > cutoff_time ]
[docs] def get_detected_patterns(self, active_only: bool = True) -> List[ErrorPattern]: """Get detected error patterns.""" if not active_only: return list(self.detected_patterns.values()) # Return only patterns from last 24 hours cutoff_time = datetime.now(timezone.utc) - timedelta(hours=24) return [ pattern for pattern in self.detected_patterns.values() if pattern.last_seen > cutoff_time ]
[docs] def get_correlation_groups( self, active_only: bool = True ) -> List[CorrelationGroup]: """Get error correlation groups.""" if not active_only: return list(self.correlation_groups.values()) # Return only active correlations cutoff_time = datetime.now(timezone.utc) - timedelta( minutes=self.correlation_timeout_minutes ) return [ group for group in self.correlation_groups.values() if group.end_time and group.end_time > cutoff_time ]
[docs] def get_error_details(self, correlation_id: str) -> Optional[Dict[str, Any]]: """Get detailed information about a specific error.""" return self.error_index.get(correlation_id)
[docs] def search_errors( self, error_type: Optional[str] = None, agent_id: Optional[str] = None, operation: Optional[str] = None, severity: Optional[str] = None, hours: int = 24, ) -> List[Dict[str, Any]]: """Search errors with specified criteria.""" cutoff_time = datetime.now(timezone.utc) - timedelta(hours=hours) results = [] for error in self.error_history: if error["timestamp"] < cutoff_time: continue # Apply filters if error_type and error["error_type"] != error_type: continue if agent_id and error.get("agent_id") != agent_id: continue if operation and error.get("operation") != operation: continue if severity and error["severity"] != severity: continue results.append(error) return results
[docs] def generate_health_report(self) -> Dict[str, Any]: """Generate comprehensive health report.""" current_metrics = self.get_current_metrics() active_patterns = self.get_detected_patterns(active_only=True) active_correlations = self.get_correlation_groups(active_only=True) # Health score calculation (0-100) health_score = 100 if current_metrics.error_rate_per_minute > 10: health_score -= 30 elif current_metrics.error_rate_per_minute > 5: health_score -= 15 elif current_metrics.error_rate_per_minute > 2: health_score -= 5 if current_metrics.critical_errors > 0: health_score -= 25 if len(active_patterns) > 0: health_score -= len(active_patterns) * 10 health_score = max(0, health_score) # Determine health status if health_score >= 90: health_status = "excellent" elif health_score >= 75: health_status = "good" elif health_score >= 50: health_status = "fair" elif health_score >= 25: health_status = "poor" else: health_status = "critical" return { "health_score": health_score, "health_status": health_status, "timestamp": datetime.now(timezone.utc).isoformat(), "metrics": asdict(current_metrics), "active_patterns": len(active_patterns), "active_correlations": len(active_correlations), "top_error_types": dict( sorted( current_metrics.errors_by_type.items(), key=lambda x: x[1], reverse=True, )[:5] ), "recommendations": self._generate_recommendations( current_metrics, active_patterns ), }
def _generate_recommendations( self, metrics: ErrorMetrics, patterns: List[ErrorPattern] ) -> List[str]: """Generate health recommendations.""" recommendations = [] if metrics.error_rate_per_minute > 5: recommendations.append( "High error rate detected - investigate recent changes" ) if metrics.critical_errors > 0: recommendations.append( "Critical errors present - immediate attention required" ) if len(patterns) > 3: recommendations.append( "Multiple error patterns detected - system instability likely" ) # Agent-specific recommendations top_error_agents = sorted( metrics.errors_by_agent.items(), key=lambda x: x[1], reverse=True )[:3] for agent_id, error_count in top_error_agents: if error_count > 10: recommendations.append( f"Agent {agent_id} has high error count " f"({error_count}) - review agent configuration" ) # Integration-specific recommendations integration_errors: Dict[str, int] = defaultdict(int) # Get last 1000 errors from deque recent_errors = ( list(self.error_history)[-1000:] if len(self.error_history) > 1000 else list(self.error_history) ) for error in recent_errors: if error.get("integration_name"): integration_errors[error["integration_name"]] += 1 for integration, count in integration_errors.items(): if count > 20: recommendations.append( f"Integration {integration} has high error count - " f"check service health" ) if not recommendations: recommendations.append("System health is good - continue monitoring") return recommendations
[docs] async def start_monitoring(self) -> None: """Start background monitoring tasks.""" if self._monitoring_task is None or self._monitoring_task.done(): self._monitoring_task = asyncio.create_task(self._monitoring_loop()) logger.info("Error monitoring started")
[docs] async def stop_monitoring(self) -> None: """Stop background monitoring tasks.""" if self._monitoring_task and not self._monitoring_task.done(): self._monitoring_task.cancel() try: await self._monitoring_task except asyncio.CancelledError: pass logger.info("Error monitoring stopped")
async def _monitoring_loop(self) -> None: """Background monitoring loop.""" while True: try: # Save metrics snapshot self.metrics_history.append( ErrorMetrics(**asdict(self.current_metrics)) ) # Cleanup old data self._cleanup_old_data() # Save to storage self._save_to_storage() # Sleep for next iteration await asyncio.sleep(300) # 5 minutes except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in monitoring loop: {e}") await asyncio.sleep(60) # Wait 1 minute on error def _cleanup_old_data(self) -> None: """Clean up old monitoring data.""" now = datetime.now(timezone.utc) # Clean old patterns (older than 7 days) cutoff_time = now - timedelta(days=7) old_patterns = [ pattern_id for pattern_id, pattern in self.detected_patterns.items() if pattern.last_seen < cutoff_time ] for pattern_id in old_patterns: del self.detected_patterns[pattern_id] # Clean old correlation groups (older than 24 hours) cutoff_time = now - timedelta(hours=24) old_groups = [ group_id for group_id, group in self.correlation_groups.items() if group.end_time and group.end_time < cutoff_time ] for group_id in old_groups: del self.correlation_groups[group_id] # Keep only last 1000 metrics if len(self.metrics_history) > 1000: self.metrics_history = self.metrics_history[-1000:]
# ============================================================================= # GLOBAL MONITOR INSTANCE # ============================================================================= # Global error monitor instance error_monitor = ErrorMonitor() # ============================================================================= # INTEGRATION HELPERS # =============================================================================
[docs] def setup_error_monitoring( storage_path: str = "logs/error_monitoring.json", enable_patterns: bool = True, alert_callback: Optional[Callable[[ErrorPattern], None]] = None, ) -> ErrorMonitor: """Set up global error monitoring.""" global error_monitor error_monitor = ErrorMonitor( storage_path=storage_path, pattern_detection_enabled=enable_patterns ) if alert_callback: error_monitor.add_alert_callback(alert_callback) return error_monitor
[docs] def record_error_for_monitoring(error: MarcusBaseError) -> None: """Record error in global monitor.""" error_monitor.record_error(error) # Emit error_occurred telemetry (Marcus #416, Stage 4 of #9). # Privacy contract: only the error CLASS NAME is shipped โ€” never # the message, never the stack trace. See docs/telemetry.md # ยง error_occurred. try: from src.telemetry.events import fire_error_occurred fire_error_occurred(error_type=type(error).__name__) except Exception: # noqa: BLE001 - never crash the error path pass
[docs] def get_error_health_status() -> Dict[str, Any]: """Get current error health status.""" return error_monitor.generate_health_report()