Source code for src.core.assignment_reconciliation

"""
Assignment reconciliation for Marcus.

This module handles reconciling persisted assignments with the actual
kanban board state on startup or after connectivity issues.
"""

import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, TypedDict

from src.core.assignment_persistence import AssignmentPersistence
from src.core.models import TaskStatus
from src.integrations.kanban_interface import KanbanInterface

logger = logging.getLogger(__name__)


[docs] class ReconciliationResults(TypedDict): """Type definition for reconciliation results.""" assignments_verified: int assignments_restored: int assignments_removed: int orphaned_tasks: List[Dict[str, Any]] errors: List[str]
[docs] class AssignmentHealth(TypedDict, total=False): """Type definition for assignment health status.""" persisted_count: int kanban_assigned_count: int mismatches: List[Dict[str, Any]] healthy: bool error: str # Optional field when there's an error
[docs] class AssignmentReconciler: """Reconciles persisted assignments with kanban board state."""
[docs] def __init__( self, persistence: AssignmentPersistence, kanban_client: KanbanInterface ): """ Initialize the reconciler. Parameters ---------- persistence Assignment persistence layer. kanban_client Kanban board interface. """ self.persistence = persistence self.kanban_client = kanban_client
[docs] async def reconcile_assignments(self) -> ReconciliationResults: """ Reconcile persisted assignments with kanban board state. Returns ------- Dictionary with reconciliation results """ results: ReconciliationResults = { "assignments_verified": 0, "assignments_restored": 0, "assignments_removed": 0, "orphaned_tasks": [], "errors": [], } try: # Get all persisted assignments persisted = await self.persistence.load_assignments() # Get all tasks from kanban all_tasks = await self.kanban_client.get_all_tasks() task_map = {task.id: task for task in all_tasks} # Check each persisted assignment for worker_id, assignment in list(persisted.items()): task_id = assignment["task_id"] if task_id not in task_map: # Task no longer exists in kanban logger.warning( f"Task {task_id} no longer exists, removing assignment" ) await self.persistence.remove_assignment(worker_id) results["assignments_removed"] += 1 continue task = task_map[task_id] # Check if task is still assigned to this worker if ( task.status == TaskStatus.IN_PROGRESS and task.assigned_to == worker_id ): # Assignment is valid results["assignments_verified"] += 1 elif task.status == TaskStatus.DONE: # Task was completed, remove assignment logger.info(f"Task {task_id} is completed, removing assignment") await self.persistence.remove_assignment(worker_id) results["assignments_removed"] += 1 elif task.status == TaskStatus.TODO: # Task was reset to TODO, remove assignment logger.warning( f"Task {task_id} is back in TODO, removing assignment" ) await self.persistence.remove_assignment(worker_id) results["assignments_removed"] += 1 else: # Task is assigned to someone else or in different state logger.warning( f"Task {task_id} state mismatch - " f"status: {task.status}, assigned_to: {task.assigned_to}" ) await self.persistence.remove_assignment(worker_id) results["assignments_removed"] += 1 # Find orphaned IN_PROGRESS tasks (assigned in kanban but not persisted) persisted_task_ids = {a["task_id"] for a in list(persisted.values())} for task in all_tasks: if ( task.status == TaskStatus.IN_PROGRESS and task.assigned_to and task.id not in persisted_task_ids ): # This task is assigned in kanban but not persisted logger.info( f"Found orphaned task {task.id} assigned to {task.assigned_to}" ) results["orphaned_tasks"].append( { "task_id": task.id, "task_name": task.name, "assigned_to": task.assigned_to, } ) # Restore to persistence await self.persistence.save_assignment( task.assigned_to, task.id, { "name": task.name, "priority": ( task.priority.value if task.priority else "medium" ), "estimated_hours": task.estimated_hours, "restored_at": datetime.now(timezone.utc).isoformat(), }, ) results["assignments_restored"] += 1 except Exception as e: logger.error(f"Error during reconciliation: {e}") results["errors"].append(str(e)) return results
[docs] async def get_assignment_health(self) -> AssignmentHealth: """ Get health status of assignment tracking. Returns ------- Dictionary with health metrics """ health: AssignmentHealth = { "persisted_count": 0, "kanban_assigned_count": 0, "mismatches": [], "healthy": True, } try: # Get persisted assignments persisted = await self.persistence.load_assignments() health["persisted_count"] = len(persisted) # Get kanban assignments all_tasks = await self.kanban_client.get_all_tasks() kanban_assigned = [ t for t in all_tasks if t.status == TaskStatus.IN_PROGRESS and t.assigned_to ] health["kanban_assigned_count"] = len(kanban_assigned) # Check for mismatches persisted_task_ids = {a["task_id"] for a in list(persisted.values())} kanban_task_ids = {t.id for t in kanban_assigned} # Tasks in persistence but not kanban only_persisted = persisted_task_ids - kanban_task_ids if only_persisted: health["mismatches"].append( {"type": "only_in_persistence", "task_ids": list(only_persisted)} ) # Tasks in kanban but not persistence only_kanban = kanban_task_ids - persisted_task_ids if only_kanban: health["mismatches"].append( {"type": "only_in_kanban", "task_ids": list(only_kanban)} ) health["healthy"] = len(health["mismatches"]) == 0 except Exception as e: health["healthy"] = False health["error"] = str(e) return health