System 54: Hierarchical Task Decomposition#
Overview#
The Hierarchical Task Decomposition System is Marcusβs AI-powered engine for intelligently breaking down large, complex tasks into smaller, manageable subtasks with clear interfaces, dependencies, and shared conventions. This system enables autonomous agents to work on well-defined components while maintaining cohesion across the overall feature.
Status: β Production (95% complete) Category: Project Management Dependencies: 07-AI Intelligence Engine, 23-Task Management Intelligence, 04-Kanban Integration, 03-Context & Dependency System
What This System Does#
The Hierarchical Task Decomposition System provides:
Intelligent Decomposition - AI-powered analysis to break tasks into optimal subtasks
Subtask Management - Tracking subtask-parent relationships and state
Dependency Resolution - Managing dependencies between subtasks
Interface Contracts - Clear provides/requires definitions for each subtask
Shared Conventions - Enforcing consistency across subtask implementations
Progress Aggregation - Rolling up subtask completion to parent tasks
Automatic Integration - Final validation subtask for cohesion testing
Context Enrichment - Enhanced context for agents working on subtasks
Architecture#
Component Structure#
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Hierarchical Task Decomposition System β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β ββββββββββββββββββββββββ ββββββββββββββββββββββββββββββββββ β
β β Decomposer β β SubtaskManager β β
β β (decomposer.py) β β (subtask_manager.py) β β
β β β β β β
β β β’ should_decompose() β β β’ Subtask dataclass β β
β β β’ decompose_task() β β β’ SubtaskMetadata β β
β β β’ AI prompt gen β β β’ add_subtasks() β β
β β β’ Integration auto β β β’ get_subtasks() β β
β β β’ Convention builder β β β’ update_subtask_status() β β
β β β β β’ can_assign_subtask() β β
β β β β β’ get_completion_percentage() β β
β β β β β’ _save_state() β β
β β β β β’ _load_state() β β
β ββββββββββββββββββββββββ ββββββββββββββββββββββββββββββββββ β
β β β β
β ββββββββββββββββ¬ββββββββββββββββ β
β β β
β βββββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββββ β
β β Subtask Assignment β β β
β β (subtask_assignment.py) β β
β β β β β
β β β’ find_next_available_subtask() β’ convert_subtask_to_task() β
β β β’ check_and_complete_parent_task() β’ update_subtask_progress_in_parent() β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β βββββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββββ β
β β Integration Layer β β β
β β (task_assignment_integration.py, tools/context.py) β β
β β β β β
β β β’ find_optimal_task_with_subtasks() β’ Enhanced get_task_context() β
β β β’ Subtask prioritization β’ Dependency artifact context β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Data Flow#
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Decomposition Flow β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Create Task β
β β β
β βΌ β
β should_decompose(task) β
β β β
β ββββ΄ββββ β
β β Yes β No β Regular task workflow β
β βΌ β
β decompose_task(task, ai_engine, project_context) β
β β β
β βββ AI analyzes task description β
β βββ Identifies components and interfaces β
β βββ Determines dependencies between components β
β βββ Estimates time per component β
β βββ Generates shared conventions β
β βββ Adds automatic integration subtask β
β β β
β βΌ β
β Returns: {success, subtasks, shared_conventions} β
β β β
β βΌ β
β SubtaskManager.add_subtasks(parent_id, subtasks, metadata) β
β β β
β βββ Stores subtask-parent relationships β
β βββ Persists to data/marcus_state/subtasks.json β
β βββ Ready for assignment β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Assignment Flow β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Agent: request_next_task(agent_id) β
β β β
β βΌ β
β find_optimal_task_with_subtasks(agent_id, state) β
β β β
β βΌ β
β find_next_available_subtask(subtask_manager, agent_id, tasks) β
β β β
β βββ Get all subtasks across all parent tasks β
β βββ Filter to TODO status β
β βββ Check dependencies satisfied β
β βββ Check not already assigned β
β βββ Sort by parent priority + subtask order β
β β β
β ββββ΄ββββ β
β βFound?β β
β βΌ βΌ β
β Yes No β Regular task assignment β
β β β
β βΌ β
β convert_subtask_to_task(subtask, parent_task) β
β β β
β βββ Create Task object from Subtask β
β βββ Add enhanced context β
β β ββ is_subtask flag β
β β ββ parent_task info β
β β ββ shared_conventions β
β β ββ dependency_artifacts β
β β ββ sibling_subtasks β
β β β
β βΌ β
β Return task to agent with enriched context β
β β β
β βΌ β
β Agent works on subtask β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Completion Flow β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Agent: report_task_progress(task_id, status="completed") β
β β β
β βΌ β
β Is task_id a subtask? β
β β β
β ββββ΄ββββ β
β β Yes β No β Regular completion β
β βΌ β
β SubtaskManager.update_subtask_status(task_id, DONE, agent_id) β
β β β
β βΌ β
β update_subtask_progress_in_parent(parent_id, task_id, ...) β
β β β
β βββ Calculate parent completion percentage β
β βββ Update parent progress on Kanban board β
β βββ Add checklist comment on parent β
β β β
β βΌ β
β check_and_complete_parent_task(parent_id, ...) β
β β β
β βββ Check if all subtasks are DONE β
β β β
β βββ If yes: β
β βββ Move parent task to DONE on Kanban β
β βββ Add completion summary comment β
β βββ Trigger dependent tasks β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Core Components#
1. Decomposer (src/marcus_mcp/coordinator/decomposer.py)#
Purpose: Decides when to decompose tasks and performs AI-powered breakdown
Key Functions#
should_decompose(task: Task, project_complexity: Optional[str] = None) -> bool
def should_decompose(task: Task, project_complexity: Optional[str] = None) -> bool:
"""
Decide whether a task should be decomposed into subtasks.
Uses heuristics based on project complexity mode:
- prototype: No decomposition (speed priority)
- standard: Balanced decomposition (>= 0.2 hours, 3+ indicators)
- enterprise: Aggressive decomposition (>= 0.1 hours, 2+ indicators,
force decompose "Implement" tasks)
Decomposition Thresholds by Mode:
- Prototype: Never decompose
- Standard (default): >= 0.2 hours (12 minutes), 3+ component indicators
- Enterprise: >= 0.1 hours (6 minutes), 2+ component indicators
Parameters
----------
task : Task
The task to evaluate
project_complexity : Optional[str], default=None
Project complexity mode: "prototype", "standard", or "enterprise"
Returns
-------
bool
True if task should be decomposed
"""
Heuristics by Complexity Mode:
Prototype Mode:
β Never decompose (speed over granularity)
Standard Mode (default):
β Size:
estimated_hours >= 0.2(12 minutes)β Complexity: Multiple indicators (3+) in description (api, database, model, ui, etc.)
β Type: Skip bugfix, hotfix, refactor, documentation, deployment, design tasks
Enterprise Mode:
β Size:
estimated_hours >= 0.1(6 minutes)β Complexity: Multiple indicators (2+) in description
β Force decompose: All βImplementβ tasks
β Type: Skip bugfix, hotfix, refactor, documentation, deployment, design tasks
decompose_task(task, ai_engine, project_context) -> Dict
async def decompose_task(
task: Task,
ai_engine: Any,
project_context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Decompose a task into subtasks using AI.
Process:
1. Build context-rich prompt with task details and project info
2. Call AI engine for intelligent breakdown
3. Parse AI response into structured subtasks
4. Add automatic integration subtask
5. Generate shared conventions
6. Validate subtask structure
Parameters
----------
task : Task
The parent task to decompose
ai_engine : Any
AI intelligence engine for analysis
project_context : Optional[Dict[str, Any]]
Additional context (labels, existing tasks, tech stack)
Returns
-------
Dict[str, Any]
{
"success": bool,
"subtasks": List[Dict],
"shared_conventions": Dict,
"error": Optional[str]
}
"""
AI Prompt Structure:
prompt = f"""
Analyze this task and break it down into subtasks:
TASK INFORMATION:
Name: {task.name}
Description: {task.description}
Estimated Hours: {task.estimated_hours}
Labels: {task.labels}
Priority: {task.priority}
PROJECT CONTEXT:
{json.dumps(project_context, indent=2)}
REQUIREMENTS:
1. Identify distinct components (API, Database, UI, etc.)
2. Define clear interfaces (provides/requires) for each subtask
3. Estimate realistic time per subtask (1-3 hours ideal)
4. Determine dependencies (which subtasks need what)
5. Identify file artifacts each subtask will produce
6. Suggest shared conventions (file structure, naming, formats)
OUTPUT FORMAT (JSON):
{{
"subtasks": [
{{
"name": "Component name",
"description": "What needs to be done",
"estimated_hours": 2.0,
"dependencies": ["subtask_id_1"],
"file_artifacts": ["path/to/file.py"],
"provides": "Clear interface this provides",
"requires": "What this needs from dependencies",
"order": 1
}}
],
"shared_conventions": {{
"base_path": "src/feature/",
"response_format": {{}},
"naming_convention": "snake_case"
}}
}}
"""
Automatic Integration Subtask:
integration_subtask = {
"id": f"{parent_task.id}_sub_integration",
"name": "Integration testing and validation",
"description": """
Verify all components work together:
- Run end-to-end workflow tests
- Validate all interfaces function correctly
- Check error handling across components
- Create consolidated documentation
- Verify all file artifacts exist and are correct
""",
"estimated_hours": 1.0,
"dependencies": [all_other_subtask_ids],
"file_artifacts": [
"tests/integration/test_{feature}.py",
"docs/integration_report.md"
],
"provides": "Validated, integrated solution",
"requires": "All components complete",
"order": 99 # Always last
}
2. SubtaskManager (src/marcus_mcp/coordinator/subtask_manager.py)#
Purpose: Tracks subtask-parent relationships and manages state
Data Models#
Subtask Dataclass:
@dataclass
class Subtask:
"""Represents a subtask decomposed from a parent task."""
id: str
parent_task_id: str
name: str
description: str
status: TaskStatus
priority: Priority
assigned_to: Optional[str]
created_at: datetime
estimated_hours: float
dependencies: List[str] = field(default_factory=list)
dependency_types: List[str] = field(default_factory=list)
file_artifacts: List[str] = field(default_factory=list)
provides: Optional[str] = None
requires: Optional[str] = None
order: int = 0 # Execution order within parent
Field Details:
dependency_types: Type of each dependency - βhardβ (blocks start) or βsoftβ (can use mock/contract)Must match length of dependencies list
Enables parallel work by using contracts from Design phase
Empty array if no dependencies
SubtaskMetadata Dataclass:
@dataclass
class SubtaskMetadata:
"""Metadata about decomposed tasks."""
shared_conventions: Dict[str, Any] = field(default_factory=dict)
decomposed_at: datetime = field(default_factory=datetime.now)
decomposed_by: str = "ai" # or "manual"
Key Methods#
add_subtasks(parent_task_id, subtasks, project_tasks, metadata)
def add_subtasks(
self,
parent_task_id: str,
subtasks: List[Dict[str, Any]],
project_tasks: Optional[List[Task]] = None,
metadata: Optional[SubtaskMetadata] = None,
) -> List[Task]:
"""
Add subtasks for a parent task to unified project_tasks storage.
Creates Task objects with is_subtask=True and appends them to
project_tasks list for unified dependency graph.
Parameters
----------
parent_task_id : str
ID of the parent task
subtasks : List[Dict[str, Any]]
List of subtask dictionaries with fields:
- name, description, estimated_hours, dependencies, dependency_types, etc.
project_tasks : Optional[List[Task]]
Unified task storage (server.project_tasks). If None, uses legacy mode.
metadata : Optional[SubtaskMetadata]
Shared metadata (conventions, decomposition info). Now optional.
Returns
-------
List[Task]
Created Task objects (with is_subtask=True)
"""
get_subtasks(parent_task_id) -> List[Subtask]
def get_subtasks(self, parent_task_id: str) -> List[Subtask]:
"""
Get all subtasks for a parent task.
Returns sorted by order field.
"""
can_assign_subtask(subtask_id, project_tasks) -> bool
def can_assign_subtask(
self,
subtask_id: str,
project_tasks: List[Task]
) -> bool:
"""
Check if a subtask can be assigned.
Checks:
- Subtask exists
- Status is TODO
- Not already assigned
- All dependencies satisfied (DONE status)
Parameters
----------
subtask_id : str
ID of subtask to check
project_tasks : List[Task]
Current project tasks for dependency checking
Returns
-------
bool
True if subtask can be assigned
"""
update_subtask_status(subtask_id, status, assigned_to)
def update_subtask_status(
self,
subtask_id: str,
new_status: TaskStatus,
assigned_to: Optional[str] = None
) -> None:
"""
Update subtask status and optionally assignment.
Automatically persists to JSON.
"""
get_completion_percentage(parent_task_id) -> int
def get_completion_percentage(self, parent_task_id: str) -> int:
"""
Calculate completion percentage for parent task.
Returns
-------
int
Percentage (0-100) based on completed subtasks
"""
Persistence:
def _save_state(self) -> None:
"""Save subtasks to data/marcus_state/subtasks.json"""
def _load_state(self) -> None:
"""Load subtasks from data/marcus_state/subtasks.json"""
3. Subtask Assignment (src/marcus_mcp/coordinator/subtask_assignment.py)#
Purpose: Helper functions for finding and assigning subtasks
find_next_available_subtask(subtask_manager, agent_id, project_tasks)
def find_next_available_subtask(
subtask_manager: SubtaskManager,
agent_id: str,
project_tasks: List[Task]
) -> Optional[Subtask]:
"""
Find the highest priority available subtask for an agent.
Process:
1. Get all subtasks across all parent tasks
2. Filter to TODO status
3. Filter to satisfied dependencies
4. Filter to not already assigned
5. Sort by parent task priority, then subtask order
6. Return first match
Parameters
----------
subtask_manager : SubtaskManager
Manager with all subtask state
agent_id : str
Agent requesting work
project_tasks : List[Task]
Current project tasks for dependency validation
Returns
-------
Optional[Subtask]
Available subtask or None
"""
convert_subtask_to_task(subtask, parent_task) -> Task
def convert_subtask_to_task(
subtask: Subtask,
parent_task: Task
) -> Task:
"""
Convert Subtask to Task object for assignment.
Enriches with:
- is_subtask flag in description
- Parent task information
- Shared conventions
- Dependency artifact context
- Sibling subtask information
Parameters
----------
subtask : Subtask
The subtask to convert
parent_task : Task
The parent Task object for context
Returns
-------
Task
Task object ready for agent assignment
"""
update_subtask_progress_in_parent(parent_task_id, subtask_id, manager, kanban)
async def update_subtask_progress_in_parent(
parent_task_id: str,
completed_subtask_id: str,
subtask_manager: SubtaskManager,
kanban_client: Any
) -> None:
"""
Update parent task progress when subtask completes.
Updates:
- Parent task progress percentage
- Kanban board checklist
- Parent task comments with completion info
Parameters
----------
parent_task_id : str
Parent task to update
completed_subtask_id : str
Subtask that just completed
subtask_manager : SubtaskManager
Manager with all subtask state
kanban_client : Any
Kanban integration for board updates
"""
check_and_complete_parent_task(parent_task_id, manager, kanban)
async def check_and_complete_parent_task(
parent_task_id: str,
subtask_manager: SubtaskManager,
kanban_client: Any
) -> None:
"""
Check if parent should be completed and do so if ready.
Completes parent when:
- All subtasks have DONE status
- Parent task exists
- Parent not already complete
Actions on completion:
- Move parent to DONE on Kanban
- Add completion summary comment
- Trigger dependent tasks
Parameters
----------
parent_task_id : str
Parent task to check
subtask_manager : SubtaskManager
Manager with all subtask state
kanban_client : Any
Kanban integration for updates
"""
4. Integration Layer#
Task Assignment Integration (src/marcus_mcp/coordinator/task_assignment_integration.py):
async def find_optimal_task_with_subtasks(
agent_id: str,
state: Any,
fallback_task_finder: Callable
) -> Optional[Task]:
"""
Find optimal task, prioritizing subtasks over regular tasks.
Workflow:
1. Check if subtask_manager exists
2. If yes, look for available subtasks
3. If subtask found, convert and return
4. If no subtask, use fallback_task_finder for regular tasks
This is a wrapper that integrates subtask logic without
modifying existing task assignment code.
"""
Context Enhancement (src/marcus_mcp/tools/context.py):
async def get_task_context(task_id: str, state: Any) -> Dict[str, Any]:
"""
Get enhanced context for a task, including subtask information.
For subtasks, includes:
- is_subtask flag
- subtask_info (name, provides, requires, file_artifacts)
- parent_task details
- shared_conventions
- dependency_artifacts (completed subtasks this can use)
- sibling_subtasks (other subtasks in same decomposition)
For regular tasks, returns standard context.
"""
Integration Points#
With Marcus Server (src/marcus_mcp/server.py)#
class MarcusServer:
def __init__(self):
# ... existing initialization ...
# Add SubtaskManager
self.subtask_manager = SubtaskManager()
# Load existing subtasks from persistence
self.subtask_manager._load_state()
With Task Assignment (src/marcus_mcp/tools/task.py)#
async def find_optimal_task_for_agent(
agent_id: str,
state: Any
) -> Optional[Task]:
"""
Find optimal task for agent, checking subtasks first.
"""
# Use integrated finder
return await find_optimal_task_with_subtasks(
agent_id,
state,
fallback_task_finder=_find_optimal_task_original_logic
)
With Progress Reporting (src/marcus_mcp/tools/task.py)#
async def report_task_progress(
agent_id: str,
task_id: str,
status: str,
progress: int,
message: str,
state: Any
) -> Dict[str, Any]:
"""
Report task progress, handling subtask completion.
"""
# ... existing progress logic ...
if status == "completed":
# Check if this is a subtask
if (hasattr(state, "subtask_manager") and
state.subtask_manager and
task_id in state.subtask_manager.subtasks):
# Update subtask status
state.subtask_manager.update_subtask_status(
task_id, TaskStatus.DONE, agent_id
)
# Get parent ID
subtask = state.subtask_manager.subtasks[task_id]
parent_task_id = subtask.parent_task_id
# Update parent progress
await update_subtask_progress_in_parent(
parent_task_id,
task_id,
state.subtask_manager,
state.kanban_client
)
# Check for parent completion
await check_and_complete_parent_task(
parent_task_id,
state.subtask_manager,
state.kanban_client
)
# ... rest of existing logic ...
Data Persistence#
Storage Location#
data/marcus_state/subtasks.json
Data Structure#
{
"subtasks": {
"task_123_sub_1": {
"id": "task_123_sub_1",
"parent_task_id": "task_123",
"name": "Create User model",
"description": "...",
"status": "done",
"priority": "high",
"assigned_to": "dev-001",
"created_at": "2025-10-05T14:30:00",
"estimated_hours": 2.0,
"dependencies": [],
"file_artifacts": ["src/models/user.py"],
"provides": "User model with email validation",
"requires": null,
"order": 1
}
},
"parent_subtasks": {
"task_123": ["task_123_sub_1", "task_123_sub_2", "task_123_sub_integration"]
},
"metadata": {
"task_123": {
"shared_conventions": {
"base_path": "src/api/auth/",
"response_format": {...}
},
"decomposed_at": "2025-10-05T14:25:00",
"decomposed_by": "ai"
}
}
}
Performance Considerations#
Caching Strategy#
Subtask state cached in memory
Periodic persistence to JSON (on updates)
Load from JSON on server startup
Optimization Techniques#
# Index subtasks by parent for O(1) lookup
self.parent_subtasks: Dict[str, List[str]] = {}
# Cache available subtasks
self._available_cache = {}
self._cache_ttl = 60 # seconds
# Batch Kanban updates
pending_updates = []
# ... collect updates ...
await kanban_client.batch_update(pending_updates)
Scalability#
Current: Handles 100s of subtasks per project efficiently
Limits: In-memory storage suitable for < 10,000 subtasks
Future: Consider database backend for larger scale
Testing#
Unit Tests (tests/unit/coordinator/)#
test_decomposer.py (852 lines, 42 tests):
Test
should_decompose()heuristicsTest AI prompt generation
Test integration subtask auto-generation
Test shared conventions extraction
Test decomposition validation
test_subtask_manager.py (514 lines, 22 tests):
Test subtask CRUD operations
Test dependency resolution
Test status updates
Test completion percentage calculations
Test persistence (save/load JSON)
Test parent-child relationship management
Integration Tests (tests/integration/e2e/)#
test_task_decomposition_e2e.py (366 lines, 6 tests):
test_complete_decomposition_workflow- Full end-to-end decomposition β assignment β completiontest_subtask_to_task_conversion- Subtask conversion with contexttest_parent_auto_completion- Parent auto-completes when all subtasks donetest_subtask_progress_updates_parent- Parent progress reflects subtask completiontest_subtask_assignment_respects_already_assigned- No double-assignmenttest_subtask_manager_persistence- State persists across restarts
Coverage: 95% of decomposition system code paths
Example Demo (examples/task_decomposition_demo.py)#
Demonstrates:
Creating a parent task
Automatic decomposition with AI
Subtask assignment workflow
Progress tracking
Parent auto-completion
python examples/task_decomposition_demo.py
Usage Examples#
Basic Decomposition#
from src.marcus_mcp.coordinator import should_decompose, decompose_task, SubtaskManager, SubtaskMetadata
# Check if task should be decomposed
task = await create_task(
name="Build user authentication system",
description="Complete auth with login, registration, sessions, and password reset",
estimated_hours=8.0
)
if should_decompose(task, project_complexity="standard"):
# Decompose using AI
decomposition = await decompose_task(
task,
state.ai_engine,
project_context={
"labels": task.labels,
"existing_tasks": [t.name for t in state.project_tasks],
}
)
if decomposition["success"]:
# Store subtasks
metadata = SubtaskMetadata(
shared_conventions=decomposition["shared_conventions"],
decomposed_by="ai"
)
# Add subtasks to unified storage
created_tasks = state.subtask_manager.add_subtasks(
task.id,
decomposition["subtasks"],
project_tasks=state.project_tasks,
metadata=metadata
)
# created_tasks now contains List[Task] with is_subtask=True
Querying Subtasks#
# Get all subtasks for a parent
subtasks = state.subtask_manager.get_subtasks("task_123")
# Check parent completion
progress = state.subtask_manager.get_completion_percentage("task_123")
print(f"Feature {progress}% complete")
# Find available subtask for agent
available = find_next_available_subtask(
state.subtask_manager,
"dev-001",
state.project_tasks
)
if available:
print(f"Assign: {available.name}")
Monitoring Progress#
# Get subtask status breakdown
subtasks = state.subtask_manager.get_subtasks("task_123")
completed = [s for s in subtasks if s.status == TaskStatus.DONE]
in_progress = [s for s in subtasks if s.status == TaskStatus.IN_PROGRESS]
todo = [s for s in subtasks if s.status == TaskStatus.TODO]
print(f"Completed: {len(completed)}/{len(subtasks)}")
print(f"In Progress: {len(in_progress)}")
print(f"Todo: {len(todo)}")
# Check if ready for parent completion
all_done = all(s.status == TaskStatus.DONE for s in subtasks)
if all_done:
print("Ready to auto-complete parent task")
Error Handling#
Decomposition Failures#
decomposition = await decompose_task(task, ai_engine, context)
if not decomposition["success"]:
logger.error(f"Decomposition failed: {decomposition.get('error')}")
# Fall back to treating as regular task
return task
Missing Dependencies#
if not state.subtask_manager.can_assign_subtask(subtask_id, project_tasks):
missing = [
dep for dep in subtask.dependencies
if state.subtask_manager.subtasks[dep].status != TaskStatus.DONE
]
logger.warning(f"Cannot assign {subtask.name} - waiting for {missing}")
Persistence Failures#
try:
state.subtask_manager._save_state()
except Exception as e:
logger.error(f"Failed to persist subtasks: {e}")
# Continue operation - in-memory state still valid
Future Enhancements#
Planned Improvements (5% remaining)#
Real-Time Board Checklists
Create Kanban checklist items for each subtask
Update checklist as subtasks complete
Visual progress on parent task card
Subtask Reassignment
Automatically reassign subtasks if agent goes offline
Lease-based subtask assignments
Stale subtask detection and recovery
Learning from Decompositions
Track decomposition effectiveness
Learn common patterns for task types
Improve AI prompts based on successful decompositions
Manual Subtask Creation
Allow users to manually define subtasks
UI for decomposition editing
Merge AI and manual decompositions
Research Opportunities#
Optimal Subtask Size: What subtask duration maximizes agent efficiency?
Decomposition Patterns: What decomposition structures work best for different project types?
Integration Effectiveness: Does automatic integration subtask catch real issues?
Agent Preferences: Do agents perform better on certain subtask types?
References#
Integration Guide:
docs/task-decomposition-integration.mdCore Implementation:
src/marcus_mcp/coordinator/Tests:
tests/unit/coordinator/,tests/integration/e2e/test_task_decomposition_e2e.pyExample:
examples/task_decomposition_demo.pyPersistence:
data/marcus_state/subtasks.json
Implementation Status: 95% Complete
β Core decomposition logic
β Subtask management
β Assignment integration
β Progress tracking
β Parent auto-completion
β Persistence layer
β Unit tests (64 tests, 100% pass)
β Integration tests (6 tests, 100% pass)
β³ Real-time board checklists (5% remaining)
The Hierarchical Task Decomposition System enables Marcus to intelligently break down complex features into manageable subtasks with clear interfaces, enabling autonomous agents to collaborate effectively on large-scale software projects.