Source code for src.core.assignment_lease

"""
Assignment Lease System for automatic task recovery.

This module implements a lease-based assignment system where tasks are assigned
with time-limited leases that must be renewed through progress reports. Tasks
with expired leases are automatically returned to the TODO state for reassignment.

Key features:
- Automatic lease renewal on progress reports
- Configurable lease durations based on task complexity
- Escalation for tasks with excessive renewals
- Integration with assignment persistence
"""

import asyncio
import logging
import statistics
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Deque, Dict, List, Optional

from src.core.assignment_persistence import AssignmentPersistence
from src.core.event_loop_utils import EventLoopLockManager
from src.core.models import RecoveryInfo, Task, TaskStatus
from src.integrations.kanban_interface import KanbanInterface

logger = logging.getLogger(__name__)

# Merge-conflict-aware lease extension (dashboard-v73 fix).
#
# When an agent's lease is about to expire and the agent's worktree has
# unresolved git merge conflicts, Marcus grants a one-shot extension to
# give the agent more time to finish conflict resolution. The extension
# is bounded so a permanently-stuck agent cannot hold a lease forever.
#
# Truth-grounded: the worktree git state IS the source of truth — no
# new agent API surface, no agent self-declaration, agent cannot lie
# or forget. Same mechanism handles the mid-rebase case for free.
MAX_MERGE_CONFLICT_EXTENSIONS = 2  # max grants per lease (=10 min total)
MERGE_CONFLICT_EXTENSION_SECONDS = 300  # 5 min per grant
MERGE_CONFLICT_GIT_TIMEOUT_SECONDS = 10.0  # cap git status to bound the lease loop

# Git porcelain status prefixes that indicate unmerged paths. Any line
# whose first two characters match one of these is an unresolved merge:
#   UU = both modified         (most common — same line edited on both sides)
#   AA = both added            (same path created independently)
#   DD = both deleted          (sanity-edge — both branches removed it)
#   DU/UD = deleted vs modified  (one side removed, other modified)
#   AU/UA = added vs modified    (one side created, other modified pre-existing)
# See ``man git-status`` "Output → Short Format" for the full grammar.
_GIT_UNMERGED_PREFIXES = ("UU", "AA", "DD", "DU", "UD", "AU", "UA")


class LeaseStatus(Enum):
    """Status of an assignment lease.

    ``AssignmentLease.status`` in this module yields ``ACTIVE``,
    ``EXPIRING_SOON`` or ``EXPIRED``. ``RENEWED`` is declared here but is
    not produced by ``AssignmentLease.status``.
    """

    ACTIVE = "active"
    EXPIRING_SOON = "expiring_soon"  # Less than 1 hour remaining
    EXPIRED = "expired"
    RENEWED = "renewed"
@dataclass
class AssignmentLease:
    """A time-limited claim that one agent holds on one task.

    The lease records when the task was assigned, when the claim runs
    out, and the most recent progress the agent reported. All datetimes
    are compared against ``datetime.now(timezone.utc)``, so they must be
    timezone-aware.
    """

    task_id: str
    agent_id: str
    assigned_at: datetime
    lease_expires: datetime
    last_renewed: datetime
    renewal_count: int = 0
    estimated_hours: float = 4.0  # Taken from the task's estimate
    progress_percentage: int = 0
    last_progress_message: str = ""
    grace_period_seconds: Optional[float] = None  # Adaptive grace for this lease
    update_timestamps: list[datetime] = field(default_factory=list)
    # How many merge-conflict extensions this lease has already received.
    # Capped by ``MAX_MERGE_CONFLICT_EXTENSIONS`` so that a stuck agent
    # cannot keep the lease alive indefinitely.
    merge_conflict_extensions: int = 0

    @property
    def median_update_interval(self) -> Optional[float]:
        """Median number of seconds between consecutive progress updates.

        Returns
        -------
        Optional[float]
            The median gap in seconds, or None when fewer than two
            timestamps have been recorded.
        """
        if len(self.update_timestamps) < 2:
            return None

        ordered = sorted(self.update_timestamps)
        gaps = [
            (later - earlier).total_seconds()
            for earlier, later in zip(ordered, ordered[1:])
        ]
        return statistics.median(gaps)

    @property
    def time_remaining(self) -> timedelta:
        """Time left until ``lease_expires`` (negative once expired)."""
        current = datetime.now(timezone.utc)
        return self.lease_expires - current

    @property
    def is_expired(self) -> bool:
        """Whether the expiry instant is already in the past."""
        return self.lease_expires < datetime.now(timezone.utc)

    @property
    def is_expiring_soon(self) -> bool:
        """Whether less than one hour remains on the lease."""
        one_hour = timedelta(hours=1)
        return one_hour > self.time_remaining

    @property
    def status(self) -> LeaseStatus:
        """Classify the lease as expired, expiring soon, or active."""
        if self.is_expired:
            return LeaseStatus.EXPIRED
        if self.is_expiring_soon:
            return LeaseStatus.EXPIRING_SOON
        return LeaseStatus.ACTIVE

    def calculate_renewal_duration(
        self, lease_manager: Optional["AssignmentLeaseManager"] = None
    ) -> timedelta:
        """
        Work out how long the next renewal should last.

        Parameters
        ----------
        lease_manager
            Optional manager supplying ``renewal_decay_factor``,
            ``min_lease_hours`` and ``max_lease_hours``.

        Returns
        -------
        Renewal duration, adapted to progress, task size and renewal history
        """
        hours = 4.0
        pct = self.progress_percentage

        # Progress-based starting point: tasks that are nearly done, or
        # that look stuck (little progress after several renewals), get
        # a shorter window.
        if pct > 75:
            hours = 2.0
        elif pct > 50:
            hours = 3.0
        elif pct < 25 and self.renewal_count > 2:
            hours = 2.0

        # Large tasks are allowed more time.
        if self.estimated_hours > 8:
            hours *= 1.5

        # Each renewal shrinks the window when the manager defines a decay.
        if lease_manager and hasattr(lease_manager, "renewal_decay_factor"):
            hours *= lease_manager.renewal_decay_factor**self.renewal_count

        # Long-running tasks never get more than two hours per renewal.
        if self.renewal_count > 5:
            hours = min(hours, 2.0)

        # Finally clamp into the manager's configured range.
        if lease_manager:
            capped = min(lease_manager.max_lease_hours, hours)
            hours = max(lease_manager.min_lease_hours, capped)

        return timedelta(hours=hours)
def _ensure_timezone_aware(dt: datetime) -> datetime:
    """
    Return ``dt`` as a timezone-aware datetime.

    Naive values (for example from older persistence data) are assumed
    to have been written in UTC and are tagged accordingly, which avoids
    the TypeError raised when naive and aware datetimes are compared.
    Values that already carry a tzinfo are returned unchanged; they are
    not converted to UTC.

    Parameters
    ----------
    dt : datetime
        Datetime to normalize (naive or aware).

    Returns
    -------
    datetime
        ``dt`` itself when aware, otherwise a copy tagged as UTC.
    """
    already_aware = dt.tzinfo is not None
    return dt if already_aware else dt.replace(tzinfo=timezone.utc)
class AssignmentLeaseManager:
    """Manages assignment leases with automatic expiration and renewal.

    Leases are tracked in memory in ``active_leases`` (keyed by task ID)
    and mirrored to the assignment persistence layer. Mutations of the
    in-memory map are guarded by ``lease_lock``.
    """
