Source code for src.ai.decisions.hybrid_framework

"""
Hybrid Decision Framework for Marcus AI.

Merges rule-based safety constraints with AI intelligence to make
optimal task assignment decisions while never compromising safety.
"""

import logging
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Dict, Optional

if TYPE_CHECKING:
    from src.ai.core.ai_engine import RuleBasedEngine

from src.ai.types import (
    AIOptimizationResult,
    AnalysisContext,
    AssignmentContext,
    AssignmentDecision,
    RuleBasedResult,
)
from src.core.models import Task

logger = logging.getLogger(__name__)


[docs] class AIEngine: """Mock AI engine interface for hybrid framework."""
[docs] async def analyze_assignment_optimality( self, task: Task, context: AssignmentContext ) -> AIOptimizationResult: """ Analyze assignment optimality using AI. Args ---- task: Task to analyze context: Assignment context Returns ------- AI optimization result """ # This would be implemented by the actual AI engine # For now, return reasonable defaults task_text = f"{task.name} {task.description or ''}".lower() # Analyze task complexity complexity_indicators = ["complex", "advanced", "integration", "migration"] is_complex = any(indicator in task_text for indicator in complexity_indicators) # Analyze risk factors risk_indicators = ["deploy", "production", "migration", "critical"] is_risky = any(indicator in task_text for indicator in risk_indicators) # Generate suggestions based on analysis improvements = [] if is_complex: improvements.extend( [ "Consider breaking down into smaller tasks", "Ensure sufficient time allocation", "Plan for thorough testing", ] ) if is_risky: improvements.extend( [ "Implement monitoring and rollback plan", "Schedule during low-traffic hours", "Have backup ready", ] ) # Default suggestions if not improvements: improvements = [ "Follow established coding standards", "Update documentation as needed", "Run tests before completion", ] optimization_score = 0.8 if is_complex: optimization_score -= 0.1 if is_risky: optimization_score -= 0.1 return AIOptimizationResult( confidence=0.85, optimization_score=optimization_score, improvements=improvements, semantic_confidence=0.8, risk_mitigation=["Follow standard procedures", "Test thoroughly"], estimated_completion_time=8.0 if not is_complex else 12.0, )
[docs] class HybridDecisionFramework: """ Hybrid decision framework that merges rule-based safety with AI optimization. Core principle: Rules provide mandatory safety constraints that AI cannot override. AI provides optimization and enhancement when rules allow the assignment. """
[docs] def __init__(self) -> None: # Initialize will be done lazily to avoid circular imports self.rule_engine: Optional["RuleBasedEngine"] = None self.ai_engine = AIEngine() # Mock for now, will be replaced with real AI # Decision weights self.rule_weight = 0.7 # Rules weighted higher for safety self.ai_weight = 0.3 # AI provides enhancement # Safety settings self.allow_ai_override = False # Never allow AI to override safety rules self.require_rule_validation = True # Always validate with rules first logger.info("Hybrid decision framework initialized")
[docs] async def make_assignment_decision( self, task: Task, context: AssignmentContext ) -> AssignmentDecision: """ Make hybrid assignment decision combining rules with AI optimization. Args ---- task: Task to assign context: Assignment context Returns ------- Assignment decision with reasoning and AI enhancements """ logger.debug(f"Making assignment decision for task: {task.name}") # Step 1: Mandatory rule-based validation (never bypassed) rule_result = await self._validate_with_rules(task, context) # Step 2: If rules reject, return immediately (safety first) if not rule_result.is_valid: return AssignmentDecision( allow=False, confidence=rule_result.confidence, reason=f"Rule violation: {rule_result.reason}", safety_critical=rule_result.safety_critical, mandatory_rule_applied=rule_result.mandatory, confidence_breakdown={ "rule_component": rule_result.confidence, "ai_component": 0.0, "rule_weight": 1.0, "ai_weight": 0.0, }, ) # Step 3: Rules allow assignment - get AI optimization ai_result = None try: ai_result = await self.ai_engine.analyze_assignment_optimality( task, context ) except Exception as e: logger.warning( f"AI optimization failed, proceeding with rule-based decision: {e}" ) # Step 4: Calculate hybrid confidence final_confidence = self._calculate_hybrid_confidence( rule_result.confidence, ai_result.confidence if ai_result else None ) # Step 5: Build confidence breakdown confidence_breakdown = { "rule_component": rule_result.confidence, "ai_component": ai_result.confidence if ai_result else 0.0, "rule_weight": self.rule_weight, "ai_weight": self.ai_weight if ai_result else 0.0, } # Step 6: Build final decision reason_parts = [f"Rules passed ({rule_result.confidence:.2f})"] if ai_result: reason_parts.append(f"AI optimization ({ai_result.confidence:.2f})") return AssignmentDecision( allow=True, confidence=final_confidence, reason="; ".join(reason_parts), ai_suggestions=ai_result, optimization_score=ai_result.optimization_score if ai_result else None, confidence_breakdown=confidence_breakdown, safety_critical=False, mandatory_rule_applied=False, )
async def _validate_with_rules( self, task: Task, context: AssignmentContext ) -> RuleBasedResult: """ Validate assignment using rule-based logic. Args ---- task: Task to validate context: Assignment context Returns ------- Rule-based validation result """ # Initialize rule engine lazily if self.rule_engine is None: from src.ai.core.ai_engine import RuleBasedEngine self.rule_engine = RuleBasedEngine() # Convert context to format expected by rule engine analysis_context_data = { "task": task, "project_context": { "available_tasks": context.available_tasks, "project_type": context.project_context.get("project_type", "general"), "tech_stack": context.project_context.get("tech_stack", []), "team_size": context.project_context.get("team_size", 1), }, "historical_data": context.project_context.get("historical_data", []), } analysis_context = AnalysisContext(**analysis_context_data) result = await self.rule_engine.analyze(analysis_context) return result def _calculate_hybrid_confidence( self, rule_confidence: float, ai_confidence: Optional[float] ) -> float: """ Calculate hybrid confidence score. Rule confidence is weighted higher to prioritize safety. AI confidence provides enhancement when available. Args ---- rule_confidence: Confidence from rule-based analysis ai_confidence: Confidence from AI analysis (optional) Returns ------- Weighted confidence score """ if ai_confidence is None: return rule_confidence # Weighted average with rule bias for safety hybrid_confidence = ( rule_confidence * self.rule_weight + ai_confidence * self.ai_weight ) # Ensure confidence stays within bounds return max(0.0, min(1.0, hybrid_confidence))
[docs] async def evaluate_assignment_quality( self, task: Task, agent_id: str, assignment_outcome: Dict[str, Any] ) -> Dict[str, Any]: """ Evaluate the quality of a completed assignment for learning. Args ---- task: Completed task agent_id: Agent who completed the task assignment_outcome: Outcome data (success, time, quality, etc.) Returns ------- Assignment quality evaluation """ evaluation: Dict[str, Any] = { "task_id": task.id, "agent_id": agent_id, "completion_success": assignment_outcome.get("success", True), "actual_time": assignment_outcome.get("completion_time"), "quality_score": assignment_outcome.get("quality_score", 0.8), "blockers_encountered": assignment_outcome.get("blockers", []), "evaluation_timestamp": datetime.now(timezone.utc), } # Calculate assignment effectiveness estimated_time = assignment_outcome.get("estimated_time", 8.0) actual_time = assignment_outcome.get("completion_time", 8.0) time_accuracy: Optional[float] = None if estimated_time > 0: time_accuracy = min(estimated_time, actual_time) / max( estimated_time, actual_time ) evaluation["time_estimation_accuracy"] = time_accuracy # Evaluate decision quality if assignment_outcome.get("success", True) and not assignment_outcome.get( "blockers" ): evaluation["decision_quality"] = "excellent" elif assignment_outcome.get("success", True): evaluation["decision_quality"] = "good" else: evaluation["decision_quality"] = "poor" # Extract learnings learnings = [] if evaluation["decision_quality"] == "excellent": learnings.append("Assignment criteria were well-matched") if assignment_outcome.get("blockers"): learnings.append("Consider better dependency checking") if time_accuracy is not None and time_accuracy < 0.7: learnings.append("Improve time estimation accuracy") evaluation["learnings"] = learnings logger.debug( f"Assignment quality for {task.id}: " f"{evaluation['decision_quality']}" ) return evaluation
[docs] async def get_framework_stats(self) -> Dict[str, Any]: """Get framework performance statistics.""" return { "rule_weight": self.rule_weight, "ai_weight": self.ai_weight, "safety_settings": { "allow_ai_override": self.allow_ai_override, "require_rule_validation": self.require_rule_validation, }, "components": {"rule_engine": "active", "ai_engine": "active"}, }
[docs] async def adjust_weights(self, rule_weight: float, ai_weight: float) -> bool: """ Adjust confidence weights (with safety constraints). Args ---- rule_weight: New rule weight ai_weight: New AI weight Returns ------- True if weights were adjusted, False if rejected for safety """ # Safety constraint: Rule weight must be >= 0.5 for safety if rule_weight < 0.5: logger.warning( f"Rejected weight adjustment: rule weight {rule_weight} < 0.5 minimum" ) return False # Normalize weights total_weight = rule_weight + ai_weight if total_weight <= 0: logger.warning("Rejected weight adjustment: total weight <= 0") return False self.rule_weight = rule_weight / total_weight self.ai_weight = ai_weight / total_weight logger.info( f"Adjusted weights: rule={self.rule_weight:.2f}, ai={self.ai_weight:.2f}" ) return True