src.integrations.ai_analysis_engine module

AI-powered analysis and decision engine for Marcus.

This module provides intelligent task assignment, blocker resolution, and project risk analysis using the Claude API. It includes fallback mechanisms for when the AI service is unavailable.

The engine helps with:
  • Optimal task-to-agent matching based on skills and capacity

  • Generating detailed task instructions

  • Analyzing and resolving blockers

  • Identifying project risks and mitigation strategies

Examples

>>> engine = AIAnalysisEngine()
>>> await engine.initialize()
>>> task = await engine.match_task_to_agent(tasks, agent, project_state)
>>> instructions = await engine.generate_task_instructions(task, agent)

class src.integrations.ai_analysis_engine.AIAnalysisEngine

Bases: object

AI-powered analysis and decision engine using the Claude API.

This class provides intelligent analysis for project management decisions, including task assignment optimization, blocker resolution, and risk analysis. It gracefully falls back to rule-based approaches when AI is unavailable.

client

Anthropic API client, None if unavailable

Type:

Optional[anthropic.Anthropic]

model

Claude model to use for analysis

Type:

str

prompts

Template prompts for different analysis types

Type:

Dict[str, str]

Examples

>>> engine = AIAnalysisEngine()
>>> await engine.initialize()
>>> # Engine is ready for analysis tasks

Notes

Requires an Anthropic API key in config_marcus.json (ai.anthropic_api_key) or via the CLAUDE_API_KEY env var. ANTHROPIC_API_KEY is intentionally NOT consulted: setting it in the shell switches Claude Code from subscription to API billing, so Marcus uses CLAUDE_API_KEY to leave Claude Code’s auth untouched. Works in fallback mode without the API key.
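The key lookup described in this note can be sketched as follows. This is an illustration only: the helper name resolve_api_key is hypothetical, and the precedence (config file first, then CLAUDE_API_KEY) is an assumption, since the note does not say which source wins.

```python
import os
from collections.abc import Mapping
from typing import Optional


def resolve_api_key(
    config: Mapping,
    env: Mapping = os.environ,
) -> Optional[str]:
    """Return an API key, or None to signal fallback mode.

    Hypothetical helper: `config` stands for the parsed contents of
    config_marcus.json. Checking the config before the environment is
    an assumption made for this sketch.
    """
    ai_section = config.get("ai")
    if isinstance(ai_section, Mapping):
        key = ai_section.get("anthropic_api_key")
        if isinstance(key, str) and key.strip():
            return key.strip()
    # ANTHROPIC_API_KEY is deliberately never read, per the note above.
    env_key = env.get("CLAUDE_API_KEY", "").strip()
    return env_key or None
```

A None result corresponds to the fallback mode mentioned in the note: the caller would skip creating a client instead of raising an error.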

__init__()

Initialize the AI Analysis Engine.

Attempts to set up the Anthropic client with various compatibility approaches for different library versions.

Return type:

None

async initialize()

Initialize the AI engine and test connectivity.

Verifies the Anthropic client can communicate with the API by sending a test message. Disables the client if the test fails.

Examples

>>> engine = AIAnalysisEngine()
>>> await engine.initialize()
✅ AI Engine connection verified

Return type:

None
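The "test, then disable on failure" behaviour described for initialize() might look roughly like the sketch below. EngineSketch and its injected ping callable are hypothetical stand-ins, not the real class; the actual test message and error handling are not shown in this reference.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class EngineSketch:
    """Hypothetical stand-in for the engine; not the real AIAnalysisEngine."""

    def __init__(
        self,
        client: Optional[Any],
        ping: Callable[[Any], Awaitable[None]],
    ) -> None:
        self.client = client
        self._ping = ping  # assumed helper that sends the test message

    async def initialize(self) -> None:
        if self.client is None:
            return  # no client configured: already in fallback mode
        try:
            await self._ping(self.client)
        except Exception:
            # A failed connectivity test disables the client, so later
            # calls take the rule-based path.
            self.client = None


async def failing_ping(client: Any) -> None:
    raise ConnectionError("API unreachable")


async def ok_ping(client: Any) -> None:
    return None
```

Running asyncio.run(EngineSketch(object(), failing_ping).initialize()) leaves client set to None, which is the condition the fallback paths would check.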

async match_task_to_agent(available_tasks, agent, project_state)

Find the optimal task for an agent using AI analysis.

Parameters:
  • available_tasks (List[Task]) – List of unassigned tasks to choose from

  • agent (WorkerStatus) – Agent profile including skills and capacity

  • project_state (ProjectState) – Current state of the project

Returns:

The best matching task, or None if no suitable task found

Return type:

Optional[Task]

Examples

>>> task = await engine.match_task_to_agent(
...     tasks, agent, ProjectState.HEALTHY
... )
>>> if task is not None:
...     print(f"Assigned: {task.name} to {agent.name}")

Notes

Falls back to skill-based matching if AI is unavailable. Considers up to 10 tasks to avoid context limits.
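A skill-based fallback of the kind mentioned in the note could be sketched like this. The TaskSketch type, its labels field, and the overlap scoring are assumptions for illustration; the real Task and WorkerStatus models and the actual scoring rule are not documented here.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set

MAX_CANDIDATES = 10  # mirrors the 10-task cap mentioned in the note


@dataclass
class TaskSketch:
    """Hypothetical stand-in for Task."""

    name: str
    labels: Set[str] = field(default_factory=set)  # assumed field


def match_by_skills(
    tasks: List[TaskSketch], agent_skills: Set[str]
) -> Optional[TaskSketch]:
    """Pick the task whose labels overlap most with the agent's skills."""
    best: Optional[TaskSketch] = None
    best_score = 0
    for task in tasks[:MAX_CANDIDATES]:
        score = len(task.labels & agent_skills)
        if score > best_score:
            best, best_score = task, score
    return best  # None when no task shares a skill with the agent
```

Returning None when nothing overlaps matches the documented Optional[Task] return type; a real implementation might also weigh capacity or priority.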

async generate_task_instructions(task, agent=None)

Generate detailed instructions for a task.

Parameters:
  • task (Task) – The task to generate instructions for

  • agent (Optional[WorkerStatus]) – The agent assigned to the task

Returns:

Detailed task instructions formatted as markdown

Return type:

str

Examples

>>> instructions = await engine.generate_task_instructions(task, agent)
>>> print(instructions)
## Task Assignment for Alice
...

Notes

Uses AI to generate context-aware instructions when available, otherwise provides structured fallback instructions.
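As a rough illustration of "structured fallback instructions", the sketch below builds a small markdown document without any AI call. Only the heading format is taken from the example output above; the function name, parameters, and the remaining sections are hypothetical.

```python
from typing import Optional


def fallback_instructions(
    task_name: str,
    description: str,
    agent_name: Optional[str] = None,
) -> str:
    """Build markdown instructions from task fields alone (assumed layout)."""
    heading = "## Task Assignment"
    if agent_name:
        heading += f" for {agent_name}"
    lines = [
        heading,
        "",
        f"**Task:** {task_name}",
        "",
        description.strip() or "No description provided.",
        "",
        "### Suggested steps",
        "1. Review the task description and any linked context",
        "2. Implement the change",
        "3. Test the result and report progress",
    ]
    return "\n".join(lines)
```

Because agent is optional in the documented signature, the sketch drops the "for <name>" suffix when no agent is given.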

async analyze_blocker(task_id, description, severity, agent=None, task=None)

Analyze a blocker and suggest resolution steps.

Parameters:
  • task_id (str) – ID of the blocked task

  • description (str) – Detailed description of the blocker

  • severity (str) – Severity level (low, medium, high, urgent)

  • agent (Optional[WorkerStatus]) – Agent who reported the blocker (for context-aware suggestions)

  • task (Optional[Task]) – Full task details (for enhanced analysis)

Returns:

Analysis results including:
  • root_cause: Identified cause

  • impact_assessment: Impact description

  • resolution_steps: List of steps tailored to agent skills

  • required_resources: Needed resources

  • estimated_hours: Time to resolve

  • escalation_needed: Boolean

  • prevention_measures: Future prevention steps

  • learning_opportunities: Skills/knowledge gaps identified

  • recommended_collaborators: Team members who could help

Return type:

Dict[str, Any]
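Since the result is a plain dictionary, callers may want to read it defensively. The helper below is a hypothetical consumer that uses only the keys listed under Returns; the default values and the output wording are assumptions.

```python
from typing import Any, Dict, List


def summarize_blocker(analysis: Dict[str, Any]) -> List[str]:
    """Turn an analysis dict into short report lines, tolerating missing keys."""
    lines = [f"Root cause: {analysis.get('root_cause', 'unknown')}"]
    steps = analysis.get("resolution_steps", [])
    for number, step in enumerate(steps, start=1):
        lines.append(f"{number}. {step}")
    if analysis.get("escalation_needed", False):
        lines.append("Escalation needed")
    return lines
```

Using .get() with defaults keeps the consumer working even if a fallback path returns fewer keys than the AI path, which this reference does not rule out.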

Examples

>>> analysis = await engine.analyze_blocker(
...     "TASK-123",
...     "Database connection timeout",
...     "high",
...     agent=worker_status,
...     task=task_obj
... )
>>> print(analysis['resolution_steps'])
['Check database server status', ...]

async generate_clarification(task, question, context='')

Generate clarification for a task-related question.

Parameters:
  • task (Task) – The task in question

  • question (str) – The question needing clarification

  • context (str) – Additional context for the question

Returns:

Clarification response

Return type:

str

Examples

>>> clarification = await engine.generate_clarification(
...     task,
...     "What database should I use?",
...     "Working on user authentication"
... )

async analyze_project_risks(project_state, recent_blockers, team_status)

Analyze and identify project risks.

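Parameters:
  • project_state – Current state of the project

  • recent_blockers – Recent blockers to scan for patterns

  • team_status – Current team status information

Returns: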

Identified risks with mitigation strategies

Return type:

List[ProjectRisk]

Examples

>>> risks = await engine.analyze_project_risks(
...     ProjectState.AT_RISK,
...     recent_blockers,
...     team_status
... )
>>> for risk in risks:
...     print(f"{risk.description}: {risk.mitigation}")

Notes

Analyzes up to 10 recent blockers to identify patterns. Falls back to a basic risk assessment if AI is unavailable.
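A basic, non-AI risk assessment could be as simple as counting severe blockers in the most recent window. Everything below is an assumption made for illustration: the dict-shaped blockers with a "severity" key, the oldest-to-newest ordering, the threshold of three, and the RiskSketch type (which only mirrors the description and mitigation attributes used in the example above).

```python
from dataclasses import dataclass
from typing import Dict, List

MAX_BLOCKERS = 10  # mirrors the 10-blocker cap mentioned in the note
SEVERE_LEVELS = {"high", "urgent"}


@dataclass
class RiskSketch:
    """Hypothetical stand-in for ProjectRisk."""

    description: str
    mitigation: str


def basic_risk_assessment(
    recent_blockers: List[Dict[str, str]], threshold: int = 3
) -> List[RiskSketch]:
    """Flag a risk when enough severe blockers appear in the recent window."""
    # Assumes the list is ordered oldest to newest.
    window = recent_blockers[-MAX_BLOCKERS:]
    severe = [b for b in window if b.get("severity") in SEVERE_LEVELS]
    risks: List[RiskSketch] = []
    if len(severe) >= threshold:
        risks.append(
            RiskSketch(
                description=(
                    f"{len(severe)} high-severity blockers "
                    f"in the last {len(window)} reports"
                ),
                mitigation="Review the blocked tasks together for a shared cause",
            )
        )
    return risks
```

An empty list means no rule fired; a fuller fallback would presumably also look at project_state and team_status.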

async analyze_project_health(project_state, recent_activities, team_status)

Analyze overall project health using AI.

Parameters:
  • project_state (ProjectState) – Current project state

  • recent_activities (List[Dict[str, Any]]) – Recent project activities/events

  • team_status (Any) – Current team status information (can be dict or list)

Returns:

Project health analysis with overall health, timeline prediction, risk factors, recommendations, and resource optimization

Return type:

Dict[str, Any]

async analyze_feature_request(feature_description)

Analyze a feature request and generate appropriate tasks.

Parameters:

feature_description (str) – Natural language description of the feature to implement

Returns:

Dictionary containing required tasks with details

Return type:

Dict[str, Any]

Examples

>>> result = await engine.analyze_feature_request("Add user profile page")
>>> tasks = result["required_tasks"]

async analyze_integration_points(feature_tasks, existing_tasks)

Analyze how new feature tasks should integrate with existing project tasks.

Parameters:
  • feature_tasks (List[Task]) – Tasks for the new feature

  • existing_tasks (List[Task]) – Current project tasks

Returns:

Integration analysis, including dependencies and the suggested phase

Return type:

Dict[str, Any]

Examples

>>> integration = await engine.analyze_integration_points(
...     new_tasks, project_tasks
... )
>>> phase = integration["suggested_phase"]

async generate_structured_response(prompt, system_prompt='', response_format=None, *, operation=None)

Generate a structured JSON response from the AI based on a schema.

This method is used for task decomposition and other operations requiring structured output. It only works when an AI provider is configured.

Parameters:
  • prompt (str) – The user prompt describing what to generate

  • system_prompt (str) – System instructions for the AI

  • response_format (Optional[Dict[str, Any]]) – JSON schema describing expected response structure

  • operation (str | None)

Returns:

Structured response matching the schema

Return type:

Dict[str, Any]

Raises:

RuntimeError – If no AI client is available (no API key configured)

Examples

>>> result = await engine.generate_structured_response(
...     prompt="Break down this task",
...     system_prompt="You are a task decomposition expert",
...     response_format={"type": "object", "properties": {...}}
... )
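Unlike the other methods, this one has no rule-based fallback and raises RuntimeError when no client is available, so callers need their own handling. The sketch below uses a hypothetical stand-in engine with a canned response to show one way to do that; the "subtasks" key and the empty default are assumptions, not part of the documented schema.

```python
import asyncio
from typing import Any, Dict, Optional


class StructuredEngineSketch:
    """Hypothetical stand-in that mimics only the documented signature."""

    def __init__(self, client: Optional[Any] = None) -> None:
        self.client = client

    async def generate_structured_response(
        self,
        prompt: str,
        system_prompt: str = "",
        response_format: Optional[Dict[str, Any]] = None,
        *,
        operation: Optional[str] = None,
    ) -> Dict[str, Any]:
        if self.client is None:
            raise RuntimeError("No AI client available")
        return {"subtasks": [prompt]}  # canned response for the sketch


async def decompose_or_empty(
    engine: StructuredEngineSketch, prompt: str
) -> Dict[str, Any]:
    """Call the engine, returning an empty result when no AI client exists."""
    try:
        return await engine.generate_structured_response(
            prompt, response_format={"type": "object"}
        )
    except RuntimeError:
        return {"subtasks": []}
```

Catching only RuntimeError keeps other failures visible; whether an empty result or a re-raise is the better default depends on the caller.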