# System 54: Hierarchical Task Decomposition ## Overview The Hierarchical Task Decomposition System is Marcus's AI-powered engine for intelligently breaking down large, complex tasks into smaller, manageable subtasks with clear interfaces, dependencies, and shared conventions. This system enables autonomous agents to work on well-defined components while maintaining cohesion across the overall feature. **Status**: ✅ Production (95% complete) **Category**: Project Management **Dependencies**: 07-AI Intelligence Engine, 23-Task Management Intelligence, 04-Kanban Integration, 03-Context & Dependency System ## What This System Does The Hierarchical Task Decomposition System provides: 1. **Intelligent Decomposition** - AI-powered analysis to break tasks into optimal subtasks 2. **Subtask Management** - Tracking subtask-parent relationships and state 3. **Dependency Resolution** - Managing dependencies between subtasks 4. **Interface Contracts** - Clear provides/requires definitions for each subtask 5. **Shared Conventions** - Enforcing consistency across subtask implementations 6. **Progress Aggregation** - Rolling up subtask completion to parent tasks 7. **Automatic Integration** - Final validation subtask for cohesion testing 8. **Context Enrichment** - Enhanced context for agents working on subtasks ## Architecture ### Component Structure ``` ┌─────────────────────────────────────────────────────────────────┐ │ Hierarchical Task Decomposition System │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────────────┐ ┌────────────────────────────────┐ │ │ │ Decomposer │ │ SubtaskManager │ │ │ │ (decomposer.py) │ │ (subtask_manager.py) │ │ │ │ │ │ │ │ │ │ • should_decompose() │ │ • Subtask dataclass │ │ │ │ • decompose_task() │ │ • SubtaskMetadata │ │ │ │ • AI prompt gen │ │ • add_subtasks() │ │ │ │ • Integration auto │ │ • get_subtasks() │ │ │ │ • Convention builder │ │ • update_subtask_status() │ │ │ │ │ │ • can_assign_subtask() │ │ │ │ │ │ • get_completion_percentage() │ │ │ │ │ │ • _save_state() │ │ │ │ │ │ • _load_state() │ │ │ └──────────────────────┘ └────────────────────────────────┘ │ │ │ │ │ │ └──────────────┬───────────────┘ │ │ │ │ │ ┌───────────────────────┼───────────────────────────────────┐ │ │ │ Subtask Assignment │ │ │ │ │ (subtask_assignment.py) │ │ │ │ │ │ │ │ │ • find_next_available_subtask() • convert_subtask_to_task() │ │ │ • check_and_complete_parent_task() • update_subtask_progress_in_parent() │ │ └───────────────────────────────────────────────────────────┘ │ │ │ │ │ ┌───────────────────────┼───────────────────────────────────┐ │ │ │ Integration Layer │ │ │ │ │ (task_assignment_integration.py, tools/context.py) │ │ │ │ │ │ │ │ │ • find_optimal_task_with_subtasks() • Enhanced get_task_context() │ │ │ • Subtask prioritization • Dependency artifact context │ │ └───────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘ ``` ### Data Flow ``` ┌─────────────────────────────────────────────────────────────────┐ │ Decomposition Flow │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ Create Task │ │ │ │ │ ▼ │ │ should_decompose(task) │ │ │ │ │ ┌──┴───┐ │ │ │ Yes │ No → Regular task workflow │ │ ▼ │ │ decompose_task(task, ai_engine, project_context) │ │ │ │ │ ├─→ AI analyzes task description │ │ ├─→ Identifies components and interfaces │ │ ├─→ Determines dependencies between components │ │ ├─→ Estimates time per component │ │ ├─→ Generates shared conventions │ │ └─→ Adds automatic integration subtask │ │ │ │ │ ▼ │ │ Returns: {success, subtasks, shared_conventions} │ │ │ │ │ ▼ │ │ SubtaskManager.add_subtasks(parent_id, subtasks, metadata) │ │ │ │ │ ├─→ Stores subtask-parent relationships │ │ ├─→ Persists to data/marcus_state/subtasks.json │ │ └─→ Ready for assignment │ │ │ └─────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────┐ │ Assignment Flow │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ Agent: request_next_task(agent_id) │ │ │ │ │ ▼ │ │ find_optimal_task_with_subtasks(agent_id, state) │ │ │ │ │ ▼ │ │ find_next_available_subtask(subtask_manager, agent_id, tasks) │ │ │ │ │ ├─→ Get all subtasks across all parent tasks │ │ ├─→ Filter to TODO status │ │ ├─→ Check dependencies satisfied │ │ ├─→ Check not already assigned │ │ ├─→ Sort by parent priority + subtask order │ │ │ │ │ ┌──┴───┐ │ │ │Found?│ │ │ ▼ ▼ │ │ Yes No → Regular task assignment │ │ │ │ │ ▼ │ │ convert_subtask_to_task(subtask, parent_task) │ │ │ │ │ ├─→ Create Task object from Subtask │ │ ├─→ Add enhanced context │ │ │ ├─ is_subtask flag │ │ │ ├─ parent_task info │ │ │ ├─ shared_conventions │ │ │ ├─ dependency_artifacts │ │ │ └─ sibling_subtasks │ │ │ │ │ ▼ │ │ Return task to agent with enriched context │ │ │ │ │ ▼ │ │ Agent works on subtask │ │ │ └─────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────┐ │ Completion Flow │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ Agent: report_task_progress(task_id, status="completed") │ │ │ │ │ ▼ │ │ Is task_id a subtask? │ │ │ │ │ ┌──┴───┐ │ │ │ Yes │ No → Regular completion │ │ ▼ │ │ SubtaskManager.update_subtask_status(task_id, DONE, agent_id) │ │ │ │ │ ▼ │ │ update_subtask_progress_in_parent(parent_id, task_id, ...) │ │ │ │ │ ├─→ Calculate parent completion percentage │ │ ├─→ Update parent progress on Kanban board │ │ └─→ Add checklist comment on parent │ │ │ │ │ ▼ │ │ check_and_complete_parent_task(parent_id, ...) │ │ │ │ │ ├─→ Check if all subtasks are DONE │ │ │ │ │ └─→ If yes: │ │ ├─→ Move parent task to DONE on Kanban │ │ ├─→ Add completion summary comment │ │ └─→ Trigger dependent tasks │ │ │ └─────────────────────────────────────────────────────────────────┘ ``` ## Core Components ### 1. Decomposer (`src/marcus_mcp/coordinator/decomposer.py`) **Purpose**: Decides when to decompose tasks and performs AI-powered breakdown #### Key Functions **`should_decompose(task: Task, project_complexity: Optional[str] = None) -> bool`** ```python def should_decompose(task: Task, project_complexity: Optional[str] = None) -> bool: """ Decide whether a task should be decomposed into subtasks. Uses heuristics based on project complexity mode: - prototype: No decomposition (speed priority) - standard: Balanced decomposition (>= 0.2 hours, 3+ indicators) - enterprise: Aggressive decomposition (>= 0.1 hours, 2+ indicators, force decompose "Implement" tasks) Decomposition Thresholds by Mode: - Prototype: Never decompose - Standard (default): >= 0.2 hours (12 minutes), 3+ component indicators - Enterprise: >= 0.1 hours (6 minutes), 2+ component indicators Parameters ---------- task : Task The task to evaluate project_complexity : Optional[str], default=None Project complexity mode: "prototype", "standard", or "enterprise" Returns ------- bool True if task should be decomposed """ ``` **Heuristics by Complexity Mode**: **Prototype Mode**: - ❌ Never decompose (speed over granularity) **Standard Mode (default)**: - ✅ Size: `estimated_hours >= 0.2` (12 minutes) - ✅ Complexity: Multiple indicators (3+) in description (api, database, model, ui, etc.) - ❌ Type: Skip bugfix, hotfix, refactor, documentation, deployment, design tasks **Enterprise Mode**: - ✅ Size: `estimated_hours >= 0.1` (6 minutes) - ✅ Complexity: Multiple indicators (2+) in description - ✅ Force decompose: All "Implement" tasks - ❌ Type: Skip bugfix, hotfix, refactor, documentation, deployment, design tasks **`decompose_task(task, ai_engine, project_context) -> Dict`** ```python async def decompose_task( task: Task, ai_engine: Any, project_context: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Decompose a task into subtasks using AI. Process: 1. Build context-rich prompt with task details and project info 2. Call AI engine for intelligent breakdown 3. Parse AI response into structured subtasks 4. Add automatic integration subtask 5. Generate shared conventions 6. Validate subtask structure Parameters ---------- task : Task The parent task to decompose ai_engine : Any AI intelligence engine for analysis project_context : Optional[Dict[str, Any]] Additional context (labels, existing tasks, tech stack) Returns ------- Dict[str, Any] { "success": bool, "subtasks": List[Dict], "shared_conventions": Dict, "error": Optional[str] } """ ``` **AI Prompt Structure**: ```python prompt = f""" Analyze this task and break it down into subtasks: TASK INFORMATION: Name: {task.name} Description: {task.description} Estimated Hours: {task.estimated_hours} Labels: {task.labels} Priority: {task.priority} PROJECT CONTEXT: {json.dumps(project_context, indent=2)} REQUIREMENTS: 1. Identify distinct components (API, Database, UI, etc.) 2. Define clear interfaces (provides/requires) for each subtask 3. Estimate realistic time per subtask (1-3 hours ideal) 4. Determine dependencies (which subtasks need what) 5. Identify file artifacts each subtask will produce 6. Suggest shared conventions (file structure, naming, formats) OUTPUT FORMAT (JSON): {{ "subtasks": [ {{ "name": "Component name", "description": "What needs to be done", "estimated_hours": 2.0, "dependencies": ["subtask_id_1"], "file_artifacts": ["path/to/file.py"], "provides": "Clear interface this provides", "requires": "What this needs from dependencies", "order": 1 }} ], "shared_conventions": {{ "base_path": "src/feature/", "response_format": {{}}, "naming_convention": "snake_case" }} }} """ ``` **Automatic Integration Subtask**: ```python integration_subtask = { "id": f"{parent_task.id}_sub_integration", "name": "Integration testing and validation", "description": """ Verify all components work together: - Run end-to-end workflow tests - Validate all interfaces function correctly - Check error handling across components - Create consolidated documentation - Verify all file artifacts exist and are correct """, "estimated_hours": 1.0, "dependencies": [all_other_subtask_ids], "file_artifacts": [ "tests/integration/test_{feature}.py", "docs/integration_report.md" ], "provides": "Validated, integrated solution", "requires": "All components complete", "order": 99 # Always last } ``` ### 2. SubtaskManager (`src/marcus_mcp/coordinator/subtask_manager.py`) **Purpose**: Tracks subtask-parent relationships and manages state #### Data Models **`Subtask` Dataclass**: ```python @dataclass class Subtask: """Represents a subtask decomposed from a parent task.""" id: str parent_task_id: str name: str description: str status: TaskStatus priority: Priority assigned_to: Optional[str] created_at: datetime estimated_hours: float dependencies: List[str] = field(default_factory=list) dependency_types: List[str] = field(default_factory=list) file_artifacts: List[str] = field(default_factory=list) provides: Optional[str] = None requires: Optional[str] = None order: int = 0 # Execution order within parent ``` **Field Details**: - `dependency_types`: Type of each dependency - "hard" (blocks start) or "soft" (can use mock/contract) - Must match length of dependencies list - Enables parallel work by using contracts from Design phase - Empty array if no dependencies **`SubtaskMetadata` Dataclass**: ```python @dataclass class SubtaskMetadata: """Metadata about decomposed tasks.""" shared_conventions: Dict[str, Any] = field(default_factory=dict) decomposed_at: datetime = field(default_factory=datetime.now) decomposed_by: str = "ai" # or "manual" ``` #### Key Methods **`add_subtasks(parent_task_id, subtasks, project_tasks, metadata)`** ```python def add_subtasks( self, parent_task_id: str, subtasks: List[Dict[str, Any]], project_tasks: Optional[List[Task]] = None, metadata: Optional[SubtaskMetadata] = None, ) -> List[Task]: """ Add subtasks for a parent task to unified project_tasks storage. Creates Task objects with is_subtask=True and appends them to project_tasks list for unified dependency graph. Parameters ---------- parent_task_id : str ID of the parent task subtasks : List[Dict[str, Any]] List of subtask dictionaries with fields: - name, description, estimated_hours, dependencies, dependency_types, etc. project_tasks : Optional[List[Task]] Unified task storage (server.project_tasks). If None, uses legacy mode. metadata : Optional[SubtaskMetadata] Shared metadata (conventions, decomposition info). Now optional. Returns ------- List[Task] Created Task objects (with is_subtask=True) """ ``` **`get_subtasks(parent_task_id) -> List[Subtask]`** ```python def get_subtasks(self, parent_task_id: str) -> List[Subtask]: """ Get all subtasks for a parent task. Returns sorted by order field. """ ``` **`can_assign_subtask(subtask_id, project_tasks) -> bool`** ```python def can_assign_subtask( self, subtask_id: str, project_tasks: List[Task] ) -> bool: """ Check if a subtask can be assigned. Checks: - Subtask exists - Status is TODO - Not already assigned - All dependencies satisfied (DONE status) Parameters ---------- subtask_id : str ID of subtask to check project_tasks : List[Task] Current project tasks for dependency checking Returns ------- bool True if subtask can be assigned """ ``` **`update_subtask_status(subtask_id, status, assigned_to)`** ```python def update_subtask_status( self, subtask_id: str, new_status: TaskStatus, assigned_to: Optional[str] = None ) -> None: """ Update subtask status and optionally assignment. Automatically persists to JSON. """ ``` **`get_completion_percentage(parent_task_id) -> int`** ```python def get_completion_percentage(self, parent_task_id: str) -> int: """ Calculate completion percentage for parent task. Returns ------- int Percentage (0-100) based on completed subtasks """ ``` **Persistence**: ```python def _save_state(self) -> None: """Save subtasks to data/marcus_state/subtasks.json""" def _load_state(self) -> None: """Load subtasks from data/marcus_state/subtasks.json""" ``` ### 3. Subtask Assignment (`src/marcus_mcp/coordinator/subtask_assignment.py`) **Purpose**: Helper functions for finding and assigning subtasks **`find_next_available_subtask(subtask_manager, agent_id, project_tasks)`** ```python def find_next_available_subtask( subtask_manager: SubtaskManager, agent_id: str, project_tasks: List[Task] ) -> Optional[Subtask]: """ Find the highest priority available subtask for an agent. Process: 1. Get all subtasks across all parent tasks 2. Filter to TODO status 3. Filter to satisfied dependencies 4. Filter to not already assigned 5. Sort by parent task priority, then subtask order 6. Return first match Parameters ---------- subtask_manager : SubtaskManager Manager with all subtask state agent_id : str Agent requesting work project_tasks : List[Task] Current project tasks for dependency validation Returns ------- Optional[Subtask] Available subtask or None """ ``` **`convert_subtask_to_task(subtask, parent_task) -> Task`** ```python def convert_subtask_to_task( subtask: Subtask, parent_task: Task ) -> Task: """ Convert Subtask to Task object for assignment. Enriches with: - is_subtask flag in description - Parent task information - Shared conventions - Dependency artifact context - Sibling subtask information Parameters ---------- subtask : Subtask The subtask to convert parent_task : Task The parent Task object for context Returns ------- Task Task object ready for agent assignment """ ``` **`update_subtask_progress_in_parent(parent_task_id, subtask_id, manager, kanban)`** ```python async def update_subtask_progress_in_parent( parent_task_id: str, completed_subtask_id: str, subtask_manager: SubtaskManager, kanban_client: Any ) -> None: """ Update parent task progress when subtask completes. Updates: - Parent task progress percentage - Kanban board checklist - Parent task comments with completion info Parameters ---------- parent_task_id : str Parent task to update completed_subtask_id : str Subtask that just completed subtask_manager : SubtaskManager Manager with all subtask state kanban_client : Any Kanban integration for board updates """ ``` **`check_and_complete_parent_task(parent_task_id, manager, kanban)`** ```python async def check_and_complete_parent_task( parent_task_id: str, subtask_manager: SubtaskManager, kanban_client: Any ) -> None: """ Check if parent should be completed and do so if ready. Completes parent when: - All subtasks have DONE status - Parent task exists - Parent not already complete Actions on completion: - Move parent to DONE on Kanban - Add completion summary comment - Trigger dependent tasks Parameters ---------- parent_task_id : str Parent task to check subtask_manager : SubtaskManager Manager with all subtask state kanban_client : Any Kanban integration for updates """ ``` ### 4. Integration Layer **Task Assignment Integration** (`src/marcus_mcp/coordinator/task_assignment_integration.py`): ```python async def find_optimal_task_with_subtasks( agent_id: str, state: Any, fallback_task_finder: Callable ) -> Optional[Task]: """ Find optimal task, prioritizing subtasks over regular tasks. Workflow: 1. Check if subtask_manager exists 2. If yes, look for available subtasks 3. If subtask found, convert and return 4. If no subtask, use fallback_task_finder for regular tasks This is a wrapper that integrates subtask logic without modifying existing task assignment code. """ ``` **Context Enhancement** (`src/marcus_mcp/tools/context.py`): ```python async def get_task_context(task_id: str, state: Any) -> Dict[str, Any]: """ Get enhanced context for a task, including subtask information. For subtasks, includes: - is_subtask flag - subtask_info (name, provides, requires, file_artifacts) - parent_task details - shared_conventions - dependency_artifacts (completed subtasks this can use) - sibling_subtasks (other subtasks in same decomposition) For regular tasks, returns standard context. """ ``` ## Integration Points ### With Marcus Server (`src/marcus_mcp/server.py`) ```python class MarcusServer: def __init__(self): # ... existing initialization ... # Add SubtaskManager self.subtask_manager = SubtaskManager() # Load existing subtasks from persistence self.subtask_manager._load_state() ``` ### With Task Assignment (`src/marcus_mcp/tools/task.py`) ```python async def find_optimal_task_for_agent( agent_id: str, state: Any ) -> Optional[Task]: """ Find optimal task for agent, checking subtasks first. """ # Use integrated finder return await find_optimal_task_with_subtasks( agent_id, state, fallback_task_finder=_find_optimal_task_original_logic ) ``` ### With Progress Reporting (`src/marcus_mcp/tools/task.py`) ```python async def report_task_progress( agent_id: str, task_id: str, status: str, progress: int, message: str, state: Any ) -> Dict[str, Any]: """ Report task progress, handling subtask completion. """ # ... existing progress logic ... if status == "completed": # Check if this is a subtask if (hasattr(state, "subtask_manager") and state.subtask_manager and task_id in state.subtask_manager.subtasks): # Update subtask status state.subtask_manager.update_subtask_status( task_id, TaskStatus.DONE, agent_id ) # Get parent ID subtask = state.subtask_manager.subtasks[task_id] parent_task_id = subtask.parent_task_id # Update parent progress await update_subtask_progress_in_parent( parent_task_id, task_id, state.subtask_manager, state.kanban_client ) # Check for parent completion await check_and_complete_parent_task( parent_task_id, state.subtask_manager, state.kanban_client ) # ... rest of existing logic ... ``` ## Data Persistence ### Storage Location ``` data/marcus_state/subtasks.json ``` ### Data Structure ```json { "subtasks": { "task_123_sub_1": { "id": "task_123_sub_1", "parent_task_id": "task_123", "name": "Create User model", "description": "...", "status": "done", "priority": "high", "assigned_to": "dev-001", "created_at": "2025-10-05T14:30:00", "estimated_hours": 2.0, "dependencies": [], "file_artifacts": ["src/models/user.py"], "provides": "User model with email validation", "requires": null, "order": 1 } }, "parent_subtasks": { "task_123": ["task_123_sub_1", "task_123_sub_2", "task_123_sub_integration"] }, "metadata": { "task_123": { "shared_conventions": { "base_path": "src/api/auth/", "response_format": {...} }, "decomposed_at": "2025-10-05T14:25:00", "decomposed_by": "ai" } } } ``` ## Performance Considerations ### Caching Strategy - Subtask state cached in memory - Periodic persistence to JSON (on updates) - Load from JSON on server startup ### Optimization Techniques ```python # Index subtasks by parent for O(1) lookup self.parent_subtasks: Dict[str, List[str]] = {} # Cache available subtasks self._available_cache = {} self._cache_ttl = 60 # seconds # Batch Kanban updates pending_updates = [] # ... collect updates ... await kanban_client.batch_update(pending_updates) ``` ### Scalability - **Current**: Handles 100s of subtasks per project efficiently - **Limits**: In-memory storage suitable for < 10,000 subtasks - **Future**: Consider database backend for larger scale ## Testing ### Unit Tests (`tests/unit/coordinator/`) **`test_decomposer.py`** (852 lines, 42 tests): - Test `should_decompose()` heuristics - Test AI prompt generation - Test integration subtask auto-generation - Test shared conventions extraction - Test decomposition validation **`test_subtask_manager.py`** (514 lines, 22 tests): - Test subtask CRUD operations - Test dependency resolution - Test status updates - Test completion percentage calculations - Test persistence (save/load JSON) - Test parent-child relationship management ### Integration Tests (`tests/integration/e2e/`) **`test_task_decomposition_e2e.py`** (366 lines, 6 tests): 1. `test_complete_decomposition_workflow` - Full end-to-end decomposition → assignment → completion 2. `test_subtask_to_task_conversion` - Subtask conversion with context 3. `test_parent_auto_completion` - Parent auto-completes when all subtasks done 4. `test_subtask_progress_updates_parent` - Parent progress reflects subtask completion 5. `test_subtask_assignment_respects_already_assigned` - No double-assignment 6. `test_subtask_manager_persistence` - State persists across restarts **Coverage**: 95% of decomposition system code paths ### Example Demo (`examples/task_decomposition_demo.py`) Demonstrates: - Creating a parent task - Automatic decomposition with AI - Subtask assignment workflow - Progress tracking - Parent auto-completion ```bash python examples/task_decomposition_demo.py ``` ## Usage Examples ### Basic Decomposition ```python from src.marcus_mcp.coordinator import should_decompose, decompose_task, SubtaskManager, SubtaskMetadata # Check if task should be decomposed task = await create_task( name="Build user authentication system", description="Complete auth with login, registration, sessions, and password reset", estimated_hours=8.0 ) if should_decompose(task, project_complexity="standard"): # Decompose using AI decomposition = await decompose_task( task, state.ai_engine, project_context={ "labels": task.labels, "existing_tasks": [t.name for t in state.project_tasks], } ) if decomposition["success"]: # Store subtasks metadata = SubtaskMetadata( shared_conventions=decomposition["shared_conventions"], decomposed_by="ai" ) # Add subtasks to unified storage created_tasks = state.subtask_manager.add_subtasks( task.id, decomposition["subtasks"], project_tasks=state.project_tasks, metadata=metadata ) # created_tasks now contains List[Task] with is_subtask=True ``` ### Querying Subtasks ```python # Get all subtasks for a parent subtasks = state.subtask_manager.get_subtasks("task_123") # Check parent completion progress = state.subtask_manager.get_completion_percentage("task_123") print(f"Feature {progress}% complete") # Find available subtask for agent available = find_next_available_subtask( state.subtask_manager, "dev-001", state.project_tasks ) if available: print(f"Assign: {available.name}") ``` ### Monitoring Progress ```python # Get subtask status breakdown subtasks = state.subtask_manager.get_subtasks("task_123") completed = [s for s in subtasks if s.status == TaskStatus.DONE] in_progress = [s for s in subtasks if s.status == TaskStatus.IN_PROGRESS] todo = [s for s in subtasks if s.status == TaskStatus.TODO] print(f"Completed: {len(completed)}/{len(subtasks)}") print(f"In Progress: {len(in_progress)}") print(f"Todo: {len(todo)}") # Check if ready for parent completion all_done = all(s.status == TaskStatus.DONE for s in subtasks) if all_done: print("Ready to auto-complete parent task") ``` ## Error Handling ### Decomposition Failures ```python decomposition = await decompose_task(task, ai_engine, context) if not decomposition["success"]: logger.error(f"Decomposition failed: {decomposition.get('error')}") # Fall back to treating as regular task return task ``` ### Missing Dependencies ```python if not state.subtask_manager.can_assign_subtask(subtask_id, project_tasks): missing = [ dep for dep in subtask.dependencies if state.subtask_manager.subtasks[dep].status != TaskStatus.DONE ] logger.warning(f"Cannot assign {subtask.name} - waiting for {missing}") ``` ### Persistence Failures ```python try: state.subtask_manager._save_state() except Exception as e: logger.error(f"Failed to persist subtasks: {e}") # Continue operation - in-memory state still valid ``` ## Future Enhancements ### Planned Improvements (5% remaining) 1. **Real-Time Board Checklists** - Create Kanban checklist items for each subtask - Update checklist as subtasks complete - Visual progress on parent task card 2. **Subtask Reassignment** - Automatically reassign subtasks if agent goes offline - Lease-based subtask assignments - Stale subtask detection and recovery 3. **Learning from Decompositions** - Track decomposition effectiveness - Learn common patterns for task types - Improve AI prompts based on successful decompositions 4. **Manual Subtask Creation** - Allow users to manually define subtasks - UI for decomposition editing - Merge AI and manual decompositions ### Research Opportunities - **Optimal Subtask Size**: What subtask duration maximizes agent efficiency? - **Decomposition Patterns**: What decomposition structures work best for different project types? - **Integration Effectiveness**: Does automatic integration subtask catch real issues? - **Agent Preferences**: Do agents perform better on certain subtask types? ## Related Systems - **[07 - AI Intelligence Engine](../intelligence/07-ai-intelligence-engine.md)** - Powers AI decomposition - **[23 - Task Management Intelligence](../intelligence/23-task-management-intelligence.md)** - Task analysis and classification - **[04 - Kanban Integration](04-kanban-integration.md)** - Board synchronization for parent tasks - **[03 - Context & Dependency System](../coordination/03-context-dependency-system.md)** - Enhanced context for subtasks - **[36 - Task Dependency System](../coordination/36-task-dependency-system.md)** - Dependency resolution ## References - **Integration Guide**: `docs/task-decomposition-integration.md` - **Core Implementation**: `src/marcus_mcp/coordinator/` - **Tests**: `tests/unit/coordinator/`, `tests/integration/e2e/test_task_decomposition_e2e.py` - **Example**: `examples/task_decomposition_demo.py` - **Persistence**: `data/marcus_state/subtasks.json` --- **Implementation Status**: 95% Complete - ✅ Core decomposition logic - ✅ Subtask management - ✅ Assignment integration - ✅ Progress tracking - ✅ Parent auto-completion - ✅ Persistence layer - ✅ Unit tests (64 tests, 100% pass) - ✅ Integration tests (6 tests, 100% pass) - ⏳ Real-time board checklists (5% remaining) *The Hierarchical Task Decomposition System enables Marcus to intelligently break down complex features into manageable subtasks with clear interfaces, enabling autonomous agents to collaborate effectively on large-scale software projects.*