"""Context Tools for Marcus MCP.
This module contains tools for context management:
- log_decision: Log architectural decisions
- get_task_context: Get context for a specific task
- assemble_task_context: Assemble the three-tier context bundle delivered with a task assignment
"""
import logging
from typing import Any, Dict, List, Set
from src.core.project_history import Decision as HistoryDecision
from src.core.project_history import (
ProjectHistoryPersistence,
)
from src.logging.agent_events import log_agent_event
logger = logging.getLogger(__name__)
[docs]
async def log_decision(
    agent_id: str, task_id: str, decision: str, state: Any
) -> Dict[str, Any]:
    """
    Log an architectural decision made during task implementation.

    The free-text ``decision`` is split into *what* / *why* / *impact*
    (see :func:`_parse_decision_text`), recorded through
    ``state.context.log_decision`` and then fanned out to the optional
    subsystems: a Kanban comment on the task, the event bus, the live
    experiment monitor and the project history.

    Parameters
    ----------
    agent_id : str
        The agent making the decision
    task_id : str
        Current task ID
    decision : str
        Natural language description of the decision. The expected
        shape is ``"I chose X because Y. This affects Z."`` but any text
        is accepted.
    state : Any
        Marcus server state instance

    Returns
    -------
    Dict[str, Any]
        ``{"success": True, "decision_id": ..., "message": ...}`` on
        success, otherwise ``{"success": False, "error": <text>}``.
        Exceptions are converted into the error form, never propagated.

    Notes
    -----
    Cross-referencing to dependent tasks is presumably performed inside
    ``state.context.log_decision``; it is not visible in this module.
    """
    try:
        # The Context system is mandatory for this tool.
        if not getattr(state, "context", None):
            return {"success": False, "error": "Context system not enabled"}

        parsed = _parse_decision_text(decision)
        what, why, impact = parsed["what"], parsed["why"], parsed["impact"]

        logged_decision = await state.context.log_decision(
            agent_id=agent_id, task_id=task_id, what=what, why=why, impact=impact
        )

        # Mirror the decision as a task comment when a board is attached.
        # ``getattr`` keeps a state without the attribute from turning an
        # already-recorded decision into a reported failure.
        kanban_client = getattr(state, "kanban_client", None)
        if kanban_client:
            try:
                # Real newlines: the previous "\\n" literals posted the two
                # characters backslash + n instead of line breaks.
                comment = "\n".join(
                    (
                        f"🏗️ ARCHITECTURAL DECISION by {agent_id}",
                        f"Decision: {what}",
                        f"Reasoning: {why}",
                        f"Impact: {impact}",
                    )
                )
                await kanban_client.add_comment(task_id, comment)
            except Exception as e:
                # Don't fail if kanban comment fails - decision is still logged
                logger.warning(f"Failed to add kanban comment for decision: {e}")

        # Publish on the event bus when one is configured.
        events = getattr(state, "events", None)
        if events:
            await events.publish_nowait(
                "decision_logged", agent_id, logged_decision.to_dict()
            )

        # Record in the active experiment if one is running. Imported
        # here, as in the original code, rather than at module import time.
        from src.experiments.live_experiment_monitor import get_active_monitor

        monitor = get_active_monitor()
        if monitor and monitor.is_running:
            monitor.record_decision(
                agent_id=agent_id, task_id=task_id, decision=decision
            )

        # Persist to project history for post-project analysis.
        await _persist_decision_to_history(logged_decision, state)

        return {
            "success": True,
            "decision_id": logged_decision.decision_id,
            "message": "Decision logged and cross-referenced to dependent tasks",
        }
    except Exception as e:
        return {"success": False, "error": str(e)}


def _parse_decision_text(decision: str) -> Dict[str, str]:
    """
    Split a natural-language decision into ``what`` / ``why`` / ``impact``.

    The text is cut into at most three pieces on ``"."``. If the first
    piece contains ``"because"`` it is divided into *what* (before) and
    *why* (after). If the second piece mentions ``affect``, ``impact`` or
    ``require`` (case-insensitive) it becomes *impact*. Anything that
    cannot be extracted keeps its default, and an empty fragment falls
    back to the default instead of producing an empty field.

    Parameters
    ----------
    decision : str
        The raw decision text supplied by the agent.

    Returns
    -------
    Dict[str, str]
        A dict with exactly the keys ``"what"``, ``"why"`` and
        ``"impact"``.
    """
    parsed = {
        "what": decision,  # Default to full decision
        "why": "Not specified",
        "impact": "May affect dependent tasks",
    }
    sentences = decision.split(".", 2)

    chosen, separator, reason = sentences[0].partition("because")
    if separator:
        parsed["what"] = chosen.strip() or decision
        parsed["why"] = reason.strip() or "Not specified"

    if len(sentences) >= 2 and any(
        keyword in sentences[1].lower()
        for keyword in ("affect", "impact", "require")
    ):
        parsed["impact"] = sentences[1].strip()

    return parsed
