"""
AI-powered analysis and decision engine for Marcus.
This module provides intelligent task assignment, blocker resolution, and project
risk analysis using Claude API. It includes comprehensive fallback mechanisms
for when the AI service is unavailable.
The engine helps with:
- Optimal task-to-agent matching based on skills and capacity
- Generating detailed task instructions
- Analyzing and resolving blockers
- Identifying project risks and mitigation strategies
Examples
--------
>>> engine = AIAnalysisEngine()
>>> await engine.initialize()
>>> task = await engine.match_task_to_agent(tasks, agent, project_state)
>>> instructions = await engine.generate_task_instructions(task, agent)
"""
import json
import os
import sys
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
import anthropic
from src.core.models import (
BlockerReport,
Priority,
ProjectRisk,
ProjectState,
RiskLevel,
Task,
WorkerStatus,
)
from src.cost_tracking.ai_usage_middleware import (
ai_usage_middleware,
)
[docs]
class AIAnalysisEngine:
"""
AI-powered analysis and decision engine using Claude API.
This class provides intelligent analysis for project management decisions,
including task assignment optimization, blocker resolution, and risk analysis.
It gracefully falls back to rule-based approaches when AI is unavailable.
Attributes
----------
client : Optional[anthropic.Anthropic]
Anthropic API client, None if unavailable
model : str
Claude model to use for analysis
prompts : Dict[str, str]
Template prompts for different analysis types
Examples
--------
>>> engine = AIAnalysisEngine()
>>> await engine.initialize()
>>> # Engine is ready for analysis tasks
Notes
-----
Requires an Anthropic API key in ``config_marcus.json``
(``ai.anthropic_api_key``) or via the ``CLAUDE_API_KEY`` env var.
``ANTHROPIC_API_KEY`` is intentionally NOT consulted: setting it in the
shell switches Claude Code from subscription to API billing, so Marcus
uses ``CLAUDE_API_KEY`` to leave Claude Code's auth untouched. Works in
fallback mode without the API key.
"""
[docs]
def __init__(self) -> None:
    """
    Initialize the AI Analysis Engine.

    Loads the Marcus configuration, resolves the Anthropic API key (the
    config's ``ai.anthropic_api_key`` first, then the ``CLAUDE_API_KEY``
    environment variable) and constructs the Anthropic client. Any failure
    leaves ``self.client`` as ``None`` so the engine runs in fallback mode.
    The model name comes from ``ai.model`` in the config when set.
    """
    self.client: Optional[anthropic.Anthropic] = None
    self.current_project_id: Optional[str] = None
    self.current_agent_id: Optional[str] = None

    # Load configuration on its own, so that:
    # - a client-construction failure does not discard config-driven
    #   settings such as ``ai.model``;
    # - the documented CLAUDE_API_KEY fallback still applies when the
    #   config module cannot be loaded.
    config: Any = None
    try:
        from src.config.marcus_config import get_config

        config = get_config()
    except Exception:
        config = None
    ai_config = getattr(config, "ai", None)

    api_key = getattr(ai_config, "anthropic_api_key", None) or os.environ.get(
        "CLAUDE_API_KEY"
    )
    if not api_key:
        print(
            "⚠️ Anthropic API key not found - "
            "AI features will use fallback mode",
            file=sys.stderr,
        )
    else:
        try:
            self.client = anthropic.Anthropic(api_key=api_key)
        except Exception:
            # Construction failed (e.g. incompatible library versions);
            # AI features will use fallback responses. Stay silent here,
            # as the rest of this class does, to avoid interfering with
            # the MCP stdio protocol.
            self.client = None

    self.model: str = (
        getattr(ai_config, "model", None) or "claude-3-5-sonnet-20241022"
    )  # Using Sonnet 3.5 for speed/cost balance

    # Analysis prompts (str.format templates; literal braces are doubled)
    self.prompts: Dict[str, str] = {
        "task_assignment": """You are an AI Project Manager \
analyzing task assignments.
Given:
- Available tasks: {tasks}
- Agent profile: {agent}
- Project state: {project_state}
Analyze and recommend the SINGLE BEST task for this agent considering:
1. Agent's skills match task requirements
2. Agent's current capacity and workload
3. Task priority and dependencies
4. Project timeline and critical path
Return JSON:
{{
"recommended_task_id": "id",
"confidence_score": 0.0-1.0,
"reasoning": "explanation"
}}""",
        "task_instructions": """You are generating detailed task \
instructions for a developer.
Task: {task}
Assigned to: {agent}
The agent is a skilled developer. Give them the WHAT, not the HOW.
IMPORTANT: Look at the task data to determine the task type \
(check the 'type' field). Generate instructions appropriate for the task type:
For DESIGN tasks:
- Focus on planning, architecture, and specifications
- Include creating diagrams, API specs, data models
- NO implementation code yet
- Deliverables: design documents, wireframes, API contracts
For IMPLEMENTATION tasks:
- Focus on actual coding and building
- **Critical: Use Test-Driven Development (TDD)**
- Write tests FIRST before any implementation
- Follow Red-Green-Refactor cycle
- Reference the design specifications
- Deliverables: working code with tests
For TESTING tasks:
- Focus on test scenarios and coverage
- Include unit, integration, and edge cases
- Deliverables: test suites with good coverage
For DOCUMENTATION tasks:
- Read the actual source file for every component you document
- Verify every prop name, parameter, type, and default against the code
- Do NOT document intended or design-spec APIs — document what exists
- Deliverables: README or API docs that match the implementation exactly
Generate clear, actionable instructions that:
1. Define the task objective based on its type
2. List specific steps appropriate for the task type
3. Include acceptance criteria
4. Note any dependencies or prerequisites
5. Keep complexity appropriate to the task scope \
(simple tasks get simple instructions)
Keep instructions BRIEF (under 15 lines). Do NOT write:
- Step-by-step implementation plans
- Specific test case names or test code
- File names, class names, or code structure
- Numbered phases or sub-phases
Format as structured text the developer can follow.
The agent decides the approach. You define the goal and constraints.""",
        "blocker_analysis": """Analyze this blocker and suggest resolution:
Task ID: {task_id}
Blocker: {description}
Severity: {severity}
{agent_context}
{task_context}
Consider the agent's skills, experience level, and current workload \
when providing suggestions.
Tailor resolution steps to their capabilities and provide learning \
opportunities if appropriate.
Provide JSON response:
{{
"root_cause": "analysis",
"impact_assessment": "description",
"resolution_steps": [
"step1 tailored to agent skills",
"step2"
],
"required_resources": ["resource1"],
"estimated_hours": number,
"escalation_needed": boolean,
"prevention_measures": ["measure1"],
"learning_opportunities": ["skill gaps identified"],
"recommended_collaborators": ["team members with needed skills"],
"skill_match_confidence": "high/medium/low"
}}""",
        "project_risk": """Analyze project risks based on current state:
Project State: {project_state}
Recent Blockers: {recent_blockers}
Team Status: {team_status}
Identify risks and provide JSON:
{{
"risks": [
{{
"description": "risk description",
"likelihood": "low|medium|high",
"impact": "low|medium|high",
"mitigation": "suggested action"
}}
],
"overall_health": "healthy|at_risk|critical",
"recommended_actions": ["action1", "action2"]
}}""",
    }
