src.core.assignment_persistence module#

Assignment persistence layer for Marcus.

This module provides persistent storage for task assignments to prevent duplicate assignments across Marcus restarts and multiple instances.

class src.core.assignment_persistence.AssignmentPersistence[source]#

Bases: object

Handles persistent storage of task assignments.

__init__(storage_dir=None)[source]#

Initialize the assignment persistence layer.

Parameters:

storage_dir (Optional[Path]) –

Directory for storing assignment data.

Defaults to ./data/assignments/

property lock: Lock#

Get lock for the current event loop.

async save_assignment(worker_id, task_id, task_data)[source]#

Save a task assignment persistently.

Parameters:
  • worker_id (str) – ID of the worker assigned to the task.

  • task_id (str) – ID of the task being assigned.

  • task_data (Dict[str, Any]) – Additional task information to store.

Return type:

None

async remove_assignment(worker_id)[source]#

Remove a task assignment (e.g., when task is completed).

Parameters:

worker_id (str) – ID of the worker to remove assignment for.

Return type:

None

async get_assignment(worker_id)[source]#

Get the current assignment for a worker.

Parameters:

worker_id (str) – ID of the worker.

Return type:

Optional[Dict[str, Any]]

Returns:

Assignment data or None if no assignment exists

async get_all_assigned_task_ids()[source]#

Get all currently assigned task IDs.

Return type:

set[str]

Returns:

Set of task IDs that are currently assigned

async load_assignments()[source]#

Load assignments from persistent storage.

Return type:

Dict[str, Dict[str, Any]]

Returns:

Dictionary of worker_id -> assignment data

async is_task_assigned(task_id)[source]#

Check if a task is currently assigned to any worker.

Parameters:

task_id (str) – ID of the task to check.

Return type:

bool

Returns:

True if the task is assigned, False otherwise

async get_worker_for_task(task_id)[source]#

Get the worker ID assigned to a specific task.

Parameters:

task_id (str) – ID of the task.

Return type:

Optional[str]

Returns:

Worker ID or None if task is not assigned

async cleanup()[source]#

Clean up any resources and persist final state.

Return type:

None