LOCAL PREVIEW View on GitHub

DynamoDB + S3 + Kinesis — Data Pipeline, Backup & Analytics LLD

Project: MangaAssist (Amazon-style chatbot) Scope: How conversation data flows from DynamoDB to S3 for archival, through Kinesis for real-time analytics, backup/restore strategies, and data lifecycle management.


1. Complete Data Pipeline Architecture

flowchart TB
    subgraph Sources["Data Sources"]
        DDB[(DynamoDB<br/>manga-assist-sessions)]
        STREAMS[DynamoDB Streams]
    end

    subgraph RealTime["Real-Time Pipeline"]
        KDS[Kinesis Data Streams<br/>manga-assist-events]
        KDA[Kinesis Data Analytics<br/>Anomaly Detection]
        KDF[Kinesis Data Firehose<br/>Delivery Stream]
    end

    subgraph BatchPipeline["Batch Pipeline"]
        EXPORT[DynamoDB Export to S3<br/>Point-in-Time Export]
        PITR[Point-in-Time Recovery<br/>Continuous Backups]
        BACKUP[On-Demand Backup<br/>Scheduled via EventBridge]
    end

    subgraph Storage["S3 Storage Tiers"]
        S3_RAW[S3: Raw Events<br/>s3://manga-assist-data/raw/]
        S3_PROCESSED[S3: Processed<br/>s3://manga-assist-data/processed/]
        S3_ARCHIVE[S3: Archive<br/>s3://manga-assist-data/archive/]
        S3_BACKUP[S3: Table Exports<br/>s3://manga-assist-backups/]
    end

    subgraph Analytics["Analytics & Query"]
        ATHENA[Amazon Athena<br/>SQL over S3]
        GLUE[AWS Glue<br/>ETL + Catalog]
        QUICKSIGHT[QuickSight<br/>Dashboards]
        OPENSEARCH[OpenSearch<br/>Search + Visualization]
    end

    DDB --> STREAMS
    STREAMS -->|Stream Router Lambda| KDS
    KDS --> KDA
    KDS --> KDF
    KDF --> S3_RAW

    DDB -->|Scheduled export| EXPORT
    EXPORT --> S3_BACKUP
    DDB -->|Continuous| PITR

    S3_RAW -->|Glue ETL job| S3_PROCESSED
    S3_PROCESSED --> ATHENA
    ATHENA --> QUICKSIGHT

    S3_PROCESSED -->|After 90 days| S3_ARCHIVE

    GLUE -->|Catalog| ATHENA
    KDF -->|Direct delivery| OPENSEARCH

    DDB -->|On-demand| BACKUP

2. Real-Time Pipeline — DynamoDB → Kinesis → S3

sequenceDiagram
    participant U as User Message
    participant D as DynamoDB
    participant ST as DynamoDB Streams
    participant R as Router Lambda
    participant K as Kinesis Data Streams
    participant F as Kinesis Firehose
    participant S3 as S3 Raw Bucket
    participant A as Athena

    U->>D: PutItem (new turn)
    D->>ST: INSERT event (NEW_AND_OLD_IMAGES)
    ST->>R: Batch of stream records

    R->>K: PutRecord (structured event)
    Note over K: Shard: partitioned by session_id<br/>Retention: 24 hours

    K->>F: Consumer reads records
    Note over F: Buffer: 5MB or 300 seconds<br/>Compression: GZIP<br/>Format: Parquet

    F->>S3: Deliver batch file
    Note over S3: Path: raw/year=2026/month=03/day=25/hour=14/<br/>Hive-style partitioning

    Note over A: Query at any time:<br/>SELECT * FROM manga_events<br/>WHERE year='2026' AND month='03'

Kinesis Producer (Stream Router)

"""
Lambda: manga-assist-kinesis-producer
Purpose: Transform DynamoDB stream records into structured analytics events
         and publish to Kinesis Data Streams.
"""

import json
import os
import time
import hashlib
import boto3

kinesis = boto3.client("kinesis")
STREAM_NAME = os.environ["KINESIS_STREAM_NAME"]


def handler(event, context):
    """Process DynamoDB stream records and forward to Kinesis."""
    kinesis_records = []

    for record in event["Records"]:
        analytics_event = _transform_to_analytics_event(record)
        if analytics_event:
            kinesis_records.append({
                "Data": json.dumps(analytics_event) + "\n",
                # Partition by session_id → same session always goes to same shard
                # This preserves ordering within a session
                "PartitionKey": analytics_event.get("session_id", "unknown"),
            })

    # Kinesis PutRecords: up to 500 records or 5MB per call
    if kinesis_records:
        for batch_start in range(0, len(kinesis_records), 500):
            batch = kinesis_records[batch_start:batch_start + 500]
            response = kinesis.put_records(
                StreamName=STREAM_NAME,
                Records=batch,
            )

            # Check for failed records
            if response.get("FailedRecordCount", 0) > 0:
                _handle_failed_records(response, batch)

    return {"statusCode": 200, "batchItemFailures": []}


def _transform_to_analytics_event(record: dict) -> dict | None:
    """Transform a DynamoDB stream record into a flat analytics event."""
    event_name = record["eventName"]
    new_image = record["dynamodb"].get("NewImage", {})
    old_image = record["dynamodb"].get("OldImage", {})
    keys = record["dynamodb"]["Keys"]

    pk = keys["PK"]["S"]
    sk = keys["SK"]["S"]
    session_id = pk.replace("SESSION#", "")

    # Base event structure
    event = {
        "event_id": record["eventID"],
        "event_type": event_name,
        "session_id": session_id,
        "sort_key": sk,
        "timestamp": record["dynamodb"].get("ApproximateCreationDateTime", int(time.time())),
        "size_bytes": record["dynamodb"].get("SizeBytes", 0),
    }

    # Enrich based on item type
    if sk.startswith("TURN#"):
        event["item_type"] = "turn"
        event["role"] = _extract_string(new_image, "role")
        event["token_count"] = _extract_number(new_image, "token_count")
        event["response_id"] = _extract_string(new_image, "response_id")

    elif sk == "META":
        event["item_type"] = "meta"
        event["status"] = _extract_string(new_image, "status")
        event["old_status"] = _extract_string(old_image, "status")
        event["customer_id"] = _extract_string(new_image, "customer_id")
        event["turn_count"] = _extract_number(new_image, "turn_count")

    elif sk.startswith("SUMMARY#"):
        event["item_type"] = "summary"
        event["summary_number"] = sk.replace("SUMMARY#", "")

    elif sk.startswith("HANDOFF#"):
        event["item_type"] = "handoff"
        event["agent_id"] = _extract_string(new_image, "agent_id")
        event["reason"] = _extract_string(new_image, "reason")

    else:
        return None  # Unknown item type, skip

    return event


def _extract_string(image: dict, field: str) -> str:
    return image.get(field, {}).get("S", "")


def _extract_number(image: dict, field: str) -> float:
    val = image.get(field, {}).get("N", "0")
    return float(val)


def _handle_failed_records(response: dict, batch: list):
    """Log failed records for debugging. In production, send to DLQ."""
    for i, result in enumerate(response["Records"]):
        if "ErrorCode" in result:
            print(f"Failed record {i}: {result['ErrorCode']} - {result.get('ErrorMessage')}")

Kinesis Firehose Delivery Configuration (CloudFormation)

KinesisFirehoseDelivery:
  Type: AWS::KinesisFirehose::DeliveryStream
  Properties:
    DeliveryStreamName: manga-assist-analytics-delivery
    DeliveryStreamType: KinesisStreamAsSource
    KinesisStreamSourceConfiguration:
      KinesisStreamARN: !GetAtt KinesisDataStream.Arn
      RoleARN: !GetAtt FirehoseRole.Arn

    ExtendedS3DestinationConfiguration:
      BucketARN: !GetAtt AnalyticsBucket.Arn
      RoleARN: !GetAtt FirehoseRole.Arn

      # Buffering — batch before writing to S3
      BufferingHints:
        SizeInMBs: 64        # Flush every 64MB
        IntervalInSeconds: 300  # Or every 5 minutes

      # Compression — saves S3 storage cost
      CompressionFormat: GZIP

      # Hive-style partitioning for Athena
      Prefix: "raw/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
      ErrorOutputPrefix: "errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/!{firehose:error-output-type}/"

      # Convert to Parquet for efficient Athena queries
      DataFormatConversionConfiguration:
        Enabled: true
        InputFormatConfiguration:
          Deserializer:
            OpenXJsonSerDe: {}
        OutputFormatConfiguration:
          Serializer:
            ParquetSerDe:
              Compression: SNAPPY
        SchemaConfiguration:
          DatabaseName: !Ref GlueDatabase
          TableName: !Ref GlueTable
          RoleARN: !GetAtt FirehoseRole.Arn

3. DynamoDB Export to S3 — Full Table Backup

flowchart LR
    subgraph DynamoDB
        TABLE[(manga-assist-sessions)]
        PITR["Point-in-Time Recovery<br/>(continuous, 35-day window)"]
    end

    subgraph Export_Options["Export Methods"]
        FULL["Full Export<br/>(entire table snapshot)"]
        INCR["Incremental Export<br/>(changes since last export)"]
        ONDEMAND["On-Demand Backup<br/>(DynamoDB-managed)"]
    end

    subgraph S3_Storage["S3 Backup Storage"]
        MANIFEST["manifest-summary.json"]
        DATA["data/*.json.gz<br/>(DynamoDB JSON format)"]
    end

    subgraph Schedule["Scheduling"]
        EB_RULE["EventBridge Rule<br/>cron(0 2 * * ? *)<br/>Daily at 2 AM"]
        LAMBDA["Backup Orchestrator<br/>Lambda"]
    end

    TABLE --> PITR
    TABLE --> FULL
    TABLE --> INCR
    TABLE --> ONDEMAND

    FULL --> S3_Storage
    INCR --> S3_Storage

    EB_RULE --> LAMBDA
    LAMBDA -->|"Start export"| FULL
    LAMBDA -->|"Start export"| INCR

Automated Backup Code

"""
Lambda: manga-assist-backup-orchestrator
Trigger: EventBridge scheduled rule (daily at 2 AM UTC)
Purpose: Export DynamoDB table to S3 for disaster recovery and analytics
"""

import json
import os
import time
from datetime import datetime, timedelta
import boto3

dynamodb = boto3.client("dynamodb")
s3 = boto3.client("s3")
ssm = boto3.client("ssm")

TABLE_ARN = os.environ["TABLE_ARN"]
BACKUP_BUCKET = os.environ["BACKUP_BUCKET"]
SSM_LAST_EXPORT_PARAM = "/manga-assist/last-export-time"


def handler(event, context):
    export_type = event.get("export_type", "incremental")

    if export_type == "full":
        return _full_export()
    elif export_type == "incremental":
        return _incremental_export()
    elif export_type == "on_demand_backup":
        return _on_demand_backup()


def _full_export() -> dict:
    """Export entire table to S3. Runs weekly or on-demand."""
    now = datetime.utcnow()
    prefix = f"exports/full/{now.strftime('%Y/%m/%d/%H%M%S')}/"

    response = dynamodb.export_table_to_point_in_time(
        TableArn=TABLE_ARN,
        S3Bucket=BACKUP_BUCKET,
        S3Prefix=prefix,
        ExportFormat="DYNAMODB_JSON",  # or "ION"
        ExportType="FULL_EXPORT",
        ExportTime=datetime(now.year, now.month, now.day, now.hour),
    )

    export_arn = response["ExportDescription"]["ExportArn"]

    # Store the export time for incremental exports
    _update_last_export_time(now)

    return {
        "status": "export_started",
        "type": "full",
        "export_arn": export_arn,
        "s3_prefix": prefix,
    }


def _incremental_export() -> dict:
    """Export only changes since last export. Runs daily."""
    now = datetime.utcnow()
    last_export = _get_last_export_time()

    if not last_export:
        # No previous export — fall back to full
        return _full_export()

    prefix = f"exports/incremental/{now.strftime('%Y/%m/%d/%H%M%S')}/"

    response = dynamodb.export_table_to_point_in_time(
        TableArn=TABLE_ARN,
        S3Bucket=BACKUP_BUCKET,
        S3Prefix=prefix,
        ExportFormat="DYNAMODB_JSON",
        ExportType="INCREMENTAL_EXPORT",
        IncrementalExportSpecification={
            "ExportFromTime": last_export,
            "ExportToTime": now,
            "ExportViewType": "NEW_AND_OLD_IMAGES",
        },
    )

    _update_last_export_time(now)

    return {
        "status": "export_started",
        "type": "incremental",
        "export_arn": response["ExportDescription"]["ExportArn"],
        "from": str(last_export),
        "to": str(now),
    }


def _on_demand_backup() -> dict:
    """Create a DynamoDB-managed on-demand backup."""
    backup_name = f"manga-assist-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"

    response = dynamodb.create_backup(
        TableName="manga-assist-sessions",
        BackupName=backup_name,
    )

    return {
        "status": "backup_created",
        "backup_arn": response["BackupDetails"]["BackupArn"],
        "backup_name": backup_name,
    }


def _get_last_export_time() -> datetime | None:
    try:
        resp = ssm.get_parameter(Name=SSM_LAST_EXPORT_PARAM)
        return datetime.fromisoformat(resp["Parameter"]["Value"])
    except ssm.exceptions.ParameterNotFound:
        return None


def _update_last_export_time(dt: datetime):
    ssm.put_parameter(
        Name=SSM_LAST_EXPORT_PARAM,
        Value=dt.isoformat(),
        Type="String",
        Overwrite=True,
    )

4. S3 Data Lifecycle Management

flowchart LR
    subgraph Day_0_to_30["Days 0–30: Hot Data"]
        S3_STD["S3 Standard<br/>Frequent access<br/>Athena queries"]
    end

    subgraph Day_30_to_90["Days 30–90: Warm Data"]
        S3_IA["S3 Infrequent Access<br/>Cheaper storage<br/>Still queryable"]
    end

    subgraph Day_90_to_365["Days 90–365: Cold Data"]
        GLACIER_IR["Glacier Instant Retrieval<br/>Quarterly reports<br/>Retrieval: milliseconds"]
    end

    subgraph Day_365_plus["Days 365+: Archive"]
        GLACIER_DA["Glacier Deep Archive<br/>Compliance retention<br/>Retrieval: 12 hours"]
    end

    subgraph Delete["7 Years: Delete"]
        DEL["Permanent deletion<br/>Retention policy complete"]
    end

    S3_STD -->|"After 30 days"| S3_IA
    S3_IA -->|"After 90 days"| GLACIER_IR
    GLACIER_IR -->|"After 365 days"| GLACIER_DA
    GLACIER_DA -->|"After 7 years"| DEL

S3 Lifecycle Policy

{
  "Rules": [
    {
      "ID": "manga-assist-data-lifecycle",
      "Status": "Enabled",
      "Filter": {
        "Prefix": "raw/"
      },
      "Transitions": [
        {
          "Days": 30,
          "StorageClass": "STANDARD_IA"
        },
        {
          "Days": 90,
          "StorageClass": "GLACIER_IR"
        },
        {
          "Days": 365,
          "StorageClass": "DEEP_ARCHIVE"
        }
      ],
      "Expiration": {
        "Days": 2555
      }
    },
    {
      "ID": "cleanup-incomplete-uploads",
      "Status": "Enabled",
      "Filter": {
        "Prefix": ""
      },
      "AbortIncompleteMultipartUpload": {
        "DaysAfterInitiation": 7
      }
    }
  ]
}

5. Athena Queries Over DynamoDB Export Data

flowchart TD
    subgraph Data_Flow["Data Flow to Athena"]
        DDB[(DynamoDB)] -->|Firehose| S3_RAW[S3: Parquet files]
        DDB -->|Export| S3_EXPORT[S3: DynamoDB JSON]
    end

    subgraph Glue["AWS Glue"]
        CRAWLER["Glue Crawler<br/>(auto-detect schema)"]
        CATALOG["Glue Data Catalog<br/>(table definitions)"]
        ETL["Glue ETL Job<br/>(transform DDB JSON → Parquet)"]
    end

    S3_RAW --> CRAWLER
    S3_EXPORT --> ETL
    ETL --> S3_PROCESSED[S3: Processed Parquet]
    S3_PROCESSED --> CRAWLER
    CRAWLER --> CATALOG

    subgraph Query["Athena Queries"]
        Q1["Daily active sessions"]
        Q2["Average turns per session"]
        Q3["Handoff rate by hour"]
        Q4["Token usage trends"]
        Q5["Customer engagement patterns"]
    end

    CATALOG --> Q1
    CATALOG --> Q2
    CATALOG --> Q3
    CATALOG --> Q4
    CATALOG --> Q5

Athena Table Definition + Example Queries

-- Create Athena table over Firehose-delivered Parquet data
CREATE EXTERNAL TABLE manga_assist_events (
    event_id        STRING,
    event_type      STRING,
    session_id      STRING,
    sort_key        STRING,
    item_type       STRING,
    role            STRING,
    status          STRING,
    old_status      STRING,
    customer_id     STRING,
    turn_count      DOUBLE,
    token_count     DOUBLE,
    agent_id        STRING,
    reason          STRING,
    response_id     STRING,
    size_bytes      BIGINT,
    `timestamp`     BIGINT
)
PARTITIONED BY (
    `year` STRING,
    `month` STRING,
    `day` STRING,
    `hour` STRING
)
STORED AS PARQUET
LOCATION 's3://manga-assist-data/raw/'
TBLPROPERTIES ('parquet.compression' = 'SNAPPY');

-- Load partitions
MSCK REPAIR TABLE manga_assist_events;

-- ─────────────────────────────────────────────
-- Query 1: Daily active sessions (last 7 days)
-- ─────────────────────────────────────────────
SELECT
    year || '-' || month || '-' || day AS date,
    COUNT(DISTINCT session_id) AS active_sessions,
    COUNT(CASE WHEN item_type = 'turn' AND role = 'user' THEN 1 END) AS user_messages,
    COUNT(CASE WHEN item_type = 'handoff' THEN 1 END) AS handoffs
FROM manga_assist_events
WHERE year = '2026' AND month = '03' AND day >= '18'
GROUP BY year, month, day
ORDER BY date DESC;

-- ─────────────────────────────────────────────
-- Query 2: Average conversation length
-- ─────────────────────────────────────────────
SELECT
    ROUND(AVG(turn_count), 1) AS avg_turns,
    ROUND(STDDEV(turn_count), 1) AS stddev_turns,
    MAX(turn_count) AS max_turns,
    APPROX_PERCENTILE(turn_count, 0.50) AS p50_turns,
    APPROX_PERCENTILE(turn_count, 0.95) AS p95_turns
FROM manga_assist_events
WHERE item_type = 'meta'
  AND event_type = 'MODIFY'
  AND status = 'disconnected'
  AND year = '2026' AND month = '03';

-- ─────────────────────────────────────────────
-- Query 3: Handoff rate by hour of day
-- ─────────────────────────────────────────────
SELECT
    hour,
    COUNT(*) AS handoff_count,
    COUNT(*) * 100.0 / SUM(COUNT(*)) OVER () AS pct_of_total
FROM manga_assist_events
WHERE item_type = 'handoff'
  AND year = '2026' AND month = '03'
GROUP BY hour
ORDER BY hour;

-- ─────────────────────────────────────────────
-- Query 4: Token usage and cost estimation
-- ─────────────────────────────────────────────
SELECT
    year || '-' || month || '-' || day AS date,
    SUM(token_count) AS total_tokens,
    ROUND(SUM(token_count) / 1000.0 * 0.003, 2) AS estimated_input_cost_usd,
    COUNT(*) AS total_turns
FROM manga_assist_events
WHERE item_type = 'turn'
  AND year = '2026' AND month = '03'
GROUP BY year, month, day
ORDER BY date DESC;

-- ─────────────────────────────────────────────
-- Query 5: Sessions that escalated to human
-- ─────────────────────────────────────────────
SELECT
    e.session_id,
    e.customer_id,
    e.turn_count AS turns_before_handoff,
    h.agent_id,
    h.reason
FROM manga_assist_events e
JOIN manga_assist_events h
  ON e.session_id = h.session_id
WHERE e.item_type = 'meta' AND e.status = 'handoff'
  AND h.item_type = 'handoff'
  AND e.year = '2026'
ORDER BY e.timestamp DESC
LIMIT 100;

6. Disaster Recovery — Restore Strategies

flowchart TD
    DISASTER{"What happened?"}

    DISASTER -->|"Accidental item delete<br/>(single session)"| PITR_RESTORE["PITR Table Restore<br/>1. Restore table to point before deletion<br/>2. Query restored table for the item<br/>3. Write item back to production table<br/>4. Delete restored table"]

    DISASTER -->|"Accidental table delete"| TABLE_RESTORE["On-Demand Backup Restore<br/>1. Restore from latest backup<br/>2. New table created<br/>3. Update Lambda env vars to new table<br/>4. Replay Kinesis/Stream data for gap"]

    DISASTER -->|"Data corruption<br/>(bad deploy)"| S3_RESTORE["S3 Export Restore<br/>1. Find export before corruption<br/>2. Glue job transforms to PutItem format<br/>3. Lambda batch-writes items back<br/>4. Validate via Athena query"]

    DISASTER -->|"Region outage"| GLOBAL["Global Tables Failover<br/>1. Route53 health check triggers failover<br/>2. Traffic shifts to secondary region<br/>3. Global Tables already replicated<br/>4. RPO ≈ seconds (replication lag)"]

    subgraph Recovery_Times["Recovery Time Comparison"]
        RT1["PITR: 15-60 min<br/>(new table created from continuous backup)"]
        RT2["On-Demand Backup: 10-30 min<br/>(depends on table size)"]
        RT3["S3 Export: 1-4 hours<br/>(transform + batch write)"]
        RT4["Global Tables: seconds<br/>(DNS failover)"]
    end

PITR Restore Code (Single Item Recovery)

"""
Recover a specific session from PITR by restoring to a temporary table
and copying the needed items back.
"""

import boto3
import time
from datetime import datetime

dynamodb = boto3.client("dynamodb")
dynamodb_resource = boto3.resource("dynamodb")

PRODUCTION_TABLE = "manga-assist-sessions"


def recover_session(session_id: str, restore_to_time: datetime) -> dict:
    """
    Restore a single session from PITR.

    Steps:
    1. Restore table to a temporary table at the specified point in time
    2. Query the temporary table for the session
    3. Write the items back to the production table
    4. Delete the temporary table
    """
    temp_table_name = f"manga-assist-recovery-{int(time.time())}"

    # ── Step 1: Restore table ──
    print(f"Restoring table to {restore_to_time}...")
    dynamodb.restore_table_to_point_in_time(
        SourceTableName=PRODUCTION_TABLE,
        TargetTableName=temp_table_name,
        RestoreDateTime=restore_to_time,
    )

    # Wait for restore to complete
    _wait_for_table(temp_table_name)
    print(f"Restore complete: {temp_table_name}")

    # ── Step 2: Query the session from restored table ──
    temp_table = dynamodb_resource.Table(temp_table_name)
    pk = f"SESSION#{session_id}"

    items = []
    params = {
        "KeyConditionExpression": "PK = :pk",
        "ExpressionAttributeValues": {":pk": pk},
    }
    while True:
        resp = temp_table.query(**params)
        items.extend(resp.get("Items", []))
        if "LastEvaluatedKey" not in resp:
            break
        params["ExclusiveStartKey"] = resp["LastEvaluatedKey"]

    if not items:
        print(f"Session {session_id} not found at {restore_to_time}")
        _cleanup_table(temp_table_name)
        return {"status": "not_found", "items_recovered": 0}

    # ── Step 3: Write items back to production ──
    prod_table = dynamodb_resource.Table(PRODUCTION_TABLE)
    with prod_table.batch_writer() as batch:
        for item in items:
            batch.put_item(Item=item)

    print(f"Recovered {len(items)} items for session {session_id}")

    # ── Step 4: Cleanup temporary table ──
    _cleanup_table(temp_table_name)

    return {"status": "recovered", "items_recovered": len(items)}


def _wait_for_table(table_name: str, max_wait: int = 600):
    """Wait for table to become ACTIVE."""
    for _ in range(max_wait // 10):
        resp = dynamodb.describe_table(TableName=table_name)
        status = resp["Table"]["TableStatus"]
        if status == "ACTIVE":
            return
        time.sleep(10)
    raise TimeoutError(f"Table {table_name} did not become ACTIVE")


def _cleanup_table(table_name: str):
    """Delete the temporary restored table."""
    try:
        dynamodb.delete_table(TableName=table_name)
        print(f"Deleted temporary table: {table_name}")
    except Exception as e:
        print(f"Failed to delete {table_name}: {e}")

7. Common Mistakes Teams Make

Mistake Consequence Fix
Not enabling PITR Accidental delete = data loss forever Enable PITR on day one. Cost is ~$0.20/GB/month
Kinesis Firehose buffer too small (1MB, 60s) Too many small S3 files → slow Athena queries Use 64MB or 300s buffers for fewer, larger files
No S3 lifecycle policy Raw data grows forever, storage costs spiral Define lifecycle rules: IA → Glacier → Delete
Exporting DynamoDB JSON to S3 without Parquet conversion Athena scans raw JSON = slow, expensive Convert to Parquet via Firehose or Glue ETL
Not partitioning S3 data by date Every Athena query scans ALL data Use Hive-style partitions: year=.../month=.../day=...
Using DynamoDB Scan for analytics Full table scan costs massive RCU, blocks production reads Export to S3, run analytics via Athena
No monitoring on Kinesis IteratorAge Stream processing falls behind silently Alarm when Kinesis GetRecords.IteratorAgeMilliseconds > 60000
Testing restore only in theory PITR restore process is untested, takes longer than expected in incident Run quarterly restore drills to a temp table

8. Critical Things to Remember

For Interviews

  1. DynamoDB Export to S3 does NOT consume RCU — It reads directly from the backup storage, not the live table. This is the only safe way to get bulk data out.

  2. PITR creates a NEW table — It doesn't restore in-place. You must copy items back to the original table yourself.

  3. Kinesis Data Streams vs Firehose — Streams: custom consumers, sub-second latency, you manage shards. Firehose: managed delivery to S3/OpenSearch/Redshift, buffered (not real-time).

  4. Partition key = shard key in Kinesis — Using session_id as the partition key guarantees ordering within a session across the Kinesis pipeline.

  5. DynamoDB Streams vs Kinesis — Streams: 24hr retention, 2 consumers max, tightly coupled to table. Kinesis: 7-day retention, unlimited consumers, decoupled. Use Streams to feed Kinesis for fan-out.

For Production

  1. Run PITR restore drills quarterly — The worst time to learn the restore process is during an incident.

  2. Hive-style S3 partitioning cuts Athena costs 90%+WHERE year='2026' AND month='03' only scans that month's data.

  3. Firehose Parquet conversion saves 60-80% on Athena — Parquet is columnar + compressed. JSON scans are orders of magnitude more expensive.

  4. Set S3 bucket versioning on backup bucket — Protects against accidental overwrites of backup data.

  5. Incremental exports are the sweet spot — Full exports are expensive for large tables. Daily incremental exports capture only changed items.