Source code for src.core.error_strategies

"""
Marcus Error Handling Strategies.

Advanced error handling patterns for autonomous agent environments:
- Retry policies with exponential backoff
- Circuit breaker pattern for external services
- Fallback mechanisms for graceful degradation
- Error aggregation for batch operations

Status
------
AVAILABLE BUT NOT INTEGRATED into the MCP tool layer. These components
are tested in isolation but have zero imports from ``src/marcus_mcp/``.
The simpler ``src/core/resilience.py`` decorators (``@with_retry``,
``@with_fallback``) are what production code currently uses.

This module is the recommended target for future integration when
adding resilience to Kanban API calls, AI provider calls, or other
external service boundaries in the MCP layer.
"""

import asyncio
import logging
import secrets
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from enum import Enum
from functools import wraps
from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar

from .error_framework import (
    ErrorContext,
    ErrorSeverity,
    IntegrationError,
    MarcusBaseError,
    NetworkTimeoutError,
    TransientError,
)

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Generic type variable. It is not referenced by any other definition visible
# in this module.
T = TypeVar("T")


class CircuitBreakerState(Enum):
    """Circuit breaker states.

    ``CircuitBreaker`` starts in ``CLOSED``, moves to ``OPEN`` after too many
    failures, moves to ``HALF_OPEN`` once its timeout has elapsed, and from
    there either returns to ``CLOSED`` (enough successes) or to ``OPEN``
    (any failure).
    """

    CLOSED = "closed"  # Normal operation
    OPEN = "open"  # Failing, blocking requests
    HALF_OPEN = "half_open"  # Testing if service recovered
class RetryPolicy(Enum):
    """Retry policy types.

    NOTE(review): no code visible in this module reads this enum.
    ``RetryConfig`` has no policy field and ``RetryHandler._calculate_delay``
    always applies the exponential formula with optional jitter. Confirm
    whether other modules consume it.
    """

    NONE = "none"
    FIXED_DELAY = "fixed_delay"
    EXPONENTIAL_BACKOFF = "exponential_backoff"
    LINEAR_BACKOFF = "linear_backoff"
    JITTERED_EXPONENTIAL = "jittered_exponential"
@dataclass
class RetryConfig:
    """Configuration for retry behavior.

    Consumed by ``RetryHandler``:

    - ``max_attempts`` bounds the total number of calls, first call included.
    - The wait before the next attempt is
      ``base_delay * multiplier ** attempt`` (``attempt`` is zero-based),
      capped at ``max_delay``.
    - When ``jitter`` is true, a random extra of up to 10% of the capped
      delay is added on top, so the final wait can slightly exceed
      ``max_delay``.
    - ``stop_on`` is checked before ``retry_on``. An empty ``retry_on``
      means "retry any exception that is not in ``stop_on``".
    """

    max_attempts: int = 3
    base_delay: float = 1.0  # seconds
    max_delay: float = 60.0  # seconds
    multiplier: float = 2.0
    jitter: bool = True
    # Exception types that are eligible for another attempt.
    retry_on: tuple[type[Exception], ...] = (TransientError, IntegrationError)
    stop_on: tuple[
        type[Exception], ...
    ] = ()  # Exceptions that stop retries (empty by default)
@dataclass
class CircuitBreakerConfig:
    """Configuration for circuit breaker behavior.

    Consumed by ``CircuitBreaker``. ``timeout`` is the pause between the
    circuit opening and the first probe in the half-open state.
    ``monitor_window`` is how long failure timestamps are retained in the
    breaker's failure history.

    NOTE(review): ``max_failures_per_window`` is not read by any code visible
    in this module.
    """

    failure_threshold: int = 5  # Failures before opening
    success_threshold: int = 2  # Successes to close from half-open
    timeout: float = 60.0  # Seconds before trying half-open
    monitor_window: float = 300.0  # Seconds to track failures
    max_failures_per_window: int = 10
