src.ai.core.ai_engine module

Marcus AI Engine - Core AI coordination engine.

This module implements hybrid intelligence that combines rule-based safety with AI enhancement. Key principle: Rules provide safety guarantees, AI provides intelligence enhancement.

The engine consists of two main components:

  1. RuleBasedEngine: Provides deterministic, safety-critical validation
  2. MarcusAIEngine: Coordinates rule-based and AI-powered analysis

Classes

RuleBasedEngine

Rule-based analysis engine using existing Phase 1-2 logic

MarcusAIEngine

Central AI coordination engine implementing hybrid intelligence

Examples

>>> from src.ai.core.ai_engine import MarcusAIEngine
>>> from src.ai.types import AnalysisContext
>>>
>>> engine = MarcusAIEngine()
>>> context = AnalysisContext(task=task, project_context=project_data)
>>> result = await engine.analyze_with_hybrid_intelligence(context)
>>> print(f"Assignment allowed: {result.allow_assignment}")

class src.ai.core.ai_engine.RuleBasedEngine

Bases: object

Rule-based analysis engine using existing Phase 1-2 logic.

Provides deterministic validation using established rules and patterns. This engine ensures safety-critical decisions are made with predictable, auditable logic that cannot be overridden by AI suggestions.

dependency_inferer

Infers task dependencies based on patterns

Type:

DependencyInferer

adaptive_mode

Implements adaptive assignment logic

Type:

BasicAdaptiveMode

analyze(context)

Perform rule-based analysis on task assignment

Parameters:

context (AnalysisContext)

Return type:

RuleBasedResult

Notes

This engine provides the safety foundation for Marcus. All decisions made by this engine are considered mandatory and cannot be overridden by AI suggestions.

__init__()

Return type:

None

async analyze(context)

Perform rule-based analysis using existing Phase 1-2 logic.

Validates task assignment based on:

  • Logical consistency (no obviously illogical assignments)
  • Dependency satisfaction (all prerequisites complete)
  • Mandatory patterns (e.g., test before deploy)

Parameters:

context (AnalysisContext) – Analysis context containing:

  • task: Task to analyze
  • project_context: Dict with available_tasks, assigned_tasks

Returns:

Analysis result containing:

  • is_valid: Whether assignment is allowed
  • confidence: Confidence level (0.0-1.0)
  • reason: Human-readable explanation
  • safety_critical: Whether this is a safety rule
  • mandatory: Whether rule cannot be overridden

Return type:

RuleBasedResult
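
The checks listed for analyze are not spelled out on this page. As a minimal sketch of what dependency satisfaction and a "test before deploy" pattern could look like, the block below uses a hypothetical `SketchTask` type and `check_rules` helper. Neither is part of Marcus, and the name-matching heuristics are assumptions made for illustration only.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class SketchTask:
    """Hypothetical stand-in for the real Task type (assumption)."""
    id: str
    name: str
    status: str = "todo"
    depends_on: List[str] = field(default_factory=list)


def check_rules(task: SketchTask, all_tasks: Dict[str, SketchTask]) -> Tuple[bool, str]:
    """Return (is_valid, reason) using two illustrative rules."""
    # Dependency satisfaction: every prerequisite must exist and be done.
    for dep_id in task.depends_on:
        dep = all_tasks.get(dep_id)
        if dep is None or dep.status != "done":
            return False, f"Dependency {dep_id} is not complete"

    # Mandatory pattern: a deploy task needs every test task finished first.
    if "deploy" in task.name.lower():
        for other in all_tasks.values():
            if "test" in other.name.lower() and other.status != "done":
                return False, f"Test task {other.id} must finish before deploy"

    return True, "All illustrative rules passed"


tasks = {
    "t1": SketchTask(id="t1", name="Run integration tests", status="in_progress"),
    "t2": SketchTask(id="t2", name="Deploy to production"),
}
is_valid, reason = check_rules(tasks["t2"], tasks)
print(is_valid, reason)
```

Both rules are plain deterministic checks, which is what makes the outcome predictable and auditable in the sense described for RuleBasedEngine.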

Examples

>>> context = AnalysisContext(task=deploy_task, project_context=project)
>>> result = await engine.analyze(context)
>>> if not result.is_valid:
...     print(f"Assignment blocked: {result.reason}")

class src.ai.core.ai_engine.MarcusAIEngine

Bases: object

Central AI coordination engine implementing hybrid intelligence.

Combines rule-based safety guarantees with AI-powered semantic understanding and intelligent optimization while ensuring rules are never overridden.

The engine follows a strict precedence model:

  1. Rule-based validation (mandatory, safety-critical)
  2. AI enhancement (optional, additive only)
  3. Hybrid confidence calculation (weighted combination)
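
The precedence model can be sketched as a small pure function. This is an illustration under stated assumptions rather than the engine's actual logic: `SketchDecision`, `decide`, and the `ai_weight` default of 0.3 are hypothetical, and the real weighting used by HybridDecisionFramework is not documented here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchDecision:
    """Hypothetical result type; field names loosely mirror HybridAnalysis."""
    allow_assignment: bool
    confidence: float
    fallback_mode: bool


def decide(
    rule_valid: bool,
    rule_confidence: float,
    ai_confidence: Optional[float],
    ai_weight: float = 0.3,  # assumed weight, not taken from Marcus
) -> SketchDecision:
    # Step 1: a rule violation is final; the AI score is never consulted.
    if not rule_valid:
        return SketchDecision(False, rule_confidence, fallback_mode=ai_confidence is None)

    # Step 2: with no AI signal, the rule result stands alone.
    if ai_confidence is None:
        return SketchDecision(True, rule_confidence, fallback_mode=True)

    # Step 3: weighted combination of the two confidence scores.
    combined = (1.0 - ai_weight) * rule_confidence + ai_weight * ai_confidence
    return SketchDecision(True, combined, fallback_mode=False)


blocked = decide(rule_valid=False, rule_confidence=1.0, ai_confidence=0.99)
allowed = decide(rule_valid=True, rule_confidence=0.8, ai_confidence=0.6)
print(blocked.allow_assignment, allowed.allow_assignment)
```

In this sketch the AI score can change the reported confidence but never the allow/deny outcome, which matches the "additive only" wording of step 2.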

Parameters:

None

llm_client

AI provider abstraction for semantic analysis

Type:

LLMAbstraction

rule_engine

Rule-based validation engine

Type:

RuleBasedEngine

hybrid_coordinator

Coordinates hybrid decision making

Type:

HybridDecisionFramework

ai_enabled

Whether AI enhancement is enabled

Type:

bool

fallback_on_ai_failure

Whether to fall back to rules on AI failure

Type:

bool

rule_safety_override

Whether AI can override rules (always False)

Type:

bool

analyze_with_hybrid_intelligence(context)

Perform hybrid analysis on task assignment

Parameters:

context (AnalysisContext)

Return type:

HybridAnalysis

enhance_task_with_ai(task, context)

Enhance task descriptions using AI

Parameters:
  • task (Task)

  • context (Dict[str, Any])

Return type:

Dict[str, Any]

analyze_blocker(task_id, blocker_description, severity, agent, task)

Analyze blockers and suggest solutions

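Parameters:
  • task_id (str)

  • blocker_description (str)

  • severity (str)

  • agent (Optional[Dict[str, Any]])

  • task (Optional[Task])

Return type: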

List[str]

get_engine_status()

Get current engine status and configuration

Return type:

Dict[str, Any]

Examples

>>> engine = MarcusAIEngine()
>>> # Analyze task assignment
>>> result = await engine.analyze_with_hybrid_intelligence(context)
>>> if result.allow_assignment:
...     print(f"Assignment allowed with {result.confidence:.0%} confidence")
...
>>> # Get AI suggestions for blocker
>>> suggestions = await engine.analyze_blocker(
...     task_id="task-123",
...     blocker_description="Database connection failed",
...     severity="high",
...     agent=agent_info,
...     task=blocked_task
... )

Notes

AI enhancement is controlled by the MARCUS_AI_ENABLED environment variable. If AI fails, the engine falls back to rule-based decisions as long as fallback_on_ai_failure is True, which keeps the system reliable.
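
How the engine parses MARCUS_AI_ENABLED is not documented on this page. The sketch below shows one common way to read a boolean flag from the environment. The helper name `ai_enabled_from_env`, the accepted spellings, and the default value are all assumptions.

```python
import os


def ai_enabled_from_env(default: bool = True) -> bool:
    """Parse MARCUS_AI_ENABLED; accepted spellings are an assumption."""
    raw = os.environ.get("MARCUS_AI_ENABLED")
    if raw is None:
        return default  # variable unset: use the caller's default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


# Set the variable in-process purely for demonstration.
os.environ["MARCUS_AI_ENABLED"] = "false"
print(ai_enabled_from_env())
```

In practice the variable would be set in the shell or deployment configuration before the process starts, not from inside the program.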

__init__()

Return type:

None

async analyze_with_hybrid_intelligence(context)

Perform hybrid analysis combining rule-based safety with AI intelligence.

Executes a multi-step analysis process:

  1. Rule-based validation (mandatory)
  2. AI enhancement (if enabled and rules pass)
  3. Confidence calculation (weighted combination)

Parameters:

context (AnalysisContext) – Analysis context containing:

  • task: Task to analyze for assignment
  • project_context: Project state and metadata

Returns:

Complete analysis result containing:

  • allow_assignment: Final decision on assignment
  • confidence: Combined confidence score (0.0-1.0)
  • reason: Human-readable explanation
  • ai_confidence: AI model confidence (if available)
  • ai_insights: Detailed AI analysis (if available)
  • fallback_mode: Whether AI was unavailable
  • confidence_breakdown: Component confidence scores

Return type:

HybridAnalysis

Raises:

Exception – Only if AI fails and fallback_on_ai_failure is False

Notes

Rule violations always result in assignment rejection, regardless of AI analysis. AI can only enhance allowed assignments, never override safety rules.
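
The interaction between the Raises entry and fallback_on_ai_failure can be sketched as follows. The helper `run_ai_step` and the always-failing `failing_ai` coroutine are hypothetical; the sketch only illustrates the documented behaviour that an AI error propagates when fallback is disabled and is absorbed otherwise.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Tuple


async def run_ai_step(
    ai_call: Callable[[], Awaitable[float]],
    fallback_on_ai_failure: bool,
) -> Tuple[Optional[float], bool]:
    """Return (ai_confidence, fallback_mode) for an already rule-approved task."""
    try:
        return await ai_call(), False
    except Exception:
        if not fallback_on_ai_failure:
            raise  # surfaces to the caller, as the Raises entry describes
        return None, True  # continue with the rule-based result only


async def failing_ai() -> float:
    # Hypothetical provider call that always fails.
    raise RuntimeError("provider unavailable")


ai_confidence, fallback_mode = asyncio.run(run_ai_step(failing_ai, True))
print(ai_confidence, fallback_mode)
```

Because this step runs only after the rules have passed, an AI outage in this sketch can reduce the detail of the result but cannot turn a rejected assignment into an allowed one.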

Examples

>>> context = AnalysisContext(
...     task=Task(id="1", name="Deploy to production"),
...     project_context={"available_tasks": tasks}
... )
>>> result = await engine.analyze_with_hybrid_intelligence(context)
>>> if not result.allow_assignment:
...     print(f"Blocked: {result.reason}")

async enhance_task_with_ai(task, context)

Enhance a task with AI-generated improvements.

Uses AI to generate enhanced descriptions and effort estimates that provide more context and clarity for task execution.

Parameters:
  • task (Task) – Task to enhance with AI insights

  • context (Dict[str, Any]) – Project context for understanding task relationships

Returns:

Enhanced task data containing:

  • enhanced_description: Improved task description
  • ai_effort_estimate: AI-based effort estimation
  • enhancement_confidence: Confidence in enhancements

Return type:

Dict[str, Any]

Notes

Returns empty dict if AI is disabled or enhancement fails. Enhancements are suggestions only and don’t affect task execution.
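
Because the method may return an empty dict, callers need to treat every key as optional. A minimal sketch of defensive consumption follows. `pick_description` and the 0.5 threshold are hypothetical, and the sketch assumes enhancement_confidence is a float between 0.0 and 1.0.

```python
from typing import Any, Dict


def pick_description(
    original: str,
    enhancements: Dict[str, Any],
    min_confidence: float = 0.5,  # assumed threshold, not a Marcus setting
) -> str:
    """Prefer the AI description only when it exists and is confident enough."""
    enhanced = enhancements.get("enhanced_description")
    confidence = enhancements.get("enhancement_confidence", 0.0)
    if enhanced and confidence >= min_confidence:
        return enhanced
    return original


# An empty dict (AI disabled or failed) leaves the original text untouched.
unchanged = pick_description("Setup CI", {})
improved = pick_description(
    "Setup CI",
    {
        "enhanced_description": "Set up CI with lint and test stages",
        "enhancement_confidence": 0.8,
    },
)
print(unchanged)
print(improved)
```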

Examples

>>> enhancements = await engine.enhance_task_with_ai(
...     task=Task(name="Setup CI"),
...     context=project_context
... )
>>> print(enhancements.get('enhanced_description'))

async analyze_blocker(task_id, blocker_description, severity, agent, task)

Analyze a blocker and suggest solutions using AI.

Uses AI to understand the blocker context and generate actionable suggestions for resolution. Falls back to generic suggestions if AI is unavailable.

Parameters:
  • task_id (str) – ID of the blocked task

  • blocker_description (str) – Detailed description of what’s blocking progress

  • severity (str) – Severity level: ‘low’, ‘medium’, or ‘high’

  • agent (Optional[Dict[str, Any]]) – Information about the agent encountering the blocker

  • task (Optional[Task]) – The blocked task object for context

Returns:

Prioritized list of suggested solutions

Return type:

List[str]

Notes

Always returns at least 3 suggestions, even if AI fails. High-severity blockers receive more detailed analysis.
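
One way to guarantee a minimum number of suggestions is to pad whatever the AI produced with generic entries. The sketch below assumes a hypothetical `ensure_minimum` helper and placeholder suggestion text. Marcus's actual fallback wording is not shown on this page.

```python
from typing import List

# Placeholder wording, not Marcus's actual fallback text.
GENERIC_SUGGESTIONS = [
    "Review the error output and recent changes",
    "Check the configuration and credentials involved",
    "Ask a teammate familiar with the affected component",
]


def ensure_minimum(suggestions: List[str], minimum: int = 3) -> List[str]:
    """Pad the list with generic suggestions until it has `minimum` entries."""
    result = list(suggestions)  # copy so the caller's list is not mutated
    for generic in GENERIC_SUGGESTIONS:
        if len(result) >= minimum:
            break
        if generic not in result:
            result.append(generic)
    return result


padded = ensure_minimum(["Restart the database service"])
for i, suggestion in enumerate(padded, 1):
    print(f"{i}. {suggestion}")
```

AI-provided suggestions stay at the front of the list, so any prioritization they carry is preserved.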

Examples

>>> suggestions = await engine.analyze_blocker(
...     task_id="task-123",
...     blocker_description="PostgreSQL connection timeout",
...     severity="high",
...     agent={"id": "agent-1", "name": "Backend Dev"},
...     task=database_task
... )
>>> for i, suggestion in enumerate(suggestions, 1):
...     print(f"{i}. {suggestion}")

async get_engine_status()

Get current AI engine status and configuration.

Returns:

Engine status containing:

  • ai_enabled: Whether AI enhancement is active
  • llm_provider: Current LLM provider name
  • fallback_mode: Whether fallback is enabled
  • safety_override_disabled: Confirms rules can’t be overridden
  • components: Status of each engine component

Return type:

Dict[str, Any]

Examples

>>> status = await engine.get_engine_status()
>>> print(f"AI enabled: {status['ai_enabled']}")
>>> print(f"Provider: {status['llm_provider']}")
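
A caller that depends on the status dict, such as a health check, may want to confirm that the documented keys are present before reading them. The sketch below uses a hypothetical `missing_status_keys` helper and a made-up sample dict. The values and the shape of `components` are assumptions.

```python
from typing import Any, Dict, List

# Keys taken from the Returns description of get_engine_status.
EXPECTED_KEYS = (
    "ai_enabled",
    "llm_provider",
    "fallback_mode",
    "safety_override_disabled",
    "components",
)


def missing_status_keys(status: Dict[str, Any]) -> List[str]:
    """List documented keys that are absent from a status dict."""
    return [key for key in EXPECTED_KEYS if key not in status]


# Made-up sample; real values come from `await engine.get_engine_status()`.
sample_status = {
    "ai_enabled": True,
    "llm_provider": "example-provider",
    "fallback_mode": True,
    "safety_override_disabled": True,
}
print(missing_status_keys(sample_status))
```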