LOCAL PREVIEW View on GitHub

DynamoDB Cross-Service Security, IAM & Observability — Low-Level Design

Project: MangaAssist (Amazon-style chatbot) Scope: How DynamoDB is secured across all service boundaries, IAM policies, encryption, X-Ray tracing, CloudWatch monitoring, and compliance patterns.


1. Complete Security Architecture

flowchart TB
    subgraph External["External Boundary"]
        USER["👤 Customer"]
        AGENT["👩‍💼 Human Agent"]
        ADMIN["🔧 Admin"]
    end

    subgraph Auth["Authentication & Authorization"]
        COGNITO["Amazon Cognito<br/>User Pool + Identity Pool"]
        APIGW_AUTH["API Gateway<br/>Lambda Authorizer"]
    end

    subgraph Compute["Compute (IAM Roles)"]
        L_MSG["Message Lambda<br/>Role: manga-msg-role"]
        L_STREAM["Stream Lambda<br/>Role: manga-stream-role"]
        L_ADMIN["Admin Lambda<br/>Role: manga-admin-role"]
        SF["Step Functions<br/>Role: manga-sf-role"]
    end

    subgraph Data["Data Layer (Encrypted)"]
        DDB["DynamoDB<br/>SSE-KMS<br/>VPC Endpoint"]
        DAX["DAX Cluster<br/>In VPC<br/>TLS in-transit"]
        REDIS["ElastiCache Redis<br/>In VPC<br/>Auth token + TLS"]
        S3["S3 Backup Bucket<br/>SSE-S3<br/>Bucket Policy"]
    end

    subgraph Monitoring["Observability"]
        XRAY["X-Ray Tracing"]
        CW["CloudWatch Metrics + Alarms"]
        CT["CloudTrail<br/>API Audit Logs"]
    end

    USER -->|"JWT Token"| COGNITO
    COGNITO -->|"Validate"| APIGW_AUTH
    APIGW_AUTH -->|"Allow/Deny"| L_MSG

    AGENT -->|"Agent JWT"| COGNITO
    ADMIN -->|"Admin JWT"| COGNITO

    L_MSG -->|"manga-msg-role:<br/>Query, PutItem, UpdateItem<br/>on PK prefix SESSION#"| DDB
    L_STREAM -->|"manga-stream-role:<br/>GetRecords from Stream<br/>+ PutItem for summaries"| DDB
    L_ADMIN -->|"manga-admin-role:<br/>Scan, DeleteItem, Export"| DDB
    SF -->|"manga-sf-role:<br/>GetItem, UpdateItem"| DDB

    L_MSG --> DAX
    L_MSG --> REDIS

    DDB --> XRAY
    L_MSG --> XRAY
    DDB --> CW
    DDB --> CT

2. IAM Least-Privilege Policies — Per Lambda Role

flowchart TD
    subgraph Roles["IAM Roles — One Per Function"]
        R1["manga-msg-role<br/>(Message Handler)"]
        R2["manga-stream-role<br/>(Stream Processor)"]
        R3["manga-admin-role<br/>(Admin Operations)"]
        R4["manga-sf-role<br/>(Step Functions)"]
    end

    subgraph Permissions["DynamoDB Permissions Matrix"]
        direction TB
        P1["✅ Query, GetItem, PutItem, UpdateItem<br/>❌ Scan, DeleteItem, CreateTable"]
        P2["✅ GetRecords, GetShardIterator, DescribeStream<br/>✅ PutItem (for summaries)<br/>❌ DeleteItem, Scan"]
        P3["✅ Scan, Query, DeleteItem, ExportTableToPointInTime<br/>❌ CreateTable, DeleteTable"]
        P4["✅ GetItem, UpdateItem, Query<br/>❌ Scan, DeleteItem, PutItem"]
    end

    R1 --- P1
    R2 --- P2
    R3 --- P3
    R4 --- P4