@dataclass
class CircuitBreakerStatus:
    """Current status of a circuit breaker.

    Mutable bookkeeping record owned by a ``CircuitBreaker`` instance (stored
    on its ``state`` attribute). A fresh record starts ``CLOSED`` with zeroed
    counters and an empty failure history.
    """

    state: CircuitBreakerState = CircuitBreakerState.CLOSED
    # Failures recorded since the circuit was last closed.
    failure_count: int = 0
    # Successes recorded while half-open; reset when half-open is (re)entered.
    success_count: int = 0
    last_failure_time: Optional[datetime] = None
    # Earliest time at which an open circuit may move to half-open.
    next_attempt_time: Optional[datetime] = None
    # Timestamps of recorded failures; a fresh list per instance.
    failure_history: List[datetime] = field(default_factory=list)
class CircuitBreaker:
    """
    Circuit breaker implementation for external service calls.

    Prevents cascading failures by temporarily blocking calls to failing services.

    State transitions
    -----------------
    - ``CLOSED`` -> ``OPEN``: the number of failures recorded within the last
      ``config.monitor_window`` seconds reaches ``config.failure_threshold``.
      Failures older than the window no longer count, so a healthy service
      with rare, isolated failures does not eventually trip the breaker.
    - ``OPEN`` -> ``HALF_OPEN``: ``config.timeout`` seconds have passed since
      the circuit opened (evaluated at the start of the next ``call``).
    - ``HALF_OPEN`` -> ``CLOSED``: ``config.success_threshold`` calls succeed.
    - ``HALF_OPEN`` -> ``OPEN``: any call fails.

    ``state.failure_count`` remains the cumulative number of failures since
    the circuit was last closed and is kept for status reporting.
    """

    def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None):
        """Create a breaker called ``name`` with ``config`` or the defaults."""
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitBreakerStatus()
        # The lock is created on first use rather than at construction time.
        self._circuit_lock: Optional[asyncio.Lock] = None

    @property
    def lock(self) -> asyncio.Lock:
        """Get circuit breaker lock, creating it if needed in the current event loop."""
        if self._circuit_lock is None:
            self._circuit_lock = asyncio.Lock()
        return self._circuit_lock

    async def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """Execute function with circuit breaker protection.

        Parameters
        ----------
        func
            Callable to protect. Coroutine functions are awaited; any other
            callable is invoked directly inside this coroutine.
        *args, **kwargs
            Forwarded to ``func`` unchanged.

        Returns
        -------
        Any
            Whatever ``func`` returns.

        Raises
        ------
        IntegrationError
            If the circuit is open; ``func`` is not invoked in that case.
        Exception
            Any exception raised by ``func`` is recorded as a failure and
            re-raised unchanged.
        """
        async with self.lock:
            # Check if circuit should transition states
            await self._update_state()

            # Block if circuit is open
            if self.state.state == CircuitBreakerState.OPEN:
                raise IntegrationError(
                    service_name=self.name,
                    operation="circuit_breaker_open",
                    context=ErrorContext(operation=f"circuit_breaker_{self.name}"),
                    remediation={
                        "immediate_action": (
                            f"Wait for circuit breaker to close "
                            f"(next attempt: {self.state.next_attempt_time})"
                        ),
                        "fallback_strategy": ("Use cached data or alternative service"),
                        "long_term_solution": "Fix underlying service issues",
                    },
                    severity=ErrorSeverity.MEDIUM,
                )

        # Execute function. The lock is NOT held here, so protected calls can
        # overlap; only the bookkeeping before and after is serialized.
        try:
            if asyncio.iscoroutinefunction(func):
                result = await func(*args, **kwargs)
            else:
                result = func(*args, **kwargs)

            # Record success
            async with self.lock:
                await self._record_success()

            return result

        except Exception as e:
            # Record failure
            async with self.lock:
                await self._record_failure(e)
            raise

    def _failures_in_window(self, now: datetime) -> int:
        """Count recorded failures that fall inside the monitoring window.

        A non-positive ``monitor_window`` cannot describe a window, so the
        cumulative ``failure_count`` is used instead in that case.
        """
        window = self.config.monitor_window
        if window <= 0:
            return self.state.failure_count
        cutoff_time = now - timedelta(seconds=window)
        return sum(
            1
            for failure_time in self.state.failure_history
            if failure_time > cutoff_time
        )

    async def _update_state(self) -> None:
        """Update circuit breaker state based on current conditions."""
        now = datetime.now(timezone.utc)

        if self.state.state == CircuitBreakerState.OPEN:
            # Check if timeout has passed to try half-open
            if self.state.next_attempt_time and now >= self.state.next_attempt_time:
                self.state.state = CircuitBreakerState.HALF_OPEN
                self.state.success_count = 0
                logger.info(f"Circuit breaker {self.name} transitioning to HALF_OPEN")

        elif self.state.state == CircuitBreakerState.HALF_OPEN:
            # Check if enough successes to close
            if self.state.success_count >= self.config.success_threshold:
                self.state.state = CircuitBreakerState.CLOSED
                self.state.failure_count = 0
                self.state.failure_history.clear()
                logger.info(f"Circuit breaker {self.name} transitioning to CLOSED")

        # Clean old failures from history
        cutoff_time = now - timedelta(seconds=self.config.monitor_window)
        self.state.failure_history = [
            failure_time
            for failure_time in self.state.failure_history
            if failure_time > cutoff_time
        ]

    async def _record_success(self) -> None:
        """Record a successful operation."""
        if self.state.state == CircuitBreakerState.HALF_OPEN:
            self.state.success_count += 1

            # Check if we should transition to CLOSED
            if self.state.success_count >= self.config.success_threshold:
                self.state.state = CircuitBreakerState.CLOSED
                self.state.failure_count = 0
                self.state.failure_history.clear()
                logger.info(f"Circuit breaker {self.name} transitioning to CLOSED")

        logger.debug(f"Circuit breaker {self.name} recorded success")

    async def _record_failure(self, exception: Exception) -> None:
        """Record a failed operation and open the circuit when warranted."""
        now = datetime.now(timezone.utc)

        self.state.failure_count += 1
        self.state.last_failure_time = now
        self.state.failure_history.append(now)

        # Only failures inside the monitoring window count towards opening;
        # the cumulative counter alone would never decay while closed.
        recent_failures = self._failures_in_window(now)

        # Check if should open circuit
        if (
            self.state.state == CircuitBreakerState.CLOSED
            and recent_failures >= self.config.failure_threshold
        ):
            self.state.state = CircuitBreakerState.OPEN
            self.state.next_attempt_time = now + timedelta(seconds=self.config.timeout)
            logger.warning(
                f"Circuit breaker {self.name} OPENED due to "
                f"{recent_failures} failures"
            )

        elif self.state.state == CircuitBreakerState.HALF_OPEN:
            # Failed while half-open, go back to open
            self.state.state = CircuitBreakerState.OPEN
            self.state.next_attempt_time = now + timedelta(seconds=self.config.timeout)
            self.state.success_count = 0
            logger.warning(f"Circuit breaker {self.name} returned to OPEN state")

        logger.debug(f"Circuit breaker {self.name} recorded failure: {exception}")