[docs]
async def initialize(self) -> None:
    """
    Initialize the AI engine and test connectivity.

    Sends a minimal test message through the Anthropic client.
    - On success, the client is wrapped with ``ai_usage_middleware`` for
      token tracking.
    - On any failure, ``self.client`` is set to ``None`` so that later
      calls use the rule-based fallbacks.
    - Without a client, prints a fallback-mode notice to stderr and
      returns.

    Examples
    --------
    >>> engine = AIAnalysisEngine()
    >>> await engine.initialize()
    >>> engine.client is not None  # True only if the test call succeeded
    """
    if not self.client:
        print(
            "⚠️ AI Engine running in fallback mode (no Anthropic client)",
            file=sys.stderr,
        )
        return

    import asyncio
    import functools

    try:
        probe = functools.partial(
            self.client.messages.create,
            model=self.model,
            max_tokens=10,
            messages=[{"role": "user", "content": "test"}],
        )
        # The Anthropic client used here is synchronous; run the probe in
        # the default executor so the network round-trip does not block
        # the event loop.
        await asyncio.get_running_loop().run_in_executor(None, probe)
        # Connection verified - wrap client for token tracking
        self.client = ai_usage_middleware.wrap_ai_provider(self.client)
    except Exception:
        # AI Engine test failed - will use fallback responses.
        # Don't print to stderr as it interferes with MCP stdio protocol.
        self.client = None
[docs]
async def match_task_to_agent(
    self,
    available_tasks: List[Task],
    agent: WorkerStatus,
    project_state: ProjectState,
) -> Optional[Task]:
    """
    Find the optimal task for an agent using AI analysis.

    Parameters
    ----------
    available_tasks : List[Task]
        List of unassigned tasks to choose from
    agent : WorkerStatus
        Agent profile including skills and capacity
    project_state : ProjectState
        Current state of the project; its ``project_name`` (or
        ``board_id``) is also recorded for token tracking

    Returns
    -------
    Optional[Task]
        The recommended task, or None when ``available_tasks`` is empty

    Examples
    --------
    >>> task = await engine.match_task_to_agent(tasks, agent, project_state)
    >>> if task is not None:
    ...     print(f"Assigned: {task.name} to {agent.name}")

    Notes
    -----
    Falls back to skill-based matching if AI is unavailable, the call or
    JSON parsing fails, or the recommended ID matches no available task.
    Only the first 10 tasks are sent to the model to avoid context limits.
    IDs are compared as strings so a numeric ID in the model's JSON still
    matches a string task ID.
    """
    # Set project context for token tracking
    self.current_project_id = project_state.project_name or project_state.board_id
    self.current_agent_id = agent.worker_id
    if not available_tasks:
        return None
    if not self.client:
        # Fallback: Simple skill-based matching
        return self._fallback_task_matching(available_tasks, agent)

    # Prepare data for AI analysis
    tasks_data = [
        {
            "id": t.id,
            "name": t.name,
            "description": t.description,
            "priority": t.priority.value,
            "estimated_hours": t.estimated_hours,
            "labels": t.labels,
            "dependencies": t.dependencies,
        }
        for t in available_tasks[:10]  # Limit to 10 tasks for context
    ]
    agent_data = {
        "id": agent.worker_id,
        "name": agent.name,
        "role": agent.role,
        "skills": agent.skills,
        "current_capacity": agent.capacity,
        "completed_tasks": agent.completed_tasks_count,
    }
    # Serialize ProjectState dataclass to JSON
    project_state_data = {
        "board_id": project_state.board_id,
        "project_name": project_state.project_name,
        "total_tasks": project_state.total_tasks,
        "completed_tasks": project_state.completed_tasks,
        "in_progress_tasks": project_state.in_progress_tasks,
        "blocked_tasks": project_state.blocked_tasks,
        "progress_percent": project_state.progress_percent,
        "team_velocity": project_state.team_velocity,
        "risk_level": (
            project_state.risk_level.value
            if hasattr(project_state.risk_level, "value")
            else str(project_state.risk_level)
        ),
        "last_updated": (
            project_state.last_updated.isoformat()
            if hasattr(project_state.last_updated, "isoformat")
            else str(project_state.last_updated)
        ),
    }
    prompt = self.prompts["task_assignment"].format(
        tasks=json.dumps(tasks_data, indent=2),
        agent=json.dumps(agent_data, indent=2),
        project_state=json.dumps(project_state_data, indent=2),
    )
    try:
        response = await self._call_claude(prompt)
        result = json.loads(response)
        recommended_id = result.get("recommended_task_id")
        if recommended_id is not None:
            wanted = str(recommended_id)
            for task in available_tasks:
                if str(task.id) == wanted:
                    return task
    except Exception as e:
        print(f"AI task matching failed: {e}", file=sys.stderr)
    # Fallback to simple matching
    return self._fallback_task_matching(available_tasks, agent)
def _fallback_task_matching(
    self, tasks: List[Task], agent: WorkerStatus
) -> Optional[Task]:
    """
    Match tasks to agent using skill-based scoring without AI.

    Parameters
    ----------
    tasks : List[Task]
        Available tasks to match
    agent : WorkerStatus
        Agent to match tasks for

    Returns
    -------
    Optional[Task]
        Highest-scoring task, or None when ``tasks`` is empty

    Notes
    -----
    Each task is scored as:
    - Priority weight: urgent=10, high=3, medium=2, low=1 (unknown=1).
      Urgent is weighted heavily so it wins in fallback mode.
    - Skill matches: +2 per agent skill that appears verbatim (case-
      sensitive) among the task's labels.
    Ties go to the earliest task in ``tasks``.
    """
    priority_weights = {
        Priority.URGENT: 10,  # Heavily prioritize urgent tasks in fallback mode
        Priority.HIGH: 3,
        Priority.MEDIUM: 2,
        Priority.LOW: 1,
    }
    agent_skills = set(agent.skills or [])

    def score(task):  # type: ignore[no-untyped-def]
        value = priority_weights.get(task.priority, 1)
        if agent_skills and task.labels:
            value += 2 * len(agent_skills & set(task.labels))
        return value

    # max() returns the first maximal element, preserving first-wins ties.
    return max(tasks, key=score, default=None)
