Token-Copilot Documentation

Token-Copilot is a modern, plugin-based library for tracking, analyzing, and optimizing LLM costs in production. Its architecture supports five usage patterns, from minimal tracking to enterprise monitoring.

🎯 Zero Config

Start tracking with just one line of code

🔌 Plugin-Based

Add features as needed - analytics, persistence, routing

👥 Multi-Tenant

Track costs by user, organization, or session

💰 Budget Control

Hard stops at limits with flexible enforcement

📊 Analytics Ready

Export to pandas for advanced analysis

🌐 Multi-Framework

Works with LangChain, LlamaIndex, and Azure OpenAI

Version 2.0 Highlights: 80% simpler API, plugin architecture, 5 usage patterns, zero breaking changes from v1.x

Installation

Basic Installation

pip install token-copilot

With Optional Features

# Extras are quoted so shells such as zsh do not expand the square brackets

# With analytics support
pip install "token-copilot[analytics]"

# With streaming support
pip install "token-copilot[streaming]"

# With all features
pip install "token-copilot[all]"

# For development
pip install "token-copilot[dev]"

Requirements

  • Python 3.8+
  • langchain-core (automatically installed)
  • Optional: pandas, numpy (for analytics)
  • Optional: kafka-python, opentelemetry (for streaming)

Quick Start

Get started in under 30 seconds:

from token_copilot import TokenCoPilot
from langchain_openai import ChatOpenAI

# Create copilot with budget
copilot = TokenCoPilot(budget_limit=10.00)

# Use with LangChain
llm = ChatOpenAI(model="gpt-4o-mini", callbacks=[copilot])
result = llm.invoke("What is Python?")

# Get metrics
print(f"Cost: ${copilot.cost:.4f}")
print(f"Tokens: {copilot.tokens}")
print(f"Remaining: ${copilot.get_remaining_budget():.2f}")

That's it! You're now tracking LLM costs with budget enforcement.

Usage Patterns

Choose the pattern that fits your coding style:

1. Minimal Pattern (Simplest)

Perfect for getting started or simple scripts:

from token_copilot import TokenCoPilot
from langchain_openai import ChatOpenAI

copilot = TokenCoPilot(budget_limit=10.00)
llm = ChatOpenAI(callbacks=[copilot])

result = llm.invoke("Hello!")
print(f"Cost: ${copilot.cost:.4f}")

2. Builder Pattern (Fluent API)

Chain methods to add features explicitly:

from token_copilot import TokenCoPilot
from langchain_openai import ChatOpenAI

copilot = (TokenCoPilot(budget_limit=100.00)
    .with_streaming(webhook_url="https://example.com/webhook")
    .with_analytics(detect_anomalies=True)
    .with_adaptive()
    .build()
)

llm = ChatOpenAI(callbacks=[copilot])
result = llm.invoke("Complex task...")

3. Factory Pattern (Presets)

Use pre-configured setups for common scenarios:

from token_copilot.presets import basic, development, production, enterprise

# Basic - just cost tracking
copilot = basic(budget_limit=10.00)

# Development - with logging and anomaly detection
copilot = development(budget_limit=50.00, detect_anomalies=True)

# Production - monitoring and alerts
copilot = production(
    budget_limit=1000.00,
    webhook_url="https://monitoring.example.com",
    slack_webhook="https://hooks.slack.com/...",
    enable_forecasting=True
)

# Enterprise - all features enabled
copilot = enterprise(
    budget_limit=10000.00,
    kafka_brokers=["kafka1:9092"],
    enable_all=True
)

4. Context Managers (Pythonic)

Scoped tracking with automatic cleanup:

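from token_copilot import track_costs, with_budget, monitored
from langchain_openai import ChatOpenAI

# `tasks` and `documents` below stand for your own lists of prompts and texts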

# General tracking
with track_costs(budget_limit=5.00) as copilot:
    llm = ChatOpenAI(callbacks=[copilot])
    result = llm.invoke("Hello!")
    print(f"Cost: ${copilot.cost:.4f}")
# Automatic summary on exit

# Budget-focused
with with_budget(limit=10.00, warn_at=0.8) as budget:
    llm = ChatOpenAI(callbacks=[budget])
    for task in tasks:
        if budget.get_remaining_budget() > 0:
            result = llm.invoke(task)

# Monitored operations
with monitored(name="data_processing", budget_limit=10.00) as copilot:
    llm = ChatOpenAI(callbacks=[copilot])
    for doc in documents:
        result = llm.invoke(f"Process: {doc}")
# Logs: "Operation [data_processing]: Cost=$X, Tokens=Y"

5. Decorators (Reusable)

Function-level tracking for reusable code:

from token_copilot.decorators import track_cost, enforce_budget, monitored
from langchain_openai import ChatOpenAI

# Track cost decorator
@track_cost(budget_limit=5.00)
def summarize_text(text):
    llm = ChatOpenAI(callbacks=[summarize_text.copilot])
    return llm.invoke(f"Summarize: {text}")

result = summarize_text("Long text...")
print(f"Cost: ${summarize_text.copilot.cost:.4f}")

# Enforce budget decorator
@enforce_budget(limit=1.00, on_exceeded="raise")
def expensive_task(copilot):
    llm = ChatOpenAI(callbacks=[copilot])
    return llm.invoke("Expensive operation...")

# Monitored decorator
@monitored(name="analysis", budget_limit=10.00)
def analyze_document(doc, copilot):
    llm = ChatOpenAI(callbacks=[copilot])
    return llm.invoke(f"Analyze: {doc}")

result = analyze_document("My document")
# Automatically logs cost and tokens

Which pattern to use?

  • Getting started: Minimal or Factory presets
  • Production: Builder or Production preset
  • Reusable code: Decorators or Context managers
  • Enterprise: Enterprise preset with custom config

Core Features

Basic Cost Tracking

Track costs and tokens automatically:

from token_copilot import TokenCoPilot
from langchain_openai import ChatOpenAI

copilot = TokenCoPilot()
llm = ChatOpenAI(model="gpt-4o-mini", callbacks=[copilot])

# Make calls
result1 = llm.invoke("What is AI?")
result2 = llm.invoke("Explain machine learning")

# Get totals
print(f"Total Cost: ${copilot.cost:.4f}")
print(f"Total Tokens: {copilot.tokens:,}")

# Get statistics
stats = copilot.get_stats()
print(f"Total Calls: {stats['total_calls']}")
print(f"Average Cost: ${stats['avg_cost_per_call']:.4f}")
print(f"Average Tokens: {stats['avg_tokens_per_call']:.1f}")

Budget Enforcement

Automatically enforce spending limits:

from token_copilot import TokenCoPilot, BudgetExceededError
from langchain_openai import ChatOpenAI

# Strict enforcement (raises exception)
copilot = TokenCoPilot(
    budget_limit=1.00,
    on_budget_exceeded="raise"  # Options: "raise", "warn", "ignore"
)

llm = ChatOpenAI(callbacks=[copilot])

try:
    result = llm.invoke("Long expensive task...")
except BudgetExceededError as e:
    print(f"Budget limit reached: {e}")

# Check remaining budget
remaining = copilot.get_remaining_budget()
print(f"Remaining: ${remaining:.2f}")

# Budget periods
copilot = TokenCoPilot(
    budget_limit=100.00,
    budget_period="daily"  # Options: "total", "daily", "monthly", "per_user", "per_org"
)
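
The three enforcement modes can be pictured with a small standalone sketch. It is not the library's implementation: SimpleBudget and BudgetExceeded are hypothetical names, and the sketch assumes the limit is checked after each cost is recorded.

```python
import warnings


class BudgetExceeded(Exception):
    """Hypothetical stand-in for the library's BudgetExceededError."""


class SimpleBudget:
    """Tracks spend against a limit and reacts according to on_exceeded."""

    def __init__(self, limit, on_exceeded="raise"):
        if on_exceeded not in ("raise", "warn", "ignore"):
            raise ValueError(f"unknown mode: {on_exceeded}")
        self.limit = limit
        self.on_exceeded = on_exceeded
        self.spent = 0.0

    def record(self, cost):
        # Add the cost first, then compare the running total with the limit.
        self.spent += cost
        if self.spent > self.limit:
            message = f"Spent ${self.spent:.2f} of ${self.limit:.2f}"
            if self.on_exceeded == "raise":
                raise BudgetExceeded(message)
            if self.on_exceeded == "warn":
                warnings.warn(message)
            # "ignore" falls through silently.

    def remaining(self):
        # Never report a negative remaining budget.
        return max(self.limit - self.spent, 0.0)


budget = SimpleBudget(limit=1.00, on_exceeded="raise")
budget.record(0.60)
try:
    budget.record(0.60)  # pushes the total to $1.20, over the $1.00 limit
    exceeded = False
except BudgetExceeded:
    exceeded = True
print(exceeded, budget.remaining())
```

With on_exceeded="warn" the same second call would emit a Python warning and continue; with "ignore" it would do nothing.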

Multi-Tenant Tracking

Track costs by user, organization, session, or any dimension:

from token_copilot import TokenCoPilot
from langchain_openai import ChatOpenAI

copilot = TokenCoPilot(budget_limit=100.00)
llm = ChatOpenAI(callbacks=[copilot])

# Track per user
result = llm.invoke(
    "Hello",
    config={
        "metadata": {
            "user_id": "user_123",
            "org_id": "org_456",
            "session_id": "session_789",
            "feature": "chat",
            "environment": "prod"
        }
    }
)

# Get costs by dimension
user_costs = copilot.tracker.get_costs_by("user_id")
org_costs = copilot.tracker.get_costs_by("org_id")
feature_costs = copilot.tracker.get_costs_by("feature")

print(f"User user_123 cost: ${user_costs['user_123']:.4f}")
print(f"Org org_456 cost: ${org_costs['org_456']:.4f}")

# Per-user budgets
copilot = TokenCoPilot(
    budget_limit=10.00,
    budget_period="per_user"
)

# Check user-specific budget
user_remaining = copilot.get_remaining_budget(
    metadata={"user_id": "user_123"}
)
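
Conceptually, grouping costs by a dimension is a sum over tracked entries keyed by one metadata field. The sketch below illustrates that idea with plain dictionaries; the entry layout (a cost plus a metadata dict) and the costs_by helper are assumptions for illustration, not the tracker's internal format.

```python
from collections import defaultdict


def costs_by(entries, dimension):
    """Sum entry costs per value of one metadata key, skipping entries without it."""
    totals = defaultdict(float)
    for entry in entries:
        key = entry["metadata"].get(dimension)
        if key is not None:
            totals[key] += entry["cost"]
    return dict(totals)


# Hypothetical tracked entries, one per LLM call.
entries = [
    {"cost": 0.002, "metadata": {"user_id": "user_123", "org_id": "org_456"}},
    {"cost": 0.003, "metadata": {"user_id": "user_123", "org_id": "org_456"}},
    {"cost": 0.001, "metadata": {"user_id": "user_999", "org_id": "org_456"}},
]

user_costs = costs_by(entries, "user_id")
org_costs = costs_by(entries, "org_id")
print(user_costs)
print(org_costs)
```

Because every call carries its own metadata, the same entries can be sliced by user, organization, feature, or any custom tag without tracking them separately.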

DataFrame Export

Export to pandas for advanced analytics:

import pandas as pd
from token_copilot import TokenCoPilot
from langchain_openai import ChatOpenAI

copilot = TokenCoPilot()
llm = ChatOpenAI(callbacks=[copilot])

# Make some calls...
for i in range(100):
    result = llm.invoke(f"Task {i}")

# Export to DataFrame
df = copilot.to_dataframe()

# Available columns:
# - timestamp (index)
# - model, input_tokens, output_tokens, total_tokens
# - cost, user_id, org_id, session_id
# - feature, endpoint, environment
# - any custom tags from metadata

# Analytics examples
print(df.head())
print(df.describe())

# Group by user
user_costs = df.groupby('user_id')['cost'].sum()
print(user_costs)

# Group by model
model_costs = df.groupby('model').agg({
    'cost': 'sum',
    'total_tokens': 'sum'
})
print(model_costs)

# Filter by date
today = df[df.index.date == pd.Timestamp.today().date()]

# Time series analysis
hourly = df.resample('H')['cost'].sum()

# Save to CSV
df.to_csv('llm_costs.csv')

# Save to Excel
df.to_excel('llm_costs.xlsx')

Plugins

Extend functionality with plugins:

Persistence Plugin

Save cost data to database for historical analysis:

from token_copilot import TokenCoPilot
from token_copilot.plugins import SQLiteBackend, JSONBackend
from langchain_openai import ChatOpenAI

# SQLite Backend (recommended for production)
backend = SQLiteBackend(db_path="costs.db")
copilot = (TokenCoPilot(budget_limit=100.00)
    .with_persistence(backend=backend, session_id="session_123")
)

# JSON Backend (simple file-based)
backend = JSONBackend(file_path="costs.json")
copilot = (TokenCoPilot(budget_limit=100.00)
    .with_persistence(backend=backend)
)

# Use normally - costs automatically saved
llm = ChatOpenAI(callbacks=[copilot])
response = llm.invoke("Hello!")

# Query historical data
plugin = copilot._plugin_manager.get_plugins()[0]

# Get summary
summary = plugin.get_summary()
print(f"Total cost: ${summary['total_cost']:.2f}")
print(f"Total calls: {summary['total_calls']}")

# Get events from last 24 hours
from datetime import datetime, timedelta
start = datetime.now() - timedelta(days=1)
events = plugin.get_events(start_time=start)

# Filter by session
events = plugin.get_events(session_id="session_123")

# Close backend
backend.close()

Custom Backend

from token_copilot.plugins.persistence import PersistenceBackend

class RedisBackend(PersistenceBackend):
    def __init__(self, redis_client):
        self.redis = redis_client

    def save_event(self, event):
        # Save to Redis
        pass

    def get_events(self, **kwargs):
        # Retrieve from Redis
        pass

    def get_summary(self):
        # Calculate summary
        pass
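
One way the three stubbed methods might be filled in is sketched below, using the standard-library sqlite3 module instead of Redis so that it runs without extra services. This is a standalone illustration: it does not subclass PersistenceBackend, and the event keys (timestamp, model, total_tokens, cost, session_id) are assumptions rather than the library's event schema.

```python
import sqlite3


class SQLiteEventStore:
    """Standalone sketch of a backend with save_event, get_events, get_summary."""

    def __init__(self, db_path=":memory:"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS events ("
            "timestamp TEXT, model TEXT, total_tokens INTEGER, "
            "cost REAL, session_id TEXT)"
        )

    def save_event(self, event):
        # Persist one event; session_id is optional.
        self.conn.execute(
            "INSERT INTO events VALUES (?, ?, ?, ?, ?)",
            (
                event["timestamp"],
                event["model"],
                event["total_tokens"],
                event["cost"],
                event.get("session_id"),
            ),
        )
        self.conn.commit()

    def get_events(self, session_id=None):
        # Return all rows, or only those for one session.
        query = "SELECT timestamp, model, total_tokens, cost, session_id FROM events"
        params = ()
        if session_id is not None:
            query += " WHERE session_id = ?"
            params = (session_id,)
        return self.conn.execute(query, params).fetchall()

    def get_summary(self):
        # COALESCE keeps the total at 0.0 when the table is empty.
        total_cost, total_calls = self.conn.execute(
            "SELECT COALESCE(SUM(cost), 0.0), COUNT(*) FROM events"
        ).fetchone()
        return {"total_cost": total_cost, "total_calls": total_calls}

    def close(self):
        self.conn.close()


store = SQLiteEventStore()
store.save_event({"timestamp": "2024-01-01T10:00:00", "model": "gpt-4o-mini",
                  "total_tokens": 150, "cost": 0.001, "session_id": "s1"})
store.save_event({"timestamp": "2024-01-01T10:05:00", "model": "gpt-4o-mini",
                  "total_tokens": 300, "cost": 0.002, "session_id": "s2"})
summary = store.get_summary()
session_events = store.get_events(session_id="s1")
print(summary, len(session_events))
```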

Analytics Plugin

Detect waste, anomalies, and inefficiencies:

from token_copilot import TokenCoPilot
from token_copilot.plugins import AnalyticsPlugin
from token_copilot.analytics import log_alert, slack_alert, webhook_alert

copilot = TokenCoPilot(budget_limit=100.00)

# Add analytics
copilot.add_plugin(AnalyticsPlugin(
    detect_anomalies=True,
    anomaly_sensitivity=3.0,  # Standard deviations
    alert_handlers=[log_alert, slack_alert],
    track_waste=True,
    track_efficiency=True
))

# Or use builder
copilot = (TokenCoPilot(budget_limit=100.00)
    .with_analytics(
        detect_anomalies=True,
        alert_handlers=[log_alert]
    )
)

# Get analytics plugin
from token_copilot.plugins.analytics import AnalyticsPlugin
analytics = copilot._plugin_manager.get_plugins(AnalyticsPlugin)[0]

# Waste analysis
waste_report = analytics.analyze_waste()
print(f"Total waste: ${waste_report['summary']['total_waste_cost']:.2f}")
print(f"Waste percentage: {waste_report['summary']['waste_percentage']:.1f}%")
print(f"Monthly savings potential: ${waste_report['summary']['monthly_savings']:.2f}")

for recommendation in waste_report['recommendations']:
    print(f"- {recommendation}")

# Efficiency scoring
efficiency = analytics.get_efficiency_score("user_id", "user_123")
print(f"Overall score: {efficiency.overall_score:.2f}")
print(f"Token efficiency: {efficiency.token_efficiency:.2f}")
print(f"Cost efficiency: {efficiency.cost_efficiency:.2f}")

# Get anomalies
anomalies = analytics.get_anomalies(minutes=60, min_severity='medium')
for anomaly in anomalies:
    print(f"[{anomaly.severity}] {anomaly.message}")

# Leaderboard
leaderboard = analytics.get_leaderboard('user_id', top_n=10)
for entry in leaderboard:
    print(f"{entry['rank']}. {entry['entity_id']}: {entry['overall_score']:.2f}")
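
The anomaly_sensitivity setting above is expressed in standard deviations. A common way to apply such a threshold is a z-score test against recent call costs, sketched below. This is an assumption about the general technique, not a description of how AnalyticsPlugin computes anomalies; is_anomalous is a hypothetical helper.

```python
from statistics import mean, stdev


def is_anomalous(history, new_cost, sensitivity=3.0):
    """Flag new_cost if it lies more than `sensitivity` standard deviations from the mean."""
    if len(history) < 2:
        return False  # not enough data to estimate spread
    mu = mean(history)
    sigma = stdev(history)
    if sigma == 0:
        # All past costs identical: any different value stands out.
        return new_cost != mu
    return abs(new_cost - mu) / sigma > sensitivity


# Hypothetical per-call costs in USD.
history = [0.010, 0.012, 0.011, 0.009, 0.010, 0.011]

typical = is_anomalous(history, 0.011)
spike = is_anomalous(history, 0.050)
print(typical, spike)
```

A lower sensitivity flags more calls; a higher one flags only extreme outliers.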

Custom Alert Handlers

def custom_alert(anomaly):
    """Custom alert handler."""
    print(f"ALERT: {anomaly.message}")
    # Send to your monitoring system
    # Post to Slack
    # Send email
    # etc.

copilot.with_analytics(alert_handlers=[custom_alert])

Routing Plugin

Intelligent model selection based on cost and quality:

from token_copilot.plugins import RoutingPlugin
from token_copilot import ModelConfig, TokenCoPilot
from langchain_openai import ChatOpenAI

# Define available models
models = [
    ModelConfig(
        model_id="gpt-4o-mini",
        provider="openai",
        context_window=128000,
        max_output_tokens=4096,
        input_cost_per_1m=0.15,
        output_cost_per_1m=0.60
    ),
    ModelConfig(
        model_id="gpt-4o",
        provider="openai",
        context_window=128000,
        max_output_tokens=4096,
        input_cost_per_1m=5.0,
        output_cost_per_1m=15.0
    )
]

copilot = (TokenCoPilot(budget_limit=100.00)
    .with_routing(
        models=models,
        strategy="balanced"  # Options: "cheapest_first", "quality_first", "balanced"
    )
)

# Get routing plugin
from token_copilot.plugins.routing import RoutingPlugin
routing = copilot._plugin_manager.get_plugins(RoutingPlugin)[0]

# Get routing suggestion
decision = routing.suggest_model(
    prompt="Simple greeting",
    estimated_tokens=100
)
print(f"Selected model: {decision.selected_model}")
print(f"Estimated cost: ${decision.estimated_cost:.4f}")
print(f"Reason: {decision.reason}")

# Use suggested model
llm = ChatOpenAI(model=decision.selected_model, callbacks=[copilot])
result = llm.invoke("Hello!")

# Model statistics
model_stats = routing.get_model_stats()
for model, stats in model_stats.items():
    print(f"{model}: {stats['calls']} calls")

Adaptive Plugin

Auto-adjust parameters based on budget:

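from token_copilot import TokenCoPilot
from token_copilot.plugins import AdaptivePlugin
from langchain_openai import ChatOpenAI

copilot = (TokenCoPilot(budget_limit=100.00)
    .with_adaptive()
)
llm = ChatOpenAI(model="gpt-4o-mini", callbacks=[copilot])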

# Get adaptive plugin
from token_copilot.plugins.adaptive import AdaptivePlugin
adaptive = copilot._plugin_manager.get_plugins(AdaptivePlugin)[0]

# Get current budget tier
tier_info = adaptive.get_tier_info()
print(f"Budget tier: {tier_info['tier_name']}")
print(f"Remaining: ${tier_info['remaining']:.2f}")

# Budget tiers:
# - abundant: >80% remaining (high quality, max tokens)
# - comfortable: 50-80% remaining (balanced)
# - constrained: 20-50% remaining (conservative)
# - critical: <20% remaining (minimal usage)

# Get adaptive operations
ops = adaptive.operations

# Operations automatically adjust based on budget tier
result = ops.generate(llm, "Explain quantum computing")
# Automatically adjusts max_tokens, temperature, etc.

# Context operations
with ops.budget_aware_section("expensive_op") as section:
    # Operations in this section are budget-aware
    result = llm.invoke("Complex task...")

# Gate operations
@ops.budget_gate(min_budget=1.00)
def expensive_operation():
    # Only runs if budget >= $1.00
    pass
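
The tier thresholds listed above can be written as a small function. This is a sketch of the mapping only; budget_tier is a hypothetical helper, and the handling of exact boundary values (80%, 50%, 20%) is an assumption because the list does not say which tier they belong to.

```python
def budget_tier(remaining, limit):
    """Map the remaining share of a budget to one of the four tier names."""
    if limit <= 0:
        raise ValueError("limit must be positive")
    fraction = remaining / limit
    if fraction > 0.8:
        return "abundant"
    if fraction >= 0.5:  # assumption: exactly 80% and 50% count as comfortable
        return "comfortable"
    if fraction >= 0.2:  # assumption: exactly 20% counts as constrained
        return "constrained"
    return "critical"


for remaining in (90.0, 80.0, 35.0, 10.0):
    print(remaining, budget_tier(remaining, limit=100.0))
```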

Forecasting Plugin

Predict budget exhaustion:

from token_copilot import TokenCoPilot
from token_copilot.plugins import ForecastingPlugin

copilot = (TokenCoPilot(budget_limit=100.00)
    .with_forecasting(forecast_hours=48)
)

# Get forecasting plugin
from token_copilot.plugins.forecasting import ForecastingPlugin
forecasting = copilot._plugin_manager.get_plugins(ForecastingPlugin)[0]

# Get forecast
forecast = forecasting.get_forecast()

print(f"Current cost: ${forecast.current_cost:.4f}")
print(f"Remaining: ${forecast.remaining_budget:.2f}")
print(f"Burn rate: ${forecast.burn_rate_per_hour:.4f}/hour")

if forecast.hours_until_exhausted:
    print(f"Budget exhausts in: {forecast.hours_until_exhausted:.1f} hours")

# Projections
print(f"24h projection: ${forecast.projected_cost_24h:.2f}")
print(f"7d projection: ${forecast.projected_cost_7d:.2f}")
print(f"30d projection: ${forecast.projected_cost_30d:.2f}")

print(f"Confidence: {forecast.confidence:.2%}")
print(f"Trend: {forecast.trend}")

# Recommendations
for rec in forecast.recommendations:
    print(f"- {rec}")
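
The relationship between burn rate and time to exhaustion can be shown with simple arithmetic. The sketch below assumes a constant (linear) burn rate; linear_forecast is a hypothetical helper, and the plugin's actual model, trend detection, and confidence calculation are not documented here.

```python
def linear_forecast(current_cost, budget_limit, elapsed_hours):
    """Return (burn_rate_per_hour, hours_until_exhausted) under a constant burn rate.

    hours_until_exhausted is None when nothing has been spent yet.
    """
    if elapsed_hours <= 0:
        raise ValueError("elapsed_hours must be positive")
    burn_rate = current_cost / elapsed_hours
    if burn_rate == 0:
        return 0.0, None
    remaining = max(budget_limit - current_cost, 0.0)
    return burn_rate, remaining / burn_rate


# $12 spent over 6 hours against a $100 budget.
burn_rate, hours_left = linear_forecast(12.0, 100.0, 6.0)
print(f"Burn rate: ${burn_rate:.2f}/hour, exhausted in {hours_left:.1f} hours")
```

Here the burn rate is $2.00 per hour, so the remaining $88 lasts 44 hours if spending continues at the same pace.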

Streaming Plugin

Real-time cost event streaming:

from token_copilot import TokenCoPilot
from token_copilot.plugins import StreamingPlugin
from langchain_openai import ChatOpenAI

# Webhook streaming
copilot = (TokenCoPilot(budget_limit=100.00)
    .with_streaming(webhook_url="https://example.com/webhook")
)

# Kafka streaming
copilot = (TokenCoPilot(budget_limit=100.00)
    .with_streaming(
        kafka_brokers=["localhost:9092"],
        kafka_topic="llm_costs"
    )
)

# Syslog streaming
copilot = (TokenCoPilot(budget_limit=100.00)
    .with_streaming(
        syslog_host="syslog.example.com",
        syslog_port=514
    )
)

# OpenTelemetry
copilot = (TokenCoPilot(budget_limit=100.00)
    .with_streaming(
        otlp_endpoint="http://collector:4318"
    )
)

# Multiple backends
copilot = (TokenCoPilot(budget_limit=100.00)
    .with_streaming(
        webhook_url="https://example.com/webhook",
        kafka_brokers=["kafka:9092"],
        kafka_topic="costs",
        otlp_endpoint="http://collector:4318"
    )
)

# Events are automatically streamed in real-time
llm = ChatOpenAI(callbacks=[copilot])
result = llm.invoke("Hello!")  # Event streamed immediately

Framework Integrations

LangChain Integration

from token_copilot import TokenCoPilot
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate

copilot = TokenCoPilot(budget_limit=10.00)

# With ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini", callbacks=[copilot])
result = llm.invoke("Hello!")

# With chains (pipe syntax; LLMChain and chain.run are deprecated in recent LangChain releases)
template = PromptTemplate(
    input_variables=["topic"],
    template="Explain {topic} in simple terms"
)
chain = template | llm
result = chain.invoke({"topic": "quantum computing"})

# With streaming
for chunk in llm.stream("Tell me a story"):
    print(chunk.content, end="")

print(f"\nTotal cost: ${copilot.cost:.4f}")

LlamaIndex Integration

from token_copilot.llamaindex import TokenCoPilotCallbackHandler
from llama_index.core import Settings, VectorStoreIndex, SimpleDirectoryReader
from llama_index.llms.openai import OpenAI

# Create callback handler
copilot = TokenCoPilotCallbackHandler(budget_limit=10.00)

# Configure LlamaIndex
llm = OpenAI(model="gpt-4o-mini")
Settings.llm = llm
Settings.callback_manager.add_handler(copilot)

# Use with queries
documents = SimpleDirectoryReader("data").load_data()
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine()

response = query_engine.query("What is this document about?")
print(response)

# Get costs
print(f"Cost: ${copilot.get_total_cost():.4f}")
print(f"Tokens: {copilot.get_total_tokens():,}")

Azure OpenAI Integration

Full support for Azure OpenAI with automatic cost tracking:

from token_copilot import TokenCoPilot
from langchain_openai import AzureChatOpenAI
import os

# Configure Azure OpenAI
llm = AzureChatOpenAI(
    azure_deployment="gpt-4o-mini",
    api_version="2024-02-15-preview",
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    api_key=os.getenv("AZURE_OPENAI_API_KEY")
)

# Use with token-copilot
copilot = TokenCoPilot(budget_limit=10.00)
response = llm.invoke("Hello!", config={"callbacks": [copilot]})

print(f"Cost: ${copilot.cost:.6f}")
print(f"Tokens: {copilot.tokens:,}")

Environment Setup

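Create a .env file. Note that os.getenv does not read .env files by itself, so load the file with a tool such as python-dotenv or export the variables in your shell: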

AZURE_OPENAI_API_KEY=your-api-key
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_API_VERSION=2024-02-15-preview
AZURE_OPENAI_CHAT_DEPLOYMENT=gpt-4o-mini

Supported Models

  • gpt-4o-mini (all versions)
  • gpt-4o (all versions)
  • gpt-4-turbo (all versions)
  • gpt-3.5-turbo (all versions)

API Reference

TokenCoPilot Class

class TokenCoPilot(
    budget_limit: Optional[float] = None,
    budget_period: str = "total",
    on_budget_exceeded: str = "raise"
)

Parameters

Parameter            Type    Default    Description
budget_limit         float   None       Budget limit in USD
budget_period        str     "total"    One of "total", "daily", "monthly", "per_user", "per_org"
on_budget_exceeded   str     "raise"    One of "raise", "warn", "ignore"

Properties

Property       Type    Description
cost           float   Total cost in USD
tokens         int     Total tokens used
budget_limit   float   Budget limit

Methods

Method                                Returns     Description
get_total_cost()                      float       Get total cost
get_total_tokens()                    int         Get total tokens
get_stats()                           dict        Get summary statistics
get_remaining_budget(metadata=None)   float       Get remaining budget
to_dataframe()                        DataFrame   Export to pandas DataFrame
add_plugin(plugin)                    None        Add a plugin
with_streaming(**kwargs)              Self        Add streaming plugin
with_analytics(**kwargs)              Self        Add analytics plugin
with_routing(**kwargs)                Self        Add routing plugin
with_adaptive()                       Self        Add adaptive plugin
with_forecasting(**kwargs)            Self        Add forecasting plugin
with_persistence(**kwargs)            Self        Add persistence plugin
build()                               Self        Finalize builder (optional)

Pricing Functions

from token_copilot import (
    get_model_config,
    calculate_cost,
    list_models,
    list_providers
)

# Get model configuration
config = get_model_config("gpt-4o-mini")
print(config.input_cost_per_1m)  # Cost per 1M input tokens
print(config.output_cost_per_1m)  # Cost per 1M output tokens

# Calculate cost
cost = calculate_cost("gpt-4o-mini", input_tokens=1000, output_tokens=500)

# List all models
models = list_models()  # Returns list of model IDs

# List providers
providers = list_providers()  # Returns ["openai", "anthropic", "ollama"]
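
As a worked example of the per-1M pricing fields, the arithmetic behind a cost estimate is tokens divided by one million, multiplied by the matching price. The sketch below uses the gpt-4o-mini prices from the Routing Plugin example (0.15 and 0.60 USD per 1M tokens); estimate_cost is a hypothetical helper, not the library's calculate_cost, and current provider prices may differ.

```python
def estimate_cost(input_tokens, output_tokens, input_cost_per_1m, output_cost_per_1m):
    """Return the USD cost of one call given per-million-token prices."""
    input_cost = input_tokens / 1_000_000 * input_cost_per_1m
    output_cost = output_tokens / 1_000_000 * output_cost_per_1m
    return input_cost + output_cost


# 1,000 input tokens and 500 output tokens at 0.15 / 0.60 USD per 1M tokens.
cost = estimate_cost(1000, 500, 0.15, 0.60)
print(f"${cost:.6f}")
```

The input side contributes $0.00015 and the output side $0.00030, for a total of $0.00045.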

Context Managers

from token_copilot import track_costs, with_budget, monitored

# track_costs
with track_costs(budget_limit=5.00) as copilot:
    # Use copilot
    pass

# with_budget
with with_budget(limit=10.00, warn_at=0.8) as budget:
    # Use budget
    pass

# monitored
with monitored(name="operation", budget_limit=10.00) as copilot:
    # Use copilot
    pass

Decorators

from token_copilot.decorators import track_cost, enforce_budget, monitored

@track_cost(budget_limit=5.00)
def my_function(text):
    # Function has .copilot attribute
    pass

@enforce_budget(limit=1.00, on_exceeded="raise")
def expensive_function(copilot):
    # copilot passed as argument
    pass

@monitored(name="task", budget_limit=10.00)
def monitored_function(data, copilot):
    # Automatically logged
    pass
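
The difference between the two styles above (a .copilot attribute on the function versus a copilot argument passed in) can be illustrated with plain Python. The sketch below is not the library's code: StubTracker, track_cost_sketch, and inject_tracker_sketch are hypothetical names, and no LLM is called.

```python
import functools


class StubTracker:
    """Minimal stand-in for a cost tracker."""

    def __init__(self, budget_limit):
        self.budget_limit = budget_limit
        self.cost = 0.0


def track_cost_sketch(budget_limit):
    """Attach one shared tracker to the decorated function as .copilot."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        wrapper.copilot = StubTracker(budget_limit)
        return wrapper
    return decorator


def inject_tracker_sketch(budget_limit):
    """Pass a fresh tracker as the `copilot` keyword argument on each call."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            kwargs.setdefault("copilot", StubTracker(budget_limit))
            return func(*args, **kwargs)
        return wrapper
    return decorator


@track_cost_sketch(budget_limit=5.00)
def summarize(text):
    summarize.copilot.cost += 0.01  # pretend the call cost one cent
    return text[:10]


@inject_tracker_sketch(budget_limit=1.00)
def expensive(copilot):
    return copilot.budget_limit


summarize("Long text to shorten")
limit = expensive()  # caller does not supply copilot; the decorator does
print(summarize.copilot.cost, limit)
```

The attribute style accumulates cost across calls to the same function, while the injection style keeps the tracker out of the caller's signature.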

Complete Examples

Example 1: Simple Chatbot

from token_copilot import TokenCoPilot
from langchain_openai import ChatOpenAI

def chatbot():
    copilot = TokenCoPilot(budget_limit=5.00, on_budget_exceeded="warn")
    llm = ChatOpenAI(model="gpt-4o-mini", callbacks=[copilot])

    print("Chatbot started! (type 'quit' to exit)")

    while True:
        user_input = input("\nYou: ")
        if user_input.lower() == 'quit':
            break

        # Check budget
        if copilot.get_remaining_budget() <= 0:
            print("Budget exhausted!")
            break

        response = llm.invoke(user_input)
        print(f"Bot: {response.content}")
        print(f"Cost this turn: ${copilot.tracker.get_last_cost():.6f}")

    # Final stats
    stats = copilot.get_stats()
    print(f"\nSession Summary:")
    print(f"Total turns: {stats['total_calls']}")
    print(f"Total cost: ${stats['total_cost']:.4f}")
    print(f"Average cost/turn: ${stats['avg_cost_per_call']:.4f}")

if __name__ == "__main__":
    chatbot()

Example 2: Multi-User API

from flask import Flask, request, jsonify
from token_copilot import TokenCoPilot
from langchain_openai import ChatOpenAI

app = Flask(__name__)

# Global copilot with per-user budgets
copilot = TokenCoPilot(
    budget_limit=10.00,
    budget_period="per_user",
    on_budget_exceeded="raise"
)

llm = ChatOpenAI(model="gpt-4o-mini", callbacks=[copilot])

@app.route('/chat', methods=['POST'])
def chat():
    data = request.json
    user_id = data.get('user_id')
    message = data.get('message')

    # Check user's remaining budget
    remaining = copilot.get_remaining_budget(metadata={"user_id": user_id})
    if remaining <= 0:
        return jsonify({"error": "Budget exhausted"}), 429

    # Process request
    try:
        response = llm.invoke(
            message,
            config={"metadata": {"user_id": user_id}}
        )

        return jsonify({
            "response": response.content,
            "cost": copilot.tracker.get_last_cost(),
            "remaining_budget": copilot.get_remaining_budget(
                metadata={"user_id": user_id}
            )
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/stats/<user_id>', methods=['GET'])
def get_stats(user_id):
    user_costs = copilot.tracker.get_costs_by("user_id")
    return jsonify({
        "user_id": user_id,
        "total_cost": user_costs.get(user_id, 0.0),
        "remaining_budget": copilot.get_remaining_budget(
            metadata={"user_id": user_id}
        )
    })

if __name__ == '__main__':
    app.run(debug=True)

Example 3: Production Monitoring

from token_copilot.presets import production
from langchain_openai import ChatOpenAI
import os
from datetime import datetime

# Production setup with all monitoring
copilot = production(
    budget_limit=1000.00,
    webhook_url=os.getenv("WEBHOOK_URL"),
    slack_webhook=os.getenv("SLACK_WEBHOOK"),
    detect_anomalies=True,
    enable_forecasting=True
)

llm = ChatOpenAI(model="gpt-4o-mini", callbacks=[copilot])

# Your application logic
def process_request(user_input):
    response = llm.invoke(user_input)
    return response.content

# Monitoring happens automatically:
# - Costs tracked
# - Anomalies detected
# - Alerts sent to Slack/webhook
# - Budget forecasted
# - All data exportable

# Periodic reporting
def generate_report():
    df = copilot.to_dataframe()

    # Daily summary
    print("Daily Summary:")
    print(f"Total cost: ${df['cost'].sum():.2f}")
    print(f"Total calls: {len(df)}")
    print(f"Average cost: ${df['cost'].mean():.4f}")

    # Top users
    user_costs = df.groupby('user_id')['cost'].sum().sort_values(ascending=False)
    print("\nTop 10 users by cost:")
    print(user_costs.head(10))

    # Save report
    df.to_csv(f'daily_report_{datetime.now().date()}.csv')

Example 4: RAG Application

from llama_index.core import Settings, VectorStoreIndex, SimpleDirectoryReader
from llama_index.llms.openai import OpenAI
from token_copilot.llamaindex import TokenCoPilotCallbackHandler

def build_rag_system():
    # Load documents
    documents = SimpleDirectoryReader("docs").load_data()

    # Configure the LLM and attach the LlamaIndex cost-tracking handler
    llm = OpenAI(model="gpt-4o-mini")
    handler = TokenCoPilotCallbackHandler(budget_limit=20.00)

    Settings.llm = llm
    Settings.callback_manager.add_handler(handler)

    # Build index (costs tracked)
    print("Building index...")
    index = VectorStoreIndex.from_documents(documents)
    print(f"Index cost: ${handler.get_total_cost():.4f}")

    # Create query engine
    query_engine = index.as_query_engine()

    # Query with cost tracking
    def query(question):
        initial_cost = handler.get_total_cost()
        response = query_engine.query(question)
        query_cost = handler.get_total_cost() - initial_cost

        return {
            "answer": response.response,
            "cost": query_cost,
            "remaining": handler.get_remaining_budget()
        }

    return query

# Use the system
query_fn = build_rag_system()

questions = [
    "What is the main topic?",
    "Summarize the key points",
    "What are the recommendations?"
]

for q in questions:
    result = query_fn(q)
    print(f"\nQ: {q}")
    print(f"A: {result['answer']}")
    print(f"Cost: ${result['cost']:.4f}")
    print(f"Remaining: ${result['remaining']:.2f}")

Frequently Asked Questions

Does this work with streaming responses?

Costs are currently tracked after a response completes. Full streaming support is planned for v1.0.3.

Can I use without LangChain?

Yes! Use MultiTenantTracker directly:

from token_copilot.tracking import MultiTenantTracker

tracker = MultiTenantTracker()
entry = tracker.track(
    model="gpt-4o-mini",
    input_tokens=100,
    output_tokens=50
)
print(f"Cost: ${entry.cost:.6f}")

Which usage pattern should I use?

  • Getting started: Minimal or Factory presets
  • Production: Builder or Production preset
  • Reusable code: Decorators or Context managers
  • Enterprise: Enterprise preset

Can I create custom plugins?

Yes! Extend the Plugin base class:

from token_copilot import TokenCoPilot
from token_copilot.core import Plugin

class MyPlugin(Plugin):
    def on_cost_tracked(self, model, tokens, cost, metadata):
        # Custom logic
        print(f"Cost tracked: ${cost:.6f}")

copilot = TokenCoPilot()
copilot.add_plugin(MyPlugin())

How accurate is the cost tracking?

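Cost tracking uses the official pricing published by OpenAI and Anthropic, and the price tables are updated regularly. Accuracy depends on the model being identified correctly, since each call is priced by its model name.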

Does it support other LLM providers?

Yes! Supports:

  • OpenAI (including Azure OpenAI)
  • Anthropic (Claude models)
  • Ollama (local models)

Can I track costs for multiple projects?

Yes! Use metadata to separate projects:

from token_copilot import TokenCoPilot
from langchain_openai import ChatOpenAI

copilot = TokenCoPilot()
llm = ChatOpenAI(callbacks=[copilot])

result = llm.invoke(
    "Hello",
    config={"metadata": {"project_id": "project_123"}}
)

# Get costs by project
project_costs = copilot.tracker.get_costs_by("project_id")

How do I handle budget resets?

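For daily/monthly budgets, implement a scheduler. The example uses the third-party schedule package (pip install schedule) and assumes copilot is your existing TokenCoPilot instance: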

import schedule
import time

def reset_budget():
    copilot.tracker.clear()
    print("Budget reset!")

# Reset daily at midnight
schedule.every().day.at("00:00").do(reset_budget)

while True:
    schedule.run_pending()
    time.sleep(60)
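
If adding a dependency is not desirable, the same daily reset can be approximated with the standard library by sleeping until the next midnight. The helper below is a sketch under that assumption; seconds_until_midnight is a hypothetical name, and it uses naive local time without handling daylight-saving changes.

```python
from datetime import datetime, timedelta


def seconds_until_midnight(now):
    """Return the number of seconds from `now` until the next local midnight."""
    tomorrow = (now + timedelta(days=1)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    return (tomorrow - now).total_seconds()


# One minute before midnight leaves 60 seconds to wait.
wait = seconds_until_midnight(datetime(2024, 1, 1, 23, 59, 0))
print(wait)
```

A reset loop would then call time.sleep(seconds_until_midnight(datetime.now())) before clearing the tracker, instead of polling every 60 seconds.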

Can I export to formats other than CSV?

Yes! Pandas DataFrame supports many formats:

df = copilot.to_dataframe()

# Excel
df.to_excel('costs.xlsx')

# JSON
df.to_json('costs.json')

# Parquet
df.to_parquet('costs.parquet')

# SQL database (engine is a SQLAlchemy engine or sqlite3 connection that you create)
df.to_sql('costs', con=engine)

How do I get support?

Token-Copilot v1.0.2

Built with ❤️ by Sai Kumar Yava

GitHub | PyPI | MIT License