Source code for src.core.workspace

"""
Workspace isolation and security management for autonomous agents.

This module ensures that client agents only operate within their designated
project directories and cannot access the Marcus installation or system files.

The WorkspaceManager provides:
- Automatic detection of Marcus installation directory
- Configuration-based workspace assignment
- Path validation and security enforcement
- Audit logging of security violations

Examples
--------
>>> manager = WorkspaceManager()
>>> workspace = manager.assign_agent_workspace("agent-001", "/home/user/project")
>>> manager.validate_path("/home/user/project/src/file.py")  # OK
>>> manager.validate_path("/usr/lib/python3/site-packages")  # Raises SecurityError
"""

import json
import os
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Set


[docs] class WorkspaceSecurityError(Exception): """ Raised when a security violation is detected. This exception indicates an agent attempted to access a forbidden path. Examples -------- >>> raise WorkspaceSecurityError("Access denied to system directory") """ pass
[docs] @dataclass class WorkspaceConfig: """ Configuration for a single agent workspace. Parameters ---------- workspace_id : str Unique identifier for the workspace path : str Filesystem path to the workspace directory agent_id : Optional[str], default=None ID of the agent assigned to this workspace is_readonly : bool, default=False Whether the workspace is read-only Notes ----- Paths are automatically expanded and converted to absolute paths during initialization. """ workspace_id: str path: str agent_id: Optional[str] = None is_readonly: bool = False
[docs] def __post_init__(self) -> None: """ Expand and validate the workspace path. Converts relative paths to absolute and expands user home directory. """ self.path = os.path.abspath(os.path.expanduser(self.path))
[docs] @dataclass class ProjectWorkspaces: """ Configuration for all project workspaces. Parameters ---------- main_workspace : str Primary project workspace path agent_workspaces : Dict[str, str], optional Mapping of agent IDs to their dedicated workspace paths Examples -------- >>> config = ProjectWorkspaces( ... main_workspace="/home/user/project", ... agent_workspaces={"agent-001": "/home/user/project/agent1"} ... ) """ main_workspace: str agent_workspaces: Dict[str, str] = field(default_factory=dict)
[docs] def __post_init__(self) -> None: """Expand all workspace paths to absolute paths.""" self.main_workspace = os.path.abspath(os.path.expanduser(self.main_workspace)) self.agent_workspaces = { agent_id: os.path.abspath(os.path.expanduser(path)) for agent_id, path in self.agent_workspaces.items() }
[docs] class WorkspaceManager: """ Manages workspace isolation and security boundaries for Marcus. This class enforces security by maintaining a list of forbidden paths (including the Marcus installation) and validating all agent workspace assignments against these restrictions. Attributes ---------- marcus_root : str Automatically detected Marcus installation directory forbidden_paths : Set[str] Set of paths that agents are not allowed to access agent_workspaces : Dict[str, WorkspaceConfig] Mapping of agent IDs to their workspace configurations project_config : Optional[ProjectWorkspaces] Project-level workspace configuration Examples -------- >>> manager = WorkspaceManager() >>> workspace = manager.assign_agent_workspace("agent-001") >>> print(workspace.path) # /home/user/project >>> manager.validate_path("/etc/passwd") # Raises WorkspaceSecurityError Notes ----- The Marcus installation directory is automatically detected and added to the forbidden paths list to prevent agents from modifying Marcus itself. """
[docs] def __init__(self, config_path: Optional[str] = None) -> None: """ Initialize WorkspaceManager with automatic Marcus detection. Parameters ---------- config_path : Optional[str], default=None Path to configuration file. If not provided, searches default locations. """ # Auto-detect Marcus installation directory self.marcus_root: str = self._detect_marcus_root() # Initialize forbidden paths - Marcus directory is always forbidden self.forbidden_paths: Set[str] = { self.marcus_root, } # Add common Python installation paths to forbidden list self._add_system_paths_to_forbidden() # Workspace assignments: agent_id -> WorkspaceConfig self.agent_workspaces: Dict[str, WorkspaceConfig] = {} # Project workspace configuration self.project_config: Optional[ProjectWorkspaces] = None # Load configuration if config_path: self.load_config(config_path) else: # Try to load from default locations self._load_default_config()
def _detect_marcus_root(self) -> str: """ Automatically detect Marcus installation directory. Returns ------- str Absolute path to Marcus root directory Notes ----- This works by finding the root of the Marcus package structure relative to this file's location. """ # Get the directory of this file current_file = os.path.abspath(__file__) # Navigate up to find the Marcus root # We're in src/core/workspace_manager.py, so go up 2 levels marcus_root = os.path.dirname(os.path.dirname(os.path.dirname(current_file))) # Validate that we found the correct directory expected_markers = ["src", "scripts", "config_marcus.json"] for marker in expected_markers: if not os.path.exists(os.path.join(marcus_root, marker)): # If marker is missing, we might be in a different structure # Try alternative detection methods break return marcus_root def _add_system_paths_to_forbidden(self) -> None: """ Add system Python paths to forbidden list. This prevents agents from modifying Python installations or system libraries. """ forbidden_prefixes = [ "/usr/lib/python", "/usr/local/lib/python", "/opt/homebrew/lib/python", "/System/Library", os.path.dirname(os.__file__), # Python installation ] for prefix in forbidden_prefixes: if os.path.exists(prefix): self.forbidden_paths.add(os.path.abspath(prefix)) def _load_default_config(self) -> None: """ Try to load configuration from default locations. Searches in order: 1. XDG config directory (~/.config/marcus/config.json) 2. Local config in Marcus root 3. Path from MARCUS_CONFIG environment variable """ config_locations = [ # XDG config directory (preferred) os.path.expanduser("~/.config/marcus/config.json"), # Local config in Marcus root os.path.join(self.marcus_root, "config_marcus.json"), # Environment variable os.environ.get("MARCUS_CONFIG", ""), ] for location in config_locations: if location and os.path.exists(location): try: self.load_config(location) print(f"Loaded workspace config from: {location}", file=sys.stderr) return except Exception as e: print( f"Failed to load config from {location}: {e}", file=sys.stderr ) continue
[docs] def load_config(self, config_path: str) -> None: """ Load workspace configuration from file. Parameters ---------- config_path : str Path to JSON configuration file Raises ------ FileNotFoundError If configuration file doesn't exist WorkspaceSecurityError If configured workspaces overlap with forbidden paths Notes ----- Configuration format:: { "project": { "workspaces": { "main": "/path/to/project", "agents": { "agent-001": "/path/to/agent1/workspace" } } }, "security": { "additional_forbidden_paths": ["/sensitive/data"] } } """ config_path = os.path.expanduser(config_path) if not os.path.exists(config_path): raise FileNotFoundError(f"Configuration file not found: {config_path}") with open(config_path, "r") as f: config = json.load(f) # Load project workspace configuration if "project" in config and "workspaces" in config["project"]: workspaces = config["project"]["workspaces"] self.project_config = ProjectWorkspaces( main_workspace=workspaces.get("main", ""), agent_workspaces=workspaces.get("agents", {}), ) # Validate that project workspaces don't overlap with forbidden paths self._validate_project_workspaces() # Load additional forbidden paths if specified if "security" in config and "additional_forbidden_paths" in config["security"]: for path in config["security"]["additional_forbidden_paths"]: self.add_forbidden_path(path)
def _validate_project_workspaces(self) -> None: """ Ensure project workspaces don't overlap with forbidden paths. Raises ------ WorkspaceSecurityError If any workspace overlaps with a forbidden path """ if not self.project_config: return all_workspaces = [self.project_config.main_workspace] all_workspaces.extend(self.project_config.agent_workspaces.values()) for workspace in all_workspaces: for forbidden in self.forbidden_paths: if workspace.startswith(forbidden) or forbidden.startswith(workspace): raise WorkspaceSecurityError( f"Workspace {workspace} overlaps with " f"forbidden path {forbidden}" )
[docs] def assign_agent_workspace( self, agent_id: str, workspace_path: Optional[str] = None ) -> WorkspaceConfig: """ Assign a workspace to an agent. Parameters ---------- agent_id : str Unique identifier for the agent workspace_path : Optional[str], default=None Explicit workspace path. If not provided, uses project configuration. Returns ------- WorkspaceConfig The assigned workspace configuration Raises ------ ValueError If no workspace is available for the agent WorkspaceSecurityError If the workspace path is forbidden Examples -------- >>> workspace = manager.assign_agent_workspace( ... "agent-001", "/home/user/project" ... ) >>> print(workspace.workspace_id) # "agent-001_workspace" Notes ----- If workspace_path is not provided, the method tries: 1. Pre-configured workspace for the specific agent 2. The main project workspace as fallback """ if workspace_path: # Explicit workspace assignment workspace = WorkspaceConfig( workspace_id=f"{agent_id}_workspace", path=workspace_path, agent_id=agent_id, ) elif self.project_config and agent_id in self.project_config.agent_workspaces: # Use pre-configured workspace workspace = WorkspaceConfig( workspace_id=f"{agent_id}_workspace", path=self.project_config.agent_workspaces[agent_id], agent_id=agent_id, ) elif self.project_config: # Fallback to main workspace workspace = WorkspaceConfig( workspace_id=f"{agent_id}_workspace", path=self.project_config.main_workspace, agent_id=agent_id, ) else: raise ValueError(f"No workspace available for agent {agent_id}") # Validate the workspace self.validate_path(workspace.path) # Store the assignment self.agent_workspaces[agent_id] = workspace return workspace
[docs] def get_agent_workspace(self, agent_id: str) -> Optional[WorkspaceConfig]: """ Get the assigned workspace for an agent. Parameters ---------- agent_id : str The agent's unique identifier Returns ------- Optional[WorkspaceConfig] The workspace configuration if assigned, None otherwise """ return self.agent_workspaces.get(agent_id)
[docs] def validate_path(self, path: str) -> str: """ Validate that a path is allowed (not in forbidden paths). Parameters ---------- path : str Path to validate (can be relative or contain ~) Returns ------- str The absolute path if valid Raises ------ WorkspaceSecurityError If the path is within a forbidden directory Examples -------- >>> manager.validate_path("/home/user/project/file.py") # OK >>> manager.validate_path("~/project/file.py") # Expands and validates >>> manager.validate_path("/usr/lib/python3/os.py") # Raises error """ absolute_path = os.path.abspath(os.path.expanduser(path)) # Check against all forbidden paths for forbidden in self.forbidden_paths: if absolute_path.startswith(forbidden): raise WorkspaceSecurityError( f"Access denied: {path} is within forbidden path {forbidden}" ) return absolute_path
[docs] def is_path_allowed(self, path: str) -> bool: """ Check if a path is allowed without raising an exception. Parameters ---------- path : str Path to check Returns ------- bool True if path is allowed, False if forbidden Examples -------- >>> if manager.is_path_allowed("/home/user/file.py"): ... print("Path is safe to access") """ try: self.validate_path(path) return True except WorkspaceSecurityError: return False
[docs] def add_forbidden_path(self, path: str) -> None: """ Add a path to the forbidden list. Parameters ---------- path : str Path to mark as forbidden Examples -------- >>> manager.add_forbidden_path("/sensitive/data") >>> manager.is_path_allowed("/sensitive/data/file.txt") # False """ absolute_path = os.path.abspath(os.path.expanduser(path)) self.forbidden_paths.add(absolute_path)
[docs] def get_forbidden_paths(self) -> List[str]: """ Get list of all forbidden paths. Returns ------- List[str] Sorted list of forbidden paths """ return sorted(list(self.forbidden_paths))
[docs] def get_task_assignment_data(self, agent_id: str) -> Dict[str, Any]: """ Get workspace data to include in task assignments. This data should be sent to agents so they know where to work and what paths to avoid. Parameters ---------- agent_id : str The agent's identifier Returns ------- Dict[str, Any] Dictionary containing: - workspace_path: Where the agent should work - workspace_id: Unique workspace identifier - forbidden_paths: List of paths to avoid - is_readonly: Whether the workspace is read-only Examples -------- >>> data = manager.get_task_assignment_data("agent-001") >>> print(data['workspace_path']) # /home/user/project >>> print(len(data['forbidden_paths'])) # Number of forbidden paths """ workspace = self.get_agent_workspace(agent_id) if not workspace: workspace = self.assign_agent_workspace(agent_id) return { "workspace_path": workspace.path, "workspace_id": workspace.workspace_id, "forbidden_paths": self.get_forbidden_paths(), "is_readonly": workspace.is_readonly, }
[docs] def log_security_violation( self, agent_id: str, attempted_path: str, operation: str ) -> None: """ Log a security violation attempt. Parameters ---------- agent_id : str ID of the agent that attempted the violation attempted_path : str Path the agent tried to access operation : str Operation attempted (e.g., "read", "write", "execute") Examples -------- >>> manager.log_security_violation( ... "agent-001", ... "/etc/passwd", ... "read" ... ) Notes ----- Currently logs to stderr. In production, this would write to a security audit log file. """ violation = { "timestamp": datetime.now(timezone.utc).isoformat(), "agent_id": agent_id, "attempted_path": attempted_path, "operation": operation, "marcus_root": self.marcus_root, } # In production, this would log to a security audit file # For now, print to stderr print(f"SECURITY VIOLATION: {json.dumps(violation)}", file=sys.stderr)