Function Reference: CPM and Subtask Dependency System#

Quick Navigation#


CPM Functions#

get_optimal_agent_count()#

Location: src/marcus_mcp/tools/scheduling.py:16-96

async def get_optimal_agent_count(
    include_details: bool = False,
    state: Any = None,
) -> Dict[str, Any]:
    """
    Calculate optimal number of agents using CPM analysis.

    Uses the unified dependency graph (including parent tasks and subtasks)
    to determine the optimal agent count for maximum efficiency.

    Parameters
    ----------
    include_details : bool
        Whether to include detailed parallel opportunities (default: False)
    state : Any
        Marcus server state instance (provides access to project_tasks)

    Returns
    -------
    Dict[str, Any]
        {
            "success": bool,
            "optimal_agents": int,
            "critical_path_hours": float,
            "max_parallelism": int,
            "estimated_completion_hours": float,
            "single_agent_hours": float,
            "efficiency_gain_percent": float,
            "total_tasks": int,
            "parallel_opportunities": List[Dict]  # if include_details=True
        }

    Raises
    ------
    ValueError
        If dependency graph contains cycles

    When called
    -----------
    - On-demand via API
    - NOT called automatically
    - NOT called during project selection
    - NOT called during refresh_project_state
    """

Usage:

# In agent code
result = await get_optimal_agent_count(include_details=True, state=server)
if result["success"]:
    optimal_agents = result["optimal_agents"]
    efficiency = result["efficiency_gain_percent"]

calculate_optimal_agents()#

Location: src/marcus_mcp/coordinator/scheduler.py:210-337

def calculate_optimal_agents(tasks: List[Task]) -> ProjectSchedule:
    """
    Calculate optimal number of agents using unified dependency graph.

    Uses critical path method (CPM) to find:
    1. Longest dependency chain (critical path)
    2. Maximum parallelism at any time point
    3. Optimal agent count for maximum efficiency

    Algorithm
    ---------
    1. Filter to workable tasks (is_subtask=True, status != DONE)
    2. Check for dependency cycles
    3. Calculate earliest start/finish times via topological sort
    4. Find critical path length (max finish time)
    5. Find peak parallelism (max concurrent tasks)
    6. Set optimal_agents = max_parallelism (peak allocation strategy)

    Parameters
    ----------
    tasks : List[Task]
        All tasks in the project (parents + subtasks)

    Returns
    -------
    ProjectSchedule
        Dataclass with:
        - optimal_agents: int
        - critical_path_hours: float
        - max_parallelism: int
        - estimated_completion_hours: float
        - single_agent_hours: float
        - efficiency_gain: float (0.0-1.0)
        - parallel_opportunities: List[Dict]

    Raises
    ------
    ValueError
        If cycles detected in dependency graph

    Notes
    -----
    - ONLY subtasks (is_subtask=True) are analyzed
    - Parent tasks are EXCLUDED from calculation
    - Completed tasks (status=DONE) are EXCLUDED
    - Strategy: provision for PEAK demand (max_parallelism)
    - Rationale: Idle agents are cheap; bottlenecks are not resolvable
    """

Key Lines:

  • Line 248-250: Filter to workable tasks

  • Line 265: Cycle detection

  • Line 269: Calculate task times

  • Line 272: Find critical path

  • Line 279: Find max parallelism

  • Line 291: Set optimal = max_parallelism


topological_sort()#

Location: src/marcus_mcp/coordinator/scheduler.py:50-104

def topological_sort(tasks: List[Task]) -> List[Task]:
    """
    Sort tasks in topological order (dependencies before dependents).

    Uses Kahn's algorithm for topological sorting of DAG.

    Parameters
    ----------
    tasks : List[Task]
        List of tasks to sort

    Returns
    -------
    List[Task]
        Tasks sorted in dependency order

    Raises
    ------
    ValueError
        If the dependency graph contains a cycle
    """

calculate_task_times()#

Location: src/marcus_mcp/coordinator/scheduler.py:166-207

