Source code for src.integrations.kanban_client

"""Simple MCP Kanban Client for reliable task management.

This module provides a simplified client for interacting with the kanban-mcp server,
focusing on reliability and following proven patterns that work consistently.

The client handles:
- Task retrieval from kanban boards
- Task assignment to agents
- Board status monitoring
- Automatic configuration loading

Notes
-----
This implementation avoids persistent connections and creates a new MCP session
for each operation to ensure reliability.
"""

import json
import logging
import os
import sys
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, cast

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.types import TextContent

from src.core.models import Priority, Task, TaskStatus

logger = logging.getLogger(__name__)


[docs] class KanbanClient: """ Simple MCP Kanban client that follows proven patterns for reliability. This client creates a new MCP session for each operation rather than maintaining persistent connections, which has proven more reliable in practice. Attributes ---------- board_id : Optional[str] ID of the kanban board to work with project_id : Optional[str] ID of the project associated with the board Examples -------- >>> client = KanbanClient() >>> tasks = await client.get_available_tasks() >>> for task in tasks: ... print(f"Task: {task.name} - Priority: {task.priority.value}") Notes ----- Planka credentials are loaded from environment variables or set to defaults. Board and project IDs are loaded from config_marcus.json if available. """
def __init__(self) -> None:
    """
    Set up the client and resolve project/board IDs.

    IDs are looked up from three sources, each consulted only for values
    that are still missing after the previous one:

    1. config_marcus.json (via ``_load_config``; takes precedence)
    2. ``PLANKA_PROJECT_ID`` / ``PLANKA_BOARD_ID`` environment variables
    3. the workspace state (via ``_load_workspace_state``)

    Finally, default Planka connection settings are written to the process
    environment for any variable that is not already set.
    """
    self.board_id: Optional[str] = None
    self.project_id: Optional[str] = None

    # The kanban-mcp path is resolved lazily by the kanban_mcp_path
    # property. Resolving it eagerly blocked startup for non-Planka
    # providers (SQLite, GitHub, Linear) that never spawn the subprocess.
    self._kanban_mcp_path: Optional[str] = None

    # Config comes first; it may also populate Planka environment variables.
    self._load_config()

    # Fall back to environment variables (set by the Planka provider when
    # switching projects) for whatever the config did not supply.
    env_fallbacks = (
        ("project_id", "PLANKA_PROJECT_ID"),
        ("board_id", "PLANKA_BOARD_ID"),
    )
    for attr_name, env_name in env_fallbacks:
        if getattr(self, attr_name) is None:
            setattr(self, attr_name, os.environ.get(env_name))

    # Last resort: workspace state. It is loaded only when needed, and
    # loaded again only if the previous attempt returned None.
    state = None
    for attr_name in ("project_id", "board_id"):
        if getattr(self, attr_name) is not None:
            continue
        if state is None:
            state = self._load_workspace_state()
        if state:
            setattr(self, attr_name, state.get(attr_name))

    if state:
        logger.info(
            "Loaded project_id and board_id from workspace state: "
            f"project={self.project_id}, board={self.board_id}"
        )

    # Planka defaults, applied only where config/.env left a gap.
    os.environ.setdefault("PLANKA_BASE_URL", "http://localhost:3333")
    os.environ.setdefault("PLANKA_AGENT_EMAIL", "demo@demo.demo")
    os.environ.setdefault("PLANKA_AGENT_PASSWORD", "demo")  # nosec B105
def _is_running_in_docker(self) -> bool:
    """
    Detect if Marcus is running inside a Docker container.

    Returns
    -------
    bool
        True if any container indicator is found, False otherwise

    Notes
    -----
    Three heuristics are tried in order; the first hit wins:

    - the ``/.dockerenv`` file exists
    - ``/proc/1/cgroup`` mentions "docker" or "containerd"
    - the hostname is a 12-character lowercase hex string

    Failures while probing are swallowed on purpose: this is best-effort
    environment detection and must never prevent startup.
    """
    from pathlib import Path

    # Check 1: /.dockerenv file (most reliable)
    if Path("/.dockerenv").exists():
        return True

    # Check 2: cgroup entries for docker/containerd
    try:
        if Path("/proc/1/cgroup").exists():
            with open("/proc/1/cgroup", "r") as f:
                content = f.read()
            if "docker" in content or "containerd" in content:
                return True
    except Exception:
        pass  # nosec B110 - Intentional fallback for environment detection

    # Check 3: hostname looks like a container ID (12 char hex)
    try:
        import socket

        hostname = socket.gethostname()
        if len(hostname) == 12 and all(c in "0123456789abcdef" for c in hostname):
            return True
    except Exception:
        pass  # nosec B110 - Intentional fallback for environment detection

    return False


