
DynamoDB + Lambda Integration — Low-Level Design

Project: MangaAssist (Amazon-style chatbot)
Scope: How Lambda functions interact with DynamoDB for every chatbot operation, covering trigger patterns, error handling, and production-grade code.


1. Why Lambda + DynamoDB Is the Core Pair

In MangaAssist, every user message flows through Lambda before touching DynamoDB. Lambda is the compute layer; DynamoDB is the state layer. Understanding how they interact is the single most important integration to master.


2. Architecture Overview

flowchart TB
    subgraph Client
        USER[👤 Customer Browser/App]
    end

    subgraph API_Layer["API Layer"]
        APIGW[API Gateway WebSocket]
        ALB[Application Load Balancer]
    end

    subgraph Compute["Lambda Functions"]
        CONNECT["$connect Handler"]
        MSG["$default Message Handler"]
        DISCONNECT["$disconnect Handler"]
        TURN_WRITER["Turn Writer Lambda"]
        SUMMARY_GEN["Summary Generator Lambda"]
        SESSION_CLEANUP["Session Cleanup Lambda"]
        HANDOFF["Human Handoff Lambda"]
    end

    subgraph State["DynamoDB"]
        SESSIONS[(manga-assist-sessions)]
    end

    subgraph AI_Layer["AI/ML"]
        BEDROCK[Amazon Bedrock / LLM]
    end

    subgraph Async["Async Processing"]
        SQS_RETRY[SQS Dead Letter Queue]
        STREAMS[DynamoDB Streams]
    end

    USER -->|WebSocket| APIGW
    APIGW --> CONNECT
    APIGW --> MSG
    APIGW --> DISCONNECT

    CONNECT -->|PutItem: create session META| SESSIONS
    MSG -->|Query: load context| SESSIONS
    MSG -->|PutItem: write user TURN| SESSIONS
    MSG -->|Invoke| BEDROCK
    MSG -->|PutItem: write assistant TURN| SESSIONS

    TURN_WRITER -->|BatchWriteItem: async writes| SESSIONS
    SUMMARY_GEN -->|Query + PutItem| SESSIONS
    SESSION_CLEANUP -->|DeleteItem: expired sessions| SESSIONS
    HANDOFF -->|UpdateItem: transfer session| SESSIONS

    SESSIONS -->|Change events| STREAMS
    STREAMS --> SUMMARY_GEN
    STREAMS --> SESSION_CLEANUP

    MSG -.->|Failed writes| SQS_RETRY
    SQS_RETRY --> TURN_WRITER

3. Request Lifecycle — What Happens on Every Message

sequenceDiagram
    participant U as User
    participant GW as API Gateway
    participant L as Message Lambda
    participant D as DynamoDB
    participant B as Bedrock LLM
    participant S as DynamoDB Streams

    U->>GW: Send message via WebSocket
    GW->>L: Invoke $default route

    Note over L: Step 1 — Load Context
    L->>D: Query(PK=SESSION#abc, SK begins_with TURN#, Limit=20, ScanIndexForward=false)
    D-->>L: Last 20 turns (newest first)
    L->>D: Query(PK=SESSION#abc, SK begins_with SUMMARY#, Limit=1, ScanIndexForward=false)
    D-->>L: Latest summary

    Note over L: Step 2 — Write User Turn
    L->>D: PutItem(PK=SESSION#abc, SK=TURN#1711324800000, role=user, response_id=uuid-1)
    D-->>L: 200 OK (or ConditionalCheckFailed if duplicate)

    Note over L: Step 3 — Call LLM
    L->>B: InvokeModel(context + user message)
    B-->>L: AI response (streamed)

    Note over L: Step 4 — Write Assistant Turn
    L->>D: PutItem(PK=SESSION#abc, SK=TURN#1711324801000, role=assistant, response_id=uuid-2)
    D-->>L: 200 OK

    Note over L: Step 5 — Update Session Metadata
    L->>D: UpdateItem(PK=SESSION#abc, SK=META, SET turn_count=turn_count+1, updated_at=now)
    D-->>L: 200 OK

    L-->>GW: Return AI response
    GW-->>U: Push response via WebSocket

    Note over D,S: Async — Stream triggers summary check
    D->>S: NewImage event for new TURN item
    S->>S: Summary Lambda evaluates if turn_count % 10 == 0
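
The stream-side check in the last step is not implemented in the handlers below. Here is a minimal sketch of the Summary Generator's trigger logic, assuming the stream is configured with new images and that summaries fire every 10 turns; it reacts to META updates because META carries turn_count (the handler shape and threshold are illustrative, not from the deployed code):

```python
SUMMARY_EVERY_N_TURNS = 10  # assumption: summarize every 10 turns


def handler(event, context):
    """Return the sessions due for a summary (real code would invoke the LLM)."""
    due = []
    for record in event.get("Records", []):
        if record.get("eventName") not in ("INSERT", "MODIFY"):
            continue
        image = record["dynamodb"].get("NewImage", {})
        if image.get("SK", {}).get("S") != "META":
            continue  # only META items carry the turn counter
        turn_count = int(image.get("turn_count", {}).get("N", "0"))
        if turn_count and turn_count % SUMMARY_EVERY_N_TURNS == 0:
            due.append((image["PK"]["S"], turn_count))
    return due
```

Note that stream records deliver items in the low-level attribute-value format ({"S": ...}, {"N": ...}), not the plain dicts the resource API returns.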

4. Lambda Function Implementations

4a. $connect — Session Initialization

"""
Lambda: manga-assist-connect
Trigger: API Gateway WebSocket $connect route
Purpose: Create or resume a DynamoDB session when user connects
"""

import json
import os
import time
import uuid
import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["SESSION_TABLE"])

# Connection TTL: 24 hours
SESSION_TTL_SECONDS = 86400


def handler(event, context):
    connection_id = event["requestContext"]["connectionId"]
    query_params = event.get("queryStringParameters") or {}
    customer_id = query_params.get("customer_id")
    existing_session_id = query_params.get("session_id")

    now = int(time.time())
    ttl = now + SESSION_TTL_SECONDS

    # --- RESUME: If client passes an existing session_id, validate it ---
    if existing_session_id:
        try:
            resp = table.get_item(
                Key={"PK": f"SESSION#{existing_session_id}", "SK": "META"},
                ProjectionExpression="session_id, customer_id, #s",
                ExpressionAttributeNames={"#s": "status"},
            )
            item = resp.get("Item")
            if item and item.get("status") == "active":
                # Rebind connection_id to existing session
                table.update_item(
                    Key={"PK": f"SESSION#{existing_session_id}", "SK": "META"},
                    UpdateExpression="SET connection_id = :cid, updated_at = :now, #ttl = :ttl",
                    ExpressionAttributeNames={"#ttl": "ttl"},
                    ExpressionAttributeValues={
                        ":cid": connection_id,
                        ":now": now,
                        ":ttl": ttl,
                    },
                )
                return {"statusCode": 200, "body": json.dumps({"session_id": existing_session_id, "resumed": True})}
        except ClientError as e:
            print(f"Resume failed: {e.response['Error']['Code']}")
            # Fall through to create new session

    # --- NEW SESSION ---
    session_id = str(uuid.uuid4())

    table.put_item(
        Item={
            "PK": f"SESSION#{session_id}",
            "SK": "META",
            "session_id": session_id,
            "customer_id": customer_id or "anonymous",
            "connection_id": connection_id,
            "status": "active",
            "turn_count": 0,
            "created_at": now,
            "updated_at": now,
            "ttl": ttl,
        },
        # Prevent accidental overwrite of existing session
        ConditionExpression="attribute_not_exists(PK)",
    )

    return {"statusCode": 200, "body": json.dumps({"session_id": session_id, "resumed": False})}

4b. $default — Core Message Handler (The Critical Path)

"""
Lambda: manga-assist-message
Trigger: API Gateway WebSocket $default route
Purpose: Process user message → load context → call LLM → save turns
"""

import json
import os
import time
import uuid
import zlib
import boto3
from botocore.exceptions import ClientError
from botocore.config import Config

# --- Clients with retry config ---
boto_config = Config(
    retries={"max_attempts": 3, "mode": "adaptive"},
    read_timeout=30,
    connect_timeout=5,
)
dynamodb = boto3.resource("dynamodb", config=boto_config)
table = dynamodb.Table(os.environ["SESSION_TABLE"])
bedrock = boto3.client("bedrock-runtime", config=boto_config)

# API Gateway Management API for pushing response back
apigw_management = None  # Initialized lazily from event

MAX_CONTEXT_TURNS = 20
MAX_CONTEXT_TOKENS = 4000


def handler(event, context):
    global apigw_management

    connection_id = event["requestContext"]["connectionId"]
    domain = event["requestContext"]["domainName"]
    stage = event["requestContext"]["stage"]

    # Lazy-init the management API client
    if apigw_management is None:
        apigw_management = boto3.client(
            "apigatewaymanagementapi",
            endpoint_url=f"https://{domain}/{stage}",
        )

    body = json.loads(event.get("body") or "{}")  # "body" may be absent or None
    session_id = body.get("session_id")
    user_message = body.get("message", "").strip()

    if not session_id or not user_message:
        return _error(connection_id, "Missing session_id or message")

    pk = f"SESSION#{session_id}"
    now_ms = int(time.time() * 1000)
    response_id = str(uuid.uuid4())

    try:
        # ── STEP 1: Load context in parallel-friendly queries ──
        context_turns = _load_recent_turns(pk, MAX_CONTEXT_TURNS)
        latest_summary = _load_latest_summary(pk)

        # ── STEP 2: Write user turn (idempotent) ──
        _write_turn(
            pk=pk,
            sort_key=f"TURN#{now_ms}",
            role="user",
            content=user_message,
            response_id=response_id,
        )

        # ── STEP 3: Build prompt and call LLM ──
        prompt_messages = _build_prompt(latest_summary, context_turns, user_message)
        ai_response = _invoke_bedrock(prompt_messages)

        # ── STEP 4: Write assistant turn ──
        assistant_response_id = str(uuid.uuid4())
        _write_turn(
            pk=pk,
            sort_key=f"TURN#{now_ms + 1}",  # +1ms to guarantee ordering
            role="assistant",
            content=ai_response,
            response_id=assistant_response_id,
        )

        # ── STEP 5: Update META (atomic counter) ──
        table.update_item(
            Key={"PK": pk, "SK": "META"},
            UpdateExpression="SET turn_count = turn_count + :inc, updated_at = :now",
            ExpressionAttributeValues={":inc": 2, ":now": int(time.time())},
        )

        # ── STEP 6: Push response to client ──
        _send_to_client(connection_id, {
            "type": "message",
            "response_id": assistant_response_id,
            "content": ai_response,
        })

        return {"statusCode": 200}

    except ClientError as e:
        code = e.response["Error"]["Code"]
        if code == "ConditionalCheckFailedException":
            # Duplicate write — safe to ignore
            return {"statusCode": 200}
        if code == "ProvisionedThroughputExceededException":
            # Throttled even after SDK retries: surface the error so the
            # failed write can be routed to the SQS retry queue (WebSocket
            # invocations are synchronous, so Lambda will not retry on its own)
            raise
        raise


def _load_recent_turns(pk: str, limit: int) -> list:
    """Load last N turns (newest first), then reverse to chronological."""
    resp = table.query(
        KeyConditionExpression="PK = :pk AND begins_with(SK, :prefix)",
        ExpressionAttributeValues={":pk": pk, ":prefix": "TURN#"},
        ScanIndexForward=False,
        Limit=limit,
        ProjectionExpression="SK, #r, content_compressed, token_count",
        ExpressionAttributeNames={"#r": "role"},
    )
    turns = resp.get("Items", [])
    turns.reverse()  # Chronological order for prompt
    return turns


def _load_latest_summary(pk: str) -> str | None:
    """Load the most recent conversation summary."""
    resp = table.query(
        KeyConditionExpression="PK = :pk AND begins_with(SK, :prefix)",
        ExpressionAttributeValues={":pk": pk, ":prefix": "SUMMARY#"},
        ScanIndexForward=False,
        Limit=1,
        ProjectionExpression="summary_text",
    )
    items = resp.get("Items", [])
    return items[0]["summary_text"] if items else None


def _write_turn(pk: str, sort_key: str, role: str, content: str, response_id: str):
    """Write a turn item with idempotency guard on response_id."""
    compressed = zlib.compress(content.encode("utf-8"))
    table.put_item(
        Item={
            "PK": pk,
            "SK": sort_key,
            "role": role,
            "content_compressed": compressed,
            "token_count": int(len(content.split()) * 1.3),  # rough estimate; DynamoDB rejects Python floats
            "response_id": response_id,
            "ttl": int(time.time()) + 86400,
        },
        # Idempotency: skip if this exact response_id was already written
        ConditionExpression="attribute_not_exists(response_id) OR response_id <> :rid",
        ExpressionAttributeValues={":rid": response_id},
    )


def _build_prompt(summary: str | None, turns: list, user_message: str) -> list:
    """Assemble the LLM prompt from summary + recent turns + new message."""
    messages = []

    if summary:
        messages.append({
            "role": "user",
            "content": f"[Previous conversation summary]: {summary}",
        })
        messages.append({
            "role": "assistant",
            "content": "Understood, I have the conversation context.",
        })

    for turn in turns:
        raw = turn.get("content_compressed")
        if raw:
            # boto3 returns binary attributes wrapped in a Binary object;
            # .value (when present) holds the underlying bytes
            text = zlib.decompress(getattr(raw, "value", raw)).decode("utf-8")
            messages.append({"role": turn["role"], "content": text})
    messages.append({"role": "user", "content": user_message})
    return messages


def _invoke_bedrock(messages: list) -> str:
    """Call Amazon Bedrock Claude model."""
    response = bedrock.invoke_model(
        modelId=os.environ.get("MODEL_ID", "anthropic.claude-3-sonnet-20240229-v1:0"),
        contentType="application/json",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,
            "messages": messages,
            "system": "You are MangaAssist, a helpful Amazon customer support chatbot.",
        }),
    )
    result = json.loads(response["body"].read())
    return result["content"][0]["text"]


def _send_to_client(connection_id: str, payload: dict):
    """Push message back to the WebSocket client."""
    try:
        apigw_management.post_to_connection(
            ConnectionId=connection_id,
            Data=json.dumps(payload).encode("utf-8"),
        )
    except ClientError as e:
        if e.response["Error"]["Code"] == "GoneException":
            print(f"Connection {connection_id} is gone, client disconnected")
        else:
            raise


def _error(connection_id: str, message: str):
    _send_to_client(connection_id, {"type": "error", "message": message})
    return {"statusCode": 400}

4c. $disconnect — Clean Disconnect

"""
Lambda: manga-assist-disconnect
Trigger: API Gateway WebSocket $disconnect route
Purpose: Mark session as disconnected (NOT delete — user might reconnect)
"""

import os
import time
import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["SESSION_TABLE"])


def handler(event, context):
    connection_id = event["requestContext"]["connectionId"]

    # We don't know session_id from disconnect event alone.
    # Option 1: Store connection_id → session_id mapping in a GSI.
    # Option 2: Store mapping in ElastiCache (faster, ephemeral is OK).
    # Here we use the GSI approach.

    # Query GSI on connection_id to find the session
    resp = table.query(
        IndexName="GSI2-connection-lookup",
        KeyConditionExpression="connection_id = :cid",
        ExpressionAttributeValues={":cid": connection_id},
        Limit=1,
    )
    items = resp.get("Items", [])
    if not items:
        return {"statusCode": 200}

    session_pk = items[0]["PK"]

    table.update_item(
        Key={"PK": session_pk, "SK": "META"},
        UpdateExpression="SET #s = :disconnected, disconnected_at = :now",
        ExpressionAttributeNames={"#s": "status"},
        ExpressionAttributeValues={
            ":disconnected": "disconnected",
            ":now": int(time.time()),
        },
    )

    return {"statusCode": 200}
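
The GSI2-connection-lookup index queried above has to exist on the table. A sketch of adding it with boto3, inferred from the query shape; the index name and KEYS_ONLY projection are assumptions, and an on-demand table needs no provisioned throughput on the index:

```python
import boto3

client = boto3.client("dynamodb")

client.update_table(
    TableName="manga-assist-sessions",
    AttributeDefinitions=[
        {"AttributeName": "connection_id", "AttributeType": "S"},
    ],
    GlobalSecondaryIndexUpdates=[
        {
            "Create": {
                "IndexName": "GSI2-connection-lookup",
                "KeySchema": [
                    {"AttributeName": "connection_id", "KeyType": "HASH"},
                ],
                # KEYS_ONLY projects the table keys (PK, SK), which is all
                # the $disconnect handler reads back
                "Projection": {"ProjectionType": "KEYS_ONLY"},
            }
        }
    ],
)
```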

5. Lambda ↔ DynamoDB Error Handling Patterns

flowchart TD
    LAMBDA[Lambda Function] -->|Write| DDB[DynamoDB]

    DDB -->|200 OK| SUCCESS[✅ Continue]
    DDB -->|ConditionalCheckFailed| IDEMPOTENT["✅ Duplicate — Safe to Ignore"]
    DDB -->|ProvisionedThroughputExceeded| THROTTLE{Retry Strategy}
    DDB -->|InternalServerError| TRANSIENT{Retry Strategy}
    DDB -->|ValidationException| BUG["❌ Fix Code — Bad Request Shape"]
    DDB -->|ResourceNotFoundException| CONFIG["❌ Table Missing — Check Deployment"]

    THROTTLE -->|Attempt 1-3| BACKOFF["Exponential Backoff + Jitter<br/>SDK handles automatically"]
    BACKOFF -->|Still failing| DLQ["Send to SQS DLQ"]

    TRANSIENT -->|Attempt 1-3| BACKOFF

    DLQ --> RETRY_LAMBDA["Turn Writer Lambda<br/>(processes DLQ messages)"]
    RETRY_LAMBDA -->|Write| DDB
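
The Turn Writer Lambda at the bottom of this flow is referenced throughout but not implemented above. A minimal sketch, assuming each SQS message body carries the JSON-serialized item the message handler failed to write (the message shape, the table=None test hook, and the partial-batch response are assumptions):

```python
import json
import os

_table = None  # cached across warm invocations


def _get_table():
    """Lazily build the Table resource once, then reuse it (same effect as module-level init)."""
    global _table
    if _table is None:
        import boto3
        _table = boto3.resource("dynamodb").Table(os.environ["SESSION_TABLE"])
    return _table


def handler(event, context, table=None):
    table = table or _get_table()
    failures = []
    for record in event["Records"]:
        item = json.loads(record["body"])["item"]  # assumed message shape
        try:
            # Same guard as the synchronous path: never overwrite a turn
            table.put_item(Item=item, ConditionExpression="attribute_not_exists(PK)")
        except Exception as e:
            code = getattr(e, "response", {}).get("Error", {}).get("Code", "")
            if code == "ConditionalCheckFailedException":
                continue  # duplicate delivery: the turn is already stored
            failures.append({"itemIdentifier": record["messageId"]})
    # Partial-batch response; requires ReportBatchItemFailures on the event source mapping
    return {"batchItemFailures": failures}
```

Returning per-message failures means only the still-failing writes go back to the queue instead of the whole batch.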

Critical Error Handling Code

"""
Robust DynamoDB write with proper error classification.
Use this pattern in every Lambda that writes to DynamoDB.
"""

from botocore.exceptions import ClientError
import random
import time

# Errors that are SAFE to retry
RETRYABLE_ERRORS = {
    "ProvisionedThroughputExceededException",
    "InternalServerError",
    "ServiceUnavailable",
    "RequestLimitExceeded",
    "ThrottlingException",
}

# Errors that mean "duplicate write" — safe to ignore
IDEMPOTENT_ERRORS = {
    "ConditionalCheckFailedException",
}

# Errors that need code or config fixes
FATAL_ERRORS = {
    "ValidationException",
    "ResourceNotFoundException",
    "AccessDeniedException",
}


def safe_dynamodb_write(table, operation: str, **kwargs) -> dict:
    """
    Wrapper for DynamoDB writes with proper error handling.

    Returns: {"status": "success"|"duplicate"|"failed", "response": ...}
    """
    max_retries = 3
    base_delay = 0.1  # 100ms

    for attempt in range(max_retries + 1):
        try:
            if operation == "put_item":
                resp = table.put_item(**kwargs)
            elif operation == "update_item":
                resp = table.update_item(**kwargs)
            elif operation == "delete_item":
                resp = table.delete_item(**kwargs)
            else:
                raise ValueError(f"Unknown operation: {operation}")

            return {"status": "success", "response": resp}

        except ClientError as e:
            error_code = e.response["Error"]["Code"]

            if error_code in IDEMPOTENT_ERRORS:
                return {"status": "duplicate", "response": None}

            if error_code in FATAL_ERRORS:
                raise  # Don't retry — fix the code

            if error_code in RETRYABLE_ERRORS and attempt < max_retries:
                delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
                time.sleep(delay)
                continue

            raise  # Exhausted retries

    return {"status": "failed", "response": None}

6. Lambda Cold Start Impact on DynamoDB

gantt
    title Lambda Cold Start vs Warm Invocation Timeline
    dateFormat X
    axisFormat %L ms

    section Cold Start (worst case)
    Init runtime         :cold1, 0, 300
    Import boto3         :cold2, after cold1, 200
    Create DDB resource  :cold3, after cold2, 50
    Handler starts       :cold4, after cold3, 10
    DynamoDB Query       :cold5, after cold4, 15
    DynamoDB PutItem     :cold6, after cold5, 10
    Bedrock call         :cold7, after cold6, 800
    DynamoDB PutItem     :cold8, after cold7, 10
    Total                :milestone, after cold8, 0

    section Warm Start (normal)
    Handler starts       :warm1, 0, 5
    DynamoDB Query       :warm2, after warm1, 8
    DynamoDB PutItem     :warm3, after warm2, 6
    Bedrock call         :warm4, after warm3, 800
    DynamoDB PutItem     :warm5, after warm4, 6
    Total                :milestone, after warm5, 0

Cold Start Mitigation

"""
Module-level initialization: DynamoDB client created OUTSIDE handler
so it persists across warm invocations.
"""

import boto3
import os

# ✅ These run ONCE on cold start, then reuse on warm invocations
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["SESSION_TABLE"])


def handler(event, context):
    # ✅ table is already initialized — no cold-start penalty here
    resp = table.get_item(Key={"PK": "SESSION#abc", "SK": "META"})
    ...
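
One way to see this reuse in practice is to record a module-level timestamp and flag the first invocation of each execution environment (a sketch; the field names are illustrative):

```python
import time

_INIT_TIME = time.time()  # runs once per cold start
_invocations = 0


def handler(event, context):
    global _invocations
    _invocations += 1
    # The first call in this execution environment is the cold start
    return {
        "cold_start": _invocations == 1,
        "env_age_s": round(time.time() - _INIT_TIME, 1),
    }
```

Logging these two fields per invocation makes cold-start frequency directly visible in CloudWatch.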

7. Lambda Concurrency vs DynamoDB Capacity Planning

graph LR
    subgraph Traffic["User Traffic"]
        REQ["1000 req/sec at peak"]
    end

    subgraph Lambda_Config["Lambda Config"]
        CONC["Reserved Concurrency: 500"]
        MEM["Memory: 512 MB"]
        TIMEOUT["Timeout: 30s"]
    end

    subgraph DDB_Capacity["DynamoDB Capacity"]
        WCU["WCU needed: 3 writes × 1000 req = 3,000 WCU"]
        RCU["RCU needed: 2 reads × 1000 req = 2,000 RCU<br/>(eventually consistent)"]
    end

    REQ --> Lambda_Config
    Lambda_Config --> DDB_Capacity

Capacity Formula for Lambda + DynamoDB

Per message:
  - 2 Queries     (context load + summary load)  → ~2 RCU (eventually consistent; each query returns a few KB)
  - 2 PutItems    (user turn + assistant turn)    → 2 WCU (items < 1KB = 1 WCU each)
  - 1 UpdateItem  (META counter update)           → 1 WCU

Total per message:
  - Reads:  2 RCU  (eventually consistent)
  - Writes: 3 WCU

At 1000 messages/sec:
  - Reads:  2,000 RCU
  - Writes: 3,000 WCU

With DynamoDB on-demand mode — auto-scales, no pre-provisioning needed.
With provisioned mode — set to 3,600 WCU and 2,400 RCU (20% buffer).
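
The arithmetic above as a small sizing helper (the per-message counts come from the formula; the 1.2 factor is the 20% buffer):

```python
def provisioned_capacity(messages_per_sec: int, buffer: float = 1.2) -> dict:
    """Rough provisioned-mode sizing for the Lambda + DynamoDB message path."""
    rcu_per_msg = 2  # 2 eventually consistent Queries per message
    wcu_per_msg = 3  # 2 PutItems + 1 UpdateItem, items < 1 KB
    return {
        "rcu": round(messages_per_sec * rcu_per_msg * buffer),
        "wcu": round(messages_per_sec * wcu_per_msg * buffer),
    }
```

At the 1000 msg/sec peak this yields the 2,400 RCU / 3,600 WCU figures above.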

8. Lambda + DynamoDB Transactions — When You Need Them

flowchart TD
    Q{"Do you need atomicity<br/>across multiple items?"}
    Q -->|No, single item| COND["Use ConditionExpression<br/>(cheaper, faster)"]
    Q -->|Yes, 2+ items must<br/>succeed or all fail| TXN{How many items?}

    TXN -->|"2–100 items"| USE_TXN["Use TransactWriteItems<br/>2× WCU cost per item"]
    TXN -->|">100 items"| BATCH["Break into BatchWriteItem<br/>+ application-level compensation"]

    COND --> EX1["Example: Idempotent turn write<br/>ConditionExpression on response_id"]
    USE_TXN --> EX2["Example: Transfer session to human agent<br/>Update META status + Write HANDOFF item atomically"]
    BATCH --> EX3["Example: Bulk import historical turns<br/>25-item batches with retry"]

Transaction Example: Human Handoff

"""
When transferring a chat session to a human agent, we MUST atomically:
1. Update session META to status='handoff'
2. Create a HANDOFF item with agent details
3. Write a system turn announcing the handoff

If any one fails, none should apply.
"""

import time
import uuid
import zlib

import boto3

client = boto3.client("dynamodb")


def transfer_to_human(session_id: str, agent_id: str, reason: str):
    now = int(time.time())
    now_ms = int(time.time() * 1000)
    pk = f"SESSION#{session_id}"

    client.transact_write_items(
        TransactItems=[
            # 1. Update META status
            {
                "Update": {
                    "TableName": "manga-assist-sessions",
                    "Key": {
                        "PK": {"S": pk},
                        "SK": {"S": "META"},
                    },
                    "UpdateExpression": "SET #s = :handoff, handoff_at = :now, agent_id = :agent",
                    # Guard: only if session is currently active
                    "ConditionExpression": "#s = :active",
                    "ExpressionAttributeNames": {"#s": "status"},
                    "ExpressionAttributeValues": {
                        ":handoff": {"S": "handoff"},
                        ":now": {"N": str(now)},
                        ":agent": {"S": agent_id},
                        ":active": {"S": "active"},
                    },
                }
            },
            # 2. Create HANDOFF record
            {
                "Put": {
                    "TableName": "manga-assist-sessions",
                    "Item": {
                        "PK": {"S": pk},
                        "SK": {"S": f"HANDOFF#{now_ms}"},
                        "agent_id": {"S": agent_id},
                        "reason": {"S": reason},
                        "timestamp": {"N": str(now)},
                    },
                }
            },
            # 3. Write system message turn (compressed, matching _write_turn)
            {
                "Put": {
                    "TableName": "manga-assist-sessions",
                    "Item": {
                        "PK": {"S": pk},
                        "SK": {"S": f"TURN#{now_ms + 1}"},
                        "role": {"S": "system"},
                        "content_compressed": {"B": zlib.compress(b"Transferring to human agent...")},
                        "response_id": {"S": str(uuid.uuid4())},
                        "ttl": {"N": str(now + 86400)},
                    },
                }
            },
        ]
    )
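
When any element's guard fails, the whole call raises TransactionCanceledException, and the error's CancellationReasons field lists one entry per TransactItem in order. A small helper to pull out the failing elements (the helper name is ours; the response shape is DynamoDB's):

```python
def failed_transact_items(error_response: dict) -> list:
    """Return (index, code, message) for each cancelled transact element.

    Entries with code "None" mean that element was fine; the transaction
    was cancelled because a sibling element failed.
    """
    out = []
    for i, reason in enumerate(error_response.get("CancellationReasons", [])):
        if reason.get("Code") not in (None, "None"):
            out.append((i, reason["Code"], reason.get("Message")))
    return out
```

In transfer_to_human you would catch ClientError, check that e.response["Error"]["Code"] is "TransactionCanceledException", and pass e.response to this helper: a ConditionalCheckFailed at index 0 means the session was no longer active.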

9. Common Mistakes Teams Make

  1. Creating a new DynamoDB client inside the handler
     Why it happens: copy-pasted from a tutorial
     What goes wrong: cold-start penalty on EVERY invocation; the connection pool is wasted
     Fix: initialize the client at module level

  2. Not compressing content before writing
     Why it happens: "it's just text"
     What goes wrong: long messages hit the 400KB item limit; higher WCU cost
     Fix: GZIP compress in _write_turn()

  3. Using Scan to find a session
     Why it happens: not understanding Query vs Scan
     What goes wrong: full table scan; RCU cost proportional to ALL data
     Fix: use Query with PK + SK prefix

  4. Ignoring ConditionalCheckFailedException
     Why it happens: not understanding idempotency
     What goes wrong: Lambda retries fill error logs and fire alerts
     Fix: catch it and treat it as success

  5. Setting Lambda timeout < DynamoDB timeout
     Why it happens: the default Lambda timeout is 3s
     What goes wrong: Lambda dies mid-write, leaving orphaned partial state
     Fix: set the message handler's timeout to 30s

  6. Not setting ScanIndexForward=False
     Why it happens: not knowing the parameter
     What goes wrong: loads the oldest turns instead of the latest
     Fix: always False when you want the "latest N"

  7. Using BatchWriteItem for critical writes
     Why it happens: "batch is faster"
     What goes wrong: no conditional expressions, so no idempotency guard
     Fix: use individual PutItem with conditions on critical paths

  8. Forgetting to handle GoneException on WebSocket push
     Why it happens: happy-path thinking
     What goes wrong: Lambda crashes when the client disconnected during processing
     Fix: catch GoneException, log, and continue
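
The compression point is easy to verify with a quick stdlib check on repetitive support-chat-style text (ratios vary with content; this sample is deliberately repetitive):

```python
import zlib

message = ("Where is my order? I ordered the One Piece box set last week. " * 40).encode("utf-8")
compressed = zlib.compress(message)

# Repetitive chat text compresses very well; realistic messages land nearer 3-5x
assert len(compressed) < len(message) // 5
```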

10. Critical Things to Remember

For Interviews

  1. Lambda + DynamoDB is stateless compute + stateful storage — Lambda holds no state between invocations. Every invocation reads fresh from DynamoDB.

  2. Module-level initialization is NOT optional — Creating boto3 clients inside the handler is a production-killing mistake. Always initialize at module scope.

  3. Every write must be idempotent — Lambda can retry. API Gateway can retry. Use ConditionExpression with a response_id on every PutItem.

  4. Query, never Scan — If you're using Scan in a Lambda, your data model is wrong. Design your PK/SK so Query covers every access pattern.

  5. DynamoDB on-demand mode is your friend for Lambda — Provisioned capacity can throttle during spikes. On-demand auto-scales with Lambda concurrency.

For Production

  1. Set Lambda reserved concurrency — Unbounded Lambda concurrency can overwhelm DynamoDB provisioned capacity. Match them.

  2. Use SQS DLQ for failed writes — Never silently drop a user's message. Failed DynamoDB writes go to SQS, a retry Lambda processes them.

  3. Monitor UserErrors and SystemErrors together — UserErrors includes ConditionalCheckFailedException (idempotent duplicates). Don't alarm on those. Alarm on SystemErrors.

  4. Compress before storing — GZIP text content. A 2KB message becomes ~400 bytes. Saves WCU and storage cost.

  5. The Bedrock call dominates latency (800ms+), not DynamoDB (5-10ms) — Don't over-optimize DynamoDB reads. Optimize the LLM call (prompt size, model choice, streaming).