IAM Policy: Message Handler Lambda (Most Restrictive)

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DynamoDBSessionAccess",
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetItem",
        "dynamodb:Query",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem"
      ],
      "Resource": [
        "arn:aws:dynamodb:us-east-1:123456789:table/manga-assist-sessions",
        "arn:aws:dynamodb:us-east-1:123456789:table/manga-assist-sessions/index/*"
      ],
      "Condition": {
        "ForAllValues:StringLike": {
          "dynamodb:LeadingKeys": ["SESSION#*"]
        }
      }
    },
    {
      "Sid": "DenyDangerous",
      "Effect": "Deny",
      "Action": [
        "dynamodb:Scan",
        "dynamodb:DeleteItem",
        "dynamodb:DeleteTable",
        "dynamodb:CreateTable",
        "dynamodb:BatchWriteItem"
      ],
      "Resource": "*"
    },
    {
      "Sid": "BedrockInvoke",
      "Effect": "Allow",
      "Action": ["bedrock:InvokeModel"],
      "Resource": "arn:aws:bedrock:us-east-1::foundation-model/anthropic.*"
    },
    {
      "Sid": "XRayTrace",
      "Effect": "Allow",
      "Action": [
        "xray:PutTraceSegments",
        "xray:PutTelemetryRecords"
      ],
      "Resource": "*"
    }
  ]
}

IAM Policy: Stream Processor Lambda

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DynamoDBStreamRead",
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:DescribeStream",
        "dynamodb:ListStreams"
      ],
      "Resource": "arn:aws:dynamodb:us-east-1:123456789:table/manga-assist-sessions/stream/*"
    },
    {
      "Sid": "DynamoDBWriteSummaries",
      "Effect": "Allow",
      "Action": [
        "dynamodb:PutItem",
        "dynamodb:GetItem",
        "dynamodb:Query"
      ],
      "Resource": "arn:aws:dynamodb:us-east-1:123456789:table/manga-assist-sessions",
      "Condition": {
        "ForAllValues:StringLike": {
          "dynamodb:LeadingKeys": ["SESSION#*"]
        }
      }
    },
    {
      "Sid": "KinesisWrite",
      "Effect": "Allow",
      "Action": ["kinesis:PutRecord", "kinesis:PutRecords"],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/manga-assist-events"
    },
    {
      "Sid": "SNSPublish",
      "Effect": "Allow",
      "Action": ["sns:Publish"],
      "Resource": "arn:aws:sns:us-east-1:123456789:manga-assist-handoffs"
    }
  ]
}

3. Encryption Architecture

flowchart LR
    subgraph AtRest["Encryption at Rest"]
        DDB_ENC["DynamoDB<br/>SSE with AWS-managed KMS key<br/>(aws/dynamodb)"]
        S3_ENC["S3 Backups<br/>SSE-S3 (AES-256)<br/>or SSE-KMS for compliance"]
        REDIS_ENC["ElastiCache Redis<br/>at-rest encryption enabled"]
        DAX_ENC["DAX<br/>at-rest encryption enabled"]
    end

    subgraph InTransit["Encryption in Transit"]
        DDB_TLS["DynamoDB<br/>HTTPS/TLS only<br/>(enforced by VPC endpoint policy)"]
        DAX_TLS["DAX<br/>TLS encryption enabled<br/>(ClusterEndpointEncryptionType: TLS)"]
        REDIS_TLS["Redis<br/>in-transit encryption = true<br/>AUTH token required"]
        API_TLS["API Gateway<br/>TLS 1.2 minimum"]
    end

    subgraph Keys["Key Management"]
        KMS["AWS KMS"]
        CMK["Customer-managed key:<br/>manga-assist-cmk<br/>(for audit + compliance)"]
    end

    DDB_ENC --> KMS
    S3_ENC --> KMS
    CMK --> KMS

VPC Endpoint Policy — Restrict DynamoDB Access to VPC

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowOnlyFromVPC",
      "Effect": "Allow",
      "Principal": "*",
      "Action": [
        "dynamodb:GetItem",
        "dynamodb:Query",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem",
        "dynamodb:BatchWriteItem"
      ],
      "Resource": "arn:aws:dynamodb:us-east-1:123456789:table/manga-assist-*",
      "Condition": {
        "StringEquals": {
          "aws:SourceVpc": "vpc-0abc1234def56789"
        }
      }
    },
    {
      "Sid": "DenyNonTLS",
      "Effect": "Deny",
      "Principal": "*",
      "Action": "dynamodb:*",
      "Resource": "*",
      "Condition": {
        "Bool": {
          "aws:SecureTransport": "false"
        }
      }
    }
  ]
}

4. X-Ray Distributed Tracing — Full Request Flow