def _adjust_planka_url_for_environment(self, base_url: str) -> str:
    """
    Adjust Planka base URL based on environment (Docker vs local).

    Inside Docker the configured URL is returned unchanged so that Docker
    service names keep working. Outside Docker, URLs that point at the
    ``planka`` service name are rewritten to ``localhost:3333``.

    Parameters
    ----------
    base_url : str
        Base URL from config (e.g., "http://planka:1337")

    Returns
    -------
    str
        Adjusted URL appropriate for current environment

    Examples
    --------
    >>> # In Docker
    >>> client._adjust_planka_url_for_environment("http://planka:1337")
    'http://planka:1337'
    >>> # Locally
    >>> client._adjust_planka_url_for_environment("http://planka:1337")
    'http://localhost:3333'
    """
    if self._is_running_in_docker():
        logger.info(f"Running in Docker - using Planka URL as-is: {base_url}")
        return base_url

    # Running locally: map Docker service names to localhost, e.g.
    #   http://planka:1337 -> http://localhost:3333
    #   http://planka      -> http://localhost:3333
    if "planka:" in base_url or base_url.endswith("planka"):
        # Preserve the scheme of the configured URL
        if base_url.startswith("https://"):
            local_url = "https://localhost:3333"
        else:
            local_url = "http://localhost:3333"
        logger.info(
            f"Running locally - converted Planka URL: {base_url} -> {local_url}"
        )
        return local_url

    # Already localhost, an IP, or some other host: keep it
    logger.info(f"Using Planka URL from config: {base_url}")
    return base_url


@property
def kanban_mcp_path(self) -> str:
    """Lazily resolve and cache the kanban-mcp executable path.

    Raises
    ------
    FileNotFoundError
        If kanban-mcp cannot be found. Raised on first access rather than
        at construction time, so code paths that never touch this
        property are unaffected.
    """
    if self._kanban_mcp_path is None:
        self._kanban_mcp_path = self._get_kanban_mcp_path()
    return self._kanban_mcp_path


def _get_kanban_mcp_path(self) -> str:
    """
    Automatically detect kanban-mcp path.

    Checks in priority order:

    1. KANBAN_MCP_PATH environment variable (user override); ``~`` and
       ``$VAR`` references in the value are expanded
    2. /app/kanban-mcp/dist/index.js (Docker)
    3. ../kanban-mcp/dist/index.js (sibling directory for local dev)

    Returns
    -------
    str
        Path to kanban-mcp index.js

    Raises
    ------
    FileNotFoundError
        If kanban-mcp cannot be found in any expected location
    """
    from pathlib import Path

    # 1. Environment variable (highest priority)
    if env_path := os.getenv("KANBAN_MCP_PATH"):
        # Expand environment variables first, then ~, so values such as
        # "$HOME/kanban-mcp/dist/index.js" resolve as intended.
        env_path_obj = Path(os.path.expandvars(env_path)).expanduser()
        if env_path_obj.exists():
            logger.info(
                f"Using kanban-mcp from KANBAN_MCP_PATH: " f"{env_path_obj}"
            )
            return str(env_path_obj)
        # A bad override is reported but does not stop the search.
        logger.warning(
            f"KANBAN_MCP_PATH set to {env_path} but file "
            f"doesn't exist at {env_path_obj}"
        )

    # 2. Docker path
    docker_path = Path("/app/kanban-mcp/dist/index.js")
    if docker_path.exists():
        logger.info(f"Using kanban-mcp from Docker: {docker_path}")
        return str(docker_path)

    # 3. Sibling directory (../kanban-mcp relative to marcus/)
    #    Path(__file__) -> kanban_client.py
    #    .parent -> integrations/
    #    .parent -> src/
    #    .parent -> marcus/
    #    .parent -> parent directory containing marcus/
    marcus_root = Path(__file__).parent.parent.parent
    sibling_path = marcus_root.parent / "kanban-mcp" / "dist" / "index.js"
    if sibling_path.exists():
        logger.info(f"Using kanban-mcp from sibling directory: {sibling_path}")
        return str(sibling_path)

    # 4. Give up with helpful message
    raise FileNotFoundError(
        "Could not find kanban-mcp. Please either:\n"
        " 1. Set KANBAN_MCP_PATH environment variable to point to "
        "kanban-mcp/dist/index.js\n"
        f" 2. Clone kanban-mcp as sibling directory: "
        f"{marcus_root.parent}/kanban-mcp\n"
        " 3. Run in Docker where it's at /app/kanban-mcp\n"
        f"\nSearched in:\n"
        f" - KANBAN_MCP_PATH env var\n"
        f" - {docker_path}\n"
        f" - {sibling_path}"
    )


