LOCAL PREVIEW View on GitHub

Intelligent Caching Architecture for GenAI Applications

AWS AIP-C01 Task 4.1 — Skill 4.1.4: Design intelligent caching systems for FM applications Context: MangaAssist e-commerce chatbot — Bedrock Claude 3 (Sonnet/Haiku), OpenSearch Serverless, DynamoDB, ECS Fargate, API Gateway WebSocket, ElastiCache Redis. 1M messages/day.


Skill Mapping

Certification Domain Task Skill
AWS AIP-C01 Domain 4 — Operational Efficiency Task 4.1 — Optimize FM applications Skill 4.1.4 — Design intelligent caching systems to reduce FM invocation cost and latency

Skill scope: Architect multi-tier caching layers — from in-process exact-match through semantic similarity to edge delivery — that reduce Bedrock invocation volume, lower p95 latency, and cut per-message cost while guaranteeing answer freshness for a high-traffic manga retail chatbot.


Mind Map — Intelligent Caching Dimensions

mindmap
  root((Intelligent<br/>Caching))
    Cache Tiers
      L1 — In-Memory
        Python LRU / TTLCache
        Exact Query Match
        Sub-ms Latency
        Per-Container Scope
      L2 — Semantic Cache
        ElastiCache Redis
        Vector Similarity Search
        Cosine Threshold 0.92
        Cross-Container Shared
      L3 — Prompt Cache
        Bedrock Prompt Caching
        System Prompt Reuse
        Up to 100K Tokens
        Model-Provider Managed
      L4 — Edge Cache
        CloudFront Distribution
        FAQ Responses
        Product Info Pages
        Global Low Latency
    Cache Key Strategies
      Exact Match Hashing
        Normalized Query Text
        Lowercase + Stopword Removal
        Deterministic Hash
      Semantic Fingerprinting
        Intent + Entities + Model
        Sorted Entity Keys
        SHA-256 Digest
      Embedding Similarity
        Titan Embeddings V2
        1024-Dim Vectors
        Cosine Distance
    Invalidation
      TTL-Based
        Product Info 1hr
        Recommendations 4hr
        FAQ 24hr
      Event-Driven
        Price Change Webhook
        Inventory Update
        Catalog Refresh
      Version-Based
        Prompt Version Tag
        Model Version Change
        System Config Update
    Observability
      Hit Rate per Tier
      Latency Saved
      Cost Avoidance
      Staleness Incidents
      Memory Utilization

Multi-Tier Caching Architecture

MangaAssist processes 1M messages/day. Without caching, every message invokes Bedrock ($0.003 input + $0.015 output per 1K tokens on Sonnet). A 40% aggregate cache hit rate saves ~$18,000/month and drops p95 latency from 2.8s to 180ms for cached responses.

Tier Overview

graph TD
    subgraph Client
        U[User Message via WebSocket]
    end

    subgraph "API Gateway + ECS Fargate"
        N[Normalize Query]
        L1[L1 — In-Memory Cache<br/>Python TTLCache<br/>Exact Match]
        L2[L2 — Semantic Cache<br/>ElastiCache Redis<br/>Cosine ≥ 0.92]
        L3[L3 — Prompt Cache<br/>Bedrock Prompt Caching<br/>System Prompt Reuse]
        ORCH[Orchestrator<br/>RAG + Bedrock Invoke]
        STORE[Cache Store<br/>Write-Back to L1 + L2]
    end

    subgraph "Edge"
        L4[L4 — CloudFront Edge Cache<br/>FAQ & Product Pages<br/>TTL 5–60 min]
    end

    subgraph "AWS Services"
        BED[Bedrock Claude 3 Sonnet/Haiku]
        OS[OpenSearch Serverless<br/>Vector Store]
        DDB[DynamoDB<br/>Session History]
        EMB[Bedrock Titan Embeddings V2]
    end

    U -->|Static FAQ / Product| L4
    L4 -->|MISS| N
    U -->|Conversational| N
    N --> L1
    L1 -->|HIT| U
    L1 -->|MISS| L2
    L2 -->|HIT| U
    L2 -->|MISS| L3
    L3 -->|System prompt cached| BED
    L3 -->|MISS| ORCH
    ORCH --> OS
    ORCH --> DDB
    ORCH --> BED
    BED --> STORE
    STORE --> L1
    STORE --> L2
    STORE --> U

    style L1 fill:#2d6a4f,stroke:#1b4332,color:#fff
    style L2 fill:#e76f51,stroke:#f4a261,color:#fff
    style L3 fill:#264653,stroke:#2a9d8f,color:#fff
    style L4 fill:#457b9d,stroke:#1d3557,color:#fff