[docs]
async def get_task_context(task_id: str, state: Any) -> Dict[str, Any]:
    """
    Get the full context for a specific task.

    This is useful for agents who want to understand the broader
    context of their work or review decisions made on dependencies.
    For subtasks, includes parent task context and shared conventions.

    Parameters
    ----------
    task_id : str
        The task to get context for (can be regular task or subtask ID)
    state : Any
        Marcus server state instance

    Returns
    -------
    Dict[str, Any]
        ``{"success": True, "context": {...}}`` on success, otherwise
        ``{"success": False, "error": <text>}``.

        For a regular task the context is ``state.context.get_context``
        serialised with ``to_dict()`` plus ``is_subtask``, ``artifacts``,
        ``project_contract`` and, when present, ``recovery_info``.

        For a subtask it holds ``subtask_info``, ``parent_task``,
        ``shared_conventions``, ``dependency_artifacts``,
        ``sibling_subtasks``, ``sibling_artifacts``,
        ``sibling_decisions``, ``project_contract`` and, when the Context
        system and the parent task are both available, ``parent_context``.
    """
    # Normalise once: the agent-id resolution below runs outside the
    # ``try`` block, so a missing or ``None`` ``project_tasks`` must not
    # raise out of the tool. Every lookup on this list happens before the
    # first ``await``, so the snapshot cannot go stale.
    project_tasks = getattr(state, "project_tasks", None) or []

    # Log to agent events — proof the agent called get_task_context.
    # Resolve agent_id: try authenticated client first, then task assignment.
    agent_id = getattr(state, "_current_client_id", None)
    if not agent_id:
        agent_id = next(
            (
                t.assigned_to
                for t in project_tasks
                if t.id == task_id and t.assigned_to
            ),
            None,
        )
    log_agent_event(
        "context_requested",
        {
            "agent_id": agent_id or "unknown",
            "task_id": task_id,
            "project": getattr(state, "current_project_name", None) or "unknown",
        },
    )

    try:
        # ---- Subtask path -------------------------------------------------
        subtask_manager = getattr(state, "subtask_manager", None)
        if subtask_manager and task_id in subtask_manager.subtasks:
            subtask_context = subtask_manager.get_subtask_context(task_id)
            parent_task_id = subtask_context["parent_task_id"]
            parent_task = next(
                (t for t in project_tasks if t.id == parent_task_id), None
            )

            # Full summary when the parent is known, bare id otherwise.
            if parent_task:
                parent_summary: Dict[str, Any] = {
                    "id": parent_task_id,
                    "name": parent_task.name,
                    "description": parent_task.description,
                    "labels": parent_task.labels,
                }
            else:
                parent_summary = {"id": parent_task_id}

            context_dict: Dict[str, Any] = {
                "is_subtask": True,
                "subtask_info": subtask_context["subtask"],
                "parent_task": parent_summary,
                "shared_conventions": subtask_context["shared_conventions"],
                "dependency_artifacts": subtask_context["dependency_artifacts"],
                "sibling_subtasks": subtask_context["sibling_subtasks"],
            }

            # CRITICAL: Add artifacts and decisions from sibling subtasks
            # so a subtask can see what its siblings have produced.
            sibling_context = await _collect_sibling_subtask_context(
                task_id, parent_task_id, state
            )
            context_dict["sibling_artifacts"] = sibling_context["artifacts"]
            context_dict["sibling_decisions"] = sibling_context["decisions"]

            # #595 Fix 2: the foundation contract is project-global —
            # every task, including subtasks, receives it regardless
            # of dependency-graph position.
            context_dict["project_contract"] = await _collect_foundation_contract(
                state
            )

            # Add parent task's context if Context system is available.
            context_system = getattr(state, "context", None)
            if context_system and parent_task:
                parent_context = await context_system.get_context(
                    parent_task_id, parent_task.dependencies or []
                )
                context_dict["parent_context"] = parent_context.to_dict()

            return {"success": True, "context": context_dict}

        # ---- Standard task path (not a subtask) ---------------------------
        context_system = getattr(state, "context", None)
        if not context_system:
            return {"success": False, "error": "Context system not enabled"}

        task = next((t for t in project_tasks if t.id == task_id), None)
        if not task:
            return {"success": False, "error": f"Task {task_id} not found"}

        context = await context_system.get_context(task_id, task.dependencies or [])
        context_dict = context.to_dict()
        context_dict["is_subtask"] = False

        # Add artifact information
        context_dict["artifacts"] = await _collect_task_artifacts(
            task_id, task, state
        )

        # #595 Fix 2: the foundation contract is project-global. Unlike
        # `artifacts` (scoped to direct dependencies), it is returned to
        # every task regardless of dependency-graph position.
        context_dict["project_contract"] = await _collect_foundation_contract(state)

        # Add recovery information if present. ``getattr`` so a task
        # object without the attribute does not discard the whole context.
        recovery_info = getattr(task, "recovery_info", None)
        if recovery_info:
            context_dict["recovery_info"] = recovery_info.to_dict()

        return {"success": True, "context": context_dict}
    except Exception as e:
        return {"success": False, "error": str(e)}