def calculate_task_times(tasks: List[Task]) -> Dict[str, Dict[str, Any]]:
    """
    Calculate earliest start and finish times for each task using CPM.

    Forward pass through DAG to compute start/finish times.

    Parameters
    ----------
    tasks : List[Task]
        List of tasks (must be in topological order)

    Returns
    -------
    Dict[str, Dict[str, Any]]
        Mapping of task_id to {
            "start": float,
            "finish": float,
            "task": Task
        }
    """

Subtask Manager Functions#

SubtaskManager.add_subtasks()#

Location: src/marcus_mcp/coordinator/subtask_manager.py:130-243

def add_subtasks(
    self,
    parent_task_id: str,
    subtasks: List[Dict[str, Any]],
    project_tasks: Optional[List[Task]] = None,
    metadata: Optional[SubtaskMetadata] = None,
) -> List[Task]:
    """
    Add subtasks for a parent task to unified project_tasks storage.

    Creates Task objects with is_subtask=True and appends to project_tasks.

    Parameters
    ----------
    parent_task_id : str
        ID of the parent task
    subtasks : List[Dict[str, Any]]
        List of subtask dictionaries with fields:
        - name: str (required)
        - description: str (required)
        - estimated_hours: float (required)
        - dependencies: List[str] (optional)
        - priority: Priority (optional)
        - provides: str (optional) - what this outputs
        - requires: str (optional) - what this needs
        - file_artifacts: List[str] (optional)
    project_tasks : Optional[List[Task]]
        Unified task storage. If None, uses legacy mode only.
    metadata : Optional[SubtaskMetadata]
        Metadata about the decomposition

    Returns
    -------
    List[Task]
        Created Task objects (with is_subtask=True)

    Storage
    -------
    - Stores in legacy format (self.subtasks dict)
    - Stores in unified format (appends to project_tasks)
    - Persists to disk (data/marcus_state/subtasks.json)

    When called
    -----------
    - During task decomposition (NLP phase)
    - NOT called during project selection
    """

Key Fields Set:

  • is_subtask=True: Marks as subtask

  • parent_task_id: Links to parent

  • provides: Output for cross-parent matching

  • requires: Input requirements for matching

  • dependencies: List of task IDs (intra-parent deps)


SubtaskManager.migrate_to_unified_storage()#

Location: src/marcus_mcp/coordinator/subtask_manager.py:666-731

def migrate_to_unified_storage(self, project_tasks: List[Task]) -> None:
    """
    Migrate old Subtask objects to unified Task storage.

    Converts legacy Subtask objects to Task objects with is_subtask=True
    and appends them to project_tasks.

    CRITICAL: Only migrates subtasks whose parent tasks exist in project_tasks.
    This ensures subtasks from other projects don't leak in.

    Parameters
    ----------
    project_tasks : List[Task]
        Unified task storage to migrate subtasks into

    Triggers
    --------
    - Called in refresh_project_state() (server.py line 844)
    - Only called once per project (checked by _subtasks_migrated flag)
    - After migration completes, cross-parent deps are wired

    Safety Checks
    -------------
    parent_task_ids = {t.id for t in project_tasks if not t.is_subtask}
    for subtask_id, subtask in self.subtasks.items():
        if subtask.parent_task_id not in parent_task_ids:
            continue  # Skip if parent doesn't exist

    Result
    ------
    - Legacy subtasks converted to unified format
    - Only subtasks for current project included
    - State persisted to disk
    """

When Called:

  • Line 844 in server.py during refresh_project_state()

  • Condition: if not self._subtasks_migrated

  • Only ONCE per project selection


SubtaskManager.get_subtasks()#

Location: src/marcus_mcp/coordinator/subtask_manager.py:245-301

def get_subtasks(
    self,
    parent_task_id: str,
    project_tasks: Optional[List[Task]] = None,
) -> List[Task]:
    """
    Get all subtasks for a parent task from unified storage.

    Queries from unified storage or falls back to legacy storage.

    Parameters
    ----------
    parent_task_id : str
        ID of the parent task
    project_tasks : Optional[List[Task]]
        Unified task storage. If None, falls back to legacy.

    Returns
    -------
    List[Task]
        List of Task objects (with is_subtask=True) ordered by subtask_index
    """

SubtaskManager.update_subtask_status()#

Location: src/marcus_mcp/coordinator/subtask_manager.py:343-401