L1 — In-Memory Exact-Match Cache

The fastest tier. Each ECS container maintains a local Python dictionary with TTL expiry. Only exact normalized-query matches return a hit.

Why it works for MangaAssist: Popular queries repeat verbatim — "what's new this week", "track my order", "do you ship to Osaka". These account for ~12% of traffic.

import hashlib
import time
from threading import Lock
from typing import Optional, Any


class L1InMemoryCache:
    """
    Per-container in-memory cache with TTL.
    Designed for exact-match lookups on normalized queries.
    MangaAssist: ~12% hit rate on verbatim repeated queries.
    """

    def __init__(self, max_size: int = 10_000, default_ttl: int = 300):
        self._cache: dict[str, dict[str, Any]] = {}
        self._max_size = max_size
        self._default_ttl = default_ttl
        self._lock = Lock()
        self._hits = 0
        self._misses = 0

    def _make_key(self, normalized_query: str, intent: str) -> str:
        """Deterministic cache key from normalized query + intent."""
        raw = f"{normalized_query}|{intent}"
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()

    def get(self, normalized_query: str, intent: str) -> Optional[str]:
        key = self._make_key(normalized_query, intent)
        with self._lock:
            entry = self._cache.get(key)
            if entry is None:
                self._misses += 1
                return None
            if time.time() > entry["expires_at"]:
                del self._cache[key]
                self._misses += 1
                return None
            self._hits += 1
            return entry["response"]

    def put(
        self,
        normalized_query: str,
        intent: str,
        response: str,
        ttl: Optional[int] = None,
    ) -> None:
        key = self._make_key(normalized_query, intent)
        ttl = ttl or self._default_ttl
        with self._lock:
            if len(self._cache) >= self._max_size:
                self._evict_oldest()
            self._cache[key] = {
                "response": response,
                "expires_at": time.time() + ttl,
                "created_at": time.time(),
            }

    def _evict_oldest(self) -> None:
        """Remove the entry closest to expiration."""
        oldest_key = min(self._cache, key=lambda k: self._cache[k]["expires_at"])
        del self._cache[oldest_key]

    @property
    def hit_rate(self) -> float:
        total = self._hits + self._misses
        return self._hits / total if total > 0 else 0.0

L2 — Semantic Cache with ElastiCache Redis

The core caching tier. Queries are embedded with Titan Embeddings V2, then matched against cached embeddings using Redis vector similarity search. A cosine similarity >= 0.92 returns the cached response.

Semantic Caching Flow

sequenceDiagram
    participant U as User
    participant N as Normalizer
    participant E as Titan Embeddings V2
    participant R as ElastiCache Redis<br/>(RediSearch VSS)
    participant B as Bedrock Claude 3

    U->>N: "When does One Piece vol 108 come out?"
    N->>N: normalize → "one piece vol 108 release date"
    N->>E: Embed normalized query
    E-->>N: 1024-dim vector
    N->>R: FT.SEARCH idx @embedding:[VECTOR_RANGE 0.08 $vec]
    alt Cosine similarity ≥ 0.92
        R-->>U: Return cached response + metadata
        Note over R,U: Cache HIT — 15ms total
    else No match above threshold
        R-->>N: No match
        N->>B: Full RAG pipeline invocation
        B-->>N: Generated response
        N->>R: Store embedding + response + metadata
        N-->>U: Return fresh response
        Note over B,U: Cache MISS — 2800ms total
    end