# Issue #605: delivered context is limited to architectural artifacts and
# must never include source code. The names below are the artifact types
# whose canonical home is under ``docs/`` (see ``ARTIFACT_PATHS`` in
# ``attachment.py``). An artifact carrying any other ``artifact_type``
# (for example ``temporary``, or a custom type pointing at real code) is
# left out of delivered context.
ARCHITECTURAL_ARTIFACT_TYPES = frozenset(
    (
        "api",
        "architecture",
        "design",
        "documentation",
        "reference",
        "specification",
    )
)
def _is_architectural_artifact(artifact: Dict[str, Any]) -> bool:
    """
    Decide whether an artifact may be delivered as context.

    Issue #605: delivered context carries decisions and architectural
    artifacts only, never source code. The check looks solely at the
    ``artifact_type`` entry:

    * absent or ``None`` — accepted, so a record that simply lacks the
      field is not silently dropped;
    * otherwise — accepted only when the value is a member of
      :data:`ARCHITECTURAL_ARTIFACT_TYPES`.

    Parameters
    ----------
    artifact : Dict[str, Any]
        An artifact dict from ``state.task_artifacts`` or a Kanban
        attachment record.

    Returns
    -------
    bool
        ``True`` when the artifact may be delivered as context.
    """
    kind = artifact.get("artifact_type")
    return kind is None or kind in ARCHITECTURAL_ARTIFACT_TYPES
# Guidance attached to artifacts that describe an interface boundary.
_COORDINATION_REFERENCE_GUIDANCE = (
    "COORDINATION REFERENCE: "
    "This artifact defines the interface boundary between domains. "
    "Build the complete, working feature implementation. "
    "Treat this as a constraint on what your code must expose "
    "at integration points — not as your implementation spec."
)

# Guidance attached to artifacts that act as an implementation spec.
_IMPLEMENTATION_GUIDE_GUIDANCE = (
    "IMPLEMENTATION GUIDE: "
    "This artifact provides data shapes and design patterns. "
    "Use it to understand the expected structure and patterns — "
    "adapt as needed for your complete feature."
)

# Guidance attached to artifacts produced by shared foundation tasks.
_FOUNDATION_USAGE_GUIDANCE = (
    "SHARED FOUNDATION: "
    "This artifact was produced by a shared setup task that completed "
    "before parallel work began. "
    "Use it directly — import it, consume its exports, "
    "and extend it if needed. "
    "Do not recreate or duplicate what it already provides."
)
def _inject_usage_guidance(
    artifact: Dict[str, Any],
    is_design_dep: bool,
    is_contract_first_ghost: bool,
) -> None:
    """
    Add a ``usage_guidance`` entry to a dependency artifact, in place.

    Selection rules, first match wins:

    1. ``artifact_role == "interface_contract"`` — coordination-reference
       guidance (Option C).
    2. ``artifact_role == "implementation_spec"`` — implementation-guide
       guidance (Option C).
    3. No recognised role, but the dependency is a design task that is
       *not* a contract_first ghost — coordination-reference guidance
       (Option B label-based fallback for artifacts that predate
       ``artifact_role``).

    When nothing matches the artifact is left untouched. An existing
    ``usage_guidance`` value is never overwritten (``setdefault``).

    Foundation (pre-fork synthesis) artifacts are not handled here; they
    receive ``_FOUNDATION_USAGE_GUIDANCE`` from
    :func:`_collect_foundation_contract` (issue #595 Fix 2).

    Parameters
    ----------
    artifact : Dict[str, Any]
        Artifact dict from ``state.task_artifacts``. Modified in-place.
    is_design_dep : bool
        True when the dependency task has ``"design"`` in its labels.
    is_contract_first_ghost : bool
        True when the dependency task also has ``"auto_completed"`` in
        its labels. Such tasks already get framing from
        ``build_tiered_instructions`` and must not be overridden here.
    """
    role = artifact.get("artifact_role")
    label_fallback = is_design_dep and not is_contract_first_ghost

    if role == "implementation_spec":
        guidance = _IMPLEMENTATION_GUIDE_GUIDANCE
    elif role == "interface_contract" or label_fallback:
        guidance = _COORDINATION_REFERENCE_GUIDANCE
    else:
        # Nothing applies: leave the artifact exactly as it was.
        return

    artifact.setdefault("usage_guidance", guidance)