flowchart LR
    subgraph Trace["X-Ray Trace: User Message → AI Response"]
        direction TB
        SEG1["Segment: API Gateway<br/>Duration: 2ms<br/>Status: 200"]
        SEG2["Segment: Lambda (message-handler)<br/>Duration: 920ms<br/>Cold Start: No"]
        SUB1["  └─ Subsegment: DynamoDB Query (context)<br/>     Duration: 8ms<br/>     Table: manga-assist-sessions"]
        SUB2["  └─ Subsegment: DynamoDB Query (summary)<br/>     Duration: 5ms<br/>     Table: manga-assist-sessions"]
        SUB3["  └─ Subsegment: DynamoDB PutItem (user turn)<br/>     Duration: 6ms<br/>     Table: manga-assist-sessions"]
        SUB4["  └─ Subsegment: Bedrock InvokeModel<br/>     Duration: 850ms ⚠️<br/>     Model: claude-3-sonnet"]
        SUB5["  └─ Subsegment: DynamoDB PutItem (assistant turn)<br/>     Duration: 7ms<br/>     Table: manga-assist-sessions"]
        SUB6["  └─ Subsegment: DynamoDB UpdateItem (META)<br/>     Duration: 4ms<br/>     Table: manga-assist-sessions"]

        SEG1 --> SEG2
        SEG2 --> SUB1
        SEG2 --> SUB2
        SEG2 --> SUB3
        SEG2 --> SUB4
        SEG2 --> SUB5
        SEG2 --> SUB6
    end

X-Ray Instrumentation Code

"""
X-Ray instrumentation for Lambda + DynamoDB.
Traces every DynamoDB call and Bedrock call automatically.
"""

# ── Enable X-Ray in Lambda ──
# Method 1: Patch boto3 globally (recommended)
from aws_xray_sdk.core import xray_recorder, patch_all

# Patches boto3, requests, and other libraries automatically
patch_all()

# ── OR Method 2: Patch only specific libraries ──
from aws_xray_sdk.core import patch
patch(["boto3", "botocore"])

import boto3
import os
import json

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["SESSION_TABLE"])
bedrock = boto3.client("bedrock-runtime")


def handler(event, context):
    session_id = event.get("session_id")

    # ── Custom subsegment for business logic ──
    with xray_recorder.in_subsegment("LoadContext") as subsegment:
        subsegment.put_annotation("session_id", session_id)
        subsegment.put_metadata("event", event, "request")

        # DynamoDB calls within this block are auto-traced
        meta = table.get_item(Key={
            "PK": f"SESSION#{session_id}",
            "SK": "META",
        }).get("Item", {})

        turns = table.query(
            KeyConditionExpression="PK = :pk AND begins_with(SK, :prefix)",
            ExpressionAttributeValues={
                ":pk": f"SESSION#{session_id}",
                ":prefix": "TURN#",
            },
            ScanIndexForward=False,
            Limit=20,
        ).get("Items", [])

        subsegment.put_annotation("turn_count", len(turns))

    # ── Custom subsegment for LLM call ──
    with xray_recorder.in_subsegment("BedrockInvoke") as subsegment:
        subsegment.put_annotation("model", "claude-3-sonnet")

        response = bedrock.invoke_model(
            modelId="anthropic.claude-3-sonnet-20240229-v1:0",
            contentType="application/json",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": "Hello"}],
            }),
        )

        subsegment.put_annotation("response_status", "success")

    # ── Custom subsegment for DynamoDB writes ──
    with xray_recorder.in_subsegment("WriteTurns") as subsegment:
        # PutItem calls here are auto-traced
        table.put_item(Item={
            "PK": f"SESSION#{session_id}",
            "SK": "TURN#12345",
            "role": "assistant",
            "content_compressed": b"response...",
        })

        subsegment.put_annotation("write_count", 1)

    return {"statusCode": 200}

5. CloudWatch Monitoring — DynamoDB Dashboard

