
DynamoDB + API Gateway + Step Functions — Orchestration LLD

Project: MangaAssist (Amazon-style chatbot)
Scope: How API Gateway routes requests, Step Functions orchestrates multi-step workflows, and DynamoDB serves as the state backbone.


1. Three-Layer Architecture

flowchart TB
    subgraph Client_Layer["Client Layer"]
        WEB["Web App"]
        MOBILE["Mobile App"]
        AGENT["Agent Console"]
    end

    subgraph API_Layer["API Gateway Layer"]
        REST["REST API<br/>(CRUD operations)"]
        WS["WebSocket API<br/>(Real-time chat)"]
    end

    subgraph Orchestration["Step Functions Orchestration"]
        SF_ONBOARD["Session Onboarding Flow"]
        SF_HANDOFF["Human Handoff Flow"]
        SF_CLEANUP["Session Cleanup Flow"]
        SF_EXPORT["Data Export Flow"]
    end

    subgraph Compute["Lambda Functions"]
        L1["Auth Lambda"]
        L2["Message Lambda"]
        L3["Session Lambda"]
        L4["Admin Lambda"]
    end

    subgraph State["DynamoDB"]
        SESSIONS[(manga-assist-sessions)]
    end

    WEB --> REST
    WEB --> WS
    MOBILE --> REST
    MOBILE --> WS
    AGENT --> REST

    REST --> L1
    REST --> L3
    REST --> L4
    WS --> L2

    L1 --> SESSIONS
    L2 --> SESSIONS
    L3 --> SESSIONS
    L4 --> SESSIONS

    REST -->|"POST /sessions/{id}/handoff"| SF_HANDOFF
    REST -->|"POST /admin/cleanup"| SF_CLEANUP
    REST -->|"POST /admin/export"| SF_EXPORT
    WS -->|"$connect"| SF_ONBOARD

    SF_HANDOFF --> SESSIONS
    SF_CLEANUP --> SESSIONS
    SF_EXPORT --> SESSIONS

2. API Gateway REST Endpoints → DynamoDB

flowchart LR
    subgraph REST_API["REST API Gateway"]
        direction TB
        E1["GET /sessions/{id}"]
        E2["GET /sessions/{id}/turns"]
        E3["POST /sessions"]
        E4["POST /sessions/{id}/handoff"]
        E5["DELETE /sessions/{id}"]
        E6["GET /customers/{id}/sessions"]
    end

    subgraph Lambda_Handlers["Lambda"]
        H1["get_session"]
        H2["get_turns"]
        H3["create_session"]
        H4["initiate_handoff"]
        H5["delete_session"]
        H6["list_customer_sessions"]
    end

    subgraph DynamoDB_Ops["DynamoDB Operations"]
        D1["GetItem(PK, SK=META)"]
        D2["Query(PK, SK begins_with TURN#)"]
        D3["PutItem(PK, SK=META)<br/>ConditionExpression"]
        D4["TransactWriteItems<br/>(META + HANDOFF + TURN)"]
        D5["UpdateItem(PK, SK=META)<br/>status=deleted (soft delete)<br/>turns removed async"]
        D6["Query GSI1<br/>(customer_id, updated_at)"]
    end

    E1 --> H1 --> D1
    E2 --> H2 --> D2
    E3 --> H3 --> D3
    E4 --> H4 --> D4
    E5 --> H5 --> D5
    E6 --> H6 --> D6

API Gateway + Lambda Proxy Integration

"""
Lambda: manga-assist-session-api
Trigger: API Gateway REST proxy integration
Purpose: Unified handler for session CRUD operations
"""

import json
import os
import time
import uuid
import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["SESSION_TABLE"])


def handler(event, context):
    http_method = event["httpMethod"]
    path = event["resource"]
    path_params = event.get("pathParameters") or {}
    query_params = event.get("queryStringParameters") or {}
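    try:
        body = json.loads(event.get("body") or "{}")
    except json.JSONDecodeError:
        # Malformed JSON is a client error, not an unhandled 500
        return _response(400, {"error": "Request body is not valid JSON"})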

    # Route based on method + path
    routes = {
        ("GET", "/sessions/{id}"): get_session,
        ("GET", "/sessions/{id}/turns"): get_turns,
        ("POST", "/sessions"): create_session,
        ("DELETE", "/sessions/{id}"): delete_session,
        ("GET", "/customers/{id}/sessions"): list_customer_sessions,
    }

    handler_fn = routes.get((http_method, path))
    if not handler_fn:
        return _response(404, {"error": "Not found"})

    try:
        return handler_fn(path_params, query_params, body)
    except ClientError as e:
        code = e.response["Error"]["Code"]
        if code == "ConditionalCheckFailedException":
            return _response(409, {"error": "Conflict — resource already exists or condition not met"})
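        if code == "ResourceNotFoundException":
            # Raised when the table or index is missing (a deployment problem),
            # not when an item is absent, so report it as a server error.
            return _response(500, {"error": "Internal configuration error"})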
        raise


def get_session(path_params, query_params, body):
    session_id = path_params["id"]
    resp = table.get_item(Key={"PK": f"SESSION#{session_id}", "SK": "META"})
    item = resp.get("Item")
    if not item:
        return _response(404, {"error": "Session not found"})
    return _response(200, _sanitize(item))


def get_turns(path_params, query_params, body):
    session_id = path_params["id"]
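    try:
        limit = int(query_params.get("limit", "20"))
    except ValueError:
        return _response(400, {"error": "limit must be an integer"})
    # Clamp to 1..100: DynamoDB rejects Limit < 1, and the cap prevents abuse
    limit = max(1, min(limit, 100))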

    resp = table.query(
        KeyConditionExpression="PK = :pk AND begins_with(SK, :prefix)",
        ExpressionAttributeValues={
            ":pk": f"SESSION#{session_id}",
            ":prefix": "TURN#",
        },
        ScanIndexForward=False,
        Limit=limit,
    )

    turns = resp.get("Items", [])
    turns.reverse()  # Return in chronological order

    return _response(200, {
        "session_id": session_id,
        "turns": [_sanitize(t) for t in turns],
        "has_more": "LastEvaluatedKey" in resp,
    })


def create_session(path_params, query_params, body):
    session_id = str(uuid.uuid4())
    customer_id = body.get("customer_id", "anonymous")
    now = int(time.time())

    table.put_item(
        Item={
            "PK": f"SESSION#{session_id}",
            "SK": "META",
            "session_id": session_id,
            "customer_id": customer_id,
            "status": "active",
            "turn_count": 0,
            "created_at": now,
            "updated_at": now,
            "ttl": now + 86400,
        },
        ConditionExpression="attribute_not_exists(PK)",
    )

    return _response(201, {"session_id": session_id})


def delete_session(path_params, query_params, body):
    session_id = path_params["id"]
    pk = f"SESSION#{session_id}"

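    # Soft delete: mark META as deleted. The condition stops update_item from
    # silently creating a META item when the session does not exist.
    try:
        table.update_item(
            Key={"PK": pk, "SK": "META"},
            UpdateExpression="SET #s = :deleted, deleted_at = :now",
            ExpressionAttributeNames={"#s": "status"},
            ExpressionAttributeValues={
                ":deleted": "deleted",
                ":now": int(time.time()),
            },
            ConditionExpression="attribute_exists(PK)",
        )
    except ClientError as e:
        if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return _response(404, {"error": "Session not found"})
        raise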

    # Actual cleanup of TURN/SUMMARY items happens via Step Functions async
    return _response(200, {"status": "deletion_initiated", "session_id": session_id})


def list_customer_sessions(path_params, query_params, body):
    customer_id = path_params["id"]
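    try:
        limit = int(query_params.get("limit", "10"))
    except ValueError:
        return _response(400, {"error": "limit must be an integer"})
    # Clamp to 1..50: DynamoDB rejects Limit < 1
    limit = max(1, min(limit, 50))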

    resp = table.query(
        IndexName="GSI1-customer-sessions",
        KeyConditionExpression="customer_id = :cid",
        ExpressionAttributeValues={":cid": customer_id},
        ScanIndexForward=False,  # Most recent first
        Limit=limit,
    )

    return _response(200, {
        "customer_id": customer_id,
        "sessions": [_sanitize(i) for i in resp.get("Items", [])],
    })


def _sanitize(item: dict) -> dict:
    """Remove internal fields and binary data before returning to client."""
    exclude = {"PK", "SK", "content_compressed", "ttl"}
    return {k: v for k, v in item.items() if k not in exclude}


def _response(status: int, body: dict) -> dict:
    return {
        "statusCode": status,
        "headers": {
            "Content-Type": "application/json",
            "Access-Control-Allow-Origin": "*",
        },
        "body": json.dumps(body, default=str),
    }
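
get_turns reports has_more but gives the client no way to ask for the next page. One possible approach, sketched below, is to hand DynamoDB's LastEvaluatedKey back as an opaque cursor. The helper names encode_cursor and decode_cursor and the idea of a cursor query parameter are assumptions for illustration, not part of the handler above.

```python
import base64
import json
from typing import Optional


def encode_cursor(last_evaluated_key: Optional[dict]) -> Optional[str]:
    """Turn a DynamoDB LastEvaluatedKey into a URL-safe opaque string."""
    if not last_evaluated_key:
        return None
    raw = json.dumps(last_evaluated_key, sort_keys=True).encode("utf-8")
    return base64.urlsafe_b64encode(raw).decode("ascii")


def decode_cursor(cursor: Optional[str], expected_pk: str) -> Optional[dict]:
    """Decode a cursor and check that it belongs to the session being queried."""
    if not cursor:
        return None
    try:
        key = json.loads(base64.urlsafe_b64decode(cursor.encode("ascii")))
    except (ValueError, UnicodeError) as exc:
        # Covers bad base64 padding, non-ASCII input, and invalid JSON
        raise ValueError("Malformed cursor") from exc
    # Reject cursors minted for a different partition key
    if not isinstance(key, dict) or key.get("PK") != expected_pk:
        raise ValueError("Cursor does not match this session")
    return key
```

In get_turns this would mean passing the decoded key as ExclusiveStartKey when a cursor is supplied, and returning encode_cursor(resp.get("LastEvaluatedKey")) alongside has_more. Checking the PK keeps a client from replaying a cursor against another session.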

3. Step Functions — Human Handoff Workflow

stateDiagram-v2
    [*] --> ValidateSession
    ValidateSession --> CheckAgentAvailability : Session is active
    ValidateSession --> HandoffFailed : Session not found or inactive

    CheckAgentAvailability --> AssignAgent : Agent available
    CheckAgentAvailability --> QueueForAgent : No agent available

    QueueForAgent --> WaitForAgent : Add to queue
    WaitForAgent --> AssignAgent : Agent picked up (callback)
    WaitForAgent --> EscalateToManager : Timeout (5 min)

    AssignAgent --> UpdateDynamoDB : Agent confirmed
    UpdateDynamoDB --> NotifyCustomer : Transaction success
    UpdateDynamoDB --> HandoffFailed : Transaction failed

    NotifyCustomer --> TransferConversationContext
    TransferConversationContext --> NotifyAgent
    NotifyAgent --> MonitorHandoff

    MonitorHandoff --> HandoffComplete : Agent sends first reply
    MonitorHandoff --> ReassignAgent : Agent idle > 2 min

    ReassignAgent --> CheckAgentAvailability

    EscalateToManager --> HandoffFailed : No manager available
    EscalateToManager --> AssignAgent : Manager takes over

    HandoffComplete --> [*]
    HandoffFailed --> [*]

Step Functions Definition (ASL)

{
  "Comment": "Human Handoff Workflow for MangaAssist",
  "StartAt": "ValidateSession",
  "States": {
    "ValidateSession": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:getItem",
      "Parameters": {
        "TableName": "manga-assist-sessions",
        "Key": {
          "PK": {"S.$": "States.Format('SESSION#{}', $.session_id)"},
          "SK": {"S": "META"}
        }
      },
      "ResultPath": "$.session",
      "Next": "CheckSessionActive",
      "Catch": [{
        "ErrorEquals": ["States.ALL"],
        "Next": "HandoffFailed",
        "ResultPath": "$.error"
      }]
    },

    "CheckSessionActive": {
      "Type": "Choice",
      "Choices": [{
        "Variable": "$.session.Item.status.S",
        "StringEquals": "active",
        "Next": "CheckAgentAvailability"
      }],
      "Default": "HandoffFailed"
    },

    "CheckAgentAvailability": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789:function:check-agent-availability",
      "Parameters": {
        "skill_required.$": "$.reason",
        "priority.$": "$.priority"
      },
      "ResultPath": "$.agent",
      "Next": "IsAgentAvailable",
      "Retry": [{
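        "ErrorEquals": ["Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException"],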
        "IntervalSeconds": 2,
        "MaxAttempts": 3,
        "BackoffRate": 2.0
      }]
    },

    "IsAgentAvailable": {
      "Type": "Choice",
      "Choices": [{
        "Variable": "$.agent.available",
        "BooleanEquals": true,
        "Next": "UpdateDynamoDB"
      }],
      "Default": "WaitForAgent"
    },

    "WaitForAgent": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
      "Parameters": {
        "QueueUrl": "https://sqs.us-east-1.amazonaws.com/123456789/agent-assignment-queue",
        "MessageBody": {
          "session_id.$": "$.session_id",
          "reason.$": "$.reason",
          "taskToken.$": "$$.Task.Token"
        }
      },
      "TimeoutSeconds": 300,
      "ResultPath": "$.agent",
      "Next": "UpdateDynamoDB",
      "Catch": [{
        "ErrorEquals": ["States.Timeout"],
        "ResultPath": "$.error",
        "Next": "EscalateToManager"
      }]
    },

    "UpdateDynamoDB": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789:function:update-handoff-dynamodb",
      "Parameters": {
        "session_id.$": "$.session_id",
        "agent_id.$": "$.agent.agent_id",
        "reason.$": "$.reason"
      },
      "Next": "NotifyParties",
      "Catch": [{
        "ErrorEquals": ["States.ALL"],
        "Next": "HandoffFailed"
      }]
    },

    "NotifyParties": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "NotifyCustomer",
          "States": {
            "NotifyCustomer": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:123456789:function:notify-customer",
              "End": true
            }
          }
        },
        {
          "StartAt": "NotifyAgent",
          "States": {
            "NotifyAgent": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:123456789:function:notify-agent",
              "End": true
            }
          }
        }
      ],
      "Next": "HandoffComplete"
    },

    "EscalateToManager": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789:function:escalate-to-manager",
      "ResultPath": "$.escalation",
      "Next": "CheckAgentAvailability",
      "Catch": [{
        "ErrorEquals": ["States.ALL"],
        "Next": "HandoffFailed"
      }]
    },

    "HandoffComplete": {
      "Type": "Succeed"
    },

    "HandoffFailed": {
      "Type": "Fail",
      "Error": "HandoffError",
      "Cause": "Unable to complete handoff to human agent"
    }
  }
}
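
The WaitForAgent state stays paused until something returns the task token. A minimal sketch of that callback side follows, assuming a consumer of agent-assignment-queue that already knows which agent accepted. The function name complete_agent_assignment and the output shape are hypothetical; send_task_success is the boto3 Step Functions client call, and the client is injected so the function can be exercised without AWS.

```python
import json


def complete_agent_assignment(sfn_client, message_body: str, agent_id: str) -> dict:
    """Resume a workflow paused at WaitForAgent.

    message_body is the SQS message body written by the WaitForAgent state.
    sfn_client is a boto3 Step Functions client, or any object exposing
    send_task_success with the same keyword arguments.
    """
    message = json.loads(message_body)
    # Assumed output shape: UpdateDynamoDB reads $.agent.agent_id, and this
    # output lands at $.agent because WaitForAgent sets ResultPath to $.agent.
    output = {"agent_id": agent_id, "available": True}
    sfn_client.send_task_success(
        taskToken=message["taskToken"],
        output=json.dumps(output),
    )
    return output
```

If the token is never returned, the 300-second TimeoutSeconds on WaitForAgent fires and the workflow moves to EscalateToManager.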

4. Step Functions — Session Cleanup Workflow

flowchart TD
    START["Triggered by:<br/>- CloudWatch scheduled rule (hourly)<br/>- Admin API call<br/>- EventBridge session disconnected"]

    START --> QUERY_EXPIRED["Query DynamoDB<br/>Find sessions with<br/>status=disconnected AND<br/>disconnected_at < (now - 1h)"]

    QUERY_EXPIRED --> MAP_STATE{"Map State<br/>(parallel per session)"}

    MAP_STATE --> |"Session 1"| EXPORT1["Export turns to S3<br/>(audit archive)"]
    MAP_STATE --> |"Session 2"| EXPORT2["Export turns to S3"]
    MAP_STATE --> |"Session N"| EXPORTN["Export turns to S3"]

    EXPORT1 --> DELETE1["BatchWriteItem<br/>Delete all TURN# items"]
    EXPORT2 --> DELETE2["BatchWriteItem<br/>Delete all TURN# items"]
    EXPORTN --> DELETEN["BatchWriteItem<br/>Delete all TURN# items"]

    DELETE1 --> DELETE_SUMMARY1["Delete SUMMARY# items"]
    DELETE2 --> DELETE_SUMMARY2["Delete SUMMARY# items"]
    DELETEN --> DELETE_SUMMARYN["Delete SUMMARY# items"]

    DELETE_SUMMARY1 --> UPDATE_META1["Update META<br/>status = 'archived'"]
    DELETE_SUMMARY2 --> UPDATE_META2["Update META<br/>status = 'archived'"]
    DELETE_SUMMARYN --> UPDATE_METAN["Update META<br/>status = 'archived'"]

    UPDATE_META1 --> DONE["All sessions cleaned"]
    UPDATE_META2 --> DONE
    UPDATE_METAN --> DONE

Cleanup Lambda Used by Step Functions

"""
Lambda: manga-assist-cleanup-worker
Trigger: Step Functions Map state (one invocation per session)
Purpose: Archive and delete expired session data
"""

import json
import os
import time
import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["SESSION_TABLE"])
s3 = boto3.client("s3")
ARCHIVE_BUCKET = os.environ["ARCHIVE_BUCKET"]


def handler(event, context):
    session_id = event["session_id"]
    pk = f"SESSION#{session_id}"

    # ── Step 1: Query all items for this session ──
    all_items = _query_all_session_items(pk)

    if not all_items:
        return {"session_id": session_id, "status": "not_found"}

    # ── Step 2: Archive to S3 ──
    archive_key = f"sessions/{session_id}/{int(time.time())}.json"
    s3.put_object(
        Bucket=ARCHIVE_BUCKET,
        Key=archive_key,
        Body=json.dumps(all_items, default=str),
        ServerSideEncryption="aws:kms",
    )

    # ── Step 3: Delete TURN and SUMMARY items (not META) ──
    items_to_delete = [item for item in all_items if item["SK"] != "META"]

    # BatchWriteItem: max 25 items per batch
    for batch_start in range(0, len(items_to_delete), 25):
        batch = items_to_delete[batch_start:batch_start + 25]
        request_items = [
            {"DeleteRequest": {"Key": {"PK": item["PK"], "SK": item["SK"]}}}
            for item in batch
        ]

        response = dynamodb.meta.client.batch_write_item(
            RequestItems={os.environ["SESSION_TABLE"]: request_items}
        )

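        # Retry unprocessed items (throttled) with capped exponential backoff
        unprocessed = response.get("UnprocessedItems", {})
        attempt = 0
        while unprocessed:
            if attempt >= 5:
                # Fail the task so the Step Functions retry policy can take over
                raise RuntimeError(f"Unprocessed deletes remain for {session_id}")
            time.sleep(min(0.1 * 2 ** attempt, 2.0))
            attempt += 1
            response = dynamodb.meta.client.batch_write_item(
                RequestItems=unprocessed
            )
            unprocessed = response.get("UnprocessedItems", {})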

    # ── Step 4: Update META to archived ──
    table.update_item(
        Key={"PK": pk, "SK": "META"},
        UpdateExpression="SET #s = :archived, archived_at = :now, archive_key = :key",
        ExpressionAttributeNames={"#s": "status"},
        ExpressionAttributeValues={
            ":archived": "archived",
            ":now": int(time.time()),
            ":key": archive_key,
        },
    )

    return {
        "session_id": session_id,
        "status": "archived",
        "items_deleted": len(items_to_delete),
        "archive_key": archive_key,
    }


def _query_all_session_items(pk: str) -> list:
    items = []
    params = {
        "KeyConditionExpression": "PK = :pk",
        "ExpressionAttributeValues": {":pk": pk},
    }
    while True:
        resp = table.query(**params)
        items.extend(resp.get("Items", []))
        if "LastEvaluatedKey" not in resp:
            break
        params["ExclusiveStartKey"] = resp["LastEvaluatedKey"]
    return items
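
The Map state that fans out to this worker is not shown in this document. The sketch below builds one plausible definition as a Python dict so it can be serialized into the state machine JSON. The function name, the $.expired_sessions input path, the $.cleanup_results output path, and the retry numbers are all assumptions.

```python
import json


def build_cleanup_map_state(worker_arn: str, max_concurrency: int = 10) -> dict:
    """Return an ASL Map state that invokes the cleanup worker once per session."""
    # In ASL a MaxConcurrency of 0 means "no limit", so insist on a real bound
    if max_concurrency < 1:
        raise ValueError("max_concurrency must be at least 1")
    return {
        "Type": "Map",
        # Assumed input shape: a list of {"session_id": "..."} objects
        "ItemsPath": "$.expired_sessions",
        "MaxConcurrency": max_concurrency,
        "ItemProcessor": {
            "ProcessorConfig": {"Mode": "INLINE"},
            "StartAt": "CleanupSession",
            "States": {
                "CleanupSession": {
                    "Type": "Task",
                    "Resource": worker_arn,
                    "Retry": [{
                        "ErrorEquals": [
                            "Lambda.TooManyRequestsException",
                            "Lambda.ServiceException",
                        ],
                        "IntervalSeconds": 2,
                        "MaxAttempts": 3,
                        "BackoffRate": 2.0,
                    }],
                    "End": True,
                }
            },
        },
        "ResultPath": "$.cleanup_results",
        "End": True,
    }
```

Each list element becomes the event for one worker invocation, which matches the handler's event["session_id"] lookup. json.dumps(build_cleanup_map_state(arn)) gives the fragment to embed under "States".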

5. API Gateway Request Validation + DynamoDB

flowchart LR
    CLIENT["Client Request"] --> APIGW["API Gateway"]

    APIGW --> AUTH["Lambda Authorizer"]

    AUTH -->|"Token missing or invalid"| DENY["401 Unauthorized / 403 Forbidden"]
    AUTH -->|"Token valid"| VALIDATE{"Request Validation"}

    VALIDATE -->|"Missing required fields"| REJECT["400 Bad Request<br/>(Never hits handler Lambda)"]
    VALIDATE -->|"Valid"| LAMBDA["Lambda Handler"]

    LAMBDA -->|"Query/Put"| DDB["DynamoDB"]

    DDB -->|"ConditionalCheckFailed"| LAMBDA
    LAMBDA -->|"409 Conflict"| CLIENT

    DDB -->|"Item not found"| LAMBDA
    LAMBDA -->|"404 Not Found"| CLIENT

    DDB -->|"Success"| LAMBDA
    LAMBDA -->|"200 OK"| CLIENT

API Gateway Model Validation (OpenAPI)

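# Validate the request BEFORE the handler Lambda is invoked.
# This saves Lambda invocations and DynamoDB reads on garbage requests.
# A request validator must be enabled for the schemas below to be enforced.

x-amazon-apigateway-request-validators:
  body-and-params:
    validateRequestBody: true
    validateRequestParameters: true
x-amazon-apigateway-request-validator: body-and-params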

paths:
  /sessions:
    post:
      requestBody:
        required: true
        content:
          application/json:
            schema:
              type: object
              required:
                - customer_id
              properties:
                customer_id:
                  type: string
                  pattern: "^[a-zA-Z0-9-]{1,64}$"
                metadata:
                  type: object
                  properties:
                    source:
                      type: string
                      enum: [web, mobile, voice]
                    language:
                      type: string
                      pattern: "^[a-z]{2}(-[A-Z]{2})?$"

  /sessions/{id}/turns:
    get:
      parameters:
        - name: id
          in: path
          required: true
          schema:
            type: string
            format: uuid
        - name: limit
          in: query
          schema:
            type: integer
            minimum: 1
            maximum: 100
            default: 20
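
Gateway-level validation is a first filter, not the only one. API Gateway's parameter validation checks that required parameters are present, and it is safest to assume that constraints such as minimum and maximum on a query parameter are not enforced there. A minimal sketch of the matching handler-side checks follows; parse_limit and validate_customer_id are hypothetical helper names, and the pattern and bounds are copied from the spec above.

```python
import re

# Same pattern as customer_id in the OpenAPI model
CUSTOMER_ID_RE = re.compile(r"^[a-zA-Z0-9-]{1,64}$")


def parse_limit(raw, default: int = 20, maximum: int = 100) -> int:
    """Parse a limit query parameter and clamp it to the range 1..maximum."""
    if raw is None:
        return default
    try:
        value = int(raw)
    except (TypeError, ValueError):
        raise ValueError("limit must be an integer")
    return max(1, min(value, maximum))


def validate_customer_id(value) -> str:
    """Return the customer_id unchanged if it matches the model pattern."""
    if not isinstance(value, str) or not CUSTOMER_ID_RE.fullmatch(value):
        raise ValueError("customer_id must be 1-64 letters, digits, or hyphens")
    return value
```

A handler would catch ValueError from either helper and return a 400, so a request that slips past the gateway still cannot reach DynamoDB with a bad Limit or key.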

6. Step Functions Direct DynamoDB Integration (No Lambda)

flowchart LR
    SF["Step Functions"] -->|"Direct SDK integration"| DDB["DynamoDB"]

    subgraph No_Lambda_Needed["No Lambda Needed!"]
        direction TB
        OP1["GetItem — Read session"]
        OP2["PutItem — Write record"]
        OP3["UpdateItem — Modify status"]
        OP4["Query — List items"]
        OP5["DeleteItem — Remove item"]
    end

    SF --> No_Lambda_Needed
    No_Lambda_Needed --> DDB

    subgraph Still_Need_Lambda["Still Need Lambda For"]
        direction TB
        NL1["Complex business logic"]
        NL2["Multiple DynamoDB calls"]
        NL3["Cross-service orchestration"]
        NL4["Data transformation"]
    end

Direct Integration Example

{
  "Comment": "Read session directly from DynamoDB — no Lambda needed",
  "StartAt": "ReadSession",
  "States": {
    "ReadSession": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:getItem",
      "Parameters": {
        "TableName": "manga-assist-sessions",
        "Key": {
          "PK": {"S.$": "States.Format('SESSION#{}', $.session_id)"},
          "SK": {"S": "META"}
        },
        "ProjectionExpression": "session_id, customer_id, #s, turn_count",
        "ExpressionAttributeNames": {
          "#s": "status"
        }
      },
      "ResultPath": "$.sessionData",
      "Next": "ProcessResult"
    },
    "ProcessResult": {
      "Type": "Choice",
      "Choices": [{
        "Variable": "$.sessionData.Item",
        "IsPresent": true,
        "Next": "SessionFound"
      }],
      "Default": "SessionNotFound"
    },
    "SessionFound": { "Type": "Succeed" },
    "SessionNotFound": { "Type": "Fail", "Error": "SessionNotFound" }
  }
}
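
Direct integrations return items in DynamoDB's typed attribute format, which is why the handoff workflow's Choice state reads $.session.Item.status.S rather than $.session.Item.status. If a later Lambda receives that state as input, it has to unwrap the type tags itself. The sketch below covers the common tags only; unmarshal and unmarshal_item are illustrative names, and binary and set types are deliberately left out.

```python
from decimal import Decimal


def unmarshal(attr: dict):
    """Convert one DynamoDB-typed attribute value into a plain Python value."""
    if len(attr) != 1:
        raise ValueError("Expected exactly one type tag per attribute")
    type_tag, value = next(iter(attr.items()))
    if type_tag == "S":
        return value
    if type_tag == "N":
        # Numbers arrive as strings; Decimal avoids float rounding
        return Decimal(value)
    if type_tag == "BOOL":
        return value
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [unmarshal(v) for v in value]
    if type_tag == "M":
        return {k: unmarshal(v) for k, v in value.items()}
    raise ValueError(f"Unsupported DynamoDB type tag: {type_tag}")


def unmarshal_item(item: dict) -> dict:
    """Unwrap every attribute of an Item returned by a direct integration."""
    return {name: unmarshal(attr) for name, attr in item.items()}
```

For example, unmarshal_item(event["sessionData"]["Item"]) would turn {"status": {"S": "active"}} into {"status": "active"}. In a Lambda that already bundles boto3, boto3.dynamodb.types.TypeDeserializer does the same job and also handles sets and binary.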

7. Common Mistakes Teams Make

| Mistake | Consequence | Fix |
|---|---|---|
| Using Lambda for simple DynamoDB reads in Step Functions | Extra Lambda invocation cost and latency | Use direct SDK integration (arn:aws:states:::dynamodb:getItem) |
| No request validation at API Gateway | Lambda invoked for junk requests, wastes compute + DynamoDB reads | Define request models in OpenAPI spec |
| Step Functions timeout too short | Workflow fails during peak when DynamoDB is slightly slower | Set generous timeouts (5min+) with retry policies |
| Not using Map state for batch cleanup | Processing sessions sequentially takes hours | Use Map with MaxConcurrency for parallel processing |
| Synchronous Step Functions execution from API | Client waits for entire workflow to complete (could be minutes) | Use StartExecution async, return execution ARN, client polls |
| No idempotency token on Step Functions start | Duplicate API calls create duplicate workflow executions | Pass name parameter as idempotency key |
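
The idempotency fix in the last row depends on the execution name being the same for every retry of the same request. A minimal sketch of deriving such a name follows. The function execution_name and the idea of a client-supplied idempotency key are assumptions; the 80-character cap and the conservative character set follow the Step Functions naming rules.

```python
import hashlib
import re

# Keep only characters that are always safe in an execution name
_UNSAFE = re.compile(r"[^0-9A-Za-z_-]")
_MAX_NAME_LENGTH = 80
_DIGEST_LENGTH = 16


def execution_name(workflow: str, session_id: str, idempotency_key: str) -> str:
    """Build a deterministic execution name for one logical request."""
    digest = hashlib.sha256(
        f"{session_id}:{idempotency_key}".encode("utf-8")
    ).hexdigest()[:_DIGEST_LENGTH]
    prefix = _UNSAFE.sub("-", f"{workflow}-{session_id}")
    # Leave room for the "-" separator and the digest within the 80-char limit
    prefix = prefix[:_MAX_NAME_LENGTH - _DIGEST_LENGTH - 1]
    return f"{prefix}-{digest}"
```

The result would be passed as the name argument of start_execution. For Standard workflows, repeating the call with the same name and the same input returns the existing execution, while the same name with different input is rejected, so the input payload should also be stable across retries.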

8. Critical Things to Remember

For Interviews

  1. API Gateway validates BEFORE Lambda — Use request models to reject bad requests at the gateway level. This saves Lambda invocations AND DynamoDB capacity.

  2. Step Functions can call DynamoDB directly — No Lambda needed for simple GetItem/PutItem/UpdateItem. This is cheaper, faster, and less code.

  3. Use waitForTaskToken for human-in-the-loop — Step Functions pauses until an external system sends the token back. Perfect for "wait for agent to accept handoff."

  4. Map state is parallel foreach — When you need to process N sessions, Map state runs them concurrently with configurable MaxConcurrency.

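  5. Step Functions Express vs Standard: Express runs for at most 5 minutes, is built for high volume, and is cheaper per execution, with at-least-once (asynchronous) or at-most-once (synchronous) execution; use it for per-message processing. Standard runs for up to 1 year with exactly-once workflow execution and supports waitForTaskToken, which Express does not; use it for handoff workflows.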

For Production

  1. Always return execution ARN to client — Let clients poll DescribeExecution instead of waiting synchronously.

  2. Set MaxConcurrency on Map states — Unbounded parallelism can overwhelm DynamoDB and downstream services.

  3. Use Step Functions retry policies: built-in exponential backoff (IntervalSeconds, BackoffRate, MaxAttempts), with optional jitter via JitterStrategy, is better than rolling your own in Lambda.

  4. API Gateway throttling protects DynamoDB — Set rate limits on API Gateway to prevent a misbehaving client from overwhelming your DynamoDB capacity.

  5. Log execution input/output in Step Functions — Turn on CloudWatch logging at ALL level for debugging, but disable includeExecutionData in production for PII safety.
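
The first production point, returning the execution ARN and letting the client poll, can be sketched as a small loop. The function name wait_for_execution, the delay values, and the injectable sleep parameter are assumptions; describe_execution and its status field are the boto3 Step Functions client API.

```python
import time

# Statuses after which an execution will not change again
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def wait_for_execution(sfn_client, execution_arn: str, timeout_s: float = 60.0,
                       initial_delay_s: float = 1.0, max_delay_s: float = 10.0,
                       sleep=time.sleep) -> str:
    """Poll DescribeExecution with capped exponential backoff.

    Returns the terminal status, or raises TimeoutError if the next wait
    would pass the deadline.
    """
    deadline = time.monotonic() + timeout_s
    delay = initial_delay_s
    while True:
        status = sfn_client.describe_execution(executionArn=execution_arn)["status"]
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() + delay > deadline:
            raise TimeoutError(f"Execution still {status} after {timeout_s}s")
        sleep(delay)
        delay = min(delay * 2, max_delay_s)
```

Doubling the delay up to a cap keeps short workflows responsive while limiting DescribeExecution calls for long-running handoffs. A browser or mobile client would normally poll through an API endpoint that wraps this call rather than holding AWS credentials itself.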