41. Assignment Monitor System#
Executive Summary#
The Assignment Monitor System is a real-time task assignment consistency checker that prevents workers from being stuck with reverted or reassigned tasks. It continuously monitors the state of task assignments between Marcus's internal persistence layer and the Kanban board, detecting and resolving discrepancies to ensure agents always work on valid, properly assigned tasks.
System Architecture#
Core Components#
The Assignment Monitor consists of three primary components:
Assignment Monitor Architecture
├── assignment_monitor.py (Core Monitoring + Health Analysis)
│   ├── AssignmentMonitor (Main monitor class)
│   │   ├── Reversion Detection (State change detection)
│   │   ├── Monitoring Loop (Continuous checking)
│   │   └── get_monitoring_stats() (Monitor health statistics)
│   └── AssignmentHealthChecker (Health assessment, same file)
│       ├── check_assignment_health() (Comprehensive health check)
│       ├── Orphan Detection (Persistence vs Kanban)
│       ├── Issue Reporting (Health issues)
│       └── Metrics Collection (System metrics)
└── assignment_reconciliation.py (Conflict Resolution)
    └── AssignmentReconciler (Sync resolution)
        ├── State Synchronization (Board alignment)
        └── Cleanup Operations (Orphan removal)
Note:
AssignmentHealthChecker is defined in src/monitoring/assignment_monitor.py, the same file as AssignmentMonitor. There is no separate assignment_health_checker.py file.
Monitoring Flow#
Start Monitor
      │
      ▼
Load Assignments ◄──────┐
      │                 │
      ▼                 │
Get Board State         │ (Every 30 seconds)
      │                 │
      ▼                 │
Compare States          │
      │                 │
      ├── Match ────────┘
      │
      └── Mismatch
             │
             ▼
        Detect Type
             │
             ├── Task Reverted → Remove Assignment
             ├── Task Reassigned → Update Records
             ├── Task Missing → Clean Up
             └── Task Blocked → Handle Appropriately
Core Functionality#
1. Reversion Detection#
The monitor detects several types of task state reversions:
async def _detect_reversion(self, task: Task, worker_id: str) -> bool:
    """
    Detect if a task has been reverted from its expected state.

    Detection Cases:
    1. Task reverted to TODO status
    2. Task reassigned to different worker
    3. Task completed by someone else
    4. Task blocked and unassigned
    """
    task_id = task.id

    # Case 1: Task went back to TODO
    if task.status == TaskStatus.TODO:
        logger.info(f"Task {task_id} reverted to TODO status")
        return True

    # Case 2: Task is IN_PROGRESS but assigned to different worker
    if task.status == TaskStatus.IN_PROGRESS and task.assigned_to != worker_id:
        logger.info(
            f"Task {task_id} reassigned from {worker_id} to {task.assigned_to}"
        )
        return True

    # Case 3: Task completed by someone else
    if task.status == TaskStatus.DONE and task.assigned_to != worker_id:
        logger.info(
            f"Task {task_id} completed by {task.assigned_to} instead of {worker_id}"
        )
        return True

    # Case 4: Task blocked but no longer assigned
    if task.status == TaskStatus.BLOCKED and not task.assigned_to:
        logger.info(f"Task {task_id} blocked and unassigned")
        return True

    return False
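To try the four cases in isolation, the sketch below re-implements the same checks as a plain synchronous function over stand-in types. The Task dataclass, the TaskStatus enum values, and the function name classify_reversion are assumptions made for illustration; Marcus's real models are not shown here and may carry more fields.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class TaskStatus(Enum):
    # Stand-in for the project's TaskStatus enum (values are assumed).
    TODO = "todo"
    IN_PROGRESS = "in_progress"
    DONE = "done"
    BLOCKED = "blocked"


@dataclass
class Task:
    # Minimal stand-in holding only the fields the checks read.
    id: str
    status: TaskStatus
    assigned_to: Optional[str] = None


def classify_reversion(task: Task, worker_id: str) -> Optional[str]:
    """Return a label for the detected case, or None if the assignment is still valid."""
    if task.status == TaskStatus.TODO:
        return "reverted_to_todo"
    if task.status == TaskStatus.IN_PROGRESS and task.assigned_to != worker_id:
        return "reassigned"
    if task.status == TaskStatus.DONE and task.assigned_to != worker_id:
        return "completed_by_other"
    if task.status == TaskStatus.BLOCKED and not task.assigned_to:
        return "blocked_unassigned"
    return None


print(classify_reversion(Task("t1", TaskStatus.TODO, "w1"), "w1"))         # reverted_to_todo
print(classify_reversion(Task("t2", TaskStatus.IN_PROGRESS, "w2"), "w1"))  # reassigned
print(classify_reversion(Task("t3", TaskStatus.IN_PROGRESS, "w1"), "w1"))  # None
```

Note that the first case fires on status alone: a task in TODO counts as reverted even if its assignee field still names the original worker.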
2. Monitoring Loop#
Continuous background monitoring with configurable intervals:
async def _monitor_loop(self):
    """Main monitoring loop."""
    while self._running:
        try:
            await self._check_for_reversions()
            await asyncio.sleep(self.check_interval)  # Default: 30 seconds
        except Exception as e:
            logger.error(f"Error in assignment monitor: {e}")
            await asyncio.sleep(self.check_interval)
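The loop above only runs while _running is set, so something has to start it as a background task and stop it cleanly. The sketch below shows one way to do that with asyncio. The LoopRunner class and its start() and stop() methods are hypothetical names; the source does not show how AssignmentMonitor manages its own lifecycle.

```python
import asyncio
from typing import Optional


class LoopRunner:
    """Hypothetical wrapper showing how a polling loop can be started and stopped."""

    def __init__(self, check_interval: float = 30.0) -> None:
        self.check_interval = check_interval
        self.checks_run = 0
        self._running = False
        self._task: Optional[asyncio.Task] = None

    async def _check(self) -> None:
        # Placeholder for the real reversion check.
        self.checks_run += 1

    async def _loop(self) -> None:
        while self._running:
            try:
                await self._check()
            except Exception as exc:  # keep the loop alive when a check fails
                print(f"check failed: {exc}")
            await asyncio.sleep(self.check_interval)

    async def start(self) -> None:
        if self._running:
            return  # already started; avoid a second background task
        self._running = True
        self._task = asyncio.create_task(self._loop())

    async def stop(self) -> None:
        self._running = False
        if self._task is not None:
            self._task.cancel()  # interrupt the sleep instead of waiting it out
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None


async def demo() -> int:
    runner = LoopRunner(check_interval=0.01)
    await runner.start()
    await asyncio.sleep(0.05)
    await runner.stop()
    return runner.checks_run


checks = asyncio.run(demo())
print(checks >= 1)  # True
```

Cancelling the task in stop() matters with a 30-second interval: without it, shutdown would wait for the current sleep to finish.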
3. Health Checking#
The health check gives a comprehensive assessment of the assignment system. check_assignment_health() is a method on AssignmentHealthChecker, and get_monitoring_stats() is a method on AssignmentMonitor; both classes are defined in src/monitoring/assignment_monitor.py, not in separate files.
class AssignmentHealthChecker:
    async def check_assignment_health(self) -> Dict:
        """
        Performs comprehensive health check comparing:
        - Persisted assignments vs Kanban board state
        - Monitor operational status
        - Reversion frequency tracking
        - Orphaned assignments detection
        """
        health = {
            "healthy": True,
            "issues": [],
            "metrics": {},
            "timestamp": datetime.now().isoformat()
        }

        # Excerpt: `persisted` and `in_progress` are used below, but the code
        # that loads them is not shown here.

        # Check for orphaned assignments
        persisted_task_ids = {a["task_id"] for a in persisted.values()}
        kanban_assigned_ids = {t.id for t in in_progress if t.assigned_to}

        # Tasks only in persistence (orphaned)
        orphaned_persisted = persisted_task_ids - kanban_assigned_ids
        if orphaned_persisted:
            health["healthy"] = False
            health["issues"].append({
                "type": "orphaned_assignments",
                "description": f"{len(orphaned_persisted)} tasks in persistence but not assigned in kanban",
                "task_ids": list(orphaned_persisted)
            })
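The orphan check is a set difference, and the same comparison run in the other direction would surface board tasks that have no persisted assignment. The sketch below shows both directions on plain sets of task IDs. The function name diff_assignments and the "untracked" label are assumptions; the excerpt above only shows the orphaned side.

```python
from typing import Dict, List, Set


def diff_assignments(persisted_ids: Set[str], kanban_ids: Set[str]) -> Dict[str, List[str]]:
    """Compare two sets of task IDs and report what appears on one side only."""
    return {
        # In persistence, but not assigned on the board.
        "orphaned": sorted(persisted_ids - kanban_ids),
        # Assigned on the board, but unknown to persistence.
        "untracked": sorted(kanban_ids - persisted_ids),
    }


persisted_ids = {"task-1", "task-2", "task-3"}
kanban_ids = {"task-2", "task-3", "task-4"}
result = diff_assignments(persisted_ids, kanban_ids)
print(result)  # {'orphaned': ['task-1'], 'untracked': ['task-4']}
```

Sorting the differences keeps the report stable between runs, which makes repeated health checks easier to compare.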
Key Features#
1. Automatic Reversion Handling#
When a reversion is detected, the monitor automatically:
Removes the invalid assignment from persistence
Logs the reversion for tracking
Increments reversion counters for pattern detection
2. Reversion Tracking#
The system tracks how many times each task has reverted:
# Track reversion count
self._reversion_count[task_id] = self._reversion_count.get(task_id, 0) + 1

# Flag problematic tasks
if self._reversion_count[task_id] >= 3:
    logger.error(
        f"Task {task_id} has reverted {self._reversion_count[task_id]} times! "
        "This task may have issues."
    )
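The three steps listed under Automatic Reversion Handling (remove, log, count) can be combined with the threshold check in one small handler. The sketch below is a synchronous illustration under stated assumptions: InMemoryAssignments is a hypothetical stand-in for the persistence layer, and ReversionHandler is not a class from the Marcus codebase.

```python
import logging
from typing import Dict, Optional

logger = logging.getLogger("assignment_monitor_sketch")


class InMemoryAssignments:
    """Hypothetical stand-in for the persistence layer (worker_id -> task_id)."""

    def __init__(self) -> None:
        self._by_worker: Dict[str, str] = {}

    def save(self, worker_id: str, task_id: str) -> None:
        self._by_worker[worker_id] = task_id

    def remove(self, worker_id: str) -> None:
        self._by_worker.pop(worker_id, None)

    def get(self, worker_id: str) -> Optional[str]:
        return self._by_worker.get(worker_id)


class ReversionHandler:
    FLAG_THRESHOLD = 3  # matches the ">= 3" check in the tracking snippet

    def __init__(self, store: InMemoryAssignments) -> None:
        self.store = store
        self.reversion_count: Dict[str, int] = {}

    def handle(self, worker_id: str, task_id: str) -> bool:
        """Run the three steps; return True once the task reaches the threshold."""
        # Step 1: remove the invalid assignment.
        self.store.remove(worker_id)
        # Step 2: log the reversion.
        logger.warning("Task %s reverted for worker %s", task_id, worker_id)
        # Step 3: increment the per-task counter.
        count = self.reversion_count.get(task_id, 0) + 1
        self.reversion_count[task_id] = count
        flagged = count >= self.FLAG_THRESHOLD
        if flagged:
            logger.error("Task %s has reverted %d times", task_id, count)
        return flagged


store = InMemoryAssignments()
handler = ReversionHandler(store)
results = []
for attempt in range(3):
    store.save("worker-1", "task-7")  # the task is assigned again, then reverts
    results.append(handler.handle("worker-1", "task-7"))

print(results)                # [False, False, True]
print(store.get("worker-1"))  # None
```

The counter is keyed by task rather than by worker, so a task that bounces between several workers still reaches the threshold.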
3. Fallback Handling#
The monitor gracefully handles different Kanban client implementations:
try:
    all_tasks = await self.kanban_client.get_all_tasks()
except AttributeError as e:
    # Fallback for clients without get_all_tasks
    logger.warning(
        f"get_all_tasks not available on {type(self.kanban_client)}: {e}"
    )
    logger.warning(
        "Using get_available_tasks as fallback - health check will be limited"
    )
    all_tasks = await self.kanban_client.get_available_tasks()
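A possible variation on this fallback, sketched below, checks for the method with getattr before calling it rather than catching AttributeError. This is not how the snippet above works; it is an alternative shown for comparison. FullClient, LimitedClient, and fetch_tasks are hypothetical names, and the task lists are made-up sample data.

```python
import asyncio
from typing import List, Tuple


class FullClient:
    """Hypothetical client that exposes both methods."""

    async def get_all_tasks(self) -> List[str]:
        return ["todo-1", "doing-2", "done-3"]

    async def get_available_tasks(self) -> List[str]:
        return ["todo-1"]


class LimitedClient:
    """Hypothetical client without get_all_tasks."""

    async def get_available_tasks(self) -> List[str]:
        return ["todo-1"]


async def fetch_tasks(client) -> Tuple[List[str], bool]:
    """Return (tasks, limited); limited is True when the fallback was used."""
    get_all = getattr(client, "get_all_tasks", None)
    if get_all is None:
        return await client.get_available_tasks(), True
    return await get_all(), False


async def demo():
    return await fetch_tasks(FullClient()), await fetch_tasks(LimitedClient())


full, limited = asyncio.run(demo())
print(full)     # (['todo-1', 'doing-2', 'done-3'], False)
print(limited)  # (['todo-1'], True)
```

The trade-off: a try/except around the call also catches an AttributeError raised inside get_all_tasks itself, which would silently trigger the fallback. Checking for the method first avoids that, and returning the limited flag lets the caller record that the health check saw only part of the board.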
Integration with Marcus Ecosystem#
Assignment Persistence Integration#
The monitor works closely with the AssignmentPersistence layer:
class AssignmentMonitor:
    def __init__(
        self,
        persistence: AssignmentPersistence,
        kanban_client: KanbanInterface,
        check_interval: int = 30
    ):
        self.persistence = persistence
        self.kanban_client = kanban_client
        self.reconciler = AssignmentReconciler(persistence, kanban_client)
Kanban Board Integration#
Monitors all task state changes on the Kanban board:
Fetches current board state every check interval
Compares with persisted assignments
Detects any discrepancies or state changes
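One pass of that fetch, compare, and detect cycle can be sketched as a pure function over already-fetched data. The shape of the persisted mapping (worker ID to a dict with a "task_id" key) follows the health-check excerpt; the BoardTask type, the status strings, and the reason labels are assumptions for illustration.

```python
from typing import Dict, List, NamedTuple, Optional


class BoardTask(NamedTuple):
    id: str
    status: str  # assumed values: "todo", "in_progress", "done", "blocked"
    assigned_to: Optional[str]


def find_discrepancies(
    persisted: Dict[str, Dict[str, str]], board: List[BoardTask]
) -> Dict[str, str]:
    """Return task_id -> reason for every persisted assignment the board contradicts."""
    by_id = {task.id: task for task in board}
    problems: Dict[str, str] = {}
    for worker_id, assignment in persisted.items():
        task_id = assignment["task_id"]
        task = by_id.get(task_id)
        if task is None:
            problems[task_id] = "missing"
        elif task.status == "todo":
            problems[task_id] = "reverted"
        elif task.status == "blocked" and not task.assigned_to:
            problems[task_id] = "blocked"
        elif task.status in ("in_progress", "done") and task.assigned_to != worker_id:
            problems[task_id] = "reassigned"
    return problems


persisted = {
    "worker-1": {"task_id": "t1"},
    "worker-2": {"task_id": "t2"},
    "worker-3": {"task_id": "t3"},
    "worker-4": {"task_id": "t4"},
}
board = [
    BoardTask("t1", "in_progress", "worker-1"),  # still valid
    BoardTask("t2", "todo", None),               # moved back to TODO
    BoardTask("t3", "in_progress", "worker-9"),  # taken over by another worker
]
print(find_discrepancies(persisted, board))
# {'t2': 'reverted', 't3': 'reassigned', 't4': 'missing'}
```

Indexing the board by task ID first keeps the pass linear in the number of assignments instead of scanning the board once per worker.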
Monitoring System Integration#
Provides statistics for the broader monitoring infrastructure:
def get_monitoring_stats(self) -> Dict:
    """Get current monitoring statistics."""
    return {
        "monitoring": self._running,
        "check_interval": self.check_interval,
        "tracked_tasks": len(self._last_known_states),
        "reversion_counts": dict(self._reversion_count),
        "last_check": datetime.now().isoformat()
    }
Why This System is Necessary#
Problem Scenarios#
Manual Board Changes: Project managers might manually move tasks back to TODO
Reassignments: Tasks get reassigned to different agents through the UI
System Glitches: Network issues or race conditions cause state inconsistencies
Abandoned Tasks: Agents disconnect without properly releasing tasks
Solution Benefits#
Consistency: Ensures Marcus's view matches the actual board state
Recovery: Automatically recovers from state reversions
Visibility: Tracks patterns of problematic tasks
Reliability: Prevents agents from working on invalid assignments
Configuration#
The Assignment Monitor supports configurable parameters:
AssignmentMonitor(
    persistence=persistence,
    kanban_client=kanban_client,
    check_interval=30  # Check every 30 seconds
)
Tuning Guidelines#
check_interval: Balance between responsiveness and system load
Faster (10-20s): Quick reversion detection, higher load
Standard (30s): Good balance for most use cases
Slower (60s+): Lower load, delayed detection
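The trade-off can be put in rough numbers. The sketch below assumes one board fetch per check, a worst-case detection delay of about one full interval, and an average delay of half an interval for changes that arrive at uniformly random times. These are back-of-the-envelope estimates that ignore how long each check takes, and polling_cost is a hypothetical helper.

```python
from typing import Dict


def polling_cost(check_interval_s: float) -> Dict[str, float]:
    """Estimate load and detection delay for a fixed polling interval."""
    return {
        "checks_per_hour": 3600 / check_interval_s,
        "worst_case_delay_s": check_interval_s,   # change lands just after a check
        "average_delay_s": check_interval_s / 2,  # assumes uniformly random arrival
    }


for interval in (10, 30, 60):
    print(interval, polling_cost(interval))
# 10 {'checks_per_hour': 360.0, 'worst_case_delay_s': 10, 'average_delay_s': 5.0}
# 30 {'checks_per_hour': 120.0, 'worst_case_delay_s': 30, 'average_delay_s': 15.0}
# 60 {'checks_per_hour': 60.0, 'worst_case_delay_s': 60, 'average_delay_s': 30.0}
```

Dropping from 30 seconds to 10 triples the number of board fetches while cutting the average delay by only 10 seconds, which is why the standard setting suits most deployments.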
Monitoring and Alerts#
Key Metrics Tracked#
Reversion Frequency: How often tasks revert state
Orphaned Assignments: Assignments without board tasks
Untracked Assignments: Board tasks without persistence
Monitor Health: Is the monitor running properly
Alert Conditions#
Task reverted 3+ times (indicates systemic issue)
Monitor stopped running
High number of orphaned assignments
Reconciliation failures
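Three of these conditions can be evaluated directly from the dictionaries returned by get_monitoring_stats() and check_assignment_health(). The sketch below is one possible evaluator. The function name, the alert labels, and the orphan threshold of 5 are assumptions, since the source does not define what counts as a high number. Reconciliation failures are left out because no field for them appears in either dictionary.

```python
from typing import Dict, List

REVERSION_ALERT_THRESHOLD = 3  # from "Task reverted 3+ times"
ORPHAN_ALERT_THRESHOLD = 5     # assumed value; the source gives no number


def evaluate_alerts(stats: Dict, health: Dict) -> List[str]:
    """Turn monitor stats and a health report into a list of alert labels."""
    alerts: List[str] = []
    if not stats.get("monitoring", False):
        alerts.append("monitor_stopped")
    for task_id, count in sorted(stats.get("reversion_counts", {}).items()):
        if count >= REVERSION_ALERT_THRESHOLD:
            alerts.append(f"task_reverted_repeatedly:{task_id}")
    for issue in health.get("issues", []):
        is_orphan_issue = issue.get("type") == "orphaned_assignments"
        if is_orphan_issue and len(issue.get("task_ids", [])) >= ORPHAN_ALERT_THRESHOLD:
            alerts.append("many_orphaned_assignments")
    return alerts


stats = {"monitoring": False, "reversion_counts": {"task-7": 3, "task-8": 1}}
health = {
    "healthy": False,
    "issues": [{"type": "orphaned_assignments", "task_ids": ["a", "b", "c", "d", "e"]}],
}
print(evaluate_alerts(stats, health))
# ['monitor_stopped', 'task_reverted_repeatedly:task-7', 'many_orphaned_assignments']
```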
Future Enhancements#
Short-term Improvements#
Predictive Reversion Detection: Use patterns to predict likely reversions
Adaptive Check Intervals: Increase frequency during high activity
Enhanced Notifications: Direct agent notifications on reversions
Reversion Analytics: Detailed reporting on reversion patterns
Long-term Vision#
Machine Learning Integration: Learn reversion patterns per project type
Preventive Actions: Automatically prevent problematic assignments
Cross-Board Monitoring: Monitor multiple boards simultaneously
Integration with Lease System: Coordinate with assignment leases
Conclusion#
The Assignment Monitor System provides critical consistency checking between Marcus's internal state and the actual Kanban board state. By continuously monitoring for reversions and automatically handling discrepancies, it ensures that agents always work on valid, properly assigned tasks. This real-time monitoring and automatic recovery mechanism is essential for maintaining system reliability in the face of manual interventions and system glitches.