Source code for src.cost_tracking.token_tracker

"""
Token-based Cost Tracking for Marcus AI Usage.

Tracks real-time AI token consumption, spend rates, and project costs
based on actual usage rather than naive hourly estimates.
"""

import asyncio
import json
import sys
from collections import defaultdict, deque
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Deque, Dict, List, Optional


[docs] class TokenTracker: """ Tracks AI token usage per project with real-time cost monitoring. Features: - Per-project token tracking - Real-time spend rate calculation - Cost projections based on current usage - Token usage history and analytics """
[docs] def __init__(self, cost_per_1k_tokens: float = 0.03): """ Initialize token tracker. Parameters ---------- cost_per_1k_tokens : float Cost per 1000 tokens (default $0.03 for Claude) """ self.cost_per_1k_tokens = cost_per_1k_tokens # Project tracking self.project_tokens: Dict[str, int] = defaultdict(int) self.project_costs: Dict[str, float] = defaultdict(float) # Real-time tracking with sliding windows self.token_history: Dict[str, Deque[Dict[str, Any]]] = defaultdict( lambda: deque(maxlen=1000) # Keep last 1000 token events ) # Session tracking self.session_start_times: Dict[str, datetime] = {} self.session_tokens: Dict[str, int] = defaultdict(int) # Rate tracking self.spend_rates: Dict[str, List[float]] = defaultdict(list) # Persistence self.data_file = Path("data/token_usage.json") self.load_historical_data() # Start background rate calculator self._rate_task: Optional[asyncio.Task[None]] = None
[docs] def load_historical_data(self) -> None: """Load historical token usage data.""" if self.data_file.exists(): try: with open(self.data_file, "r") as f: data = json.load(f) self.project_tokens = defaultdict( int, data.get("project_tokens", {}) ) self.project_costs = defaultdict( float, data.get("project_costs", {}) ) except Exception as e: print(f"Failed to load token history: {e}", file=sys.stderr)
[docs] def save_data(self) -> None: """Persist token usage data.""" self.data_file.parent.mkdir(parents=True, exist_ok=True) data = { "project_tokens": dict(self.project_tokens), "project_costs": dict(self.project_costs), "last_updated": datetime.now(timezone.utc).isoformat(), } with open(self.data_file, "w") as f: json.dump(data, f, indent=2)
[docs] async def track_tokens( self, project_id: str, input_tokens: int, output_tokens: int, model: str = "claude-3-sonnet", metadata: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """ Track token usage for a project. Parameters ---------- project_id : str Project identifier input_tokens : int Number of input tokens output_tokens : int Number of output tokens model : str AI model used metadata : Optional[Dict[str, Any]] Additional context (task_id, agent_id, etc.) Returns ------- Dict[str, Any] Dict with usage stats and cost info """ total_tokens = input_tokens + output_tokens timestamp = datetime.now(timezone.utc) # Update totals self.project_tokens[project_id] += total_tokens cost = (total_tokens / 1000) * self.cost_per_1k_tokens self.project_costs[project_id] += cost # Track for rate calculation self.token_history[project_id].append( { "timestamp": timestamp, "tokens": total_tokens, "cost": cost, "metadata": metadata or {}, } ) # Update session tracking if project_id not in self.session_start_times: self.session_start_times[project_id] = timestamp self.session_tokens[project_id] += total_tokens # Calculate current stats stats = self.get_project_stats(project_id) # Save periodically if len(self.token_history[project_id]) % 10 == 0: self.save_data() return stats
[docs] def get_project_stats(self, project_id: str) -> Dict[str, Any]: """ Get comprehensive stats for a project. Parameters ---------- project_id : str Project identifier Returns ------- Dict[str, Any] Dict containing: - total_tokens: Total tokens used - total_cost: Total cost incurred - current_spend_rate: Tokens/hour over last 5 minutes - average_spend_rate: Overall tokens/hour - projected_cost: Estimated total cost at current rate - session_duration: Time since first token """ if project_id not in self.project_tokens: return { "total_tokens": 0, "total_cost": 0.0, "current_spend_rate": 0.0, "average_spend_rate": 0.0, "projected_cost": 0.0, "session_duration": 0, } # Basic stats total_tokens = self.project_tokens[project_id] total_cost = self.project_costs[project_id] # Calculate spend rates current_rate = self._calculate_current_spend_rate(project_id) avg_rate = self._calculate_average_spend_rate(project_id) # Project cost based on current burn rate projected_cost = self._project_total_cost(project_id, current_rate) # Session duration if project_id in self.session_start_times: duration = ( datetime.now(timezone.utc) - self.session_start_times[project_id] ).total_seconds() else: duration = 0 return { "total_tokens": total_tokens, "total_cost": round(total_cost, 4), "current_spend_rate": round(current_rate, 2), # tokens/hour "average_spend_rate": round(avg_rate, 2), "projected_cost": round(projected_cost, 2), "session_duration": int(duration), "cost_per_hour": round((current_rate / 1000) * self.cost_per_1k_tokens, 2), }
def _calculate_current_spend_rate(self, project_id: str) -> float: """Calculate current spend rate (tokens/hour) over last 5 minutes.""" if project_id not in self.token_history: return 0.0 history = list(self.token_history[project_id]) if len(history) < 2: return 0.0 # Get events from last 5 minutes cutoff = datetime.now(timezone.utc) - timedelta(minutes=5) recent_events = [e for e in history if e["timestamp"] > cutoff] if len(recent_events) < 2: # Fall back to last 10 events if not enough recent recent_events = history[-10:] if len(recent_events) < 2: return 0.0 # Calculate rate time_span = ( recent_events[-1]["timestamp"] - recent_events[0]["timestamp"] ).total_seconds() if time_span == 0: return 0.0 total_tokens = sum(e["tokens"] for e in recent_events) tokens_per_second = total_tokens / time_span tokens_per_hour = tokens_per_second * 3600 return float(tokens_per_hour) def _calculate_average_spend_rate(self, project_id: str) -> float: """Calculate average spend rate over entire session.""" if project_id not in self.session_start_times: return 0.0 duration = ( datetime.now(timezone.utc) - self.session_start_times[project_id] ).total_seconds() if duration == 0: return 0.0 tokens_per_second = self.session_tokens[project_id] / duration return tokens_per_second * 3600 def _project_total_cost(self, project_id: str, current_rate: float) -> float: """Project total cost based on current burn rate.""" # This is a simple projection - could be enhanced with: # - Task completion estimates # - Historical patterns # - Complexity analysis # For now, assume 20% of tasks complete = 20% of tokens used # This would integrate with task tracking current_cost = self.project_costs[project_id] # Rough estimate: if we've spent X so far at current rate, # project 5x for a full project (assuming 20% complete) # This should be replaced with actual task completion percentage if current_cost > 0: return current_cost * 5 # Placeholder multiplier return 0.0
[docs] def get_all_projects_summary(self) -> Dict[str, Any]: """Get summary of all tracked projects.""" summary = {} total_tokens = 0 total_cost = 0.0 for project_id in self.project_tokens: stats = self.get_project_stats(project_id) summary[project_id] = stats total_tokens += stats["total_tokens"] total_cost += stats["total_cost"] return { "projects": summary, "total_tokens": total_tokens, "total_cost": round(total_cost, 2), "active_projects": len( [p for p in summary.values() if p["current_spend_rate"] > 0] ), }
[docs] async def start_monitoring(self) -> None: """Start background monitoring task.""" if self._rate_task is None: self._rate_task = asyncio.create_task(self._monitor_rates())
[docs] async def stop_monitoring(self) -> None: """Stop background monitoring.""" if self._rate_task: self._rate_task.cancel() self._rate_task = None
async def _monitor_rates(self) -> None: """Background task to monitor spend rates and alert on anomalies.""" while True: try: await asyncio.sleep(60) # Check every minute for project_id in self.project_tokens: stats = self.get_project_stats(project_id) current_rate = stats["current_spend_rate"] # Track rate history self.spend_rates[project_id].append(current_rate) # Keep only last hour of rates if len(self.spend_rates[project_id]) > 60: self.spend_rates[project_id] = self.spend_rates[project_id][ -60: ] # Check for anomalies if len(self.spend_rates[project_id]) > 10: avg_rate = sum(self.spend_rates[project_id][-10:]) / 10 if current_rate > avg_rate * 2 and current_rate > 10000: # Alert: Spending spike detected msg = ( f"⚠️ Token spend spike for {project_id}: " f"{current_rate:.0f} tokens/hour" ) print(msg, file=sys.stderr) # Periodic save self.save_data() except asyncio.CancelledError: break except Exception as e: print(f"Error in rate monitor: {e}", file=sys.stderr)
# Global instance token_tracker = TokenTracker()