def __init__(
    self,
    kanban_client: KanbanInterface,
    assignment_persistence: AssignmentPersistence,
    default_lease_hours: float = 0.0667,  # 240s (matches Phase 1)
    max_renewals: int = 10,
    warning_threshold_hours: float = 0.0167,  # 1 min (was 1.0 hr)
    priority_multipliers: Optional[Dict[str, float]] = None,
    complexity_multipliers: Optional[Dict[str, float]] = None,
    grace_period_minutes: float = 1.0,  # 60 seconds grace period
    renewal_decay_factor: float = 0.9,
    min_lease_hours: float = 0.05,  # Minimum 180 seconds
    max_lease_hours: float = 0.1,  # Maximum 360 seconds (Phase 3)
    stuck_task_threshold_renewals: int = 5,
    enable_adaptive_leases: bool = True,
    task_list: Optional[List[Task]] = None,
    silence_multiplier: float = 5.0,
) -> None:
    """
    Initialize the lease manager.

    Parameters
    ----------
    kanban_client
        Interface to kanban board.
    assignment_persistence
        Assignment persistence layer.
    default_lease_hours
        Default lease duration in hours. Values below 1.0 switch
        ``create_lease`` and ``renew_lease`` to the progressive
        (adaptive) timeout path.
    max_renewals
        Maximum allowed renewals before escalation.
    warning_threshold_hours
        Hours before expiry to warn.
    priority_multipliers
        Lease duration multipliers by priority. A falsy value
        (``None`` or an empty dict) selects the built-in defaults.
    complexity_multipliers
        Lease duration multipliers by label/type. A falsy value
        (``None`` or an empty dict) selects the built-in defaults.
    grace_period_minutes
        Grace period in minutes (float) after expiry before recovery.
    renewal_decay_factor
        Factor to reduce renewal duration over time.
    min_lease_hours
        Minimum allowed lease duration.
    max_lease_hours
        Maximum allowed lease duration.
    stuck_task_threshold_renewals
        Renewals before considering task stuck.
    enable_adaptive_leases
        Enable smart lease duration adjustments.
    task_list
        Optional reference to project tasks for recovery info updates.
        ``None`` is replaced by a fresh empty list.
    silence_multiplier
        Stored as ``self.silence_multiplier``. It is not read by the
        methods visible alongside this constructor — presumably a
        factor for cadence-based silence detection; confirm against
        the code that consumes it.
    """
    self.kanban_client = kanban_client
    self.assignment_persistence = assignment_persistence
    self.task_list = task_list if task_list is not None else []
    self.default_lease_hours = default_lease_hours
    self.max_renewals = max_renewals
    self.warning_threshold_hours = warning_threshold_hours

    # Advanced configuration
    self.priority_multipliers = priority_multipliers or {
        "critical": 0.5,  # Shorter leases for urgent tasks
        "high": 0.75,
        "medium": 1.0,
        "low": 1.5,
    }
    self.complexity_multipliers = complexity_multipliers or {
        "simple": 0.5,
        "complex": 1.5,
        "research": 2.0,
        "epic": 3.0,
    }
    self.grace_period_minutes = grace_period_minutes
    self.renewal_decay_factor = renewal_decay_factor
    self.min_lease_hours = min_lease_hours
    self.max_lease_hours = max_lease_hours
    self.stuck_task_threshold_renewals = stuck_task_threshold_renewals
    self.enable_adaptive_leases = enable_adaptive_leases
    self.silence_multiplier = silence_multiplier

    # Active leases tracked in memory
    self.active_leases: Dict[str, AssignmentLease] = {}

    # Observability counters — every increment indicates a latent
    # coordination bug. Surfaced via logs at WARNING level and
    # available for MLflow/metrics export.
    self.recoveries_skipped_terminal_status: int = 0

    # Optional callback invoked after a successful recovery
    # Server sets this to clean up in-memory tracking structures
    # (agent_tasks, tasks_being_assigned) that lease manager
    # can't access directly.
    # Called synchronously as ``callback(agent_id, task_id)``.
    self.on_recovery_callback: Optional[Callable[[str, str], None]] = None

    # Track lease history for analysis (max 1000 entries to prevent memory leak)
    self.lease_history: Deque[Dict[str, Any]] = deque(maxlen=1000)

    # Lock manager for event loop safe operations
    self._lock_manager = EventLoopLockManager()
@property
def lease_lock(self) -> asyncio.Lock:
    """Get lease lock for the current event loop.

    The lock is obtained from ``self._lock_manager.get_lock()`` on every
    access rather than cached on the manager.
    """
    return self._lock_manager.get_lock()
def update_task_list(self, task_list: List[Task]) -> None:
    """
    Update the task list reference.

    Called by MarcusServer when project_tasks is refreshed. The list is
    stored by reference (no copy), and is the collection that
    ``_find_task`` searches during recovery.

    Parameters
    ----------
    task_list : List[Task]
        Updated list of project tasks
    """
    self.task_list = task_list
async def create_lease(
    self, task_id: str, agent_id: str, task: Optional[Task] = None
) -> AssignmentLease:
    """
    Create a new assignment lease.

    The initial duration is chosen in one of two modes:

    * Aggressive mode (``default_lease_hours < 1.0``): the phase-1
      value from ``calculate_adaptive_timeout`` is used.
    * Conservative mode: ``default_lease_hours`` is the starting
      point. Only when ``default_lease_hours > 1.0`` and adaptive
      leases are enabled is it replaced by the task estimate and
      scaled by priority and complexity multipliers.

    The result is always clamped to
    ``[min_lease_hours, max_lease_hours]``. The new lease is stored in
    ``active_leases``, persisted, and recorded in ``lease_history``.

    Parameters
    ----------
    task_id
        ID of the task being assigned.
    agent_id
        ID of the agent receiving assignment.
    task
        Optional task object for additional context.

    Returns
    -------
    Created assignment lease
    """
    async with self.lease_lock:
        now = datetime.now(timezone.utc)

        # Calculate initial lease duration
        if self.default_lease_hours < 1.0:
            # Aggressive mode: Use phase-1 timeout for unproven agents
            lease_seconds, _ = self.calculate_adaptive_timeout(
                progress=0, update_count=0, has_recent_activity=False
            )
            base_hours = lease_seconds / 3600
        else:
            # Conservative mode: Use default or task-based estimation
            base_hours = self.default_lease_hours

        # Only use task-based estimation if default is conservative (> 1 hour)
        if task and self.enable_adaptive_leases and self.default_lease_hours > 1.0:
            # Use task estimation if available
            if task.estimated_hours:
                base_hours = task.estimated_hours

            # Apply priority multiplier. A missing or None priority
            # leaves the duration untouched instead of raising.
            priority = getattr(task, "priority", None)
            if priority is not None:
                priority_key = str(getattr(priority, "value", priority)).lower()
                base_hours *= self.priority_multipliers.get(priority_key, 1.0)

            # Apply complexity multiplier based on labels; a task
            # without labels (or with labels=None) is simply skipped.
            for label in getattr(task, "labels", None) or ():
                label_lower = label.lower()
                if label_lower in self.complexity_multipliers:
                    base_hours *= self.complexity_multipliers[label_lower]
                    break  # Use first matching complexity

        # Enforce min/max bounds
        base_hours = max(self.min_lease_hours, min(self.max_lease_hours, base_hours))

        initial_duration = timedelta(hours=base_hours)

        # Fall back to the lease duration when the task carries no usable
        # estimate (None or 0). Storing None here would later break the
        # ``estimated_hours > 8`` comparison in
        # ``AssignmentLease.calculate_renewal_duration``.
        task_estimate = task.estimated_hours if task else None
        estimated_hours = task_estimate if task_estimate else base_hours

        lease = AssignmentLease(
            task_id=task_id,
            agent_id=agent_id,
            assigned_at=now,
            lease_expires=now + initial_duration,
            last_renewed=now,
            estimated_hours=estimated_hours,
        )

        # Store lease
        self.active_leases[task_id] = lease

        # Update assignment persistence with lease info
        await self._persist_lease(lease)

        # Log lease creation
        logger.info(
            f"Created lease for task {task_id} to agent {agent_id} "
            f"(expires: {lease.lease_expires.isoformat()})"
        )

        # Track in history
        self.lease_history.append(
            {
                "event": "lease_created",
                "task_id": task_id,
                "agent_id": agent_id,
                "timestamp": now.isoformat(),
                "expires": lease.lease_expires.isoformat(),
            }
        )

        return lease
