PO-04: DynamoDB Conversation Memory Read Performance
User Story
As a backend engineer, I want to reduce conversation memory read latency from DynamoDB to under 10ms at p95 using DAX and efficient access patterns, so that context loading never becomes a bottleneck in the orchestrator's critical path.
Acceptance Criteria
- DAX cluster provides sub-5ms reads for recent session metadata at p95.
- Single-query pattern fetches META + last N turns in one DynamoDB call.
- Hot session data (active conversations) hits DAX cache > 80% of the time.
- Write latency for new TURN items remains under 15ms at p95.
- Session context load adds no more than 10ms to the critical path.
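The p95 targets above can be checked against recorded latency samples with a small standard-library helper; the function name is illustrative:

```python
import statistics

def p95_ms(samples: list[float]) -> float:
    """95th percentile of latency samples, in milliseconds."""
    # quantiles(n=20) returns 19 cut points; the last one is the p95 boundary
    return statistics.quantiles(samples, n=20)[-1]
```

Feeding each request's measured context-load latency into this aggregate makes the "under 10ms at p95" criterion directly testable in CI or canaries.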
High-Level Design
The Memory Latency Problem
Every incoming message requires loading the session's META item and the most recent conversation turns. This is on the critical path before intent classification can use conversation context.
graph LR
subgraph "Current: Two Queries (~25ms)"
A[Get META item<br>~12ms] --> B[Query latest turns<br>~15ms]
B --> C[Build context<br>~2ms]
end
subgraph "Optimized: Single Query + DAX (~5ms)"
D[DAX: Query pk + sk range<br>~3ms] --> E[Build context<br>~2ms]
end
style A fill:#f66,stroke:#333
style B fill:#f66,stroke:#333
style D fill:#2d8,stroke:#333
Optimization Strategy
graph TD
subgraph "Read Path"
A1[DAX Accelerator<br>In-memory cache]
A2[Single Query Pattern<br>pk + sk range]
A3[Projection Expression<br>Fetch only needed fields]
end
subgraph "Write Path"
B1[Async Turn Writes<br>Non-blocking]
B2[Batch META Updates<br>Debounced]
B3[Conditional Writes<br>Avoid conflicts]
end
subgraph "Data Layout"
C1[Sort Key Design<br>META + TURNs colocated]
C2[Item Size Control<br>Compact turn format]
C3[TTL Cleanup<br>Automatic expiry]
end
A1 --> D[p95 < 10ms]
A2 --> D
B1 --> D
C1 --> D
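The data-layout ideas above (colocated sort keys, compact turn format, TTL cleanup) can be sketched as a small item builder; attribute names mirror the client code later in this section, and the 24-hour TTL matches its write path:

```python
import json
import time

def build_turn_item(session_id: str, role: str, content: str) -> dict:
    """Build a compact TURN item colocated under the session partition key."""
    now_s = time.time()
    return {
        "pk": f"SESSION#{session_id}",      # META and TURNs share this key
        "sk": f"TURN#{int(now_s * 1000)}",  # ms timestamp keeps turns sorted
        "role": role,
        "content": content,
        "ttl": int(now_s) + 86400,          # DynamoDB TTL: auto-expire in 24h
    }

item = build_turn_item("abc123", "user", "Where is my order?")
size_bytes = len(json.dumps(item).encode("utf-8"))  # compact: well under 1 KB
```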
Low-Level Design
1. DAX (DynamoDB Accelerator) Configuration
DAX provides an in-memory cache in front of DynamoDB with microsecond read latency for cached items.
graph TD
subgraph "Request Flow"
A[Orchestrator] --> B{DAX Cluster<br>3 nodes, r6g.large}
B -->|Cache Hit<br>~0.5ms| C[Return Data]
B -->|Cache Miss<br>~5ms| D[DynamoDB Table]
D --> E[Populate DAX Cache]
E --> C
end
subgraph "DAX Cache Properties"
F[Item Cache TTL: 60s<br>Active sessions]
G[Query Cache TTL: 30s<br>Turn queries]
H[Cluster: 3 nodes<br>Cross-AZ]
end
style B fill:#2d8,stroke:#333
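The cluster above can be provisioned through boto3's DAX control-plane client. This is a sketch under stated assumptions: the role ARN, subnet group, and cluster names are placeholders, and the cache TTLs are assumed to map to the DAX parameter-group keys `record-ttl-millis` and `query-ttl-millis`:

```python
CLUSTER_PARAMS = {
    "ClusterName": "chatbot-memory-dax",
    "NodeType": "dax.r6g.large",  # per the design diagram
    "ReplicationFactor": 3,       # 3 nodes spread across AZs
    "IamRoleArn": "arn:aws:iam::123456789012:role/dax-service-role",  # placeholder
    "SubnetGroupName": "chatbot-private-subnets",                     # placeholder
    "ParameterGroupName": "chatbot-memory-ttls",
}

# Cache TTLs from the design: 60s item cache, 30s query cache
TTL_PARAMS = [
    {"ParameterName": "record-ttl-millis", "ParameterValue": "60000"},
    {"ParameterName": "query-ttl-millis", "ParameterValue": "30000"},
]

def provision_dax(region: str = "us-east-1") -> None:
    import boto3  # deferred import; calling this requires AWS credentials
    dax = boto3.client("dax", region_name=region)
    dax.create_parameter_group(
        ParameterGroupName=CLUSTER_PARAMS["ParameterGroupName"],
        Description="Conversation memory cache TTLs",
    )
    dax.update_parameter_group(
        ParameterGroupName=CLUSTER_PARAMS["ParameterGroupName"],
        ParameterNameValues=TTL_PARAMS,
    )
    dax.create_cluster(**CLUSTER_PARAMS)
```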
Code Example: DAX-Backed Memory Client
import asyncio
import time
from dataclasses import dataclass, field
from typing import Optional
import boto3
from amazondax import AmazonDaxClient
@dataclass
class SessionContext:
session_id: str
customer_id: str | None
turns: list[dict] = field(default_factory=list)
summaries: list[dict] = field(default_factory=list)
page_context: dict = field(default_factory=dict)
turn_count: int = 0
last_intent: str | None = None
load_latency_ms: float = 0.0
class DaxBackedMemoryClient:
"""Conversation memory client with DAX for sub-10ms reads."""
TABLE_NAME = "manga_chatbot_memory"
def __init__(self, dax_endpoint: str, region: str = "us-east-1"):
# DAX client for reads (cached)
self.dax = AmazonDaxClient(
endpoints=[dax_endpoint],
region_name=region,
)
# Standard DynamoDB client for writes. Writes issued here bypass the
# DAX cache, so cached reads can be stale for up to the cache TTL.
self.dynamodb = boto3.resource("dynamodb", region_name=region)
self.table = self.dynamodb.Table(self.TABLE_NAME)
async def load_session_context(
self,
session_id: str,
max_turns: int = 10,
) -> SessionContext:
"""Load META + recent turns in a single query via DAX."""
start = time.monotonic()
# Single query: fetch all items for the session
# The sort key design ensures META, SUMMARY, and TURN items
# are colocated under the same partition key
response = await asyncio.to_thread(
self.dax.query,
TableName=self.TABLE_NAME,
KeyConditionExpression="pk = :pk",
ExpressionAttributeValues={
":pk": {"S": f"SESSION#{session_id}"},
},
# Only fetch needed attributes to reduce transfer size
ProjectionExpression=(
"pk, sk, #r, content, intent, page_context, "
"customer_id, turn_count, last_intent, created_at"
),
ExpressionAttributeNames={"#r": "role"},
ScanIndexForward=False, # Newest items first
# Caveat: in descending sk order, TURN items come before SUMMARY and
# META, so the Limit must cover every live item or META is cut off;
# TTL expiry and summarization keep sessions within this bound
Limit=max_turns + 5, # turns + META + summaries buffer
)
items = response.get("Items", [])
context = SessionContext(session_id=session_id)
for item in items:
sk = item["sk"]["S"]
if sk == "META":
context.customer_id = item.get("customer_id", {}).get("S")
context.turn_count = int(item.get("turn_count", {}).get("N", "0"))
context.last_intent = item.get("last_intent", {}).get("S")
page_ctx = item.get("page_context", {}).get("M", {})
context.page_context = self._parse_map(page_ctx)
elif sk.startswith("TURN#"):
context.turns.append({
"role": item.get("role", {}).get("S", "user"),
"content": item.get("content", {}).get("S", ""),
"intent": item.get("intent", {}).get("S"),
"timestamp": sk.split("#")[1],
})
elif sk.startswith("SUMMARY#"):
context.summaries.append({
"content": item.get("content", {}).get("S", ""),
"window_id": sk.split("#")[1],
})
# Reverse turns to chronological order (we queried newest-first)
context.turns.reverse()
# Keep only the most recent max_turns
if len(context.turns) > max_turns:
context.turns = context.turns[-max_turns:]
context.load_latency_ms = (time.monotonic() - start) * 1000
return context
async def save_turn(
self,
session_id: str,
role: str,
content: str,
intent: str | None = None,
response_id: str | None = None,
) -> None:
"""Write a new turn item (non-blocking, fire-and-forget friendly)."""
timestamp = str(int(time.time() * 1000))
ttl = int(time.time()) + 86400 # 24-hour expiry
item = {
"pk": f"SESSION#{session_id}",
"sk": f"TURN#{timestamp}",
"role": role,
"content": content,
"created_at": int(timestamp),
"ttl": ttl,
}
if intent:
item["intent"] = intent
if response_id:
item["response_id"] = response_id
await asyncio.to_thread(self.table.put_item, Item=item)
async def update_meta(
self,
session_id: str,
turn_count: int,
last_intent: str,
page_context: Optional[dict] = None,
) -> None:
"""Update session META item with latest state."""
now = int(time.time() * 1000)
update_expr = (
"SET turn_count = :tc, last_intent = :li, updated_at = :ua"
)
expr_values = {
":tc": turn_count,
":li": last_intent,
":ua": now,
}
if page_context is not None:
update_expr += ", page_context = :pc"
expr_values[":pc"] = page_context
await asyncio.to_thread(
self.table.update_item,
Key={
"pk": f"SESSION#{session_id}",
"sk": "META",
},
UpdateExpression=update_expr,
ExpressionAttributeValues=expr_values,
)
def _parse_map(self, dynamodb_map: dict) -> dict:
"""Parse a DynamoDB Map attribute to a plain dict."""
result = {}
for key, value in dynamodb_map.items():
if "S" in value:
result[key] = value["S"]
elif "N" in value:
# DynamoDB N is a decimal string; fall back to float for non-integers
num = value["N"]
result[key] = int(num) if num.lstrip("-").isdigit() else float(num)
elif "L" in value:
result[key] = [
v.get("S", v.get("N")) for v in value["L"]
]
return result
2. Async Write Pattern
Turn writes and META updates should not block the response path. Write asynchronously, keeping in mind that DAX's query cache is refreshed only when its TTL expires (writes do not invalidate it), so the 30-second query-cache TTL bounds how stale a session's cached turns can be.
sequenceDiagram
participant Orchestrator
participant DAX
participant DynamoDB
participant SQS as Dead Letter Queue
Note over Orchestrator: Response generated, ready to return
par Return response to user
Orchestrator-->>Orchestrator: Stream response to client
and Write turn async
Orchestrator->>DynamoDB: PutItem (TURN)
alt Write succeeds
DynamoDB-->>Orchestrator: 200 OK
Orchestrator->>DynamoDB: UpdateItem (META)
else Write throttled (retry 2x)
DynamoDB-->>Orchestrator: Throttled
Orchestrator->>DynamoDB: Retry with backoff
alt Retry succeeds
DynamoDB-->>Orchestrator: 200 OK
else Retry fails
Orchestrator->>SQS: Send to DLQ for async retry
end
end
end
Code Example: Non-Blocking Write Manager
import asyncio
import logging
import time
from typing import Optional
logger = logging.getLogger(__name__)
class AsyncWriteManager:
"""Manages non-blocking writes to DynamoDB for conversation turns."""
MAX_RETRIES = 2
BASE_BACKOFF_MS = 50
def __init__(
self,
memory_client: "DaxBackedMemoryClient",
dlq_client: Optional[object] = None,
):
self.memory = memory_client
self.dlq = dlq_client
# Hold strong references so fire-and-forget tasks are not
# garbage-collected before they finish
self._background_tasks: set[asyncio.Task] = set()
async def save_turn_non_blocking(
self,
session_id: str,
role: str,
content: str,
intent: str | None,
response_id: str | None,
turn_count: int,
page_context: dict | None,
) -> None:
"""Fire-and-forget turn save with retry and DLQ fallback."""
task = asyncio.create_task(
self._save_with_retry(
session_id, role, content, intent,
response_id, turn_count, page_context,
)
)
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
async def _save_with_retry(
self,
session_id: str,
role: str,
content: str,
intent: str | None,
response_id: str | None,
turn_count: int,
page_context: dict | None,
) -> None:
"""Attempt write with exponential backoff, fall to DLQ on failure."""
for attempt in range(self.MAX_RETRIES + 1):
try:
await self.memory.save_turn(
session_id=session_id,
role=role,
content=content,
intent=intent,
response_id=response_id,
)
# Update META after successful turn write
await self.memory.update_meta(
session_id=session_id,
turn_count=turn_count,
last_intent=intent or "unknown",
page_context=page_context,
)
return
except Exception as e:
if attempt < self.MAX_RETRIES:
backoff = self.BASE_BACKOFF_MS * (2 ** attempt) / 1000
logger.warning(
f"DynamoDB write attempt {attempt + 1} failed: {e}. "
f"Retrying in {backoff}s"
)
await asyncio.sleep(backoff)
else:
logger.error(
f"DynamoDB write failed after {self.MAX_RETRIES + 1} "
f"attempts for session {session_id}: {e}"
)
if self.dlq:
await self._send_to_dlq(
session_id, role, content, intent, response_id
)
async def _send_to_dlq(
self,
session_id: str,
role: str,
content: str,
intent: str | None,
response_id: str | None,
) -> None:
"""Send failed write to SQS dead letter queue for async retry."""
import json
message = {
"session_id": session_id,
"role": role,
"content": content,
"intent": intent,
"response_id": response_id,
"timestamp": int(time.time() * 1000),
}
try:
await asyncio.to_thread(
self.dlq.send_message,
MessageBody=json.dumps(message),
)
except Exception as e:
logger.error(f"Failed to send to DLQ: {e}")
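Messages parked by `_send_to_dlq` can be drained by a small worker that replays each payload through the memory client. A sketch, with the surrounding SQS `receive_message`/`delete_message` loop omitted:

```python
import asyncio
import json

async def replay_dlq_message(memory, raw_body: str) -> bool:
    """Replay one failed turn write; True means the SQS message can be deleted.

    `memory` is any object exposing the save_turn() coroutine, such as
    DaxBackedMemoryClient above.
    """
    payload = json.loads(raw_body)
    try:
        await memory.save_turn(
            session_id=payload["session_id"],
            role=payload["role"],
            content=payload["content"],
            intent=payload.get("intent"),
            response_id=payload.get("response_id"),
        )
        return True
    except Exception:
        # Leave the message in the queue for redelivery
        return False
```

Returning a boolean keeps the delete decision with the worker loop, so a transient DynamoDB outage simply redelivers the message later.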
3. Projection Expression Optimization
Fetching only needed fields reduces data transfer and DynamoDB read capacity consumption.
graph LR
subgraph "Without Projection"
A[Full Item<br>~2 KB avg] --> B[All attributes<br>including embedding refs]
B --> C[Transfer: 20 KB<br>for 10 turns]
end
subgraph "With Projection"
D[Projected Item<br>~0.5 KB avg] --> E[Only: role, content,<br>intent, timestamp]
E --> F[Transfer: 5 KB<br>for 10 turns]
end
style C fill:#f66,stroke:#333
style F fill:#2d8,stroke:#333
| Access Pattern | Fields Needed | Fields Excluded |
|---|---|---|
| Context load (typical) | role, content, intent, sk | guardrail_flags, response_id, full page_context |
| Summarization | role, content | Everything else |
| Escalation handoff | All fields | None (full read) |
| Analytics export | All fields | None (full read) |
Metrics and Monitoring
| Metric | Target | Alarm Threshold |
|---|---|---|
| `memory.read_latency_ms` | p95 < 10ms | p95 > 20ms for 5 min |
| `dax.cache_hit_rate` | > 80% | < 60% |
| `dax.item_cache_misses` | < 20% of reads | > 40% |
| `memory.write_latency_ms` | p95 < 15ms | p95 > 30ms |
| `memory.dlq_messages` | 0 under normal load | > 10/min |
| `memory.items_per_query` | avg < 12 | avg > 20 (query too broad) |
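These metrics can be fed from the latency the client already records (`load_latency_ms`). A sketch that builds CloudWatch `put_metric_data` payloads; the namespace and dimension names are placeholders:

```python
from datetime import datetime, timezone

def latency_datum(metric_name: str, latency_ms: float, stage: str) -> dict:
    """Build one CloudWatch MetricDatum for a memory-path latency sample."""
    return {
        "MetricName": metric_name,  # e.g. "memory.read_latency_ms"
        "Value": latency_ms,
        "Unit": "Milliseconds",
        "Timestamp": datetime.now(timezone.utc),
        "Dimensions": [{"Name": "Stage", "Value": stage}],
    }

def publish(metrics: list[dict], namespace: str = "Chatbot/Memory") -> None:
    import boto3  # deferred import; calling this requires AWS credentials
    boto3.client("cloudwatch").put_metric_data(
        Namespace=namespace, MetricData=metrics,
    )
```

Batching several data points into one `publish` call keeps metric publishing itself off the per-request critical path.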
graph TD
subgraph "DAX Cluster Monitoring"
A[Cache Hit Rate<br>Target: > 80%]
B[Item Cache Size<br>Monitor evictions]
C[CPU Utilization<br>Target: < 70%]
D[Connections<br>Monitor pool usage]
end
A --> E{< 60%?}
E -->|Yes| F[Increase cache TTL<br>or node size]
C --> G{> 70%?}
G -->|Yes| H[Scale out DAX nodes]