async def _collect_foundation_contract(
    state: Any,
) -> Dict[str, List[Dict[str, Any]]]:
    """
    Collect the project-global foundation contract (issue #595 Fix 2).

    The foundation (pre-fork synthesis) tasks establish the shared
    technical contract that every task must build against, regardless of
    its position in the dependency graph. Unlike ordinary task artifacts,
    which :func:`_collect_task_artifacts` scopes to a task's *direct*
    dependencies, this contract is returned to every task by
    :func:`get_task_context`.

    Foundation tasks are identified by
    ``source_type == "pre_fork_synthesis"``.

    Sources of artifacts (Codex P1 on PR #622):

    1. ``state.task_artifacts[fid]`` — artifacts logged via
       ``log_artifact``.
    2. Kanban-board attachments on each foundation task — fetched via
       ``state.kanban_client.get_attachments(task_id=...)``. Without this
       second source, foundation documents attached to the board directly
       would be dropped, because the dependency-traversal path also skips
       ``pre_fork_synthesis`` tasks.

    Artifacts from both sources are filtered through
    :func:`_is_architectural_artifact` (Codex P2 on PR #622), and each
    one gets ``_FOUNDATION_USAGE_GUIDANCE`` unless it already carries a
    ``usage_guidance`` value.

    Artifacts are emitted in ``state.project_tasks`` order (logged
    artifacts first, then attachments), so the payload does not change
    from one server process to the next.

    Parameters
    ----------
    state : Any
        Marcus server state. ``project_tasks``, ``task_artifacts``,
        ``kanban_client`` and ``context`` are each consulted defensively
        and treated as optional, so the helper degrades to empty results
        rather than raising when a subsystem is unavailable or returns a
        malformed response.

    Returns
    -------
    Dict[str, List[Dict[str, Any]]]
        ``{"artifacts": [...], "decisions": [...]}`` — the union of
        architectural artifacts and decisions produced by every
        foundation task in the project. Both lists are empty when there
        is no foundation work or the backing state is unavailable.
    """
    foundation_tasks = [
        t
        for t in (getattr(state, "project_tasks", None) or [])
        if getattr(t, "source_type", None) == "pre_fork_synthesis"
    ]
    if not foundation_tasks:
        return {"artifacts": [], "decisions": []}

    # Ordered and de-duplicated: iterating a plain set of string ids
    # yields a different order per process under hash randomisation.
    ordered_foundation_ids = list(dict.fromkeys(t.id for t in foundation_tasks))
    foundation_task_ids = set(ordered_foundation_ids)

    artifacts: List[Dict[str, Any]] = []
    task_artifacts = getattr(state, "task_artifacts", None) or {}

    # Source 1: artifacts logged via log_artifact, filtered to
    # architectural types only. Each dict is copied so the stored
    # artifact is not mutated by ``setdefault``.
    for fid in ordered_foundation_ids:
        for raw in task_artifacts.get(fid) or []:
            if not _is_architectural_artifact(raw):
                continue
            artifact = dict(raw)
            artifact.setdefault("usage_guidance", _FOUNDATION_USAGE_GUIDANCE)
            artifacts.append(artifact)

    # Source 2: Kanban-board attachments on each foundation task.
    #
    # Interface contract per ``src/integrations/kanban_interface.py``:
    # ``get_attachments(task_id: str)`` returns
    # ``{"success": bool, "data": [{"id", "filename", "url",
    # "content_type", "size", "created_at", "created_by"}, ...]}``.
    # The keys are normalized; do NOT use Planka-style raw keys
    # (``name`` / ``userId`` / ``createdAt``) — they would all read
    # as ``None`` against the canonical provider output (Codex P2
    # follow-up on PR #623).
    kanban_client = getattr(state, "kanban_client", None)
    if kanban_client is not None:
        for t in foundation_tasks:
            try:
                result = await kanban_client.get_attachments(task_id=t.id)
            except Exception as exc:  # noqa: BLE001
                # Don't fail the whole context-delivery path if the
                # kanban backend is unavailable or transient-errors.
                logger.warning(
                    "Foundation contract: failed to fetch attachments "
                    "for task %s: %s",
                    t.id,
                    exc,
                )
                continue

            # A malformed (non-dict) or unsuccessful response is treated
            # the same way: nothing to add for this task.
            if not isinstance(result, dict) or not result.get("success", False):
                continue

            for attachment in result.get("data") or []:
                if not isinstance(attachment, dict):
                    continue
                filename = attachment.get("filename")
                # Prefer the provider's canonical ``url``; fall back to a
                # synthetic ``./attachments/{id}/{filename}`` only when
                # the provider didn't populate ``url``.
                location = attachment.get("url") or (
                    f"./attachments/{attachment.get('id')}/{filename}"
                    if filename
                    else None
                )
                attach_artifact: Dict[str, Any] = {
                    "filename": filename,
                    "location": location,
                    "storage_type": "attachment",
                    "artifact_type": "reference",
                    "created_by": attachment.get("created_by"),
                    "created_at": attachment.get("created_at"),
                    "description": (f"Foundation attachment from task {t.id}"),
                    "usage_guidance": _FOUNDATION_USAGE_GUIDANCE,
                }
                # Always true today ("reference" is architectural); kept
                # for symmetry with the logged-artifact path and to stay
                # correct if the stamped type ever changes.
                if _is_architectural_artifact(attach_artifact):
                    artifacts.append(attach_artifact)

    decisions: List[Dict[str, Any]] = []
    context = getattr(state, "context", None)
    if context is not None:
        decisions = [
            decision.to_dict()
            for decision in (getattr(context, "decisions", None) or [])
            if getattr(decision, "task_id", None) in foundation_task_ids
        ]

    return {"artifacts": artifacts, "decisions": decisions}