[docs]
async def generate_task_instructions(
    self, task: Task, agent: Optional[WorkerStatus] = None
) -> str:
    """
    Produce developer-facing instructions for a task.

    Parameters
    ----------
    task : Task
        Task the instructions are for.
    agent : Optional[WorkerStatus], default=None
        Assignee; when omitted the prompt describes a generic developer.

    Returns
    -------
    str
        Instruction text. It is model-generated when the client is
        available; otherwise, or if the AI call raises, it is the markdown
        from ``_generate_fallback_instructions``.

    Examples
    --------
    >>> instructions = await engine.generate_task_instructions(task, agent)
    >>> print(instructions)
    ## Task Assignment for Alice
    ...

    Notes
    -----
    The task type sent to the model is taken from ``task._parent_task_type``
    when that attribute exists (subtasks inherit their parent's type).
    Otherwise it is inferred from the name and ``type:*`` labels:
    design > testing > documentation > implementation.
    """
    if not self.client:
        return self._generate_fallback_instructions(task, agent)

    labels = getattr(task, "labels", []) or []

    if hasattr(task, "_parent_task_type"):
        # Subtasks carry their parent's type; trust it over heuristics.
        task_type = task._parent_task_type
    else:
        lowered_name = task.name.lower()
        if "design" in lowered_name or "type:design" in labels:
            task_type = "design"
        elif "test" in lowered_name or "type:testing" in labels:
            task_type = "testing"
        elif "type:documentation" in labels or any(
            keyword in lowered_name for keyword in ("readme", "document", "docs")
        ):
            task_type = "documentation"
        else:
            task_type = "implementation"

    task_data: Dict[str, Any] = {
        "name": task.name,
        "description": task.description,
        "priority": task.priority.value,
        "estimated_hours": task.estimated_hours,
        "dependencies": task.dependencies,
        "labels": labels,
        "type": task_type,
    }

    # Contract-first tasks (GH-320): surface responsibility and contract
    # file so the model stresses reading/owning the contract first. The
    # parser also recovers these fields from source_context or the
    # MARCUS_CONTRACT_FIRST description marker when a kanban provider
    # dropped ``Task.responsibility`` during persistence.
    from src.marcus_mcp.tools.task import _parse_contract_metadata

    contract_meta = _parse_contract_metadata(task)
    responsibility = contract_meta["responsibility"]
    contract_file = contract_meta["contract_file"]
    if responsibility:
        task_data["responsibility"] = responsibility
    if contract_file:
        task_data["contract_file"] = contract_file
        task_data["contract_first"] = True

    if agent:
        agent_data = {"name": agent.name, "role": agent.role, "skills": agent.skills}
    else:
        agent_data = {"name": "Unknown", "role": "Developer", "skills": []}

    prompt = self.prompts["task_instructions"].format(
        task=json.dumps(task_data, indent=2),
        agent=json.dumps(agent_data, indent=2),
    )
    try:
        return await self._call_claude(prompt)
    except Exception as e:
        print(f"AI instruction generation failed: {e}", file=sys.stderr)
        return self._generate_fallback_instructions(task, agent)
