Source code for src.monitoring.assignment_monitor

"""
Continuous monitoring for task assignment state changes.

This module monitors for task state reversions and handles them appropriately
to prevent workers from being stuck with reverted tasks.
"""

import asyncio
import logging
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from src.core.assignment_persistence import AssignmentPersistence
from src.core.assignment_reconciliation import AssignmentReconciler
from src.core.models import Task, TaskStatus
from src.integrations.kanban_interface import KanbanInterface

logger = logging.getLogger(__name__)


[docs] class AssignmentMonitor: """Monitors task assignments for state reversions and inconsistencies."""
[docs] def __init__( self, persistence: AssignmentPersistence, kanban_client: KanbanInterface, check_interval: int = 30, # seconds ): """ Initialize the assignment monitor. Parameters ---------- persistence : AssignmentPersistence Assignment persistence layer kanban_client : KanbanInterface Kanban board interface check_interval : int How often to check for reversions (seconds) """ self.persistence = persistence self.kanban_client = kanban_client self.reconciler = AssignmentReconciler(persistence, kanban_client) self.check_interval = check_interval self._running = False self._monitor_task: Optional[asyncio.Task[None]] = None # Track task states to detect changes self._last_known_states: Dict[str, TaskStatus] = {} self._reversion_count: Dict[str, int] = ( {} ) # Track how many times a task reverted
[docs] async def start(self) -> None: """Start monitoring for assignment reversions.""" if self._running: logger.warning("Assignment monitor already running") return self._running = True self._monitor_task = asyncio.create_task(self._monitor_loop()) logger.info(f"Assignment monitor started (interval: {self.check_interval}s)")
[docs] async def stop(self) -> None: """Stop the assignment monitor.""" self._running = False if self._monitor_task: self._monitor_task.cancel() try: await self._monitor_task except asyncio.CancelledError: pass logger.info("Assignment monitor stopped")
async def _monitor_loop(self) -> None: """Run the main monitoring loop.""" while self._running: try: await self._check_for_reversions() await asyncio.sleep(self.check_interval) except Exception as e: logger.error(f"Error in assignment monitor: {e}") await asyncio.sleep(self.check_interval) async def _check_for_reversions(self) -> None: """Check for task state reversions.""" try: # Get current assignments from persistence assignments = await self.persistence.load_assignments() # Get current task states from kanban try: all_tasks = await self.kanban_client.get_all_tasks() except AttributeError as e: # Fallback: if get_all_tasks is not available, use available tasks only logger.warning( f"get_all_tasks not available on {type(self.kanban_client)}: {e}" ) logger.warning( "Using get_available_tasks as fallback - " "health check will be limited" ) all_tasks = await self.kanban_client.get_available_tasks() task_map = {task.id: task for task in all_tasks} reversions_detected = [] for worker_id, assignment in list(assignments.items()): task_id = assignment["task_id"] # Check if task still exists if task_id not in task_map: logger.warning(f"Task {task_id} no longer exists in kanban") await self._handle_missing_task(worker_id, task_id) continue task = task_map[task_id] # Check for state reversion if await self._detect_reversion(task, worker_id): reversions_detected.append( { "task_id": task_id, "worker_id": worker_id, "current_status": task.status, "assigned_to": task.assigned_to, } ) # Update last known state self._last_known_states[task_id] = task.status # Handle detected reversions if reversions_detected: logger.warning(f"Detected {len(reversions_detected)} task reversions") for reversion in reversions_detected: await self._handle_reversion(reversion) except Exception as e: logger.error(f"Error checking for reversions: {e}") async def _detect_reversion(self, task: Task, worker_id: str) -> bool: """ Detect if a task has been reverted. Returns ------- bool True if the task was reverted, False otherwise """ task_id = task.id # Case 1: Task went back to TODO if task.status == TaskStatus.TODO: logger.info(f"Task {task_id} reverted to TODO status") return True # Case 2: Task is IN_PROGRESS but assigned to different worker if task.status == TaskStatus.IN_PROGRESS and task.assigned_to != worker_id: logger.info( f"Task {task_id} reassigned from {worker_id} to {task.assigned_to}" ) return True # Case 3: Task completed by someone else if task.status == TaskStatus.DONE and task.assigned_to != worker_id: logger.info( f"Task {task_id} completed by {task.assigned_to} instead of {worker_id}" ) return True # Case 4: Task blocked but no longer assigned if task.status == TaskStatus.BLOCKED and not task.assigned_to: logger.info(f"Task {task_id} blocked and unassigned") return True return False async def _handle_reversion(self, reversion: Dict[str, Any]) -> None: """Handle a detected task reversion.""" task_id = reversion["task_id"] worker_id = reversion["worker_id"] # Track reversion count self._reversion_count[task_id] = self._reversion_count.get(task_id, 0) + 1 # Remove assignment from persistence await self.persistence.remove_assignment(worker_id) # Log the reversion logger.warning( f"Handled reversion: Task {task_id} for worker {worker_id} " f"(status: {reversion['current_status']}, " f"assigned_to: {reversion['assigned_to']}, " f"reversion_count: {self._reversion_count[task_id]})" ) # If task reverts too many times, flag it if self._reversion_count[task_id] >= 3: logger.error( f"Task {task_id} has reverted {self._reversion_count[task_id]} times! " "This task may have issues." ) async def _handle_missing_task(self, worker_id: str, task_id: str) -> None: """Handle a task that no longer exists.""" # Remove assignment await self.persistence.remove_assignment(worker_id) logger.warning( f"Removed assignment for missing task {task_id} from worker {worker_id}" )
[docs] async def force_reconciliation(self) -> Dict[str, Any]: """Force a full reconciliation check.""" logger.info("Forcing assignment reconciliation") results = await self.reconciler.reconcile_assignments() logger.info(f"Reconciliation results: {results}") # Convert ReconciliationResults to dict format return { "status": "completed", "fixed_assignments": getattr(results, "fixed_assignments", 0), "removed_assignments": getattr(results, "removed_assignments", 0), "errors": getattr(results, "errors", []), "details": str(results), }
[docs] def get_monitoring_stats(self) -> Dict[str, Any]: """Get current monitoring statistics.""" return { "monitoring": self._running, "check_interval": self.check_interval, "tracked_tasks": len(self._last_known_states), "reversion_counts": dict(self._reversion_count), "last_check": datetime.now(timezone.utc).isoformat(), }
[docs] class AssignmentHealthChecker: """Performs periodic health checks on assignment system."""
[docs] def __init__( self, persistence: AssignmentPersistence, kanban_client: KanbanInterface, monitor: AssignmentMonitor, ): self.persistence = persistence self.kanban_client = kanban_client self.monitor = monitor self.reconciler = AssignmentReconciler(persistence, kanban_client)
[docs] async def check_assignment_health(self) -> Dict[str, Any]: """ Comprehensive health check of assignment system. Returns ------- Dict[str, Any] Dictionary with health status and any issues found """ health: Dict[str, Any] = { "healthy": True, "issues": [], "metrics": {}, "timestamp": datetime.now(timezone.utc).isoformat(), } try: # Check persistence health persisted = await self.persistence.load_assignments() health["metrics"]["persisted_assignments"] = len(persisted) # Check kanban state try: tasks = await self.kanban_client.get_all_tasks() except AttributeError as e: # Fallback: if get_all_tasks is not available, use available tasks only logger.warning( f"get_all_tasks not available on {type(self.kanban_client)}: {e}" ) logger.warning( "Using get_available_tasks as fallback - " "health check will be limited" ) tasks = await self.kanban_client.get_available_tasks() in_progress = [t for t in tasks if t.status == TaskStatus.IN_PROGRESS] health["metrics"]["in_progress_tasks"] = len(in_progress) # Check for mismatches persisted_task_ids = {a["task_id"] for a in list(persisted.values())} kanban_assigned_ids = {t.id for t in in_progress if t.assigned_to} # Tasks only in persistence orphaned_persisted = persisted_task_ids - kanban_assigned_ids if orphaned_persisted: health["healthy"] = False health["issues"].append( { "type": "orphaned_assignments", "description": ( f"{len(orphaned_persisted)} tasks in persistence " "but not assigned in kanban" ), "task_ids": list(orphaned_persisted), } ) # Tasks only in kanban orphaned_kanban = kanban_assigned_ids - persisted_task_ids if orphaned_kanban: health["healthy"] = False health["issues"].append( { "type": "untracked_assignments", "description": ( f"{len(orphaned_kanban)} tasks assigned in kanban " "but not tracked" ), "task_ids": list(orphaned_kanban), } ) # Check monitor status monitor_stats = self.monitor.get_monitoring_stats() health["metrics"]["monitor"] = monitor_stats if not monitor_stats["monitoring"]: health["issues"].append( { "type": "monitor_stopped", "description": "Assignment monitor is not running", "severity": "warning", } ) # Check for high reversion counts high_reversions = { task_id: count for task_id, count in monitor_stats["reversion_counts"].items() if count >= 3 } if high_reversions: health["issues"].append( { "type": "high_reversions", "description": "Tasks with high reversion counts", "tasks": high_reversions, "severity": "warning", } ) except Exception as e: health["healthy"] = False health["issues"].append( { "type": "check_error", "description": f"Error during health check: {str(e)}", "severity": "error", } ) return health