flowchart TB
    subgraph Metrics["Key CloudWatch Metrics"]
        M1["SuccessfulRequestLatency (P50, P99)<br/>🟢 P99 < 10ms: Healthy<br/>🟡 P99 10-25ms: Watch<br/>🔴 P99 > 25ms: Alert"]
        M2["ConsumedReadCapacityUnits<br/>ConsumedWriteCapacityUnits<br/>📊 Track against provisioned/burst"]
        M3["ThrottledRequests<br/>🔴 Any > 0: Alarm immediately"]
        M4["SystemErrors (5xx)<br/>🔴 Any > 0: Alarm immediately"]
        M5["UserErrors (4xx)<br/>🟡 Includes ConditionalCheckFailed<br/>Filter before alarming"]
        M6["AccountProvisionedReadCapacityUtilization<br/>AccountProvisionedWriteCapacityUtilization<br/>🟡 > 80%: Plan capacity increase"]
    end

    subgraph Alarms["CloudWatch Alarms"]
        A1["CRITICAL: ThrottledRequests > 0<br/>for 1 minute"]
        A2["CRITICAL: SystemErrors > 0<br/>for 1 minute"]
        A3["WARNING: P99 Latency > 25ms<br/>for 5 minutes"]
        A4["WARNING: ConsumedWCU > 80%<br/>of provisioned for 10 minutes"]
        A5["INFO: IteratorAge > 60s<br/>(Streams processing lag)"]
    end

    subgraph Actions["Alarm Actions"]
        SNS_OPS["SNS → PagerDuty/Slack"]
        AUTO_SCALE["Trigger auto-scaling"]
        RUNBOOK["Link to runbook"]
    end

    M3 --> A1 --> SNS_OPS
    M4 --> A2 --> SNS_OPS
    M1 --> A3 --> RUNBOOK
    M2 --> A4 --> AUTO_SCALE

CloudWatch Alarms (CloudFormation)

# Critical: Throttled requests alarm
DynamoDBThrottleAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: manga-assist-dynamodb-throttled
    AlarmDescription: "DynamoDB throttling detected  requests being rejected"
    Namespace: AWS/DynamoDB
    MetricName: ThrottledRequests
    Dimensions:
      - Name: TableName
        Value: manga-assist-sessions
    Statistic: Sum
    Period: 60
    EvaluationPeriods: 1
    Threshold: 0
    ComparisonOperator: GreaterThanThreshold
    TreatMissingData: notBreaching
    AlarmActions:
      - !Ref OpsNotificationTopic

# Critical: System errors
DynamoDBSystemErrorAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: manga-assist-dynamodb-system-errors
    AlarmDescription: "DynamoDB 5xx errors  AWS-side issue"
    Namespace: AWS/DynamoDB
    MetricName: SystemErrors
    Dimensions:
      - Name: TableName
        Value: manga-assist-sessions
    Statistic: Sum
    Period: 60
    EvaluationPeriods: 1
    Threshold: 0
    ComparisonOperator: GreaterThanThreshold
    AlarmActions:
      - !Ref OpsNotificationTopic

# Warning: High P99 latency
DynamoDBLatencyAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: manga-assist-dynamodb-p99-latency
    AlarmDescription: "DynamoDB P99 latency > 25ms"
    Namespace: AWS/DynamoDB
    MetricName: SuccessfulRequestLatency
    Dimensions:
      - Name: TableName
        Value: manga-assist-sessions
      - Name: Operation
        Value: Query
    ExtendedStatistic: p99
    Period: 300
    EvaluationPeriods: 3
    Threshold: 25
    ComparisonOperator: GreaterThanThreshold
    AlarmActions:
      - !Ref OpsNotificationTopic

# Warning: Stream processing lag
StreamIteratorAgeAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: manga-assist-stream-iterator-age
    AlarmDescription: "DynamoDB Stream consumer falling behind"
    Namespace: AWS/DynamoDB
    MetricName: GetRecords.IteratorAgeMilliseconds
    Dimensions:
      - Name: TableName
        Value: manga-assist-sessions
    Statistic: Maximum
    Period: 60
    EvaluationPeriods: 5
    Threshold: 60000
    ComparisonOperator: GreaterThanThreshold
    AlarmActions:
      - !Ref OpsNotificationTopic

6. CloudTrail — API-Level Audit Trail

flowchart LR
    subgraph API_Calls["DynamoDB API Calls"]
        MGMT["Management Events<br/>(auto-logged)<br/>- CreateTable<br/>- DeleteTable<br/>- UpdateTable<br/>- UpdateTimeToLive"]
        DATA["Data Events<br/>(must enable explicitly)<br/>- GetItem<br/>- PutItem<br/>- Query<br/>- DeleteItem"]
    end

    subgraph CloudTrail
        CT["CloudTrail Trail"]
    end

    subgraph Storage
        S3_CT["S3: CloudTrail Logs<br/>(encrypted, immutable)"]
        CW_CT["CloudWatch Logs<br/>(for real-time alerting)"]
    end

    MGMT --> CT
    DATA --> CT
    CT --> S3_CT
    CT --> CW_CT

    subgraph Security_Alerts["Security Monitoring"]
        ALERT1["Alert: DeleteTable called<br/>→ Page on-call immediately"]
        ALERT2["Alert: Scan operation detected<br/>→ Investigate (should never happen)"]
        ALERT3["Alert: AccessDenied errors<br/>→ Check for credential leaks"]
    end

    CW_CT --> ALERT1
    CW_CT --> ALERT2
    CW_CT --> ALERT3

