src.worker package#

Worker package for Marcus.

This package provides the MCP client for testing Marcus workflows and validating Marcus functionality. Inspector supports both stdio (isolated testing) and HTTP (integration testing) connections.

The Inspector client enables: - Testing Marcus MCP tool functionality - Validating task assignment workflows - Debugging Marcus coordination logic - Simulating agent behavior for development

Classes#

Inspector

Unified MCP testing client supporting stdio and HTTP connections

Examples

Test Marcus with isolated stdio connection:

>>> from src.worker import Inspector
>>> client = Inspector(connection_type='stdio')
>>> async with client.connect() as session:
...     # Test Marcus functionality
...     await client.register_agent("test-1", "Test Agent", "Developer", ["python"])
...     task = await client.request_next_task("test-1")

Test Marcus with HTTP connection:

>>> from src.worker import Inspector
>>> client = Inspector(connection_type='http')
>>> async with client.connect(url="http://localhost:4298/mcp") as session:
...     # Test against running Marcus instance
...     await client.register_agent("test-1", "Test Agent", "Developer", [])

Notes

Inspector is designed for TESTING Marcus, not for production AI agents. Use stdio for isolated tests, HTTP for integration testing against a running Marcus server.

class src.worker.Inspector[source]#

Bases: object

Unified Testing Client for Marcus MCP workflows.

This client provides a testing interface for validating Marcus functionality via either stdio (isolated instance) or HTTP (shared server). It is designed for development, testing, and debugging Marcus workflows, NOT for production AI agent usage.

Inspector supports the full MCP protocol for simulating agent behavior and validating that Marcus task assignment, progress tracking, and coordination systems work correctly.

session#

The active MCP client session, None if not connected

Type:

Optional[ClientSession]

connection_type#

The type of connection to use

Type:

Literal[‘stdio’, ‘http’]

connect(url: str | None = None) AsyncIterator[ClientSession][source]#

Connect to Marcus using the configured connection type

Parameters:
Return type:

AsyncIterator[mcp.ClientSession]

register_agent(agent_id, name, role, skills) Dict[str, Any][source]#

Simulate agent registration for testing

Parameters:
Return type:

Dict[str, Any]

request_next_task(agent_id) Dict[str, Any][source]#

Test task assignment workflow

Parameters:
Return type:

T

report_task_progress(agent_id, task_id, status, progress, message) Dict[str, Any][source]#

Test progress reporting workflow

Parameters:
Return type:

T

report_blocker(agent_id, task_id, description, severity) Dict[str, Any][source]#

Test blocker reporting workflow

Parameters:
Return type:

T

get_project_status() Dict[str, Any][source]#

Test project status retrieval

Return type:

Dict[str, Any]

get_agent_status(agent_id) Dict[str, Any][source]#

Test agent status retrieval

Parameters:

agent_id (str)

Return type:

Dict[str, Any]

Examples

Test task assignment with stdio:

>>> client = Inspector(connection_type='stdio')
>>> async with client.connect() as session:
...     # Simulate agent workflow for testing
...     result = await client.register_agent("test-1", "Test", "Developer", [])
...     task = await client.request_next_task("test-1")
...     assert task is not None

Test with HTTP connection:

>>> client = Inspector(connection_type='http')
>>> async with client.connect(url="http://localhost:4298/mcp") as session:
...     # Test against running Marcus instance
...     result = await client.register_agent("test-1", "Test", "Developer", [])
...     assert result.get('success')

Notes

  • Inspector is for TESTING, not production agent usage

  • stdio: Best for isolated unit/integration tests

  • http: Best for testing against running Marcus instances

  • All methods simulate real agent behavior for validation

__init__(connection_type='stdio')[source]#

Initialize the Inspector testing client.

Parameters:

connection_type (Literal['stdio', 'http']) – The type of connection to use: - ‘stdio’: Spawn isolated Marcus instance (for testing) - ‘http’: Connect to running Marcus HTTP server (for integration)

Return type:

None

connect(url=None, timeout=30, sse_read_timeout=300)[source]#

Connect to Marcus MCP server using the configured connection type.

This method automatically uses the appropriate transport (stdio or HTTP) based on the connection_type specified during initialization.

Parameters:
  • url (Optional[str]) – The HTTP URL of the Marcus MCP server endpoint (required for http mode) Default: “http://localhost:4298/mcp” Ignored for stdio mode

  • timeout (float) – HTTP timeout for regular operations in seconds (http mode only) Default: 30

  • sse_read_timeout (float) – Timeout for SSE read operations in seconds (http mode only) Default: 300

Yields:

