DynamoDB Streams & Event-Driven Architecture — Low-Level Design
Project: MangaAssist (Amazon-style chatbot)
Scope: How DynamoDB Streams trigger downstream services — summary generation, analytics, notifications, audit, and cleanup.
1. What Are DynamoDB Streams and Why They Matter
DynamoDB Streams capture a time-ordered log of every item-level change (INSERT, MODIFY, REMOVE) in a table. This turns DynamoDB from a "dumb database" into an event source that powers real-time pipelines.
In MangaAssist, Streams are how we:

- Auto-generate conversation summaries every N turns
- Push analytics events to Kinesis/OpenSearch
- Trigger notifications when sessions are handed off to humans
- Audit every data change for compliance
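As a minimal prerequisite sketch, enabling the stream is a one-time table update. The table name matches this design; region and credentials are assumed to come from the environment:

```python
"""Sketch: enable Streams on the sessions table (one-time setup)."""
import boto3

dynamodb = boto3.client("dynamodb")

# NEW_AND_OLD_IMAGES gives consumers both the before and after state of
# each item; see the StreamViewType comparison in section 3.
dynamodb.update_table(
    TableName="manga-assist-sessions",
    StreamSpecification={
        "StreamEnabled": True,
        "StreamViewType": "NEW_AND_OLD_IMAGES",
    },
)
```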
2. Full Event-Driven Architecture
```mermaid
flowchart TB
subgraph DynamoDB
TABLE[(manga-assist-sessions)]
STREAMS[DynamoDB Streams<br/>NEW_AND_OLD_IMAGES]
end
TABLE -->|Item changes| STREAMS
subgraph Stream_Consumers["Stream Consumers (Lambda)"]
PIPE_FILTER["Event Router Lambda<br/>(Pipes & Filters)"]
end
STREAMS --> PIPE_FILTER
subgraph Downstream_Targets["Downstream Targets"]
SUMMARY["Summary Generator<br/>Lambda"]
ANALYTICS["Analytics Pipeline<br/>Kinesis Data Firehose"]
NOTIFY["Notification Service<br/>SNS → SQS → Lambda"]
AUDIT["Audit Logger<br/>S3 + CloudWatch"]
EVENTBRIDGE["EventBridge<br/>Custom Event Bus"]
end
PIPE_FILTER -->|"TURN items, every 10th turn"| SUMMARY
PIPE_FILTER -->|"All changes"| ANALYTICS
PIPE_FILTER -->|"HANDOFF items"| NOTIFY
PIPE_FILTER -->|"All changes"| AUDIT
PIPE_FILTER -->|"META status changes"| EVENTBRIDGE
subgraph Summary_Flow["Summary Generation"]
SUMMARY -->|"Query last 10 turns"| TABLE
SUMMARY -->|"Call LLM"| BEDROCK[Amazon Bedrock]
SUMMARY -->|"PutItem SUMMARY#"| TABLE
end
subgraph Analytics_Flow["Analytics"]
ANALYTICS --> FIREHOSE[Kinesis Firehose]
FIREHOSE --> S3_ANALYTICS[S3 Analytics Bucket]
FIREHOSE --> OPENSEARCH[OpenSearch]
end
subgraph Notification_Flow["Notifications"]
NOTIFY --> SNS[SNS Topic]
SNS --> SQS_AGENT[SQS → Agent Dashboard]
SNS --> SQS_CUSTOMER[SQS → Customer Email]
end
subgraph Audit_Flow["Audit"]
AUDIT --> S3_AUDIT[S3 Audit Bucket<br/>Immutable]
AUDIT --> CW[CloudWatch Logs<br/>Structured JSON]
end
```
3. DynamoDB Streams Record Anatomy
```mermaid
classDiagram
class StreamRecord {
+String eventID
+String eventName : INSERT | MODIFY | REMOVE
+String eventSource : "aws:dynamodb"
+Map~String,AttributeValue~ dynamodb.Keys
+Map~String,AttributeValue~ dynamodb.NewImage
+Map~String,AttributeValue~ dynamodb.OldImage
+String dynamodb.StreamViewType
+Number dynamodb.ApproximateCreationDateTime
+Number dynamodb.SizeBytes
+String dynamodb.SequenceNumber
}
class StreamViewTypes {
KEYS_ONLY : only PK/SK
NEW_IMAGE : full item after change
OLD_IMAGE : full item before change
NEW_AND_OLD_IMAGES : both before and after
}
StreamRecord --> StreamViewTypes : uses
```
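For concreteness, here is one record roughly as the consumer Lambda receives it, with abbreviated, illustrative values, for a META status change captured with `NEW_AND_OLD_IMAGES`:

```python
# One illustrative record (abbreviated); the envelope matches the anatomy above.
sample_record = {
    "eventID": "2f4e...",  # unique per change
    "eventName": "MODIFY",
    "eventSource": "aws:dynamodb",
    "dynamodb": {
        "Keys": {"PK": {"S": "SESSION#abc"}, "SK": {"S": "META"}},
        "NewImage": {"PK": {"S": "SESSION#abc"}, "SK": {"S": "META"},
                     "status": {"S": "handoff"}},
        "OldImage": {"PK": {"S": "SESSION#abc"}, "SK": {"S": "META"},
                     "status": {"S": "active"}},
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "ApproximateCreationDateTime": 1700000000,
        "SizeBytes": 212,
        "SequenceNumber": "4950...",
    },
}
```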
Choose the Right StreamViewType
| View Type | Use Case in MangaAssist | Cost Impact |
|---|---|---|
| `KEYS_ONLY` | Cheap — just know something changed, query for details | Lowest |
| `NEW_IMAGE` | Summary generator needs the new turn content | Medium |
| `OLD_IMAGE` | Audit needs to know what was overwritten | Medium |
| `NEW_AND_OLD_IMAGES` | Our choice — audit needs before/after, analytics needs new state | Highest (2× data) |
4. Event Router Lambda — The Central Dispatcher
```mermaid
flowchart TD
STREAM["DynamoDB Stream Record"]
ROUTER["Event Router Lambda"]
STREAM --> ROUTER
ROUTER --> CHECK_TYPE{"What item type?<br/>(parse SK prefix)"}
CHECK_TYPE -->|"SK starts with TURN#"| TURN_LOGIC{"Is this the Nth turn?<br/>(turn_count % 10 == 0)"}
CHECK_TYPE -->|"SK starts with META"| META_LOGIC{"What changed?<br/>(compare old vs new)"}
CHECK_TYPE -->|"SK starts with HANDOFF#"| HANDOFF_LOGIC["Trigger notification"]
CHECK_TYPE -->|"SK starts with SUMMARY#"| SUMMARY_LOGIC["Log only — no action"]
TURN_LOGIC -->|"Yes, 10th turn"| INVOKE_SUMMARY["Invoke Summary Generator"]
TURN_LOGIC -->|"No"| SEND_ANALYTICS["Send to Analytics only"]
META_LOGIC -->|"status: active → handoff"| SEND_HANDOFF_EVENT["Publish to EventBridge"]
META_LOGIC -->|"status: active → disconnected"| SEND_DISCONNECT_EVENT["Publish to EventBridge"]
META_LOGIC -->|"Other update"| SEND_ANALYTICS
HANDOFF_LOGIC --> SNS_PUBLISH["Publish to SNS"]
```
Event Router Code
"""
Lambda: manga-assist-stream-router
Trigger: DynamoDB Streams (manga-assist-sessions table)
Purpose: Classify stream events and fan out to appropriate services
"""
import json
import os
import boto3
from typing import Any
lambda_client = boto3.client("lambda")
sns_client = boto3.client("sns")
firehose_client = boto3.client("firehose")
eventbridge_client = boto3.client("events")
SUMMARY_FUNCTION = os.environ["SUMMARY_FUNCTION_NAME"]
SNS_HANDOFF_TOPIC = os.environ["SNS_HANDOFF_TOPIC_ARN"]
FIREHOSE_STREAM = os.environ["FIREHOSE_DELIVERY_STREAM"]
EVENT_BUS_NAME = os.environ.get("EVENT_BUS_NAME", "manga-assist-events")
def handler(event, context):
"""Process a batch of DynamoDB Stream records."""
for record in event["Records"]:
event_name = record["eventName"] # INSERT, MODIFY, REMOVE
new_image = record["dynamodb"].get("NewImage", {})
old_image = record["dynamodb"].get("OldImage", {})
keys = record["dynamodb"]["Keys"]
sk = keys["SK"]["S"]
pk = keys["PK"]["S"]
session_id = pk.replace("SESSION#", "")
# ── Route based on SK prefix ──
if sk.startswith("TURN#"):
_handle_turn_event(event_name, session_id, new_image, record)
elif sk == "META":
_handle_meta_event(event_name, session_id, new_image, old_image)
elif sk.startswith("HANDOFF#"):
_handle_handoff_event(session_id, new_image)
# ── Always send to analytics (all events) ──
_send_to_analytics(record)
return {"statusCode": 200, "batchItemFailures": []}
def _handle_turn_event(event_name: str, session_id: str, new_image: dict, raw_record: dict):
"""On every new TURN, check if it's time to generate a summary."""
if event_name != "INSERT":
return
# We need to check turn_count from META — but we don't have it in the TURN record.
# Option A: Async invoke summary lambda, let IT check the count.
# Option B: Read META here (adds a read per turn — expensive at scale).
# We choose Option A — the summary lambda decides whether to actually run.
lambda_client.invoke(
FunctionName=SUMMARY_FUNCTION,
InvocationType="Event", # Async — don't wait
Payload=json.dumps({
"session_id": session_id,
"trigger": "stream",
"turn_sk": new_image["SK"]["S"],
}),
)
def _handle_meta_event(event_name: str, session_id: str, new_image: dict, old_image: dict):
"""Detect status transitions and publish to EventBridge."""
if event_name != "MODIFY":
return
old_status = old_image.get("status", {}).get("S", "unknown")
new_status = new_image.get("status", {}).get("S", "unknown")
if old_status == new_status:
return # No status change
eventbridge_client.put_events(
Entries=[{
"Source": "mangaassist.sessions",
"DetailType": "SessionStatusChanged",
"Detail": json.dumps({
"session_id": session_id,
"old_status": old_status,
"new_status": new_status,
"customer_id": new_image.get("customer_id", {}).get("S", "unknown"),
"timestamp": new_image.get("updated_at", {}).get("N", "0"),
}),
"EventBusName": EVENT_BUS_NAME,
}]
)
def _handle_handoff_event(session_id: str, new_image: dict):
"""Notify human agents when a handoff record is created."""
agent_id = new_image.get("agent_id", {}).get("S", "unassigned")
reason = new_image.get("reason", {}).get("S", "no reason provided")
sns_client.publish(
TopicArn=SNS_HANDOFF_TOPIC,
Subject=f"Chat Handoff: {session_id}",
Message=json.dumps({
"session_id": session_id,
"agent_id": agent_id,
"reason": reason,
}),
MessageAttributes={
"event_type": {"DataType": "String", "StringValue": "handoff"},
"agent_id": {"DataType": "String", "StringValue": agent_id},
},
)
def _send_to_analytics(record: dict):
"""Forward every stream record to Kinesis Firehose for analytics."""
firehose_client.put_record(
DeliveryStreamName=FIREHOSE_STREAM,
Record={
"Data": json.dumps({
"event_name": record["eventName"],
"keys": record["dynamodb"]["Keys"],
"approximate_timestamp": record["dynamodb"].get("ApproximateCreationDateTime"),
"size_bytes": record["dynamodb"].get("SizeBytes"),
}) + "\n" # Newline-delimited JSON for Firehose
},
)
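The router above unwraps low-level attribute maps by hand (`new_image["SK"]["S"]`). As an optional refinement, not part of the router as written, boto3's `TypeDeserializer` converts those `{"S": ...}` maps into plain Python values:

```python
"""Optional helper sketch: flatten low-level stream images to plain dicts."""
from boto3.dynamodb.types import TypeDeserializer

_deserializer = TypeDeserializer()


def deserialize_image(image: dict) -> dict:
    """Turn {"status": {"S": "active"}} into {"status": "active"}."""
    return {key: _deserializer.deserialize(value) for key, value in image.items()}


# Usage inside a handler:
# new_item = deserialize_image(record["dynamodb"].get("NewImage", {}))
# old_item = deserialize_image(record["dynamodb"].get("OldImage", {}))
```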
5. Summary Generator — Stream-Triggered Intelligence
```mermaid
sequenceDiagram
participant S as DynamoDB Stream
participant R as Event Router
participant SG as Summary Generator Lambda
participant D as DynamoDB Table
participant B as Bedrock LLM
S->>R: New TURN# INSERT event
R->>SG: Async invoke (session_id)
Note over SG: Step 1 — Check if summary needed
SG->>D: GetItem(PK=SESSION#abc, SK=META)<br/>Read turn_count
D-->>SG: turn_count = 30
alt turn_count % 10 != 0
Note over SG: Not time yet — exit
else turn_count % 10 == 0
Note over SG: Step 2 — Load turns since last summary
SG->>D: Query(PK=SESSION#abc, SK between TURN#ts_start and TURN#ts_end)
D-->>SG: 10 turns
Note over SG: Step 3 — Load previous summary for context
SG->>D: Query(PK=SESSION#abc, SK begins_with SUMMARY#, Limit=1, Reverse)
D-->>SG: Previous summary text
Note over SG: Step 4 — Generate new rolling summary
SG->>B: "Summarize this conversation segment<br/>given the previous summary as context"
B-->>SG: New summary text
Note over SG: Step 5 — Write summary item
SG->>D: PutItem(PK=SESSION#abc, SK=SUMMARY#3,<br/>summary_text, window_start, window_end)
end
```
Summary Generator Code
"""
Lambda: manga-assist-summary-generator
Trigger: Async invocation from Event Router Lambda
Purpose: Generate rolling conversation summaries every 10 turns
"""
import json
import os
import time
import zlib
import boto3
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["SESSION_TABLE"])
bedrock = boto3.client("bedrock-runtime")
SUMMARY_INTERVAL = 10 # Generate summary every N turns
def handler(event, context):
session_id = event["session_id"]
pk = f"SESSION#{session_id}"
# ── Step 1: Check turn count ──
meta = table.get_item(
Key={"PK": pk, "SK": "META"},
ProjectionExpression="turn_count",
).get("Item", {})
turn_count = meta.get("turn_count", 0)
if turn_count < SUMMARY_INTERVAL or turn_count % SUMMARY_INTERVAL != 0:
return {"status": "skipped", "reason": f"turn_count={turn_count}, not at interval"}
summary_number = turn_count // SUMMARY_INTERVAL
# ── Step 2: Load the turns since the last summary ──
all_turns = _query_all_turns(pk)
# Pick the last SUMMARY_INTERVAL turns
recent_turns = all_turns[-SUMMARY_INTERVAL:]
if not recent_turns:
return {"status": "skipped", "reason": "no turns found"}
# ── Step 3: Load previous summary ──
prev_summary = _load_latest_summary(pk)
# ── Step 4: Generate summary via LLM ──
turn_text = _format_turns_for_prompt(recent_turns)
prompt = _build_summary_prompt(prev_summary, turn_text)
new_summary = _call_bedrock(prompt)
# ── Step 5: Write summary item ──
window_start = recent_turns[0]["SK"]
window_end = recent_turns[-1]["SK"]
table.put_item(
Item={
"PK": pk,
"SK": f"SUMMARY#{summary_number}",
"summary_text": new_summary,
"window_start": window_start,
"window_end": window_end,
"turn_count_at_summary": turn_count,
"created_at": int(time.time()),
"ttl": int(time.time()) + 86400,
},
ConditionExpression="attribute_not_exists(SK)", # Don't overwrite
)
return {"status": "generated", "summary_number": summary_number}
def _query_all_turns(pk: str) -> list:
"""Query all TURN items for a session (paginated)."""
items = []
params = {
"KeyConditionExpression": "PK = :pk AND begins_with(SK, :prefix)",
"ExpressionAttributeValues": {":pk": pk, ":prefix": "TURN#"},
"ScanIndexForward": True,
}
while True:
resp = table.query(**params)
items.extend(resp.get("Items", []))
if "LastEvaluatedKey" not in resp:
break
params["ExclusiveStartKey"] = resp["LastEvaluatedKey"]
return items
def _load_latest_summary(pk: str) -> str | None:
resp = table.query(
KeyConditionExpression="PK = :pk AND begins_with(SK, :prefix)",
ExpressionAttributeValues={":pk": pk, ":prefix": "SUMMARY#"},
ScanIndexForward=False,
Limit=1,
)
items = resp.get("Items", [])
return items[0].get("summary_text") if items else None
def _format_turns_for_prompt(turns: list) -> str:
lines = []
for t in turns:
content = t.get("content_compressed", b"")
if isinstance(content, bytes) and content:
text = zlib.decompress(content).decode("utf-8")
else:
text = str(content)
lines.append(f"{t.get('role', 'unknown')}: {text}")
return "\n".join(lines)
def _build_summary_prompt(prev_summary: str | None, turn_text: str) -> str:
context = f"Previous summary: {prev_summary}\n\n" if prev_summary else ""
return (
f"{context}"
f"New conversation segment:\n{turn_text}\n\n"
f"Generate a concise summary (3-5 sentences) that captures the key topics, "
f"decisions, and any unresolved questions from this conversation segment. "
f"Incorporate context from the previous summary if available."
)
def _call_bedrock(prompt: str) -> str:
response = bedrock.invoke_model(
modelId="anthropic.claude-3-haiku-20240307-v1:0", # Use Haiku for cost efficiency
contentType="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 512,
"messages": [{"role": "user", "content": prompt}],
}),
)
result = json.loads(response["body"].read())
return result["content"][0]["text"]
6. EventBridge Integration — Decoupled Event Routing
```mermaid
flowchart LR
subgraph DynamoDB_Side["DynamoDB Side"]
ROUTER["Stream Router Lambda"]
end
subgraph EventBridge["EventBridge (manga-assist-events)"]
EB[Custom Event Bus]
end
ROUTER -->|PutEvents| EB
subgraph Rules["EventBridge Rules"]
R1["Rule: SessionStarted<br/>detail-type = SessionStatusChanged<br/>new_status = active"]
R2["Rule: SessionHandoff<br/>detail-type = SessionStatusChanged<br/>new_status = handoff"]
R3["Rule: SessionEnded<br/>detail-type = SessionStatusChanged<br/>new_status = disconnected"]
R4["Rule: AllEvents<br/>detail-type = *<br/>→ Archive"]
end
EB --> R1
EB --> R2
EB --> R3
EB --> R4
subgraph Targets["Targets"]
T1["Metrics Lambda<br/>(count active sessions)"]
T2["Agent Queue SQS<br/>(notify available agents)"]
T3["Cleanup Lambda<br/>(schedule TTL review)"]
T4["S3 Archive<br/>(event replay)"]
end
R1 --> T1
R2 --> T2
R3 --> T3
R4 --> T4
```
EventBridge Rule Definition (CloudFormation)
```yaml
# EventBridge rule to route handoff events to agent notification queue
HandoffEventRule:
  Type: AWS::Events::Rule
  Properties:
    Name: manga-assist-handoff-rule
    EventBusName: manga-assist-events
    EventPattern:
      source:
        - mangaassist.sessions
      detail-type:
        - SessionStatusChanged
      detail:
        new_status:
          - handoff
    Targets:
      - Id: agent-notification-queue
        Arn: !GetAtt AgentNotificationQueue.Arn
      - Id: handoff-metrics-lambda
        Arn: !GetAtt HandoffMetricsFunction.Arn
```
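On the receiving side, a rule target such as the handoff-metrics Lambda gets the event with the JSON `Detail` parsed under `detail`. A minimal sketch; the CloudWatch namespace and metric name are assumptions, not part of the design above:

```python
"""Sketch of a hypothetical rule target: the handoff-metrics Lambda."""
import boto3

cloudwatch = boto3.client("cloudwatch")


def handler(event, context):
    detail = event["detail"]  # parsed form of the Detail we published
    cloudwatch.put_metric_data(
        Namespace="MangaAssist/Sessions",
        MetricData=[{
            "MetricName": "StatusTransition",
            "Dimensions": [{"Name": "NewStatus", "Value": detail["new_status"]}],
            "Value": 1.0,
            "Unit": "Count",
        }],
    )
```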
7. SNS + SQS Fan-Out Pattern for Notifications
```mermaid
flowchart TD
subgraph Publisher
ROUTER["Stream Router Lambda"]
end
subgraph SNS
TOPIC["SNS Topic<br/>manga-assist-handoffs"]
end
ROUTER -->|Publish| TOPIC
subgraph Subscribers["SQS Subscribers"]
Q1["SQS: agent-dashboard-queue<br/>Filter: agent_id exists"]
Q2["SQS: customer-notification-queue<br/>Filter: all handoffs"]
Q3["SQS: escalation-queue<br/>Filter: reason = 'escalated'"]
end
TOPIC --> Q1
TOPIC --> Q2
TOPIC --> Q3
subgraph Consumers["Lambda Consumers"]
L1["Agent Dashboard Updater"]
L2["Customer Email Sender"]
L3["Escalation Manager"]
end
Q1 --> L1
Q2 --> L2
Q3 --> L3
subgraph DLQ["Dead Letter Queues"]
DLQ1["DLQ for agent-dashboard"]
DLQ2["DLQ for customer-notification"]
DLQ3["DLQ for escalation"]
end
Q1 -.->|"After 3 failures"| DLQ1
Q2 -.->|"After 3 failures"| DLQ2
Q3 -.->|"After 3 failures"| DLQ3
```
SNS Subscription Filter Policy
"""
Create SNS subscription with filter policy
so agents only receive handoffs assigned to them.
"""
import boto3
sns = boto3.client("sns")
# Agent-specific subscription — only receives their handoffs
sns.subscribe(
TopicArn="arn:aws:sns:us-east-1:123456789:manga-assist-handoffs",
Protocol="sqs",
Endpoint="arn:aws:sqs:us-east-1:123456789:agent-dashboard-queue",
Attributes={
"FilterPolicy": json.dumps({
"event_type": ["handoff"],
# Agent can optionally filter to only their handoffs
# "agent_id": ["agent-007"]
}),
"FilterPolicyScope": "MessageAttributes",
},
)
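The "after 3 failures" arrows in the diagram come from each queue's redrive policy. A sketch of attaching one DLQ; the queue URL and DLQ ARN are placeholders:

```python
"""Sketch: attach a dead-letter queue to one subscriber queue."""
import json

import boto3

sqs = boto3.client("sqs")

sqs.set_queue_attributes(
    QueueUrl="https://sqs.us-east-1.amazonaws.com/123456789/agent-dashboard-queue",
    Attributes={
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": "arn:aws:sqs:us-east-1:123456789:agent-dashboard-dlq",
            "maxReceiveCount": "3",  # move to the DLQ after 3 failed receives
        }),
    },
)
```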
8. Streams Processing — Batch Failure Handling
```mermaid
flowchart TD
STREAM["DynamoDB Stream<br/>(batch of 10 records)"]
LAMBDA["Router Lambda"]
STREAM --> LAMBDA
LAMBDA --> PROCESS{"Process each record"}
PROCESS -->|"Record 1-5: Success"| OK["✅ Processed"]
PROCESS -->|"Record 6: FAILS"| FAIL["❌ Error"]
PROCESS -->|"Record 7-10: Success"| OK
FAIL --> STRATEGY{"Failure Strategy?"}
STRATEGY -->|"Bad: Return error"| ENTIRE_BATCH_RETRY["❌ Entire batch retries<br/>Records 1-5 processed AGAIN"]
STRATEGY -->|"Good: Partial batch failure"| PARTIAL["✅ Return batchItemFailures<br/>Only record 6 retries"]
PARTIAL --> RESPONSE["Response:<br/>{batchItemFailures:<br/>[{itemIdentifier: seq#6}]}"]
```
Partial Batch Failure Code
"""
Proper DynamoDB Streams batch processing with partial failure reporting.
This prevents re-processing successful records when one fails.
"""
def handler(event, context):
batch_item_failures = []
for record in event["Records"]:
try:
_process_single_record(record)
except Exception as e:
print(f"Failed to process record {record['eventID']}: {e}")
# Report ONLY the failed record
batch_item_failures.append({
"itemIdentifier": record["dynamodb"]["SequenceNumber"]
})
# Return partial failures — only failed records will be retried
return {
"batchItemFailures": batch_item_failures
}
def _process_single_record(record: dict):
"""Process one stream record. Raise on failure."""
event_name = record["eventName"]
sk = record["dynamodb"]["Keys"]["SK"]["S"]
if sk.startswith("TURN#") and event_name == "INSERT":
_handle_turn_event(record)
elif sk == "META" and event_name == "MODIFY":
_handle_meta_event(record)
# ... etc
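This response format only takes effect if the event source mapping opts in. A sketch of wiring the router with boto3; the ARNs are placeholders (see also the `FunctionResponseTypes` note in section 11):

```python
"""Sketch: wire the router to the stream with partial-failure reporting."""
import boto3

lambda_client = boto3.client("lambda")

# Without FunctionResponseTypes=["ReportBatchItemFailures"], Lambda ignores
# the batchItemFailures array and retries the whole batch.
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:dynamodb:us-east-1:123456789:table/manga-assist-sessions/stream/2024-01-01T00:00:00.000",
    FunctionName="manga-assist-stream-router",
    StartingPosition="LATEST",
    BatchSize=100,
    MaximumBatchingWindowInSeconds=5,  # batch records instead of one-at-a-time
    MaximumRetryAttempts=3,
    BisectBatchOnFunctionError=True,  # split poison batches on repeated failure
    FunctionResponseTypes=["ReportBatchItemFailures"],
)
```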
9. Stream Record Retention and Ordering Guarantees
```mermaid
flowchart LR
subgraph Guarantees["DynamoDB Streams Guarantees"]
G1["✅ Exactly-once delivery<br/>of each change to the stream"]
G2["✅ Ordered within a partition key<br/>(all changes to SESSION#abc are in order)"]
G3["❌ NOT ordered across partition keys<br/>(SESSION#abc and SESSION#xyz may interleave)"]
G4["⏰ Records retained for 24 hours<br/>(then auto-deleted from stream)"]
end
subgraph Consumer_Implications["What This Means for Our Lambda"]
I1["Session-level ordering is guaranteed<br/>→ Summary for turn 20 always sees turns 1-19"]
I2["Cross-session ordering is NOT guaranteed<br/>→ Don't rely on global order"]
I3["Must process within 24h or lose events<br/>→ Monitor IteratorAge metric"]
I4["Lambda may see the same record twice<br/>on retry → Make handlers idempotent"]
end
G1 --> I4
G2 --> I1
G3 --> I2
G4 --> I3
```
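A minimal idempotency guard that follows from I4: record each `eventID` with a conditional write and skip duplicates. The dedupe table here is hypothetical, not part of the session table design:

```python
"""Sketch: eventID-based idempotency guard for replayed stream records."""
import time

import boto3
from botocore.exceptions import ClientError

# Hypothetical table: simple PK, TTL enabled. Entries only need to outlive
# the 24-hour stream retention window.
dedupe_table = boto3.resource("dynamodb").Table("manga-assist-processed-events")


def seen_before(event_id: str) -> bool:
    """Record the eventID; return True if it was already processed."""
    try:
        dedupe_table.put_item(
            Item={"PK": event_id, "ttl": int(time.time()) + 2 * 86400},
            ConditionExpression="attribute_not_exists(PK)",
        )
        return False  # first delivery of this record
    except ClientError as e:
        if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return True  # duplicate delivery; skip side effects
        raise
```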
10. Common Mistakes Teams Make with Streams
| Mistake | Why It Happens | Consequence | Fix |
|---|---|---|---|
| Using `NEW_IMAGE` when you need old values | "I only need the new data" | Can't detect what changed (old vs new status) | Use `NEW_AND_OLD_IMAGES` for event routing |
| Not enabling partial batch failure reporting | Default behavior retries entire batch | Successful records re-processed, duplicate side effects | Return `batchItemFailures` array |
| Synchronous Lambda invocation from stream | "I want to know if it succeeded" | Stream Lambda blocks, processing falls behind | Use async `Event` invocation for downstream |
| Stream Lambda does too much work | "One Lambda to rule them all" | Slow processing → `IteratorAge` grows → events pile up | Keep router thin, fan out to specialized Lambdas |
| Not monitoring `IteratorAge` | Don't know the metric exists | Stream falls behind silently, events delayed by hours | Alarm on `IteratorAge` > 60000 ms |
| Filtering in Lambda instead of EventBridge Pipes | "I'll just add an if-statement" | Lambda invoked for every event even when most are filtered | Use EventBridge Pipes filtering for high-volume streams |
| Not handling `REMOVE` events from TTL deletion | "TTL deletes just happen automatically" | No audit trail for expired sessions | Process `REMOVE` events and log to audit |
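For the filtering row above, the filter-before-invoke idea is expressed as a filter pattern; the same pattern shape works as event source mapping `FilterCriteria` and as an EventBridge Pipes source filter. A sketch that only passes `TURN#` inserts:

```python
"""Sketch: filter stream records before they invoke a Lambda at all."""
import json

# Matches only INSERTs whose SK begins with "TURN#"; everything else
# never triggers an invocation.
turn_insert_filter = {
    "Filters": [{
        "Pattern": json.dumps({
            "eventName": ["INSERT"],
            "dynamodb": {
                "Keys": {
                    "SK": {"S": [{"prefix": "TURN#"}]},
                },
            },
        }),
    }],
}

# e.g.: lambda_client.create_event_source_mapping(..., FilterCriteria=turn_insert_filter)
```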
11. Critical Things to Remember
For Interviews

- **DynamoDB Streams = Change Data Capture (CDC)** — It's the same concept as PostgreSQL WAL or MySQL binlog, but serverless and managed.
- **Ordering is per-partition-key, NOT global** — Changes to `SESSION#abc` arrive in order, but you cannot assume ordering between different sessions.
- **Exactly-once delivery to the stream, at-least-once to Lambda** — The stream record exists exactly once, but Lambda may receive it again on retry. Make handlers idempotent.
- **24-hour retention** — If your consumer is down for >24 hours, you lose events forever. Monitor `IteratorAge`.
- **Streams don't affect table performance** — Reading from a stream does NOT consume RCU from the table. It's a separate read path.

For Production

- `NEW_AND_OLD_IMAGES` costs twice the stream storage — but is almost always worth it for audit and change detection.
- Set `MaximumBatchingWindowInSeconds` — Don't process one record at a time. Batch 10-100 records with a 5-second window for throughput.
- Alarm on `IteratorAge` > 60 seconds — This means your Lambda is falling behind. Increase concurrency or optimize processing.
- Use `FunctionResponseTypes: [ReportBatchItemFailures]` in CloudFormation/SAM — Without this, the Lambda runtime doesn't know to read the `batchItemFailures` response.
- DynamoDB Streams + EventBridge Pipes is the modern pattern — Pipes can filter, enrich, and transform stream records before they reach your Lambda, reducing unnecessary invocations.
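The `IteratorAge` alarm from the checklist as a boto3 sketch; the alarm name, periods, and actions are illustrative:

```python
"""Sketch: CloudWatch alarm for a lagging stream consumer."""
import boto3

cloudwatch = boto3.client("cloudwatch")

# IteratorAge for the router lives in the AWS/Lambda namespace, reported in
# milliseconds; 60000 ms matches the 60-second rule of thumb above.
cloudwatch.put_metric_alarm(
    AlarmName="manga-assist-stream-router-iterator-age",
    Namespace="AWS/Lambda",
    MetricName="IteratorAge",
    Dimensions=[{"Name": "FunctionName", "Value": "manga-assist-stream-router"}],
    Statistic="Maximum",
    Period=60,
    EvaluationPeriods=3,
    Threshold=60000,  # milliseconds
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="notBreaching",
)
```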