async def _collect_task_artifacts(
    task_id: str, task: Any, state: Any
) -> List[Dict[str, Any]]:
    """
    Collect all artifacts available for this task from tracked sources.

    Returns artifacts from:

    1. Artifacts logged via log_artifact for this task
       (``state.task_artifacts``).
    2. Kanban attachments for this task.
    3. Artifacts from direct dependency tasks (both logged and attached).
       Foundation (``pre_fork_synthesis``) dependencies are skipped —
       their output is delivered via ``project_contract`` (#595 Fix 2).

    Does NOT scan filesystem - only returns explicitly tracked artifacts.

    Dependency-sourced entries carry ``dependency_task_id``,
    ``dependency_task_name`` and a ``scope_annotation`` (GH-356):
    ``"in_scope"`` when the requesting task and the dependency share a
    ``domain:`` label, ``"reference_only"`` otherwise (including every
    dependency of a task that has no ``domain:`` labels).

    Parameters
    ----------
    task_id : str
        ID of the requesting task.
    task : Any
        The requesting task; ``labels``, ``dependencies`` and
        ``kanban_card_id`` are read from it.
    state : Any
        Marcus server state.

    Returns
    -------
    List[Dict[str, Any]]
        The collected artifact dicts. Collection is best-effort: an
        unexpected error is logged and whatever was gathered so far is
        returned.
    """
    artifacts: List[Dict[str, Any]] = []
    try:
        kanban_client = getattr(state, "kanban_client", None)

        # 1. Artifacts logged via log_artifact. Each dict is copied so a
        # caller mutating the result cannot alter state.task_artifacts.
        logged = getattr(state, "task_artifacts", None) or {}
        artifacts.extend(dict(a) for a in logged.get(task_id) or [])

        # 2. Kanban attachments for this task.
        if kanban_client:
            own_card_id = getattr(task, "kanban_card_id", None) or task.id
            own_attachments = await _fetch_kanban_attachments(
                kanban_client, own_card_id, f"task {task_id}"
            )
            artifacts.extend(
                _kanban_attachment_to_artifact(
                    attachment, f"Attachment from task {task_id}"
                )
                for attachment in own_attachments
            )

        # 3. Artifacts from dependency tasks.
        #
        # GH-356 scope annotation: determine which domain the requesting
        # task owns by extracting its ``domain:X`` labels. Feature-based
        # tasks carry no ``domain:`` labels, so the set is empty for
        # that path.
        requesting_domain_labels = {
            label
            for label in (getattr(task, "labels", []) or [])
            if label.startswith("domain:")
        }

        for dep_id in getattr(task, "dependencies", None) or []:
            dep_task = next(
                (
                    t
                    for t in (getattr(state, "project_tasks", None) or [])
                    if t.id == dep_id
                ),
                None,
            )
            if not dep_task:
                continue

            # #595 Fix 2: foundation output is delivered project-globally
            # via `project_contract`; skip both its logged artifacts and
            # its Kanban attachments here.
            if getattr(dep_task, "source_type", None) == "pre_fork_synthesis":
                continue

            dep_labels = getattr(dep_task, "labels", []) or []
            dep_domain_labels = {
                label for label in dep_labels if label.startswith("domain:")
            }
            # GH-356: scope is decided at retrieval time. A shared
            # ``domain:`` label means the requester owns the artifact;
            # anything else (including no domain labels at all) is
            # reference material only.
            scope = (
                "in_scope"
                if requesting_domain_labels & dep_domain_labels
                else "reference_only"
            )
            is_design_dep = "design" in dep_labels
            # contract_first ghost tasks carry both "design" and
            # "auto_completed"; they already receive framing from
            # build_tiered_instructions, so guidance is skipped for them.
            is_contract_first_ghost = "auto_completed" in dep_labels

            # Logged artifacts from the dependency. Copy each dict (P1
            # fix, GH-356) so annotations do not bleed back into
            # state.task_artifacts.
            dep_logged = getattr(state, "task_artifacts", None) or {}
            for raw in dep_logged.get(dep_id) or []:
                artifact = dict(raw)
                artifact["dependency_task_id"] = dep_id
                artifact["dependency_task_name"] = dep_task.name
                artifact["description"] = (
                    f"{artifact.get('description', '')} "
                    f"(from dependency: {dep_task.name})"
                )
                # Option C: artifact_role field takes precedence.
                # Option B: fall back to label-based detection.
                _inject_usage_guidance(
                    artifact, is_design_dep, is_contract_first_ghost
                )
                artifact["scope_annotation"] = scope
                artifacts.append(artifact)

            # Kanban attachments from the dependency (P2 fix, GH-356:
            # these need scope_annotation too).
            if kanban_client:
                dep_card_id = getattr(dep_task, "kanban_card_id", None) or dep_task.id
                dep_attachments = await _fetch_kanban_attachments(
                    kanban_client, dep_card_id, f"dependency {dep_id}"
                )
                for attachment in dep_attachments:
                    artifact = _kanban_attachment_to_artifact(
                        attachment, f"Attachment from dependency: {dep_task.name}"
                    )
                    artifact["dependency_task_id"] = dep_id
                    artifact["dependency_task_name"] = dep_task.name
                    artifact["scope_annotation"] = scope
                    artifacts.append(artifact)
    except Exception as exc:  # noqa: BLE001
        # Don't fail the whole context operation if artifact collection
        # fails. Logged rather than printed: stdout may be the transport
        # of a stdio-based MCP server.
        logger.warning("Artifact collection encountered an error: %s", exc)
    return artifacts


