"""Agent Management Tools for Marcus MCP.
This module contains tools for managing AI agents in the Marcus system:
- register_agent: Register a new agent with skills and role
- get_agent_status: Get current status and tasks for an agent
- list_registered_agents: List all registered agents
"""
from typing import Any, Dict, List
from src.core.models import WorkerStatus
from src.logging.agent_events import log_agent_event
from src.logging.conversation_logger import conversation_logger, log_thinking
[docs]
async def register_agent(
agent_id: str,
name: str,
role: str,
skills: List[str],
state: Any,
project_id: str = "",
) -> Dict[str, Any]:
"""
Register a new agent with the Marcus system.
Parameters
----------
agent_id : str
Unique identifier for the agent
name : str
Display name for the agent
role : str
Agent's role (e.g., 'Backend Developer')
skills : List[str]
List of agent's technical skills
state : Any
Marcus server state instance
project_id : str
ID of the project this agent is working on. Used to scope
request_next_task results so agents from concurrent experiments
cannot steal tasks across project boundaries (GH-388).
Returns
-------
Dict[str, Any]
Dict with success status, registration details, and project_id.
"""
# Log incoming registration request
conversation_logger.log_worker_message(
agent_id,
"to_pm",
f"Registering as {role} with skills: {skills}",
{"name": name, "role": role, "skills": skills, "project_id": project_id},
)
try:
# Log Marcus thinking
log_thinking(
"marcus",
f"New agent registration request from {name}",
{"agent_id": agent_id, "role": role, "skills": skills},
)
# Create worker status with correct field names
# Validate project_id before any state mutation (P2: keeps
# registration atomic — no ghost entries on rejected calls).
# Agents are ephemeral — one project, then terminated (GH-389).
if not project_id:
return {
"success": False,
"error": (
"project_id is required. Agents are ephemeral and must "
"register with the project they are working on."
),
}
status = WorkerStatus(
worker_id=agent_id,
name=name,
role=role,
email=None,
current_tasks=[],
completed_tasks_count=0,
capacity=40, # Default 40 hours/week
skills=skills or [],
availability={
"monday": True,
"tuesday": True,
"wednesday": True,
"thursday": True,
"friday": True,
"saturday": False,
"sunday": False,
},
performance_score=1.0,
)
state.agent_status[agent_id] = status
# Store project scope (GH-388: prevents cross-project task theft).
if not hasattr(state, "agent_project_map"):
state.agent_project_map = {}
state.agent_project_map[agent_id] = project_id
# Log registration event immediately
state.log_event(
"worker_registration",
{
"worker_id": agent_id,
"name": name,
"role": role,
"skills": skills,
"source": "mcp_client",
"target": "marcus",
},
)
# Record in active experiment if one is running
from src.experiments.live_experiment_monitor import get_active_monitor
monitor = get_active_monitor()
if monitor and monitor.is_running:
monitor.record_agent_registration(
agent_id=agent_id, name=name, role=role, skills=skills
)
# Log conversation event for visualization
log_agent_event(
"worker_registration",
{"worker_id": agent_id, "name": name, "role": role, "skills": skills},
)
# Log decision
conversation_logger.log_pm_decision(
decision=f"Register agent {name}",
rationale="Agent skills match project requirements",
confidence_score=0.95,
decision_factors={
"skills_match": True,
"capacity_available": True,
"role_needed": True,
},
)
# Log response
conversation_logger.log_worker_message(
agent_id,
"from_pm",
f"Registration successful. Welcome {name}!",
{"status": "registered"},
)
# Emit agent_registered telemetry (Marcus #416, Stage 5A of
# #9). Ships role + skills (user-controlled labels per
# docs/telemetry.md disclosure note) and agent_model. Does
# NOT ship agent name or agent_id (those can identify a
# human). Helper swallows its own errors.
try:
from src.telemetry.events import fire_agent_registered
fire_agent_registered(role=role, skills=skills or [])
except Exception: # noqa: BLE001
pass
return {
"success": True,
"message": f"Agent {name} registered successfully",
"agent_id": agent_id,
"project_id": project_id,
}
except Exception as e:
conversation_logger.log_worker_message(
agent_id, "from_pm", f"Registration failed: {str(e)}", {"error": str(e)}
)
return {"success": False, "error": str(e)}
[docs]
async def get_agent_status(agent_id: str, state: Any) -> Dict[str, Any]:
"""
Get status and current assignment for an agent.
Parameters
----------
agent_id : str
The agent's unique identifier
state : Any
Marcus server state instance
Returns
-------
Dict[str, Any]
Dict with agent status, current tasks, and assignment details
"""
try:
agent = state.agent_status.get(agent_id)
if agent:
result = {
"success": True,
"agent": {
"id": agent.worker_id,
"name": agent.name,
"role": agent.role,
"skills": agent.skills,
"status": (
"working" if len(agent.current_tasks) > 0 else "available"
),
"current_tasks": [t.id for t in agent.current_tasks],
"total_completed": agent.completed_tasks_count,
"performance_score": agent.performance_score,
},
}
# Add current assignment details if any
if len(agent.current_tasks) > 0 and agent.worker_id in state.agent_tasks:
assignment = state.agent_tasks[agent.worker_id]
result["current_assignment"] = {
"task_id": assignment.task_id,
"task_name": assignment.task_name,
"assigned_at": assignment.assigned_at.isoformat(),
"instructions": assignment.instructions,
}
return result
else:
return {"success": False, "message": f"Agent {agent_id} not found"}
except Exception as e:
return {"success": False, "error": str(e)}
[docs]
async def list_registered_agents(state: Any) -> Dict[str, Any]:
"""
List all registered agents and their current status.
Parameters
----------
state : Any
Marcus server state instance
Returns
-------
Dict[str, Any]
Dict with list of all agents and their details
"""
try:
agents = []
for agent in list(state.agent_status.values()):
agents.append(
{
"id": agent.worker_id,
"name": agent.name,
"role": agent.role,
"status": (
"working" if len(agent.current_tasks) > 0 else "available"
),
"skills": agent.skills,
"current_tasks": [t.id for t in agent.current_tasks],
"total_completed": agent.completed_tasks_count,
}
)
return {"success": True, "agents": agents, "total": len(agents)}
except Exception as e:
return {"success": False, "error": str(e)}