
DynamoDB + Caching (DAX & ElastiCache Redis) — Low-Level Design

Project: MangaAssist (Amazon-style chatbot)
Scope: When and how to cache DynamoDB data, DAX vs Redis trade-offs, cache invalidation patterns, and production code.


1. Why Cache in Front of DynamoDB?

DynamoDB is already fast (single-digit-millisecond reads). But in a chatbot serving 50K+ concurrent sessions:

- Hot sessions get read repeatedly (context is loaded on every message)
- Read amplification: 2-3 DynamoDB reads per user message × 1000 msg/sec ≈ 3000 reads/sec
- Cost: on-demand read request charges add up at scale
- P99 latency: DynamoDB's 5-10ms is great, but DAX's ~0.2ms is better for real-time chat
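The read-amplification claim is easy to sanity-check with back-of-envelope arithmetic. The pricing figure below is an assumption (roughly the us-east-1 on-demand rate at the time of writing) — check current AWS pricing before relying on it:

```python
# Back-of-envelope: what does ~3000 reads/sec cost on on-demand DynamoDB?
PRICE_PER_MILLION_RRU = 0.25   # USD per 1M read request units (assumed rate)
SECONDS_PER_MONTH = 86_400 * 30

def monthly_read_cost(reads_per_sec: float, rru_per_read: float = 0.5) -> float:
    """Eventually consistent reads of items <= 4 KB cost 0.5 RRU each."""
    reads_per_month = reads_per_sec * SECONDS_PER_MONTH
    rru_per_month = reads_per_month * rru_per_read
    return rru_per_month / 1_000_000 * PRICE_PER_MILLION_RRU

# 3000 reads/sec, eventually consistent
print(f"${monthly_read_cost(3000):,.0f}/month")  # → $972/month
```

At this rate a cache that absorbs even 80% of hot-session reads pays for a small DAX cluster by itself; the exact break-even depends on item sizes and your hit ratio.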


2. Cache Architecture Decision Tree

flowchart TD
    START{"Need to cache<br/>DynamoDB data?"}

    START -->|"Yes"| WHAT{"What kind of<br/>access pattern?"}

    WHAT -->|"Same DynamoDB API calls<br/>(GetItem, Query)"| DAX_PATH["Use DAX<br/>(DynamoDB Accelerator)"]
    WHAT -->|"Custom data structures<br/>(leaderboards, pub/sub, complex logic)"| REDIS_PATH["Use ElastiCache Redis"]
    WHAT -->|"Both"| BOTH["DAX for reads +<br/>Redis for computed state"]

    DAX_PATH --> DAX_WHEN{"When specifically?"}
    DAX_WHEN -->|"Read-heavy, same key patterns"| DAX_YES["✅ DAX"]
    DAX_WHEN -->|"Write-heavy, consistency critical"| DAX_NO["❌ Skip DAX"]

    REDIS_PATH --> REDIS_WHEN{"When specifically?"}
    REDIS_WHEN -->|"Session state, connection mapping,<br/>rate limiting, pub/sub"| REDIS_YES["✅ Redis"]
    REDIS_WHEN -->|"Just caching DynamoDB reads"| REDIS_NO["❌ Use DAX instead"]

    subgraph MangaAssist_Decision["MangaAssist Decision"]
        D1["DAX: Cache session META and recent turns<br/>(microsecond reads on hot sessions)"]
        D2["Redis: Connection ID mapping,<br/>rate limiting, online status"]
    end

    DAX_YES --> D1
    REDIS_YES --> D2

3. DAX (DynamoDB Accelerator) — Deep Dive

flowchart LR
    subgraph Application
        LAMBDA["Lambda Function"]
    end

    subgraph DAX_Cluster["DAX Cluster (3 nodes)"]
        DAX_PRIMARY["Primary Node<br/>(reads + writes)"]
        DAX_REPLICA1["Replica Node 1<br/>(reads only)"]
        DAX_REPLICA2["Replica Node 2<br/>(reads only)"]
    end

    subgraph DynamoDB
        TABLE[(manga-assist-sessions)]
    end

    LAMBDA -->|"GetItem / Query"| DAX_PRIMARY
    LAMBDA -->|"GetItem / Query"| DAX_REPLICA1
    LAMBDA -->|"GetItem / Query"| DAX_REPLICA2

    DAX_PRIMARY -->|"Cache MISS → read from DynamoDB"| TABLE
    DAX_PRIMARY -->|"PutItem / UpdateItem → write-through"| TABLE

    subgraph Cache_Types["Two Cache Layers in DAX"]
        ITEM_CACHE["Item Cache<br/>- Caches GetItem/BatchGetItem results<br/>- Key: PK + SK<br/>- TTL: 5 minutes default"]
        QUERY_CACHE["Query Cache<br/>- Caches Query/Scan results<br/>- Key: full query parameters<br/>- TTL: 5 minutes default"]
    end

DAX Client Code

"""
DAX integration — drop-in replacement for DynamoDB resource.
The ONLY code change: swap the client initialization.
"""

import os
import amazondax  # pip install amazon-dax-client
import boto3

# ── Configuration ──
DAX_ENDPOINT = os.environ.get("DAX_ENDPOINT")  # e.g., "dax://my-cluster.abc123.dax-clusters.us-east-1.amazonaws.com:8111"
USE_DAX = bool(DAX_ENDPOINT)

# ── Initialize the right client ──
if USE_DAX:
    # DAX client — API-compatible with boto3 DynamoDB resource
    dax_client = amazondax.AmazonDaxClient.resource(
        endpoint_url=DAX_ENDPOINT,
        region_name=os.environ.get("AWS_REGION", "us-east-1"),
    )
    table = dax_client.Table(os.environ["SESSION_TABLE"])
else:
    # Fallback to standard DynamoDB
    dynamodb = boto3.resource("dynamodb")
    table = dynamodb.Table(os.environ["SESSION_TABLE"])


def load_session_context(session_id: str, turn_limit: int = 20) -> dict:
    """
    Load session context — transparently uses DAX if configured.
    DAX cache hit: ~0.2ms | DynamoDB direct: ~5ms
    """
    pk = f"SESSION#{session_id}"

    # GetItem goes through DAX's item cache, keyed on PK + SK.
    # A repeat read of the same key returns in microseconds.
    meta_resp = table.get_item(
        Key={"PK": pk, "SK": "META"},
    )

    # This Query goes through DAX's query cache, keyed on the full
    # parameter set — a second call with identical parameters also
    # returns in microseconds
    turns_resp = table.query(
        KeyConditionExpression="PK = :pk AND begins_with(SK, :prefix)",
        ExpressionAttributeValues={":pk": pk, ":prefix": "TURN#"},
        ScanIndexForward=False,
        Limit=turn_limit,
    )

    return {
        "meta": meta_resp.get("Item"),
        "turns": list(reversed(turns_resp.get("Items", []))),
    }


def write_turn_through_dax(session_id: str, turn_data: dict):
    """
    Write through DAX — DAX forwards to DynamoDB AND updates its item cache.
    The query cache for this session is NOT automatically invalidated.
    """
    table.put_item(Item={
        "PK": f"SESSION#{session_id}",
        **turn_data,  # must include the SK, e.g. "TURN#<timestamp>"
    })
    # ⚠️ DAX query cache may still serve stale turn list for up to TTL seconds
    # This is acceptable for chat context — next message will see updated data

DAX Cache Behavior Diagram

sequenceDiagram
    participant L as Lambda
    participant D as DAX
    participant DB as DynamoDB

    Note over L,DB: Scenario 1 — Cache MISS (first read)
    L->>D: GetItem(SESSION#abc, META)
    D->>D: Check item cache → MISS
    D->>DB: GetItem(SESSION#abc, META)
    DB-->>D: Return item
    D->>D: Store in item cache (TTL: 5min)
    D-->>L: Return item (latency: ~5ms)

    Note over L,DB: Scenario 2 — Cache HIT (subsequent reads)
    L->>D: GetItem(SESSION#abc, META)
    D->>D: Check item cache → HIT
    D-->>L: Return cached item (latency: ~0.2ms)

    Note over L,DB: Scenario 3 — Write-through
    L->>D: PutItem(SESSION#abc, TURN#123)
    D->>DB: PutItem(SESSION#abc, TURN#123)
    DB-->>D: 200 OK
    D->>D: Update item cache for this key
    D->>D: ⚠️ Query cache NOT invalidated
    D-->>L: 200 OK

    Note over L,DB: Scenario 4 — Stale query cache
    L->>D: Query(PK=SESSION#abc, SK begins_with TURN#)
    D->>D: Check query cache → HIT (stale!)
    D-->>L: Return cached results (missing latest turn)
    Note over L: Acceptable for chat — <br/>cached for at most 5 min
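When stale reads are not acceptable (Scenario 4), `ConsistentRead=True` forces a strongly consistent read — the DAX client passes it straight through to DynamoDB, bypassing both caches. A minimal sketch; the function name and table layout are illustrative, and the table handle is passed in for testability:

```python
def load_fresh_meta(table, session_id: str) -> dict:
    """Strongly consistent read — DAX passes it through to DynamoDB,
    skipping both the item cache and the query cache."""
    resp = table.get_item(
        Key={"PK": f"SESSION#{session_id}", "SK": "META"},
        ConsistentRead=True,  # forces a round trip to DynamoDB
    )
    return resp.get("Item", {})
```

Use this sparingly: every consistent read costs full DynamoDB latency and double the RRUs, so reserve it for decisions that must see the latest write (e.g., session status before a handoff).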

4. ElastiCache Redis — Connection and Session State

flowchart TB
    subgraph Redis_Use_Cases["What Redis Does (NOT DynamoDB)"]
        R1["Connection ID ↔ Session ID mapping<br/>(ephemeral, high-frequency lookups)"]
        R2["Rate limiting per customer<br/>(sliding window counter)"]
        R3["Online presence tracking<br/>(who is currently chatting)"]
        R4["Session lock for concurrent writes<br/>(distributed lock)"]
        R5["Recent context pre-fetch cache<br/>(warm cache for active sessions)"]
    end

    subgraph DynamoDB_Does["What DynamoDB Does (NOT Redis)"]
        D1["Durable session state<br/>(source of truth)"]
        D2["Turn history<br/>(persistent, auditable)"]
        D3["Summaries<br/>(generated, persisted)"]
        D4["Customer session index<br/>(GSI queries)"]
    end

    subgraph Flow["Data Flow"]
        LAMBDA["Lambda Handler"]
        LAMBDA -->|"Ephemeral state"| REDIS[(Redis)]
        LAMBDA -->|"Durable state"| DDB[(DynamoDB)]

        REDIS -.->|"Cache warm-up:<br/>read from DDB, store in Redis"| DDB
    end

Redis Integration Code

"""
ElastiCache Redis integration for ephemeral session state.
Redis complements DynamoDB — it does NOT replace it.
"""

import json
import os
import time
import redis

# ── Redis Connection (module-level for Lambda reuse) ──
redis_client = redis.Redis(
    host=os.environ["REDIS_ENDPOINT"],
    port=6379,
    ssl=True,
    decode_responses=True,
    socket_connect_timeout=2,
    socket_timeout=2,
    retry_on_timeout=True,
)

# ── Key Prefixes ──
CONN_PREFIX = "conn:"        # connection_id → session_id
SESSION_PREFIX = "session:"  # session_id → cached context
RATE_PREFIX = "rate:"        # customer rate limiting
ONLINE_PREFIX = "online:"    # online presence set


# ────────────────────────────────────────────────────────
# Connection ID Mapping (WebSocket)
# ────────────────────────────────────────────────────────

def map_connection_to_session(connection_id: str, session_id: str):
    """Store bidirectional mapping. Expires when WebSocket typically times out."""
    pipe = redis_client.pipeline()
    pipe.setex(f"{CONN_PREFIX}{connection_id}", 7200, session_id)  # 2hr TTL
    pipe.setex(f"session_conn:{session_id}", 7200, connection_id)
    pipe.execute()


def get_session_for_connection(connection_id: str) -> str | None:
    """Look up session_id from connection_id. O(1), sub-millisecond."""
    return redis_client.get(f"{CONN_PREFIX}{connection_id}")


def remove_connection_mapping(connection_id: str):
    """Clean up on disconnect."""
    session_id = redis_client.get(f"{CONN_PREFIX}{connection_id}")
    if session_id:
        pipe = redis_client.pipeline()
        pipe.delete(f"{CONN_PREFIX}{connection_id}")
        pipe.delete(f"session_conn:{session_id}")
        pipe.execute()
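
The mapping functions above plug into the API Gateway WebSocket `$connect`/`$disconnect` routes. A sketch of that wiring — the event fields match API Gateway's `requestContext`, but the query-parameter name and the injected helpers are illustrative:

```python
def websocket_handler(event: dict, map_fn, unmap_fn) -> dict:
    """Route API Gateway WebSocket lifecycle events to the Redis mapping."""
    route = event["requestContext"]["routeKey"]
    connection_id = event["requestContext"]["connectionId"]

    if route == "$connect":
        # session_id arrives as a query parameter on the upgrade request
        params = event.get("queryStringParameters") or {}
        session_id = params.get("sessionId")
        if not session_id:
            return {"statusCode": 400}  # reject connections without a session
        map_fn(connection_id, session_id)
    elif route == "$disconnect":
        unmap_fn(connection_id)
    return {"statusCode": 200}
```

In production `map_fn`/`unmap_fn` would be `map_connection_to_session` and `remove_connection_mapping`; passing them in keeps the handler unit-testable without a Redis server.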


# ────────────────────────────────────────────────────────
# Rate Limiting (Sliding Window)
# ────────────────────────────────────────────────────────

def check_rate_limit(customer_id: str, max_requests: int = 30, window_seconds: int = 60) -> bool:
    """
    Sliding window rate limiter using Redis sorted set.
    Returns True if request is allowed, False if rate limited.
    """
    key = f"{RATE_PREFIX}{customer_id}"
    now = time.time()
    window_start = now - window_seconds

    pipe = redis_client.pipeline()
    # Remove entries outside the window
    pipe.zremrangebyscore(key, 0, window_start)
    # Count entries in the current window
    pipe.zcard(key)
    # Add the current request
    pipe.zadd(key, {str(now): now})
    # Set expiry on the key itself
    pipe.expire(key, window_seconds)
    results = pipe.execute()

    current_count = results[1]
    return current_count < max_requests


# ────────────────────────────────────────────────────────
# Context Pre-fetch Cache
# ────────────────────────────────────────────────────────

def cache_session_context(session_id: str, context: dict, ttl_seconds: int = 300):
    """
    Cache the assembled context (turns + summary) in Redis.
    Saves 2 DynamoDB reads on the next message in this session.
    """
    redis_client.setex(
        f"{SESSION_PREFIX}{session_id}:context",
        ttl_seconds,
        json.dumps(context, default=str),
    )


def get_cached_context(session_id: str) -> dict | None:
    """
    Try to get cached context from Redis before hitting DynamoDB.
    Returns None on cache miss.
    """
    cached = redis_client.get(f"{SESSION_PREFIX}{session_id}:context")
    if cached:
        return json.loads(cached)
    return None


# ────────────────────────────────────────────────────────
# Distributed Lock (for concurrent write protection)
# ────────────────────────────────────────────────────────

def acquire_session_lock(session_id: str, lock_ttl: int = 10) -> bool:
    """
    Prevent two Lambda invocations from writing to the same session simultaneously.
    Uses Redis SET NX (set-if-not-exists) as a distributed lock.
    """
    lock_key = f"lock:session:{session_id}"
    acquired = redis_client.set(lock_key, "1", nx=True, ex=lock_ttl)
    return bool(acquired)


def release_session_lock(session_id: str):
    redis_client.delete(f"lock:session:{session_id}")

5. Cache-Aside Pattern with DynamoDB + Redis

sequenceDiagram
    participant L as Lambda
    participant R as Redis
    participant D as DynamoDB

    Note over L,D: Read Path (Cache-Aside)
    L->>R: GET session:abc:context
    alt Cache HIT
        R-->>L: Return cached context (0.5ms)
    else Cache MISS
        R-->>L: null
        L->>D: Query turns + summary (5-10ms)
        D-->>L: Return fresh data
        L->>R: SETEX session:abc:context (TTL 5min)
        R-->>L: OK
    end

    Note over L,D: Write Path (Write-Through + Invalidate)
    L->>D: PutItem new turn
    D-->>L: 200 OK
    L->>R: DEL session:abc:context
    R-->>L: OK
    Note over L: Next read will rebuild cache with new turn

Cache-Aside Implementation

"""
Complete cache-aside pattern: Redis cache + DynamoDB source of truth.
"""

import json
import os
import zlib
import boto3
import redis

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["SESSION_TABLE"])
redis_client = redis.Redis(
    host=os.environ["REDIS_ENDPOINT"], port=6379, ssl=True, decode_responses=True
)

CACHE_TTL = 300  # 5 minutes


def get_session_context(session_id: str) -> dict:
    """
    Try Redis first → fallback to DynamoDB → populate cache.
    """
    cache_key = f"session:{session_id}:context"

    # ── Step 1: Check Redis cache ──
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    # ── Step 2: Cache miss → Read from DynamoDB ──
    pk = f"SESSION#{session_id}"

    meta_resp = table.get_item(Key={"PK": pk, "SK": "META"})
    meta = meta_resp.get("Item", {})

    turns_resp = table.query(
        KeyConditionExpression="PK = :pk AND begins_with(SK, :prefix)",
        ExpressionAttributeValues={":pk": pk, ":prefix": "TURN#"},
        ScanIndexForward=False,
        Limit=20,
    )
    turns = turns_resp.get("Items", [])

    summary_resp = table.query(
        KeyConditionExpression="PK = :pk AND begins_with(SK, :prefix)",
        ExpressionAttributeValues={":pk": pk, ":prefix": "SUMMARY#"},
        ScanIndexForward=False,
        Limit=1,
    )
    summaries = summary_resp.get("Items", [])

    # Decompress turn content
    decoded_turns = []
    for t in reversed(turns):
        content = t.get("content_compressed", b"")
        if hasattr(content, "value"):  # boto3 returns Binary, not raw bytes
            content = content.value
        if isinstance(content, (bytes, bytearray)) and content:
            text = zlib.decompress(content).decode("utf-8")
        else:
            text = str(content)
        decoded_turns.append({"role": t["role"], "content": text, "sk": t["SK"]})

    context = {
        "meta": {k: v for k, v in meta.items() if k not in ("PK", "SK")},
        "turns": decoded_turns,
        "summary": summaries[0].get("summary_text") if summaries else None,
    }

    # ── Step 3: Store in Redis cache ──
    try:
        redis_client.setex(cache_key, CACHE_TTL, json.dumps(context, default=str))
    except Exception:
        pass  # Redis failure is non-fatal — DynamoDB is source of truth

    return context


def invalidate_session_cache(session_id: str):
    """Call after every write to DynamoDB for this session."""
    try:
        redis_client.delete(f"session:{session_id}:context")
    except Exception:
        pass  # Non-fatal

6. DAX vs Redis — Decision Matrix

quadrantChart
    title DAX vs Redis Decision for MangaAssist
    x-axis Low Complexity --> High Complexity
    y-axis Low Performance Need --> High Performance Need

    DAX GetItem Cache: [0.2, 0.7]
    DAX Query Cache: [0.3, 0.6]
    Redis Connection Map: [0.4, 0.9]
    Redis Rate Limiting: [0.5, 0.8]
    Redis Context Cache: [0.6, 0.7]
    Redis Distributed Lock: [0.7, 0.5]
    Redis Pub/Sub: [0.8, 0.6]
| Feature | DAX | ElastiCache Redis | Winner for MangaAssist |
|---|---|---|---|
| Setup complexity | Drop-in (same API as DynamoDB) | New client, new data model | DAX |
| Read latency | ~200μs | ~500μs | DAX |
| Write-through | Automatic | Manual (you manage) | DAX |
| Query cache | Yes (but not invalidated on writes) | Manual (you manage) | Depends |
| Data structures | None (mirrors DynamoDB) | Strings, Lists, Sets, Sorted Sets, Hashes | Redis |
| Rate limiting | Not possible | Sliding window with sorted sets | Redis |
| Pub/sub | Not possible | Built-in | Redis |
| Distributed locks | Not possible | SET NX | Redis |
| Connection mapping | Possible but overkill | Perfect fit | Redis |
| Cost (3 nodes) | ~$300/month (dax.t3.medium) | ~$150/month (cache.t3.medium) | Redis |
| VPC requirement | Yes (must be in same VPC as Lambda) | Yes | Tie |
| Failure impact | Falls back to DynamoDB automatically | Must handle in code | DAX |

7. Common Mistakes Teams Make

| Mistake | Why It Happens | Consequence | Fix |
|---|---|---|---|
| Using DAX for write-heavy workloads | "Caching should help writes too" | DAX write-through adds latency (~2ms overhead) | DAX is for read-heavy workloads; send writes directly to DynamoDB |
| Not handling Redis failures | "Redis never goes down" | Lambda crashes when Redis is unavailable | Wrap all Redis calls in try/except; fall back to DynamoDB |
| Caching DynamoDB items in Redis without respecting their DynamoDB TTL | Item expired in DynamoDB, Redis still serves it | Stale/deleted data served to users | Check the DynamoDB TTL field in cached items |
| DAX query cache serving stale data after writes | Query-cache invalidation is misunderstood | User writes a turn, loads context, doesn't see their own turn | Accept eventual consistency OR use ConsistentRead=True (bypasses DAX) |
| Redis as source of truth for session data | "Redis is faster, let's just use it" | Durability loss — a Redis reboot wipes all sessions | Redis is cache only; DynamoDB is the source of truth. Always. |
| Not setting Redis key TTLs | "We'll clean up later" | Memory fills up; Redis evicts important keys | Every SET must carry an EX (expire) parameter |
| DAX cluster too small | "t3.small is fine" | OOM errors under peak load, cache evictions | Size DAX nodes for peak working set + 20% buffer |

8. Critical Things to Remember

For Interviews

  1. DAX is a read-through/write-through cache — Same DynamoDB API, transparent caching. No code changes needed for reads.

  2. DAX has TWO caches — Item cache (GetItem/BatchGetItem) and Query cache (Query/Scan). Query cache is ONLY keyed on exact query parameters.

  3. DAX query cache is NOT invalidated by writes — Writing a new turn does NOT update the query cache that lists turns. This causes stale reads for up to TTL.

  4. Redis is for computed/ephemeral state — Connection mapping, rate limiting, locks, online status. Not for duplicating DynamoDB data.

  5. Cache-aside is the safest pattern — Read from cache, miss → read from DB → populate cache. Never let the cache be the only copy.

For Production

  1. DAX cluster must be in the same VPC as Lambda — Lambda needs VPC configuration to reach DAX. VPC-attached Lambdas historically added 1-2s of cold start; since AWS's 2019 Hyperplane ENI improvements the penalty is much smaller, but still nonzero.

  2. Use ConsistentRead=True to bypass DAX for critical reads — When you MUST see the latest data (e.g., checking session status before handoff), bypass the cache.

  3. Redis maxmemory-policy should be allkeys-lru — When memory is full, evict least-recently-used keys across all keys. Don't let Redis crash.

  4. Monitor cache hit ratio — If DAX hit ratio drops below 80%, your access patterns have changed. Re-evaluate cache sizing or TTL.

  5. Redis failure must be graceful — Every Redis call must be wrapped in try/except. If Redis is down, the app must still work via DynamoDB directly.
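
One way to enforce point 5 uniformly, rather than scattering try/except around the codebase, is a small decorator that turns any cache failure into a cache miss. A sketch — in production you would catch `redis.RedisError` specifically and emit a metric on each swallowed failure:

```python
import functools
import logging

logger = logging.getLogger(__name__)

def redis_optional(fallback=None):
    """Return `fallback` instead of raising when the cache call fails."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            except Exception:  # narrow to redis.RedisError in real code
                logger.warning("cache unavailable in %s", fn.__name__)
                return fallback
        return wrapper
    return decorator

@redis_optional(fallback=None)
def get_cached(client, key: str):
    """Cache read that degrades to a miss (None) when Redis is down."""
    return client.get(key)
```

Callers then treat `None` exactly like a cache miss and fall through to DynamoDB, so a Redis outage degrades latency instead of availability.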