
PO-04: DynamoDB Conversation Memory Read Performance

User Story

As a backend engineer, I want to reduce conversation memory read latency from DynamoDB to under 10ms at p95 using DAX and efficient access patterns, so that context loading never becomes a bottleneck in the orchestrator's critical path.

Acceptance Criteria

  • DAX cluster provides sub-5ms reads for recent session metadata at p95.
  • Single-query pattern fetches META + last N turns in one DynamoDB call.
  • Hot session data (active conversations) hits DAX cache > 80% of the time.
  • Write latency for new TURN items remains under 15ms at p95.
  • Session context load adds no more than 10ms to the critical path.
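As a sanity check on these targets, here is the mean read latency implied by the per-hop figures used in the diagrams below (0.5ms DAX hit, 5ms miss, 2ms context build), assuming the 80% hit-rate criterion is just met. This is a mean, not a p95, so it only shows the targets are mutually consistent:

```python
# Latency figures taken from the request-flow diagram in this doc
DAX_HIT_MS = 0.5        # cached read
DAX_MISS_MS = 5.0       # read-through to DynamoDB
HIT_RATE = 0.80         # acceptance criterion: > 80% cache hits
CONTEXT_BUILD_MS = 2.0  # assembling SessionContext in-process

expected_read_ms = HIT_RATE * DAX_HIT_MS + (1 - HIT_RATE) * DAX_MISS_MS
expected_total_ms = expected_read_ms + CONTEXT_BUILD_MS
print(f"expected context load: {expected_total_ms:.1f} ms")  # ~3.4 ms mean
```

A ~3.4ms mean leaves comfortable headroom under the 10ms p95 budget even with tail effects on cache misses.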

High-Level Design

The Memory Latency Problem

Every incoming message requires loading the session's META item and the most recent conversation turns. This is on the critical path before intent classification can use conversation context.

graph LR
    subgraph "Current: Two Queries (~25ms)"
        A[Get META item<br>~12ms] --> B[Query latest turns<br>~15ms]
        B --> C[Build context<br>~2ms]
    end

    subgraph "Optimized: Single Query + DAX (~5ms)"
        D[DAX: Query pk + sk range<br>~3ms] --> E[Build context<br>~2ms]
    end

    style A fill:#f66,stroke:#333
    style B fill:#f66,stroke:#333
    style D fill:#2d8,stroke:#333

Optimization Strategy

graph TD
    subgraph "Read Path"
        A1[DAX Accelerator<br>In-memory cache]
        A2[Single Query Pattern<br>pk + sk range]
        A3[Projection Expression<br>Fetch only needed fields]
    end

    subgraph "Write Path"
        B1[Async Turn Writes<br>Non-blocking]
        B2[Batch META Updates<br>Debounced]
        B3[Conditional Writes<br>Avoid conflicts]
    end

    subgraph "Data Layout"
        C1[Sort Key Design<br>META + TURNs colocated]
        C2[Item Size Control<br>Compact turn format]
        C3[TTL Cleanup<br>Automatic expiry]
    end

    A1 --> D[p95 < 10ms]
    A2 --> D
    B1 --> D
    C1 --> D
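
The sort-key design in the "Data Layout" box relies on DynamoDB comparing string sort keys lexicographically, and on fixed-width millisecond timestamps making lexicographic order match chronological order. A quick illustration with sample keys (timestamps are illustrative):

```python
# The three item types share one partition key and differ only in sort key.
keys = [
    "META",
    "SUMMARY#0001",
    "TURN#1700000000000",
    "TURN#1700000005000",
    "TURN#1700000009000",
]

# A Query with ScanIndexForward=False walks the sort key in descending
# lexicographic order: newest TURN first, SUMMARY next, META last.
descending = sorted(keys, reverse=True)
print(descending)
# ['TURN#1700000009000', 'TURN#1700000005000', 'TURN#1700000000000',
#  'SUMMARY#0001', 'META']
```

This ordering is why a single descending query can serve both "latest turns" and "session metadata" in one call.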

Low-Level Design

1. DAX (DynamoDB Accelerator) Configuration

DAX provides an in-memory cache in front of DynamoDB with microsecond read latency for cached items.

graph TD
    subgraph "Request Flow"
        A[Orchestrator] --> B{DAX Cluster<br>3 nodes, r6g.large}
        B -->|Cache Hit<br>~0.5ms| C[Return Data]
        B -->|Cache Miss<br>~5ms| D[DynamoDB Table]
        D --> E[Populate DAX Cache]
        E --> C
    end

    subgraph "DAX Cache Properties"
        F[Item Cache TTL: 60s<br>Active sessions]
        G[Query Cache TTL: 30s<br>Turn queries]
        H[Cluster: 3 nodes<br>Cross-AZ]
    end

    style B fill:#2d8,stroke:#333

Code Example: DAX-Backed Memory Client

import asyncio
import time
from dataclasses import dataclass, field
from typing import Optional

import boto3
from amazondax import AmazonDaxClient


@dataclass
class SessionContext:
    session_id: str
    customer_id: str | None = None
    turns: list[dict] = field(default_factory=list)
    summaries: list[dict] = field(default_factory=list)
    page_context: dict = field(default_factory=dict)
    turn_count: int = 0
    last_intent: str | None = None
    load_latency_ms: float = 0.0


class DaxBackedMemoryClient:
    """Conversation memory client with DAX for sub-10ms reads."""

    TABLE_NAME = "manga_chatbot_memory"

    def __init__(self, dax_endpoint: str, region: str = "us-east-1"):
        # DAX client for reads (cached)
        self.dax = AmazonDaxClient(
            endpoints=[dax_endpoint],
            region_name=region,
        )
        # Standard DynamoDB client for writes (DAX write-through)
        self.dynamodb = boto3.resource("dynamodb", region_name=region)
        self.table = self.dynamodb.Table(self.TABLE_NAME)

    async def load_session_context(
        self,
        session_id: str,
        max_turns: int = 10,
    ) -> SessionContext:
        """Load META + recent turns in a single query via DAX."""
        start = time.monotonic()

        # Single query: fetch all items for the session
        # The sort key design ensures META, SUMMARY, and TURN items
        # are colocated under the same partition key
        response = await asyncio.to_thread(
            self.dax.query,
            TableName=self.TABLE_NAME,
            KeyConditionExpression="pk = :pk",
            ExpressionAttributeValues={
                ":pk": {"S": f"SESSION#{session_id}"},
            },
            # Only fetch needed attributes to reduce transfer size
            ProjectionExpression=(
                "pk, sk, #r, content, intent, page_context, "
                "customer_id, turn_count, last_intent, created_at"
            ),
            ExpressionAttributeNames={"#r": "role"},
            ScanIndexForward=False,  # Descending sk order: newest TURN items first
            # In descending order META sorts last, so the Limit must exceed
            # the session's stored item count or META is cut off; the 24h
            # TTL keeps per-session item counts bounded.
            Limit=max_turns + 5,
        )

        items = response.get("Items", [])
        context = SessionContext(session_id=session_id)

        for item in items:
            sk = item["sk"]["S"]

            if sk == "META":
                context.customer_id = item.get("customer_id", {}).get("S")
                context.turn_count = int(item.get("turn_count", {}).get("N", "0"))
                context.last_intent = item.get("last_intent", {}).get("S")
                page_ctx = item.get("page_context", {}).get("M", {})
                context.page_context = self._parse_map(page_ctx)

            elif sk.startswith("TURN#"):
                context.turns.append({
                    "role": item.get("role", {}).get("S", "user"),
                    "content": item.get("content", {}).get("S", ""),
                    "intent": item.get("intent", {}).get("S"),
                    "timestamp": sk.split("#")[1],
                })

            elif sk.startswith("SUMMARY#"):
                context.summaries.append({
                    "content": item.get("content", {}).get("S", ""),
                    "window_id": sk.split("#")[1],
                })

        # Reverse turns to chronological order (we queried newest-first)
        context.turns.reverse()

        # Keep only the most recent max_turns
        if len(context.turns) > max_turns:
            context.turns = context.turns[-max_turns:]

        context.load_latency_ms = (time.monotonic() - start) * 1000
        return context

    async def save_turn(
        self,
        session_id: str,
        role: str,
        content: str,
        intent: str | None = None,
        response_id: str | None = None,
    ) -> None:
        """Write a new turn item (non-blocking, fire-and-forget friendly)."""
        now = time.time()  # time is already imported at module level
        timestamp = str(int(now * 1000))
        ttl = int(now) + 86400  # 24-hour expiry

        item = {
            "pk": f"SESSION#{session_id}",
            "sk": f"TURN#{timestamp}",
            "role": role,
            "content": content,
            "created_at": int(timestamp),
            "ttl": ttl,
        }
        if intent:
            item["intent"] = intent
        if response_id:
            item["response_id"] = response_id

        await asyncio.to_thread(self.table.put_item, Item=item)

    async def update_meta(
        self,
        session_id: str,
        turn_count: int,
        last_intent: str,
        page_context: Optional[dict] = None,
    ) -> None:
        """Update session META item with latest state."""
        now = int(time.time() * 1000)  # time is already imported at module level

        update_expr = (
            "SET turn_count = :tc, last_intent = :li, updated_at = :ua"
        )
        expr_values = {
            ":tc": turn_count,
            ":li": last_intent,
            ":ua": now,
        }

        if page_context is not None:
            update_expr += ", page_context = :pc"
            expr_values[":pc"] = page_context

        await asyncio.to_thread(
            self.table.update_item,
            Key={
                "pk": f"SESSION#{session_id}",
                "sk": "META",
            },
            UpdateExpression=update_expr,
            ExpressionAttributeValues=expr_values,
        )

    def _parse_map(self, dynamodb_map: dict) -> dict:
        """Parse a DynamoDB Map attribute into a plain dict.

        Handles only the S, N, and L types that page_context uses.
        """
        result = {}
        for key, value in dynamodb_map.items():
            if "S" in value:
                result[key] = value["S"]
            elif "N" in value:
                # DynamoDB numbers arrive as strings and may be non-integral
                num = value["N"]
                result[key] = int(num) if num.lstrip("-").isdigit() else float(num)
            elif "L" in value:
                result[key] = [
                    v.get("S", v.get("N")) for v in value["L"]
                ]
        return result
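
To make the typed-JSON handling concrete, here is a standalone sketch of the Map-parsing step (mirroring `_parse_map`), applied to a sample page_context as DynamoDB's typed format would deliver it. The attribute names and values are illustrative, not part of a real schema:

```python
def parse_map(dynamodb_map: dict) -> dict:
    """Flatten DynamoDB typed JSON (S/N/L only) into a plain dict."""
    result = {}
    for key, value in dynamodb_map.items():
        if "S" in value:
            result[key] = value["S"]
        elif "N" in value:
            result[key] = int(value["N"])  # numbers arrive as strings
        elif "L" in value:
            result[key] = [v.get("S", v.get("N")) for v in value["L"]]
    return result

# Hypothetical page_context payload in DynamoDB's typed format
raw = {
    "current_page": {"S": "/checkout"},
    "cart_items": {"N": "3"},
    "recent_skus": {"L": [{"S": "MANGA-101"}, {"S": "MANGA-204"}]},
}
print(parse_map(raw))
# {'current_page': '/checkout', 'cart_items': 3, 'recent_skus': ['MANGA-101', 'MANGA-204']}
```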

2. Async Write Pattern

Turn writes and META updates should not block the response path. Write asynchronously and rely on DAX for read consistency within the session.

sequenceDiagram
    participant Orchestrator
    participant DAX
    participant DynamoDB
    participant SQS as Dead Letter Queue

    Note over Orchestrator: Response generated, ready to return

    par Return response to user
        Orchestrator-->>Orchestrator: Stream response to client
    and Write turn async
        Orchestrator->>DynamoDB: PutItem (TURN)
        alt Write succeeds
            DynamoDB-->>Orchestrator: 200 OK
            Orchestrator->>DynamoDB: UpdateItem (META)
        else Write throttled (retry 2x)
            DynamoDB-->>Orchestrator: Throttled
            Orchestrator->>DynamoDB: Retry with backoff
            alt Retry succeeds
                DynamoDB-->>Orchestrator: 200 OK
            else Retry fails
                Orchestrator->>SQS: Send to DLQ for async retry
            end
        end
    end

Code Example: Non-Blocking Write Manager

import asyncio
import logging
import time
from typing import Optional

logger = logging.getLogger(__name__)


class AsyncWriteManager:
    """Manages non-blocking writes to DynamoDB for conversation turns."""

    MAX_RETRIES = 2
    BASE_BACKOFF_MS = 50

    def __init__(
        self,
        memory_client: "DaxBackedMemoryClient",
        dlq_client: Optional[object] = None,
    ):
        self.memory = memory_client
        self.dlq = dlq_client
        # asyncio holds only weak references to tasks, so keep strong
        # references until each background save finishes
        self._pending: set[asyncio.Task] = set()

    async def save_turn_non_blocking(
        self,
        session_id: str,
        role: str,
        content: str,
        intent: str | None,
        response_id: str | None,
        turn_count: int,
        page_context: dict | None,
    ) -> None:
        """Fire-and-forget turn save with retry and DLQ fallback."""
        task = asyncio.create_task(
            self._save_with_retry(
                session_id, role, content, intent,
                response_id, turn_count, page_context,
            )
        )
        self._pending.add(task)
        task.add_done_callback(self._pending.discard)

    async def _save_with_retry(
        self,
        session_id: str,
        role: str,
        content: str,
        intent: str | None,
        response_id: str | None,
        turn_count: int,
        page_context: dict | None,
    ) -> None:
        """Attempt write with exponential backoff, fall to DLQ on failure."""
        for attempt in range(self.MAX_RETRIES + 1):
            try:
                await self.memory.save_turn(
                    session_id=session_id,
                    role=role,
                    content=content,
                    intent=intent,
                    response_id=response_id,
                )
                # Update META after successful turn write
                await self.memory.update_meta(
                    session_id=session_id,
                    turn_count=turn_count,
                    last_intent=intent or "unknown",
                    page_context=page_context,
                )
                return

            except Exception as e:
                if attempt < self.MAX_RETRIES:
                    backoff = self.BASE_BACKOFF_MS * (2 ** attempt) / 1000
                    logger.warning(
                        f"DynamoDB write attempt {attempt + 1} failed: {e}. "
                        f"Retrying in {backoff}s"
                    )
                    await asyncio.sleep(backoff)
                else:
                    logger.error(
                        f"DynamoDB write failed after {self.MAX_RETRIES + 1} "
                        f"attempts for session {session_id}: {e}"
                    )
                    if self.dlq:
                        await self._send_to_dlq(
                            session_id, role, content, intent, response_id
                        )

    async def _send_to_dlq(
        self,
        session_id: str,
        role: str,
        content: str,
        intent: str | None,
        response_id: str | None,
    ) -> None:
        """Send failed write to SQS dead letter queue for async retry."""
        import json

        message = {
            "session_id": session_id,
            "role": role,
            "content": content,
            "intent": intent,
            "response_id": response_id,
            "timestamp": int(time.time() * 1000),
        }

        try:
            await asyncio.to_thread(
                self.dlq.send_message,
                MessageBody=json.dumps(message),
            )
        except Exception as e:
            logger.error(f"Failed to send to DLQ: {e}")
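
The retry constants above imply a bounded worst-case delay inside the background task before the DLQ fallback fires. Since `_save_with_retry` only sleeps before attempts 1 and 2 (the final failure goes straight to the DLQ), the schedule works out to:

```python
MAX_RETRIES = 2
BASE_BACKOFF_MS = 50

# One backoff sleep precedes each retry (attempts 0..MAX_RETRIES-1 sleep;
# the final attempt falls through to the DLQ on failure)
backoffs_ms = [BASE_BACKOFF_MS * (2 ** attempt) for attempt in range(MAX_RETRIES)]
print(backoffs_ms)       # [50, 100]
print(sum(backoffs_ms))  # 150 ms of sleep across all retries
```

None of this delay touches the response path: it accrues inside the fire-and-forget task.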

3. Projection Expression Optimization

Fetching only needed fields reduces data transfer and DynamoDB read capacity consumption.

graph LR
    subgraph "Without Projection"
        A[Full Item<br>~2 KB avg] --> B[All attributes<br>including embedding refs]
        B --> C[Transfer: 20 KB<br>for 10 turns]
    end

    subgraph "With Projection"
        D[Projected Item<br>~0.5 KB avg] --> E[Only: role, content,<br>intent, timestamp]
        E --> F[Transfer: 5 KB<br>for 10 turns]
    end

    style C fill:#f66,stroke:#333
    style F fill:#2d8,stroke:#333

| Access Pattern | Fields Needed | Fields Excluded |
| --- | --- | --- |
| Context load (typical) | role, content, intent, sk | guardrail_flags, response_id, full page_context |
| Summarization | role, content | Everything else |
| Escalation handoff | All fields | None (full read) |
| Analytics export | All fields | None (full read) |

Metrics and Monitoring

| Metric | Target | Alarm Threshold |
| --- | --- | --- |
| memory.read_latency_ms | p95 < 10ms | p95 > 20ms for 5 min |
| dax.cache_hit_rate | > 80% | < 60% |
| dax.item_cache_misses | < 20% of reads | > 40% |
| memory.write_latency_ms | p95 < 15ms | p95 > 30ms |
| memory.dlq_messages | 0 under normal load | > 10/min |
| memory.items_per_query | avg < 12 | avg > 20 (query too broad) |

graph TD
    subgraph "DAX Cluster Monitoring"
        A[Cache Hit Rate<br>Target: > 80%]
        B[Item Cache Size<br>Monitor evictions]
        C[CPU Utilization<br>Target: < 70%]
        D[Connections<br>Monitor pool usage]
    end

    A --> E{< 60%?}
    E -->|Yes| F[Increase cache TTL<br>or node size]
    C --> G{> 70%?}
    G -->|Yes| H[Scale out DAX nodes]
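
The remediation flow above can be sketched as a small decision function; thresholds are taken from the diagram, and the function name and return shape are illustrative:

```python
def dax_remediation(cache_hit_rate: float, cpu_utilization: float) -> list[str]:
    """Map DAX cluster health metrics to the suggested remediation actions."""
    actions = []
    if cache_hit_rate < 0.60:       # alarm threshold from the metrics table
        actions.append("increase cache TTL or node size")
    if cpu_utilization > 0.70:      # CPU target from the diagram
        actions.append("scale out DAX nodes")
    return actions

print(dax_remediation(cache_hit_rate=0.55, cpu_utilization=0.65))
# ['increase cache TTL or node size']
```

In practice these checks would live in CloudWatch alarms; the function just documents the decision logic in executable form.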