class RetryHandler:
    """
    Advanced retry handler with multiple backoff strategies.

    Supports exponential backoff, jitter, and configurable retry policies.
    All behavior is driven by the ``RetryConfig`` held in ``self.config``.
    """

    def __init__(self, config: Optional[RetryConfig] = None):
        """Create a handler using ``config`` or a default ``RetryConfig``."""
        self.config = config or RetryConfig()

    async def execute(
        self,
        func: Callable[..., Any],
        *args: Any,
        context: Optional[ErrorContext] = None,
        **kwargs: Any,
    ) -> Any:
        """Execute function with retry logic.

        Parameters
        ----------
        func
            Callable to run. Coroutine functions are awaited; any other
            callable is invoked directly.
        *args, **kwargs
            Forwarded to ``func`` on every attempt.
        context
            Error context used for logging and attached to the final error;
            a fresh ``ErrorContext`` is created when omitted.

        Returns
        -------
        Any
            The result of the first successful attempt.

        Raises
        ------
        Exception
            The original exception, unchanged, when ``_should_retry`` rejects
            it.
        IntegrationError
            When retrying stops without success; the last exception is passed
            as ``cause``.
        """
        last_exception: Optional[Exception] = None
        context = context or ErrorContext()

        # Always make at least one attempt: with max_attempts < 1 the loop
        # would otherwise never call func and report a failure with no cause.
        total_attempts = max(1, self.config.max_attempts)
        attempts_made = 0

        for attempt in range(total_attempts):
            attempts_made = attempt + 1
            try:
                if asyncio.iscoroutinefunction(func):
                    return await func(*args, **kwargs)
                else:
                    return func(*args, **kwargs)

            except Exception as e:
                last_exception = e

                # Check if we should retry this exception
                if not self._should_retry(e):
                    raise  # Non-retryable errors are raised immediately

                # Check if we should stop retrying (max attempts reached)
                if self._should_stop_retry(e, attempt):
                    break  # Exit loop to wrap in IntegrationError

                # Calculate delay and wait (never after the final attempt)
                if attempt < total_attempts - 1:
                    delay = self._calculate_delay(attempt)

                    logger.info(
                        f"Retry attempt {attempt + 1}/{self.config.max_attempts} "
                        f"after {delay:.2f}s for {context.operation}",
                        extra={
                            "correlation_id": context.correlation_id,
                            "attempt": attempt + 1,
                            "delay": delay,
                            "exception": str(e),
                        },
                    )

                    await asyncio.sleep(delay)

        # All retries exhausted - wrap in IntegrationError to indicate retry failure
        service_name = "unknown"
        operation = context.operation or "unknown"

        # Extract service name from NetworkTimeoutError if available
        if isinstance(last_exception, NetworkTimeoutError) and hasattr(
            last_exception, "service_name"
        ):
            service_name = last_exception.service_name
        elif isinstance(last_exception, IntegrationError):
            service_name = last_exception.service_name
            operation = last_exception.operation

        raise IntegrationError(
            service_name=service_name,
            operation=operation,
            context=context,
            remediation={
                "immediate_action": "Check service availability",
                "long_term_solution": "Implement better error handling",
                # Report the attempts actually made; retrying can stop before
                # max_attempts when the error is flagged as not retryable.
                "retry_strategy": f"Already retried {attempts_made} times",
            },
            cause=last_exception,
        )

    def _should_retry(self, exception: Exception) -> bool:
        """Determine if exception should trigger a retry.

        ``stop_on`` takes precedence over ``retry_on``. An empty ``retry_on``
        means every exception not matched by ``stop_on`` is retried.
        """
        # Don't retry if exception type is in stop_on list
        if any(isinstance(exception, stop_type) for stop_type in self.config.stop_on):
            return False

        # If retry_on is empty, retry on any exception not in stop_on
        if not self.config.retry_on:
            return True

        # Retry if exception type is in retry_on list
        return any(
            isinstance(exception, retry_type) for retry_type in self.config.retry_on
        )

    def _should_stop_retry(self, exception: Exception, attempt: int) -> bool:
        """Determine if we should stop retrying based on exception and attempt.

        ``attempt`` is the zero-based index of the attempt that just failed.
        """
        # Stop if max attempts reached
        if attempt >= self.config.max_attempts - 1:
            return True

        # Stop for certain exception types
        if isinstance(exception, MarcusBaseError) and not exception.retryable:
            return True

        return False

    def _calculate_delay(self, attempt: int) -> float:
        """Calculate delay for next retry attempt.

        The delay is ``base_delay * multiplier ** attempt`` capped at
        ``max_delay``. Jitter, when enabled, is added after the cap, so the
        returned value can exceed ``max_delay`` by up to 10%.
        """
        if self.config.max_attempts == 1:
            return 0.0

        delay = self.config.base_delay * (self.config.multiplier**attempt)
        delay = min(delay, self.config.max_delay)

        # Add jitter to prevent thundering herd
        if self.config.jitter:
            secure_random = secrets.SystemRandom()
            jitter = secure_random.uniform(0, delay * 0.1)  # 10% jitter
            delay += jitter

        return delay