CloudTrail Metric Filter for Dangerous Operations

"""
CloudWatch Logs metric filter to detect dangerous DynamoDB operations.
Deploy via CloudFormation or create manually.
"""

# Pattern to detect Scan operations (should never happen in production)
SCAN_FILTER_PATTERN = (
    '{ ($.eventSource = "dynamodb.amazonaws.com") '
    '&& ($.eventName = "Scan") '
    '&& ($.requestParameters.tableName = "manga-assist-sessions") }'
)

# Pattern to detect table modifications
TABLE_MODIFY_PATTERN = (
    '{ ($.eventSource = "dynamodb.amazonaws.com") '
    '&& ($.eventName = "DeleteTable" || $.eventName = "UpdateTable" '
    '|| $.eventName = "CreateBackup" || $.eventName = "RestoreTableFromBackup") }'
)

# Pattern to detect access denied (potential credential issue)
ACCESS_DENIED_PATTERN = (
    '{ ($.eventSource = "dynamodb.amazonaws.com") '
    '&& ($.errorCode = "AccessDeniedException" '
    '|| $.errorCode = "UnauthorizedAccess") }'
)

7. End-to-End Request Observability

flowchart TB
    subgraph Request_Flow["Full Request Observability Stack"]
        direction LR

        subgraph L1["Layer 1: API Gateway"]
            ACCESS_LOG["Access Logs<br/>- Request ID<br/>- Client IP<br/>- Latency<br/>- Status code"]
        end

        subgraph L2["Layer 2: Lambda"]
            STRUCTURED_LOG["Structured Logs (JSON)<br/>- Request ID (correlated)<br/>- Session ID<br/>- Function name<br/>- Duration per step"]
            XRAY_TRACE["X-Ray Trace<br/>- Full service map<br/>- Subsegment timings<br/>- Annotations for search"]
        end

        subgraph L3["Layer 3: DynamoDB"]
            DDB_METRICS["CloudWatch Metrics<br/>- Per-operation latency<br/>- Consumed capacity<br/>- Throttle events"]
            DDB_CONTRIB["Contributor Insights<br/>- Most accessed keys<br/>- Hottest partitions"]
        end

        subgraph L4["Layer 4: Downstream"]
            BEDROCK_LOG["Bedrock Model Logs<br/>- Token counts<br/>- Input/output logged to S3"]
            KINESIS_METRICS["Kinesis Metrics<br/>- Iterator age<br/>- Put success rate"]
        end

        L1 --> L2 --> L3 --> L4
    end

Structured Logging Pattern for Lambda

"""
Structured JSON logging for Lambda + DynamoDB operations.
Enables filtering, searching, and correlating logs across services.
"""

import json
import logging
import os
import time
import uuid
from functools import wraps

# ── Structured Logger Setup ──
logger = logging.getLogger()
logger.setLevel(logging.INFO)


class StructuredMessage:
    """Format log messages as JSON for CloudWatch Logs Insights queries."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs

    def __str__(self):
        return json.dumps(self.kwargs, default=str)


def log_info(**kwargs):
    logger.info(StructuredMessage(**kwargs))


def log_error(**kwargs):
    logger.error(StructuredMessage(**kwargs))


# ── DynamoDB Operation Logger ──
def log_dynamodb_operation(operation: str):
    """Decorator that logs DynamoDB operation timing and outcome."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            try:
                result = func(*args, **kwargs)
                duration_ms = (time.time() - start) * 1000
                log_info(
                    event="dynamodb_operation",
                    operation=operation,
                    duration_ms=round(duration_ms, 2),
                    status="success",
                    table=os.environ.get("SESSION_TABLE"),
                )
                return result
            except Exception as e:
                duration_ms = (time.time() - start) * 1000
                log_error(
                    event="dynamodb_operation",
                    operation=operation,
                    duration_ms=round(duration_ms, 2),
                    status="error",
                    error_type=type(e).__name__,
                    error_message=str(e),
                    table=os.environ.get("SESSION_TABLE"),
                )
                raise
        return wrapper
    return decorator


# ── Usage Example ──
import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ.get("SESSION_TABLE", "manga-assist-sessions"))