Result Fingerprinting

For deterministic cache keys that go beyond embedding similarity, MangaAssist uses result fingerprinting — a hash of the structured intent, sorted entities, and model ID.

import hashlib
import json
import re
from typing import Optional


# Common Japanese/English stopwords for manga queries
STOPWORDS = {
    "the", "a", "an", "is", "are", "was", "were", "do", "does", "did",
    "what", "when", "where", "how", "can", "could", "please", "i", "my",
    "me", "tell", "show", "about", "this", "that", "の", "は", "が", "を",
    "に", "で", "と", "も", "か", "よ", "ね",
}


class CacheKeyGenerator:
    """
    Generates deterministic cache keys for MangaAssist queries.
    Two strategies: exact-match hash and result fingerprint.
    """

    @staticmethod
    def normalize_query(raw_query: str) -> str:
        """
        Normalize query for exact-match hashing:
        lowercase, strip punctuation, remove stopwords, sort remaining tokens.
        """
        text = raw_query.lower().strip()
        # Remove punctuation except hyphens (for manga titles like 'Jujutsu-Kaisen')
        text = re.sub(r"[^\w\s\-]", "", text)
        tokens = text.split()
        filtered = [t for t in tokens if t not in STOPWORDS]
        return " ".join(sorted(filtered))

    @staticmethod
    def exact_match_key(normalized_query: str) -> str:
        """SHA-256 of normalized query text."""
        return hashlib.sha256(normalized_query.encode("utf-8")).hexdigest()

    @staticmethod
    def result_fingerprint(
        intent: str,
        entities: dict[str, str],
        model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
    ) -> str:
        """
        Deterministic fingerprint: hash(intent + sorted_entities + model_id).
        Used when the same structured intent+entities should always return
        the same cached response regardless of phrasing.

        Example:
            intent = "manga_release_date"
            entities = {"title": "One Piece", "volume": "108"}
            → consistent key for any phrasing asking about OP vol 108 release
        """
        sorted_entities = json.dumps(entities, sort_keys=True, ensure_ascii=False)
        raw = f"{intent}|{sorted_entities}|{model_id}"
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()

    @staticmethod
    def ttl_for_intent(intent: str) -> int:
        """
        Intent-aware TTL selection for MangaAssist.
        Product info changes more often than FAQ answers.
        """
        ttl_map = {
            "product_info": 3600,          # 1 hour — prices/stock change
            "manga_release_date": 3600,    # 1 hour — dates can shift
            "recommendation": 14400,       # 4 hours — personalized, less volatile
            "shipping_info": 7200,         # 2 hours — policy changes are rare
            "faq": 86400,                  # 24 hours — highly stable
            "order_status": 0,             # Never cache — always real-time
            "greeting": 86400,             # 24 hours — static
        }
        return ttl_map.get(intent, 1800)   # Default 30 minutes

L3 — Bedrock Prompt Caching

Bedrock's built-in prompt caching feature caches the system prompt prefix across invocations. When MangaAssist sends the same system prompt (persona instructions, guardrails, output format) with every request, Bedrock can skip re-processing that prefix.

How Prompt Caching Works

Aspect Detail
What is cached System prompt + any static prefix (up to 100K tokens)
Cache scope Same model, same account, same region
Latency saving Up to 85% reduction in time-to-first-token for cached prefix
Cost saving Cached tokens billed at reduced rate (~90% discount on input tokens)
Cache lifetime 5 minutes of inactivity before eviction
MangaAssist system prompt ~2,200 tokens (persona + guardrails + format instructions + manga domain context)

MangaAssist Prompt Structure for Caching

