"""Base class for Natural Language task creation.
Provides shared functionality for create_project and add_feature tools.
"""
import logging
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from src.core.models import Task
from src.core.task_graph_validator import TaskGraphValidator
from src.integrations.enhanced_task_classifier import EnhancedTaskClassifier
from src.integrations.nlp_task_utils import (
SafetyChecker,
TaskBuilder,
TaskType,
)
logger = logging.getLogger(__name__)
[docs]
class NaturalLanguageTaskCreator(ABC):
"""
Base class for natural language task creation tools.
Provides common functionality for:
- Task creation on kanban boards
- Safety checks and validation
- Task classification
- Error handling
"""
[docs]
def __init__(
self,
kanban_client: Any,
ai_engine: Any = None,
subtask_manager: Any = None,
complexity: str = "standard",
) -> None:
"""
Initialize the base task creator.
Parameters
----------
kanban_client : Any
Kanban board client with create_task method
ai_engine : Any, optional
Optional AI engine for enhanced processing
subtask_manager : Any, optional
Optional SubtaskManager for registering decomposed subtasks
complexity : str, default="standard"
Project complexity level: "prototype", "standard", "enterprise"
"""
self.kanban_client = kanban_client
self.ai_engine = ai_engine
self.subtask_manager = subtask_manager
self.complexity = complexity
self.task_classifier = EnhancedTaskClassifier()
self.task_builder = TaskBuilder()
self.safety_checker = SafetyChecker()
self.active_project_id: Optional[str] = None
[docs]
async def create_tasks_on_board(
self,
tasks: List[Task],
skip_validation: bool = False,
update_dependencies: bool = True,
) -> List[Task]:
"""
Create tasks on the kanban board.
This is the main shared functionality between create_project and add_feature.
Parameters
----------
tasks : List[Task]
List of tasks to create
skip_validation : bool
Skip dependency validation if True
update_dependencies : bool
Update task dependencies with new IDs after creation
Returns
-------
List[Task]
List of created tasks
Raises
------
RuntimeError
If kanban client doesn't support task creation
"""
# CRITICAL: Auto-fix task graph issues BEFORE committing to Kanban
# This fixes problems automatically rather than raising exceptions
if not skip_validation:
# Auto-fix task graph issues
fixed_tasks, user_warnings = TaskGraphValidator.validate_and_fix(tasks)
tasks = fixed_tasks # Use the fixed version
# Log user-friendly warnings
if user_warnings:
logger.warning(
f"Task graph auto-fixed: {len(user_warnings)} issues corrected"
)
for warning in user_warnings:
logger.info(f" • {warning}")
# Also run legacy safety checker (diagnostic warnings)
errors = self.safety_checker.validate_dependencies(tasks)
if errors:
logger.warning(f"Dependency validation errors: {errors}")
# Check if kanban client supports task creation
if not hasattr(self.kanban_client, "create_task"):
from src.core.error_framework import ErrorContext, KanbanIntegrationError
raise KanbanIntegrationError(
board_name=getattr(self.kanban_client, "board_id", "unknown"),
operation="task_creation_validation",
context=ErrorContext(
operation="create_tasks_on_board",
integration_name="natural_language_tools",
custom_context={
"client_type": type(self.kanban_client).__name__,
"details": (
f"Kanban client "
f"{type(self.kanban_client).__name__} does not "
f"support task creation. Expected "
f"KanbanClientWithCreate or compatible "
f"implementation. Current client type: "
f"{type(self.kanban_client).__module__}."
f"{type(self.kanban_client).__name__}"
),
},
),
)
created_tasks = []
failed_tasks = []
for task in tasks:
try:
# Build task data using utility
task_data = self.task_builder.build_task_data(task)
# Create task on board
logger.info(f"Creating task: {task.name}")
kanban_task = await self.kanban_client.create_task(task_data)
created_tasks.append(kanban_task)
# Store task metadata for Phase 1 analysis
try:
from pathlib import Path
from src.core.persistence import SQLitePersistence
# Use absolute path to database (relative to marcus root)
marcus_root = Path(__file__).parent.parent.parent
db_path = marcus_root / "data" / "marcus.db"
persistence = SQLitePersistence(db_path=db_path)
# kanban_task is a Task object, not a dict
task_id = kanban_task.id
if task_id:
await persistence.store(
"task_metadata",
str(task_id),
{
"task_id": str(task_id),
"name": task.name,
"description": task.description,
"priority": task_data.get("priority"),
"estimated_hours": task.estimated_hours,
"labels": task.labels,
"dependencies": task.dependencies,
"project_id": self.active_project_id,
"source_type": getattr(task, "source_type", None),
"created_at": datetime.now(timezone.utc).isoformat(),
},
)
except Exception as log_error:
logger.warning(
f"Failed to log task metadata for '{task.name}': {log_error}"
)
# Snapshot the kanban task's human name into costs.db so the
# Cato dashboard can render real names in "Tokens by task"
# instead of opaque hex IDs (Marcus #530). Lives in the
# shared task-creation method so BOTH create_project and
# add_feature flows are covered automatically — Codex P2 on
# PR #536 caught that the project-only snapshot left the
# feature-adder path silently dropping names.
#
# Failure is logged + swallowed: cost tracking is optional
# and must never block kanban task creation.
try:
from src.cost_tracking.cost_recorder import (
get_recorder as _cost_get_recorder,
)
_cost_store = _cost_get_recorder().store
if kanban_task.id and task.name:
_cost_store.record_task_name(str(kanban_task.id), task.name)
except Exception as _name_err: # pragma: no cover
logger.debug(
"Failed to snapshot task name for %s: %s",
kanban_task.id,
_name_err,
)
except Exception as e:
from src.core.error_framework import (
ErrorContext,
KanbanIntegrationError,
)
from src.core.error_monitoring import record_error_for_monitoring
# Log original exception BEFORE wrapping so the root-cause
# traceback is visible in production logs (issue #480).
# The wrapped KanbanIntegrationError message alone does not
# contain the original str(e) in any log line.
logger.error(
f"Task creation failed for '{task.name}': {e!r}",
exc_info=True,
)
# Create proper error with context
kanban_error = KanbanIntegrationError(
board_name=getattr(self.kanban_client, "board_id", "unknown"),
operation="individual_task_creation",
context=ErrorContext(
operation="create_tasks_on_board",
integration_name="natural_language_tools",
custom_context={
"task_name": task.name,
"task_type": getattr(task, "task_type", "unknown"),
"details": f"Failed to create task '{task.name}': {str(e)}",
},
),
)
# Record for monitoring but continue processing
record_error_for_monitoring(kanban_error)
logger.error(f"Failed to create task '{task.name}': {kanban_error}")
failed_tasks.append((task, str(kanban_error)))
# Continue with other tasks even if one fails
# Log summary
logger.info(
f"Task creation complete: {len(created_tasks)} succeeded, "
f"{len(failed_tasks)} failed"
)
if failed_tasks:
logger.error(f"Failed tasks: {[(t.name, e) for t, e in failed_tasks]}")
# Check if no tasks were created at all
if not created_tasks and tasks:
from src.core.error_framework import ErrorContext, KanbanIntegrationError
raise KanbanIntegrationError(
board_name=getattr(self.kanban_client, "board_id", "unknown"),
operation="batch_task_creation",
context=ErrorContext(
operation="create_tasks_on_board",
integration_name="natural_language_tools",
custom_context={
"total_tasks": len(tasks),
"failed_tasks": len(failed_tasks),
"details": (
f"Failed to create any of {len(tasks)} tasks. "
f"All task creation attempts failed. This "
f"indicates a fundamental issue with the kanban "
f"integration or board configuration."
),
},
),
)
# Remap dependencies from slug IDs to real UUIDs
if update_dependencies and created_tasks:
await self._remap_dependencies(tasks, created_tasks)
# After remap, update the task_metadata rows in marcus.db
# so Cato sees the real UUIDs (not the synthetic slug IDs
# that were written during the per-task creation loop
# above). Without this, Cato's dependency graph contains
# unresolvable slug references and no edges render.
await self._update_task_metadata_dependencies(created_tasks)
# Decompose tasks that meet criteria and add as checklist items
await self._decompose_and_add_subtasks(created_tasks, tasks)
# Wire cross-parent dependencies after decomposition completes
await self._wire_cross_parent_dependencies()
return created_tasks
async def _update_task_metadata_dependencies(
self,
created_tasks: List[Task],
) -> None:
"""Update task_metadata rows after dependency remap.
The per-task creation loop at ``create_tasks_on_board``
persists ``task_metadata`` with the pre-remap slug-based
dependency IDs. After ``_remap_dependencies`` converts those
to real kanban UUIDs, the in-memory ``created_tasks`` have
correct dependencies but the ``task_metadata`` rows in
marcus.db still contain the slugs. Cato reads
``task_metadata.dependencies`` for its graph so the slugs
produce unresolvable edges.
This helper re-reads each task_metadata row, patches the
``dependencies`` field with the remapped values, and writes
it back. Best-effort — persistence errors are logged but
don't fail task creation.
Parameters
----------
created_tasks : List[Task]
Tasks after dependency remap (post-``_remap_dependencies``).
"""
# Setup persistence once. A failure here (can't import, can't
# open db) skips the whole pass — no per-task recovery is
# possible without the persistence client. Any successful
# setup proceeds to the per-task loop where errors are
# isolated so one bad row doesn't block the rest.
try:
from pathlib import Path
from src.core.persistence import SQLitePersistence
marcus_root = Path(__file__).parent.parent.parent
db_path = marcus_root / "data" / "marcus.db"
persistence = SQLitePersistence(db_path=db_path)
except Exception as e:
logger.warning(
f"Failed to initialize persistence for task_metadata "
f"dependency update (Cato edges may not render): {e}"
)
return
def _get(obj: Any, attr: str, default: Any = None) -> Any:
if hasattr(obj, attr):
return getattr(obj, attr)
if isinstance(obj, dict):
return obj.get(attr, default)
return default
updated = 0
failed = 0
for task in created_tasks:
# Isolate per-task errors so one malformed row or
# transient SQLite error doesn't abort updates for the
# rest of the batch (Codex P2 on PR #340).
try:
task_id = _get(task, "id", "")
if not task_id:
continue
new_deps = list(_get(task, "dependencies", []) or [])
# Read existing metadata, patch deps, write back.
existing = await persistence.retrieve("task_metadata", str(task_id))
if existing is None:
# No metadata row for this task — nothing to update.
continue
existing["dependencies"] = new_deps
await persistence.store("task_metadata", str(task_id), existing)
updated += 1
except Exception as e:
failed += 1
logger.warning(
f"Failed to update task_metadata dependencies "
f"for task {_get(task, 'id', '?')} "
f"({_get(task, 'name', '?')}): {e}"
)
if updated or failed:
logger.info(
f"task_metadata dependency update: "
f"{updated} succeeded, {failed} failed"
)
async def _remap_dependencies(
self,
original_tasks: List[Task],
created_tasks: List[Task],
) -> None:
"""Remap dependencies from slug IDs to real kanban UUIDs.
After task creation, dependencies still reference the original
slug-style IDs (e.g. ``design_project_domain``). This method
builds a slug-to-UUID mapping and updates both the in-memory
Task objects and the kanban board storage.
Parameters
----------
original_tasks : List[Task]
Tasks as generated by NLP (with original slug IDs).
created_tasks : List[Task]
Tasks as created on the kanban board (with real UUIDs).
"""
# Build mapping: original_id / slug → real UUID
# Use name-based matching instead of positional zip, since
# failed task creations can shift positions.
# Helper to access fields on Task objects or dicts
def _get(obj: Any, attr: str, default: Any = None) -> Any:
if hasattr(obj, attr):
return getattr(obj, attr)
if isinstance(obj, dict):
return obj.get(attr, default)
return default
def _set(obj: Any, attr: str, value: Any) -> None:
if hasattr(obj, attr):
setattr(obj, attr, value)
elif isinstance(obj, dict):
obj[attr] = value
slug_to_uuid: Dict[str, str] = {}
orig_by_name: Dict[str, Any] = {_get(t, "name", ""): t for t in original_tasks}
for created in created_tasks:
c_name = _get(created, "name", "")
c_id = _get(created, "id", "")
orig = orig_by_name.get(c_name)
orig_id_val = _get(orig, "id", "") if orig else ""
if orig and orig_id_val and c_id:
slug_to_uuid[str(orig_id_val)] = str(c_id)
# Also map by original_id if stored
orig_id = _get(created, "original_id")
if orig_id and c_id:
slug_to_uuid[str(orig_id)] = str(c_id)
if not slug_to_uuid:
return
logger.info(
f"Remapping dependencies: {len(slug_to_uuid)} " f"slug-to-UUID entries"
)
# Remap dependencies on in-memory tasks
all_ids = {_get(t, "id", "") for t in created_tasks}
for task in created_tasks:
deps = _get(task, "dependencies", [])
if not deps:
continue
remapped: List[str] = []
for dep_id in deps:
if dep_id in slug_to_uuid:
remapped.append(slug_to_uuid[dep_id])
elif dep_id in all_ids:
remapped.append(dep_id)
else:
logger.warning(
f"Orphaned dependency '{dep_id}' "
f"on task '{_get(task, 'name', '?')}'"
f" — skipping"
)
_set(task, "dependencies", remapped)
# Persist remapped dependencies to kanban board
# SQLiteKanban stores deps in task_dependencies table
if hasattr(self.kanban_client, "_with_connection"):
# SQLite provider — update junction table directly
import sqlite3
def _update_deps(conn: sqlite3.Connection) -> None:
for task in created_tasks:
tid = _get(task, "id", "")
deps = _get(task, "dependencies", [])
conn.execute(
"DELETE FROM task_dependencies " "WHERE task_id = ?",
(tid,),
)
for dep_id in deps:
conn.execute(
"INSERT OR IGNORE INTO "
"task_dependencies "
"(task_id, depends_on_id) "
"VALUES (?, ?)",
(tid, dep_id),
)
conn.commit()
try:
await self.kanban_client._run_in_executor(
lambda: self.kanban_client._with_connection(_update_deps)
)
logger.info("Persisted remapped dependencies to SQLite")
except Exception as e:
logger.warning(f"Failed to persist remapped deps: {e}")
async def _decompose_and_add_subtasks(
self, created_tasks: List[Task], original_tasks: List[Task]
) -> None:
"""
Decompose tasks that meet criteria and add subtasks as Planka checklist items.
Uses parallel AI calls for performance - all task decompositions are
executed concurrently instead of sequentially.
Parameters
----------
created_tasks : List[Task]
Tasks that were created on the Kanban board
original_tasks : List[Task]
Original task objects with estimated_hours
"""
# Skip decomposition if no AI engine is available
if not self.ai_engine:
logger.warning(
"⚠️ No AI engine available for task decomposition - skipping. "
"Subtasks will not be created!"
)
return
# Log decomposition context
logger.info(
f"🔍 Task decomposition started: {len(created_tasks)} tasks, "
f"SubtaskManager available: {self.subtask_manager is not None}, "
f"Complexity: {self.complexity}"
)
# Note: AI engine now uses LLMAbstraction which automatically
# checks provider availability and supports local models (Ollama),
# Anthropic, and OpenAI. If no provider is configured,
# decompose_task will raise a clear error message.
import asyncio
from src.marcus_mcp.coordinator import decompose_task, should_decompose
# Create mapping of task names to original tasks (for estimated_hours)
task_map = {task.name: task for task in original_tasks}
# Collect all tasks that need decomposition for parallel execution
decomposition_jobs = []
task_metadata = [] # Track (created_task, original_task) pairs
for created_task in created_tasks:
# Get original task to check estimated hours
original_task = task_map.get(created_task.name)
if not original_task:
continue
# Check if task should be decomposed
# Pass complexity for prototype mode check
if not should_decompose(original_task, project_complexity=self.complexity):
continue
logger.info(
f"Queueing task '{created_task.name}' for decomposition "
f"({original_task.estimated_hours}h)"
)
# Add decomposition job to parallel execution list
# CRITICAL: Pass created_task which has the real Planka ID,
# not original_task. We need to create a task object that has
# both the real ID and the original details
task_with_real_id = Task(
id=created_task.id, # Real Planka ID
name=original_task.name,
description=original_task.description,
status=original_task.status,
priority=original_task.priority,
assigned_to=original_task.assigned_to,
created_at=original_task.created_at,
updated_at=original_task.updated_at,
due_date=original_task.due_date,
estimated_hours=original_task.estimated_hours,
actual_hours=original_task.actual_hours,
dependencies=original_task.dependencies,
labels=original_task.labels,
project_id=getattr(original_task, "project_id", None),
project_name=getattr(original_task, "project_name", None),
# #557 Fix 1: carry the parent's acceptance criteria into
# decomposition. For contract_first these were generated by
# decompose_by_contract with the domain contract in hand, so
# subtask criteria can be derived FROM them instead of being
# re-invented blind (which risks drift from the contract).
acceptance_criteria=getattr(original_task, "acceptance_criteria", []),
completion_criteria=getattr(original_task, "completion_criteria", None),
)
# Pass complexity through project_context for time budgets and validation
project_context = {"complexity": self.complexity}
decomposition_jobs.append(
decompose_task(
task_with_real_id, self.ai_engine, project_context=project_context
)
)
task_metadata.append((created_task, original_task))
# Execute all decompositions in parallel
if not decomposition_jobs:
logger.debug("No tasks require decomposition")
return
logger.info(f"Decomposing {len(decomposition_jobs)} tasks in parallel...")
decomposition_results = await asyncio.gather(
*decomposition_jobs, return_exceptions=True
)
# Process results and add checklist items
successful_count = 0
failed_count = 0
for idx, result in enumerate(decomposition_results):
created_task, original_task = task_metadata[idx]
try:
# Handle exceptions from gather (both Exception and BaseException)
if isinstance(result, BaseException):
failed_count += 1
logger.error(
(
f"Decomposition failed for task "
f"'{created_task.name}': {result}"
),
exc_info=result if isinstance(result, Exception) else None,
)
continue
# Handle failed decomposition responses
if not result.get("success"):
failed_count += 1
logger.warning(
f"Failed to decompose task '{created_task.name}': "
f"{result.get('error')}"
)
continue
# Successfully decomposed - add subtasks
subtasks = result.get("subtasks", [])
shared_conventions = result.get("shared_conventions", {})
num_subtasks = len(subtasks)
logger.info(
f"Task '{created_task.name}' decomposed into "
f"{num_subtasks} subtasks"
)
# Register subtasks with SubtaskManager (GH-62 fix)
if self.subtask_manager:
from src.marcus_mcp.coordinator.subtask_manager import (
SubtaskMetadata,
)
metadata = SubtaskMetadata(
shared_conventions=shared_conventions,
decomposed_by="ai",
)
self.subtask_manager.add_subtasks(
parent_task_id=created_task.id,
subtasks=subtasks,
metadata=metadata,
)
logger.info(
f"Registered {num_subtasks} subtasks with SubtaskManager "
f"for task '{created_task.name}'"
)
else:
logger.warning(
"SubtaskManager not available - subtasks will only exist as "
"checklist items (GH-62)"
)
# Add subtasks as checklist items in Planka. Skip for any
# other provider — SQLite, GitHub, and Linear all manage
# subtasks via SubtaskManager (already done above), and
# spawning the kanban-mcp Node service for non-Planka
# providers either errors out (Planka env vars missing)
# or fails silently for first-time users without
# kanban-mcp installed at all.
if self._is_planka_provider():
await self._add_subtasks_as_checklist(created_task.id, subtasks)
successful_count += 1
except Exception as e:
failed_count += 1
logger.error(
(
f"Error processing decomposition for task "
f"'{created_task.name}': {e}"
),
exc_info=True,
)
# Continue with other tasks even if processing fails
# Log summary
logger.info(
f"Task decomposition complete: {successful_count} succeeded, "
f"{failed_count} failed"
)
async def _wire_cross_parent_dependencies(self) -> None:
"""
Wire cross-parent dependencies after decomposition completes.
This is called after all tasks have been decomposed to create
fine-grained dependencies between subtasks of different parent tasks.
"""
# Skip if no subtask manager or AI engine
if not self.subtask_manager or not self.ai_engine:
logger.debug(
"Skipping cross-parent dependency wiring - "
"required components not available"
)
return
# Note: Cross-parent dependency wiring is implemented in
# server.refresh_project_state() where we have access to unified
# project_tasks storage
logger.info(
"Cross-parent dependency wiring will be performed "
"during project state refresh"
)
def _is_planka_provider(self) -> bool:
"""Return True if the wired kanban client is the Planka provider.
Used to gate Planka-specific code paths (notably
``_add_subtasks_as_checklist``, which spawns the ``kanban-mcp``
Node service to create checklist items on a Planka card).
The check is robust to mocks and partial imports: it inspects
the client's ``provider`` attribute (set by every concrete
``KanbanInterface`` subclass) and falls back to a string match
on the class name when ``provider`` is absent.
Returns
-------
bool
True iff the kanban client is a Planka provider.
"""
provider = getattr(self.kanban_client, "provider", None)
if provider is not None:
# KanbanProvider enum values are strings under the hood;
# accept either the enum or its string equivalent.
provider_str = getattr(provider, "value", str(provider))
if str(provider_str).lower() == "planka":
return True
# Fallback when provider attribute isn't set (e.g. partial mocks)
return "Planka" in type(self.kanban_client).__name__
async def _add_subtasks_as_checklist(
self, parent_card_id: str, subtasks: List[Dict[str, Any]]
) -> None:
"""
Add subtasks as checklist items (tasks) in Planka.
Parameters
----------
parent_card_id : str
ID of the parent card in Planka
subtasks : List[Dict[str, Any]]
List of subtask definitions from decomposition
"""
try:
import os
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# Use same server params as PlankaKanban
# Use local path for kanban-mcp
kanban_mcp_path = os.path.expanduser("~/dev/kanban-mcp/dist/index.js")
server_params = StdioServerParameters(
command="node",
args=[kanban_mcp_path],
env=os.environ.copy(),
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
# Create checklist item for each subtask
for idx, subtask in enumerate(subtasks):
subtask_name = subtask.get("name", f"Subtask {idx + 1}")
# Create task (checklist item) in Planka
result = await session.call_tool(
"mcp_kanban_task_manager",
{
"action": "create",
"cardId": parent_card_id,
"name": subtask_name,
"position": (idx + 1) * 65535,
},
)
if result:
logger.info(
f"Created checklist item '{subtask_name}' "
f"on card {parent_card_id}"
)
num_added = len(subtasks)
logger.info(
f"Added {num_added} subtasks as checklist items "
f"to card {parent_card_id}"
)
except Exception as e:
logger.error(
f"Error adding subtasks as checklist items to card "
f"{parent_card_id}: {e}",
exc_info=True,
)
[docs]
async def apply_safety_checks(self, tasks: List[Task]) -> List[Task]:
"""
Apply safety checks to ensure logical task ordering.
This method can be overridden by subclasses for custom safety logic.
Parameters
----------
tasks : List[Task]
List of tasks to check
Returns
-------
List[Task]
List of tasks with updated dependencies
"""
# Import phase dependency enforcer
from src.core.phase_dependency_enforcer import PhaseDependencyEnforcer
# First apply phase-based dependencies for proper ordering
phase_enforcer = PhaseDependencyEnforcer()
tasks = phase_enforcer.enforce_phase_dependencies(tasks)
# Then apply legacy safety checks for additional constraints
# These may add extra dependencies but won't conflict with phase ordering
# Apply implementation dependencies (implementation depends on design)
tasks = self.safety_checker.apply_implementation_dependencies(tasks)
# Apply testing dependencies (testing depends on implementation)
tasks = self.safety_checker.apply_testing_dependencies(tasks)
# Apply deployment dependencies (deployment depends on everything)
tasks = self.safety_checker.apply_deployment_dependencies(tasks)
return tasks
[docs]
def classify_tasks(self, tasks: List[Task]) -> Dict[TaskType, List[Task]]:
"""
Classify tasks by their type.
Parameters
----------
tasks : List[Task]
List of tasks to classify
Returns
-------
Dict[TaskType, List[Task]]
Dictionary mapping task types to lists of tasks
"""
classified: Dict[TaskType, List[Task]] = {
task_type: [] for task_type in list(TaskType)
}
for task in tasks:
task_type = self.task_classifier.classify(task)
classified[task_type].append(task)
return classified
[docs]
def classify_tasks_with_details(
self, tasks: List[Task]
) -> Dict[str, Dict[str, Any]]:
"""
Classify tasks and return detailed classification info per task.
Parameters
----------
tasks : List[Task]
List of tasks to classify
Returns
-------
Dict[str, Dict[str, Any]]
Dictionary mapping task IDs to classification details
"""
from src.integrations.enhanced_task_classifier import EnhancedTaskClassifier
classifier = EnhancedTaskClassifier()
results = {}
for task in tasks:
result = classifier.classify_with_confidence(task)
results[task.id] = {
"type": result.task_type.value,
"confidence": result.confidence,
"reasoning": result.reasoning,
}
return results
[docs]
def get_tasks_by_type(self, tasks: List[Task], task_type: TaskType) -> List[Any]:
"""Get all tasks of a specific type."""
return self.task_classifier.filter_by_type(tasks, task_type)
[docs]
def is_deployment_task(self, task: Task) -> Any:
"""Check if task is deployment-related."""
return self.task_classifier.is_type(task, TaskType.DEPLOYMENT)
[docs]
def is_implementation_task(self, task: Task) -> Any:
"""Check if task is implementation-related."""
return self.task_classifier.is_type(task, TaskType.IMPLEMENTATION)
[docs]
def is_testing_task(self, task: Task) -> Any:
"""Check if task is testing-related."""
return self.task_classifier.is_type(task, TaskType.TESTING)
[docs]
@abstractmethod
async def process_natural_language(
self, description: str, **kwargs: Any
) -> List[Task]:
"""
Process natural language description into tasks.
This method must be implemented by subclasses.
Parameters
----------
description : str
Natural language description
**kwargs : Any
Additional parameters specific to the implementation
Returns
-------
List[Task]
List of generated tasks
"""
pass
[docs]
async def create_from_description(
self, description: str, apply_safety: bool = True, **kwargs: Any
) -> Dict[str, Any]:
"""
Create tasks from natural language description.
Parameters
----------
description : str
Natural language description
apply_safety : bool
Whether to apply safety checks
**kwargs : Any
Additional parameters for processing
Returns
-------
Dict[str, Any]
Dictionary with creation results
"""
try:
# Process natural language into tasks
tasks = await self.process_natural_language(description, **kwargs)
# Apply safety checks if requested
if apply_safety:
tasks = await self.apply_safety_checks(tasks)
# Create tasks on board
created_tasks = await self.create_tasks_on_board(tasks)
# Build result
result = {
"success": True,
"tasks_created": len(created_tasks),
"tasks": [
{
"id": task.id,
"name": task.name,
"type": self.task_classifier.classify(task).value,
}
for task in created_tasks
],
"task_types": {
task_type.value: len(
self.get_tasks_by_type(created_tasks, task_type)
)
for task_type in list(TaskType)
},
}
return result
except Exception as e:
logger.error(f"Error in create_from_description: {str(e)}")
return {"success": False, "error": str(e), "tasks_created": 0}