Skip to content

Cost Tracker API

API reference for production-grade cost tracking with LangFuse integration.

Overview

The CostTracker provides comprehensive cost tracking for all Claude SDK calls with per-agent attribution, time-series aggregation, and LangFuse observability.

from src.orchestrator.cost_tracker import CostTracker, track_agent_call

async with track_agent_call(agent_name="implementer", task="Add auth"):
    result = await executor.run_task("implement authentication")

tracker = CostTracker()
summary = tracker.get_cost_summary()

API Reference

CostTracker

CostTracker(project_root: Path | None = None, enable_langfuse: bool = True, storage_path: Path | None = None)

Centralized cost tracking for all Claude API calls.

Initialize cost tracker.


project_root: Project root directory (defaults to cwd)
enable_langfuse: Enable LangFuse integration if available
storage_path: Custom storage path for cost data (defaults to logs/costs/)
Source code in src/orchestrator/cost_tracker.py
def __init__(
    self,
    project_root: Path | None = None,
    enable_langfuse: bool = True,
    storage_path: Path | None = None,
):
    """Initialize cost tracker.

    Args:
    ----
        project_root: Project root directory (defaults to cwd)
        enable_langfuse: Enable LangFuse integration if available
        storage_path: Custom storage path for cost data (defaults to logs/costs/)

    """
    # Resolve filesystem locations and make sure the storage dir exists.
    self.project_root = project_root or Path.cwd()
    self.storage_path = storage_path or (self.project_root / "logs" / "costs")
    self.storage_path.mkdir(parents=True, exist_ok=True)

    # In-memory records: flat call list, per-task metrics, per-agent call lists.
    self.calls: list[UsageMetrics] = []
    self.tasks: dict[str, TaskMetrics] = {}
    self.agents: dict[str, list[UsageMetrics]] = defaultdict(list)

    # Batched disk writes (Quick Win #3): flush after 10 tracked calls
    # or after 5 seconds, whichever happens first.
    self._write_buffer: list[UsageMetrics] = []
    self._last_flush_time = time.time()
    self._flush_threshold = 10
    self._flush_interval = 5.0
    self._flush_task: asyncio.Task | None = None

    # Optional LangFuse observability client (only wired up when both the
    # caller asks for it and the library import succeeded).
    self.langfuse: Any | None = None
    if enable_langfuse and LANGFUSE_AVAILABLE:
        self._init_langfuse()

    # Re-hydrate previously persisted cost data from storage_path.
    self._load_from_disk()

track_call

track_call(metrics: UsageMetrics, agent_name: str = 'unknown', task_name: str = 'unknown') -> None

Track a single API call with batched writes.


metrics: Usage metrics from the call
agent_name: Name of the agent that made the call
task_name: Name of the task being executed
Source code in src/orchestrator/cost_tracker.py
def track_call(
    self,
    metrics: UsageMetrics,
    agent_name: str = "unknown",
    task_name: str = "unknown",
) -> None:
    """Track a single API call with batched writes.

    Args:
    ----
        metrics: Usage metrics from the call
        agent_name: Name of the agent that made the call
        task_name: Name of the task being executed

    """
    # Track immediately for real-time summaries
    self.calls.append(metrics)

    # Retain in write buffer for batched disk persistence (Quick Win #3)
    self._write_buffer.append(metrics)

    # Add to agent tracking
    self.agents[agent_name].append(metrics)

    # Add to task tracking
    task_key = f"{agent_name}:{task_name}"
    if task_key not in self.tasks:
        self.tasks[task_key] = TaskMetrics(
            task_name=task_name,
            agent_name=agent_name,
        )
    self.tasks[task_key].add_call(metrics)

    # Update LangFuse if available (v3.x API)
    if self.langfuse:
        try:
            # Create event for this API call
            metadata = {
                "model": metrics.model,
                "agent": agent_name,
                "task": task_name,
                "cost_usd": metrics.total_cost,
                "cache_hit_rate": metrics.cache_hit_rate,
                "input_tokens": metrics.input_tokens,
                "output_tokens": metrics.output_tokens,
                "cache_read_tokens": metrics.cache_read_tokens,
                "duration_ms": metrics.duration_ms,
            }

            # PHASE 1 Task 1.3: Add thinking token tracking
            if metrics.thinking_tokens > 0:
                metadata["thinking_tokens"] = metrics.thinking_tokens
                metadata["thinking_from_cache"] = metrics.thinking_from_cache

            self.langfuse.create_event(
                name=f"{agent_name}:{task_name}",
                metadata=metadata,
            )
        except Exception as e:
            print(f"⚠️  LangFuse event creation failed: {e}")

    # Schedule auto-flush check (non-blocking)
    try:
        asyncio.create_task(self._auto_flush_if_needed())
    except RuntimeError:
        # No event loop - buffer writes, flush manually via flush_sync()
        # This happens in tests without async context
        pass

get_cost_summary

get_cost_summary(start_date: str | None = None, end_date: str | None = None) -> dict[str, Any]

