"""
Inspector - Unified MCP Testing Client for Marcus Workflows.
This module provides a testing and development client that supports both stdio
and HTTP connections to Marcus MCP server. Inspector is designed for testing
Marcus functionality, debugging workflows, and validating MCP protocol integration.
Connection Types
----------------
- stdio: Spawns isolated Marcus instance for testing (recommended for development)
- http: Connects to running Marcus HTTP server (recommended for integration testing)
Classes
-------
Inspector
Testing client supporting both stdio and HTTP connections
Examples
--------
Test Marcus workflow with stdio (isolated testing):

>>> import asyncio
>>> from src.worker.inspector import Inspector
>>>
>>> async def test_task_assignment():
...     client = Inspector(connection_type='stdio')
...     async with client.connect() as session:
...         # Simulate agent registration
...         result = await client.register_agent(
...             "test-agent",
...             "Test Agent",
...             "Developer",
...             ["python", "testing"]
...         )
...
...         # Verify task assignment works
...         task = await client.request_next_task("test-agent")
...         assert task is not None
>>>
>>> asyncio.run(test_task_assignment())

Test Marcus workflow with HTTP (integration testing):

>>> async def test_http_workflow():
...     client = Inspector(connection_type='http')
...     async with client.connect(url="http://localhost:4298/mcp") as session:
...         # Test against running Marcus instance
...         result = await client.register_agent(
...             "integration-test", "Test", "Developer", []
...         )
...         # Verify workflow
...         assert result.get('success')
>>>
>>> asyncio.run(test_http_workflow())
Notes
-----
- Inspector is for TESTING Marcus, not for production AI agents
- Use stdio for isolated unit/integration tests
- Use http for testing against running Marcus instances
- Inherits all MCP communication methods for workflow testing
"""
import asyncio
import functools
import json
import os
import secrets
import sys
import time
from contextlib import AsyncExitStack, asynccontextmanager
from typing import (
    Any,
    AsyncIterator,
    Awaitable,
    Callable,
    Dict,
    List,
    Literal,
    Optional,
    TypeVar,
    cast,
)

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.streamable_http import streamablehttp_client
from mcp.types import CallToolResult, TextContent
# Type variable for retry_with_backoff: it names the result type of the
# wrapped coroutine (``Awaitable[T]``) so the decorator's signature states
# that the wrapper returns the same type as the function it wraps.
T = TypeVar("T")
def _extract_text_from_result(result: CallToolResult) -> str:
"""
Extract text content from MCP tool call result.
Parameters
----------
result : CallToolResult
The CallToolResult from MCP tool call
Returns
-------
str
The text content if available, empty string otherwise
"""
if not result.content:
return ""
for content_item in result.content:
# Check if it's a TextContent object or has a text attribute (for testing)
if isinstance(content_item, TextContent) or hasattr(content_item, "text"):
return str(content_item.text)
return ""
def retry_with_backoff(
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
) -> Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]:
    """
    Decorate async operations with exponential backoff retry logic.

    Only ``ConnectionError``, ``RuntimeError`` and ``asyncio.TimeoutError`` are
    retried; any other exception propagates from the first attempt. When the
    final attempt fails, its exception is re-raised unchanged.

    Parameters
    ----------
    max_attempts : int, optional
        Total number of attempts, including the first call, by default 3.
        Must be at least 1.
    initial_delay : float, optional
        Delay in seconds before the second attempt, by default 1.0
    max_delay : float, optional
        Upper bound in seconds for any single delay (applied after jitter),
        by default 60.0
    exponential_base : float, optional
        Factor by which the delay grows after each failed attempt,
        by default 2.0
    jitter : bool, optional
        Whether to scale each delay by a random factor in [0.5, 1.5),
        by default True

    Returns
    -------
    Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]
        A decorator. The wrapper it produces keeps the wrapped function's
        ``__name__``, ``__doc__`` and other metadata (``functools.wraps``).

    Raises
    ------
    ValueError
        If ``max_attempts`` is less than 1. Such a value would otherwise
        produce a wrapper that never calls the function.
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be at least 1, got {max_attempts}")

    # One cryptographically secure RNG per decorator is enough for jitter.
    secure_random = secrets.SystemRandom()

    def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        # wraps() keeps name/docstring so logs and Sphinx docs stay correct.
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> T:
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except (ConnectionError, RuntimeError, asyncio.TimeoutError) as e:
                    last_exception = e
                    if attempt == max_attempts - 1:
                        # Last attempt failed
                        print(
                            f"❌ {func.__name__} failed after "
                            f"{max_attempts} attempts: {e}"
                        )
                        raise
                    # Exponential backoff, capped at max_delay
                    delay = min(initial_delay * (exponential_base**attempt), max_delay)
                    if jitter:
                        # Re-apply the cap so jitter cannot exceed max_delay
                        delay = min(
                            delay * (0.5 + secure_random.random()), max_delay
                        )
                    print(
                        f"⚠️ {func.__name__} failed "
                        f"(attempt {attempt + 1}/{max_attempts}), "
                        f"retrying in {delay:.1f}s: {e}"
                    )
                    await asyncio.sleep(delay)
            # Unreachable when max_attempts >= 1; kept as a defensive fallback.
            raise last_exception or RuntimeError(f"{func.__name__} failed unexpectedly")

        return wrapper

    return decorator
class Inspector:
    """
    Unified Testing Client for Marcus MCP workflows.

    This client provides a testing interface for validating Marcus functionality
    via either stdio (isolated instance) or HTTP (shared server). It is designed
    for development, testing, and debugging Marcus workflows, NOT for production
    AI agent usage.

    Inspector supports the full MCP protocol for simulating agent behavior and
    validating that Marcus task assignment, progress tracking, and coordination
    systems work correctly.

    Attributes
    ----------
    session : Optional[ClientSession]
        The active MCP client session. It is set while a ``connect()`` context
        (or a session opened by ``ensure_connected()``) is open, and reset to
        None when that session closes.
    connection_type : Literal['stdio', 'http']
        The type of connection to use

    Methods
    -------
    connect(url=None, timeout=30, sse_read_timeout=300) -> AsyncIterator[ClientSession]
        Connect to Marcus using the configured connection type
    ensure_connected() -> bool
        Reuse the active session or open a new one (rate-limited)
    disconnect() -> None
        Close a session that was opened by ensure_connected()
    register_agent(agent_id, name, role, skills) -> Dict[str, Any]
        Simulate agent registration for testing
    request_next_task(agent_id) -> Dict[str, Any]
        Test task assignment workflow
    report_task_progress(agent_id, task_id, status, progress, message) -> Dict[str, Any]
        Test progress reporting workflow
    report_blocker(agent_id, task_id, blocker_description, severity) -> Dict[str, Any]
        Test blocker reporting workflow
    get_project_status() -> Dict[str, Any]
        Test project status retrieval
    get_agent_status(agent_id) -> Dict[str, Any]
        Test agent status retrieval

    Examples
    --------
    Test task assignment with stdio:

    >>> client = Inspector(connection_type='stdio')
    >>> async with client.connect() as session:
    ...     # Simulate agent workflow for testing
    ...     result = await client.register_agent("test-1", "Test", "Developer", [])
    ...     task = await client.request_next_task("test-1")
    ...     assert task is not None

    Test with HTTP connection:

    >>> client = Inspector(connection_type='http')
    >>> async with client.connect(url="http://localhost:4298/mcp") as session:
    ...     # Test against running Marcus instance
    ...     result = await client.register_agent("test-1", "Test", "Developer", [])
    ...     assert result.get('success')

    Notes
    -----
    - Inspector is for TESTING, not production agent usage
    - stdio: Best for isolated unit/integration tests
    - http: Best for testing against running Marcus instances
    - All methods simulate real agent behavior for validation
    """

    def __init__(self, connection_type: Literal["stdio", "http"] = "stdio") -> None:
        """
        Initialize the Inspector testing client (no connection is opened).

        Parameters
        ----------
        connection_type : Literal['stdio', 'http'], default='stdio'
            The type of connection to use:
            - 'stdio': Spawn isolated Marcus instance (for testing)
            - 'http': Connect to running Marcus HTTP server (for integration)
        """
        self.session: Optional[ClientSession] = None
        self.connection_type = connection_type
        # Rate-limiting state for ensure_connected()
        self._connection_attempts = 0
        self._last_connection_time = 0.0
        # Keeps the connect() context opened by ensure_connected() alive after
        # that call returns; released by disconnect().
        self._exit_stack: Optional[AsyncExitStack] = None

    @asynccontextmanager
    async def connect(
        self,
        url: Optional[str] = None,
        timeout: float = 30,
        sse_read_timeout: float = 300,
    ) -> AsyncIterator[ClientSession]:
        """
        Connect to Marcus MCP server using the configured connection type.

        This method uses the transport (stdio or HTTP) selected by the
        connection_type given at initialization. It initializes the session,
        lists the server's tools as a connectivity check, and yields the session.

        Parameters
        ----------
        url : str, optional
            The HTTP URL of the Marcus MCP server endpoint (http mode only).
            Default: "http://localhost:4298/mcp". Ignored for stdio mode.
        timeout : float, optional
            HTTP timeout for regular operations in seconds (http mode only)
            Default: 30
        sse_read_timeout : float, optional
            Timeout for SSE read operations in seconds (http mode only)
            Default: 300

        Yields
        ------
        ClientSession
            An active MCP client session for communicating with Marcus

        Raises
        ------
        ValueError
            If connection_type is neither 'stdio' nor 'http'.
        Exception
            Errors from the transport, ``session.initialize()`` or
            ``session.list_tools()`` propagate unchanged when connecting fails.

        Examples
        --------
        Stdio connection:

        >>> client = Inspector(connection_type='stdio')
        >>> async with client.connect() as session:
        ...     await client.register_agent("test-1", "Test", "Developer", [])

        HTTP connection:

        >>> client = Inspector(connection_type='http')
        >>> async with client.connect(url="http://localhost:4298/mcp") as session:
        ...     await client.register_agent("test-1", "Test", "Developer", [])

        Notes
        -----
        - stdio: Spawns isolated Marcus instance for testing
        - http: Connects to running Marcus HTTP server using streamablehttp_client
        - ``self.session`` is set for the lifetime of the context and reset to
          None on exit (including on error), so tool methods raise
          "Not connected" instead of using a closed session.
        """
        if self.connection_type == "stdio":
            # Stdio connection - spawn isolated Marcus instance
            server_params = self._build_stdio_server_params()
            async with stdio_client(server_params) as (read_stream, write_stream):
                async with ClientSession(read_stream, write_stream) as session:
                    self.session = session
                    try:
                        await session.initialize()
                        # List available tools to verify connection
                        tool_names = await self._list_tool_names(session)
                        print(
                            f"Connected to Marcus (stdio). "
                            f"Available tools: {tool_names}"
                        )
                        yield session
                    finally:
                        # Don't clobber a session installed by someone else.
                        if self.session is session:
                            self.session = None
        elif self.connection_type == "http":
            # HTTP connection - connect to running Marcus server
            if url is None:
                url = "http://localhost:4298/mcp"
            async with streamablehttp_client(
                url,
                timeout=timeout,
                sse_read_timeout=sse_read_timeout,
            ) as (read_stream, write_stream, get_session_id):
                async with ClientSession(read_stream, write_stream) as session:
                    self.session = session
                    try:
                        await session.initialize()
                        # Get session ID from the transport
                        session_id = get_session_id()
                        if session_id:
                            print(f"Connected to Marcus HTTP server at {url}")
                            print(f"Session ID: {session_id}")
                        # List available tools to verify connection
                        tool_names = await self._list_tool_names(session)
                        print(f"Available tools: {', '.join(tool_names)}")
                        yield session
                    finally:
                        if self.session is session:
                            self.session = None
        else:
            raise ValueError(
                f"Invalid connection_type: {self.connection_type}. "
                "Must be 'stdio' or 'http'"
            )

    @staticmethod
    def _build_stdio_server_params() -> StdioServerParameters:
        """Build parameters that launch ``src.marcus_mcp.server`` over stdio."""
        # Project root is two directories above this file.
        project_root = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "..")
        )
        # Inherit the current environment and prepend the project root to any
        # existing PYTHONPATH rather than replacing it.
        env = os.environ.copy()
        existing_path = env.get("PYTHONPATH")
        env["PYTHONPATH"] = (
            os.pathsep.join([project_root, existing_path])
            if existing_path
            else project_root
        )
        return StdioServerParameters(
            command=sys.executable,
            args=[
                "-m",
                "src.marcus_mcp.server",
                "--stdio",  # Force stdio mode to avoid port conflicts
            ],
            env=env,
        )

    @staticmethod
    async def _list_tool_names(session: ClientSession) -> List[str]:
        """Return the names of the tools advertised by the connected server."""
        tools_response = await session.list_tools()
        # Accept either a response object with ``.tools`` or a bare list.
        if hasattr(tools_response, "tools"):
            tools = tools_response.tools
        else:
            tools = cast(List[Any], tools_response)
        return [t.name for t in tools]

    def _should_attempt_reconnect(self) -> bool:
        """Check if we should attempt to reconnect based on recent attempts."""
        current_time = time.time()
        # Reset attempt counter if it's been more than 5 minutes
        if current_time - self._last_connection_time > 300:
            self._connection_attempts = 0
        # Allow up to 5 rapid reconnection attempts
        return self._connection_attempts < 5

    async def ensure_connected(self) -> bool:
        """
        Ensure there is an active session, opening a new one if needed.

        A session opened here stays open after this call returns. It is owned
        by the client and is closed by :meth:`disconnect`.

        Returns
        -------
        bool
            True if a live session already existed or a new one was opened.
            False if the attempt budget is exhausted (5 attempts, reset after
            300 s without attempts) or the connection attempt raised.

        Notes
        -----
        NOTE(review): the mcp transports appear to be task-group based and may
        need to be closed (via disconnect()) from the same task that opened
        them. Confirm this before sharing the client across tasks.
        """
        # connect() resets self.session to None on exit, so a non-None session
        # is live. ``_closed`` is honoured only if the object exposes it.
        # NOTE(review): ClientSession may not define ``_closed`` — confirm.
        if self.session is not None and not getattr(self.session, "_closed", False):
            return True

        if not self._should_attempt_reconnect():
            print("⚠️ Too many reconnection attempts, please wait before retrying")
            return False

        print("🔄 Attempting to reconnect to Marcus...")
        self._connection_attempts += 1
        self._last_connection_time = time.time()
        try:
            # Release any session left over from an earlier reconnect.
            await self.disconnect()
            stack = AsyncExitStack()
            # Entering via an exit stack (instead of ``async with``) keeps the
            # session open after this method returns.
            session = await stack.enter_async_context(self.connect())
        except Exception as e:
            # Best-effort: report and let the caller decide what to do.
            print(f"❌ Reconnection failed: {e}")
            return False

        self._exit_stack = stack
        self.session = session
        print("✅ Reconnected to Marcus successfully")
        self._connection_attempts = 0  # Reset on success
        return True

    async def disconnect(self) -> None:
        """
        Close the session opened by :meth:`ensure_connected`, if any.

        Sessions opened with ``async with client.connect()`` are closed by
        their own ``async with`` block and are not affected. Calling this
        when nothing is open is a no-op.
        """
        stack, self._exit_stack = self._exit_stack, None
        if stack is not None:
            await stack.aclose()

    def _require_session(self) -> ClientSession:
        """Return the active session, raising RuntimeError if not connected."""
        if not self.session:
            raise RuntimeError("Not connected to Marcus")
        return self.session

    async def _call_tool_json(
        self, tool_name: str, arguments: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Call an MCP tool and JSON-decode its first text content item.

        Returns ``{}`` when the result carries no text. Raises RuntimeError if
        not connected, and ``json.JSONDecodeError`` if the text is not JSON.
        """
        session = self._require_session()
        result = await session.call_tool(tool_name, arguments=arguments)
        text_content = _extract_text_from_result(result)
        return json.loads(text_content) if text_content else {}

    async def register_agent(
        self, agent_id: str, name: str, role: str, skills: List[str]
    ) -> Dict[str, Any]:
        """
        Simulate agent registration for testing Marcus workflows.

        This method tests the agent registration workflow by simulating an agent
        registering with Marcus. Use this to validate that Marcus correctly handles
        agent registration, stores agent information, and returns proper responses.

        Parameters
        ----------
        agent_id : str
            Unique identifier for the test agent (e.g., "test-agent-001")
        name : str
            Human-readable display name for the test agent
        role : str
            Agent's role for testing (e.g., "Developer", "QA Engineer")
        skills : List[str]
            List of skills to test skill-based task assignment

        Returns
        -------
        Dict[str, Any]
            The decoded JSON response of the ``register_agent`` tool ({} if the
            tool returned no text). It is expected to contain:
            - success: bool indicating registration success
            - agent_id: str confirming the registered agent ID
            - message: str with registration status details

        Raises
        ------
        RuntimeError
            If no active connection exists to Marcus server
        json.JSONDecodeError
            If the tool returns text that is not valid JSON

        Examples
        --------
        Test agent registration:

        >>> client = Inspector(connection_type='stdio')
        >>> async with client.connect() as session:
        ...     result = await client.register_agent(
        ...         "test-agent-1",
        ...         "Test Agent",
        ...         "Developer",
        ...         ["python", "testing"]
        ...     )
        ...     assert result.get('success') is True

        Notes
        -----
        - This simulates agent behavior for testing Marcus
        - Use to validate Marcus registration workflow
        - Not for production AI agent usage
        """
        return await self._call_tool_json(
            "register_agent",
            {
                "agent_id": agent_id,
                "name": name,
                "role": role,
                "skills": skills,
            },
        )

    async def request_next_task(self, agent_id: str) -> Dict[str, Any]:
        """
        Test task assignment workflow by requesting next task.

        This method tests Marcus's task assignment logic by simulating an agent
        requesting work. Use this to validate that Marcus correctly assigns tasks,
        respects dependencies, and provides proper task information.

        Parameters
        ----------
        agent_id : str
            Unique identifier of the test agent requesting a task

        Returns
        -------
        Dict[str, Any]
            The decoded JSON response of the ``request_next_task`` tool ({} if
            the tool returned no text). It is expected to contain:
            - task: Dict[str, Any] or None with task details if assigned
            - message: str with status about task assignment

        Raises
        ------
        RuntimeError
            Immediately if no active connection exists to Marcus server, or
            after the last retry if the tool call keeps raising RuntimeError
        json.JSONDecodeError
            If the tool returns text that is not valid JSON

        Examples
        --------
        Test task assignment:

        >>> client = Inspector(connection_type='stdio')
        >>> async with client.connect() as session:
        ...     await client.register_agent("test-1", "Test", "Dev", [])
        ...     task = await client.request_next_task("test-1")
        ...     if task.get('task'):
        ...         print(f"Task assigned: {task['task']['id']}")

        Notes
        -----
        - Tests Marcus task assignment logic
        - Validates dependency resolution
        - Transient errors (ConnectionError, RuntimeError, asyncio.TimeoutError)
          are retried up to 3 attempts with backoff starting at 2.0 s. The
          connection check runs once, before any retry.
        - Not for production AI agent usage
        """
        self._require_session()
        return await self._request_next_task_with_retry(agent_id)

    @retry_with_backoff(max_attempts=3, initial_delay=2.0)
    async def _request_next_task_with_retry(self, agent_id: str) -> Dict[str, Any]:
        """Call the ``request_next_task`` tool with retry on transient errors."""
        return await self._call_tool_json(
            "request_next_task", {"agent_id": agent_id}
        )

    async def report_task_progress(
        self,
        agent_id: str,
        task_id: str,
        status: str,
        progress: int = 0,
        message: str = "",
    ) -> Dict[str, Any]:
        """
        Test progress reporting workflow.

        This method tests Marcus's task progress tracking by simulating progress
        reports. Use this to validate that Marcus correctly updates task status,
        tracks completion, and handles status transitions.

        Parameters
        ----------
        agent_id : str
            Unique identifier of the test agent reporting progress
        task_id : str
            Unique identifier of the task being updated
        status : str
            Task status: "in_progress", "completed", "blocked", "paused"
        progress : int, optional
            Completion percentage from 0 to 100, by default 0
        message : str, optional
            Descriptive message about progress, by default ""

        Returns
        -------
        Dict[str, Any]
            The decoded JSON response of the ``report_task_progress`` tool ({}
            if the tool returned no text). It is expected to contain:
            - success: bool indicating if report was processed
            - task_id: str confirming the updated task
            - status: str confirming the new status

        Raises
        ------
        RuntimeError
            Immediately if no active connection exists to Marcus server, or
            after the last retry if the tool call keeps raising RuntimeError
        json.JSONDecodeError
            If the tool returns text that is not valid JSON

        Examples
        --------
        Test progress reporting:

        >>> async with client.connect() as session:
        ...     await client.report_task_progress(
        ...         "test-1", "task-123", "in_progress", 50,
        ...         "Halfway complete"
        ...     )
        ...     await client.report_task_progress(
        ...         "test-1", "task-123", "completed", 100,
        ...         "Task finished"
        ...     )

        Notes
        -----
        - Tests Marcus progress tracking
        - Validates status transitions
        - Transient errors are retried up to 3 attempts (backoff from 1.0 s,
          capped at 30 s). The connection check runs once, before any retry.
        - Not for production AI agent usage
        """
        self._require_session()
        return await self._report_task_progress_with_retry(
            agent_id, task_id, status, progress, message
        )

    @retry_with_backoff(max_attempts=3, initial_delay=1.0, max_delay=30.0)
    async def _report_task_progress_with_retry(
        self,
        agent_id: str,
        task_id: str,
        status: str,
        progress: int,
        message: str,
    ) -> Dict[str, Any]:
        """Call the ``report_task_progress`` tool with retry on transient errors."""
        return await self._call_tool_json(
            "report_task_progress",
            {
                "agent_id": agent_id,
                "task_id": task_id,
                "status": status,
                "progress": progress,
                "message": message,
            },
        )

    async def report_blocker(
        self,
        agent_id: str,
        task_id: str,
        blocker_description: str,
        severity: str = "medium",
    ) -> Dict[str, Any]:
        """
        Test blocker reporting workflow.

        This method tests Marcus's blocker handling by simulating blocker reports.
        Use this to validate that Marcus correctly records blockers, generates
        AI suggestions, and handles escalation.

        Parameters
        ----------
        agent_id : str
            Unique identifier of the test agent reporting the blocker
        task_id : str
            Unique identifier of the blocked task
        blocker_description : str
            Detailed description of the blocking issue
        severity : str, optional
            Severity level: "low", "medium", "high", by default "medium"

        Returns
        -------
        Dict[str, Any]
            The decoded JSON response of the ``report_blocker`` tool ({} if the
            tool returned no text). It is expected to contain:
            - success: bool indicating if blocker was recorded
            - blocker_id: str unique identifier for this blocker
            - suggestions: List[str] AI-generated resolution suggestions

        Raises
        ------
        RuntimeError
            Immediately if no active connection exists to Marcus server, or
            after the last retry if the tool call keeps raising RuntimeError
        json.JSONDecodeError
            If the tool returns text that is not valid JSON

        Examples
        --------
        Test blocker reporting:

        >>> async with client.connect() as session:
        ...     result = await client.report_blocker(
        ...         "test-1", "task-123",
        ...         "Database connection failed",
        ...         "high"
        ...     )
        ...     assert 'suggestions' in result

        Notes
        -----
        - Tests Marcus blocker handling
        - Validates AI suggestion generation
        - Transient errors are retried up to 3 attempts (backoff from 1.0 s,
          capped at 30 s). The connection check runs once, before any retry.
        - Not for production AI agent usage
        """
        self._require_session()
        return await self._report_blocker_with_retry(
            agent_id, task_id, blocker_description, severity
        )

    @retry_with_backoff(max_attempts=3, initial_delay=1.0, max_delay=30.0)
    async def _report_blocker_with_retry(
        self,
        agent_id: str,
        task_id: str,
        blocker_description: str,
        severity: str,
    ) -> Dict[str, Any]:
        """Call the ``report_blocker`` tool with retry on transient errors."""
        return await self._call_tool_json(
            "report_blocker",
            {
                "agent_id": agent_id,
                "task_id": task_id,
                "blocker_description": blocker_description,
                "severity": severity,
            },
        )

    async def get_project_status(self) -> Dict[str, Any]:
        """
        Test project status retrieval.

        This method tests Marcus's status reporting by fetching project metrics.
        Use this to validate that Marcus correctly tracks project state, calculates
        metrics, and provides comprehensive status information.

        Returns
        -------
        Dict[str, Any]
            The decoded JSON response of the ``get_project_status`` tool ({} if
            the tool returned no text). It is expected to contain:
            - project_info: Dict with project details
            - task_metrics: Dict with task statistics
            - agent_metrics: Dict with agent statistics
            - performance_metrics: Dict with performance data

        Raises
        ------
        RuntimeError
            If no active connection exists to Marcus server
        json.JSONDecodeError
            If the tool returns text that is not valid JSON

        Examples
        --------
        Test status retrieval:

        >>> async with client.connect() as session:
        ...     status = await client.get_project_status()
        ...     assert 'task_metrics' in status
        ...     assert 'agent_metrics' in status

        Notes
        -----
        - Tests Marcus status reporting
        - Validates metric calculations
        - Not for production AI agent usage
        """
        return await self._call_tool_json("get_project_status", {})

    async def get_agent_status(self, agent_id: str) -> Dict[str, Any]:
        """
        Test agent status retrieval.

        This method tests Marcus's agent tracking by fetching agent-specific status.
        Use this to validate that Marcus correctly tracks agent state, assignments,
        and performance.

        Parameters
        ----------
        agent_id : str
            Unique identifier of the agent to query

        Returns
        -------
        Dict[str, Any]
            The decoded JSON response of the ``get_agent_status`` tool ({} if
            the tool returned no text), expected to hold agent details and metrics

        Raises
        ------
        RuntimeError
            If no active connection exists to Marcus server
        json.JSONDecodeError
            If the tool returns text that is not valid JSON

        Examples
        --------
        Test agent status:

        >>> async with client.connect() as session:
        ...     await client.register_agent("test-1", "Test", "Dev", [])
        ...     status = await client.get_agent_status("test-1")
        ...     assert status.get('agent_id') == "test-1"

        Notes
        -----
        - Tests Marcus agent tracking
        - Validates agent state management
        - Not for production AI agent usage
        """
        return await self._call_tool_json(
            "get_agent_status", {"agent_id": agent_id}
        )
# Convenience function to maintain API compatibility
async def create_inspector(
    connection_type: Literal["stdio", "http"] = "stdio",
) -> Inspector:
    """
    Create and return a new Inspector testing client (not yet connected).

    The coroutine awaits nothing and performs no I/O. It only constructs an
    ``Inspector``; call ``connect()`` on the result to open a session.

    Parameters
    ----------
    connection_type : Literal['stdio', 'http'], default='stdio'
        The type of connection to use:
        - 'stdio': Spawn isolated Marcus instance (recommended for testing)
        - 'http': Connect to running Marcus HTTP server (recommended for integration)

    Returns
    -------
    Inspector
        A new testing client instance ready for connection

    Examples
    --------
    >>> client = await create_inspector('stdio')
    >>> async with client.connect() as session:
    ...     # Test Marcus workflows
    ...     pass
    >>> client = await create_inspector('http')
    >>> async with client.connect(url="http://localhost:4298/mcp") as session:
    ...     # Test against running instance
    ...     pass
    """
    return Inspector(connection_type)