@log_dynamodb_operation("Query:LoadTurns")
def load_recent_turns(session_id: str, limit: int = 20):
    return table.query(
        KeyConditionExpression="PK = :pk AND begins_with(SK, :prefix)",
        ExpressionAttributeValues={
            ":pk": f"SESSION#{session_id}",
            ":prefix": "TURN#",
        },
        ScanIndexForward=False,
        Limit=limit,
    )


@log_dynamodb_operation("PutItem:WriteTurn")
def write_turn(session_id: str, turn_data: dict):
    return table.put_item(Item={
        "PK": f"SESSION#{session_id}",
        **turn_data,
    })

CloudWatch Logs Insights Queries

-- Find slowest DynamoDB operations in the last hour
fields @timestamp, operation, duration_ms, status, error_type
| filter event = "dynamodb_operation"
| sort duration_ms desc
| limit 20

-- Count DynamoDB errors by type
fields @timestamp, operation, error_type
| filter event = "dynamodb_operation" and status = "error"
| stats count(*) as error_count by error_type, operation
| sort error_count desc

-- P50/P95/P99 latency by operation
fields @timestamp, operation, duration_ms
| filter event = "dynamodb_operation" and status = "success"
| stats avg(duration_ms) as avg_ms,
        pct(duration_ms, 50) as p50_ms,
        pct(duration_ms, 95) as p95_ms,
        pct(duration_ms, 99) as p99_ms
  by operation

-- Find sessions with the most turns (potential abuse)
fields @timestamp, session_id, turn_count
| filter event = "dynamodb_operation" and operation = "Query:LoadTurns"
| stats max(turn_count) as max_turns by session_id
| sort max_turns desc
| limit 10

8. DynamoDB Contributor Insights — Hot Key Detection

flowchart TD
    subgraph Normal["Normal Traffic Distribution"]
        N1["SESSION#aaa: 50 req/s"]
        N2["SESSION#bbb: 48 req/s"]
        N3["SESSION#ccc: 52 req/s"]
        N4["SESSION#ddd: 45 req/s"]
        N_LABEL["Evenly distributed ✅"]
    end

    subgraph Hot["Hot Key Detected! 🔴"]
        H1["SESSION#viral-session: 5000 req/s ⚠️"]
        H2["SESSION#bbb: 48 req/s"]
        H3["SESSION#ccc: 52 req/s"]
        H_LABEL["One key consuming 90% of capacity"]
    end

    subgraph Detection["Contributor Insights"]
        CI["CloudWatch Metric:<br/>DynamoDBContributorInsights<br/>Most accessed partition keys"]
    end

    subgraph Response["Automated Response"]
        R1["1. Alarm fires on hot key"]
        R2["2. Lambda checks if session is legitimate"]
        R3["3a. Legitimate: Enable DAX for that session"]
        R4["3b. Abuse: Rate-limit via Redis + block"]
    end

    Hot --> Detection --> Response

9. Compliance and Data Protection Patterns

flowchart TD
    subgraph GDPR_CCPA["GDPR / CCPA Compliance"]
        direction TB
        RIGHT_ACCESS["Right to Access<br/>→ Query GSI by customer_id<br/>→ Return all session data"]
        RIGHT_DELETE["Right to Deletion<br/>→ Query all items by customer_id<br/>→ Delete each item individually<br/>→ Log deletion in audit trail<br/>→ Purge from S3 backups too"]
        RIGHT_PORT["Right to Portability<br/>→ Export customer data to JSON<br/>→ Deliver via signed S3 URL"]
    end

    subgraph Implementation
        DEL_LAMBDA["Deletion Lambda<br/>Triggered by support ticket"]
        DEL_LAMBDA -->|"1. Query GSI"| DDB[(DynamoDB)]
        DEL_LAMBDA -->|"2. Delete items"| DDB
        DEL_LAMBDA -->|"3. Audit log"| CT[CloudTrail]
        DEL_LAMBDA -->|"4. Purge backups"| S3[S3]
        DEL_LAMBDA -->|"5. Confirm"| SNS[SNS → Customer]
    end

GDPR Deletion Code

"""
GDPR/CCPA compliant data deletion for a specific customer.
Deletes from DynamoDB AND S3 backups.
"""

import json
import os
import time
import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["SESSION_TABLE"])
s3 = boto3.client("s3")
audit_log = boto3.client("logs")

BACKUP_BUCKET = os.environ["BACKUP_BUCKET"]
AUDIT_LOG_GROUP = os.environ["AUDIT_LOG_GROUP"]


