
CD-07: Database Migration Pipeline


User Story

As a Backend Developer on the MangaAssist AI Chatbot team, I want to establish a safe, automated pipeline for deploying DynamoDB schema changes (GSI additions/removals, TTL policy changes, DAX cache configuration, capacity mode switches, backup policy updates) alongside application code changes, So that database changes are applied without downtime, with automatic rollback capability, and without risk of data loss or inconsistency.


Acceptance Criteria

  • All DynamoDB table changes are defined in CDK (no manual console changes)
  • GSI additions are deployed before application code that queries them
  • GSI removals are deployed after application code no longer references them
  • Pipeline validates that GSI is ACTIVE before proceeding with dependent code deploy
  • TTL policy changes are validated against data retention compliance requirements
  • Capacity mode switches (on-demand ↔ provisioned) are tested in staging first
  • DAX cache invalidation is coordinated with schema changes
  • Zero-downtime deployment — no read/write interruptions during migration
  • DynamoDB Streams backfill is used for data transformations (not scan-and-write)
  • Pipeline creates point-in-time backup before any schema modification
  • Rollback plan is documented and tested for every migration type

High-Level Design

Migration Types and Risk Levels

graph TB
    subgraph "Low Risk — Automated"
        L1["Add GSI"]
        L2["Enable TTL"]
        L3["Change backup policy"]
        L4["Update DAX TTL"]
    end

    subgraph "Medium Risk — Staged + Validated"
        M1["Modify GSI (add/remove attributes)"]
        M2["Switch capacity mode"]
        M3["Change partition key structure"]
        M4["Enable/configure DynamoDB Streams"]
    end

    subgraph "High Risk — Manual Approval"
        H1["Remove GSI"]
        H2["Change primary key design"]
        H3["Large-scale data backfill"]
        H4["Cross-table data migration"]
    end

    style L1 fill:#1B660F,color:#fff
    style L2 fill:#1B660F,color:#fff
    style L3 fill:#1B660F,color:#fff
    style L4 fill:#1B660F,color:#fff
    style M1 fill:#ff9900,color:#000
    style M2 fill:#ff9900,color:#000
    style M3 fill:#ff9900,color:#000
    style M4 fill:#ff9900,color:#000
    style H1 fill:#DD344C,color:#fff
    style H2 fill:#DD344C,color:#fff
    style H3 fill:#DD344C,color:#fff
    style H4 fill:#DD344C,color:#fff
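When one CDK diff contains several changes, the pipeline gates on the strictest tier. A minimal sketch of that rule; the change-type identifiers here are invented for illustration:

```python
# Sketch: the risk tiers from the diagram above as a lookup table.
# Change-type identifiers are illustrative, not from the pipeline itself.
RISK_LEVELS = {
    'add_gsi': 'low', 'enable_ttl': 'low',
    'change_backup_policy': 'low', 'update_dax_ttl': 'low',
    'modify_gsi': 'medium', 'switch_capacity_mode': 'medium',
    'change_partition_key_structure': 'medium', 'configure_streams': 'medium',
    'remove_gsi': 'high', 'change_primary_key': 'high',
    'large_backfill': 'high', 'cross_table_migration': 'high',
}

_ORDER = {'low': 0, 'medium': 1, 'high': 2}

def required_gate(change_types):
    """A deploy containing several changes is gated at the strictest tier."""
    return max((RISK_LEVELS[c] for c in change_types), key=_ORDER.__getitem__)
```

So a diff that both adds one GSI and removes another is treated as high risk and routed to manual approval.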

Pipeline Overview

flowchart LR
    subgraph "Phase 1: Pre-Migration"
        A[CDK Diff Detection] --> B[Risk Classification]
        B --> C[Backup Creation]
        C --> D{Risk Level?}
    end

    subgraph "Phase 2: Apply"
        D -->|Low| E[Auto-Deploy CDK]
        D -->|Medium| F[Deploy to Staging → Validate → Deploy to Prod]
        D -->|High| G[Manual Approval → Staged Deploy]
    end

    subgraph "Phase 3: Validate"
        E --> H[Verify GSI Status = ACTIVE]
        F --> H
        G --> H
        H --> I[Run Integration Tests]
        I --> J[Deploy Dependent App Code]
    end

    subgraph "Phase 4: Post-Migration"
        J --> K[Monitor Error Rates]
        K --> L[Cleanup Old GSIs after 7 days]
    end

Low-Level Design

1. DynamoDB Table Landscape (from CD-02)

MangaAssist DynamoDB Tables:
├── ConversationTable
│   ├── PK: USER#<user_id>
│   ├── SK: CONV#<conversation_id>#MSG#<timestamp>
│   ├── GSI1: ConversationIndex (PK: CONV#<id>, SK: timestamp)
│   ├── GSI2: UserRecentIndex (PK: USER#<id>, SK: timestamp)
│   ├── TTL: expires_at (90 days)
│   └── Stream: NEW_AND_OLD_IMAGES
│
├── SessionTable
│   ├── PK: SESSION#<session_id>
│   ├── SK: METADATA
│   ├── TTL: expires_at (24 hours)
│   └── DAX: enabled (5-second TTL)
│
├── IntentCacheTable
│   ├── PK: INTENT#<hash(query)>
│   ├── SK: MODEL_VERSION#<version>
│   ├── TTL: expires_at (1 hour)
│   └── DAX: enabled (30-second TTL)
│
├── ProductCatalogTable
│   ├── PK: PRODUCT#<product_id>
│   ├── SK: METADATA
│   ├── GSI1: CategoryIndex (PK: CATEGORY#<cat>, SK: price)
│   ├── GSI2: SearchIndex (PK: SEARCH_TERM, SK: relevance_score)
│   └── Stream: NEW_IMAGE → triggers RAG re-index
│
└── AnalyticsTable
    ├── PK: DATE#<date>
    ├── SK: METRIC#<metric_name>#<dimension>
    └── TTL: expires_at (365 days)
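Several later scripts build these composite keys; a tiny sketch may make the layout concrete. The PK/SK attribute names and prefix formats follow the listing above; the helper names are hypothetical:

```python
# Hypothetical key-builder helpers for the table layout above.
def conversation_message_key(user_id, conversation_id, timestamp):
    """ConversationTable item key: USER#<user_id> / CONV#<id>#MSG#<ts>."""
    return {
        'PK': f'USER#{user_id}',
        'SK': f'CONV#{conversation_id}#MSG#{timestamp}',
    }

def session_key(session_id):
    """SessionTable item key: SESSION#<session_id> / METADATA."""
    return {'PK': f'SESSION#{session_id}', 'SK': 'METADATA'}
```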

2. Migration Ordering — The Expand-Contract Pattern

sequenceDiagram
    participant CDK as CDK Pipeline
    participant DDB as DynamoDB
    participant APP as Application Code
    participant DAX as DAX Cache

    Note over CDK,DAX: EXPAND Phase (safe to add)
    CDK->>DDB: Add new GSI
    DDB-->>CDK: GSI status: CREATING (Backfilling=true) → ACTIVE
    CDK->>CDK: Wait for ACTIVE status (poll every 30s)

    Note over CDK,DAX: MIGRATE Phase (dual-write)
    CDK->>APP: Deploy new code that writes to BOTH old and new patterns
    APP->>DDB: Write to old pattern + new pattern
    APP->>DDB: Read from new GSI (if available) else old

    Note over CDK,DAX: CONTRACT Phase (cleanup)
    CDK->>APP: Deploy code that uses ONLY new pattern
    CDK->>DAX: Invalidate cache entries for old access pattern
    CDK->>DDB: Remove old GSI (7-day delay, manual approval)
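The MIGRATE phase's dual-write and read-fallback behavior could look roughly like this in application code. This is a sketch: the datePartition attribute and GSI-ConversationsByDate index name are illustrative, not taken from the pipeline:

```python
# Sketch of MIGRATE-phase application logic: every write also populates
# the new access pattern; reads prefer the new GSI and fall back.
def write_item(table, item):
    item = dict(item)
    # Old-pattern attributes are untouched; the new-GSI key is added so
    # each write also lands in the new index during backfill.
    item['datePartition'] = item['createdAt'][:10]  # YYYY-MM-DD
    table.put_item(Item=item)

def read_by_date(table, date_partition, use_new_index=True):
    if use_new_index:
        resp = table.query(
            IndexName='GSI-ConversationsByDate',  # hypothetical new GSI
            KeyConditionExpression='#dp = :dp',
            ExpressionAttributeNames={'#dp': 'datePartition'},
            ExpressionAttributeValues={':dp': date_partition},
        )
        if resp['Items']:
            return resp['Items']
    # Fallback to the old access pattern (shown as a scan for brevity)
    return table.scan()['Items']
```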

3. CDK Migration Detection

// lib/migration-detector.ts
import { TemplateDiff } from '@aws-cdk/cloudformation-diff';

interface MigrationChange {
  tableName: string;
  changeType: 'ADD_GSI' | 'REMOVE_GSI' | 'MODIFY_GSI' | 'CHANGE_TTL'
    | 'CHANGE_CAPACITY' | 'CHANGE_STREAM' | 'CHANGE_PITR';
  risk: 'low' | 'medium' | 'high';
  details: string;
  requiresApproval: boolean;
}

function classifyDynamoDBChanges(diff: TemplateDiff): MigrationChange[] {
  const changes: MigrationChange[] = [];

  for (const [logicalId, change] of Object.entries(diff.resources.changes)) {
    if (change.resourceType !== 'AWS::DynamoDB::Table') continue;

    for (const [propPath, propChange] of Object.entries(change.propertyUpdates)) {
      if (propPath.startsWith('GlobalSecondaryIndexes')) {
        if (propChange.changeImpact === 'WILL_REPLACE') {
          changes.push({
            tableName: logicalId,
            changeType: 'REMOVE_GSI',
            risk: 'high',
            details: `GSI replacement detected — this will delete and recreate the index`,
            requiresApproval: true,
          });
        } else if (propChange.newValue && !propChange.oldValue) {
          changes.push({
            tableName: logicalId,
            changeType: 'ADD_GSI',
            risk: 'low',
            details: `New GSI: ${JSON.stringify(propChange.newValue)}`,
            requiresApproval: false,
          });
        }
      }

      if (propPath === 'TimeToLiveSpecification') {
        changes.push({
          tableName: logicalId,
          changeType: 'CHANGE_TTL',
          risk: 'low',
          details: `TTL change: ${JSON.stringify(propChange)}`,
          requiresApproval: false,
        });
      }

      if (propPath === 'BillingMode') {
        changes.push({
          tableName: logicalId,
          changeType: 'CHANGE_CAPACITY',
          risk: 'medium',
          details: `Capacity mode switch: ${propChange.oldValue} → ${propChange.newValue}`,
          requiresApproval: true,
        });
      }
    }
  }

  return changes;
}

4. GSI Status Waiter

import boto3
import time

def wait_for_gsi_active(
    table_name: str,
    gsi_name: str,
    max_wait_seconds: int = 3600,
    poll_interval: int = 30,
) -> bool:
    """Wait for a GSI to become ACTIVE after creation."""
    dynamodb = boto3.client('dynamodb')
    elapsed = 0

    while elapsed < max_wait_seconds:
        response = dynamodb.describe_table(TableName=table_name)
        gsis = response['Table'].get('GlobalSecondaryIndexes', [])

        for gsi in gsis:
            if gsi['IndexName'] == gsi_name:
                status = gsi['IndexStatus']
                if status == 'ACTIVE':
                    print(f"GSI {gsi_name} is ACTIVE after {elapsed}s")
                    return True
                elif status in ('CREATING', 'UPDATING'):
                    # Check backfill progress
                    item_count = gsi.get('ItemCount', 0)
                    table_count = response['Table'].get('ItemCount', 0)
                    progress = (item_count / table_count * 100) if table_count > 0 else 0
                    print(f"GSI {gsi_name}: {status} ({progress:.1f}% backfilled)")
                else:
                    print(f"GSI {gsi_name} unexpected status: {status}")
                    return False
                break
        else:
            print(f"GSI {gsi_name} not found on table {table_name}")
            return False

        time.sleep(poll_interval)
        elapsed += poll_interval

    print(f"GSI {gsi_name} did not become ACTIVE within {max_wait_seconds}s")
    return False
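The pipeline in section 7 invokes a scripts/wait-for-gsi.py entry point that is not shown. A minimal sketch of that wrapper around wait_for_gsi_active, assuming a hardcoded map of the GSIs a release introduces per environment (the map, flag names, and staging table suffix are hypothetical):

```python
# Hypothetical CLI wrapper so the pipeline can run wait_for_gsi_active
# as `python scripts/wait-for-gsi.py --env staging`.
import argparse

PENDING_GSIS = {  # illustrative environment-to-GSI map
    'staging': [('ConversationTable-staging', 'GSI-ConversationsByDate')],
    'production': [('ConversationTable', 'GSI-ConversationsByDate')],
}

def main(argv=None, waiter=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--env', choices=sorted(PENDING_GSIS), required=True)
    parser.add_argument('--timeout', type=int, default=3600)
    args = parser.parse_args(argv)
    waiter = waiter or wait_for_gsi_active  # defined above
    for table, gsi in PENDING_GSIS[args.env]:
        if not waiter(table, gsi, max_wait_seconds=args.timeout):
            return 1  # non-zero exit fails the pipeline step
    return 0
```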

5. Data Backfill via DynamoDB Streams

import boto3
import json

class StreamBackfiller:
    """Backfill data to new access patterns using DynamoDB Streams."""

    def __init__(self, table_name: str, stream_arn: str):
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)
        self.streams = boto3.client('dynamodbstreams')
        self.stream_arn = stream_arn

    def process_stream_records(self, transform_fn, batch_size: int = 25):
        """Process stream records and apply transformation."""
        shards = self.streams.describe_stream(
            StreamArn=self.stream_arn
        )['StreamDescription']['Shards']

        for shard in shards:
            iterator = self.streams.get_shard_iterator(
                StreamArn=self.stream_arn,
                ShardId=shard['ShardId'],
                ShardIteratorType='TRIM_HORIZON',
            )['ShardIterator']

            empty_reads = 0
            while iterator:
                response = self.streams.get_records(
                    ShardIterator=iterator, Limit=batch_size
                )

                for record in response['Records']:
                    if record['eventName'] in ('INSERT', 'MODIFY'):
                        # NewImage arrives in DynamoDB AttributeValue format;
                        # transform_fn must deserialize it before returning an item
                        new_image = record['dynamodb']['NewImage']
                        transformed = transform_fn(new_image)
                        if transformed:
                            self._write_batch([transformed])

                # An open shard keeps returning a NextShardIterator with no
                # records; stop once caught up instead of polling forever
                if response['Records']:
                    empty_reads = 0
                else:
                    empty_reads += 1
                    if empty_reads >= 3:
                        break

                iterator = response.get('NextShardIterator')

    def _write_batch(self, items: list):
        """Write transformed items via the resource-layer batch writer."""
        with self.table.batch_writer() as batch:
            for item in items:
                batch.put_item(Item=item)

    def backfill_full_table(self, transform_fn, batch_size: int = 25):
        """Full table scan backfill for historical data."""
        with self.table.batch_writer() as batch:
            scan_kwargs = {}
            while True:
                response = self.table.scan(**scan_kwargs)
                for item in response['Items']:
                    transformed = transform_fn(item)
                    if transformed:
                        batch.put_item(Item=transformed)

                if 'LastEvaluatedKey' not in response:
                    break
                scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
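A transform_fn for the backfill_full_table path might derive the new-GSI attributes and skip items that already have them. A sketch, with illustrative attribute names; returning None skips the write:

```python
# Example transform for backfill_full_table: derive the new GSI key for
# items that lack it. Returning None skips the item, which keeps the
# backfill idempotent. Attribute names are illustrative.
def add_date_partition(item):
    if 'datePartition' in item:
        return None  # already migrated
    if 'createdAt' not in item:
        return None  # cannot derive the new key; leave the item untouched
    migrated = dict(item)
    migrated['datePartition'] = migrated['createdAt'][:10]  # YYYY-MM-DD
    return migrated
```

Note that the stream path hands transform_fn a NewImage in DynamoDB AttributeValue format; boto3's TypeDeserializer can convert it to native types first.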

6. DAX Cache Coordination

import boto3

class DAXCacheCoordinator:
    """Coordinate DAX cache invalidation with schema changes."""

    def __init__(self, dax_cluster_endpoint: str):
        self.cluster_endpoint = dax_cluster_endpoint  # data-plane endpoint for cache clients
        self.dax = boto3.client('dax')  # control-plane (management) API

    def pre_migration_flush(self, tables: list[str]):
        """Flush DAX cache before schema migration to prevent stale reads."""
        # DAX doesn't support selective invalidation
        # Options:
        # 1. Set TTL very low before migration
        # 2. Restart DAX nodes (causes brief cache miss storm)
        # 3. Accept stale reads during migration window

        # Strategy: Lower TTL to 1 second, wait for cache to expire, then migrate
        print(f"Lowering DAX TTL for tables: {tables}")
        print("Waiting 30 seconds for cache entries to expire...")
        # In practice: update DAX parameter group via CDK

    def post_migration_restore(self, tables: list[str]):
        """Restore normal DAX TTL after migration completes."""
        print(f"Restoring DAX TTL for tables: {tables}")
        # Restore parameter group to normal values:
        # SessionTable: 5 seconds
        # IntentCacheTable: 30 seconds
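The "update DAX parameter group" step the comments describe can also be done imperatively via the DAX UpdateParameterGroup API. A sketch: the parameter-group name is hypothetical, while record-ttl-millis and query-ttl-millis are the DAX item- and query-cache TTL parameters:

```python
# Sketch: lower or restore DAX cache TTLs by updating the cluster's
# parameter group. The parameter-group name is hypothetical.
def set_dax_ttl(parameter_group, ttl_millis, dax=None):
    if dax is None:
        import boto3  # deferred so the helper is testable without AWS access
        dax = boto3.client('dax')
    dax.update_parameter_group(
        ParameterGroupName=parameter_group,
        ParameterNameValues=[
            {'ParameterName': 'record-ttl-millis', 'ParameterValue': str(ttl_millis)},
            {'ParameterName': 'query-ttl-millis', 'ParameterValue': str(ttl_millis)},
        ],
    )
```

This pairs with the lower-TTL-then-migrate strategy above: call it with a small value before the migration and with the normal per-table values afterwards.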

7. GitHub Actions Pipeline

name: database-migration
on:
  push:
    branches: [main]
    paths: ['infra/lib/data-stack.ts', 'migrations/**']

jobs:
  detect-and-classify:
    runs-on: ubuntu-latest
    outputs:
      has_changes: ${{ steps.diff.outputs.has_changes }}
      risk_level: ${{ steps.classify.outputs.risk_level }}
      changes_json: ${{ steps.classify.outputs.changes_json }}
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-node@v4
        with:
          node-version: 20

      - name: CDK Diff
        id: diff
        run: |
          cd infra
          npm ci
          npx cdk diff DataStack --no-color > /tmp/diff.txt 2>&1 || true
          if grep -q "no differences" /tmp/diff.txt; then
            echo "has_changes=false" >> $GITHUB_OUTPUT
          else
            echo "has_changes=true" >> $GITHUB_OUTPUT
          fi

      - name: Classify Changes
        id: classify
        if: steps.diff.outputs.has_changes == 'true'
        run: |
          # Parse CDK diff to classify risk level
          node scripts/classify-db-changes.js /tmp/diff.txt

  backup:
    needs: detect-and-classify
    if: needs.detect-and-classify.outputs.has_changes == 'true'
    runs-on: ubuntu-latest
    steps:
      - uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: arn:aws:iam::123456789012:role/github-actions-infra  # placeholder account ID
          aws-region: us-east-1

      - name: Create Pre-Migration Backups  # on-demand backups; PITR runs continuously
        run: |
          for table in ConversationTable SessionTable ProductCatalogTable; do
            aws dynamodb create-backup \
              --table-name $table \
              --backup-name "pre-migration-$(date +%Y%m%d-%H%M%S)"
          done

  deploy-staging:
    needs: [detect-and-classify, backup]
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: arn:aws:iam::role/github-actions-infra-staging
          aws-region: us-east-1

      - name: Deploy to Staging
        run: |
          cd infra
          npm ci
          npx cdk deploy DataStack --require-approval never

      - name: Wait for GSI Active
        run: python scripts/wait-for-gsi.py --env staging

      - name: Run Integration Tests
        run: pytest tests/integration/dynamodb/ -v

  deploy-production:
    needs: [detect-and-classify, deploy-staging]
    if: needs.detect-and-classify.outputs.risk_level != 'high'
    runs-on: ubuntu-latest
    environment: production  # Requires manual approval for 'medium' risk
    steps:
      - uses: actions/checkout@v4
      - uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: arn:aws:iam::role/github-actions-infra-prod
          aws-region: us-east-1

      - name: Deploy to Production
        run: |
          cd infra
          npm ci
          npx cdk deploy DataStack --require-approval never

      - name: Wait for GSI Active
        run: python scripts/wait-for-gsi.py --env production

      - name: Verify Read/Write
        run: python scripts/verify-table-health.py --env production
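
`scripts/wait-for-gsi.py` is referenced above but not shown. A sketch of the polling loop it presumably implements; the helper split is an assumption so the status check is testable without AWS:

```python
import time


def pending_gsis(table_description: dict) -> list[str]:
    """Names of GSIs that are not yet ACTIVE, from a DescribeTable
    response's 'Table' payload."""
    return [
        gsi["IndexName"]
        for gsi in table_description.get("GlobalSecondaryIndexes", [])
        if gsi["IndexStatus"] != "ACTIVE"
    ]


def wait_for_gsi_active(table_name: str, poll_seconds: int = 30) -> None:
    """Block until every GSI on the table is ACTIVE."""
    import boto3  # lazy import keeps pending_gsis() testable offline

    ddb = boto3.client("dynamodb")
    while True:
        desc = ddb.describe_table(TableName=table_name)["Table"]
        pending = pending_gsis(desc)
        if not pending:
            return
        print(f"{table_name}: waiting on {pending} (still backfilling)")
        time.sleep(poll_seconds)
```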

## Critical Decisions

### Decision 1: Migration Strategy — Expand-Contract vs Online Dual-Write vs Offline Migration

| Criteria (Weight) | Expand-Contract | Online Dual-Write | Offline Migration |
|---|---|---|---|
| Zero Downtime (30%) | 10/10 — always available | 9/10 — available during migration | 2/10 — requires maintenance window |
| Data Consistency (25%) | 9/10 — streams ensure consistency | 7/10 — risk of divergence | 10/10 — single write path |
| Complexity (20%) | 6/10 — 3-phase coordination | 4/10 — dual-write code paths | 8/10 — straightforward |
| Rollback Safety (15%) | 9/10 — old GSI kept for days | 7/10 — can stop writing to new | 3/10 — restore from backup |
| Cost (10%) | 7/10 — temporary GSI overlap | 5/10 — double write capacity | 9/10 — one-time cost |
| **Weighted Score** | **8.5/10** | **6.8/10** | **6.1/10** |

**Decision:** Expand-Contract pattern for all DynamoDB migrations

Rationale: DynamoDB's unique constraint is that GSI creation involves asynchronous backfilling that can take hours for large tables. The expand-contract pattern accounts for this:

1. **Expand:** Add the new GSI (CDK deploy). Wait for ACTIVE.
2. **Migrate:** Deploy code that reads from the new GSI. The old GSI still exists as a fallback.
3. **Contract:** After 7 days of stable reads from the new GSI, remove the old GSI (separate PR with manual approval).

This is safer than dual-write because DynamoDB Streams automatically backfills the GSI — no application-level dual-write logic needed. Offline migration is rejected because the chatbot runs 24/7 and any downtime directly impacts customer purchase decisions.
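
In the migrate phase, the read path prefers the new GSI while keeping the old one as the fallback. A sketch of that selection logic (table and index names are hypothetical):

```python
def choose_index(active_indexes: list[str], preferred: str, fallback: str) -> str:
    """Use the new GSI once it is ACTIVE; otherwise keep the old one."""
    return preferred if preferred in active_indexes else fallback


def query_by_user(table_name: str, user_id: str) -> list[dict]:
    """Migrate-phase read: resolve the index at call time."""
    import boto3  # lazy imports keep choose_index() testable offline
    from boto3.dynamodb.conditions import Key

    table = boto3.resource("dynamodb").Table(table_name)
    active = [
        gsi["IndexName"]
        for gsi in (table.global_secondary_indexes or [])
        if gsi["IndexStatus"] == "ACTIVE"
    ]
    index = choose_index(active, "UserIdIndex-v2", "UserIdIndex")
    resp = table.query(
        IndexName=index,
        KeyConditionExpression=Key("userId").eq(user_id),
    )
    return resp["Items"]
```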


### Decision 2: Backfill Approach — DynamoDB Streams vs Scan-and-Write

| Criteria | DynamoDB Streams | Full Table Scan + Write |
|---|---|---|
| Data Completeness | Only captures changes after the stream is enabled | Processes ALL existing records |
| Performance Impact | Zero — reads from the stream, not the table | High — full scan consumes read capacity |
| Cost | Near-free — stream reads are billed separately (~$0.02/100K requests) and don't consume table RCUs | Expensive — full scan at $0.25/million reads |
| Ordering Guarantee | Per-partition-key ordering | No ordering guarantee |
| Failure Recovery | Checkpoint-based — resume from last position | Must rescan or track progress |

**Decision:** Hybrid — Streams for ongoing changes + one-time scan for historical data

Rationale: When adding a new GSI that requires data transformation (not just a new key projection), DynamoDB's automatic GSI backfill handles simple cases. For complex transformations (e.g., adding a computed field), we need to:

1. Enable DynamoDB Streams first (captures all new writes)
2. Run a one-time scan-and-transform for historical data
3. Let the stream processor catch up any writes that happened during the scan

This hybrid approach ensures no data is missed. Using only Streams would miss historical data; using only Scan would miss concurrent writes.
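
A sketch of step 2, the one-time scan-and-transform; the computed field `gsi2pk` and the attribute names are hypothetical. Any row the scan overwrites mid-flight is re-applied by the stream processor in step 3:

```python
def transform(item: dict) -> dict:
    """Add the computed GSI key to a historical row (names hypothetical)."""
    out = dict(item)
    out["gsi2pk"] = f"{item['userId']}#{item['status']}"
    return out


def backfill(table_name: str, page_size: int = 100) -> int:
    """Paginated Scan + batched PutItem over all existing rows.
    Run only AFTER streams are enabled, so concurrent writes are
    caught up by the stream processor."""
    import boto3  # lazy import keeps transform() testable offline

    table = boto3.resource("dynamodb").Table(table_name)
    kwargs: dict = {"Limit": page_size}
    written = 0
    while True:
        page = table.scan(**kwargs)
        with table.batch_writer() as batch:
            for item in page["Items"]:
                batch.put_item(Item=transform(item))  # full-item put: idempotent
                written += 1
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            return written
        kwargs["ExclusiveStartKey"] = last_key
```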


### Decision 3: Capacity Mode During Migration — Stay On-Demand vs Switch to Provisioned

| Criteria | Stay On-Demand | Switch to Provisioned |
|---|---|---|
| Migration Burst Handling | Excellent — auto-scales | Requires capacity planning |
| Cost Predictability | Unpredictable during migration | Predictable |
| Throttling Risk | Low — but has per-partition limits | Higher if under-provisioned |
| Simplicity | No changes needed | Must estimate migration throughput |

**Decision:** Stay on-demand during migration, optimize afterward

Rationale: DynamoDB on-demand mode handles the burst traffic from GSI backfilling automatically. Switching to provisioned mode before a migration adds risk — if we underestimate the backfill throughput, we get throttling that slows the migration. The cost difference during a migration window (typically < 1 hour) is negligible compared to the risk of throttling.
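
A pre-flight guard in the pipeline can enforce this; a sketch, assuming a hypothetical check script. Note that DescribeTable omits `BillingModeSummary` for tables that have always been provisioned, so the absence of the field means provisioned:

```python
def is_on_demand(table_description: dict) -> bool:
    """True if the table bills PAY_PER_REQUEST (on-demand)."""
    summary = table_description.get("BillingModeSummary", {})
    return summary.get("BillingMode", "PROVISIONED") == "PAY_PER_REQUEST"


def assert_on_demand(table_name: str) -> None:
    """Fail the pipeline before a migration hits a provisioned table."""
    import boto3  # lazy import keeps is_on_demand() testable offline

    desc = boto3.client("dynamodb").describe_table(TableName=table_name)["Table"]
    if not is_on_demand(desc):
        raise RuntimeError(
            f"{table_name} is provisioned; GSI backfill may throttle reads. "
            "Switch to on-demand (or raise capacity) before migrating."
        )
```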


## Tradeoffs

### The Debate: Migration Speed vs Safety

```mermaid
graph TD
    subgraph "Backend Developer"
        BD1["GSI creation takes hours on large tables"]
        BD2["Want to add GSI and deploy code in one PR"]
        BD3["Expand-contract is 3 PRs for one feature"]
    end

    subgraph "Architect"
        AR1["Deploying code before GSI is ACTIVE = errors"]
        AR2["One PR = one concern — separate schema from code"]
        AR3["7-day cleanup delay catches edge cases"]
    end

    subgraph "Product Manager"
        PM1["Feature blocked until GSI is ready"]
        PM2["Can we build without the GSI?"]
        PM3["3-PR cycle = 3 review cycles = slow"]
    end

    BD2 ---|"Conflict"| AR2
    BD1 ---|"Frustrates"| PM1
    AR3 ---|"Delays"| PM3
```

### Resolution

| Concern | Solution | Compromise |
|---|---|---|
| Developer: Too many PRs | GSI PR auto-merges (low risk); code PR created immediately; cleanup PR auto-generated after 7 days | Developer creates 1 PR manually; 2 are automated |
| Developer: GSI takes hours | Pipeline creates GSI in staging first; by the time staging tests pass, prod GSI may already be ready | Must plan GSI additions 1 day ahead for large tables (>10M items) |
| Architect: Code before GSI | Pipeline blocks code deploy until `wait_for_gsi_active()` passes | Adds 5-60 min to deploy (GSI backfill time) |
| PM: Feature blocked | Feature flag hides UI until GSI is ready; backend code deploys but returns fallback behavior | Feature has "soft launch" with fallback until GSI is live |
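
The soft-launch compromise reduces to a few lines of backend code; a sketch with hypothetical flag and payload names:

```python
def feature_live(flags: dict, gsi_active: bool, flag: str = "reco-v2") -> bool:
    """Soft-launch gate: on only when the flag is set AND the GSI is ACTIVE."""
    return flags.get(flag, False) and gsi_active


def get_recommendations(user_id: str, flags: dict, gsi_active: bool) -> dict:
    """Deployed backend code returns fallback output until the GSI is live."""
    if not feature_live(flags, gsi_active):
        return {"userId": user_id, "items": [], "source": "fallback"}
    # Real path: query the new GSI (elided in this sketch)
    return {"userId": user_id, "items": query_new_gsi(user_id), "source": "gsi"}


def query_new_gsi(user_id: str) -> list:
    raise NotImplementedError("GSI query elided in this sketch")
```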

### Key Tradeoff: DynamoDB Streams Cost

Streams are charged at:
- $0.02 per 100,000 read requests (from Kinesis adapter)
- Stream records retained for 24 hours (free storage)

For a table with ~10K writes/day:
- Stream reads during migration: ~10K records × $0.02/100K = $0.002/day
- Total for a typical migration: < $0.01

The cost is negligible. The real cost is operational complexity:
- Must handle stream processing failures
- Must handle out-of-order delivery (within partition)
- Must handle duplicate records (at-least-once delivery)

The operational complexity of Streams is the true cost — not the $0.01 AWS charge. Every stream processor must be idempotent, handle duplicates, and checkpoint progress. This is why we only use Streams for migrations that genuinely need data transformation — for simple GSI additions, let DynamoDB's built-in backfill handle it.