DynamoDB Streams & Event-Driven Architecture — Low-Level Design

Project: MangaAssist (Amazon-style chatbot)

Scope: How DynamoDB Streams trigger downstream services: summary generation, analytics, notifications, audit, and cleanup.


1. What Are DynamoDB Streams and Why They Matter

DynamoDB Streams capture a time-ordered log of every item-level change (INSERT, MODIFY, REMOVE) in a table. This turns DynamoDB from a "dumb database" into an event source that powers real-time pipelines.

In MangaAssist, Streams are how we:

- Auto-generate conversation summaries every N turns
- Push analytics events to Kinesis/OpenSearch
- Trigger notifications when sessions are handed off to humans
- Audit every data change for compliance


2. Full Event-Driven Architecture

flowchart TB
    subgraph DynamoDB
        TABLE[(manga-assist-sessions)]
        STREAMS[DynamoDB Streams<br/>NEW_AND_OLD_IMAGES]
    end

    TABLE -->|Item changes| STREAMS

    subgraph Stream_Consumers["Stream Consumers (Lambda)"]
        PIPE_FILTER["Event Router Lambda<br/>(Pipes & Filters)"]
    end

    STREAMS --> PIPE_FILTER

    subgraph Downstream_Targets["Downstream Targets"]
        SUMMARY["Summary Generator<br/>Lambda"]
        ANALYTICS["Analytics Pipeline<br/>Kinesis Data Firehose"]
        NOTIFY["Notification Service<br/>SNS → SQS → Lambda"]
        AUDIT["Audit Logger<br/>S3 + CloudWatch"]
        EVENTBRIDGE["EventBridge<br/>Custom Event Bus"]
    end

    PIPE_FILTER -->|"TURN items, every 10th turn"| SUMMARY
    PIPE_FILTER -->|"All changes"| ANALYTICS
    PIPE_FILTER -->|"HANDOFF items"| NOTIFY
    PIPE_FILTER -->|"All changes"| AUDIT
    PIPE_FILTER -->|"META status changes"| EVENTBRIDGE

    subgraph Summary_Flow["Summary Generation"]
        SUMMARY -->|"Query last 10 turns"| TABLE
        SUMMARY -->|"Call LLM"| BEDROCK[Amazon Bedrock]
        SUMMARY -->|"PutItem SUMMARY#"| TABLE
    end

    subgraph Analytics_Flow["Analytics"]
        ANALYTICS --> FIREHOSE[Kinesis Firehose]
        FIREHOSE --> S3_ANALYTICS[S3 Analytics Bucket]
        FIREHOSE --> OPENSEARCH[OpenSearch]
    end

    subgraph Notification_Flow["Notifications"]
        NOTIFY --> SNS[SNS Topic]
        SNS --> SQS_AGENT[SQS → Agent Dashboard]
        SNS --> SQS_CUSTOMER[SQS → Customer Email]
    end

    subgraph Audit_Flow["Audit"]
        AUDIT --> S3_AUDIT[S3 Audit Bucket<br/>Immutable]
        AUDIT --> CW[CloudWatch Logs<br/>Structured JSON]
    end

3. DynamoDB Streams Record Anatomy

classDiagram
    class StreamRecord {
        +String eventID
        +String eventName : INSERT | MODIFY | REMOVE
        +String eventSource : "aws:dynamodb"
        +Map~String,AttributeValue~ dynamodb.Keys
        +Map~String,AttributeValue~ dynamodb.NewImage
        +Map~String,AttributeValue~ dynamodb.OldImage
        +String dynamodb.StreamViewType
        +Number dynamodb.ApproximateCreationDateTime
        +Number dynamodb.SizeBytes
        +String dynamodb.SequenceNumber
    }

    class StreamViewTypes {
        KEYS_ONLY : only PK/SK
        NEW_IMAGE : full item after change
        OLD_IMAGE : full item before change
        NEW_AND_OLD_IMAGES : both before and after
    }

    StreamRecord --> StreamViewTypes : uses

Choose the Right StreamViewType

| View Type | Use Case in MangaAssist | Cost Impact |
|---|---|---|
| KEYS_ONLY | Cheap: just know something changed, query for details | Lowest |
| NEW_IMAGE | Summary generator needs the new turn content | Medium |
| OLD_IMAGE | Audit needs to know what was overwritten | Medium |
| NEW_AND_OLD_IMAGES | Our choice: audit needs before/after, analytics needs new state | Highest (2× data) |
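With NEW_AND_OLD_IMAGES, both images arrive in DynamoDB's typed AttributeValue format (for example `{"N": "10"}`), so consumers usually unmarshal them before comparing. The sketch below is a minimal stdlib-only stand-in for that step. The `unmarshal` helpers and the sample record are hypothetical, binary attributes are deliberately not handled, and in a real Lambda boto3's `TypeDeserializer` would normally do this work.

```python
from decimal import Decimal
from typing import Any


def unmarshal(attr: dict) -> Any:
    """Convert one AttributeValue, e.g. {"S": "x"}, into a plain Python value."""
    (type_tag, value), = attr.items()  # an AttributeValue carries exactly one type tag
    if type_tag == "S":
        return value
    if type_tag == "N":
        return Decimal(value)  # numbers are transported as strings
    if type_tag == "BOOL":
        return value
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [unmarshal(v) for v in value]
    if type_tag == "M":
        return {k: unmarshal(v) for k, v in value.items()}
    if type_tag == "SS":
        return set(value)
    if type_tag == "NS":
        return {Decimal(v) for v in value}
    raise ValueError(f"unsupported type tag in this sketch: {type_tag}")


def unmarshal_image(image: dict) -> dict:
    """Unmarshal a whole NewImage/OldImage map."""
    return {name: unmarshal(attr) for name, attr in image.items()}


# Hypothetical MODIFY record for a session META item.
sample_record = {
    "eventID": "1",
    "eventName": "MODIFY",
    "eventSource": "aws:dynamodb",
    "dynamodb": {
        "Keys": {"PK": {"S": "SESSION#abc"}, "SK": {"S": "META"}},
        "OldImage": {
            "PK": {"S": "SESSION#abc"}, "SK": {"S": "META"},
            "status": {"S": "active"}, "turn_count": {"N": "9"},
        },
        "NewImage": {
            "PK": {"S": "SESSION#abc"}, "SK": {"S": "META"},
            "status": {"S": "handoff"}, "turn_count": {"N": "10"},
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
    },
}

old = unmarshal_image(sample_record["dynamodb"]["OldImage"])
new = unmarshal_image(sample_record["dynamodb"]["NewImage"])
# Attributes whose value differs between the two images.
changed = {name for name in new if old.get(name) != new[name]}
```

Having both images is what makes the `changed` set computable inside the consumer without an extra table read.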

4. Event Router Lambda — The Central Dispatcher

flowchart TD
    STREAM["DynamoDB Stream Record"]
    ROUTER["Event Router Lambda"]

    STREAM --> ROUTER

    ROUTER --> CHECK_TYPE{"What item type?<br/>(parse SK prefix)"}

    CHECK_TYPE -->|"SK starts with TURN#"| TURN_LOGIC{"Is this the Nth turn?<br/>(turn_count % 10 == 0)"}
    CHECK_TYPE -->|"SK starts with META"| META_LOGIC{"What changed?<br/>(compare old vs new)"}
    CHECK_TYPE -->|"SK starts with HANDOFF#"| HANDOFF_LOGIC["Trigger notification"]
    CHECK_TYPE -->|"SK starts with SUMMARY#"| SUMMARY_LOGIC["Log only — no action"]

    TURN_LOGIC -->|"Yes, 10th turn"| INVOKE_SUMMARY["Invoke Summary Generator"]
    TURN_LOGIC -->|"No"| SEND_ANALYTICS["Send to Analytics only"]

    META_LOGIC -->|"status: active → handoff"| SEND_HANDOFF_EVENT["Publish to EventBridge"]
    META_LOGIC -->|"status: active → disconnected"| SEND_DISCONNECT_EVENT["Publish to EventBridge"]
    META_LOGIC -->|"Other update"| SEND_ANALYTICS

    HANDOFF_LOGIC --> SNS_PUBLISH["Publish to SNS"]

Event Router Code

"""
Lambda: manga-assist-stream-router
Trigger: DynamoDB Streams (manga-assist-sessions table)
Purpose: Classify stream events and fan out to appropriate services
"""

import json
import os
import boto3
from typing import Any

lambda_client = boto3.client("lambda")
sns_client = boto3.client("sns")
firehose_client = boto3.client("firehose")
eventbridge_client = boto3.client("events")

SUMMARY_FUNCTION = os.environ["SUMMARY_FUNCTION_NAME"]
SNS_HANDOFF_TOPIC = os.environ["SNS_HANDOFF_TOPIC_ARN"]
FIREHOSE_STREAM = os.environ["FIREHOSE_DELIVERY_STREAM"]
EVENT_BUS_NAME = os.environ.get("EVENT_BUS_NAME", "manga-assist-events")


def handler(event, context):
    """Process a batch of DynamoDB Stream records."""
    for record in event["Records"]:
        event_name = record["eventName"]  # INSERT, MODIFY, REMOVE
        new_image = record["dynamodb"].get("NewImage", {})
        old_image = record["dynamodb"].get("OldImage", {})
        keys = record["dynamodb"]["Keys"]

        sk = keys["SK"]["S"]
        pk = keys["PK"]["S"]
        session_id = pk.replace("SESSION#", "")

        # ── Route based on SK prefix ──
        if sk.startswith("TURN#"):
            _handle_turn_event(event_name, session_id, new_image, record)

        elif sk == "META":
            _handle_meta_event(event_name, session_id, new_image, old_image)

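        elif sk.startswith("HANDOFF#") and event_name == "INSERT":
            # Notify only on creation; MODIFY or REMOVE (e.g. TTL expiry) must not re-notify
            _handle_handoff_event(session_id, new_image)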

        # ── Always send to analytics (all events) ──
        _send_to_analytics(record)

    return {"statusCode": 200, "batchItemFailures": []}


def _handle_turn_event(event_name: str, session_id: str, new_image: dict, raw_record: dict):
    """On every new TURN, check if it's time to generate a summary."""
    if event_name != "INSERT":
        return

    # We need to check turn_count from META — but we don't have it in the TURN record.
    # Option A: Async invoke summary lambda, let IT check the count.
    # Option B: Read META here (adds a read per turn — expensive at scale).
    # We choose Option A — the summary lambda decides whether to actually run.

    lambda_client.invoke(
        FunctionName=SUMMARY_FUNCTION,
        InvocationType="Event",  # Async — don't wait
        Payload=json.dumps({
            "session_id": session_id,
            "trigger": "stream",
            "turn_sk": new_image["SK"]["S"],
        }),
    )


def _handle_meta_event(event_name: str, session_id: str, new_image: dict, old_image: dict):
    """Detect status transitions and publish to EventBridge."""
    if event_name != "MODIFY":
        return

    old_status = old_image.get("status", {}).get("S", "unknown")
    new_status = new_image.get("status", {}).get("S", "unknown")

    if old_status == new_status:
        return  # No status change

    eventbridge_client.put_events(
        Entries=[{
            "Source": "mangaassist.sessions",
            "DetailType": "SessionStatusChanged",
            "Detail": json.dumps({
                "session_id": session_id,
                "old_status": old_status,
                "new_status": new_status,
                "customer_id": new_image.get("customer_id", {}).get("S", "unknown"),
                "timestamp": new_image.get("updated_at", {}).get("N", "0"),
            }),
            "EventBusName": EVENT_BUS_NAME,
        }]
    )


def _handle_handoff_event(session_id: str, new_image: dict):
    """Notify human agents when a handoff record is created."""
    agent_id = new_image.get("agent_id", {}).get("S", "unassigned")
    reason = new_image.get("reason", {}).get("S", "no reason provided")

    sns_client.publish(
        TopicArn=SNS_HANDOFF_TOPIC,
        Subject=f"Chat Handoff: {session_id}",
        Message=json.dumps({
            "session_id": session_id,
            "agent_id": agent_id,
            "reason": reason,
        }),
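        MessageAttributes={
            "event_type": {"DataType": "String", "StringValue": "handoff"},
            "agent_id": {"DataType": "String", "StringValue": agent_id},
            # Attribute-scoped filter policies can only match on published attributes,
            # so the escalation queue's filter on reason needs it here as well.
            "reason": {"DataType": "String", "StringValue": reason},
        },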
    )


def _send_to_analytics(record: dict):
    """Forward every stream record to Kinesis Firehose for analytics."""
    firehose_client.put_record(
        DeliveryStreamName=FIREHOSE_STREAM,
        Record={
            "Data": json.dumps({
                "event_name": record["eventName"],
                "keys": record["dynamodb"]["Keys"],
                "approximate_timestamp": record["dynamodb"].get("ApproximateCreationDateTime"),
                "size_bytes": record["dynamodb"].get("SizeBytes"),
            }) + "\n"  # Newline-delimited JSON for Firehose
        },
    )
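The routing rules can also be isolated as a pure function, which makes them unit-testable without any AWS clients. This is a sketch, not the router itself: `classify_record`, `make_record`, and the route names are hypothetical, and handoff notifications are limited to INSERT events to match the handler's docstring ("when a handoff record is created").

```python
def classify_record(record: dict) -> list:
    """Return the downstream routes a stream record should be sent to."""
    event_name = record["eventName"]
    ddb = record["dynamodb"]
    sk = ddb["Keys"]["SK"]["S"]
    routes = ["analytics"]  # every change is forwarded to analytics

    if sk.startswith("TURN#"):
        if event_name == "INSERT":
            routes.append("summary")  # the summary Lambda decides whether to run
    elif sk == "META":
        if event_name == "MODIFY":
            old_status = ddb.get("OldImage", {}).get("status", {}).get("S")
            new_status = ddb.get("NewImage", {}).get("status", {}).get("S")
            if old_status != new_status:
                routes.append("eventbridge")
    elif sk.startswith("HANDOFF#"):
        if event_name == "INSERT":
            routes.append("sns")
    return routes


def make_record(sk, event_name, old_status=None, new_status=None) -> dict:
    """Build a minimal stream record for tests (hypothetical helper)."""
    ddb = {"Keys": {"PK": {"S": "SESSION#abc"}, "SK": {"S": sk}}}
    if old_status is not None:
        ddb["OldImage"] = {"status": {"S": old_status}}
    if new_status is not None:
        ddb["NewImage"] = {"status": {"S": new_status}}
    return {"eventName": event_name, "dynamodb": ddb}


turn_routes = classify_record(make_record("TURN#0001", "INSERT"))
meta_routes = classify_record(
    make_record("META", "MODIFY", old_status="active", new_status="handoff")
)
```

A handler built on this would map each route name to the matching client call, keeping the decision logic separate from the side effects.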

5. Summary Generator — Stream-Triggered Intelligence

sequenceDiagram
    participant S as DynamoDB Stream
    participant R as Event Router
    participant SG as Summary Generator Lambda
    participant D as DynamoDB Table
    participant B as Bedrock LLM

    S->>R: New TURN# INSERT event
    R->>SG: Async invoke (session_id)

    Note over SG: Step 1 — Check if summary needed
    SG->>D: GetItem(PK=SESSION#abc, SK=META)<br/>Read turn_count
    D-->>SG: turn_count = 30

    alt turn_count % 10 != 0
        Note over SG: Not time yet — exit
    else turn_count % 10 == 0
        Note over SG: Step 2 — Load turns since last summary
        SG->>D: Query(PK=SESSION#abc, SK between TURN#ts_start and TURN#ts_end)
        D-->>SG: 10 turns

        Note over SG: Step 3 — Load previous summary for context
        SG->>D: Query(PK=SESSION#abc, SK begins_with SUMMARY#, Limit=1, Reverse)
        D-->>SG: Previous summary text

        Note over SG: Step 4 — Generate new rolling summary
        SG->>B: "Summarize this conversation segment<br/>given the previous summary as context"
        B-->>SG: New summary text

        Note over SG: Step 5 — Write summary item
        SG->>D: PutItem(PK=SESSION#abc, SK=SUMMARY#3,<br/>summary_text, window_start, window_end)
    end

Summary Generator Code

"""
Lambda: manga-assist-summary-generator
Trigger: Async invocation from Event Router Lambda
Purpose: Generate rolling conversation summaries every 10 turns
"""

import json
import os
import time
import zlib
import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["SESSION_TABLE"])
bedrock = boto3.client("bedrock-runtime")

SUMMARY_INTERVAL = 10  # Generate summary every N turns


def handler(event, context):
    session_id = event["session_id"]
    pk = f"SESSION#{session_id}"

    # ── Step 1: Check turn count ──
    meta = table.get_item(
        Key={"PK": pk, "SK": "META"},
        ProjectionExpression="turn_count",
    ).get("Item", {})

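    # The boto3 resource API returns numbers as Decimal; convert so the value
    # (and anything derived from it) is JSON-serializable in the return payload.
    turn_count = int(meta.get("turn_count", 0))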

    if turn_count < SUMMARY_INTERVAL or turn_count % SUMMARY_INTERVAL != 0:
        return {"status": "skipped", "reason": f"turn_count={turn_count}, not at interval"}

    summary_number = turn_count // SUMMARY_INTERVAL

    # ── Step 2: Load the turns since the last summary ──
    all_turns = _query_all_turns(pk)
    # Pick the last SUMMARY_INTERVAL turns
    recent_turns = all_turns[-SUMMARY_INTERVAL:]

    if not recent_turns:
        return {"status": "skipped", "reason": "no turns found"}

    # ── Step 3: Load previous summary ──
    prev_summary = _load_latest_summary(pk)

    # ── Step 4: Generate summary via LLM ──
    turn_text = _format_turns_for_prompt(recent_turns)
    prompt = _build_summary_prompt(prev_summary, turn_text)
    new_summary = _call_bedrock(prompt)

    # ── Step 5: Write summary item ──
    window_start = recent_turns[0]["SK"]
    window_end = recent_turns[-1]["SK"]

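    try:
        table.put_item(
            Item={
                "PK": pk,
                "SK": f"SUMMARY#{summary_number}",
                "summary_text": new_summary,
                "window_start": window_start,
                "window_end": window_end,
                "turn_count_at_summary": turn_count,
                "created_at": int(time.time()),
                "ttl": int(time.time()) + 86400,
            },
            ConditionExpression="attribute_not_exists(SK)",  # Don't overwrite
        )
    except table.meta.client.exceptions.ConditionalCheckFailedException:
        # A retry or duplicate invocation already wrote this summary; treat as done
        return {"status": "skipped", "reason": f"SUMMARY#{summary_number} already exists"}

    return {"status": "generated", "summary_number": int(summary_number)}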


def _query_all_turns(pk: str) -> list:
    """Query all TURN items for a session (paginated)."""
    items = []
    params = {
        "KeyConditionExpression": "PK = :pk AND begins_with(SK, :prefix)",
        "ExpressionAttributeValues": {":pk": pk, ":prefix": "TURN#"},
        "ScanIndexForward": True,
    }
    while True:
        resp = table.query(**params)
        items.extend(resp.get("Items", []))
        if "LastEvaluatedKey" not in resp:
            break
        params["ExclusiveStartKey"] = resp["LastEvaluatedKey"]
    return items


def _load_latest_summary(pk: str) -> str | None:
    resp = table.query(
        KeyConditionExpression="PK = :pk AND begins_with(SK, :prefix)",
        ExpressionAttributeValues={":pk": pk, ":prefix": "SUMMARY#"},
        ScanIndexForward=False,
        Limit=1,
    )
    items = resp.get("Items", [])
    return items[0].get("summary_text") if items else None


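def _format_turns_for_prompt(turns: list) -> str:
    """Render turns as "role: text" lines, decompressing stored content."""
    lines = []
    for t in turns:
        content = t.get("content_compressed", b"")
        # The boto3 resource API wraps binary attributes in a Binary object,
        # not bytes; the raw payload is on its .value attribute.
        raw = getattr(content, "value", content)
        if isinstance(raw, (bytes, bytearray)) and raw:
            text = zlib.decompress(bytes(raw)).decode("utf-8")
        else:
            text = str(raw) if raw else ""
        lines.append(f"{t.get('role', 'unknown')}: {text}")
    return "\n".join(lines)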


def _build_summary_prompt(prev_summary: str | None, turn_text: str) -> str:
    context = f"Previous summary: {prev_summary}\n\n" if prev_summary else ""
    return (
        f"{context}"
        f"New conversation segment:\n{turn_text}\n\n"
        f"Generate a concise summary (3-5 sentences) that captures the key topics, "
        f"decisions, and any unresolved questions from this conversation segment. "
        f"Incorporate context from the previous summary if available."
    )


def _call_bedrock(prompt: str) -> str:
    response = bedrock.invoke_model(
        modelId="anthropic.claude-3-haiku-20240307-v1:0",  # Use Haiku for cost efficiency
        contentType="application/json",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 512,
            "messages": [{"role": "user", "content": prompt}],
        }),
    )
    result = json.loads(response["body"].read())
    return result["content"][0]["text"]
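The interval check and summary numbering reduce to a little arithmetic, sketched here as a standalone function. `summary_window` is a hypothetical helper, and it assumes turns are counted from 1 and that `turn_count` is read after the triggering turn was written.

```python
from typing import Optional, Tuple


def summary_window(turn_count: int, interval: int = 10) -> Optional[Tuple[int, int, int]]:
    """Return (summary_number, first_turn, last_turn), or None if no summary is due."""
    if turn_count < interval or turn_count % interval != 0:
        return None
    summary_number = turn_count // interval
    first_turn = turn_count - interval + 1
    return summary_number, first_turn, turn_count


window_at_30 = summary_window(30)  # third summary, covering turns 21 through 30
window_at_29 = summary_window(29)  # not on an interval boundary
```

For `turn_count = 30` this gives summary number 3 over turns 21 to 30, which matches the `SUMMARY#3` item in the sequence diagram.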

6. EventBridge Integration — Decoupled Event Routing

flowchart LR
    subgraph DynamoDB_Side["DynamoDB Side"]
        ROUTER["Stream Router Lambda"]
    end

    subgraph EventBridge["EventBridge (manga-assist-events)"]
        EB[Custom Event Bus]
    end

    ROUTER -->|PutEvents| EB

    subgraph Rules["EventBridge Rules"]
        R1["Rule: SessionStarted<br/>detail-type = SessionStatusChanged<br/>new_status = active"]
        R2["Rule: SessionHandoff<br/>detail-type = SessionStatusChanged<br/>new_status = handoff"]
        R3["Rule: SessionEnded<br/>detail-type = SessionStatusChanged<br/>new_status = disconnected"]
        R4["Rule: AllEvents<br/>detail-type = *<br/>→ Archive"]
    end

    EB --> R1
    EB --> R2
    EB --> R3
    EB --> R4

    subgraph Targets["Targets"]
        T1["Metrics Lambda<br/>(count active sessions)"]
        T2["Agent Queue SQS<br/>(notify available agents)"]
        T3["Cleanup Lambda<br/>(schedule TTL review)"]
        T4["S3 Archive<br/>(event replay)"]
    end

    R1 --> T1
    R2 --> T2
    R3 --> T3
    R4 --> T4

EventBridge Rule Definition (CloudFormation)

# EventBridge rule to route handoff events to agent notification queue
HandoffEventRule:
  Type: AWS::Events::Rule
  Properties:
    Name: manga-assist-handoff-rule
    EventBusName: manga-assist-events
    EventPattern:
      source:
        - mangaassist.sessions
      detail-type:
        - SessionStatusChanged
      detail:
        new_status:
          - handoff
    Targets:
      - Id: agent-notification-queue
        Arn: !GetAtt AgentNotificationQueue.Arn
      - Id: handoff-metrics-lambda
        Arn: !GetAtt HandoffMetricsFunction.Arn
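To check that the rule's pattern lines up with what the router publishes, the matching can be approximated locally. This is a deliberately simplified sketch covering only exact-value lists and nested objects, not EventBridge's full pattern language. `matches` and the sample event are hypothetical. The delivered event carries `detail` as a parsed object, even though `put_events` takes it as a JSON string.

```python
def matches(pattern: dict, event: dict) -> bool:
    """Simplified pattern match: every pattern key must match the event.

    A list means "any of these exact values"; a dict recurses into a nested object.
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


# Same shape as the EventPattern in the rule definition.
handoff_pattern = {
    "source": ["mangaassist.sessions"],
    "detail-type": ["SessionStatusChanged"],
    "detail": {"new_status": ["handoff"]},
}

# Hypothetical event as a target would receive it.
handoff_event = {
    "source": "mangaassist.sessions",
    "detail-type": "SessionStatusChanged",
    "detail": {
        "session_id": "abc",
        "old_status": "active",
        "new_status": "handoff",
    },
}

disconnect_event = {
    **handoff_event,
    "detail": {**handoff_event["detail"], "new_status": "disconnected"},
}

handoff_matched = matches(handoff_pattern, handoff_event)
disconnect_matched = matches(handoff_pattern, disconnect_event)
```

Extra fields in the event, such as `session_id`, do not affect the match; only the keys named in the pattern are checked.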

7. SNS + SQS Fan-Out Pattern for Notifications

flowchart TD
    subgraph Publisher
        ROUTER["Stream Router Lambda"]
    end

    subgraph SNS
        TOPIC["SNS Topic<br/>manga-assist-handoffs"]
    end

    ROUTER -->|Publish| TOPIC

    subgraph Subscribers["SQS Subscribers"]
        Q1["SQS: agent-dashboard-queue<br/>Filter: agent_id exists"]
        Q2["SQS: customer-notification-queue<br/>Filter: all handoffs"]
        Q3["SQS: escalation-queue<br/>Filter: reason = 'escalated'"]
    end

    TOPIC --> Q1
    TOPIC --> Q2
    TOPIC --> Q3

    subgraph Consumers["Lambda Consumers"]
        L1["Agent Dashboard Updater"]
        L2["Customer Email Sender"]
        L3["Escalation Manager"]
    end

    Q1 --> L1
    Q2 --> L2
    Q3 --> L3

    subgraph DLQ["Dead Letter Queues"]
        DLQ1["DLQ for agent-dashboard"]
        DLQ2["DLQ for customer-notification"]
        DLQ3["DLQ for escalation"]
    end

    Q1 -.->|"After 3 failures"| DLQ1
    Q2 -.->|"After 3 failures"| DLQ2
    Q3 -.->|"After 3 failures"| DLQ3

SNS Subscription Filter Policy

"""
Create SNS subscription with filter policy
so agents only receive handoffs assigned to them.
"""

import json  # needed for json.dumps in the filter policy below

import boto3

sns = boto3.client("sns")

# Agent-specific subscription — only receives their handoffs
sns.subscribe(
    TopicArn="arn:aws:sns:us-east-1:123456789:manga-assist-handoffs",
    Protocol="sqs",
    Endpoint="arn:aws:sqs:us-east-1:123456789:agent-dashboard-queue",
    Attributes={
        "FilterPolicy": json.dumps({
            "event_type": ["handoff"],
            # Agent can optionally filter to only their handoffs
            # "agent_id": ["agent-007"]
        }),
        "FilterPolicyScope": "MessageAttributes",
    },
)
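To reason about which queue receives a given handoff, an attribute-scoped filter policy can be approximated locally. The sketch covers only exact string matches and the `exists` operator, not the full SNS policy syntax. `policy_accepts` and the sample attributes are hypothetical.

```python
def attribute_matches(conditions: list, attrs: dict, name: str) -> bool:
    """True if any condition for this attribute name is satisfied."""
    present = name in attrs
    for cond in conditions:
        if isinstance(cond, dict) and "exists" in cond:
            if cond["exists"] == present:
                return True
        elif present and attrs[name] == cond:
            return True
    return False


def policy_accepts(filter_policy: dict, message_attributes: dict) -> bool:
    """All policy keys must match (AND); values within one key are alternatives (OR)."""
    # Flatten boto3-style attributes: {"k": {"DataType": "String", "StringValue": "v"}}
    attrs = {k: v["StringValue"] for k, v in message_attributes.items()}
    return all(
        attribute_matches(conditions, attrs, name)
        for name, conditions in filter_policy.items()
    )


# Hypothetical message carrying only event_type and agent_id attributes.
handoff_attrs = {
    "event_type": {"DataType": "String", "StringValue": "handoff"},
    "agent_id": {"DataType": "String", "StringValue": "agent-007"},
}

dashboard_ok = policy_accepts({"event_type": ["handoff"]}, handoff_attrs)
agent_exists_ok = policy_accepts({"agent_id": [{"exists": True}]}, handoff_attrs)
escalation_ok = policy_accepts({"reason": ["escalated"]}, handoff_attrs)
```

The last check is the instructive one: a policy on `reason` rejects any message that does not carry `reason` as a message attribute, so a filter like the escalation queue's only works if the publisher includes that attribute (or the subscription uses the message-body scope).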

8. Streams Processing — Batch Failure Handling

flowchart TD
    STREAM["DynamoDB Stream<br/>(batch of 10 records)"]
    LAMBDA["Router Lambda"]

    STREAM --> LAMBDA

    LAMBDA --> PROCESS{"Process each record"}

    PROCESS -->|"Record 1-5: Success"| OK["✅ Processed"]
    PROCESS -->|"Record 6: FAILS"| FAIL["❌ Error"]
    PROCESS -->|"Record 7-10: Success"| OK

    FAIL --> STRATEGY{"Failure Strategy?"}

    STRATEGY -->|"Bad: Return error"| ENTIRE_BATCH_RETRY["❌ Entire batch retries<br/>Records 1-5 processed AGAIN"]

    STRATEGY -->|"Good: Partial batch failure"| PARTIAL["✅ Return batchItemFailures<br/>Retry resumes at record 6<br/>Records 1-5 are NOT reprocessed"]

    PARTIAL --> RESPONSE["Response:<br/>{batchItemFailures:<br/>[{itemIdentifier: seq#6}]}"]

Partial Batch Failure Code

"""
DynamoDB Streams batch processing with partial failure reporting.
Records that succeeded before a failure are not re-processed.
"""


def handler(event, context):
    batch_item_failures = []

    for record in event["Records"]:
        try:
            _process_single_record(record)
        except Exception as e:
            print(f"Failed to process record {record['eventID']}: {e}")
            # Report the failed record by its sequence number
            batch_item_failures.append({
                "itemIdentifier": record["dynamodb"]["SequenceNumber"]
            })

    # For stream sources, Lambda checkpoints at the lowest reported sequence
    # number and retries from there, so records after the first failure can be
    # delivered again. Handlers must stay idempotent.
    return {
        "batchItemFailures": batch_item_failures
    }


def _process_single_record(record: dict):
    """Process one stream record. Raise on failure."""
    event_name = record["eventName"]
    sk = record["dynamodb"]["Keys"]["SK"]["S"]

    # The handlers called here are assumed to be single-argument variants that
    # unpack session_id and the images from the record themselves.
    if sk.startswith("TURN#") and event_name == "INSERT":
        _handle_turn_event(record)
    elif sk == "META" and event_name == "MODIFY":
        _handle_meta_event(record)
    # ... etc
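Because a stream retry resumes from the lowest reported sequence number, one common variant stops at the first failure instead of processing the rest of the batch. That avoids running later records twice and keeps per-session order intact. The sketch below shows the variant with an injected processor so it can be exercised locally. `process_batch`, `flaky_processor`, and the sample batch are hypothetical.

```python
from typing import Callable, List


def process_batch(records: List[dict], process: Callable[[dict], None]) -> dict:
    """Process records in order; on the first failure, report it and stop."""
    for record in records:
        try:
            process(record)
        except Exception as exc:
            print(f"Failed to process record {record['eventID']}: {exc}")
            # Everything from this sequence number onward will be retried.
            return {
                "batchItemFailures": [
                    {"itemIdentifier": record["dynamodb"]["SequenceNumber"]}
                ]
            }
    return {"batchItemFailures": []}


# Hypothetical batch of 10 records in which record 6 fails.
processed_ids = []


def flaky_processor(record: dict) -> None:
    if record["eventID"] == "6":
        raise RuntimeError("simulated downstream failure")
    processed_ids.append(record["eventID"])


batch = [
    {"eventID": str(i), "dynamodb": {"SequenceNumber": str(100 + i)}}
    for i in range(1, 11)
]
result = process_batch(batch, flaky_processor)
```

Here records 1 to 5 are processed once, record 6 is reported, and records 7 to 10 are left for the retry rather than being processed now and again later.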

9. Stream Record Retention and Ordering Guarantees

flowchart LR
    subgraph Guarantees["DynamoDB Streams Guarantees"]
        G1["✅ Exactly-once delivery<br/>of each change to the stream"]
        G2["✅ Ordered within a partition key<br/>(all changes to SESSION#abc are in order)"]
        G3["❌ NOT ordered across partition keys<br/>(SESSION#abc and SESSION#xyz may interleave)"]
        G4["⏰ Records retained for 24 hours<br/>(then auto-deleted from stream)"]
    end

    subgraph Consumer_Implications["What This Means for Our Lambda"]
        I1["Session-level ordering is guaranteed<br/>→ Summary for turn 20 always sees turns 1-19"]
        I2["Cross-session ordering is NOT guaranteed<br/>→ Don't rely on global order"]
        I3["Must process within 24h or lose events<br/>→ Monitor IteratorAge metric"]
        I4["Lambda may see the same record twice<br/>on retry → Make handlers idempotent"]
    end

    G1 --> I4
    G2 --> I1
    G3 --> I2
    G4 --> I3
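The idempotency requirement can be sketched as a claim-then-act wrapper keyed on the record's `eventID`. The in-memory `ProcessedEvents` class is a hypothetical stand-in for a durable store; in production this would more likely be a conditional write to a dedup table. The claim is released if the side effect fails, so a retry can still succeed.

```python
from typing import Callable


class ProcessedEvents:
    """In-memory stand-in for a durable dedup store (hypothetical)."""

    def __init__(self) -> None:
        self._seen = set()

    def claim(self, event_id: str) -> bool:
        """Return True only the first time an event ID is claimed."""
        if event_id in self._seen:
            return False
        self._seen.add(event_id)
        return True

    def release(self, event_id: str) -> None:
        self._seen.discard(event_id)


def handle_once(record: dict, store: ProcessedEvents,
                side_effect: Callable[[dict], None]) -> bool:
    """Run side_effect at most once per eventID; return whether it ran."""
    event_id = record["eventID"]
    if not store.claim(event_id):
        return False  # duplicate delivery, already handled
    try:
        side_effect(record)
    except Exception:
        store.release(event_id)  # let the retry attempt it again
        raise
    return True


store = ProcessedEvents()
notifications = []
record = {"eventID": "evt-1"}

first_delivery = handle_once(record, store, notifications.append)
second_delivery = handle_once(record, store, notifications.append)
```

The same record delivered twice produces one notification. This sketch does not cover a crash between the side effect and the next checkpoint, which is why side effects that are naturally idempotent (such as the conditional `PutItem` for summaries) remain the safer design.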

10. Common Mistakes Teams Make with Streams

| Mistake | Why It Happens | Consequence | Fix |
|---|---|---|---|
| Using NEW_IMAGE when you need old values | "I only need the new data" | Can't detect what changed (old vs new status) | Use NEW_AND_OLD_IMAGES for event routing |
| Not enabling partial batch failure reporting | Default behavior retries entire batch | Successful records re-processed, duplicate side effects | Return batchItemFailures array |
| Synchronous Lambda invocation from stream | "I want to know if it succeeded" | Stream Lambda blocks, processing falls behind | Use async Event invocation for downstream |
| Stream Lambda does too much work | "One Lambda to rule them all" | Slow processing → IteratorAge grows → events pile up | Keep router thin, fan out to specialized Lambdas |
| Not monitoring IteratorAge | Don't know the metric exists | Stream falls behind silently, events delayed by hours | Alarm on IteratorAge > 60000ms |
| Filtering in Lambda instead of EventBridge Pipes | "I'll just add an if-statement" | Lambda invoked for every event even when most are filtered | Use EventBridge Pipes filtering for high-volume streams |
| Not handling REMOVE events from TTL deletion | "TTL deletes just happen automatically" | No audit trail for expired sessions | Process REMOVE events and log to audit |
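For the TTL row, expired-item deletions can be told apart from explicit deletes because the stream record carries a `userIdentity` block naming the DynamoDB service as the principal. The sketch below builds an audit entry from a REMOVE record. `is_ttl_delete`, `audit_entry`, and the entry's field names are hypothetical.

```python
from typing import Optional


def is_ttl_delete(record: dict) -> bool:
    """True when a REMOVE record was produced by DynamoDB's TTL process."""
    identity = record.get("userIdentity") or {}
    return (
        record.get("eventName") == "REMOVE"
        and identity.get("type") == "Service"
        and identity.get("principalId") == "dynamodb.amazonaws.com"
    )


def audit_entry(record: dict) -> Optional[dict]:
    """Build an audit log entry for a REMOVE record; None for other events."""
    if record.get("eventName") != "REMOVE":
        return None
    ddb = record["dynamodb"]
    return {
        "action": "REMOVE",
        "reason": "ttl_expired" if is_ttl_delete(record) else "explicit_delete",
        "keys": ddb["Keys"],
        # OldImage is present only with OLD_IMAGE or NEW_AND_OLD_IMAGES views.
        "old_image": ddb.get("OldImage"),
    }


# Hypothetical records: one TTL expiry, one explicit delete.
ttl_record = {
    "eventName": "REMOVE",
    "userIdentity": {"type": "Service", "principalId": "dynamodb.amazonaws.com"},
    "dynamodb": {
        "Keys": {"PK": {"S": "SESSION#abc"}, "SK": {"S": "META"}},
        "OldImage": {"status": {"S": "disconnected"}},
    },
}
manual_record = {
    "eventName": "REMOVE",
    "dynamodb": {"Keys": {"PK": {"S": "SESSION#xyz"}, "SK": {"S": "META"}}},
}

ttl_entry = audit_entry(ttl_record)
manual_entry = audit_entry(manual_record)
```

Recording the reason alongside the old image gives the audit trail enough context to distinguish session expiry from a user- or operator-initiated delete.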

11. Critical Things to Remember

For Interviews

  1. DynamoDB Streams = Change Data Capture (CDC) — It's the same concept as PostgreSQL WAL or MySQL binlog, but serverless and managed.

  2. Ordering is per-partition-key, NOT global — Changes to SESSION#abc arrive in order, but you cannot assume ordering between different sessions.

  3. Exactly-once delivery to the stream, at-least-once to Lambda — The stream record exists exactly once, but Lambda may receive it again on retry. Make handlers idempotent.

  4. 24-hour retention — If your consumer is down for >24 hours, you lose events forever. Monitor IteratorAge.

  5. Streams don't affect table performance — Reading from a stream does NOT consume RCU from the table. It's a separate read path.

For Production

  1. NEW_AND_OLD_IMAGES roughly doubles the size of each stream record: both images travel with every change, which is almost always worth it for audit and change detection.

  2. Set MaximumBatchingWindowInSeconds: Don't process one record at a time. Batch 10-100 records with a 5-second window for throughput.

  3. Alarm on IteratorAge > 60 seconds: This means your Lambda is falling behind. Increase per-shard concurrency (for example, via ParallelizationFactor) or optimize processing.

  4. Use FunctionResponseTypes: [ReportBatchItemFailures] in CloudFormation/SAM: Without this setting on the event source mapping, Lambda ignores the batchItemFailures response and treats any successful return as a fully processed batch.

  5. DynamoDB Streams + EventBridge Pipes is the modern pattern: Pipes can filter, enrich, and transform stream records before they reach your Lambda, reducing unnecessary invocations.
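The batching, failure-reporting, and alarm settings above can be collected as plain parameter dictionaries. The sketch only builds the dictionaries; it assumes they would be passed to boto3's `create_event_source_mapping` (Lambda client) and `put_metric_alarm` (CloudWatch client). The function names, alarm name, ARN, and the three-period evaluation window are illustrative assumptions, not values from the MangaAssist deployment.

```python
def stream_mapping_params(stream_arn: str, function_name: str) -> dict:
    """Parameters for a DynamoDB Streams event source mapping."""
    return {
        "EventSourceArn": stream_arn,
        "FunctionName": function_name,
        "StartingPosition": "LATEST",
        "BatchSize": 100,                     # upper end of the 10-100 guidance
        "MaximumBatchingWindowInSeconds": 5,  # wait up to 5 s to fill a batch
        "FunctionResponseTypes": ["ReportBatchItemFailures"],
    }


def iterator_age_alarm_params(function_name: str, threshold_ms: int = 60_000) -> dict:
    """Parameters for a CloudWatch alarm on the consumer's IteratorAge."""
    return {
        "AlarmName": f"{function_name}-iterator-age",
        "Namespace": "AWS/Lambda",
        "MetricName": "IteratorAge",          # reported in milliseconds
        "Dimensions": [{"Name": "FunctionName", "Value": function_name}],
        "Statistic": "Maximum",
        "Period": 60,
        "EvaluationPeriods": 3,               # assumed: three consecutive minutes
        "Threshold": threshold_ms,
        "ComparisonOperator": "GreaterThanThreshold",
    }


# Hypothetical ARN for illustration only.
EXAMPLE_STREAM_ARN = (
    "arn:aws:dynamodb:us-east-1:123456789012:"
    "table/manga-assist-sessions/stream/2024-01-01T00:00:00.000"
)

mapping = stream_mapping_params(EXAMPLE_STREAM_ARN, "manga-assist-stream-router")
alarm = iterator_age_alarm_params("manga-assist-stream-router")
```

Keeping these as data makes the thresholds easy to review and test; the same values map directly onto the corresponding CloudFormation/SAM properties.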