src.integrations.ai_analysis_engine module#
AI-powered analysis and decision engine for Marcus.
This module provides intelligent task assignment, blocker resolution, and project risk analysis using Claude API. It includes comprehensive fallback mechanisms for when the AI service is unavailable.
The engine helps with: - Optimal task-to-agent matching based on skills and capacity - Generating detailed task instructions - Analyzing and resolving blockers - Identifying project risks and mitigation strategies
Examples
>>> engine = AIAnalysisEngine()
>>> await engine.initialize()
>>> task = await engine.match_task_to_agent(tasks, agent, project_state)
>>> instructions = await engine.generate_task_instructions(task, agent)
- class src.integrations.ai_analysis_engine.AIAnalysisEngine[source]#
Bases:
objectAI-powered analysis and decision engine using Claude API.
This class provides intelligent analysis for project management decisions, including task assignment optimization, blocker resolution, and risk analysis. It gracefully falls back to rule-based approaches when AI is unavailable.
- client#
Anthropic API client, None if unavailable
- Type:
Optional[anthropic.Anthropic]
Examples
>>> engine = AIAnalysisEngine() >>> await engine.initialize() >>> # Engine is ready for analysis tasks
Notes
Requires an Anthropic API key in
config_marcus.json(ai.anthropic_api_key) or via theCLAUDE_API_KEYenv var.ANTHROPIC_API_KEYis intentionally NOT consulted: setting it in the shell switches Claude Code from subscription to API billing, so Marcus usesCLAUDE_API_KEYto leave Claude Codeβs auth untouched. Works in fallback mode without the API key.- __init__()[source]#
Initialize the AI Analysis Engine.
Attempts to set up the Anthropic client with various compatibility approaches for different library versions.
- Return type:
None
- async initialize()[source]#
Initialize the AI engine and test connectivity.
Verifies the Anthropic client can communicate with the API by sending a test message. Disables the client if the test fails.
Examples
>>> engine = AIAnalysisEngine() >>> await engine.initialize() β AI Engine connection verified
- Return type:
- async match_task_to_agent(available_tasks, agent, project_state)[source]#
Find the optimal task for an agent using AI analysis.
- Parameters:
available_tasks (
List[Task]) β List of unassigned tasks to choose fromagent (
WorkerStatus) β Agent profile including skills and capacityproject_state (
ProjectState) β Current state of the project
- Returns:
The best matching task, or None if no suitable task found
- Return type:
Examples
>>> task = await engine.match_task_to_agent( ... tasks, agent, ProjectState.HEALTHY ... ) >>> print(f"Assigned: {task.name} to {agent.name}")
Notes
Falls back to skill-based matching if AI is unavailable. Considers up to 10 tasks to avoid context limits.
- async generate_task_instructions(task, agent=None)[source]#
Generate detailed instructions for a task.
- Parameters:
task (
Task) β The task to generate instructions foragent (
Optional[WorkerStatus]) β The agent assigned to the task
- Returns:
Detailed task instructions formatted as markdown
- Return type:
Examples
>>> instructions = await engine.generate_task_instructions(task, agent) >>> print(instructions) ## Task Assignment for Alice ...
Notes
Uses AI to generate context-aware instructions when available, otherwise provides structured fallback instructions.
- async analyze_blocker(task_id, description, severity, agent=None, task=None)[source]#
Analyze a blocker and suggest resolution steps.
- Parameters:
task_id (
str) β ID of the blocked taskdescription (
str) β Detailed description of the blockerseverity (
str) β Severity level (low, medium, high, urgent)agent (
Optional[WorkerStatus]) β Agent who reported the blocker (for context-aware suggestions)task (
Optional[Task]) β Full task details (for enhanced analysis)
- Returns:
Analysis results including: - root_cause: Identified cause - impact_assessment: Impact description - resolution_steps: List of steps tailored to agent skills - required_resources: Needed resources - estimated_hours: Time to resolve - escalation_needed: Boolean - prevention_measures: Future prevention steps - learning_opportunities: Skills/knowledge gaps identified - recommended_collaborators: Team members who could help
- Return type:
Examples
>>> analysis = await engine.analyze_blocker( ... "TASK-123", ... "Database connection timeout", ... "high", ... agent=worker_status, ... task=task_obj ... ) >>> print(analysis['resolution_steps']) ['Check database server status', ...]
- async generate_clarification(task, question, context='')[source]#
Generate clarification for a task-related question.
- Parameters:
- Returns:
Clarification response
- Return type:
Examples
>>> clarification = await engine.generate_clarification( ... task, ... "What database should I use?", ... "Working on user authentication" ... )
- async analyze_project_risks(project_state, recent_blockers, team_status)[source]#
Analyze and identify project risks.
- Parameters:
project_state (
ProjectState) β Current project staterecent_blockers (
List[BlockerReport]) β Recent blockers encounteredteam_status (
List[WorkerStatus]) β Current team member status
- Returns:
Identified risks with mitigation strategies
- Return type:
Examples
>>> risks = await engine.analyze_project_risks( ... ProjectState.AT_RISK, ... recent_blockers, ... team_status ... ) >>> for risk in risks: ... print(f"{risk.description}: {risk.mitigation}")
Notes
Analyzes up to 10 recent blockers to identify patterns. Falls back to basic risk assessment if AI unavailable.
- async analyze_project_health(project_state, recent_activities, team_status)[source]#
Analyze overall project health using AI.
- Parameters:
- Returns:
Project health analysis with overall health, timeline prediction, risk factors, recommendations, and resource optimization
- Return type:
- async analyze_feature_request(feature_description)[source]#
Analyze a feature request and generate appropriate tasks.
- Parameters:
feature_description (
str) β Natural language description of the feature to implement- Returns:
Dictionary containing required tasks with details
- Return type:
Examples
>>> result = await engine.analyze_feature_request("Add user profile page") >>> tasks = result["required_tasks"]
- async analyze_integration_points(feature_tasks, existing_tasks)[source]#
Analyze how new feature tasks should integrate with existing project tasks.
- Parameters:
- Returns:
Integration analysis including dependencies and phase
- Return type:
Examples
>>> integration = await engine.analyze_integration_points( ... new_tasks, project_tasks ... ) >>> phase = integration["suggested_phase"]
- async generate_structured_response(prompt, system_prompt='', response_format=None, *, operation=None)[source]#
Generate a structured JSON response from the AI based on a schema.
This method is used for task decomposition and other operations requiring structured output. It only works when an AI provider is configured.
- Parameters:
- Returns:
Structured response matching the schema
- Return type:
- Raises:
RuntimeError β If no AI client is available (no API key configured)
Examples
>>> result = await engine.generate_structured_response( ... prompt="Break down this task", ... system_prompt="You are a task decomposition expert", ... response_format={"type": "object", "properties": {...}} ... )