def _generate_fallback_instructions(
    self, task: Task, agent: Optional[WorkerStatus]
) -> str:
    """
    Generate fallback instructions when AI is not available.

    The task type is read from ``task._parent_task_type`` when it is set to
    a non-empty value (subtasks inherit their parent's type). Otherwise it
    is inferred from the task name and ``type:*`` labels with the same
    precedence as ``generate_task_instructions``:
    design > testing > documentation > implementation.

    Parameters
    ----------
    task : Task
        Task to generate instructions for
    agent : Optional[WorkerStatus]
        Agent assigned to the task

    Returns
    -------
    str
        Structured task instructions in markdown format
    """
    agent_name = agent.name if agent else "Team Member"

    parent_type = getattr(task, "_parent_task_type", None)
    if parent_type:
        # Lower-cased so e.g. "Design" still selects the design template;
        # the displayed type is capitalized below either way.
        task_type = str(parent_type).lower()
    else:
        task_name = task.name.lower()
        task_labels = getattr(task, "labels", []) or []
        if "design" in task_name or "type:design" in task_labels:
            task_type = "design"
        elif "test" in task_name or "type:testing" in task_labels:
            task_type = "testing"
        elif (
            any(kw in task_name for kw in ("readme", "document", "docs"))
            or "type:documentation" in task_labels
        ):
            task_type = "documentation"
        else:
            task_type = "implementation"

    # Generate type-specific instructions
    if task_type == "design":
        implementation_steps = """2. **Design Steps**
- Research existing patterns and best practices
- Create architecture diagrams
- Define API endpoints and data models
- Document component interfaces
- Create wireframes or mockups if needed"""
        definition_of_done = """3. **Definition of Done**
- Design documentation is complete
- API specifications are defined
- Data models are documented
- Technical approach is clear"""
    elif task_type == "testing":
        implementation_steps = """2. **Testing Steps**
- Review the implementation to understand functionality
- Write unit tests for individual components
- Create integration tests for workflows
- Add edge case and error handling tests
- Ensure minimum 80% code coverage"""
        definition_of_done = """3. **Definition of Done**
- All tests pass successfully
- Code coverage meets requirements
- Edge cases are tested
- Test documentation is complete"""
    elif task_type == "documentation":
        # Mirrors the DOCUMENTATION guidance in the AI instruction prompt:
        # document what the code actually does, not the intended design.
        implementation_steps = """2. **Documentation Steps**
- Read the actual source for every component you document
- Verify every parameter, type, and default against the code
- Document what exists, not intended or design-spec APIs
- Include usage examples that match the implementation"""
        definition_of_done = """3. **Definition of Done**
- Documentation matches the implementation exactly
- All public interfaces are covered
- Examples are accurate and runnable
- Documentation is reviewed"""
    else:  # implementation
        implementation_steps = """2. **Implementation Steps**
- Review the design specifications
- Break down the task into smaller subtasks
- Start with the core functionality
- Follow project coding standards
- **MANDATORY**: Use Test-Driven Development (TDD)
- Write failing tests FIRST (Red)
- Implement minimal code to pass tests (Green)
- Refactor while keeping tests green (Refactor)
"""
        definition_of_done = """3. **Definition of Done**
- All requirements from description are met
- Code is tested and reviewed
- Documentation is updated
- Code follows project standards"""
    return f"""## Task Assignment for {agent_name}
**Task:** {task.name}
**Description:** {task.description}
**Priority:** {task.priority.value}
**Estimated Hours:** {task.estimated_hours}
**Type:** {task_type.capitalize()}
### Instructions:
1. **Review Requirements**
- Read the task description carefully
- Check any linked documentation
- Identify dependencies: \
{', '.join(task.dependencies) if task.dependencies else 'None'}
{implementation_steps}
{definition_of_done}
4. **Communication**
- Report progress regularly
- Ask for clarification if needed
- Report any blockers immediately
**Labels:** {', '.join(task.labels) if task.labels else 'None'}
Good luck with your task!"""
[docs]
async def analyze_blocker(
self,
task_id: str,
description: str,
severity: str,
agent: Optional["WorkerStatus"] = None,
task: Optional["Task"] = None,
) -> Dict[str, Any]:
"""
Analyze a blocker and suggest resolution steps.
Parameters
----------
task_id : str
ID of the blocked task
description : str
Detailed description of the blocker
severity : str
Severity level (low, medium, high, urgent)
agent : Optional[WorkerStatus]
Agent who reported the blocker (for context-aware suggestions)
task : Optional[Task]
Full task details (for enhanced analysis)
Returns
-------
Dict[str, Any]
Analysis results including:
- root_cause: Identified cause
- impact_assessment: Impact description
- resolution_steps: List of steps tailored to agent skills
- required_resources: Needed resources
- estimated_hours: Time to resolve
- escalation_needed: Boolean
- prevention_measures: Future prevention steps
- learning_opportunities: Skills/knowledge gaps identified
- recommended_collaborators: Team members who could help
Examples
--------
>>> analysis = await engine.analyze_blocker(
... "TASK-123",
... "Database connection timeout",
... "high",
... agent=worker_status,
... task=task_obj
... )
>>> print(analysis['resolution_steps'])
['Check database server status', ...]
"""
if not self.client:
return self._generate_fallback_blocker_analysis(
description, severity, agent, task
)
# Prepare agent context
agent_context = ""
if agent:
agent_context = f"""
Agent Profile:
- ID: {agent.worker_id}
- Name: {agent.name}
- Role: {agent.role}
- Skills: {', '.join(agent.skills) if agent.skills else 'None specified'}
- Current Tasks: {len(agent.current_tasks)} active tasks
- Completed Tasks: {agent.completed_tasks_count}
- Performance Score: {agent.performance_score}
"""
# Prepare task context
task_context = ""
if task:
task_context = f"""
Task Details:
- Name: {task.name}
- Description: {task.description}
- Priority: {task.priority.value if task.priority else 'Not set'}
- Estimated Hours: {task.estimated_hours}
- Dependencies: {', '.join(task.dependencies) if task.dependencies else 'None'}
- Labels: {', '.join(task.labels) if task.labels else 'None'}
"""
prompt = self.prompts["blocker_analysis"].format(
task_id=task_id,
description=description,
severity=severity,
agent_context=agent_context,
task_context=task_context,
)
try:
response = await self._call_claude(prompt)
result: Dict[str, Any] = json.loads(response)
return result
except Exception as e:
print(f"AI blocker analysis failed: {e}", file=sys.stderr)
return self._generate_fallback_blocker_analysis(
description, severity, agent, task
)
def _generate_fallback_blocker_analysis(
self,
description: str,
severity: str,
agent: Optional["WorkerStatus"] = None,
task: Optional["Task"] = None,
) -> Dict[str, Any]:
"""
Generate fallback blocker analysis without AI.
Parameters
----------
description : str
Blocker description
severity : str
Severity level
agent : Optional[WorkerStatus]
Agent who reported the blocker
task : Optional[Task]
Task details
Returns
-------
Dict[str, Any]
Basic analysis with agent-aware resolution steps
"""
# Basic analysis
base_steps = [
"Review the blocker description",
"Identify required resources",
"Research potential solutions",
"Document resolution steps",
]
# Add agent-specific guidance if available
if agent and agent.skills:
skill_based_steps = []
if "python" in [s.lower() for s in agent.skills]:
skill_based_steps.append(
"Check Python documentation and Stack Overflow"
)
if "database" in [s.lower() for s in agent.skills]:
skill_based_steps.append("Review database logs and connection settings")
if "frontend" in [s.lower() for s in agent.skills]:
skill_based_steps.append("Check browser console and network requests")
if skill_based_steps:
base_steps = skill_based_steps + base_steps
# Adjust resources based on agent experience
resources = ["Team lead", "Subject matter expert"]
if agent and agent.performance_score < 0.7:
resources.insert(0, "Senior developer (mentoring)")
return {
"root_cause": "Analysis needed",
"impact_assessment": f"Blocker reported: {description}",
"resolution_steps": base_steps,
"required_resources": resources,
"estimated_hours": 2 if severity == "low" else 4,
"escalation_needed": severity in ["high", "urgent"],
"prevention_measures": [
"Improve documentation",
"Add monitoring",
"Review process",
],
"learning_opportunities": ["Root cause analysis", "Problem solving"],
"recommended_collaborators": ["Team lead", "Senior developer"],
"skill_match_confidence": "medium",
}
[docs]
async def generate_clarification(
    self, task: Task, question: str, context: str = ""
) -> str:
    """
    Answer a developer's question about a task.

    Parameters
    ----------
    task : Task
        Task the question relates to; its name and description are sent
        to the model.
    question : str
        The question to clarify.
    context : str, optional
        Extra background supplied by the asker.

    Returns
    -------
    str
        The model's clarification. Without a client, or if the AI call
        raises, returns a plain "Please clarify" message that echoes the
        question, task name and context.

    Examples
    --------
    >>> clarification = await engine.generate_clarification(
    ...     task,
    ...     "What database should I use?",
    ...     "Working on user authentication"
    ... )
    """
    fallback_message = (
        f"Please clarify: {question}\n\nTask: {task.name}\nContext: {context}"
    )
    if not self.client:
        return fallback_message

    prompt = f"""Help clarify this question about a task:
Task: {task.name}
Description: {task.description}
Question: {question}
Context: {context}
Provide a helpful clarification that guides the developer."""
    try:
        return await self._call_claude(prompt)
    except Exception as e:
        print(f"AI clarification failed: {e}", file=sys.stderr)
        return fallback_message