async def _fetch_kanban_attachments(
    kanban_client: Any, card_id: Any, owner: str
) -> List[Dict[str, Any]]:
    """
    Fetch the attachment records of one Kanban card, never raising.

    Parameters
    ----------
    kanban_client : Any
        Client exposing an awaitable ``get_attachments``.
    card_id : Any
        Identifier of the card / task whose attachments are wanted.
    owner : str
        Human-readable owner (e.g. ``"task 42"``) used in the warning.

    Returns
    -------
    List[Dict[str, Any]]
        The dict records under ``"data"`` of a successful response;
        an empty list on error, failure or malformed response.
    """
    try:
        # Positional on purpose: the interface names the parameter
        # ``task_id``, so the former ``card_id=`` keyword could raise
        # ``TypeError`` and silently drop every attachment.
        result = await kanban_client.get_attachments(card_id)
    except Exception as exc:  # noqa: BLE001
        # Don't fail the whole operation if kanban is unavailable.
        logger.warning("Failed to get kanban attachments for %s: %s", owner, exc)
        return []
    if not isinstance(result, dict) or not result.get("success", False):
        return []
    return [a for a in (result.get("data") or []) if isinstance(a, dict)]


def _kanban_attachment_to_artifact(
    attachment: Dict[str, Any], description: str
) -> Dict[str, Any]:
    """
    Convert one Kanban attachment record into an artifact dict.

    Reads the normalized keys (``filename``, ``url``, ``created_by``,
    ``created_at``) first and falls back to the raw Planka-style keys
    (``name``, ``userId``, ``createdAt``) so either record shape yields
    populated fields.

    Parameters
    ----------
    attachment : Dict[str, Any]
        Attachment record returned by the Kanban client.
    description : str
        Text stored under ``"description"``.

    Returns
    -------
    Dict[str, Any]
        Artifact dict stamped ``storage_type: "attachment"`` and
        ``artifact_type: "reference"``.
    """
    filename = attachment.get("filename") or attachment.get("name")
    # Prefer the provider's ``url``; otherwise synthesise a relative path
    # when there is a filename to build it from.
    location = attachment.get("url") or (
        f"./attachments/{attachment.get('id')}/{filename}" if filename else None
    )
    return {
        "filename": filename,
        "location": location,
        "storage_type": "attachment",
        "artifact_type": "reference",
        "created_by": attachment.get("created_by") or attachment.get("userId"),
        "created_at": attachment.get("created_at") or attachment.get("createdAt"),
        "description": description,
    }