import boto3
import json


def invoke_with_prompt_caching(
    user_message: str,
    rag_context: str,
    session_history: list[dict],
) -> dict:
    """
    Invoke Bedrock Claude 3 Sonnet with prompt caching enabled.
    The system prompt (2,200 tokens) is cached across invocations.
    Only user message + RAG context + session history vary per request.
    """
    client = boto3.client("bedrock-runtime", region_name="us-east-1")

    # This system prompt is identical across all 1M daily messages.
    # Bedrock caches it after the first invocation.
    system_prompt = [
        {
            "text": (
                "You are MangaAssist, a friendly and knowledgeable Japanese manga "
                "retail assistant for a major e-commerce platform. You help customers "
                "discover manga, track orders, understand shipping options, and get "
                "personalized recommendations.\n\n"
                "## Guardrails\n"
                "- Never discuss non-manga topics beyond polite deflection\n"
                "- Never reveal internal pricing logic or supplier information\n"
                "- Always respond in the customer's language (Japanese or English)\n"
                "- Flag potential age-restricted content with appropriate warnings\n"
                "- Never fabricate manga titles, ISBNs, or release dates\n\n"
                "## Output Format\n"
                "- Use markdown for structured responses\n"
                "- Include manga title in both English and Japanese when available\n"
                "- Always cite source (catalog, FAQ, order system) for factual claims\n"
                "- Keep responses under 300 words unless the customer asks for detail"
            ),
            "cachePoint": {"type": "default"},  # <-- Enables prompt caching
        }
    ]

    # Dynamic content — changes per request, not cached
    messages = []
    for turn in session_history[-6:]:  # Last 3 turns
        messages.append(turn)
    messages.append({
        "role": "user",
        "content": [
            {"text": f"## Retrieved Context\n{rag_context}\n\n## Customer Question\n{user_message}"}
        ],
    })

    response = client.converse(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        system=system_prompt,
        messages=messages,
        inferenceConfig={
            "maxTokens": 1024,
            "temperature": 0.3,
        },
    )

    # Check cache usage in response metadata
    usage = response.get("usage", {})
    cache_read_tokens = usage.get("cacheReadInputTokens", 0)
    cache_write_tokens = usage.get("cacheWriteInputTokens", 0)

    return {
        "response": response["output"]["message"]["content"][0]["text"],
        "cache_read_tokens": cache_read_tokens,
        "cache_write_tokens": cache_write_tokens,
        "total_input_tokens": usage.get("inputTokens", 0),
        "output_tokens": usage.get("outputTokens", 0),
    }

L4 — CloudFront Edge Cache

Static and semi-static responses — FAQs, product catalog pages, shipping policy — are cached at CloudFront edge locations. These bypass the chatbot pipeline entirely.

Edge Cache Design

Content Type CloudFront TTL Cache-Control Header Invalidation Trigger
FAQ answers (top 50) 60 minutes public, max-age=3600, s-maxage=3600 FAQ content update in CMS
Product info pages 15 minutes public, max-age=900, s-maxage=900 Price or stock change event
Shipping policy 24 hours public, max-age=86400 Policy document update
Manga cover images 7 days public, max-age=604800, immutable Never (immutable assets)
Order status 0 (no cache) private, no-store N/A — always real-time

Edge Cache Architecture

graph LR
    subgraph "Edge (CloudFront PoPs)"
        CF[CloudFront Distribution]
        CF_CACHE[(Edge Cache<br/>FAQ + Product Pages)]
    end

    subgraph "Origin (ECS Fargate)"
        ALB[Application Load Balancer]
        FAQ_SVC[FAQ Service<br/>Pre-rendered Answers]
        PROD_SVC[Product API<br/>Catalog Data]
    end

    subgraph "Invalidation"
        EB[EventBridge]
        LAMBDA[Lambda<br/>Cache Invalidator]
    end

    USER[User / Client] --> CF
    CF --> CF_CACHE
    CF_CACHE -->|MISS| ALB
    ALB --> FAQ_SVC
    ALB --> PROD_SVC
    EB -->|price_changed| LAMBDA
    EB -->|faq_updated| LAMBDA
    LAMBDA -->|CreateInvalidation| CF

    style CF fill:#457b9d,stroke:#1d3557,color:#fff
    style CF_CACHE fill:#a8dadc,stroke:#457b9d,color:#1d3557

