DynamoDB Cross-Service Security, IAM & Observability — Low-Level Design
Project: MangaAssist (Amazon-style chatbot). Scope: how DynamoDB is secured across all service boundaries, covering IAM policies, encryption, X-Ray tracing, CloudWatch monitoring, and compliance patterns.
1. Complete Security Architecture
flowchart TB
subgraph External["External Boundary"]
USER["👤 Customer"]
AGENT["👩💼 Human Agent"]
ADMIN["🔧 Admin"]
end
subgraph Auth["Authentication & Authorization"]
COGNITO["Amazon Cognito<br/>User Pool + Identity Pool"]
APIGW_AUTH["API Gateway<br/>Lambda Authorizer"]
end
subgraph Compute["Compute (IAM Roles)"]
L_MSG["Message Lambda<br/>Role: manga-msg-role"]
L_STREAM["Stream Lambda<br/>Role: manga-stream-role"]
L_ADMIN["Admin Lambda<br/>Role: manga-admin-role"]
SF["Step Functions<br/>Role: manga-sf-role"]
end
subgraph Data["Data Layer (Encrypted)"]
DDB["DynamoDB<br/>SSE-KMS<br/>VPC Endpoint"]
DAX["DAX Cluster<br/>In VPC<br/>TLS in-transit"]
REDIS["ElastiCache Redis<br/>In VPC<br/>Auth token + TLS"]
S3["S3 Backup Bucket<br/>SSE-S3<br/>Bucket Policy"]
end
subgraph Monitoring["Observability"]
XRAY["X-Ray Tracing"]
CW["CloudWatch Metrics + Alarms"]
CT["CloudTrail<br/>API Audit Logs"]
end
USER -->|"JWT Token"| COGNITO
COGNITO -->|"Validate"| APIGW_AUTH
APIGW_AUTH -->|"Allow/Deny"| L_MSG
AGENT -->|"Agent JWT"| COGNITO
ADMIN -->|"Admin JWT"| COGNITO
L_MSG -->|"manga-msg-role:<br/>Query, PutItem, UpdateItem<br/>on PK prefix SESSION#"| DDB
L_STREAM -->|"manga-stream-role:<br/>GetRecords from Stream<br/>+ PutItem for summaries"| DDB
L_ADMIN -->|"manga-admin-role:<br/>Scan, DeleteItem, Export"| DDB
SF -->|"manga-sf-role:<br/>GetItem, UpdateItem"| DDB
L_MSG --> DAX
L_MSG --> REDIS
DDB --> XRAY
L_MSG --> XRAY
DDB --> CW
DDB --> CT
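The Lambda authorizer box above is the gate between Cognito and every downstream role. Below is a minimal sketch of a TOKEN-type authorizer's contract, with JWT verification stubbed out (verifying signature, expiry, and audience against the User Pool JWKS is assumed; all identifiers are illustrative, not part of this design).
# A minimal sketch of the TOKEN-type Lambda authorizer from the diagram.
# JWT verification is stubbed: real code verifies signature, expiry, and
# audience against the Cognito User Pool JWKS. All identifiers are illustrative.
def authorizer_handler(event, context):
    token = event.get("authorizationToken", "")
    claims = _verify_cognito_jwt(token)
    effect = "Allow" if claims else "Deny"
    return {
        "principalId": claims.get("sub") if claims else "anonymous",
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [{
                "Action": "execute-api:Invoke",
                "Effect": effect,
                "Resource": event["methodArn"],
            }],
        },
        # Forwarded to the backend Lambda as request context
        "context": {"customer_id": claims.get("custom:customer_id", "")} if claims else {},
    }

def _verify_cognito_jwt(token: str):
    """Placeholder: verify against the User Pool JWKS (e.g. with PyJWT) and return claims."""
    return {"sub": "user-123", "custom:customer_id": "CUST#123"} if token else None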
2. IAM Least-Privilege Policies — Per Lambda Role
flowchart TD
subgraph Roles["IAM Roles — One Per Function"]
R1["manga-msg-role<br/>(Message Handler)"]
R2["manga-stream-role<br/>(Stream Processor)"]
R3["manga-admin-role<br/>(Admin Operations)"]
R4["manga-sf-role<br/>(Step Functions)"]
end
subgraph Permissions["DynamoDB Permissions Matrix"]
direction TB
P1["✅ Query, GetItem, PutItem, UpdateItem<br/>❌ Scan, DeleteItem, CreateTable"]
P2["✅ GetRecords, GetShardIterator, DescribeStream<br/>✅ PutItem (for summaries)<br/>❌ DeleteItem, Scan"]
P3["✅ Scan, Query, DeleteItem, ExportTableToPointInTime<br/>❌ CreateTable, DeleteTable"]
P4["✅ GetItem, UpdateItem, Query<br/>❌ Scan, DeleteItem, PutItem"]
end
R1 --- P1
R2 --- P2
R3 --- P3
R4 --- P4
IAM Policy: Message Handler Lambda (Most Restrictive)
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DynamoDBSessionAccess",
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetItem",
        "dynamodb:Query",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem"
      ],
      "Resource": [
        "arn:aws:dynamodb:us-east-1:123456789012:table/manga-assist-sessions",
        "arn:aws:dynamodb:us-east-1:123456789012:table/manga-assist-sessions/index/*"
      ],
      "Condition": {
        "ForAllValues:StringLike": {
          "dynamodb:LeadingKeys": ["SESSION#*"]
        }
      }
    },
    {
      "Sid": "DenyDangerous",
      "Effect": "Deny",
      "Action": [
        "dynamodb:Scan",
        "dynamodb:DeleteItem",
        "dynamodb:DeleteTable",
        "dynamodb:CreateTable",
        "dynamodb:BatchWriteItem"
      ],
      "Resource": "*"
    },
    {
      "Sid": "BedrockInvoke",
      "Effect": "Allow",
      "Action": ["bedrock:InvokeModel"],
      "Resource": "arn:aws:bedrock:us-east-1::foundation-model/anthropic.*"
    },
    {
      "Sid": "XRayTrace",
      "Effect": "Allow",
      "Action": [
        "xray:PutTraceSegments",
        "xray:PutTelemetryRecords"
      ],
      "Resource": "*"
    }
  ]
}
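To make the LeadingKeys condition concrete, here is an illustrative sketch of what this policy allows and rejects at runtime (the key values are hypothetical):
# Illustrative sketch: how the LeadingKeys condition above behaves at runtime.
import boto3
from botocore.exceptions import ClientError

table = boto3.resource("dynamodb").Table("manga-assist-sessions")

# Permitted: the partition key matches the SESSION#* pattern in the condition.
table.get_item(Key={"PK": "SESSION#abc123", "SK": "META"})

try:
    # Rejected: the leading key falls outside SESSION#*, so IAM denies the
    # request before DynamoDB ever evaluates it.
    table.get_item(Key={"PK": "USER#abc123", "SK": "PROFILE"})
except ClientError as err:
    assert err.response["Error"]["Code"] == "AccessDeniedException"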
IAM Policy: Stream Processor Lambda
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DynamoDBStreamRead",
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:DescribeStream",
        "dynamodb:ListStreams"
      ],
      "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/manga-assist-sessions/stream/*"
    },
    {
      "Sid": "DynamoDBWriteSummaries",
      "Effect": "Allow",
      "Action": [
        "dynamodb:PutItem",
        "dynamodb:GetItem",
        "dynamodb:Query"
      ],
      "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/manga-assist-sessions",
      "Condition": {
        "ForAllValues:StringLike": {
          "dynamodb:LeadingKeys": ["SESSION#*"]
        }
      }
    },
    {
      "Sid": "KinesisWrite",
      "Effect": "Allow",
      "Action": ["kinesis:PutRecord", "kinesis:PutRecords"],
      "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/manga-assist-events"
    },
    {
      "Sid": "SNSPublish",
      "Effect": "Allow",
      "Action": ["sns:Publish"],
      "Resource": "arn:aws:sns:us-east-1:123456789012:manga-assist-handoffs"
    }
  ]
}
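Lambda's event source mapping exercises the stream-read permissions on the role's behalf; the function itself only sees decoded record batches. A minimal consumer sketch under that assumption (the routing logic is illustrative):
# Minimal sketch of the consumer running under manga-stream-role. Lambda's
# event source mapping calls GetRecords/GetShardIterator on the role's behalf;
# the handler only sees decoded batches. Routing targets are illustrative.
import boto3

kinesis = boto3.client("kinesis")

def handler(event, context):
    for record in event.get("Records", []):
        if record.get("eventName") != "INSERT":
            continue
        # Stream records carry DynamoDB-typed attributes ({"S": ...})
        new_image = record["dynamodb"].get("NewImage", {})
        if new_image.get("SK", {}).get("S", "").startswith("TURN#"):
            kinesis.put_record(
                StreamName="manga-assist-events",
                PartitionKey=new_image["PK"]["S"],
                Data=b"{}",  # illustrative payload
            )
    # Assumes the event source mapping enables ReportBatchItemFailures
    return {"batchItemFailures": []}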
3. Encryption Architecture
flowchart LR
subgraph AtRest["Encryption at Rest"]
DDB_ENC["DynamoDB<br/>SSE with AWS-managed KMS key<br/>(aws/dynamodb)"]
S3_ENC["S3 Backups<br/>SSE-S3 (AES-256)<br/>or SSE-KMS for compliance"]
REDIS_ENC["ElastiCache Redis<br/>at-rest encryption enabled"]
DAX_ENC["DAX<br/>at-rest encryption enabled"]
end
subgraph InTransit["Encryption in Transit"]
DDB_TLS["DynamoDB<br/>HTTPS/TLS only<br/>(enforced by VPC endpoint policy)"]
DAX_TLS["DAX<br/>TLS encryption enabled<br/>(ClusterEndpointEncryptionType: TLS)"]
REDIS_TLS["Redis<br/>in-transit encryption = true<br/>AUTH token required"]
API_TLS["API Gateway<br/>TLS 1.2 minimum"]
end
subgraph Keys["Key Management"]
KMS["AWS KMS"]
CMK["Customer-managed key:<br/>manga-assist-cmk<br/>(for audit + compliance)"]
end
DDB_ENC --> KMS
S3_ENC --> KMS
CMK --> KMS
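Moving the table onto the customer-managed manga-assist-cmk from the diagram is an in-place, online operation. A hedged boto3 sketch (the key alias is an assumption):
# Hedged sketch: re-pointing table encryption at the customer-managed key.
# The alias is an assumption; DynamoDB switches keys in place, online.
import boto3

ddb = boto3.client("dynamodb")
ddb.update_table(
    TableName="manga-assist-sessions",
    SSESpecification={
        "Enabled": True,
        "SSEType": "KMS",
        "KMSMasterKeyId": "alias/manga-assist-cmk",
    },
)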
VPC Endpoint Policy — Restrict DynamoDB Access to VPC
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowOnlyFromVPC",
      "Effect": "Allow",
      "Principal": "*",
      "Action": [
        "dynamodb:GetItem",
        "dynamodb:Query",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem",
        "dynamodb:BatchWriteItem"
      ],
      "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/manga-assist-*",
      "Condition": {
        "StringEquals": {
          "aws:SourceVpc": "vpc-0abc1234def56789"
        }
      }
    },
    {
      "Sid": "DenyNonTLS",
      "Effect": "Deny",
      "Principal": "*",
      "Action": "dynamodb:*",
      "Resource": "*",
      "Condition": {
        "Bool": {
          "aws:SecureTransport": "false"
        }
      }
    }
  ]
}
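A sketch of wiring this up with boto3, assuming the policy JSON above has been loaded into a variable named ENDPOINT_POLICY and that a private route table ID is known (both illustrative):
# Sketch: create the free gateway endpoint and attach the policy above.
import json
import boto3

# Assumption: ENDPOINT_POLICY holds the policy document shown above.
ENDPOINT_POLICY = {"Version": "2012-10-17", "Statement": []}  # abbreviated here

ec2 = boto3.client("ec2")
ec2.create_vpc_endpoint(
    VpcEndpointType="Gateway",
    VpcId="vpc-0abc1234def56789",            # the VPC from the policy condition
    ServiceName="com.amazonaws.us-east-1.dynamodb",
    RouteTableIds=["rtb-0123456789abcdef0"],  # assumption: private route table
    PolicyDocument=json.dumps(ENDPOINT_POLICY),
)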
4. X-Ray Distributed Tracing — Full Request Flow
flowchart LR
subgraph Trace["X-Ray Trace: User Message → AI Response"]
direction TB
SEG1["Segment: API Gateway<br/>Duration: 2ms<br/>Status: 200"]
SEG2["Segment: Lambda (message-handler)<br/>Duration: 920ms<br/>Cold Start: No"]
SUB1[" └─ Subsegment: DynamoDB Query (context)<br/> Duration: 8ms<br/> Table: manga-assist-sessions"]
SUB2[" └─ Subsegment: DynamoDB Query (summary)<br/> Duration: 5ms<br/> Table: manga-assist-sessions"]
SUB3[" └─ Subsegment: DynamoDB PutItem (user turn)<br/> Duration: 6ms<br/> Table: manga-assist-sessions"]
SUB4[" └─ Subsegment: Bedrock InvokeModel<br/> Duration: 850ms ⚠️<br/> Model: claude-3-sonnet"]
SUB5[" └─ Subsegment: DynamoDB PutItem (assistant turn)<br/> Duration: 7ms<br/> Table: manga-assist-sessions"]
SUB6[" └─ Subsegment: DynamoDB UpdateItem (META)<br/> Duration: 4ms<br/> Table: manga-assist-sessions"]
SEG1 --> SEG2
SEG2 --> SUB1
SEG2 --> SUB2
SEG2 --> SUB3
SEG2 --> SUB4
SEG2 --> SUB5
SEG2 --> SUB6
end
X-Ray Instrumentation Code
"""
X-Ray instrumentation for Lambda + DynamoDB.
Traces every DynamoDB call and Bedrock call automatically.
"""
# ── Enable X-Ray in Lambda ──
# Method 1: Patch boto3 globally (recommended)
from aws_xray_sdk.core import xray_recorder, patch_all
# Patches boto3, requests, and other libraries automatically
patch_all()
# ── OR Method 2: Patch only specific libraries ──
from aws_xray_sdk.core import patch
patch(["boto3", "botocore"])
import boto3
import os
import json
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["SESSION_TABLE"])
bedrock = boto3.client("bedrock-runtime")
def handler(event, context):
session_id = event.get("session_id")
# ── Custom subsegment for business logic ──
with xray_recorder.in_subsegment("LoadContext") as subsegment:
subsegment.put_annotation("session_id", session_id)
subsegment.put_metadata("event", event, "request")
# DynamoDB calls within this block are auto-traced
meta = table.get_item(Key={
"PK": f"SESSION#{session_id}",
"SK": "META",
}).get("Item", {})
turns = table.query(
KeyConditionExpression="PK = :pk AND begins_with(SK, :prefix)",
ExpressionAttributeValues={
":pk": f"SESSION#{session_id}",
":prefix": "TURN#",
},
ScanIndexForward=False,
Limit=20,
).get("Items", [])
subsegment.put_annotation("turn_count", len(turns))
# ── Custom subsegment for LLM call ──
with xray_recorder.in_subsegment("BedrockInvoke") as subsegment:
subsegment.put_annotation("model", "claude-3-sonnet")
response = bedrock.invoke_model(
modelId="anthropic.claude-3-sonnet-20240229-v1:0",
contentType="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1024,
"messages": [{"role": "user", "content": "Hello"}],
}),
)
subsegment.put_annotation("response_status", "success")
# ── Custom subsegment for DynamoDB writes ──
with xray_recorder.in_subsegment("WriteTurns") as subsegment:
# PutItem calls here are auto-traced
table.put_item(Item={
"PK": f"SESSION#{session_id}",
"SK": "TURN#12345",
"role": "assistant",
"content_compressed": b"response...",
})
subsegment.put_annotation("write_count", 1)
return {"statusCode": 200}
5. CloudWatch Monitoring — DynamoDB Dashboard
flowchart TB
subgraph Metrics["Key CloudWatch Metrics"]
M1["SuccessfulRequestLatency (P50, P99)<br/>🟢 P99 < 10ms: Healthy<br/>🟡 P99 10-25ms: Watch<br/>🔴 P99 > 25ms: Alert"]
M2["ConsumedReadCapacityUnits<br/>ConsumedWriteCapacityUnits<br/>📊 Track against provisioned/burst"]
M3["ThrottledRequests<br/>🔴 Any > 0: Alarm immediately"]
M4["SystemErrors (5xx)<br/>🔴 Any > 0: Alarm immediately"]
M5["UserErrors (4xx)<br/>🟡 Includes ConditionalCheckFailed<br/>Filter before alarming"]
M6["AccountProvisionedReadCapacityUtilization<br/>AccountProvisionedWriteCapacityUtilization<br/>🟡 > 80%: Plan capacity increase"]
end
subgraph Alarms["CloudWatch Alarms"]
A1["CRITICAL: ThrottledRequests > 0<br/>for 1 minute"]
A2["CRITICAL: SystemErrors > 0<br/>for 1 minute"]
A3["WARNING: P99 Latency > 25ms<br/>for 5 minutes"]
A4["WARNING: ConsumedWCU > 80%<br/>of provisioned for 10 minutes"]
A5["INFO: IteratorAge > 60s<br/>(Streams processing lag)"]
end
subgraph Actions["Alarm Actions"]
SNS_OPS["SNS → PagerDuty/Slack"]
AUTO_SCALE["Trigger auto-scaling"]
RUNBOOK["Link to runbook"]
end
M3 --> A1 --> SNS_OPS
M4 --> A2 --> SNS_OPS
M1 --> A3 --> RUNBOOK
M2 --> A4 --> AUTO_SCALE
CloudWatch Alarms (CloudFormation)
# Critical: Throttled requests alarm
DynamoDBThrottleAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: manga-assist-dynamodb-throttled
    AlarmDescription: "DynamoDB throttling detected — requests being rejected"
    Namespace: AWS/DynamoDB
    MetricName: ThrottledRequests
    Dimensions:
      - Name: TableName
        Value: manga-assist-sessions
    Statistic: Sum
    Period: 60
    EvaluationPeriods: 1
    Threshold: 0
    ComparisonOperator: GreaterThanThreshold
    TreatMissingData: notBreaching
    AlarmActions:
      - !Ref OpsNotificationTopic

# Critical: System errors
DynamoDBSystemErrorAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: manga-assist-dynamodb-system-errors
    AlarmDescription: "DynamoDB 5xx errors — AWS-side issue"
    Namespace: AWS/DynamoDB
    MetricName: SystemErrors
    Dimensions:
      - Name: TableName
        Value: manga-assist-sessions
    Statistic: Sum
    Period: 60
    EvaluationPeriods: 1
    Threshold: 0
    ComparisonOperator: GreaterThanThreshold
    AlarmActions:
      - !Ref OpsNotificationTopic

# Warning: High P99 latency
DynamoDBLatencyAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: manga-assist-dynamodb-p99-latency
    AlarmDescription: "DynamoDB P99 latency > 25ms"
    Namespace: AWS/DynamoDB
    MetricName: SuccessfulRequestLatency
    Dimensions:
      - Name: TableName
        Value: manga-assist-sessions
      - Name: Operation
        Value: Query
    ExtendedStatistic: p99
    Period: 300
    EvaluationPeriods: 3
    Threshold: 25
    ComparisonOperator: GreaterThanThreshold
    AlarmActions:
      - !Ref OpsNotificationTopic

# Warning: Stream processing lag
StreamIteratorAgeAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: manga-assist-stream-iterator-age
    AlarmDescription: "DynamoDB Stream consumer falling behind"
    Namespace: AWS/DynamoDB
    MetricName: GetRecords.IteratorAgeMilliseconds
    Dimensions:
      - Name: TableName
        Value: manga-assist-sessions
    Statistic: Maximum
    Period: 60
    EvaluationPeriods: 5
    Threshold: 60000
    ComparisonOperator: GreaterThanThreshold
    AlarmActions:
      - !Ref OpsNotificationTopic
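The metrics panel above warns against alarming on raw UserErrors. One hedged way to implement that filtering is CloudWatch metric math, subtracting ConditionalCheckFailedRequests before alarming; note the caveat in the comments, since UserErrors is reported per account and Region. All names below are illustrative assumptions:
# A hedged sketch of "filter before alarming" for UserErrors: alarm on
# UserErrors minus ConditionalCheckFailedRequests via metric math.
# Caveat: UserErrors has no TableName dimension (it is account/Region-wide),
# so the subtraction is only exact when this table dominates the account.
import boto3

cw = boto3.client("cloudwatch")
cw.put_metric_alarm(
    AlarmName="manga-assist-dynamodb-user-errors-filtered",  # illustrative
    ComparisonOperator="GreaterThanThreshold",
    Threshold=0,
    EvaluationPeriods=1,
    Metrics=[
        {
            "Id": "filtered",
            "Expression": "user_errors - cond_failures",
            "Label": "UserErrors excluding conditional-check failures",
            "ReturnData": True,
        },
        {
            "Id": "user_errors",
            "MetricStat": {
                "Metric": {"Namespace": "AWS/DynamoDB", "MetricName": "UserErrors"},
                "Period": 60,
                "Stat": "Sum",
            },
            "ReturnData": False,
        },
        {
            "Id": "cond_failures",
            "MetricStat": {
                "Metric": {
                    "Namespace": "AWS/DynamoDB",
                    "MetricName": "ConditionalCheckFailedRequests",
                    "Dimensions": [
                        {"Name": "TableName", "Value": "manga-assist-sessions"}
                    ],
                },
                "Period": 60,
                "Stat": "Sum",
            },
            "ReturnData": False,
        },
    ],
    AlarmActions=[],  # assumption: wire this to the ops SNS topic in practice
)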
6. CloudTrail — API-Level Audit Trail
flowchart LR
subgraph API_Calls["DynamoDB API Calls"]
MGMT["Management Events<br/>(auto-logged)<br/>- CreateTable<br/>- DeleteTable<br/>- UpdateTable<br/>- UpdateTimeToLive"]
DATA["Data Events<br/>(must enable explicitly)<br/>- GetItem<br/>- PutItem<br/>- Query<br/>- DeleteItem"]
end
subgraph CloudTrail
CT["CloudTrail Trail"]
end
subgraph Storage
S3_CT["S3: CloudTrail Logs<br/>(encrypted, immutable)"]
CW_CT["CloudWatch Logs<br/>(for real-time alerting)"]
end
MGMT --> CT
DATA --> CT
CT --> S3_CT
CT --> CW_CT
subgraph Security_Alerts["Security Monitoring"]
ALERT1["Alert: DeleteTable called<br/>→ Page on-call immediately"]
ALERT2["Alert: Scan operation detected<br/>→ Investigate (should never happen)"]
ALERT3["Alert: AccessDenied errors<br/>→ Check for credential leaks"]
end
CW_CT --> ALERT1
CW_CT --> ALERT2
CW_CT --> ALERT3
CloudTrail Metric Filter for Dangerous Operations
"""
CloudWatch Logs metric filter to detect dangerous DynamoDB operations.
Deploy via CloudFormation or create manually.
"""
# Pattern to detect Scan operations (should never happen in production)
SCAN_FILTER_PATTERN = (
'{ ($.eventSource = "dynamodb.amazonaws.com") '
'&& ($.eventName = "Scan") '
'&& ($.requestParameters.tableName = "manga-assist-sessions") }'
)
# Pattern to detect table modifications
TABLE_MODIFY_PATTERN = (
'{ ($.eventSource = "dynamodb.amazonaws.com") '
'&& ($.eventName = "DeleteTable" || $.eventName = "UpdateTable" '
'|| $.eventName = "CreateBackup" || $.eventName = "RestoreTableFromBackup") }'
)
# Pattern to detect access denied (potential credential issue)
ACCESS_DENIED_PATTERN = (
'{ ($.eventSource = "dynamodb.amazonaws.com") '
'&& ($.errorCode = "AccessDeniedException" '
'|| $.errorCode = "UnauthorizedAccess") }'
)
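These patterns still need to be attached to the CloudTrail log group as metric filters. A sketch continuing the module above (the log group name is an assumption):
# Sketch: attach the Scan-detection pattern above as a metric filter.
# Assumes the CloudTrail trail delivers to this CloudWatch Logs group.
import boto3

logs = boto3.client("logs")
logs.put_metric_filter(
    logGroupName="/aws/cloudtrail/manga-assist",  # assumption
    filterName="manga-assist-detect-scan",
    filterPattern=SCAN_FILTER_PATTERN,  # defined in the module above
    metricTransformations=[{
        "metricName": "DangerousScanCount",
        "metricNamespace": "MangaAssist/Security",
        "metricValue": "1",
    }],
)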
7. End-to-End Request Observability
flowchart TB
subgraph Request_Flow["Full Request Observability Stack"]
direction LR
subgraph L1["Layer 1: API Gateway"]
ACCESS_LOG["Access Logs<br/>- Request ID<br/>- Client IP<br/>- Latency<br/>- Status code"]
end
subgraph L2["Layer 2: Lambda"]
STRUCTURED_LOG["Structured Logs (JSON)<br/>- Request ID (correlated)<br/>- Session ID<br/>- Function name<br/>- Duration per step"]
XRAY_TRACE["X-Ray Trace<br/>- Full service map<br/>- Subsegment timings<br/>- Annotations for search"]
end
subgraph L3["Layer 3: DynamoDB"]
DDB_METRICS["CloudWatch Metrics<br/>- Per-operation latency<br/>- Consumed capacity<br/>- Throttle events"]
DDB_CONTRIB["Contributor Insights<br/>- Most accessed keys<br/>- Hottest partitions"]
end
subgraph L4["Layer 4: Downstream"]
BEDROCK_LOG["Bedrock Model Logs<br/>- Token counts<br/>- Input/output logged to S3"]
KINESIS_METRICS["Kinesis Metrics<br/>- Iterator age<br/>- Put success rate"]
end
L1 --> L2 --> L3 --> L4
end
Structured Logging Pattern for Lambda
"""
Structured JSON logging for Lambda + DynamoDB operations.
Enables filtering, searching, and correlating logs across services.
"""
import json
import logging
import os
import time
import uuid
from functools import wraps
# ── Structured Logger Setup ──
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class StructuredMessage:
"""Format log messages as JSON for CloudWatch Logs Insights queries."""
def __init__(self, **kwargs):
self.kwargs = kwargs
def __str__(self):
return json.dumps(self.kwargs, default=str)
def log_info(**kwargs):
logger.info(StructuredMessage(**kwargs))
def log_error(**kwargs):
logger.error(StructuredMessage(**kwargs))
# ── DynamoDB Operation Logger ──
def log_dynamodb_operation(operation: str):
"""Decorator that logs DynamoDB operation timing and outcome."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
try:
result = func(*args, **kwargs)
duration_ms = (time.time() - start) * 1000
log_info(
event="dynamodb_operation",
operation=operation,
duration_ms=round(duration_ms, 2),
status="success",
table=os.environ.get("SESSION_TABLE"),
)
return result
except Exception as e:
duration_ms = (time.time() - start) * 1000
log_error(
event="dynamodb_operation",
operation=operation,
duration_ms=round(duration_ms, 2),
status="error",
error_type=type(e).__name__,
error_message=str(e),
table=os.environ.get("SESSION_TABLE"),
)
raise
return wrapper
return decorator
# ── Usage Example ──
import boto3
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ.get("SESSION_TABLE", "manga-assist-sessions"))
@log_dynamodb_operation("Query:LoadTurns")
def load_recent_turns(session_id: str, limit: int = 20):
return table.query(
KeyConditionExpression="PK = :pk AND begins_with(SK, :prefix)",
ExpressionAttributeValues={
":pk": f"SESSION#{session_id}",
":prefix": "TURN#",
},
ScanIndexForward=False,
Limit=limit,
)
@log_dynamodb_operation("PutItem:WriteTurn")
def write_turn(session_id: str, turn_data: dict):
return table.put_item(Item={
"PK": f"SESSION#{session_id}",
**turn_data,
})
CloudWatch Logs Insights Queries
# Find slowest DynamoDB operations in the last hour
fields @timestamp, operation, duration_ms, status, error_type
| filter event = "dynamodb_operation"
| sort duration_ms desc
| limit 20

# Count DynamoDB errors by type
fields @timestamp, operation, error_type
| filter event = "dynamodb_operation" and status = "error"
| stats count(*) as error_count by error_type, operation
| sort error_count desc

# P50/P95/P99 latency by operation
fields @timestamp, operation, duration_ms
| filter event = "dynamodb_operation" and status = "success"
| stats avg(duration_ms) as avg_ms,
    pct(duration_ms, 50) as p50_ms,
    pct(duration_ms, 95) as p95_ms,
    pct(duration_ms, 99) as p99_ms
    by operation

# Find sessions with the most turns — potential abuse
# (assumes session_id and turn_count are added to the structured log fields)
fields @timestamp, session_id, turn_count
| filter event = "dynamodb_operation" and operation = "Query:LoadTurns"
| stats max(turn_count) as max_turns by session_id
| sort max_turns desc
| limit 10
8. DynamoDB Contributor Insights — Hot Key Detection
flowchart TD
subgraph Normal["Normal Traffic Distribution"]
N1["SESSION#aaa: 50 req/s"]
N2["SESSION#bbb: 48 req/s"]
N3["SESSION#ccc: 52 req/s"]
N4["SESSION#ddd: 45 req/s"]
N_LABEL["Evenly distributed ✅"]
end
subgraph Hot["Hot Key Detected! 🔴"]
H1["SESSION#viral-session: 5000 req/s ⚠️"]
H2["SESSION#bbb: 48 req/s"]
H3["SESSION#ccc: 52 req/s"]
H_LABEL["One key consuming 90% of capacity"]
end
subgraph Detection["Contributor Insights"]
CI["CloudWatch Metric:<br/>DynamoDBContributorInsights<br/>Most accessed partition keys"]
end
subgraph Response["Automated Response"]
R1["1. Alarm fires on hot key"]
R2["2. Lambda checks if session is legitimate"]
R3["3a. Legitimate: Enable DAX for that session"]
R4["3b. Abuse: Rate-limit via Redis + block"]
end
Hot --> Detection --> Response
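Contributor Insights is enabled per table and per index. A short sketch, reusing the GSI name from the GDPR section (an assumption about this schema):
# Sketch: enable Contributor Insights on the base table and a GSI.
# The index name reuses GSI1-customer-sessions from Section 9 (an assumption).
import boto3

ddb = boto3.client("dynamodb")
ddb.update_contributor_insights(
    TableName="manga-assist-sessions",
    ContributorInsightsAction="ENABLE",
)
ddb.update_contributor_insights(
    TableName="manga-assist-sessions",
    IndexName="GSI1-customer-sessions",
    ContributorInsightsAction="ENABLE",
)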
9. Compliance and Data Protection Patterns
flowchart TD
subgraph GDPR_CCPA["GDPR / CCPA Compliance"]
direction TB
RIGHT_ACCESS["Right to Access<br/>→ Query GSI by customer_id<br/>→ Return all session data"]
RIGHT_DELETE["Right to Deletion<br/>→ Query all items by customer_id<br/>→ Delete each item individually<br/>→ Log deletion in audit trail<br/>→ Purge from S3 backups too"]
RIGHT_PORT["Right to Portability<br/>→ Export customer data to JSON<br/>→ Deliver via signed S3 URL"]
end
subgraph Implementation
DEL_LAMBDA["Deletion Lambda<br/>Triggered by support ticket"]
DEL_LAMBDA -->|"1. Query GSI"| DDB[(DynamoDB)]
DEL_LAMBDA -->|"2. Delete items"| DDB
DEL_LAMBDA -->|"3. Audit log"| CT[CloudTrail]
DEL_LAMBDA -->|"4. Purge backups"| S3[S3]
DEL_LAMBDA -->|"5. Confirm"| SNS[SNS → Customer]
end
GDPR Deletion Code
"""
GDPR/CCPA compliant data deletion for a specific customer.
Deletes from DynamoDB AND S3 backups.
"""
import json
import os
import time
import boto3
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["SESSION_TABLE"])
s3 = boto3.client("s3")
audit_log = boto3.client("logs")
BACKUP_BUCKET = os.environ["BACKUP_BUCKET"]
AUDIT_LOG_GROUP = os.environ["AUDIT_LOG_GROUP"]
def delete_customer_data(customer_id: str, request_id: str) -> dict:
"""
Complete customer data deletion for GDPR Article 17 compliance.
IMPORTANT: TTL is NOT sufficient for compliance deletion.
TTL deletes are eventually consistent (up to 48hr delay).
Explicit DeleteItem is required.
"""
deleted_sessions = []
total_items_deleted = 0
# ── Step 1: Find all sessions for this customer via GSI ──
sessions = _query_customer_sessions(customer_id)
for session in sessions:
session_id = session["session_id"]
pk = f"SESSION#{session_id}"
# ── Step 2: Delete all items in each session ──
items_deleted = _delete_all_session_items(pk)
total_items_deleted += items_deleted
deleted_sessions.append(session_id)
# ── Step 3: Log the deletion to audit trail ──
_write_audit_log(
action="GDPR_DELETION",
customer_id=customer_id,
request_id=request_id,
sessions_deleted=deleted_sessions,
items_deleted=total_items_deleted,
)
# ── Step 4: Mark S3 backup objects for deletion ──
# Note: S3 Glacier objects may need special handling
_purge_s3_backups(customer_id, deleted_sessions)
return {
"status": "deleted",
"customer_id": customer_id,
"sessions_deleted": len(deleted_sessions),
"items_deleted": total_items_deleted,
"audit_request_id": request_id,
}
def _query_customer_sessions(customer_id: str) -> list:
"""Find all sessions via GSI."""
items = []
params = {
"IndexName": "GSI1-customer-sessions",
"KeyConditionExpression": "customer_id = :cid",
"ExpressionAttributeValues": {":cid": customer_id},
}
while True:
resp = table.query(**params)
items.extend(resp.get("Items", []))
if "LastEvaluatedKey" not in resp:
break
params["ExclusiveStartKey"] = resp["LastEvaluatedKey"]
return items
def _delete_all_session_items(pk: str) -> int:
"""Delete every item (META, TURN, SUMMARY, HANDOFF) for a session."""
count = 0
params = {
"KeyConditionExpression": "PK = :pk",
"ExpressionAttributeValues": {":pk": pk},
"ProjectionExpression": "PK, SK", # Only need keys for delete
}
while True:
resp = table.query(**params)
items = resp.get("Items", [])
# Delete in batches of 25
for i in range(0, len(items), 25):
batch = items[i:i + 25]
with table.batch_writer() as writer:
for item in batch:
writer.delete_item(Key={"PK": item["PK"], "SK": item["SK"]})
count += len(batch)
if "LastEvaluatedKey" not in resp:
break
params["ExclusiveStartKey"] = resp["LastEvaluatedKey"]
return count
def _write_audit_log(action: str, **details):
"""Write immutable audit record."""
log_entry = {
"timestamp": int(time.time() * 1000),
"action": action,
**details,
}
audit_log.put_log_events(
logGroupName=AUDIT_LOG_GROUP,
logStreamName=f"gdpr-deletions-{time.strftime('%Y-%m-%d')}",
logEvents=[{
"timestamp": log_entry["timestamp"],
"message": json.dumps(log_entry, default=str),
}],
)
def _purge_s3_backups(customer_id: str, session_ids: list):
"""Delete customer data from S3 backup archives."""
for session_id in session_ids:
prefix = f"sessions/{session_id}/"
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=BACKUP_BUCKET, Prefix=prefix):
objects = page.get("Contents", [])
if objects:
s3.delete_objects(
Bucket=BACKUP_BUCKET,
Delete={"Objects": [{"Key": obj["Key"]} for obj in objects]},
)
10. The Complete Service Interaction Map
graph TB
DDB[(DynamoDB<br/>manga-assist-sessions)]
%% Direct integrations
LAMBDA["Lambda<br/>(6 functions)"] -->|"Query, PutItem,<br/>UpdateItem, DeleteItem"| DDB
STEPFN["Step Functions"] -->|"GetItem, UpdateItem<br/>(direct SDK)"| DDB
DAX["DAX"] -->|"Read/Write cache"| DDB
STREAMS["DynamoDB Streams"] -->|"CDC events"| DDB
%% Stream consumers
STREAMS --> LAMBDA_STREAM["Stream Router Lambda"]
LAMBDA_STREAM --> KINESIS["Kinesis Data Streams"]
LAMBDA_STREAM --> SNS["SNS"]
LAMBDA_STREAM --> EVENTBRIDGE["EventBridge"]
%% Kinesis pipeline
KINESIS --> FIREHOSE["Kinesis Firehose"]
FIREHOSE --> S3["S3<br/>(Analytics + Backup)"]
S3 --> ATHENA["Athena"]
S3 --> GLUE["Glue ETL"]
%% Caching
LAMBDA --> DAX
LAMBDA --> REDIS["ElastiCache Redis"]
%% Async
LAMBDA -.->|"Failed writes"| SQS["SQS DLQ"]
SQS --> LAMBDA_RETRY["Retry Lambda"]
LAMBDA_RETRY --> DDB
%% Observability
DDB --> CW["CloudWatch"]
DDB --> XRAY["X-Ray"]
DDB --> CT["CloudTrail"]
DDB --> CI["Contributor Insights"]
%% Security
DDB --> KMS["KMS<br/>(encryption)"]
IAM["IAM Roles"] --> DDB
VPC_EP["VPC Endpoint"] --> DDB
%% Backup
DDB --> PITR["PITR<br/>(continuous backup)"]
DDB -->|"Export"| S3
%% External
LAMBDA --> BEDROCK["Amazon Bedrock"]
LAMBDA --> APIGW["API Gateway"]
EVENTBRIDGE --> TARGETS["EventBridge Targets<br/>(cleanup, metrics, alerts)"]
SNS --> SQS_AGENTS["SQS Agent Queues"]
style DDB fill:#f9a825,stroke:#f57f17,color:#000
11. Common Mistakes Teams Make
| Mistake | Why It Happens | Consequence | Fix |
|---|---|---|---|
| Single IAM role for all Lambdas | "Easier to manage" | One compromised function can delete the table | One role per function with least-privilege |
| Not enabling CloudTrail data events | "Management events are enough" | Can't audit who read/wrote specific items | Enable data events for compliance |
| Logging PII in CloudWatch | Dumping full DynamoDB responses to logs | Customer data in logs = compliance violation | Log metadata only, redact content |
| No VPC endpoint for DynamoDB | Lambda calls DynamoDB over public internet | Higher latency, data traverses public network | Create Gateway VPC endpoint (free) |
| Alarming on ALL UserErrors | Including ConditionalCheckFailedException | Constant false alarms from idempotent writes | Filter metric to exclude conditional failures |
| Not using Contributor Insights | Don't know it exists | Hot key detected only after major outage | Enable Contributor Insights ($0.02/metric/hour) |
| X-Ray not enabled on Lambda | "We have CloudWatch logs" | Can't trace latency across DynamoDB + Bedrock | Enable active tracing + patch boto3 |
| TTL as compliance deletion | "TTL deletes the data" | TTL has up to 48hr lag — audit/compliance fail | Use explicit DeleteItem + audit log |
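For the PII-logging row, a minimal redaction helper that pairs with the structured logger from Section 7 (the attribute allow-list is an assumption about this table's schema):
# Minimal sketch for the "Logging PII" row: log keys and metadata, never
# message content. The attribute allow-list is an assumption about the schema.
SAFE_ATTRIBUTES = {"PK", "SK", "role", "created_at", "ttl"}

def redact_for_logging(item: dict) -> dict:
    return {k: (v if k in SAFE_ATTRIBUTES else "<redacted>") for k, v in item.items()}

# Usage with the Section 7 logger:
# log_info(event="turn_written", item=redact_for_logging(item))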
12. Critical Things to Remember
For Interviews
- One IAM role per Lambda function — This is the #1 security principle. The message handler should NEVER have Scan or DeleteTable permissions.
- dynamodb:LeadingKeys condition — Restricts which partition keys a role can access. This is how you enforce session-level access control.
- DynamoDB VPC Endpoint is a Gateway Endpoint — It's free, with no per-hour charge. There's no reason NOT to use it. It keeps traffic off the public internet.
- CloudTrail data events cost extra — $0.10 per 100K events. But for compliance, you MUST have an audit trail of who accessed what data.
- X-Ray + DynamoDB gives you subsegment-level timing — You can see exactly how long each GetItem/Query/PutItem took within a single Lambda invocation.
For Production
- Encrypt everything at rest AND in transit — DynamoDB SSE-KMS by default, TLS for DAX and Redis, enforce aws:SecureTransport in policies.
- Contributor Insights catches hot keys before they cause outages — Enable it on the table and all GSIs. $0.02/metric/hour is cheap insurance.
- Structured JSON logs + CloudWatch Logs Insights = powerful debugging — Don't use print statements. Use structured JSON. Query with Logs Insights.
- Run quarterly incident simulations — Delete a session, restore it from PITR. Trace a request end-to-end in X-Ray. Verify CloudTrail captured it.
- GDPR deletion is an explicit, audited process — TTL is for storage optimization. Compliance deletion requires explicit DeleteItem + audit log + S3 purge.