async def _collect_transitive_context(
    task_id: str, task: Any, state: Any
) -> Dict[str, List[Dict[str, Any]]]:
    """
    Collect ambient context from a task's *transitive* ancestors.

    Issue #605 tier 3: beyond the foundation contract (project-global)
    and direct-dependency artifacts (one hop), this helper walks the
    dependency graph past the first hop and gathers architectural
    artifacts and decisions produced further upstream.

    Everything returned is labelled ``scope_annotation: "reference_only"``.
    Direct (one-hop) dependencies are excluded; those are handled by
    :func:`_collect_task_artifacts`. Foundation (pre-fork synthesis)
    tasks are excluded as well, although the walk still passes through
    them to reach their own ancestors.

    Only artifacts accepted by :func:`_is_architectural_artifact` are
    returned. Artifacts come from ``state.task_artifacts`` and from the
    Kanban attachments of each transitive ancestor (PR #606 review P2).

    Decisions recorded on any transitive ancestor are returned in the
    order they appear in ``state.context.decisions``.

    Parameters
    ----------
    task_id : str
        The requesting task's ID.
    task : Any
        The requesting task. Its ``dependencies`` list seeds the walk.
    state : Any
        Marcus server state. ``project_tasks``, ``task_artifacts``,
        ``kanban_client`` and ``context`` are each consulted defensively.

    Returns
    -------
    Dict[str, List[Dict[str, Any]]]
        ``{"artifacts": [...], "decisions": [...]}`` — transitive
        ancestor artifacts and decisions, each ``reference_only``. Both
        lists are empty when there is no transitive ancestry. Artifacts
        are grouped by ancestor, nearest ancestors first.
    """
    project_tasks = getattr(state, "project_tasks", None) or []
    tasks_by_id = {t.id: t for t in project_tasks}
    task_artifacts = getattr(state, "task_artifacts", None) or {}
    kanban = getattr(state, "kanban_client", None)

    direct_dep_order = list(getattr(task, "dependencies", None) or [])
    direct_deps = set(direct_dep_order)
    foundation_ids = {
        t.id
        for t in project_tasks
        if getattr(t, "source_type", None) == "pre_fork_synthesis"
    }

    # Level-by-level (breadth-first) walk of all ancestors. ``visited``
    # guards against cycles in a malformed dependency graph, and
    # ``ancestor_order`` records discovery order so the output does not
    # depend on set iteration order.
    visited: Set[str] = {task_id}
    ancestor_order: List[str] = []
    level: List[str] = direct_dep_order
    while level:
        upstream_level: List[str] = []
        for dep_id in level:
            if dep_id in visited:
                continue
            visited.add(dep_id)
            ancestor_order.append(dep_id)
            dep_task = tasks_by_id.get(dep_id)
            if dep_task is not None:
                upstream_level.extend(getattr(dep_task, "dependencies", None) or [])
        level = upstream_level

    # Transitive-only = ancestors minus direct deps minus foundation.
    transitive_order = [
        anc_id
        for anc_id in ancestor_order
        if anc_id not in direct_deps and anc_id not in foundation_ids
    ]
    transitive_ids = set(transitive_order)

    artifacts: List[Dict[str, Any]] = []
    for anc_id in transitive_order:
        anc_task = tasks_by_id.get(anc_id)
        anc_name = getattr(anc_task, "name", anc_id) if anc_task else anc_id

        # Logged artifacts, copied so stored dicts are not mutated.
        for raw in task_artifacts.get(anc_id) or []:
            if not _is_architectural_artifact(raw):
                continue
            artifact = dict(raw)
            artifact["scope_annotation"] = "reference_only"
            artifact["dependency_task_id"] = anc_id
            artifact["dependency_task_name"] = anc_name
            artifact.setdefault("usage_guidance", _COORDINATION_REFERENCE_GUIDANCE)
            artifacts.append(artifact)

        # PR #606 review P2: attached-but-not-logged architectural docs
        # must not disappear past the first hop.
        if kanban is None or anc_task is None:
            continue
        try:
            card_id = getattr(anc_task, "kanban_card_id", None) or anc_id
            # Positional on purpose: the interface names the parameter
            # ``task_id``, so the former ``card_id=`` keyword could raise
            # ``TypeError`` and drop every attachment.
            kanban_result = await kanban.get_attachments(card_id)
            if not isinstance(kanban_result, dict) or not kanban_result.get(
                "success", False
            ):
                continue
            for attachment in kanban_result.get("data") or []:
                if not isinstance(attachment, dict):
                    continue
                # Normalized keys first, raw Planka-style keys as a
                # fallback, so either record shape is populated.
                filename = attachment.get("filename") or attachment.get("name")
                location = attachment.get("url") or (
                    f"./attachments/{attachment.get('id')}/{filename}"
                    if filename
                    else None
                )
                attach_artifact: Dict[str, Any] = {
                    "filename": filename,
                    "location": location,
                    "storage_type": "attachment",
                    "artifact_type": "reference",
                    "created_by": (
                        attachment.get("created_by") or attachment.get("userId")
                    ),
                    "created_at": (
                        attachment.get("created_at") or attachment.get("createdAt")
                    ),
                    "dependency_task_id": anc_id,
                    "dependency_task_name": anc_name,
                    "description": (
                        f"Attachment from transitive ancestor: {anc_name}"
                    ),
                    "scope_annotation": "reference_only",
                    "usage_guidance": _COORDINATION_REFERENCE_GUIDANCE,
                }
                if _is_architectural_artifact(attach_artifact):
                    artifacts.append(attach_artifact)
        except Exception as exc:  # noqa: BLE001
            logger.warning(
                "Failed to get Kanban attachments for transitive "
                "ancestor %s: %s",
                anc_id,
                exc,
            )

    decisions: List[Dict[str, Any]] = []
    context = getattr(state, "context", None)
    if context is not None:
        for decision in getattr(context, "decisions", None) or []:
            if getattr(decision, "task_id", None) in transitive_ids:
                decision_dict = decision.to_dict()
                decision_dict["scope_annotation"] = "reference_only"
                decisions.append(decision_dict)

    return {"artifacts": artifacts, "decisions": decisions}
[docs]
async def assemble_task_context(task_id: str, task: Any, state: Any) -> Dict[str, Any]:
    """
    Build the context bundle that ships with a freshly assigned task.

    Issue #605: context is delivered *with* the task in the
    ``request_next_task`` response instead of relying on the optional
    ``get_task_context`` call. The bundle has three tiers:

    1. ``project_contract`` — the project-global foundation contract
       from :func:`_collect_foundation_contract`.
    2. ``dependency_artifacts`` — entries from
       :func:`_collect_task_artifacts` that carry a ``dependency_task_id``
       and pass :func:`_is_architectural_artifact`. The task's own logged
       artifacts and attachments lack ``dependency_task_id`` and are
       therefore dropped (PR #606 review P2).
    3. ``transitive_context`` — the result of
       :func:`_collect_transitive_context`.

    Parameters
    ----------
    task_id : str
        The assigned task's ID.
    task : Any
        The assigned task object.
    state : Any
        Marcus server state.

    Returns
    -------
    Dict[str, Any]
        ``{"project_contract": {...}, "dependency_artifacts": [...],
        "transitive_context": {...}}``.
    """
    bundle: Dict[str, Any] = {}

    # Tier 1: foundation contract.
    bundle["project_contract"] = await _collect_foundation_contract(state)

    # Tier 2: keep only dependency-sourced, architectural entries.
    from_dependencies = []
    for entry in await _collect_task_artifacts(task_id, task, state):
        if not entry.get("dependency_task_id"):
            continue
        if _is_architectural_artifact(entry):
            from_dependencies.append(entry)
    bundle["dependency_artifacts"] = from_dependencies

    # Tier 3: ambient context from further upstream.
    bundle["transitive_context"] = await _collect_transitive_context(
        task_id, task, state
    )
    return bundle