SemanticCacheManager — Full Implementation

import hashlib
import json
import time
import logging
from typing import Optional, Any
from dataclasses import dataclass, field

import boto3
import redis
import numpy as np

logger = logging.getLogger(__name__)


@dataclass
class CacheEntry:
    """A single cached response with metadata."""
    query_text: str
    response_text: str
    embedding: list[float]
    intent: str
    model_id: str
    confidence: float
    created_at: float
    ttl: int
    entities: dict[str, str] = field(default_factory=dict)
    hit_count: int = 0


class SemanticCacheManager:
    """
    Multi-tier cache manager for MangaAssist.
    L1: In-memory exact match (per container)
    L2: ElastiCache Redis with vector similarity search

    Usage:
        cache = SemanticCacheManager(
            redis_url="rediss://manga-cache.xxxxx.use1.cache.amazonaws.com:6379",
            similarity_threshold=0.92,
        )
        # Try cache first
        result = await cache.get(query="When does One Piece 108 release?", intent="manga_release_date")
        if result is None:
            response = invoke_bedrock(...)
            await cache.put(query=..., response=response, intent=..., entities=...)
    """

    EMBEDDING_DIM = 1024  # Titan Embeddings V2 dimension
    INDEX_NAME = "manga_cache_idx"
    KEY_PREFIX = "cache:"

    def __init__(
        self,
        redis_url: str,
        similarity_threshold: float = 0.92,
        l1_max_size: int = 5_000,
        l1_default_ttl: int = 300,
        embedding_model_id: str = "amazon.titan-embed-text-v2:0",
    ):
        self.redis_client = redis.Redis.from_url(
            redis_url, decode_responses=False
        )
        self.similarity_threshold = similarity_threshold
        self.embedding_model_id = embedding_model_id
        self.bedrock_client = boto3.client("bedrock-runtime", region_name="us-east-1")

        # L1 in-memory cache
        self._l1_cache: dict[str, dict[str, Any]] = {}
        self._l1_max_size = l1_max_size
        self._l1_default_ttl = l1_default_ttl

        # Metrics
        self._metrics = {
            "l1_hits": 0, "l1_misses": 0,
            "l2_hits": 0, "l2_misses": 0,
            "total_requests": 0,
        }

        self._ensure_redis_index()

    def _ensure_redis_index(self) -> None:
        """Create RediSearch vector similarity index if it doesn't exist."""
        try:
            self.redis_client.execute_command("FT.INFO", self.INDEX_NAME)
            logger.info("Redis vector index already exists")
        except redis.ResponseError:
            schema = (
                "FT.CREATE", self.INDEX_NAME, "ON", "HASH",
                "PREFIX", "1", self.KEY_PREFIX,
                "SCHEMA",
                "embedding", "VECTOR", "HNSW", "6",
                    "TYPE", "FLOAT32",
                    "DIM", str(self.EMBEDDING_DIM),
                    "DISTANCE_METRIC", "COSINE",
                "query_text", "TEXT",
                "response_text", "TEXT",
                "intent", "TAG",
                "model_id", "TAG",
                "created_at", "NUMERIC", "SORTABLE",
                "ttl", "NUMERIC",
                "hit_count", "NUMERIC", "SORTABLE",
                "confidence", "NUMERIC",
                "entities_json", "TEXT",
            )
            self.redis_client.execute_command(*schema)
            logger.info("Created Redis vector index: %s", self.INDEX_NAME)

    def _embed_query(self, text: str) -> list[float]:
        """Generate embedding using Titan Embeddings V2."""
        response = self.bedrock_client.invoke_model(
            modelId=self.embedding_model_id,
            body=json.dumps({"inputText": text}),
        )
        body = json.loads(response["body"].read())
        return body["embedding"]

    def _normalize(self, query: str) -> str:
        """Normalize query for consistent matching."""
        import re
        text = query.lower().strip()
        text = re.sub(r"[^\w\s\-]", "", text)
        return text

    # ── L1: In-Memory Exact Match ──

    def _l1_get(self, normalized_query: str) -> Optional[str]:
        entry = self._l1_cache.get(normalized_query)
        if entry is None:
            self._metrics["l1_misses"] += 1
            return None
        if time.time() > entry["expires_at"]:
            del self._l1_cache[normalized_query]
            self._metrics["l1_misses"] += 1
            return None
        self._metrics["l1_hits"] += 1
        return entry["response"]

    def _l1_put(self, normalized_query: str, response: str, ttl: int) -> None:
        if len(self._l1_cache) >= self._l1_max_size:
            oldest = min(self._l1_cache, key=lambda k: self._l1_cache[k]["expires_at"])
            del self._l1_cache[oldest]
        self._l1_cache[normalized_query] = {
            "response": response,
            "expires_at": time.time() + ttl,
        }

    # ── L2: Redis Semantic Cache ──

    def _l2_search(self, embedding: list[float], intent: str) -> Optional[dict]:
        """
        Vector similarity search in Redis.
        Returns the best match above the similarity threshold.
        """
        query_vector = np.array(embedding, dtype=np.float32).tobytes()

        # KNN search with intent filter for precision
        query = (
            f"(@intent:{{{intent}}})=>"
            f"[KNN 3 @embedding $vec AS similarity]"
        )

        try:
            results = self.redis_client.execute_command(
                "FT.SEARCH", self.INDEX_NAME, query,
                "PARAMS", "2", "vec", query_vector,
                "SORTBY", "similarity", "ASC",  # COSINE: lower = more similar
                "LIMIT", "0", "1",
                "RETURN", "4", "response_text", "query_text", "similarity", "hit_count",
                "DIALECT", "2",
            )

            if results[0] == 0:
                return None

            doc_fields = results[2]
            field_dict = {}
            for i in range(0, len(doc_fields), 2):
                key = doc_fields[i].decode() if isinstance(doc_fields[i], bytes) else doc_fields[i]
                val = doc_fields[i + 1].decode() if isinstance(doc_fields[i + 1], bytes) else doc_fields[i + 1]
                field_dict[key] = val

            # COSINE distance: 0 = identical, 2 = opposite
            # Convert to similarity: 1 - (distance / 2)
            distance = float(field_dict.get("similarity", 2.0))
            similarity = 1.0 - (distance / 2.0)

            if similarity >= self.similarity_threshold:
                self._metrics["l2_hits"] += 1
                # Increment hit count
                doc_key = results[1].decode() if isinstance(results[1], bytes) else results[1]
                self.redis_client.hincrby(doc_key, "hit_count", 1)
                return {
                    "response": field_dict["response_text"],
                    "original_query": field_dict["query_text"],
                    "similarity": similarity,
                }

            self._metrics["l2_misses"] += 1
            return None

        except redis.ResponseError as e:
            logger.error("Redis search failed: %s", e)
            self._metrics["l2_misses"] += 1
            return None

    def _l2_store(self, entry: CacheEntry) -> None:
        """Store a cache entry in Redis with vector embedding."""
        fingerprint = hashlib.sha256(
            f"{entry.query_text}|{entry.intent}|{time.time()}".encode()
        ).hexdigest()[:16]

        key = f"{self.KEY_PREFIX}{fingerprint}"
        vector_bytes = np.array(entry.embedding, dtype=np.float32).tobytes()

        self.redis_client.hset(key, mapping={
            "embedding": vector_bytes,
            "query_text": entry.query_text,
            "response_text": entry.response_text,
            "intent": entry.intent,
            "model_id": entry.model_id,
            "confidence": entry.confidence,
            "created_at": entry.created_at,
            "ttl": entry.ttl,
            "hit_count": 0,
            "entities_json": json.dumps(entry.entities, ensure_ascii=False),
        })
        self.redis_client.expire(key, entry.ttl)

    # ── Public API ──

    def get(self, query: str, intent: str) -> Optional[dict]:
        """
        Try L1 (exact match) then L2 (semantic match).
        Returns dict with 'response', 'source', and metadata, or None.
        """
        self._metrics["total_requests"] += 1
        normalized = self._normalize(query)

        # L1: exact match
        l1_result = self._l1_get(normalized)
        if l1_result is not None:
            return {"response": l1_result, "source": "L1_MEMORY", "similarity": 1.0}

        # L2: semantic match
        embedding = self._embed_query(normalized)
        l2_result = self._l2_search(embedding, intent)
        if l2_result is not None:
            # Promote to L1 for future exact matches
            self._l1_put(normalized, l2_result["response"], self._l1_default_ttl)
            return {
                "response": l2_result["response"],
                "source": "L2_SEMANTIC",
                "similarity": l2_result["similarity"],
                "original_query": l2_result["original_query"],
            }

        return None

    def put(
        self,
        query: str,
        response: str,
        intent: str,
        entities: dict[str, str],
        model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
        confidence: float = 1.0,
    ) -> None:
        """Store response in both L1 and L2."""
        normalized = self._normalize(query)
        ttl = CacheKeyGenerator.ttl_for_intent(intent)

        # L1
        self._l1_put(normalized, response, min(ttl, self._l1_default_ttl))

        # L2
        embedding = self._embed_query(normalized)
        entry = CacheEntry(
            query_text=normalized,
            response_text=response,
            embedding=embedding,
            intent=intent,
            model_id=model_id,
            confidence=confidence,
            created_at=time.time(),
            ttl=ttl,
            entities=entities,
        )
        self._l2_store(entry)

    def get_metrics(self) -> dict:
        """Return cache performance metrics."""
        total = self._metrics["total_requests"]
        return {
            **self._metrics,
            "l1_hit_rate": self._metrics["l1_hits"] / total if total else 0,
            "l2_hit_rate": self._metrics["l2_hits"] / total if total else 0,
            "aggregate_hit_rate": (
                (self._metrics["l1_hits"] + self._metrics["l2_hits"]) / total
                if total else 0
            ),
        }