def _load_config(self) -> None:
    """
    Load configuration from config_marcus.json file.

    Reads project_id and board_id into the instance and, when a "planka"
    section is present, exports its base_url/email/password as
    PLANKA_* environment variables. Nothing is printed on success because
    stdout/stderr chatter interferes with MCP stdio; when no config file
    is found, the searched locations are listed on stderr.

    Notes
    -----
    The config file is searched in multiple locations:

    1. Current working directory
    2. Project root (relative to this file)
    """
    from pathlib import Path

    config_paths = [
        Path("config_marcus.json"),  # Current directory
        # Project root
        Path(__file__).parent.parent.parent / "config_marcus.json",
    ]

    config_path = next((path for path in config_paths if path.exists()), None)

    if config_path is None:
        print(
            "❌ config_marcus.json not found in any of these " "locations:",
            file=sys.stderr,
        )
        for path in config_paths:
            print(f" - {path.absolute()}", file=sys.stderr)
        return

    # JSON is UTF-8 by specification; do not depend on the locale default.
    with open(config_path, "r", encoding="utf-8") as f:
        config = json.load(f)

    self.project_id = config.get("project_id")
    self.board_id = config.get("board_id")

    # Planka credentials from config override the process environment.
    planka_config = config.get("planka", {})
    if planka_config.get("base_url"):
        # Auto-adjust base_url based on environment (Docker vs local)
        base_url = self._adjust_planka_url_for_environment(
            planka_config["base_url"]
        )
        os.environ["PLANKA_BASE_URL"] = base_url
    if planka_config.get("email"):
        os.environ["PLANKA_AGENT_EMAIL"] = planka_config["email"]
    if planka_config.get("password"):
        os.environ["PLANKA_AGENT_PASSWORD"] = planka_config["password"]
async def get_available_tasks(self) -> List[Task]:
    """
    Get all unassigned TODO tasks from the kanban board.

    Every card on the board is first converted to a task and has its
    dependencies resolved (so that completed or assigned tasks can still
    be referenced as dependencies). The result is then filtered down to
    tasks whose status is ``TaskStatus.TODO`` and that have no assignee.

    Returns
    -------
    List[Task]
        Unassigned TODO tasks, in board order (no sorting is applied)

    Raises
    ------
    RuntimeError
        If board_id is not set in configuration

    Examples
    --------
    >>> client = KanbanClient()
    >>> tasks = await client.get_available_tasks()
    >>> print(f"Found {len(tasks)} available tasks")

    Notes
    -----
    This method creates a new MCP session for the operation.
    """
    all_tasks = await self._load_board_tasks()
    return [
        task
        for task in all_tasks
        if not task.assigned_to and task.status == TaskStatus.TODO
    ]


async def get_all_tasks(self) -> List[Task]:
    """
    Get all tasks from the kanban board regardless of status or assignment.

    Retrieves tasks from all lists on the board, including assigned,
    unassigned, completed, and blocked tasks.

    Returns
    -------
    List[Task]
        List of all tasks on the board (empty if the board has no lists)

    Raises
    ------
    RuntimeError
        If board_id is not set in configuration

    Examples
    --------
    >>> client = KanbanClient()
    >>> tasks = await client.get_all_tasks()
    >>> print(f"Total tasks on board: {len(tasks)}")

    Notes
    -----
    This method creates a new MCP session for the operation.
    Unlike get_available_tasks(), this includes tasks in all states
    and with any assignment status.
    """
    return await self._load_board_tasks()


