Source code for src.ai.core.ai_engine

"""
Marcus AI Engine - Core AI coordination engine.

This module implements hybrid intelligence that combines rule-based safety
with AI enhancement. Key principle: Rules provide safety guarantees, AI
provides intelligence enhancement.

The engine consists of two main components:
1. RuleBasedEngine: Provides deterministic, safety-critical validation
2. MarcusAIEngine: Coordinates rule-based and AI-powered analysis

Classes
-------
RuleBasedEngine
    Rule-based analysis engine using existing Phase 1-2 logic
MarcusAIEngine
    Central AI coordination engine implementing hybrid intelligence

Examples
--------
>>> from src.ai.core.ai_engine import MarcusAIEngine
>>> from src.ai.types import AnalysisContext
>>>
>>> engine = MarcusAIEngine()
>>> context = AnalysisContext(task=task, project_context=project_data)
>>> result = await engine.analyze_with_hybrid_intelligence(context)
>>> print(f"Assignment allowed: {result.allow_assignment}")
"""

import logging
import os
from typing import Any, Dict, List, Optional

from src.ai.providers.llm_abstraction import LLMAbstraction
from src.ai.types import (
    AIInsights,
    AnalysisContext,
    HybridAnalysis,
    RuleBasedResult,
)
from src.core.models import Task, TaskStatus

logger = logging.getLogger(__name__)


