DynamoDB + API Gateway + Step Functions — Orchestration LLD
Project: MangaAssist (Amazon-style chatbot)
Scope: How API Gateway routes requests, how Step Functions orchestrates multi-step workflows, and how DynamoDB serves as the state backbone.
1. Three-Layer Architecture
flowchart TB
subgraph Client_Layer["Client Layer"]
WEB["Web App"]
MOBILE["Mobile App"]
AGENT["Agent Console"]
end
subgraph API_Layer["API Gateway Layer"]
REST["REST API<br/>(CRUD operations)"]
WS["WebSocket API<br/>(Real-time chat)"]
end
subgraph Orchestration["Step Functions Orchestration"]
SF_ONBOARD["Session Onboarding Flow"]
SF_HANDOFF["Human Handoff Flow"]
SF_CLEANUP["Session Cleanup Flow"]
SF_EXPORT["Data Export Flow"]
end
subgraph Compute["Lambda Functions"]
L1["Auth Lambda"]
L2["Message Lambda"]
L3["Session Lambda"]
L4["Admin Lambda"]
end
subgraph State["DynamoDB"]
SESSIONS[(manga-assist-sessions)]
end
WEB --> REST
WEB --> WS
MOBILE --> REST
MOBILE --> WS
AGENT --> REST
REST --> L1
REST --> L3
REST --> L4
WS --> L2
L1 --> SESSIONS
L2 --> SESSIONS
L3 --> SESSIONS
L4 --> SESSIONS
REST -->|"POST /sessions/{id}/handoff"| SF_HANDOFF
REST -->|"POST /admin/cleanup"| SF_CLEANUP
REST -->|"POST /admin/export"| SF_EXPORT
WS -->|"$connect"| SF_ONBOARD
SF_HANDOFF --> SESSIONS
SF_CLEANUP --> SESSIONS
SF_EXPORT --> SESSIONS
2. API Gateway REST Endpoints → DynamoDB
flowchart LR
subgraph REST_API["REST API Gateway"]
direction TB
E1["GET /sessions/{id}"]
E2["GET /sessions/{id}/turns"]
E3["POST /sessions"]
E4["POST /sessions/{id}/handoff"]
E5["DELETE /sessions/{id}"]
E6["GET /customers/{id}/sessions"]
end
subgraph Lambda_Handlers["Lambda"]
H1["get_session"]
H2["get_turns"]
H3["create_session"]
H4["initiate_handoff"]
H5["delete_session"]
H6["list_customer_sessions"]
end
subgraph DynamoDB_Ops["DynamoDB Operations"]
D1["GetItem(PK, SK=META)"]
D2["Query(PK, SK begins_with TURN#)"]
D3["PutItem(PK, SK=META)<br/>ConditionExpression"]
D4["TransactWriteItems<br/>(META + HANDOFF + TURN)"]
D5["DeleteItem(PK, SK=META)<br/>+ BatchWrite to remove turns"]
D6["Query GSI1<br/>(customer_id, updated_at)"]
end
E1 --> H1 --> D1
E2 --> H2 --> D2
E3 --> H3 --> D3
E4 --> H4 --> D4
E5 --> H5 --> D5
E6 --> H6 --> D6
API Gateway + Lambda Proxy Integration
"""
Lambda: manga-assist-session-api
Trigger: API Gateway REST proxy integration
Purpose: Unified handler for session CRUD operations
"""
import json
import os
import time
import uuid
from decimal import Decimal

import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["SESSION_TABLE"])


def handler(event, context):
    http_method = event["httpMethod"]
    path = event["resource"]
    path_params = event.get("pathParameters") or {}
    query_params = event.get("queryStringParameters") or {}
    try:
        body = json.loads(event.get("body") or "{}")
    except json.JSONDecodeError:
        return _response(400, {"error": "Request body is not valid JSON"})

    # Route based on method + resource path
    routes = {
        ("GET", "/sessions/{id}"): get_session,
        ("GET", "/sessions/{id}/turns"): get_turns,
        ("POST", "/sessions"): create_session,
        ("DELETE", "/sessions/{id}"): delete_session,
        ("GET", "/customers/{id}/sessions"): list_customer_sessions,
    }
    handler_fn = routes.get((http_method, path))
    if not handler_fn:
        return _response(404, {"error": "Not found"})

    try:
        return handler_fn(path_params, query_params, body)
    except ClientError as e:
        code = e.response["Error"]["Code"]
        if code == "ConditionalCheckFailedException":
            return _response(409, {"error": "Conflict: resource already exists or condition not met"})
        if code == "ResourceNotFoundException":
            return _response(404, {"error": "Resource not found"})
        raise


def get_session(path_params, query_params, body):
    session_id = path_params["id"]
    resp = table.get_item(Key={"PK": f"SESSION#{session_id}", "SK": "META"})
    item = resp.get("Item")
    if not item:
        return _response(404, {"error": "Session not found"})
    return _response(200, _sanitize(item))


def get_turns(path_params, query_params, body):
    session_id = path_params["id"]
    # Cap at 100 to prevent abuse
    limit = _parse_limit(query_params, default=20, maximum=100)
    resp = table.query(
        KeyConditionExpression="PK = :pk AND begins_with(SK, :prefix)",
        ExpressionAttributeValues={
            ":pk": f"SESSION#{session_id}",
            ":prefix": "TURN#",
        },
        ScanIndexForward=False,  # Newest turns first, so Limit keeps the latest N
        Limit=limit,
    )
    turns = resp.get("Items", [])
    turns.reverse()  # Return in chronological order
    return _response(200, {
        "session_id": session_id,
        "turns": [_sanitize(t) for t in turns],
        "has_more": "LastEvaluatedKey" in resp,
    })


def create_session(path_params, query_params, body):
    session_id = str(uuid.uuid4())
    customer_id = body.get("customer_id", "anonymous")
    now = int(time.time())
    table.put_item(
        Item={
            "PK": f"SESSION#{session_id}",
            "SK": "META",
            "session_id": session_id,
            "customer_id": customer_id,
            "status": "active",
            "turn_count": 0,
            "created_at": now,
            "updated_at": now,
            "ttl": now + 86400,  # Expire after 24 hours
        },
        ConditionExpression="attribute_not_exists(PK)",
    )
    return _response(201, {"session_id": session_id})


def delete_session(path_params, query_params, body):
    session_id = path_params["id"]
    pk = f"SESSION#{session_id}"
    # First, mark META as deleted (soft delete)
    try:
        table.update_item(
            Key={"PK": pk, "SK": "META"},
            UpdateExpression="SET #s = :deleted, deleted_at = :now",
            # Without this condition, UpdateItem would create a stub META item
            # for a session that never existed.
            ConditionExpression="attribute_exists(PK)",
            ExpressionAttributeNames={"#s": "status"},
            ExpressionAttributeValues={
                ":deleted": "deleted",
                ":now": int(time.time()),
            },
        )
    except ClientError as e:
        if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return _response(404, {"error": "Session not found"})
        raise
    # Actual cleanup of TURN/SUMMARY items happens via Step Functions async
    return _response(200, {"status": "deletion_initiated", "session_id": session_id})


def list_customer_sessions(path_params, query_params, body):
    customer_id = path_params["id"]
    limit = _parse_limit(query_params, default=10, maximum=50)
    resp = table.query(
        IndexName="GSI1-customer-sessions",
        KeyConditionExpression="customer_id = :cid",
        ExpressionAttributeValues={":cid": customer_id},
        ScanIndexForward=False,  # Most recent first
        Limit=limit,
    )
    return _response(200, {
        "customer_id": customer_id,
        "sessions": [_sanitize(i) for i in resp.get("Items", [])],
    })


def _parse_limit(query_params: dict, default: int, maximum: int) -> int:
    """Read ?limit= as an int clamped to [1, maximum]; use the default if it is not a number."""
    try:
        limit = int(query_params.get("limit", default))
    except (TypeError, ValueError):
        return default
    return max(1, min(limit, maximum))


def _sanitize(item: dict) -> dict:
    """Remove internal fields and binary data before returning to client."""
    exclude = {"PK", "SK", "content_compressed", "ttl"}
    return {k: v for k, v in item.items() if k not in exclude}


def _json_default(value):
    """Emit DynamoDB Decimals as JSON numbers and sets as lists; stringify anything else."""
    if isinstance(value, Decimal):
        return int(value) if value == value.to_integral_value() else float(value)
    if isinstance(value, set):
        return list(value)
    return str(value)


def _response(status: int, body: dict) -> dict:
    return {
        "statusCode": status,
        "headers": {
            "Content-Type": "application/json",
            "Access-Control-Allow-Origin": "*",
        },
        "body": json.dumps(body, default=_json_default),
    }
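
`get_turns` reports `has_more` but gives the client no way to request the next page. One possible approach, sketched here on the assumption that an opaque cursor is acceptable, is to hand back DynamoDB's `LastEvaluatedKey` as a URL-safe token and pass it back in as `ExclusiveStartKey` on the next query. The helper names `encode_cursor` and `decode_cursor` are hypothetical and not part of the handler above; the sketch only covers the base-table query, whose key holds plain `PK`/`SK` strings.

```python
import base64
import json
from typing import Optional


def encode_cursor(last_evaluated_key: Optional[dict]) -> Optional[str]:
    """Turn a LastEvaluatedKey (PK/SK strings) into an opaque URL-safe token."""
    if not last_evaluated_key:
        return None
    raw = json.dumps(last_evaluated_key, sort_keys=True).encode("utf-8")
    return base64.urlsafe_b64encode(raw).decode("ascii")


def decode_cursor(token: Optional[str], expected_pk: str) -> Optional[dict]:
    """Decode a client-supplied token and reject one that points at another session."""
    if not token:
        return None
    try:
        key = json.loads(base64.urlsafe_b64decode(token.encode("ascii")))
    except ValueError:
        # Covers bad base64, bad UTF-8 and bad JSON (all ValueError subclasses).
        raise ValueError("Malformed cursor") from None
    if not isinstance(key, dict) or key.get("PK") != expected_pk:
        raise ValueError("Cursor does not belong to this session")
    return key
```

In the handler, the decoded key would be passed as `ExclusiveStartKey` to `table.query`, and `encode_cursor(resp.get("LastEvaluatedKey"))` returned alongside `has_more`. Checking the partition key matters because the token is client-controlled input.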
3. Step Functions — Human Handoff Workflow
stateDiagram-v2
[*] --> ValidateSession
ValidateSession --> CheckAgentAvailability : Session is active
ValidateSession --> HandoffFailed : Session not found or inactive
CheckAgentAvailability --> AssignAgent : Agent available
CheckAgentAvailability --> QueueForAgent : No agent available
QueueForAgent --> WaitForAgent : Add to queue
WaitForAgent --> AssignAgent : Agent picked up (callback)
WaitForAgent --> EscalateToManager : Timeout (5 min)
AssignAgent --> UpdateDynamoDB : Agent confirmed
UpdateDynamoDB --> NotifyCustomer : Transaction success
UpdateDynamoDB --> HandoffFailed : Transaction failed
NotifyCustomer --> TransferConversationContext
TransferConversationContext --> NotifyAgent
NotifyAgent --> MonitorHandoff
MonitorHandoff --> HandoffComplete : Agent sends first reply
MonitorHandoff --> ReassignAgent : Agent idle > 2 min
ReassignAgent --> CheckAgentAvailability
EscalateToManager --> HandoffFailed : No manager available
EscalateToManager --> AssignAgent : Manager takes over
HandoffComplete --> [*]
HandoffFailed --> [*]
Step Functions Definition (ASL)
{
  "Comment": "Human Handoff Workflow for MangaAssist",
  "StartAt": "ValidateSession",
  "States": {
    "ValidateSession": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:getItem",
      "Parameters": {
        "TableName": "manga-assist-sessions",
        "Key": {
          "PK": {"S.$": "States.Format('SESSION#{}', $.session_id)"},
          "SK": {"S": "META"}
        }
      },
      "ResultPath": "$.session",
      "Next": "CheckSessionActive",
      "Catch": [{
        "ErrorEquals": ["States.ALL"],
        "Next": "HandoffFailed",
        "ResultPath": "$.error"
      }]
    },
    "CheckSessionActive": {
      "Type": "Choice",
      "Choices": [{
        "And": [
          {"Variable": "$.session.Item", "IsPresent": true},
          {"Variable": "$.session.Item.status.S", "StringEquals": "active"}
        ],
        "Next": "CheckAgentAvailability"
      }],
      "Default": "HandoffFailed"
    },
    "CheckAgentAvailability": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789:function:check-agent-availability",
      "Parameters": {
        "skill_required.$": "$.reason",
        "priority.$": "$.priority"
      },
      "ResultPath": "$.agent",
      "Next": "IsAgentAvailable",
      "Retry": [{
        "ErrorEquals": [
          "Lambda.ServiceException",
          "Lambda.AWSLambdaException",
          "Lambda.SdkClientException",
          "Lambda.TooManyRequestsException"
        ],
        "IntervalSeconds": 2,
        "MaxAttempts": 3,
        "BackoffRate": 2.0
      }]
    },
    "IsAgentAvailable": {
      "Type": "Choice",
      "Choices": [{
        "Variable": "$.agent.available",
        "BooleanEquals": true,
        "Next": "UpdateDynamoDB"
      }],
      "Default": "WaitForAgent"
    },
    "WaitForAgent": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
      "Parameters": {
        "QueueUrl": "https://sqs.us-east-1.amazonaws.com/123456789/agent-assignment-queue",
        "MessageBody": {
          "session_id.$": "$.session_id",
          "reason.$": "$.reason",
          "taskToken.$": "$$.Task.Token"
        }
      },
      "TimeoutSeconds": 300,
      "ResultPath": "$.agent",
      "Next": "UpdateDynamoDB",
      "Catch": [{
        "ErrorEquals": ["States.Timeout"],
        "ResultPath": "$.error",
        "Next": "EscalateToManager"
      }]
    },
    "UpdateDynamoDB": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789:function:update-handoff-dynamodb",
      "Parameters": {
        "session_id.$": "$.session_id",
        "agent_id.$": "$.agent.agent_id",
        "reason.$": "$.reason"
      },
      "Next": "NotifyParties",
      "Catch": [{
        "ErrorEquals": ["States.ALL"],
        "ResultPath": "$.error",
        "Next": "HandoffFailed"
      }]
    },
    "NotifyParties": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "NotifyCustomer",
          "States": {
            "NotifyCustomer": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:123456789:function:notify-customer",
              "End": true
            }
          }
        },
        {
          "StartAt": "NotifyAgent",
          "States": {
            "NotifyAgent": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:123456789:function:notify-agent",
              "End": true
            }
          }
        }
      ],
      "Next": "HandoffComplete"
    },
    "EscalateToManager": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789:function:escalate-to-manager",
      "ResultPath": "$.escalation",
      "Next": "CheckAgentAvailability",
      "Catch": [{
        "ErrorEquals": ["States.ALL"],
        "ResultPath": "$.error",
        "Next": "HandoffFailed"
      }]
    },
    "HandoffComplete": {
      "Type": "Succeed"
    },
    "HandoffFailed": {
      "Type": "Fail",
      "Error": "HandoffError",
      "Cause": "Unable to complete handoff to human agent"
    }
  }
}
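
The `WaitForAgent` state stays paused until a consumer of `agent-assignment-queue` returns the task token. A minimal sketch of that consumer side follows, assuming the SQS message body carries the three fields set in the state's `MessageBody` and that the workflow expects an `agent_id` in the result (because `UpdateDynamoDB` reads `$.agent.agent_id`). The function name `accept_assignment` is hypothetical; `send_task_success` is the boto3 Step Functions client call, and the client is passed in so the logic can be exercised without AWS.

```python
import json


def accept_assignment(sfn_client, sqs_message_body: str, agent_id: str) -> dict:
    """Resume the paused handoff execution once an agent has accepted the session."""
    message = json.loads(sqs_message_body)
    result = {"agent_id": agent_id, "session_id": message["session_id"]}
    # The output becomes the state's result and lands at ResultPath ($.agent).
    sfn_client.send_task_success(
        taskToken=message["taskToken"],
        output=json.dumps(result),
    )
    return result
```

In a real consumer `sfn_client` would be `boto3.client("stepfunctions")`. If no call arrives within `TimeoutSeconds`, the state raises `States.Timeout` and the workflow moves to `EscalateToManager`.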
4. Step Functions — Session Cleanup Workflow
flowchart TD
START["Triggered by:<br/>- CloudWatch scheduled rule (hourly)<br/>- Admin API call<br/>- EventBridge session disconnected"]
START --> QUERY_EXPIRED["Query DynamoDB<br/>Find sessions with<br/>status=disconnected AND<br/>disconnected_at < (now - 1h)"]
QUERY_EXPIRED --> MAP_STATE{"Map State<br/>(parallel per session)"}
MAP_STATE --> |"Session 1"| EXPORT1["Export turns to S3<br/>(audit archive)"]
MAP_STATE --> |"Session 2"| EXPORT2["Export turns to S3"]
MAP_STATE --> |"Session N"| EXPORTN["Export turns to S3"]
EXPORT1 --> DELETE1["BatchWriteItem<br/>Delete all TURN# items"]
EXPORT2 --> DELETE2["BatchWriteItem<br/>Delete all TURN# items"]
EXPORTN --> DELETEN["BatchWriteItem<br/>Delete all TURN# items"]
DELETE1 --> DELETE_SUMMARY1["Delete SUMMARY# items"]
DELETE2 --> DELETE_SUMMARY2["Delete SUMMARY# items"]
DELETEN --> DELETE_SUMMARYN["Delete SUMMARY# items"]
DELETE_SUMMARY1 --> UPDATE_META1["Update META<br/>status = 'archived'"]
DELETE_SUMMARY2 --> UPDATE_META2["Update META<br/>status = 'archived'"]
DELETE_SUMMARYN --> UPDATE_METAN["Update META<br/>status = 'archived'"]
UPDATE_META1 --> DONE["All sessions cleaned"]
UPDATE_META2 --> DONE
UPDATE_METAN --> DONE
Cleanup Lambda Used by Step Functions
"""
Lambda: manga-assist-cleanup-worker
Trigger: Step Functions Map state (one invocation per session)
Purpose: Archive and delete expired session data
"""
import json
import os
import time

import boto3

dynamodb = boto3.resource("dynamodb")
TABLE_NAME = os.environ["SESSION_TABLE"]
table = dynamodb.Table(TABLE_NAME)
s3 = boto3.client("s3")
ARCHIVE_BUCKET = os.environ["ARCHIVE_BUCKET"]
MAX_BATCH_ATTEMPTS = 6  # Bounded so a throttled table cannot spin until the Lambda times out


def handler(event, context):
    session_id = event["session_id"]
    pk = f"SESSION#{session_id}"

    # ── Step 1: Query all items for this session ──
    all_items = _query_all_session_items(pk)
    if not all_items:
        return {"session_id": session_id, "status": "not_found"}

    # ── Step 2: Archive to S3 ──
    archive_key = f"sessions/{session_id}/{int(time.time())}.json"
    s3.put_object(
        Bucket=ARCHIVE_BUCKET,
        Key=archive_key,
        Body=json.dumps(all_items, default=str),
        ServerSideEncryption="aws:kms",
    )

    # ── Step 3: Delete TURN and SUMMARY items (not META) ──
    items_to_delete = [item for item in all_items if item["SK"] != "META"]
    # BatchWriteItem: max 25 items per batch
    for batch_start in range(0, len(items_to_delete), 25):
        batch = items_to_delete[batch_start:batch_start + 25]
        request_items = [
            {"DeleteRequest": {"Key": {"PK": item["PK"], "SK": item["SK"]}}}
            for item in batch
        ]
        _batch_delete_with_backoff({TABLE_NAME: request_items})

    # ── Step 4: Update META to archived ──
    table.update_item(
        Key={"PK": pk, "SK": "META"},
        UpdateExpression="SET #s = :archived, archived_at = :now, archive_key = :key",
        ExpressionAttributeNames={"#s": "status"},
        ExpressionAttributeValues={
            ":archived": "archived",
            ":now": int(time.time()),
            ":key": archive_key,
        },
    )
    return {
        "session_id": session_id,
        "status": "archived",
        "items_deleted": len(items_to_delete),
        "archive_key": archive_key,
    }


def _batch_delete_with_backoff(request_items: dict) -> None:
    """Send one BatchWriteItem call, retrying unprocessed (throttled) items with backoff."""
    for attempt in range(MAX_BATCH_ATTEMPTS):
        response = dynamodb.meta.client.batch_write_item(RequestItems=request_items)
        request_items = response.get("UnprocessedItems", {})
        if not request_items:
            return
        if attempt < MAX_BATCH_ATTEMPTS - 1:
            time.sleep(min(0.1 * (2 ** attempt), 5.0))  # Exponential backoff, capped at 5s
    # Fail the invocation so the Step Functions retry policy can take over
    raise RuntimeError("BatchWriteItem still had unprocessed items after retries")


def _query_all_session_items(pk: str) -> list:
    """Page through every item in the session's partition."""
    items = []
    params = {
        "KeyConditionExpression": "PK = :pk",
        "ExpressionAttributeValues": {":pk": pk},
    }
    while True:
        resp = table.query(**params)
        items.extend(resp.get("Items", []))
        if "LastEvaluatedKey" not in resp:
            break
        params["ExclusiveStartKey"] = resp["LastEvaluatedKey"]
    return items
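
The Map state that fans out to this worker is not shown above. As a rough sketch, it could be generated like this, assuming the preceding state leaves a list of `{"session_id": ...}` objects under `$.expired_sessions` (a hypothetical path) and that the worker's ARN is supplied by the caller. `ItemsPath`, `MaxConcurrency` and `ItemProcessor` are standard ASL Map fields; the function name and the state names are made up for illustration.

```python
import json


def build_cleanup_map_state(worker_arn: str, max_concurrency: int = 10) -> dict:
    """Build an inline Map state that runs the cleanup worker once per session."""
    if max_concurrency < 1:
        # In ASL a MaxConcurrency of 0 means "no limit", which is what we want to avoid.
        raise ValueError("max_concurrency must be at least 1")
    return {
        "Type": "Map",
        "ItemsPath": "$.expired_sessions",  # assumed location of the session list
        "MaxConcurrency": max_concurrency,
        "ItemProcessor": {
            "ProcessorConfig": {"Mode": "INLINE"},
            "StartAt": "CleanupSession",
            "States": {
                "CleanupSession": {
                    "Type": "Task",
                    "Resource": worker_arn,
                    "Retry": [{
                        "ErrorEquals": [
                            "Lambda.TooManyRequestsException",
                            "Lambda.ServiceException",
                        ],
                        "IntervalSeconds": 2,
                        "MaxAttempts": 3,
                        "BackoffRate": 2.0,
                    }],
                    "End": True,
                }
            },
        },
        "ResultPath": "$.cleanup_results",
        "End": True,
    }


if __name__ == "__main__":
    arn = "arn:aws:lambda:us-east-1:123456789012:function:manga-assist-cleanup-worker"
    print(json.dumps(build_cleanup_map_state(arn, max_concurrency=5), indent=2))
```

Each array element becomes the input of one worker invocation, which matches the `event["session_id"]` lookup in the handler. Keeping the concurrency bound explicit limits how many sessions hit DynamoDB and S3 at once.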
5. API Gateway Request Validation + DynamoDB
flowchart LR
CLIENT["Client Request"] --> APIGW["API Gateway"]
APIGW --> VALIDATE{"Request Validation"}
VALIDATE -->|"Missing required fields"| REJECT["400 Bad Request<br/>(Never hits Lambda)"]
VALIDATE -->|"Valid"| AUTH["Lambda Authorizer"]
AUTH -->|"Token invalid"| DENY["403 Forbidden"]
AUTH -->|"Token valid"| LAMBDA["Lambda Handler"]
LAMBDA -->|"Query/Put"| DDB["DynamoDB"]
DDB -->|"ConditionalCheckFailed"| LAMBDA
LAMBDA -->|"409 Conflict"| CLIENT
DDB -->|"Item not found"| LAMBDA
LAMBDA -->|"404 Not Found"| CLIENT
DDB -->|"Success"| LAMBDA
LAMBDA -->|"200 OK"| CLIENT
API Gateway Model Validation (OpenAPI)
# Validate request body BEFORE Lambda invocation
# This saves Lambda invocations and DynamoDB reads on garbage requests
# Note: API Gateway enforces the body schema, but for path and query parameters
# it only checks that required ones are present. The Lambda must still clamp `limit`.
x-amazon-apigateway-request-validators:
  all:
    validateRequestBody: true
    validateRequestParameters: true
x-amazon-apigateway-request-validator: all
paths:
  /sessions:
    post:
      requestBody:
        required: true
        content:
          application/json:
            schema:
              type: object
              required:
                - customer_id
              properties:
                customer_id:
                  type: string
                  pattern: "^[a-zA-Z0-9-]{1,64}$"
                metadata:
                  type: object
                  properties:
                    source:
                      type: string
                      enum: [web, mobile, voice]
                    language:
                      type: string
                      pattern: "^[a-z]{2}(-[A-Z]{2})?$"
  /sessions/{id}/turns:
    get:
      parameters:
        - name: id
          in: path
          required: true
          schema:
            type: string
            format: uuid
        - name: limit
          in: query
          schema:
            type: integer
            minimum: 1
            maximum: 100
            default: 20
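
For unit tests, or as a second line of defense inside the Lambda, the same body rules can be mirrored in code. The sketch below restates the `POST /sessions` schema by hand; the function name `validate_create_session` is hypothetical, and it is an illustration rather than a replacement for gateway-level validation.

```python
import re

# Patterns and enum copied from the OpenAPI schema above.
CUSTOMER_ID_RE = re.compile(r"^[a-zA-Z0-9-]{1,64}$")
LANGUAGE_RE = re.compile(r"^[a-z]{2}(-[A-Z]{2})?$")
ALLOWED_SOURCES = {"web", "mobile", "voice"}


def validate_create_session(body) -> list:
    """Return a list of human-readable problems; an empty list means the body is valid."""
    if not isinstance(body, dict):
        return ["body must be a JSON object"]
    errors = []

    customer_id = body.get("customer_id")
    if customer_id is None:
        errors.append("customer_id is required")
    elif not isinstance(customer_id, str) or not CUSTOMER_ID_RE.fullmatch(customer_id):
        errors.append("customer_id must be 1-64 letters, digits or hyphens")

    metadata = body.get("metadata")
    if metadata is not None:
        if not isinstance(metadata, dict):
            errors.append("metadata must be an object")
        else:
            source = metadata.get("source")
            if source is not None and not (isinstance(source, str) and source in ALLOWED_SOURCES):
                errors.append("metadata.source must be one of web, mobile, voice")
            language = metadata.get("language")
            if language is not None and not (
                isinstance(language, str) and LANGUAGE_RE.fullmatch(language)
            ):
                errors.append("metadata.language must look like 'en' or 'en-US'")
    return errors
```

A handler could return a 400 with this list when it is non-empty, which keeps local tests and the deployed gateway rejecting the same inputs.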
6. Step Functions Direct DynamoDB Integration (No Lambda)
flowchart LR
SF["Step Functions"] -->|"Direct SDK integration"| DDB["DynamoDB"]
subgraph No_Lambda_Needed["No Lambda Needed!"]
direction TB
OP1["GetItem — Read session"]
OP2["PutItem — Write record"]
OP3["UpdateItem — Modify status"]
OP4["Query — List items"]
OP5["DeleteItem — Remove item"]
end
SF --> No_Lambda_Needed
No_Lambda_Needed --> DDB
subgraph Still_Need_Lambda["Still Need Lambda For"]
direction TB
NL1["Complex business logic"]
NL2["Multiple DynamoDB calls"]
NL3["Cross-service orchestration"]
NL4["Data transformation"]
end
Direct Integration Example
{
  "Comment": "Read session directly from DynamoDB; no Lambda needed",
  "StartAt": "ReadSession",
  "States": {
    "ReadSession": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:getItem",
      "Parameters": {
        "TableName": "manga-assist-sessions",
        "Key": {
          "PK": {"S.$": "States.Format('SESSION#{}', $.session_id)"},
          "SK": {"S": "META"}
        },
        "ProjectionExpression": "session_id, customer_id, #s, turn_count",
        "ExpressionAttributeNames": {
          "#s": "status"
        }
      },
      "ResultPath": "$.sessionData",
      "Next": "ProcessResult"
    },
    "ProcessResult": {
      "Type": "Choice",
      "Choices": [{
        "Variable": "$.sessionData.Item",
        "IsPresent": true,
        "Next": "SessionFound"
      }],
      "Default": "SessionNotFound"
    },
    "SessionFound": { "Type": "Succeed" },
    "SessionNotFound": { "Type": "Fail", "Error": "SessionNotFound" }
  }
}
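
One consequence of the direct integration is that the result keeps DynamoDB's typed attribute-value format, which is why the handoff workflow reads `$.session.Item.status.S` rather than `$.session.Item.status`. If a later Lambda in the workflow receives that item, it has to unwrap the types itself. The sketch below shows the idea for the common types; the function names are hypothetical, and in real code boto3's `TypeDeserializer` (in `boto3.dynamodb.types`) does the same job.

```python
from decimal import Decimal


def unmarshal(attribute_value: dict):
    """Convert one DynamoDB attribute value, e.g. {"S": "active"}, to a plain Python value."""
    if len(attribute_value) != 1:
        raise ValueError("An attribute value must have exactly one type tag")
    tag, value = next(iter(attribute_value.items()))
    if tag == "S":
        return value
    if tag == "N":
        return Decimal(value)  # Numbers arrive as strings to preserve precision
    if tag == "BOOL":
        return value
    if tag == "NULL":
        return None
    if tag == "L":
        return [unmarshal(v) for v in value]
    if tag == "M":
        return {k: unmarshal(v) for k, v in value.items()}
    raise ValueError(f"Unsupported type tag in this sketch: {tag}")


def unmarshal_item(item: dict) -> dict:
    """Unwrap every attribute of an item as returned under $.sessionData.Item."""
    return {name: unmarshal(value) for name, value in item.items()}
```

For example, `unmarshal_item({"status": {"S": "active"}, "turn_count": {"N": "3"}})` yields a dict with a plain string and a `Decimal`. Set and binary types are left out to keep the sketch short.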
7. Common Mistakes Teams Make
| Mistake | Consequence | Fix |
|---|---|---|
| Using Lambda for simple DynamoDB reads in Step Functions | Extra Lambda invocation cost and latency | Use direct SDK integration (arn:aws:states:::dynamodb:getItem) |
| No request validation at API Gateway | Lambda invoked for junk requests, wastes compute + DynamoDB reads | Define request models in OpenAPI spec |
| Step Functions timeout too short | Workflow fails during peak when DynamoDB is slightly slower | Set generous timeouts (5min+) with retry policies |
| Not using Map state for batch cleanup | Processing sessions sequentially takes hours | Use Map with MaxConcurrency for parallel processing |
| Synchronous Step Functions execution from API | Client waits for entire workflow to complete (could be minutes) | Use StartExecution async, return execution ARN, client polls |
| No idempotency token on Step Functions start | Duplicate API calls create duplicate workflow executions | Pass a deterministic name to StartExecution; a Standard workflow will not start a second execution under a name that is already in use |
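
The last two rows can be combined in one handler: start the workflow asynchronously under a deterministic name and return the execution ARN right away. The sketch below assumes the client sends some stable request identifier (for instance an idempotency header), shown here as the hypothetical `request_id` argument; `execution_name` and `start_handoff` are also illustrative names. `start_execution` is the boto3 Step Functions client call, and the client is injected so the logic can be tested without AWS.

```python
import hashlib
import json


def execution_name(session_id: str, request_id: str) -> str:
    """Derive a stable execution name (well under the 80-character limit)."""
    digest = hashlib.sha256(f"{session_id}:{request_id}".encode("utf-8")).hexdigest()
    return f"handoff-{digest[:40]}"


def start_handoff(sfn_client, state_machine_arn: str, session_id: str,
                  request_id: str, reason: str, priority: str = "normal") -> dict:
    """Start the handoff workflow without waiting for it and return the ARN to poll."""
    payload = {"session_id": session_id, "reason": reason, "priority": priority}
    resp = sfn_client.start_execution(
        stateMachineArn=state_machine_arn,
        name=execution_name(session_id, request_id),
        # sort_keys keeps the input byte-identical across retries of the same request
        input=json.dumps(payload, sort_keys=True),
    )
    return {
        "statusCode": 202,
        "body": json.dumps({"execution_arn": resp["executionArn"]}),
    }
```

A retried request produces the same name and the same input, so it does not create a second execution. A production handler would also need to decide how to respond when Step Functions reports that the name already exists for a different input or a finished execution.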
8. Critical Things to Remember
For Interviews
- API Gateway validates BEFORE Lambda: Use request models to reject bad requests at the gateway level. This saves Lambda invocations AND DynamoDB capacity.
- Step Functions can call DynamoDB directly: No Lambda is needed for simple GetItem/PutItem/UpdateItem calls. This is cheaper, faster, and less code.
- Use `waitForTaskToken` for human-in-the-loop: Step Functions pauses until an external system sends the token back. Perfect for "wait for agent to accept handoff."
- Map state is a parallel foreach: When you need to process N sessions, the Map state runs them concurrently with a configurable `MaxConcurrency`.
- Step Functions Express vs Standard: Express runs for up to 5 minutes, is built for high volume, is cheaper, and gives at-least-once execution when started asynchronously (use it for per-message processing). Standard runs for up to 1 year with exactly-once execution (use it for handoff workflows).
For Production
- Always return the execution ARN to the client: Let clients poll `DescribeExecution` instead of waiting synchronously.
- Set `MaxConcurrency` on Map states: Unbounded parallelism can overwhelm DynamoDB and downstream services.
- Use Step Functions retry policies: A `Retry` block gives you exponential backoff (`IntervalSeconds`, `BackoffRate`, `MaxAttempts`), and jitter can be switched on with `JitterStrategy`. That is more reliable than rolling your own retry loop in Lambda.
- API Gateway throttling protects DynamoDB: Set rate limits on API Gateway to prevent a misbehaving client from overwhelming your DynamoDB capacity.
- Log execution input/output in Step Functions: Turn on CloudWatch logging at `ALL` level for debugging, but disable `includeExecutionData` in production for PII safety.
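
The polling side of the first point can be sketched as follows, assuming a Standard workflow (whose executions can be looked up with `DescribeExecution`) and a status endpoint or client that checks on the ARN it was given. The function name `wait_for_execution` and its parameters are hypothetical; `describe_execution` is the boto3 Step Functions client call, and the client and sleep function are injected so the loop can be tested without AWS.

```python
import time

# Statuses after which a Standard execution will not change on its own.
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def wait_for_execution(sfn_client, execution_arn: str, poll_seconds: float = 2.0,
                       max_polls: int = 30, sleep=time.sleep) -> str:
    """Poll an execution until it reaches a terminal status or the poll budget runs out."""
    status = "RUNNING"
    for attempt in range(max_polls):
        status = sfn_client.describe_execution(executionArn=execution_arn)["status"]
        if status in TERMINAL_STATUSES:
            return status
        if attempt < max_polls - 1:
            sleep(poll_seconds)
    return status  # Still not terminal; the caller decides whether to keep waiting
```

Bounding the number of polls keeps a stuck workflow from holding a caller forever, and a fixed interval is only the simplest choice; a backoff between polls would reduce API calls for long handoffs.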