The CostTracker provides comprehensive cost tracking for all Claude SDK calls with per-agent attribution, time-series aggregation, and LangFuse observability.
Centralized cost tracking for all Claude API calls.
Initialize cost tracker.
project_root: Project root directory (defaults to cwd)
enable_langfuse: Enable LangFuse integration if available
storage_path: Custom storage path for cost data (defaults to logs/costs/)
def track_call(
    self,
    metrics: UsageMetrics,
    agent_name: str = "unknown",
    task_name: str = "unknown",
) -> None:
    """Track a single API call with batched writes.

    Args:
    ----
        metrics: Usage metrics from the call
        agent_name: Name of the agent that made the call
        task_name: Name of the task being executed
    """
    # Record immediately for real-time summaries, and keep a copy in the
    # write buffer for batched disk persistence (Quick Win #3).
    self.calls.append(metrics)
    self._write_buffer.append(metrics)

    # Per-agent attribution.
    self.agents[agent_name].append(metrics)

    # Per-task attribution, keyed by "agent:task".
    task_key = f"{agent_name}:{task_name}"
    task = self.tasks.get(task_key)
    if task is None:
        task = TaskMetrics(
            task_name=task_name,
            agent_name=agent_name,
        )
        self.tasks[task_key] = task
    task.add_call(metrics)

    # Mirror the call into LangFuse when configured (v3.x API).
    if self.langfuse:
        try:
            event_metadata = {
                "model": metrics.model,
                "agent": agent_name,
                "task": task_name,
                "cost_usd": metrics.total_cost,
                "cache_hit_rate": metrics.cache_hit_rate,
                "input_tokens": metrics.input_tokens,
                "output_tokens": metrics.output_tokens,
                "cache_read_tokens": metrics.cache_read_tokens,
                "duration_ms": metrics.duration_ms,
            }
            # PHASE 1 Task 1.3: Add thinking token tracking
            if metrics.thinking_tokens > 0:
                event_metadata["thinking_tokens"] = metrics.thinking_tokens
                event_metadata["thinking_from_cache"] = metrics.thinking_from_cache
            self.langfuse.create_event(
                name=f"{agent_name}:{task_name}",
                metadata=event_metadata,
            )
        except Exception as e:
            # Observability is best-effort: never let LangFuse break tracking.
            print(f"⚠️ LangFuse event creation failed: {e}")

    # Schedule auto-flush check (non-blocking).
    try:
        asyncio.create_task(self._auto_flush_if_needed())
    except RuntimeError:
        # No event loop - buffer writes, flush manually via flush_sync()
        # This happens in tests without async context
        pass
defget_cost_summary(self,start_date:str|None=None,end_date:str|None=None,)->dict[str,Any]:"""Get cost summary for a date range. Args: ---- start_date: Start date (ISO format, e.g., "2025-10-01") end_date: End date (ISO format, e.g., "2025-10-31") Returns: ------- Summary dict with total costs, tokens, and breakdowns """# Filter calls by date rangefiltered_calls=self.callsifstart_date:start_dt=datetime.fromisoformat(start_date)filtered_calls=[cforcinfiltered_callsifdatetime.fromisoformat(c.timestamp)>=start_dt]ifend_date:end_dt=datetime.fromisoformat(end_date)filtered_calls=[cforcinfiltered_callsifdatetime.fromisoformat(c.timestamp)<=end_dt]ifnotfiltered_calls:return{"total_calls":0,"total_cost":0.0,"total_tokens":0}# Calculate totalstotal_cost=sum(c.total_costforcinfiltered_calls)total_tokens=sum(c.total_tokensforcinfiltered_calls)total_input=sum(c.input_tokensforcinfiltered_calls)total_output=sum(c.output_tokensforcinfiltered_calls)total_cache_reads=sum(c.cache_read_tokensforcinfiltered_calls)# Model breakdownmodel_costs:dict[str,float]=defaultdict(float)forcallinfiltered_calls:model_costs[call.model]+=call.total_costreturn{"period":{"start":start_dateorfiltered_calls[0].timestamp,"end":end_dateorfiltered_calls[-1].timestamp,},"total_calls":len(filtered_calls),"total_cost":total_cost,"total_tokens":total_tokens,"tokens_breakdown":{"input":total_input,"output":total_output,"cache_reads":total_cache_reads,},"average_cost_per_call":total_cost/len(filtered_calls),"average_tokens_per_call":total_tokens/len(filtered_calls),"cache_hit_rate":(total_cache_reads/(total_input+total_cache_reads)*100)if(total_input+total_cache_reads)>0else0.0,"model_breakdown":dict(model_costs),}
def get_daily_costs(self, days: int = 30) -> dict[str, float]:
    """Get daily cost totals for the last N days.

    Args:
    ----
        days: Number of days to include (default: 30)

    Returns:
    -------
        Dict mapping dates (YYYY-MM-DD) to total costs, sorted by date
    """
    # NOTE(review): uses naive local time; assumes call timestamps were
    # recorded the same way — confirm if UTC is intended.
    cutoff = datetime.now() - timedelta(days=days)

    per_day: dict[str, float] = defaultdict(float)
    for entry in self.calls:
        when = datetime.fromisoformat(entry.timestamp)
        if when < cutoff:
            continue  # outside the requested window
        per_day[when.strftime("%Y-%m-%d")] += entry.total_cost

    return dict(sorted(per_day.items()))
defexport_to_csv(self,output_path:Path)->None:"""Export cost data to CSV file. Args: ---- output_path: Path to output CSV file """importcsvwithoutput_path.open("w",newline="")asf:writer=csv.writer(f)# Headerwriter.writerow(["timestamp","model","agent","task","input_tokens","output_tokens","cache_read_tokens","cache_write_tokens","input_cost","output_cost","cache_read_cost","cache_write_cost","total_cost","duration_ms",],)# Find agent/task for each callforcallinself.calls:agent="unknown"task="unknown"# Search through tasks to find this callfortask_key,task_metricsinself.tasks.items():ifcallintask_metrics.calls:agent=task_metrics.agent_nametask=task_metrics.task_namebreakwriter.writerow([call.timestamp,call.model,agent,task,call.input_tokens,call.output_tokens,call.cache_read_tokens,call.cache_creation_tokens,call.input_cost,call.output_cost,call.cache_read_cost,call.cache_write_cost,call.total_cost,call.duration_ms,],)print(f"✅ Exported {len(self.calls)} calls to {output_path}")
# Example usage: obtain the shared tracker and export cost reports.
tracker = get_tracker()

# Export to CSV.
# Fix: export_to_csv takes only the output path — the previous example
# passed an unsupported start_date kwarg (TypeError) and a bare string
# where a Path is expected. Use get_cost_summary for date-range filtering.
tracker.export_to_csv(Path("cost_report.csv"))

# Export to JSON.
# NOTE(review): export_to_json is defined elsewhere in this module;
# assuming it accepts start_date — confirm against its signature.
tracker.export_to_json("cost_report.json", start_date="2025-10-01")