"""
Marcus Error Handling Framework.
Comprehensive error handling system designed for autonomous agent environments.
Provides structured error types, rich context, and intelligent recovery strategies.
"""
import logging
import traceback
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, Optional, Union
# Module-level logger named after this module; MarcusBaseError._log_error
# emits every constructed error through it.
logger = logging.getLogger(__name__)
class ErrorSeverity(Enum):
    """Severity levels attached to errors for prioritization and alerting.

    Each member's value is the lowercase string form of its name.
    """

    # Members are listed from least to most severe.
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class ErrorCategory(Enum):
    """Coarse error categories used for monitoring and classification.

    Each member's value is the lowercase string form of its name.
    """

    TRANSIENT = "transient"
    CONFIGURATION = "configuration"
    BUSINESS_LOGIC = "business_logic"
    INTEGRATION = "integration"
    SECURITY = "security"
    SYSTEM = "system"
@dataclass
class ErrorContext:
    """Contextual details carried by an error for debugging and recovery.

    Every field has a default, so ``ErrorContext()`` is valid. The two
    identifiers and the timestamp are generated per instance.
    """

    # --- Operation ---------------------------------------------------------
    operation: str = ""
    # Fresh random UUID4 string for every instance.
    operation_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    # --- Agent -------------------------------------------------------------
    agent_id: Optional[str] = None
    task_id: Optional[str] = None
    agent_state: Optional[Dict[str, Any]] = None

    # --- System ------------------------------------------------------------
    # Timezone-aware creation time in UTC.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    # Fresh random UUID4 string, generated independently of operation_id.
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    system_state: Optional[Dict[str, Any]] = None

    # --- Integration -------------------------------------------------------
    integration_name: Optional[str] = None
    integration_state: Optional[Dict[str, Any]] = None

    # --- Additional --------------------------------------------------------
    user_context: Optional[Dict[str, Any]] = None
    custom_context: Optional[Dict[str, Any]] = None