async def renew_lease(
    self, task_id: str, progress: int, message: str = ""
) -> Optional[AssignmentLease]:
    """
    Renew an existing lease based on progress report.

    Uses progressive timeout strategy to adapt lease duration based on
    task progress and agent reliability.

    An already-expired lease is never renewed here, but a late report
    with a higher progress value is still recorded on the lease so the
    recovery context reflects it. That late capture is in-memory only:
    the expired branch returns without calling ``_persist_lease``.

    Parameters
    ----------
    task_id
        ID of the task.
    progress
        Current progress percentage.
    message
        Progress message.

    Returns
    -------
    Renewed lease or None if not found/expired
    """
    async with self.lease_lock:
        lease = self.active_leases.get(task_id)

        if not lease:
            logger.warning(f"No active lease found for task {task_id}")
            return None

        if lease.is_expired:
            # Capture the progress value before giving up on the
            # renewal. The lease itself cannot be renewed — that
            # decision belongs to the monitor — but
            # ``lease.progress_percentage`` must reflect the
            # agent's latest self-reported value so that when the
            # monitor eventually recovers this lease, the
            # recovery context carries the agent's actual
            # progress, not a stale pre-expiry snapshot.
            #
            # Issue #342 (dashboard-v70 Epictetus audit): agent
            # reported 25% then 50%, but the 50% report arrived
            # after the lease silently expired. Without this
            # capture, the progress value is dropped on the
            # floor here and the recovering agent sees 25% (or
            # 0%) and rebuilds from scratch — dashboard-v70
            # produced 341 lines of ghost source + 506 lines of
            # ghost tests this way.
            #
            # Guard with ``>`` so we never regress the snapshot
            # (e.g. if two out-of-order late reports arrive, the
            # higher value wins).
            if progress > lease.progress_percentage:
                old_progress = lease.progress_percentage
                lease.progress_percentage = progress
                if message:
                    lease.last_progress_message = message
                logger.info(
                    f"Captured late progress on expired lease "
                    f"{task_id}: {old_progress}% → {progress}% "
                    f"(lease still expired, recovery context "
                    f"will use the updated snapshot)"
                )
            logger.warning(f"Cannot renew expired lease for task {task_id}")
            return None

        # Update progress
        # NOTE(review): unlike the expired branch above, this overwrites
        # ``last_progress_message`` unconditionally, so a report sent
        # with the default empty message erases the previous note that
        # recovery would otherwise show — confirm this is intended.
        lease.progress_percentage = progress
        lease.last_progress_message = message

        # Track update timestamp for cadence-based recovery
        lease.update_timestamps.append(datetime.now(timezone.utc))

        # Use progressive timeout if in aggressive mode (< 1 hour default)
        if self.default_lease_hours < 1.0:
            # Progressive timeout mode: calculate based on progress
            lease_seconds, grace_seconds = self.calculate_adaptive_timeout(
                progress=progress,
                update_count=lease.renewal_count + 1,  # +1 for this renewal
                has_recent_activity=True,  # Just reported progress
            )
            renewal_duration = timedelta(seconds=lease_seconds)
            # Stored per lease; check_expired_leases prefers this over
            # the manager-wide grace_period_minutes.
            lease.grace_period_seconds = float(grace_seconds)
            logger.info(
                f"Progressive timeout for {task_id}: {lease_seconds}s "
                f"+ {grace_seconds}s grace "
                f"(progress={progress}%, updates={lease.renewal_count + 1})"
            )
        else:
            # Conservative mode: use old calculation logic
            renewal_duration = lease.calculate_renewal_duration(self)

        # Renew lease
        lease.last_renewed = datetime.now(timezone.utc)
        lease.lease_expires = lease.last_renewed + renewal_duration
        lease.renewal_count += 1

        # Check for excessive renewals
        # Only a warning is logged; the renewal itself still succeeds.
        if lease.renewal_count >= self.max_renewals:
            logger.warning(
                f"Task {task_id} has been renewed {lease.renewal_count} times. "
                f"Consider escalation or reassignment."
            )

        # Update persistence
        await self._persist_lease(lease)

        # Log renewal
        logger.info(
            f"Renewed lease for task {task_id} "
            f"(progress: {progress}%, expires: {lease.lease_expires.isoformat()})"
        )

        # Track in history
        self.lease_history.append(
            {
                "event": "lease_renewed",
                "task_id": task_id,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "progress": progress,
                "renewal_count": lease.renewal_count,
                "new_expiry": lease.lease_expires.isoformat(),
            }
        )

        return lease
async def touch_lease(self, agent_id: str) -> bool:
    """
    Push out an agent's lease expiry without touching its progress.

    Intended for any MCP tool activity that proves the agent is alive.
    Compared with ``renew_lease`` this needs no progress data, leaves
    ``renewal_count`` alone and does not persist the lease; it does,
    however, append the current time to ``update_timestamps``.

    Parameters
    ----------
    agent_id : str
        ID of the agent whose lease to extend.

    Returns
    -------
    bool
        True if a lease was touched, False when the agent has no lease
        or the first lease found for it has already expired.
    """
    async with self.lease_lock:
        # First lease (in dict order) that belongs to this agent.
        held = next(
            (
                candidate
                for candidate in self.active_leases.values()
                if candidate.agent_id == agent_id
            ),
            None,
        )
        if held is None or held.is_expired:
            return False

        # New window length comes from the current adaptive phase.
        timeout_seconds, _ = self.calculate_adaptive_timeout(
            progress=held.progress_percentage,
            update_count=max(held.renewal_count, 1),
            has_recent_activity=True,
        )

        touched_at = datetime.now(timezone.utc)
        held.last_renewed = touched_at
        held.lease_expires = touched_at + timedelta(seconds=timeout_seconds)
        # Record the touch so update cadence statistics see it.
        held.update_timestamps.append(touched_at)

        logger.debug(
            f"Touched lease for agent {agent_id} "
            f"(task {held.task_id}, "
            f"expires: {held.lease_expires.isoformat()})"
        )
        return True
