System 54: Hierarchical Task Decomposition#

Overview#

The Hierarchical Task Decomposition System is Marcus’s AI-powered engine for intelligently breaking down large, complex tasks into smaller, manageable subtasks with clear interfaces, dependencies, and shared conventions. This system enables autonomous agents to work on well-defined components while maintaining cohesion across the overall feature.

Status: βœ… Production (95% complete) Category: Project Management Dependencies: 07-AI Intelligence Engine, 23-Task Management Intelligence, 04-Kanban Integration, 03-Context & Dependency System

What This System Does#

The Hierarchical Task Decomposition System provides:

  1. Intelligent Decomposition - AI-powered analysis to break tasks into optimal subtasks

  2. Subtask Management - Tracking subtask-parent relationships and state

  3. Dependency Resolution - Managing dependencies between subtasks

  4. Interface Contracts - Clear provides/requires definitions for each subtask

  5. Shared Conventions - Enforcing consistency across subtask implementations

  6. Progress Aggregation - Rolling up subtask completion to parent tasks

  7. Automatic Integration - Final validation subtask for cohesion testing

  8. Context Enrichment - Enhanced context for agents working on subtasks

Architecture#

Component Structure#

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚         Hierarchical Task Decomposition System                   β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                  β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚  Decomposer          β”‚  β”‚  SubtaskManager                β”‚  β”‚
β”‚  β”‚  (decomposer.py)     β”‚  β”‚  (subtask_manager.py)          β”‚  β”‚
β”‚  β”‚                      β”‚  β”‚                                β”‚  β”‚
β”‚  β”‚ β€’ should_decompose() β”‚  β”‚ β€’ Subtask dataclass           β”‚  β”‚
β”‚  β”‚ β€’ decompose_task()   β”‚  β”‚ β€’ SubtaskMetadata             β”‚  β”‚
β”‚  β”‚ β€’ AI prompt gen      β”‚  β”‚ β€’ add_subtasks()              β”‚  β”‚
β”‚  β”‚ β€’ Integration auto   β”‚  β”‚ β€’ get_subtasks()              β”‚  β”‚
β”‚  β”‚ β€’ Convention builder β”‚  β”‚ β€’ update_subtask_status()     β”‚  β”‚
β”‚  β”‚                      β”‚  β”‚ β€’ can_assign_subtask()        β”‚  β”‚
β”‚  β”‚                      β”‚  β”‚ β€’ get_completion_percentage() β”‚  β”‚
β”‚  β”‚                      β”‚  β”‚ β€’ _save_state()               β”‚  β”‚
β”‚  β”‚                      β”‚  β”‚ β€’ _load_state()               β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β”‚           β”‚                              β”‚                      β”‚
β”‚           β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                      β”‚
β”‚                          β”‚                                      β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚  β”‚  Subtask Assignment   β”‚                                   β”‚ β”‚
β”‚  β”‚  (subtask_assignment.py)                                  β”‚ β”‚
β”‚  β”‚                       β”‚                                   β”‚ β”‚
β”‚  β”‚ β€’ find_next_available_subtask()   β€’ convert_subtask_to_task() β”‚
β”‚  β”‚ β€’ check_and_complete_parent_task() β€’ update_subtask_progress_in_parent() β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β”‚                          β”‚                                      β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚  β”‚  Integration Layer    β”‚                                   β”‚ β”‚
β”‚  β”‚  (task_assignment_integration.py, tools/context.py)       β”‚ β”‚
β”‚  β”‚                       β”‚                                   β”‚ β”‚
β”‚  β”‚ β€’ find_optimal_task_with_subtasks() β€’ Enhanced get_task_context() β”‚
β”‚  β”‚ β€’ Subtask prioritization            β€’ Dependency artifact context β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β”‚                                                                  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Data Flow#

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Decomposition Flow                            β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                  β”‚
β”‚  Create Task                                                     β”‚
β”‚       β”‚                                                          β”‚
β”‚       β–Ό                                                          β”‚
β”‚  should_decompose(task)                                          β”‚
β”‚       β”‚                                                          β”‚
β”‚    β”Œβ”€β”€β”΄β”€β”€β”€β”                                                      β”‚
β”‚    β”‚ Yes  β”‚ No β†’ Regular task workflow                           β”‚
β”‚    β–Ό                                                             β”‚
β”‚  decompose_task(task, ai_engine, project_context)                β”‚
β”‚       β”‚                                                          β”‚
β”‚       β”œβ”€β†’ AI analyzes task description                           β”‚
β”‚       β”œβ”€β†’ Identifies components and interfaces                   β”‚
β”‚       β”œβ”€β†’ Determines dependencies between components             β”‚
β”‚       β”œβ”€β†’ Estimates time per component                           β”‚
β”‚       β”œβ”€β†’ Generates shared conventions                           β”‚
β”‚       └─→ Adds automatic integration subtask                     β”‚
β”‚       β”‚                                                          β”‚
β”‚       β–Ό                                                          β”‚
β”‚  Returns: {success, subtasks, shared_conventions}                β”‚
β”‚       β”‚                                                          β”‚
β”‚       β–Ό                                                          β”‚
β”‚  SubtaskManager.add_subtasks(parent_id, subtasks, metadata)      β”‚
β”‚       β”‚                                                          β”‚
β”‚       β”œβ”€β†’ Stores subtask-parent relationships                    β”‚
β”‚       β”œβ”€β†’ Persists to data/marcus_state/subtasks.json            β”‚
β”‚       └─→ Ready for assignment                                   β”‚
β”‚                                                                  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Assignment Flow                               β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                  β”‚
β”‚  Agent: request_next_task(agent_id)                              β”‚
β”‚       β”‚                                                          β”‚
β”‚       β–Ό                                                          β”‚
β”‚  find_optimal_task_with_subtasks(agent_id, state)                β”‚
β”‚       β”‚                                                          β”‚
β”‚       β–Ό                                                          β”‚
β”‚  find_next_available_subtask(subtask_manager, agent_id, tasks)   β”‚
β”‚       β”‚                                                          β”‚
β”‚       β”œβ”€β†’ Get all subtasks across all parent tasks               β”‚
β”‚       β”œβ”€β†’ Filter to TODO status                                  β”‚
β”‚       β”œβ”€β†’ Check dependencies satisfied                           β”‚
β”‚       β”œβ”€β†’ Check not already assigned                             β”‚
β”‚       β”œβ”€β†’ Sort by parent priority + subtask order                β”‚
β”‚       β”‚                                                          β”‚
β”‚    β”Œβ”€β”€β”΄β”€β”€β”€β”                                                      β”‚
β”‚    β”‚Found?β”‚                                                      β”‚
β”‚    β–Ό      β–Ό                                                      β”‚
β”‚   Yes    No β†’ Regular task assignment                            β”‚
β”‚    β”‚                                                             β”‚
β”‚    β–Ό                                                             β”‚
β”‚  convert_subtask_to_task(subtask, parent_task)                   β”‚
β”‚    β”‚                                                             β”‚
β”‚    β”œβ”€β†’ Create Task object from Subtask                           β”‚
β”‚    β”œβ”€β†’ Add enhanced context                                      β”‚
β”‚    β”‚   β”œβ”€ is_subtask flag                                        β”‚
β”‚    β”‚   β”œβ”€ parent_task info                                       β”‚
β”‚    β”‚   β”œβ”€ shared_conventions                                     β”‚
β”‚    β”‚   β”œβ”€ dependency_artifacts                                   β”‚
β”‚    β”‚   └─ sibling_subtasks                                       β”‚
β”‚    β”‚                                                             β”‚
β”‚    β–Ό                                                             β”‚
β”‚  Return task to agent with enriched context                      β”‚
β”‚    β”‚                                                             β”‚
β”‚    β–Ό                                                             β”‚
β”‚  Agent works on subtask                                          β”‚
β”‚                                                                  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                   Completion Flow                                β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                  β”‚
β”‚  Agent: report_task_progress(task_id, status="completed")        β”‚
β”‚       β”‚                                                          β”‚
β”‚       β–Ό                                                          β”‚
β”‚  Is task_id a subtask?                                           β”‚
β”‚       β”‚                                                          β”‚
β”‚    β”Œβ”€β”€β”΄β”€β”€β”€β”                                                      β”‚
β”‚    β”‚ Yes  β”‚ No β†’ Regular completion                              β”‚
β”‚    β–Ό                                                             β”‚
β”‚  SubtaskManager.update_subtask_status(task_id, DONE, agent_id)   β”‚
β”‚    β”‚                                                             β”‚
β”‚    β–Ό                                                             β”‚
β”‚  update_subtask_progress_in_parent(parent_id, task_id, ...)      β”‚
β”‚    β”‚                                                             β”‚
β”‚    β”œβ”€β†’ Calculate parent completion percentage                    β”‚
β”‚    β”œβ”€β†’ Update parent progress on Kanban board                    β”‚
β”‚    └─→ Add checklist comment on parent                           β”‚
β”‚    β”‚                                                             β”‚
β”‚    β–Ό                                                             β”‚
β”‚  check_and_complete_parent_task(parent_id, ...)                  β”‚
β”‚    β”‚                                                             β”‚
β”‚    β”œβ”€β†’ Check if all subtasks are DONE                            β”‚
β”‚    β”‚                                                             β”‚
β”‚    └─→ If yes:                                                   β”‚
β”‚        β”œβ”€β†’ Move parent task to DONE on Kanban                    β”‚
β”‚        β”œβ”€β†’ Add completion summary comment                        β”‚
β”‚        └─→ Trigger dependent tasks                               β”‚
β”‚                                                                  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Core Components#

