"""
Anthropic Claude Provider for Marcus AI.
Implements semantic task analysis, dependency inference, and intelligent
enhancement using Anthropic's Claude models.
"""
import json
import logging
import os
import time
from typing import Any, Dict, List
import httpx
from src.core.models import Task
from src.cost_tracking.cost_recorder import get_recorder
from src.utils.json_parser import parse_ai_json_response, parse_json_response
from .base_provider import (
BaseLLMProvider,
EffortEstimate,
SemanticAnalysis,
SemanticDependency,
)
# Module-level logger named after this module, so its records can be
# filtered/configured by package path.
logger = logging.getLogger(__name__)
class AnthropicProvider(BaseLLMProvider):
    """
    Anthropic Claude provider for semantic AI analysis.

    Uses Claude's Messages API to provide task analysis, dependency
    inference, description enhancement, effort estimation and blocker
    analysis. Each of those analysis methods catches failures and returns a
    low-confidence fallback instead of raising; ``complete`` is the
    exception and propagates API errors to its caller.
    """

    def __init__(self, api_key: str | None = None) -> None:
        """Initialize the Anthropic provider.

        Parameters
        ----------
        api_key : str, optional
            Explicit Anthropic API key. If omitted, falls back to
            ``config.ai.anthropic_api_key`` then ``CLAUDE_API_KEY`` env var.
            ``ANTHROPIC_API_KEY`` is intentionally NOT consulted: setting it
            in the parent shell switches Claude Code from subscription billing
            to API billing, so Marcus uses ``CLAUDE_API_KEY`` to avoid
            interfering with Claude Code subprocesses Marcus spawns.

        Raises
        ------
        ValueError
            If no API key is provided and none is discoverable from config
            or ``CLAUDE_API_KEY``.
        """
        # Get configuration from centralized config
        from src.config.marcus_config import get_config

        config = get_config()
        self.api_key = (
            api_key or config.ai.anthropic_api_key or os.getenv("CLAUDE_API_KEY")
        )
        if not self.api_key:
            raise ValueError(
                "Anthropic API key not found. Pass api_key explicitly, set "
                "ai.anthropic_api_key in config_marcus.json, or export "
                "CLAUDE_API_KEY in your environment."
            )
        self.base_url = "https://api.anthropic.com/v1"
        self.model = config.ai.model or "claude-3-haiku-20240307"
        # Provider-wide default output budget; ``complete`` may override it
        # for a single request without changing this attribute.
        self.max_tokens = config.ai.max_tokens
        self.temperature = config.ai.temperature  # Read temperature from config
        self.timeout = 120.0
        # HTTP client with proper headers
        # Disable HTTP/2 to avoid connection issues in Docker environments
        self.client = httpx.AsyncClient(
            headers={
                "x-api-key": self.api_key,
                "content-type": "application/json",
                "anthropic-version": "2023-06-01",
            },
            timeout=self.timeout,
            http2=False,
        )
        logger.info(f"Anthropic provider initialized with model: {self.model}")

    async def analyze_task(
        self, task: Task, context: Dict[str, Any]
    ) -> SemanticAnalysis:
        """
        Analyze task semantics using Claude.

        Args
        ----
        task: Task to analyze
        context: Project context (``project_type``, ``tech_stack``,
            ``team_size`` are read when present)

        Returns
        -------
        Semantic analysis with intent, dependencies, and risks. On any
        failure a fallback with ``confidence=0.1`` is returned instead of
        raising.
        """
        prompt = self._build_task_analysis_prompt(task, context)
        try:
            response = await self._call_claude(prompt)
            return self._parse_task_analysis_response(response)
        except Exception as e:
            logger.error(f"Anthropic task analysis failed: {e}")
            # Return safe fallback
            return SemanticAnalysis(
                task_intent="unknown",
                semantic_dependencies=[],
                risk_factors=["ai_analysis_failed"],
                suggestions=["Review task manually"],
                confidence=0.1,
                reasoning=f"AI analysis failed: {str(e)}",
                risk_assessment={"availability": "degraded"},
            )

    async def infer_dependencies(self, tasks: List[Task]) -> List[SemanticDependency]:
        """
        Infer semantic dependencies between tasks using Claude.

        Args
        ----
        tasks: List of tasks to analyze

        Returns
        -------
        List of inferred semantic dependencies whose endpoints are both in
        ``tasks``. Empty when fewer than two tasks are given or on failure.
        """
        # A dependency needs two distinct tasks; skip the API call otherwise.
        if len(tasks) < 2:
            return []
        prompt = self._build_dependency_inference_prompt(tasks)
        try:
            response = await self._call_claude(prompt)
            return self._parse_dependency_response(response, tasks)
        except Exception as e:
            logger.error(f"Anthropic dependency inference failed: {e}")
            return []

    async def generate_enhanced_description(
        self, task: Task, context: Dict[str, Any]
    ) -> str:
        """
        Generate enhanced task description using Claude.

        Args
        ----
        task: Task to enhance
        context: Project context

        Returns
        -------
        Enhanced description, or the original description (falling back to
        the task name) when the call fails.
        """
        prompt = self._build_enhancement_prompt(task, context)
        try:
            response = await self._call_claude(prompt)
            return self._parse_enhancement_response(response, task)
        except Exception as e:
            logger.error(f"Anthropic description enhancement failed: {e}")
            return task.description or task.name

    async def estimate_effort(
        self, task: Task, context: Dict[str, Any]
    ) -> EffortEstimate:
        """
        Estimate task effort using Claude.

        Args
        ----
        task: Task to estimate
        context: Project context with historical data

        Returns
        -------
        Effort estimate with confidence and factors. On failure, an 8-hour
        estimate with ``confidence=0.1`` is returned instead of raising.
        """
        prompt = self._build_estimation_prompt(task, context)
        try:
            response = await self._call_claude(prompt)
            return self._parse_estimation_response(response)
        except Exception as e:
            logger.error(f"Anthropic effort estimation failed: {e}")
            return EffortEstimate(
                estimated_hours=8.0,  # Safe default
                confidence=0.1,
                factors=["ai_estimation_failed"],
                similar_tasks=[],
                risk_multiplier=1.5,
            )

    async def analyze_blocker(
        self, task: Task, blocker: str, context: Dict[str, Any]
    ) -> List[str]:
        """
        Analyze blocker and suggest solutions using Claude.

        Args
        ----
        task: Blocked task
        blocker: Blocker description
        context: Additional context (``agent`` and ``severity`` are read)

        Returns
        -------
        List of suggested solutions; a generic three-item list on failure.
        """
        prompt = self._build_blocker_analysis_prompt(task, blocker, context)
        try:
            response = await self._call_claude(prompt)
            return self._parse_blocker_response(response)
        except Exception as e:
            logger.error(f"Anthropic blocker analysis failed: {e}")
            return [
                "Review task requirements and dependencies",
                "Check documentation for similar issues",
                "Consult with team lead or senior developer",
            ]

    def _build_task_analysis_prompt(self, task: Task, context: Dict[str, Any]) -> str:
        """Build prompt for task semantic analysis."""
        project_type = context.get("project_type", "general")
        tech_stack = context.get("tech_stack", [])
        return f"""
You are an expert software project manager analyzing a development task.
Task Information:
- Name: {task.name}
- Description: {task.description or 'No description provided'}
- Priority: {task.priority}
- Current Status: {task.status}
Project Context:
- Project Type: {project_type}
- Technology Stack: {', '.join(tech_stack) if tech_stack else 'Not specified'}
- Team Size: {context.get('team_size', 'Unknown')}
Please analyze this task and provide a JSON response with the following
structure:
{{
"task_intent": "Brief description of what this task is trying to accomplish",
"semantic_dependencies": [
"List of task types that should logically come before this"
],
"risk_factors": ["List of potential risks or complications"],
"suggestions": ["List of recommendations for successful completion"],
"confidence": 0.0-1.0,
"reasoning": "Explanation of your analysis",
"risk_assessment": {{
"technical_complexity": "low|medium|high",
"user_impact": "low|medium|high",
"rollback_difficulty": "low|medium|high"
}}
}}
Focus on:
1. Understanding the true intent beyond just the task name
2. Identifying logical prerequisites and dependencies
3. Assessing risks specific to the technology stack
4. Providing actionable suggestions
Respond only with valid JSON."""

    def _build_dependency_inference_prompt(self, tasks: List[Task]) -> str:
        """Build prompt for dependency inference."""
        task_list = "\n".join(
            [
                f"- {task.id}: {task.name} | {task.description or 'No description'}"
                for task in tasks
            ]
        )
        return f"""
You are analyzing task dependencies for a software project.
Tasks to analyze:
{task_list}
Please identify logical dependencies between these tasks and return a JSON
array with this structure:
[
{{
"dependent_task_id": "task_that_depends",
"dependency_task_id": "task_that_must_come_first",
"confidence": 0.0-1.0,
"reasoning": "Why this dependency exists",
"dependency_type": "logical|technical|temporal"
}}
]
Guidelines:
1. Only suggest dependencies that are logically necessary
2. Focus on "must come before" relationships, not just "nice to have"
3. Consider technical prerequisites (e.g., API before frontend integration)
4. Consider logical flow (e.g., design before implementation, testing before deployment)
5. High confidence (>0.8) should be reserved for clear, mandatory dependencies
Respond only with valid JSON array."""

    def _build_enhancement_prompt(self, task: Task, context: Dict[str, Any]) -> str:
        """Build prompt for task description enhancement."""
        project_type = context.get("project_type", "general")
        return f"""
You are enhancing a task description to make it more detailed and actionable.
Current Task:
- Name: {task.name}
- Description: {task.description or 'No description provided'}
Project Context:
- Type: {project_type}
- Technology: {', '.join(context.get('tech_stack', []))}
Please provide an enhanced description that includes:
1. Clear objective and success criteria
2. Specific technical requirements
3. Key considerations or gotchas
4. Definition of done
Keep it concise but comprehensive. Focus on what a developer needs to know
to complete the task successfully.
Enhanced Description:"""

    def _build_estimation_prompt(self, task: Task, context: Dict[str, Any]) -> str:
        """Build prompt for effort estimation.

        Up to three historical entries whose ``task_type`` matches this
        task's classification are included as reference points.
        """
        historical_data = context.get("historical_data", [])
        # Classify once: the result depends only on the task, not on the entry.
        task_type = self._classify_task_type(task)
        similar_tasks = [h for h in historical_data if h.get("task_type") == task_type]
        similar_tasks_text = ""
        if similar_tasks:
            similar_tasks_text = "\nSimilar historical tasks:\n" + "\n".join(
                [
                    f"- {h.get('name', 'Unknown')}: "
                    f"{h.get('actual_hours', 'Unknown')} hours"
                    for h in similar_tasks[:3]
                ]
            )
        return f"""
You are estimating development effort for a task.
Task: {task.name}
Description: {task.description or 'No description'}
Project Type: {context.get('project_type', 'general')}
Technology: {', '.join(context.get('tech_stack', []))}
{similar_tasks_text}
Please provide a JSON response:
{{
"estimated_hours": float,
"confidence": 0.0-1.0,
"factors": ["List of factors affecting the estimate"],
"similar_tasks": ["List of similar task patterns"],
"risk_multiplier": 1.0-3.0
}}
Consider:
1. Task complexity and scope
2. Technology familiarity
3. Integration requirements
4. Testing needs
5. Documentation requirements
Respond only with valid JSON."""

    def _build_blocker_analysis_prompt(
        self, task: Task, blocker: str, context: Dict[str, Any]
    ) -> str:
        """Build prompt for blocker analysis."""
        agent_info = context.get("agent", {})
        severity = context.get("severity", "unknown")
        return f"""
You are helping resolve a development blocker.
Task: {task.name}
Description: {task.description or 'No description'}
Blocker: {blocker}
Severity: {severity}
Agent: {agent_info.get('name', 'Unknown')} ({agent_info.get('role', 'Developer')})
Please provide 3-5 specific, actionable suggestions to resolve this blocker.
Focus on practical steps the developer can take.
Format as a JSON array of strings:
["suggestion1", "suggestion2", "suggestion3"]
Suggestions should be:
1. Specific and actionable
2. Ordered by likelihood to resolve the issue
3. Include both immediate fixes and alternative approaches
4. Consider the technology stack and project context
Respond only with valid JSON array."""

    async def complete(self, prompt: str, max_tokens: int = 2000) -> str:
        """
        Generate a completion for the given prompt.

        Args
        ----
        prompt: The prompt to complete
        max_tokens: Maximum tokens in this response. Applies to this call
            only; the provider's configured ``max_tokens`` is left unchanged.

        Returns
        -------
        The completion text

        Raises
        ------
        Exception
            Propagated from the API call (timeout, HTTP error, bad payload).
        """
        # Previously this assigned self.max_tokens, silently changing the
        # budget for every later analyze_* call; pass it per request instead.
        return await self._call_claude(prompt, max_tokens=max_tokens)

    async def _call_claude(
        self,
        prompt: str,
        operation: str = "analyze",
        max_tokens: int | None = None,
    ) -> str:
        """
        Make API call to Claude.

        Args
        ----
        prompt: Prompt to send to Claude
        operation: Logical operation name recorded with the cost event
            (e.g. ``'parse_prd'``, ``'analyze_blocker'``). Defaults to
            ``'analyze'`` so legacy callers still record something useful.
        max_tokens: Optional per-request output budget. ``None`` uses the
            provider's configured ``self.max_tokens``.

        Returns
        -------
        Text of the first content block in Claude's response that carries
        text.

        Raises
        ------
        Exception
            On timeout, non-2xx HTTP status, or any other failure (including
            a response without text content). The original error is chained
            as ``__cause__``.

        Notes
        -----
        After a successful response, this method captures the four token
        fields from ``data["usage"]`` (``input_tokens``,
        ``cache_creation_input_tokens``, ``cache_read_input_tokens``,
        ``output_tokens``) and records a planner-side ``token_events`` row
        via :func:`src.cost_tracking.cost_recorder.get_recorder`. The
        recorder is best-effort: it never raises into the caller.
        """
        payload = {
            "model": self.model,
            "max_tokens": self.max_tokens if max_tokens is None else max_tokens,
            "temperature": self.temperature,
            "messages": [{"role": "user", "content": prompt}],
        }
        start = time.monotonic()
        try:
            response = await self.client.post(f"{self.base_url}/messages", json=payload)
            response.raise_for_status()
            data = response.json()
            latency_ms = int((time.monotonic() - start) * 1000)
            usage = data.get("usage") or {}
            request_id = data.get("id")
            get_recorder().record_planner_call(
                operation=operation,
                provider="anthropic",
                model=str(data.get("model") or self.model),
                # Token fields may be present but null (notably the cache
                # counters); ``or 0`` stops int(None) from turning a
                # successful response into a failure.
                input_tokens=int(usage.get("input_tokens") or 0),
                cache_creation_tokens=int(
                    usage.get("cache_creation_input_tokens") or 0
                ),
                cache_read_tokens=int(usage.get("cache_read_input_tokens") or 0),
                output_tokens=int(usage.get("output_tokens") or 0),
                latency_ms=latency_ms,
                request_id=str(request_id) if request_id else None,
            )
            return self._extract_response_text(data)
        except httpx.TimeoutException as e:
            raise Exception("Claude API request timed out") from e
        except httpx.HTTPStatusError as e:
            raise Exception(
                f"Claude API error: {e.response.status_code} - {e.response.text}"
            ) from e
        except Exception as e:
            raise Exception(f"Claude API call failed: {str(e)}") from e

    @staticmethod
    def _extract_response_text(data: Dict[str, Any]) -> str:
        """Return the text of the first content block that carries text.

        Raises ``ValueError`` when the response has no such block (e.g. an
        empty ``content`` list), so the caller reports a clear message
        instead of an ``IndexError``.
        """
        for block in data.get("content") or []:
            if isinstance(block, dict) and "text" in block:
                return str(block["text"])
        raise ValueError("Claude response contained no text content")

    def _parse_task_analysis_response(self, response: str) -> SemanticAnalysis:
        """Parse Claude's task analysis response.

        Returns a ``task_intent="parse_error"`` fallback when the response is
        not a JSON object or holds values of the wrong type.
        """
        try:
            data = parse_ai_json_response(response)
            return SemanticAnalysis(
                task_intent=data.get("task_intent", "unknown"),
                semantic_dependencies=data.get("semantic_dependencies", []),
                risk_factors=data.get("risk_factors", []),
                suggestions=data.get("suggestions", []),
                confidence=float(data.get("confidence", 0.5)),
                reasoning=data.get("reasoning", "No reasoning provided"),
                risk_assessment=data.get("risk_assessment", {}),
            )
        # TypeError: e.g. "confidence": null; AttributeError: a JSON array
        # where an object was expected.
        except (
            json.JSONDecodeError,
            KeyError,
            ValueError,
            TypeError,
            AttributeError,
        ) as e:
            logger.warning(f"Failed to parse Claude task analysis: {e}")
            return SemanticAnalysis(
                task_intent="parse_error",
                semantic_dependencies=[],
                risk_factors=["response_parsing_failed"],
                suggestions=["Retry analysis"],
                confidence=0.1,
                reasoning="Failed to parse AI response",
                risk_assessment={},
            )

    def _parse_dependency_response(
        self, response: str, tasks: List[Task]
    ) -> List[SemanticDependency]:
        """Parse Claude's dependency inference response.

        Accepts a JSON array of dependency objects, an object with a
        ``dependencies`` array, or a single dependency object. Entries are
        kept only if they are objects whose two task IDs are both in
        ``tasks``, differ from each other, and whose confidence is numeric;
        each bad entry is skipped on its own rather than discarding the rest.
        """
        try:
            data = parse_json_response(response)
        except (json.JSONDecodeError, KeyError, ValueError, TypeError) as e:
            logger.warning(f"Failed to parse Claude dependency response: {e}")
            return []

        # Handle both list format and dict with dependencies list
        if isinstance(data, list):
            deps_list = data
        elif isinstance(data, dict) and "dependencies" in data:
            deps_list = data["dependencies"]
        elif isinstance(data, dict):
            deps_list = [data]  # Single dependency as dict
        else:
            deps_list = []
        if not isinstance(deps_list, list):
            logger.warning(
                "Failed to parse Claude dependency response: "
                f"expected a list of dependencies, got {type(deps_list).__name__}"
            )
            return []

        task_ids = {task.id for task in tasks}
        dependencies: List[SemanticDependency] = []
        for dep in deps_list:
            if not isinstance(dep, dict):
                continue  # e.g. a stray string inside the array
            dependent_id = dep.get("dependent_task_id")
            dependency_id = dep.get("dependency_task_id")
            try:
                known = dependent_id in task_ids and dependency_id in task_ids
                confidence = float(dep.get("confidence", 0.5))
            except (TypeError, ValueError):
                # Unhashable ID or non-numeric confidence: drop this edge only.
                continue
            # Hallucinated IDs and self-loops are never usable dependencies.
            if not known or dependent_id == dependency_id:
                continue
            dependencies.append(
                SemanticDependency(
                    dependent_task_id=dependent_id,
                    dependency_task_id=dependency_id,
                    confidence=confidence,
                    reasoning=dep.get("reasoning", ""),
                    dependency_type=dep.get("dependency_type", "logical"),
                )
            )
        return dependencies

    def _parse_enhancement_response(self, response: str, task: Task) -> str:
        """Parse Claude's description enhancement response."""
        # Claude returns enhancement directly as text
        enhanced = response.strip()
        # Treat a near-empty reply as a failure and keep the original text.
        if len(enhanced) < 10:
            return task.description or task.name
        return enhanced

    def _parse_estimation_response(self, response: str) -> EffortEstimate:
        """Parse Claude's effort estimation response.

        Returns an 8-hour, ``confidence=0.1`` fallback when the response is
        not a JSON object or holds values of the wrong type.
        """
        try:
            data = parse_ai_json_response(response)
            return EffortEstimate(
                estimated_hours=float(data.get("estimated_hours", 8.0)),
                confidence=float(data.get("confidence", 0.5)),
                factors=data.get("factors", []),
                similar_tasks=data.get("similar_tasks", []),
                risk_multiplier=float(data.get("risk_multiplier", 1.0)),
            )
        except (
            json.JSONDecodeError,
            KeyError,
            ValueError,
            TypeError,
            AttributeError,
        ) as e:
            logger.warning(f"Failed to parse Claude estimation response: {e}")
            return EffortEstimate(
                estimated_hours=8.0,
                confidence=0.1,
                factors=["parsing_failed"],
                similar_tasks=[],
                risk_multiplier=1.5,
            )

    def _parse_blocker_response(self, response: str) -> List[str]:
        """Parse Claude's blocker analysis response.

        JSON arrays and ``{"suggestions": [...]}`` objects are converted to a
        list of strings; any other JSON value becomes a one-item list. If the
        response is not JSON, non-heading lines of the text are used instead
        (at most five).
        """
        try:
            data = parse_json_response(response)
        except (json.JSONDecodeError, ValueError) as e:
            logger.warning(f"Failed to parse Claude blocker response: {e}")
            return self._suggestions_from_text(response)
        if isinstance(data, list):
            return [str(s) for s in data]
        if isinstance(data, dict) and "suggestions" in data:
            return [str(s) for s in data["suggestions"]]
        return [str(data)]

    @staticmethod
    def _suggestions_from_text(response: str) -> List[str]:
        """Extract up to five bullet-style suggestions from free text."""
        suggestions_list: List[str] = []
        for line in response.strip().split("\n"):
            stripped = line.strip()
            # Skip blank lines and markdown headings, including indented ones.
            if not stripped or stripped.startswith("#"):
                continue
            cleaned = stripped.strip("- ").strip()
            # A rule line such as "---" would otherwise become "".
            if cleaned:
                suggestions_list.append(cleaned)
        return (
            suggestions_list[:5]
            if suggestions_list
            else ["Review and retry the task"]
        )

    def _classify_task_type(self, task: Task) -> str:
        """Classify task type for historical comparison.

        Keywords are matched at the start of a word, so inflections still
        match ("testing", "deployment", "APIs") but substrings inside other
        words do not: "build" and "require" are no longer "design" (they
        contain "ui") and "rapid" is no longer "backend" (it contains
        "api"). Rules are checked in order; the first match wins.
        """
        task_text = f"{task.name} {task.description or ''}".lower()
        # Split on every non-alphanumeric character to get word tokens.
        words = "".join(ch if ch.isalnum() else " " for ch in task_text).split()
        rules = (
            ("testing", ("test", "qa", "verify")),
            ("deployment", ("deploy", "release")),
            ("design", ("design", "ui", "mockup")),
            ("backend", ("api", "endpoint", "service")),
            ("frontend", ("frontend", "client", "react")),
        )
        for task_type, keywords in rules:
            if any(word.startswith(keywords) for word in words):
                return task_type
        return "development"

    async def close(self) -> None:
        """Close the HTTP client."""
        await self.client.aclose()