Source code for src.ai.providers.openai_provider

"""
OpenAI Provider for Marcus AI.

Implements semantic task analysis using OpenAI's GPT models as a fallback
or alternative to Anthropic Claude.

This provider integrates with OpenAI's API to provide:
- Task semantic analysis
- Dependency inference
- Enhanced descriptions
- Effort estimation
- Blocker analysis

Classes
-------
OpenAIProvider
    OpenAI GPT provider implementation

Notes
-----
Requires OPENAI_API_KEY environment variable.
Model selection via OPENAI_MODEL environment variable.
"""

import json
import logging
import os
import time
from typing import Any, Dict, List

import httpx

from src.core.models import Task
from src.cost_tracking.cost_recorder import get_recorder

from .base_provider import (
    BaseLLMProvider,
    EffortEstimate,
    SemanticAnalysis,
    SemanticDependency,
)

logger = logging.getLogger(__name__)


[docs] class OpenAIProvider(BaseLLMProvider): """ OpenAI GPT provider for semantic AI analysis. Provides fallback capability when Anthropic is unavailable or alternative AI perspective for comparison. Parameters ---------- None Attributes ---------- api_key : str OpenAI API key from environment base_url : str OpenAI API base URL model : str GPT model to use (default: gpt-3.5-turbo) max_tokens : int Maximum tokens for responses timeout : float API request timeout in seconds client : httpx.AsyncClient Async HTTP client for API calls Raises ------ ValueError If OPENAI_API_KEY is not set Examples -------- >>> provider = OpenAIProvider() >>> analysis = await provider.analyze_task(task, context) """
[docs] def __init__(self) -> None: # Get configuration from centralized config from src.config.marcus_config import get_config config = get_config() self.api_key = config.ai.openai_api_key or os.getenv("OPENAI_API_KEY") if not self.api_key: raise ValueError("OpenAI API key not found in config or environment") self.base_url = "https://api.openai.com/v1" # Use OpenAI-specific default when model is not set or is a Claude model if not config.ai.model or config.ai.model.startswith("claude"): self.model = "gpt-3.5-turbo" else: self.model = config.ai.model self.max_tokens = config.ai.max_tokens self.temperature = config.ai.temperature # Read temperature from config self.timeout = 30.0 # HTTP client self.client = httpx.AsyncClient( headers={ "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json", }, timeout=self.timeout, ) logger.info(f"OpenAI provider initialized with model: {self.model}")
[docs] async def analyze_task( self, task: Task, context: Dict[str, Any] ) -> SemanticAnalysis: """ Analyze task semantics using GPT. Parameters ---------- task : Task Task to analyze context : dict Project context Returns ------- SemanticAnalysis Semantic analysis with fallback on error """ messages = [ { "role": "system", "content": ( "You are an expert software project manager analyzing " "development tasks. Respond only with valid JSON." ), }, { "role": "user", "content": self._build_task_analysis_prompt(task, context), }, ] try: response = await self._call_openai(messages) return self._parse_task_analysis_response(response) except Exception as e: logger.error(f"OpenAI task analysis failed: {e}") return SemanticAnalysis( task_intent="unknown", semantic_dependencies=[], risk_factors=["ai_analysis_failed"], suggestions=["Review task manually"], confidence=0.1, reasoning=f"OpenAI analysis failed: {str(e)}", risk_assessment={"availability": "degraded"}, )
[docs] async def infer_dependencies(self, tasks: List[Task]) -> List[SemanticDependency]: """ Infer semantic dependencies using GPT. Parameters ---------- tasks : list of Task Tasks to analyze for dependencies Returns ------- list of SemanticDependency Inferred dependencies, empty list on error """ if len(tasks) < 2: return [] messages = [ { "role": "system", "content": ( "You are analyzing task dependencies. " "Respond only with valid JSON array." ), }, { "role": "user", "content": self._build_dependency_inference_prompt(tasks), }, ] try: response = await self._call_openai(messages) return self._parse_dependency_response(response, tasks) except Exception as e: logger.error(f"OpenAI dependency inference failed: {e}") return []
[docs] async def generate_enhanced_description( self, task: Task, context: Dict[str, Any] ) -> str: """Generate enhanced description using GPT.""" messages = [ { "role": "system", "content": ( "You are enhancing task descriptions for software " "development. Be concise but thorough." ), }, { "role": "user", "content": self._build_enhancement_prompt(task, context), }, ] try: response = await self._call_openai(messages) return response.strip() except Exception as e: logger.error(f"OpenAI description enhancement failed: {e}") return task.description or task.name
[docs] async def estimate_effort( self, task: Task, context: Dict[str, Any] ) -> EffortEstimate: """Estimate effort using GPT.""" messages = [ { "role": "system", "content": ( "You are estimating development effort. " "Respond only with valid JSON." ), }, { "role": "user", "content": self._build_estimation_prompt(task, context), }, ] try: response = await self._call_openai(messages) return self._parse_estimation_response(response) except Exception as e: logger.error(f"OpenAI effort estimation failed: {e}") return EffortEstimate( estimated_hours=8.0, confidence=0.1, factors=["ai_estimation_failed"], similar_tasks=[], risk_multiplier=1.5, )
[docs] async def analyze_blocker( self, task: Task, blocker: str, context: Dict[str, Any] ) -> List[str]: """Analyze blocker using GPT.""" messages = [ { "role": "system", "content": ( "You are helping resolve development blockers. " "Respond with a JSON array of specific suggestions." ), }, { "role": "user", "content": self._build_blocker_analysis_prompt(task, blocker, context), }, ] try: response = await self._call_openai(messages) return self._parse_blocker_response(response) except Exception as e: logger.error(f"OpenAI blocker analysis failed: {e}") return [ "Check task requirements and prerequisites", "Review relevant documentation", "Seek assistance from team members", ]
def _build_task_analysis_prompt(self, task: Task, context: Dict[str, Any]) -> str: """Build task analysis prompt for GPT.""" return f""" Analyze this software development task: Task: {task.name} Description: {task.description or 'No description'} Priority: {task.priority} Project Type: {context.get('project_type', 'general')} Tech Stack: {', '.join(context.get('tech_stack', []))} Provide JSON response: {{ "task_intent": "what this task accomplishes", "semantic_dependencies": ["prerequisites needed"], "risk_factors": ["potential risks"], "suggestions": ["recommendations"], "confidence": 0.0-1.0, "reasoning": "analysis explanation", "risk_assessment": {{ "technical_complexity": "low|medium|high", "user_impact": "low|medium|high", "rollback_difficulty": "low|medium|high" }} }}""" def _build_dependency_inference_prompt(self, tasks: List[Task]) -> str: """Build dependency inference prompt.""" task_list = "\n".join( [ f"{task.id}: {task.name} - {task.description or 'No description'}" for task in tasks ] ) return f""" Identify dependencies between these tasks: {task_list} Return JSON array: [ {{ "dependent_task_id": "task_that_depends", "dependency_task_id": "prerequisite_task", "confidence": 0.0-1.0, "reasoning": "why dependency exists", "dependency_type": "logical|technical|temporal" }} ] Only include necessary dependencies.""" def _build_enhancement_prompt(self, task: Task, context: Dict[str, Any]) -> str: """Build enhancement prompt.""" return f""" Enhance this task description: Current: {task.name} Description: {task.description or 'No description'} Project: {context.get('project_type', 'general')} Provide enhanced description with: - Clear objective - Technical requirements - Success criteria - Key considerations Enhanced Description:""" def _build_estimation_prompt(self, task: Task, context: Dict[str, Any]) -> str: """Build estimation prompt.""" return f""" Estimate effort for this task: Task: {task.name} Description: {task.description or 'No description'} Technology: {', '.join(context.get('tech_stack', []))} JSON response: {{ "estimated_hours": float, "confidence": 0.0-1.0, "factors": ["factors affecting estimate"], "similar_tasks": ["similar task patterns"], "risk_multiplier": 1.0-3.0 }}""" def _build_blocker_analysis_prompt( self, task: Task, blocker: str, context: Dict[str, Any] ) -> str: """Build blocker analysis prompt.""" return f""" Help resolve this development blocker: Task: {task.name} Blocker: {blocker} Severity: {context.get('severity', 'unknown')} Provide JSON array of 3-5 specific solutions: ["solution1", "solution2", "solution3"]""" async def _call_openai(self, messages: List[Dict[str, str]]) -> str: """Make API call to OpenAI. Records the token usage to the planner cost store on success (parity with AnthropicProvider / LocalLLMProvider — without this hook every OpenAI fallback during Marcus's decomposition / analysis path silently went uncounted, leaving cost data missing the entire 'planner' role for accounts that aren't on Anthropic). """ payload = { "model": self.model, "messages": messages, "max_tokens": self.max_tokens, "temperature": self.temperature, } start = time.monotonic() try: response = await self.client.post( f"{self.base_url}/chat/completions", json=payload ) response.raise_for_status() data = response.json() latency_ms = int((time.monotonic() - start) * 1000) # OpenAI-compatible servers return a usage object with # prompt_tokens / completion_tokens. Cache fields are # absent for non-Anthropic backends; record_planner_call # defaults them to 0. usage = data.get("usage") or {} try: get_recorder().record_planner_call( operation="analyze", provider="openai", model=str(data.get("model", self.model)), input_tokens=int(usage.get("prompt_tokens", 0)), output_tokens=int(usage.get("completion_tokens", 0)), latency_ms=latency_ms, request_id=str(data.get("id")) if data.get("id") else None, ) except Exception: # pragma: no cover - recorder must never raise logger.exception("OpenAI cost recording failed") return str(data["choices"][0]["message"]["content"]) except httpx.TimeoutException: raise Exception("OpenAI API request timed out") except httpx.HTTPStatusError as e: raise Exception(f"OpenAI API error: {e.response.status_code}") except Exception as e: raise Exception(f"OpenAI API call failed: {str(e)}") def _parse_task_analysis_response(self, response: str) -> SemanticAnalysis: """Parse GPT task analysis response.""" try: data = json.loads(response) return SemanticAnalysis( task_intent=data.get("task_intent", "unknown"), semantic_dependencies=data.get("semantic_dependencies", []), risk_factors=data.get("risk_factors", []), suggestions=data.get("suggestions", []), confidence=float(data.get("confidence", 0.5)), reasoning=data.get("reasoning", "No reasoning provided"), risk_assessment=data.get("risk_assessment", {}), ) except (json.JSONDecodeError, KeyError, ValueError) as e: logger.warning(f"Failed to parse OpenAI response: {e}") return SemanticAnalysis( task_intent="parse_error", semantic_dependencies=[], risk_factors=["response_parsing_failed"], suggestions=["Retry analysis"], confidence=0.1, reasoning="Failed to parse AI response", risk_assessment={}, ) def _parse_dependency_response( self, response: str, tasks: List[Task] ) -> List[SemanticDependency]: """Parse dependency inference response.""" try: data = json.loads(response) dependencies = [] task_ids = {task.id for task in tasks} for dep in data: if ( dep.get("dependent_task_id") in task_ids and dep.get("dependency_task_id") in task_ids ): dependencies.append( SemanticDependency( dependent_task_id=dep["dependent_task_id"], dependency_task_id=dep["dependency_task_id"], confidence=float(dep.get("confidence", 0.5)), reasoning=dep.get("reasoning", ""), dependency_type=dep.get("dependency_type", "logical"), ) ) return dependencies except (json.JSONDecodeError, KeyError, ValueError): return [] def _parse_estimation_response(self, response: str) -> EffortEstimate: """Parse effort estimation response.""" try: data = json.loads(response) return EffortEstimate( estimated_hours=float(data.get("estimated_hours", 8.0)), confidence=float(data.get("confidence", 0.5)), factors=data.get("factors", []), similar_tasks=data.get("similar_tasks", []), risk_multiplier=float(data.get("risk_multiplier", 1.0)), ) except (json.JSONDecodeError, KeyError, ValueError): return EffortEstimate( estimated_hours=8.0, confidence=0.1, factors=["parsing_failed"], similar_tasks=[], risk_multiplier=1.5, ) def _parse_blocker_response(self, response: str) -> List[str]: """Parse blocker analysis response.""" try: suggestions_raw: Any = json.loads(response) if isinstance(suggestions_raw, list): return [str(s) for s in suggestions_raw] else: return [str(suggestions_raw)] except (json.JSONDecodeError, ValueError): # Extract from text format lines = response.strip().split("\n") suggestions_list: List[str] = [] for line in lines: if line.strip() and not line.startswith("#"): suggestions_list.append(line.strip("- ").strip()) return ( suggestions_list[:5] if suggestions_list else ["Review task requirements"] )
[docs] async def complete(self, prompt: str, max_tokens: int = 2000) -> str: """ Generate a completion for the given prompt. Args ---- prompt: The prompt to complete max_tokens: Maximum tokens in response Returns ------- The completion text """ messages = [{"role": "user", "content": prompt}] self.max_tokens = max_tokens try: response = await self._call_openai(messages) return response except Exception as e: logger.error(f"OpenAI completion failed: {e}") raise
[docs] async def close(self) -> None: """Close HTTP client.""" await self.client.aclose()