Get cost summary for a date range.


start_date: Start date (ISO format, e.g., "2025-10-01")
end_date: End date (ISO format, e.g., "2025-10-31")

Summary dict with total costs, tokens, and breakdowns
Source code in src/orchestrator/cost_tracker.py
def get_cost_summary(
    self,
    start_date: str | None = None,
    end_date: str | None = None,
) -> dict[str, Any]:
    """Get cost summary for a date range.

    Args:
    ----
        start_date: Start date (ISO format, e.g., "2025-10-01")
        end_date: End date (ISO format, e.g., "2025-10-31")

    Returns:
    -------
        Summary dict with total costs, tokens, and breakdowns

    """
    # Filter calls by date range
    filtered_calls = self.calls
    if start_date:
        start_dt = datetime.fromisoformat(start_date)
        filtered_calls = [
            c for c in filtered_calls if datetime.fromisoformat(c.timestamp) >= start_dt
        ]
    if end_date:
        end_dt = datetime.fromisoformat(end_date)
        filtered_calls = [
            c for c in filtered_calls if datetime.fromisoformat(c.timestamp) <= end_dt
        ]

    if not filtered_calls:
        return {"total_calls": 0, "total_cost": 0.0, "total_tokens": 0}

    # Calculate totals
    total_cost = sum(c.total_cost for c in filtered_calls)
    total_tokens = sum(c.total_tokens for c in filtered_calls)
    total_input = sum(c.input_tokens for c in filtered_calls)
    total_output = sum(c.output_tokens for c in filtered_calls)
    total_cache_reads = sum(c.cache_read_tokens for c in filtered_calls)

    # Model breakdown
    model_costs: dict[str, float] = defaultdict(float)
    for call in filtered_calls:
        model_costs[call.model] += call.total_cost

    return {
        "period": {
            "start": start_date or filtered_calls[0].timestamp,
            "end": end_date or filtered_calls[-1].timestamp,
        },
        "total_calls": len(filtered_calls),
        "total_cost": total_cost,
        "total_tokens": total_tokens,
        "tokens_breakdown": {
            "input": total_input,
            "output": total_output,
            "cache_reads": total_cache_reads,
        },
        "average_cost_per_call": total_cost / len(filtered_calls),
        "average_tokens_per_call": total_tokens / len(filtered_calls),
        "cache_hit_rate": (total_cache_reads / (total_input + total_cache_reads) * 100)
        if (total_input + total_cache_reads) > 0
        else 0.0,
        "model_breakdown": dict(model_costs),
    }

get_daily_costs

get_daily_costs(days: int = 30) -> dict[str, float]

Get daily cost totals for the last N days.


days: Number of days to include (default: 30)

Dict mapping dates (YYYY-MM-DD) to total costs
Source code in src/orchestrator/cost_tracker.py
def get_daily_costs(self, days: int = 30) -> dict[str, float]:
    """Get daily cost totals for the last N days.

    Args:
    ----
        days: Number of days to include (default: 30)

    Returns:
    -------
        Dict mapping dates (YYYY-MM-DD) to total costs

    """
    window_start = datetime.now() - timedelta(days=days)
    totals: dict[str, float] = defaultdict(float)

    # Bucket each recent call's cost under its calendar day.
    for record in self.calls:
        when = datetime.fromisoformat(record.timestamp)
        if when < window_start:
            continue
        totals[when.strftime("%Y-%m-%d")] += record.total_cost

    # Return a plain dict ordered by date key.
    return dict(sorted(totals.items()))

get_costs_by_agent

get_costs_by_agent() -> dict[str, dict[str, Any]]

Get cost breakdown by agent.

Returns
Dict mapping agent names to their cost summaries
Source code in src/orchestrator/cost_tracker.py
def get_costs_by_agent(self) -> dict[str, dict[str, Any]]:
    """Get cost breakdown by agent.

    Returns
    -------
        Dict mapping agent names to their cost summaries

    """
    summaries: dict[str, dict[str, Any]] = {}
    for name, history in self.agents.items():
        # Agents with no recorded calls are omitted entirely.
        if not history:
            continue

        n_calls = len(history)
        cost = sum(entry.total_cost for entry in history)
        tokens = sum(entry.total_tokens for entry in history)

        summaries[name] = {
            "total_calls": n_calls,
            "total_cost": cost,
            "total_tokens": tokens,
            "average_cost_per_call": cost / n_calls,
            "average_tokens_per_call": tokens / n_calls,
        }

    return summaries

export_to_csv

export_to_csv(output_path: Path) -> None

Export cost data to CSV file.


