"""
Context System for Marcus.
Provides rich context for task assignments including previous implementations,
dependency awareness, and relevant patterns. Enhances agent effectiveness by
reducing time spent understanding existing code and architectural decisions.
"""
# import asyncio # Removed - not needed after lazy loading fix
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
from src.core.events import Events, EventTypes
from src.core.models import Priority, Task
from src.core.resilience import with_fallback
# Module-level logger used by every class and method in this file.
logger = logging.getLogger(__name__)
# Optional import for hybrid dependency inference. When it fails,
# HYBRID_AVAILABLE is False, Context never builds a hybrid inferer, and
# Context.analyze_dependencies uses its pattern-based inference instead.
try:
    from src.intelligence.dependency_inferer_hybrid import HybridDependencyInferer
    HYBRID_AVAILABLE = True
except ImportError:
    HYBRID_AVAILABLE = False
    logger.info(
        "Hybrid dependency inferer not available, using pattern-based inference"
    )
[docs]
@dataclass
class TaskContext:
    """Everything an agent is handed when it picks up a task.

    Attributes
    ----------
    task_id : str
        Identifier of the task being assigned.
    previous_implementations : Dict[str, Any]
        Implementation records of the task's dependencies, keyed by task ID.
    dependent_tasks : List[Dict[str, Any]]
        Summaries of the tasks that depend on this one.
    related_patterns : List[Dict[str, Any]]
        Pattern examples recorded from earlier implementations.
    architectural_decisions : List[Dict[str, Any]]
        Serialized architectural decisions relevant to this task.
    """

    task_id: str
    previous_implementations: Dict[str, Any] = field(default_factory=dict)
    dependent_tasks: List[Dict[str, Any]] = field(default_factory=list)
    related_patterns: List[Dict[str, Any]] = field(default_factory=list)
    architectural_decisions: List[Dict[str, Any]] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain ``dict`` view of this context for serialization.

        The contained collections are returned as-is, not copied.
        """
        return dict(
            task_id=self.task_id,
            previous_implementations=self.previous_implementations,
            dependent_tasks=self.dependent_tasks,
            related_patterns=self.related_patterns,
            architectural_decisions=self.architectural_decisions,
        )
[docs]
@dataclass
class DependentTask:
    """Information about a task that depends on another.

    ``Context.add_dependency`` files instances under the ID of the task being
    depended upon, and ``Context.get_context`` reports them as that task's
    dependents.
    """

    # ID of the dependent (downstream) task.
    task_id: str
    # Human-readable name of the dependent task; used in log messages.
    task_name: str
    # Text describing what the dependent task needs from its dependency
    # (presumably produced by Context.infer_needed_interface -- confirm).
    expected_interface: str
    dependency_type: str = "functional"  # functional, data, temporal
[docs]
@dataclass
class Decision:
    """Record of an architectural decision taken while working on a task.

    Attributes
    ----------
    decision_id : str
        Unique identifier of the decision.
    task_id : str
        Task during which the decision was made.
    agent_id : str
        Agent that made the decision.
    timestamp : datetime
        When the decision was recorded.
    what : str
        What was decided.
    why : str
        Why it was decided.
    impact : str
        Impact on other components.
    project_id : Optional[str]
        Project this decision belongs to, if known.
    """

    decision_id: str
    task_id: str
    agent_id: str
    timestamp: datetime
    what: str  # What was decided
    why: str  # Why it was decided
    impact: str  # Impact on other components
    project_id: Optional[str] = None  # Project this decision belongs to

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the decision into a storage-friendly dictionary.

        ``timestamp`` is rendered with :meth:`datetime.isoformat`; every other
        field is copied through unchanged.
        """
        payload: Dict[str, Any] = {
            "decision_id": self.decision_id,
            "task_id": self.task_id,
            "agent_id": self.agent_id,
        }
        payload["timestamp"] = self.timestamp.isoformat()
        payload.update(what=self.what, why=self.why, impact=self.impact)
        payload["project_id"] = self.project_id
        return payload
[docs]
class Context:
    """
    Manages context for task assignments.

    Features:
    - Tracks implementations from completed tasks
    - Identifies dependent tasks
    - Stores architectural decisions
    - Provides rich context for new assignments
    - Optional persistence for long-term storage

    State is held in memory. When a persistence backend is supplied,
    implementations and decisions are also written to it, and previously
    persisted decisions are loaded lazily on first use.
    """