def update_subtask_status(
    self,
    subtask_id: str,
    status: TaskStatus,
    project_tasks: Optional[List[Task]] = None,
    assigned_to: Optional[str] = None,
) -> bool:
    """
    Update the status of a subtask in unified storage.

    Updates both unified and legacy storage for backwards compatibility.

    Parameters
    ----------
    subtask_id : str
        ID of the subtask
    status : TaskStatus
        New status (TODO, IN_PROGRESS, DONE, BLOCKED)
    project_tasks : Optional[List[Task]]
        Unified task storage. If None, falls back to legacy.
    assigned_to : Optional[str]
        Agent ID assigned to the subtask

    Returns
    -------
    bool
        True if update successful
    """

Dependency Wiring Functions#

wire_cross_parent_dependencies()#

Location: src/marcus_mcp/coordinator/dependency_wiring.py:410-496

async def wire_cross_parent_dependencies(
    project_tasks: List[Task],
    ai_engine: Any,
    embedding_model: Optional[Any] = None,
) -> Dict[str, Any]:
    """
    Create cross-parent dependencies using hybrid matching.

    After all tasks have been decomposed, analyzes each subtask's
    'requires' field and matches it against other subtasks' 'provides'
    fields to create fine-grained cross-parent dependencies.

    Pipeline
    --------
    For each subtask with requires field:
        1. Filter candidates using embeddings (fast semantic matching)
        2. Use LLM to validate candidates (accurate reasoning)
        3. Run sanity checks (cycles, phase order, etc.)
        4. Add validated deps to task.dependencies

    Parameters
    ----------
    project_tasks : List[Task]
        All tasks in the project (parents and subtasks)
    ai_engine : Any
        AI engine for LLM reasoning
    embedding_model : Optional[Any]
        Sentence transformer model for embeddings (optional)

    Returns
    -------
    Dict[str, Any]
        Statistics:
        {
            "subtasks_analyzed": int,
            "dependencies_created": int,
            "llm_calls": int,
            "rejected_cycles": int,
            "skipped_no_requires": int,
            "skipped_no_candidates": int,
            "total_time_seconds": float
        }

    When called
    -----------
    - Line 872 in refresh_project_state() (server.py)
    - After migrate_to_unified_storage() completes
    - Only run once per project (not re-run on subsequent refreshes)
    """

Key Algorithm:

  1. Embedding filter: Fast semantic matching (cosine similarity ≥ 0.6)

  2. LLM reasoning: Accurate validation on candidate set

  3. Sanity checks:

    • Cycle detection (DFS)

    • Same-parent check (skip intra-parent)

    • Phase ordering (Design → Implement → Test)


hybrid_dependency_resolution()#

Location: src/marcus_mcp/coordinator/dependency_wiring.py:325-407

async def hybrid_dependency_resolution(
    subtask: Task,
    all_tasks: List[Task],
    ai_engine: Any,
    embedding_model: Optional[Any] = None,
) -> List[str]:
    """
    Find cross-parent dependencies using hybrid approach.

    Combines embeddings (fast filtering) with LLM reasoning (accurate)
    and sanity checks (prevent errors).

    Parameters
    ----------
    subtask : Task
        The subtask to analyze
    all_tasks : List[Task]
        All tasks in the project
    ai_engine : Any
        AI engine for LLM reasoning
    embedding_model : Optional[Any]
        Sentence transformer model (optional)

    Returns
    -------
    List[str]
        List of task IDs to add as dependencies

    Process
    -------
    1. Stage 1: Filter candidates using embeddings
    2. Stage 2: Use LLM to make final decision
    3. Stage 3: Sanity checks (cycles, phase order, etc.)
    """

filter_candidates_by_embeddings()#

Location: src/marcus_mcp/coordinator/dependency_wiring.py:21-98

