src.ai package#

Marcus AI Integration Package.

Provides AI-enhanced intelligence while preserving rule-based safety guarantees. Implements hybrid decision making that combines deterministic logic with semantic understanding and intelligent optimization.

class src.ai.MarcusAIEngine[source]#

Bases: object

Central AI coordination engine implementing hybrid intelligence.

Combines rule-based safety guarantees with AI-powered semantic understanding and intelligent optimization while ensuring rules are never overridden.

The engine follows a strict precedence model:

  1. Rule-based validation (mandatory, safety-critical)

  2. AI enhancement (optional, additive only)

  3. Hybrid confidence calculation (weighted combination)
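The precedence model can be illustrated with a small standalone sketch. This is not the engine's code: the `Outcome` type, the `decide` function, and the 0.7/0.3 weights are hypothetical stand-ins chosen only to show the ordering of the three steps.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Outcome:
    """Hypothetical stand-in for a hybrid analysis result."""
    allow: bool
    confidence: float
    fallback_mode: bool


def decide(rule_ok: bool, rule_conf: float, ai_conf: Optional[float],
           rule_weight: float = 0.7, ai_weight: float = 0.3) -> Outcome:
    """Apply the three steps in order (weights are assumed, not Marcus defaults)."""
    # Step 1: rules are mandatory; a violation rejects regardless of AI output.
    if not rule_ok:
        return Outcome(allow=False, confidence=rule_conf, fallback_mode=False)
    # Step 2: AI is optional; without an AI score the rule result stands alone.
    if ai_conf is None:
        return Outcome(allow=True, confidence=rule_conf, fallback_mode=True)
    # Step 3: weighted combination of the two confidence scores.
    combined = rule_weight * rule_conf + ai_weight * ai_conf
    return Outcome(allow=True, confidence=combined, fallback_mode=False)
```

Because the rule check returns before the AI score is read, a high AI confidence cannot turn a rejection into an approval in this sketch.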

Parameters:

None

llm_client#

AI provider abstraction for semantic analysis

Type:

LLMAbstraction

rule_engine#

Rule-based validation engine

Type:

RuleBasedEngine

hybrid_coordinator#

Coordinates hybrid decision making

Type:

HybridDecisionFramework

ai_enabled#

Whether AI enhancement is enabled

Type:

bool

fallback_on_ai_failure#

Whether to fall back to rules on AI failure

Type:

bool

rule_safety_override#

Whether AI can override rules (always False)

Type:

bool

analyze_with_hybrid_intelligence(context)[source]#

Perform hybrid analysis on task assignment

Parameters:

context (AnalysisContext)

Return type:

HybridAnalysis

enhance_task_with_ai(task, context)[source]#

Enhance task descriptions using AI

Parameters:
  • task (Task)

  • context (Dict[str, Any])

Return type:

Dict[str, Any]

analyze_blocker(task_id, blocker_description, severity, agent, task)[source]#

Analyze blockers and suggest solutions

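Parameters:
  • task_id (str)

  • blocker_description (str)

  • severity (str)

  • agent (Optional[Dict[str, Any]])

  • task (Optional[Task])
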
Return type:

List[str]

get_engine_status()[source]#

Get current engine status and configuration

Return type:

Dict[str, Any]

Examples

>>> engine = MarcusAIEngine()
>>> # Analyze task assignment
>>> result = await engine.analyze_with_hybrid_intelligence(context)
>>> if result.allow_assignment:
...     print(f"Assignment allowed with {result.confidence:.0%} confidence")
...
>>> # Get AI suggestions for blocker
>>> suggestions = await engine.analyze_blocker(
...     task_id="task-123",
...     blocker_description="Database connection failed",
...     severity="high",
...     agent=agent_info,
...     task=blocked_task
... )

Notes

AI enhancement is controlled by the MARCUS_AI_ENABLED environment variable. When AI fails, the engine falls back to rule-based decisions, so assignments can still be evaluated; the failure is raised only when fallback_on_ai_failure is False.
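A minimal sketch of reading such a flag from the environment is shown below. The helper name `ai_enabled_from_env`, the set of accepted truthy strings, and the default value are assumptions for illustration; the values the engine actually accepts are not documented here.

```python
import os


def ai_enabled_from_env(default: bool = False) -> bool:
    """Interpret MARCUS_AI_ENABLED as a boolean (accepted values are assumed)."""
    raw = os.environ.get("MARCUS_AI_ENABLED")
    if raw is None:
        # Variable not set: use the caller's default.
        return default
    # Treat a few common truthy spellings as "enabled"; anything else disables.
    return raw.strip().lower() in {"1", "true", "yes", "on"}
```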

__init__()[source]#
Return type:

None

async analyze_with_hybrid_intelligence(context)[source]#

Perform hybrid analysis combining rule-based safety with AI intelligence.

Executes a multi-step analysis process:

  1. Rule-based validation (mandatory)

  2. AI enhancement (if enabled and rules pass)

  3. Confidence calculation (weighted combination)

Parameters:

context (AnalysisContext) – Analysis context containing:

  • task: Task to analyze for assignment

  • project_context: Project state and metadata

Returns:

Complete analysis result containing:

  • allow_assignment: Final decision on assignment

  • confidence: Combined confidence score (0.0-1.0)

  • reason: Human-readable explanation

  • ai_confidence: AI model confidence (if available)

  • ai_insights: Detailed AI analysis (if available)

  • fallback_mode: Whether AI was unavailable

  • confidence_breakdown: Component confidence scores

Return type:

HybridAnalysis

Raises:

Exception – Only if AI fails and fallback_on_ai_failure is False

Notes

Rule violations always result in assignment rejection, regardless of AI analysis. AI can only enhance allowed assignments, never override safety rules.

Examples

>>> context = AnalysisContext(
...     task=Task(id="1", name="Deploy to production"),
...     project_context={"available_tasks": tasks},
...     historical_data=[]  # required field; empty when no history is available
... )
>>> result = await engine.analyze_with_hybrid_intelligence(context)
>>> if not result.allow_assignment:
...     print(f"Blocked: {result.reason}")

async enhance_task_with_ai(task, context)[source]#

Enhance a task with AI-generated improvements.

Uses AI to generate enhanced descriptions and effort estimates that provide more context and clarity for task execution.

Parameters:
  • task (Task) – Task to enhance with AI insights

  • context (Dict[str, Any]) – Project context for understanding task relationships

Returns:

Enhanced task data containing:

  • enhanced_description: Improved task description

  • ai_effort_estimate: AI-based effort estimation

  • enhancement_confidence: Confidence in enhancements

Return type:

Dict[str, Any]

Notes

Returns an empty dict if AI is disabled or enhancement fails. Enhancements are suggestions only and don’t affect task execution.
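The empty-dict behaviour described in the note can be sketched as follows. The function `enhance_or_empty` and its `enhance` callable are hypothetical; they only illustrate the pattern of swallowing an enhancement failure, not the engine's internals.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


async def enhance_or_empty(
    ai_enabled: bool,
    enhance: Callable[[], Awaitable[Dict[str, Any]]],
) -> Dict[str, Any]:
    """Return enhancements, or {} when AI is off or the call fails (sketch)."""
    if not ai_enabled:
        # AI disabled: nothing to add.
        return {}
    try:
        return await enhance()
    except Exception:
        # Enhancements are optional, so a failure yields no suggestions.
        return {}
```

Callers can then use `.get()` on the result without checking first whether AI was available.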

Examples

>>> enhancements = await engine.enhance_task_with_ai(
...     task=Task(name="Setup CI"),
...     context=project_context
... )
>>> print(enhancements.get('enhanced_description'))

async analyze_blocker(task_id, blocker_description, severity, agent, task)[source]#

Analyze a blocker and suggest solutions using AI.

Uses AI to understand the blocker context and generate actionable suggestions for resolution. Falls back to generic suggestions if AI is unavailable.

Parameters:
  • task_id (str) – ID of the blocked task

  • blocker_description (str) – Detailed description of what’s blocking progress

  • severity (str) – Severity level: ‘low’, ‘medium’, or ‘high’

  • agent (Optional[Dict[str, Any]]) – Information about the agent encountering the blocker

  • task (Optional[Task]) – The blocked task object for context

Returns:

Prioritized list of suggested solutions

Return type:

List[str]

Notes

Always returns at least 3 suggestions, even if AI fails. High-severity blockers receive more detailed analysis.
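One way to guarantee a minimum number of suggestions is to pad the AI output with generic ones. The sketch below assumes this approach; the `GENERIC_SUGGESTIONS` texts and the helper name are invented for illustration and are not the strings Marcus returns.

```python
from typing import List, Optional

# Hypothetical generic fallbacks; the real wording is not documented here.
GENERIC_SUGGESTIONS = [
    "Review the error details and recent changes",
    "Check documentation for the affected component",
    "Ask a teammate who knows this area for help",
]


def ensure_minimum_suggestions(ai_suggestions: Optional[List[str]],
                               minimum: int = 3) -> List[str]:
    """Keep AI suggestions first, then pad with generic ones up to `minimum`.

    The guarantee holds for minimum <= len(GENERIC_SUGGESTIONS).
    """
    suggestions = list(ai_suggestions or [])
    for generic in GENERIC_SUGGESTIONS:
        if len(suggestions) >= minimum:
            break
        if generic not in suggestions:
            suggestions.append(generic)
    return suggestions
```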

Examples

>>> suggestions = await engine.analyze_blocker(
...     task_id="task-123",
...     blocker_description="PostgreSQL connection timeout",
...     severity="high",
...     agent={"id": "agent-1", "name": "Backend Dev"},
...     task=database_task
... )
>>> for i, suggestion in enumerate(suggestions, 1):
...     print(f"{i}. {suggestion}")

async get_engine_status()[source]#

Get current AI engine status and configuration.

Returns:

Engine status containing:

  • ai_enabled: Whether AI enhancement is active

  • llm_provider: Current LLM provider name

  • fallback_mode: Whether fallback is enabled

  • safety_override_disabled: Confirms rules can’t be overridden

  • components: Status of each engine component

Return type:

Dict[str, Any]

Examples

>>> status = await engine.get_engine_status()
>>> print(f"AI enabled: {status['ai_enabled']}")
>>> print(f"Provider: {status['llm_provider']}")

class src.ai.LLMAbstraction[source]#

Bases: object

Multi-provider LLM abstraction with intelligent fallback.

Supports multiple LLM providers with automatic fallback when primary fails. Provides a unified interface for all AI operations in Marcus.

providers#

Available LLM provider instances

Type:

dict

current_provider#

Name of the primary provider to use

Type:

str

fallback_providers#

Ordered list of providers to try on failure

Type:

list of str

provider_stats#

Performance statistics for each provider

Type:

dict

analyze_task_semantics(task, context)[source]#

Analyze task meaning and intent

Parameters:
  • task (Task)

  • context (Dict[str, Any])

Return type:

SemanticAnalysis

infer_dependencies_semantic(tasks)[source]#

Infer logical dependencies between tasks

Parameters:

tasks (List[Task])

Return type:

List[SemanticDependency]

generate_enhanced_description(task, context)[source]#

Create improved task descriptions

Parameters:
  • task (Task)

  • context (Dict[str, Any])

Return type:

str

estimate_effort_intelligently(task, context)[source]#

AI-powered effort estimation

Parameters:
  • task (Task)

  • context (Dict[str, Any])

Return type:

EffortEstimate

analyze_blocker_and_suggest_solutions(task, blocker_description, severity, agent)[source]#

Analyze blockers and provide solutions

Parameters:
  • task (Task)

  • blocker_description (str)

  • severity (str)

  • agent (Optional[Dict[str, Any]])

Return type:

List[str]

Notes

Providers are initialized lazily to avoid circular imports. Statistics are tracked for intelligent provider selection.

__init__()[source]#
Return type:

None

async analyze_task_semantics(task, context)[source]#

Analyze task semantics using the best available provider.

Parameters:
  • task (Task) – Task to analyze for semantic meaning

  • context (Dict[str, Any]) – Project context including related tasks

Returns:

Comprehensive semantic analysis including intent and risks

Return type:

SemanticAnalysis

Notes

Automatically falls back to alternative providers on failure. The call is tagged analyze_task_semantics via _tagged_operation(), so the resulting token_events.operation row carries that label instead of the provider’s default.

async infer_dependencies_semantic(tasks)[source]#

Infer semantic dependencies between tasks.

Parameters:

tasks (List[Task]) – All tasks to analyze for dependencies

Returns:

Inferred logical relationships between tasks

Return type:

List[SemanticDependency]

Notes

Complements rule-based dependency detection with semantic understanding. The call is tagged infer_dependencies via _tagged_operation().

async generate_enhanced_description(task, context)[source]#

Generate enhanced task description.

Parameters:
  • task (Task) – Task needing clearer description

  • context (Dict[str, Any]) – Project context for better understanding

Returns:

Enhanced description with more detail and clarity

Return type:

str

async estimate_effort_intelligently(task, context)[source]#

Estimate task effort using AI.

Parameters:
  • task (Task) – Task to estimate completion time for

  • context (Dict[str, Any]) – Project context with historical performance data

Returns:

AI-powered time estimate with confidence and factors

Return type:

EffortEstimate

async analyze_blocker_and_suggest_solutions(task, blocker_description, severity, agent)[source]#

Analyze a blocker and suggest solutions.

Parameters:
  • task (Task) – The blocked task

  • blocker_description (str) – Detailed description of the blocker

  • severity (str) – Severity level: ‘low’, ‘medium’, or ‘high’

  • agent (Optional[Dict[str, Any]]) – Agent information for context

Returns:

Prioritized list of solution suggestions

Return type:

List[str]

Notes

Higher severity blockers receive more detailed analysis.

async analyze(prompt, context, *, operation=None)[source]#

Analyze content using LLM.

Parameters:
  • prompt (str) – The prompt to analyze.

  • context (Any) – Analysis context (may carry max_tokens override).

  • operation (Optional[str]) – Logical operation label to attach to the cost event. When provided, the recorder’s active PlannerContext is shadowed with an operation_override for the duration of the call, so token_events.operation records operation instead of whatever default the provider stamps. Used for per-call drill-down in the Cato cost dashboard. See src/cost_tracking/operations.py for the canonical taxonomy and human-readable descriptions.

Returns:

Analysis result as string.

Return type:

str
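The idea behind the operation parameter of analyze(), temporarily shadowing a label for the duration of one call, can be sketched with the standard contextvars module. This is not the Marcus PlannerContext or recorder; `operation_override` and `record_event` are hypothetical names used only to show the shadow-and-restore pattern.

```python
import contextvars
from contextlib import contextmanager
from typing import Iterator, Optional

# Holds the active override label, or None when no override is in effect.
_operation = contextvars.ContextVar("operation", default=None)


@contextmanager
def operation_override(label: Optional[str]) -> Iterator[None]:
    """Shadow the operation label for the duration of the with-block."""
    if label is None:
        # No override requested: leave the current label untouched.
        yield
        return
    token = _operation.set(label)
    try:
        yield
    finally:
        # Restore whatever label was active before the call.
        _operation.reset(token)


def record_event(default_operation: str) -> str:
    """Return the label a cost event would carry (override wins over default)."""
    return _operation.get() or default_operation
```

Inside `with operation_override("analyze_task_semantics"):` the call `record_event("provider_default")` yields the override; outside the block it yields the provider default again.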

async switch_provider(provider_name)[source]#

Switch to a different provider.

Parameters:

provider_name (str)

Return type:

bool

Returns:

True if the switch succeeded, False otherwise

get_provider_stats()[source]#

Get performance statistics for all providers.

Return type:

Dict[str, Any]

get_best_provider()[source]#

Determine the best performing provider based on success rate.

Return type:

str

Returns:

Name of best performing provider
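Selecting a provider by success rate might look like the sketch below. The stats layout (`success` and `failure` counters per provider) and the `default` argument are assumptions; the real structure of provider_stats is not shown in this reference.

```python
from typing import Dict


def best_provider(stats: Dict[str, Dict[str, int]], default: str) -> str:
    """Pick the provider with the highest success rate (assumed stats layout)."""

    def success_rate(name: str) -> float:
        entry = stats[name]
        total = entry["success"] + entry["failure"]
        # A provider with no recorded calls is treated as rate 0.0.
        return entry["success"] / total if total else 0.0

    if not stats:
        return default
    return max(stats, key=success_rate)
```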

async health_check()[source]#

Check health of all providers.

Return type:

Dict[str, Any]

Returns:

Health status for each provider

class src.ai.HybridDecisionFramework[source]#

Bases: object

Hybrid decision framework that merges rule-based safety with AI optimization.

Core principle: Rules provide mandatory safety constraints that AI cannot override. AI provides optimization and enhancement when rules allow the assignment.

__init__()[source]#
Return type:

None

async make_assignment_decision(task, context)[source]#

Make hybrid assignment decision combining rules with AI optimization.

Parameters:
Return type:

AssignmentDecision

Returns:

Assignment decision with reasoning and AI enhancements

async evaluate_assignment_quality(task, agent_id, assignment_outcome)[source]#

Evaluate the quality of a completed assignment for learning.

Parameters:
Return type:

Dict[str, Any]

Returns:

Assignment quality evaluation

async get_framework_stats()[source]#

Get framework performance statistics.

Return type:

Dict[str, Any]

async adjust_weights(rule_weight, ai_weight)[source]#

Adjust confidence weights (with safety constraints).

Parameters:
Return type:

bool

Returns:

True if weights were adjusted, False if rejected for safety
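The safety constraints applied to weight changes are not listed in this reference. As a purely illustrative sketch, the check below assumes three constraints: weights are non-negative, they sum to 1, and the rule weight stays at or above a floor. The function name and the 0.5 floor are hypothetical.

```python
import math


def weights_are_safe(rule_weight: float, ai_weight: float,
                     min_rule_weight: float = 0.5) -> bool:
    """Validate a proposed weight pair under assumed safety constraints."""
    if rule_weight < 0 or ai_weight < 0:
        return False
    # Weights must form a convex combination (tolerating float rounding).
    if not math.isclose(rule_weight + ai_weight, 1.0):
        return False
    # Assumed constraint: rules never count for less than the floor.
    return rule_weight >= min_rule_weight
```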

class src.ai.HybridAnalysis[source]#

Bases: object

Result of hybrid rule-based + AI analysis.

Combines deterministic rules with AI enhancement for optimal decisions.

allow_assignment#

Final decision on whether to allow task assignment

Type:

bool

confidence#

Combined confidence score (0.0-1.0)

Type:

float

reason#

Human-readable explanation of the decision

Type:

str

safety_critical#

Whether safety rules were involved in decision

Type:

bool

ai_confidence#

AI model’s confidence if AI was used

Type:

float, optional

ai_insights#

Detailed AI analysis if available

Type:

AIInsights, optional

fallback_mode#

Whether system fell back to rules-only mode

Type:

bool

confidence_breakdown#

Component confidence scores and weights

Type:

dict, optional

optimization_score#

AI’s optimization score for this assignment

Type:

float, optional

Examples

>>> result = HybridAnalysis(
...     allow_assignment=True,
...     confidence=0.85,
...     reason="All checks passed with high confidence"
... )

allow_assignment: bool#
confidence: float#
reason: str#
safety_critical: bool = False#
ai_confidence: float | None = None#
ai_insights: AIInsights | None = None#
fallback_mode: bool = False#
confidence_breakdown: Dict[str, float] | None = None#
optimization_score: float | None = None#
__init__(allow_assignment, confidence, reason, safety_critical=False, ai_confidence=None, ai_insights=None, fallback_mode=False, confidence_breakdown=None, optimization_score=None)#
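Parameters:
  • allow_assignment (bool)

  • confidence (float)

  • reason (str)

  • safety_critical (bool)

  • ai_confidence (float | None)

  • ai_insights (AIInsights | None)

  • fallback_mode (bool)

  • confidence_breakdown (Dict[str, float] | None)

  • optimization_score (float | None)
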
Return type:

None

class src.ai.AnalysisContext[source]#

Bases: object

Context for AI analysis operations.

Provides all necessary information for making intelligent task assignment decisions.

task#

The task being analyzed for assignment

Type:

Task

project_context#

Current project state including available and assigned tasks

Type:

dict

historical_data#

Historical performance and assignment data

Type:

list of dict

team_context#

Team composition and skill information

Type:

dict, optional

constraints#

Project constraints (deadlines, resources, etc.)

Type:

dict, optional

Examples

>>> context = AnalysisContext(
...     task=task_to_assign,
...     project_context={"available_tasks": tasks},
...     historical_data=performance_history
... )

task: Task#
project_context: Dict[str, Any]#
historical_data: List[Dict[str, Any]]#
team_context: Dict[str, Any] | None = None#
constraints: Dict[str, Any] | None = None#
__init__(task, project_context, historical_data, team_context=None, constraints=None)#
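Parameters:
  • task (Task)

  • project_context (Dict[str, Any])

  • historical_data (List[Dict[str, Any]])

  • team_context (Dict[str, Any] | None)

  • constraints (Dict[str, Any] | None)
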
Return type:

None

class src.ai.AssignmentDecision[source]#

Bases: object

Final decision on task assignment.

Represents the complete decision including rule validation, AI enhancement, and audit information.

allow#

Whether to allow the assignment

Type:

bool

confidence#

Overall confidence in decision (0.0-1.0)

Type:

float

reason#

Primary reason for the decision

Type:

str

ai_suggestions#

AI optimization suggestions if rules passed

Type:

AIOptimizationResult, optional

optimization_score#

AI’s score for assignment quality

Type:

float, optional

confidence_breakdown#

Detailed confidence components

Type:

dict, optional

safety_critical#

Whether safety rules were involved

Type:

bool

mandatory_rule_applied#

Whether mandatory rules were applied

Type:

bool

timestamp#

When the decision was made (auto-set)

Type:

datetime

__post_init__()[source]#

Automatically sets timestamp if not provided

Return type:

None

Examples

>>> decision = AssignmentDecision(
...     allow=True,
...     confidence=0.92,
...     reason="All validations passed"
... )
>>> print(f"Decision at {decision.timestamp}: {decision.allow}")

allow: bool#
confidence: float#
reason: str#
ai_suggestions: AIOptimizationResult | None = None#
optimization_score: float | None = None#
confidence_breakdown: Dict[str, float] | None = None#
safety_critical: bool = False#
mandatory_rule_applied: bool = False#
timestamp: datetime | None = None#
__post_init__()[source]#

Set timestamp if not provided.

Return type:

None
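The auto-set timestamp follows the usual dataclass `__post_init__` pattern. The class below is a reduced, hypothetical stand-in (`DecisionSketch`) with only four fields; using `datetime.now()` as the time source is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class DecisionSketch:
    """Reduced stand-in for AssignmentDecision, for illustration only."""
    allow: bool
    confidence: float
    reason: str
    timestamp: Optional[datetime] = None

    def __post_init__(self) -> None:
        # Fill in the timestamp only when the caller did not supply one.
        if self.timestamp is None:
            self.timestamp = datetime.now()
```

Passing an explicit `timestamp` (for example when replaying stored decisions) leaves the supplied value unchanged.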

__init__(allow, confidence, reason, ai_suggestions=None, optimization_score=None, confidence_breakdown=None, safety_critical=False, mandatory_rule_applied=False, timestamp=None)#
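Parameters:
  • allow (bool)

  • confidence (float)

  • reason (str)

  • ai_suggestions (AIOptimizationResult | None)

  • optimization_score (float | None)

  • confidence_breakdown (Dict[str, float] | None)

  • safety_critical (bool)

  • mandatory_rule_applied (bool)

  • timestamp (datetime | None)
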
Return type:

None

Subpackages#

Submodules#