def delete_customer_data(customer_id: str, request_id: str) -> dict:
    """
    Complete customer data deletion for GDPR Article 17 compliance.

    IMPORTANT: TTL is NOT sufficient for compliance deletion.
    TTL deletes are eventually consistent (up to 48hr delay).
    Explicit DeleteItem is required.
    """
    deleted_sessions = []
    total_items_deleted = 0

    # ── Step 1: Find all sessions for this customer via GSI ──
    sessions = _query_customer_sessions(customer_id)

    for session in sessions:
        session_id = session["session_id"]
        pk = f"SESSION#{session_id}"

        # ── Step 2: Delete all items in each session ──
        items_deleted = _delete_all_session_items(pk)
        total_items_deleted += items_deleted
        deleted_sessions.append(session_id)

    # ── Step 3: Log the deletion to audit trail ──
    _write_audit_log(
        action="GDPR_DELETION",
        customer_id=customer_id,
        request_id=request_id,
        sessions_deleted=deleted_sessions,
        items_deleted=total_items_deleted,
    )

    # ── Step 4: Mark S3 backup objects for deletion ──
    # Note: S3 Glacier objects may need special handling
    _purge_s3_backups(customer_id, deleted_sessions)

    return {
        "status": "deleted",
        "customer_id": customer_id,
        "sessions_deleted": len(deleted_sessions),
        "items_deleted": total_items_deleted,
        "audit_request_id": request_id,
    }


def _query_customer_sessions(customer_id: str) -> list:
    """Find all sessions via GSI."""
    items = []
    params = {
        "IndexName": "GSI1-customer-sessions",
        "KeyConditionExpression": "customer_id = :cid",
        "ExpressionAttributeValues": {":cid": customer_id},
    }
    while True:
        resp = table.query(**params)
        items.extend(resp.get("Items", []))
        if "LastEvaluatedKey" not in resp:
            break
        params["ExclusiveStartKey"] = resp["LastEvaluatedKey"]
    return items


def _delete_all_session_items(pk: str) -> int:
    """Delete every item (META, TURN, SUMMARY, HANDOFF) for a session."""
    count = 0
    params = {
        "KeyConditionExpression": "PK = :pk",
        "ExpressionAttributeValues": {":pk": pk},
        "ProjectionExpression": "PK, SK",  # Only need keys for delete
    }
    while True:
        resp = table.query(**params)
        items = resp.get("Items", [])

        # Delete in batches of 25
        for i in range(0, len(items), 25):
            batch = items[i:i + 25]
            with table.batch_writer() as writer:
                for item in batch:
                    writer.delete_item(Key={"PK": item["PK"], "SK": item["SK"]})
            count += len(batch)

        if "LastEvaluatedKey" not in resp:
            break
        params["ExclusiveStartKey"] = resp["LastEvaluatedKey"]

    return count


def _write_audit_log(action: str, **details):
    """Write immutable audit record."""
    log_entry = {
        "timestamp": int(time.time() * 1000),
        "action": action,
        **details,
    }
    audit_log.put_log_events(
        logGroupName=AUDIT_LOG_GROUP,
        logStreamName=f"gdpr-deletions-{time.strftime('%Y-%m-%d')}",
        logEvents=[{
            "timestamp": log_entry["timestamp"],
            "message": json.dumps(log_entry, default=str),
        }],
    )


def _purge_s3_backups(customer_id: str, session_ids: list):
    """Delete customer data from S3 backup archives."""
    for session_id in session_ids:
        prefix = f"sessions/{session_id}/"
        paginator = s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=BACKUP_BUCKET, Prefix=prefix):
            objects = page.get("Contents", [])
            if objects:
                s3.delete_objects(
                    Bucket=BACKUP_BUCKET,
                    Delete={"Objects": [{"Key": obj["Key"]} for obj in objects]},
                )

10. The Complete Service Interaction Map