[docs]
async def analyze_project_risks(
    self,
    project_state: ProjectState,
    recent_blockers: List[BlockerReport],
    team_status: List[WorkerStatus],
) -> List[ProjectRisk]:
    """
    Analyze and identify project risks.

    Parameters
    ----------
    project_state : ProjectState
        Current project state
    recent_blockers : List[BlockerReport]
        Recent blockers encountered (only the last 10 are sent)
    team_status : List[WorkerStatus]
        Current team member status

    Returns
    -------
    List[ProjectRisk]
        Identified risks with mitigation strategies

    Examples
    --------
    >>> risks = await engine.analyze_project_risks(
    ...     project_state,
    ...     recent_blockers,
    ...     team_status
    ... )
    >>> for risk in risks:
    ...     print(f"{risk.description}: {risk.mitigation_strategy}")

    Notes
    -----
    Falls back to basic risk assessment if AI is unavailable or the
    call/JSON parsing fails.
    - Likelihood maps case-insensitively to probability: low=0.2,
      medium=0.5, high=0.8, default 0.5.
    - Impact maps case-insensitively to severity, defaulting to
      RiskLevel.MEDIUM.
    - Risk entries that are not objects or lack a description are
      skipped, so one malformed entry does not discard the rest.
    """
    if not self.client:
        return self._generate_fallback_risk_analysis(project_state)

    # Prepare data
    blockers_data = [
        {
            "task_id": b.task_id,
            "description": b.description,
            "severity": b.severity.value,
            "reported_at": b.reported_at.isoformat(),
        }
        for b in recent_blockers[-10:]  # Last 10 blockers
    ]
    team_data = [
        {
            "name": w.name,
            "role": w.role,
            "current_tasks": len(w.current_tasks),
            "capacity": w.capacity,
        }
        for w in team_status
    ]
    # Serialize ProjectState dataclass to JSON
    project_state_data = {
        "board_id": project_state.board_id,
        "project_name": project_state.project_name,
        "total_tasks": project_state.total_tasks,
        "completed_tasks": project_state.completed_tasks,
        "in_progress_tasks": project_state.in_progress_tasks,
        "blocked_tasks": project_state.blocked_tasks,
        "progress_percent": project_state.progress_percent,
        "team_velocity": project_state.team_velocity,
        "risk_level": (
            project_state.risk_level.value
            if hasattr(project_state.risk_level, "value")
            else str(project_state.risk_level)
        ),
        "last_updated": (
            project_state.last_updated.isoformat()
            if hasattr(project_state.last_updated, "isoformat")
            else str(project_state.last_updated)
        ),
    }
    prompt = self.prompts["project_risk"].format(
        project_state=json.dumps(project_state_data, indent=2),
        recent_blockers=json.dumps(blockers_data, indent=2),
        team_status=json.dumps(team_data, indent=2),
    )

    # Loop-invariant lookup tables for the model's categorical answers.
    likelihood_map = {"low": 0.2, "medium": 0.5, "high": 0.8}
    impact_severity = {
        "low": RiskLevel.LOW,
        "medium": RiskLevel.MEDIUM,
        "high": RiskLevel.HIGH,
    }
    try:
        response = await self._call_claude(prompt)
        result = json.loads(response)
        # Convert to ProjectRisk objects
        risks = []
        for risk_data in result.get("risks", []) or []:
            if not isinstance(risk_data, dict) or not risk_data.get("description"):
                continue
            impact_key = str(risk_data.get("impact", "")).lower()
            likelihood_key = str(risk_data.get("likelihood", "")).lower()
            risks.append(
                ProjectRisk(
                    risk_type="project",
                    description=risk_data["description"],
                    severity=impact_severity.get(impact_key, RiskLevel.MEDIUM),
                    probability=likelihood_map.get(likelihood_key, 0.5),
                    impact=risk_data.get("impact", "Unknown impact"),
                    mitigation_strategy=risk_data.get("mitigation", ""),
                    identified_at=datetime.now(timezone.utc),
                )
            )
        return risks
    except Exception as e:
        print(f"AI risk analysis failed: {e}", file=sys.stderr)
        return self._generate_fallback_risk_analysis(project_state)
def _generate_fallback_risk_analysis(
    self, project_state: ProjectState
) -> List[ProjectRisk]:
    """
    Produce a rule-based risk list when AI analysis is unavailable.

    Parameters
    ----------
    project_state : ProjectState
        Snapshot of the project; only ``risk_level`` is consulted.

    Returns
    -------
    List[ProjectRisk]
        A single high-severity timeline risk when ``risk_level`` is
        ``RiskLevel.HIGH``; otherwise an empty list.
    """
    if project_state.risk_level != RiskLevel.HIGH:
        return []

    timeline_risk = ProjectRisk(
        risk_type="timeline",
        description="Project timeline at risk",
        severity=RiskLevel.HIGH,
        probability=0.7,
        impact="Potential delays in delivery",
        mitigation_strategy="Review task priorities and resource allocation",
        identified_at=datetime.now(timezone.utc),
    )
    return [timeline_risk]
[docs]
async def analyze_project_health(
    self,
    project_state: ProjectState,
    recent_activities: List[Dict[str, Any]],
    team_status: Any,
) -> Dict[str, Any]:
    """
    Analyze overall project health using AI.

    Parameters
    ----------
    project_state : ProjectState
        Current project state
    recent_activities : List[Dict[str, Any]]
        Recent project activities/events
    team_status : Any
        Current team status information (can be dict or list). List items
        may be WorkerStatus objects or already-serialized dicts.

    Returns
    -------
    Dict[str, Any]
        Project health analysis with overall health, timeline prediction,
        risk factors, recommendations, and resource optimization

    Notes
    -----
    Falls back to ``_generate_fallback_health_analysis`` when the client
    is unavailable or the AI call / JSON parsing fails. The prompt is
    built before the try block, so serialization must not raise:
    - ``risk_level`` may be an enum or a plain value;
    - ``overdue_tasks`` may be None;
    - values JSON cannot encode natively (e.g. ``datetime``) in the
      activity or team data are stringified.
    """
    if not self.client:
        return self._generate_fallback_health_analysis(project_state, team_status)

    risk_level = project_state.risk_level
    last_updated = project_state.last_updated
    # Serialize ProjectState dataclass to JSON
    project_state_data = {
        "board_id": project_state.board_id,
        "project_name": project_state.project_name,
        "total_tasks": project_state.total_tasks,
        "completed_tasks": project_state.completed_tasks,
        "in_progress_tasks": project_state.in_progress_tasks,
        "blocked_tasks": project_state.blocked_tasks,
        "progress_percent": project_state.progress_percent,
        "team_velocity": project_state.team_velocity,
        "risk_level": (
            risk_level.value if hasattr(risk_level, "value") else str(risk_level)
        ),
        "last_updated": (
            last_updated.isoformat()
            if hasattr(last_updated, "isoformat")
            else str(last_updated)
        ),
        "overdue_tasks": len(project_state.overdue_tasks or []),
    }

    # Serialize team status if it's a list of WorkerStatus objects
    team_status_data: Any
    if isinstance(team_status, list) and team_status:
        team_status_data = []
        for worker in team_status:
            if isinstance(worker, dict):
                # Already serialized by the caller; pass through as-is.
                team_status_data.append(worker)
                continue
            team_status_data.append(
                {
                    "worker_id": worker.worker_id,
                    "name": worker.name,
                    "role": worker.role,
                    "current_tasks_count": len(worker.current_tasks),
                    "completed_tasks_count": worker.completed_tasks_count,
                    "capacity": worker.capacity,
                    "performance_score": getattr(worker, "performance_score", 1.0),
                }
            )
    else:
        team_status_data = team_status

    # default=str: these values only feed an LLM prompt, so stringifying
    # non-JSON types (datetime, enums) is preferable to aborting.
    state_json = json.dumps(project_state_data, indent=2)
    activities_json = json.dumps(recent_activities, indent=2, default=str)
    team_json = json.dumps(team_status_data, indent=2, default=str)

    # Create project health analysis prompt
    prompt = f"""Analyze the health of this software project \
and provide comprehensive insights.
Project State:
{state_json}
Recent Activities:
{activities_json}
Team Status:
{team_json}
Provide a detailed analysis including:
1. Overall project health assessment (green/yellow/red)
2. Timeline prediction with confidence level
3. Key risk factors
4. Actionable recommendations
5. Resource optimization suggestions
Return JSON in this format:
{{
"overall_health": "green|yellow|red",
"timeline_prediction": {{
"on_track": true|false,
"estimated_completion": "date or description",
"confidence": 0.0-1.0,
"critical_path_risks": ["risk1", "risk2"]
}},
"risk_factors": [
{{
"type": "resource|timeline|technical|quality",
"description": "detailed description",
"severity": "low|medium|high",
"mitigation": "suggested action"
}}
],
"recommendations": [
{{
"priority": "high|medium|low",
"action": "specific action to take",
"expected_impact": "what this will achieve"
}}
],
"resource_optimization": [
{{
"suggestion": "optimization suggestion",
"impact": "expected improvement"
}}
]
}}"""
    try:
        response = await self._call_claude(prompt)
        result: Dict[str, Any] = json.loads(response)
        return result
    except Exception as e:
        print(f"AI health analysis failed: {e}", file=sys.stderr)
        return self._generate_fallback_health_analysis(project_state, team_status)