async def _collect_sibling_subtask_context(
    current_subtask_id: str, parent_task_id: str, state: Any
) -> Dict[str, List[Dict[str, Any]]]:
    """
    Gather artifacts and decisions produced by sibling subtasks.

    All subtasks of ``parent_task_id`` are visited once; the requesting
    subtask itself is skipped. For every other sibling:

    * each entry of ``state.task_artifacts[sibling.id]`` is copied and
      tagged with ``from_sibling_subtask`` / ``from_sibling_subtask_name``,
      and its ``description`` is prefixed with the sibling's name;
    * each decision in ``state.context.decisions`` whose ``task_id`` is
      the sibling's id is serialised with ``to_dict()``, tagged the same
      way, and its ``what`` is prefixed with the sibling's name.

    Parameters
    ----------
    current_subtask_id : str
        The current subtask requesting context
    parent_task_id : str
        The parent task ID
    state : Any
        Marcus server state

    Returns
    -------
    Dict[str, List[Dict[str, Any]]]
        Dict with "artifacts" and "decisions" keys containing sibling
        context. Both lists are empty when no subtask manager is set.
    """
    gathered_artifacts: List[Dict[str, Any]] = []
    gathered_decisions: List[Dict[str, Any]] = []

    manager = getattr(state, "subtask_manager", None)
    if manager:
        # All subtasks for this parent, from unified storage.
        for sibling in manager.get_subtasks(parent_task_id, state.project_tasks):
            if sibling.id == current_subtask_id:
                continue  # the requester is not its own sibling

            # Artifacts produced by this sibling (copied, then tagged).
            if hasattr(state, "task_artifacts") and sibling.id in state.task_artifacts:
                gathered_artifacts.extend(
                    {
                        **produced,
                        "from_sibling_subtask": sibling.id,
                        "from_sibling_subtask_name": sibling.name,
                        "description": (
                            f"[From sibling subtask: {sibling.name}] "
                            f"{produced.get('description', '')}"
                        ),
                    }
                    for produced in state.task_artifacts[sibling.id]
                )

            # Decisions recorded against this sibling.
            context_system = getattr(state, "context", None)
            if context_system:
                for record in context_system.decisions:
                    if record.task_id != sibling.id:
                        continue
                    entry = record.to_dict()
                    entry["from_sibling_subtask"] = sibling.id
                    entry["from_sibling_subtask_name"] = sibling.name
                    entry["what"] = f"[From sibling: {sibling.name}] {entry['what']}"
                    gathered_decisions.append(entry)

    return {"artifacts": gathered_artifacts, "decisions": gathered_decisions}
async def _persist_decision_to_history(logged_decision: Any, state: Any) -> None:
    """
    Persist decision to project history for post-project analysis.

    Converts the Context system's Decision to ProjectHistory's Decision
    format and appends it through ``state.project_history_persistence``,
    creating that persistence object on first use.

    Parameters
    ----------
    logged_decision : Decision
        The decision from the Context system. ``decision_id``,
        ``task_id``, ``agent_id``, ``timestamp``, ``what``, ``why`` and
        ``impact`` are read from it.
    state : Any
        Marcus server state. Nothing is persisted when it has no
        ``current_project_id``.

    Notes
    -----
    Fails gracefully - errors are logged but don't interrupt the main flow.

    ``affected_tasks`` is the list of project tasks that name the
    decision's task in their ``dependencies``. ``confidence`` is a fixed
    default of ``0.8``.
    """
    try:
        # Get project info from state.
        project_id = getattr(state, "current_project_id", None)
        if not project_id:
            logger.debug("No active project - skipping project history persistence")
            return
        # Fall back to the id when the name is missing *or* ``None``.
        project_name = getattr(state, "current_project_name", None) or project_id

        # Initialize project history persistence if not already done. An
        # attribute pre-set to ``None`` counts as "not done": with a
        # plain ``hasattr`` check it would never be replaced and every
        # persist attempt would fail.
        persistence = getattr(state, "project_history_persistence", None)
        if persistence is None:
            persistence = ProjectHistoryPersistence()
            state.project_history_persistence = persistence

        # Kanban comment URL, if the state exposes one.
        # NOTE(review): this is state-wide, so it may belong to an earlier
        # comment when this decision's comment was not posted — confirm.
        kanban_comment_url = getattr(state, "last_kanban_comment_url", None)

        # Find affected tasks by checking task dependencies.
        affected_tasks: List[str] = [
            task.id
            for task in (getattr(state, "project_tasks", None) or [])
            if logged_decision.task_id in (getattr(task, "dependencies", None) or [])
        ]

        # Convert Context Decision to ProjectHistory Decision.
        history_decision = HistoryDecision(
            decision_id=logged_decision.decision_id,
            task_id=logged_decision.task_id,
            agent_id=logged_decision.agent_id,
            timestamp=logged_decision.timestamp,
            what=logged_decision.what,
            why=logged_decision.why,
            impact=logged_decision.impact,
            affected_tasks=affected_tasks,
            confidence=0.8,  # Default confidence
            kanban_comment_url=kanban_comment_url,
            project_id=project_id,
        )

        # Persist to project history.
        await persistence.append_decision(project_id, project_name, history_decision)
        logger.info(
            "Persisted decision %s to project history for %s",
            logged_decision.decision_id,
            project_id,
        )
    except Exception as e:
        # Graceful degradation - log but don't fail
        logger.warning(f"Failed to persist decision to project history: {e}")