src.worker package#
Worker package for Marcus.
This package provides the MCP client for testing Marcus workflows and validating Marcus functionality. Inspector supports both stdio (isolated testing) and HTTP (integration testing) connections.
The Inspector client enables: - Testing Marcus MCP tool functionality - Validating task assignment workflows - Debugging Marcus coordination logic - Simulating agent behavior for development
Classes#
- Inspector
Unified MCP testing client supporting stdio and HTTP connections
Examples
Test Marcus with isolated stdio connection:
>>> from src.worker import Inspector
>>> client = Inspector(connection_type='stdio')
>>> async with client.connect() as session:
... # Test Marcus functionality
... await client.register_agent("test-1", "Test Agent", "Developer", ["python"])
... task = await client.request_next_task("test-1")
Test Marcus with HTTP connection:
>>> from src.worker import Inspector
>>> client = Inspector(connection_type='http')
>>> async with client.connect(url="http://localhost:4298/mcp") as session:
... # Test against running Marcus instance
... await client.register_agent("test-1", "Test Agent", "Developer", [])
Notes
Inspector is designed for TESTING Marcus, not for production AI agents. Use stdio for isolated tests, HTTP for integration testing against a running Marcus server.
- class src.worker.Inspector[source]#
Bases:
objectUnified Testing Client for Marcus MCP workflows.
This client provides a testing interface for validating Marcus functionality via either stdio (isolated instance) or HTTP (shared server). It is designed for development, testing, and debugging Marcus workflows, NOT for production AI agent usage.
Inspector supports the full MCP protocol for simulating agent behavior and validating that Marcus task assignment, progress tracking, and coordination systems work correctly.
- session#
The active MCP client session, None if not connected
- Type:
Optional[ClientSession]
- connection_type#
The type of connection to use
- Type:
Literal[‘stdio’, ‘http’]
- connect(url: str | None = None) AsyncIterator[ClientSession][source]#
Connect to Marcus using the configured connection type
- Parameters:
- Return type:
AsyncIterator[mcp.ClientSession]
- register_agent(agent_id, name, role, skills) Dict[str, Any][source]#
Simulate agent registration for testing
- report_task_progress(agent_id, task_id, status, progress, message) Dict[str, Any][source]#
Test progress reporting workflow
- report_blocker(agent_id, task_id, description, severity) Dict[str, Any][source]#
Test blocker reporting workflow
Examples
Test task assignment with stdio:
>>> client = Inspector(connection_type='stdio') >>> async with client.connect() as session: ... # Simulate agent workflow for testing ... result = await client.register_agent("test-1", "Test", "Developer", []) ... task = await client.request_next_task("test-1") ... assert task is not None
Test with HTTP connection:
>>> client = Inspector(connection_type='http') >>> async with client.connect(url="http://localhost:4298/mcp") as session: ... # Test against running Marcus instance ... result = await client.register_agent("test-1", "Test", "Developer", []) ... assert result.get('success')
Notes
Inspector is for TESTING, not production agent usage
stdio: Best for isolated unit/integration tests
http: Best for testing against running Marcus instances
All methods simulate real agent behavior for validation
- __init__(connection_type='stdio')[source]#
Initialize the Inspector testing client.
- Parameters:
connection_type (
Literal['stdio','http']) – The type of connection to use: - ‘stdio’: Spawn isolated Marcus instance (for testing) - ‘http’: Connect to running Marcus HTTP server (for integration)- Return type:
None
- connect(url=None, timeout=30, sse_read_timeout=300)[source]#
Connect to Marcus MCP server using the configured connection type.
This method automatically uses the appropriate transport (stdio or HTTP) based on the connection_type specified during initialization.
- Parameters:
url (
Optional[str]) – The HTTP URL of the Marcus MCP server endpoint (required for http mode) Default: “http://localhost:4298/mcp” Ignored for stdio modetimeout (
float) – HTTP timeout for regular operations in seconds (http mode only) Default: 30sse_read_timeout (
float) – Timeout for SSE read operations in seconds (http mode only) Default: 300
- Yields:
ClientSession – An active MCP client session for communicating with Marcus
- Raises:
RuntimeError – If connection fails
ValueError – If http mode is selected but no URL is provided
- Return type:
AsyncIterator[mcp.ClientSession]
Examples
Stdio connection:
>>> client = Inspector(connection_type='stdio') >>> async with client.connect() as session: ... await client.register_agent("test-1", "Test", "Developer", [])
HTTP connection:
>>> client = Inspector(connection_type='http') >>> async with client.connect(url="http://localhost:4298/mcp") as session: ... await client.register_agent("test-1", "Test", "Developer", [])
Notes
stdio: Spawns isolated Marcus instance for testing
http: Connects to running Marcus HTTP server using streamablehttp_client
Session cleanup is guaranteed even if exceptions occur
- async ensure_connected()[source]#
Ensure we have an active connection, attempt reconnect if needed.
- Return type:
- async register_agent(agent_id, name, role, skills)[source]#
Simulate agent registration for testing Marcus workflows.
This method tests the agent registration workflow by simulating an agent registering with Marcus. Use this to validate that Marcus correctly handles agent registration, stores agent information, and returns proper responses.
- Parameters:
- Returns:
Registration response containing: - success: bool indicating registration success - agent_id: str confirming the registered agent ID - message: str with registration status details
- Return type:
- Raises:
RuntimeError – If no active connection exists to Marcus server
Examples
Test agent registration:
>>> client = Inspector(connection_type='stdio') >>> async with client.connect() as session: ... result = await client.register_agent( ... "test-agent-1", ... "Test Agent", ... "Developer", ... ["python", "testing"] ... ) ... assert result.get('success') is True
Notes
This simulates agent behavior for testing Marcus
Use to validate Marcus registration workflow
Not for production AI agent usage
- async get_project_status()[source]#
Test project status retrieval.
This method tests Marcus’s status reporting by fetching project metrics. Use this to validate that Marcus correctly tracks project state, calculates metrics, and provides comprehensive status information.
- Returns:
Project status response containing: - project_info: Dict with project details - task_metrics: Dict with task statistics - agent_metrics: Dict with agent statistics - performance_metrics: Dict with performance data
- Return type:
- Raises:
RuntimeError – If no active connection exists to Marcus server
Examples
Test status retrieval:
>>> async with client.connect() as session: ... status = await client.get_project_status() ... assert 'task_metrics' in status ... assert 'agent_metrics' in status
Notes
Tests Marcus status reporting
Validates metric calculations
Not for production AI agent usage
- async get_agent_status(agent_id)[source]#
Test agent status retrieval.
This method tests Marcus’s agent tracking by fetching agent-specific status. Use this to validate that Marcus correctly tracks agent state, assignments, and performance.
- Parameters:
agent_id (
str) – Unique identifier of the agent to query- Returns:
Agent status response containing agent details and metrics
- Return type:
- Raises:
RuntimeError – If no active connection exists to Marcus server
Examples
Test agent status:
>>> async with client.connect() as session: ... await client.register_agent("test-1", "Test", "Dev", []) ... status = await client.get_agent_status("test-1") ... assert status.get('agent_id') == "test-1"
Notes
Tests Marcus agent tracking
Validates agent state management
Not for production AI agent usage
- async src.worker.create_inspector(connection_type='stdio')[source]#
Create and return a new Inspector testing client.
- Parameters:
connection_type (
Literal['stdio','http']) – The type of connection to use: - ‘stdio’: Spawn isolated Marcus instance (recommended for testing) - ‘http’: Connect to running Marcus HTTP server (recommended for integration)- Returns:
A new testing client instance ready for connection
- Return type:
Examples
>>> client = await create_inspector('stdio') >>> async with client.connect() as session: ... # Test Marcus workflows ... pass
>>> client = await create_inspector('http') >>> async with client.connect(url="http://localhost:4298/mcp") as session: ... # Test against running instance ... pass