DynamoDB + Caching (DAX & ElastiCache Redis) — Low-Level Design
Project: MangaAssist (Amazon-style chatbot)
Scope: When and how to cache DynamoDB data, DAX vs Redis trade-offs, cache invalidation patterns, and production code.
1. Why Cache in Front of DynamoDB?
DynamoDB is already fast (single-digit millisecond). But in a chatbot serving 50K+ concurrent sessions:
- Hot sessions get read repeatedly (context is loaded on every message)
- Read amplification: 2-3 DynamoDB reads per user message × 1000 msg/sec = 3000 RCU
- Cost: on-demand RCU charges add up at scale
- P99 latency: DynamoDB's 5-10ms is great, but DAX's 0.2ms is better for real-time chat
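The read-amplification numbers translate directly into origin load as a function of cache hit ratio. A back-of-envelope helper (the function name and the 90% hit ratio are illustrative, not from the text):

```python
def origin_read_load(reads_per_msg: float, msgs_per_sec: float, hit_ratio: float) -> float:
    """Reads/sec that still reach DynamoDB after the cache absorbs its share."""
    return reads_per_msg * msgs_per_sec * (1.0 - hit_ratio)

# Numbers from the text: 2-3 reads per message at 1000 msg/sec
baseline = origin_read_load(3, 1000, 0.0)    # no cache: 3000 reads/sec
with_cache = origin_read_load(3, 1000, 0.9)  # 90% hit ratio: roughly 300 reads/sec
```

At on-demand pricing the saved reads map one-to-one onto saved RCU charges, which is where the cost argument above comes from.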
2. Cache Architecture Decision Tree
flowchart TD
START{"Need to cache<br/>DynamoDB data?"}
START -->|"Yes"| WHAT{"What kind of<br/>access pattern?"}
WHAT -->|"Same DynamoDB API calls<br/>(GetItem, Query)"| DAX_PATH["Use DAX<br/>(DynamoDB Accelerator)"]
WHAT -->|"Custom data structures<br/>(leaderboards, pub/sub, complex logic)"| REDIS_PATH["Use ElastiCache Redis"]
WHAT -->|"Both"| BOTH["DAX for reads +<br/>Redis for computed state"]
DAX_PATH --> DAX_WHEN{"When specifically?"}
DAX_WHEN -->|"Read-heavy, same key patterns"| DAX_YES["✅ DAX"]
DAX_WHEN -->|"Write-heavy, consistency critical"| DAX_NO["❌ Skip DAX"]
REDIS_PATH --> REDIS_WHEN{"When specifically?"}
REDIS_WHEN -->|"Session state, connection mapping,<br/>rate limiting, pub/sub"| REDIS_YES["✅ Redis"]
REDIS_WHEN -->|"Just caching DynamoDB reads"| REDIS_NO["❌ Use DAX instead"]
subgraph MangaAssist_Decision["MangaAssist Decision"]
D1["DAX: Cache session META and recent turns<br/>(microsecond reads on hot sessions)"]
D2["Redis: Connection ID mapping,<br/>rate limiting, online status"]
end
DAX_YES --> D1
REDIS_YES --> D2
3. DAX (DynamoDB Accelerator) — Deep Dive
flowchart LR
subgraph Application
LAMBDA["Lambda Function"]
end
subgraph DAX_Cluster["DAX Cluster (3 nodes)"]
DAX_PRIMARY["Primary Node<br/>(reads + writes)"]
DAX_REPLICA1["Replica Node 1<br/>(reads only)"]
DAX_REPLICA2["Replica Node 2<br/>(reads only)"]
end
subgraph DynamoDB
TABLE[(manga-assist-sessions)]
end
LAMBDA -->|"GetItem / Query"| DAX_PRIMARY
LAMBDA -->|"GetItem / Query"| DAX_REPLICA1
LAMBDA -->|"GetItem / Query"| DAX_REPLICA2
DAX_PRIMARY -->|"Cache MISS → read from DynamoDB"| TABLE
DAX_PRIMARY -->|"PutItem / UpdateItem → write-through"| TABLE
subgraph Cache_Types["Two Cache Layers in DAX"]
ITEM_CACHE["Item Cache<br/>- Caches GetItem/BatchGetItem results<br/>- Key: PK + SK<br/>- TTL: 5 minutes default"]
QUERY_CACHE["Query Cache<br/>- Caches Query/Scan results<br/>- Key: full query parameters<br/>- TTL: 5 minutes default"]
end
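Because the query cache is keyed on the full parameter set, two queries that differ in any parameter (even just `Limit`) occupy separate cache entries. A toy model of parameter-keyed caching; this is illustrative only, not DAX's internal key format:

```python
import json

def query_cache_key(table_name: str, **params) -> str:
    """Toy model: the cache key covers every query parameter, so any change misses."""
    return table_name + "|" + json.dumps(params, sort_keys=True)

k_20 = query_cache_key("manga-assist-sessions", pk="SESSION#abc", prefix="TURN#", limit=20)
k_21 = query_cache_key("manga-assist-sessions", pk="SESSION#abc", prefix="TURN#", limit=21)
# k_20 != k_21: changing Limit alone creates a distinct query-cache entry
```

This is why inconsistent `Limit` or filter values across call sites quietly destroy the query-cache hit ratio.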
DAX Client Code
"""
DAX integration — drop-in replacement for DynamoDB resource.
The ONLY code change: swap the client initialization.
"""
import os
import amazondax # pip install amazon-dax-client
import boto3
# ── Configuration ──
DAX_ENDPOINT = os.environ.get("DAX_ENDPOINT") # e.g., "dax://my-cluster.abc123.dax-clusters.us-east-1.amazonaws.com:8111"
USE_DAX = bool(DAX_ENDPOINT)
# ── Initialize the right client ──
if USE_DAX:
# DAX client — API-compatible with boto3 DynamoDB resource
dax_client = amazondax.AmazonDaxClient.resource(
endpoint_url=DAX_ENDPOINT,
region_name=os.environ.get("AWS_REGION", "us-east-1"),
)
table = dax_client.Table(os.environ["SESSION_TABLE"])
else:
# Fallback to standard DynamoDB
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["SESSION_TABLE"])
def load_session_context(session_id: str, turn_limit: int = 20) -> dict:
"""
Load session context — transparently uses DAX if configured.
DAX cache hit: ~0.2ms | DynamoDB direct: ~5ms
"""
pk = f"SESSION#{session_id}"
    # GetItem is served from DAX's item cache
    meta_resp = table.get_item(
        Key={"PK": pk, "SK": "META"},
    )
    # The Query below goes through DAX's query cache — a second call with
    # identical parameters returns in microseconds
    turns_resp = table.query(
KeyConditionExpression="PK = :pk AND begins_with(SK, :prefix)",
ExpressionAttributeValues={":pk": pk, ":prefix": "TURN#"},
ScanIndexForward=False,
Limit=turn_limit,
)
return {
"meta": meta_resp.get("Item"),
"turns": list(reversed(turns_resp.get("Items", []))),
}
def write_turn_through_dax(session_id: str, turn_data: dict):
"""
Write through DAX — DAX forwards to DynamoDB AND updates its item cache.
The query cache for this session is NOT automatically invalidated.
"""
    table.put_item(Item={
        "PK": f"SESSION#{session_id}",
        # turn_data must supply the SK (e.g., "TURN#<timestamp>") and turn attributes
        **turn_data,
    })
# ⚠️ DAX query cache may still serve stale turn list for up to TTL seconds
# This is acceptable for chat context — next message will see updated data
DAX Cache Behavior Diagram
sequenceDiagram
participant L as Lambda
participant D as DAX
participant DB as DynamoDB
Note over L,DB: Scenario 1 — Cache MISS (first read)
L->>D: GetItem(SESSION#abc, META)
D->>D: Check item cache → MISS
D->>DB: GetItem(SESSION#abc, META)
DB-->>D: Return item
D->>D: Store in item cache (TTL: 5min)
D-->>L: Return item (latency: ~5ms)
Note over L,DB: Scenario 2 — Cache HIT (subsequent reads)
L->>D: GetItem(SESSION#abc, META)
D->>D: Check item cache → HIT
D-->>L: Return cached item (latency: ~0.2ms)
Note over L,DB: Scenario 3 — Write-through
L->>D: PutItem(SESSION#abc, TURN#123)
D->>DB: PutItem(SESSION#abc, TURN#123)
DB-->>D: 200 OK
D->>D: Update item cache for this key
D->>D: ⚠️ Query cache NOT invalidated
D-->>L: 200 OK
Note over L,DB: Scenario 4 — Stale query cache
L->>D: Query(PK=SESSION#abc, SK begins_with TURN#)
D->>D: Check query cache → HIT (stale!)
D-->>L: Return cached results (missing latest turn)
Note over L: Acceptable for chat — <br/>cached for at most 5 min
4. ElastiCache Redis — Connection and Session State
flowchart TB
subgraph Redis_Use_Cases["What Redis Does (NOT DynamoDB)"]
R1["Connection ID ↔ Session ID mapping<br/>(ephemeral, high-frequency lookups)"]
R2["Rate limiting per customer<br/>(sliding window counter)"]
R3["Online presence tracking<br/>(who is currently chatting)"]
R4["Session lock for concurrent writes<br/>(distributed lock)"]
R5["Recent context pre-fetch cache<br/>(warm cache for active sessions)"]
end
subgraph DynamoDB_Does["What DynamoDB Does (NOT Redis)"]
D1["Durable session state<br/>(source of truth)"]
D2["Turn history<br/>(persistent, auditable)"]
D3["Summaries<br/>(generated, persisted)"]
D4["Customer session index<br/>(GSI queries)"]
end
subgraph Flow["Data Flow"]
LAMBDA["Lambda Handler"]
LAMBDA -->|"Ephemeral state"| REDIS[(Redis)]
LAMBDA -->|"Durable state"| DDB[(DynamoDB)]
REDIS -.-|"Cache warm-up:<br/>read from DDB, store in Redis"| DDB
end
Redis Integration Code
"""
ElastiCache Redis integration for ephemeral session state.
Redis complements DynamoDB — it does NOT replace it.
"""
import json
import os
import time
import redis
# ── Redis Connection (module-level for Lambda reuse) ──
redis_client = redis.Redis(
host=os.environ["REDIS_ENDPOINT"],
port=6379,
ssl=True,
decode_responses=True,
socket_connect_timeout=2,
socket_timeout=2,
retry_on_timeout=True,
)
# ── Key Prefixes ──
CONN_PREFIX = "conn:" # connection_id → session_id
SESSION_PREFIX = "session:" # session_id → cached context
RATE_PREFIX = "rate:" # customer rate limiting
ONLINE_PREFIX = "online:" # online presence set
# ────────────────────────────────────────────────────────
# Connection ID Mapping (WebSocket)
# ────────────────────────────────────────────────────────
def map_connection_to_session(connection_id: str, session_id: str):
"""Store bidirectional mapping. Expires when WebSocket typically times out."""
pipe = redis_client.pipeline()
pipe.setex(f"{CONN_PREFIX}{connection_id}", 7200, session_id) # 2hr TTL
pipe.setex(f"session_conn:{session_id}", 7200, connection_id)
pipe.execute()
def get_session_for_connection(connection_id: str) -> str | None:
"""Look up session_id from connection_id. O(1), sub-millisecond."""
return redis_client.get(f"{CONN_PREFIX}{connection_id}")
def remove_connection_mapping(connection_id: str):
"""Clean up on disconnect."""
session_id = redis_client.get(f"{CONN_PREFIX}{connection_id}")
if session_id:
pipe = redis_client.pipeline()
pipe.delete(f"{CONN_PREFIX}{connection_id}")
pipe.delete(f"session_conn:{session_id}")
pipe.execute()
# ────────────────────────────────────────────────────────
# Rate Limiting (Sliding Window)
# ────────────────────────────────────────────────────────
def check_rate_limit(customer_id: str, max_requests: int = 30, window_seconds: int = 60) -> bool:
"""
Sliding window rate limiter using Redis sorted set.
Returns True if request is allowed, False if rate limited.
"""
key = f"{RATE_PREFIX}{customer_id}"
now = time.time()
window_start = now - window_seconds
pipe = redis_client.pipeline()
# Remove entries outside the window
pipe.zremrangebyscore(key, 0, window_start)
# Count entries in the current window
pipe.zcard(key)
# Add the current request
pipe.zadd(key, {str(now): now})
# Set expiry on the key itself
pipe.expire(key, window_seconds)
results = pipe.execute()
current_count = results[1]
return current_count < max_requests
# ────────────────────────────────────────────────────────
# Context Pre-fetch Cache
# ────────────────────────────────────────────────────────
def cache_session_context(session_id: str, context: dict, ttl_seconds: int = 300):
"""
Cache the assembled context (turns + summary) in Redis.
Saves 2 DynamoDB reads on the next message in this session.
"""
redis_client.setex(
f"{SESSION_PREFIX}{session_id}:context",
ttl_seconds,
json.dumps(context, default=str),
)
def get_cached_context(session_id: str) -> dict | None:
"""
Try to get cached context from Redis before hitting DynamoDB.
Returns None on cache miss.
"""
cached = redis_client.get(f"{SESSION_PREFIX}{session_id}:context")
if cached:
return json.loads(cached)
return None
# ────────────────────────────────────────────────────────
# Distributed Lock (for concurrent write protection)
# ────────────────────────────────────────────────────────
def acquire_session_lock(session_id: str, lock_ttl: int = 10) -> bool:
"""
Prevent two Lambda invocations from writing to the same session simultaneously.
Uses Redis SET NX (set-if-not-exists) as a distributed lock.
"""
lock_key = f"lock:session:{session_id}"
acquired = redis_client.set(lock_key, "1", nx=True, ex=lock_ttl)
return bool(acquired)
def release_session_lock(session_id: str):
    # Unconditional delete: safe only while lock_ttl comfortably exceeds the critical
    # section. For strict safety, store a unique token and compare-and-delete atomically.
    redis_client.delete(f"lock:session:{session_id}")
5. Cache-Aside Pattern with DynamoDB + Redis
sequenceDiagram
participant L as Lambda
participant R as Redis
participant D as DynamoDB
Note over L,D: Read Path (Cache-Aside)
L->>R: GET session:abc:context
alt Cache HIT
R-->>L: Return cached context (0.5ms)
else Cache MISS
R-->>L: null
L->>D: Query turns + summary (5-10ms)
D-->>L: Return fresh data
L->>R: SETEX session:abc:context (TTL 5min)
R-->>L: OK
end
Note over L,D: Write Path (Write-Through + Invalidate)
L->>D: PutItem new turn
D-->>L: 200 OK
L->>R: DEL session:abc:context
R-->>L: OK
Note over L: Next read will rebuild cache with new turn
Cache-Aside Implementation
"""
Complete cache-aside pattern: Redis cache + DynamoDB source of truth.
"""
import json
import os
import zlib
import boto3
import redis
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["SESSION_TABLE"])
redis_client = redis.Redis(
host=os.environ["REDIS_ENDPOINT"], port=6379, ssl=True, decode_responses=True
)
CACHE_TTL = 300 # 5 minutes
def get_session_context(session_id: str) -> dict:
"""
Try Redis first → fallback to DynamoDB → populate cache.
"""
cache_key = f"session:{session_id}:context"
# ── Step 1: Check Redis cache ──
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# ── Step 2: Cache miss → Read from DynamoDB ──
pk = f"SESSION#{session_id}"
meta_resp = table.get_item(Key={"PK": pk, "SK": "META"})
meta = meta_resp.get("Item", {})
turns_resp = table.query(
KeyConditionExpression="PK = :pk AND begins_with(SK, :prefix)",
ExpressionAttributeValues={":pk": pk, ":prefix": "TURN#"},
ScanIndexForward=False,
Limit=20,
)
turns = turns_resp.get("Items", [])
summary_resp = table.query(
KeyConditionExpression="PK = :pk AND begins_with(SK, :prefix)",
ExpressionAttributeValues={":pk": pk, ":prefix": "SUMMARY#"},
ScanIndexForward=False,
Limit=1,
)
summaries = summary_resp.get("Items", [])
    # Decompress turn content (boto3 returns binary attributes as Binary wrappers)
    decoded_turns = []
    for t in reversed(turns):
        content = t.get("content_compressed", "")
        if hasattr(content, "value"):
            content = content.value  # unwrap boto3's Binary type to raw bytes
        if isinstance(content, (bytes, bytearray)) and content:
            text = zlib.decompress(content).decode("utf-8")
        else:
            text = str(content)
        decoded_turns.append({"role": t["role"], "content": text, "sk": t["SK"]})
context = {
"meta": {k: v for k, v in meta.items() if k not in ("PK", "SK")},
"turns": decoded_turns,
"summary": summaries[0].get("summary_text") if summaries else None,
}
# ── Step 3: Store in Redis cache ──
try:
redis_client.setex(cache_key, CACHE_TTL, json.dumps(context, default=str))
except Exception:
pass # Redis failure is non-fatal — DynamoDB is source of truth
return context
def invalidate_session_cache(session_id: str):
"""Call after every write to DynamoDB for this session."""
try:
redis_client.delete(f"session:{session_id}:context")
except Exception:
pass # Non-fatal
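The write path from the sequence diagram (durable write first, then invalidate) fits in one helper. A sketch — `table` and `cache` stand in for the boto3 Table and Redis client above, and the millisecond sort-key scheme is illustrative:

```python
import time

def append_turn(table, cache, session_id: str, role: str, content: str) -> None:
    """Write a turn to DynamoDB (source of truth), then invalidate the cached context."""
    table.put_item(Item={
        "PK": f"SESSION#{session_id}",
        "SK": f"TURN#{int(time.time() * 1000)}",  # illustrative sort-key scheme
        "role": role,
        "content": content,
    })
    try:
        # Invalidate only AFTER the durable write succeeds; the next read rebuilds
        cache.delete(f"session:{session_id}:context")
    except Exception:
        pass  # invalidation failure is non-fatal — the cache TTL bounds staleness
```

Ordering matters: invalidating before the write would let a concurrent reader repopulate the cache with the old turn list.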
6. DAX vs Redis — Decision Matrix
quadrantChart
title DAX vs Redis Decision for MangaAssist
x-axis Low Complexity --> High Complexity
y-axis Low Performance Need --> High Performance Need
DAX GetItem Cache: [0.2, 0.7]
DAX Query Cache: [0.3, 0.6]
Redis Connection Map: [0.4, 0.9]
Redis Rate Limiting: [0.5, 0.8]
Redis Context Cache: [0.6, 0.7]
Redis Distributed Lock: [0.7, 0.5]
Redis Pub/Sub: [0.8, 0.6]
| Feature | DAX | ElastiCache Redis | Winner for MangaAssist |
|---|---|---|---|
| Setup complexity | Drop-in (same API as DynamoDB) | New client, new data model | DAX |
| Read latency | ~200μs (microseconds) | ~500μs | DAX |
| Write-through | Automatic | Manual (you manage) | DAX |
| Query cache | Yes (but not invalidated on writes) | Manual (you manage) | Depends |
| Data structures | None (mirrors DynamoDB) | Strings, Lists, Sets, Sorted Sets, Hashes | Redis |
| Rate limiting | Not possible | Sliding window with sorted sets | Redis |
| Pub/sub | Not possible | Built-in | Redis |
| Distributed locks | Not possible | SET NX | Redis |
| Connection mapping | Possible but overkill | Perfect fit | Redis |
| Cost (3 nodes) | ~$300/month (t3.medium) | ~$150/month (cache.t3.medium) | Redis |
| VPC requirement | Yes (must be in same VPC as Lambda) | Yes | Tie |
| Failure impact | Falls back to DynamoDB automatically | Must handle in code | DAX |
7. Common Mistakes Teams Make
| Mistake | Why It Happens | Consequence | Fix |
|---|---|---|---|
| Using DAX for write-heavy workloads | "Caching should help writes too" | DAX write-through adds latency (~2ms overhead) | DAX is for read-heavy. Writes should go directly to DynamoDB |
| Not handling Redis failures | "Redis never goes down" | Lambda crashes when Redis is unavailable | Wrap all Redis calls in try/except, fall back to DynamoDB |
| Caching DynamoDB items in Redis while ignoring their DynamoDB TTL | Redis and DynamoDB TTLs expire independently | Stale/deleted data served to users | Check the DynamoDB TTL field of cached items before serving |
| DAX query cache serving stale data after writes | Teams assume writes invalidate the query cache (they don't) | User writes a turn, loads context, doesn't see their own turn | Accept eventual consistency OR use ConsistentRead=True (bypasses DAX) |
| Redis as source of truth for session data | "Redis is faster, let's just use it" | Durability loss — Redis reboot = all sessions gone | Redis is cache only. DynamoDB is source of truth. Always. |
| Not setting Redis key TTLs | "We'll clean up later" | Memory fills up, Redis evicts important keys | Every SET must have an EX (expire) parameter |
| DAX cluster too small | "t3.small is fine" | OOM errors under peak load, cache evictions | Size DAX nodes for peak working set + 20% buffer |
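The "not handling Redis failures" row is worth codifying once instead of at every call site. A sketch of a decorator (names are ours) that turns any Redis failure into a cache miss:

```python
import functools
import logging

logger = logging.getLogger(__name__)

def cache_optional(fallback=None):
    """Decorator: if the wrapped cache call raises, log it and return `fallback`.

    Turns a Redis outage into a cache miss instead of a request failure,
    so DynamoDB (the source of truth) keeps serving traffic.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            except Exception:
                logger.warning("cache call %s failed; degrading to miss", fn.__name__)
                return fallback
        return wrapper
    return decorator

@cache_optional(fallback=None)
def get_cached_context_safe(client, session_id: str):
    return client.get(f"session:{session_id}:context")
```

Callers treat `None` exactly like a normal cache miss and fall through to DynamoDB.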
8. Critical Things to Remember
For Interviews
- DAX is a read-through/write-through cache — Same DynamoDB API, transparent caching. No code changes needed for reads.
- DAX has TWO caches — Item cache (GetItem/BatchGetItem) and Query cache (Query/Scan). The query cache is keyed only on the exact query parameters.
- DAX query cache is NOT invalidated by writes — Writing a new turn does NOT update the query cache entry that lists turns. This causes stale reads for up to the TTL.
- Redis is for computed/ephemeral state — Connection mapping, rate limiting, locks, online status. Not for duplicating DynamoDB data.
- Cache-aside is the safest pattern — Read from cache; on a miss, read from the DB and populate the cache. Never let the cache be the only copy.
For Production
- DAX cluster must be in the same VPC as Lambda — Lambda needs VPC configuration to reach DAX. This adds cold start latency (~1-2s extra for VPC attachment).
- Use `ConsistentRead=True` to bypass DAX for critical reads — When you MUST see the latest data (e.g., checking session status before handoff), bypass the cache.
- Redis `maxmemory-policy` should be `allkeys-lru` — When memory is full, evict the least-recently-used keys across all keys. Don't let Redis crash.
- Monitor the cache hit ratio — If the DAX hit ratio drops below 80%, your access patterns have changed. Re-evaluate cache sizing or TTL.
- Redis failure must be graceful — Every Redis call must be wrapped in try/except. If Redis is down, the app must still work via DynamoDB directly.
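The 80% threshold above can be computed from the `ItemCacheHits` / `ItemCacheMisses` counters DAX publishes to CloudWatch. The arithmetic is simple enough to pin down (alarm wiring not shown):

```python
def cache_hit_ratio(hits: int, misses: int) -> float:
    """Hit ratio from DAX's ItemCacheHits / ItemCacheMisses CloudWatch metrics."""
    total = hits + misses
    return hits / total if total else 1.0  # no traffic counts as healthy

def needs_attention(hits: int, misses: int, threshold: float = 0.80) -> bool:
    """True when the hit ratio falls below the alerting threshold from the text."""
    return cache_hit_ratio(hits, misses) < threshold
```

The same formula applies to the query cache via `QueryCacheHits` / `QueryCacheMisses`; track the two ratios separately, since a write-heavy session pattern degrades the query cache first.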