Cache Invalidation Strategies

Strategy Matrix

Strategy Trigger Scope Latency MangaAssist Use Case
TTL-Based Time expiry Per-entry Zero (passive) Product info (1hr), recommendations (4hr), FAQ (24hr)
Event-Driven Business event Per-intent or per-entity Seconds (via EventBridge) Price change invalidates all product_info entries for that title
Version-Based Config change Global or per-model Immediate Prompt version bump flushes all L2 entries for that model
Manual Operator action Targeted Immediate Emergency purge of incorrect cached response

Event-Driven Invalidation Flow

sequenceDiagram
    participant CMS as Catalog CMS
    participant EB as EventBridge
    participant INV as Invalidation Lambda
    participant REDIS as ElastiCache Redis
    participant CF as CloudFront

    CMS->>EB: PriceChanged event<br/>{title: "One Piece 108", new_price: 528}
    EB->>INV: Route to invalidation handler
    INV->>REDIS: Scan keys by intent=product_info<br/>+ entity title="One Piece 108"
    REDIS-->>INV: Found 14 cached entries
    INV->>REDIS: DEL matching keys
    INV->>CF: CreateInvalidation /api/products/one-piece-108*
    INV-->>EB: Invalidation complete<br/>{entries_removed: 14, cf_invalidation_id: "I3X7..."}