output_path: Path to output CSV file
Source code in src/orchestrator/cost_tracker.py
def export_to_csv(self, output_path: Path) -> None:
    """Export cost data to CSV file.

    Args:
    ----
        output_path: Path to output CSV file

    """
    import csv

    # PERF: build the call -> (agent, task) attribution map in one pass
    # instead of scanning every task's call list for every exported row
    # (was O(calls * tasks * task_calls)). Keyed by object identity:
    # track_call() stores the same UsageMetrics object in both self.calls
    # and the owning task's call list, so identity lookup matches.
    # setdefault preserves the original first-match-wins behavior when a
    # call object appears in more than one task.
    attribution: dict[int, tuple[str, str]] = {}
    for task_metrics in self.tasks.values():
        for tracked in task_metrics.calls:
            attribution.setdefault(
                id(tracked),
                (task_metrics.agent_name, task_metrics.task_name),
            )

    with output_path.open("w", newline="") as f:
        writer = csv.writer(f)

        # Header
        writer.writerow(
            [
                "timestamp",
                "model",
                "agent",
                "task",
                "input_tokens",
                "output_tokens",
                "cache_read_tokens",
                "cache_write_tokens",
                "input_cost",
                "output_cost",
                "cache_read_cost",
                "cache_write_cost",
                "total_cost",
                "duration_ms",
            ],
        )

        # One row per call; calls never attributed to a task fall back to
        # "unknown"/"unknown" as before.
        for call in self.calls:
            agent, task = attribution.get(id(call), ("unknown", "unknown"))
            writer.writerow(
                [
                    call.timestamp,
                    call.model,
                    agent,
                    task,
                    call.input_tokens,
                    call.output_tokens,
                    call.cache_read_tokens,
                    call.cache_creation_tokens,
                    call.input_cost,
                    call.output_cost,
                    call.cache_read_cost,
                    call.cache_write_cost,
                    call.total_cost,
                    call.duration_ms,
                ],
            )

    print(f"✅ Exported {len(self.calls)} calls to {output_path}")

UsageMetrics Model

UsageMetrics dataclass

UsageMetrics(input_tokens: int = 0, output_tokens: int = 0, cache_creation_tokens: int = 0, cache_read_tokens: int = 0, thinking_tokens: int = 0, input_cost: float = 0.0, output_cost: float = 0.0, cache_write_cost: float = 0.0, cache_read_cost: float = 0.0, total_cost: float = 0.0, model: str = '', duration_ms: int = 0, timestamp: str = (lambda: datetime.now().isoformat())())

Token usage and cost metrics for a single API call.

total_tokens property

total_tokens: int

Total tokens consumed (excluding cache reads which are cheaper).

cache_hit_rate property

cache_hit_rate: float

Percentage of input tokens served from cache.

thinking_from_cache property

thinking_from_cache: bool

Check if thinking tokens came from cache.

PHASE 1 Task 1.3: Extended thinking blocks auto-cache. Anthropic: Thinking blocks cache with message content.

cost_per_token property

cost_per_token: float

Average cost per token (USD).

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

Source code in src/orchestrator/cost_tracker.py
def to_dict(self) -> dict[str, Any]:
    """Serialize this metrics record into a plain dict via dataclasses.asdict."""
    return asdict(self)

Usage Examples

Basic Cost Tracking

from src.orchestrator.cost_tracker import track_agent_call, get_tracker

# Track task cost
async with track_agent_call(agent_name="implementer", task="Feature X"):
    result = await executor.run_task("implement feature X")

# Get summary
tracker = get_tracker()
summary = tracker.get_cost_summary()
print(f"Total: ${summary['total_cost']:.4f}")  # key is "total_cost", per get_cost_summary()

Per-Agent Costs

tracker = get_tracker()
agent_costs = tracker.get_costs_by_agent()

# get_costs_by_agent() returns a summary dict per agent, not a bare float
for agent, stats in agent_costs.items():
    print(f"{agent}: ${stats['total_cost']:.4f}")

Time-Series Reports

tracker = get_tracker()

# get_daily_costs() takes a day-count window, not start/end dates
daily_costs = tracker.get_daily_costs(days=30)
for date, cost in daily_costs.items():
    print(f"{date}: ${cost:.2f}")

Export Reports

tracker = get_tracker()

# Export to CSV — export_to_csv() takes only the output path (a Path);
# it does not accept date filters
tracker.export_to_csv(Path("cost_report.csv"))

# Export to JSON — export_to_json is not documented on this page; verify its
# signature in src/orchestrator/cost_tracker.py before relying on it
tracker.export_to_json("cost_report.json", start_date="2025-10-01")

Pricing Reference

Current pricing (October 2025):

Model Input Output Cache Write Cache Read
Haiku 4.5 $0.80/MTok $4.00/MTok $1.00/MTok $0.08/MTok
Sonnet 4.5 $3.00/MTok $15.00/MTok $3.75/MTok $0.30/MTok
Opus 4 $15.00/MTok $75.00/MTok $18.75/MTok $1.50/MTok

Best Practices

  1. Enable Caching - 90% cost savings
  2. Track All Calls - Use track_agent_call context manager
  3. Monitor Daily - Check costs regularly
  4. Set Budgets - Configure alerts
  5. Export Reports - Monthly cost analysis

Next Steps

Cost Guide Orchestrator API