[docs]
def __init__(
self,
events: Optional[Events] = None,
persistence: Optional[Any] = None,
use_hybrid_inference: bool = True,
ai_engine: Optional[Any] = None,
project_id: Optional[str] = None,
):
"""Initialize the Context system.
Parameters
----------
events : Optional[Events]
Optional Events system for integration.
persistence : Optional[Any]
Optional Persistence instance for storing context.
use_hybrid_inference : bool
Whether to use hybrid dependency inference if available.
ai_engine : Optional[Any]
Optional AI engine for hybrid inference.
project_id : Optional[str]
Project identifier for tracking decisions and artifacts.
"""
self.events = events
self.persistence = persistence
self.project_id = project_id
self.implementations: Dict[str, Dict[str, Any]] = (
{}
) # task_id -> implementation details
self.dependencies: Dict[str, List[DependentTask]] = (
{}
) # task_id -> dependent tasks
self.decisions: List[Decision] = []
self.patterns: Dict[str, List[Dict[str, Any]]] = {} # pattern_type -> examples
self._decision_counter = 0
self.default_infer_dependencies = (
True # Default setting for dependency inference
)
# Set up dependency inference strategy
self.hybrid_inferer = None
if use_hybrid_inference and HYBRID_AVAILABLE and ai_engine:
self.hybrid_inferer = HybridDependencyInferer(ai_engine)
logger.info("Using hybrid dependency inference for better accuracy")
# Mark that persisted data needs loading (lazy loading)
self._persisted_data_loaded = False
async def _ensure_persisted_data_loaded(self) -> None:
"""Ensure persisted data is loaded, loading if necessary."""
if not self._persisted_data_loaded and self.persistence:
await self._load_persisted_data()
self._persisted_data_loaded = True
async def _load_persisted_data(self) -> None:
    """Merge recently persisted decisions into memory (best effort).

    Up to 100 decisions are fetched with ``self.persistence.get_decisions``.
    A decision whose ``decision_id`` is already held in memory is skipped.
    The decision counter is then raised, never lowered, to the highest
    numeric sequence found in the known decision IDs, so decisions logged
    later keep unique, increasing numbers. Any failure is logged and
    swallowed.
    """
    if not self.persistence:
        # Nothing to load; also avoids logging a spurious failure.
        return

    def _sequence_number(decision_id: Any) -> int:
        # log_decision builds IDs as "dec_<counter>_<timestamp>". An ID of
        # any other shape contributes 0 instead of aborting the whole load.
        parts = str(decision_id).split("_")
        return int(parts[1]) if len(parts) > 1 and parts[1].isdigit() else 0

    try:
        persisted_decisions = await self.persistence.get_decisions(limit=100)
        # Deduplicate by ID: whole-object equality would let a reloaded copy
        # that differs in any field slip in a second time.
        known_ids = {d.decision_id for d in self.decisions}
        for decision in persisted_decisions:
            if decision.decision_id not in known_ids:
                known_ids.add(decision.decision_id)
                self.decisions.append(decision)
        highest = max(
            (_sequence_number(d.decision_id) for d in self.decisions), default=0
        )
        self._decision_counter = max(self._decision_counter, highest)
        logger.info(f"Loaded {len(persisted_decisions)} decisions from persistence")
    except Exception as e:
        logger.error(f"Failed to load persisted data: {e}")
