Source code for src.core.events

"""
Event Distribution System for Marcus.

Simple event system that allows components to publish and subscribe to events
without complex dependencies. Events enable loose coupling between systems.
"""

import asyncio
import json
import logging
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List, Optional, Set

from src.core.resilience import with_fallback

logger = logging.getLogger(__name__)


@dataclass
class Event:
    """
    Base event structure for all Marcus events.

    Attributes
    ----------
    event_id
        Identifier of the event.
    timestamp
        Moment the event was created.
    event_type
        Name of the event type.
    source
        Name of the component that emitted the event.
    data
        Event payload.
    metadata
        Optional extra information attached to the event.
    """

    event_id: str
    timestamp: datetime
    event_type: str
    source: str
    data: Dict[str, Any]
    metadata: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the event as a plain dict with an ISO-8601 timestamp string."""
        # Overriding an existing key in a dict display keeps its original
        # position, so the field order produced by asdict() is preserved.
        return {**asdict(self), "timestamp": self.timestamp.isoformat()}

    def to_json(self) -> str:
        """Serialize the dictionary form of the event to a JSON string."""
        payload = self.to_dict()
        return json.dumps(payload)
class Events:
    """
    Simple event distribution system for Marcus.

    Features:
    - Publish/subscribe pattern
    - Async event handling
    - Event history (optional)
    - Error isolation (one subscriber error doesn't affect others)
    - Optional persistence to disk

    Attributes
    ----------
    MAX_HISTORY_SIZE
        Upper bound on the number of events kept in the in-memory history.
    """

    MAX_HISTORY_SIZE = 1000

    def __init__(self, store_history: bool = False, persistence: Optional[Any] = None):
        """
        Initialize the event system.

        Parameters
        ----------
        store_history
            Whether to keep event history in memory.
        persistence
            Optional Persistence instance for storing events.
        """
        self.subscribers: Dict[str, List[Callable[..., Any]]] = {}
        self.store_history = store_history
        self.history: List[Event] = []
        self._event_counter = 0
        self.persistence = persistence
        # Strong references to fire-and-forget handler tasks. The event loop
        # only keeps weak references to tasks, so without this set a running
        # handler could be garbage-collected before it finishes.
        self._background_tasks: Set["asyncio.Task[None]"] = set()

    def subscribe(self, event_type: str, handler: Callable[..., Any]) -> None:
        """
        Subscribe to an event type.

        Parameters
        ----------
        event_type
            Type of event to subscribe to ('*' for all events).
        handler
            Async function to call when event occurs.
        """
        self.subscribers.setdefault(event_type, []).append(handler)
        logger.debug(f"Handler subscribed to {event_type} events")

    def unsubscribe(self, event_type: str, handler: Callable[..., Any]) -> None:
        """
        Unsubscribe from an event type.

        Every registration of ``handler`` for ``event_type`` is removed.
        Unknown event types are ignored.

        Parameters
        ----------
        event_type
            Type of event to unsubscribe from.
        handler
            Handler function to remove.
        """
        if event_type in self.subscribers:
            self.subscribers[event_type] = [
                h for h in self.subscribers[event_type] if h != handler
            ]

    async def publish(
        self,
        event_type: str,
        source: str,
        data: Dict[str, Any],
        metadata: Optional[Dict[str, Any]] = None,
        wait_for_handlers: bool = True,
    ) -> Event:
        """
        Publish an event to all subscribers.

        Parameters
        ----------
        event_type
            Type of event (e.g., 'task_assigned', 'progress_updated').
        source
            Source of the event (e.g., 'marcus', 'agent_123').
        data
            Event data.
        metadata
            Optional metadata (confidence scores, etc.).
        wait_for_handlers
            If False, handlers run asynchronously without waiting
            (default: True).

        Returns
        -------
        The published Event object
        """
        # Create event
        self._event_counter += 1
        now = datetime.now(timezone.utc)
        event = Event(
            event_id=f"evt_{self._event_counter}_{now.timestamp()}",
            timestamp=now,
            event_type=event_type,
            source=source,
            data=data,
            metadata=metadata or {},
        )

        # Store in history if enabled, keeping only the most recent events
        # to prevent unbounded memory growth.
        if self.store_history:
            self.history.append(event)
            if len(self.history) > self.MAX_HISTORY_SIZE:
                self.history = self.history[-self.MAX_HISTORY_SIZE :]

        # Store to persistence if available (with graceful degradation)
        if self.persistence:
            await self._persist_event_safe(event)

        # Type-specific subscribers first, then universal ('*') subscribers.
        handlers = [
            *self.subscribers.get(event_type, ()),
            *self.subscribers.get("*", ()),
        ]

        if handlers:
            pending = [self._run_handler_safely(h, event) for h in handlers]
            if wait_for_handlers:
                # Wait for all handlers to complete
                await asyncio.gather(*pending)
            else:
                # Fire-and-forget: schedule the handlers and return at once.
                for coro in pending:
                    task = asyncio.create_task(coro)
                    self._background_tasks.add(task)
                    # Drop the reference as soon as the task is done.
                    task.add_done_callback(self._background_tasks.discard)

        logger.debug(f"Published {event_type} event from {source}")
        return event

    async def _run_handler_safely(
        self, handler: Callable[..., Any], event: Event
    ) -> None:
        """Await one handler, logging instead of propagating its exceptions."""
        try:
            await handler(event)
        except Exception as err:
            # functools.partial objects and callable instances have no
            # __name__; fall back to repr so that logging the failure cannot
            # itself raise and break error isolation.
            handler_name = getattr(handler, "__name__", repr(handler))
            logger.error(f"Error in event handler {handler_name}: {err}")

    @with_fallback(
        lambda self, event: logger.warning(f"Event {event.event_id} not persisted")
    )
    async def _persist_event_safe(self, event: Event) -> None:
        """Persist event with graceful degradation."""
        if self.persistence:
            await self.persistence.store_event(event)

    async def get_history(
        self,
        event_type: Optional[str] = None,
        source: Optional[str] = None,
        limit: int = 100,
    ) -> List[Event]:
        """
        Get event history with optional filtering.

        Persistence is queried first when configured; if it raises or
        returns None, the in-memory history is used instead.

        Parameters
        ----------
        event_type
            Filter by event type.
        source
            Filter by source.
        limit
            Maximum number of events to return. For the in-memory history a
            non-positive limit yields an empty list.

        Returns
        -------
        List of events matching the criteria
        """
        # Try persistence first if available
        if self.persistence:
            try:
                result = await self.persistence.get_events(
                    event_type=event_type, source=source, limit=limit
                )
                if result is not None:
                    return result  # type: ignore[no-any-return]
            except Exception as e:
                logger.error(f"Failed to get events from persistence: {e}")

        # Fall back to in-memory history
        if not self.store_history:
            return []
        # Guard explicitly: ``seq[-0:]`` is the whole sequence, so limit=0
        # would otherwise return everything instead of nothing.
        if limit <= 0:
            return []

        filtered = self.history
        if event_type:
            filtered = [e for e in filtered if e.event_type == event_type]
        if source:
            filtered = [e for e in filtered if e.source == source]
        return filtered[-limit:]

    def clear_history(self) -> None:
        """Clear event history."""
        self.history = []

    async def publish_nowait(
        self,
        event_type: str,
        source: str,
        data: Dict[str, Any],
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Event:
        """
        Publish an event without waiting for handlers (performance optimization).

        This is useful for non-critical events like logging or monitoring
        where you don't need to wait for handlers to complete.

        Parameters
        ----------
        event_type
            Type of event.
        source
            Source of the event.
        data
            Event data.
        metadata
            Optional metadata.

        Returns
        -------
        The published Event object
        """
        return await self.publish(
            event_type, source, data, metadata, wait_for_handlers=False
        )

    async def wait_for_event(
        self, event_type: str, timeout: Optional[float] = None
    ) -> Optional[Event]:
        """
        Wait for a specific event type to occur.

        Parameters
        ----------
        event_type
            Event type to wait for.
        timeout
            Maximum time to wait (seconds).

        Returns
        -------
        The event if it occurs, None if timeout
        """
        received_event: Optional[Event] = None
        event_received = asyncio.Event()

        async def capture_handler(event: Event) -> None:
            nonlocal received_event
            received_event = event
            event_received.set()

        # Subscribe temporarily
        self.subscribe(event_type, capture_handler)
        try:
            # Wait for event with timeout
            await asyncio.wait_for(event_received.wait(), timeout=timeout)
            return received_event
        except asyncio.TimeoutError:
            return None
        finally:
            # Clean up subscription whether the wait succeeded or timed out
            self.unsubscribe(event_type, capture_handler)
# Common event types for Marcus
class EventTypes:
    """Standard event types used in Marcus."""

    # --- Tasks ---
    TASK_REQUESTED = "task_requested"
    TASK_ASSIGNED = "task_assigned"
    TASK_STARTED = "task_started"
    TASK_PROGRESS = "task_progress"
    TASK_COMPLETED = "task_completed"
    TASK_BLOCKED = "task_blocked"
    BLOCKER_RESOLVED = "blocker_resolved"

    # --- Agents ---
    AGENT_REGISTERED = "agent_registered"
    AGENT_STATUS_CHANGED = "agent_status_changed"
    AGENT_SKILL_UPDATED = "agent_skill_updated"

    # --- Projects ---
    PROJECT_CREATED = "project_created"
    PROJECT_UPDATED = "project_updated"
    PROJECT_COMPLETED = "project_completed"

    # --- System ---
    SYSTEM_STARTUP = "system_startup"
    SYSTEM_SHUTDOWN = "system_shutdown"
    KANBAN_CONNECTED = "kanban_connected"
    KANBAN_ERROR = "kanban_error"

    # --- Context ---
    CONTEXT_UPDATED = "context_updated"
    DEPENDENCY_DETECTED = "dependency_detected"
    IMPLEMENTATION_FOUND = "implementation_found"

    # --- Decisions ---
    DECISION_LOGGED = "decision_logged"
    PATTERN_DETECTED = "pattern_detected"

    # --- Planning (issue #449) ---
    # Emitted by NaturalLanguageProjectCreator after each decomposer path
    # returns, when MARCUS_OUTCOME_COVERAGE is on and outcomes were
    # extracted. Carries intent_fidelity_score plus coverage maps so Cato
    # can surface intent-fidelity telemetry alongside the planning-phase
    # swim lanes.
    PLANNING_INTENT_FIDELITY = "planning_intent_fidelity"

    # --- Memory ---
    PREDICTION_MADE = "prediction_made"
    AGENT_LEARNED = "agent_learned"

    # --- Errors ---
    ERROR = "error"
    WARNING = "warning"