async def _load_board_tasks(self) -> List[Task]:
    """Fetch every card on the board and return them as resolved tasks."""
    if not self.board_id:
        raise RuntimeError("Board ID not set")

    # Use the exact same pattern as working scripts
    server_params = StdioServerParameters(
        command="node",
        args=[self.kanban_mcp_path],
        env=os.environ.copy(),
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            all_cards = await self._collect_board_cards(session)
            await self._attach_card_labels(session, all_cards)

            # Convert ALL cards so the ID mapping used for dependency
            # resolution is complete.
            tasks = [self._card_to_task(card) for card in all_cards]
            self._resolve_task_dependencies(tasks)
            return tasks


async def _collect_board_cards(self, session: Any) -> List[Dict[str, Any]]:
    """Return the cards of every list on the board, tagged with ``listName``."""
    lists_result = await session.call_tool(
        "mcp_kanban_list_manager",
        {"action": "get_all", "boardId": self.board_id},
    )

    all_cards: List[Dict[str, Any]] = []
    if not (
        lists_result
        and hasattr(lists_result, "content")
        and lists_result.content
    ):
        return all_cards

    first_content = cast(TextContent, lists_result.content[0])
    lists_data = json.loads(first_content.text)
    # The server may answer with a bare list or an {"items": [...]} wrapper.
    lists = (
        lists_data if isinstance(lists_data, list) else lists_data.get("items", [])
    )

    for lst in lists:
        list_id = lst.get("id")
        if not list_id:
            continue

        cards_result = await session.call_tool(
            "mcp_kanban_card_manager",
            {"action": "get_all", "listId": list_id},
        )
        if not (
            cards_result
            and hasattr(cards_result, "content")
            and cards_result.content
        ):
            continue

        cards_text = cast(TextContent, cards_result.content[0]).text
        # Blank payloads are skipped rather than fed to json.loads.
        if not (cards_text and cards_text.strip()):
            continue

        cards_data = json.loads(cards_text)
        cards_list = (
            cards_data
            if isinstance(cards_data, list)
            else cards_data.get("items", [])
        )
        # Remember which list each card came from; status is derived from it.
        for card in cards_list:
            card["listName"] = lst.get("name", "")
            all_cards.append(card)

    return all_cards


async def _attach_card_labels(
    self, session: Any, cards: List[Dict[str, Any]]
) -> None:
    """Copy ``labels`` from each card's detail record onto the card (best effort)."""
    # NOTE: Planka's list card API doesn't include labels, use get_details
    try:
        for card in cards:
            card_id = card.get("id")
            if not card_id:
                continue

            details_result = await session.call_tool(
                "mcp_kanban_card_manager",
                {"action": "get_details", "cardId": card_id},
            )
            if not (
                details_result
                and hasattr(details_result, "content")
                and details_result.content
            ):
                continue

            first_content = cast(TextContent, details_result.content[0])
            card_details = json.loads(first_content.text)

            # kanban-mcp now returns FILTERED labels
            # (as of fix in feature/fix-card-label-filtering)
            # No need to filter again, use labels directly
            if "labels" in card_details:
                card["labels"] = card_details["labels"]
    except Exception as e:
        # Continue without labels rather than failing entirely. The first
        # failure stops label fetching for the remaining cards.
        logger.error(
            f"Failed to fetch card details for labels: {e}",
            exc_info=True,
        )


def _resolve_task_dependencies(self, tasks: List[Task]) -> None:
    """Rewrite each task's dependencies from original IDs to board IDs, in place."""
    # Map original IDs (parsed from descriptions) to board IDs.
    id_mapping = {}
    for task in tasks:
        if hasattr(task, "_original_id") and task._original_id:
            id_mapping[task._original_id] = task.id

    # Without any original IDs there is nothing to translate; dependencies
    # are left exactly as parsed.
    if not id_mapping:
        return

    logger.debug(f"Resolving dependencies with ID mapping: {id_mapping}")

    # Built once so the "already a board ID" check is O(1) per dependency.
    board_ids = {task.id for task in tasks}

    for task in tasks:
        if not task.dependencies:
            continue

        resolved_deps = []
        for dep_id in task.dependencies:
            if dep_id in id_mapping:
                # Dependency exists on the board - resolve it
                resolved_id = id_mapping[dep_id]
                logger.debug(f"Resolved dependency {dep_id} -> {resolved_id}")
                resolved_deps.append(resolved_id)
            elif dep_id in board_ids:
                # Already a valid board ID, keep it
                resolved_deps.append(dep_id)
            else:
                # Orphaned dependency - skip it
                logger.warning(
                    f"Skipping orphaned dependency "
                    f"'{dep_id}' for task "
                    f"'{task.name}'"
                )
        task.dependencies = resolved_deps
# NOTE: if no lists are found, or the list query returns no content, get_all_tasks returns an empty list.
async def assign_task(self, task_id: str, agent_id: str) -> None:
    """
    Assign a task to an agent.

    This method:

    1. Adds a comment to the task indicating assignment
    2. Moves the task to the "In Progress" list

    Parameters
    ----------
    task_id : str
        ID of the task to assign
    agent_id : str
        ID of the agent receiving the assignment

    Examples
    --------
    >>> await client.assign_task("card-123", "agent-001")

    Notes
    -----
    The task is moved to the first list containing "progress" in its name
    (case-insensitive). If the board's lists cannot be retrieved, or no
    such list exists, the comment is still added and the card stays where
    it is.
    """
    server_params = StdioServerParameters(
        command="node",
        args=[self.kanban_mcp_path],
        env=os.environ.copy(),
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Record the assignment as a timestamped (UTC) comment
            await session.call_tool(
                "mcp_kanban_comment_manager",
                {
                    "action": "create",
                    "cardId": task_id,
                    "text": (
                        f"📋 Task assigned to {agent_id} at "
                        f"{datetime.now(timezone.utc).isoformat()}"
                    ),
                },
            )

            # Move to In Progress: look up the board's lists first
            lists_result = await session.call_tool(
                "mcp_kanban_list_manager",
                {"action": "get_all", "boardId": self.board_id},
            )

            # An empty content list used to raise IndexError on content[0];
            # treat it like a missing result and skip the move.
            if not (
                lists_result
                and hasattr(lists_result, "content")
                and lists_result.content
            ):
                logger.warning(
                    f"Could not retrieve lists; task {task_id} was not moved"
                )
                return

            first_content = cast(TextContent, lists_result.content[0])
            lists_data = json.loads(first_content.text)
            lists = (
                lists_data
                if isinstance(lists_data, list)
                else lists_data.get("items", [])
            )

            # First list whose name mentions "progress"; a null name is
            # treated as empty instead of crashing on .lower().
            in_progress_list = next(
                (
                    lst
                    for lst in lists
                    if "progress" in (lst.get("name") or "").lower()
                ),
                None,
            )

            if in_progress_list:
                # NOTE(review): unlike _move_task_to_list, no "position" is
                # sent here - confirm whether kanban-mcp requires one.
                await session.call_tool(
                    "mcp_kanban_card_manager",
                    {
                        "action": "move",
                        "id": task_id,
                        "listId": in_progress_list["id"],
                    },
                )
async def get_board_summary(self) -> Dict[str, Any]:
    """
    Get summary statistics for the kanban board.

    Returns
    -------
    Dict[str, Any]
        The parsed summary returned by the kanban-mcp server. A non-dict
        payload is wrapped as ``{"data": payload}``; a missing or empty
        response yields ``{}``.

    Raises
    ------
    RuntimeError
        If board_id is not set in configuration

    Examples
    --------
    >>> summary = await client.get_board_summary()
    >>> print(f"Completion: {summary.get('completionPercentage', 0)}%")

    Notes
    -----
    The exact structure of the summary depends on the kanban-mcp
    implementation.
    """
    if not self.board_id:
        raise RuntimeError("Board ID not set")

    server_params = StdioServerParameters(
        command="node",
        args=[self.kanban_mcp_path],
        env=os.environ.copy(),
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            result = await session.call_tool(
                "mcp_kanban_project_board_manager",
                {
                    "action": "get_board_summary",
                    "boardId": self.board_id,
                    "includeTaskDetails": False,
                },
            )

            # Guard against an empty content list as well as a missing
            # one; indexing content[0] would otherwise raise IndexError.
            if not (result and hasattr(result, "content") and result.content):
                return {}

            first_content = cast(TextContent, result.content[0])
            parsed_result = json.loads(first_content.text)
            if isinstance(parsed_result, dict):
                return parsed_result
            return {"data": parsed_result}
def _is_available_task(self, card: Dict[str, Any]) -> bool: """ Check if a task is in an available state. Parameters ---------- card : Dict[str, Any] Card data from the kanban board Returns ------- bool True if the task is in an available state (TODO, BACKLOG, READY) Notes ----- Available states are determined by the list name containing specific keywords (case-insensitive). """ list_name = card.get("listName", "").upper() available_states = ["TODO", "TO DO", "BACKLOG", "READY"] return any(state in list_name for state in available_states) def _card_to_task(self, card: Dict[str, Any]) -> Task: """ Convert a kanban card to a Task object. Parameters ---------- card : Dict[str, Any] Card data from the kanban board containing fields like id, name/title, description, listName, etc. Returns ------- Task Task object with data mapped from the card Examples -------- >>> card = {"id": "123", "name": "Fix bug", "listName": "TODO"} >>> task = client._card_to_task(card) >>> print(task.status) # TaskStatus.TODO Notes ----- - Status is determined by the list name (DONE, PROGRESS, BLOCKED, TODO) - Priority defaults to MEDIUM if not specified - Dates default to current time if not provided - assigned_to is extracted from card users/assignment fields """ task_name = card.get("name") or card.get("title", "") # Parse dates created_at = datetime.now(timezone.utc) updated_at = datetime.now(timezone.utc) # Determine status list_name = card.get("listName", "").upper() if "DONE" in list_name: status = TaskStatus.DONE elif "PROGRESS" in list_name: status = TaskStatus.IN_PROGRESS elif "BLOCKED" in list_name or "ON HOLD" in list_name: status = TaskStatus.BLOCKED else: status = TaskStatus.TODO # Extract assignment information # Check for assignments in the card data structure assigned_to = None # Try different possible assignment fields if card.get("users"): # Planka assigns users to cards users = card.get("users", []) if users and len(users) > 0: # Take the first assigned user as the assignee 
assigned_to = ( users[0].get("username") or users[0].get("email") or users[0].get("name") ) elif card.get("assignedTo"): # Alternative field name assigned_to = card.get("assignedTo") elif card.get("assigned_to"): # Another alternative assigned_to = card.get("assigned_to") # Parse dependencies from description if they exist description = card.get("description", "") dependencies = self._parse_dependencies_from_description(description) original_id = self._parse_original_id_from_description(description) estimated_hours = self._parse_estimated_hours_from_description(description) # Parse labels from the card labels = [] if card.get("labels"): for label in card.get("labels", []): if isinstance(label, dict) and label.get("name"): labels.append(label["name"]) elif isinstance(label, str): labels.append(label) task = Task( id=card.get("id", ""), name=task_name, description=description, status=status, priority=Priority.MEDIUM, assigned_to=assigned_to, created_at=created_at, updated_at=updated_at, due_date=None, estimated_hours=estimated_hours, actual_hours=0.0, dependencies=dependencies, labels=labels, ) # Store original ID as a custom attribute if original_id: setattr(task, "_original_id", original_id) return task def _parse_dependencies_from_description(self, description: str) -> List[str]: """ Parse task dependencies from the description field. 
Dependencies are stored in the description as: 🔗 Dependencies: task_id_1, task_id_2, task_id_3 Parameters ---------- description : str Task description that may contain dependencies Returns ------- List[str] List of task IDs that this task depends on """ if not description: return [] import re # Look for the dependencies line pattern = r"🔗 Dependencies:\s*([^\n]+)" match = re.search(pattern, description) if match: deps_str = match.group(1) # Split by comma and clean up each dependency ID dependencies = [dep.strip() for dep in deps_str.split(",") if dep.strip()] return dependencies return [] def _parse_original_id_from_description(self, description: str) -> Optional[str]: """ Parse the original task ID from the description field. Original ID is stored in the description as: 🏷️ Original ID: task_get_hello_design Parameters ---------- description : str Task description that may contain original ID Returns ------- Optional[str] Original task ID if found, None otherwise """ if not description: return None import re # Look for the original ID line pattern = r"🏷️ Original ID:\s*([^\n]+)" match = re.search(pattern, description) if match: return match.group(1).strip() return None def _parse_estimated_hours_from_description(self, description: str) -> float: """ Parse estimated hours from the description field. Estimated hours are stored in the description as: ⏱️ Estimated: 8 hours or ⏱️ Estimated: 16.5 hours Parameters ---------- description : str Task description that may contain estimated hours Returns ------- float Estimated hours if found, 0.0 otherwise """ if not description: return 0.0 import re # Look for the estimated hours line # Pattern matches: "⏱️ Estimated: 8 hours" or "⏱️ Estimated: 16.5 hours" pattern = r"⏱️ Estimated:\s*(\d+(?:\.\d+)?)\s*hours?" match = re.search(pattern, description) if match: try: return float(match.group(1)) except ValueError: return 0.0 return 0.0
async def add_comment(self, task_id: str, comment_text: str) -> None:
    """
    Post a comment on a task.

    Parameters
    ----------
    task_id : str
        ID of the card that receives the comment
    comment_text : str
        Body of the comment

    Examples
    --------
    >>> await client.add_comment("card-123", "Task completed successfully")

    Notes
    -----
    A fresh kanban-mcp subprocess and MCP session are created for this one
    call and torn down afterwards.
    """
    payload = {"action": "create", "cardId": task_id, "text": comment_text}
    params = StdioServerParameters(
        command="node",
        args=[self.kanban_mcp_path],
        env=os.environ.copy(),
    )

    async with (
        stdio_client(params) as (reader, writer),
        ClientSession(reader, writer) as mcp_session,
    ):
        await mcp_session.initialize()
        await mcp_session.call_tool("mcp_kanban_comment_manager", payload)
async def complete_task(self, task_id: str) -> None:
    """
    Complete a task by moving its card to the Done list.

    Parameters
    ----------
    task_id : str
        ID of the task to complete

    Examples
    --------
    >>> await client.complete_task("card-123")

    Notes
    -----
    Delegates to ``_move_task_to_list`` with the keywords "done" and
    "completed", which are matched against list names.
    """
    done_keywords = ["done", "completed"]
    await self._move_task_to_list(task_id, done_keywords)
async def update_task_status(self, task_id: str, status: str) -> None:
    """
    Change a task's status by moving its card to a matching list.

    Parameters
    ----------
    task_id : str
        ID of the task to update
    status : str
        New status (e.g., "blocked", "in_progress", "todo"); compared
        case-insensitively

    Examples
    --------
    >>> await client.update_task_status("card-123", "blocked")

    Notes
    -----
    Known statuses are translated to a list of list-name keywords. An
    unknown status is used as its own (lower-cased) keyword. The actual
    move is performed by ``_move_task_to_list``.
    """
    normalized = status.lower()

    # Keywords are listed in the intended order of preference,
    # e.g. "on hold" before "blocked".
    keyword_table = {
        "blocked": ["on hold", "blocked"],
        "in_progress": ["progress", "in progress"],
        "todo": ["todo", "to do", "backlog"],
        "done": ["done", "completed"],
    }

    if normalized in keyword_table:
        target_keywords = keyword_table[normalized]
    else:
        target_keywords = [normalized]

    await self._move_task_to_list(task_id, target_keywords)
async def _move_task_to_list(self, task_id: str, list_keywords: List[str]) -> None:
    """
    Move a task to the first board list whose name matches a keyword.

    Lists are scanned in the order the server returns them; the first list
    whose lower-cased name contains any of the keywords is the target.

    Parameters
    ----------
    task_id : str
        ID of the task to move
    list_keywords : List[str]
        Keywords to match against list names (case-insensitive substring
        match)

    Raises
    ------
    RuntimeError
        If ``board_id`` is not set, if the board's lists cannot be
        retrieved, or if no list name matches any keyword

    Notes
    -----
    This is a helper method used by other status update methods. A new MCP
    session is created for the lookup and the move.
    """
    # Without a board there is nothing to query; fail before spawning node.
    if not self.board_id:
        raise RuntimeError("Board ID not set")

    server_params = StdioServerParameters(
        command="node",
        args=[self.kanban_mcp_path],
        env=os.environ.copy(),
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Get all lists
            lists_result = await session.call_tool(
                "mcp_kanban_list_manager",
                {"action": "get_all", "boardId": self.board_id},
            )

            # A missing/empty result used to be skipped silently, leaving the
            # card where it was while the caller believed the move succeeded.
            content = getattr(lists_result, "content", None) if lists_result else None
            if not content:
                raise RuntimeError(
                    f"Failed to retrieve lists for board {self.board_id}"
                )

            first_content = cast(TextContent, content[0])
            raw_text = first_content.text
            # Empty text is treated as "no lists" instead of a JSON error
            lists_data = json.loads(raw_text) if raw_text and raw_text.strip() else []
            if isinstance(lists_data, list):
                lists = lists_data
            elif isinstance(lists_data, dict):
                lists = lists_data.get("items", [])
            else:
                lists = []

            # Lower-case the keywords too so matching is truly case-insensitive
            keywords = [keyword.lower() for keyword in list_keywords]

            # Find matching list (a null/missing name is treated as "")
            target_list = next(
                (
                    lst
                    for lst in lists
                    if any(
                        keyword in (lst.get("name") or "").lower()
                        for keyword in keywords
                    )
                ),
                None,
            )
            if target_list is None:
                raise RuntimeError(
                    f"No list found matching keywords: {list_keywords}"
                )

            # Move card
            await session.call_tool(
                "mcp_kanban_card_manager",
                {
                    "action": "move",
                    "id": task_id,
                    "listId": target_list["id"],
                    "position": 65535,  # Default position at end of list
                },
            )
async def auto_setup_project(
    self,
    project_name: str,
    board_name: str = "Main Board",
    project_root: str | None = None,
) -> Dict[str, str]:
    """
    Create a new Planka project and board and remember their IDs.

    This method will:
    1. Create a new project in Planka
    2. Create a new board in that project
    3. Save the IDs to .marcus_workspace.json
    4. Load the IDs into memory

    No lookup for an existing project or board of the same name is done;
    every call issues the create requests.

    Parameters
    ----------
    project_name : str
        Name of the project to create
    board_name : str
        Name of the board to create (default: "Main Board")
    project_root : str | None, optional
        Absolute path to project implementation directory

    Returns
    -------
    Dict[str, str]
        Dictionary with project_id and board_id

    Raises
    ------
    RuntimeError
        If either create call returns no content or no ``id``
    json.JSONDecodeError
        If the server's reply text is not valid JSON

    Examples
    --------
    >>> client = KanbanClient()
    >>> result = await client.auto_setup_project("My Project")
    >>> print(f"Project ID: {result['project_id']}")
    >>> print(f"Board ID: {result['board_id']}")
    """
    server_params = StdioServerParameters(
        command="node",
        args=[self.kanban_mcp_path],
        env=os.environ.copy(),
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Create project
            project_result = await session.call_tool(
                "mcp_kanban_project_board_manager",
                {"action": "create_project", "name": project_name},
            )
            # getattr(...) also rejects an empty content list, which would
            # otherwise escape as an IndexError on content[0].
            if not project_result or not getattr(project_result, "content", None):
                raise RuntimeError("Failed to create project")

            first_content = cast(TextContent, project_result.content[0])
            project_data = json.loads(first_content.text)
            project_id = (
                project_data.get("id") if isinstance(project_data, dict) else None
            )
            if not project_id:
                raise RuntimeError("Project created but no ID returned")

            # Create board with default position
            board_result = await session.call_tool(
                "mcp_kanban_project_board_manager",
                {
                    "action": "create_board",
                    "projectId": project_id,
                    "name": board_name,
                    "position": 65535,
                },
            )
            if not board_result or not getattr(board_result, "content", None):
                raise RuntimeError("Failed to create board")

            first_content = cast(TextContent, board_result.content[0])
            board_data = json.loads(first_content.text)
            board_id = board_data.get("id") if isinstance(board_data, dict) else None
            if not board_id:
                raise RuntimeError("Board created but no ID returned")

            # Save to workspace file
            self._save_workspace_state(
                project_id=project_id,
                board_id=board_id,
                project_name=project_name,
                board_name=board_name,
                project_root=project_root,
            )

            # Update instance variables only after both creations succeeded
            self.project_id = project_id
            self.board_id = board_id

            return {"project_id": project_id, "board_id": board_id}
async def get_projects(self) -> List[Dict[str, Any]]:
    """
    Fetch the projects known to Planka.

    Returns
    -------
    List[Dict[str, Any]]
        Project records as returned by the server. An empty list is
        returned when the reply has no content, has blank text, or is
        neither a JSON list nor a JSON object with an ``items`` key.

    Examples
    --------
    >>> client = KanbanClient()
    >>> projects = await client.get_projects()
    >>> for project in projects:
    ...     print(f"Project: {project['name']} (ID: {project['id']})")

    Notes
    -----
    This method creates a new MCP session for the operation. Only the first
    page is requested (``page=1``, ``perPage=100``).
    """
    launch_params = StdioServerParameters(
        command="node",
        args=[self.kanban_mcp_path],
        env=os.environ.copy(),
    )
    query = {"action": "get_projects", "page": 1, "perPage": 100}

    async with stdio_client(launch_params) as (
        reader,
        writer,
    ), ClientSession(reader, writer) as session:
        await session.initialize()

        response = await session.call_tool("mcp_kanban_project_board_manager", query)

        # Nothing usable came back
        if not (response and hasattr(response, "content") and response.content):
            return []

        payload_text = cast(TextContent, response.content[0]).text
        if not payload_text or not payload_text.strip():
            return []  # No projects found — empty board

        parsed = json.loads(payload_text)

        # The server may answer with a bare list or an {"items": [...]} wrapper
        if isinstance(parsed, list):
            return cast(List[Dict[str, Any]], parsed)
        if isinstance(parsed, dict) and "items" in parsed:
            return cast(List[Dict[str, Any]], parsed["items"])
        return []
async def get_boards_for_project(self, project_id: str) -> List[Dict[str, Any]]:
    """
    Get all boards for a specific Planka project.

    Parameters
    ----------
    project_id : str
        The Planka project ID

    Returns
    -------
    List[Dict[str, Any]]
        List of boards for the project. An empty list is returned when the
        reply has no content, has blank text, or is neither a JSON list nor
        a JSON object with an ``items`` key.

    Notes
    -----
    This method creates a new MCP session for the operation.
    """
    server_params = StdioServerParameters(
        command="node",
        args=[self.kanban_mcp_path],
        env=os.environ.copy(),
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Get boards for project
            result = await session.call_tool(
                "mcp_kanban_project_board_manager",
                {"action": "get_boards", "projectId": project_id},
            )

            # Same guards as get_projects: an empty content list or blank
            # text means "no boards", not an IndexError/JSONDecodeError.
            content = getattr(result, "content", None) if result else None
            if not content:
                return []

            first_content = cast(TextContent, content[0])
            if not first_content.text or not first_content.text.strip():
                return []

            boards_data = json.loads(first_content.text)

            # Handle both list and dict responses
            if isinstance(boards_data, list):
                return cast(List[Dict[str, Any]], boards_data)
            if isinstance(boards_data, dict) and "items" in boards_data:
                return cast(List[Dict[str, Any]], boards_data["items"])

            return []
def _save_workspace_state( self, project_id: str, board_id: str, project_name: str, board_name: str, project_root: str | None = None, ) -> None: """ Save project and board IDs to workspace state file. Parameters ---------- project_id : str The Planka project ID board_id : str The Planka board ID project_name : str Name of the project board_name : str Name of the board project_root : str | None, optional Absolute path to project implementation directory """ from pathlib import Path workspace_file = Path(".marcus_workspace.json") workspace_data = { "project_id": project_id, "board_id": board_id, "project_name": project_name, "board_name": board_name, "created_at": datetime.now(timezone.utc).isoformat(), "updated_at": datetime.now(timezone.utc).isoformat(), } # Add project_root if provided if project_root: workspace_data["project_root"] = project_root with open(workspace_file, "w") as f: json.dump(workspace_data, f, indent=2) logger.info(f"Saved workspace state to {workspace_file.absolute()}") def _load_workspace_state(self) -> Optional[Dict[str, str]]: """ Load project and board IDs from workspace state file. Returns ------- Optional[Dict[str, str]] Dictionary with project_id, board_id, and project_root if available """ from pathlib import Path workspace_file = Path(".marcus_workspace.json") if not workspace_file.exists(): return None try: with open(workspace_file, "r") as f: data = json.load(f) result = { "project_id": data.get("project_id"), "board_id": data.get("board_id"), } # Include project_root if present if "project_root" in data: result["project_root"] = data["project_root"] return result except Exception as e: logger.warning(f"Failed to load workspace state: {e}") return None