Version-Based Invalidation

import time
import logging

logger = logging.getLogger(__name__)


class CacheVersionManager:
    """
    Manages cache versions tied to system prompt and model versions.
    When a version changes, all entries for that version are invalidated.
    """

    VERSION_KEY = "cache:version:current"
    VERSION_HISTORY_KEY = "cache:version:history"

    def __init__(self, redis_client):
        self.redis = redis_client

    def get_current_version(self) -> str:
        version = self.redis.get(self.VERSION_KEY)
        return version.decode() if version else "v1.0.0"

    def bump_version(
        self,
        new_version: str,
        reason: str,
        flush_cache: bool = True,
    ) -> dict:
        """
        Bump cache version. Optionally flush all cached entries.
        Called when system prompt or model version changes.
        """
        old_version = self.get_current_version()

        # Record version history
        self.redis.rpush(self.VERSION_HISTORY_KEY, f"{new_version}|{reason}|{time.time()}")

        # Update current version
        self.redis.set(self.VERSION_KEY, new_version)

        flushed_count = 0
        if flush_cache:
            # Scan and delete all cache entries
            cursor = 0
            while True:
                cursor, keys = self.redis.scan(cursor, match="cache:*", count=500)
                if keys:
                    # Don't delete version management keys
                    cache_keys = [
                        k for k in keys
                        if not k.decode().startswith("cache:version:")
                    ]
                    if cache_keys:
                        self.redis.delete(*cache_keys)
                        flushed_count += len(cache_keys)
                if cursor == 0:
                    break

        logger.info(
            "Cache version bumped %s%s (reason: %s, flushed: %d entries)",
            old_version, new_version, reason, flushed_count,
        )

        return {
            "old_version": old_version,
            "new_version": new_version,
            "reason": reason,
            "entries_flushed": flushed_count,
        }

