"""Configuration management for Marcus.
This module provides the Settings class which handles loading, validation,
and management of configuration settings from multiple sources including
JSON files, environment variables, and default values.
"""
import copy
import json
import os
import sys
from typing import Any, Dict, Optional

from .config_loader import get_config
class Settings:
    """Configuration management system for Marcus.

    Manages configuration loading from multiple sources with support for
    hierarchical settings, environment variable overrides, and validation.
    Provides specialized getters for different configuration categories.

    Parameters
    ----------
    config_path : Optional[str], default=None
        Path to the configuration file. If None (or empty), uses the
        ``MARCUS_CONFIG`` environment variable, falling back to
        ``'marcus.config.json'``.

    Attributes
    ----------
    config_path : str
        Path to the configuration file
    defaults : Dict[str, Any]
        Default configuration values. Never mutated after construction:
        ``config`` is built from a deep copy.
    config : Dict[str, Any]
        Current loaded configuration

    Examples
    --------
    >>> settings = Settings()
    >>> monitoring_interval = settings.get('monitoring_interval')
    >>> risk_config = settings.get_risk_thresholds()
    >>> settings.set('custom_setting', 'value')
    >>> settings.save()

    Notes
    -----
    Configuration loading priority (highest to lowest):

    1. Environment variables
    2. Configuration file (an explicit existing file, otherwise the
       centralized loader returned by ``get_config()``)
    3. Default values
    """

    def __init__(self, config_path: Optional[str] = None) -> None:
        """Build the defaults table and load the effective configuration.

        Parameters
        ----------
        config_path : Optional[str], default=None
            Explicit configuration file path. When falsy, the
            ``MARCUS_CONFIG`` environment variable is used, and if that is
            unset, ``'marcus.config.json'``.
        """
        # Use the new marcus.config.json location
        self.config_path: str = config_path or os.environ.get(
            "MARCUS_CONFIG", "marcus.config.json"
        )

        # Default configuration
        self.defaults: Dict[str, Any] = {
            # Monitoring settings
            "monitoring_interval": 900,  # 15 minutes
            "stall_threshold_hours": 24,
            "health_check_interval": 3600,  # 1 hour
            # Risk thresholds
            "risk_thresholds": {
                "high_risk": 0.8,
                "medium_risk": 0.5,
                "timeline_buffer": 0.2,
            },
            # Escalation rules
            "escalation_rules": {
                "stuck_task_hours": 8,
                "blocker_escalation_hours": 4,
                "critical_path_delay_hours": 2,
            },
            # Communication settings
            "communication_rules": {
                "daily_plan_time": "08:00",
                "progress_check_time": "14:00",
                "end_of_day_summary": "18:00",
            },
            # Integration settings
            "slack_enabled": False,
            "email_enabled": False,
            "kanban_comments_enabled": True,
            # Team configuration
            "team_config": {
                "default": {
                    "skills": [],
                    "work_patterns": {
                        "preferred_hours": "9am-6pm",
                        "deep_work_blocks": "2-4 hours",
                        "context_switch_cost": "medium",
                    },
                    "communication_preferences": {
                        "notification_frequency": "medium",
                        "detailed_instructions": True,
                        "technical_depth": "medium",
                    },
                }
            },
            # Performance settings
            "performance": {
                "max_concurrent_analyses": 5,
                "cache_ttl_seconds": 300,
                "batch_notification_delay": 1.0,
            },
            # AI settings
            "ai_settings": {
                "model": "claude-3-sonnet-20241022",
                "temperature": 0.7,
                "max_tokens": 2000,
                "retry_attempts": 3,
                "retry_delay": 1.0,
            },
            # Feature flags
            "features": {
                # Enable automatic task decomposition into subtasks
                "enable_subtasks": True,
            },
        }

        # Load configuration
        self.config: Dict[str, Any] = self._load_config()

    def _load_config(self) -> Dict[str, Any]:
        """Load configuration from file and environment variables.

        Starts from a deep copy of the defaults, then applies either the
        explicit configuration file (when ``config_path`` exists on disk and
        is not the literal ``"marcus.config.json"``) or the centralized
        loader, and finally the environment variable overrides.

        Returns
        -------
        Dict[str, Any]
            Complete merged configuration dictionary
        """
        # Deep copy: the nested default dicts are updated in place below and
        # by set(); a shallow copy would let those writes leak into
        # self.defaults.
        config: Dict[str, Any] = copy.deepcopy(self.defaults)

        # If a specific config path was provided (e.g., in tests), load from it
        use_explicit_file = bool(
            self.config_path
            and os.path.exists(self.config_path)
            and self.config_path != "marcus.config.json"
        )
        if use_explicit_file:
            config = self._merge_config_file(config)
        else:
            self._merge_loader_config(config)

        # Environment variables still override everything
        return self._load_env_overrides(config)

    def _merge_config_file(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """Deep-merge the JSON file at ``config_path`` over ``config``.

        Any failure is reported on stderr and ``config`` is returned
        unchanged, so a broken file never prevents start-up.
        """
        try:
            with open(self.config_path, "r", encoding="utf-8") as f:
                file_config = json.load(f)
            if not isinstance(file_config, dict):
                raise ValueError("top-level JSON value must be an object")
            return self._deep_merge(config, file_config)
        except Exception as e:  # deliberately broad: loading is best-effort
            # Don't print to stdout - corrupts MCP protocol
            print(f"Error loading config file: {e}", file=sys.stderr)
            return config

    def _merge_loader_config(self, config: Dict[str, Any]) -> None:
        """Copy the relevant sections of the centralized loader into ``config``.

        ``config`` is updated in place. The object returned by
        ``get_config()`` is only used through ``.get(section, {})``. Any
        failure is reported on stderr; updates applied before the failure
        are kept.
        """
        try:
            # Get configuration from centralized loader
            loader = get_config()

            # Monitoring settings (note: the loader calls the interval "interval")
            monitoring = loader.get("monitoring", {})
            if monitoring:
                for target, source in (
                    ("monitoring_interval", "interval"),
                    ("stall_threshold_hours", "stall_threshold_hours"),
                    ("health_check_interval", "health_check_interval"),
                ):
                    config[target] = monitoring.get(source, config[target])
                risk_overrides = monitoring.get("risk_thresholds")
                if isinstance(config["risk_thresholds"], dict) and isinstance(
                    risk_overrides, dict
                ):
                    config["risk_thresholds"].update(risk_overrides)

            # Escalation settings
            escalation = loader.get("escalation", {})
            if escalation and isinstance(config["escalation_rules"], dict):
                config["escalation_rules"].update(escalation)

            # Communication settings
            communication = loader.get("communication", {})
            if communication:
                for flag in (
                    "slack_enabled",
                    "email_enabled",
                    "kanban_comments_enabled",
                ):
                    config[flag] = communication.get(flag, config[flag])
                if "rules" in communication and isinstance(
                    config["communication_rules"], dict
                ):
                    config["communication_rules"].update(communication["rules"])

            # AI settings (only these three keys are taken from the loader)
            ai = loader.get("ai", {})
            if ai and isinstance(config["ai_settings"], dict):
                ai_settings = config["ai_settings"]
                for name in ("model", "temperature", "max_tokens"):
                    ai_settings[name] = ai.get(name, ai_settings[name])

            # Feature flags
            features = loader.get("features", {})
            if features and isinstance(config["features"], dict):
                config["features"]["enable_subtasks"] = features.get(
                    "enable_subtasks", config["features"]["enable_subtasks"]
                )
        except Exception as e:  # deliberately broad: fall back to defaults
            # Don't print to stdout - corrupts MCP protocol
            print(
                f"Warning: Could not load from config loader: {e}", file=sys.stderr
            )

    def _deep_merge(
        self, base: Dict[str, Any], update: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Recursively merge two dictionaries.

        Performs deep merge where nested dictionaries are merged recursively
        rather than replaced entirely. ``base`` itself is not modified.

        Parameters
        ----------
        base : Dict[str, Any]
            Base dictionary to merge into
        update : Dict[str, Any]
            Dictionary with updates to merge

        Returns
        -------
        Dict[str, Any]
            New dictionary with merged values

        Examples
        --------
        >>> base = {'a': {'x': 1}, 'b': 2}
        >>> update = {'a': {'y': 3}, 'c': 4}
        >>> result = settings._deep_merge(base, update)
        >>> result
        {'a': {'x': 1, 'y': 3}, 'b': 2, 'c': 4}
        """
        result = base.copy()
        for key, value in update.items():
            current = result.get(key)
            if isinstance(current, dict) and isinstance(value, dict):
                result[key] = self._deep_merge(current, value)
            else:
                result[key] = value
        return result

    @staticmethod
    def _env_flag(name: str) -> bool:
        """Return True only when environment variable ``name`` is "true".

        The comparison is case-insensitive; any other value is False.
        """
        return os.environ[name].lower() == "true"

    def _load_env_overrides(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """Load configuration overrides from environment variables.

        Checks for specific environment variables and updates the configuration
        with their values, performing type conversion as needed.

        Parameters
        ----------
        config : Dict[str, Any]
            Base configuration to update (modified in place)

        Returns
        -------
        Dict[str, Any]
            Configuration with environment variable overrides applied

        Notes
        -----
        Supported environment variables:

        - MARCUS_MONITORING_INTERVAL: Monitoring interval in seconds. A value
          that is not an integer is ignored with a warning on stderr.
        - MARCUS_SLACK_ENABLED: Enable/disable Slack (true/false)
        - SLACK_WEBHOOK_URL: Slack webhook URL
        - MARCUS_EMAIL_ENABLED: Enable/disable email (true/false)
        - MARCUS_ENABLE_SUBTASKS: Enable/disable subtasks (true/false)
        - CLAUDE_API_KEY: API key for Anthropic services. ``ANTHROPIC_API_KEY``
          is intentionally NOT consulted because setting it in the shell
          switches Claude Code from subscription to API billing.
        """
        # Monitoring interval
        raw_interval = os.environ.get("MARCUS_MONITORING_INTERVAL")
        if raw_interval is not None:
            try:
                config["monitoring_interval"] = int(raw_interval)
            except ValueError:
                # Stay consistent with file/loader errors: warn, keep going.
                # Don't print to stdout - corrupts MCP protocol
                print(
                    "Warning: Ignoring invalid MARCUS_MONITORING_INTERVAL: "
                    f"{raw_interval!r}",
                    file=sys.stderr,
                )

        # Slack integration
        if "MARCUS_SLACK_ENABLED" in os.environ:
            config["slack_enabled"] = self._env_flag("MARCUS_SLACK_ENABLED")
        if "SLACK_WEBHOOK_URL" in os.environ:
            config["slack_webhook_url"] = os.environ["SLACK_WEBHOOK_URL"]

        # Email integration
        if "MARCUS_EMAIL_ENABLED" in os.environ:
            config["email_enabled"] = self._env_flag("MARCUS_EMAIL_ENABLED")

        # API keys — read CLAUDE_API_KEY (NOT ANTHROPIC_API_KEY) so we
        # don't conflict with Claude Code's subscription auth.
        if "CLAUDE_API_KEY" in os.environ:
            config["anthropic_api_key"] = os.environ["CLAUDE_API_KEY"]

        # Feature flags
        if "MARCUS_ENABLE_SUBTASKS" in os.environ:
            # A config file may have put a non-dict under "features".
            if not isinstance(config.get("features"), dict):
                config["features"] = {}
            config["features"]["enable_subtasks"] = self._env_flag(
                "MARCUS_ENABLE_SUBTASKS"
            )
        return config

    def get(self, key: str, default: Any = None) -> Any:
        """Get configuration value using dot notation key.

        Supports nested key access using dot notation
        (e.g., 'risk_thresholds.high_risk').

        Parameters
        ----------
        key : str
            Configuration key, supports dot notation for nested access
        default : Any, default=None
            Default value to return if key is not found

        Returns
        -------
        Any
            Configuration value or default if not found

        Examples
        --------
        >>> value = settings.get('monitoring_interval')
        >>> nested = settings.get('risk_thresholds.high_risk', 0.8)
        >>> missing = settings.get('nonexistent.key', 'fallback')
        """
        node: Any = self.config
        for part in key.split("."):
            if not isinstance(node, dict) or part not in node:
                return default
            node = node[part]
        return node

    def set(self, key: str, value: Any) -> None:
        """Set configuration value using dot notation key.

        Creates nested dictionary structure as needed when using dot notation.

        Parameters
        ----------
        key : str
            Configuration key, supports dot notation for nested setting
        value : Any
            Value to set

        Raises
        ------
        TypeError
            If an intermediate segment of ``key`` already holds a value that
            is not a dictionary (e.g. ``set('monitoring_interval.x', 1)``).

        Examples
        --------
        >>> settings.set('monitoring_interval', 600)
        >>> settings.set('custom.nested.setting', 'value')

        Notes
        -----
        Setting values does not automatically save to file. Call save() to persist.
        """
        *parents, leaf = key.split(".")
        node: Dict[str, Any] = self.config
        for depth, part in enumerate(parents, start=1):
            if part not in node:
                node[part] = {}
            child = node[part]
            if not isinstance(child, dict):
                prefix = ".".join(parents[:depth])
                raise TypeError(
                    f"Cannot set '{key}': '{prefix}' holds a "
                    f"{type(child).__name__}, not a dict"
                )
            node = child
        node[leaf] = value

    def save(self) -> None:
        """Save current configuration to file.

        Writes the current configuration to the configured file path,
        creating the parent directory when the path has one. Does nothing
        when ``config_path`` is empty.

        Raises
        ------
        OSError
            If file cannot be written or directory cannot be created
        TypeError
            If configuration contains values ``json.dump`` cannot serialize

        Examples
        --------
        >>> settings.set('new_setting', 'value')
        >>> settings.save()

        Notes
        -----
        The whole of ``self.config`` is written, including values injected
        from environment variables (``anthropic_api_key``,
        ``slack_webhook_url``).
        """
        if not self.config_path:
            return
        directory = os.path.dirname(self.config_path)
        # A bare filename has no directory part; os.makedirs("") would raise.
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(self.config_path, "w", encoding="utf-8") as f:
            json.dump(self.config, f, indent=2)

    def get_team_config(self, team_name: str = "default") -> Dict[str, Any]:
        """Get configuration for a specific team.

        Retrieves team-specific configuration including skills, work patterns,
        and communication preferences. Falls back to default team config, and
        to an empty dict when neither is a dictionary.

        Parameters
        ----------
        team_name : str, default="default"
            Name of the team to get configuration for

        Returns
        -------
        Dict[str, Any]
            Shallow copy of the team configuration containing:
            - skills: List of team skills
            - work_patterns: Work scheduling preferences
            - communication_preferences: Notification settings

        Examples
        --------
        >>> backend_config = settings.get_team_config("backend_team")
        >>> default_config = settings.get_team_config()
        """
        team_configs = self.get("team_config", {})
        if isinstance(team_configs, dict):
            for candidate in (team_name, "default"):
                entry = team_configs.get(candidate)
                if isinstance(entry, dict):
                    return dict(entry)
        return {}

    def _get_section(self, name: str) -> Dict[str, Any]:
        """Return the dict stored under ``name``, or its default.

        The default from ``self.defaults`` is used when the key is missing
        from the loaded configuration or its value is not a dictionary.
        """
        fallback: Dict[str, Any] = self.defaults[name]
        section = self.get(name, fallback)
        return section if isinstance(section, dict) else fallback

    def get_risk_thresholds(self) -> Dict[str, float]:
        """Get risk assessment thresholds for project monitoring.

        Returns
        -------
        Dict[str, float]
            Risk thresholds containing:
            - high_risk: Threshold for high risk classification (0.0-1.0)
            - medium_risk: Threshold for medium risk classification (0.0-1.0)
            - timeline_buffer: Buffer percentage for timeline calculations

        Examples
        --------
        >>> thresholds = settings.get_risk_thresholds()
        >>> if project_risk > thresholds['high_risk']:
        ...     send_alert()
        """
        return self._get_section("risk_thresholds")

    def get_escalation_rules(self) -> Dict[str, int]:
        """Get escalation rules for automated issue management.

        Returns
        -------
        Dict[str, int]
            Escalation rules containing:
            - stuck_task_hours: Hours before escalating stuck tasks
            - blocker_escalation_hours: Hours before escalating blockers
            - critical_path_delay_hours: Hours before escalating critical delays

        Examples
        --------
        >>> rules = settings.get_escalation_rules()
        >>> if task_stuck_hours > rules['stuck_task_hours']:
        ...     escalate_task(task)
        """
        return self._get_section("escalation_rules")

    def get_communication_rules(self) -> Dict[str, str]:
        """Get communication timing rules for automated messaging.

        Returns
        -------
        Dict[str, str]
            Communication timing rules containing:
            - daily_plan_time: Time to send daily work plans (HH:MM format)
            - progress_check_time: Time for progress check messages
            - end_of_day_summary: Time to send daily summaries

        Examples
        --------
        >>> rules = settings.get_communication_rules()
        >>> daily_time = rules['daily_plan_time']  # "08:00"
        >>> schedule_daily_plans(daily_time)
        """
        return self._get_section("communication_rules")

    def get_ai_settings(self) -> Dict[str, Any]:
        """Get AI model configuration settings.

        Returns
        -------
        Dict[str, Any]
            AI settings containing:
            - model: Model identifier string
            - temperature: Sampling temperature (0.0-1.0)
            - max_tokens: Maximum response tokens
            - retry_attempts: Number of retry attempts
            - retry_delay: Delay between retries in seconds

        Examples
        --------
        >>> ai_config = settings.get_ai_settings()
        >>> client = AIClient(
        ...     model=ai_config['model'],
        ...     temperature=ai_config['temperature']
        ... )
        """
        return self._get_section("ai_settings")

    def is_subtasks_enabled(self) -> bool:
        """Check if subtask functionality is enabled.

        Returns
        -------
        bool
            Truthiness of ``features.enable_subtasks``; True when the key
            is absent.

        Examples
        --------
        >>> if settings.is_subtasks_enabled():
        ...     decompose_task(task)
        >>> # Can also be set via environment:
        >>> # export MARCUS_ENABLE_SUBTASKS=false

        Notes
        -----
        Can be controlled via:
        - Configuration file: features.enable_subtasks
        - Environment variable: MARCUS_ENABLE_SUBTASKS=true/false
        """
        return bool(self.get("features.enable_subtasks", True))

    def validate(self) -> bool:
        """Validate current configuration for consistency and completeness.

        Runs a set of sanity checks on the loaded configuration and prints a
        warning to stderr for each potential issue.

        Returns
        -------
        bool
            Always True in the current implementation: every check only
            emits a warning and none is treated as critical.

        Notes
        -----
        Validation checks include:
        - Presence of the ``CLAUDE_API_KEY`` environment variable
        - Monitoring interval is numeric and at least 60 seconds
        - At least one communication channel enabled
        Warnings go to stderr, never stdout (stdout carries the MCP protocol).

        Examples
        --------
        >>> if settings.validate():
        ...     start_marcus()
        ... else:
        ...     fix_configuration()
        """
        # Don't print to stdout - corrupts MCP protocol
        warnings = []

        # Check for API key if AI is being used
        if not os.environ.get("CLAUDE_API_KEY"):
            warnings.append("Warning: CLAUDE_API_KEY not found in environment")

        # Validate monitoring interval
        interval = self.get("monitoring_interval")
        if not isinstance(interval, (int, float)):
            # Guard the comparison below: None or a string would raise.
            warnings.append(
                f"Warning: Monitoring interval is not a number: {interval!r}"
            )
        elif interval < 60:
            warnings.append(
                "Warning: Monitoring interval is very short (< 60 seconds)"
            )

        # Validate communication channels
        channels = ("slack_enabled", "email_enabled", "kanban_comments_enabled")
        if not any(self.get(channel) for channel in channels):
            warnings.append("Warning: No communication channels enabled")

        for message in warnings:
            print(message, file=sys.stderr)
        return True