class MarcusBaseError(Exception):
    """
    Base exception class for Marcus-specific errors.

    Carries an error code, an ``ErrorContext``, a remediation suggestion,
    a severity, a retryable flag and an optional underlying cause. Every
    instance is logged as soon as it is constructed.
    """

    def __init__(
        self,
        message: str,
        error_code: Optional[str] = None,
        context: Optional[ErrorContext] = None,
        remediation: Optional[Union[RemediationSuggestion, Dict[str, Any]]] = None,
        severity: ErrorSeverity = ErrorSeverity.MEDIUM,
        retryable: bool = False,
        cause: Optional[Exception] = None,
    ):
        """Initialize the error and log it.

        Parameters
        ----------
        message : str
            Human-readable error message.
        error_code : Optional[str]
            Error code; defaults to the upper-cased class name.
        context : Optional[ErrorContext]
            Error context; a fresh ``ErrorContext`` is created when omitted.
        remediation : Optional[Union[RemediationSuggestion, Dict[str, Any]]]
            Remediation suggestion. A dict is expanded into
            ``RemediationSuggestion(**remediation)``; ``None`` yields a
            default ``RemediationSuggestion()``.
        severity : ErrorSeverity
            Severity, which also selects the log level used.
        retryable : bool
            Whether the failed operation may be retried.
        cause : Optional[Exception]
            Underlying exception. It is stored on ``self.cause`` and, when
            it is a real exception instance, also linked as ``__cause__``.
        """
        super().__init__(message)
        self.message = message
        self.error_code = error_code or self.__class__.__name__.upper()
        self.context = context or ErrorContext()

        # Accept remediation as either a RemediationSuggestion or a dict.
        if isinstance(remediation, dict):
            self.remediation = RemediationSuggestion(**remediation)
        elif remediation is not None:
            self.remediation = remediation
        else:
            self.remediation = RemediationSuggestion()

        self.severity = severity
        self.retryable = retryable
        self.cause = cause
        # Link the cause into standard exception chaining so tracebacks show
        # it. The isinstance guard keeps non-exception values (previously
        # accepted silently) from raising TypeError here.
        if isinstance(cause, BaseException):
            self.__cause__ = cause
        self.category = self._get_category()

        # Capture stack trace for debugging
        self.stack_trace = self._capture_stack_trace()

        # Log error immediately
        self._log_error()

    @staticmethod
    def _capture_stack_trace() -> str:
        """Return a formatted trace describing where the error was created.

        Inside an ``except`` block this is the traceback of the exception
        being handled. Otherwise ``traceback.format_exc()`` would only yield
        ``"NoneType: None"``, so the current call stack is used instead.
        """
        import sys

        if sys.exc_info()[0] is not None:
            return traceback.format_exc()
        # Drop the last entry, which is this helper's own frame.
        return "".join(traceback.format_stack()[:-1])

    def _get_category(self) -> ErrorCategory:
        """Determine error category based on exception type."""
        # Override in subclasses for specific categorization
        return ErrorCategory.SYSTEM

    def _log_error(self) -> None:
        """Log error with appropriate level based on severity."""
        log_data = {
            "error_code": self.error_code,
            # Keyed 'error_message' rather than 'message' to avoid a
            # conflict when passed through ``extra``.
            "error_message": self.message,
            "severity": self.severity.value,
            "category": self.category.value,
            "correlation_id": self.context.correlation_id,
            "agent_id": self.context.agent_id,
            "task_id": self.context.task_id,
            "retryable": self.retryable,
        }
        if self.severity == ErrorSeverity.CRITICAL:
            logger.critical("Critical Marcus error", extra=log_data)
        elif self.severity == ErrorSeverity.HIGH:
            logger.error("High severity Marcus error", extra=log_data)
        elif self.severity == ErrorSeverity.MEDIUM:
            logger.warning("Medium severity Marcus error", extra=log_data)
        else:
            logger.info("Low severity Marcus error", extra=log_data)

    def to_dict(self) -> Dict[str, Any]:
        """Convert error to dictionary for API responses.

        Returns
        -------
        Dict[str, Any]
            Top-level keys ``code``, ``message``, ``type``, ``category``,
            ``severity`` and ``retryable``, plus nested ``context`` and
            ``remediation`` dicts. The context timestamp is rendered with
            ``isoformat()`` and a missing ``custom_context`` becomes ``{}``.
        """
        return {
            "code": self.error_code,
            "message": self.message,
            "type": self.__class__.__name__,
            "category": self.category.value,
            "severity": self.severity.value,
            "retryable": self.retryable,
            "context": {
                "operation": self.context.operation,
                "operation_id": self.context.operation_id,
                "agent_id": self.context.agent_id,
                "task_id": self.context.task_id,
                "correlation_id": self.context.correlation_id,
                "timestamp": self.context.timestamp.isoformat(),
                "integration_name": self.context.integration_name,
                "custom_context": self.context.custom_context or {},
            },
            "remediation": {
                "immediate": self.remediation.immediate_action,
                "long_term": self.remediation.long_term_solution,
                "fallback": self.remediation.fallback_strategy,
                "retry": self.remediation.retry_strategy,
                "escalation": self.remediation.escalation_path,
            },
        }
# =============================================================================
# TRANSIENT ERRORS (Auto-retryable)
# =============================================================================
class TransientError(MarcusBaseError):
    """Base class for transient errors that can be automatically retried.

    Unless the caller overrides them, instances get ``retryable=True`` and
    ``severity=ErrorSeverity.MEDIUM``. The reported category is
    ``ErrorCategory.TRANSIENT``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Apply the transient defaults, then defer to the parent initializer."""
        for option, fallback in (
            ("retryable", True),
            ("severity", ErrorSeverity.MEDIUM),
        ):
            kwargs.setdefault(option, fallback)
        super().__init__(*args, **kwargs)

    def _get_category(self) -> ErrorCategory:
        """Return the category for this error family."""
        return ErrorCategory.TRANSIENT
