src.core.events module#

Event Distribution System for Marcus.

Simple event system that allows components to publish and subscribe to events without complex dependencies. Events enable loose coupling between systems.

class src.core.events.Event[source]#

Bases: object

Base event structure for all Marcus events.

event_id: str#
timestamp: datetime#
event_type: str#
source: str#
data: Dict[str, Any]#
metadata: Dict[str, Any] | None = None#
to_dict()[source]#

Convert event to dictionary.

Return type:

Dict[str, Any]

to_json()[source]#

Convert event to JSON string.

Return type:

str

__init__(event_id, timestamp, event_type, source, data, metadata=None)#
Parameters:
Return type:

None

class src.core.events.Events[source]#

Bases: object

Simple event distribution system for Marcus.

Features: - Publish/subscribe pattern - Async event handling - Event history (optional) - Error isolation (one subscriber error doesn’t affect others) - Optional persistence to disk

__init__(store_history=False, persistence=None)[source]#

Initialize the event system.

Parameters:
  • store_history (bool) – Whether to keep event history in memory.

  • persistence (Optional[Any]) – Optional Persistence instance for storing events.

subscribers: Dict[str, List[Callable[[...], Any]]]#
history: List[Event]#
subscribe(event_type, handler)[source]#

Subscribe to an event type.

Parameters:
  • event_type (str) – Type of event to subscribe to (‘*’ for all events).

  • handler (Callable[..., Any]) – Async function to call when event occurs.

Return type:

None

unsubscribe(event_type, handler)[source]#

Unsubscribe from an event type.

Parameters:
  • event_type (str) – Type of event to unsubscribe from.

  • handler (Callable[..., Any]) – Handler function to remove.

Return type:

None

async publish(event_type, source, data, metadata=None, wait_for_handlers=True)[source]#

Publish an event to all subscribers.

Parameters:
  • event_type (str) – Type of event (e.g., ‘task_assigned’, ‘progress_updated’).

  • source (str) – Source of the event (e.g., ‘marcus’, ‘agent_123’).

  • data (Dict[str, Any]) – Event data.

  • metadata (Optional[Dict[str, Any]]) – Optional metadata (confidence scores, etc.).

  • wait_for_handlers (bool) – If False, handlers run asynchronously without waiting (default: True).

Return type:

Event

Returns:

The published Event object

async get_history(event_type=None, source=None, limit=100)[source]#

Get event history with optional filtering.

Parameters:
  • event_type (Optional[str]) – Filter by event type.

  • source (Optional[str]) – Filter by source.

  • limit (int) – Maximum number of events to return.

Return type:

List[Event]

Returns:

List of events matching the criteria

clear_history()[source]#

Clear event history.

Return type:

None

async publish_nowait(event_type, source, data, metadata=None)[source]#

Publish an event without waiting for handlers (performance optimization).

This is useful for non-critical events like logging or monitoring where you don’t need to wait for handlers to complete.

Parameters:
  • event_type (str) – Type of event.

  • source (str) – Source of the event.

  • data (Dict[str, Any]) – Event data.

  • metadata (Optional[Dict[str, Any]]) – Optional metadata.

Return type:

Event

Returns:

The published Event object

async wait_for_event(event_type, timeout=None)[source]#

Wait for a specific event type to occur.

Parameters:
  • event_type (str) – Event type to wait for.

  • timeout (Optional[float]) – Maximum time to wait (seconds).

Return type:

Optional[Event]

Returns:

The event if it occurs, None if timeout

class src.core.events.EventTypes[source]#

Bases: object

Standard event types used in Marcus.

TASK_REQUESTED = 'task_requested'#
TASK_ASSIGNED = 'task_assigned'#
TASK_STARTED = 'task_started'#
TASK_PROGRESS = 'task_progress'#
TASK_COMPLETED = 'task_completed'#
TASK_BLOCKED = 'task_blocked'#
BLOCKER_RESOLVED = 'blocker_resolved'#
AGENT_REGISTERED = 'agent_registered'#
AGENT_STATUS_CHANGED = 'agent_status_changed'#
AGENT_SKILL_UPDATED = 'agent_skill_updated'#
PROJECT_CREATED = 'project_created'#
PROJECT_UPDATED = 'project_updated'#
PROJECT_COMPLETED = 'project_completed'#
SYSTEM_STARTUP = 'system_startup'#
SYSTEM_SHUTDOWN = 'system_shutdown'#
KANBAN_CONNECTED = 'kanban_connected'#
KANBAN_ERROR = 'kanban_error'#
CONTEXT_UPDATED = 'context_updated'#
DEPENDENCY_DETECTED = 'dependency_detected'#
IMPLEMENTATION_FOUND = 'implementation_found'#
DECISION_LOGGED = 'decision_logged'#
PATTERN_DETECTED = 'pattern_detected'#
PLANNING_INTENT_FIDELITY = 'planning_intent_fidelity'#
PREDICTION_MADE = 'prediction_made'#
AGENT_LEARNED = 'agent_learned'#
ERROR = 'error'#
WARNING = 'warning'#