Semantic Cache Implementation for FM Responses
AWS AIP-C01 Task 4.1 — Skill 4.1.4: Design intelligent caching systems for FM applications
Context: MangaAssist e-commerce chatbot — Bedrock Claude 3 (Sonnet/Haiku), OpenSearch Serverless, DynamoDB, ECS Fargate, API Gateway WebSocket, ElastiCache Redis. 1M messages/day.
Skill Mapping
| Certification | Domain | Task | Skill |
|---|---|---|---|
| AWS AIP-C01 | Domain 4 — Operational Efficiency | Task 4.1 — Optimize FM applications | Skill 4.1.4 — Implement semantic caching to serve semantically identical queries from cache instead of invoking the FM |
Skill scope: Deep-dive into the L2 semantic cache tier — embedding model selection, Redis vector similarity search (RediSearch VSS), cache entry design, similarity threshold tuning, MangaAssist-specific caching patterns, prompt caching with Bedrock, and cache warming strategies.
Why Semantic Caching for a Manga Chatbot?
Exact-match caching catches only verbatim repeats. But customers ask the same question in dozens of ways:
| Variation | Normalized Form | Same Answer? |
|---|---|---|
| "When does One Piece 108 come out?" | one piece 108 release | Yes |
| "One Piece volume 108 release date" | one piece 108 release | Yes |
| "ワンピース108巻の発売日はいつ?" | one piece 108 release (JP) | Yes |
| "What day is OP vol 108 dropping?" | one piece 108 release | Yes |
| "new one piece volume release" | one piece release (ambiguous) | Maybe |
An exact-match cache treats each of these phrasings as a distinct key, so none of them reuses another's cached answer. Semantic caching, which compares embedding vectors rather than raw text, matches all four confirmed variations above, lifting the observed hit rate from roughly 12% to 28%.
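A minimal sketch of the difference, with illustrative function names (the 0.92 threshold is examined in the threshold-tuning section below): exact matching compares a hash of the normalized text, while semantic matching compares cosine similarity of embedding vectors.

import hashlib
import numpy as np

def exact_match_key(normalized_query: str) -> str:
    # Exact-match cache: any wording change produces a different key.
    return hashlib.sha256(normalized_query.encode()).hexdigest()

def is_semantic_hit(query_vec: list[float], cached_vec: list[float],
                    threshold: float = 0.92) -> bool:
    # Semantic cache: paraphrases of the same question land close together
    # in embedding space, so compare cosine similarity against a threshold.
    a = np.asarray(query_vec, dtype=np.float32)
    b = np.asarray(cached_vec, dtype=np.float32)
    sim = float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))
    return sim >= threshold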
Embedding Model Selection
Candidate Models for Cache Key Embeddings
| Model | Dimensions | Latency (p50) | Cost per 1K Tokens | Quality (MTEB Avg) | MangaAssist Fit |
|---|---|---|---|---|---|
| Amazon Titan Embeddings V2 | 1024 | 8ms | $0.00002 | 0.68 | Best — native AWS, low latency, good multilingual |
| Amazon Titan Embeddings V1 | 1536 | 12ms | $0.0001 | 0.64 | Higher dim = more Redis memory, slightly worse quality |
| Cohere Embed v3 (via Bedrock) | 1024 | 15ms | $0.0001 | 0.72 | Higher quality but 5x cost and higher latency |
Why Titan Embeddings V2 for MangaAssist
graph TD
subgraph "Decision Factors"
A[Low Latency — 8ms p50<br/>Cache lookup must be fast] --> TITAN
B[Low Cost — $0.00002/1K tokens<br/>Every query embeds on miss] --> TITAN
C[1024 Dimensions<br/>Good balance: quality vs Redis memory] --> TITAN
D[Multilingual — JP + EN<br/>MangaAssist serves both languages] --> TITAN
E[Native Bedrock Integration<br/>No cross-service latency] --> TITAN
end
TITAN[Amazon Titan Embeddings V2<br/>Selected Model]
style TITAN fill:#2d6a4f,stroke:#1b4332,color:#fff
Key tradeoff: Cohere Embed v3 scores ~6% higher on retrieval benchmarks, but adds 7ms latency and 5x cost. For cache key similarity (not RAG retrieval), the marginal quality improvement does not justify the overhead. Cache decisions are binary (hit/miss at threshold), not ranked.
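A minimal sketch of generating a cache-key embedding with Titan Embeddings V2 through the Bedrock runtime. The region is illustrative, and the optional dimensions/normalize request fields are shown as an assumption about how they might be configured here:

import json
import boto3

bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")

def embed_query(text: str) -> list[float]:
    """Return a Titan Embeddings V2 vector for a normalized cache-key query."""
    resp = bedrock.invoke_model(
        modelId="amazon.titan-embed-text-v2:0",
        body=json.dumps({
            "inputText": text,
            "dimensions": 1024,   # Titan V2 also supports 512 / 256 if Redis memory is tight
            "normalize": True,    # unit-length vectors simplify cosine comparisons
        }),
    )
    return json.loads(resp["body"].read())["embedding"]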
Redis Vector Similarity Search (RediSearch VSS)
Index Schema Design
ElastiCache Redis with the RediSearch module supports vector similarity search natively. MangaAssist uses an HNSW (Hierarchical Navigable Small World) index for approximate nearest neighbor search.
graph TD
subgraph "Redis Cache Entry (HASH)"
K[Key: cache:a7f3b2e1...]
F1["embedding: FLOAT32[1024] — Titan V2 vector"]
F2["query_text: TEXT — original normalized query"]
F3["response_text: TEXT — cached FM response"]
F4["intent: TAG — classified intent (manga_release_date)"]
F5["model_id: TAG — anthropic.claude-3-sonnet..."]
F6["confidence: NUMERIC — classification confidence 0.0-1.0"]
F7["created_at: NUMERIC — Unix timestamp"]
F8["ttl: NUMERIC — time-to-live in seconds"]
F9["hit_count: NUMERIC — times this entry was served"]
F10["entities_json: TEXT — JSON of extracted entities"]
F11["language: TAG — en or ja"]
end
K --> F1
K --> F2
K --> F3
K --> F4
K --> F5
K --> F6
K --> F7
K --> F8
K --> F9
K --> F10
K --> F11
style K fill:#e76f51,stroke:#f4a261,color:#fff
Cache Entry Structure — Detailed
| Field | Redis Type | Purpose | Example Value |
|---|---|---|---|
| embedding | VECTOR (FLOAT32, 1024-dim) | Similarity search target | [0.023, -0.041, 0.019, ...] |
| query_text | TEXT | Original query (for debugging/logging) | "one piece 108 release date" |
| response_text | TEXT | Cached FM response to return on hit | "One Piece Vol. 108 releases on..." |
| intent | TAG | Classified intent (used as pre-filter) | manga_release_date |
| model_id | TAG | Model that generated the response | anthropic.claude-3-sonnet-20240229-v1:0 |
| confidence | NUMERIC | Intent classification confidence | 0.94 |
| created_at | NUMERIC (sortable) | Entry creation timestamp | 1711843200.0 |
| ttl | NUMERIC | Original TTL in seconds | 3600 |
| hit_count | NUMERIC (sortable) | Number of cache hits served | 47 |
| entities_json | TEXT | JSON-serialized extracted entities | {"title": "One Piece", "volume": "108"} |
| language | TAG | Detected language of the query | en |
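A minimal sketch of writing one such entry as a Redis HASH. The implementation later in this section uses shortened field names (vec, query, response, intent, model, lang, ts, hits, conf, ents) for the same fields described above; the endpoint, key, and values here are illustrative:

import json
import time
import numpy as np
import redis

r = redis.Redis.from_url("redis://localhost:6379", decode_responses=False)  # placeholder endpoint

vec = np.random.rand(1024).astype(np.float32)    # stand-in for a real Titan V2 embedding
r.hset("sc:a7f3b2e1", mapping={
    "vec": vec.tobytes(),                        # FLOAT32 bytes for the VECTOR field
    "query": "one piece 108 release date",
    "response": "One Piece Vol. 108 releases on ...",
    "intent": "manga_release_date",
    "model": "anthropic.claude-3-sonnet-20240229-v1:0",
    "lang": "en",
    "ts": time.time(),
    "hits": 0,
    "conf": 0.94,
    "ents": json.dumps({"title": "One Piece", "volume": "108"}),
})
r.expire("sc:a7f3b2e1", 3600)                    # intent-specific TTL (product info: 1 hour)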
Similarity Threshold Tuning
The cosine similarity threshold is the single most critical parameter in semantic caching. It directly controls the precision/recall tradeoff.
Threshold Impact Analysis
| Threshold | Hit Rate | False Positive Rate | Effect on MangaAssist |
|---|---|---|---|
| 0.85 | ~42% | ~8% | Too many false matches — "Dragon Ball Z" queries return "Dragon Ball Super" answers |
| 0.88 | ~35% | ~3% | Occasional mismatches on similar manga titles within the same franchise |
| 0.92 | ~28% | < 0.5% | Sweet spot — paraphrases match, distinct queries separate |
| 0.95 | ~18% | ~0.1% | Too strict — many valid paraphrases missed |
| 0.98 | ~13% | ~0% | Nearly exact-match only — defeats the purpose of semantic caching |
Why 0.92 for MangaAssist
graph LR
subgraph "Similarity Spectrum"
direction LR
A["0.85<br/>High recall<br/>Many false positives"] ---|"Dangerous zone"| B["0.88"]
B ---|"Better but risky"| C["0.92<br/>✓ MangaAssist<br/>Best tradeoff"]
C ---|"Conservative"| D["0.95"]
D ---|"Too strict"| E["0.98<br/>Nearly exact match"]
end
style C fill:#2d6a4f,stroke:#1b4332,color:#fff
Empirical validation: We tested 500 query pairs from MangaAssist production logs. At 0.92:
- 97.3% of true paraphrases are correctly matched (recall)
- 99.5% of returned results are actually equivalent queries (precision)
- The 0.5% false positive rate translates to ~1,400 incorrect cached responses per day — caught and corrected by the feedback loop
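A hedged sketch of how such a threshold sweep can be run offline. The labeled_pairs structure (pairs of query embeddings plus a human judgment of whether the cached answer would be correct) is an assumption for illustration:

import numpy as np

def cosine(a, b) -> float:
    a, b = np.asarray(a, dtype=np.float32), np.asarray(b, dtype=np.float32)
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

def evaluate_threshold(labeled_pairs, threshold: float) -> tuple[float, float]:
    """
    labeled_pairs: list of (vec_a, vec_b, is_same_answer) tuples.
    Returns (precision, recall) of the "serve from cache" decision at this threshold.
    """
    tp = fp = fn = 0
    for vec_a, vec_b, is_same in labeled_pairs:
        hit = cosine(vec_a, vec_b) >= threshold
        if hit and is_same:
            tp += 1
        elif hit and not is_same:
            fp += 1
        elif not hit and is_same:
            fn += 1
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    return precision, recall

# Sweep the candidate thresholds against the 500 labeled production pairs:
# for t in (0.85, 0.88, 0.92, 0.95, 0.98):
#     print(t, evaluate_threshold(labeled_pairs, t))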
Intent-Specific Thresholds
Not all intents benefit equally from a uniform threshold. MangaAssist uses intent-aware overrides:
INTENT_SIMILARITY_THRESHOLDS = {
# FAQ queries are highly paraphrasable — relax threshold
"faq": 0.88,
"shipping_info": 0.88,
"return_policy": 0.88,
# Manga title queries need precision — titles are close in embedding space
"manga_release_date": 0.93,
"product_info": 0.93,
"manga_search": 0.94,
# Recommendations are personalized — tight threshold to avoid cross-user leakage
"recommendation": 0.96,
# Default
"default": 0.92,
}
def get_threshold(intent: str) -> float:
"""Return the similarity threshold for a given intent."""
return INTENT_SIMILARITY_THRESHOLDS.get(
intent,
INTENT_SIMILARITY_THRESHOLDS["default"],
)
MangaAssist-Specific Caching Patterns
Pattern 1 — Manga Title Queries Cluster Well
Manga title queries form tight clusters in embedding space because the title and volume number dominate the embedding vector. Different phrasings about the same title+volume produce embeddings within cosine distance 0.03–0.06 of each other.
| Query Cluster | Intra-Cluster Cosine Similarity | Cache Hit Rate | Why |
|---|---|---|---|
| Manga title + volume queries | 0.94–0.98 | ~35% | Title dominates embedding; phrasings vary little |
| Shipping/policy questions | 0.91–0.96 | ~40% | Finite question space; customers use similar language |
| Greeting / small talk | 0.93–0.99 | ~55% | Very limited variation ("hi", "hello", "hey") |
| Order status queries | N/A — never cached | 0% | Personalized, real-time; must always invoke backend |
| Recommendations | 0.70–0.88 | ~8% | Highly personalized; same question ≠ same answer for different users |
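To illustrate the clustering claim, a small self-contained sketch that embeds two phrasings of the same title+volume query and checks their cosine similarity (queries and region are examples, not production values):

import json
import boto3
import numpy as np

bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")

def embed(text: str) -> np.ndarray:
    resp = bedrock.invoke_model(
        modelId="amazon.titan-embed-text-v2:0",
        body=json.dumps({"inputText": text}),
    )
    return np.asarray(json.loads(resp["body"].read())["embedding"], dtype=np.float32)

a = embed("when does one piece 108 come out")
b = embed("one piece volume 108 release date")
sim = float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))
# For title+volume paraphrases like these, similarity typically falls in the
# 0.94-0.98 band reported in the table above.
print(f"cosine similarity: {sim:.3f}")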
Pattern 2 — Recommendation Queries Are Personalized
Recommendation queries like "suggest a manga similar to Chainsaw Man" have low cacheability because the correct answer depends on the user's purchase history, reading preferences, and browsing context. Two users asking the exact same question should get different answers.
Mitigation: Cache recommendations only when the user segment (not individual user) matches. MangaAssist defines 12 user segments (shonen-fan, shojo-fan, seinen-collector, etc.). The cache key includes the segment:
def recommendation_cache_key(user_segment: str) -> str:
    """
    For recommendation queries, include the user segment in the cache lookup
    to avoid cross-segment response leakage. The segment is applied as a TAG
    pre-filter in FT.SEARCH; the query embedding is still used separately for
    the vector similarity part of the lookup, so it is not part of this key.
    """
    return f"rec:{user_segment}"  # Used as a TAG filter value in FT.SEARCH
Pattern 3 — Shipping Questions Are Highly Cacheable
Shipping policy questions have a finite answer space and rarely change. MangaAssist has ~50 shipping-related FAQ entries. With semantic caching, any phrasing that maps to one of these 50 entries returns the cached answer.
Hit rate: ~40% of shipping queries are served from cache.
Prompt Caching with Bedrock
Bedrock Prompt Caching Architecture
Bedrock's prompt caching feature caches the processed system prompt at the model provider level. This is fundamentally different from application-level semantic caching — it reduces time-to-first-token (TTFT) and input token cost, not invocation count.
sequenceDiagram
participant APP as MangaAssist<br/>ECS Fargate
participant BED as Bedrock<br/>Converse API
participant PCACHE as Prompt Cache<br/>(Provider-Managed)
participant MODEL as Claude 3 Sonnet
Note over APP,MODEL: First request — cache WRITE
APP->>BED: converse(system=[{text: "..."}, {cachePoint: {type: default}}], messages=[...])
BED->>PCACHE: Hash system prompt → cache key
PCACHE-->>BED: MISS
BED->>MODEL: Process full system prompt (2,200 tokens) + user message
MODEL-->>BED: Response
BED->>PCACHE: Store processed system prompt
BED-->>APP: Response + usage{cacheWriteInputTokens: 2200}
Note over APP,MODEL: Subsequent requests — cache READ
APP->>BED: converse(system=[{text: "..."}, {cachePoint: {type: default}}], messages=[...])
BED->>PCACHE: Hash system prompt → cache key
PCACHE-->>BED: HIT — return processed prefix
BED->>MODEL: Cached prefix + user message (skip re-processing 2,200 tokens)
MODEL-->>BED: Response (faster TTFT)
BED-->>APP: Response + usage{cacheReadInputTokens: 2200}
Prompt Caching Cost Impact
| Metric | Without Prompt Cache | With Prompt Cache | Savings |
|---|---|---|---|
| System prompt tokens processed/day | 2,200 x 1M = 2.2B tokens | 2,200 x ~50K unique cache slots = 110M tokens | 95% reduction |
| Input token cost for system prompt | $6,600/day | $660/day | $5,940/day |
| Time-to-first-token (TTFT) | ~450ms | ~120ms | 73% faster |
| Monthly savings | — | — | ~$178,200 |
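The table's token arithmetic, reproduced as a back-of-envelope calculation. The $3 per 1M input tokens figure is an assumed Claude 3 Sonnet on-demand input price, and the discounted cache-read and cache-write rates are deliberately not modeled here:

SYSTEM_PROMPT_TOKENS = 2_200
MESSAGES_PER_DAY = 1_000_000
UNIQUE_CACHE_SLOTS = 50_000      # requests that pay full (cache-write) prompt processing
INPUT_PRICE_PER_1M = 3.00        # assumed Claude 3 Sonnet on-demand input price, USD

tokens_without_cache = SYSTEM_PROMPT_TOKENS * MESSAGES_PER_DAY        # 2.2B tokens/day
tokens_fully_processed = SYSTEM_PROMPT_TOKENS * UNIQUE_CACHE_SLOTS    # 110M tokens/day

print(f"tokens fully processed: {1 - tokens_fully_processed / tokens_without_cache:.0%} reduction")
print(f"system-prompt input cost without cache: "
      f"${tokens_without_cache / 1e6 * INPUT_PRICE_PER_1M:,.0f}/day")
# The with-cache dollar figure additionally depends on the discounted
# cache-read rate and the cache-write surcharge, which vary by model.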
Prompt Caching Implementation
import boto3
import json
import logging
from typing import Optional
logger = logging.getLogger(__name__)
class BedrockPromptCacheClient:
"""
Wrapper around Bedrock Converse API with prompt caching enabled.
Tracks cache hit/miss metrics for observability.
"""
# MangaAssist system prompt — identical for all 1M daily messages
SYSTEM_PROMPT_TEXT = (
"You are MangaAssist, a friendly and knowledgeable Japanese manga "
"retail assistant for a major e-commerce platform. You help customers "
"discover manga, track orders, understand shipping options, and get "
"personalized recommendations.\n\n"
"## Guardrails\n"
"- Never discuss non-manga topics beyond polite deflection\n"
"- Never reveal internal pricing logic or supplier information\n"
"- Always respond in the customer's language (Japanese or English)\n"
"- Flag potential age-restricted content with appropriate warnings\n"
"- Never fabricate manga titles, ISBNs, or release dates\n\n"
"## Output Format\n"
"- Use markdown for structured responses\n"
"- Include manga title in both English and Japanese when available\n"
"- Always cite source (catalog, FAQ, order system) for factual claims\n"
"- Keep responses under 300 words unless the customer asks for detail\n\n"
"## Domain Knowledge\n"
"- You have access to a product catalog of 45,000+ manga titles\n"
"- You can look up order status, shipping estimates, and return policies\n"
"- You understand manga genres: shonen, shojo, seinen, josei, kodomo\n"
"- You know major publishers: Shueisha, Kodansha, Shogakukan, Kadokawa\n"
"- You are aware of seasonal release patterns and major manga events"
)
def __init__(self, region: str = "us-east-1"):
self.client = boto3.client("bedrock-runtime", region_name=region)
self._cache_read_total = 0
self._cache_write_total = 0
self._invocations = 0
def invoke(
self,
user_message: str,
rag_context: str,
session_history: list[dict],
model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
max_tokens: int = 1024,
temperature: float = 0.3,
) -> dict:
"""
Invoke Bedrock with prompt caching enabled.
The system prompt is cached across invocations.
"""
        system = [
            {"text": self.SYSTEM_PROMPT_TEXT},
            # cachePoint is its own content block in the Converse API system list;
            # everything before it is cached as a reusable prompt prefix.
            {"cachePoint": {"type": "default"}},
        ]
# Build message list from session history + current turn
messages = []
for turn in session_history[-6:]:
messages.append(turn)
# Current user turn with RAG context
user_content = f"## Retrieved Context\n{rag_context}\n\n## Customer Question\n{user_message}"
messages.append({
"role": "user",
"content": [{"text": user_content}],
})
response = self.client.converse(
modelId=model_id,
system=system,
messages=messages,
inferenceConfig={
"maxTokens": max_tokens,
"temperature": temperature,
},
)
usage = response.get("usage", {})
cache_read = usage.get("cacheReadInputTokens", 0)
cache_write = usage.get("cacheWriteInputTokens", 0)
self._invocations += 1
self._cache_read_total += cache_read
self._cache_write_total += cache_write
result = {
"response_text": response["output"]["message"]["content"][0]["text"],
"input_tokens": usage.get("inputTokens", 0),
"output_tokens": usage.get("outputTokens", 0),
"cache_read_tokens": cache_read,
"cache_write_tokens": cache_write,
"prompt_cache_hit": cache_read > 0,
"stop_reason": response.get("stopReason", "unknown"),
}
logger.info(
"Bedrock invocation: cache_hit=%s, cache_read=%d, cache_write=%d, "
"input=%d, output=%d",
result["prompt_cache_hit"],
cache_read,
cache_write,
result["input_tokens"],
result["output_tokens"],
)
return result
@property
def prompt_cache_hit_rate(self) -> float:
"""Fraction of invocations where prompt cache was read."""
if self._invocations == 0:
return 0.0
# If cache_read_total > 0 for an invocation, it's a hit
# Approximation: cache_read_total / (expected_system_tokens * invocations)
expected_total = 2200 * self._invocations
return min(self._cache_read_total / expected_total, 1.0) if expected_total else 0.0
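A short usage sketch of the wrapper above inside the orchestrator; the RAG context and session history shown are placeholders:

client = BedrockPromptCacheClient(region="us-east-1")

result = client.invoke(
    user_message="When does One Piece volume 108 release?",
    rag_context="One Piece Vol. 108 (Shueisha) releases on ...",  # from OpenSearch retrieval
    session_history=[],                                           # first turn of the session
)

print(result["response_text"])
print("prompt cache hit:", result["prompt_cache_hit"])
print("cache hit rate so far:", client.prompt_cache_hit_rate)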
Cache Warming Strategies
Cache warming pre-populates the semantic cache with high-probability queries so that the first user to ask a question gets a cache hit instead of a cold-start miss.
Warming Strategy Matrix
| Strategy | Trigger | Volume | MangaAssist Example |
|---|---|---|---|
| Top-N FAQ Warming | Daily cron (6:00 AM JST) | Top 100 FAQ queries | "How do I return a manga?", "What's your shipping policy?" |
| New Release Warming | Catalog update event | 10–20 queries per new title | "When does [title] release?", "[title] price", "[title] review" |
| Seasonal Warming | Calendar trigger | 50–100 queries | "Holiday shipping deadlines", "Gift wrapping available?" |
| Post-Invalidation Warming | After cache flush | Re-populate invalidated entries | After price change, re-cache updated product info |
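One possible wiring for the daily FAQ-warming trigger, sketched with EventBridge Scheduler. All ARNs are placeholders and the ECS launch and networking details are omitted; treat this as an assumption about the deployment, not the MangaAssist configuration:

import boto3

scheduler = boto3.client("scheduler", region_name="us-east-1")

scheduler.create_schedule(
    Name="mangaassist-faq-cache-warming",
    ScheduleExpression="cron(0 21 * * ? *)",   # 21:00 UTC == 6:00 AM JST the next day
    FlexibleTimeWindow={"Mode": "OFF"},
    Target={
        "Arn": "arn:aws:ecs:us-east-1:123456789012:cluster/mangaassist",                # placeholder
        "RoleArn": "arn:aws:iam::123456789012:role/mangaassist-cache-warmer-scheduler",  # placeholder
        "EcsParameters": {
            "TaskDefinitionArn": "arn:aws:ecs:us-east-1:123456789012:task-definition/cache-warmer:1",
            # Launch type and network configuration for the Fargate task omitted for brevity.
        },
    },
)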
Cache Warming Implementation
import json
import logging
import time
from typing import Optional
import boto3
logger = logging.getLogger(__name__)
class CacheWarmer:
"""
Pre-populates the semantic cache with high-probability queries.
Runs as a scheduled ECS task or triggered by EventBridge events.
"""
# Top 100 MangaAssist FAQ queries — curated from production logs
TOP_FAQ_QUERIES = [
{"query": "How do I return a manga?", "intent": "return_policy"},
{"query": "What is your shipping policy?", "intent": "shipping_info"},
{"query": "Do you ship internationally?", "intent": "shipping_info"},
{"query": "How long does shipping take?", "intent": "shipping_info"},
{"query": "Can I track my order?", "intent": "order_status"},
{"query": "Do you have gift wrapping?", "intent": "faq"},
{"query": "What payment methods do you accept?", "intent": "faq"},
{"query": "How do I cancel my order?", "intent": "faq"},
{"query": "Do you sell digital manga?", "intent": "faq"},
{"query": "What genres do you carry?", "intent": "faq"},
# ... remaining 90 queries
]
def __init__(
self,
        semantic_cache,   # RedisSemanticCache instance (defined later in this section)
bedrock_client, # BedrockPromptCacheClient instance
rag_retriever, # RAG retrieval function
):
self.cache = semantic_cache
self.bedrock = bedrock_client
self.retriever = rag_retriever
def warm_faq_cache(self) -> dict:
"""
Pre-populate cache with top FAQ queries.
Called daily at 6:00 AM JST via EventBridge scheduled rule.
"""
results = {"warmed": 0, "skipped": 0, "errors": 0}
for item in self.TOP_FAQ_QUERIES:
query = item["query"]
intent = item["intent"]
try:
# Check if already cached (avoid redundant Bedrock calls)
                existing = self.cache.lookup(query=query, intent=intent)
if existing is not None:
results["skipped"] += 1
continue
# Generate fresh response via full RAG pipeline
rag_context = self.retriever(query)
response = self.bedrock.invoke(
user_message=query,
rag_context=rag_context,
session_history=[],
)
# Store in cache
                self.cache.store(
query=query,
response=response["response_text"],
intent=intent,
entities={},
confidence=1.0,
)
results["warmed"] += 1
# Rate limit to avoid Bedrock throttling
time.sleep(0.1)
except Exception as e:
logger.error("Failed to warm cache for '%s': %s", query, e)
results["errors"] += 1
logger.info("Cache warming complete: %s", results)
return results
def warm_new_release(self, title: str, volume: Optional[str] = None) -> dict:
"""
Pre-populate cache with common queries about a new manga release.
Triggered by catalog update event via EventBridge.
"""
volume_str = f" volume {volume}" if volume else ""
query_templates = [
f"When does {title}{volume_str} release?",
f"How much is {title}{volume_str}?",
f"Is {title}{volume_str} available for pre-order?",
f"{title}{volume_str} release date",
f"Do you have {title}{volume_str}?",
]
# Add Japanese variants if title has a known JP mapping
# (lookup from DynamoDB title_mapping table)
results = {"warmed": 0, "errors": 0}
for query in query_templates:
try:
rag_context = self.retriever(query)
response = self.bedrock.invoke(
user_message=query,
rag_context=rag_context,
session_history=[],
)
                self.cache.store(
query=query,
response=response["response_text"],
intent="product_info",
entities={"title": title, "volume": volume or ""},
confidence=1.0,
)
results["warmed"] += 1
time.sleep(0.1)
except Exception as e:
logger.error("Warming failed for '%s': %s", query, e)
results["errors"] += 1
return results
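A sketch of an entrypoint for the scheduled warming task, wiring the warmer to the BedrockPromptCacheClient defined earlier and the RedisSemanticCache defined in the next section; retrieve_context is a placeholder for the OpenSearch retrieval step:

def retrieve_context(query: str) -> str:
    # Placeholder: in MangaAssist this queries the OpenSearch Serverless vector store.
    return ""

if __name__ == "__main__":
    cache = RedisSemanticCache(SemanticCacheConfig(redis_url="redis://cache.example:6379"))
    bedrock_client = BedrockPromptCacheClient()
    warmer = CacheWarmer(
        semantic_cache=cache,
        bedrock_client=bedrock_client,
        rag_retriever=retrieve_context,
    )
    stats = warmer.warm_faq_cache()
    print(stats)   # {"warmed": ..., "skipped": ..., "errors": ...}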
Full RedisSemanticCache Implementation
import hashlib
import json
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Optional
import boto3
import numpy as np
import redis
logger = logging.getLogger(__name__)
INTENT_TTL_MAP = {
"product_info": 3600,
"manga_release_date": 3600,
"recommendation": 14400,
"shipping_info": 7200,
"faq": 86400,
"order_status": 0,
"greeting": 86400,
"return_policy": 43200,
"manga_search": 1800,
}
INTENT_THRESHOLD_MAP = {
"faq": 0.88,
"shipping_info": 0.88,
"return_policy": 0.88,
"greeting": 0.85,
"manga_release_date": 0.93,
"product_info": 0.93,
"manga_search": 0.94,
"recommendation": 0.96,
}
DEFAULT_THRESHOLD = 0.92
DEFAULT_TTL = 1800
@dataclass
class SemanticCacheConfig:
"""Configuration for the semantic cache."""
redis_url: str
embedding_model_id: str = "amazon.titan-embed-text-v2:0"
embedding_dim: int = 1024
default_threshold: float = 0.92
l1_max_size: int = 5_000
l1_default_ttl: int = 300
max_redis_memory_mb: int = 4096
index_name: str = "mangaassist_cache_idx"
key_prefix: str = "sc:"
enable_l1: bool = True
enable_metrics: bool = True
class RedisSemanticCache:
"""
Production-grade semantic cache for MangaAssist.
Features:
- L1 in-memory exact-match cache (per ECS container)
- L2 Redis vector similarity cache (shared across containers)
- Intent-aware similarity thresholds
- Intent-aware TTLs
- Cache hit promotion (L2 hit → L1 write)
- Memory budget enforcement
- Comprehensive metrics emission
Architecture:
        User Query → Normalize → L1 Lookup → (hit) return
                       (miss) ↓
                     L2 VSS Lookup → (hit) promote to L1 + return
                       (miss) ↓
                     Invoke Bedrock (full RAG pipeline)
"""
def __init__(self, config: SemanticCacheConfig):
self.config = config
self.redis = redis.Redis.from_url(
config.redis_url,
decode_responses=False,
socket_timeout=5,
socket_connect_timeout=5,
retry_on_timeout=True,
)
self.bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")
# L1 cache
self._l1: dict[str, dict[str, Any]] = {}
# Metrics
self._metrics = {
"l1_hits": 0,
"l1_misses": 0,
"l2_hits": 0,
"l2_misses": 0,
"l2_false_positives_reported": 0,
"embed_calls": 0,
"embed_latency_ms_total": 0,
"cache_writes": 0,
"cache_evictions": 0,
"total_lookups": 0,
}
self._ensure_index()
# ── Index Management ──
def _ensure_index(self) -> None:
"""Create RediSearch vector index if not present."""
try:
self.redis.execute_command("FT.INFO", self.config.index_name)
logger.info("Redis index '%s' exists", self.config.index_name)
except redis.ResponseError:
self._create_index()
def _create_index(self) -> None:
"""Create HNSW vector index on Redis."""
cmd = [
"FT.CREATE", self.config.index_name,
"ON", "HASH",
"PREFIX", "1", self.config.key_prefix,
"SCHEMA",
"vec", "VECTOR", "HNSW", "8",
"TYPE", "FLOAT32",
"DIM", str(self.config.embedding_dim),
"DISTANCE_METRIC", "COSINE",
"EF_CONSTRUCTION", "200",
"query", "TEXT",
"response", "TEXT", "NOINDEX",
"intent", "TAG",
"model", "TAG",
"lang", "TAG",
"ts", "NUMERIC", "SORTABLE",
"hits", "NUMERIC", "SORTABLE",
"conf", "NUMERIC",
"ents", "TEXT", "NOINDEX",
]
self.redis.execute_command(*cmd)
logger.info("Created Redis vector index '%s'", self.config.index_name)
# ── Embedding ──
def _embed(self, text: str) -> list[float]:
"""Generate embedding via Titan Embeddings V2."""
start = time.monotonic()
resp = self.bedrock.invoke_model(
modelId=self.config.embedding_model_id,
body=json.dumps({"inputText": text}),
)
vec = json.loads(resp["body"].read())["embedding"]
elapsed = (time.monotonic() - start) * 1000
self._metrics["embed_calls"] += 1
self._metrics["embed_latency_ms_total"] += elapsed
return vec
# ── Normalization ──
@staticmethod
def _normalize(text: str) -> str:
"""Normalize query text for consistent caching."""
import re
t = text.lower().strip()
t = re.sub(r"\s+", " ", t)
t = re.sub(r"[^\w\s\-\u3000-\u9fff\uf900-\ufaff]", "", t)
return t
@staticmethod
def _l1_key(normalized: str, intent: str) -> str:
return hashlib.md5(f"{normalized}|{intent}".encode()).hexdigest()
# ── L1 Operations ──
def _l1_get(self, normalized: str, intent: str) -> Optional[str]:
key = self._l1_key(normalized, intent)
entry = self._l1.get(key)
if entry and time.time() < entry["exp"]:
self._metrics["l1_hits"] += 1
return entry["resp"]
if entry:
del self._l1[key]
self._metrics["l1_misses"] += 1
return None
def _l1_put(self, normalized: str, intent: str, response: str, ttl: int) -> None:
if not self.config.enable_l1:
return
key = self._l1_key(normalized, intent)
if len(self._l1) >= self.config.l1_max_size:
oldest = min(self._l1, key=lambda k: self._l1[k]["exp"])
del self._l1[oldest]
self._metrics["cache_evictions"] += 1
self._l1[key] = {"resp": response, "exp": time.time() + ttl}
# ── L2 Operations ──
def _l2_search(
self,
embedding: list[float],
intent: str,
language: str = "en",
) -> Optional[dict]:
"""Vector similarity search with intent and language pre-filtering."""
threshold = INTENT_THRESHOLD_MAP.get(intent, DEFAULT_THRESHOLD)
vec_bytes = np.array(embedding, dtype=np.float32).tobytes()
# Build filter: intent + language
filter_expr = f"@intent:{{{intent}}} @lang:{{{language}}}"
query = f"({filter_expr})=>[KNN 3 @vec $blob AS dist]"
try:
results = self.redis.execute_command(
"FT.SEARCH", self.config.index_name, query,
"PARAMS", "2", "blob", vec_bytes,
"SORTBY", "dist", "ASC",
"LIMIT", "0", "1",
"RETURN", "5", "response", "query", "dist", "hits", "conf",
"DIALECT", "2",
)
except redis.ResponseError as e:
logger.error("L2 search error: %s", e)
self._metrics["l2_misses"] += 1
return None
if results[0] == 0:
self._metrics["l2_misses"] += 1
return None
doc_key = results[1]
fields = self._parse_hash_fields(results[2])
        distance = float(fields.get("dist", 2.0))
        # RediSearch reports COSINE distance as (1 - cosine similarity)
        similarity = 1.0 - distance
if similarity < threshold:
self._metrics["l2_misses"] += 1
logger.debug(
"L2 below threshold: sim=%.4f < threshold=%.4f for intent=%s",
similarity, threshold, intent,
)
return None
self._metrics["l2_hits"] += 1
# Increment hit counter
self.redis.hincrby(doc_key, "hits", 1)
return {
"response": fields["response"],
"original_query": fields.get("query", ""),
"similarity": similarity,
"confidence": float(fields.get("conf", 0)),
"hit_count": int(fields.get("hits", 0)) + 1,
}
def _l2_store(
self,
normalized: str,
response: str,
embedding: list[float],
intent: str,
model_id: str,
confidence: float,
entities: dict,
language: str,
ttl: int,
) -> None:
"""Store entry in Redis with vector embedding and TTL."""
uid = hashlib.sha256(
f"{normalized}|{intent}|{time.time()}".encode()
).hexdigest()[:16]
key = f"{self.config.key_prefix}{uid}"
vec_bytes = np.array(embedding, dtype=np.float32).tobytes()
self.redis.hset(key, mapping={
"vec": vec_bytes,
"query": normalized,
"response": response,
"intent": intent,
"model": model_id,
"lang": language,
"ts": time.time(),
"hits": 0,
"conf": confidence,
"ents": json.dumps(entities, ensure_ascii=False),
})
if ttl > 0:
self.redis.expire(key, ttl)
self._metrics["cache_writes"] += 1
# ── Public API ──
def lookup(
self,
query: str,
intent: str,
language: str = "en",
) -> Optional[dict]:
"""
Look up a query in L1 then L2.
Returns:
dict with keys: response, source, similarity, original_query
None if cache miss on all tiers
"""
self._metrics["total_lookups"] += 1
# Never cache order status or account queries
if intent in ("order_status", "account_info"):
return None
normalized = self._normalize(query)
# L1
if self.config.enable_l1:
l1_resp = self._l1_get(normalized, intent)
if l1_resp is not None:
return {
"response": l1_resp,
"source": "L1_MEMORY",
"similarity": 1.0,
}
# L2
embedding = self._embed(normalized)
l2_result = self._l2_search(embedding, intent, language)
if l2_result is not None:
# Promote to L1
ttl = INTENT_TTL_MAP.get(intent, DEFAULT_TTL)
self._l1_put(normalized, intent, l2_result["response"], min(ttl, 300))
return {
"response": l2_result["response"],
"source": "L2_SEMANTIC",
"similarity": l2_result["similarity"],
"original_query": l2_result["original_query"],
}
return None
def store(
self,
query: str,
response: str,
intent: str,
entities: dict[str, str],
model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
confidence: float = 1.0,
language: str = "en",
) -> None:
"""Store a fresh response in L1 + L2."""
if intent in ("order_status", "account_info"):
return # Never cache these
ttl = INTENT_TTL_MAP.get(intent, DEFAULT_TTL)
if ttl == 0:
return # TTL=0 means "do not cache"
normalized = self._normalize(query)
# L1
self._l1_put(normalized, intent, response, min(ttl, 300))
# L2
embedding = self._embed(normalized)
self._l2_store(
normalized=normalized,
response=response,
embedding=embedding,
intent=intent,
model_id=model_id,
confidence=confidence,
entities=entities,
language=language,
ttl=ttl,
)
def report_false_positive(self, query: str, intent: str) -> None:
"""
Called when a user flags a cached response as incorrect.
Removes the matching entry and adjusts metrics.
"""
self._metrics["l2_false_positives_reported"] += 1
normalized = self._normalize(query)
# Remove from L1
key = self._l1_key(normalized, intent)
self._l1.pop(key, None)
# Note: L2 removal requires finding the exact Redis key,
# which would need a secondary index or scan. In production,
# flagged entries are logged and batch-removed by a cleanup job.
logger.warning("False positive reported: query='%s', intent='%s'", query, intent)
def get_metrics(self) -> dict:
"""Return comprehensive cache metrics."""
total = self._metrics["total_lookups"]
l1h = self._metrics["l1_hits"]
l2h = self._metrics["l2_hits"]
embed_calls = self._metrics["embed_calls"]
return {
**self._metrics,
"l1_hit_rate": l1h / total if total else 0,
"l2_hit_rate": l2h / total if total else 0,
"aggregate_hit_rate": (l1h + l2h) / total if total else 0,
"avg_embed_latency_ms": (
self._metrics["embed_latency_ms_total"] / embed_calls
if embed_calls else 0
),
}
# ── Helpers ──
@staticmethod
def _parse_hash_fields(raw_fields: list) -> dict[str, str]:
"""Parse Redis HASH field pairs into a dict."""
d = {}
for i in range(0, len(raw_fields), 2):
k = raw_fields[i].decode() if isinstance(raw_fields[i], bytes) else raw_fields[i]
v = raw_fields[i + 1].decode() if isinstance(raw_fields[i + 1], bytes) else raw_fields[i + 1]
d[k] = v
return d
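A short sketch of how the orchestrator might use this class on the hot path; intent classification and the Bedrock/RAG pipeline are placeholders:

config = SemanticCacheConfig(redis_url="redis://mangaassist-cache.example:6379")
cache = RedisSemanticCache(config)

def handle_message(user_query: str, intent: str, language: str = "en") -> str:
    # 1. Try L1 / L2 before paying for a Bedrock invocation
    hit = cache.lookup(query=user_query, intent=intent, language=language)
    if hit is not None:
        return hit["response"]            # source: L1_MEMORY or L2_SEMANTIC

    # 2. Cache miss: run the full RAG + Bedrock pipeline (placeholder)
    fm_response = "...generated by Claude 3 via Bedrock..."

    # 3. Store for the next semantically similar question
    cache.store(
        query=user_query,
        response=fm_response,
        intent=intent,
        entities={},
        language=language,
    )
    return fm_response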
Cache Hit vs Cache Miss — Sequence Diagram
sequenceDiagram
participant U as User
participant GW as API Gateway<br/>WebSocket
participant ECS as ECS Fargate<br/>Orchestrator
participant L1 as L1 In-Memory
participant EMB as Titan Embeddings V2
participant L2 as ElastiCache Redis<br/>Semantic Cache
participant RAG as OpenSearch<br/>Vector Store
participant BED as Bedrock Claude 3
participant DDB as DynamoDB<br/>Session Store
Note over U,DDB: Scenario A — Cache HIT (L2 Semantic)
U->>GW: "When's One Piece 108 out?"
GW->>ECS: Route to container
ECS->>ECS: Normalize → "one piece 108 release date"
ECS->>ECS: Classify intent → manga_release_date
ECS->>L1: Exact-match lookup
L1-->>ECS: MISS
ECS->>EMB: Embed normalized query
EMB-->>ECS: 1024-dim vector (8ms)
ECS->>L2: FT.SEARCH with vector + intent filter
L2-->>ECS: HIT (sim=0.96, original="one piece volume 108 release")
ECS->>L1: Promote to L1 for future exact hits
ECS->>DDB: Log cache hit event
ECS-->>GW: Cached response (total: 35ms)
GW-->>U: "One Piece Vol. 108 releases on..."
Note over U,DDB: Scenario B — Cache MISS (full pipeline)
U->>GW: "Is Chainsaw Man Part 2 getting an anime?"
GW->>ECS: Route to container
ECS->>ECS: Normalize + classify → manga_news (no cached entry yet)
ECS->>L1: MISS
ECS->>EMB: Embed query
EMB-->>ECS: 1024-dim vector (8ms)
ECS->>L2: FT.SEARCH
L2-->>ECS: MISS (best match sim=0.81 < 0.92 threshold)
ECS->>DDB: Load session history
DDB-->>ECS: Last 3 turns
ECS->>RAG: Vector search for relevant context
RAG-->>ECS: Top 5 chunks (120ms)
ECS->>BED: Converse API (system prompt cached by L3)
BED-->>ECS: Generated response (2400ms)
ECS->>L2: Store response + embedding
ECS->>L1: Store exact-match entry
ECS->>DDB: Save to session history
ECS-->>GW: Fresh response (total: 2650ms)
GW-->>U: "As of now, Chainsaw Man Part 2..."
Key Takeaways
- Titan Embeddings V2 is the right model for cache keys — lowest latency (8ms), lowest cost ($0.00002/1K tokens), good multilingual support for JP+EN, and sufficient quality for binary hit/miss decisions.
- 0.92 cosine similarity threshold balances precision and recall — but intent-specific overrides are essential. FAQ at 0.88, manga title queries at 0.93, recommendations at 0.96.
- Recommendation queries should include user segment in the cache key — otherwise you serve User A's recommendations to User B.
- Prompt caching (L3) and semantic caching (L2) are complementary — L3 reduces per-token cost for every request; L2 eliminates entire invocations. Together they cut monthly Bedrock cost by ~60%.
- Cache warming is not optional — without it, every morning has a cold-start period where the first user to ask each common question gets 2.8s latency instead of 35ms.
- Never cache order_status or account_info intents — these are personalized, real-time, and caching them creates data leakage risks.