Intelligent Caching Architecture for GenAI Applications
AWS AIP-C01 Task 4.1 — Skill 4.1.4: Design intelligent caching systems for FM applications
Context: MangaAssist e-commerce chatbot — Bedrock Claude 3 (Sonnet/Haiku), OpenSearch Serverless, DynamoDB, ECS Fargate, API Gateway WebSocket, ElastiCache Redis. 1M messages/day.
Skill Mapping
| Certification | Domain | Task | Skill |
|---|---|---|---|
| AWS AIP-C01 | Domain 4 — Operational Efficiency | Task 4.1 — Optimize FM applications | Skill 4.1.4 — Design intelligent caching systems to reduce FM invocation cost and latency |
Skill scope: Architect multi-tier caching layers — from in-process exact-match through semantic similarity to edge delivery — that reduce Bedrock invocation volume, lower p95 latency, and cut per-message cost while guaranteeing answer freshness for a high-traffic manga retail chatbot.
Mind Map — Intelligent Caching Dimensions
mindmap
root((Intelligent<br/>Caching))
Cache Tiers
L1 — In-Memory
Python LRU / TTLCache
Exact Query Match
Sub-ms Latency
Per-Container Scope
L2 — Semantic Cache
ElastiCache Redis
Vector Similarity Search
Cosine Threshold 0.92
Cross-Container Shared
L3 — Prompt Cache
Bedrock Prompt Caching
System Prompt Reuse
Up to 100K Tokens
Model-Provider Managed
L4 — Edge Cache
CloudFront Distribution
FAQ Responses
Product Info Pages
Global Low Latency
Cache Key Strategies
Exact Match Hashing
Normalized Query Text
Lowercase + Stopword Removal
Deterministic Hash
Semantic Fingerprinting
Intent + Entities + Model
Sorted Entity Keys
SHA-256 Digest
Embedding Similarity
Titan Embeddings V2
1024-Dim Vectors
Cosine Distance
Invalidation
TTL-Based
Product Info 1hr
Recommendations 4hr
FAQ 24hr
Event-Driven
Price Change Webhook
Inventory Update
Catalog Refresh
Version-Based
Prompt Version Tag
Model Version Change
System Config Update
Observability
Hit Rate per Tier
Latency Saved
Cost Avoidance
Staleness Incidents
Memory Utilization
Multi-Tier Caching Architecture
MangaAssist processes 1M messages/day. Without caching, every message invokes Bedrock ($0.003 per 1K input tokens + $0.015 per 1K output tokens on Claude 3 Sonnet). A ~40% aggregate cache hit rate saves roughly $17,000/month and drops p95 latency for cached responses from 2.8s to about 180ms end-to-end.
Tier Overview
graph TD
subgraph Client
U[User Message via WebSocket]
end
subgraph "API Gateway + ECS Fargate"
N[Normalize Query]
L1[L1 — In-Memory Cache<br/>Python TTLCache<br/>Exact Match]
L2[L2 — Semantic Cache<br/>ElastiCache Redis<br/>Cosine ≥ 0.92]
L3[L3 — Prompt Cache<br/>Bedrock Prompt Caching<br/>System Prompt Reuse]
ORCH[Orchestrator<br/>RAG + Bedrock Invoke]
STORE[Cache Store<br/>Write-Back to L1 + L2]
end
subgraph "Edge"
L4[L4 — CloudFront Edge Cache<br/>FAQ & Product Pages<br/>TTL 5–60 min]
end
subgraph "AWS Services"
BED[Bedrock Claude 3 Sonnet/Haiku]
OS[OpenSearch Serverless<br/>Vector Store]
DDB[DynamoDB<br/>Session History]
EMB[Bedrock Titan Embeddings V2]
end
U -->|Static FAQ / Product| L4
L4 -->|MISS| N
U -->|Conversational| N
N --> L1
L1 -->|HIT| U
L1 -->|MISS| L2
L2 -->|HIT| U
L2 -->|MISS| L3
L3 -->|System prompt cached| BED
L3 -->|MISS| ORCH
ORCH --> OS
ORCH --> DDB
ORCH --> BED
BED --> STORE
STORE --> L1
STORE --> L2
STORE --> U
style L1 fill:#2d6a4f,stroke:#1b4332,color:#fff
style L2 fill:#e76f51,stroke:#f4a261,color:#fff
style L3 fill:#264653,stroke:#2a9d8f,color:#fff
style L4 fill:#457b9d,stroke:#1d3557,color:#fff
L1 — In-Memory Exact-Match Cache
The fastest tier. Each ECS container maintains a local Python dictionary with TTL expiry. Only exact normalized-query matches return a hit.
Why it works for MangaAssist: Popular queries repeat verbatim — "what's new this week", "track my order", "do you ship to Osaka". These account for ~12% of traffic.
import hashlib
import time
from threading import Lock
from typing import Optional, Any
class L1InMemoryCache:
"""
Per-container in-memory cache with TTL.
Designed for exact-match lookups on normalized queries.
MangaAssist: ~12% hit rate on verbatim repeated queries.
"""
def __init__(self, max_size: int = 10_000, default_ttl: int = 300):
self._cache: dict[str, dict[str, Any]] = {}
self._max_size = max_size
self._default_ttl = default_ttl
self._lock = Lock()
self._hits = 0
self._misses = 0
def _make_key(self, normalized_query: str, intent: str) -> str:
"""Deterministic cache key from normalized query + intent."""
raw = f"{normalized_query}|{intent}"
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def get(self, normalized_query: str, intent: str) -> Optional[str]:
key = self._make_key(normalized_query, intent)
with self._lock:
entry = self._cache.get(key)
if entry is None:
self._misses += 1
return None
if time.time() > entry["expires_at"]:
del self._cache[key]
self._misses += 1
return None
self._hits += 1
return entry["response"]
def put(
self,
normalized_query: str,
intent: str,
response: str,
ttl: Optional[int] = None,
) -> None:
key = self._make_key(normalized_query, intent)
ttl = ttl or self._default_ttl
with self._lock:
if len(self._cache) >= self._max_size:
self._evict_oldest()
self._cache[key] = {
"response": response,
"expires_at": time.time() + ttl,
"created_at": time.time(),
}
def _evict_oldest(self) -> None:
"""Remove the entry closest to expiration."""
oldest_key = min(self._cache, key=lambda k: self._cache[k]["expires_at"])
del self._cache[oldest_key]
@property
def hit_rate(self) -> float:
total = self._hits + self._misses
return self._hits / total if total > 0 else 0.0
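A brief usage sketch (the normalized query, intent label, and response text are illustrative; the response would normally come from the downstream RAG + Bedrock pipeline):
l1 = L1InMemoryCache(max_size=10_000, default_ttl=300)

query = "osaka ship"  # already normalized: lowercased, stopwords removed, tokens sorted
cached = l1.get(query, intent="shipping_info")
if cached is None:
    response = "Yes, we ship to Osaka via standard and express delivery."  # produced by the FM pipeline
    l1.put(query, intent="shipping_info", response=response, ttl=7200)

print(f"L1 hit rate so far: {l1.hit_rate:.1%}")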
L2 — Semantic Cache with ElastiCache Redis
The core caching tier. Queries are embedded with Titan Embeddings V2, then matched against cached embeddings using Redis vector similarity search. A cosine similarity >= 0.92 returns the cached response.
Semantic Caching Flow
sequenceDiagram
participant U as User
participant N as Normalizer
participant E as Titan Embeddings V2
participant R as ElastiCache Redis<br/>(RediSearch VSS)
participant B as Bedrock Claude 3
U->>N: "When does One Piece vol 108 come out?"
N->>N: normalize → "one piece vol 108 release date"
N->>E: Embed normalized query
E-->>N: 1024-dim vector
N->>R: FT.SEARCH idx @embedding:[VECTOR_RANGE 0.08 $vec]
alt Cosine similarity ≥ 0.92
R-->>U: Return cached response + metadata
Note over R,U: Cache HIT — 15ms total
else No match above threshold
R-->>N: No match
N->>B: Full RAG pipeline invocation
B-->>N: Generated response
N->>R: Store embedding + response + metadata
N-->>U: Return fresh response
Note over B,U: Cache MISS — 2800ms total
end
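The 0.08 radius in the FT.SEARCH call above is not a separate tuning knob: Redis measures cosine distance (1 - cosine similarity), so a 0.92 similarity floor translates to a 0.08 search radius. A minimal sketch of that query construction, reusing the field and parameter names from this section:
import numpy as np

def build_range_query(embedding: list[float], similarity_threshold: float = 0.92) -> tuple[str, dict]:
    """Build a RediSearch VECTOR_RANGE query from a cosine-similarity threshold.

    Redis stores cosine distance (1 - similarity), so the search radius is
    1 - threshold; a 0.92 similarity floor becomes the 0.08 radius in the diagram.
    """
    radius = round(1.0 - similarity_threshold, 4)
    query = f"@embedding:[VECTOR_RANGE {radius} $vec]"
    params = {"vec": np.array(embedding, dtype=np.float32).tobytes()}
    return query, params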
Result Fingerprinting
For deterministic cache keys that go beyond embedding similarity, MangaAssist uses result fingerprinting — a hash of the structured intent, sorted entities, and model ID.
import hashlib
import json
import re
from typing import Optional
# Common Japanese/English stopwords for manga queries
STOPWORDS = {
"the", "a", "an", "is", "are", "was", "were", "do", "does", "did",
"what", "when", "where", "how", "can", "could", "please", "i", "my",
"me", "tell", "show", "about", "this", "that", "の", "は", "が", "を",
"に", "で", "と", "も", "か", "よ", "ね",
}
class CacheKeyGenerator:
"""
Generates deterministic cache keys for MangaAssist queries.
Two strategies: exact-match hash and result fingerprint.
"""
@staticmethod
def normalize_query(raw_query: str) -> str:
"""
Normalize query for exact-match hashing:
lowercase, strip punctuation, remove stopwords, sort remaining tokens.
"""
text = raw_query.lower().strip()
# Remove punctuation except hyphens (for manga titles like 'Jujutsu-Kaisen')
text = re.sub(r"[^\w\s\-]", "", text)
tokens = text.split()
filtered = [t for t in tokens if t not in STOPWORDS]
return " ".join(sorted(filtered))
@staticmethod
def exact_match_key(normalized_query: str) -> str:
"""SHA-256 of normalized query text."""
return hashlib.sha256(normalized_query.encode("utf-8")).hexdigest()
@staticmethod
def result_fingerprint(
intent: str,
entities: dict[str, str],
model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
) -> str:
"""
Deterministic fingerprint: hash(intent + sorted_entities + model_id).
Used when the same structured intent+entities should always return
the same cached response regardless of phrasing.
Example:
intent = "manga_release_date"
entities = {"title": "One Piece", "volume": "108"}
→ consistent key for any phrasing asking about OP vol 108 release
"""
sorted_entities = json.dumps(entities, sort_keys=True, ensure_ascii=False)
raw = f"{intent}|{sorted_entities}|{model_id}"
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
@staticmethod
def ttl_for_intent(intent: str) -> int:
"""
Intent-aware TTL selection for MangaAssist.
Product info changes more often than FAQ answers.
"""
ttl_map = {
"product_info": 3600, # 1 hour — prices/stock change
"manga_release_date": 3600, # 1 hour — dates can shift
"recommendation": 14400, # 4 hours — personalized, less volatile
"shipping_info": 7200, # 2 hours — policy changes are rare
"faq": 86400, # 24 hours — highly stable
"order_status": 0, # Never cache — always real-time
"greeting": 86400, # 24 hours — static
}
return ttl_map.get(intent, 1800) # Default 30 minutes
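A quick illustration of the fingerprint strategy (both calls are hypothetical interpretations of differently phrased questions; they hash to the same key because only intent, entities, and model ID participate in the digest):
# Two different phrasings, same structured interpretation -> same cache key.
key_a = CacheKeyGenerator.result_fingerprint(
    intent="manga_release_date",
    entities={"title": "One Piece", "volume": "108"},
)
key_b = CacheKeyGenerator.result_fingerprint(
    intent="manga_release_date",
    entities={"volume": "108", "title": "One Piece"},  # key order does not matter
)
assert key_a == key_b  # sort_keys=True makes entity serialization deterministic

# Exact-match keys, by contrast, depend on the normalized surface form of the query.
norm = CacheKeyGenerator.normalize_query("When does One Piece vol 108 come out?")
exact_key = CacheKeyGenerator.exact_match_key(norm)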
L3 — Bedrock Prompt Caching
Bedrock's built-in prompt caching feature caches the system prompt prefix across invocations. When MangaAssist sends the same system prompt (persona instructions, guardrails, output format) with every request, Bedrock can skip re-processing that prefix.
How Prompt Caching Works
| Aspect | Detail |
|---|---|
| What is cached | System prompt + any static prefix (up to 100K tokens) |
| Cache scope | Same model, same account, same region |
| Latency saving | Up to 85% reduction in time-to-first-token for cached prefix |
| Cost saving | Cached tokens billed at reduced rate (~90% discount on input tokens) |
| Cache lifetime | 5 minutes of inactivity before eviction |
| MangaAssist system prompt | ~2,200 tokens (persona + guardrails + format instructions + manga domain context) |
MangaAssist Prompt Structure for Caching
import boto3
import json
def invoke_with_prompt_caching(
user_message: str,
rag_context: str,
session_history: list[dict],
) -> dict:
"""
Invoke Bedrock Claude 3 Sonnet with prompt caching enabled.
The system prompt (2,200 tokens) is cached across invocations.
Only user message + RAG context + session history vary per request.
"""
client = boto3.client("bedrock-runtime", region_name="us-east-1")
# This system prompt is identical across all 1M daily messages.
# Bedrock caches it after the first invocation.
system_prompt = [
{
"text": (
"You are MangaAssist, a friendly and knowledgeable Japanese manga "
"retail assistant for a major e-commerce platform. You help customers "
"discover manga, track orders, understand shipping options, and get "
"personalized recommendations.\n\n"
"## Guardrails\n"
"- Never discuss non-manga topics beyond polite deflection\n"
"- Never reveal internal pricing logic or supplier information\n"
"- Always respond in the customer's language (Japanese or English)\n"
"- Flag potential age-restricted content with appropriate warnings\n"
"- Never fabricate manga titles, ISBNs, or release dates\n\n"
"## Output Format\n"
"- Use markdown for structured responses\n"
"- Include manga title in both English and Japanese when available\n"
"- Always cite source (catalog, FAQ, order system) for factual claims\n"
"- Keep responses under 300 words unless the customer asks for detail"
            ),
        },
        # cachePoint is its own system content block in the Converse API;
        # everything before it (the static system prompt) becomes the cached prefix.
        {"cachePoint": {"type": "default"}},
    ]
# Dynamic content — changes per request, not cached
messages = []
for turn in session_history[-6:]: # Last 3 turns
messages.append(turn)
messages.append({
"role": "user",
"content": [
{"text": f"## Retrieved Context\n{rag_context}\n\n## Customer Question\n{user_message}"}
],
})
response = client.converse(
modelId="anthropic.claude-3-sonnet-20240229-v1:0",
system=system_prompt,
messages=messages,
inferenceConfig={
"maxTokens": 1024,
"temperature": 0.3,
},
)
# Check cache usage in response metadata
usage = response.get("usage", {})
cache_read_tokens = usage.get("cacheReadInputTokens", 0)
cache_write_tokens = usage.get("cacheWriteInputTokens", 0)
return {
"response": response["output"]["message"]["content"][0]["text"],
"cache_read_tokens": cache_read_tokens,
"cache_write_tokens": cache_write_tokens,
"total_input_tokens": usage.get("inputTokens", 0),
"output_tokens": usage.get("outputTokens", 0),
}
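An illustrative call (the message and context strings are placeholders). On the first invocation in a 5-minute window the prefix is written to the cache; subsequent calls should report it under cacheReadInputTokens:
result = invoke_with_prompt_caching(
    user_message="Do you ship to Osaka?",
    rag_context="(retrieved shipping-policy passages)",
    session_history=[],
)
if result["cache_read_tokens"] > 0:
    print(f"Prompt cache hit: {result['cache_read_tokens']} cached input tokens")
else:
    print(f"Prompt cache write: {result['cache_write_tokens']} tokens written to cache")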
L4 — CloudFront Edge Cache
Static and semi-static responses — FAQs, product catalog pages, shipping policy — are cached at CloudFront edge locations. These bypass the chatbot pipeline entirely.
Edge Cache Design
| Content Type | CloudFront TTL | Cache-Control Header | Invalidation Trigger |
|---|---|---|---|
| FAQ answers (top 50) | 60 minutes | public, max-age=3600, s-maxage=3600 | FAQ content update in CMS |
| Product info pages | 15 minutes | public, max-age=900, s-maxage=900 | Price or stock change event |
| Shipping policy | 24 hours | public, max-age=86400 | Policy document update |
| Manga cover images | 7 days | public, max-age=604800, immutable | Never (immutable assets) |
| Order status | 0 (no cache) | private, no-store | N/A — always real-time |
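The origin services own these headers. A minimal sketch of the FAQ and order-status endpoints, assuming a FastAPI service on ECS Fargate (the framework and route paths are assumptions, not part of the architecture above):
from fastapi import FastAPI, Response

app = FastAPI()

# TTLs mirror the edge cache design table above.
CACHE_HEADERS = {
    "faq": "public, max-age=3600, s-maxage=3600",
    "product": "public, max-age=900, s-maxage=900",
    "shipping_policy": "public, max-age=86400",
    "order_status": "private, no-store",
}

@app.get("/api/faq/{faq_id}")
def get_faq(faq_id: str, response: Response) -> dict:
    """Pre-rendered FAQ answer; CloudFront caches it at the edge for an hour."""
    response.headers["Cache-Control"] = CACHE_HEADERS["faq"]
    return {"faq_id": faq_id, "answer": "..."}  # looked up from the FAQ store

@app.get("/api/orders/{order_id}/status")
def get_order_status(order_id: str, response: Response) -> dict:
    """Order status is per-customer and real-time — never cached at the edge."""
    response.headers["Cache-Control"] = CACHE_HEADERS["order_status"]
    return {"order_id": order_id, "status": "shipped"}  # fetched from the order system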
Edge Cache Architecture
graph LR
subgraph "Edge (CloudFront PoPs)"
CF[CloudFront Distribution]
CF_CACHE[(Edge Cache<br/>FAQ + Product Pages)]
end
subgraph "Origin (ECS Fargate)"
ALB[Application Load Balancer]
FAQ_SVC[FAQ Service<br/>Pre-rendered Answers]
PROD_SVC[Product API<br/>Catalog Data]
end
subgraph "Invalidation"
EB[EventBridge]
LAMBDA[Lambda<br/>Cache Invalidator]
end
USER[User / Client] --> CF
CF --> CF_CACHE
CF_CACHE -->|MISS| ALB
ALB --> FAQ_SVC
ALB --> PROD_SVC
EB -->|price_changed| LAMBDA
EB -->|faq_updated| LAMBDA
LAMBDA -->|CreateInvalidation| CF
style CF fill:#457b9d,stroke:#1d3557,color:#fff
style CF_CACHE fill:#a8dadc,stroke:#457b9d,color:#1d3557
SemanticCacheManager — Full Implementation
import hashlib
import json
import time
import logging
from typing import Optional, Any
from dataclasses import dataclass, field
import boto3
import redis
import numpy as np
logger = logging.getLogger(__name__)
@dataclass
class CacheEntry:
"""A single cached response with metadata."""
query_text: str
response_text: str
embedding: list[float]
intent: str
model_id: str
confidence: float
created_at: float
ttl: int
entities: dict[str, str] = field(default_factory=dict)
hit_count: int = 0
class SemanticCacheManager:
"""
Multi-tier cache manager for MangaAssist.
L1: In-memory exact match (per container)
L2: ElastiCache Redis with vector similarity search
Usage:
cache = SemanticCacheManager(
redis_url="rediss://manga-cache.xxxxx.use1.cache.amazonaws.com:6379",
similarity_threshold=0.92,
)
# Try cache first
        result = cache.get(query="When does One Piece 108 release?", intent="manga_release_date")
        if result is None:
            response = invoke_bedrock(...)
            cache.put(query=..., response=response, intent=..., entities=...)
"""
EMBEDDING_DIM = 1024 # Titan Embeddings V2 dimension
INDEX_NAME = "manga_cache_idx"
KEY_PREFIX = "cache:"
def __init__(
self,
redis_url: str,
similarity_threshold: float = 0.92,
l1_max_size: int = 5_000,
l1_default_ttl: int = 300,
embedding_model_id: str = "amazon.titan-embed-text-v2:0",
):
self.redis_client = redis.Redis.from_url(
redis_url, decode_responses=False
)
self.similarity_threshold = similarity_threshold
self.embedding_model_id = embedding_model_id
self.bedrock_client = boto3.client("bedrock-runtime", region_name="us-east-1")
# L1 in-memory cache
self._l1_cache: dict[str, dict[str, Any]] = {}
self._l1_max_size = l1_max_size
self._l1_default_ttl = l1_default_ttl
# Metrics
self._metrics = {
"l1_hits": 0, "l1_misses": 0,
"l2_hits": 0, "l2_misses": 0,
"total_requests": 0,
}
self._ensure_redis_index()
def _ensure_redis_index(self) -> None:
"""Create RediSearch vector similarity index if it doesn't exist."""
try:
self.redis_client.execute_command("FT.INFO", self.INDEX_NAME)
logger.info("Redis vector index already exists")
except redis.ResponseError:
schema = (
"FT.CREATE", self.INDEX_NAME, "ON", "HASH",
"PREFIX", "1", self.KEY_PREFIX,
"SCHEMA",
"embedding", "VECTOR", "HNSW", "6",
"TYPE", "FLOAT32",
"DIM", str(self.EMBEDDING_DIM),
"DISTANCE_METRIC", "COSINE",
"query_text", "TEXT",
"response_text", "TEXT",
"intent", "TAG",
"model_id", "TAG",
"created_at", "NUMERIC", "SORTABLE",
"ttl", "NUMERIC",
"hit_count", "NUMERIC", "SORTABLE",
"confidence", "NUMERIC",
"entities_json", "TEXT",
)
self.redis_client.execute_command(*schema)
logger.info("Created Redis vector index: %s", self.INDEX_NAME)
def _embed_query(self, text: str) -> list[float]:
"""Generate embedding using Titan Embeddings V2."""
response = self.bedrock_client.invoke_model(
modelId=self.embedding_model_id,
body=json.dumps({"inputText": text}),
)
body = json.loads(response["body"].read())
return body["embedding"]
def _normalize(self, query: str) -> str:
"""Normalize query for consistent matching."""
import re
text = query.lower().strip()
text = re.sub(r"[^\w\s\-]", "", text)
return text
# ── L1: In-Memory Exact Match ──
def _l1_get(self, normalized_query: str) -> Optional[str]:
entry = self._l1_cache.get(normalized_query)
if entry is None:
self._metrics["l1_misses"] += 1
return None
if time.time() > entry["expires_at"]:
del self._l1_cache[normalized_query]
self._metrics["l1_misses"] += 1
return None
self._metrics["l1_hits"] += 1
return entry["response"]
def _l1_put(self, normalized_query: str, response: str, ttl: int) -> None:
if len(self._l1_cache) >= self._l1_max_size:
oldest = min(self._l1_cache, key=lambda k: self._l1_cache[k]["expires_at"])
del self._l1_cache[oldest]
self._l1_cache[normalized_query] = {
"response": response,
"expires_at": time.time() + ttl,
}
# ── L2: Redis Semantic Cache ──
def _l2_search(self, embedding: list[float], intent: str) -> Optional[dict]:
"""
Vector similarity search in Redis.
Returns the best match above the similarity threshold.
"""
query_vector = np.array(embedding, dtype=np.float32).tobytes()
# KNN search with intent filter for precision
query = (
f"(@intent:{{{intent}}})=>"
f"[KNN 3 @embedding $vec AS similarity]"
)
try:
results = self.redis_client.execute_command(
"FT.SEARCH", self.INDEX_NAME, query,
"PARAMS", "2", "vec", query_vector,
"SORTBY", "similarity", "ASC", # COSINE: lower = more similar
"LIMIT", "0", "1",
"RETURN", "4", "response_text", "query_text", "similarity", "hit_count",
"DIALECT", "2",
)
if results[0] == 0:
return None
doc_fields = results[2]
field_dict = {}
for i in range(0, len(doc_fields), 2):
key = doc_fields[i].decode() if isinstance(doc_fields[i], bytes) else doc_fields[i]
val = doc_fields[i + 1].decode() if isinstance(doc_fields[i + 1], bytes) else doc_fields[i + 1]
field_dict[key] = val
            # Redis reports cosine *distance* = 1 - cosine similarity
            # (0 = identical vectors, larger = less similar).
            distance = float(field_dict.get("similarity", 2.0))
            similarity = 1.0 - distance
if similarity >= self.similarity_threshold:
self._metrics["l2_hits"] += 1
# Increment hit count
doc_key = results[1].decode() if isinstance(results[1], bytes) else results[1]
self.redis_client.hincrby(doc_key, "hit_count", 1)
return {
"response": field_dict["response_text"],
"original_query": field_dict["query_text"],
"similarity": similarity,
}
self._metrics["l2_misses"] += 1
return None
except redis.ResponseError as e:
logger.error("Redis search failed: %s", e)
self._metrics["l2_misses"] += 1
return None
def _l2_store(self, entry: CacheEntry) -> None:
"""Store a cache entry in Redis with vector embedding."""
fingerprint = hashlib.sha256(
f"{entry.query_text}|{entry.intent}|{time.time()}".encode()
).hexdigest()[:16]
key = f"{self.KEY_PREFIX}{fingerprint}"
vector_bytes = np.array(entry.embedding, dtype=np.float32).tobytes()
self.redis_client.hset(key, mapping={
"embedding": vector_bytes,
"query_text": entry.query_text,
"response_text": entry.response_text,
"intent": entry.intent,
"model_id": entry.model_id,
"confidence": entry.confidence,
"created_at": entry.created_at,
"ttl": entry.ttl,
"hit_count": 0,
"entities_json": json.dumps(entry.entities, ensure_ascii=False),
})
self.redis_client.expire(key, entry.ttl)
# ── Public API ──
def get(self, query: str, intent: str) -> Optional[dict]:
"""
Try L1 (exact match) then L2 (semantic match).
Returns dict with 'response', 'source', and metadata, or None.
"""
self._metrics["total_requests"] += 1
normalized = self._normalize(query)
# L1: exact match
l1_result = self._l1_get(normalized)
if l1_result is not None:
return {"response": l1_result, "source": "L1_MEMORY", "similarity": 1.0}
# L2: semantic match
embedding = self._embed_query(normalized)
l2_result = self._l2_search(embedding, intent)
if l2_result is not None:
# Promote to L1 for future exact matches
self._l1_put(normalized, l2_result["response"], self._l1_default_ttl)
return {
"response": l2_result["response"],
"source": "L2_SEMANTIC",
"similarity": l2_result["similarity"],
"original_query": l2_result["original_query"],
}
return None
def put(
self,
query: str,
response: str,
intent: str,
entities: dict[str, str],
model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
confidence: float = 1.0,
    ) -> None:
        """Store response in both L1 and L2.

        Intents mapped to a TTL of 0 (e.g. order_status) are never cached.
        """
        normalized = self._normalize(query)
        ttl = CacheKeyGenerator.ttl_for_intent(intent)
        if ttl <= 0:
            return  # real-time intents must never be cached
# L1
self._l1_put(normalized, response, min(ttl, self._l1_default_ttl))
# L2
embedding = self._embed_query(normalized)
entry = CacheEntry(
query_text=normalized,
response_text=response,
embedding=embedding,
intent=intent,
model_id=model_id,
confidence=confidence,
created_at=time.time(),
ttl=ttl,
entities=entities,
)
self._l2_store(entry)
def get_metrics(self) -> dict:
"""Return cache performance metrics."""
total = self._metrics["total_requests"]
return {
**self._metrics,
"l1_hit_rate": self._metrics["l1_hits"] / total if total else 0,
"l2_hit_rate": self._metrics["l2_hits"] / total if total else 0,
"aggregate_hit_rate": (
(self._metrics["l1_hits"] + self._metrics["l2_hits"]) / total
if total else 0
),
}
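To feed the observability branch of the mind map (hit rate per tier, cost avoidance), the metrics can be pushed to CloudWatch on a timer. A minimal sketch, assuming a custom MangaAssist/Cache namespace and an estimated per-avoided-invocation cost — both values are assumptions, not figures from the architecture above:
import boto3

def publish_cache_metrics(cache: SemanticCacheManager, est_cost_per_invocation: float = 0.01) -> None:
    """Push per-tier hit rates and estimated cost avoidance to CloudWatch."""
    m = cache.get_metrics()
    avoided_invocations = m["l1_hits"] + m["l2_hits"]
    cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")
    cloudwatch.put_metric_data(
        Namespace="MangaAssist/Cache",  # assumed custom namespace
        MetricData=[
            {"MetricName": "L1HitRate", "Value": m["l1_hit_rate"], "Unit": "None"},
            {"MetricName": "L2HitRate", "Value": m["l2_hit_rate"], "Unit": "None"},
            {"MetricName": "AggregateHitRate", "Value": m["aggregate_hit_rate"], "Unit": "None"},
            {"MetricName": "EstimatedCostAvoidedUSD",
             "Value": avoided_invocations * est_cost_per_invocation, "Unit": "None"},
        ],
    )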
Cache Invalidation Strategies
Strategy Matrix
| Strategy | Trigger | Scope | Latency | MangaAssist Use Case |
|---|---|---|---|---|
| TTL-Based | Time expiry | Per-entry | Zero (passive) | Product info (1hr), recommendations (4hr), FAQ (24hr) |
| Event-Driven | Business event | Per-intent or per-entity | Seconds (via EventBridge) | Price change invalidates all product_info entries for that title |
| Version-Based | Config change | Global or per-model | Immediate | Prompt version bump flushes all L2 entries for that model |
| Manual | Operator action | Targeted | Immediate | Emergency purge of incorrect cached response |
Event-Driven Invalidation Flow
sequenceDiagram
participant CMS as Catalog CMS
participant EB as EventBridge
participant INV as Invalidation Lambda
participant REDIS as ElastiCache Redis
participant CF as CloudFront
CMS->>EB: PriceChanged event<br/>{title: "One Piece 108", new_price: 528}
EB->>INV: Route to invalidation handler
INV->>REDIS: Scan keys by intent=product_info<br/>+ entity title="One Piece 108"
REDIS-->>INV: Found 14 cached entries
INV->>REDIS: DEL matching keys
INV->>CF: CreateInvalidation /api/products/one-piece-108*
INV-->>EB: Invalidation complete<br/>{entries_removed: 14, cf_invalidation_id: "I3X7..."}
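A compact sketch of the invalidation Lambda from the diagram. The event detail schema, the title-based entity match, the Redis endpoint, and the distribution ID are assumptions; entries are located by scanning the entities_json field written by SemanticCacheManager:
import json
import time
import boto3
import redis

redis_client = redis.Redis.from_url("rediss://manga-cache.example.use1.cache.amazonaws.com:6379")
cloudfront = boto3.client("cloudfront")

def handler(event: dict, context) -> dict:
    """EventBridge-triggered invalidator: purge matching Redis entries, then the CloudFront path."""
    detail = event.get("detail", {})
    title = detail.get("title", "")
    removed = 0
    cursor = 0
    while True:
        cursor, keys = redis_client.scan(cursor, match="cache:*", count=500)
        for key in keys:
            if key.startswith(b"cache:version:"):
                continue  # skip version-management keys
            entities = json.loads(redis_client.hget(key, "entities_json") or b"{}")
            if title and entities.get("title") == title:
                redis_client.delete(key)
                removed += 1
        if cursor == 0:
            break
    slug = title.lower().replace(" ", "-")
    inv = cloudfront.create_invalidation(
        DistributionId="E1EXAMPLE",  # placeholder distribution ID
        InvalidationBatch={
            "Paths": {"Quantity": 1, "Items": [f"/api/products/{slug}*"]},
            "CallerReference": str(time.time()),
        },
    )
    return {"entries_removed": removed, "cf_invalidation_id": inv["Invalidation"]["Id"]}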
Version-Based Invalidation
import time
import logging
logger = logging.getLogger(__name__)
class CacheVersionManager:
"""
Manages cache versions tied to system prompt and model versions.
When a version changes, all entries for that version are invalidated.
"""
VERSION_KEY = "cache:version:current"
VERSION_HISTORY_KEY = "cache:version:history"
def __init__(self, redis_client):
self.redis = redis_client
def get_current_version(self) -> str:
version = self.redis.get(self.VERSION_KEY)
return version.decode() if version else "v1.0.0"
def bump_version(
self,
new_version: str,
reason: str,
flush_cache: bool = True,
) -> dict:
"""
Bump cache version. Optionally flush all cached entries.
Called when system prompt or model version changes.
"""
old_version = self.get_current_version()
# Record version history
self.redis.rpush(self.VERSION_HISTORY_KEY, f"{new_version}|{reason}|{time.time()}")
# Update current version
self.redis.set(self.VERSION_KEY, new_version)
flushed_count = 0
if flush_cache:
# Scan and delete all cache entries
cursor = 0
while True:
cursor, keys = self.redis.scan(cursor, match="cache:*", count=500)
if keys:
# Don't delete version management keys
cache_keys = [
k for k in keys
if not k.decode().startswith("cache:version:")
]
if cache_keys:
self.redis.delete(*cache_keys)
flushed_count += len(cache_keys)
if cursor == 0:
break
logger.info(
"Cache version bumped %s → %s (reason: %s, flushed: %d entries)",
old_version, new_version, reason, flushed_count,
)
return {
"old_version": old_version,
"new_version": new_version,
"reason": reason,
"entries_flushed": flushed_count,
}
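A short usage sketch (the version string and reason text are illustrative; redis_client is the shared ElastiCache connection used elsewhere in this section):
version_mgr = CacheVersionManager(redis_client)
result = version_mgr.bump_version(
    new_version="v1.1.0",
    reason="System prompt updated: revised age-restriction guardrail wording",
    flush_cache=True,  # responses generated under the old prompt are purged
)
logger.info("Flushed %d entries; now on %s", result["entries_flushed"], result["new_version"])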
Cache Tier Comparison
| Cache Tier | Hit Rate (MangaAssist) | Latency (Hit) | Latency (Miss Overhead) | Monthly Cost Savings | Staleness Risk | Best For |
|---|---|---|---|---|---|---|
| L1 — In-Memory | ~12% | < 1ms | 0ms (no overhead) | ~$2,160 | Low (short TTL) | Verbatim repeated queries ("track my order") |
| L2 — Semantic Redis | ~28% | 15–25ms | 8ms (embedding generation) | ~$5,040 | Medium (similarity may drift) | Paraphrased manga queries, FAQ variations |
| L3 — Prompt Cache | ~95% (system prompt) | N/A (reduces TTFT) | 0ms | ~$8,100 | None (exact match) | System prompt reuse across all requests |
| L4 — Edge Cache | ~60% (for eligible content) | 5–15ms | 0ms | ~$1,800 | Low–Medium (TTL-based) | FAQ pages, product catalog, shipping info |
| Aggregate | ~40% overall | Avg 12ms | — | ~$17,100/mo | — | — |
Cost Calculation Basis
| Metric | Value |
|---|---|
| Daily messages | 1,000,000 |
| Avg input tokens/request | 1,800 (system prompt + context + query) |
| Avg output tokens/request | 350 |
| Sonnet input cost | $0.003 / 1K tokens |
| Sonnet output cost | $0.015 / 1K tokens |
| Monthly Bedrock cost (no cache) | ~$45,000 |
| Monthly Bedrock cost (with caching) | ~$27,900 |
| Net monthly savings | ~$17,100 |
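The savings figure follows from the aggregate hit rate and the fraction of a message's cost a hit avoids. A minimal arithmetic sketch; the 0.95 per-hit avoidance fraction is an assumption implied by the ~38% net reduction, not a stated input:
def estimated_monthly_savings(
    monthly_cost_no_cache: float,
    aggregate_hit_rate: float,
    cost_avoided_per_hit: float = 0.95,  # assumed: a cache hit skips ~95% of that message's Bedrock cost
) -> float:
    """Savings = baseline monthly cost x hit rate x per-hit cost avoidance."""
    return monthly_cost_no_cache * aggregate_hit_rate * cost_avoided_per_hit

# With the table's figures: 45_000 * 0.40 * 0.95 ≈ 17_100 per month.
print(estimated_monthly_savings(45_000, 0.40))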
Key Takeaways
- L1 + L2 together achieve a ~40% cache hit rate, cutting latency for hits from 2.8s to ~15ms and reducing Bedrock invocation cost by ~38%.
- Semantic caching is the highest-value tier — paraphrased manga queries ("When does OP 108 come out?" vs "One Piece volume 108 release date?") cluster well in embedding space.
- Prompt caching (L3) is essentially free money — Bedrock caches the identical system prompt across all 1M daily messages at a 90% input token discount.
- Event-driven invalidation is critical for correctness — a stale cached price during a flash sale creates real business damage and customer complaints.
- Order status and account-specific queries must never be cached — these are personalized, real-time, and caching them would leak data across users.