"""Project Context Manager for Marcus Multi-Project Support.
Manages project switching, state isolation, and kanban client lifecycle
for multiple concurrent projects.
"""
import asyncio
import logging
from collections import OrderedDict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Optional
from src.config.marcus_config import get_config
from src.core.assignment_persistence import AssignmentPersistence
from src.core.context import Context
from src.core.event_loop_utils import EventLoopLockManager
from src.core.events import Events
from src.core.models import ProjectState
from src.core.persistence import Persistence
from src.core.project_registry import ProjectConfig, ProjectRegistry
from src.integrations.kanban_factory import KanbanFactory
from src.integrations.kanban_interface import KanbanInterface
from src.logging.conversation_logger import conversation_logger
logger = logging.getLogger(__name__)
[docs]
class ProjectContext:
"""Container for project-specific state and services."""
[docs]
def __init__(self, project_id: str):
self.project_id = project_id
self.kanban_client: Optional[KanbanInterface] = None
self.context: Optional[Context] = None
self.events: Optional[Events] = None
self.project_state: Optional[ProjectState] = None
self.assignment_persistence: Optional[AssignmentPersistence] = None
self.last_accessed = datetime.now(timezone.utc)
self.is_connected = False
[docs]
class ProjectContextManager:
"""Manages multiple project contexts with state isolation.
Handles:
- Project switching
- Kanban client lifecycle
- State isolation per project
- Resource cleanup for inactive projects
"""
MAX_CACHED_PROJECTS = 10
IDLE_TIMEOUT_MINUTES = 30
[docs]
def __init__(
self,
registry: Optional[ProjectRegistry] = None,
global_context: Optional[Context] = None,
):
"""Initialize the project context manager.
Parameters
----------
registry : Optional[ProjectRegistry]
Optional project registry instance.
global_context : Optional[Context]
Optional global context instance to update with project_id changes.
"""
self.registry = registry or ProjectRegistry()
self.persistence = Persistence()
self.config = get_config()
# Use OrderedDict for LRU behavior
self.contexts: OrderedDict[str, ProjectContext] = OrderedDict()
self.active_project_id: Optional[str] = None
self.active_project_name: Optional[str] = None
# Global context to sync project_id with
self._global_context = global_context
# Lock manager for event loop safe operations
self._lock_manager = EventLoopLockManager()
# Background task for cleanup
self._cleanup_task: Optional[asyncio.Task[None]] = None
@property
def lock(self) -> asyncio.Lock:
"""Get context lock for the current event loop."""
return self._lock_manager.get_lock()
[docs]
def set_global_context(self, context: Context) -> None:
"""Set the global context to sync project_id with.
Parameters
----------
context : Context
The global context instance to update when projects switch.
"""
self._global_context = context
logger.debug("Global context linked to project context manager")
[docs]
async def initialize(self, auto_switch: bool = True) -> None:
"""
Initialize the context manager.
Parameters
----------
auto_switch : bool
If True, automatically switch to the active project.
Set to False if you want to sync projects before switching.
"""
await self.registry.initialize()
# Load the active project (only if auto_switch is enabled)
if auto_switch:
active_project = await self.registry.get_active_project()
if active_project:
await self.switch_project(active_project.id)
# Start background cleanup task
self._cleanup_task = asyncio.create_task(self._cleanup_loop())
[docs]
async def shutdown(self) -> None:
"""Shutdown the context manager and cleanup resources."""
# Cancel cleanup task
if self._cleanup_task:
self._cleanup_task.cancel()
try:
await self._cleanup_task
except asyncio.CancelledError:
pass
# Disconnect all clients
for context in self.contexts.values():
if context.kanban_client and context.is_connected:
try:
await context.kanban_client.disconnect()
except Exception as e:
logger.error(f"Error disconnecting client: {e}")
[docs]
async def switch_project(self, project_id: str) -> bool:
"""Switch to a different project.
Parameters
----------
project_id : str
Target project ID.
Returns
-------
bool
True if successful.
"""
async with self.lock:
# Get project config
project = await self.registry.get_project(project_id)
if not project:
logger.error(f"Project {project_id} not found")
conversation_logger.log_pm_decision(
decision=f"Failed to switch to project {project_id}",
rationale="Project not found in registry",
confidence_score=1.0,
decision_factors={"project_id": project_id, "error": "not_found"},
)
return False
# Save current project state if exists
previous_project_id = self.active_project_id
previous_project = None
if self.active_project_id and self.active_project_id in self.contexts:
previous_project = await self.registry.get_project(
self.active_project_id
)
await self._save_project_state(self.active_project_id)
# Log the project switch decision
conversation_logger.log_pm_decision(
decision=(
f"Switching project from "
f"'{previous_project.name if previous_project else 'None'}' "
f"to '{project.name}'"
),
rationale="User requested project switch",
confidence_score=1.0,
decision_factors={
"previous_project_id": previous_project_id,
"previous_project_name": (
previous_project.name if previous_project else None
),
"new_project_id": project_id,
"new_project_name": project.name,
"provider": project.provider,
"tags": project.tags,
},
)
# Load or create context
await self._get_or_create_context(project)
# Update active project
self.active_project_id = project_id
self.active_project_name = project.name
await self.registry.set_active_project(project_id)
# Update global context's project_id if it exists
if self._global_context is not None:
self._global_context.project_id = project_id
logger.debug(
f"Updated global context project_id to {project_id} "
f"for project '{project.name}'"
)
# Move to end for LRU
self.contexts.move_to_end(project_id)
# Cleanup old contexts if needed
await self._cleanup_old_contexts()
# Log successful switch
conversation_logger.log_kanban_interaction(
action="project_switched",
direction="internal",
data={
"project_id": project_id,
"project_name": project.name,
"provider": project.provider,
"cached_contexts": len(self.contexts),
"active_project": project_id,
"total_projects": len(await self.registry.list_projects()),
},
)
logger.info(f"Switched to project: {project.name} ({project_id})")
return True
[docs]
async def get_kanban_client(self) -> Optional[KanbanInterface]:
"""Get the kanban client for the active project.
Returns
-------
Optional[KanbanInterface]
Kanban client or None if no active project.
"""
if not self.active_project_id:
return None
context = self.contexts.get(self.active_project_id)
if not context:
return None
# Update last accessed
context.last_accessed = datetime.now(timezone.utc)
return context.kanban_client
[docs]
async def get_active_context(self) -> Optional[Context]:
"""Get the context for the active project."""
if not self.active_project_id:
return None
context = self.contexts.get(self.active_project_id)
return context.context if context else None
[docs]
async def get_active_events(self) -> Optional[Events]:
"""Get the events for the active project."""
if not self.active_project_id:
return None
context = self.contexts.get(self.active_project_id)
return context.events if context else None
[docs]
async def get_active_project_state(self) -> Optional[ProjectState]:
"""Get the project state for the active project."""
if not self.active_project_id:
return None
context = self.contexts.get(self.active_project_id)
return context.project_state if context else None
[docs]
async def get_active_assignment_persistence(
self,
) -> Optional[AssignmentPersistence]:
"""Get the assignment persistence for the active project."""
if not self.active_project_id:
return None
context = self.contexts.get(self.active_project_id)
return context.assignment_persistence if context else None
async def _get_or_create_context(self, project: ProjectConfig) -> ProjectContext:
"""Get existing context or create new one."""
if project.id in self.contexts:
conversation_logger.log_kanban_interaction(
action="context_reused",
direction="internal",
data={
"project_id": project.id,
"project_name": project.name,
"provider": project.provider,
},
)
return self.contexts[project.id]
# Log context creation
conversation_logger.log_pm_thinking(
thought=(
f"Creating new context for project '{project.name}' - "
f"No existing context found for project {project.id}"
),
context={"project_id": project.id, "provider": project.provider},
)
# Create new context
context = ProjectContext(project.id)
# Create kanban client
provider_config = self._build_provider_config(project)
context.kanban_client = KanbanFactory.create(project.provider, provider_config)
# Connect client
try:
await context.kanban_client.connect()
context.is_connected = True
# Log successful connection
conversation_logger.log_kanban_interaction(
action="provider_connected",
direction="to_kanban",
data={
"project_id": project.id,
"project_name": project.name,
"provider": project.provider,
"connection_status": "success",
},
)
except Exception as e:
logger.error(f"Failed to connect kanban client: {e}")
# Log connection failure
conversation_logger.log_kanban_interaction(
action="provider_connection_failed",
direction="to_kanban",
data={
"project_id": project.id,
"project_name": project.name,
"provider": project.provider,
"error": str(e),
},
)
raise
# Create project-specific services
context.context = Context(
events=None, persistence=self.persistence, project_id=project.id
)
context.events = Events(store_history=True, persistence=self.persistence)
# Create project-specific assignment persistence directory
assignments_dir = Path("data/assignments") / f"project_{project.id}"
context.assignment_persistence = AssignmentPersistence(
storage_dir=assignments_dir
)
# Load project state
context.project_state = await self._load_project_state(project.id)
# Store context
self.contexts[project.id] = context
# Log context creation complete
conversation_logger.log_kanban_interaction(
action="context_created",
direction="internal",
data={
"project_id": project.id,
"project_name": project.name,
"provider": project.provider,
"services_initialized": ["context", "events", "assignment_persistence"],
},
)
return context
def _build_provider_config(self, project: ProjectConfig) -> Dict[str, Any]:
"""Build provider configuration merging global and project configs."""
config = {}
# Get global provider credentials
if project.provider == "planka":
config.update(
{
"base_url": self.config.kanban.planka_base_url,
"email": self.config.kanban.planka_email,
"password": self.config.kanban.planka_password,
}
)
elif project.provider == "github":
config.update({"token": self.config.kanban.github_token})
elif project.provider == "linear":
config.update({"api_key": self.config.kanban.linear_api_key})
elif project.provider == "sqlite":
config.update(
{
"db_path": (
self.config.kanban.sqlite_db_path or "./data/kanban.db"
),
"project_name": project.name,
"attachments_dir": (
self.config.kanban.sqlite_attachments_dir
or "./data/attachments"
),
}
)
# Merge with project-specific config
config.update(project.provider_config)
return config
async def _save_project_state(self, project_id: str) -> None:
"""Save project state to persistence."""
context = self.contexts.get(project_id)
if not context or not context.project_state:
return
# Save to persistence
await self.persistence.store(
"project_states",
project_id,
{
"state": context.project_state.__dict__,
"saved_at": datetime.now(timezone.utc).isoformat(),
},
)
async def _load_project_state(self, project_id: str) -> Optional[ProjectState]:
"""Load project state from persistence."""
data = await self.persistence.retrieve("project_states", project_id)
if data and "state" in data:
# Convert back to ProjectState
# This is simplified - actual implementation would reconstruct the object
return None # TODO: Implement state reconstruction
return None
async def _cleanup_old_contexts(self) -> None:
"""Cleanup old contexts if exceeding cache limit."""
if len(self.contexts) <= self.MAX_CACHED_PROJECTS:
return
# Remove oldest contexts (excluding active)
to_remove = []
for project_id in self.contexts:
if project_id != self.active_project_id:
to_remove.append(project_id)
if len(self.contexts) - len(to_remove) <= self.MAX_CACHED_PROJECTS:
break
for project_id in to_remove:
await self._remove_context(project_id)
async def _remove_context(self, project_id: str) -> None:
"""Remove and cleanup a project context."""
context = self.contexts.get(project_id)
if not context:
return
# Save state
await self._save_project_state(project_id)
# Disconnect client
if context.kanban_client and context.is_connected:
try:
await context.kanban_client.disconnect()
except Exception as e:
logger.error(f"Error disconnecting client: {e}")
# Remove from cache
del self.contexts[project_id]
logger.info(f"Removed context for project {project_id}")
async def _cleanup_loop(self) -> None:
"""Background task to cleanup idle projects."""
while True:
try:
await asyncio.sleep(300) # Check every 5 minutes
async with self.lock:
now = datetime.now(timezone.utc)
idle_threshold = now - timedelta(minutes=self.IDLE_TIMEOUT_MINUTES)
to_remove = []
for project_id, context in self.contexts.items():
if (
project_id != self.active_project_id
and context.last_accessed < idle_threshold
):
to_remove.append(project_id)
for project_id in to_remove:
await self._remove_context(project_id)
logger.info(f"Cleaned up idle project: {project_id}")
except asyncio.CancelledError:
raise
except Exception as e:
logger.error(f"Error in cleanup loop: {e}")