Function Reference: CPM and Subtask Dependency System#
CPM Functions#
get_optimal_agent_count()#
Location: src/marcus_mcp/tools/scheduling.py:16-96
async def get_optimal_agent_count(
include_details: bool = False,
state: Any = None,
) -> Dict[str, Any]:
"""
Calculate optimal number of agents using CPM analysis.
Uses the unified dependency graph (including parent tasks and subtasks)
to determine the optimal agent count for maximum efficiency.
Parameters
----------
include_details : bool
Whether to include detailed parallel opportunities (default: False)
state : Any
Marcus server state instance (provides access to project_tasks)
Returns
-------
Dict[str, Any]
{
"success": bool,
"optimal_agents": int,
"critical_path_hours": float,
"max_parallelism": int,
"estimated_completion_hours": float,
"single_agent_hours": float,
"efficiency_gain_percent": float,
"total_tasks": int,
"parallel_opportunities": List[Dict] # if include_details=True
}
Raises
------
ValueError
If dependency graph contains cycles
When called
-----------
- On-demand via API
- NOT called automatically
- NOT called during project selection
- NOT called during refresh_project_state
"""
Usage:
# In agent code
result = await get_optimal_agent_count(include_details=True, state=server)
if result["success"]:
optimal_agents = result["optimal_agents"]
efficiency = result["efficiency_gain_percent"]
calculate_optimal_agents()#
Location: src/marcus_mcp/coordinator/scheduler.py:210-337
def calculate_optimal_agents(tasks: List[Task]) -> ProjectSchedule:
"""
Calculate optimal number of agents using unified dependency graph.
Uses critical path method (CPM) to find:
1. Longest dependency chain (critical path)
2. Maximum parallelism at any time point
3. Optimal agent count for maximum efficiency
Algorithm
---------
1. Filter to workable tasks (is_subtask=True, status != DONE)
2. Check for dependency cycles
3. Calculate earliest start/finish times via topological sort
4. Find critical path length (max finish time)
5. Find peak parallelism (max concurrent tasks)
6. Set optimal_agents = max_parallelism (peak allocation strategy)
Parameters
----------
tasks : List[Task]
All tasks in the project (parents + subtasks)
Returns
-------
ProjectSchedule
Dataclass with:
- optimal_agents: int
- critical_path_hours: float
- max_parallelism: int
- estimated_completion_hours: float
- single_agent_hours: float
- efficiency_gain: float (0.0-1.0)
- parallel_opportunities: List[Dict]
Raises
------
ValueError
If cycles detected in dependency graph
Notes
-----
- ONLY subtasks (is_subtask=True) are analyzed
- Parent tasks are EXCLUDED from calculation
- Completed tasks (status=DONE) are EXCLUDED
- Strategy: provision for PEAK demand (max_parallelism)
- Rationale: Idle agents are cheap; bottlenecks are not resolvable
"""
Key Lines:
Line 248-250: Filter to workable tasks
Line 265: Cycle detection
Line 269: Calculate task times
Line 272: Find critical path
Line 279: Find max parallelism
Line 291: Set optimal = max_parallelism
topological_sort()#
Location: src/marcus_mcp/coordinator/scheduler.py:50-104
def topological_sort(tasks: List[Task]) -> List[Task]:
"""
Sort tasks in topological order (dependencies before dependents).
Uses Kahn's algorithm for topological sorting of DAG.
Parameters
----------
tasks : List[Task]
List of tasks to sort
Returns
-------
List[Task]
Tasks sorted in dependency order
Raises
------
ValueError
If the dependency graph contains a cycle
"""
calculate_task_times()#
Location: src/marcus_mcp/coordinator/scheduler.py:166-207
def calculate_task_times(tasks: List[Task]) -> Dict[str, Dict[str, Any]]:
"""
Calculate earliest start and finish times for each task using CPM.
Forward pass through DAG to compute start/finish times.
Parameters
----------
tasks : List[Task]
List of tasks (must be in topological order)
Returns
-------
Dict[str, Dict[str, Any]]
Mapping of task_id to {
"start": float,
"finish": float,
"task": Task
}
"""
Subtask Manager Functions#
SubtaskManager.add_subtasks()#
Location: src/marcus_mcp/coordinator/subtask_manager.py:130-243
def add_subtasks(
self,
parent_task_id: str,
subtasks: List[Dict[str, Any]],
project_tasks: Optional[List[Task]] = None,
metadata: Optional[SubtaskMetadata] = None,
) -> List[Task]:
"""
Add subtasks for a parent task to unified project_tasks storage.
Creates Task objects with is_subtask=True and appends to project_tasks.
Parameters
----------
parent_task_id : str
ID of the parent task
subtasks : List[Dict[str, Any]]
List of subtask dictionaries with fields:
- name: str (required)
- description: str (required)
- estimated_hours: float (required)
- dependencies: List[str] (optional)
- priority: Priority (optional)
- provides: str (optional) - what this outputs
- requires: str (optional) - what this needs
- file_artifacts: List[str] (optional)
project_tasks : Optional[List[Task]]
Unified task storage. If None, uses legacy mode only.
metadata : Optional[SubtaskMetadata]
Metadata about the decomposition
Returns
-------
List[Task]
Created Task objects (with is_subtask=True)
Storage
-------
- Stores in legacy format (self.subtasks dict)
- Stores in unified format (appends to project_tasks)
- Persists to disk (data/marcus_state/subtasks.json)
When called
-----------
- During task decomposition (NLP phase)
- NOT called during project selection
"""
Key Fields Set:
is_subtask=True: Marks as subtaskparent_task_id: Links to parentprovides: Output for cross-parent matchingrequires: Input requirements for matchingdependencies: List of task IDs (intra-parent deps)
SubtaskManager.migrate_to_unified_storage()#
Location: src/marcus_mcp/coordinator/subtask_manager.py:666-731
def migrate_to_unified_storage(self, project_tasks: List[Task]) -> None:
"""
Migrate old Subtask objects to unified Task storage.
Converts legacy Subtask objects to Task objects with is_subtask=True
and appends them to project_tasks.
CRITICAL: Only migrates subtasks whose parent tasks exist in project_tasks.
This ensures subtasks from other projects don't leak in.
Parameters
----------
project_tasks : List[Task]
Unified task storage to migrate subtasks into
Triggers
--------
- Called in refresh_project_state() (server.py line 844)
- Only called once per project (checked by _subtasks_migrated flag)
- After migration completes, cross-parent deps are wired
Safety Checks
-------------
parent_task_ids = {t.id for t in project_tasks if not t.is_subtask}
for subtask_id, subtask in self.subtasks.items():
if subtask.parent_task_id not in parent_task_ids:
continue # Skip if parent doesn't exist
Result
------
- Legacy subtasks converted to unified format
- Only subtasks for current project included
- State persisted to disk
"""
When Called:
Line 844 in
server.pyduringrefresh_project_state()Condition:
if not self._subtasks_migratedOnly ONCE per project selection
SubtaskManager.get_subtasks()#
Location: src/marcus_mcp/coordinator/subtask_manager.py:245-301
def get_subtasks(
self,
parent_task_id: str,
project_tasks: Optional[List[Task]] = None,
) -> List[Task]:
"""
Get all subtasks for a parent task from unified storage.
Queries from unified storage or falls back to legacy storage.
Parameters
----------
parent_task_id : str
ID of the parent task
project_tasks : Optional[List[Task]]
Unified task storage. If None, falls back to legacy.
Returns
-------
List[Task]
List of Task objects (with is_subtask=True) ordered by subtask_index
"""
SubtaskManager.update_subtask_status()#
Location: src/marcus_mcp/coordinator/subtask_manager.py:343-401
def update_subtask_status(
self,
subtask_id: str,
status: TaskStatus,
project_tasks: Optional[List[Task]] = None,
assigned_to: Optional[str] = None,
) -> bool:
"""
Update the status of a subtask in unified storage.
Updates both unified and legacy storage for backwards compatibility.
Parameters
----------
subtask_id : str
ID of the subtask
status : TaskStatus
New status (TODO, IN_PROGRESS, DONE, BLOCKED)
project_tasks : Optional[List[Task]]
Unified task storage. If None, falls back to legacy.
assigned_to : Optional[str]
Agent ID assigned to the subtask
Returns
-------
bool
True if update successful
"""
Dependency Wiring Functions#
wire_cross_parent_dependencies()#
Location: src/marcus_mcp/coordinator/dependency_wiring.py:410-496
async def wire_cross_parent_dependencies(
project_tasks: List[Task],
ai_engine: Any,
embedding_model: Optional[Any] = None,
) -> Dict[str, Any]:
"""
Create cross-parent dependencies using hybrid matching.
After all tasks have been decomposed, analyzes each subtask's
'requires' field and matches it against other subtasks' 'provides'
fields to create fine-grained cross-parent dependencies.
Pipeline
--------
For each subtask with requires field:
1. Filter candidates using embeddings (fast semantic matching)
2. Use LLM to validate candidates (accurate reasoning)
3. Run sanity checks (cycles, phase order, etc.)
4. Add validated deps to task.dependencies
Parameters
----------
project_tasks : List[Task]
All tasks in the project (parents and subtasks)
ai_engine : Any
AI engine for LLM reasoning
embedding_model : Optional[Any]
Sentence transformer model for embeddings (optional)
Returns
-------
Dict[str, Any]
Statistics:
{
"subtasks_analyzed": int,
"dependencies_created": int,
"llm_calls": int,
"rejected_cycles": int,
"skipped_no_requires": int,
"skipped_no_candidates": int,
"total_time_seconds": float
}
When called
-----------
- Line 872 in refresh_project_state() (server.py)
- After migrate_to_unified_storage() completes
- Only run once per project (not re-run on subsequent refreshes)
"""
Key Algorithm:
Embedding filter: Fast semantic matching (cosine similarity ≥ 0.6)
LLM reasoning: Accurate validation on candidate set
Sanity checks:
Cycle detection (DFS)
Same-parent check (skip intra-parent)
Phase ordering (Design → Implement → Test)
hybrid_dependency_resolution()#
Location: src/marcus_mcp/coordinator/dependency_wiring.py:325-407
async def hybrid_dependency_resolution(
subtask: Task,
all_tasks: List[Task],
ai_engine: Any,
embedding_model: Optional[Any] = None,
) -> List[str]:
"""
Find cross-parent dependencies using hybrid approach.
Combines embeddings (fast filtering) with LLM reasoning (accurate)
and sanity checks (prevent errors).
Parameters
----------
subtask : Task
The subtask to analyze
all_tasks : List[Task]
All tasks in the project
ai_engine : Any
AI engine for LLM reasoning
embedding_model : Optional[Any]
Sentence transformer model (optional)
Returns
-------
List[str]
List of task IDs to add as dependencies
Process
-------
1. Stage 1: Filter candidates using embeddings
2. Stage 2: Use LLM to make final decision
3. Stage 3: Sanity checks (cycles, phase order, etc.)
"""
filter_candidates_by_embeddings()#
Location: src/marcus_mcp/coordinator/dependency_wiring.py:21-98
def filter_candidates_by_embeddings(
subtask: Task,
all_tasks: List[Task],
embedding_model: Any,
similarity_threshold: float = 0.6,
max_candidates: int = 10,
) -> List[Tuple[Task, float]]:
"""
Filter potential dependency candidates using semantic embeddings.
Uses sentence transformers to compute semantic similarity between
the subtask's requires field and other tasks' provides fields.
Parameters
----------
subtask : Task
The subtask being analyzed
all_tasks : List[Task]
All tasks in the project
embedding_model : Any
Sentence transformer model
similarity_threshold : float
Minimum cosine similarity (default 0.6)
max_candidates : int
Maximum candidates to return (default 10)
Returns
-------
List[Tuple[Task, float]]
List of (task, similarity_score) tuples, sorted by similarity descending
Process
-------
1. Encode subtask.requires field
2. For each other subtask:
- Skip if same parent (intra-parent handled separately)
- Skip if no provides field
- Encode provides field
- Compute cosine similarity
- Keep if >= threshold
3. Sort by similarity, return top N
"""
resolve_dependencies_with_llm()#
Location: src/marcus_mcp/coordinator/dependency_wiring.py:101-216
async def resolve_dependencies_with_llm(
subtask: Task,
candidates: List[Task],
ai_engine: Any,
) -> Dict[str, Any]:
"""
Use LLM to determine which candidates are true dependencies.
Given a subtask and candidate providers, asks LLM to reason
about which should actually be dependencies.
Parameters
----------
subtask : Task
The subtask being analyzed
candidates : List[Task]
Candidate provider tasks (pre-filtered by embeddings)
ai_engine : Any
AI engine for LLM reasoning
Returns
-------
Dict[str, Any]
{
"dependencies": List[str], # task IDs to add
"reasoning": Dict[str, str] # explanations
}
LLM Rules (from prompt)
-----------------------
- Only create if task TRULY NEEDS the output
- Match requires semantically with provides
- DO NOT create same-parent dependencies
- Respect workflow: Design → Implement → Test
- Conservative: false negative > false positive
- Match actual specification, not research
"""
would_create_cycle()#
Location: src/marcus_mcp/coordinator/dependency_wiring.py:219-274
def would_create_cycle(
from_task_id: str,
to_task_id: str,
all_tasks: List[Task],
) -> bool:
"""
Check if adding a dependency would create a circular dependency.
Uses depth-first search to detect if edge from_task_id → to_task_id
would create a cycle in the dependency graph.
Parameters
----------
from_task_id : str
The task that would gain a new dependency
to_task_id : str
The task being added as a dependency
all_tasks : List[Task]
All tasks in the project
Returns
-------
bool
True if adding this dependency would create a cycle
"""
validate_phase_order()#
Location: src/marcus_mcp/coordinator/dependency_wiring.py:277-322
def validate_phase_order(subtask: Task, dependency_task: Task) -> bool:
"""
Validate that dependency follows proper phase ordering.
Ensures dependencies follow Design → Implement → Test workflow.
For example, Implementation can depend on Design, but Design
cannot depend on Implementation.
Parameters
----------
subtask : Task
The task that would gain the dependency
dependency_task : Task
The task being added as a dependency
Returns
-------
bool
True if phase ordering is valid
Phase Order
-----------
0: design
1: implement
2: test
3: integration
Valid Dependencies
------------------
implement → design ✓
test → implement ✓
design → implement ✗
"""
Trigger Points#
Subtask Migration Trigger#
Location: server.py:839-845
if self.subtask_manager and self.project_tasks is not None and not self._subtasks_migrated:
self.subtask_manager.migrate_to_unified_storage(self.project_tasks)
self._subtasks_migrated = True
When: First call to refresh_project_state() after select_project()
Condition: _subtasks_migrated == False
Called by: select_project() → refresh_project_state()
Cross-Parent Wiring Trigger#
Location: server.py:847-886
try:
from src.marcus_mcp.coordinator import wire_cross_parent_dependencies
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
stats = await wire_cross_parent_dependencies(
self.project_tasks,
self.ai_engine,
embedding_model
)
except Exception as e:
logger.error(f"Failed to wire cross-parent dependencies: {e}")
When: After migration completes successfully
Condition: After _subtasks_migrated = True and no errors
Called by: refresh_project_state()
CPM Calculation Trigger#
Location: External API call
await get_optimal_agent_count(include_details=True, state=server)
When: On-demand, never automatic
Called by: User/agent explicitly via API
Never called by: select_project(), refresh_project_state(), or any internal function
Data Structures#
ProjectSchedule (dataclass)#
Location: scheduler.py:18-47
@dataclass
class ProjectSchedule:
optimal_agents: int
critical_path_hours: float
max_parallelism: int
estimated_completion_hours: float
single_agent_hours: float
efficiency_gain: float # 0.0-1.0
parallel_opportunities: List[Dict[str, Any]] = field(default_factory=list)
Subtask (dataclass)#
Location: subtask_manager.py:20-73
@dataclass
class Subtask:
id: str
parent_task_id: str
name: str
description: str
status: TaskStatus
priority: Priority
assigned_to: Optional[str]
created_at: datetime
estimated_hours: float
dependencies: List[str] = field(default_factory=list)
dependency_types: List[str] = field(default_factory=list)
file_artifacts: List[str] = field(default_factory=list)
provides: Optional[str] = None
requires: Optional[str] = None
order: int = 0
SubtaskMetadata (dataclass)#
Location: subtask_manager.py:75-93
@dataclass
class SubtaskMetadata:
shared_conventions: Dict[str, Any] = field(default_factory=dict)
decomposed_at: datetime = field(default_factory=datetime.now)
decomposed_by: str = "ai"
Configuration and Constants#
Embedding Similarity Threshold#
File:
dependency_wiring.py:21-98Default: 0.6 (cosine similarity)
Meaning: Tasks with similarity < 0.6 are filtered out
Max Candidates for LLM#
File:
dependency_wiring.py:21-98Default: 10
Meaning: Only top 10 similar tasks sent to LLM
Subtask State File#
Location:
data/marcus_state/subtasks.jsonFormat: JSON with structure {subtasks, parent_to_subtasks, metadata}
Purpose: Persist subtask state across restarts
Embedding Model#
Model:
all-MiniLM-L6-v2Library:
sentence-transformersFallback: If unavailable, uses LLM-only matching
Location: Loaded in
refresh_project_state()line 861
Error Handling#
CPM Calculation Errors#
except ValueError as e:
# Dependency cycles detected
return {
"success": False,
"error": f"Cannot calculate optimal agents: {str(e)}",
"suggestion": "Check for circular dependencies"
}
except Exception as e:
logger.error(f"Error calculating optimal agents: {e}")
return {"success": False, "error": str(e)}
Dependency Wiring Errors#
try:
await wire_cross_parent_dependencies(...)
except Exception as e:
logger.error(f"Failed to wire cross-parent dependencies: {e}")
# Don't fail refresh - wiring is optional
Performance Considerations#
CPM Calculation: O(V + E) where V=tasks, E=dependencies
Topological sort: O(V + E)
Task times calculation: O(V + E)
Critical path: O(V)
Embedding Filtering: O(N * M) where N=subtasks, M=candidates
Reduced by similarity threshold
LLM Reasoning: Bound by max_candidates (default 10)
One LLM call per subtask with requires field
Cross-Parent Wiring: Only runs once per project
Testing#
Test Files#
Unit tests:
tests/unit/coordinator/test_scheduler.pyIntegration tests:
tests/integration/e2e/test_task_decomposition_e2e.pyDependency wiring:
tests/unit/coordinator/test_dependency_wiring.py
Key Test Cases#
test_calculate_optimal_agents_sequential: Linear dependency chaintest_calculate_optimal_agents_fully_parallel: Independent taskstest_calculate_optimal_agents_mixed: Mix of sequential and paralleltest_calculate_optimal_agents_with_subtasks: Includes subtask filtering