# Optimal Agent Scheduling System ## Overview The Optimal Agent Scheduling System is Marcus's intelligent resource optimization engine that uses Critical Path Method (CPM) analysis to determine the optimal number of agents needed for maximum project efficiency. This system analyzes the unified dependency graph (including parent tasks and subtasks) to identify parallel work opportunities, calculate critical paths, and provide data-driven recommendations for agent scaling. ## What This System Does The Optimal Agent Scheduling System serves multiple critical functions: 1. **Critical Path Analysis**: Identifies the longest dependency chain that determines minimum project duration 2. **Parallelism Detection**: Discovers opportunities where multiple tasks can run simultaneously 3. **Resource Optimization**: Calculates optimal agent count to maximize efficiency without waste 4. **Efficiency Prediction**: Estimates time savings from multi-agent coordination 5. **Agent Self-Assessment**: Enables autonomous agents to identify resource constraints 6. **Dynamic Scaling Recommendations**: Provides real-time guidance for scaling agent pools up or down ## Architecture The system consists of several interconnected components working together: ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ Optimal Agent Scheduling System │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────┐ ┌─────────────────┐ ┌──────────────────────────┐ │ │ │ Unified Graph │ │ CPM Calculator │ │ Scheduling Optimizer │ │ │ │ Query │ │ │ │ │ │ │ │ │ │ • Topological │ │ • Parallelism Analysis │ │ │ │ • project_tasks │ │ Sort (Kahn) │ │ • Agent Count Calc │ │ │ │ • Parents + │ │ • Critical Path │ │ • Efficiency Metrics │ │ │ │ Subtasks │ │ • Time Calc │ │ • Resource Planning │ │ │ │ • Dependencies │ │ • Level Assign │ │ • Cost-Benefit Analysis │ │ │ └─────────────────┘ └─────────────────┘ └──────────────────────────┘ │ │ │ │ │ │ │ └──────────────────────┼───────────────────────────┘ │ │ │ │ │ ┌───────────────────────────────┼─────────────────────────────────────┐ │ │ │ MCP Tool Interface │ │ │ │ │ │ │ │ │ • get_optimal_agent_count │ • Agent/Human Access │ │ │ │ • include_details param │ • JSON Response Format │ │ │ │ • Error Handling │ • Project Context Management │ │ │ └───────────────────────────────────────────────────────────────────┘ │ │ │ │ │ ┌───────────────────────────────┼─────────────────────────────────────┐ │ │ │ Integration & Usage Layer │ │ │ │ │ │ │ │ │ • Inspector Client Access │ • Agent Self-Assessment │ │ │ │ • Workflow Integration │ • Blocker Reporting │ │ │ │ • Real-time Recommendations │ • Dynamic Scaling Decisions │ │ │ └───────────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────────────┘ ``` ### Core Components #### 1. Scheduler (`src/marcus_mcp/coordinator/scheduler.py`) **Purpose**: Core CPM implementation for critical path and optimal agent calculation **Key Classes**: - **ProjectSchedule**: Result dataclass containing optimal agents, critical path, efficiency metrics - **calculate_optimal_agents**: Main entry point for CPM analysis **CPM Algorithm**: ```python def calculate_optimal_agents(tasks: List[Task]) -> ProjectSchedule: """ Calculate optimal agent count using Critical Path Method. Returns ------- ProjectSchedule Contains optimal_agents, critical_path_hours, max_parallelism, efficiency_gain, and parallel_opportunities """ # Step 1: Build dependency graph graph, in_degree = _build_dependency_graph(tasks) # Step 2: Topological sort using Kahn's algorithm sorted_tasks = _topological_sort(tasks, graph, in_degree) # Step 3: Calculate earliest start times (forward pass) earliest_start = _calculate_earliest_start(sorted_tasks, graph, task_map) # Step 4: Identify critical path (longest chain) critical_path_hours = max(earliest_start.values()) if earliest_start else 0 # Step 5: Analyze parallelism by level parallel_opportunities = _analyze_parallel_opportunities(sorted_tasks, earliest_start) # Step 6: Calculate optimal agent count max_parallelism = max(len(tasks) for _, tasks in parallel_opportunities) optimal_agents = min(max_parallelism, len(tasks)) # Step 7: Calculate efficiency metrics single_agent_hours = sum(task.estimated_hours for task in tasks) efficiency_gain = (single_agent_hours - critical_path_hours) / single_agent_hours return ProjectSchedule( optimal_agents=optimal_agents, critical_path_hours=critical_path_hours, max_parallelism=max_parallelism, estimated_completion_hours=critical_path_hours, single_agent_hours=single_agent_hours, efficiency_gain=efficiency_gain, parallel_opportunities=parallel_opportunities ) ``` **Topological Sort (Kahn's Algorithm)**: ```python def _topological_sort( tasks: List[Task], graph: Dict[str, List[str]], in_degree: Dict[str, int] ) -> List[Task]: """ Perform topological sort to determine task execution order. Uses Kahn's algorithm: 1. Start with tasks that have no dependencies (in_degree = 0) 2. Process each task, reducing in_degree of dependents 3. Add newly-ready tasks to queue 4. Detect cycles if tasks remain with non-zero in_degree """ queue = deque([t for t in tasks if in_degree.get(t.id, 0) == 0]) sorted_tasks = [] while queue: task = queue.popleft() sorted_tasks.append(task) # Process dependents for dependent_id in graph.get(task.id, []): in_degree[dependent_id] -= 1 if in_degree[dependent_id] == 0: dependent = next(t for t in tasks if t.id == dependent_id) queue.append(dependent) # Check for cycles if len(sorted_tasks) != len(tasks): raise ValueError("Circular dependency detected in task graph") return sorted_tasks ``` **Critical Path Calculation**: ```python def _calculate_earliest_start( sorted_tasks: List[Task], graph: Dict[str, List[str]], task_map: Dict[str, Task] ) -> Dict[str, float]: """ Calculate earliest start time for each task (forward pass). The critical path is the longest dependency chain from start to finish. """ earliest_start = {} for task in sorted_tasks: # Task can start when all dependencies complete dep_completion_times = [ earliest_start.get(dep_id, 0) + task_map[dep_id].estimated_hours for dep_id in (task.dependencies or []) if dep_id in task_map ] earliest_start[task.id] = max(dep_completion_times) if dep_completion_times else 0 return earliest_start ``` **Parallelism Analysis**: ```python def _analyze_parallel_opportunities( sorted_tasks: List[Task], earliest_start: Dict[str, float] ) -> List[Dict[str, Any]]: """ Group tasks by earliest start time to identify parallel work. Returns list of time points with tasks that can run simultaneously. """ # Group tasks by start time time_buckets: Dict[float, List[str]] = {} for task in sorted_tasks: start_time = earliest_start.get(task.id, 0) if start_time not in time_buckets: time_buckets[start_time] = [] time_buckets[start_time].append(task.name) # Convert to parallel opportunities list opportunities = [ { "time": time, "task_count": len(task_names), "tasks": task_names } for time, task_names in sorted(time_buckets.items()) if len(task_names) > 1 # Only include parallel opportunities ] return opportunities ``` #### 2. Scheduling Tool (`src/marcus_mcp/tools/scheduling.py`) **Purpose**: MCP tool wrapper for scheduler, making CPM accessible via Marcus interface **Key Functions**: - **get_optimal_agent_count**: Async function that queries unified graph and calls scheduler **Tool Interface**: ```python async def get_optimal_agent_count( include_details: bool = False, state: Any = None, ) -> Dict[str, Any]: """ Calculate optimal number of agents using CPM analysis. Parameters ---------- include_details : bool Whether to include detailed parallel opportunities state : Any Marcus server state instance Returns ------- Dict[str, Any] Scheduling analysis with optimal agent count """ if not state: return { "success": False, "error": "Server state not available", } try: # Get all tasks from unified graph tasks = getattr(state, "project_tasks", []) if not tasks: return { "success": True, "message": "No tasks available for scheduling", "optimal_agents": 0, "critical_path_hours": 0.0, "total_work_hours": 0.0, } # Calculate optimal agents using CPM schedule = calculate_optimal_agents(tasks) # Build response response = { "success": True, "optimal_agents": schedule.optimal_agents, "critical_path_hours": round(schedule.critical_path_hours, 2), "max_parallelism": schedule.max_parallelism, "estimated_completion_hours": round(schedule.estimated_completion_hours, 2), "single_agent_hours": round(schedule.single_agent_hours, 2), "efficiency_gain_percent": round(schedule.efficiency_gain * 100, 1), "total_tasks": len(tasks), } # Add detailed parallel opportunities if requested if include_details and schedule.parallel_opportunities: response["parallel_opportunities"] = schedule.parallel_opportunities return response except ValueError as e: # Handle dependency cycles return { "success": False, "error": f"Cannot calculate optimal agents: {str(e)}", "suggestion": "Check for circular dependencies in your task graph", } except Exception as e: logger.error(f"Error calculating optimal agents: {e}", exc_info=True) return { "success": False, "error": f"Failed to calculate optimal agents: {str(e)}", } ``` #### 3. MCP Tool Registration (`src/marcus_mcp/handlers.py`) **Purpose**: Registers tool with Marcus MCP interface for agent and human access **Tool Definition**: ```python types.Tool( name="get_optimal_agent_count", description=( "Calculate optimal number of agents using Critical Path Method (CPM) analysis.\n\n" "Analyzes the unified dependency graph (including parent tasks and subtasks) " "to determine the optimal agent count for maximum efficiency.\n\n" "Returns:\n" "- optimal_agents: Recommended number of agents\n" "- critical_path_hours: Duration of longest dependency chain\n" "- max_parallelism: Maximum tasks that can run simultaneously\n" "- efficiency_gain: Percentage improvement vs single agent\n" "- estimated_completion_hours: Expected completion time\n\n" "Optionally includes detailed parallel opportunities showing when " "multiple tasks can be worked on simultaneously." ), inputSchema={ "type": "object", "properties": { "include_details": { "type": "boolean", "description": ( "Include detailed parallel opportunities analysis " "(shows time points where multiple tasks can run in parallel)" ), "default": False, }, }, "required": [], }, ) ``` **Handler Implementation**: ```python # In handle_call_tool function elif name == "get_optimal_agent_count": result = await get_optimal_agent_count( include_details=arguments.get("include_details", False), state=state, ) ``` ## Integration with Marcus Ecosystem ### Position in Marcus Architecture The Optimal Agent Scheduling System operates as the "resource advisor" for project planning, sitting alongside task assignment: ```mermaid graph TD A[Create Project] --> B[Generate Tasks with Dependencies] B --> C[Unified Dependency Graph] C --> D[Optimal Agent Scheduling System] C --> E[Agent Assignment System] D --> F[CPM Analysis] F --> G[Optimal Agent Count] G --> H[Scaling Recommendations] E --> I[Task Assignment] I --> J[Agent Work] J --> K[Progress Updates] K --> D H --> L[Agent Self-Assessment] L --> M[Report Blocker: Resource Constraint] M --> N[Human Decision: Scale Up/Down] ``` ### Integration with Unified Dependency Graph The system directly queries `state.project_tasks` which contains the unified dependency graph created in Phase 3: ```python # Unified graph structure (from Phase 3) # Parent tasks have: is_subtask=False, parent_task_id=None # Subtasks have: is_subtask=True, parent_task_id=, subtask_index=N # Example unified graph: project_tasks = [ Task(id="1", name="Design API", estimated_hours=8, dependencies=[]), Task(id="2", name="Implement Backend", estimated_hours=16, dependencies=["1"]), Task(id="3", name="Write Tests", estimated_hours=8, dependencies=["2"]), Task(id="2.1", name="Setup Database", estimated_hours=4, dependencies=["1"], is_subtask=True, parent_task_id="2", subtask_index=1), Task(id="2.2", name="Create API Endpoints", estimated_hours=8, dependencies=["2.1"], is_subtask=True, parent_task_id="2", subtask_index=2), ] # Scheduler analyzes ALL tasks (parents + subtasks) as single graph schedule = calculate_optimal_agents(project_tasks) # Result: optimal_agents=2, critical_path_hours=28, efficiency_gain=37% ``` ### Integration Points 1. **Project Creation (create_project)**: ``` Tasks generated → Unified graph created → [SCHEDULING ANALYSIS AVAILABLE] ``` 2. **Agent Startup (register_agent)**: ``` Agent registers → Calls get_optimal_agent_count → [RESOURCE ASSESSMENT] ``` 3. **Periodic Monitoring**: ``` Tasks complete → Graph evolves → [RE-CALCULATE OPTIMAL AGENTS] ``` 4. **Blocker Reporting (report_blocker)**: ``` Agent blocked → Analyze scheduling → [IDENTIFY RESOURCE CONSTRAINT] ``` ### Supporting Systems Integration - **Task Dependency System**: Provides validated dependency graph for CPM analysis - **Assignment System**: Uses scheduling recommendations for agent pool sizing - **Subtask System**: Unified graph enables fine-grained parallelism analysis - **Error Framework**: Robust error handling with automatic fallback strategies ## Workflow Integration ### Typical Marcus Scenario Flow ``` create_project → register_agent → [get_optimal_agent_count] → request_next_task → work ↓ ↓ ↓ ↓ ↓ [Tasks Created] [Agent Ready] [Resource Assessment] [Task Assignment] [Execute] [Monitor] ``` ### When the Scheduling System is Used #### 1. Project Creation - Initial Assessment **Phase**: After task generation and dependency inference ```python # After create_project completes async def assess_project_resources(): result = await session.call_tool( "get_optimal_agent_count", arguments={"include_details": True} ) data = json.loads(result.content[0].text) print(f"Project requires {data['optimal_agents']} agents") print(f"Expected completion: {data['estimated_completion_hours']} hours") print(f"Efficiency gain: {data['efficiency_gain_percent']}%") ``` **What Happens**: - Analyzes complete project dependency graph - Identifies parallel work opportunities - Calculates critical path duration - Recommends optimal agent count - Estimates time savings from parallelization #### 2. Agent Startup - Self-Assessment **Phase**: During agent registration and initialization ```python # Agent startup workflow async def agent_startup_check(): # Register as agent await session.call_tool("register_agent", {...}) # Assess resource needs result = await session.call_tool("get_optimal_agent_count", {}) data = json.loads(result.content[0].text) if data["optimal_agents"] > 1: # Report resource constraint await session.call_tool( "report_blocker", arguments={ "agent_id": "agent-1", "blocker_type": "Resource Constraint", "description": ( f"Project could benefit from {data['optimal_agents']} agents. " f"Current setup: 1 agent. " f"Expected speedup: {data['efficiency_gain_percent']}% " f"({data['single_agent_hours']}h → {data['critical_path_hours']}h)" ) } ) ``` #### 3. Periodic Monitoring - Dynamic Assessment **Phase**: After significant project state changes ```python # Check after N tasks complete async def periodic_resource_check(tasks_completed: int): if tasks_completed % 5 == 0: # Every 5 tasks result = await session.call_tool( "get_optimal_agent_count", arguments={"include_details": True} ) data = json.loads(result.content[0].text) # Analyze parallel opportunities if data.get("parallel_opportunities"): max_parallel = max(opp["task_count"] for opp in data["parallel_opportunities"]) if max_parallel > current_agent_count: logger.info( f"Bottleneck detected: {max_parallel} tasks can run in parallel " f"but only {current_agent_count} agents available" ) ``` #### 4. Scaling Decisions - Resource Optimization **Phase**: When deciding whether to scale agent pool ```python # Decision logic for scaling async def should_scale_up(): result = await session.call_tool("get_optimal_agent_count", {}) data = json.loads(result.content[0].text) if not data["success"]: return False, 0 optimal = data["optimal_agents"] current = get_current_agent_count() efficiency_gain = data["efficiency_gain_percent"] # Scale up if significant speedup possible if optimal > current and efficiency_gain > 25: return True, optimal # Scale down if over-provisioned if optimal < current and efficiency_gain < 10: return True, optimal return False, current ``` ## What Makes This System Special ### 1. Unified Graph Analysis **Single Source of Truth**: - Analyzes `state.project_tasks` which includes both parent tasks AND subtasks - No separate subtask calculations needed - Dependencies between parents and subtasks handled uniformly - Enables fine-grained parallelism detection **Example**: ```python # Parent task with 3 subtasks parent = Task(id="1", name="Implement Feature", estimated_hours=20) # Subtasks can have dependencies on other subtasks or parents subtask1 = Task(id="1.1", name="Setup", estimated_hours=4, is_subtask=True, parent_task_id="1", dependencies=[]) subtask2 = Task(id="1.2", name="Core Logic", estimated_hours=10, is_subtask=True, parent_task_id="1", dependencies=["1.1"]) subtask3 = Task(id="1.3", name="Tests", estimated_hours=6, is_subtask=True, parent_task_id="1", dependencies=["1.2"]) # Scheduler sees all 4 tasks in single graph and calculates optimal parallelism ``` ### 2. Critical Path Method (CPM) **Industry-Standard Algorithm**: - Based on project management best practices - Proven for resource optimization - Accurate parallelism detection - Handles complex dependency graphs **Key Advantages**: - **Deterministic**: Same input always produces same output - **Explainable**: Can trace why specific agent count recommended - **Efficient**: O(V + E) complexity (linear in tasks and dependencies) - **Robust**: Handles cycles gracefully with clear error messages ### 3. Agent Self-Assessment **Autonomous Resource Awareness**: - Agents can calculate their own resource needs - No human intervention required for basic scaling decisions - Enables proactive bottleneck identification - Supports intelligent blocker reporting **Example**: ```python # Agent discovers it's under-resourced if optimal_agents > 1 and current_agents == 1: await report_blocker( "Resource Constraint", f"Project has {max_parallelism} parallel work opportunities " f"but only 1 agent available" ) ``` ### 4. Real-Time Recommendations **Dynamic Analysis**: - Recalculates as project evolves - Adapts to completed tasks - Identifies when scaling up/down makes sense - Provides efficiency metrics for decision-making ### 5. Detailed Parallel Opportunities **Fine-Grained Analysis**: ```json { "parallel_opportunities": [ { "time": 0.0, "task_count": 3, "tasks": ["Setup Database", "Design API", "Configure CI/CD"] }, { "time": 8.0, "task_count": 2, "tasks": ["Implement Backend", "Build Frontend"] } ] } ``` Shows exactly when and where parallel work is possible. ## Technical Implementation Details ### Core Data Models #### ProjectSchedule Result ```python @dataclass class ProjectSchedule: """Result of CPM scheduling analysis.""" optimal_agents: int # Recommended agent count critical_path_hours: float # Longest dependency chain max_parallelism: int # Max simultaneous tasks estimated_completion_hours: float # Expected project duration single_agent_hours: float # Sequential completion time efficiency_gain: float # 0.0-1.0 speedup ratio parallel_opportunities: List[Dict] # Detailed parallel work points ``` #### Dependency Graph Structure ```python # Internal representation for CPM graph: Dict[str, List[str]] = { "task_1": ["task_2", "task_3"], # task_1 enables task_2 and task_3 "task_2": ["task_4"], # task_2 enables task_4 "task_3": ["task_4"], # task_3 also enables task_4 } in_degree: Dict[str, int] = { "task_1": 0, # No dependencies "task_2": 1, # Depends on task_1 "task_3": 1, # Depends on task_1 "task_4": 2, # Depends on task_2 AND task_3 } ``` ### Algorithm Complexity **Time Complexity**: - Graph building: O(V + E) where V = tasks, E = dependencies - Topological sort: O(V + E) - Critical path: O(V + E) - Parallelism analysis: O(V) - **Total: O(V + E)** - Linear in graph size **Space Complexity**: - Graph storage: O(V + E) - Earliest start times: O(V) - **Total: O(V + E)** **Performance**: - Small projects (1-10 tasks): < 1ms - Medium projects (10-50 tasks): 1-5ms - Large projects (50-200 tasks): 5-20ms - Very large projects (200+ tasks): 20-100ms ### Edge Cases and Error Handling #### Circular Dependencies ```python # Example circular dependency task_a.dependencies = ["task_b"] task_b.dependencies = ["task_a"] # Result: { "success": False, "error": "Cannot calculate optimal agents: Circular dependency detected", "suggestion": "Check for circular dependencies in your task graph" } ``` #### Empty Project ```python # No tasks in project tasks = [] # Result: { "success": True, "message": "No tasks available for scheduling", "optimal_agents": 0, "critical_path_hours": 0.0, "total_work_hours": 0.0 } ``` #### Single Task ```python # Only one task tasks = [Task(id="1", name="Deploy", estimated_hours=2)] # Result: { "success": True, "optimal_agents": 1, "critical_path_hours": 2.0, "max_parallelism": 1, "efficiency_gain_percent": 0.0, # No benefit from multiple agents "single_agent_hours": 2.0, "estimated_completion_hours": 2.0 } ``` #### All Independent Tasks ```python # No dependencies tasks = [ Task(id="1", name="Task A", estimated_hours=4, dependencies=[]), Task(id="2", name="Task B", estimated_hours=4, dependencies=[]), Task(id="3", name="Task C", estimated_hours=4, dependencies=[]), ] # Result: { "success": True, "optimal_agents": 3, # All can run in parallel "critical_path_hours": 4.0, # Longest single task "max_parallelism": 3, "efficiency_gain_percent": 66.7, # 12h → 4h = 67% faster "single_agent_hours": 12.0, "estimated_completion_hours": 4.0 } ``` #### Sequential Chain ```python # Pure sequential dependencies tasks = [ Task(id="1", name="Step 1", estimated_hours=2, dependencies=[]), Task(id="2", name="Step 2", estimated_hours=2, dependencies=["1"]), Task(id="3", name="Step 3", estimated_hours=2, dependencies=["2"]), ] # Result: { "success": True, "optimal_agents": 1, # No parallelism possible "critical_path_hours": 6.0, "max_parallelism": 1, "efficiency_gain_percent": 0.0, # No benefit from multiple agents "single_agent_hours": 6.0, "estimated_completion_hours": 6.0 } ``` ## Usage Patterns ### 1. Inspector Client - Stdio Mode ```python from src.worker.inspector import Inspector import json async def assess_project_resources(): client = Inspector(connection_type="stdio") async with client.connect() as session: # Authenticate await session.call_tool( "authenticate", arguments={"client_id": "test", "client_type": "admin", "role": "admin"} ) # Create project await session.call_tool( "create_project", arguments={ "project_name": "Web App", "description": "Build a web application..." } ) # Get optimal agent count result = await session.call_tool( "get_optimal_agent_count", arguments={"include_details": True} ) data = json.loads(result.content[0].text) print(f"Optimal agents: {data['optimal_agents']}") print(f"Critical path: {data['critical_path_hours']}h") print(f"Efficiency gain: {data['efficiency_gain_percent']}%") ``` ### 2. Inspector Client - HTTP Mode ```python async def monitor_project_resources(): client = Inspector(connection_type="http") async with client.connect(url="http://localhost:4298/mcp") as session: # Authenticate await session.call_tool("authenticate", {...}) # Periodic resource check while True: result = await session.call_tool("get_optimal_agent_count", {}) data = json.loads(result.content[0].text) if data["optimal_agents"] > current_agent_count: print(f"Consider scaling up to {data['optimal_agents']} agents") await asyncio.sleep(300) # Check every 5 minutes ``` ### 3. Autonomous Agent - Resource Self-Assessment ```python async def agent_workflow(): # Register await session.call_tool("register_agent", {...}) # Assess resources at startup result = await session.call_tool("get_optimal_agent_count", {}) data = json.loads(result.content[0].text) if data["optimal_agents"] > 1: await session.call_tool( "report_blocker", arguments={ "agent_id": "agent-1", "blocker_type": "Resource Constraint", "description": ( f"Project requires {data['optimal_agents']} agents for optimal efficiency. " f"Current: 1 agent. " f"Expected speedup: {data['efficiency_gain_percent']}%" ) } ) # Continue with task execution while True: task = await session.call_tool("request_next_task", {...}) # ... work on task ... ``` ### 4. Dynamic Scaling Decision ```python async def scaling_orchestrator(): """Orchestrator that manages agent pool size dynamically.""" while True: result = await session.call_tool("get_optimal_agent_count", {}) data = json.loads(result.content[0].text) optimal = data["optimal_agents"] current = len(active_agents) efficiency_gain = data["efficiency_gain_percent"] if optimal > current and efficiency_gain > 30: # Scale up - significant speedup possible agents_to_add = optimal - current print(f"Scaling up: adding {agents_to_add} agents") for i in range(agents_to_add): spawn_new_agent() elif optimal < current and efficiency_gain < 10: # Scale down - over-provisioned agents_to_remove = current - optimal print(f"Scaling down: removing {agents_to_remove} agents") for i in range(agents_to_remove): terminate_idle_agent() else: print(f"Current agent count ({current}) is optimal") await asyncio.sleep(600) # Check every 10 minutes ``` ## Configuration and Tuning ### Default Behavior No configuration needed - the system uses sensible defaults: ```python # Automatic configuration based on task graph # - optimal_agents: min(max_parallelism, total_tasks) # - include_details: False (for performance) # - Error handling: Graceful degradation ``` ### Performance Optimization For large projects (200+ tasks), consider: ```python # Option 1: Disable detailed analysis if not needed result = await session.call_tool( "get_optimal_agent_count", arguments={"include_details": False} # Faster ) # Option 2: Cache results for repeated queries cache_key = f"optimal_agents_{project_id}_{len(tasks)}" if cache_key in cache: return cache[cache_key] result = await session.call_tool("get_optimal_agent_count", {}) cache[cache_key] = result ``` ### Thresholds for Decision Making ```python # Example thresholds for scaling decisions SCALE_UP_EFFICIENCY_THRESHOLD = 25 # Scale up if >25% speedup possible SCALE_DOWN_EFFICIENCY_THRESHOLD = 10 # Scale down if <10% benefit MIN_AGENTS = 1 # Always have at least 1 agent MAX_AGENTS = 10 # Don't exceed budget/infrastructure limits async def apply_scaling_policy(data: Dict) -> str: optimal = data["optimal_agents"] efficiency = data["efficiency_gain_percent"] if efficiency > SCALE_UP_EFFICIENCY_THRESHOLD: return f"SCALE_UP to {min(optimal, MAX_AGENTS)} agents" elif efficiency < SCALE_DOWN_EFFICIENCY_THRESHOLD: return f"SCALE_DOWN to {max(optimal, MIN_AGENTS)} agents" else: return "MAINTAIN current agent count" ``` ## Current Implementation Pros and Cons ### Advantages #### 1. Industry-Standard Algorithm - **CPM is proven** for project scheduling and resource optimization - **Well-understood** by project managers and engineers - **Mathematically sound** critical path calculation - **Deterministic results** for reproducibility #### 2. Unified Graph Integration - **Single source of truth** - analyzes state.project_tasks directly - **No separate calculations** for subtasks vs parent tasks - **Consistent dependencies** across entire task hierarchy - **Fine-grained parallelism** detection at subtask level #### 3. Agent Autonomy - **Self-assessment capability** - agents can identify resource needs - **Proactive bottleneck detection** without human oversight - **Intelligent blocker reporting** with data-driven recommendations - **Dynamic adaptation** as project evolves #### 4. Performance and Efficiency - **O(V + E) complexity** - scales linearly with project size - **Sub-millisecond for small projects** (< 10 tasks) - **Efficient for large projects** (< 100ms for 200+ tasks) - **No external dependencies** - pure Python implementation #### 5. Clear, Actionable Insights - **Optimal agent count** with confidence - **Efficiency gain predictions** for ROI analysis - **Detailed parallel opportunities** for fine-grained optimization - **Critical path identification** for timeline prediction ### Limitations #### 1. Simplified Time Estimates - **Assumes accurate task.estimated_hours** - garbage in, garbage out - **Doesn't account for agent skill differences** - assumes uniform capability - **No consideration for task switching overhead** - overestimates efficiency - **Ignores blocked time** (waiting for reviews, approvals, etc.) #### 2. Pure Task-Level Analysis - **No resource constraints** beyond agent count (CPU, memory, API limits) - **No skill matching** - assumes any agent can do any task - **No agent availability** modeling (time zones, working hours, vacations) - **No priority consideration** - treats all tasks equally #### 3. Static Dependency Model - **Assumes dependencies don't change** during execution - **No conditional dependencies** (if X succeeds, do Y; else do Z) - **No dynamic task generation** (tasks discovered during work) - **No external dependencies** (third-party services, approvals) #### 4. No Cost Modeling - **Doesn't consider agent cost differences** (senior vs junior devs) - **No budget constraints** in recommendations - **No efficiency vs cost tradeoff** analysis - **Assumes infinite agent availability** #### 5. Limited Error Recovery - **Circular dependency detection only** - other graph issues not validated - **No automatic correction** of problematic dependencies - **Limited guidance** for fixing detected issues - **Assumes dependency system has already validated graph** ## Why This Approach Was Chosen ### Design Philosophy **"Data-Driven Resource Optimization with Agent Autonomy"** The CPM-based scheduling approach was chosen to balance several competing requirements: - **Accuracy vs. Simplicity**: CPM provides accurate critical path without complex simulation - **Speed vs. Completeness**: Linear algorithm scales well while providing essential insights - **Autonomy vs. Control**: Agents can self-assess but humans make final scaling decisions - **Determinism vs. Adaptability**: Consistent results with ability to recalculate as project evolves ### Alternative Approaches Considered #### 1. Monte Carlo Simulation **Pros**: Handles uncertainty, realistic modeling, confidence intervals **Cons**: Computationally expensive, requires probability distributions, slow **Decision**: Too slow for real-time agent decision-making #### 2. Constraint Satisfaction **Pros**: Can model complex resource constraints, optimal solutions **Cons**: NP-hard, requires solver libraries, complex configuration **Decision**: Overkill for typical project scheduling needs #### 3. Heuristic Rules **Pros**: Simple, fast, no complex algorithms **Cons**: Not optimal, hard to maintain, doesn't adapt **Decision**: Insufficient for providing confident recommendations #### 4. Machine Learning **Pros**: Learns from historical data, adapts to patterns **Cons**: Requires training data, not explainable, unpredictable **Decision**: Too complex and requires organizational data we don't have #### 5. Fixed Agent Pools **Pros**: Simple, no calculation needed, predictable **Cons**: Wasteful or insufficient, no optimization **Decision**: Defeats the purpose of intelligent resource management ### Technical Decision Rationale **Why CPM Won**: 1. **Proven Algorithm**: 60+ years of use in project management 2. **Optimal Critical Path**: Mathematically correct longest path calculation 3. **Linear Performance**: O(V + E) scales to large projects 4. **Explainable**: Can trace exactly why recommendations were made 5. **No External Dependencies**: Pure Python, no heavy libraries 6. **Agent-Friendly**: Fast enough for real-time agent self-assessment ## Future Evolution Possibilities ### Short-term Enhancements (3-6 months) #### 1. Skill-Based Scheduling ```python # Consider agent skills in parallelism calculation async def calculate_skill_aware_agents(tasks: List[Task], agents: List[Agent]) -> ProjectSchedule: """Calculate optimal agents considering skill requirements.""" # Group tasks by required skills skill_groups = group_tasks_by_skills(tasks) # Calculate parallel work per skill group optimal_by_skill = {} for skill, skill_tasks in skill_groups.items(): schedule = calculate_optimal_agents(skill_tasks) optimal_by_skill[skill] = schedule.optimal_agents # Recommend agent mix return recommend_agent_distribution(optimal_by_skill, available_agents) ``` #### 2. Cost-Benefit Analysis ```python # Add cost modeling @dataclass class CostAwareSchedule(ProjectSchedule): total_cost: float # Cost of recommended agent count cost_per_hour_saved: float # ROI metric break_even_point: int # Hours until cost pays off alternative_configurations: List # Cost/time tradeoffs ``` #### 3. Real-Time Monitoring ```python # Track actual vs predicted performance async def monitor_schedule_accuracy(): """Compare predicted vs actual completion times.""" predicted = schedule.estimated_completion_hours actual = measure_actual_completion_time() accuracy = 1.0 - abs(predicted - actual) / predicted # Feed back to improve estimates update_estimation_model(accuracy, project_characteristics) ``` ### Medium-term Evolution (6-12 months) #### 1. Dynamic Dependency Adaptation ```python # Recalculate as dependencies change async def adaptive_scheduling(): """Continuously update schedule as project evolves.""" while project_active(): # Detect dependency changes current_graph = get_current_dependencies() if current_graph != last_graph: # Recalculate optimal agents new_schedule = calculate_optimal_agents(current_tasks) if needs_scaling(new_schedule, current_agents): recommend_scaling_action(new_schedule) await asyncio.sleep(300) # Check every 5 minutes ``` #### 2. Multi-Project Resource Allocation ```python # Optimize across multiple concurrent projects async def allocate_agents_across_projects(projects: List[Project]) -> Dict: """Optimally distribute agents across multiple projects.""" # Calculate individual project needs schedules = {p.id: calculate_optimal_agents(p.tasks) for p in projects} # Solve optimization problem allocation = optimize_agent_distribution( schedules=schedules, total_agents=get_total_agent_count(), priorities=get_project_priorities() ) return allocation ``` #### 3. Predictive Bottleneck Detection ```python # Predict future bottlenecks async def predict_bottlenecks(lookahead_hours: int = 24) -> List[Dict]: """Identify likely bottlenecks before they occur.""" # Simulate project progress current_time = get_current_time() future_time = current_time + lookahead_hours # Find critical paths at future time future_tasks = project_remaining_tasks_at(future_time) future_schedule = calculate_optimal_agents(future_tasks) # Identify if we'll be under-resourced if future_schedule.optimal_agents > current_agent_count: return { "time": future_time, "needed_agents": future_schedule.optimal_agents, "current_agents": current_agent_count, "affected_tasks": get_tasks_on_critical_path() } ``` ### Long-term Vision (12+ months) #### 1. Machine Learning Integration - **Historical performance learning** to improve time estimates - **Pattern recognition** for common project structures - **Anomaly detection** for unusual resource usage - **Predictive scaling** based on project trajectory #### 2. Advanced Resource Modeling - **Multi-resource constraints** (CPU, memory, API quotas, budget) - **Time zone optimization** for global distributed teams - **Agent fatigue modeling** for realistic productivity curves - **Context switching costs** for more accurate estimates #### 3. Intelligent What-If Analysis ```python # Scenario planning async def scenario_analysis(): """Compare different resource allocation strategies.""" scenarios = [ {"agents": 1, "strategy": "sequential"}, {"agents": 3, "strategy": "parallel"}, {"agents": 5, "strategy": "aggressive"}, ] results = [] for scenario in scenarios: projected = simulate_project( tasks=project_tasks, agent_count=scenario["agents"], strategy=scenario["strategy"] ) results.append({ "scenario": scenario, "completion_time": projected.hours, "cost": projected.cost, "quality_score": projected.quality, "risk_score": projected.risk }) return recommend_best_scenario(results, optimization_goal="balanced") ``` ## Integration with Other Systems ### Cato (Learning System) **Future Integration Possibilities**: ```python # Learn optimal agent patterns from historical data class CatoEnhancedScheduler: async def calculate_optimal_agents(self, tasks: List[Task]) -> ProjectSchedule: # Get base CPM calculation base_schedule = calculate_optimal_agents(tasks) # Enhance with organizational learning historical_data = await self.cato.get_similar_projects( task_count=len(tasks), complexity=assess_complexity(tasks), tech_stack=extract_tech_stack(tasks) ) # Adjust based on actual performance adjusted_schedule = self.adjust_with_history(base_schedule, historical_data) return adjusted_schedule ``` ### Task Dependency System **Current Integration**: - Scheduler relies on validated dependency graph - Assumes circular dependencies already detected - Uses topological sort for execution order **Future Integration**: - Scheduler could provide feedback to dependency system - Identify impossible schedules early - Suggest dependency changes to improve parallelism ### Assignment System **Current Integration**: - Scheduler provides recommendations - Assignment system makes actual assignments - Separate systems with clear boundaries **Future Integration**: - Real-time coordination during assignment - Dynamic adjustment based on agent availability - Skill-based assignment guided by scheduler ## Conclusion The Optimal Agent Scheduling System represents a pragmatic approach to resource optimization that balances algorithmic rigor with practical usability. By leveraging the industry-standard Critical Path Method on Marcus's unified dependency graph, it provides accurate, explainable recommendations for agent scaling without requiring complex configuration or external dependencies. The system's integration with the unified task graph (including subtasks) enables fine-grained parallelism detection, while its MCP tool interface makes recommendations accessible to both autonomous agents and human operators. This dual accessibility supports both automated self-assessment and informed human decision-making. The CPM algorithm's O(V + E) performance ensures the system scales efficiently from small prototypes to large enterprise projects, with sub-millisecond response times for typical use cases. Combined with graceful error handling and clear, actionable output, the system provides reliable guidance for resource optimization throughout the project lifecycle. As the system evolves with skill-based scheduling, cost modeling, and Cato integration, it will become increasingly sophisticated at balancing the competing demands of speed, cost, and quality. The foundation of accurate critical path calculation and parallelism detection provides a solid base for these future enhancements while maintaining the simplicity and reliability required for production use. The scheduler's emphasis on agent autonomy—enabling agents to self-assess resource needs and proactively report constraints—represents a key step toward truly autonomous AI development teams that can optimize their own coordination without constant human oversight.