def _generate_fallback_health_analysis(
    self, project_state: ProjectState, team_status: Any
) -> Dict[str, Any]:
    """
    Generate a rule-based health analysis without AI.

    Returns the same top-level keys the AI health prompt asks for
    (``overall_health``, ``timeline_prediction``, ``risk_factors``,
    ``recommendations``, ``resource_optimization``), so callers can use
    either result interchangeably.

    Parameters
    ----------
    project_state : ProjectState
        Current project state. ``risk_level``, ``blocked_tasks``,
        ``completed_tasks``, ``total_tasks``, ``progress_percent``,
        ``overdue_tasks`` and ``team_velocity`` are read.
    team_status : Any
        Team status information (a dict, or a list of ``WorkerStatus``).
        Accepted for signature parity with the AI path; the heuristics
        below do not use it.

    Returns
    -------
    Dict[str, Any]
        Basic health analysis based on the metrics above.
    """
    blocked = project_state.blocked_tasks
    overdue_count = len(project_state.overdue_tasks)

    # HIGH risk -> red; MEDIUM risk or more than two blocked tasks -> yellow.
    overall_health = "green"
    if project_state.risk_level == RiskLevel.HIGH:
        overall_health = "red"
    elif project_state.risk_level == RiskLevel.MEDIUM or blocked > 2:
        overall_health = "yellow"

    # Fraction of tasks completed; max(..., 1) avoids dividing by zero on an
    # empty project.
    completion_rate = project_state.completed_tasks / max(
        project_state.total_tasks, 1
    )
    # NOTE(review): if progress_percent is itself derived from
    # completed/total (not visible here), this comparison is nearly always
    # True; a time-based baseline may have been intended — confirm.
    on_track = completion_rate >= (project_state.progress_percent / 100.0)

    risk_factors: List[Dict[str, Any]] = []
    # Resource risk: any blocked work; more than two blocked is "high".
    if blocked > 0:
        risk_factors.append(
            {
                "type": "resource",
                "description": f"{blocked} tasks are currently blocked",
                "severity": "high" if blocked > 2 else "medium",
                "mitigation": "Review and resolve blockers urgently",
            }
        )
    # Timeline risk: any overdue task is treated as high severity.
    if overdue_count > 0:
        risk_factors.append(
            {
                "type": "timeline",
                "description": f"{overdue_count} tasks are overdue",
                "severity": "high",
                "mitigation": "Reassign or reprioritize overdue tasks",
            }
        )

    recommendations: List[Dict[str, Any]] = []
    # 3.0 is the velocity threshold below which capacity is flagged.
    if project_state.team_velocity < 3.0:
        recommendations.append(
            {
                "priority": "high",
                "action": "Review team capacity and task complexity",
                "expected_impact": "Improve velocity by 20-30%",
            }
        )
    if blocked > 0:
        recommendations.append(
            {
                "priority": "high",
                "action": "Conduct blocker review session",
                "expected_impact": "Unblock tasks and restore progress",
            }
        )

    return {
        "overall_health": overall_health,
        "timeline_prediction": {
            "on_track": on_track,
            "estimated_completion": "Based on current velocity",
            # Fixed heuristic confidences for the rule-based path.
            "confidence": 0.6 if on_track else 0.3,
            "critical_path_risks": (["Resource constraints"] if blocked > 0 else []),
        },
        "risk_factors": risk_factors,
        "recommendations": recommendations,
        "resource_optimization": [
            {
                "suggestion": "Balance workload across team members",
                "impact": "Reduce bottlenecks and improve throughput",
            }
        ],
    }
[docs]
async def analyze_feature_request(self, feature_description: str) -> Dict[str, Any]:
    """
    Analyze a feature request and generate appropriate tasks.

    Asks Claude for a task breakdown. Falls back to the rule-based
    ``_analyze_feature_request_fallback`` in any of these cases:

    - no client is configured;
    - the call raises;
    - the reply is not valid JSON;
    - the reply is not a JSON object with a ``required_tasks`` list.

    Parameters
    ----------
    feature_description : str
        Natural language description of the feature to implement

    Returns
    -------
    Dict[str, Any]
        Dictionary containing required tasks with details; always has a
        ``required_tasks`` list.

    Examples
    --------
    >>> result = await engine.analyze_feature_request("Add user profile page")
    >>> tasks = result["required_tasks"]
    """
    # Fallback for when Claude is unavailable
    if not self.client:
        return self._analyze_feature_request_fallback(feature_description)
    try:
        # The example must itself be valid JSON: a "// comment" inside it
        # invites the model to echo the comment and produce unparseable output.
        prompt = f"""Analyze this feature request and generate a \
comprehensive task breakdown.
Feature Request: {feature_description}
Generate a detailed task list for implementing this feature. Consider:
1. Design/planning tasks
2. Backend implementation needs
3. Frontend/UI requirements
4. Database/data model changes
5. API endpoints required
6. Security considerations
7. Testing requirements
8. Documentation needs
Return JSON with this exact format:
{{
"required_tasks": [
{{
"name": "specific task name",
"description": "detailed description of what needs to be done",
"estimated_hours": integer,
"labels": ["appropriate", "labels"],
"critical": true/false,
"task_type": \
"design|backend|frontend|database|testing|documentation|security"
}}
],
"feature_complexity": "simple|moderate|complex",
"technical_requirements": ["list", "of", "technical", "needs"],
"dependencies_on_existing": [
"authentication",
"api",
"database"
]
}}
Use "dependencies_on_existing" to list the existing systems this feature \
depends on. The response must be valid JSON with no comments.
Be specific and actionable. Each task should be self-contained \
and assignable to a developer."""
        response = await self._call_claude(prompt)
        try:
            result: Any = json.loads(response)
        except json.JSONDecodeError:
            print(
                "Failed to parse AI response as JSON, using fallback",
                file=sys.stderr,
            )
            return self._analyze_feature_request_fallback(feature_description)
        # Valid JSON of the wrong shape (e.g. a bare array) would otherwise
        # reach callers that index result["required_tasks"].
        if not isinstance(result, dict) or not isinstance(
            result.get("required_tasks"), list
        ):
            print(
                "AI response missing 'required_tasks' list, using fallback",
                file=sys.stderr,
            )
            return self._analyze_feature_request_fallback(feature_description)
        return result
    except Exception as e:
        print(f"AI feature analysis failed: {e}, using fallback", file=sys.stderr)
        return self._analyze_feature_request_fallback(feature_description)