async def check_expired_leases(self) -> List[AssignmentLease]:
    """
    Check for expired leases that need recovery.

    Two-phase to avoid holding ``lease_lock`` during git subprocess
    I/O (Codex P2 on PR #350). Holding the global lock during
    per-lease ``git status`` calls would serialize every concurrent
    ``renew_lease`` and ``touch_lease`` for the duration of the
    slowest probe, and could cause active agents' renewals to
    starve and look expired in the next cycle.

    Phase 1 (lock held, no I/O):
        Snapshot leases that have crossed the grace deadline.

    Phase 2 (lock released, may do I/O):
        For each candidate, try the merge-conflict extension. The
        extension helper does a git probe outside the lock and
        briefly re-acquires the lock only to mutate + persist the
        lease atomically.

    Before returning a lease as expired, the merge-conflict
    extension may grant up to ``MAX_MERGE_CONFLICT_EXTENSIONS``
    extensions of ``MERGE_CONFLICT_EXTENSION_SECONDS`` each when
    the agent's worktree has unresolved git conflicts. See the
    constants at the top of this module for the rationale.

    This method only reports leases; it does not remove them from
    ``active_leases`` or call ``recover_expired_lease`` itself.

    Returns
    -------
    List of expired leases (considering grace period)
    """
    expired_leases: List[AssignmentLease] = []
    # ``now`` is sampled once, so every lease in this scan is judged
    # against the same instant for the grace-deadline comparison.
    now = datetime.now(timezone.utc)
    default_grace_delta = timedelta(minutes=self.grace_period_minutes)

    # Phase 1: collect candidates under lock, no I/O.
    candidates: List[tuple[AssignmentLease, datetime]] = []
    async with self.lease_lock:
        for task_id, lease in list(self.active_leases.items()):
            if not lease.is_expired:
                continue

            # Use per-lease adaptive grace if set, else global default
            if lease.grace_period_seconds is not None:
                grace_delta = timedelta(seconds=lease.grace_period_seconds)
            else:
                grace_delta = default_grace_delta

            grace_deadline = lease.lease_expires + grace_delta

            if now > grace_deadline:
                candidates.append((lease, grace_deadline))
            else:
                logger.debug(
                    f"Lease expired but in grace period: task {task_id} "
                    f"(expires fully at: {grace_deadline.isoformat()})"
                )

    # Phase 2: probe + extend OUTSIDE the lock so concurrent
    # renew_lease/touch_lease calls aren't blocked on git I/O.
    # Last-chance check: is the agent stuck on a merge conflict?
    # dashboard-v73 demonstrated this case — agent was actively
    # resolving conflicts when the lease expired, work was lost
    # on recovery.
    for lease, grace_deadline in candidates:
        extended = await self._try_extend_for_merge_conflict(lease)
        if extended:
            continue
        # Re-check: a concurrent renew_lease may have advanced the
        # lease while the git probe ran. If the lease is no longer
        # expired, skip recovery — the agent is still active.
        if not lease.is_expired:
            continue
        expired_leases.append(lease)
        logger.info(
            f"Found expired lease: task {lease.task_id} "
            f"(expired: {lease.lease_expires.isoformat()}, "
            f"grace ended: {grace_deadline.isoformat()})"
        )

    # Diagnostic: surface the gap between what is_expired counts and
    # what actually becomes a recovery candidate. When these disagree,
    # an expired lease is being silently filtered (grace, extension,
    # or a concurrent renew). Enable via MARCUS_DEBUG_LEASE=1.
    # Read without the lock; there is no await between building the
    # list and logging it.
    raw_expired = [tid for tid, ls in self.active_leases.items() if ls.is_expired]
    logger.debug(
        f"check_expired_leases: {len(raw_expired)} lease(s) is_expired="
        f"{raw_expired}, {len(candidates)} past grace, "
        f"{len(expired_leases)} returned for recovery="
        f"{[ls.task_id for ls in expired_leases]}"
    )

    return expired_leases
def _resolve_worktree_path(self, lease: AssignmentLease) -> Optional[Path]:
    """
    Resolve the worktree path for an agent's lease.

    Marcus's worktree convention (set by the experiment runner at
    ``dev-tools/experiments/runners/spawn_agents.py``) places agent
    worktrees at ``<experiment_dir>/worktrees/<agent_id>``, where
    ``experiment_dir`` is the parent of the project ``implementation``
    directory. The kanban client exposes the project root via its
    workspace state.

    Defensive: any failure (no workspace state method, malformed
    state, missing or non-string project_root, missing worktree
    directory) returns None. The caller treats None as "not eligible
    for merge-conflict extension" and proceeds with normal recovery.

    Parameters
    ----------
    lease : AssignmentLease
        The lease whose agent's worktree we want to find.

    Returns
    -------
    Optional[Path]
        The worktree path if it exists on disk, or None for any
        unresolvable case (single-branch projects, non-experiment
        runs, mocked kanban clients in tests).
    """
    # The loader is looked up dynamically because not every kanban
    # client provides it; it is called synchronously with no arguments.
    load_state = getattr(self.kanban_client, "_load_workspace_state", None)
    if not callable(load_state):
        return None
    try:
        workspace_state = load_state()
        if not isinstance(workspace_state, dict):
            return None
        project_root = workspace_state.get("project_root")
        if not isinstance(project_root, str) or not project_root:
            return None
        # Marcus convention: worktrees live as siblings to implementation/
        worktree = Path(project_root).parent / "worktrees" / lease.agent_id
        if not worktree.exists():
            return None
        return worktree
    except Exception as e:
        logger.debug(f"Could not resolve worktree path for {lease.task_id}: {e}")
        return None