graph TB
    DDB[(DynamoDB<br/>manga-assist-sessions)]

    %% Direct integrations
    LAMBDA["Lambda<br/>(6 functions)"] -->|"Query, PutItem,<br/>UpdateItem, DeleteItem"| DDB
    STEPFN["Step Functions"] -->|"GetItem, UpdateItem<br/>(direct SDK)"| DDB
    DAX["DAX"] -->|"Read/Write cache"| DDB
    STREAMS["DynamoDB Streams"] -->|"CDC events"| DDB

    %% Stream consumers
    STREAMS --> LAMBDA_STREAM["Stream Router Lambda"]
    LAMBDA_STREAM --> KINESIS["Kinesis Data Streams"]
    LAMBDA_STREAM --> SNS["SNS"]
    LAMBDA_STREAM --> EVENTBRIDGE["EventBridge"]

    %% Kinesis pipeline
    KINESIS --> FIREHOSE["Kinesis Firehose"]
    FIREHOSE --> S3["S3<br/>(Analytics + Backup)"]
    S3 --> ATHENA["Athena"]
    S3 --> GLUE["Glue ETL"]

    %% Caching
    LAMBDA --> DAX
    LAMBDA --> REDIS["ElastiCache Redis"]

    %% Async
    LAMBDA -.->|"Failed writes"| SQS["SQS DLQ"]
    SQS --> LAMBDA_RETRY["Retry Lambda"]
    LAMBDA_RETRY --> DDB

    %% Observability
    DDB --> CW["CloudWatch"]
    DDB --> XRAY["X-Ray"]
    DDB --> CT["CloudTrail"]
    DDB --> CI["Contributor Insights"]

    %% Security
    DDB --> KMS["KMS<br/>(encryption)"]
    IAM["IAM Roles"] --> DDB
    VPC_EP["VPC Endpoint"] --> DDB

    %% Backup
    DDB --> PITR["PITR<br/>(continuous backup)"]
    DDB -->|"Export"| S3

    %% External
    LAMBDA --> BEDROCK["Amazon Bedrock"]
    LAMBDA --> APIGW["API Gateway"]
    EVENTBRIDGE --> TARGETS["EventBridge Targets<br/>(cleanup, metrics, alerts)"]
    SNS --> SQS_AGENTS["SQS Agent Queues"]

    style DDB fill:#f9a825,stroke:#f57f17,color:#000

11. Common Mistakes Teams Make

Mistake Why It Happens Consequence Fix
Single IAM role for all Lambdas "Easier to manage" One compromised function can delete the table One role per function with least-privilege
Not enabling CloudTrail data events "Management events are enough" Can't audit who read/wrote specific items Enable data events for compliance
Logging PII in CloudWatch Dumping full DynamoDB responses to logs Customer data in logs = compliance violation Log metadata only, redact content
No VPC endpoint for DynamoDB Lambda calls DynamoDB over public internet Higher latency, data traverses public network Create Gateway VPC endpoint (free)
Alarming on ALL UserErrors Including ConditionalCheckFailedException Constant false alarms from idempotent writes Filter metric to exclude conditional failures
Not using Contributor Insights Don't know it exists Hot key detected only after major outage Enable Contributor Insights ($0.02/metric/hour)
X-Ray not enabled on Lambda "We have CloudWatch logs" Can't trace latency across DynamoDB + Bedrock Enable active tracing + patch boto3
TTL as compliance deletion "TTL deletes the data" TTL has up to 48hr lag — audit/compliance fail Use explicit DeleteItem + audit log

12. Critical Things to Remember

For Interviews

  1. One IAM role per Lambda function — This is the #1 security principle. The message handler should NEVER have Scan or DeleteTable permissions.

  2. dynamodb:LeadingKeys condition — Restricts which partition keys a role can access. This is how you enforce session-level access control.

  3. DynamoDB VPC Endpoint is a Gateway Endpoint — It's free, no per-hour charge. There's no reason NOT to use it. It keeps traffic off the public internet.

  4. CloudTrail data events cost extra — $0.10 per 100K events. But for compliance, you MUST have an audit trail of who accessed what data.

  5. X-Ray + DynamoDB gives you subsegment-level timing — You can see exactly how long each GetItem/Query/PutItem took within a single Lambda invocation.

For Production

  1. Encrypt everything at rest AND in transit — DynamoDB SSE-KMS by default, TLS for DAX and Redis, enforce aws:SecureTransport in policies.

  2. Contributor Insights catches hot keys before they cause outages — Enable it on the table and all GSIs. $0.02/metric/hour is cheap insurance.

  3. Structured JSON logs + CloudWatch Logs Insights = powerful debugging — Don't use print statements. Use structured JSON. Query with Logs Insights.

  4. Run quarterly incident simulations — Delete a session, restore it from PITR. Trace a request end-to-end in X-Ray. Verify CloudTrail captured it.

  5. GDPR deletion is an explicit, audited process — TTL is for storage optimization. Compliance deletion requires explicit DeleteItem + audit log + S3 purge.