def _analyze_feature_request_fallback(
self, feature_description: str
) -> Dict[str, Any]:
"""Fallback feature analysis when AI is unavailable."""
feature_lower = feature_description.lower()
tasks = []
# Always start with design
tasks.append(
{
"name": f"Design {feature_description}",
"description": (
f"Create technical design and architecture "
f"for {feature_description}"
),
"estimated_hours": 4,
"labels": ["design", "planning"],
"critical": False,
"task_type": "design",
}
)
# Analyze feature type and add appropriate tasks
if any(word in feature_lower for word in ["api", "endpoint", "service"]):
tasks.append(
{
"name": f"Implement API for {feature_description}",
"description": "Build backend API endpoints and business logic",
"estimated_hours": 12,
"labels": ["backend", "api"],
"critical": True,
"task_type": "backend",
}
)
if any(word in feature_lower for word in ["ui", "page", "screen", "interface"]):
tasks.append(
{
"name": f"Build UI for {feature_description}",
"description": "Create user interface components and interactions",
"estimated_hours": 10,
"labels": ["frontend", "ui"],
"critical": True,
"task_type": "frontend",
}
)
# Always add testing and documentation
tasks.extend(
[
{
"name": f"Test {feature_description}",
"description": "Write unit tests and perform integration testing",
"estimated_hours": 6,
"labels": ["testing", "qa"],
"critical": False,
"task_type": "testing",
},
{
"name": f"Document {feature_description}",
"description": "Create user and technical documentation",
"estimated_hours": 3,
"labels": ["documentation"],
"critical": False,
"task_type": "documentation",
},
]
)
return {
"required_tasks": tasks,
"feature_complexity": "moderate",
"technical_requirements": [],
"dependencies_on_existing": [],
}
[docs]
async def analyze_integration_points(
    self, feature_tasks: List[Task], existing_tasks: List[Task]
) -> Dict[str, Any]:
    """
    Analyze how new feature tasks should integrate with existing project tasks.

    Falls back to ``_analyze_integration_fallback`` in any of these cases:

    - no client is configured;
    - the call raises;
    - the reply is not valid JSON;
    - the reply is not a JSON object containing ``suggested_phase``.

    Parameters
    ----------
    feature_tasks : List[Task]
        Tasks for the new feature
    existing_tasks : List[Task]
        Current project tasks

    Returns
    -------
    Dict[str, Any]
        Integration analysis including dependencies and phase

    Examples
    --------
    >>> integration = await engine.analyze_integration_points(
    ...     new_tasks, project_tasks
    ... )
    >>> phase = integration["suggested_phase"]
    """
    # Fallback for when Claude is unavailable
    if not self.client:
        return self._analyze_integration_fallback(feature_tasks, existing_tasks)

    def _preview(text: Optional[str], limit: int = 100) -> str:
        # Truncate long descriptions for the prompt; a missing description
        # becomes "" instead of aborting the whole AI path with a TypeError.
        text = text or ""
        return text[:limit] + "..." if len(text) > limit else text

    try:
        # Prepare compact task summaries for the prompt
        feature_summary = [
            {
                "name": t.name,
                "labels": t.labels,
                "type": getattr(t, "task_type", "unknown"),
            }
            for t in feature_tasks
        ]
        existing_summary = [
            {
                "id": t.id,
                "name": t.name,
                "labels": t.labels,
                "status": t.status.value,
                "description": _preview(t.description),
            }
            for t in existing_tasks
        ]
        prompt = f"""Analyze how these new feature tasks should \
integrate with the existing project.
New Feature Tasks:
{json.dumps(feature_summary, indent=2)}
Existing Project Tasks:
{json.dumps(existing_summary, indent=2)}
Determine:
1. Which existing tasks the new feature depends on (by task ID)
2. The appropriate project phase for these tasks
3. Integration complexity and risks
4. Recommended task ordering
Return JSON with this format:
{{
"dependent_task_ids": ["list", "of", "existing", "task", "ids"],
"suggested_phase": "initial|development|testing|deployment|maintenance",
"integration_complexity": "low|medium|high",
"confidence": 0.0-1.0,
"integration_risks": ["list", "of", "potential", "issues"],
"recommendations": [
{{
"type": "dependency|ordering|resource",
"description": "specific recommendation"
}}
],
"rationale": "explanation of integration approach"
}}"""
        response = await self._call_claude(prompt)
        try:
            result: Any = json.loads(response)
        except json.JSONDecodeError:
            print(
                "Failed to parse AI response as JSON, using fallback",
                file=sys.stderr,
            )
            return self._analyze_integration_fallback(feature_tasks, existing_tasks)
        # Valid JSON of the wrong shape would otherwise reach callers that
        # index result["suggested_phase"].
        if not isinstance(result, dict) or "suggested_phase" not in result:
            print(
                "AI integration response missing 'suggested_phase', using fallback",
                file=sys.stderr,
            )
            return self._analyze_integration_fallback(feature_tasks, existing_tasks)
        return result
    except Exception as e:
        print(
            f"AI integration analysis failed: {e}, using fallback", file=sys.stderr
        )
        return self._analyze_integration_fallback(feature_tasks, existing_tasks)