ClientSession – An active MCP client session for communicating with Marcus

Raises:
Return type:

AsyncIterator[mcp.ClientSession]

Examples

Stdio connection:

>>> client = Inspector(connection_type='stdio')
>>> async with client.connect() as session:
...     await client.register_agent("test-1", "Test", "Developer", [])

HTTP connection:

>>> client = Inspector(connection_type='http')
>>> async with client.connect(url="http://localhost:4298/mcp") as session:
...     await client.register_agent("test-1", "Test", "Developer", [])

Notes

  • stdio: Spawns isolated Marcus instance for testing

  • http: Connects to running Marcus HTTP server using streamablehttp_client

  • Session cleanup is guaranteed even if exceptions occur

async ensure_connected()[source]#

Ensure we have an active connection, attempt reconnect if needed.

Return type:

bool

async register_agent(agent_id, name, role, skills)[source]#

Simulate agent registration for testing Marcus workflows.

This method tests the agent registration workflow by simulating an agent registering with Marcus. Use this to validate that Marcus correctly handles agent registration, stores agent information, and returns proper responses.

Parameters:
  • agent_id (str) – Unique identifier for the test agent (e.g., “test-agent-001”)

  • name (str) – Human-readable display name for the test agent

  • role (str) – Agent’s role for testing (e.g., “Developer”, “QA Engineer”)

  • skills (List[str]) – List of skills to test skill-based task assignment

Returns:

Registration response containing: - success: bool indicating registration success - agent_id: str confirming the registered agent ID - message: str with registration status details

Return type:

Dict[str, Any]

Raises:

RuntimeError – If no active connection exists to Marcus server

Examples

Test agent registration:

>>> client = Inspector(connection_type='stdio')
>>> async with client.connect() as session:
...     result = await client.register_agent(
...         "test-agent-1",
...         "Test Agent",
...         "Developer",
...         ["python", "testing"]
...     )
...     assert result.get('success') is True

Notes

  • This simulates agent behavior for testing Marcus

  • Use to validate Marcus registration workflow

  • Not for production AI agent usage

async request_next_task(**kwargs)[source]#
Return type:

TypeVar(T)

Parameters:
async report_task_progress(**kwargs)[source]#
Return type:

TypeVar(T)

Parameters:
async report_blocker(**kwargs)[source]#
Return type:

TypeVar(T)

Parameters:
async get_project_status()[source]#

Test project status retrieval.

This method tests Marcus’s status reporting by fetching project metrics. Use this to validate that Marcus correctly tracks project state, calculates metrics, and provides comprehensive status information.

Returns:

Project status response containing: - project_info: Dict with project details - task_metrics: Dict with task statistics - agent_metrics: Dict with agent statistics - performance_metrics: Dict with performance data

Return type:

Dict[str, Any]

Raises:

RuntimeError – If no active connection exists to Marcus server

Examples

Test status retrieval:

>>> async with client.connect() as session:
...     status = await client.get_project_status()
...     assert 'task_metrics' in status
...     assert 'agent_metrics' in status

Notes

  • Tests Marcus status reporting

  • Validates metric calculations

  • Not for production AI agent usage

async get_agent_status(agent_id)[source]#

Test agent status retrieval.

This method tests Marcus’s agent tracking by fetching agent-specific status. Use this to validate that Marcus correctly tracks agent state, assignments, and performance.

Parameters:

agent_id (str) – Unique identifier of the agent to query

Returns:

Agent status response containing agent details and metrics

Return type:

Dict[str, Any]

Raises:

RuntimeError – If no active connection exists to Marcus server

Examples

Test agent status:

>>> async with client.connect() as session:
...     await client.register_agent("test-1", "Test", "Dev", [])
...     status = await client.get_agent_status("test-1")
...     assert status.get('agent_id') == "test-1"

Notes

  • Tests Marcus agent tracking

  • Validates agent state management

  • Not for production AI agent usage

async src.worker.create_inspector(connection_type='stdio')[source]#

Create and return a new Inspector testing client.

Parameters:

connection_type (Literal['stdio', 'http']) – The type of connection to use: - ‘stdio’: Spawn isolated Marcus instance (recommended for testing) - ‘http’: Connect to running Marcus HTTP server (recommended for integration)

Returns:

A new testing client instance ready for connection

Return type:

Inspector

Examples

>>> client = await create_inspector('stdio')
>>> async with client.connect() as session:
...     # Test Marcus workflows
...     pass
>>> client = await create_inspector('http')
>>> async with client.connect(url="http://localhost:4298/mcp") as session:
...     # Test against running instance
...     pass

Submodules#