class FallbackHandler:
    """
    Handles fallback strategies for graceful degradation.

    Recovery order when the primary operation fails: registered fallback
    functions in ascending priority, then the cached result of an earlier
    successful primary call (when a ``cache_key`` was supplied), and finally
    an error.
    """

    def __init__(self, name: str):
        """Create an empty handler identified by ``name`` in logs and errors."""
        self.name = name
        self.fallback_functions: List[Tuple[int, Callable[..., Any]]] = []
        self.cache: Dict[str, Any] = {}

    def add_fallback(self, func: Callable[..., Any], priority: int = 0) -> None:
        """Add a fallback function with priority (lower = higher priority)."""
        self.fallback_functions.append((priority, func))
        # In-place stable sort: equal priorities keep their registration order.
        self.fallback_functions.sort(key=lambda entry: entry[0])

    @staticmethod
    async def _invoke(target: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """Call ``target``, awaiting it when it is a coroutine function."""
        if asyncio.iscoroutinefunction(target):
            return await target(*args, **kwargs)
        return target(*args, **kwargs)

    async def _run_fallbacks(
        self,
        context: Any,
        call_args: Tuple[Any, ...],
        call_kwargs: Dict[str, Any],
    ) -> Tuple[bool, Any]:
        """Try each registered fallback in order.

        Returns ``(True, value)`` for the first fallback that succeeds, or
        ``(False, None)`` when every fallback raised.
        """
        for priority, candidate in self.fallback_functions:
            try:
                logger.info(
                    f"Trying fallback (priority {priority}) for {self.name}",
                    extra={"correlation_id": context.correlation_id},
                )
                value = await self._invoke(candidate, *call_args, **call_kwargs)
                logger.info(
                    f"Fallback succeeded for {self.name}",
                    extra={"correlation_id": context.correlation_id},
                )
                return True, value
            except Exception as fallback_error:
                logger.warning(
                    f"Fallback failed for {self.name}: {fallback_error}",
                    extra={"correlation_id": context.correlation_id},
                )
        return False, None

    async def execute_with_fallback(
        self,
        primary_func: Callable[..., Any],
        *args: Any,
        cache_key: Optional[str] = None,
        context: Optional[ErrorContext] = None,
        **kwargs: Any,
    ) -> Any:
        """Execute primary function with fallback options.

        Parameters
        ----------
        primary_func
            Preferred callable; coroutine functions are awaited.
        *args, **kwargs
            Forwarded to the primary function and to every fallback.
        cache_key
            When truthy, a successful primary result is stored under this key
            and may be returned later as a last resort.
        context
            Error context for logging; a fresh ``ErrorContext`` when omitted.

        Raises
        ------
        MarcusBaseError
            The primary error itself, annotated, when it already is a Marcus
            error and nothing recovered.
        IntegrationError
            Wrapping any other primary error when nothing recovered.
        """
        context = context or ErrorContext()

        try:
            outcome = await self._invoke(primary_func, *args, **kwargs)

            # Remember the successful result for later degraded operation.
            if cache_key:
                self.cache[cache_key] = outcome

            return outcome

        except Exception as primary_error:
            logger.warning(
                f"Primary function failed for {self.name}: {primary_error}",
                extra={"correlation_id": context.correlation_id},
            )

            # First line of defence: the registered fallbacks.
            recovered, value = await self._run_fallbacks(context, args, kwargs)
            if recovered:
                return value

            # Second line of defence: a previously cached primary result.
            if cache_key and cache_key in self.cache:
                logger.info(
                    f"Using cached result for {self.name}",
                    extra={"correlation_id": context.correlation_id},
                )
                return self.cache[cache_key]

            # Nothing recovered: surface the failure.
            if not isinstance(primary_error, MarcusBaseError):
                raise IntegrationError(
                    service_name=self.name,
                    operation=context.operation or "unknown",
                    context=context,
                    remediation={
                        "immediate_action": "All fallback strategies failed",
                        "long_term_solution": "Improve fallback mechanisms",
                        "escalation_path": "Contact system administrator",
                        "fallback_strategy": "All fallback strategies exhausted",
                    },
                    cause=primary_error,
                )

            primary_error.remediation.fallback_strategy = (
                "All fallback strategies exhausted"
            )
            raise primary_error
class ErrorAggregator:
    """
    Aggregates errors from batch operations.

    Callers report each item's outcome through ``add_success`` or
    ``add_error``; the aggregator keeps counts plus the collected errors and
    can summarize them or escalate the first critical one.
    """

    def __init__(self, operation_name: str):
        """Start an empty aggregation for the batch named ``operation_name``."""
        self.operation_name = operation_name
        self.errors: List[MarcusBaseError] = []
        self.successes: int = 0
        self.total_operations: int = 0

    def add_success(self) -> None:
        """Record a successful operation."""
        self.total_operations += 1
        self.successes += 1

    def add_error(
        self, error: Exception, item_context: Optional[Dict[str, Any]] = None
    ) -> None:
        """Add an error to the aggregation.

        Marcus errors are stored as-is after their context is tagged with the
        batch details. Any other exception is wrapped in an
        ``IntegrationError`` that carries the original as ``cause``.
        """
        self.total_operations += 1

        batch_details: Dict[str, Any] = {
            "batch_operation": self.operation_name,
            "item_context": item_context or {},
        }

        if not isinstance(error, MarcusBaseError):
            # Convert regular exception to Marcus error
            wrapped = IntegrationError(
                service_name="batch_operation",
                operation=self.operation_name,
                context=ErrorContext(
                    operation=self.operation_name,
                    custom_context={**batch_details, "original_error": str(error)},
                ),
                cause=error,
            )
            self.errors.append(wrapped)
            return

        # Enhance existing Marcus error with batch context
        error.context.custom_context = error.context.custom_context or {}
        error.context.custom_context.update(batch_details)
        self.errors.append(error)

    def get_summary(self) -> Dict[str, Any]:
        """Get summary of batch operation results.

        ``error_summary`` groups one entry per error under the error's class
        name. ``success_rate`` is ``0`` when nothing has been recorded.
        """
        grouped: Dict[str, List[Dict[str, Any]]] = {}

        for recorded in self.errors:
            extras = recorded.context.custom_context
            entry = {
                "message": recorded.message,
                "correlation_id": recorded.context.correlation_id,
                "item_context": extras.get("item_context", {}) if extras else {},
            }
            grouped.setdefault(recorded.__class__.__name__, []).append(entry)

        if self.total_operations > 0:
            rate = self.successes / self.total_operations
        else:
            rate = 0

        return {
            "operation": self.operation_name,
            "total_operations": self.total_operations,
            "successes": self.successes,
            "errors": len(self.errors),
            "success_rate": rate,
            "error_summary": grouped,
        }

    def has_errors(self) -> bool:
        """Check if any errors were recorded."""
        return len(self.errors) != 0

    def get_critical_errors(self) -> List[MarcusBaseError]:
        """Get only critical errors from the batch."""
        critical = []
        for recorded in self.errors:
            if recorded.severity == ErrorSeverity.CRITICAL:
                critical.append(recorded)
        return critical

    def raise_if_critical(self) -> None:
        """Raise exception if any critical errors were encountered.

        The first critical error is raised after its context is annotated
        with the batch totals and the full summary. Does nothing when no
        critical error was recorded.
        """
        critical = self.get_critical_errors()
        if not critical:
            return

        # Raise the first critical error with batch context
        first = critical[0]
        first.context.custom_context = first.context.custom_context or {}
        first.context.custom_context.update(
            {
                "critical_errors_in_batch": len(critical),
                "total_errors_in_batch": len(self.errors),
                "batch_summary": self.get_summary(),
            }
        )
        raise first
# =============================================================================
# DECORATORS FOR EASY USAGE
# =============================================================================
def with_retry(
    config: Optional[RetryConfig] = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Add retry logic to functions.

    Coroutine functions are wrapped by a coroutine function. Plain functions
    are wrapped by a plain function that drives the retry loop through
    ``asyncio.run``. That call cannot be made from a thread that already has
    a running event loop, so such callers must decorate a coroutine function
    instead.

    Parameters
    ----------
    config
        Retry settings shared by every call of the decorated function; a
        default ``RetryConfig`` is created when omitted.
    """
    effective_config = config if config else RetryConfig()

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
            # A fresh handler per call; only the configuration is shared.
            handler = RetryHandler(effective_config)
            call_context = ErrorContext(operation=func.__name__)
            return await handler.execute(func, *args, context=call_context, **kwargs)

        if asyncio.iscoroutinefunction(func):
            return async_wrapper

        @wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            return asyncio.run(async_wrapper(*args, **kwargs))

        return sync_wrapper

    return decorator
def with_circuit_breaker(
    name: str, config: Optional[CircuitBreakerConfig] = None
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Add circuit breaker protection to functions.

    One ``CircuitBreaker`` is created per call to ``with_circuit_breaker``
    and shared by every function the returned decorator is applied to.

    Coroutine functions are wrapped by a coroutine function. Plain functions
    are wrapped by a plain function that runs the protected call through
    ``asyncio.run``, which cannot be used from a thread that already has a
    running event loop.

    Parameters
    ----------
    name
        Breaker name, used in its logs and errors.
    config
        Breaker settings; the breaker's defaults apply when omitted.
    """
    breaker = CircuitBreaker(name, config)

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
            guarded_result = await breaker.call(func, *args, **kwargs)
            return guarded_result

        if asyncio.iscoroutinefunction(func):
            return async_wrapper

        @wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            return asyncio.run(async_wrapper(*args, **kwargs))

        return sync_wrapper

    return decorator
def with_fallback(
    *fallback_functions: Callable[..., Any]
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Add fallback functions.

    The fallbacks are tried in the order given, each receiving the same
    arguments as the decorated function. A new ``FallbackHandler`` is built
    on every call of the decorated function, so nothing is cached between
    calls.

    Coroutine functions are wrapped by a coroutine function. Plain functions
    are wrapped by a plain function that runs through ``asyncio.run``, which
    cannot be used from a thread that already has a running event loop.
    """

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
            handler = FallbackHandler(func.__name__)

            # Position in the argument list doubles as the priority.
            for position, alternative in enumerate(fallback_functions):
                handler.add_fallback(alternative, priority=position)

            call_context = ErrorContext(operation=func.__name__)
            return await handler.execute_with_fallback(
                func, *args, context=call_context, **kwargs
            )

        if asyncio.iscoroutinefunction(func):
            return async_wrapper

        @wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            return asyncio.run(async_wrapper(*args, **kwargs))

        return sync_wrapper

    return decorator
# =============================================================================
# STRATEGY REGISTRY
# =============================================================================
class ErrorStrategyRegistry:
    """Registry for managing error handling strategies across the application.

    Holds named circuit breakers, named fallback handlers and per-operation
    retry configurations so that callers asking for the same name share the
    same object.
    """

    def __init__(self) -> None:
        """Create an empty registry."""
        self.circuit_breakers: Dict[str, CircuitBreaker] = {}
        self.fallback_handlers: Dict[str, FallbackHandler] = {}
        self.retry_configs: Dict[str, RetryConfig] = {}

    def get_circuit_breaker(
        self, name: str, config: Optional[CircuitBreakerConfig] = None
    ) -> CircuitBreaker:
        """Get or create a circuit breaker for a service.

        ``config`` is only used when the breaker is created; it is ignored
        when a breaker named ``name`` already exists.
        """
        if name in self.circuit_breakers:
            return self.circuit_breakers[name]

        created = CircuitBreaker(name, config)
        self.circuit_breakers[name] = created
        return created

    def get_fallback_handler(self, name: str) -> FallbackHandler:
        """Get or create a fallback handler."""
        if name in self.fallback_handlers:
            return self.fallback_handlers[name]

        created = FallbackHandler(name)
        self.fallback_handlers[name] = created
        return created

    def register_retry_config(self, operation: str, config: RetryConfig) -> None:
        """Register a retry configuration for an operation."""
        self.retry_configs[operation] = config

    def get_retry_config(self, operation: str) -> RetryConfig:
        """Get retry configuration for an operation.

        Unregistered operations receive a new default ``RetryConfig``, which
        is not stored in the registry.
        """
        if operation in self.retry_configs:
            return self.retry_configs[operation]
        return RetryConfig()

    def get_health_status(self) -> Dict[str, Any]:
        """Get health status of all circuit breakers.

        Returns a mapping from breaker name to its state value, failure count
        and the ISO-8601 timestamps of the last failure and next attempt
        (``None`` where unset).
        """

        def as_iso(moment: Any) -> Optional[str]:
            return moment.isoformat() if moment else None

        return {
            breaker_name: {
                "state": breaker.state.state.value,
                "failure_count": breaker.state.failure_count,
                "last_failure": as_iso(breaker.state.last_failure_time),
                "next_attempt": as_iso(breaker.state.next_attempt_time),
            }
            for breaker_name, breaker in self.circuit_breakers.items()
        }
# Global registry instance error_strategy_registry: ErrorStrategyRegistry = ErrorStrategyRegistry()