Token Efficiency Architecture for GenAI Applications
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Certification | Task | Skill | This File |
|---|---|---|---|
| AWS AIP-C01 | Task 4.1 — Optimize cost and performance of FM applications | Skill 4.1.1 — Design token efficiency systems for FM-powered applications | Token lifecycle, budget management, compression techniques, context pruning, architecture diagrams, production code |
Skill scope: Design and implement end-to-end token efficiency systems that minimize cost while preserving answer quality — covering estimation, tracking, optimization, and monitoring across every layer of the MangaAssist request pipeline.
Mind Map — Token Efficiency Dimensions
mindmap
root((Token<br/>Efficiency))
Estimation
tiktoken Pre-Counting
Input token estimation
Output budget reservation
Per-Intent Budgets
product_search — 1200 in / 400 out
order_status — 600 in / 200 out
recommendation — 2000 in / 600 out
manga_qa — 1500 in / 500 out
Model Selection Gate
Haiku for simple intents
Sonnet for complex reasoning
Tracking
Per-Model Metrics
Sonnet input & output
Haiku input & output
Cost attribution
Per-Intent Metrics
Token histogram per intent
Cost per intent per day
Budget utilization ratio
Per-Session Metrics
Cumulative tokens per session
Session cost running total
Multi-turn growth rate
Bedrock Invocation Logs
Actual vs estimated delta
CloudWatch Metrics
Cost anomaly alerting
Optimization
Prompt Compression
Remove redundant instructions
Dynamic few-shot selection
Template variable injection
LLMLingua-style pruning
Context Pruning
RAG chunk relevance scoring
Conversation history trimming
Recency-weighted retention
Context Window Management
Sliding window for multi-turn
Summarization of older turns
Max context budget per intent
Response Size Controls
max_tokens per intent
Output ceiling enforcement
Early stopping on high confidence
Monitoring
Real-Time Dashboards
Token velocity per second
Cost burn rate
Budget headroom gauge
Alerts
Per-request token spike
Hourly budget breach
Estimation drift alarm
Drift Detection
Prompt version token delta
Model behavior shift
Compression ratio degradation
Token Lifecycle in MangaAssist
Every user message flows through four phases of token management before and after hitting Bedrock. This is not optional overhead — it is the cost control backbone.
flowchart LR
subgraph Phase1["1. Estimation"]
A[User message arrives] --> B[tiktoken pre-count input]
B --> C[Intent classification]
C --> D[Look up intent token budget]
D --> E{Budget available?}
E -->|Yes| F[Reserve token budget]
E -->|No| G[Downgrade model or compress]
end
subgraph Phase2["2. Optimization"]
F --> H[Assemble prompt]
G --> H
H --> I[Compress prompt]
I --> J[Prune RAG context]
J --> K[Trim conversation history]
K --> L[Set max_tokens for response]
end
subgraph Phase3["3. Invocation"]
L --> M[Call Bedrock with optimized prompt]
M --> N[Stream response with token counter]
N --> O{Token ceiling hit?}
O -->|Yes| P[Graceful stop + append notice]
O -->|No| Q[Complete response]
end
subgraph Phase4["4. Tracking"]
P --> R[Record actual tokens consumed]
Q --> R
R --> S[Update session running total]
S --> T[Emit CloudWatch metrics]
T --> U[Check budget alerts]
U --> V[Update estimation calibration]
end
style Phase1 fill:#e8f5e9,stroke:#2e7d32
style Phase2 fill:#e3f2fd,stroke:#1565c0
style Phase3 fill:#fff3e0,stroke:#e65100
style Phase4 fill:#f3e5f5,stroke:#6a1b9a
Token Estimation Framework
Why Estimate Before Calling Bedrock?
Without pre-call estimation, you discover cost overruns only after the money is spent. At MangaAssist's 1M messages/day, even a 10% token overrun affecting 100K Sonnet calls adds up fast:
| Scenario | Daily Token Overrun | Extra Daily Cost |
|---|---|---|
| 10% overrun on Sonnet input | 100K msgs x 200 extra tokens = 20M tokens | 20M / 1M x $3 = $60/day |
| 10% overrun on Sonnet output | 100K msgs x 80 extra tokens = 8M tokens | 8M / 1M x $15 = $120/day |
| Combined input + output | — | $180/day = $5,400/month |
That is why estimation is the very first step.
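A quick arithmetic check of the table above (a standalone sketch; the 100K affected messages and the per-call token deltas are the assumptions stated in the table):
SONNET_INPUT_PER_1M = 3.00    # USD per 1M input tokens
SONNET_OUTPUT_PER_1M = 15.00  # USD per 1M output tokens

affected_msgs = 100_000  # assumed subset of daily Sonnet traffic with an overrun
extra_input_cost = affected_msgs * 200 / 1_000_000 * SONNET_INPUT_PER_1M    # $60/day
extra_output_cost = affected_msgs * 80 / 1_000_000 * SONNET_OUTPUT_PER_1M   # $120/day

daily_overrun = extra_input_cost + extra_output_cost
print(f"${daily_overrun:.0f}/day, ${daily_overrun * 30:,.0f}/month")  # $180/day, $5,400/month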
tiktoken-Based Estimation
Claude 3's tokenizer is not publicly available; tiktoken's cl100k_base encoding is the closest public approximation (within roughly 5% in our calibration). We count tokens with it before the API call and enforce budgets on the buffered estimate.
import tiktoken
from dataclasses import dataclass, field
from typing import Dict, Optional
from enum import Enum
class MangaIntent(Enum):
PRODUCT_SEARCH = "product_search"
ORDER_STATUS = "order_status"
RECOMMENDATION = "recommendation"
MANGA_QA = "manga_qa"
CHITCHAT = "chitchat"
@dataclass
class TokenBudget:
"""Token budget for a single intent."""
max_input_tokens: int
max_output_tokens: int
preferred_model: str # "sonnet" or "haiku"
fallback_model: str # model to use if budget is tight
# Per-intent token budgets — tuned from 30 days of MangaAssist production data
INTENT_BUDGETS: Dict[MangaIntent, TokenBudget] = {
MangaIntent.PRODUCT_SEARCH: TokenBudget(
max_input_tokens=1200, max_output_tokens=400,
preferred_model="sonnet", fallback_model="haiku"
),
MangaIntent.ORDER_STATUS: TokenBudget(
max_input_tokens=600, max_output_tokens=200,
preferred_model="haiku", fallback_model="haiku"
),
MangaIntent.RECOMMENDATION: TokenBudget(
max_input_tokens=2000, max_output_tokens=600,
preferred_model="sonnet", fallback_model="haiku"
),
MangaIntent.MANGA_QA: TokenBudget(
max_input_tokens=1500, max_output_tokens=500,
preferred_model="sonnet", fallback_model="haiku"
),
MangaIntent.CHITCHAT: TokenBudget(
max_input_tokens=400, max_output_tokens=150,
preferred_model="haiku", fallback_model="haiku"
),
}
class TokenEstimator:
"""Pre-call token estimation using tiktoken.
MangaAssist uses this to decide model routing and prompt compression
BEFORE spending money on a Bedrock invocation.
"""
def __init__(self):
# cl100k_base is the closest public tokenizer to Claude 3's tokenizer.
# Empirical calibration shows ~5% variance which we handle with a buffer.
self.encoding = tiktoken.get_encoding("cl100k_base")
self.calibration_buffer = 1.05 # 5% safety margin
def count_tokens(self, text: str) -> int:
"""Count tokens in a string with calibration buffer."""
raw_count = len(self.encoding.encode(text))
return int(raw_count * self.calibration_buffer)
def estimate_prompt(self, system_prompt: str, user_message: str,
conversation_history: list[dict],
rag_context: str) -> int:
"""Estimate total input tokens for a Bedrock Claude call."""
parts = [system_prompt, rag_context]
for turn in conversation_history:
parts.append(turn.get("role", ""))
parts.append(turn.get("content", ""))
parts.append(user_message)
total_text = "\n".join(parts)
return self.count_tokens(total_text)
def check_budget(self, estimated_input: int, intent: MangaIntent
) -> dict:
"""Check if estimated tokens fit the intent budget.
Returns:
dict with keys: within_budget, model, needs_compression,
compression_target
"""
budget = INTENT_BUDGETS[intent]
if estimated_input <= budget.max_input_tokens:
return {
"within_budget": True,
"model": budget.preferred_model,
"needs_compression": False,
"compression_target": None,
"max_output_tokens": budget.max_output_tokens,
}
# Over budget — can we fit with compression?
overage_ratio = estimated_input / budget.max_input_tokens
if overage_ratio <= 1.5:
# Mild overage — compress prompt to fit preferred model
return {
"within_budget": False,
"model": budget.preferred_model,
"needs_compression": True,
"compression_target": budget.max_input_tokens,
"max_output_tokens": budget.max_output_tokens,
}
else:
# Severe overage — fall back to cheaper model + compress
return {
"within_budget": False,
"model": budget.fallback_model,
"needs_compression": True,
"compression_target": budget.max_input_tokens,
"max_output_tokens": budget.max_output_tokens,
}
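A minimal usage sketch for the estimator above (the message, history, and RAG snippet are illustrative placeholders):
estimator = TokenEstimator()

estimated_input = estimator.estimate_prompt(
    system_prompt="You are MangaAssist (JP manga e-commerce). ...",
    user_message="Do you have One Piece Vol. 104 in stock?",
    conversation_history=[
        {"role": "user", "content": "Hi, looking for shonen manga."},
        {"role": "assistant", "content": "Happy to help! Any series in mind?"},
    ],
    rag_context="Product: One Piece Vol. 104 | 528 JPY | in stock ...",
)
decision = estimator.check_budget(estimated_input, MangaIntent.PRODUCT_SEARCH)
# e.g. {"within_budget": True, "model": "sonnet", "needs_compression": False, ...}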
Token Tracking System
Per-Model, Per-Intent, Per-Session Tracking
Every Bedrock invocation produces actual token counts in the response metadata. MangaAssist captures these and emits structured metrics.
flowchart TD
A[Bedrock Response Metadata] --> B[Extract input_tokens & output_tokens]
B --> C[Tag with model_id]
B --> D[Tag with intent]
B --> E[Tag with session_id]
C --> F[Per-Model Aggregator]
D --> G[Per-Intent Aggregator]
E --> H[Per-Session Accumulator]
F --> I[CloudWatch: tokens_by_model]
G --> J[CloudWatch: tokens_by_intent]
H --> K[DynamoDB: session_token_total]
I --> L[Cost Dashboard]
J --> L
K --> M[Session Budget Gate]
M -->|Over session budget| N[Downgrade to Haiku]
M -->|Within budget| O[Continue normally]
style F fill:#c8e6c9
style G fill:#bbdefb
style H fill:#ffe0b2
import time
import json
import boto3
from dataclasses import dataclass, field
from typing import Dict, Optional
@dataclass
class TokenRecord:
"""A single token consumption record from one Bedrock invocation."""
timestamp: float
session_id: str
intent: str
model_id: str
input_tokens: int
output_tokens: int
estimated_input_tokens: int # what we predicted before the call
latency_ms: float
cost_usd: float
# Bedrock Claude 3 pricing (us-east-1, on-demand)
MODEL_PRICING = {
"anthropic.claude-3-sonnet-20240229-v1:0": {
"input_per_1m": 3.00,
"output_per_1m": 15.00,
},
"anthropic.claude-3-haiku-20240307-v1:0": {
"input_per_1m": 0.25,
"output_per_1m": 1.25,
},
}
class TokenTracker:
"""Tracks token consumption across models, intents, and sessions.
Emits CloudWatch metrics and maintains session-level budgets in DynamoDB.
"""
def __init__(self, cloudwatch_namespace: str = "MangaAssist/Tokens"):
self.cw_client = boto3.client("cloudwatch")
self.ddb_client = boto3.client("dynamodb")
self.namespace = cloudwatch_namespace
self.session_table = "manga_assist_sessions"
def calculate_cost(self, model_id: str, input_tokens: int,
output_tokens: int) -> float:
"""Calculate USD cost for a single invocation."""
pricing = MODEL_PRICING.get(model_id)
if not pricing:
return 0.0
input_cost = (input_tokens / 1_000_000) * pricing["input_per_1m"]
output_cost = (output_tokens / 1_000_000) * pricing["output_per_1m"]
return round(input_cost + output_cost, 6)
def record_invocation(self, session_id: str, intent: str,
model_id: str, input_tokens: int,
output_tokens: int, estimated_input: int,
latency_ms: float) -> TokenRecord:
"""Record a Bedrock invocation and emit all tracking metrics."""
cost = self.calculate_cost(model_id, input_tokens, output_tokens)
record = TokenRecord(
timestamp=time.time(),
session_id=session_id,
intent=intent,
model_id=model_id,
input_tokens=input_tokens,
output_tokens=output_tokens,
estimated_input_tokens=estimated_input,
latency_ms=latency_ms,
cost_usd=cost,
)
# Emit metrics in parallel dimensions
self._emit_cloudwatch_metrics(record)
self._update_session_total(record)
return record
def _emit_cloudwatch_metrics(self, record: TokenRecord):
"""Emit per-model, per-intent, and estimation-drift metrics."""
# Determine short model name for dimension
model_short = "sonnet" if "sonnet" in record.model_id else "haiku"
drift = record.input_tokens - record.estimated_input_tokens
drift_pct = (drift / max(record.estimated_input_tokens, 1)) * 100
self.cw_client.put_metric_data(
Namespace=self.namespace,
MetricData=[
# Per-model input tokens
{
"MetricName": "InputTokens",
"Value": record.input_tokens,
"Unit": "Count",
"Dimensions": [
{"Name": "Model", "Value": model_short},
],
},
# Per-model output tokens
{
"MetricName": "OutputTokens",
"Value": record.output_tokens,
"Unit": "Count",
"Dimensions": [
{"Name": "Model", "Value": model_short},
],
},
# Per-intent cost
{
"MetricName": "InvocationCostUSD",
"Value": record.cost_usd,
"Unit": "None",
"Dimensions": [
{"Name": "Intent", "Value": record.intent},
{"Name": "Model", "Value": model_short},
],
},
# Per-intent input tokens
{
"MetricName": "InputTokens",
"Value": record.input_tokens,
"Unit": "Count",
"Dimensions": [
{"Name": "Intent", "Value": record.intent},
],
},
# Estimation drift percentage
{
"MetricName": "EstimationDriftPercent",
"Value": drift_pct,
"Unit": "Percent",
"Dimensions": [
{"Name": "Intent", "Value": record.intent},
],
},
],
)
def _update_session_total(self, record: TokenRecord):
"""Atomically increment session-level token totals in DynamoDB."""
self.ddb_client.update_item(
TableName=self.session_table,
Key={"session_id": {"S": record.session_id}},
UpdateExpression=(
"ADD total_input_tokens :inp, total_output_tokens :out, "
"total_cost_usd :cost, invocation_count :one"
),
ExpressionAttributeValues={
":inp": {"N": str(record.input_tokens)},
":out": {"N": str(record.output_tokens)},
":cost": {"N": str(record.cost_usd)},
":one": {"N": "1"},
},
)
def get_session_budget_status(self, session_id: str,
max_session_cost: float = 0.10
) -> dict:
"""Check if a session is within its cost budget.
Default max_session_cost of $0.10 means a single user session
should never exceed 10 cents. This prevents runaway multi-turn
conversations from burning budget.
"""
resp = self.ddb_client.get_item(
TableName=self.session_table,
Key={"session_id": {"S": session_id}},
ProjectionExpression="total_cost_usd, invocation_count",
)
item = resp.get("Item", {})
current_cost = float(item.get("total_cost_usd", {}).get("N", "0"))
invocations = int(item.get("invocation_count", {}).get("N", "0"))
return {
"session_id": session_id,
"current_cost_usd": current_cost,
"max_budget_usd": max_session_cost,
"budget_remaining_usd": max_session_cost - current_cost,
"within_budget": current_cost < max_session_cost,
"invocation_count": invocations,
"recommendation": (
"continue" if current_cost < max_session_cost * 0.8
else "downgrade_to_haiku" if current_cost < max_session_cost
else "reject_or_summarize_and_reset"
),
}
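For the Anthropic Messages API on Bedrock, the actual counts arrive in the response body's usage object. A usage sketch (the response body shown is an illustrative literal; in production it is the parsed result of invoke_model):
# Parsed JSON body from bedrock_runtime.invoke_model(...), shown here as a literal.
response_body = {
    "content": [{"type": "text", "text": "One Piece Vol. 104 is in stock at 528 JPY ..."}],
    "usage": {"input_tokens": 1150, "output_tokens": 310},
}

tracker = TokenTracker()
record = tracker.record_invocation(
    session_id="sess-42",
    intent="product_search",
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    input_tokens=response_body["usage"]["input_tokens"],
    output_tokens=response_body["usage"]["output_tokens"],
    estimated_input=1200,   # what TokenEstimator predicted before the call
    latency_ms=850.0,
)
print(record.cost_usd)  # 0.0081 = 1150/1M * $3 + 310/1M * $15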
Context Window Optimization
Multi-turn conversations are the single largest driver of token growth. MangaAssist users frequently browse for 10-20 turns before purchasing. Without context window management, a 20-turn conversation with RAG context can hit 8,000+ input tokens per call.
Sliding Window with Summarization
flowchart TD
A[Conversation History: 15 turns] --> B{Turns > window_size?}
B -->|No| C[Use full history as-is]
B -->|Yes| D[Split: older turns vs recent window]
D --> E[Summarize older turns with Haiku]
D --> F[Keep last N turns verbatim]
E --> G[Compact summary: ~100 tokens]
F --> H[Recent window: ~400 tokens]
G --> I[Combined context]
H --> I
I --> J[Savings: 800 tokens reduced to 500]
style G fill:#c8e6c9
style H fill:#bbdefb
style J fill:#fff9c4
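A minimal sketch of the "summarize older turns with Haiku" step from the diagram (the prompt wording and the 150-token cap on the summary are assumptions, not production values):
import json
import boto3

bedrock = boto3.client("bedrock-runtime")
HAIKU_MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"

def summarize_old_turns_with_haiku(old_turns: list[dict]) -> str:
    """Collapse older conversation turns into a compact (~100 token) summary."""
    transcript = "\n".join(
        f"{t.get('role', 'user')}: {t.get('content', '')}" for t in old_turns
    )
    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 150,  # keep the summary itself cheap
        "messages": [{
            "role": "user",
            "content": (
                "Summarize this shopping conversation in two sentences. "
                "Keep manga titles, volume numbers, order IDs, and prices.\n\n"
                + transcript
            ),
        }],
    }
    resp = bedrock.invoke_model(modelId=HAIKU_MODEL_ID, body=json.dumps(body))
    return json.loads(resp["body"].read())["content"][0]["text"]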
Max Context Budget Per Intent
| Intent | System Prompt | RAG Context | History | User Message | Total Input Budget | max_tokens Output |
|---|---|---|---|---|---|---|
| product_search | 200 | 500 | 300 | 200 | 1,200 | 400 |
| order_status | 150 | 0 | 250 | 200 | 600 | 200 |
| recommendation | 250 | 800 | 600 | 350 | 2,000 | 600 |
| manga_qa | 200 | 700 | 400 | 200 | 1,500 | 500 |
| chitchat | 100 | 0 | 200 | 100 | 400 | 150 |
Response Size Controls
max_tokens Configuration Per Intent
Setting max_tokens in the Bedrock API call is the single most impactful output cost control. Left at one generous default for every intent, Claude generates until it naturally stops, often 2-3x longer than necessary for transactional queries.
| Intent | max_tokens | Rationale |
|---|---|---|
| product_search | 400 | Product card + brief description + price. No essays. |
| order_status | 200 | Order number, status, ETA. Minimal prose. |
| recommendation | 600 | 3-5 manga titles with brief justifications. |
| manga_qa | 500 | Detailed but bounded answer about manga content. |
| chitchat | 150 | Friendly short response. No long monologues. |
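A sketch of how the per-intent ceiling reaches Bedrock, reusing MangaIntent and INTENT_BUDGETS from the estimation section (the prompt arguments are placeholders):
import json
import boto3

bedrock = boto3.client("bedrock-runtime")

def invoke_with_intent_ceiling(intent: MangaIntent, model_id: str,
                               system_prompt: str, user_message: str) -> dict:
    """Invoke Claude on Bedrock with the intent's output token ceiling applied."""
    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": INTENT_BUDGETS[intent].max_output_tokens,  # per-intent ceiling
        "system": system_prompt,
        "messages": [{"role": "user", "content": user_message}],
    }
    resp = bedrock.invoke_model(modelId=model_id, body=json.dumps(body))
    return json.loads(resp["body"].read())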
Streaming with Token Budget Enforcement
MangaAssist uses WebSocket streaming. The orchestrator counts tokens during streaming and can terminate the stream gracefully if the output token ceiling is hit:
import asyncio
import json
from typing import AsyncIterator
async def stream_with_budget(
bedrock_stream: AsyncIterator[dict],
max_output_tokens: int,
encoding, # tiktoken encoding
) -> AsyncIterator[str]:
"""Stream Bedrock response chunks while enforcing token budget.
When the token budget is exhausted, appends a truncation notice and stops.
This prevents runaway output cost while maintaining user experience.
"""
token_count = 0
    async for event in bedrock_stream:
        raw = event.get("chunk", {}).get("bytes", b"")
        if not raw:
            continue
        # Anthropic Messages streaming: text deltas arrive as content_block_delta events.
        payload = json.loads(raw)
        if payload.get("type") != "content_block_delta":
            continue
        chunk_text = payload.get("delta", {}).get("text", "")
        if not chunk_text:
            continue
        chunk_tokens = len(encoding.encode(chunk_text))
        token_count += chunk_tokens
if token_count >= max_output_tokens:
# Budget exhausted — send what we have and stop
yield chunk_text
yield "\n\n---\n*[Response trimmed for brevity. Ask a follow-up for more details.]*"
return
yield chunk_text
Prompt Compression Techniques
Technique 1: Remove Redundant Instructions
System prompts often carry repeated instructions across turns. MangaAssist deduplicates these.
Before compression:
You are MangaAssist, a helpful assistant for a Japanese manga e-commerce store.
You help customers find manga, check order status, and make recommendations.
Always respond in a friendly, helpful manner.
Always provide prices in JPY and USD.
Always include the manga title in both Japanese and English.
If you don't know the answer, say so honestly.
Never make up information about manga titles or pricing.
Be concise and helpful.
Remember to format product information consistently.
Always check if the manga is in stock before recommending it.
After compression (roughly half the tokens):
You are MangaAssist (JP manga e-commerce). Tasks: find manga, order status, recommendations.
Rules: friendly, prices in JPY+USD, titles in JP+EN, honest when unsure, no fabrication, concise, consistent format, verify stock.
Technique 2: Dynamic Few-Shot Selection
Instead of including 5 fixed examples in every prompt, select 1-2 examples most relevant to the current query:
import numpy as np
from typing import List, Tuple
class DynamicFewShotSelector:
"""Selects the most relevant few-shot examples for the current query.
Instead of stuffing 5 examples into every prompt (costing ~500 tokens),
this picks 1-2 relevant examples (saving 300+ tokens per call).
"""
def __init__(self, examples: list[dict], embeddings: np.ndarray):
"""
Args:
examples: List of {"query": str, "response": str} few-shot pairs
embeddings: Pre-computed embeddings for each example query,
shape (n_examples, embedding_dim)
"""
self.examples = examples
self.embeddings = embeddings # (n_examples, dim)
def select(self, query_embedding: np.ndarray, top_k: int = 2,
min_similarity: float = 0.7) -> list[dict]:
"""Select top-k most similar examples above the similarity threshold.
This means:
- Simple queries may get 0 examples (saving ~250 tokens each)
- Only highly relevant examples are included
- Token spend on few-shots is proportional to query complexity
"""
        # Cosine similarity via dot product (assumes embeddings are L2-normalized).
        similarities = np.dot(self.embeddings, query_embedding)
top_indices = np.argsort(similarities)[::-1][:top_k]
selected = []
for idx in top_indices:
if similarities[idx] >= min_similarity:
selected.append(self.examples[idx])
return selected
Technique 3: Template Variable Injection
Replace verbose inline context with compact variable references that Claude expands:
Before (verbose prose):
The customer is looking at "One Piece Vol. 104" by Eiichiro Oda, published by Shueisha,
ISBN 978-4-08-883371-1, priced at 528 JPY (approximately 3.52 USD), currently in stock
with 47 copies available, rated 4.8/5.0 from 2,341 reviews, categorized as
Shonen/Action/Adventure, available in Japanese language, dimensions 17.6 x 11.4 x 2 cm,
weight 200g, release date 2023-03-03. The customer previously viewed volumes 100-103
and has "One Piece" in their wishlist. They also purchased "Demon Slayer complete box set"
last month. Based on their browsing history they prefer action manga with ongoing series.
After (structured, roughly half the tokens):
Product: {title: "One Piece Vol. 104", author: "Eiichiro Oda", price: "528 JPY / 3.52 USD", stock: 47, rating: "4.8/5 (2341)", genre: "Shonen/Action/Adventure"}
User context: {viewed: ["OP vols 100-103"], wishlist: ["One Piece"], recent_purchase: "Demon Slayer box set", preference: "action, ongoing series"}
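One way to build the compact context above from DynamoDB items (a sketch; the field names are illustrative, not the actual MangaAssist schema):
import json

def compact_context(product: dict, user: dict) -> str:
    """Render product and user context as terse key-value blocks instead of prose,
    dropping fields the model rarely needs for a product conversation."""
    product_view = {
        "title": product["title"],
        "author": product["author"],
        "price": f"{product['price_jpy']} JPY / {product['price_usd']} USD",
        "stock": product["stock"],
        "rating": f"{product['rating']}/5 ({product['review_count']})",
        "genre": product["genre"],
    }
    user_view = {
        "viewed": user.get("recently_viewed", [])[:4],
        "wishlist": user.get("wishlist", [])[:3],
        "recent_purchase": user.get("last_purchase"),
        "preference": user.get("preference_summary"),
    }
    return (f"Product: {json.dumps(product_view, ensure_ascii=False)}\n"
            f"User context: {json.dumps(user_view, ensure_ascii=False)}")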
Context Pruning
Relevance-Based Pruning of RAG Chunks
OpenSearch returns multiple chunks ranked by vector similarity. Not all are worth the token cost.
flowchart TD
A[OpenSearch returns 8 RAG chunks] --> B[Score each chunk]
B --> C{Score > relevance_threshold?}
C -->|Yes| D[Include in prompt]
C -->|No| E[Discard]
D --> F{Cumulative tokens > RAG budget?}
F -->|No| G[Add next chunk]
F -->|Yes| H[Stop adding chunks]
G --> B
H --> I[Final RAG context: 3 chunks, 450 tokens]
E --> J[Discarded: 5 chunks, saved 600 tokens]
style I fill:#c8e6c9
style J fill:#ffcdd2
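A sketch of the pruning loop in the diagram (the 0.75 threshold, the 500-token RAG budget, and the chunk shape are assumptions; the relevance score would come from the OpenSearch k-NN response):
def prune_rag_chunks(chunks: list[dict], estimator: "TokenEstimator",
                     relevance_threshold: float = 0.75,
                     rag_token_budget: int = 500) -> str:
    """Keep chunks above the relevance threshold, highest score first,
    until the RAG token budget is exhausted.

    Each chunk is expected as {"text": str, "score": float} with scores
    normalized to 0-1 (e.g., cosine similarity from OpenSearch).
    """
    kept, running_tokens = [], 0
    for chunk in sorted(chunks, key=lambda c: c["score"], reverse=True):
        if chunk["score"] < relevance_threshold:
            continue  # not worth the token cost
        chunk_tokens = estimator.count_tokens(chunk["text"])
        if running_tokens + chunk_tokens > rag_token_budget:
            break  # budget exhausted, stop adding chunks
        kept.append(chunk["text"])
        running_tokens += chunk_tokens
    return "\n---\n".join(kept)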
Conversation History Trimming with Recency Weighting
Older turns matter less but should not vanish entirely. MangaAssist uses a recency-weighted retention strategy:
| Turn Age | Weight | Action |
|---|---|---|
| Last 3 turns | 1.0 | Keep verbatim |
| Turns 4-6 | 0.6 | Keep if contains product names, prices, or order IDs |
| Turns 7-10 | 0.3 | Summarize into a single sentence per turn |
| Turns 11+ | 0.0 | Summarize entire block into 1-2 sentences |
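A sketch of the retention policy in the table above (the keyword pattern standing in for "contains product names, prices, or order IDs" is an assumption; the oldest-block summary is a placeholder where a real implementation would call Haiku):
import re

KEY_ENTITY_PATTERN = re.compile(r"(vol\.\s*\d+|order\s*#?\w+|\d+\s*jpy|\$\d)", re.IGNORECASE)

def apply_recency_policy(history: list[dict]) -> list[dict]:
    """Trim conversation history per the recency-weighted retention table.

    `history` is ordered oldest-first; each turn is {"role": ..., "content": ...}.
    """
    n = len(history)
    recent = history[max(n - 3, 0):]               # last 3 turns: keep verbatim
    middle = history[max(n - 6, 0):max(n - 3, 0)]  # turns 4-6 back: keep if key entities
    older = history[max(n - 10, 0):max(n - 6, 0)]  # turns 7-10 back: one sentence each
    oldest = history[:max(n - 10, 0)]              # turns 11+ back: summarize the block

    trimmed: list[dict] = []
    if oldest:
        # Placeholder; production would generate a 1-2 sentence summary (e.g., with Haiku).
        trimmed.append({"role": "assistant",
                        "content": f"[Summary of {len(oldest)} earlier turns]"})
    for turn in older:
        first_sentence = turn.get("content", "").split(".")[0][:120]
        trimmed.append({"role": turn["role"], "content": first_sentence})
    for turn in middle:
        if KEY_ENTITY_PATTERN.search(turn.get("content", "")):
            trimmed.append(turn)  # carries product, price, or order info
    return trimmed + recent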
Architecture Diagram — Token Optimizer Pipeline
flowchart TB
subgraph Ingress["API Gateway WebSocket"]
WS[User Message]
end
subgraph Orchestrator["ECS Fargate — Orchestrator"]
direction TB
A[Request Handler] --> B[Intent Classifier]
B --> C[TokenEstimator.estimate_prompt]
C --> D[TokenBudgetManager.check_budget]
D --> E{Within budget?}
E -->|Yes| F[Prompt Assembler]
E -->|No, mild| G[PromptCompressor.compress]
E -->|No, severe| H[Model Downgrade + Compress]
G --> F
H --> F
F --> I[ContextPruner.prune_rag_chunks]
I --> J[HistoryManager.trim_history]
J --> K[Set max_tokens for intent]
end
subgraph Cache["ElastiCache Redis"]
L[Semantic Cache Lookup]
end
subgraph RAG["OpenSearch Serverless"]
M[Vector Search + Chunk Retrieval]
end
subgraph FM["Bedrock Claude 3"]
N[Sonnet or Haiku Invocation]
end
subgraph Tracking["Token Tracking"]
O[TokenTracker.record_invocation]
P[CloudWatch Metrics]
Q[DynamoDB Session Update]
end
WS --> A
A --> L
L -->|Cache hit| R[Return cached response]
L -->|Cache miss| B
F --> M
M --> I
K --> N
N --> O
O --> P
O --> Q
N -->|Streamed response| S[stream_with_budget]
S --> WS
style Orchestrator fill:#e3f2fd,stroke:#1565c0
style Cache fill:#e8f5e9,stroke:#2e7d32
style RAG fill:#fff3e0,stroke:#e65100
style FM fill:#fce4ec,stroke:#c62828
style Tracking fill:#f3e5f5,stroke:#6a1b9a
TokenBudgetManager — Production Class
This is the top-level class that the orchestrator calls on every request. It coordinates estimation, budget checking, compression, and model selection.
import time
import logging
from dataclasses import dataclass
from typing import Optional
logger = logging.getLogger("mangaassist.token_budget")
@dataclass
class BudgetDecision:
"""The output of the budget manager — tells the orchestrator what to do."""
model_id: str
max_output_tokens: int
needs_compression: bool
compression_target: Optional[int]
estimated_input_tokens: int
estimated_cost_usd: float
session_budget_remaining_usd: float
warnings: list[str]
class TokenBudgetManager:
"""Central budget authority for all MangaAssist Bedrock invocations.
Coordinates:
1. Token estimation (tiktoken pre-count)
2. Budget checking (per-intent limits)
3. Model selection (Sonnet vs Haiku based on budget)
4. Session-level cost gating
5. Compression decisions
This class is instantiated once per ECS task and reused across requests.
"""
SONNET_MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0"
HAIKU_MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"
def __init__(self, estimator: "TokenEstimator",
tracker: "TokenTracker",
max_session_cost: float = 0.10):
self.estimator = estimator
self.tracker = tracker
self.max_session_cost = max_session_cost
def evaluate(self, session_id: str, intent: MangaIntent,
system_prompt: str, user_message: str,
conversation_history: list[dict],
rag_context: str) -> BudgetDecision:
"""Evaluate a request and return the budget decision.
This is the single entry point the orchestrator calls before
every Bedrock invocation.
"""
warnings = []
# Step 1: Estimate input tokens
estimated_input = self.estimator.estimate_prompt(
system_prompt, user_message, conversation_history, rag_context
)
# Step 2: Check intent budget
budget_check = self.estimator.check_budget(estimated_input, intent)
# Step 3: Resolve model ID
model_short = budget_check["model"]
model_id = (self.SONNET_MODEL_ID if model_short == "sonnet"
else self.HAIKU_MODEL_ID)
# Step 4: Check session budget
session_status = self.tracker.get_session_budget_status(
session_id, self.max_session_cost
)
if not session_status["within_budget"]:
# Session over budget — force Haiku and warn
model_id = self.HAIKU_MODEL_ID
model_short = "haiku"
warnings.append(
f"Session budget exhausted "
f"(${session_status['current_cost_usd']:.4f} / "
f"${self.max_session_cost}). Downgraded to Haiku."
)
elif session_status["recommendation"] == "downgrade_to_haiku":
# Session approaching limit — preemptive downgrade
model_id = self.HAIKU_MODEL_ID
model_short = "haiku"
warnings.append("Session approaching budget limit. Using Haiku.")
# Step 5: Estimate cost
max_output = budget_check["max_output_tokens"]
pricing = MODEL_PRICING[model_id]
estimated_cost = (
(estimated_input / 1_000_000) * pricing["input_per_1m"] +
(max_output / 1_000_000) * pricing["output_per_1m"]
)
if not budget_check["within_budget"]:
warnings.append(
f"Input tokens ({estimated_input}) exceed intent budget "
f"({INTENT_BUDGETS[intent].max_input_tokens}). "
f"Compression required."
)
logger.info(
"Budget decision",
extra={
"session_id": session_id,
"intent": intent.value,
"estimated_input_tokens": estimated_input,
"model": model_short,
"needs_compression": budget_check["needs_compression"],
"estimated_cost_usd": estimated_cost,
"session_remaining_usd": session_status["budget_remaining_usd"],
},
)
return BudgetDecision(
model_id=model_id,
max_output_tokens=max_output,
needs_compression=budget_check["needs_compression"],
compression_target=budget_check["compression_target"],
estimated_input_tokens=estimated_input,
estimated_cost_usd=estimated_cost,
session_budget_remaining_usd=session_status["budget_remaining_usd"],
warnings=warnings,
)
PromptCompressor — Production Class
import re
from dataclasses import dataclass
from typing import Optional
@dataclass
class CompressionResult:
"""Output of prompt compression."""
original_tokens: int
compressed_tokens: int
compression_ratio: float
compressed_text: str
techniques_applied: list[str]
class PromptCompressor:
"""Compresses prompts to fit within token budgets.
Applies a cascade of compression techniques from cheapest (rule-based)
to most expensive (LLM-based summarization). Stops as soon as the
target token count is reached.
Designed for MangaAssist where prompts contain:
- System instructions (compressible with deduplication)
- RAG manga descriptions (compressible with extraction)
- Conversation history (compressible with summarization)
- User message (never compressed — sacred text)
"""
def __init__(self, estimator: "TokenEstimator"):
self.estimator = estimator
def compress(self, system_prompt: str, rag_context: str,
conversation_history: list[dict],
target_tokens: int) -> CompressionResult:
"""Apply cascading compression to meet target token count.
Compression cascade (applied in order, stops when target is met):
1. Whitespace and formatting cleanup (free)
2. Instruction deduplication (free)
3. RAG chunk truncation (cheap)
        4. History summarization (moderate; extractive here, Haiku-based in production)
5. Few-shot example removal (moderate — quality impact)
"""
techniques_applied = []
original_tokens = self.estimator.count_tokens(
system_prompt + rag_context +
" ".join(t.get("content", "") for t in conversation_history)
)
# Technique 1: Whitespace cleanup
system_prompt = self._clean_whitespace(system_prompt)
rag_context = self._clean_whitespace(rag_context)
techniques_applied.append("whitespace_cleanup")
current = self._count_total(system_prompt, rag_context,
conversation_history)
if current <= target_tokens:
return self._result(original_tokens, current, system_prompt,
rag_context, conversation_history,
techniques_applied)
# Technique 2: Instruction deduplication
system_prompt = self._deduplicate_instructions(system_prompt)
techniques_applied.append("instruction_dedup")
current = self._count_total(system_prompt, rag_context,
conversation_history)
if current <= target_tokens:
return self._result(original_tokens, current, system_prompt,
rag_context, conversation_history,
techniques_applied)
# Technique 3: RAG chunk truncation
rag_context = self._truncate_rag(rag_context, target_tokens // 3)
techniques_applied.append("rag_truncation")
current = self._count_total(system_prompt, rag_context,
conversation_history)
if current <= target_tokens:
return self._result(original_tokens, current, system_prompt,
rag_context, conversation_history,
techniques_applied)
# Technique 4: History summarization
conversation_history = self._summarize_old_turns(
conversation_history, keep_recent=3
)
techniques_applied.append("history_summarization")
current = self._count_total(system_prompt, rag_context,
conversation_history)
return self._result(original_tokens, current, system_prompt,
rag_context, conversation_history,
techniques_applied)
def _clean_whitespace(self, text: str) -> str:
"""Remove excessive whitespace, blank lines, and trailing spaces."""
text = re.sub(r'\n{3,}', '\n\n', text)
text = re.sub(r'[ \t]+', ' ', text)
text = re.sub(r' +\n', '\n', text)
return text.strip()
def _deduplicate_instructions(self, system_prompt: str) -> str:
"""Remove semantically duplicate instruction lines.
Common in MangaAssist prompts:
- 'Be helpful' + 'Respond in a helpful manner' -> keep one
- 'Always provide prices' + 'Include pricing' -> keep one
"""
lines = system_prompt.split('\n')
seen_intents = set()
deduplicated = []
# Simple keyword-group deduplication
intent_keywords = {
"helpful": {"helpful", "friendly", "assist", "polite"},
"pricing": {"price", "pricing", "cost", "jpy", "usd"},
"honesty": {"honest", "don't make up", "no fabrication",
"don't know"},
"concise": {"concise", "brief", "short", "succinct"},
"format": {"format", "consistent", "structure"},
}
for line in lines:
line_lower = line.lower()
matched_intent = None
for intent, keywords in intent_keywords.items():
if any(kw in line_lower for kw in keywords):
matched_intent = intent
break
if matched_intent and matched_intent in seen_intents:
continue # Skip duplicate intent
if matched_intent:
seen_intents.add(matched_intent)
deduplicated.append(line)
return '\n'.join(deduplicated)
def _truncate_rag(self, rag_context: str, max_tokens: int) -> str:
"""Truncate RAG context to fit within a token budget.
Keeps chunks in order (highest relevance first, as returned
by OpenSearch) and cuts when the budget is reached.
"""
chunks = rag_context.split("\n---\n")
result_chunks = []
running_tokens = 0
for chunk in chunks:
chunk_tokens = self.estimator.count_tokens(chunk)
if running_tokens + chunk_tokens > max_tokens:
break
result_chunks.append(chunk)
running_tokens += chunk_tokens
return "\n---\n".join(result_chunks)
def _summarize_old_turns(self, history: list[dict],
keep_recent: int = 3) -> list[dict]:
"""Replace old conversation turns with a compact summary.
Keeps the most recent `keep_recent` turns verbatim and replaces
older turns with a single summary turn.
"""
if len(history) <= keep_recent:
return history
old_turns = history[:-keep_recent]
recent_turns = history[-keep_recent:]
# Extract key entities from old turns (product names, order IDs)
key_facts = []
for turn in old_turns:
content = turn.get("content", "")
# Look for product references, order IDs, prices
if any(kw in content.lower() for kw in
["vol.", "manga", "order", "price", "recommend"]):
# Keep a compressed version
compressed = content[:80] + "..." if len(content) > 80 else content
key_facts.append(compressed)
summary_content = (
f"[Earlier conversation summary: {len(old_turns)} turns. "
f"Key topics: {'; '.join(key_facts[:3])}]"
)
        # Claude's Messages API only accepts "user"/"assistant" roles, so the summary
        # is injected as an assistant turn rather than a "system" turn.
        summary_turn = {"role": "assistant", "content": summary_content}
return [summary_turn] + recent_turns
def _count_total(self, system_prompt: str, rag_context: str,
history: list[dict]) -> int:
"""Count total tokens across all prompt components."""
total_text = system_prompt + "\n" + rag_context + "\n"
total_text += " ".join(t.get("content", "") for t in history)
return self.estimator.count_tokens(total_text)
def _result(self, original: int, compressed: int,
system_prompt: str, rag_context: str,
history: list[dict],
techniques: list[str]) -> CompressionResult:
"""Build the compression result."""
combined = (
system_prompt + "\n---\n" + rag_context + "\n---\n" +
"\n".join(t.get("content", "") for t in history)
)
return CompressionResult(
original_tokens=original,
compressed_tokens=compressed,
compression_ratio=compressed / max(original, 1),
compressed_text=combined,
techniques_applied=techniques,
)
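A sketch of how the classes in this file might fit together on a single request (the prompt inputs are placeholders; the actual invocation and streaming steps are sketched in the Response Size Controls section):
estimator = TokenEstimator()
tracker = TokenTracker()
budget_manager = TokenBudgetManager(estimator, tracker)
compressor = PromptCompressor(estimator)

system_prompt = "You are MangaAssist (JP manga e-commerce). ..."
user_message = "Recommend something like Vinland Saga."
history = [{"role": "user", "content": "I liked Vagabond and Kingdom."}]
rag_context = "Product: Vinland Saga Vol. 27 ...\n---\nProduct: Berserk Deluxe Vol. 1 ..."

decision = budget_manager.evaluate(
    session_id="sess-42",
    intent=MangaIntent.RECOMMENDATION,
    system_prompt=system_prompt,
    user_message=user_message,
    conversation_history=history,
    rag_context=rag_context,
)
if decision.needs_compression:
    compressed = compressor.compress(system_prompt, rag_context, history,
                                     target_tokens=decision.compression_target)
    # Re-assemble the prompt from compressed.compressed_text before invoking.
# Invoke decision.model_id with max_tokens=decision.max_output_tokens, stream through
# stream_with_budget, then record actuals with tracker.record_invocation.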
Comparison Table — Token Efficiency Techniques
| Technique | Token Savings | Quality Impact | Implementation Complexity | MangaAssist Priority |
|---|---|---|---|---|
| max_tokens per intent | 30-50% output reduction | None (prevents over-generation) | Low — config change | P0 — deploy first |
| Model routing (Haiku for simple) | 60-80% cost reduction per routed call | Minimal for transactional queries | Medium — needs intent classifier | P0 — deploy first |
| Whitespace cleanup | 5-10% input reduction | None | Low — regex rules | P1 — quick win |
| Instruction deduplication | 10-20% system prompt reduction | None | Low — keyword matching | P1 — quick win |
| Dynamic few-shot selection | 30-60% few-shot token reduction | Slight improvement (more relevant examples) | Medium — needs embeddings | P1 — good ROI |
| RAG chunk pruning | 20-40% RAG context reduction | Risk of missing relevant details | Medium — needs relevance scoring | P2 — test carefully |
| Conversation history sliding window | 40-70% history reduction | Risk of losing context from early turns | Medium — needs summarization | P2 — test carefully |
| Template variable injection | 30-50% context reduction | None if model handles structured input | Low — prompt rewrite | P1 — quick win |
| LLMLingua-style compression | 40-60% input reduction | Risk of quality degradation for Japanese content | High — needs perplexity model | P3 — experimental |
| Streaming token budget enforcement | 10-30% output reduction | Risk of truncating important info | Medium — needs streaming counter | P2 — test carefully |
Daily Cost Impact at MangaAssist Scale
Assuming 1M messages/day with the following intent distribution:
| Intent | % of Traffic | Messages/Day | Model (without optimization) | Model (with optimization) |
|---|---|---|---|---|
| product_search | 35% | 350,000 | Sonnet | Sonnet (compressed) |
| order_status | 25% | 250,000 | Sonnet | Haiku |
| recommendation | 15% | 150,000 | Sonnet | Sonnet (compressed) |
| manga_qa | 10% | 100,000 | Sonnet | Sonnet (compressed) |
| chitchat | 15% | 150,000 | Sonnet | Haiku |
Cost Comparison
| Metric | Without Optimization | With Full Token Efficiency | Savings |
|---|---|---|---|
| Daily Sonnet calls | 1,000,000 | 600,000 | 40% fewer Sonnet calls |
| Daily Haiku calls | 0 | 400,000 | Cheaper model absorbs simple intents |
| Avg input tokens/call (Sonnet) | 1,800 | 1,200 | 33% reduction via compression |
| Avg output tokens/call (Sonnet) | 500 | 350 | 30% reduction via max_tokens |
| Daily input cost (Sonnet) | $5,400 | $2,160 | $3,240 saved |
| Daily output cost (Sonnet) | $7,500 | $3,150 | $4,350 saved |
| Daily input cost (Haiku) | $0 | ~$53 | Added cost |
| Daily output cost (Haiku) | $0 | ~$91 | Added cost |
| Total daily cost | $12,900 | ~$5,453 | ~$7,447/day saved (58%) |
| Monthly cost | $387,000 | ~$163,600 | ~$223,400/month saved |
The Haiku line items assume each routed call consumes its full per-intent input and output budget, so they are a conservative upper bound.
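The totals above can be reproduced from the stated traffic split, per-intent budgets, and on-demand pricing (a sketch; treating per-intent budget maximums as Haiku averages):
SONNET_PRICE = {"in": 3.00, "out": 15.00}   # USD per 1M tokens
HAIKU_PRICE = {"in": 0.25, "out": 1.25}

def daily_cost(calls: int, avg_in: int, avg_out: int, price: dict) -> float:
    return calls * (avg_in * price["in"] + avg_out * price["out"]) / 1_000_000

baseline = daily_cost(1_000_000, 1_800, 500, SONNET_PRICE)   # $12,900/day, all Sonnet
optimized = (
    daily_cost(600_000, 1_200, 350, SONNET_PRICE)            # Sonnet, compressed
    + daily_cost(250_000, 600, 200, HAIKU_PRICE)             # order_status on Haiku
    + daily_cost(150_000, 400, 150, HAIKU_PRICE)             # chitchat on Haiku
)
print(f"${baseline:,.0f}/day -> ${optimized:,.0f}/day "
      f"({1 - optimized / baseline:.0%} saved)")             # ~58% saved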
Key Takeaways
- Estimate before you spend: tiktoken pre-counting with a 5% calibration buffer catches budget overruns before they cost money.
- Track at three dimensions: per-model, per-intent, and per-session tracking gives you the granularity to diagnose cost spikes.
- Compress in cascade: apply cheap techniques first (whitespace, dedup) and expensive techniques (summarization) only when needed.
- Set max_tokens always: the single highest-ROI configuration change is setting per-intent output token limits.
- Route aggressively to Haiku: transactional intents (order_status, chitchat) work well on Haiku at 12x lower input cost and 12x lower output cost.
- Budget at the session level: a per-session cost cap ($0.10) prevents multi-turn conversations from burning runaway cost.