
Token Efficiency Architecture for GenAI Applications

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Certification | Task | Skill | This File |
| --- | --- | --- | --- |
| AWS AIP-C01 | Task 4.1 — Optimize cost and performance of FM applications | Skill 4.1.1 — Design token efficiency systems for FM-powered applications | Token lifecycle, budget management, compression techniques, context pruning, architecture diagrams, production code |

Skill scope: Design and implement end-to-end token efficiency systems that minimize cost while preserving answer quality — covering estimation, tracking, optimization, and monitoring across every layer of the MangaAssist request pipeline.


Mind Map — Token Efficiency Dimensions

mindmap
  root((Token<br/>Efficiency))
    Estimation
      tiktoken Pre-Counting
        Input token estimation
        Output budget reservation
      Per-Intent Budgets
        product_search — 1200 in / 400 out
        order_status — 600 in / 200 out
        recommendation — 2000 in / 600 out
        manga_qa — 1500 in / 500 out
      Model Selection Gate
        Haiku for simple intents
        Sonnet for complex reasoning
    Tracking
      Per-Model Metrics
        Sonnet input & output
        Haiku input & output
        Cost attribution
      Per-Intent Metrics
        Token histogram per intent
        Cost per intent per day
        Budget utilization ratio
      Per-Session Metrics
        Cumulative tokens per session
        Session cost running total
        Multi-turn growth rate
      Bedrock Invocation Logs
        Actual vs estimated delta
        CloudWatch Metrics
        Cost anomaly alerting
    Optimization
      Prompt Compression
        Remove redundant instructions
        Dynamic few-shot selection
        Template variable injection
        LLMLingua-style pruning
      Context Pruning
        RAG chunk relevance scoring
        Conversation history trimming
        Recency-weighted retention
      Context Window Management
        Sliding window for multi-turn
        Summarization of older turns
        Max context budget per intent
      Response Size Controls
        max_tokens per intent
        Output ceiling enforcement
        Early stopping on high confidence
    Monitoring
      Real-Time Dashboards
        Token velocity per second
        Cost burn rate
        Budget headroom gauge
      Alerts
        Per-request token spike
        Hourly budget breach
        Estimation drift alarm
      Drift Detection
        Prompt version token delta
        Model behavior shift
        Compression ratio degradation

Token Lifecycle in MangaAssist

Every user message flows through four phases of token management before and after hitting Bedrock. This is not optional overhead — it is the cost control backbone.

flowchart LR
    subgraph Phase1["1. Estimation"]
        A[User message arrives] --> B[tiktoken pre-count input]
        B --> C[Intent classification]
        C --> D[Look up intent token budget]
        D --> E{Budget available?}
        E -->|Yes| F[Reserve token budget]
        E -->|No| G[Downgrade model or compress]
    end

    subgraph Phase2["2. Optimization"]
        F --> H[Assemble prompt]
        G --> H
        H --> I[Compress prompt]
        I --> J[Prune RAG context]
        J --> K[Trim conversation history]
        K --> L[Set max_tokens for response]
    end

    subgraph Phase3["3. Invocation"]
        L --> M[Call Bedrock with optimized prompt]
        M --> N[Stream response with token counter]
        N --> O{Token ceiling hit?}
        O -->|Yes| P[Graceful stop + append notice]
        O -->|No| Q[Complete response]
    end

    subgraph Phase4["4. Tracking"]
        P --> R[Record actual tokens consumed]
        Q --> R
        R --> S[Update session running total]
        S --> T[Emit CloudWatch metrics]
        T --> U[Check budget alerts]
        U --> V[Update estimation calibration]
    end

    style Phase1 fill:#e8f5e9,stroke:#2e7d32
    style Phase2 fill:#e3f2fd,stroke:#1565c0
    style Phase3 fill:#fff3e0,stroke:#e65100
    style Phase4 fill:#f3e5f5,stroke:#6a1b9a

Token Estimation Framework

Why Estimate Before Calling Bedrock?

Without pre-call estimation, you discover cost overruns after the money is spent. With MangaAssist handling 1M messages/day, even a 10% token overrun hitting just 100K of those Sonnet calls means:

| Scenario | Daily Token Overrun | Extra Daily Cost |
| --- | --- | --- |
| 10% overrun on Sonnet input | 100K msgs x 200 extra tokens = 20M tokens | 20M / 1M x $3 = $60/day |
| 10% overrun on Sonnet output | 100K msgs x 80 extra tokens = 8M tokens | 8M / 1M x $15 = $120/day |
| Combined input + output | | $180/day = $5,400/month |

That is why estimation is the very first step.
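
This arithmetic is worth keeping as a small helper so new intents and traffic mixes can be sanity-checked the same way; a minimal sketch using the Sonnet prices assumed above:

def daily_overrun_cost(messages: int, extra_input_tokens: int,
                       extra_output_tokens: int) -> float:
    """Extra daily USD cost from per-message token overruns (Sonnet pricing)."""
    input_cost = messages * extra_input_tokens / 1_000_000 * 3.00
    output_cost = messages * extra_output_tokens / 1_000_000 * 15.00
    return input_cost + output_cost


print(daily_overrun_cost(100_000, 200, 80))  # 180.0 USD/day, as in the table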

tiktoken-Based Estimation

Claude 3's tokenizer is not public; tiktoken's cl100k_base encoding is the closest public approximation. We count tokens before the API call to enforce budgets and absorb the remaining variance with a calibration buffer.

import tiktoken
from dataclasses import dataclass, field
from typing import Dict, Optional
from enum import Enum


class MangaIntent(Enum):
    PRODUCT_SEARCH = "product_search"
    ORDER_STATUS = "order_status"
    RECOMMENDATION = "recommendation"
    MANGA_QA = "manga_qa"
    CHITCHAT = "chitchat"


@dataclass
class TokenBudget:
    """Token budget for a single intent."""
    max_input_tokens: int
    max_output_tokens: int
    preferred_model: str  # "sonnet" or "haiku"
    fallback_model: str   # model to use if budget is tight


# Per-intent token budgets — tuned from 30 days of MangaAssist production data
INTENT_BUDGETS: Dict[MangaIntent, TokenBudget] = {
    MangaIntent.PRODUCT_SEARCH: TokenBudget(
        max_input_tokens=1200, max_output_tokens=400,
        preferred_model="sonnet", fallback_model="haiku"
    ),
    MangaIntent.ORDER_STATUS: TokenBudget(
        max_input_tokens=600, max_output_tokens=200,
        preferred_model="haiku", fallback_model="haiku"
    ),
    MangaIntent.RECOMMENDATION: TokenBudget(
        max_input_tokens=2000, max_output_tokens=600,
        preferred_model="sonnet", fallback_model="haiku"
    ),
    MangaIntent.MANGA_QA: TokenBudget(
        max_input_tokens=1500, max_output_tokens=500,
        preferred_model="sonnet", fallback_model="haiku"
    ),
    MangaIntent.CHITCHAT: TokenBudget(
        max_input_tokens=400, max_output_tokens=150,
        preferred_model="haiku", fallback_model="haiku"
    ),
}


class TokenEstimator:
    """Pre-call token estimation using tiktoken.

    MangaAssist uses this to decide model routing and prompt compression
    BEFORE spending money on a Bedrock invocation.
    """

    def __init__(self):
        # cl100k_base is the closest public tokenizer to Claude 3's tokenizer.
        # Empirical calibration shows ~5% variance which we handle with a buffer.
        self.encoding = tiktoken.get_encoding("cl100k_base")
        self.calibration_buffer = 1.05  # 5% safety margin

    def count_tokens(self, text: str) -> int:
        """Count tokens in a string with calibration buffer."""
        raw_count = len(self.encoding.encode(text))
        return int(raw_count * self.calibration_buffer)

    def estimate_prompt(self, system_prompt: str, user_message: str,
                        conversation_history: list[dict],
                        rag_context: str) -> int:
        """Estimate total input tokens for a Bedrock Claude call."""
        parts = [system_prompt, rag_context]
        for turn in conversation_history:
            parts.append(turn.get("role", ""))
            parts.append(turn.get("content", ""))
        parts.append(user_message)

        total_text = "\n".join(parts)
        return self.count_tokens(total_text)

    def check_budget(self, estimated_input: int, intent: MangaIntent
                     ) -> dict:
        """Check if estimated tokens fit the intent budget.

        Returns:
            dict with keys: within_budget, model, needs_compression,
            compression_target
        """
        budget = INTENT_BUDGETS[intent]
        if estimated_input <= budget.max_input_tokens:
            return {
                "within_budget": True,
                "model": budget.preferred_model,
                "needs_compression": False,
                "compression_target": None,
                "max_output_tokens": budget.max_output_tokens,
            }

        # Over budget — can we fit with compression?
        overage_ratio = estimated_input / budget.max_input_tokens
        if overage_ratio <= 1.5:
            # Mild overage — compress prompt to fit preferred model
            return {
                "within_budget": False,
                "model": budget.preferred_model,
                "needs_compression": True,
                "compression_target": budget.max_input_tokens,
                "max_output_tokens": budget.max_output_tokens,
            }
        else:
            # Severe overage — fall back to cheaper model + compress
            return {
                "within_budget": False,
                "model": budget.fallback_model,
                "needs_compression": True,
                "compression_target": budget.max_input_tokens,
                "max_output_tokens": budget.max_output_tokens,
            }
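
A quick usage sketch of the estimator (prompt strings are shortened placeholders):

estimator = TokenEstimator()

estimated_input = estimator.estimate_prompt(
    system_prompt="You are MangaAssist (JP manga e-commerce)...",
    user_message="Recommend something like Vinland Saga",
    conversation_history=[
        {"role": "user", "content": "Hi"},
        {"role": "assistant", "content": "Welcome to MangaAssist!"},
    ],
    rag_context="Title: Vinland Saga Vol. 1 | genre: Seinen/Historical | ...",
)
decision = estimator.check_budget(estimated_input, MangaIntent.RECOMMENDATION)
# e.g. {"within_budget": True, "model": "sonnet", "needs_compression": False, ...}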

Token Tracking System

Per-Model, Per-Intent, Per-Session Tracking

Every Bedrock invocation produces actual token counts in the response metadata. MangaAssist captures these and emits structured metrics.

flowchart TD
    A[Bedrock Response Metadata] --> B[Extract input_tokens & output_tokens]
    B --> C[Tag with model_id]
    B --> D[Tag with intent]
    B --> E[Tag with session_id]

    C --> F[Per-Model Aggregator]
    D --> G[Per-Intent Aggregator]
    E --> H[Per-Session Accumulator]

    F --> I[CloudWatch: tokens_by_model]
    G --> J[CloudWatch: tokens_by_intent]
    H --> K[DynamoDB: session_token_total]

    I --> L[Cost Dashboard]
    J --> L
    K --> M[Session Budget Gate]

    M -->|Over session budget| N[Downgrade to Haiku]
    M -->|Within budget| O[Continue normally]

    style F fill:#c8e6c9
    style G fill:#bbdefb
    style H fill:#ffe0b2

import time
import json
import boto3
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class TokenRecord:
    """A single token consumption record from one Bedrock invocation."""
    timestamp: float
    session_id: str
    intent: str
    model_id: str
    input_tokens: int
    output_tokens: int
    estimated_input_tokens: int  # what we predicted before the call
    latency_ms: float
    cost_usd: float


# Bedrock Claude 3 pricing (us-east-1, on-demand)
MODEL_PRICING = {
    "anthropic.claude-3-sonnet-20240229-v1:0": {
        "input_per_1m": 3.00,
        "output_per_1m": 15.00,
    },
    "anthropic.claude-3-haiku-20240307-v1:0": {
        "input_per_1m": 0.25,
        "output_per_1m": 1.25,
    },
}


class TokenTracker:
    """Tracks token consumption across models, intents, and sessions.

    Emits CloudWatch metrics and maintains session-level budgets in DynamoDB.
    """

    def __init__(self, cloudwatch_namespace: str = "MangaAssist/Tokens"):
        self.cw_client = boto3.client("cloudwatch")
        self.ddb_client = boto3.client("dynamodb")
        self.namespace = cloudwatch_namespace
        self.session_table = "manga_assist_sessions"

    def calculate_cost(self, model_id: str, input_tokens: int,
                       output_tokens: int) -> float:
        """Calculate USD cost for a single invocation."""
        pricing = MODEL_PRICING.get(model_id)
        if not pricing:
            return 0.0
        input_cost = (input_tokens / 1_000_000) * pricing["input_per_1m"]
        output_cost = (output_tokens / 1_000_000) * pricing["output_per_1m"]
        return round(input_cost + output_cost, 6)

    def record_invocation(self, session_id: str, intent: str,
                          model_id: str, input_tokens: int,
                          output_tokens: int, estimated_input: int,
                          latency_ms: float) -> TokenRecord:
        """Record a Bedrock invocation and emit all tracking metrics."""
        cost = self.calculate_cost(model_id, input_tokens, output_tokens)
        record = TokenRecord(
            timestamp=time.time(),
            session_id=session_id,
            intent=intent,
            model_id=model_id,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            estimated_input_tokens=estimated_input,
            latency_ms=latency_ms,
            cost_usd=cost,
        )

        # Emit metrics in parallel dimensions
        self._emit_cloudwatch_metrics(record)
        self._update_session_total(record)
        return record

    def _emit_cloudwatch_metrics(self, record: TokenRecord):
        """Emit per-model, per-intent, and estimation-drift metrics."""
        # Determine short model name for dimension
        model_short = "sonnet" if "sonnet" in record.model_id else "haiku"
        drift = record.input_tokens - record.estimated_input_tokens
        drift_pct = (drift / max(record.estimated_input_tokens, 1)) * 100

        self.cw_client.put_metric_data(
            Namespace=self.namespace,
            MetricData=[
                # Per-model input tokens
                {
                    "MetricName": "InputTokens",
                    "Value": record.input_tokens,
                    "Unit": "Count",
                    "Dimensions": [
                        {"Name": "Model", "Value": model_short},
                    ],
                },
                # Per-model output tokens
                {
                    "MetricName": "OutputTokens",
                    "Value": record.output_tokens,
                    "Unit": "Count",
                    "Dimensions": [
                        {"Name": "Model", "Value": model_short},
                    ],
                },
                # Per-intent cost
                {
                    "MetricName": "InvocationCostUSD",
                    "Value": record.cost_usd,
                    "Unit": "None",
                    "Dimensions": [
                        {"Name": "Intent", "Value": record.intent},
                        {"Name": "Model", "Value": model_short},
                    ],
                },
                # Per-intent input tokens
                {
                    "MetricName": "InputTokens",
                    "Value": record.input_tokens,
                    "Unit": "Count",
                    "Dimensions": [
                        {"Name": "Intent", "Value": record.intent},
                    ],
                },
                # Estimation drift percentage
                {
                    "MetricName": "EstimationDriftPercent",
                    "Value": drift_pct,
                    "Unit": "Percent",
                    "Dimensions": [
                        {"Name": "Intent", "Value": record.intent},
                    ],
                },
            ],
        )

    def _update_session_total(self, record: TokenRecord):
        """Atomically increment session-level token totals in DynamoDB."""
        self.ddb_client.update_item(
            TableName=self.session_table,
            Key={"session_id": {"S": record.session_id}},
            UpdateExpression=(
                "ADD total_input_tokens :inp, total_output_tokens :out, "
                "total_cost_usd :cost, invocation_count :one"
            ),
            ExpressionAttributeValues={
                ":inp": {"N": str(record.input_tokens)},
                ":out": {"N": str(record.output_tokens)},
                ":cost": {"N": str(record.cost_usd)},
                ":one": {"N": "1"},
            },
        )

    def get_session_budget_status(self, session_id: str,
                                  max_session_cost: float = 0.10
                                  ) -> dict:
        """Check if a session is within its cost budget.

        Default max_session_cost of $0.10 means a single user session
        should never exceed 10 cents. This prevents runaway multi-turn
        conversations from burning budget.
        """
        resp = self.ddb_client.get_item(
            TableName=self.session_table,
            Key={"session_id": {"S": session_id}},
            ProjectionExpression="total_cost_usd, invocation_count",
        )
        item = resp.get("Item", {})
        current_cost = float(item.get("total_cost_usd", {}).get("N", "0"))
        invocations = int(item.get("invocation_count", {}).get("N", "0"))

        return {
            "session_id": session_id,
            "current_cost_usd": current_cost,
            "max_budget_usd": max_session_cost,
            "budget_remaining_usd": max_session_cost - current_cost,
            "within_budget": current_cost < max_session_cost,
            "invocation_count": invocations,
            "recommendation": (
                "continue" if current_cost < max_session_cost * 0.8
                else "downgrade_to_haiku" if current_cost < max_session_cost
                else "reject_or_summarize_and_reset"
            ),
        }
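
A usage sketch; this assumes AWS credentials and the manga_assist_sessions DynamoDB table exist, and the values are illustrative:

tracker = TokenTracker()

record = tracker.record_invocation(
    session_id="sess-123",
    intent="product_search",
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    input_tokens=1140,
    output_tokens=360,
    estimated_input=1200,
    latency_ms=1850.0,
)

status = tracker.get_session_budget_status("sess-123")
if status["recommendation"] == "downgrade_to_haiku":
    # route the next turn in this session to Haiku
    pass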

Context Window Optimization

Multi-turn conversations are the single largest driver of token growth. MangaAssist users frequently browse for 10-20 turns before purchasing. Without context window management, a 20-turn conversation with RAG context can hit 8,000+ input tokens per call.

Sliding Window with Summarization

flowchart TD
    A[Conversation History: 15 turns] --> B{Turns > window_size?}
    B -->|No| C[Use full history as-is]
    B -->|Yes| D[Split: older turns vs recent window]
    D --> E[Summarize older turns with Haiku]
    D --> F[Keep last N turns verbatim]
    E --> G[Compact summary: ~100 tokens]
    F --> H[Recent window: ~400 tokens]
    G --> I[Combined context]
    H --> I
    I --> J[Savings: 800 tokens reduced to 500]

    style G fill:#c8e6c9
    style H fill:#bbdefb
    style J fill:#fff9c4
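
A minimal sketch of the window split shown above. summarize_with_haiku is a hypothetical helper that would send the older turns to Haiku with a short summarization prompt; it is stubbed here so the sketch stays self-contained:

def summarize_with_haiku(turns: list[dict]) -> str:
    """Hypothetical helper: would call Bedrock Haiku with a summarization prompt.
    Stubbed as naive concatenation so this sketch runs without AWS access."""
    return " / ".join(t.get("content", "")[:40] for t in turns)


def apply_sliding_window(history: list[dict], window_size: int = 5) -> list[dict]:
    """Keep the last `window_size` turns verbatim; summarize everything older."""
    if len(history) <= window_size:
        return history  # short conversations pass through untouched

    older, recent = history[:-window_size], history[-window_size:]
    summary_turn = {"role": "system",
                    "content": f"[Earlier turns: {summarize_with_haiku(older)}]"}
    return [summary_turn] + recent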

Max Context Budget Per Intent

| Intent | System Prompt | RAG Context | History | User Message | Total Input Budget | max_tokens (Output) |
| --- | --- | --- | --- | --- | --- | --- |
| product_search | 200 | 500 | 300 | 200 | 1,200 | 400 |
| order_status | 150 | 0 | 250 | 200 | 600 | 200 |
| recommendation | 250 | 800 | 600 | 350 | 2,000 | 600 |
| manga_qa | 200 | 700 | 400 | 200 | 1,500 | 500 |
| chitchat | 100 | 0 | 200 | 100 | 400 | 150 |
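
One way to make these component budgets enforceable is a lookup table the prompt assembler checks before adding each piece; an illustrative sketch (the numbers mirror the table, the keys are not an existing config schema):

CONTEXT_COMPONENT_BUDGETS = {
    "product_search": {"system": 200, "rag": 500, "history": 300, "user": 200},
    "order_status":   {"system": 150, "rag": 0,   "history": 250, "user": 200},
    "recommendation": {"system": 250, "rag": 800, "history": 600, "user": 350},
    "manga_qa":       {"system": 200, "rag": 700, "history": 400, "user": 200},
    "chitchat":       {"system": 100, "rag": 0,   "history": 200, "user": 100},
}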

Response Size Controls

max_tokens Configuration Per Intent

Setting max_tokens in the Bedrock API call is the single most impactful output cost control. Left at a generous default, Claude will generate until it naturally stops, often 2-3x longer than necessary for transactional queries.

| Intent | max_tokens | Rationale |
| --- | --- | --- |
| product_search | 400 | Product card + brief description + price. No essays. |
| order_status | 200 | Order number, status, ETA. Minimal prose. |
| recommendation | 600 | 3-5 manga titles with brief justifications. |
| manga_qa | 500 | Detailed but bounded answer about manga content. |
| chitchat | 150 | Friendly short response. No long monologues. |
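
A sketch of passing the per-intent cap into the Bedrock Messages API request body; the prompt contents are placeholders and the model IDs are the ones used throughout this file:

import json
import boto3

bedrock = boto3.client("bedrock-runtime")


def invoke_with_cap(model_id: str, system_prompt: str, user_message: str,
                    max_tokens: int) -> dict:
    """Invoke Claude on Bedrock with a per-intent output token cap."""
    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,  # per-intent cap from the table above
        "system": system_prompt,
        "messages": [{"role": "user", "content": user_message}],
    }
    resp = bedrock.invoke_model(modelId=model_id, body=json.dumps(body))
    return json.loads(resp["body"].read())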

Streaming with Token Budget Enforcement

MangaAssist uses WebSocket streaming. The orchestrator counts tokens during streaming and can terminate the stream gracefully if the output token ceiling is hit:

import asyncio
import json
from typing import AsyncIterator


async def stream_with_budget(
    bedrock_stream: AsyncIterator[dict],
    max_output_tokens: int,
    encoding,  # tiktoken encoding
) -> AsyncIterator[str]:
    """Stream Bedrock response chunks while enforcing token budget.

    When the token budget is exhausted, appends a truncation notice and stops.
    This prevents runaway output cost while maintaining user experience.
    """
    token_count = 0

    async for event in bedrock_stream:
        # Each stream event's chunk bytes carry a JSON-encoded Messages API event;
        # only content_block_delta events contain generated text.
        payload = json.loads(event.get("chunk", {}).get("bytes", b"{}"))
        if payload.get("type") != "content_block_delta":
            continue
        chunk_text = payload.get("delta", {}).get("text", "")
        if not chunk_text:
            continue

        chunk_tokens = len(encoding.encode(chunk_text))
        token_count += chunk_tokens

        if token_count >= max_output_tokens:
            # Budget exhausted — send what we have and stop
            yield chunk_text
            yield "\n\n---\n*[Response trimmed for brevity. Ask a follow-up for more details.]*"
            return

        yield chunk_text

Prompt Compression Techniques

Technique 1: Remove Redundant Instructions

System prompts often carry repeated instructions across turns. MangaAssist deduplicates these.

Before compression (380 tokens):

You are MangaAssist, a helpful assistant for a Japanese manga e-commerce store.
You help customers find manga, check order status, and make recommendations.
Always respond in a friendly, helpful manner.
Always provide prices in JPY and USD.
Always include the manga title in both Japanese and English.
If you don't know the answer, say so honestly.
Never make up information about manga titles or pricing.
Be concise and helpful.
Remember to format product information consistently.
Always check if the manga is in stock before recommending it.

After compression (180 tokens):

You are MangaAssist (JP manga e-commerce). Tasks: find manga, order status, recommendations.
Rules: friendly, prices in JPY+USD, titles in JP+EN, honest when unsure, no fabrication, concise, consistent format, verify stock.

Technique 2: Dynamic Few-Shot Selection

Instead of including 5 fixed examples in every prompt, select 1-2 examples most relevant to the current query:

import numpy as np
from typing import List, Tuple


class DynamicFewShotSelector:
    """Selects the most relevant few-shot examples for the current query.

    Instead of stuffing 5 examples into every prompt (costing ~500 tokens),
    this picks 1-2 relevant examples (saving 300+ tokens per call).
    """

    def __init__(self, examples: list[dict], embeddings: np.ndarray):
        """
        Args:
            examples: List of {"query": str, "response": str} few-shot pairs
            embeddings: Pre-computed embeddings for each example query,
                        shape (n_examples, embedding_dim)
        """
        self.examples = examples
        self.embeddings = embeddings  # (n_examples, dim)

    def select(self, query_embedding: np.ndarray, top_k: int = 2,
               min_similarity: float = 0.7) -> list[dict]:
        """Select top-k most similar examples above the similarity threshold.

        This means:
        - Simple queries may get 0 examples (saving ~250 tokens each)
        - Only highly relevant examples are included
        - Token spend on few-shots is proportional to query complexity
        """
        # Assumes example embeddings and the query embedding are L2-normalized,
        # so the dot product is the cosine similarity compared against min_similarity.
        similarities = np.dot(self.embeddings, query_embedding)
        top_indices = np.argsort(similarities)[::-1][:top_k]

        selected = []
        for idx in top_indices:
            if similarities[idx] >= min_similarity:
                selected.append(self.examples[idx])
        return selected
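
Usage sketch. embed_text is a hypothetical embedding call (for example a Bedrock Titan Embeddings invocation); it is stubbed with a deterministic random vector so the sketch runs without AWS access:

def embed_text(text: str) -> np.ndarray:
    """Hypothetical embedding helper; stubbed with a fixed-seed unit vector."""
    rng = np.random.default_rng(abs(hash(text)) % (2**32))
    v = rng.standard_normal(384)
    return v / np.linalg.norm(v)


examples = [
    {"query": "Where is my order?", "response": "Order #12345 ships tomorrow."},
    {"query": "Recommend a dark fantasy manga", "response": "Berserk Deluxe Vol. 1 ..."},
]
example_embeddings = np.stack([embed_text(ex["query"]) for ex in examples])

selector = DynamicFewShotSelector(examples, example_embeddings)
few_shots = selector.select(embed_text("any manga like Attack on Titan?"), top_k=2)
# Simple queries that match nothing above 0.7 similarity get zero examples.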

Technique 3: Template Variable Injection

Replace verbose inline context with compact variable references that Claude expands:

Before (650 tokens):

The customer is looking at "One Piece Vol. 104" by Eiichiro Oda, published by Shueisha,
ISBN 978-4-08-883371-1, priced at 528 JPY (approximately 3.52 USD), currently in stock
with 47 copies available, rated 4.8/5.0 from 2,341 reviews, categorized as
Shonen/Action/Adventure, available in Japanese language, dimensions 17.6 x 11.4 x 2 cm,
weight 200g, release date 2023-03-03. The customer previously viewed volumes 100-103
and has "One Piece" in their wishlist. They also purchased "Demon Slayer complete box set"
last month. Based on their browsing history they prefer action manga with ongoing series.

After (280 tokens):

Product: {title: "One Piece Vol. 104", author: "Eiichiro Oda", price: "528 JPY / 3.52 USD", stock: 47, rating: "4.8/5 (2341)", genre: "Shonen/Action/Adventure"}
User context: {viewed: ["OP vols 100-103"], wishlist: ["One Piece"], recent_purchase: "Demon Slayer box set", preference: "action, ongoing series"}
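
A small serializer makes the compact form repeatable; in this sketch the field names are illustrative rather than the actual product schema:

def compact_product_context(product: dict, user: dict) -> str:
    """Render product and user records in the compact key: value form above."""
    product_line = (
        f'Product: {{title: "{product["title"]}", author: "{product["author"]}", '
        f'price: "{product["price"]}", stock: {product["stock"]}, '
        f'rating: "{product["rating"]}", genre: "{product["genre"]}"}}'
    )
    user_line = (
        f'User context: {{viewed: {user["viewed"]}, wishlist: {user["wishlist"]}, '
        f'recent_purchase: "{user["recent_purchase"]}", '
        f'preference: "{user["preference"]}"}}'
    )
    return product_line + "\n" + user_line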


Context Pruning

Relevance-Based Pruning of RAG Chunks

OpenSearch returns multiple chunks ranked by vector similarity. Not all are worth the token cost.

flowchart TD
    A[OpenSearch returns 8 RAG chunks] --> B[Score each chunk]
    B --> C{Score > relevance_threshold?}
    C -->|Yes| D[Include in prompt]
    C -->|No| E[Discard]
    D --> F{Cumulative tokens > RAG budget?}
    F -->|No| G[Add next chunk]
    F -->|Yes| H[Stop adding chunks]
    G --> B
    H --> I[Final RAG context: 3 chunks, 450 tokens]
    E --> J[Discarded: 5 chunks, saved 600 tokens]

    style I fill:#c8e6c9
    style J fill:#ffcdd2
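
A sketch of the threshold-plus-budget loop in the diagram, assuming OpenSearch results arrive as (chunk_text, score) pairs sorted by descending relevance; the threshold and budget defaults are illustrative:

def prune_rag_chunks(chunks: list[tuple[str, float]],
                     estimator: "TokenEstimator",
                     relevance_threshold: float = 0.75,
                     rag_token_budget: int = 500) -> str:
    """Keep chunks above the relevance threshold until the RAG token budget is spent."""
    kept, used = [], 0
    for text, score in chunks:
        if score < relevance_threshold:
            continue  # not worth the token cost
        chunk_tokens = estimator.count_tokens(text)
        if used + chunk_tokens > rag_token_budget:
            break  # budget exhausted; stop adding chunks
        kept.append(text)
        used += chunk_tokens
    return "\n---\n".join(kept)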

Conversation History Trimming with Recency Weighting

Older turns matter less but should not vanish entirely. MangaAssist uses a recency-weighted retention strategy:

| Turn Age | Weight | Action |
| --- | --- | --- |
| Last 3 turns | 1.0 | Keep verbatim |
| Turns 4-6 | 0.6 | Keep if contains product names, prices, or order IDs |
| Turns 7-10 | 0.3 | Summarize into a single sentence per turn |
| Turns 11+ | 0.0 | Summarize entire block into 1-2 sentences |
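
A sketch of that tiered policy; the per-turn and block summaries are stubbed as naive truncation here rather than an LLM call:

KEY_TERMS = ("vol.", "manga", "order", "price", "jpy", "usd")


def retain_history(history: list[dict]) -> list[dict]:
    """Apply the recency-weighted retention tiers to a chronological history."""
    retained = []
    for age, turn in enumerate(reversed(history), start=1):  # age 1 = latest turn
        content = turn.get("content", "")
        if age <= 3:
            retained.append(turn)  # keep verbatim
        elif age <= 6 and any(term in content.lower() for term in KEY_TERMS):
            retained.append(turn)  # keep only entity-bearing turns
        elif 6 < age <= 10:
            retained.append({**turn, "content": content[:80]})  # one-line stand-in
        # turns older than 10 collapse into a single block note below
    dropped = max(len(history) - 10, 0)
    if dropped:
        retained.append({"role": "system",
                         "content": f"[{dropped} earlier turns summarized and omitted]"})
    return list(reversed(retained))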

Architecture Diagram — Token Optimizer Pipeline

flowchart TB
    subgraph Ingress["API Gateway WebSocket"]
        WS[User Message]
    end

    subgraph Orchestrator["ECS Fargate — Orchestrator"]
        direction TB
        A[Request Handler] --> B[Intent Classifier]
        B --> C[TokenEstimator.estimate_prompt]
        C --> D[TokenBudgetManager.check_budget]
        D --> E{Within budget?}
        E -->|Yes| F[Prompt Assembler]
        E -->|No, mild| G[PromptCompressor.compress]
        E -->|No, severe| H[Model Downgrade + Compress]
        G --> F
        H --> F
        F --> I[ContextPruner.prune_rag_chunks]
        I --> J[HistoryManager.trim_history]
        J --> K[Set max_tokens for intent]
    end

    subgraph Cache["ElastiCache Redis"]
        L[Semantic Cache Lookup]
    end

    subgraph RAG["OpenSearch Serverless"]
        M[Vector Search + Chunk Retrieval]
    end

    subgraph FM["Bedrock Claude 3"]
        N[Sonnet or Haiku Invocation]
    end

    subgraph Tracking["Token Tracking"]
        O[TokenTracker.record_invocation]
        P[CloudWatch Metrics]
        Q[DynamoDB Session Update]
    end

    WS --> A
    A --> L
    L -->|Cache hit| R[Return cached response]
    L -->|Cache miss| B
    F --> M
    M --> I
    K --> N
    N --> O
    O --> P
    O --> Q
    N -->|Streamed response| S[stream_with_budget]
    S --> WS

    style Orchestrator fill:#e3f2fd,stroke:#1565c0
    style Cache fill:#e8f5e9,stroke:#2e7d32
    style RAG fill:#fff3e0,stroke:#e65100
    style FM fill:#fce4ec,stroke:#c62828
    style Tracking fill:#f3e5f5,stroke:#6a1b9a

TokenBudgetManager — Production Class

This is the top-level class that the orchestrator calls on every request. It coordinates estimation, budget checking, compression, and model selection.

import time
import logging
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger("mangaassist.token_budget")


@dataclass
class BudgetDecision:
    """The output of the budget manager — tells the orchestrator what to do."""
    model_id: str
    max_output_tokens: int
    needs_compression: bool
    compression_target: Optional[int]
    estimated_input_tokens: int
    estimated_cost_usd: float
    session_budget_remaining_usd: float
    warnings: list[str]


class TokenBudgetManager:
    """Central budget authority for all MangaAssist Bedrock invocations.

    Coordinates:
    1. Token estimation (tiktoken pre-count)
    2. Budget checking (per-intent limits)
    3. Model selection (Sonnet vs Haiku based on budget)
    4. Session-level cost gating
    5. Compression decisions

    This class is instantiated once per ECS task and reused across requests.
    """

    SONNET_MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0"
    HAIKU_MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"

    def __init__(self, estimator: "TokenEstimator",
                 tracker: "TokenTracker",
                 max_session_cost: float = 0.10):
        self.estimator = estimator
        self.tracker = tracker
        self.max_session_cost = max_session_cost

    def evaluate(self, session_id: str, intent: MangaIntent,
                 system_prompt: str, user_message: str,
                 conversation_history: list[dict],
                 rag_context: str) -> BudgetDecision:
        """Evaluate a request and return the budget decision.

        This is the single entry point the orchestrator calls before
        every Bedrock invocation.
        """
        warnings = []

        # Step 1: Estimate input tokens
        estimated_input = self.estimator.estimate_prompt(
            system_prompt, user_message, conversation_history, rag_context
        )

        # Step 2: Check intent budget
        budget_check = self.estimator.check_budget(estimated_input, intent)

        # Step 3: Resolve model ID
        model_short = budget_check["model"]
        model_id = (self.SONNET_MODEL_ID if model_short == "sonnet"
                    else self.HAIKU_MODEL_ID)

        # Step 4: Check session budget
        session_status = self.tracker.get_session_budget_status(
            session_id, self.max_session_cost
        )

        if not session_status["within_budget"]:
            # Session over budget — force Haiku and warn
            model_id = self.HAIKU_MODEL_ID
            model_short = "haiku"
            warnings.append(
                f"Session budget exhausted "
                f"(${session_status['current_cost_usd']:.4f} / "
                f"${self.max_session_cost}). Downgraded to Haiku."
            )
        elif session_status["recommendation"] == "downgrade_to_haiku":
            # Session approaching limit — preemptive downgrade
            model_id = self.HAIKU_MODEL_ID
            model_short = "haiku"
            warnings.append("Session approaching budget limit. Using Haiku.")

        # Step 5: Estimate cost
        max_output = budget_check["max_output_tokens"]
        pricing = MODEL_PRICING[model_id]
        estimated_cost = (
            (estimated_input / 1_000_000) * pricing["input_per_1m"] +
            (max_output / 1_000_000) * pricing["output_per_1m"]
        )

        if not budget_check["within_budget"]:
            warnings.append(
                f"Input tokens ({estimated_input}) exceed intent budget "
                f"({INTENT_BUDGETS[intent].max_input_tokens}). "
                f"Compression required."
            )

        logger.info(
            "Budget decision",
            extra={
                "session_id": session_id,
                "intent": intent.value,
                "estimated_input_tokens": estimated_input,
                "model": model_short,
                "needs_compression": budget_check["needs_compression"],
                "estimated_cost_usd": estimated_cost,
                "session_remaining_usd": session_status["budget_remaining_usd"],
            },
        )

        return BudgetDecision(
            model_id=model_id,
            max_output_tokens=max_output,
            needs_compression=budget_check["needs_compression"],
            compression_target=budget_check["compression_target"],
            estimated_input_tokens=estimated_input,
            estimated_cost_usd=estimated_cost,
            session_budget_remaining_usd=session_status["budget_remaining_usd"],
            warnings=warnings,
        )
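
An orchestrator-side usage sketch wiring the pieces defined so far together; the prompt strings are placeholders and the session-budget lookup inside evaluate needs real AWS credentials:

estimator = TokenEstimator()
tracker = TokenTracker()
budget_manager = TokenBudgetManager(estimator, tracker, max_session_cost=0.10)

decision = budget_manager.evaluate(
    session_id="sess-123",
    intent=MangaIntent.PRODUCT_SEARCH,
    system_prompt="You are MangaAssist (JP manga e-commerce)...",
    user_message="Do you have Frieren Vol. 11 in stock?",
    conversation_history=[],
    rag_context="Title: Frieren Vol. 11 | stock: 12 | price: 528 JPY",
)

# The orchestrator then compresses if needed (next section) and invokes
# decision.model_id with max_tokens=decision.max_output_tokens.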

PromptCompressor — Production Class

import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class CompressionResult:
    """Output of prompt compression."""
    original_tokens: int
    compressed_tokens: int
    compression_ratio: float
    compressed_text: str
    techniques_applied: list[str]


class PromptCompressor:
    """Compresses prompts to fit within token budgets.

    Applies a cascade of compression techniques from cheapest (rule-based)
    to most expensive (LLM-based summarization). Stops as soon as the
    target token count is reached.

    Designed for MangaAssist where prompts contain:
    - System instructions (compressible with deduplication)
    - RAG manga descriptions (compressible with extraction)
    - Conversation history (compressible with summarization)
    - User message (never compressed — sacred text)
    """

    def __init__(self, estimator: "TokenEstimator"):
        self.estimator = estimator

    def compress(self, system_prompt: str, rag_context: str,
                 conversation_history: list[dict],
                 target_tokens: int) -> CompressionResult:
        """Apply cascading compression to meet target token count.

        Compression cascade (applied in order, stops when target is met):
        1. Whitespace and formatting cleanup (free)
        2. Instruction deduplication (free)
        3. RAG chunk truncation (cheap)
        4. History summarization (moderate; rule-based stand-in here,
           upgradable to a Haiku summarization call)
        5. Few-shot example removal (handled upstream by
           DynamicFewShotSelector, not repeated in this class)
        """
        techniques_applied = []
        original_tokens = self.estimator.count_tokens(
            system_prompt + rag_context +
            " ".join(t.get("content", "") for t in conversation_history)
        )

        # Technique 1: Whitespace cleanup
        system_prompt = self._clean_whitespace(system_prompt)
        rag_context = self._clean_whitespace(rag_context)
        techniques_applied.append("whitespace_cleanup")

        current = self._count_total(system_prompt, rag_context,
                                    conversation_history)
        if current <= target_tokens:
            return self._result(original_tokens, current, system_prompt,
                                rag_context, conversation_history,
                                techniques_applied)

        # Technique 2: Instruction deduplication
        system_prompt = self._deduplicate_instructions(system_prompt)
        techniques_applied.append("instruction_dedup")

        current = self._count_total(system_prompt, rag_context,
                                    conversation_history)
        if current <= target_tokens:
            return self._result(original_tokens, current, system_prompt,
                                rag_context, conversation_history,
                                techniques_applied)

        # Technique 3: RAG chunk truncation
        rag_context = self._truncate_rag(rag_context, target_tokens // 3)
        techniques_applied.append("rag_truncation")

        current = self._count_total(system_prompt, rag_context,
                                    conversation_history)
        if current <= target_tokens:
            return self._result(original_tokens, current, system_prompt,
                                rag_context, conversation_history,
                                techniques_applied)

        # Technique 4: History summarization
        conversation_history = self._summarize_old_turns(
            conversation_history, keep_recent=3
        )
        techniques_applied.append("history_summarization")

        current = self._count_total(system_prompt, rag_context,
                                    conversation_history)
        return self._result(original_tokens, current, system_prompt,
                            rag_context, conversation_history,
                            techniques_applied)

    def _clean_whitespace(self, text: str) -> str:
        """Remove excessive whitespace, blank lines, and trailing spaces."""
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r'[ \t]+', ' ', text)
        text = re.sub(r' +\n', '\n', text)
        return text.strip()

    def _deduplicate_instructions(self, system_prompt: str) -> str:
        """Remove semantically duplicate instruction lines.

        Common in MangaAssist prompts:
        - 'Be helpful' + 'Respond in a helpful manner' -> keep one
        - 'Always provide prices' + 'Include pricing' -> keep one
        """
        lines = system_prompt.split('\n')
        seen_intents = set()
        deduplicated = []

        # Simple keyword-group deduplication
        intent_keywords = {
            "helpful": {"helpful", "friendly", "assist", "polite"},
            "pricing": {"price", "pricing", "cost", "jpy", "usd"},
            "honesty": {"honest", "don't make up", "no fabrication",
                        "don't know"},
            "concise": {"concise", "brief", "short", "succinct"},
            "format": {"format", "consistent", "structure"},
        }

        for line in lines:
            line_lower = line.lower()
            matched_intent = None
            for intent, keywords in intent_keywords.items():
                if any(kw in line_lower for kw in keywords):
                    matched_intent = intent
                    break

            if matched_intent and matched_intent in seen_intents:
                continue  # Skip duplicate intent
            if matched_intent:
                seen_intents.add(matched_intent)
            deduplicated.append(line)

        return '\n'.join(deduplicated)

    def _truncate_rag(self, rag_context: str, max_tokens: int) -> str:
        """Truncate RAG context to fit within a token budget.

        Keeps chunks in order (highest relevance first, as returned
        by OpenSearch) and cuts when the budget is reached.
        """
        chunks = rag_context.split("\n---\n")
        result_chunks = []
        running_tokens = 0

        for chunk in chunks:
            chunk_tokens = self.estimator.count_tokens(chunk)
            if running_tokens + chunk_tokens > max_tokens:
                break
            result_chunks.append(chunk)
            running_tokens += chunk_tokens

        return "\n---\n".join(result_chunks)

    def _summarize_old_turns(self, history: list[dict],
                             keep_recent: int = 3) -> list[dict]:
        """Replace old conversation turns with a compact summary.

        Keeps the most recent `keep_recent` turns verbatim and replaces
        older turns with a single summary turn.
        """
        if len(history) <= keep_recent:
            return history

        old_turns = history[:-keep_recent]
        recent_turns = history[-keep_recent:]

        # Extract key entities from old turns (product names, order IDs)
        key_facts = []
        for turn in old_turns:
            content = turn.get("content", "")
            # Look for product references, order IDs, prices
            if any(kw in content.lower() for kw in
                   ["vol.", "manga", "order", "price", "recommend"]):
                # Keep a compressed version
                compressed = content[:80] + "..." if len(content) > 80 else content
                key_facts.append(compressed)

        summary_content = (
            f"[Earlier conversation summary: {len(old_turns)} turns. "
            f"Key topics: {'; '.join(key_facts[:3])}]"
        )
        summary_turn = {"role": "system", "content": summary_content}

        return [summary_turn] + recent_turns

    def _count_total(self, system_prompt: str, rag_context: str,
                     history: list[dict]) -> int:
        """Count total tokens across all prompt components."""
        total_text = system_prompt + "\n" + rag_context + "\n"
        total_text += " ".join(t.get("content", "") for t in history)
        return self.estimator.count_tokens(total_text)

    def _result(self, original: int, compressed: int,
                system_prompt: str, rag_context: str,
                history: list[dict],
                techniques: list[str]) -> CompressionResult:
        """Build the compression result."""
        combined = (
            system_prompt + "\n---\n" + rag_context + "\n---\n" +
            "\n".join(t.get("content", "") for t in history)
        )
        return CompressionResult(
            original_tokens=original,
            compressed_tokens=compressed,
            compression_ratio=compressed / max(original, 1),
            compressed_text=combined,
            techniques_applied=techniques,
        )
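
Continuing the orchestrator sketch from the TokenBudgetManager section, compression runs only when the budget decision calls for it:

compressor = PromptCompressor(estimator)

if decision.needs_compression:
    result = compressor.compress(
        system_prompt="You are MangaAssist (JP manga e-commerce)...",
        rag_context="Title: Frieren Vol. 11 | ...",
        conversation_history=[],
        target_tokens=decision.compression_target,
    )
    print(f"{result.original_tokens} -> {result.compressed_tokens} tokens "
          f"({result.compression_ratio:.0%}) via {result.techniques_applied}")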

Comparison Table — Token Efficiency Techniques

| Technique | Token Savings | Quality Impact | Implementation Complexity | MangaAssist Priority |
| --- | --- | --- | --- | --- |
| max_tokens per intent | 30-50% output reduction | None (prevents over-generation) | Low — config change | P0 — deploy first |
| Model routing (Haiku for simple) | 60-80% cost reduction per routed call | Minimal for transactional queries | Medium — needs intent classifier | P0 — deploy first |
| Whitespace cleanup | 5-10% input reduction | None | Low — regex rules | P1 — quick win |
| Instruction deduplication | 10-20% system prompt reduction | None | Low — keyword matching | P1 — quick win |
| Dynamic few-shot selection | 30-60% few-shot token reduction | Slight improvement (more relevant examples) | Medium — needs embeddings | P1 — good ROI |
| RAG chunk pruning | 20-40% RAG context reduction | Risk of missing relevant details | Medium — needs relevance scoring | P2 — test carefully |
| Conversation history sliding window | 40-70% history reduction | Risk of losing context from early turns | Medium — needs summarization | P2 — test carefully |
| Template variable injection | 30-50% context reduction | None if model handles structured input | Low — prompt rewrite | P1 — quick win |
| LLMLingua-style compression | 40-60% input reduction | Risk of quality degradation for Japanese content | High — needs perplexity model | P3 — experimental |
| Streaming token budget enforcement | 10-30% output reduction | Risk of truncating important info | Medium — needs streaming counter | P2 — test carefully |

Daily Cost Impact at MangaAssist Scale

Assuming 1M messages/day with the following intent distribution:

| Intent | % of Traffic | Messages/Day | Model (without optimization) | Model (with optimization) |
| --- | --- | --- | --- | --- |
| product_search | 35% | 350,000 | Sonnet | Sonnet (compressed) |
| order_status | 25% | 250,000 | Sonnet | Haiku |
| recommendation | 15% | 150,000 | Sonnet | Sonnet (compressed) |
| manga_qa | 10% | 100,000 | Sonnet | Sonnet (compressed) |
| chitchat | 15% | 150,000 | Sonnet | Haiku |

Cost Comparison

| Metric | Without Optimization | With Full Token Efficiency | Savings |
| --- | --- | --- | --- |
| Daily Sonnet calls | 1,000,000 | 400,000 | 60% fewer Sonnet calls |
| Daily Haiku calls | 0 | 600,000 | Cheaper model absorbs simple intents |
| Avg input tokens/call (Sonnet) | 1,800 | 1,200 | 33% reduction via compression |
| Avg output tokens/call (Sonnet) | 500 | 350 | 30% reduction via max_tokens |
| Daily input cost (Sonnet) | $5,400 | $1,440 | $3,960 saved |
| Daily output cost (Sonnet) | $7,500 | $2,100 | $5,400 saved |
| Daily input cost (Haiku) | $0 | $108 | Added cost |
| Daily output cost (Haiku) | $0 | $131 | Added cost |
| Total daily cost | $12,900 | $3,779 | $9,121/day saved (71%) |
| Monthly cost | $387,000 | $113,370 | $273,630/month saved |
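
The Sonnet rows can be sanity-checked directly from the per-call averages; a quick arithmetic sketch (Haiku rows taken from the table as given):

sonnet_in_without = 1_000_000 * 1_800 / 1_000_000 * 3.00    # $5,400
sonnet_out_without = 1_000_000 * 500 / 1_000_000 * 15.00    # $7,500
sonnet_in_with = 400_000 * 1_200 / 1_000_000 * 3.00         # $1,440
sonnet_out_with = 400_000 * 350 / 1_000_000 * 15.00         # $2,100

total_without = sonnet_in_without + sonnet_out_without      # $12,900/day
total_with = sonnet_in_with + sonnet_out_with + 108 + 131   # $3,779/day incl. Haiku
print(total_without - total_with)                           # 9121.0 USD/day saved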

Key Takeaways

  1. Estimate before you spend: tiktoken pre-counting with a 5% calibration buffer catches budget overruns before they cost money.
  2. Track at three dimensions: per-model, per-intent, and per-session tracking gives you the granularity to diagnose cost spikes.
  3. Compress in cascade: apply cheap techniques first (whitespace, dedup) and expensive techniques (summarization) only when needed.
  4. Set max_tokens always: the single highest-ROI configuration change is setting per-intent output token limits.
  5. Route aggressively to Haiku: transactional intents (order_status, chitchat) work well on Haiku at 12x lower cost for both input and output tokens.
  6. Budget at the session level: a per-session cost cap ($0.10) prevents multi-turn conversations from burning runaway cost.