Source code for src.core.service_registry

"""
Marcus Service Registry.

Manages service advertisement and discovery for Marcus instances.
Allows multiple clients (Cato, Claude Desktop, etc.) to discover
and connect to running Marcus instances.
"""

import json
import os
import platform
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import psutil


[docs] class MarcusServiceRegistry: """ Manages Marcus service advertisement and discovery. When Marcus starts, it registers itself in a discoverable location. Clients like Cato can find running Marcus instances automatically. """
[docs] def __init__(self, instance_id: Optional[str] = None): """ Initialize service registry. Parameters ---------- instance_id : str, optional Unique identifier for this Marcus instance """ self.instance_id = instance_id or f"marcus_{os.getpid()}" self.registry_dir = self._get_registry_dir() self.registry_file = self.registry_dir / f"{self.instance_id}.json"
def _get_registry_dir(self) -> Path: """Get the directory for service registry files.""" if platform.system() == "Windows": base_dir = Path(os.environ.get("APPDATA", tempfile.gettempdir())) else: base_dir = Path.home() registry_dir = base_dir / ".marcus" / "services" registry_dir.mkdir(parents=True, exist_ok=True) return registry_dir
[docs] def register_service( self, mcp_command: str, log_dir: str, project_name: Optional[str] = None, provider: Optional[str] = None, **kwargs: Any, ) -> Dict[str, Any]: """ Register this Marcus instance as available service. Parameters ---------- mcp_command : str Command to connect to this Marcus MCP server log_dir : str Directory where Marcus writes logs project_name : str, optional Current project name provider : str, optional Kanban provider being used **kwargs Additional service metadata Returns ------- Dict[str, Any] Service registration info """ service_info = { "instance_id": self.instance_id, "pid": os.getpid(), "mcp_command": mcp_command, "log_dir": str(Path(log_dir).absolute()), "project_name": project_name, "provider": provider, "status": "running", "started_at": datetime.now(timezone.utc).isoformat(), "last_heartbeat": datetime.now(timezone.utc).isoformat(), "platform": platform.system(), "python_version": platform.python_version(), "working_directory": str(Path.cwd()), **kwargs, } # Write service file with open(self.registry_file, "w") as f: json.dump(service_info, f, indent=2) return service_info
[docs] def update_heartbeat(self, **updates: Any) -> None: """Update service heartbeat and optional fields.""" if not self.registry_file.exists(): return try: with open(self.registry_file, "r") as f: service_info = json.load(f) service_info["last_heartbeat"] = datetime.now(timezone.utc).isoformat() service_info.update(updates) with open(self.registry_file, "w") as f: json.dump(service_info, f, indent=2) except (json.JSONDecodeError, FileNotFoundError): pass
[docs] def unregister_service(self) -> None: """Remove service registration.""" if self.registry_file.exists(): self.registry_file.unlink()
[docs] @classmethod def discover_services(cls) -> List[Dict[str, Any]]: """ Discover all running Marcus services. Returns ------- List[Dict[str, Any]] List of available Marcus services """ registry = cls() services: List[Dict[str, Any]] = [] if not registry.registry_dir.exists(): return services # Read all service files for service_file in registry.registry_dir.glob("marcus_*.json"): try: with open(service_file, "r") as f: service_info = json.load(f) # Check if process is still running if cls._is_process_running(service_info.get("pid")): services.append(service_info) else: # Clean up stale service file try: service_file.unlink() except (OSError, PermissionError) as e: # Log specific file system errors but continue discovery import logging logger = logging.getLogger(__name__) logger.debug( f"Could not remove stale service file {service_file}: {e}" ) except Exception as e: # Log unexpected errors but continue discovery import logging logger = logging.getLogger(__name__) logger.warning( f"Unexpected error removing stale service file " f"{service_file}: {e}" ) except (json.JSONDecodeError, FileNotFoundError): # Clean up invalid service files try: service_file.unlink() except (OSError, PermissionError) as e: # Log specific file system errors but continue discovery import logging logger = logging.getLogger(__name__) logger.debug( f"Could not remove invalid service file {service_file}: {e}" ) except Exception as e: # Log unexpected errors but continue discovery import logging logger = logging.getLogger(__name__) logger.warning( f"Unexpected error removing service file {service_file}: {e}" ) return sorted(services, key=lambda x: x.get("started_at", ""))
[docs] @classmethod def get_preferred_service(cls) -> Optional[Dict[str, Any]]: """ Get the preferred Marcus service to connect to. Returns most recently started service, or None if none available. Returns ------- Optional[Dict[str, Any]] Preferred service info, or None """ services = cls.discover_services() return services[-1] if services else None
@staticmethod def _is_process_running(pid: int) -> bool: """Check if a process is running by PID.""" if not pid: return False try: return psutil.pid_exists(pid) # type: ignore[no-any-return] except Exception: return False
# Global registry instance _service_registry = None
[docs] def get_service_registry(instance_id: Optional[str] = None) -> MarcusServiceRegistry: """Get or create global service registry instance.""" global _service_registry if _service_registry is None: _service_registry = MarcusServiceRegistry(instance_id) return _service_registry
[docs] def register_marcus_service(**kwargs: Any) -> Dict[str, Any]: """Register Marcus service.""" registry = get_service_registry() return registry.register_service(**kwargs)
[docs] def unregister_marcus_service() -> None: """Unregister Marcus service.""" registry = get_service_registry() registry.unregister_service()