async def _has_unresolved_conflicts(self, worktree: Path) -> bool:
    """
    Check whether a worktree has unresolved merge conflicts.

    Runs ``git status --porcelain`` and looks for status codes that
    indicate unmerged paths (UU, AA, DD, DU, UD, AU, UA). Bounded by
    ``MERGE_CONFLICT_GIT_TIMEOUT_SECONDS`` so a wedged git process
    cannot stall the lease loop.

    Any subprocess error, timeout, or non-git worktree returns False
    — when in doubt, assume no conflict and let the lease expire
    normally.

    Parameters
    ----------
    worktree : Path
        Worktree directory to inspect.

    Returns
    -------
    bool
        True if at least one unmerged path is present.
    """
    proc: Optional[asyncio.subprocess.Process] = None
    try:
        # Arguments are passed as a list (no shell); ``-C`` makes git
        # operate on the worktree directory.
        proc = await asyncio.create_subprocess_exec(
            "git",
            "-C",
            str(worktree),
            "status",
            "--porcelain",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, _ = await asyncio.wait_for(
                proc.communicate(),
                timeout=MERGE_CONFLICT_GIT_TIMEOUT_SECONDS,
            )
        except asyncio.TimeoutError:
            logger.warning(
                f"git status timed out after "
                f"{MERGE_CONFLICT_GIT_TIMEOUT_SECONDS}s for {worktree}; "
                f"treating as no-conflict"
            )
            # Best-effort cleanup; subprocess kill returns immediately
            try:
                proc.kill()
                await proc.wait()
            except ProcessLookupError:
                # The process had already exited on its own.
                pass
            return False
        # A non-zero exit (e.g. not a git repository) counts as "no conflict".
        if proc.returncode != 0:
            return False
    except Exception as e:
        logger.debug(f"git status failed for {worktree}: {e}")
        return False

    for line in stdout.decode("utf-8", errors="replace").splitlines():
        # Porcelain format: XY <path>. The XY is the first 2 chars.
        if len(line) >= 2 and line[:2] in _GIT_UNMERGED_PREFIXES:
            return True
    return False


async def _try_extend_for_merge_conflict(self, lease: AssignmentLease) -> bool:
    """
    Grant a merge-conflict extension if eligible.

    Called from ``check_expired_leases`` OUTSIDE the lease lock so
    the git subprocess probe doesn't serialize concurrent
    ``renew_lease``/``touch_lease`` calls (Codex P2 on PR #350).

    Phases:
      1. Eligibility precheck (no lock, no I/O): cap, worktree path
      2. Git probe (no lock, async subprocess I/O bounded by
         ``MERGE_CONFLICT_GIT_TIMEOUT_SECONDS``)
      3. Apply extension under the lock, then persist outside it.

    The under-lock section re-verifies three conditions before
    mutating the lease:
      a. Lease is still in ``active_leases`` (not removed by a
         concurrent recovery path).
      b. Lease is still expired (guards against ``renew_lease``
         succeeding while the git probe ran — if the agent renewed,
         skip the extension so we don't burn a conflict-extension
         slot on an active lease).
      c. Extension cap not yet reached.

    The expiry timestamp is taken at grant time inside the lock,
    not at the start of the scan, so each candidate receives the
    full intended extension window regardless of how long prior
    candidates' git probes took.

    On success, extends the lease by
    ``MERGE_CONFLICT_EXTENSION_SECONDS``, increments
    ``lease.merge_conflict_extensions``, and persists the lease so
    a service restart during the extension window does not reload
    the old expiry and trigger immediate recovery (Codex P1 on PR
    #350).

    Parameters
    ----------
    lease : AssignmentLease
        The expired lease being considered for recovery.

    Returns
    -------
    bool
        True if extension granted, False otherwise.
    """
    # Phase 1: cheap eligibility checks (no lock, no I/O).
    if lease.merge_conflict_extensions >= MAX_MERGE_CONFLICT_EXTENSIONS:
        return False

    worktree = self._resolve_worktree_path(lease)
    if worktree is None:
        return False

    # Phase 2: git probe (no lock; bounded by timeout).
    if not await self._has_unresolved_conflicts(worktree):
        return False

    # Phase 3: apply the extension atomically under the lease lock.
    # Three defensive re-checks before mutating:
    #   (a) lease still in active_leases — not removed by concurrent recovery
    #   (b) lease still expired — guards against renew_lease winning the race
    #       while the git probe ran; if the agent renewed successfully, skip
    #       the extension so we don't burn a conflict-extension slot on an
    #       active lease (Codex P1 review, PR #384)
    #   (c) extension cap not yet reached (re-check under lock)
    # grant_time is taken here, not at scan start, so each candidate
    # receives the full intended extension regardless of prior probe
    # durations (Codex P2 review, PR #384).
    async with self.lease_lock:
        # Check (a) tests the task ID key only, not object identity.
        if lease.task_id not in self.active_leases:
            return False
        if not lease.is_expired:
            return False
        if lease.merge_conflict_extensions >= MAX_MERGE_CONFLICT_EXTENSIONS:
            return False
        grant_time = datetime.now(timezone.utc)
        extension = timedelta(seconds=MERGE_CONFLICT_EXTENSION_SECONDS)
        lease.lease_expires = grant_time + extension
        lease.last_renewed = grant_time
        lease.merge_conflict_extensions += 1
        self.lease_history.append(
            {
                "event": "merge_conflict_extension",
                "task_id": lease.task_id,
                "agent_id": lease.agent_id,
                "timestamp": grant_time.isoformat(),
                "extension_count": lease.merge_conflict_extensions,
                "new_expiry": lease.lease_expires.isoformat(),
                "worktree": str(worktree),
            }
        )

    # Persist OUTSIDE the lock — _persist_lease does I/O against
    # assignment_persistence and we already hold a consistent
    # in-memory snapshot of the lease state.
    await self._persist_lease(lease)

    logger.info(
        f"Granted merge-conflict extension for task {lease.task_id} "
        f"to agent {lease.agent_id} "
        f"(extension {lease.merge_conflict_extensions}/"
        f"{MAX_MERGE_CONFLICT_EXTENSIONS}, "
        f"new expiry: {lease.lease_expires.isoformat()}, "
        f"unresolved conflicts in {worktree})"
    )
    return True


def _find_task(self, task_id: str) -> Optional[Task]:
    """
    Find a task by ID in the task list.

    Performs a linear scan of ``self.task_list`` and returns the first
    task whose ``id`` equals ``task_id``.

    Parameters
    ----------
    task_id
        Task ID to find.

    Returns
    -------
    Task object if found, None otherwise
    """
    for task in self.task_list:
        if task.id == task_id:
            return task
    return None
async def recover_expired_lease(self, lease: AssignmentLease) -> bool:
    """
    Recover a task with an expired lease.

    Implements dual-write pattern:
    1. Updates task model with structured RecoveryInfo (source of truth)
    2. Posts to Kanban comments for audit trail (observability)

    After the dual write the lease is dropped from ``active_leases``,
    the agent's assignment is removed from persistence, the optional
    ``on_recovery_callback`` is invoked, and the board task is reset
    to TODO. Failures of the Kanban comment, the callback, and the
    board reset are logged and do not fail the recovery.

    Parameters
    ----------
    lease
        The expired lease to recover.

    Returns
    -------
    True if recovery successful, False if an unexpected exception
    escaped one of the non-guarded steps (it is logged, not raised)
    """
    try:
        logger.info(
            f"Recovering task {lease.task_id} from agent {lease.agent_id} "
            f"(expired: {lease.lease_expires.isoformat()})"
        )

        # Calculate time spent
        # Measured from the original assignment, not the last renewal.
        now = datetime.now(timezone.utc)
        time_spent = now - lease.assigned_at
        time_spent_minutes = time_spent.total_seconds() / 60

        # Create structured recovery info
        # In worktree mode, each agent works on branch marcus/<agent_id>
        previous_branch = f"marcus/{lease.agent_id}"

        # ``lease.progress_percentage`` reflects the agent's
        # latest self-reported progress including late reports
        # that arrived after the lease silently expired. The
        # expired-lease progress capture in ``renew_lease``
        # (Issue #342 fix) updates the snapshot even when the
        # lease itself cannot be renewed, so recovery context
        # now shows what the agent actually reached rather than
        # a pre-expiry snapshot. Dashboard-v70's ghost code was
        # caused by the pre-fix renew_lease path silently
        # dropping the 50% progress report when the lease had
        # already expired.

        # Emit lease_expired telemetry (Marcus #416, Stage 4 of
        # #9). Fires before the recovery handoff so we record
        # the expiry event even if the recovery path fails
        # downstream. ``recovery_attempted=True`` — Marcus is
        # putting the task back on the board now; whether the
        # next agent finishes it is observed separately via
        # ``task_completed`` (Kaia review 3 honesty fix).
        try:
            # Imported lazily so that a missing or broken telemetry
            # module is swallowed by the except below.
            from src.telemetry.events import fire_lease_expired

            fire_lease_expired(
                task_held_minutes=int(time_spent_minutes),
                progress_pct_at_expiry=int(lease.progress_percentage),
                recovery_attempted=True,
            )
        except Exception:  # noqa: BLE001 - never crash recovery
            pass

        recovery_info = RecoveryInfo(
            recovered_at=now,
            recovered_from_agent=lease.agent_id,
            previous_progress=lease.progress_percentage,
            time_spent_minutes=time_spent_minutes,
            recovery_reason="lease_expired",
            previous_agent_branch=previous_branch,
            # Markdown instructions for the next agent. The middle
            # section quotes the previous agent's last progress note
            # line by line, or explains that none was left.
            instructions=(
                f"⚠️ **RECOVERY ADDENDUM** - This task was recovered "
                f"from agent {lease.agent_id}\n\n"
                f"**FIRST: Pick up committed work from the previous "
                f"agent:**\n"
                f"```\n"
                f"git merge {previous_branch} --no-edit\n"
                f"```\n"
                f"This merges any commits the previous agent made "
                f"before they disconnected.\n\n"
                f"**Then check what was done:**\n"
                f"1. Run `git log {previous_branch}` to see their "
                f"commits\n"
                f"2. Check for artifacts or design documents\n"
                f"3. Previous agent reached "
                f"{lease.progress_percentage}%\n"
                f"4. **Continue from where they left off** - "
                f"don't restart from scratch\n\n"
                + (
                    "**Previous agent's last progress note:**\n"
                    + "\n".join(
                        f"> {line}"
                        for line in lease.last_progress_message.strip().splitlines()
                    )
                    + "\n\nUse this note to understand what was built"
                    " and what remains. Wire any completed components"
                    " into the entry point if they are not yet"
                    " reachable.\n\n"
                    if lease.last_progress_message.strip()
                    else f"**Previous agent left no progress notes.**\n"
                    f"Check `git log {previous_branch}` and `git "
                    f"diff main {previous_branch}` to discover what "
                    f"was committed. Do NOT re-implement files that "
                    f"already exist on the branch — read them "
                    f"first.\n\n"
                )
                + f"**Recovery Context:**\n"
                f"- Previous agent: {lease.agent_id}\n"
                f"- Previous branch: {previous_branch}\n"
                f"- Time they spent: "
                f"{time_spent_minutes:.1f} minutes\n"
                f"- Recovery reason: lease expired (no progress "
                f"updates)\n"
                f"- Your task: Complete the ORIGINAL task "
                f"requirements, building on existing work\n"
            ),
            recovery_expires_at=now + timedelta(hours=24),
        )

        # 1. Update task model (source of truth) if task is available
        task = self._find_task(lease.task_id)
        if task:
            task.recovery_info = recovery_info
            # Clear ownership so task re-enters the assignment pool
            task.assigned_to = None
            logger.info(
                f"Updated task {lease.task_id} model with recovery "
                f"info, cleared assigned_to"
            )
        else:
            logger.warning(
                f"Task {lease.task_id} not found in task list for "
                f"recovery info update"
            )

        # 2. Dual-write to Kanban for audit trail
        # Don't fail entire recovery if Kanban write fails
        try:
            await self._create_recovery_handoff_comment(lease, recovery_info)
        except Exception as e:
            logger.warning(f"Failed to write recovery comment to Kanban: {e}")
            # Continue - task model update is what matters

        # Remove from active leases
        async with self.lease_lock:
            if lease.task_id in self.active_leases:
                del self.active_leases[lease.task_id]

        # Remove assignment from persistence
        # Not individually guarded: an error here propagates to the
        # outer except and makes this method return False.
        await self.assignment_persistence.remove_assignment(lease.agent_id)

        # Invoke recovery callback so server can clean in-memory state
        if self.on_recovery_callback is not None:
            try:
                self.on_recovery_callback(lease.agent_id, lease.task_id)
            except Exception as e:
                logger.warning(
                    f"Recovery callback failed for task " f"{lease.task_id}: {e}"
                )

        # Reset task on board: status → TODO, clear assigned_to
        try:
            if hasattr(self.kanban_client, "update_task"):
                await self.kanban_client.update_task(
                    lease.task_id,
                    {"status": TaskStatus.TODO, "assigned_to": None},
                )
            elif hasattr(self.kanban_client, "update_task_status"):
                # Fallback: at least reset status
                await self.kanban_client.update_task_status(
                    lease.task_id, TaskStatus.TODO
                )
                logger.warning(
                    f"Kanban client lacks update_task — "
                    f"assigned_to not cleared on board for "
                    f"{lease.task_id}"
                )
            else:
                logger.warning(
                    f"Kanban client does not support task updates, "
                    f"task {lease.task_id} not updated on board"
                )
        except Exception as e:
            logger.error(f"Failed to reset task {lease.task_id} on board: {e}")
            # Don't fail entire recovery if board update fails
            # In-memory state is already updated

        # Track in history
        self.lease_history.append(
            {
                "event": "lease_recovered",
                "task_id": lease.task_id,
                "agent_id": lease.agent_id,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "progress_at_recovery": lease.progress_percentage,
                "total_renewals": lease.renewal_count,
            }
        )

        return True

    except Exception as e:
        logger.error(f"Error recovering lease for task {lease.task_id}: {e}")
        return False
async def get_expiring_leases(self) -> List[AssignmentLease]:
    """
    Get leases that are expiring soon.

    A lease qualifies when it has not yet expired and its remaining
    time is below the manager's configured ``warning_threshold_hours``.
    The configured threshold is used instead of the fixed one-hour
    window of ``AssignmentLease.is_expiring_soon``; with the short
    default lease durations that fixed window would report every live
    lease as expiring.

    Returns
    -------
    List of leases expiring within warning threshold
    """
    warning_window = timedelta(hours=self.warning_threshold_hours)
    no_time_left = timedelta(0)

    async with self.lease_lock:
        # One clock reading for the whole scan so the "not expired" and
        # "inside the window" checks cannot disagree for a given lease.
        now = datetime.now(timezone.utc)
        return [
            lease
            for lease in self.active_leases.values()
            if no_time_left <= lease.lease_expires - now < warning_window
        ]
async def _persist_lease(self, lease: AssignmentLease) -> None:
    """
    Copy the lease's bookkeeping onto the agent's persisted assignment.

    The assignment stored for ``lease.agent_id`` is fetched, updated in
    place with the lease fields, and then ``save_assignment`` is called.
    When no assignment is found (or it is empty) nothing is written.

    Parameters
    ----------
    lease : AssignmentLease
        The lease whose expiry, renewal, progress and extension state
        should be recorded.
    """
    record = await self.assignment_persistence.get_assignment(lease.agent_id)
    if not record:
        # Nothing persisted for this agent, so there is nothing to annotate.
        return

    record["lease_expires"] = lease.lease_expires.isoformat()
    record["lease_renewed_at"] = lease.last_renewed.isoformat()
    record["renewal_count"] = lease.renewal_count
    record["progress_percentage"] = lease.progress_percentage
    record["last_progress_update"] = datetime.now(timezone.utc).isoformat()
    record["update_timestamps"] = [
        stamp.isoformat() for stamp in lease.update_timestamps
    ]
    # Persist the merge-conflict extension counter so the cap survives
    # a service restart during the extension window (Codex P1 on PR #350).
    record["merge_conflict_extensions"] = lease.merge_conflict_extensions

    # NOTE(review): only agent_id, task_id and assigned_at are handed to
    # save_assignment. The fields set above presumably reach storage
    # because get_assignment returns the live record — confirm against
    # AssignmentPersistence.
    assigned_at = record.get(
        "assigned_at", datetime.now(timezone.utc).isoformat()
    )
    await self.assignment_persistence.save_assignment(
        lease.agent_id, lease.task_id, assigned_at
    )
async def load_active_leases(self) -> None:
    """
    Rebuild ``self.active_leases`` from persisted assignments on startup.

    Each persisted assignment is turned back into an ``AssignmentLease``
    keyed by its task id. Optional fields fall back to defaults so that
    records written by older versions still load:

    - ``lease_expires`` defaults to "now"
    - ``lease_renewed_at`` defaults to ``assigned_at``
    - ``renewal_count``, ``progress_percentage`` and
      ``merge_conflict_extensions`` default to 0
    - ``update_timestamps`` defaults to an empty list

    A record that cannot be parsed (missing ``task_id`` / ``assigned_at``
    or a timestamp that is not valid ISO-8601) is logged and skipped, so
    one corrupt entry cannot prevent the remaining leases from being
    restored.
    """
    assignments = await self.assignment_persistence.load_assignments()

    for agent_id, assignment in assignments.items():
        # Parse first, construct afterwards: only data problems in the
        # persisted record are tolerated here, not constructor errors.
        try:
            task_id = assignment["task_id"]

            # Normalize naive datetimes to UTC for backwards compatibility
            assigned_at = _ensure_timezone_aware(
                datetime.fromisoformat(assignment["assigned_at"])
            )
            lease_expires = _ensure_timezone_aware(
                datetime.fromisoformat(
                    assignment.get(
                        "lease_expires", datetime.now(timezone.utc).isoformat()
                    )
                )
            )
            last_renewed = _ensure_timezone_aware(
                datetime.fromisoformat(
                    assignment.get("lease_renewed_at", assignment["assigned_at"])
                )
            )

            # Restore update timestamps for cadence-based recovery
            update_timestamps = [
                _ensure_timezone_aware(datetime.fromisoformat(ts))
                for ts in assignment.get("update_timestamps", [])
            ]
        except (KeyError, TypeError, ValueError) as e:
            logger.error(
                f"Skipping malformed persisted assignment for agent "
                f"{agent_id}: {e!r}"
            )
            continue

        self.active_leases[task_id] = AssignmentLease(
            task_id=task_id,
            agent_id=agent_id,
            assigned_at=assigned_at,
            lease_expires=lease_expires,
            last_renewed=last_renewed,
            renewal_count=assignment.get("renewal_count", 0),
            progress_percentage=assignment.get("progress_percentage", 0),
            update_timestamps=update_timestamps,
            # Restore merge-conflict extension counter so the cap
            # survives a restart during the extension window
            # (Codex P1 on PR #350). Defaults to 0 for assignments
            # written before this field existed.
            merge_conflict_extensions=assignment.get(
                "merge_conflict_extensions", 0
            ),
        )

    logger.info(f"Loaded {len(self.active_leases)} active leases from persistence")
def get_lease_statistics(self) -> Dict[str, Any]:
    """
    Summarize the leases currently held in ``self.active_leases``.

    Returns
    -------
    Dict[str, Any]
        A mapping with these keys:

        - ``total_active``: number of active leases
        - ``expired``: leases whose ``is_expired`` flag is set
        - ``expiring_soon``: non-expired leases flagged ``is_expiring_soon``
        - ``high_renewal_count``: leases with ``renewal_count`` at or
          above ``self.max_renewals``
        - ``by_status``: lease count per ``lease.status.value``
        - ``average_renewal_count``: mean renewal count, or 0 when
          there are no active leases
    """
    status_counts: Dict[str, int] = {}
    expired_total = 0
    expiring_total = 0
    over_limit_total = 0
    renewal_sum = 0

    for lease in self.active_leases.values():
        status_key = lease.status.value
        status_counts[status_key] = status_counts.get(status_key, 0) + 1

        # An expired lease is never also counted as "expiring soon".
        if lease.is_expired:
            expired_total += 1
        elif lease.is_expiring_soon:
            expiring_total += 1

        if lease.renewal_count >= self.max_renewals:
            over_limit_total += 1

        renewal_sum += lease.renewal_count

    lease_count = len(self.active_leases)
    average = renewal_sum / lease_count if lease_count else 0

    return {
        "total_active": lease_count,
        "expired": expired_total,
        "expiring_soon": expiring_total,
        "high_renewal_count": over_limit_total,
        "by_status": status_counts,
        "average_renewal_count": average,
    }
def calculate_adaptive_timeout(
    self, progress: int, update_count: int, has_recent_activity: bool
) -> tuple[int, int]:
    """
    Pick the lease and grace durations for a task's current phase.

    Parameters
    ----------
    progress : int
        Current progress percentage (0-100)
    update_count : int
        Number of progress updates received
    has_recent_activity : bool
        Whether task shows recent activity (accepted for interface
        compatibility; the current phase table does not consult it)

    Returns
    -------
    tuple[int, int]
        (lease_seconds, grace_seconds) timeout configuration

    Notes
    -----
    Progressive timeout phases:

    - Phase 1 (Unproven): no updates yet → 180s + 60s = 240s total
    - Phase 2 (Working): exactly one update → 240s + 60s = 300s total
    - Phase 3 (Proven): 25-75% progress → 300s + 60s = 360s total
    - Phase 4 (Finishing): >=75% progress → 360s + 90s = 450s total

    The update count is checked before progress, so a task with zero
    or one update gets the Phase 1/2 window whatever its progress.

    History: the windows were widened on 2026-04-12 after experiment 66
    showed agents routinely going 2+ minutes between progress reports
    during implementation bursts; the previous 90-120s timeouts expired
    leases mid-implementation. Phase 4 was widened again on 2026-04-25
    (snake_game-v1 cascade): tail-phase work (test runs, builds,
    commits, push, conflict resolution) produces LONGER gaps between
    reports than the middle phase, and observed 161-215s gaps routinely
    tripped the old 210s window. Phase 4 is therefore the longest
    window. Its 90s grace also covers silent validator LLM calls
    (60-120s each) that run after 100% is reported; touch_lease is
    called before each attempt.
    """
    # Phases 1 and 2 are decided purely by how many updates have arrived.
    early_phase_windows = {
        0: (180, 60),  # Phase 1: unproven, tolerant of setup time
        1: (240, 60),  # Phase 2: first update received
    }
    if update_count in early_phase_windows:
        return early_phase_windows[update_count]

    # Progress tiers, highest floor first. Phase 4 gets the LONGEST
    # window: shorter ones cause spurious recovery of tasks that are
    # actively completing.
    progress_tiers = (
        (75, (360, 90)),  # Phase 4: near completion
        (25, (300, 60)),  # Phase 3: good progress
    )
    for floor, window in progress_tiers:
        if progress >= floor:
            return window

    # Default: working state
    return (240, 60)
async def should_recover_expired_lease(self, lease: AssignmentLease) -> bool:
    """
    Decide whether an expired lease's task should be taken back.

    Two checks run in order:

    1. Terminal-status guard (Simon decision 011b3fad). When the Kanban
       client offers ``get_task_by_id`` and the board reports the task
       as done/completed/blocked, recovery is skipped regardless of
       cadence. Recovering a finished task only makes a fresh agent
       redo completed work (snake_game-v1 cascade). Each hit increments
       ``self.recoveries_skipped_terminal_status``. A failing lookup is
       logged at debug level and does not block the cadence check.
    2. Cadence detection. The time since the agent's latest progress
       update is compared with its own median update interval times
       ``self.silence_multiplier``. Silence beyond that threshold means
       the agent is considered dead.

    Parameters
    ----------
    lease : AssignmentLease
        The expired lease to evaluate

    Returns
    -------
    bool
        True if the task should be recovered; False to give the agent
        more time or because the task is already terminal.

    Notes
    -----
    Real data from logs: median progress interval ~47s, mean ~60s.
    Default silence_multiplier is 1.5x — configurable via constructor.

    When ``lease.median_update_interval`` is None (no established
    cadence), the task is always recovered.
    """
    terminal_statuses = {"done", "completed", "blocked"}

    # Defense-in-depth: the board can flip to a terminal status before
    # lease bookkeeping catches up, so consult it before anything else.
    if hasattr(self.kanban_client, "get_task_by_id"):
        try:
            board_task = await self.kanban_client.get_task_by_id(lease.task_id)
            if board_task is not None:
                raw_status = getattr(board_task, "status", None)
                # Accept either a TaskStatus-like enum (via .value) or a
                # plain string.
                normalized_status = (
                    getattr(raw_status, "value", None) or str(raw_status)
                ).lower()
                if normalized_status in terminal_statuses:
                    # Observability counter: every hit is a stale lease
                    # that cleanup missed. The guard prevents the bad
                    # outcome; the metric keeps the underlying bug
                    # visible instead of swallowing it silently.
                    self.recoveries_skipped_terminal_status += 1
                    logger.warning(
                        f"Task {lease.task_id} is terminal "
                        f"(status={normalized_status}); skipping recovery. "
                        f"This indicates a stale lease — the board "
                        f"reached a terminal state but lease cleanup "
                        f"didn't fire. Counter: "
                        f"recoveries_skipped_terminal_status="
                        f"{self.recoveries_skipped_terminal_status}"
                    )
                    return False
        except Exception as lookup_error:
            # A Kanban hiccup must not block recovery; fall through to
            # the cadence check.
            logger.debug(
                f"Terminal-status check failed for {lease.task_id}: {lookup_error}"
            )

    now = datetime.now(timezone.utc)
    cadence = lease.median_update_interval

    # Without an established cadence there is nothing to compare against.
    if cadence is None:
        logger.info(
            f"Task {lease.task_id}: no established update cadence "
            f"(updates={len(lease.update_timestamps)}), recovering"
        )
        return True

    # Silence is measured from the newest update, or from assignment
    # time when no update timestamps are recorded.
    silence_started = (
        max(lease.update_timestamps)
        if lease.update_timestamps
        else lease.assigned_at
    )
    silence = (now - silence_started).total_seconds()
    limit = cadence * self.silence_multiplier

    if silence > limit:
        logger.info(
            f"Task {lease.task_id}: silence={silence:.0f}s > "
            f"threshold={limit:.0f}s "
            f"(median={cadence:.0f}s * "
            f"{self.silence_multiplier}x), recovering"
        )
        return True

    logger.info(
        f"Task {lease.task_id}: silence={silence:.0f}s <= "
        f"threshold={limit:.0f}s "
        f"(median={cadence:.0f}s * "
        f"{self.silence_multiplier}x), extending grace"
    )
    return False
async def _create_recovery_handoff_comment(
    self, lease: AssignmentLease, recovery_info: RecoveryInfo
) -> None:
    """
    Write the recovery details to the Kanban board as a task comment.

    This is the audit-trail half of the dual-write; the task model
    carries the authoritative recovery info that agents use.

    Parameters
    ----------
    lease : AssignmentLease
        The lease being recovered; its ``task_id`` selects the card.
    recovery_info : RecoveryInfo
        The structured recovery information rendered into the comment.
    """
    # Headline naming the agent the task was taken from.
    header = (
        f"⚠️ **TASK RECOVERED FROM AGENT "
        f"{recovery_info.recovered_from_agent}**\n\n"
    )
    # Bullet list describing when and why the task was recovered.
    details = (
        f"**Recovery Details:**\n"
        f"- Recovered at: "
        f"{recovery_info.recovered_at.strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
        f"- Progress: {recovery_info.previous_progress}%\n"
        f"- Time spent: {recovery_info.time_spent_minutes:.1f} minutes\n"
        f"- Reason: {recovery_info.recovery_reason}\n\n"
    )
    comment_body = header + details + f"{recovery_info.instructions}"

    await self.kanban_client.add_comment(lease.task_id, comment_body)
    logger.info(
        f"Added recovery handoff comment to Kanban for task {lease.task_id}"
    )
class LeaseMonitor:
    """
    Background monitor for lease expiration and recovery.

    Runs a periodic loop that asks the lease manager for expired leases,
    recovers those the manager's checks consider abandoned, and logs
    lease statistics and upcoming expirations.
    """

    def __init__(
        self, lease_manager: AssignmentLeaseManager, check_interval_seconds: int = 60
    ):
        """
        Initialize the lease monitor.

        Parameters
        ----------
        lease_manager
            The lease manager instance.
        check_interval_seconds
            How often to check for expired leases.
        """
        self.lease_manager = lease_manager
        self.check_interval = check_interval_seconds
        self._running = False
        self._monitor_task: Optional[asyncio.Task[None]] = None

    async def start(self) -> None:
        """
        Load persisted leases and start the background monitor loop.

        Calling ``start`` while the monitor is already running (or still
        starting) only logs a warning.

        Raises
        ------
        Exception
            Whatever ``load_active_leases`` raises; the monitor is left
            stopped in that case.
        """
        if self._running:
            logger.warning("Lease monitor already running")
            return

        # Claim the flag BEFORE the first await. Otherwise a second
        # start() issued while leases are loading would pass the guard
        # above and spawn a duplicate monitor loop.
        self._running = True
        try:
            # Load existing leases first
            await self.lease_manager.load_active_leases()
        except BaseException:
            self._running = False
            raise

        if not self._running:
            # stop() was called while leases were loading; honour it.
            return

        self._monitor_task = asyncio.create_task(self._monitor_loop())
        logger.info(f"Lease monitor started (interval: {self.check_interval}s)")

    async def stop(self) -> None:
        """Stop the lease monitor and wait for its task to finish."""
        self._running = False

        # Detach the handle so a finished task is not kept around.
        task, self._monitor_task = self._monitor_task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        logger.info("Lease monitor stopped")

    async def _process_expired_lease(self, lease: AssignmentLease) -> None:
        """Evaluate one expired lease and recover it when warranted."""
        should_recover = await self.lease_manager.should_recover_expired_lease(lease)
        if not should_recover:
            logger.info(
                f"Skipping recovery for {lease.task_id} "
                f"(smart checks indicate agent still working)"
            )
            return

        success = await self.lease_manager.recover_expired_lease(lease)
        if success:
            logger.info(
                f"Successfully recovered expired lease for "
                f"task {lease.task_id}"
            )
        else:
            logger.error(
                f"Failed to recover expired lease for task {lease.task_id}"
            )

    async def _monitor_loop(self) -> None:
        """
        Monitor lease expiration and recover dead agents.

        Each cycle checks for expired leases, processes them one by one,
        logs statistics and expiry warnings, then sleeps for
        ``self.check_interval`` seconds. Unexpected errors are logged
        and the loop keeps running; cancellation is re-raised.
        """
        while self._running:
            try:
                # Check for expired leases
                expired_leases = await self.lease_manager.check_expired_leases()

                # Recover expired leases (with smart checks). Each lease
                # is isolated so one failing evaluation cannot starve
                # the remaining expired leases in this cycle.
                for lease in expired_leases:
                    try:
                        await self._process_expired_lease(lease)
                    except Exception as e:
                        logger.error(
                            f"Error processing expired lease for task "
                            f"{lease.task_id}: {e}",
                            exc_info=True,
                        )

                # Log statistics periodically
                stats = self.lease_manager.get_lease_statistics()
                if stats["total_active"] > 0:
                    logger.info(
                        f"Lease stats: {stats['total_active']} active, "
                        f"{stats['expiring_soon']} expiring soon, "
                        f"{stats['expired']} expired"
                    )

                # Check for expiring leases and log warnings
                expiring = await self.lease_manager.get_expiring_leases()
                for lease in expiring:
                    logger.warning(
                        f"Lease expiring soon: task {lease.task_id} "
                        f"(expires in {lease.time_remaining})"
                    )

                await asyncio.sleep(self.check_interval)

            except asyncio.CancelledError:
                logger.warning("Lease monitor cancelled")
                raise
            except Exception as e:
                logger.error(
                    f"Error in lease monitor: {e}",
                    exc_info=True,
                )
                # Back off for one interval before retrying the cycle.
                await asyncio.sleep(self.check_interval)

        logger.warning("Lease monitor loop exited")