def filter_candidates_by_embeddings(
    subtask: Task,
    all_tasks: List[Task],
    embedding_model: Any,
    similarity_threshold: float = 0.6,
    max_candidates: int = 10,
) -> List[Tuple[Task, float]]:
    """
    Filter potential dependency candidates using semantic embeddings.

    Uses sentence transformers to compute semantic similarity between
    the subtask's requires field and other tasks' provides fields.

    Parameters
    ----------
    subtask : Task
        The subtask being analyzed
    all_tasks : List[Task]
        All tasks in the project
    embedding_model : Any
        Sentence transformer model
    similarity_threshold : float
        Minimum cosine similarity (default 0.6)
    max_candidates : int
        Maximum candidates to return (default 10)

    Returns
    -------
    List[Tuple[Task, float]]
        List of (task, similarity_score) tuples, sorted by similarity descending

    Process
    -------
    1. Encode subtask.requires field
    2. For each other subtask:
       - Skip if same parent (intra-parent handled separately)
       - Skip if no provides field
       - Encode provides field
       - Compute cosine similarity
       - Keep if >= threshold
    3. Sort by similarity, return top N
    """

resolve_dependencies_with_llm()#

Location: src/marcus_mcp/coordinator/dependency_wiring.py:101-216

async def resolve_dependencies_with_llm(
    subtask: Task,
    candidates: List[Task],
    ai_engine: Any,
) -> Dict[str, Any]:
    """
    Use LLM to determine which candidates are true dependencies.

    Given a subtask and candidate providers, asks LLM to reason
    about which should actually be dependencies.

    Parameters
    ----------
    subtask : Task
        The subtask being analyzed
    candidates : List[Task]
        Candidate provider tasks (pre-filtered by embeddings)
    ai_engine : Any
        AI engine for LLM reasoning

    Returns
    -------
    Dict[str, Any]
        {
            "dependencies": List[str],  # task IDs to add
            "reasoning": Dict[str, str] # explanations
        }

    LLM Rules (from prompt)
    -----------------------
    - Only create if task TRULY NEEDS the output
    - Match requires semantically with provides
    - DO NOT create same-parent dependencies
    - Respect workflow: Design → Implement → Test
    - Conservative: false negative > false positive
    - Match actual specification, not research
    """

would_create_cycle()#

Location: src/marcus_mcp/coordinator/dependency_wiring.py:219-274

def would_create_cycle(
    from_task_id: str,
    to_task_id: str,
    all_tasks: List[Task],
) -> bool:
    """
    Check if adding a dependency would create a circular dependency.

    Uses depth-first search to detect if edge from_task_id → to_task_id
    would create a cycle in the dependency graph.

    Parameters
    ----------
    from_task_id : str
        The task that would gain a new dependency
    to_task_id : str
        The task being added as a dependency
    all_tasks : List[Task]
        All tasks in the project

    Returns
    -------
    bool
        True if adding this dependency would create a cycle
    """

validate_phase_order()#

Location: src/marcus_mcp/coordinator/dependency_wiring.py:277-322

def validate_phase_order(subtask: Task, dependency_task: Task) -> bool:
    """
    Validate that dependency follows proper phase ordering.

    Ensures dependencies follow Design → Implement → Test workflow.
    For example, Implementation can depend on Design, but Design
    cannot depend on Implementation.

    Parameters
    ----------
    subtask : Task
        The task that would gain the dependency
    dependency_task : Task
        The task being added as a dependency

    Returns
    -------
    bool
        True if phase ordering is valid

    Phase Order
    -----------
    0: design
    1: implement
    2: test
    3: integration

    Valid Dependencies
    ------------------
    implement → design ✓
    test → implement ✓
    design → implement ✗
    """

Trigger Points#

Subtask Migration Trigger#

Location: server.py:839-845

if self.subtask_manager and self.project_tasks is not None and not self._subtasks_migrated:
    self.subtask_manager.migrate_to_unified_storage(self.project_tasks)
    self._subtasks_migrated = True

When: First call to refresh_project_state() after select_project() Condition: _subtasks_migrated == False Called by: select_project()refresh_project_state()


Cross-Parent Wiring Trigger#

Location: server.py:847-886

try:
    from src.marcus_mcp.coordinator import wire_cross_parent_dependencies

    embedding_model = SentenceTransformer("all-MiniLM-L6-v2")

    stats = await wire_cross_parent_dependencies(
        self.project_tasks,
        self.ai_engine,
        embedding_model
    )