Cache Tier Comparison

Cache Tier Hit Rate (MangaAssist) Latency (Hit) Latency (Miss Overhead) Monthly Cost Savings Staleness Risk Best For
L1 — In-Memory ~12% < 1ms 0ms (no overhead) ~$2,160 Low (short TTL) Verbatim repeated queries ("track my order")
L2 — Semantic Redis ~28% 15–25ms 8ms (embedding generation) ~$5,040 Medium (similarity may drift) Paraphrased manga queries, FAQ variations
L3 — Prompt Cache ~95% (system prompt) N/A (reduces TTFT) 0ms ~$8,100 None (exact match) System prompt reuse across all requests
L4 — Edge Cache ~60% (for eligible content) 5–15ms 0ms ~$1,800 Low–Medium (TTL-based) FAQ pages, product catalog, shipping info
Aggregate ~40% overall Avg 12ms ~$17,100/mo

Cost Calculation Basis

Metric Value
Daily messages 1,000,000
Avg input tokens/request 1,800 (system prompt + context + query)
Avg output tokens/request 350
Sonnet input cost $0.003 / 1K tokens
Sonnet output cost $0.015 / 1K tokens
Monthly Bedrock cost (no cache) ~$45,000
Monthly Bedrock cost (with caching) ~$27,900
Net monthly savings ~$17,100

Key Takeaways

  1. L1 + L2 together achieve ~40% cache hit rate — reducing both latency (2.8s to 15ms for hits) and Bedrock invocation cost by ~38%.
  2. Semantic caching is the highest-value tier — paraphrased manga queries ("When does OP 108 come out?" vs "One Piece volume 108 release date?") cluster well in embedding space.
  3. Prompt caching (L3) is essentially free money — Bedrock caches the identical system prompt across all 1M daily messages at a 90% input token discount.
  4. Event-driven invalidation is critical for correctness — a stale cached price during a flash sale creates real business damage and customer complaints.
  5. Order status and account-specific queries must never be cached — these are personalized, real-time, and caching them would leak data across users.