# What Happens When an Agent Requests a Task ## Internal Systems Architecture Deep Dive When an AI agent calls `request_next_task("dev-001")`, it triggers a sophisticated 8-stage orchestration involving 15+ interconnected systems that transforms a simple "give me work" request into an intelligent task assignment with contextual guidance, dependency awareness, risk analysis, and learning integration. This document explains the internal complexity behind Marcus's AI-powered task coordination. --- ## 🎯 **The Complete Flow Overview** ``` Agent Request β†’ Conversation Log β†’ State Refresh β†’ Safety Filtering β†’ AI Analysis β†’ Context Building β†’ Assignment Lease β†’ Instruction Generation ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ [Task Tool] [Logging Sys] [Project Mgmt] [Phase Control] [AI Engine] [Context Sys] [Lease Mgmt] [Memory Sys] [Event Sys] [Kanban Integ] [Safety Checks] [Task Match] [Dependencies] [Persistence] [Predictions] ``` **Result**: An intelligently selected task with comprehensive context, dependency awareness, risk predictions, time estimates, and adaptive instructions tailored to the specific agent and project state. --- ## πŸ“‹ **Stage 1: Request Intake & System Coordination** **System**: `21-agent-coordination.md` (Agent Coordination) + `02-logging-system.md` (Conversation Logging) ### What Is a Task Request? A task request isn't just "give me work" - it's an agent announcing **availability** and asking Marcus to make an **optimal assignment decision** based on current project state, agent capabilities, and intelligent coordination. ### What Happens: #### 1. **Conversation & Event Logging** Marcus logs this as a multi-directional conversation: ```python # Agent β†’ Marcus communication conversation_logger.log_worker_message( agent_id="dev-001", direction="to_pm", message="Requesting next task", metadata={"worker_info": "Worker dev-001 requesting task"} ) # System event for coordination state.log_event( event_type="task_request", data={ "worker_id": "dev-001", "source": "dev-001", "target": "marcus", "timestamp": "2025-09-05T14:30:00Z" } ) # Visualization event for real-time monitoring log_agent_event("task_request", {"worker_id": "dev-001"}) ``` **Why this triple logging exists**: - **Conversation Log**: Tracks the communication flow for debugging coordination issues - **System Event**: Triggers other systems (monitoring, analytics, capacity planning) - **Visualization Event**: Updates real-time dashboards showing agent activity #### 2. **Kanban System Initialization** Marcus ensures the project board is ready: ```python await state.initialize_kanban() # Connect to Planka/Linear/GitHub Projects ``` **What initialization does**: - Connects to the configured Kanban provider (Planka, Linear, GitHub Projects) - Validates API credentials and board access - Syncs any external changes to the board since last check - Ensures Marcus has current task status and assignments ### Data Created: ```json { "request_id": "req_2025_1452_dev001", "agent_id": "dev-001", "request_type": "task_request", "timestamp": "2025-09-05T14:30:00Z", "system_events": ["conversation_logged", "system_event_fired", "visualization_updated"], "kanban_status": "connected" } ``` --- ## πŸ”„ **Stage 2: Project State Analysis & Refresh** **System**: `16-project-management.md` (Project Management) + `04-kanban-integration.md` (Kanban Integration) ### What Is Project State Refresh? Before making assignment decisions, Marcus needs a **current snapshot** of the entire project: what tasks exist, what's completed, what's in progress, who's working on what, and what dependencies have changed. ### What Happens: #### 1. **Marcus Internal Reasoning** Marcus logs its own "thinking" process: ```python log_thinking("marcus", "Need to check current project state") # Then analyzes the specific agent: agent = state.agent_status.get("dev-001") if agent: log_thinking( "marcus", f"Finding optimal task for {agent.name}", { "agent_skills": ["Python", "FastAPI", "PostgreSQL"], "current_workload": len(agent.current_tasks), # How many tasks already assigned "performance_history": agent.performance_score } ) ``` **What "Marcus thinking" captures**: These logs show the internal reasoning process that Marcus's AI goes through, so developers can understand WHY certain decisions were made later during debugging. #### 2. **Project State Synchronization** ```python await state.refresh_project_state() ``` **What refresh_project_state() does**: - Pulls latest task status from Kanban board (in case external changes happened) - Updates internal task cache with current assignments - Identifies any tasks that may have been completed outside Marcus - Checks for new tasks added externally - Validates that dependencies are still accurate - Updates project completion metrics ### Internal State Updated: ```python project_state = { "total_tasks": 47, "completed_tasks": 23, "in_progress_tasks": 8, "blocked_tasks": 3, "available_tasks": 13, "active_agents": ["dev-001", "dev-002", "design-001"], "last_synced": "2025-09-05T14:30:15Z" } ``` --- ## πŸ›‘οΈ **Stage 3: Multi-Layer Safety Filtering** **System**: Custom Task Assignment Logic + `23-task-management-intelligence.md` (Task Intelligence) ### What Is Safety Filtering? Marcus doesn't just assign any available task - it applies **multiple intelligence layers** to prevent illogical assignments that could break the project or waste agent time. ### What Happens: #### 1. **Assignment Conflict Prevention** ```python # Get all currently assigned tasks to prevent duplicates all_assigned_ids = set() for other_agent_id, assignment in state.assignment_persistence.get_all_assignments().items(): all_assigned_ids.add(assignment["task_id"]) # Filter out already assigned tasks available_tasks = [ task for task in state.project_tasks if task.status == TaskStatus.TODO and task.id not in all_assigned_ids ] ``` **Why this prevents chaos**: Without this check, multiple agents could work on the same task simultaneously, creating conflicts and wasted effort. #### 2. **Phase-Based Constraint Enforcement** Marcus uses an **Enhanced Task Classifier** and **Phase Enforcer** to prevent illogical task ordering: ```python # Classify each task by type task_type = classifier.classify(task) # DESIGN, IMPLEMENTATION, TESTING, DEPLOYMENT task_phase = phase_enforcer._get_task_phase(task_type) # Check phase ordering within features if task.labels and ip_task.labels: # If tasks share feature labels shared_labels = set(task.labels) & set(ip_task.labels) if shared_labels: # Same feature if phase_enforcer._should_depend_on_phase(task_phase, ip_phase): # This phase should wait for the in-progress phase to complete phase_allowed = False ``` **Real example**: If someone is working on "User Authentication API Implementation" (IMPLEMENTATION phase), Marcus won't assign "User Authentication Testing" (TESTING phase) to another agent until the implementation is done. #### 3. **Deployment Task Deprioritization** Marcus actively **avoids assigning deployment tasks** unless no other work is available: ```python deployment_keywords = ["deploy", "release", "production", "launch", "rollout"] # Separate tasks into deployment vs non-deployment for task in available_tasks: is_deployment = any( keyword in task.name.lower() or keyword in task.labels for keyword in deployment_keywords ) if is_deployment: deployment_tasks.append(task) else: non_deployment_tasks.append(task) # Prefer non-deployment tasks available_tasks = non_deployment_tasks if non_deployment_tasks else deployment_tasks ``` **Why deployment avoidance**: Deployment tasks are typically reserved for when most development is complete, and they require special coordination. ### Safety Filter Results: ```python { "initial_tasks": 13, "after_assignment_check": 11, # 2 already assigned to other agents "after_phase_filtering": 8, # 3 blocked by phase constraints "after_deployment_filter": 7, # 1 deployment task deprioritized "final_eligible_tasks": 7 } ``` --- ## 🧠 **Stage 4: AI-Powered Task Selection** **System**: `07-ai-intelligence-engine.md` (AI Engine) + Custom AI Task Assignment Engine ### What Is AI-Powered Task Selection? Instead of simple skill matching, Marcus uses a **4-phase AI analysis** to find the truly optimal task based on complex factors that humans would struggle to evaluate simultaneously. ### What Happens: #### **Phase 1: Safety Validation** AI validates that tasks are logically safe to assign: ```python safe_tasks = await self._filter_safe_tasks(available_tasks) # For deployment tasks, AI checks: if self._is_deployment_task(task): if not await self._are_dependencies_complete(task): # AI analysis: "Dependencies incomplete" continue # Additional AI safety check safety_check = await ai_engine.check_deployment_safety(task, project_tasks) if not safety_check.get("safe", False): # AI reasoning: "Database migrations not tested" continue ``` #### **Phase 2: Dependency Impact Analysis** AI analyzes how completing this task affects the rest of the project: ```python dependency_scores = await self._analyze_dependencies(safe_tasks) for task in safe_tasks: # Count tasks this would unblock unblocked_count = sum( 1 for other_task in project_tasks if task.id in other_task.dependencies and other_task.status == TaskStatus.TODO ) # Check if task is on critical path critical_path = dependency_graph.get_critical_path() is_critical = task.id in critical_path # Calculate dependency score score = unblocked_count * 0.5 + (0.5 if is_critical else 0) ``` **What this achieves**: Prioritizes tasks that will unblock the most other work and keep the project moving forward efficiently. #### **Phase 3: Agent-Task Matching Intelligence** AI evaluates agent suitability for each task: ```python ai_scores = await self._get_ai_recommendations(safe_tasks, agent_info) context = AssignmentContext( task=task, agent_id="dev-001", agent_status=agent_info, available_tasks=safe_tasks, project_context={ "total_tasks": 47, "completed_tasks": 23, "project_phase": "development" # vs planning, testing, deployment } ) ai_analysis = await ai_engine.analyze_task_assignment(context) score = ai_analysis.get("suitability_score", 0.5) * ai_analysis.get("confidence", 1.0) ``` **AI evaluates**: - Skill alignment with task requirements - Agent's current workload and capacity - Task complexity vs agent experience level - Project phase appropriateness - Historical performance on similar tasks #### **Phase 4: Predictive Impact Assessment** AI predicts the consequences of this assignment: ```python impact_scores = await self._predict_task_impact(safe_tasks) # Predictions include: # - Timeline impact if this task is delayed # - Resource utilization optimization # - Risk of blocking critical path # - Effect on team productivity patterns ``` ### AI Decision Result: ```python { "selected_task": "task_015_user_auth_api", "ai_confidence": 0.87, "selection_reasoning": "High skill match, unblocks 3 frontend tasks, on critical path", "phase_scores": { "safety": 1.0, # Completely safe to assign "dependency": 0.8, # High impact on project flow "agent_match": 0.9, # Excellent skill alignment "impact": 0.7 # Good timeline optimization }, "alternatives_considered": 6 } ``` --- ## πŸ“Š **Stage 5: Context & Dependency Analysis** **System**: `03-context-dependency-system.md` (Context & Dependency) + `42-code-analysis-system.md` (Code Analysis) ### What Is Context Building? Marcus doesn't just assign a task - it provides **rich contextual information** to help the agent understand how their work fits into the bigger picture and what decisions will affect future tasks. ### What Happens: #### 1. **Implementation Context Gathering** For GitHub-integrated projects, Marcus analyzes existing code: ```python if state.provider == "github" and state.code_analyzer: owner = os.getenv("GITHUB_OWNER") repo = os.getenv("GITHUB_REPO") impl_details = await state.code_analyzer.get_implementation_details( optimal_task.dependencies, owner, repo ) ``` **What this discovers**: - How similar features were implemented previously - Existing API patterns and interfaces to follow - Code structure and naming conventions - Integration points with existing systems #### 2. **Dependency Relationship Analysis** Marcus builds a complete picture of how this task relates to others: ```python # Analyze dependencies across the entire project dep_map = await state.context.analyze_dependencies(state.project_tasks) # For each task that depends on the current task for dep_task_id in dep_map[optimal_task.id]: dep_task = find_task_by_id(dep_task_id) # AI infers what the dependent task needs expected_interface = state.context.infer_needed_interface(dep_task, optimal_task.id) # Add to context state.context.add_dependency(optimal_task.id, DependentTask( task_id=dep_task.id, task_name=dep_task.name, expected_interface=expected_interface # "REST API with /auth endpoints" )) ``` #### 3. **Dependency Awareness Generation** Marcus creates human-readable guidance about future impact: ```python if task_context.dependent_tasks: dep_count = len(task_context.dependent_tasks) dep_list = "\n".join([ f"- {dt['task_name']} (needs: {dt['expected_interface']})" for dt in task_context.dependent_tasks[:3] # Show first 3 ]) dependency_awareness = f"{dep_count} future tasks depend on your work:\n{dep_list}" ``` ### Context Data Structure: ```python context_data = { "previous_implementations": [ { "file_path": "src/auth/oauth.py", "pattern": "OAuth2 implementation with JWT tokens", "interfaces": ["POST /auth/login", "GET /auth/verify"] } ], "dependent_tasks": [ { "task_id": "task_020", "task_name": "Frontend Login Component", "expected_interface": "POST /auth/login endpoint with email/password" }, { "task_id": "task_025", "task_name": "Mobile Authentication", "expected_interface": "JWT token response format" } ], "architectural_decisions": [ { "decision": "Use JWT for stateless auth", "rationale": "Mobile apps need offline token validation", "impact": "All API endpoints must validate JWT" } ] } ``` --- ## πŸ” **Stage 6: Assignment Lease Creation** **System**: `35-assignment-lease-system.md` (Assignment Lease System) ### What Is an Assignment Lease? An assignment lease is a **time-bound contract** between Marcus and the agent. It prevents tasks from getting "stuck" with unresponsive agents and provides automatic recovery mechanisms. ### What Happens: #### 1. **Adaptive Duration Calculation** Marcus calculates how long this agent should have to complete this task: ```python def calculate_adaptive_duration(self, task: Task) -> float: base_hours = 2.0 # Default lease time # Apply priority multiplier priority_mult = { "urgent": 0.5, # Urgent tasks get shorter leases (faster escalation) "high": 0.8, "medium": 1.0, "low": 1.5 # Low priority gets more time }.get(task.priority.value.lower(), 1.0) base_hours *= priority_mult # Apply complexity multiplier based on labels if "complex" in task.labels: base_hours *= 2.0 # Complex tasks get more time if "simple" in task.labels: base_hours *= 0.5 # Simple tasks should be quick return max(0.5, min(base_hours, 24.0)) # Between 30 minutes and 24 hours ``` #### 2. **Lease Object Creation** ```python assignment_lease = AssignmentLease( task_id="task_015_user_auth_api", agent_id="dev-001", created_at=datetime.now(), expires_at=datetime.now() + timedelta(hours=3.2), # Calculated duration duration_hours=3.2, renewal_count=0, metadata={ "task_complexity": "medium", "agent_performance": 1.0, "priority_factor": 0.8 } ) ``` #### 3. **Background Monitoring Setup** Marcus starts monitoring this lease: ```python # LeaseMonitor checks every 60 seconds for: # - Tasks approaching expiration (30 min warning) # - Expired tasks needing recovery # - Stuck tasks requiring intervention ``` **Why leases matter**: - **Automatic Recovery**: If an agent goes offline, their tasks get automatically reassigned - **Progress Accountability**: Agents must show progress or lose the assignment - **Capacity Planning**: Marcus knows exactly what's assigned and for how long - **Quality Control**: Tasks can't sit indefinitely without progress ### Assignment Lease Data: ```json { "task_id": "task_015_user_auth_api", "agent_id": "dev-001", "created_at": "2025-09-05T14:30:45Z", "expires_at": "2025-09-05T17:49:45Z", "duration_hours": 3.2, "renewable": true, "warning_threshold": "2025-09-05T17:19:45Z", "monitoring_active": true } ``` --- ## 🧠 **Stage 7: Memory Integration & Predictive Analysis** **System**: `01-memory-system.md` (Multi-Tier Memory) + `17-learning-systems.md` (Learning Systems) ### What Is Memory Integration? Marcus uses its **four-tier memory system** to learn from every task assignment and provide predictions about how this assignment will go. ### What Happens: #### 1. **Task Outcome Prediction** Marcus predicts how likely this assignment is to succeed: ```python basic_prediction = await state.memory.predict_task_outcome("dev-001", optimal_task) # Based on: # - Agent's historical performance on similar tasks # - Task complexity vs agent skill level # - Current project phase and pressure # - Similar task outcomes in memory prediction = { "success_probability": 0.85, # 85% chance of successful completion "confidence": 0.92, # 92% confident in this prediction "risk_factors": ["complex_integration", "tight_timeline"] } ``` #### 2. **Completion Time Estimation** Marcus predicts how long this will actually take: ```python completion_time = await state.memory.predict_completion_time("dev-001", optimal_task) time_estimate = { "expected_hours": 2.8, "confidence_interval": {"lower": 1.5, "upper": 4.2}, "factors": ["authentication complexity", "API design decisions", "testing requirements"], "historical_basis": "Similar auth tasks took 2.1-3.4 hours for this agent" } ``` #### 3. **Blockage Risk Analysis** Marcus predicts what might go wrong: ```python blockage_analysis = await state.memory.predict_blockage_probability("dev-001", optimal_task) blockage_risk = { "overall_risk": 0.3, # 30% chance of encountering blockers "risk_breakdown": { "technical_dependencies": 0.15, # External API issues "requirement_clarity": 0.08, # Unclear specifications "integration_complexity": 0.12 # Existing system conflicts }, "preventive_measures": [ "Review existing OAuth implementation patterns", "Validate API endpoints with frontend team early" ] } ``` #### 4. **Cascade Effect Prediction** Marcus predicts how delays in this task would affect others: ```python cascade_effects = await state.memory.predict_cascade_effects(optimal_task.id, potential_delay=0.6) cascade_impact = { "critical_path_impact": True, "affected_tasks": ["task_020_frontend_login", "task_025_mobile_auth"], "delay_multiplier": 1.3, # 1 hour delay causes 1.3 hours total project delay "mitigation_options": ["Parallel frontend mockup development"] } ``` #### 5. **Agent Performance Trajectory** Marcus analyzes how this assignment fits the agent's development: ```python performance_trajectory = await state.memory.calculate_agent_performance_trajectory("dev-001") trajectory = { "improving_skills": {"Python": 0.15, "API_design": 0.23}, # Getting better "skill_trends": "Strong upward trend in backend development", "recommendations": ["Good opportunity to cement authentication expertise"] } ``` #### 6. **Memory Recording** Marcus records this assignment for future learning: ```python await state.memory.record_task_start("dev-001", optimal_task) # Updates: # - Working Memory: Current agent assignments # - Episodic Memory: This specific assignment event # - Semantic Memory: Patterns about auth tasks # - Procedural Memory: Assignment process effectiveness ``` ### Memory & Prediction Data: ```python { "predictions": { "success_probability": 0.85, "expected_completion_hours": 2.8, "blockage_risk": 0.3, "cascade_risk": "medium", "skill_development_opportunity": "high" }, "memory_updates": { "working_memory": "agent_dev-001_assigned_task_015", "episodic_event": "task_assignment_2025_09_05_1430", "semantic_pattern": "auth_task_backend_developer", "procedural_reinforcement": "ai_assignment_success" } } ``` --- ## πŸ“ **Stage 8: Intelligent Instruction Generation** **System**: `07-ai-intelligence-engine.md` (AI Engine) + Custom Tiered Instruction Builder ### What Is Intelligent Instruction Generation? Marcus doesn't just say "do this task" - it generates **contextually adaptive instructions** with 6 layers of intelligence based on the task complexity, dependencies, and agent needs. ### What Happens: #### 1. **Base Instruction Generation** Marcus uses AI to create task-specific instructions: ```python base_instructions = await state.ai_engine.generate_task_instructions( optimal_task, state.agent_status.get("dev-001") ) # AI considers: # - Task description and requirements # - Agent's skill level and experience # - Current project context and standards # - Previous implementations and patterns ``` #### 2. **Tiered Instruction Building** Marcus builds **6 layers** of increasingly sophisticated guidance: **Layer 1: Base Instructions** (always included) ``` "Implement user authentication API endpoints using OAuth2 with JWT tokens..." ``` **Layer 2: Implementation Context** (if previous work exists) ``` "πŸ“š IMPLEMENTATION CONTEXT: 2 relevant implementations found. Use these patterns and interfaces to maintain consistency." ``` **Layer 3: Dependency Awareness** (if other tasks depend on this) ``` "πŸ”— DEPENDENCY AWARENESS: 3 future tasks depend on your work: - Frontend Login Component (needs: POST /auth/login endpoint) - Mobile Authentication (needs: JWT token response format) - User Profile Service (needs: User ID extraction from JWT)" ``` **Layer 4: Decision Logging** (if task has high impact) ``` "πŸ“ ARCHITECTURAL DECISIONS: This task has significant downstream impact. When making technical choices: Use: 'Marcus, log decision: I chose [WHAT] because [WHY]. This affects [IMPACT].' Example: 'I chose JWT tokens because mobile apps need stateless auth. This affects all API endpoints.'" ``` **Layer 5: Predictions & Insights** (if available) ``` "⚑ PREDICTIONS & INSIGHTS: ⏱️ Expected duration: 2.8 hours (1.5-4.2 hours) ⚠️ High blockage risk: 30% β€’ technical_dependencies: 15% chance β€’ integration_complexity: 12% chance πŸ’‘ Prevention tips: β€’ Review existing OAuth implementation patterns β€’ Validate API endpoints with frontend team early πŸ“ˆ You're improving in API_design - great opportunity to excel!" ``` **Layer 6: Task-Specific Guidance** (based on task labels) ``` "πŸ’‘ TASK-SPECIFIC GUIDANCE: πŸ”’ Security Guidelines: Follow OWASP best practices, implement proper validation, use secure defaults 🌐 API Guidelines: Follow RESTful conventions, include proper error handling, document response formats" ``` ### Final Instruction Package: ```python { "task_assignment": { "task_id": "task_015_user_auth_api", "task_name": "User Authentication API", "instructions": "Implement user authentication...[6 layers of guidance]", "context_layers": 6, "estimated_reading_time": "3-5 minutes", "instruction_complexity": "comprehensive" }, "assignment_metadata": { "lease_duration": "3.2 hours", "renewable": True, "progress_expected": "25% increments", "decision_logging_encouraged": True } } ``` --- ## πŸ’Ύ **Data Persistence Across Systems** ### What Gets Stored Where: ``` data/assignments/assignments.json ← Active task assignments with leases data/marcus_state/memory/ ← Learning patterns from task assignments data/marcus_state/context/ ← Task dependencies and relationships data/audit_logs/ ← Complete audit trail of assignment process data/token_usage.json ← AI costs for assignment analysis ``` ### System State Changes: - **Assignment Registry**: `35-assignment-lease-system.md` creates time-bound assignment - **Agent Status**: `21-agent-coordination.md` updates agent's current_tasks list - **Memory System**: `01-memory-system.md` records assignment for learning - **Context System**: `03-context-dependency-system.md` tracks task relationships - **Monitoring**: `41-assignment-monitor.md` begins tracking assignment health --- ## πŸ”„ **Why This Complexity Matters** ### **Without This Orchestration:** - Simple task queue: "Here's the next task in the list" - No coordination: Multiple agents working on conflicting tasks - No context: Agents making decisions in isolation - No learning: Same mistakes repeated on every project - No recovery: Tasks stuck forever if agents go offline ### **With Marcus:** - **AI-Powered Selection**: Best task for this specific agent at this specific time - **Conflict Prevention**: Sophisticated safety checks prevent logical errors - **Rich Context**: Agents understand how their work affects the broader project - **Predictive Intelligence**: Risk analysis and time estimation based on learned patterns - **Automatic Recovery**: Time-bound leases with automatic reassignment - **Continuous Learning**: Every assignment improves future coordination ### **The Result:** A single `request_next_task()` call triggers sophisticated AI analysis, multi-system coordination, predictive modeling, contextual guidance generation, and learning integrationβ€”transforming a simple "give me work" request into an intelligent project coordination decision that optimizes for both immediate productivity and long-term project success. --- ## 🎯 **Key Takeaway** **Task assignment isn't just "match skills to work"**β€”it's a sophisticated AI-powered coordination process involving safety analysis, dependency intelligence, contextual guidance, predictive modeling, time-bound accountability, and continuous learning. This is why Marcus can effectively coordinate complex multi-agent work: every task assignment is an intelligent decision that considers the agent, the project, the dependencies, the risks, and the learning opportunities all simultaneously.