except Exception as e:
    logger.error(f"Failed to wire cross-parent dependencies: {e}")

When: After migration completes successfully Condition: After _subtasks_migrated = True and no errors Called by: refresh_project_state()


CPM Calculation Trigger#

Location: External API call

await get_optimal_agent_count(include_details=True, state=server)

When: On-demand, never automatic Called by: User/agent explicitly via API Never called by: select_project(), refresh_project_state(), or any internal function


Data Structures#

ProjectSchedule (dataclass)#

Location: scheduler.py:18-47

@dataclass
class ProjectSchedule:
    optimal_agents: int
    critical_path_hours: float
    max_parallelism: int
    estimated_completion_hours: float
    single_agent_hours: float
    efficiency_gain: float  # 0.0-1.0
    parallel_opportunities: List[Dict[str, Any]] = field(default_factory=list)

Subtask (dataclass)#

Location: subtask_manager.py:20-73

@dataclass
class Subtask:
    id: str
    parent_task_id: str
    name: str
    description: str
    status: TaskStatus
    priority: Priority
    assigned_to: Optional[str]
    created_at: datetime
    estimated_hours: float
    dependencies: List[str] = field(default_factory=list)
    dependency_types: List[str] = field(default_factory=list)
    file_artifacts: List[str] = field(default_factory=list)
    provides: Optional[str] = None
    requires: Optional[str] = None
    order: int = 0

SubtaskMetadata (dataclass)#

Location: subtask_manager.py:75-93

@dataclass
class SubtaskMetadata:
    shared_conventions: Dict[str, Any] = field(default_factory=dict)
    decomposed_at: datetime = field(default_factory=datetime.now)
    decomposed_by: str = "ai"

Configuration and Constants#

Embedding Similarity Threshold#

  • File: dependency_wiring.py:21-98

  • Default: 0.6 (cosine similarity)

  • Meaning: Tasks with similarity < 0.6 are filtered out

Max Candidates for LLM#

  • File: dependency_wiring.py:21-98

  • Default: 10

  • Meaning: Only top 10 similar tasks sent to LLM

Subtask State File#

  • Location: data/marcus_state/subtasks.json

  • Format: JSON with structure {subtasks, parent_to_subtasks, metadata}

  • Purpose: Persist subtask state across restarts

Embedding Model#

  • Model: all-MiniLM-L6-v2

  • Library: sentence-transformers

  • Fallback: If unavailable, uses LLM-only matching

  • Location: Loaded in refresh_project_state() line 861


Error Handling#

CPM Calculation Errors#

except ValueError as e:
    # Dependency cycles detected
    return {
        "success": False,
        "error": f"Cannot calculate optimal agents: {str(e)}",
        "suggestion": "Check for circular dependencies"
    }
except Exception as e:
    logger.error(f"Error calculating optimal agents: {e}")
    return {"success": False, "error": str(e)}

Dependency Wiring Errors#

try:
    await wire_cross_parent_dependencies(...)
except Exception as e:
    logger.error(f"Failed to wire cross-parent dependencies: {e}")
    # Don't fail refresh - wiring is optional

Performance Considerations#

  1. CPM Calculation: O(V + E) where V=tasks, E=dependencies

    • Topological sort: O(V + E)

    • Task times calculation: O(V + E)

    • Critical path: O(V)

  2. Embedding Filtering: O(N * M) where N=subtasks, M=candidates

    • Reduced by similarity threshold

  3. LLM Reasoning: Bound by max_candidates (default 10)

    • One LLM call per subtask with requires field

  4. Cross-Parent Wiring: Only runs once per project


Testing#

Test Files#

  • Unit tests: tests/unit/coordinator/test_scheduler.py

  • Integration tests: tests/integration/e2e/test_task_decomposition_e2e.py

  • Dependency wiring: tests/unit/coordinator/test_dependency_wiring.py

Key Test Cases#

  • test_calculate_optimal_agents_sequential: Linear dependency chain

  • test_calculate_optimal_agents_fully_parallel: Independent tasks

  • test_calculate_optimal_agents_mixed: Mix of sequential and parallel

  • test_calculate_optimal_agents_with_subtasks: Includes subtask filtering