1. Decomposer (src/marcus_mcp/coordinator/decomposer.py)#

Purpose: Decides when to decompose tasks and performs AI-powered breakdown

Key Functions#

should_decompose(task: Task, project_complexity: Optional[str] = None) -> bool

def should_decompose(task: Task, project_complexity: Optional[str] = None) -> bool:
    """
    Decide whether a task should be decomposed into subtasks.

    Uses heuristics based on project complexity mode:
    - prototype: No decomposition (speed priority)
    - standard: Balanced decomposition (>= 0.2 hours, 3+ indicators)
    - enterprise: Aggressive decomposition (>= 0.1 hours, 2+ indicators,
                  force decompose "Implement" tasks)

    Decomposition Thresholds by Mode:
    - Prototype: Never decompose
    - Standard (default): >= 0.2 hours (12 minutes), 3+ component indicators
    - Enterprise: >= 0.1 hours (6 minutes), 2+ component indicators

    Parameters
    ----------
    task : Task
        The task to evaluate
    project_complexity : Optional[str], default=None
        Project complexity mode: "prototype", "standard", or "enterprise"

    Returns
    -------
    bool
        True if task should be decomposed
    """

Heuristics by Complexity Mode:

Prototype Mode:

  • ❌ Never decompose (speed over granularity)

