Source code for src.ai.providers.local_provider

"""
Local LLM Provider for Marcus AI.

Implements support for local models via Ollama or other OpenAI-compatible servers.
This provider enables running Marcus with complete local AI inference, removing
dependency on external API services.

Classes
-------
LocalLLMProvider
    Local model provider supporting Ollama and OpenAI-compatible endpoints

Notes
-----
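Requires a running local LLM server (e.g., Ollama or the llama.cpp server).
Model selection via MARCUS_LOCAL_LLM_PATH environment variable.
Base URL and API key are read from the centralized config (``ai.local_url`` /
``ai.local_key``) and can be overridden with MARCUS_LOCAL_LLM_URL /
MARCUS_LOCAL_LLM_KEY; the URL defaults to Ollama (http://localhost:11434/v1).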

Examples
--------
>>> import os
>>> # With Ollama running locally
>>> os.environ['MARCUS_LOCAL_LLM_PATH'] = 'codellama:13b'
>>> os.environ['MARCUS_LLM_PROVIDER'] = 'local'
>>> provider = LocalLLMProvider('codellama:13b')
"""

import json
import logging
import os
import re
import time
from typing import Any, Dict, List, Optional

import httpx

from src.core.models import Task
from src.cost_tracking.cost_recorder import get_recorder
from src.utils.json_parser import parse_ai_json_response

from .base_provider import (
    BaseLLMProvider,
    EffortEstimate,
    SemanticAnalysis,
    SemanticDependency,
)

logger = logging.getLogger(__name__)

# Reasoning-distilled models (deepseek-r1, qwq, etc.) emit ``<think>...</think>``
# blocks before their structured output.  Marcus's downstream parsers expect
# clean JSON, so we strip the leading reasoning prefix here.
#
# Anchored to the response prefix only (Codex P2 review on PR #489): a global
# substitution would corrupt response payloads that legitimately quote the
# tags inside JSON string values — for example, a task description about
# handling reasoning blocks would otherwise have its content silently
# removed.  We only strip the reasoning prefix that precedes the structured
# output, never tags embedded inside it.
#
# Pattern anatomy:
# - ``\A\s*``           start of the whole string, then optional whitespace.
# - ``(?:...)+``        one or more consecutive reasoning blocks.
# - ``<think>.*?</think>`` lazy body, so each block ends at its first closing
#                       tag; DOTALL lets the body span newlines and IGNORECASE
#                       also accepts e.g. ``<THINK>``.
# - trailing ``\s*``    whitespace between/after blocks is consumed too.
# NOTE(review): a leading ``<think>`` that is never closed before the payload
# will still be consumed up to the first ``</think>`` found later in the text,
# including one quoted inside the JSON payload.
_LEADING_THINK_BLOCK = re.compile(
    r"\A\s*(?:<think>.*?</think>\s*)+",
    re.DOTALL | re.IGNORECASE,
)


def _strip_reasoning_blocks(content: str) -> str:
    """Drop a leading ``<think>...</think>`` reasoning prefix from a model reply.

    Reasoning-distilled models (deepseek-r1 family, qwq, etc.) prepend their
    chain of thought in one or more ``<think>...</think>`` blocks, which
    Marcus's JSON parsers cannot consume.  Only well-formed blocks at the very
    start of the reply (optionally separated by whitespace) are removed.
    Tags appearing later — e.g. quoted inside a JSON string value — are kept,
    and an unclosed leading ``<think>`` with no closing tag anywhere is left
    in place so the problem surfaces during parsing instead of being hidden.

    Parameters
    ----------
    content : str
        Raw model response.

    Returns
    -------
    str
        The response with the leading reasoning prefix removed and outer
        whitespace trimmed; tags embedded in the payload are preserved.
    """
    # The pattern is anchored with ``\A``, so a match always starts at
    # offset 0 and its end offset equals the number of characters removed.
    prefix = _LEADING_THINK_BLOCK.match(content)
    if prefix is None:
        return content.strip()

    cut = prefix.end()
    logger.debug(
        "Stripped %d char(s) of leading <think>...</think> reasoning prefix",
        cut,
    )
    return content[cut:].strip()


class LocalLLMProvider(BaseLLMProvider):
    """
    Local LLM provider for semantic AI analysis.

    Supports Ollama and other OpenAI-compatible local inference servers.
    Optimized for coding and reasoning tasks with models like CodeLlama,
    DeepSeek-Coder, or Mixtral.

    Parameters
    ----------
    model_name : str
        Name of the model to use (e.g., 'codellama:13b', 'deepseek-coder:6.7b')

    Attributes
    ----------
    base_url : str
        Local LLM server URL (default: http://localhost:11434/v1 for Ollama)
    model : str
        Model identifier for the local server
    max_tokens : int
        Maximum tokens for responses (from ``config.ai.max_tokens``)
    temperature : float
        Default sampling temperature (from ``config.ai.temperature``)
    timeout : float
        API request timeout in seconds
    client : httpx.AsyncClient
        Async HTTP client for API calls; release it with :meth:`aclose`

    Examples
    --------
    >>> provider = LocalLLMProvider('codellama:13b')
    >>> analysis = await provider.analyze_task(task, context)
    """

    # System message sent with every request: as a ``system`` chat message on
    # the OpenAI-compatible endpoint and as the ``system`` field on Ollama's
    # native endpoint.
    _SYSTEM_PROMPT = (
        "You are an AI assistant helping with software "
        "development task analysis. Provide clear, structured "
        "responses focusing on practical implementation details."
    )

    # One numbered ("1.", "2)", "3:", "4 ") or bulleted ("- ", "* ", "• ")
    # list line.  Only the marker is consumed; the captured text keeps any
    # leading digits/punctuation that belong to the solution itself
    # (e.g. "404 handler", ".env file").
    _LIST_ITEM_RE = re.compile(
        r"^(?:\d+[.):]\s*|\d+\s+|[-*\u2022]\s+)(?P<text>.+)$"
    )

    # Limit for context: at most this many tasks go into the
    # dependency-inference prompt.
    _MAX_DEPENDENCY_TASKS = 20

    def __init__(self, model_name: str) -> None:
        """
        Initialize local LLM provider.

        Server URL and API key come from the centralized config and are
        overridden by ``MARCUS_LOCAL_LLM_URL`` / ``MARCUS_LOCAL_LLM_KEY``
        when those environment variables are non-empty.

        Parameters
        ----------
        model_name : str
            Model to use (e.g., 'codellama:13b')
        """
        # Get configuration from centralized config (imported locally, as in
        # the original implementation).
        from src.config.marcus_config import get_config

        config = get_config()

        # Support different local LLM servers - env var overrides config,
        # which overrides the Ollama default.
        self.base_url: str = (
            os.getenv("MARCUS_LOCAL_LLM_URL")
            or config.ai.local_url
            or "http://localhost:11434/v1"
        )
        self.model = model_name
        self.max_tokens = config.ai.max_tokens
        self.temperature = config.ai.temperature
        self.timeout = 120.0  # Longer timeout for local inference

        # API key: env var overrides config; "none" placeholder otherwise.
        api_key: str = (
            os.getenv("MARCUS_LOCAL_LLM_KEY") or config.ai.local_key or "none"
        )

        # HTTP client for OpenAI-compatible API
        self.client = httpx.AsyncClient(
            headers={
                "Content-Type": "application/json",
                # Ollama doesn't require auth, but some servers might
                "Authorization": f"Bearer {api_key}",
            },
            timeout=self.timeout,
            base_url=self.base_url,
        )
        logger.info(
            "Local LLM provider initialized with model: %s at %s",
            self.model,
            self.base_url,
        )

    async def aclose(self) -> None:
        """Close the underlying HTTP client and release its connections."""
        await self.client.aclose()

    async def analyze_task(
        self, task: Task, context: Dict[str, Any]
    ) -> SemanticAnalysis:
        """
        Analyze task semantics using local LLM.

        Parameters
        ----------
        task : Task
            Task to analyze
        context : dict
            Project context including related tasks

        Returns
        -------
        SemanticAnalysis
            Comprehensive semantic analysis of the task, or a low-confidence
            fallback analysis if the LLM call fails.
        """
        prompt = self._build_task_analysis_prompt(task, context)

        try:
            response = await self._call_local_llm(prompt)
            return self._parse_task_analysis_response(response)
        except Exception as e:
            logger.error("Local LLM task analysis failed: %s", e)
            # Return safe fallback
            return SemanticAnalysis(
                task_intent="unknown",
                semantic_dependencies=[],
                risk_factors=["local_llm_analysis_failed"],
                suggestions=["Review task manually"],
                confidence=0.1,
                reasoning=f"Local LLM analysis failed: {str(e)}",
                risk_assessment={"availability": "degraded"},
            )

    async def infer_dependencies(self, tasks: List[Task]) -> List[SemanticDependency]:
        """
        Infer semantic dependencies between tasks.

        Parameters
        ----------
        tasks : list of Task
            All tasks to analyze for dependencies (only the first
            ``_MAX_DEPENDENCY_TASKS`` are included in the prompt)

        Returns
        -------
        list of SemanticDependency
            Inferred dependencies with confidence scores; empty on failure
        """
        prompt = self._build_dependency_inference_prompt(tasks)

        try:
            response = await self._call_local_llm(prompt)
            return self._parse_dependency_response(response)
        except Exception as e:
            logger.error("Local LLM dependency inference failed: %s", e)
            return []

    async def generate_enhanced_description(
        self, task: Task, context: Dict[str, Any]
    ) -> str:
        """
        Generate enhanced task description.

        Parameters
        ----------
        task : Task
            Task needing better description
        context : dict
            Project context

        Returns
        -------
        str
            Enhanced, detailed task description; falls back to the existing
            description (or the task name) if the LLM call fails.
        """
        prompt = f"""Given this task: '{task.name}'
Current description: '{task.description or "No description"}'
Project context: {json.dumps(context.get('project_type', 'software development'))}

Generate a clear, actionable description that includes:
1. What needs to be done
2. Key technical considerations
3. Success criteria

Enhanced description:"""

        try:
            response = await self._call_local_llm(prompt, max_tokens=500)
            return response.strip()
        except Exception as e:
            logger.error("Local LLM description generation failed: %s", e)
            return task.description or task.name

    async def estimate_effort(
        self, task: Task, context: Dict[str, Any]
    ) -> EffortEstimate:
        """
        Estimate task effort using local LLM.

        Parameters
        ----------
        task : Task
            Task to estimate
        context : dict
            Project context with team velocity

        Returns
        -------
        EffortEstimate
            Hours estimate with confidence and factors; an 8-hour,
            low-confidence default if the LLM call fails.
        """
        prompt = self._build_effort_estimation_prompt(task, context)

        try:
            response = await self._call_local_llm(prompt)
            return self._parse_effort_response(response)
        except Exception as e:
            logger.error("Local LLM effort estimation failed: %s", e)
            # Safe fallback
            return EffortEstimate(
                estimated_hours=8.0,
                confidence=0.3,
                factors=["default_estimate"],
                similar_tasks=[],
                risk_multiplier=1.0,
            )

    async def analyze_blocker(
        self, task: Task, blocker: str, context: Dict[str, Any]
    ) -> List[str]:
        """
        Analyze blocker and suggest solutions.

        Parameters
        ----------
        task : Task
            Blocked task
        blocker : str
            Description of the blocker
        context : dict
            Additional context including severity and optional ``agent``
            info (``name``, ``skills``)

        Returns
        -------
        list of str
            Up to five solution suggestions parsed from the model's list
            output, or generic fallback solutions if none can be obtained.
        """
        severity = context.get("severity", "medium")
        # ``or {}`` also covers an explicit ``None`` agent, which would
        # otherwise raise here, before the try block below.
        agent_info = context.get("agent") or {}

        prompt = f"""Task: {task.name}
Blocker: {blocker}
Severity: {severity}
Agent: {agent_info.get('name', 'Unknown')} (Skills: {agent_info.get('skills', [])})

Analyze this blocker and provide 3-5 specific, actionable solutions.
Focus on practical steps the developer can take immediately.

Solutions:"""

        try:
            response = await self._call_local_llm(prompt)
            solutions = self._parse_solution_list(response)
            return solutions[:5] if solutions else self._get_fallback_solutions()
        except Exception as e:
            logger.error("Local LLM blocker analysis failed: %s", e)
            return self._get_fallback_solutions()

    async def complete(
        self,
        prompt: str,
        max_tokens: Optional[int] = None,
        temperature: float | None = None,
    ) -> str:
        """
        Complete text using local LLM for direct access.

        Parameters
        ----------
        prompt : str
            The prompt to complete
        max_tokens : int, optional
            Maximum tokens to generate. ``None`` (default) uses
            ``self.max_tokens`` which is sourced from ``config.ai.max_tokens``
            at provider construction. Pass an explicit value only when a
            single call needs a tighter or looser budget than the project
            default.
        temperature : float, optional
            Sampling temperature (0.0-1.0). If None, uses config value.

        Returns
        -------
        str
            The completion text
        """
        if max_tokens is None:
            max_tokens = self.max_tokens
        if temperature is None:
            temperature = self.temperature
        return await self._call_local_llm(prompt, max_tokens, temperature)

    async def _call_local_llm(
        self,
        prompt: str,
        max_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
    ) -> str:
        """
        Make a request to the local LLM server.

        Uses the OpenAI-compatible ``/chat/completions`` endpoint and falls
        back to Ollama's native API when that endpoint returns 404.

        Parameters
        ----------
        prompt : str
            The prompt to send
        max_tokens : int, optional
            Max tokens to generate; ``None`` uses ``self.max_tokens``
        temperature : float, optional
            Sampling temperature; ``None`` uses ``self.temperature`` (the
            configured value) so analysis calls honour the project config.

        Returns
        -------
        str
            The model's response with any leading reasoning block removed

        Raises
        ------
        Exception
            If the API call fails
        """
        if max_tokens is None:
            max_tokens = self.max_tokens
        if temperature is None:
            temperature = self.temperature

        # OpenAI-compatible format
        request_data = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": self._SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            "max_tokens": max_tokens,
            "temperature": temperature,
            "stream": False,
        }

        start = time.monotonic()
        try:
            response = await self.client.post("/chat/completions", json=request_data)
            response.raise_for_status()

            data = response.json()
            content = data["choices"][0]["message"]["content"]
            if not isinstance(content, str):
                raise Exception(f"Expected string response, got {type(content)}")

            # OpenAI-compatible servers (incl. local Ollama) usually return
            # a usage object. Cache fields are absent for non-Anthropic
            # backends; record_planner_call defaults them to 0.  ``or 0``
            # also tolerates servers that send explicit nulls.
            usage = data.get("usage") or {}
            request_id = data.get("id")
            get_recorder().record_planner_call(
                operation="analyze",
                provider=self._cost_provider_name(),
                model=self.model,
                input_tokens=int(usage.get("prompt_tokens") or 0),
                output_tokens=int(usage.get("completion_tokens") or 0),
                latency_ms=int((time.monotonic() - start) * 1000),
                request_id=str(request_id) if request_id else None,
            )
            return _strip_reasoning_blocks(content)

        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                # Try Ollama's native API format
                return await self._call_ollama_native(prompt, max_tokens, temperature)
            raise Exception(
                f"Local LLM API error: {e.response.status_code} - {e.response.text}"
            ) from e
        except Exception as e:
            logger.error("Local LLM call failed: %s", e)
            raise

    def _cost_provider_name(self) -> str:
        """Return the provider tag for cost events. Subclasses (cloud) override."""
        return "local"

    async def _call_ollama_native(
        self, prompt: str, max_tokens: int, temperature: float
    ) -> str:
        """
        Fallback to Ollama's native ``/api/generate`` endpoint.

        Some Ollama installations might not have OpenAI compatibility enabled.

        Parameters
        ----------
        prompt : str
            The prompt to send
        max_tokens : int
            Max tokens to generate (sent as ``num_predict``)
        temperature : float
            Sampling temperature

        Returns
        -------
        str
            The model's response with any leading reasoning block removed

        Raises
        ------
        Exception
            If the request fails or the response is malformed
        """
        # Server root: drop only a trailing "/v1" (and any trailing slash) so
        # "/v1" segments elsewhere in the URL survive and no "//" is produced.
        native_url = self.base_url.rstrip("/").removesuffix("/v1")

        request_data = {
            "model": self.model,
            "prompt": prompt,
            # Keep the same system instructions as the OpenAI-compatible path.
            "system": self._SYSTEM_PROMPT,
            "stream": False,
            "options": {
                "num_predict": max_tokens,
                "temperature": temperature,
            },
        }

        try:
            # Absolute URL: httpx uses it as-is instead of joining base_url.
            response = await self.client.post(
                f"{native_url}/api/generate", json=request_data
            )
            response.raise_for_status()
            data = response.json()
            response_text = data["response"]
            if not isinstance(response_text, str):
                raise Exception(f"Expected string response, got {type(response_text)}")
            return _strip_reasoning_blocks(response_text)
        except Exception as e:
            logger.error("Ollama native API call failed: %s", e)
            raise Exception(f"Failed to connect to local LLM server: {str(e)}") from e

    def _build_task_analysis_prompt(self, task: Task, context: Dict[str, Any]) -> str:
        """Build prompt for task analysis."""
        return f"""Analyze this software development task:

Task: {task.name}
Description: {task.description or 'No description provided'}
Priority: {task.priority}
Labels: {task.labels}

Project context:
- Type: {context.get('project_type', 'software project')}
- Related tasks: {len(context.get('available_tasks', []))} tasks in project

Provide a JSON response with:
{{
    "task_intent": "What this task aims to achieve",
    "semantic_dependencies": ["task names that should complete first"],
    "risk_factors": ["potential risks or complexities"],
    "suggestions": ["specific improvements or considerations"],
    "confidence": 0.0-1.0,
    "reasoning": "Brief explanation of analysis",
    "risk_assessment": {{
        "technical": "low/medium/high",
        "timeline": "low/medium/high",
        "dependencies": "low/medium/high"
    }}
}}"""

    def _parse_task_analysis_response(self, response: str) -> SemanticAnalysis:
        """Parse task analysis response; returns a low-confidence result on error."""
        try:
            data = parse_ai_json_response(response)
            return SemanticAnalysis(
                task_intent=data.get("task_intent", "unknown"),
                semantic_dependencies=data.get("semantic_dependencies", []),
                risk_factors=data.get("risk_factors", []),
                suggestions=data.get("suggestions", []),
                confidence=float(data.get("confidence", 0.5)),
                reasoning=data.get("reasoning", ""),
                risk_assessment=data.get("risk_assessment", {}),
            )
        except Exception as e:
            logger.warning("Failed to parse task analysis: %s", e)
            # Return minimal valid response
            return SemanticAnalysis(
                task_intent="parse_error",
                semantic_dependencies=[],
                risk_factors=["response_parsing_failed"],
                suggestions=[],
                confidence=0.1,
                reasoning=str(e),
                risk_assessment={},
            )

    def _build_dependency_inference_prompt(self, tasks: List[Task]) -> str:
        """Build prompt for dependency inference."""
        task_list = "\n".join(
            f"- {t.id}: {t.name} ({t.status})"
            for t in tasks[: self._MAX_DEPENDENCY_TASKS]
        )

        return f"""Analyze these tasks and identify logical dependencies:

{task_list}

Return a JSON array of dependencies:
[
    {{
        "dependent_task_id": "task that depends",
        "dependency_task_id": "task that must complete first",
        "confidence": 0.0-1.0,
        "reasoning": "why this dependency exists",
        "dependency_type": "logical|technical|temporal"
    }}
]

Focus on clear dependencies like:
- Tests before deployment
- Setup before implementation
- Data models before APIs"""

    @staticmethod
    def _extract_dependency_list(parsed_data: Any) -> List[Any]:
        """Locate the list of dependency objects inside a parsed response."""
        # Defensive: a bare JSON array root (what the prompt asks for).
        if isinstance(parsed_data, list):
            return parsed_data
        if not isinstance(parsed_data, dict):
            return []

        dependency_data = parsed_data.get("dependencies", [])
        if not dependency_data:
            # The array may be wrapped under some other single key.
            dict_values = list(parsed_data.values())
            if len(dict_values) == 1 and isinstance(dict_values[0], list):
                dependency_data = dict_values[0]
        return dependency_data if isinstance(dependency_data, list) else []

    def _parse_dependency_response(self, response: str) -> List[SemanticDependency]:
        """
        Parse dependency inference response.

        Entries that are malformed (missing a task id, non-numeric
        confidence, not an object) are skipped individually rather than
        discarding every dependency in the response.
        """
        try:
            parsed_data: Any = parse_ai_json_response(response)
        except Exception as e:
            logger.warning("Failed to parse dependencies: %s", e)
            return []

        dependencies: List[SemanticDependency] = []
        for dep in self._extract_dependency_list(parsed_data):
            try:
                dependencies.append(
                    SemanticDependency(
                        dependent_task_id=dep["dependent_task_id"],
                        dependency_task_id=dep["dependency_task_id"],
                        confidence=float(dep.get("confidence", 0.5)),
                        reasoning=dep.get("reasoning", ""),
                        dependency_type=dep.get("dependency_type", "logical"),
                    )
                )
            except Exception as e:
                # Model output is untrusted; drop just this entry.
                logger.warning("Skipping malformed dependency entry %r: %s", dep, e)
        return dependencies

    def _build_effort_estimation_prompt(
        self, task: Task, context: Dict[str, Any]
    ) -> str:
        """Build prompt for effort estimation."""
        return f"""Estimate effort for this task:

Task: {task.name}
Description: {task.description or 'No description'}
Priority: {task.priority}
Current estimate: {task.estimated_hours or 'None'}

Team context:
- Average velocity: {context.get('avg_velocity', 'unknown')}
- Tech stack: {context.get('tech_stack', [])}

Provide a JSON response:
{{
    "hours_estimate": number,
    "confidence": 0.0-1.0,
    "factors_considered": ["list of factors"],
    "reasoning": "explanation"
}}"""

    def _parse_effort_response(self, response: str) -> EffortEstimate:
        """Parse effort estimation response; returns an 8-hour default on error."""
        try:
            data = parse_ai_json_response(response)
            return EffortEstimate(
                estimated_hours=float(data.get("hours_estimate", 8.0)),
                confidence=float(data.get("confidence", 0.5)),
                factors=data.get("factors_considered", []),
                similar_tasks=[],
                risk_multiplier=1.0,
            )
        except Exception as e:
            logger.warning("Failed to parse effort estimate: %s", e)
            return EffortEstimate(
                estimated_hours=8.0,
                confidence=0.3,
                factors=["parse_error"],
                similar_tasks=[],
                risk_multiplier=1.0,
            )

    def _parse_solution_list(self, response: str) -> List[str]:
        """Return the text of each numbered/bulleted list item in ``response``."""
        solutions: List[str] = []
        for raw_line in response.splitlines():
            match = self._LIST_ITEM_RE.match(raw_line.strip())
            if match is None:
                continue
            solution = match.group("text").strip()
            if solution:
                solutions.append(solution)
        return solutions

    def _get_fallback_solutions(self) -> List[str]:
        """Get generic fallback solutions for blockers."""
        return [
            "Review task requirements and dependencies",
            "Check project documentation for similar issues",
            "Consult with team lead or senior developer",
            "Break down the task into smaller components",
            "Research the specific error or issue online",
        ]