[docs]
async def add_implementation(
    self, task_id: str, implementation: Dict[str, Any]
) -> None:
    """Record what a completed task implemented.

    The record gets a ``task_id`` and a UTC ISO-8601 ``timestamp``. Keys in
    ``implementation`` override those two if they collide. The record is
    persisted when a backend is configured. Each entry of its ``patterns``
    list is indexed by the entry's ``type`` (default ``"general"``). An
    ``IMPLEMENTATION_FOUND`` event is published when an events system is
    attached.

    Parameters
    ----------
    task_id : str
        ID of the completed task.
    implementation : Dict[str, Any]
        Details about the implementation (APIs, models, patterns).
    """
    record: Dict[str, Any] = {
        "task_id": task_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    record.update(implementation)
    self.implementations[task_id] = record

    if self.persistence:
        # Wrapped in with_fallback for graceful degradation.
        await self._persist_implementation_safe(task_id)

    # Index pattern examples by type so later contexts can reuse them.
    for pattern in implementation.get("patterns", ()):
        examples = self.patterns.setdefault(pattern.get("type", "general"), [])
        examples.append({"task_id": task_id, "pattern": pattern})

    if self.events:
        await self.events.publish(
            EventTypes.IMPLEMENTATION_FOUND,
            "context",
            {"task_id": task_id, "implementation": implementation},
        )
    logger.debug(f"Added implementation context for task {task_id}")
[docs]
def add_dependency(self, task_id: str, dependent_task: DependentTask) -> None:
    """Register ``dependent_task`` as depending on ``task_id``.

    Entries are appended, so registering the same dependent twice lists it
    twice.

    Parameters
    ----------
    task_id : str
        The task being depended upon.
    dependent_task : DependentTask
        Information about the dependent task.
    """
    self.dependencies.setdefault(task_id, []).append(dependent_task)
    logger.debug(
        f"Added dependency: {dependent_task.task_name} depends on {task_id}"
    )
[docs]
async def log_decision(
    self, agent_id: str, task_id: str, what: str, why: str, impact: str
) -> Decision:
    """Record an architectural decision made by an agent.

    The decision ID has the form ``dec_<counter>_<POSIX timestamp>``. The
    decision is kept in memory and persisted when a backend exists. Each
    dependent task registered for ``task_id`` is logged at INFO. A
    ``DECISION_LOGGED`` event is published when an events system is attached.

    Parameters
    ----------
    agent_id : str
        ID of the agent making the decision.
    task_id : str
        Current task ID.
    what : str
        What was decided.
    why : str
        Reasoning behind the decision.
    impact : str
        Expected impact on other components.

    Returns
    -------
    Decision
        The logged Decision object.
    """
    self._decision_counter += 1
    created_at = datetime.now(timezone.utc)
    decision = Decision(
        decision_id=f"dec_{self._decision_counter}_{created_at.timestamp()}",
        task_id=task_id,
        agent_id=agent_id,
        timestamp=created_at,
        what=what,
        why=why,
        impact=impact,
        project_id=self.project_id,
    )
    self.decisions.append(decision)

    if self.persistence:
        # Wrapped in with_fallback for graceful degradation.
        await self._persist_decision_safe(decision)

    # Surface which downstream tasks may be affected by this decision.
    for dependent in self.dependencies.get(task_id, []):
        logger.info(f"Decision affects dependent task: {dependent.task_name}")

    if self.events:
        await self.events.publish(
            EventTypes.DECISION_LOGGED, agent_id, decision.to_dict()
        )
    return decision
[docs]
async def get_context(
    self, task_id: str, task_dependencies: List[str]
) -> TaskContext:
    """Get complete context for a task assignment.

    Persisted decisions are loaded first (lazily, once), so they are
    considered together with the in-memory ones.

    Parameters
    ----------
    task_id : str
        The task being assigned.
    task_dependencies : List[str]
        IDs of tasks this task depends on.

    Returns
    -------
    TaskContext
        Contains:
        - the implementations of dependencies that have one;
        - summaries of the tasks depending on ``task_id``;
        - the three most recently recorded examples of each pattern type;
        - up to five relevant decisions, the last ones in stored order.
    """
    await self._ensure_persisted_data_loaded()
    context = TaskContext(task_id=task_id)

    # Get implementations from dependencies
    for dep_id in task_dependencies:
        if dep_id in self.implementations:
            context.previous_implementations[dep_id] = self.implementations[dep_id]

    # Get tasks that depend on this one
    context.dependent_tasks = [
        {
            "task_id": dep.task_id,
            "task_name": dep.task_name,
            "expected_interface": dep.expected_interface,
            "dependency_type": dep.dependency_type,
        }
        for dep in self.dependencies.get(task_id, [])
    ]

    # Get relevant patterns. add_implementation appends new examples, so the
    # most recent ones are at the END of each list.
    for examples in self.patterns.values():
        context.related_patterns.extend(examples[-3:])

    # Relevant architectural decisions: those made on a dependency, and those
    # whose impact text mentions this task. One pass keeps stored order and
    # lists a decision once even if it qualifies both ways.
    # NOTE(review): `task_id in decision.impact` is a substring test, so
    # "task-1" also matches an impact mentioning "task-12".
    dependency_ids = set(task_dependencies)
    relevant_decisions = [
        decision.to_dict()
        for decision in self.decisions
        if decision.task_id in dependency_ids or task_id in decision.impact
    ]
    context.architectural_decisions = relevant_decisions[-5:]  # Last 5 relevant

    # Emit event
    if self.events:
        await self.events.publish(
            EventTypes.CONTEXT_UPDATED,
            "context",
            {
                "task_id": task_id,
                "context_size": {
                    "implementations": len(context.previous_implementations),
                    "dependents": len(context.dependent_tasks),
                    "patterns": len(context.related_patterns),
                    "decisions": len(context.architectural_decisions),
                },
            },
        )
    return context
[docs]
async def analyze_dependencies(
    self, tasks: List[Task], infer_implicit: bool = True
) -> Dict[str, List[str]]:
    """
    Analyze task list to identify dependencies (both explicit and implicit).

    Parameters
    ----------
    tasks
        List of all tasks.
    infer_implicit
        Whether to infer implicit dependencies (default: True).

    Returns
    -------
    Mapping of task_id to list of dependent task IDs

    Notes
    -----
    With a hybrid inferer configured and ``infer_implicit`` true, the
    inferer's graph is merged with the explicit dependencies. If the inferer
    raises, the pattern-based path is used instead.

    The pattern-based path tests every unordered pair of tasks in both
    directions, because ``_infer_dependency`` is directional. It records at
    most one inferred edge per pair and skips pairs that are already linked,
    so it never reverses an explicit dependency.
    """
    # Use hybrid inferer if available for better accuracy with fewer API calls
    if self.hybrid_inferer and infer_implicit:
        logger.info("Using hybrid dependency inference")
        try:
            dep_graph = await self.hybrid_inferer.infer_dependencies(tasks)
        except Exception as e:
            # Degrade gracefully instead of failing the whole analysis.
            logger.warning(
                f"Hybrid dependency inference failed ({e}); "
                "falling back to pattern-based inference"
            )
        else:
            # Copy the lists as well: appending explicit edges to a shallow
            # copy would mutate the inferer's own adjacency lists.
            dependency_map: Dict[str, List[str]] = {
                dep_id: list(dependents)
                for dep_id, dependents in dep_graph.adjacency_list.items()
            }
            # Also include explicit dependencies
            for task in tasks:
                for dep_id in task.dependencies or []:
                    dependents = dependency_map.setdefault(dep_id, [])
                    if task.id not in dependents:
                        dependents.append(task.id)
            return dependency_map

    # Fallback to pattern-based inference
    fallback_dependency_map: Dict[str, List[str]] = {}

    def is_linked(dependent_id: str, dependency_id: str) -> bool:
        """Return True if dependent_id is already recorded as depending on dependency_id."""
        return dependent_id in fallback_dependency_map.get(dependency_id, [])

    # First, map explicit dependencies
    for task in tasks:
        for dep_id in task.dependencies or []:
            fallback_dependency_map.setdefault(dep_id, []).append(task.id)

    # Then, infer implicit dependencies if enabled
    if infer_implicit:
        inferred_count = 0
        for i, task in enumerate(tasks):
            for other_task in tasks[i + 1 :]:
                if task.id == other_task.id:
                    continue  # same task listed twice; never self-dependent
                if is_linked(task.id, other_task.id) or is_linked(
                    other_task.id, task.id
                ):
                    continue  # relationship already known; never add a reverse edge
                # _infer_dependency(a, b) asks "does a depend on b?", so both
                # orientations must be tried for each pair.
                if self._infer_dependency(task, other_task):
                    dependent, dependency = task, other_task
                elif self._infer_dependency(other_task, task):
                    dependent, dependency = other_task, task
                else:
                    continue
                fallback_dependency_map.setdefault(dependency.id, []).append(
                    dependent.id
                )
                inferred_count += 1
                logger.info(
                    f"Inferred: '{dependent.name}' depends on "
                    f"'{dependency.name}'"
                )
        if inferred_count > 0:
            logger.info(f"Inferred {inferred_count} implicit dependencies")

    # Check for circular dependencies
    cycles = self._detect_circular_dependencies(fallback_dependency_map, tasks)
    if cycles:
        logger.warning(f"Circular dependencies detected: {cycles}")
    return fallback_dependency_map
def _infer_dependency(self, task: Task, potential_dependency: Task) -> bool:
    """
    Infer if task depends on potential_dependency using multiple strategies.

    The relation is directional: the arguments are not interchangeable.
    Strategies are tried in order and the first match returns True; False
    is returned if none match. Matching is case-insensitive and uses the
    tasks' labels and the whitespace-split words of their names.

    Parameters
    ----------
    task
        The task to check.
    potential_dependency
        The potential dependency.

    Returns
    -------
    True if dependency is likely
    """
    # Extract task information
    task_labels = set(label.lower() for label in (task.labels or []))
    task_name_words = set(task.name.lower().split())
    dep_labels = set(label.lower() for label in (potential_dependency.labels or []))
    dep_name_words = set(potential_dependency.name.lower().split())
    # NOTE(review): these f-strings are formatted even when DEBUG is disabled,
    # and this method is evaluated for pairs of tasks (see
    # analyze_dependencies); lazy %-style arguments would be cheaper.
    logger.debug(
        f"Checking if '{task.name}' depends on '{potential_dependency.name}'"
    )
    logger.debug(f"Task labels: {task_labels}, words: {task_name_words}")
    logger.debug(f"Dep labels: {dep_labels}, words: {dep_name_words}")
    # Strategy 1: Pattern-based rules (enhanced)
    # Each rule is (keywords for `task`, keywords for `potential_dependency`).
    # It matches when `task` has any of the first (as a label or name word) and
    # the dependency has any of the second.
    inference_rules = [
        # Frontend depends on backend
        (
            [
                "frontend",
                "ui",
                "client",
                "react",
                "vue",
                "angular",
                "webapp",
                "dashboard",
            ],
            ["backend", "api", "server", "endpoint", "rest", "graphql", "service"],
        ),
        # Mobile apps depend on backend
        (
            ["mobile", "ios", "android", "react-native", "flutter", "app"],
            ["backend", "api", "auth", "server", "endpoint"],
        ),
        # Tests depend on implementation
        (
            [
                "test",
                "spec",
                "unittest",
                "integration-test",
                "e2e",
                "qa",
                "testing",
            ],
            [
                "implement",
                "feature",
                "api",
                "service",
                "component",
                "function",
                "endpoint",
            ],
        ),
        # Deployment depends on build
        (
            ["deploy", "deployment", "release", "publish", "production", "staging"],
            ["build", "compile", "bundle", "package", "docker", "container"],
        ),
        # Documentation depends on implementation
        (
            ["docs", "documentation", "readme", "guide", "manual", "wiki"],
            ["implement", "feature", "api", "component", "interface", "service"],
        ),
        # Database migrations depend on schema
        (
            ["migration", "migrate", "update-db", "db-change", "alter-table"],
            ["schema", "database", "model", "entity", "table", "orm"],
        ),
        # Integration depends on components
        (
            ["integration", "integrate", "connect", "bridge", "adapter"],
            ["component", "service", "module", "api", "interface", "system"],
        ),
        # Configuration depends on infrastructure
        (
            ["config", "configure", "settings", "environment", "env"],
            ["infrastructure", "setup", "install", "provision", "initialize"],
        ),
        # Security depends on authentication
        (
            ["security", "secure", "protect", "authorize", "permission"],
            ["auth", "authentication", "user", "role", "identity"],
        ),
        # Monitoring depends on deployment
        (
            ["monitor", "monitoring", "metrics", "logging", "observability"],
            ["deploy", "service", "api", "application", "system"],
        ),
    ]
    for dependent_keywords, dependency_keywords in inference_rules:
        if any(
            kw in task_labels or kw in task_name_words for kw in dependent_keywords
        ) and any(
            kw in dep_labels or kw in dep_name_words for kw in dependency_keywords
        ):
            return True
    # Strategy 2: Action-based inference (verb analysis)
    # Maps the action verb of `task` to verbs its dependency may carry.
    # _extract_action returns the first known verb in the name, or None.
    task_action = self._extract_action(task.name)
    dep_action = self._extract_action(potential_dependency.name)
    action_dependencies = {
        "update": ["create", "implement", "build"],
        "test": ["implement", "create", "build"],
        "deploy": ["test", "build", "package"],
        "document": ["implement", "create", "design"],
        "refactor": ["implement", "create"],
        "optimize": ["implement", "measure"],
        "integrate": ["implement", "create", "build"],
        "configure": ["install", "setup", "create"],
    }
    if task_action in action_dependencies:
        if dep_action in action_dependencies[task_action]:
            return True
    # Strategy 3: Entity-based inference (same entity, different actions)
    task_entity = self._extract_entity(task.name)
    dep_entity = self._extract_entity(potential_dependency.name)
    if task_entity and dep_entity and task_entity == dep_entity:
        # Same entity - check if actions have natural order
        action_order = [
            "design",
            "plan",
            "create",
            "implement",
            "build",
            "test",
            "document",
            "deploy",
            "monitor",
        ]
        try:
            # Unknown or missing actions map to -1.
            task_order = (
                action_order.index(task_action)
                if task_action in action_order
                else -1
            )
            dep_order = (
                action_order.index(dep_action) if dep_action in action_order else -1
            )
            # If both have known actions and task comes after dependency
            if task_order > dep_order >= 0:
                return True
        except ValueError:
            # NOTE(review): index() is only called after a membership check,
            # so this branch appears unreachable.
            pass
    # Special case: API depends on schema/database
    if ("api" in task_labels or "api" in task_name_words) and (
        "schema" in dep_labels
        or "schema" in dep_name_words
        or "database" in dep_labels
        or "database" in dep_name_words
    ):
        logger.info(
            f"Special rule matched: {task.name} depends on "
            f"{potential_dependency.name}"
        )
        return True
    # Strategy 4: Technical stack dependencies (enhanced)
    # If `task` mentions a stack key, any listed prerequisite in the
    # dependency's labels or name words is enough.
    tech_dependencies = {
        "frontend": ["api", "backend", "auth", "database", "websocket", "graphql"],
        "mobile": ["api", "backend", "auth", "push-notification", "sync"],
        "cli": ["api", "core", "library", "config", "auth"],
        "api": ["database", "auth", "model", "validation", "middleware"],
        "auth": ["database", "user", "model", "session", "token"],
        "admin": ["api", "auth", "role", "permission", "user-management"],
        "analytics": ["database", "api", "data", "aggregation", "metrics"],
        "notification": ["queue", "email", "sms", "push", "template"],
        "payment": ["api", "security", "validation", "gateway", "transaction"],
        "search": ["index", "database", "api", "filter", "ranking"],
    }
    for tech, deps in tech_dependencies.items():
        if tech in task_labels or tech in task_name_words:
            if any(d in dep_labels or d in dep_name_words for d in deps):
                return True
    # Strategy 5: Cross-functional dependencies
    # Same (feature keywords, required keywords) shape as Strategy 1.
    cross_functional_patterns = [
        # Admin interfaces need user management
        (["admin", "management", "dashboard"], ["user", "role", "permission"]),
        # Real-time features need websockets
        (
            ["realtime", "live", "chat", "collaboration"],
            ["websocket", "pubsub", "stream"],
        ),
        # File handling needs storage
        (
            ["upload", "file", "document", "image"],
            ["storage", "s3", "cdn", "bucket"],
        ),
        # Email features need templates
        (["email", "mail", "notification"], ["template", "smtp", "mailer"]),
        # Search needs indexing
        (["search", "find", "query"], ["index", "elasticsearch", "lucene"]),
        # Reports need data aggregation
        (["report", "analytics", "dashboard"], ["aggregation", "query", "data"]),
        # Import/export needs data processing
        (["import", "export", "etl"], ["process", "transform", "validate"]),
    ]
    for feature_keywords, required_keywords in cross_functional_patterns:
        if any(
            kw in task_labels or kw in task_name_words for kw in feature_keywords
        ):
            if any(
                kw in dep_labels or kw in dep_name_words for kw in required_keywords
            ):
                return True
    # Strategy 6: Semantic similarity for compound tasks
    # If tasks share significant keywords, they might be related
    common_words = task_name_words & dep_name_words
    # Filter out common stop words
    stop_words = {"the", "a", "an", "and", "or", "for", "to", "in", "of", "with"}
    meaningful_common = common_words - stop_words
    # If they share 2+ meaningful words, consider it a potential dependency
    if len(meaningful_common) >= 2:
        # Check if the dependency comes "before" in the development flow
        # NOTE(review): "use" is not in _extract_action's vocabulary, so
        # task_action can never equal "use" here.
        if dep_action in [
            "create",
            "implement",
            "build",
            "setup",
        ] and task_action in ["test", "use", "integrate", "deploy"]:
            logger.info(
                f"Semantic similarity: '{task.name}' likely depends "
                f"on '{potential_dependency.name}'"
            )
            return True
    return False
def _extract_action(self, task_name: str) -> Optional[str]:
"""Extract the primary action verb from a task name."""
words = task_name.lower().split()
common_actions = {
"create",
"build",
"implement",
"design",
"test",
"deploy",
"update",
"refactor",
"optimize",
"document",
"integrate",
"configure",
"setup",
"install",
"add",
"remove",
"fix",
}
for word in words:
if word in common_actions:
return word
return None
def _extract_entity(self, task_name: str) -> Optional[str]:
"""Extract the primary entity/component from a task name."""
# Remove common actions to find entity
words = task_name.lower().split()
action_words = {
"create",
"build",
"implement",
"design",
"test",
"deploy",
"update",
"refactor",
"optimize",
"document",
"integrate",
"configure",
"setup",
"install",
"add",
"remove",
"fix",
"for",
}
entity_words = [w for w in words if w not in action_words and len(w) > 2]
# Common entities to look for
common_entities = {
"api",
"database",
"auth",
"user",
"login",
"dashboard",
"payment",
"notification",
"email",
"search",
"report",
}
for word in entity_words:
if word in common_entities:
return word
# Return first non-action word as entity
return entity_words[0] if entity_words else None
def _detect_circular_dependencies(
self, dependency_map: Dict[str, List[str]], tasks: List[Task]
) -> List[List[str]]:
"""
Detect circular dependencies using depth-first search.
Parameters
----------
dependency_map
Mapping of task_id to dependent task_ids.
tasks
List of all tasks.
Returns
-------
List of circular dependency chains
"""
# Build task lookup for names
task_lookup = {task.id: task.name for task in tasks}
# Track visited nodes and recursion stack
visited = set()
rec_stack = set()
cycles = []
def dfs(task_id: str, path: List[str]) -> None:
"""Depth-first search to find cycles."""
visited.add(task_id)
rec_stack.add(task_id)
path.append(task_id)
# Check all dependencies
if task_id in dependency_map:
for dependent_id in dependency_map[task_id]:
if dependent_id not in visited:
dfs(dependent_id, path.copy())
elif dependent_id in rec_stack:
# Found a cycle
cycle_start = path.index(dependent_id)
cycle = path[cycle_start:] + [dependent_id]
# Convert to task names for readability
cycle_names = [task_lookup.get(tid, tid) for tid in cycle]
cycles.append(cycle_names)
rec_stack.remove(task_id)
# Check all tasks
all_task_ids = set(task.id for task in tasks)
for task_id in all_task_ids:
if task_id not in visited:
dfs(task_id, [])
return cycles
[docs]
def infer_needed_interface(
    self, dependent_task: Task, dependency_task_id: str
) -> str:
    """
    Infer what interface a dependent task needs from its dependency.

    Parameters
    ----------
    dependent_task
        The task that depends on another.
    dependency_task_id
        The ID of the task it depends on.

    Returns
    -------
    String describing the expected interface/functionality

    Notes
    -----
    Keywords are matched against the dependent task's name at word level: a
    keyword matches when some word of the name starts with it. So ``test``
    matches "testing", but ``ui`` does not match "build" and ``search`` does
    not match "research". Labels are compared exactly. The dependency is
    described by its recorded implementation when one exists; otherwise
    substrings of its ID are used.
    """
    # Extract task information for dependent task
    dep_name = dependent_task.name.lower()
    dep_labels = [label.lower() for label in (dependent_task.labels or [])]

    # Word-level view of the dependent's name. Punctuation separates words.
    # Hyphenated words are kept whole (for keys like "integration-test") and
    # are also split into their parts.
    cleaned = "".join(ch if ch.isalnum() or ch == "-" else " " for ch in dep_name)
    name_words = set()
    for word in cleaned.split():
        name_words.add(word)
        name_words.update(part for part in word.split("-") if part)

    def name_mentions(keyword: str) -> bool:
        """Return True if a word of the dependent task's name starts with keyword."""
        return any(word.startswith(keyword) for word in name_words)

    def dependent_matches(dep_type: str) -> bool:
        """Return True if the dependent task fits a pattern's dependent type."""
        return (
            dep_type in dep_labels
            or name_mentions(dep_type)
            or (
                dep_type == "test"
                and ("testing" in dep_labels or "integration" in dep_labels)
            )
        )

    # Try to find the dependency task to get its information
    # Look in implementations first
    if dependency_task_id in self.implementations:
        impl = self.implementations[dependency_task_id]
        # `or` also covers keys that are present but set to None.
        dep_task_name = (impl.get("task_name") or "").lower()
        dep_task_labels = [label.lower() for label in (impl.get("labels") or [])]
    else:
        # For now, infer from the task ID and name patterns
        dep_task_name = dependency_task_id.lower()
        dep_task_labels = []
        # Infer labels from common patterns in task names/IDs
        for keyword in ("api", "backend", "frontend"):
            if keyword in dep_task_name:
                dep_task_labels.append(keyword)

    # Common interface patterns based on task types.
    # Order matters: the first matching (dependent, prerequisite) pair wins.
    interface_patterns = {
        # Frontend needs from backend
        ("frontend", "api"): "REST API endpoints with JSON responses",
        ("frontend", "backend"): "REST API endpoints with JSON responses",
        ("frontend", "auth"): (
            "Authentication endpoints (/login, /logout, /verify) and JWT "
            "token validation"
        ),
        (
            "ui",
            "api",
        ): "Well-documented API endpoints with consistent response formats",
        (
            "client",
            "backend",
        ): "API endpoints, authentication middleware, and CORS configuration",
        # Mobile app needs
        (
            "mobile",
            "api",
        ): "RESTful API with mobile-optimized responses and token-based auth",
        ("ios", "backend"): "API endpoints with Swift-compatible JSON structures",
        (
            "android",
            "backend",
        ): "API endpoints with Kotlin/Java-compatible responses",
        # Testing needs
        (
            "test",
            "api",
        ): "Documented endpoints with example requests/responses for testing",
        (
            "test",
            "frontend",
        ): "UI components with stable interfaces and test IDs",
        (
            "integration-test",
            "service",
        ): "Service interfaces and test data endpoints",
        (
            "e2e",
            "frontend",
        ): "Stable UI elements with test IDs and predictable states",
        # ML patterns (must come before general deployment patterns)
        (
            "deployment",
            "training",
        ): "Trained model artifacts with performance metrics and configuration",
        (
            "deployment",
            "model",
        ): "Model file with metadata and deployment configuration",
        (
            "production",
            "model",
        ): "Model file with metadata and deployment configuration",
        # General deployment needs
        ("deploy", "build"): "Build artifacts, Docker images, or compiled bundles",
        (
            "deployment",
            "config",
        ): "Environment configuration files and deployment scripts",
        ("release", "package"): "Versioned packages with dependency specifications",
        # Documentation needs
        (
            "docs",
            "api",
        ): "OpenAPI/Swagger specs, endpoint descriptions, and examples",
        (
            "documentation",
            "implementation",
        ): "Code comments, architectural decisions, and usage examples",
        # Database/data needs
        ("api", "database"): "Database schema, models, and migration scripts",
        (
            "service",
            "schema",
        ): "Data models with validation rules and relationships",
        ("migration", "model"): "Entity definitions and database change scripts",
        # Data processing patterns
        (
            "transformation",
            "extraction",
        ): "Extracted data in standardized format with clear schema",
        (
            "data",
            "data",
        ): "Processed data format with documented structure",
        # Integration needs
        (
            "integration",
            "service",
        ): "Service interfaces, data contracts, and connection configs",
        ("connector", "api"): "API client libraries or SDK implementations",
    }

    # Check for pattern matches - match both dependent and dependency types.
    # The prerequisite side keeps substring matching because it may be
    # looking at a raw task ID.
    for (dep_type, prereq_type), interface in interface_patterns.items():
        prereq_match = (
            prereq_type in dep_task_labels
            or prereq_type in dep_task_name
            or (
                prereq_type == "training"
                and ("training" in dep_task_labels or "model" in dep_task_labels)
            )
        )
        if dependent_matches(dep_type) and prereq_match:
            return interface

    # Fallback: check just dependent task type for common patterns
    for (dep_type, _prereq_type), interface in interface_patterns.items():
        if dependent_matches(dep_type):
            return interface

    # Specific keyword-based interfaces
    if name_mentions("admin") or "admin" in dep_labels:
        return (
            "User authentication with role-based access control "
            "(admin role required)"
        )
    if name_mentions("payment") or "payment" in dep_labels:
        return "Payment processing API with secure transaction handling"
    if name_mentions("notification") or "notification" in dep_labels:
        return "Message queue or notification service interface"
    if name_mentions("search") or "search" in dep_labels:
        return "Search API with filtering, pagination, and relevance scoring"
    if name_mentions("report") or name_mentions("analytics"):
        return "Data aggregation endpoints and analytics APIs"

    # Default based on common patterns
    if any(frontend in dep_labels for frontend in ["frontend", "ui", "client"]):
        return "Backend API endpoints with authentication and data operations"
    if any(test in dep_labels for test in ["test", "spec", "qa"]):
        return "Testable interfaces with clear contracts and error handling"
    # Generic default
    return "Implementation that can be integrated by dependent components"
[docs]
async def suggest_task_order(self, tasks: List[Task]) -> List[Task]:
    """
    Suggest an optimal order for tasks based on dependencies.

    Uses a topological sort (Kahn's algorithm). Among tasks that are ready,
    the order is: higher priority first, then earlier ``created_at``, then
    original list position.

    Dependencies on IDs that are not in ``tasks`` are ignored, since nothing
    in this list can satisfy them. Tasks that cannot be ordered because of a
    cycle are appended at the end in their original order.

    Parameters
    ----------
    tasks
        List of tasks to order.

    Returns
    -------
    Ordered list of tasks
    """
    import heapq

    # Build dependency graph: {dependency_id: [dependent_ids]}
    dep_map = await self.analyze_dependencies(tasks)
    known_ids = {task.id for task in tasks}

    # Reverse view (task -> IDs it waits for), combining explicit and inferred
    # dependencies. Counting an ID outside `tasks` would strand the task for
    # good, as if it were in a cycle.
    task_deps: Dict[str, set] = {}
    for task in tasks:
        task_deps[task.id] = {d for d in (task.dependencies or []) if d in known_ids}
    for dependency_id, dependents in dep_map.items():
        if dependency_id not in known_ids:
            continue
        for dependent_id in dependents:
            if dependent_id in task_deps:
                task_deps[dependent_id].add(dependency_id)

    # Forward adjacency, so finishing a task only touches the tasks waiting on it.
    waiting_on: Dict[str, List[str]] = {}
    for dependent_id, deps in task_deps.items():
        for dependency_id in deps:
            waiting_on.setdefault(dependency_id, []).append(dependent_id)

    # Count incoming edges (how many dependencies each task has)
    in_degree = {task_id: len(deps) for task_id, deps in task_deps.items()}
    first_index: Dict[str, int] = {}
    for index, task in enumerate(tasks):
        first_index.setdefault(task.id, index)

    priority_rank = {
        Priority.URGENT: 0,
        Priority.HIGH: 1,
        Priority.MEDIUM: 2,
        Priority.LOW: 3,
    }

    def heap_entry(index: int) -> Tuple[int, float, int]:
        # The list index is a unique final tie-breaker, so heapq never has to
        # compare Task objects themselves (they may not define an ordering).
        task = tasks[index]
        return (
            priority_rank.get(task.priority, 2),
            task.created_at.timestamp(),
            index,
        )

    ready = [heap_entry(i) for i, task in enumerate(tasks) if in_degree[task.id] == 0]
    heapq.heapify(ready)

    ordered: List[Task] = []
    while ready:
        _, _, index = heapq.heappop(ready)
        task = tasks[index]
        ordered.append(task)
        # Release tasks whose last outstanding dependency was this one.
        for dependent_id in waiting_on.get(task.id, []):
            in_degree[dependent_id] -= 1
            if in_degree[dependent_id] == 0:
                heapq.heappush(ready, heap_entry(first_index[dependent_id]))

    # If not all tasks were ordered, there's a cycle
    if len(ordered) < len(tasks):
        logger.warning("Could not order all tasks due to circular dependencies")
        # Add remaining tasks at the end
        ordered_ids = {t.id for t in ordered}
        ordered.extend(task for task in tasks if task.id not in ordered_ids)
    return ordered
[docs]
async def get_decisions_for_task(self, task_id: str) -> List[Decision]:
    """
    Return every known decision recorded for ``task_id``.

    Persisted decisions are loaded first (once), so they are included.

    Parameters
    ----------
    task_id
        The task ID.

    Returns
    -------
    List of related decisions, in stored order
    """
    await self._ensure_persisted_data_loaded()
    return list(filter(lambda decision: decision.task_id == task_id, self.decisions))
@with_fallback(
    lambda self, task_id: logger.warning(
        f"Failed to persist implementation for {task_id}"
    )
)
async def _persist_implementation_safe(self, task_id: str) -> None:
    """Store the implementation record of ``task_id`` in the backend.

    Does nothing when no persistence backend is configured. The decorator's
    fallback logs a warning (presumably used when this coroutine raises --
    see ``src.core.resilience.with_fallback``).
    """
    if not self.persistence:
        return
    record = self.implementations[task_id]
    await self.persistence.store("implementations", task_id, record)
@with_fallback(
    lambda self, decision: logger.warning(
        f"Failed to persist decision {decision.decision_id}"
    )
)
async def _persist_decision_safe(self, decision: Decision) -> None:
    """Store ``decision`` in the backend under its ``decision_id``.

    Does nothing when no persistence backend is configured. The decorator's
    fallback logs a warning (presumably used when this coroutine raises --
    see ``src.core.resilience.with_fallback``).
    """
    if not self.persistence:
        return
    # NOTE(review): this stores the live ``__dict__`` (``timestamp`` stays a
    # ``datetime``) rather than ``Decision.to_dict()``; confirm the backend
    # can serialize it.
    attributes = decision.__dict__
    await self.persistence.store("decisions", decision.decision_id, attributes)
[docs]
async def get_implementation_summary(self) -> Dict[str, Any]:
    """
    Summarize the tracked context.

    Persisted decisions are loaded first (once).

    Returns
    -------
    Dictionary containing:
    - the implementation and decision counts;
    - the known pattern types;
    - the five most recently inserted implementation records;
    - the number of tasks that have registered dependents.
    """
    await self._ensure_persisted_data_loaded()
    implementation_records = list(self.implementations.values())
    summary: Dict[str, Any] = dict(
        total_implementations=len(self.implementations),
        total_decisions=len(self.decisions),
        pattern_types=list(self.patterns),
        recent_implementations=implementation_records[-5:],
        tasks_with_dependents=len(self.dependencies),
    )
    return summary
[docs]
async def clear_old_data(self, days: int = 30) -> None:
    """
    Clear context data older than specified days.

    Removes implementations and decisions stamped before the cutoff, plus
    the pattern examples recorded for the removed implementations. An
    implementation whose ``timestamp`` is missing, or is neither an ISO-8601
    string nor a ``datetime``, is kept rather than aborting the cleanup.

    Parameters
    ----------
    days
        Number of days to retain.
    """
    await self._ensure_persisted_data_loaded()
    cutoff = datetime.now(timezone.utc).timestamp() - (days * 24 * 60 * 60)

    def is_recent(stamp: Any) -> bool:
        # add_implementation stores an ISO string, but a "timestamp" key in
        # the caller's implementation dict overrides it with any value.
        try:
            moment = (
                stamp if isinstance(stamp, datetime) else datetime.fromisoformat(stamp)
            )
        except (TypeError, ValueError):
            return True  # cannot date it: keep rather than silently discard
        return moment.timestamp() > cutoff

    # Clear old implementations
    self.implementations = {
        task_id: record
        for task_id, record in self.implementations.items()
        if is_recent(record.get("timestamp"))
    }

    # Drop pattern examples whose implementation was just removed, so they
    # are no longer offered as context.
    kept_patterns: Dict[str, List[Dict[str, Any]]] = {}
    for pattern_type, examples in self.patterns.items():
        kept = [ex for ex in examples if ex.get("task_id") in self.implementations]
        if kept:
            kept_patterns[pattern_type] = kept
    self.patterns = kept_patterns

    # Clear old decisions
    self.decisions = [d for d in self.decisions if d.timestamp.timestamp() > cutoff]
    logger.info(f"Cleared context data older than {days} days")