class NetworkTimeoutError(TransientError):
    """Network operation timed out."""

    def __init__(
        self,
        service_name: str = "unknown",
        timeout_seconds: int = 30,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize network timeout error.

        Parameters
        ----------
        service_name : str
            Name of the service the connection was attempted to.
        timeout_seconds : int
            Timeout that elapsed, in seconds.
        *args : Any
            Extra positional arguments forwarded to the parent initializer
            after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        # service_name and timeout_seconds are named parameters, so Python
        # binds them directly whether passed positionally or by keyword.
        # They can never appear in **kwargs and need no extraction from it.
        message = (
            f"Network timeout connecting to {service_name} after {timeout_seconds}s"
        )
        kwargs.setdefault(
            "remediation",
            RemediationSuggestion(
                immediate_action=f"Retry connection to {service_name}",
                long_term_solution=(
                    "Check network connectivity and service availability"
                ),
                retry_strategy="Exponential backoff with max 3 attempts",
            ),
        )
        super().__init__(message, *args, **kwargs)
class ServiceUnavailableError(TransientError):
    """External service is temporarily unavailable."""

    def __init__(
        self, service_name: str = "unknown", *args: Any, **kwargs: Any
    ) -> None:
        """Initialize service unavailable error.

        Parameters
        ----------
        service_name : str
            Name of the unavailable service.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"Service {service_name} is temporarily unavailable"
        default_fix = RemediationSuggestion(
            immediate_action=f"Wait and retry {service_name} connection",
            fallback_strategy="Use cached data or degraded functionality",
            retry_strategy="Exponential backoff with circuit breaker",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
class RateLimitError(TransientError):
    """Rate limit exceeded for API calls."""

    def __init__(
        self,
        service_name: str = "unknown",
        retry_after: int = 60,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize rate limit error.

        Parameters
        ----------
        service_name : str
            Name of the service that enforced the limit.
        retry_after : int
            Seconds to wait before retrying, as interpolated into the message
            and the default remediation.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"Rate limit exceeded for {service_name}. Retry after {retry_after}s"
        default_fix = RemediationSuggestion(
            immediate_action=f"Wait {retry_after} seconds before retrying",
            long_term_solution="Implement request queuing and rate limiting",
            retry_strategy=f"Wait {retry_after}s then retry",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
class TemporaryResourceError(TransientError):
    """Temporary resource unavailability (memory, disk, etc.)."""

    def __init__(
        self, resource_type: str = "unknown", *args: Any, **kwargs: Any
    ) -> None:
        """Initialize temporary resource error.

        Parameters
        ----------
        resource_type : str
            Kind of resource that is unavailable.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"Temporary {resource_type} resource unavailability"
        default_fix = RemediationSuggestion(
            immediate_action="Wait for resource to become available",
            long_term_solution=f"Monitor and optimize {resource_type} usage",
            retry_strategy="Exponential backoff with resource monitoring",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
# =============================================================================
# CONFIGURATION ERRORS (User-fixable)
# =============================================================================
class ConfigurationError(MarcusBaseError):
    """Base class for configuration-related errors.

    Unless the caller overrides them, instances get
    ``severity=ErrorSeverity.HIGH`` and ``retryable=False``. The reported
    category is ``ErrorCategory.CONFIGURATION``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Apply the configuration defaults, then defer to the parent."""
        for option, fallback in (
            ("severity", ErrorSeverity.HIGH),
            ("retryable", False),
        ):
            kwargs.setdefault(option, fallback)
        super().__init__(*args, **kwargs)

    def _get_category(self) -> ErrorCategory:
        """Return the category for this error family."""
        return ErrorCategory.CONFIGURATION
class MissingCredentialsError(ConfigurationError):
    """Required API credentials are missing."""

    def __init__(
        self,
        service_name: str = "unknown",
        credential_type: str = "API key",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize missing credentials error.

        Parameters
        ----------
        service_name : str
            Service whose credentials are missing.
        credential_type : str
            Kind of credential that is missing.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"Missing {credential_type} for {service_name}"
        default_fix = RemediationSuggestion(
            immediate_action=f"Configure {credential_type} for {service_name}",
            long_term_solution="Set up secure credential management",
            escalation_path="Contact system administrator",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
class InvalidConfigurationError(ConfigurationError):
    """Configuration values are invalid."""

    def __init__(
        self,
        config_key: str = "unknown",
        expected_format: str = "",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize invalid configuration error.

        Parameters
        ----------
        config_key : str
            Configuration key holding the invalid value.
        expected_format : str
            Description of the expected format; appended to the message only
            when non-empty.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"Invalid configuration for {config_key}"
        if expected_format:
            text = f"{text}. Expected format: {expected_format}"
        default_fix = RemediationSuggestion(
            immediate_action=f"Correct configuration for {config_key}",
            long_term_solution="Validate configuration on startup",
            escalation_path="Review configuration documentation",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
class MissingDependencyError(ConfigurationError):
    """Required dependency is missing."""

    def __init__(
        self, dependency_name: str = "unknown", *args: Any, **kwargs: Any
    ) -> None:
        """Initialize missing dependency error.

        Parameters
        ----------
        dependency_name : str
            Name of the missing dependency.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"Missing required dependency: {dependency_name}"
        default_fix = RemediationSuggestion(
            immediate_action=f"Install {dependency_name}",
            long_term_solution="Add dependency to requirements",
            escalation_path="Contact development team",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
class EnvironmentError(ConfigurationError):
    """Environment setup is incorrect.

    NOTE: this class shares its name with Python's built-in
    ``EnvironmentError``, so it shadows the builtin inside this module and
    wherever it is imported by name.
    """

    def __init__(
        self, environment_issue: str = "unknown", *args: Any, **kwargs: Any
    ) -> None:
        """Initialize environment error.

        Parameters
        ----------
        environment_issue : str
            Description of the environment problem.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"Environment setup issue: {environment_issue}"
        default_fix = RemediationSuggestion(
            immediate_action="Fix environment configuration",
            long_term_solution="Automate environment setup",
            escalation_path="Review deployment documentation",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
# =============================================================================
# BUSINESS LOGIC ERRORS (User/Agent errors)
# =============================================================================
class BusinessLogicError(MarcusBaseError):
    """Base class for business logic violations.

    Unless the caller overrides them, instances get
    ``severity=ErrorSeverity.MEDIUM`` and ``retryable=False``. The reported
    category is ``ErrorCategory.BUSINESS_LOGIC``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Apply the business-logic defaults, then defer to the parent."""
        for option, fallback in (
            ("severity", ErrorSeverity.MEDIUM),
            ("retryable", False),
        ):
            kwargs.setdefault(option, fallback)
        super().__init__(*args, **kwargs)

    def _get_category(self) -> ErrorCategory:
        """Return the category for this error family."""
        return ErrorCategory.BUSINESS_LOGIC
class TaskAssignmentError(BusinessLogicError):
    """Error in task assignment logic."""

    def __init__(
        self,
        task_id: str = "unknown",
        agent_id: str = "unknown",
        reason: str = "",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize task assignment error.

        Parameters
        ----------
        task_id : str
            Task that could not be assigned.
        agent_id : str
            Agent the task could not be assigned to.
        reason : str
            Explanation; appended to the message only when non-empty.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"Cannot assign task {task_id} to agent {agent_id}"
        if reason:
            text = f"{text}: {reason}"
        default_fix = RemediationSuggestion(
            immediate_action="Review task assignment criteria",
            long_term_solution="Improve task assignment algorithm",
            fallback_strategy="Assign to different agent or queue task",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
class WorkflowViolationError(BusinessLogicError):
    """Workflow rule violation."""

    def __init__(
        self, workflow_rule: str = "unknown", *args: Any, **kwargs: Any
    ) -> None:
        """Initialize workflow violation error.

        Parameters
        ----------
        workflow_rule : str
            The workflow rule that was violated.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"Workflow violation: {workflow_rule}"
        default_fix = RemediationSuggestion(
            immediate_action="Revert to valid workflow state",
            long_term_solution="Strengthen workflow validation",
            escalation_path="Review workflow rules",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
class ValidationError(BusinessLogicError):
    """Data validation failed."""

    def __init__(
        self,
        field_name: str = "unknown",
        validation_rule: str = "",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize validation error.

        Parameters
        ----------
        field_name : str
            Name of the field that failed validation.
        validation_rule : str
            Rule that was not met; appended to the message only when
            non-empty.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"Validation failed for {field_name}"
        if validation_rule:
            text = f"{text}: {validation_rule}"
        default_fix = RemediationSuggestion(
            immediate_action=(
                f"Correct {field_name} to meet validation requirements"
            ),
            long_term_solution="Improve input validation",
            escalation_path="Review validation rules",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
class StateConflictError(BusinessLogicError):
    """System state conflict detected."""

    def __init__(
        self, conflict_description: str = "unknown", *args: Any, **kwargs: Any
    ) -> None:
        """Initialize state conflict error.

        Parameters
        ----------
        conflict_description : str
            Description of the conflicting state.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"State conflict: {conflict_description}"
        default_fix = RemediationSuggestion(
            immediate_action="Resolve state conflict",
            long_term_solution="Implement conflict resolution strategy",
            fallback_strategy="Reset to known good state",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
class TaskValidationError(BusinessLogicError):
    """Task implementation validation failed."""

    def __init__(
        self,
        task_id: str = "unknown",
        task_name: str = "",
        issues: list[str] | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize task validation error.

        Parameters
        ----------
        task_id : str
            Task identifier
        task_name : str
            Human-readable task name; added in parentheses when non-empty
        issues : list[str] | None
            List of validation issues found; only their count appears in the
            message
        *args : Any
            Extra positional arguments forwarded after the generated message
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none
        """
        text = f"Task validation failed for task {task_id}"
        if task_name:
            text = f"{text} ({task_name})"
        if issues:
            text = f"{text}: {len(issues)} issue(s) found"
        default_fix = RemediationSuggestion(
            immediate_action="Fix validation issues and retry completion",
            long_term_solution="Follow acceptance criteria before completion",
            retry_strategy="Fix issues one at a time, revalidate after each fix",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
class ProjectRootNotFoundError(ConfigurationError):
    """Cannot determine project root directory."""

    def __init__(
        self,
        task_id: str = "unknown",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize project root not found error.

        Parameters
        ----------
        task_id : str
            Task identifier
        *args : Any
            Extra positional arguments forwarded after the generated message
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none
        """
        text = (
            f"Cannot determine project_root for task {task_id} - "
            "no workspace config and no artifacts logged"
        )
        default_fix = RemediationSuggestion(
            immediate_action=(
                "Configure workspace or log artifacts with project_root"
            ),
            long_term_solution="Ensure project workspace is configured",
            escalation_path="Contact Marcus administrator",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
# =============================================================================
# INTEGRATION ERRORS (External service issues)
# =============================================================================
class IntegrationError(MarcusBaseError):
    """Base class for external integration errors.

    Unless the caller overrides them, instances get
    ``severity=ErrorSeverity.MEDIUM`` and ``retryable=True``. The reported
    category is ``ErrorCategory.INTEGRATION``. The resolved service name and
    operation are stored on ``self.service_name`` and ``self.operation``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize integration error from flexible arguments.

        Leading positional strings are consumed in this order: message,
        service name, operation. ``service_name`` and ``operation`` keywords
        take precedence over positional values, and in that case no
        positional string is consumed for that slot. Whatever remains is
        forwarded to the parent initializer.

        Parameters
        ----------
        *args : Any
            Optional leading strings as described above, followed by any
            further positional arguments for the parent initializer.
        **kwargs : Any
            May contain ``service_name`` and ``operation``; everything else
            is forwarded to the parent initializer.
        """
        positional = list(args)

        def take_leading_str() -> Any:
            # Pop and return the first remaining positional if it is a
            # string; otherwise leave the list untouched and return None.
            if positional and isinstance(positional[0], str):
                return positional.pop(0)
            return None

        # A leading string is always treated as the message.
        message = take_leading_str()

        # Keyword wins; only fall back to a positional string without it.
        if "service_name" in kwargs:
            service_name = kwargs.pop("service_name")
        else:
            candidate = take_leading_str()
            service_name = "unknown" if candidate is None else candidate

        if "operation" in kwargs:
            operation = kwargs.pop("operation")
        else:
            candidate = take_leading_str()
            operation = "unknown" if candidate is None else candidate

        # Store as attributes
        self.service_name = service_name
        self.operation = operation

        # A missing or empty message is replaced by a generated one.
        if not message:
            message = f"Integration error with {service_name} during {operation}"

        for option, fallback in (
            ("severity", ErrorSeverity.MEDIUM),
            ("retryable", True),
        ):
            kwargs.setdefault(option, fallback)
        super().__init__(message, *positional, **kwargs)

    def _get_category(self) -> ErrorCategory:
        """Return the category for this error family."""
        return ErrorCategory.INTEGRATION
class KanbanIntegrationError(IntegrationError):
    """Kanban board integration error."""

    def __init__(
        self,
        board_name: str = "unknown",
        operation: str = "unknown",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize Kanban integration error.

        Parameters
        ----------
        board_name : str
            Name of the Kanban board involved.
        operation : str
            Operation that failed.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none, and an
            explicit ``service_name`` overrides the board name.
        """
        message = f"Kanban integration error: {operation} failed for board {board_name}"
        kwargs.setdefault(
            "remediation",
            RemediationSuggestion(
                immediate_action="Retry Kanban operation",
                fallback_strategy="Cache operation for later sync",
                long_term_solution="Implement Kanban health monitoring",
            ),
        )
        # Hand the board and operation to the parent explicitly; otherwise
        # its service_name/operation attributes would stay at "unknown".
        # setdefault keeps an explicit service_name= from the caller.
        kwargs.setdefault("service_name", board_name)
        kwargs["operation"] = operation
        super().__init__(message, *args, **kwargs)
class AIProviderError(IntegrationError):
    """AI provider integration error."""

    def __init__(
        self,
        provider_name: str = "unknown",
        operation: str = "unknown",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize AI provider error.

        Parameters
        ----------
        provider_name : str
            Name of the AI provider; forwarded to the parent as
            ``service_name``.
        operation : str
            Operation that failed; forwarded to the parent as ``operation``.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"AI provider error: {operation} failed for {provider_name}"
        default_fix = RemediationSuggestion(
            immediate_action="Retry with backoff",
            fallback_strategy="Use fallback AI provider or cached results",
            long_term_solution="Implement AI provider health checks",
        )
        kwargs.setdefault("remediation", default_fix)
        # Always hand the provider and operation to the parent explicitly,
        # overwriting any value already present under those keys.
        kwargs.update(service_name=provider_name, operation=operation)
        super().__init__(text, *args, **kwargs)
class AuthenticationError(IntegrationError):
    """Authentication failed for external service.

    Unlike the parent defaults, instances get ``severity=ErrorSeverity.HIGH``
    and ``retryable=False`` unless the caller overrides them.
    """

    def __init__(
        self, service_name: str = "unknown", *args: Any, **kwargs: Any
    ) -> None:
        """Initialize authentication error.

        Parameters
        ----------
        service_name : str
            Service that rejected the authentication.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        message = f"Authentication failed for {service_name}"
        kwargs.setdefault("severity", ErrorSeverity.HIGH)
        kwargs.setdefault("retryable", False)
        kwargs.setdefault(
            "remediation",
            RemediationSuggestion(
                immediate_action="Check and refresh authentication credentials",
                long_term_solution="Implement credential rotation",
                escalation_path="Contact service provider",
            ),
        )
        # Hand the service name to the parent explicitly; otherwise its
        # service_name attribute would stay at "unknown".
        kwargs["service_name"] = service_name
        super().__init__(message, *args, **kwargs)
class ExternalServiceError(IntegrationError):
    """Generic external service error."""

    def __init__(
        self,
        service_name: str = "unknown",
        error_details: str = "",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize external service error.

        Parameters
        ----------
        service_name : str
            Name of the failing external service.
        error_details : str
            Extra detail; appended to the message only when non-empty.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        message = f"External service error: {service_name}"
        if error_details:
            message += f" - {error_details}"
        kwargs.setdefault(
            "remediation",
            RemediationSuggestion(
                immediate_action="Retry service call",
                fallback_strategy="Use cached data or degraded functionality",
                long_term_solution="Implement service health monitoring",
            ),
        )
        # Hand the service name to the parent explicitly; otherwise its
        # service_name attribute would stay at "unknown".
        kwargs["service_name"] = service_name
        super().__init__(message, *args, **kwargs)
# =============================================================================
# SECURITY ERRORS (Critical, non-retryable)
# =============================================================================
class SecurityError(MarcusBaseError):
    """Base class for security-related errors.

    Unless the caller overrides them, instances get
    ``severity=ErrorSeverity.CRITICAL`` and ``retryable=False``. The reported
    category is ``ErrorCategory.SECURITY``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Apply the security defaults, then defer to the parent."""
        for option, fallback in (
            ("severity", ErrorSeverity.CRITICAL),
            ("retryable", False),
        ):
            kwargs.setdefault(option, fallback)
        super().__init__(*args, **kwargs)

    def _get_category(self) -> ErrorCategory:
        """Return the category for this error family."""
        return ErrorCategory.SECURITY
class AuthorizationError(SecurityError):
    """Authorization failed - insufficient permissions."""

    def __init__(
        self,
        resource: str = "unknown",
        required_permission: str = "unknown",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize authorization error.

        Parameters
        ----------
        resource : str
            Resource the caller lacked permissions for.
        required_permission : str
            Permission that was required. NOTE(review): accepted but not
            currently used in the message or the default remediation.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"Authorization failed: insufficient permissions for {resource}"
        default_fix = RemediationSuggestion(
            immediate_action="Request appropriate permissions",
            escalation_path="Contact system administrator",
            long_term_solution="Review and update permission model",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
class WorkspaceSecurityError(SecurityError):
    """Workspace security violation."""

    def __init__(
        self,
        path: str = "unknown",
        violation_type: str = "unknown",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize workspace security error.

        Parameters
        ----------
        path : str
            Path involved in the violation.
        violation_type : str
            Kind of violation detected.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"Workspace security violation: {violation_type} for path {path}"
        default_fix = RemediationSuggestion(
            immediate_action="Block access to restricted path",
            escalation_path="Review workspace security policy",
            long_term_solution="Strengthen workspace isolation",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
class PermissionError(SecurityError):
    """Permission denied for operation.

    NOTE: this class shares its name with Python's built-in
    ``PermissionError``, so it shadows the builtin inside this module and
    wherever it is imported by name.
    """

    def __init__(
        self,
        operation: str = "unknown",
        resource: str = "unknown",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize permission error.

        Parameters
        ----------
        operation : str
            Operation that was denied.
        resource : str
            Resource the operation targeted.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"Permission denied: {operation} on {resource}"
        default_fix = RemediationSuggestion(
            immediate_action="Check operation permissions",
            escalation_path="Request permission elevation",
            long_term_solution="Review permission model",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
# =============================================================================
# SYSTEM ERRORS (Critical system issues)
# =============================================================================
class SystemError(MarcusBaseError):
    """Base class for critical system errors.

    Unless the caller overrides them, instances get
    ``severity=ErrorSeverity.CRITICAL`` and ``retryable=False``. The reported
    category is ``ErrorCategory.SYSTEM``.

    NOTE: this class shares its name with Python's built-in ``SystemError``,
    so it shadows the builtin inside this module and wherever it is imported
    by name.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Apply the system-error defaults, then defer to the parent."""
        for option, fallback in (
            ("severity", ErrorSeverity.CRITICAL),
            ("retryable", False),
        ):
            kwargs.setdefault(option, fallback)
        super().__init__(*args, **kwargs)

    def _get_category(self) -> ErrorCategory:
        """Return the category for this error family."""
        return ErrorCategory.SYSTEM
class ResourceExhaustionError(SystemError):
    """System resource exhaustion."""

    def __init__(
        self,
        resource_type: str = "unknown",
        current_usage: str = "",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize resource exhaustion error.

        Parameters
        ----------
        resource_type : str
            Kind of resource that was exhausted.
        current_usage : str
            Usage description; appended to the message only when non-empty.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"Resource exhaustion: {resource_type}"
        if current_usage:
            text = f"{text} (current usage: {current_usage})"
        default_fix = RemediationSuggestion(
            immediate_action="Free up resources or restart system",
            long_term_solution="Implement resource monitoring and limits",
            escalation_path="Contact system administrator",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
class CorruptedStateError(SystemError):
    """System state corruption detected."""

    def __init__(
        self,
        component: str = "unknown",
        corruption_details: str = "",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize corrupted state error.

        Parameters
        ----------
        component : str
            Component whose state is corrupted.
        corruption_details : str
            Extra detail; appended to the message only when non-empty.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"Corrupted state detected in {component}"
        if corruption_details:
            text = f"{text}: {corruption_details}"
        default_fix = RemediationSuggestion(
            immediate_action="Restore from backup or reinitialize",
            long_term_solution="Implement state validation and backup",
            escalation_path="Contact development team",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
class DatabaseError(SystemError):
    """Database operation failed."""

    def __init__(
        self,
        operation: str = "unknown",
        table: str = "unknown",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize database error.

        Parameters
        ----------
        operation : str
            Database operation that failed.
        table : str
            Table the operation targeted.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"Database error: {operation} failed for table {table}"
        default_fix = RemediationSuggestion(
            immediate_action="Check database connectivity and integrity",
            fallback_strategy="Use cached data or read-only mode",
            escalation_path="Contact database administrator",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
class CriticalDependencyError(SystemError):
    """Critical system dependency failure."""

    def __init__(
        self,
        dependency_name: str = "unknown",
        failure_reason: str = "",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize critical dependency error.

        Parameters
        ----------
        dependency_name : str
            Name of the dependency that failed.
        failure_reason : str
            Reason for the failure; appended to the message only when
            non-empty.
        *args : Any
            Extra positional arguments forwarded after the generated message.
        **kwargs : Any
            Keyword arguments forwarded to the parent initializer. A default
            ``remediation`` is supplied when the caller gives none.
        """
        text = f"Critical dependency failure: {dependency_name}"
        if failure_reason:
            text = f"{text} - {failure_reason}"
        default_fix = RemediationSuggestion(
            immediate_action="Restart dependency or system",
            long_term_solution="Implement dependency health monitoring",
            escalation_path="Contact system administrator",
        )
        kwargs.setdefault("remediation", default_fix)
        super().__init__(text, *args, **kwargs)
# =============================================================================
# ERROR CONTEXT MANAGER
# =============================================================================
@contextmanager
def error_context(
    operation: str,
    agent_id: Optional[str] = None,
    task_id: Optional[str] = None,
    integration_name: Optional[str] = None,
    custom_context: Optional[Dict[str, Any]] = None,
) -> Any:
    """
    Context manager for automatic error context injection.

    Yields the ``ErrorContext`` built from the arguments. A
    ``MarcusBaseError`` raised inside the block is re-raised after its empty
    ``operation``, ``agent_id``, ``task_id`` and ``integration_name`` context
    fields are filled in from that context. Any other ``Exception`` is
    wrapped in an ``ExternalServiceError`` (with ``service_name="unknown"``)
    that carries the context and is chained to the original exception.

    Usage:
        with error_context("create_task", agent_id="agent_123", task_id="task_456"):
            # operations that might raise Marcus errors
            pass
    """
    context = ErrorContext(
        operation=operation,
        agent_id=agent_id,
        task_id=task_id,
        integration_name=integration_name,
        custom_context=custom_context or {},
    )
    try:
        yield context
    except MarcusBaseError as e:
        # Enhance existing Marcus errors with additional context, but never
        # overwrite a value the error already carries.
        for attr in ("operation", "agent_id", "task_id", "integration_name"):
            if not getattr(e.context, attr):
                setattr(e.context, attr, getattr(context, attr))
        raise
    except Exception as e:
        # Convert regular exceptions to Marcus errors with context. The
        # explicit ``from e`` records the original as the direct cause.
        raise ExternalServiceError(
            service_name="unknown", error_details=str(e), context=context, cause=e
        ) from e
# =============================================================================
# ERROR RESPONSE FORMATTER
# =============================================================================
# =============================================================================
# BACKWARD COMPATIBILITY
# =============================================================================
# Maintain backward compatibility with existing code: ``MarcusError`` is an
# alias bound to the same class object as ``MarcusBaseError``.
MarcusError = MarcusBaseError