
Semantic Cache Implementation for FM Responses

AWS AIP-C01 Task 4.1 — Skill 4.1.4: Design intelligent caching systems for FM applications

Context: MangaAssist e-commerce chatbot — Bedrock Claude 3 (Sonnet/Haiku), OpenSearch Serverless, DynamoDB, ECS Fargate, API Gateway WebSocket, ElastiCache Redis. 1M messages/day.


Skill Mapping

| Certification | Domain | Task | Skill |
|---|---|---|---|
| AWS AIP-C01 | Domain 4 — Operational Efficiency | Task 4.1 — Optimize FM applications | Skill 4.1.4 — Implement semantic caching to serve semantically identical queries from cache instead of invoking the FM |

Skill scope: Deep-dive into the L2 semantic cache tier — embedding model selection, Redis vector similarity search (RediSearch VSS), cache entry design, similarity threshold tuning, MangaAssist-specific caching patterns, prompt caching with Bedrock, and cache warming strategies.


Why Semantic Caching for a Manga Chatbot?

Exact-match caching catches only verbatim repeats. But customers ask the same question in dozens of ways:

| Variation | Normalized Form | Same Answer? |
|---|---|---|
| "When does One Piece 108 come out?" | one piece 108 release | Yes |
| "One Piece volume 108 release date" | one piece 108 release | Yes |
| "ワンピース108巻の発売日はいつ?" | one piece 108 release (JP) | Yes |
| "What day is OP vol 108 dropping?" | one piece 108 release | Yes |
| "new one piece volume release" | one piece release (ambiguous) | Maybe |

Exact-match caching hits on none of these variations. Semantic caching, which compares embedding vectors rather than raw text, hits on all four confirmed matches above; for MangaAssist, this lifted the overall cache hit rate from 12% to 28%.
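
The mechanic behind this is straightforward: embed both queries and treat anything above a similarity threshold as the same question. A minimal sketch of the hit/miss decision, assuming pre-computed embedding vectors (the 0.92 threshold is tuned later in this section):

import numpy as np


def is_semantic_hit(query_vec: np.ndarray, cached_vec: np.ndarray,
                    threshold: float = 0.92) -> bool:
    """Treat two queries as equivalent if their embeddings are close enough."""
    # Cosine similarity: normalized dot product of the two vectors
    sim = float(np.dot(query_vec, cached_vec) / (
        np.linalg.norm(query_vec) * np.linalg.norm(cached_vec)
    ))
    return sim >= threshold

# "When does One Piece 108 come out?" and "One Piece volume 108 release date"
# embed to nearby vectors, so the lookup hits; an exact-match cache keyed on
# the raw strings would miss.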


Embedding Model Selection

Candidate Models for Cache Key Embeddings

| Model | Dimensions | Latency (p50) | Cost per 1K Tokens | Quality (MTEB Avg) | MangaAssist Fit |
|---|---|---|---|---|---|
| Amazon Titan Embeddings V2 | 1024 | 8ms | $0.00002 | 0.68 | Best — native AWS, low latency, good multilingual |
| Amazon Titan Embeddings V1 | 1536 | 12ms | $0.0001 | 0.64 | Higher dim = more Redis memory, slightly worse quality |
| Cohere Embed v3 (via Bedrock) | 1024 | 15ms | $0.0001 | 0.72 | Higher quality but 5x cost and higher latency |

Why Titan Embeddings V2 for MangaAssist

graph TD
    subgraph "Decision Factors"
        A[Low Latency — 8ms p50<br/>Cache lookup must be fast] --> TITAN
        B[Low Cost — $0.00002/1K tokens<br/>Every query embeds on miss] --> TITAN
        C[1024 Dimensions<br/>Good balance: quality vs Redis memory] --> TITAN
        D[Multilingual — JP + EN<br/>MangaAssist serves both languages] --> TITAN
        E[Native Bedrock Integration<br/>No cross-service latency] --> TITAN
    end

    TITAN[Amazon Titan Embeddings V2<br/>Selected Model]

    style TITAN fill:#2d6a4f,stroke:#1b4332,color:#fff

Key tradeoff: Cohere Embed v3 scores ~6% higher on retrieval benchmarks, but adds 7ms latency and 5x cost. For cache key similarity (not RAG retrieval), the marginal quality improvement does not justify the overhead. Cache decisions are binary (hit/miss at threshold), not ranked.
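
For reference, a minimal sketch of generating a cache-key embedding with Titan Embeddings V2 through the Bedrock runtime (dimensions and normalize are optional Titan V2 request fields; the values shown are assumptions matched to the cache design in this section):

import json

import boto3

bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")


def embed_query(text: str) -> list[float]:
    """Embed a normalized query for cache lookup via Titan Embeddings V2."""
    resp = bedrock.invoke_model(
        modelId="amazon.titan-embed-text-v2:0",
        body=json.dumps({
            "inputText": text,
            "dimensions": 1024,  # Titan V2 also supports 256 and 512
            "normalize": True,   # unit-length vectors simplify cosine math
        }),
    )
    return json.loads(resp["body"].read())["embedding"]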


Redis Vector Similarity Search (RediSearch VSS)

Index Schema Design

ElastiCache Redis with the RediSearch module supports vector similarity search natively. MangaAssist uses an HNSW (Hierarchical Navigable Small World) index for approximate nearest neighbor search.

graph TD
    subgraph "Redis Cache Entry (HASH)"
        K[Key: cache:a7f3b2e1...]
        F1["embedding: FLOAT32[1024] — Titan V2 vector"]
        F2["query_text: TEXT — original normalized query"]
        F3["response_text: TEXT — cached FM response"]
        F4["intent: TAG — classified intent (manga_release_date)"]
        F5["model_id: TAG — anthropic.claude-3-sonnet..."]
        F6["confidence: NUMERIC — classification confidence 0.0-1.0"]
        F7["created_at: NUMERIC — Unix timestamp"]
        F8["ttl: NUMERIC — time-to-live in seconds"]
        F9["hit_count: NUMERIC — times this entry was served"]
        F10["entities_json: TEXT — JSON of extracted entities"]
        F11["language: TAG — en or ja"]
    end

    K --> F1
    K --> F2
    K --> F3
    K --> F4
    K --> F5
    K --> F6
    K --> F7
    K --> F8
    K --> F9
    K --> F10
    K --> F11

    style K fill:#e76f51,stroke:#f4a261,color:#fff

Cache Entry Structure — Detailed

| Field | Redis Type | Purpose | Example Value |
|---|---|---|---|
| embedding | VECTOR (FLOAT32, 1024-dim) | Similarity search target | [0.023, -0.041, 0.019, ...] |
| query_text | TEXT | Original query (for debugging/logging) | "one piece 108 release date" |
| response_text | TEXT | Cached FM response to return on hit | "One Piece Vol. 108 releases on..." |
| intent | TAG | Classified intent (used as pre-filter) | manga_release_date |
| model_id | TAG | Model that generated the response | anthropic.claude-3-sonnet-20240229-v1:0 |
| confidence | NUMERIC | Intent classification confidence | 0.94 |
| created_at | NUMERIC (sortable) | Entry creation timestamp | 1711843200.0 |
| ttl | NUMERIC | Original TTL in seconds | 3600 |
| hit_count | NUMERIC (sortable) | Number of cache hits served | 47 |
| entities_json | TEXT | JSON-serialized extracted entities | {"title": "One Piece", "volume": "108"} |
| language | TAG | Detected language of the query | en |

Similarity Threshold Tuning

The cosine similarity threshold is the single most critical parameter in semantic caching. It directly controls the precision/recall tradeoff.

Threshold Impact Analysis

| Threshold | Hit Rate | False Positive Rate | Effect on MangaAssist |
|---|---|---|---|
| 0.85 | ~42% | ~8% | Too many false matches — "Dragon Ball Z" queries return "Dragon Ball Super" answers |
| 0.88 | ~35% | ~3% | Occasional mismatches on similar manga titles within the same franchise |
| 0.92 | ~28% | < 0.5% | Sweet spot — paraphrases match, distinct queries separate |
| 0.95 | ~18% | ~0.1% | Too strict — many valid paraphrases missed |
| 0.98 | ~13% | ~0% | Nearly exact-match only — defeats the purpose of semantic caching |

Why 0.92 for MangaAssist

graph LR
    subgraph "Similarity Spectrum"
        direction LR
        A["0.85<br/>High recall<br/>Many false positives"] ---|"Dangerous zone"| B["0.88"]
        B ---|"Better but risky"| C["0.92<br/>✓ MangaAssist<br/>Best tradeoff"]
        C ---|"Conservative"| D["0.95"]
        D ---|"Too strict"| E["0.98<br/>Nearly exact match"]
    end

    style C fill:#2d6a4f,stroke:#1b4332,color:#fff

Empirical validation: We tested 500 query pairs from MangaAssist production logs. At 0.92:

- 97.3% of true paraphrases are correctly matched (recall)
- 99.5% of returned results are actually equivalent queries (precision)
- The 0.5% false positive rate translates to ~1,400 incorrect cached responses per day, caught and corrected by the feedback loop
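
The 0.92 figure falls out of sweeping candidate thresholds over labeled pairs. A sketch of that evaluation, assuming a list of (cosine_similarity, is_true_paraphrase) pairs built from the 500 logged query pairs:

def sweep_thresholds(
    pairs: list[tuple[float, bool]],
    thresholds: tuple[float, ...] = (0.85, 0.88, 0.92, 0.95, 0.98),
) -> dict[float, dict[str, float]]:
    """Compute precision and recall at each candidate threshold.

    Each pair is (cosine_similarity, human_label) where the label marks
    whether the two queries are true paraphrases.
    """
    report = {}
    for t in thresholds:
        tp = sum(1 for sim, same in pairs if sim >= t and same)
        fp = sum(1 for sim, same in pairs if sim >= t and not same)
        fn = sum(1 for sim, same in pairs if sim < t and same)
        report[t] = {
            "precision": tp / (tp + fp) if (tp + fp) else 1.0,
            "recall": tp / (tp + fn) if (tp + fn) else 0.0,
        }
    return report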

Intent-Specific Thresholds

Not all intents benefit equally from a uniform threshold. MangaAssist uses intent-aware overrides:

INTENT_SIMILARITY_THRESHOLDS = {
    # FAQ queries are highly paraphrasable — relax threshold
    "faq": 0.88,
    "shipping_info": 0.88,
    "return_policy": 0.88,

    # Manga title queries need precision — titles are close in embedding space
    "manga_release_date": 0.93,
    "product_info": 0.93,
    "manga_search": 0.94,

    # Recommendations are personalized — tight threshold to avoid cross-user leakage
    "recommendation": 0.96,

    # Default
    "default": 0.92,
}


def get_threshold(intent: str) -> float:
    """Return the similarity threshold for a given intent."""
    return INTENT_SIMILARITY_THRESHOLDS.get(
        intent,
        INTENT_SIMILARITY_THRESHOLDS["default"],
    )

MangaAssist-Specific Caching Patterns

Pattern 1 — Manga Title Queries Cluster Well

Manga title queries form tight clusters in embedding space because the title and volume number dominate the embedding vector. Different phrasings about the same title+volume produce embeddings within cosine distance 0.03–0.06 of each other.

| Query Cluster | Intra-Cluster Cosine Similarity | Cache Hit Rate | Why |
|---|---|---|---|
| Manga title + volume queries | 0.94–0.98 | ~35% | Title dominates embedding; phrasings vary little |
| Shipping/policy questions | 0.91–0.96 | ~40% | Finite question space; customers use similar language |
| Greeting / small talk | 0.93–0.99 | ~55% | Very limited variation ("hi", "hello", "hey") |
| Order status queries | N/A — never cached | 0% | Personalized, real-time; must always invoke backend |
| Recommendations | 0.70–0.88 | ~8% | Highly personalized; same question ≠ same answer for different users |
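
These intra-cluster figures can be measured directly by embedding each phrasing and taking pairwise similarities. A sketch, reusing the embed_query helper sketched earlier (assumes normalized vectors, so the dot product is the cosine similarity):

from itertools import combinations

import numpy as np


def intra_cluster_similarity(phrasings: list[str]) -> tuple[float, float]:
    """Return (min, max) pairwise cosine similarity within one query cluster."""
    vecs = [np.array(embed_query(p)) for p in phrasings]
    sims = [float(np.dot(a, b)) for a, b in combinations(vecs, 2)]
    return min(sims), max(sims)

# intra_cluster_similarity([
#     "when does one piece 108 come out",
#     "one piece volume 108 release date",
#     "what day is op vol 108 dropping",
# ])  # expected to land in the ~0.94-0.98 band for title+volume queries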

Pattern 2 — Recommendation Queries Are Personalized

Recommendation queries like "suggest a manga similar to Chainsaw Man" have low cacheability because the correct answer depends on the user's purchase history, reading preferences, and browsing context. Two users asking the exact same question should get different answers.

Mitigation: Cache recommendations only when the user segment (not individual user) matches. MangaAssist defines 12 user segments (shonen-fan, shojo-fan, seinen-collector, etc.). The cache key includes the segment:

def recommendation_cache_key(user_segment: str) -> str:
    """
    For recommendation queries, scope the cache by user segment to avoid
    cross-segment response leakage. The segment is applied as a TAG filter
    at the Redis FT.SEARCH level, not mixed into the embedding itself.
    """
    return f"rec:{user_segment}"

Pattern 3 — Shipping Questions Are Highly Cacheable

Shipping policy questions have a finite answer space and rarely change. MangaAssist has ~50 shipping-related FAQ entries. With semantic caching, any phrasing that maps to one of these 50 entries returns the cached answer.

Hit rate: ~40% of shipping queries are served from cache.


Prompt Caching with Bedrock

Bedrock Prompt Caching Architecture

Bedrock's prompt caching feature caches the processed system prompt at the model provider level. This is fundamentally different from application-level semantic caching — it reduces time-to-first-token (TTFT) and input token cost, not invocation count.

sequenceDiagram
    participant APP as MangaAssist<br/>ECS Fargate
    participant BED as Bedrock<br/>Converse API
    participant PCACHE as Prompt Cache<br/>(Provider-Managed)
    participant MODEL as Claude 3 Sonnet

    Note over APP,MODEL: First request — cache WRITE
    APP->>BED: converse(system=[{text: "...", cachePoint: {type: default}}], messages=[...])
    BED->>PCACHE: Hash system prompt → cache key
    PCACHE-->>BED: MISS
    BED->>MODEL: Process full system prompt (2,200 tokens) + user message
    MODEL-->>BED: Response
    BED->>PCACHE: Store processed system prompt
    BED-->>APP: Response + usage{cacheWriteInputTokens: 2200}

    Note over APP,MODEL: Subsequent requests — cache READ
    APP->>BED: converse(system=[{text: "...", cachePoint: {type: default}}], messages=[...])
    BED->>PCACHE: Hash system prompt → cache key
    PCACHE-->>BED: HIT — return processed prefix
    BED->>MODEL: Cached prefix + user message (skip re-processing 2,200 tokens)
    MODEL-->>BED: Response (faster TTFT)
    BED-->>APP: Response + usage{cacheReadInputTokens: 2200}

Prompt Caching Cost Impact

| Metric | Without Prompt Cache | With Prompt Cache | Savings |
|---|---|---|---|
| System prompt tokens processed/day | 2,200 x 1M = 2.2B tokens | 2,200 x ~50K unique cache slots = 110M tokens | 95% reduction |
| Input token cost for system prompt | $6,600/day | $660/day | $5,940/day |
| Time-to-first-token (TTFT) | ~450ms | ~120ms | 73% faster |
| Monthly savings | | | ~$178,200 |
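
The daily figures follow from simple arithmetic. A sketch of the calculation, under two pricing assumptions that should be checked against current Bedrock pricing: Claude 3 Sonnet input at $3 per million tokens, and cache reads billed at roughly one tenth of the base input rate:

MSGS_PER_DAY = 1_000_000
SYSTEM_PROMPT_TOKENS = 2_200
INPUT_PRICE_PER_MTOK = 3.00    # assumed Claude 3 Sonnet input price (USD)
CACHE_READ_MULTIPLIER = 0.10   # assumed cache-read discount vs base rate

tokens_per_day = SYSTEM_PROMPT_TOKENS * MSGS_PER_DAY        # 2.2B tokens
cost_without = tokens_per_day / 1e6 * INPUT_PRICE_PER_MTOK  # $6,600/day
cost_with = cost_without * CACHE_READ_MULTIPLIER            # $660/day
print(f"daily savings ~ ${cost_without - cost_with:,.0f}")  # ~ $5,940/day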

Prompt Caching Implementation

import boto3
import json
import logging
from typing import Optional

logger = logging.getLogger(__name__)


class BedrockPromptCacheClient:
    """
    Wrapper around Bedrock Converse API with prompt caching enabled.
    Tracks cache hit/miss metrics for observability.
    """

    # MangaAssist system prompt — identical for all 1M daily messages
    SYSTEM_PROMPT_TEXT = (
        "You are MangaAssist, a friendly and knowledgeable Japanese manga "
        "retail assistant for a major e-commerce platform. You help customers "
        "discover manga, track orders, understand shipping options, and get "
        "personalized recommendations.\n\n"
        "## Guardrails\n"
        "- Never discuss non-manga topics beyond polite deflection\n"
        "- Never reveal internal pricing logic or supplier information\n"
        "- Always respond in the customer's language (Japanese or English)\n"
        "- Flag potential age-restricted content with appropriate warnings\n"
        "- Never fabricate manga titles, ISBNs, or release dates\n\n"
        "## Output Format\n"
        "- Use markdown for structured responses\n"
        "- Include manga title in both English and Japanese when available\n"
        "- Always cite source (catalog, FAQ, order system) for factual claims\n"
        "- Keep responses under 300 words unless the customer asks for detail\n\n"
        "## Domain Knowledge\n"
        "- You have access to a product catalog of 45,000+ manga titles\n"
        "- You can look up order status, shipping estimates, and return policies\n"
        "- You understand manga genres: shonen, shojo, seinen, josei, kodomo\n"
        "- You know major publishers: Shueisha, Kodansha, Shogakukan, Kadokawa\n"
        "- You are aware of seasonal release patterns and major manga events"
    )

    def __init__(self, region: str = "us-east-1"):
        self.client = boto3.client("bedrock-runtime", region_name=region)
        self._cache_read_total = 0
        self._cache_write_total = 0
        self._invocations = 0

    def invoke(
        self,
        user_message: str,
        rag_context: str,
        session_history: list[dict],
        model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
        max_tokens: int = 1024,
        temperature: float = 0.3,
    ) -> dict:
        """
        Invoke Bedrock with prompt caching enabled.
        The system prompt is cached across invocations.
        """
        system = [
            {"text": self.SYSTEM_PROMPT_TEXT},
            # cachePoint is its own content block in the Converse API:
            # everything before it is cached as a reusable prefix
            {"cachePoint": {"type": "default"}},
        ]

        # Build message list from session history + current turn
        messages = []
        for turn in session_history[-6:]:
            messages.append(turn)

        # Current user turn with RAG context
        user_content = f"## Retrieved Context\n{rag_context}\n\n## Customer Question\n{user_message}"
        messages.append({
            "role": "user",
            "content": [{"text": user_content}],
        })

        response = self.client.converse(
            modelId=model_id,
            system=system,
            messages=messages,
            inferenceConfig={
                "maxTokens": max_tokens,
                "temperature": temperature,
            },
        )

        usage = response.get("usage", {})
        cache_read = usage.get("cacheReadInputTokens", 0)
        cache_write = usage.get("cacheWriteInputTokens", 0)

        self._invocations += 1
        self._cache_read_total += cache_read
        self._cache_write_total += cache_write

        result = {
            "response_text": response["output"]["message"]["content"][0]["text"],
            "input_tokens": usage.get("inputTokens", 0),
            "output_tokens": usage.get("outputTokens", 0),
            "cache_read_tokens": cache_read,
            "cache_write_tokens": cache_write,
            "prompt_cache_hit": cache_read > 0,
            "stop_reason": response.get("stopReason", "unknown"),
        }

        logger.info(
            "Bedrock invocation: cache_hit=%s, cache_read=%d, cache_write=%d, "
            "input=%d, output=%d",
            result["prompt_cache_hit"],
            cache_read,
            cache_write,
            result["input_tokens"],
            result["output_tokens"],
        )

        return result

    @property
    def prompt_cache_hit_rate(self) -> float:
        """Fraction of invocations where prompt cache was read."""
        if self._invocations == 0:
            return 0.0
        # Approximate hit rate as the fraction of expected system-prompt
        # tokens (2,200 per invocation) that were served from cache
        expected_total = 2200 * self._invocations
        return min(self._cache_read_total / expected_total, 1.0) if expected_total else 0.0
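
A hypothetical usage sketch (retrieve_context is a stand-in stub for the OpenSearch RAG retrieval step):

def retrieve_context(query: str) -> str:
    """Stub standing in for the OpenSearch RAG retrieval layer."""
    return "One Piece Vol. 108 catalog entry ..."


client = BedrockPromptCacheClient(region="us-east-1")
result = client.invoke(
    user_message="When does One Piece volume 108 release?",
    rag_context=retrieve_context("one piece 108 release date"),
    session_history=[],
)
if result["prompt_cache_hit"]:
    print(f"prompt cache served {result['cache_read_tokens']} tokens")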

Cache Warming Strategies

Cache warming pre-populates the semantic cache with high-probability queries so that the first user to ask a question gets a cache hit instead of a cold-start miss.

Warming Strategy Matrix

| Strategy | Trigger | Volume | MangaAssist Example |
|---|---|---|---|
| Top-N FAQ Warming | Daily cron (6:00 AM JST) | Top 100 FAQ queries | "How do I return a manga?", "What's your shipping policy?" |
| New Release Warming | Catalog update event | 10–20 queries per new title | "When does [title] release?", "[title] price", "[title] review" |
| Seasonal Warming | Calendar trigger | 50–100 queries | "Holiday shipping deadlines", "Gift wrapping available?" |
| Post-Invalidation Warming | After cache flush | Re-populate invalidated entries | After price change, re-cache updated product info |

Cache Warming Implementation

import json
import logging
import time
from typing import Optional

import boto3

logger = logging.getLogger(__name__)


class CacheWarmer:
    """
    Pre-populates the semantic cache with high-probability queries.
    Runs as a scheduled ECS task or triggered by EventBridge events.
    """

    # Top 100 MangaAssist FAQ queries — curated from production logs
    TOP_FAQ_QUERIES = [
        {"query": "How do I return a manga?", "intent": "return_policy"},
        {"query": "What is your shipping policy?", "intent": "shipping_info"},
        {"query": "Do you ship internationally?", "intent": "shipping_info"},
        {"query": "How long does shipping take?", "intent": "shipping_info"},
        {"query": "Can I track my order?", "intent": "order_status"},
        {"query": "Do you have gift wrapping?", "intent": "faq"},
        {"query": "What payment methods do you accept?", "intent": "faq"},
        {"query": "How do I cancel my order?", "intent": "faq"},
        {"query": "Do you sell digital manga?", "intent": "faq"},
        {"query": "What genres do you carry?", "intent": "faq"},
        # ... remaining 90 queries
    ]

    def __init__(
        self,
        semantic_cache,  # SemanticCacheManager instance
        bedrock_client,  # BedrockPromptCacheClient instance
        rag_retriever,   # RAG retrieval function
    ):
        self.cache = semantic_cache
        self.bedrock = bedrock_client
        self.retriever = rag_retriever

    def warm_faq_cache(self) -> dict:
        """
        Pre-populate cache with top FAQ queries.
        Called daily at 6:00 AM JST via EventBridge scheduled rule.
        """
        results = {"warmed": 0, "skipped": 0, "errors": 0}

        for item in self.TOP_FAQ_QUERIES:
            query = item["query"]
            intent = item["intent"]

            try:
                # Check if already cached (avoid redundant Bedrock calls)
                existing = self.cache.get(query=query, intent=intent)
                if existing is not None:
                    results["skipped"] += 1
                    continue

                # Generate fresh response via full RAG pipeline
                rag_context = self.retriever(query)
                response = self.bedrock.invoke(
                    user_message=query,
                    rag_context=rag_context,
                    session_history=[],
                )

                # Store in cache
                self.cache.put(
                    query=query,
                    response=response["response_text"],
                    intent=intent,
                    entities={},
                    confidence=1.0,
                )
                results["warmed"] += 1

                # Rate limit to avoid Bedrock throttling
                time.sleep(0.1)

            except Exception as e:
                logger.error("Failed to warm cache for '%s': %s", query, e)
                results["errors"] += 1

        logger.info("Cache warming complete: %s", results)
        return results

    def warm_new_release(self, title: str, volume: Optional[str] = None) -> dict:
        """
        Pre-populate cache with common queries about a new manga release.
        Triggered by catalog update event via EventBridge.
        """
        volume_str = f" volume {volume}" if volume else ""
        query_templates = [
            f"When does {title}{volume_str} release?",
            f"How much is {title}{volume_str}?",
            f"Is {title}{volume_str} available for pre-order?",
            f"{title}{volume_str} release date",
            f"Do you have {title}{volume_str}?",
        ]

        # Add Japanese variants if title has a known JP mapping
        # (lookup from DynamoDB title_mapping table)

        results = {"warmed": 0, "errors": 0}
        for query in query_templates:
            try:
                rag_context = self.retriever(query)
                response = self.bedrock.invoke(
                    user_message=query,
                    rag_context=rag_context,
                    session_history=[],
                )
                self.cache.put(
                    query=query,
                    response=response["response_text"],
                    intent="product_info",
                    entities={"title": title, "volume": volume or ""},
                    confidence=1.0,
                )
                results["warmed"] += 1
                time.sleep(0.1)
            except Exception as e:
                logger.error("Warming failed for '%s': %s", query, e)
                results["errors"] += 1

        return results
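
For the daily FAQ warm-up, 6:00 AM JST corresponds to 21:00 UTC the previous day, since EventBridge cron expressions run in UTC. A sketch of registering the schedule (rule name and target ARN are placeholders):

import boto3

events = boto3.client("events", region_name="us-east-1")

# 6:00 AM JST == 21:00 UTC the previous day (JST is UTC+9)
events.put_rule(
    Name="mangaassist-faq-cache-warming",  # placeholder rule name
    ScheduleExpression="cron(0 21 * * ? *)",
    State="ENABLED",
)
events.put_targets(
    Rule="mangaassist-faq-cache-warming",
    Targets=[{
        "Id": "cache-warmer-ecs-task",
        "Arn": "arn:aws:ecs:us-east-1:123456789012:cluster/mangaassist",  # placeholder
        # RoleArn and EcsParameters for the Fargate task omitted for brevity
    }],
)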

Full RedisSemanticCache Implementation

import hashlib
import json
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Optional

import boto3
import numpy as np
import redis

logger = logging.getLogger(__name__)


INTENT_TTL_MAP = {
    "product_info": 3600,
    "manga_release_date": 3600,
    "recommendation": 14400,
    "shipping_info": 7200,
    "faq": 86400,
    "order_status": 0,
    "greeting": 86400,
    "return_policy": 43200,
    "manga_search": 1800,
}

INTENT_THRESHOLD_MAP = {
    "faq": 0.88,
    "shipping_info": 0.88,
    "return_policy": 0.88,
    "greeting": 0.85,
    "manga_release_date": 0.93,
    "product_info": 0.93,
    "manga_search": 0.94,
    "recommendation": 0.96,
}

DEFAULT_THRESHOLD = 0.92
DEFAULT_TTL = 1800


@dataclass
class SemanticCacheConfig:
    """Configuration for the semantic cache."""
    redis_url: str
    embedding_model_id: str = "amazon.titan-embed-text-v2:0"
    embedding_dim: int = 1024
    default_threshold: float = 0.92
    l1_max_size: int = 5_000
    l1_default_ttl: int = 300
    max_redis_memory_mb: int = 4096
    index_name: str = "mangaassist_cache_idx"
    key_prefix: str = "sc:"
    enable_l1: bool = True
    enable_metrics: bool = True


class RedisSemanticCache:
    """
    Production-grade semantic cache for MangaAssist.

    Features:
    - L1 in-memory exact-match cache (per ECS container)
    - L2 Redis vector similarity cache (shared across containers)
    - Intent-aware similarity thresholds
    - Intent-aware TTLs
    - Cache hit promotion (L2 hit → L1 write)
    - Memory budget enforcement
    - Comprehensive metrics emission

    Architecture:
        User Query → Normalize → L1 Lookup → L2 VSS Lookup → (miss) → Bedrock
                                    ↓ hit        ↓ hit
                                  Return       Promote to L1 + Return
    """

    def __init__(self, config: SemanticCacheConfig):
        self.config = config
        self.redis = redis.Redis.from_url(
            config.redis_url,
            decode_responses=False,
            socket_timeout=5,
            socket_connect_timeout=5,
            retry_on_timeout=True,
        )
        self.bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")

        # L1 cache
        self._l1: dict[str, dict[str, Any]] = {}

        # Metrics
        self._metrics = {
            "l1_hits": 0,
            "l1_misses": 0,
            "l2_hits": 0,
            "l2_misses": 0,
            "l2_false_positives_reported": 0,
            "embed_calls": 0,
            "embed_latency_ms_total": 0,
            "cache_writes": 0,
            "cache_evictions": 0,
            "total_lookups": 0,
        }

        self._ensure_index()

    # ── Index Management ──

    def _ensure_index(self) -> None:
        """Create RediSearch vector index if not present."""
        try:
            self.redis.execute_command("FT.INFO", self.config.index_name)
            logger.info("Redis index '%s' exists", self.config.index_name)
        except redis.ResponseError:
            self._create_index()

    def _create_index(self) -> None:
        """Create HNSW vector index on Redis."""
        cmd = [
            "FT.CREATE", self.config.index_name,
            "ON", "HASH",
            "PREFIX", "1", self.config.key_prefix,
            "SCHEMA",
            "vec", "VECTOR", "HNSW", "8",
                "TYPE", "FLOAT32",
                "DIM", str(self.config.embedding_dim),
                "DISTANCE_METRIC", "COSINE",
                "EF_CONSTRUCTION", "200",
            "query", "TEXT",
            "response", "TEXT", "NOINDEX",
            "intent", "TAG",
            "model", "TAG",
            "lang", "TAG",
            "ts", "NUMERIC", "SORTABLE",
            "hits", "NUMERIC", "SORTABLE",
            "conf", "NUMERIC",
            "ents", "TEXT", "NOINDEX",
        ]
        self.redis.execute_command(*cmd)
        logger.info("Created Redis vector index '%s'", self.config.index_name)

    # ── Embedding ──

    def _embed(self, text: str) -> list[float]:
        """Generate embedding via Titan Embeddings V2."""
        start = time.monotonic()
        resp = self.bedrock.invoke_model(
            modelId=self.config.embedding_model_id,
            body=json.dumps({"inputText": text}),
        )
        vec = json.loads(resp["body"].read())["embedding"]
        elapsed = (time.monotonic() - start) * 1000
        self._metrics["embed_calls"] += 1
        self._metrics["embed_latency_ms_total"] += elapsed
        return vec

    # ── Normalization ──

    @staticmethod
    def _normalize(text: str) -> str:
        """Normalize query text for consistent caching."""
        import re
        t = text.lower().strip()
        t = re.sub(r"\s+", " ", t)
        t = re.sub(r"[^\w\s\-\u3000-\u9fff\uf900-\ufaff]", "", t)
        return t

    @staticmethod
    def _l1_key(normalized: str, intent: str) -> str:
        return hashlib.md5(f"{normalized}|{intent}".encode()).hexdigest()

    # ── L1 Operations ──

    def _l1_get(self, normalized: str, intent: str) -> Optional[str]:
        key = self._l1_key(normalized, intent)
        entry = self._l1.get(key)
        if entry and time.time() < entry["exp"]:
            self._metrics["l1_hits"] += 1
            return entry["resp"]
        if entry:
            del self._l1[key]
        self._metrics["l1_misses"] += 1
        return None

    def _l1_put(self, normalized: str, intent: str, response: str, ttl: int) -> None:
        if not self.config.enable_l1:
            return
        key = self._l1_key(normalized, intent)
        if len(self._l1) >= self.config.l1_max_size:
            oldest = min(self._l1, key=lambda k: self._l1[k]["exp"])
            del self._l1[oldest]
            self._metrics["cache_evictions"] += 1
        self._l1[key] = {"resp": response, "exp": time.time() + ttl}

    # ── L2 Operations ──

    def _l2_search(
        self,
        embedding: list[float],
        intent: str,
        language: str = "en",
    ) -> Optional[dict]:
        """Vector similarity search with intent and language pre-filtering."""
        threshold = INTENT_THRESHOLD_MAP.get(intent, DEFAULT_THRESHOLD)
        vec_bytes = np.array(embedding, dtype=np.float32).tobytes()

        # Build filter: intent + language
        filter_expr = f"@intent:{{{intent}}} @lang:{{{language}}}"
        query = f"({filter_expr})=>[KNN 3 @vec $blob AS dist]"

        try:
            results = self.redis.execute_command(
                "FT.SEARCH", self.config.index_name, query,
                "PARAMS", "2", "blob", vec_bytes,
                "SORTBY", "dist", "ASC",
                "LIMIT", "0", "1",
                "RETURN", "5", "response", "query", "dist", "hits", "conf",
                "DIALECT", "2",
            )
        except redis.ResponseError as e:
            logger.error("L2 search error: %s", e)
            self._metrics["l2_misses"] += 1
            return None

        if results[0] == 0:
            self._metrics["l2_misses"] += 1
            return None

        doc_key = results[1]
        fields = self._parse_hash_fields(results[2])
        distance = float(fields.get("dist", 2.0))
        # RediSearch COSINE returns distance = 1 - cosine_similarity
        similarity = 1.0 - distance

        if similarity < threshold:
            self._metrics["l2_misses"] += 1
            logger.debug(
                "L2 below threshold: sim=%.4f < threshold=%.4f for intent=%s",
                similarity, threshold, intent,
            )
            return None

        self._metrics["l2_hits"] += 1
        # Increment hit counter
        self.redis.hincrby(doc_key, "hits", 1)

        return {
            "response": fields["response"],
            "original_query": fields.get("query", ""),
            "similarity": similarity,
            "confidence": float(fields.get("conf", 0)),
            "hit_count": int(fields.get("hits", 0)) + 1,
        }

    def _l2_store(
        self,
        normalized: str,
        response: str,
        embedding: list[float],
        intent: str,
        model_id: str,
        confidence: float,
        entities: dict,
        language: str,
        ttl: int,
    ) -> None:
        """Store entry in Redis with vector embedding and TTL."""
        uid = hashlib.sha256(
            f"{normalized}|{intent}|{time.time()}".encode()
        ).hexdigest()[:16]
        key = f"{self.config.key_prefix}{uid}"
        vec_bytes = np.array(embedding, dtype=np.float32).tobytes()

        self.redis.hset(key, mapping={
            "vec": vec_bytes,
            "query": normalized,
            "response": response,
            "intent": intent,
            "model": model_id,
            "lang": language,
            "ts": time.time(),
            "hits": 0,
            "conf": confidence,
            "ents": json.dumps(entities, ensure_ascii=False),
        })
        if ttl > 0:
            self.redis.expire(key, ttl)
        self._metrics["cache_writes"] += 1

    # ── Public API ──

    def lookup(
        self,
        query: str,
        intent: str,
        language: str = "en",
    ) -> Optional[dict]:
        """
        Look up a query in L1 then L2.

        Returns:
            dict with keys: response, source, similarity, original_query
            None if cache miss on all tiers
        """
        self._metrics["total_lookups"] += 1

        # Never cache order status or account queries
        if intent in ("order_status", "account_info"):
            return None

        normalized = self._normalize(query)

        # L1
        if self.config.enable_l1:
            l1_resp = self._l1_get(normalized, intent)
            if l1_resp is not None:
                return {
                    "response": l1_resp,
                    "source": "L1_MEMORY",
                    "similarity": 1.0,
                }

        # L2
        embedding = self._embed(normalized)
        l2_result = self._l2_search(embedding, intent, language)
        if l2_result is not None:
            # Promote to L1
            ttl = INTENT_TTL_MAP.get(intent, DEFAULT_TTL)
            self._l1_put(normalized, intent, l2_result["response"], min(ttl, 300))
            return {
                "response": l2_result["response"],
                "source": "L2_SEMANTIC",
                "similarity": l2_result["similarity"],
                "original_query": l2_result["original_query"],
            }

        return None

    def store(
        self,
        query: str,
        response: str,
        intent: str,
        entities: dict[str, str],
        model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
        confidence: float = 1.0,
        language: str = "en",
    ) -> None:
        """Store a fresh response in L1 + L2."""
        if intent in ("order_status", "account_info"):
            return  # Never cache these

        ttl = INTENT_TTL_MAP.get(intent, DEFAULT_TTL)
        if ttl == 0:
            return  # TTL=0 means "do not cache"

        normalized = self._normalize(query)

        # L1
        self._l1_put(normalized, intent, response, min(ttl, 300))

        # L2
        embedding = self._embed(normalized)
        self._l2_store(
            normalized=normalized,
            response=response,
            embedding=embedding,
            intent=intent,
            model_id=model_id,
            confidence=confidence,
            entities=entities,
            language=language,
            ttl=ttl,
        )

    def report_false_positive(self, query: str, intent: str) -> None:
        """
        Called when a user flags a cached response as incorrect.
        Removes the matching entry and adjusts metrics.
        """
        self._metrics["l2_false_positives_reported"] += 1
        normalized = self._normalize(query)

        # Remove from L1
        key = self._l1_key(normalized, intent)
        self._l1.pop(key, None)

        # Note: L2 removal requires finding the exact Redis key,
        # which would need a secondary index or scan. In production,
        # flagged entries are logged and batch-removed by a cleanup job.
        logger.warning("False positive reported: query='%s', intent='%s'", query, intent)

    def get_metrics(self) -> dict:
        """Return comprehensive cache metrics."""
        total = self._metrics["total_lookups"]
        l1h = self._metrics["l1_hits"]
        l2h = self._metrics["l2_hits"]
        embed_calls = self._metrics["embed_calls"]

        return {
            **self._metrics,
            "l1_hit_rate": l1h / total if total else 0,
            "l2_hit_rate": l2h / total if total else 0,
            "aggregate_hit_rate": (l1h + l2h) / total if total else 0,
            "avg_embed_latency_ms": (
                self._metrics["embed_latency_ms_total"] / embed_calls
                if embed_calls else 0
            ),
        }

    # ── Helpers ──

    @staticmethod
    def _parse_hash_fields(raw_fields: list) -> dict[str, str]:
        """Parse Redis HASH field pairs into a dict."""
        d = {}
        for i in range(0, len(raw_fields), 2):
            k = raw_fields[i].decode() if isinstance(raw_fields[i], bytes) else raw_fields[i]
            v = raw_fields[i + 1].decode() if isinstance(raw_fields[i + 1], bytes) else raw_fields[i + 1]
            d[k] = v
        return d
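
A hypothetical end-to-end usage sketch showing where the cache sits in the request path (generate_via_bedrock is a stub for the full RAG + Converse pipeline):

def generate_via_bedrock(query: str) -> str:
    """Stub standing in for the full RAG retrieval + Bedrock Converse call."""
    return "One Piece Vol. 108 releases on ..."


config = SemanticCacheConfig(redis_url="redis://localhost:6379")  # placeholder URL
cache = RedisSemanticCache(config)

query, intent = "When does One Piece 108 come out?", "manga_release_date"
hit = cache.lookup(query, intent=intent)
if hit is not None:
    answer = hit["response"]              # served from L1 or L2
else:
    answer = generate_via_bedrock(query)  # miss on all tiers -> invoke the FM
    cache.store(query, answer, intent=intent,
                entities={"title": "One Piece", "volume": "108"})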

Cache Hit vs Cache Miss — Sequence Diagram

sequenceDiagram
    participant U as User
    participant GW as API Gateway<br/>WebSocket
    participant ECS as ECS Fargate<br/>Orchestrator
    participant L1 as L1 In-Memory
    participant EMB as Titan Embeddings V2
    participant L2 as ElastiCache Redis<br/>Semantic Cache
    participant RAG as OpenSearch<br/>Vector Store
    participant BED as Bedrock Claude 3
    participant DDB as DynamoDB<br/>Session Store

    Note over U,DDB: Scenario A — Cache HIT (L2 Semantic)
    U->>GW: "When's One Piece 108 out?"
    GW->>ECS: Route to container
    ECS->>ECS: Normalize → "one piece 108 release date"
    ECS->>ECS: Classify intent → manga_release_date
    ECS->>L1: Exact-match lookup
    L1-->>ECS: MISS
    ECS->>EMB: Embed normalized query
    EMB-->>ECS: 1024-dim vector (8ms)
    ECS->>L2: FT.SEARCH with vector + intent filter
    L2-->>ECS: HIT (sim=0.96, original="one piece volume 108 release")
    ECS->>L1: Promote to L1 for future exact hits
    ECS->>DDB: Log cache hit event
    ECS-->>GW: Cached response (total: 35ms)
    GW-->>U: "One Piece Vol. 108 releases on..."

    Note over U,DDB: Scenario B — Cache MISS (full pipeline)
    U->>GW: "Is Chainsaw Man Part 2 getting an anime?"
    GW->>ECS: Route to container
    ECS->>ECS: Normalize + classify → manga_news (not cached)
    ECS->>L1: MISS
    ECS->>EMB: Embed query
    EMB-->>ECS: 1024-dim vector (8ms)
    ECS->>L2: FT.SEARCH
    L2-->>ECS: MISS (best match sim=0.81 < 0.92 threshold)
    ECS->>DDB: Load session history
    DDB-->>ECS: Last 3 turns
    ECS->>RAG: Vector search for relevant context
    RAG-->>ECS: Top 5 chunks (120ms)
    ECS->>BED: Converse API (system prompt cached by L3)
    BED-->>ECS: Generated response (2400ms)
    ECS->>L2: Store response + embedding
    ECS->>L1: Store exact-match entry
    ECS->>DDB: Save to session history
    ECS-->>GW: Fresh response (total: 2650ms)
    GW-->>U: "As of now, Chainsaw Man Part 2..."

Key Takeaways

  1. Titan Embeddings V2 is the right model for cache keys — lowest latency (8ms), lowest cost ($0.00002/1K tokens), good multilingual support for JP+EN, and sufficient quality for binary hit/miss decisions.
  2. 0.92 cosine similarity threshold balances precision and recall — but intent-specific overrides are essential. FAQ at 0.88, manga title queries at 0.93, recommendations at 0.96.
  3. Recommendation queries should include user segment in the cache key — otherwise you serve User A's recommendations to User B.
  4. Prompt caching (L3) and semantic caching (L2) are complementary — L3 reduces per-token cost for every request; L2 eliminates entire invocations. Together they cut monthly Bedrock cost by ~60%.
  5. Cache warming is not optional — without it, every morning has a cold-start period where the first user to ask each common question gets 2.8s latency instead of 35ms.
  6. Never cache order_status or account_info intents — these are personalized, real-time, and caching them creates data leakage risks.