def _analyze_integration_fallback(
    self, feature_tasks: List[Task], existing_tasks: List[Task]
) -> Dict[str, Any]:
    """
    Rule-based integration analysis used when the AI path is unavailable.

    Parameters
    ----------
    feature_tasks : List[Task]
        Tasks for the new feature; only their ``labels`` are read.
    existing_tasks : List[Task]
        Current project tasks; ``id``, ``labels``, ``name`` and
        ``status.value`` are read.

    Returns
    -------
    Dict[str, Any]
        Same keys as the AI analysis. ``dependent_task_ids`` lists the ids
        of existing tasks that share at least one label with any feature
        task, in ``existing_tasks`` order and without duplicates.
        ``suggested_phase`` depends on how much work is done.
    """
    completed = [t for t in existing_tasks if t.status.value == "done"]
    in_progress = [t for t in existing_tasks if t.status.value == "in_progress"]

    # Sharing a label with *any* feature task is the same as sharing one
    # with the union of all feature labels, so build that union once rather
    # than re-creating both sets for every (existing, feature) pair.
    feature_labels = {
        label for feature in feature_tasks for label in (feature.labels or ())
    }
    # dict.fromkeys de-duplicates while keeping a stable order;
    # list(set(...)) would return ids in hash order, which varies per run.
    dependent_ids = list(
        dict.fromkeys(
            existing.id
            for existing in existing_tasks
            if existing.labels and feature_labels.intersection(existing.labels)
        )
    )

    # Determine phase. A non-empty `completed` implies existing_tasks is
    # non-empty, so the division below is safe.
    if not completed:
        phase = "initial"
    elif len(completed) / len(existing_tasks) > 0.8:
        phase = "maintenance"
    elif any("test" in t.name.lower() for t in in_progress):
        phase = "testing"
    else:
        phase = "development"

    return {
        "dependent_task_ids": dependent_ids,
        "suggested_phase": phase,
        "integration_complexity": "medium",
        "confidence": 0.7,
        "integration_risks": [],
        "recommendations": [],
        "rationale": "Based on task label matching and project progress",
    }
async def _call_claude(self, prompt: str) -> str:
"""
Call Claude API with error handling and token tracking.
Parameters
----------
prompt : str
The prompt to send to Claude
Returns
-------
str
Claude's response text
Raises
------
Exception
If the API call fails or client is unavailable
"""
if not self.client:
raise Exception("Anthropic client not available")
try:
# Set context for token tracking if available
if self.current_project_id and self.current_agent_id:
ai_usage_middleware.set_project_context(
self.current_agent_id, self.current_project_id
)
response = self.client.messages.create(
model=self.model,
max_tokens=2000,
temperature=0.7,
messages=[{"role": "user", "content": prompt}],
)
# Extract token usage if available
if hasattr(response, "usage"):
usage = response.usage
if self.current_project_id:
# Manually track tokens since we're calling the API directly
import asyncio
from src.cost_tracking.token_tracker import token_tracker
asyncio.create_task(
token_tracker.track_tokens(
project_id=self.current_project_id,
input_tokens=usage.input_tokens,
output_tokens=usage.output_tokens,
model=self.model,
metadata={
"agent_id": self.current_agent_id,
"function": "ai_analysis_engine",
"prompt_length": len(prompt),
},
)
)
# Handle different response block types
content = response.content[0]
if hasattr(content, "text"):
text = str(content.text).strip()
else:
text = str(content).strip()
# Extract JSON from response (LLMs often wrap in fences or add prose)
start = text.find("{") if "{" in text else text.find("[")
if start != -1:
end = text.rfind("}") if "{" in text else text.rfind("]")
if end != -1:
text = text[start : end + 1]
return text
except Exception as e:
print(f"Error calling Claude: {e}", file=sys.stderr)
raise
[docs]
async def generate_structured_response(
    self,
    prompt: str,
    system_prompt: str = "",
    response_format: Optional[Dict[str, Any]] = None,
    *,
    operation: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Generate a structured JSON response from the AI based on a schema.

    This method is used for task decomposition and other operations requiring
    structured output. It only works when an AI provider is configured.

    Parameters
    ----------
    prompt : str
        The user prompt describing what to generate
    system_prompt : str, optional
        System instructions for the AI; prepended to ``prompt``.
    response_format : Dict[str, Any], optional
        JSON schema describing expected response structure; embedded in the
        prompt as an instruction.
    operation : str, optional
        Operation name forwarded to ``safe_structured_call``; defaults to
        ``"generate_structured_response"``.

    Returns
    -------
    Dict[str, Any]
        Structured response matching the schema

    Raises
    ------
    RuntimeError
        If no AI client is available (no API key configured), the provider
        call fails, or the reply is not valid JSON. The underlying error is
        chained as ``__cause__``.

    Examples
    --------
    >>> result = await engine.generate_structured_response(
    ...     prompt="Break down this task",
    ...     system_prompt="You are a task decomposition expert",
    ...     response_format={"type": "object", "properties": {...}}
    ... )
    """
    # Use LLM abstraction layer to support all providers (Anthropic, OpenAI, local)
    try:
        from src.ai.providers.llm_abstraction import LLMAbstraction

        # Initialize LLM abstraction (supports local models via config)
        llm = LLMAbstraction()
        # Build full prompt with schema instructions and system prompt
        full_prompt = prompt
        if system_prompt:
            full_prompt = f"{system_prompt}\n\n{prompt}"
        if response_format:
            schema_str = json.dumps(response_format, indent=2)
            full_prompt = f"""{full_prompt}
IMPORTANT: Respond with valid JSON matching this exact schema:
{schema_str}
Your response must be pure JSON with no markdown formatting, \
explanations, or additional text."""
        # Delegate to the shared safe_structured_call helper, which
        # owns the default budget (16384, up from the Haiku-3-era
        # 4096) and the auto-retry-on-truncation policy. Single
        # source of truth across every structured-JSON caller in
        # Marcus — see src/utils/structured_llm.py.
        from src.utils.structured_llm import safe_structured_call

        return await safe_structured_call(
            llm=llm,
            prompt=full_prompt,
            operation=operation or "generate_structured_response",
        )
    except (json.JSONDecodeError, ValueError) as e:
        # The helper already logged a structured_llm.retry event for
        # any truncation-driven retries that were attempted before
        # this exception bubbled out — re-raise as RuntimeError so
        # callers keep their existing failure semantics. Chaining keeps
        # the original parse error available for debugging.
        raise RuntimeError(
            f"AI returned invalid JSON: {e}. This may indicate the AI "
            "did not follow the schema correctly."
        ) from e
    except Exception as e:
        error_msg = str(e)
        # Provide helpful error messages based on what went wrong
        if "No AI providers" in error_msg or "No LLM providers" in error_msg:
            raise RuntimeError(
                "No AI provider configured. Task decomposition "
                "requires an AI provider. "
                "Please configure one of the following in "
                "config_marcus.json:\n"
                "- Local model (Ollama): Set ai.provider='local' "
                "and ai.local_model\n"
                "- Anthropic: Set ai.provider='anthropic' "
                "and ai.anthropic_api_key\n"
                "- OpenAI: Set ai.provider='openai' "
                "and ai.openai_api_key"
            ) from e
        print(f"Error generating structured response: {e}", file=sys.stderr)
        raise RuntimeError(f"Failed to generate structured response: {e}") from e