Standard Mode (default):

  • βœ… Size: estimated_hours >= 0.2 (12 minutes)

  • βœ… Complexity: Multiple indicators (3+) in description (api, database, model, ui, etc.)

  • ❌ Type: Skip bugfix, hotfix, refactor, documentation, deployment, design tasks

Enterprise Mode:

  • βœ… Size: estimated_hours >= 0.1 (6 minutes)

  • βœ… Complexity: Multiple indicators (2+) in description

  • βœ… Force decompose: All β€œImplement” tasks

  • ❌ Type: Skip bugfix, hotfix, refactor, documentation, deployment, design tasks

decompose_task(task, ai_engine, project_context) -> Dict

async def decompose_task(
    task: Task,
    ai_engine: Any,
    project_context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Decompose a task into subtasks using AI.

    Process:
    1. Build context-rich prompt with task details and project info
    2. Call AI engine for intelligent breakdown
    3. Parse AI response into structured subtasks
    4. Add automatic integration subtask
    5. Generate shared conventions
    6. Validate subtask structure

    Parameters
    ----------
    task : Task
        The parent task to decompose
    ai_engine : Any
        AI intelligence engine for analysis
    project_context : Optional[Dict[str, Any]]
        Additional context (labels, existing tasks, tech stack)

    Returns
    -------
    Dict[str, Any]
        {
            "success": bool,
            "subtasks": List[Dict],
            "shared_conventions": Dict,
            "error": Optional[str]
        }
    """

AI Prompt Structure:

prompt = f"""
Analyze this task and break it down into subtasks:

TASK INFORMATION:
Name: {task.name}
Description: {task.description}
Estimated Hours: {task.estimated_hours}
Labels: {task.labels}
Priority: {task.priority}

PROJECT CONTEXT:
{json.dumps(project_context, indent=2)}

REQUIREMENTS:
1. Identify distinct components (API, Database, UI, etc.)
2. Define clear interfaces (provides/requires) for each subtask
3. Estimate realistic time per subtask (1-3 hours ideal)
4. Determine dependencies (which subtasks need what)
5. Identify file artifacts each subtask will produce
6. Suggest shared conventions (file structure, naming, formats)

OUTPUT FORMAT (JSON):
{{
    "subtasks": [
        {{
            "name": "Component name",
            "description": "What needs to be done",
            "estimated_hours": 2.0,
            "dependencies": ["subtask_id_1"],
            "file_artifacts": ["path/to/file.py"],
            "provides": "Clear interface this provides",
            "requires": "What this needs from dependencies",
            "order": 1
        }}
    ],
    "shared_conventions": {{
        "base_path": "src/feature/",
        "response_format": {{}},
        "naming_convention": "snake_case"
    }}
}}
"""

Automatic Integration Subtask:

integration_subtask = {
    "id": f"{parent_task.id}_sub_integration",
    "name": "Integration testing and validation",
    "description": """
    Verify all components work together:
    - Run end-to-end workflow tests
    - Validate all interfaces function correctly
    - Check error handling across components
    - Create consolidated documentation
    - Verify all file artifacts exist and are correct
    """,
    "estimated_hours": 1.0,
    "dependencies": [all_other_subtask_ids],
    "file_artifacts": [
        "tests/integration/test_{feature}.py",
        "docs/integration_report.md"
    ],
    "provides": "Validated, integrated solution",
    "requires": "All components complete",
    "order": 99  # Always last
}

2. SubtaskManager (src/marcus_mcp/coordinator/subtask_manager.py)#

Purpose: Tracks subtask-parent relationships and manages state

Data Models#

Subtask Dataclass:

@dataclass
class Subtask:
    """Represents a subtask decomposed from a parent task."""
    id: str
    parent_task_id: str
    name: str
    description: str
    status: TaskStatus
    priority: Priority
    assigned_to: Optional[str]
    created_at: datetime
    estimated_hours: float
    dependencies: List[str] = field(default_factory=list)
    dependency_types: List[str] = field(default_factory=list)
    file_artifacts: List[str] = field(default_factory=list)
    provides: Optional[str] = None
    requires: Optional[str] = None
    order: int = 0  # Execution order within parent

Field Details:

  • dependency_types: Type of each dependency - β€œhard” (blocks start) or β€œsoft” (can use mock/contract)

    • Must match length of dependencies list

    • Enables parallel work by using contracts from Design phase

    • Empty array if no dependencies

SubtaskMetadata Dataclass:

@dataclass
class SubtaskMetadata:
    """Metadata about decomposed tasks."""
    shared_conventions: Dict[str, Any] = field(default_factory=dict)
    decomposed_at: datetime = field(default_factory=datetime.now)
    decomposed_by: str = "ai"  # or "manual"

Key Methods#

add_subtasks(parent_task_id, subtasks, project_tasks, metadata)

def add_subtasks(
    self,
    parent_task_id: str,
    subtasks: List[Dict[str, Any]],
    project_tasks: Optional[List[Task]] = None,
    metadata: Optional[SubtaskMetadata] = None,
) -> List[Task]:
    """
    Add subtasks for a parent task to unified project_tasks storage.

    Creates Task objects with is_subtask=True and appends them to
    project_tasks list for unified dependency graph.

    Parameters
    ----------
    parent_task_id : str
        ID of the parent task
    subtasks : List[Dict[str, Any]]
        List of subtask dictionaries with fields:
        - name, description, estimated_hours, dependencies, dependency_types, etc.
    project_tasks : Optional[List[Task]]
        Unified task storage (server.project_tasks). If None, uses legacy mode.
    metadata : Optional[SubtaskMetadata]
        Shared metadata (conventions, decomposition info). Now optional.

    Returns
    -------
    List[Task]
        Created Task objects (with is_subtask=True)
    """

get_subtasks(parent_task_id) -> List[Subtask]

def get_subtasks(self, parent_task_id: str) -> List[Subtask]:
    """
    Get all subtasks for a parent task.

    Returns sorted by order field.
    """

can_assign_subtask(subtask_id, project_tasks) -> bool

def can_assign_subtask(
    self,
    subtask_id: str,
    project_tasks: List[Task]
) -> bool:
    """
    Check if a subtask can be assigned.

    Checks:
    - Subtask exists
    - Status is TODO
    - Not already assigned
    - All dependencies satisfied (DONE status)

    Parameters
    ----------
    subtask_id : str
        ID of subtask to check
    project_tasks : List[Task]
        Current project tasks for dependency checking

    Returns
    -------
    bool
        True if subtask can be assigned
    """

update_subtask_status(subtask_id, status, assigned_to)

def update_subtask_status(
    self,
    subtask_id: str,
    new_status: TaskStatus,
    assigned_to: Optional[str] = None
) -> None:
    """
    Update subtask status and optionally assignment.

    Automatically persists to JSON.
    """

get_completion_percentage(parent_task_id) -> int

def get_completion_percentage(self, parent_task_id: str) -> int:
    """
    Calculate completion percentage for parent task.

    Returns
    -------
    int
        Percentage (0-100) based on completed subtasks
    """

Persistence:

def _save_state(self) -> None:
    """Save subtasks to data/marcus_state/subtasks.json"""

def _load_state(self) -> None:
    """Load subtasks from data/marcus_state/subtasks.json"""

3. Subtask Assignment (src/marcus_mcp/coordinator/subtask_assignment.py)#

Purpose: Helper functions for finding and assigning subtasks

find_next_available_subtask(subtask_manager, agent_id, project_tasks)

def find_next_available_subtask(
    subtask_manager: SubtaskManager,
    agent_id: str,
    project_tasks: List[Task]
) -> Optional[Subtask]:
    """
    Find the highest priority available subtask for an agent.

    Process:
    1. Get all subtasks across all parent tasks
    2. Filter to TODO status
    3. Filter to satisfied dependencies
    4. Filter to not already assigned
    5. Sort by parent task priority, then subtask order
    6. Return first match

    Parameters
    ----------
    subtask_manager : SubtaskManager
        Manager with all subtask state
    agent_id : str
        Agent requesting work
    project_tasks : List[Task]
        Current project tasks for dependency validation

    Returns
    -------
    Optional[Subtask]
        Available subtask or None
    """

convert_subtask_to_task(subtask, parent_task) -> Task

def convert_subtask_to_task(
    subtask: Subtask,
    parent_task: Task
) -> Task:
    """
    Convert Subtask to Task object for assignment.

    Enriches with:
    - is_subtask flag in description
    - Parent task information
    - Shared conventions
    - Dependency artifact context
    - Sibling subtask information

    Parameters
    ----------
    subtask : Subtask
        The subtask to convert
    parent_task : Task
        The parent Task object for context

    Returns
    -------
    Task
        Task object ready for agent assignment
    """

update_subtask_progress_in_parent(parent_task_id, subtask_id, manager, kanban)

async def update_subtask_progress_in_parent(
    parent_task_id: str,
    completed_subtask_id: str,
    subtask_manager: SubtaskManager,
    kanban_client: Any
) -> None:
    """
    Update parent task progress when subtask completes.

    Updates:
    - Parent task progress percentage
    - Kanban board checklist
    - Parent task comments with completion info

    Parameters
    ----------
    parent_task_id : str
        Parent task to update
    completed_subtask_id : str
        Subtask that just completed
    subtask_manager : SubtaskManager
        Manager with all subtask state
    kanban_client : Any
        Kanban integration for board updates
    """

check_and_complete_parent_task(parent_task_id, manager, kanban)

async def check_and_complete_parent_task(
    parent_task_id: str,
    subtask_manager: SubtaskManager,
    kanban_client: Any
) -> None:
    """
    Check if parent should be completed and do so if ready.

    Completes parent when:
    - All subtasks have DONE status
    - Parent task exists
    - Parent not already complete

    Actions on completion:
    - Move parent to DONE on Kanban
    - Add completion summary comment
    - Trigger dependent tasks

    Parameters
    ----------
    parent_task_id : str
        Parent task to check
    subtask_manager : SubtaskManager
        Manager with all subtask state
    kanban_client : Any
        Kanban integration for updates
    """

4. Integration Layer#

Task Assignment Integration (src/marcus_mcp/coordinator/task_assignment_integration.py):

async def find_optimal_task_with_subtasks(
    agent_id: str,
    state: Any,
    fallback_task_finder: Callable
) -> Optional[Task]:
    """
    Find optimal task, prioritizing subtasks over regular tasks.

    Workflow:
    1. Check if subtask_manager exists
    2. If yes, look for available subtasks
    3. If subtask found, convert and return
    4. If no subtask, use fallback_task_finder for regular tasks

    This is a wrapper that integrates subtask logic without
    modifying existing task assignment code.
    """

Context Enhancement (src/marcus_mcp/tools/context.py):

async def get_task_context(task_id: str, state: Any) -> Dict[str, Any]:
    """
    Get enhanced context for a task, including subtask information.

    For subtasks, includes:
    - is_subtask flag
    - subtask_info (name, provides, requires, file_artifacts)
    - parent_task details
    - shared_conventions
    - dependency_artifacts (completed subtasks this can use)
    - sibling_subtasks (other subtasks in same decomposition)

    For regular tasks, returns standard context.
    """

Integration Points#

With Marcus Server (src/marcus_mcp/server.py)#

class MarcusServer:
    def __init__(self):
        # ... existing initialization ...

        # Add SubtaskManager
        self.subtask_manager = SubtaskManager()

        # Load existing subtasks from persistence
        self.subtask_manager._load_state()

With Task Assignment (src/marcus_mcp/tools/task.py)#

async def find_optimal_task_for_agent(
    agent_id: str,
    state: Any
) -> Optional[Task]:
    """
    Find optimal task for agent, checking subtasks first.
    """
    # Use integrated finder
    return await find_optimal_task_with_subtasks(
        agent_id,
        state,
        fallback_task_finder=_find_optimal_task_original_logic
    )

With Progress Reporting (src/marcus_mcp/tools/task.py)#

async def report_task_progress(
    agent_id: str,
    task_id: str,
    status: str,
    progress: int,
    message: str,
    state: Any
) -> Dict[str, Any]:
    """
    Report task progress, handling subtask completion.
    """
    # ... existing progress logic ...

    if status == "completed":
        # Check if this is a subtask
        if (hasattr(state, "subtask_manager") and
            state.subtask_manager and
            task_id in state.subtask_manager.subtasks):

            # Update subtask status
            state.subtask_manager.update_subtask_status(
                task_id, TaskStatus.DONE, agent_id
            )

            # Get parent ID
            subtask = state.subtask_manager.subtasks[task_id]
            parent_task_id = subtask.parent_task_id

            # Update parent progress
            await update_subtask_progress_in_parent(
                parent_task_id,
                task_id,
                state.subtask_manager,
                state.kanban_client
            )

            # Check for parent completion
            await check_and_complete_parent_task(
                parent_task_id,
                state.subtask_manager,
                state.kanban_client
            )

    # ... rest of existing logic ...

Data Persistence#

Storage Location#

data/marcus_state/subtasks.json

Data Structure#

{
  "subtasks": {
    "task_123_sub_1": {
      "id": "task_123_sub_1",
      "parent_task_id": "task_123",
      "name": "Create User model",
      "description": "...",
      "status": "done",
      "priority": "high",
      "assigned_to": "dev-001",
      "created_at": "2025-10-05T14:30:00",
      "estimated_hours": 2.0,
      "dependencies": [],
      "file_artifacts": ["src/models/user.py"],
      "provides": "User model with email validation",
      "requires": null,
      "order": 1
    }
  },
  "parent_subtasks": {
    "task_123": ["task_123_sub_1", "task_123_sub_2", "task_123_sub_integration"]
  },
  "metadata": {
    "task_123": {
      "shared_conventions": {
        "base_path": "src/api/auth/",
        "response_format": {...}
      },
      "decomposed_at": "2025-10-05T14:25:00",
      "decomposed_by": "ai"
    }
  }
}

Performance Considerations#

Caching Strategy#

  • Subtask state cached in memory

  • Periodic persistence to JSON (on updates)

  • Load from JSON on server startup

Optimization Techniques#

# Index subtasks by parent for O(1) lookup
self.parent_subtasks: Dict[str, List[str]] = {}

# Cache available subtasks
self._available_cache = {}
self._cache_ttl = 60  # seconds

# Batch Kanban updates
pending_updates = []
# ... collect updates ...
await kanban_client.batch_update(pending_updates)

Scalability#

  • Current: Handles 100s of subtasks per project efficiently

  • Limits: In-memory storage suitable for < 10,000 subtasks

  • Future: Consider database backend for larger scale

Testing#

Unit Tests (tests/unit/coordinator/)#

test_decomposer.py (852 lines, 42 tests):

  • Test should_decompose() heuristics

  • Test AI prompt generation

  • Test integration subtask auto-generation

  • Test shared conventions extraction

  • Test decomposition validation

test_subtask_manager.py (514 lines, 22 tests):

  • Test subtask CRUD operations

  • Test dependency resolution

  • Test status updates

  • Test completion percentage calculations

  • Test persistence (save/load JSON)

  • Test parent-child relationship management

Integration Tests (tests/integration/e2e/)#

test_task_decomposition_e2e.py (366 lines, 6 tests):

  1. test_complete_decomposition_workflow - Full end-to-end decomposition β†’ assignment β†’ completion

  2. test_subtask_to_task_conversion - Subtask conversion with context

  3. test_parent_auto_completion - Parent auto-completes when all subtasks done

  4. test_subtask_progress_updates_parent - Parent progress reflects subtask completion

  5. test_subtask_assignment_respects_already_assigned - No double-assignment

  6. test_subtask_manager_persistence - State persists across restarts

Coverage: 95% of decomposition system code paths

Example Demo (examples/task_decomposition_demo.py)#

Demonstrates:

  • Creating a parent task

  • Automatic decomposition with AI

  • Subtask assignment workflow

  • Progress tracking

  • Parent auto-completion

python examples/task_decomposition_demo.py

Usage Examples#

Basic Decomposition#

from src.marcus_mcp.coordinator import should_decompose, decompose_task, SubtaskManager, SubtaskMetadata

# Check if task should be decomposed
task = await create_task(
    name="Build user authentication system",
    description="Complete auth with login, registration, sessions, and password reset",
    estimated_hours=8.0
)

if should_decompose(task, project_complexity="standard"):
    # Decompose using AI
    decomposition = await decompose_task(
        task,
        state.ai_engine,
        project_context={
            "labels": task.labels,
            "existing_tasks": [t.name for t in state.project_tasks],
        }
    )

    if decomposition["success"]:
        # Store subtasks
        metadata = SubtaskMetadata(
            shared_conventions=decomposition["shared_conventions"],
            decomposed_by="ai"
        )

        # Add subtasks to unified storage
        created_tasks = state.subtask_manager.add_subtasks(
            task.id,
            decomposition["subtasks"],
            project_tasks=state.project_tasks,
            metadata=metadata
        )

        # created_tasks now contains List[Task] with is_subtask=True

Querying Subtasks#

# Get all subtasks for a parent
subtasks = state.subtask_manager.get_subtasks("task_123")

# Check parent completion
progress = state.subtask_manager.get_completion_percentage("task_123")
print(f"Feature {progress}% complete")

# Find available subtask for agent
available = find_next_available_subtask(
    state.subtask_manager,
    "dev-001",
    state.project_tasks
)

if available:
    print(f"Assign: {available.name}")

Monitoring Progress#

# Get subtask status breakdown
subtasks = state.subtask_manager.get_subtasks("task_123")
completed = [s for s in subtasks if s.status == TaskStatus.DONE]
in_progress = [s for s in subtasks if s.status == TaskStatus.IN_PROGRESS]
todo = [s for s in subtasks if s.status == TaskStatus.TODO]

print(f"Completed: {len(completed)}/{len(subtasks)}")
print(f"In Progress: {len(in_progress)}")
print(f"Todo: {len(todo)}")

# Check if ready for parent completion
all_done = all(s.status == TaskStatus.DONE for s in subtasks)
if all_done:
    print("Ready to auto-complete parent task")

Error Handling#

Decomposition Failures#

decomposition = await decompose_task(task, ai_engine, context)

if not decomposition["success"]:
    logger.error(f"Decomposition failed: {decomposition.get('error')}")
    # Fall back to treating as regular task
    return task

Missing Dependencies#

if not state.subtask_manager.can_assign_subtask(subtask_id, project_tasks):
    missing = [
        dep for dep in subtask.dependencies
        if state.subtask_manager.subtasks[dep].status != TaskStatus.DONE
    ]
    logger.warning(f"Cannot assign {subtask.name} - waiting for {missing}")

Persistence Failures#

try:
    state.subtask_manager._save_state()
except Exception as e:
    logger.error(f"Failed to persist subtasks: {e}")
    # Continue operation - in-memory state still valid

Future Enhancements#

Planned Improvements (5% remaining)#

  1. Real-Time Board Checklists

    • Create Kanban checklist items for each subtask

    • Update checklist as subtasks complete

    • Visual progress on parent task card

  2. Subtask Reassignment

    • Automatically reassign subtasks if agent goes offline

    • Lease-based subtask assignments

    • Stale subtask detection and recovery

  3. Learning from Decompositions

    • Track decomposition effectiveness

    • Learn common patterns for task types

    • Improve AI prompts based on successful decompositions

  4. Manual Subtask Creation

    • Allow users to manually define subtasks

    • UI for decomposition editing

    • Merge AI and manual decompositions

Research Opportunities#

  • Optimal Subtask Size: What subtask duration maximizes agent efficiency?

  • Decomposition Patterns: What decomposition structures work best for different project types?

  • Integration Effectiveness: Does automatic integration subtask catch real issues?

  • Agent Preferences: Do agents perform better on certain subtask types?

References#

  • Integration Guide: docs/task-decomposition-integration.md

  • Core Implementation: src/marcus_mcp/coordinator/

  • Tests: tests/unit/coordinator/, tests/integration/e2e/test_task_decomposition_e2e.py

  • Example: examples/task_decomposition_demo.py

  • Persistence: data/marcus_state/subtasks.json


Implementation Status: 95% Complete

  • βœ… Core decomposition logic

  • βœ… Subtask management

  • βœ… Assignment integration

  • βœ… Progress tracking

  • βœ… Parent auto-completion

  • βœ… Persistence layer

  • βœ… Unit tests (64 tests, 100% pass)

  • βœ… Integration tests (6 tests, 100% pass)

  • ⏳ Real-time board checklists (5% remaining)

The Hierarchical Task Decomposition System enables Marcus to intelligently break down complex features into manageable subtasks with clear interfaces, enabling autonomous agents to collaborate effectively on large-scale software projects.