Source code for src.ai.providers.cloud_provider

"""
Cloud LLM Provider for Marcus AI.

Generic OpenAI-compatible cloud provider. Works with any service that
exposes an OpenAI-style ``/chat/completions`` endpoint — Fireworks AI,
Groq, Together AI, Mistral, DeepSeek, and others.

Classes
-------
CloudLLMProvider
    Cloud model provider for OpenAI-compatible hosted inference APIs

Examples
--------
>>> provider = CloudLLMProvider(
...     model="accounts/fireworks/models/qwen2p5-coder-7b-instruct",
...     api_key="fw_abc123",  # pragma: allowlist secret
...     url="https://api.fireworks.ai/inference/v1",
... )
>>> result = await provider.analyze_task(task, context)
"""

import logging
import time
from typing import Optional

import httpx

from src.cost_tracking.cost_recorder import get_recorder

from .local_provider import LocalLLMProvider, _strip_reasoning_blocks

logger = logging.getLogger(__name__)


class CloudLLMProvider(LocalLLMProvider):
    """Generic cloud LLM provider for OpenAI-compatible hosted APIs.

    Inherits all semantic-analysis business logic from ``LocalLLMProvider``
    and overrides the transport layer only: the HTTP client is pointed at an
    explicit remote URL with a required API key. The Ollama native-API
    fallback present in the parent is intentionally omitted — cloud
    endpoints do not implement it.

    Parameters
    ----------
    model : str
        Full model identifier as expected by the remote service
        (e.g. ``"accounts/fireworks/models/qwen2p5-coder-7b-instruct"``).
    api_key : str
        Bearer token for the remote service.
    url : str
        Base URL of the OpenAI-compatible inference endpoint
        (e.g. ``"https://api.fireworks.ai/inference/v1"``).

    Raises
    ------
    ValueError
        If ``model``, ``api_key``, or ``url`` is empty.
    """

    def __init__(self, model: str, api_key: str, url: str) -> None:
        """Initialize cloud LLM provider.

        Parameters
        ----------
        model : str
            Model identifier for the remote service — must not be empty.
        api_key : str
            Bearer token — must not be empty.
        url : str
            Base URL for the OpenAI-compatible API — must not be empty.

        Raises
        ------
        ValueError
            If any of the three arguments is empty.
        """
        if not model:
            raise ValueError("CloudLLMProvider requires a non-empty model name")
        if not api_key:
            raise ValueError("CloudLLMProvider requires a non-empty api_key")
        if not url:
            raise ValueError("CloudLLMProvider requires a non-empty cloud_url")

        # Pull global settings (temperature, max_tokens) from config; skip
        # LocalLLMProvider.__init__ entirely because it reads local_model /
        # local_url / local_key — none of which apply here.
        from src.config.marcus_config import get_config

        config = get_config()

        self.base_url: str = url
        self.model: str = model
        self.max_tokens: int = config.ai.max_tokens
        self.temperature: float = config.ai.temperature
        # Cloud APIs respond faster than local inference
        self.timeout: float = 60.0

        # The bearer token lives only in the client headers; it is never
        # stored on the instance or written to the log line below.
        self.client = httpx.AsyncClient(
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {api_key}",
            },
            timeout=self.timeout,
            base_url=url,
        )

        logger.info("Cloud LLM provider initialized: model=%s url=%s", model, url)

    async def _call_cloud_llm(
        self,
        prompt: str,
        max_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
    ) -> str:
        """Make a request to the cloud OpenAI-compatible endpoint.

        Unlike ``LocalLLMProvider._call_local_llm``, this method does NOT
        fall back to the Ollama native API on a 404; cloud services are not
        Ollama and a 404 is always an error.

        Parameters
        ----------
        prompt : str
            User prompt to send.
        max_tokens : int, optional
            Token budget; defaults to ``self.max_tokens`` from config. A
            caller-supplied value smaller than ``self.max_tokens`` is raised
            to ``self.max_tokens``.
        temperature : float, optional
            Sampling temperature; defaults to ``self.temperature`` from config.

        Returns
        -------
        str
            The model's ``content`` string after it has been passed through
            ``_strip_reasoning_blocks``.

        Raises
        ------
        Exception
            On an HTTP error status (wrapped, with the original
            ``httpx.HTTPStatusError`` attached as ``__cause__``), when the
            response ``content`` is not a string, or on any other network /
            parsing error (re-raised unchanged after logging).
        """
        if max_tokens is None:
            max_tokens = self.max_tokens
        else:
            # Floor caller-supplied budget to config.ai.max_tokens. Reasoning
            # models (R1, Qwen3p5, etc.) spend tokens on chain-of-thought
            # before emitting JSON; when truncated, providers like Fireworks
            # spill the partial CoT into `content` and break JSON parsers.
            max_tokens = max(max_tokens, self.max_tokens)
        if temperature is None:
            temperature = self.temperature

        request_data = {
            "model": self.model,
            "messages": [
                {
                    "role": "system",
                    "content": (
                        "You are an AI assistant helping with software "
                        "development task analysis. Provide clear, structured "
                        "responses focusing on practical implementation details."
                    ),
                },
                {"role": "user", "content": prompt},
            ],
            "max_tokens": max_tokens,
            "temperature": temperature,
            # Disable reasoning/chain-of-thought on models that support it
            # (Qwen3, DeepSeek-R1 via Fireworks). Drastically reduces tokens
            # and keeps JSON in `content` rather than `reasoning_content`.
            "reasoning_effort": "none",
            "stream": False,
        }

        start = time.monotonic()
        try:
            response = await self.client.post("/chat/completions", json=request_data)
            response.raise_for_status()
            data = response.json()
            content = data["choices"][0]["message"]["content"]
            if not isinstance(content, str):
                raise Exception(f"Expected string response, got {type(content)}")

            usage = data.get("usage") or {}
            response_id = data.get("id")
            get_recorder().record_planner_call(
                operation="analyze",
                # Use the class hook so the tag stays consistent with
                # _cost_provider_name() (and with any subclass override).
                provider=self._cost_provider_name(),
                model=self.model,
                # `or 0` also covers a present-but-null token count, which
                # would otherwise make int() raise after a successful call.
                input_tokens=int(usage.get("prompt_tokens") or 0),
                output_tokens=int(usage.get("completion_tokens") or 0),
                latency_ms=int((time.monotonic() - start) * 1000),
                request_id=str(response_id) if response_id else None,
            )
            return _strip_reasoning_blocks(content)
        except httpx.HTTPStatusError as e:
            # A raise inside this handler is not seen by the sibling
            # `except Exception` clause, so log here to keep HTTP failures
            # as visible as every other failure mode.
            logger.error(
                "Cloud LLM call failed for model %s: HTTP %s",
                self.model,
                e.response.status_code,
            )
            raise Exception(
                f"Cloud LLM API error: {e.response.status_code} - "
                f"{e.response.text}"
            ) from e
        except Exception:
            logger.error("Cloud LLM call failed for model %s", self.model)
            raise

    def _cost_provider_name(self) -> str:
        """Return ``'cloud'`` as the cost-event provider tag.

        Overrides the parent's ``'local'`` so cost rows from this class
        attribute correctly to the cloud provider.
        """
        return "cloud"

    # Override the internal dispatch used by all inherited business methods
    async def _call_local_llm(
        self,
        prompt: str,
        max_tokens: Optional[int] = None,
        temperature: float = 0.7,
    ) -> str:
        """Delegate to ``_call_cloud_llm``.

        ``LocalLLMProvider`` routes all LLM calls through this method. We
        override it here so every inherited business method
        (``analyze_task``, ``infer_dependencies``, etc.) transparently uses
        the cloud endpoint instead of a local server.

        Parameters
        ----------
        prompt : str
            User prompt to send.
        max_tokens : int, optional
            Token budget, forwarded unchanged.
        temperature : float
            Sampling temperature, forwarded unchanged. Because the default
            here is ``0.7`` rather than ``None``, calls arriving through this
            method never fall back to ``self.temperature``.

        Returns
        -------
        str
            Whatever ``_call_cloud_llm`` returns.
        """
        return await self._call_cloud_llm(
            prompt, max_tokens=max_tokens, temperature=temperature
        )