[docs] class RuleBasedEngine: """ Rule-based analysis engine using existing Phase 1-2 logic. Provides deterministic validation using established rules and patterns. This engine ensures safety-critical decisions are made with predictable, auditable logic that cannot be overridden by AI suggestions. Attributes ---------- dependency_inferer : DependencyInferer Infers task dependencies based on patterns adaptive_mode : BasicAdaptiveMode Implements adaptive assignment logic Methods ------- analyze(context) Perform rule-based analysis on task assignment Notes ----- This engine provides the safety foundation for Marcus. All decisions made by this engine are considered mandatory and cannot be overridden by AI suggestions. """
[docs] def __init__(self) -> None: # Import existing dependency inferer and mode logic from src.intelligence.dependency_inferer import DependencyInferer from src.modes.adaptive.basic_adaptive import BasicAdaptiveMode self.dependency_inferer = DependencyInferer() self.adaptive_mode = BasicAdaptiveMode()
[docs] async def analyze(self, context: AnalysisContext) -> RuleBasedResult: """ Perform rule-based analysis using existing Phase 1-2 logic. Validates task assignment based on: - Logical consistency (no obviously illogical assignments) - Dependency satisfaction (all prerequisites complete) - Mandatory patterns (e.g., test before deploy) Parameters ---------- context : AnalysisContext Analysis context containing: - task: Task to analyze - project_context: Dict with available_tasks, assigned_tasks Returns ------- RuleBasedResult Analysis result containing: - is_valid: Whether assignment is allowed - confidence: Confidence level (0.0-1.0) - reason: Human-readable explanation - safety_critical: Whether this is a safety rule - mandatory: Whether rule cannot be overridden Examples -------- >>> context = AnalysisContext(task=deploy_task, project_context=project) >>> result = await engine.analyze(context) >>> if not result.is_valid: ... print(f"Assignment blocked: {result.reason}") """ task = context.task # Use existing dependency checking logic # Simulate validation by checking if task can be assigned available_tasks = context.project_context.get("available_tasks", []) assigned_tasks = context.project_context.get("assigned_tasks", {}) # Check if this task would be obviously illogical is_illogical = await self.adaptive_mode._is_obviously_illogical( task, available_tasks ) if is_illogical: return RuleBasedResult( is_valid=False, confidence=1.0, reason="Illogical task assignment blocked by rule engine", safety_critical=True, mandatory=True, ) # Check if task is unblocked is_unblocked = await self.adaptive_mode._is_task_unblocked( task, available_tasks, assigned_tasks ) if not is_unblocked: return RuleBasedResult( is_valid=False, confidence=0.95, reason="Task blocked by dependencies", safety_critical=True, mandatory=True, ) # Check for mandatory dependency patterns dependencies_valid = await self._check_mandatory_dependencies(task, context) if not dependencies_valid["valid"]: return RuleBasedResult( is_valid=False, confidence=0.95, reason=dependencies_valid["reason"], safety_critical=True, mandatory=True, ) return RuleBasedResult( is_valid=True, confidence=0.8, reason="All rule-based checks passed" )
async def _check_mandatory_dependencies( self, task: Task, context: AnalysisContext ) -> Dict[str, Any]: """ Check mandatory dependency patterns. Enforces critical safety patterns like ensuring tests complete before deployment tasks can be assigned. Parameters ---------- task : Task Task to check dependencies for context : AnalysisContext Analysis context with project information Returns ------- dict Validation result with keys: - valid (bool): Whether dependencies are satisfied - reason (str): Explanation if invalid Notes ----- Currently implements deployment safety check: deployment tasks cannot be assigned until all testing tasks are complete. """ task_text = f"{task.name} {task.description or ''}".lower() # Check deployment before testing pattern if any( word in task_text for word in ["deploy", "release", "launch", "production"] ): # Check if testing tasks exist and are complete available_tasks = context.project_context.get("available_tasks", []) testing_tasks = [ t for t in available_tasks if any( test_word in f"{t.name} {t.description or ''}".lower() for test_word in ["test", "qa", "quality", "verify"] ) ] if testing_tasks: incomplete_tests = [ t for t in testing_tasks if t.status != TaskStatus.DONE ] if incomplete_tests: return { "valid": False, "reason": ( f"Deployment blocked: {len(incomplete_tests)} " f"testing tasks incomplete" ), } return {"valid": True, "reason": "All mandatory dependencies satisfied"}
[docs] class MarcusAIEngine: """ Central AI coordination engine implementing hybrid intelligence. Combines rule-based safety guarantees with AI-powered semantic understanding and intelligent optimization while ensuring rules are never overridden. The engine follows a strict precedence model: 1. Rule-based validation (mandatory, safety-critical) 2. AI enhancement (optional, additive only) 3. Hybrid confidence calculation (weighted combination) Parameters ---------- None Attributes ---------- llm_client : LLMAbstraction AI provider abstraction for semantic analysis rule_engine : RuleBasedEngine Rule-based validation engine hybrid_coordinator : HybridDecisionFramework Coordinates hybrid decision making ai_enabled : bool Whether AI enhancement is enabled fallback_on_ai_failure : bool Whether to fallback to rules on AI failure rule_safety_override : bool Whether AI can override rules (always False) Methods ------- analyze_with_hybrid_intelligence(context) Perform hybrid analysis on task assignment enhance_task_with_ai(task, context) Enhance task descriptions using AI analyze_blocker(task_id, blocker_description, severity, agent, task) Analyze blockers and suggest solutions get_engine_status() Get current engine status and configuration Examples -------- >>> engine = MarcusAIEngine() >>> # Analyze task assignment >>> result = await engine.analyze_with_hybrid_intelligence(context) >>> if result.allow_assignment: ... print(f"Assignment allowed with {result.confidence:.0%} confidence") ... >>> # Get AI suggestions for blocker >>> suggestions = await engine.analyze_blocker( ... task_id="task-123", ... blocker_description="Database connection failed", ... severity="high", ... agent=agent_info, ... task=blocked_task ... ) Notes ----- AI enhancement is controlled by the MARCUS_AI_ENABLED environment variable. The engine always falls back to rule-based decisions if AI fails, ensuring system reliability. """
[docs] def __init__(self) -> None: self.llm_client = LLMAbstraction() self.rule_engine = RuleBasedEngine() # Import lazily to avoid circular dependency from src.ai.decisions.hybrid_framework import HybridDecisionFramework self.hybrid_coordinator = HybridDecisionFramework() # Configuration - get from config first, env var as override from src.config.marcus_config import get_config config = get_config() # Check config first, then env var, default to true self.ai_enabled = config.ai.enabled if os.getenv("MARCUS_AI_ENABLED") is not None: self.ai_enabled = os.getenv("MARCUS_AI_ENABLED", "true").lower() == "true" self.fallback_on_ai_failure = True self.rule_safety_override = False # Never allow AI to override safety rules logger.info(f"Marcus AI Engine initialized (AI enabled: {self.ai_enabled})")
[docs] async def analyze_with_hybrid_intelligence( self, context: AnalysisContext ) -> HybridAnalysis: """ Perform hybrid analysis combining rule-based safety with AI intelligence. Executes a multi-step analysis process: 1. Rule-based validation (mandatory) 2. AI enhancement (if enabled and rules pass) 3. Confidence calculation (weighted combination) Parameters ---------- context : AnalysisContext Analysis context containing: - task: Task to analyze for assignment - project_context: Project state and metadata Returns ------- HybridAnalysis Complete analysis result containing: - allow_assignment: Final decision on assignment - confidence: Combined confidence score (0.0-1.0) - reason: Human-readable explanation - ai_confidence: AI model confidence (if available) - ai_insights: Detailed AI analysis (if available) - fallback_mode: Whether AI was unavailable - confidence_breakdown: Component confidence scores Raises ------ Exception Only if AI fails and fallback_on_ai_failure is False Notes ----- Rule violations always result in assignment rejection, regardless of AI analysis. AI can only enhance allowed assignments, never override safety rules. Examples -------- >>> context = AnalysisContext( ... task=Task(id="1", name="Deploy to production"), ... project_context={"available_tasks": tasks} ... ) >>> result = await engine.analyze_with_hybrid_intelligence(context) >>> if not result.allow_assignment: ... print(f"Blocked: {result.reason}") """ logger.debug(f"Starting hybrid analysis for task: {context.task.name}") # Step 1: Rule-based analysis (mandatory, never bypassed) rule_result = await self.rule_engine.analyze(context) # Step 2: If rules reject, return immediately (safety first) if not rule_result.is_valid: return HybridAnalysis( allow_assignment=False, confidence=rule_result.confidence, reason=f"Rule violation: {rule_result.reason}", safety_critical=rule_result.safety_critical, ) # Step 3: AI enhancement (only when rules allow) ai_insights = None ai_confidence = None fallback_mode = False if self.ai_enabled: try: ai_result = await self._get_ai_insights(context) ai_insights = ai_result ai_confidence = ai_result.confidence except Exception as e: logger.warning(f"AI analysis failed, falling back to rule-based: {e}") fallback_mode = True if not self.fallback_on_ai_failure: raise else: fallback_mode = True # Step 4: Merge rule and AI confidence final_confidence = self._calculate_hybrid_confidence( rule_result.confidence, ai_confidence ) # Step 5: Build confidence breakdown confidence_breakdown = { "rule_weight": 0.7 if ai_confidence else 1.0, "ai_weight": 0.3 if ai_confidence else 0.0, "rule_component": rule_result.confidence, "ai_component": ai_confidence or 0.0, } # Build reason string ai_part = ( f"AI confidence: {ai_confidence:.2f}" if ai_confidence else "Rule-based only" ) reason = f"Rules passed. {ai_part}" return HybridAnalysis( allow_assignment=True, confidence=final_confidence, reason=reason, ai_confidence=ai_confidence, ai_insights=ai_insights, fallback_mode=fallback_mode, confidence_breakdown=confidence_breakdown, )
async def _get_ai_insights(self, context: AnalysisContext) -> AIInsights: """ Get AI insights for the task. Parameters ---------- context : AnalysisContext Analysis context with task and project information Returns ------- AIInsights AI-generated insights including intent, dependencies, and risks """ semantic_analysis = await self.llm_client.analyze_task_semantics( context.task, context.project_context ) return AIInsights( task_intent=semantic_analysis.task_intent, semantic_dependencies=semantic_analysis.semantic_dependencies, risk_factors=semantic_analysis.risk_factors, suggestions=semantic_analysis.suggestions, confidence=semantic_analysis.confidence, reasoning=semantic_analysis.reasoning, risk_assessment=semantic_analysis.risk_assessment, ) def _calculate_hybrid_confidence( self, rule_confidence: float, ai_confidence: Optional[float] ) -> float: """ Calculate final confidence by weighting rule and AI confidence. Rule confidence is weighted higher (70%) for safety-critical decisions to ensure system reliability. Parameters ---------- rule_confidence : float Confidence from rule-based analysis (0.0-1.0) ai_confidence : float, optional Confidence from AI analysis (0.0-1.0) Returns ------- float Weighted confidence score (0.0-1.0) Notes ----- Current weighting: 70% rules, 30% AI """ if ai_confidence is None: return rule_confidence # Weight rule confidence higher (70%) for safety rule_weight = 0.7 ai_weight = 0.3 return (rule_confidence * rule_weight) + (ai_confidence * ai_weight)
[docs] async def enhance_task_with_ai( self, task: Task, context: Dict[str, Any] ) -> Dict[str, Any]: """ Enhance a task with AI-generated improvements. Uses AI to generate enhanced descriptions and effort estimates that provide more context and clarity for task execution. Parameters ---------- task : Task Task to enhance with AI insights context : dict Project context for understanding task relationships Returns ------- dict Enhanced task data containing: - enhanced_description: Improved task description - ai_effort_estimate: AI-based effort estimation - enhancement_confidence: Confidence in enhancements Notes ----- Returns empty dict if AI is disabled or enhancement fails. Enhancements are suggestions only and don't affect task execution. Examples -------- >>> enhancements = await engine.enhance_task_with_ai( ... task=Task(name="Setup CI"), ... context=project_context ... ) >>> print(enhancements.get('enhanced_description')) """ if not self.ai_enabled: return {} try: enhanced = await self.llm_client.generate_enhanced_description( task, context ) estimates = await self.llm_client.estimate_effort_intelligently( task, context ) return { "enhanced_description": enhanced, "ai_effort_estimate": estimates, "enhancement_confidence": 0.8, # Base confidence for enhancements } except Exception as e: logger.warning(f"AI enhancement failed for task {task.id}: {e}") return {}
[docs] async def analyze_blocker( self, task_id: str, blocker_description: str, severity: str, agent: Optional[Dict[str, Any]], task: Optional[Task], ) -> List[str]: """ Analyze a blocker and suggest solutions using AI. Uses AI to understand the blocker context and generate actionable suggestions for resolution. Falls back to generic suggestions if AI is unavailable. Parameters ---------- task_id : str ID of the blocked task blocker_description : str Detailed description of what's blocking progress severity : str Severity level: 'low', 'medium', or 'high' agent : dict, optional Information about the agent encountering the blocker task : Task, optional The blocked task object for context Returns ------- list of str Prioritized list of suggested solutions Notes ----- Always returns at least 3 suggestions, even if AI fails. High-severity blockers receive more detailed analysis. Examples -------- >>> suggestions = await engine.analyze_blocker( ... task_id="task-123", ... blocker_description="PostgreSQL connection timeout", ... severity="high", ... agent={"id": "agent-1", "name": "Backend Dev"}, ... task=database_task ... ) >>> for i, suggestion in enumerate(suggestions, 1): ... print(f"{i}. {suggestion}") """ if not self.ai_enabled or not task: return [ "Review task dependencies", "Check if prerequisite tasks are complete", "Consult team lead for guidance", ] try: suggestions = await self.llm_client.analyze_blocker_and_suggest_solutions( task, blocker_description, severity, agent ) return suggestions except Exception as e: logger.warning(f"AI blocker analysis failed: {e}") return [ "Review task requirements and dependencies", "Check documentation for similar issues", "Escalate to team lead if blocker persists", ]
[docs] async def get_engine_status(self) -> Dict[str, Any]: """ Get current AI engine status and configuration. Returns ------- dict Engine status containing: - ai_enabled: Whether AI enhancement is active - llm_provider: Current LLM provider name - fallback_mode: Whether fallback is enabled - safety_override_disabled: Confirms rules can't be overridden - components: Status of each engine component Examples -------- >>> status = await engine.get_engine_status() >>> print(f"AI enabled: {status['ai_enabled']}") >>> print(f"Provider: {status['llm_provider']}") """ return { "ai_enabled": self.ai_enabled, "llm_provider": self.llm_client.current_provider, "fallback_mode": self.fallback_on_ai_failure, "safety_override_disabled": not self.rule_safety_override, "components": { "rule_engine": "active", "llm_client": "active" if self.ai_enabled else "disabled", "hybrid_coordinator": "active", }, }