Task 2.4: Implement FM API Integrations
Overview
This task covers building reliable, real-time, and intelligent FM API integrations with synchronous/async patterns, streaming, resilience, and model routing.
Skill 2.4.1: Create Flexible Model Interaction Systems
Core Concepts
- Synchronous: Lambda/EC2/ECS calls Bedrock API, waits for response
- Asynchronous: Decouple request from response using SQS, process in background
- API Gateway: Custom API layer with request validation, transformation, and throttling
- Language-specific SDKs: boto3 (Python), AWS SDK for Java, JavaScript SDK
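To ground the first bullet before the full user stories below, here is a minimal synchronous invocation sketch with boto3 (the region, model ID, and prompt are illustrative; the asynchronous and API Gateway pieces are built out in the examples that follow):
import json
import boto3

# Minimal synchronous Bedrock call: the caller blocks until the model responds
bedrock_runtime = boto3.client("bedrock-runtime", region_name="us-east-1")
response = bedrock_runtime.invoke_model(
    modelId="anthropic.claude-haiku-4-5-20251001",  # illustrative model ID
    body=json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 256,
        "messages": [{"role": "user", "content": "Summarize SQS in one sentence."}]
    })
)
print(json.loads(response["body"].read())["content"][0]["text"])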
User Story 17: Multi-Channel AI Platform with Sync and Async Patterns
As a software architect, I want an AI platform supporting both real-time chat (sync) and batch document processing (async), So that customers get instant responses for chat while document analysis happens efficiently in the background.
Deep Dive Scenario
Company: DocuAI - serves 10K concurrent chat users + processes 50K documents/day
Architecture:
Channel 1: Real-Time Chat
    [API Gateway] --> [Lambda: Sync Handler] --> [Bedrock API - Synchronous] --> [Immediate Response]
Channel 2: Document Processing
    [API Gateway] --> [Lambda: Async Submitter] --> [SQS: Document Queue] --> [Lambda: Batch Processor]
        --> [Bedrock API - Batch] --> [S3: Results] --> [SNS: Notification]
Synchronous Pattern - Real-Time Chat:
import boto3
import json
bedrock_runtime = boto3.client(
'bedrock-runtime',
region_name='us-east-1',
config=boto3.session.Config(
retries={'max_attempts': 3, 'mode': 'adaptive'},
connect_timeout=5,
read_timeout=60
)
)
def sync_chat_handler(event, context):
"""Synchronous FM invocation for real-time chat.
Flow: API Gateway -> Lambda -> Bedrock -> Lambda -> API Gateway -> Client
Total latency budget: <3 seconds
"""
body = json.loads(event["body"])
messages = body["messages"]
# Validate request
if not messages or len(messages) > 50:
return {"statusCode": 400, "body": "Invalid message count"}
# Synchronous Bedrock call - Lambda waits for response
response = bedrock_runtime.invoke_model(
modelId=body.get("model", "anthropic.claude-sonnet-4-20250514"),
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": body.get("max_tokens", 1024),
"messages": messages,
"system": body.get("system_prompt", "You are a helpful assistant.")
}),
contentType="application/json",
accept="application/json"
)
result = json.loads(response['body'].read())
return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({
"response": result["content"][0]["text"],
"model": result.get("model"),
"usage": result["usage"],
"stop_reason": result["stop_reason"]
})
}
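The handler above lets Bedrock's modeled exceptions propagate. A hedged sketch of mapping the most common ones to HTTP status codes so clients can distinguish throttling from bad input (the wrapper function name is illustrative; it reuses the bedrock_runtime client and json import above):
def sync_chat_handler_with_errors(event, context):
    """Same flow as sync_chat_handler, with Bedrock errors mapped to HTTP codes."""
    try:
        return sync_chat_handler(event, context)
    except bedrock_runtime.exceptions.ThrottlingException:
        # Surface throttling as 429 so clients can back off and retry
        return {"statusCode": 429, "body": json.dumps({"error": "Model is throttled, retry later"})}
    except bedrock_runtime.exceptions.ValidationException as e:
        # Malformed request body (e.g., bad message structure) -> client error
        return {"statusCode": 400, "body": json.dumps({"error": str(e)})}
    except bedrock_runtime.exceptions.ModelTimeoutException:
        return {"statusCode": 504, "body": json.dumps({"error": "Model timed out"})}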
Asynchronous Pattern - Document Processing:
import json
import os
import uuid
from datetime import datetime
import boto3
import requests  # third-party dependency, used by the processor for callback notifications

sqs = boto3.client("sqs")
s3 = boto3.client("s3")
dynamodb = boto3.client("dynamodb")

# Step 1: Accept request, queue for processing
def async_submit_handler(event, context):
    """Accept a document for async processing and return immediately with a job ID.

    s3_object_exists is an assumed helper that HEADs the object to confirm it exists.
    """
body = json.loads(event["body"])
job_id = str(uuid.uuid4())
# Validate document
document_key = body["s3_key"]
if not s3_object_exists(body["s3_bucket"], document_key):
return {"statusCode": 404, "body": "Document not found"}
# Queue for processing
sqs.send_message(
QueueUrl=os.environ["DOCUMENT_QUEUE_URL"],
MessageBody=json.dumps({
"job_id": job_id,
"s3_bucket": body["s3_bucket"],
"s3_key": document_key,
"analysis_type": body.get("analysis_type", "summarize"),
"callback_url": body.get("callback_url"),
"submitted_at": datetime.utcnow().isoformat()
}),
MessageAttributes={
"Priority": {"DataType": "String", "StringValue": body.get("priority", "normal")}
}
)
# Store job status
    dynamodb.put_item(
        TableName="DocumentJobs",
        Item={
            "job_id": {"S": job_id},
            "status": {"S": "QUEUED"},
            "submitted_at": {"S": datetime.utcnow().isoformat()}
        }
    )
return {
"statusCode": 202, # Accepted
"body": json.dumps({
"job_id": job_id,
"status": "QUEUED",
"status_url": f"/api/v1/jobs/{job_id}"
})
}
# Step 2: Process from queue (triggered by SQS)
def async_processor(event, context):
    """Process documents from the SQS queue with Bedrock.

    update_job_status and download_and_extract are assumed helpers defined
    elsewhere in the service (DynamoDB status updates and text extraction).
    """
for record in event["Records"]:
job = json.loads(record["body"])
job_id = job["job_id"]
try:
# Update status
update_job_status(job_id, "PROCESSING")
# Download document
document_text = download_and_extract(job["s3_bucket"], job["s3_key"])
# Process with FM
response = bedrock_runtime.invoke_model(
modelId="anthropic.claude-sonnet-4-20250514",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 4096,
"messages": [{
"role": "user",
"content": f"Analyze this document:\n\n{document_text}"
}]
})
)
result = json.loads(response['body'].read())["content"][0]["text"]
# Store result in S3
s3.put_object(
Bucket="results-bucket",
Key=f"results/{job_id}.json",
Body=json.dumps({"analysis": result, "job_id": job_id})
)
update_job_status(job_id, "COMPLETED")
# Notify via callback if provided
if job.get("callback_url"):
requests.post(job["callback_url"], json={"job_id": job_id, "status": "COMPLETED"})
except Exception as e:
update_job_status(job_id, "FAILED", error=str(e))
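Step 1 returns a status_url, so clients need a handler to poll job state. A minimal status-lookup sketch that reuses the DocumentJobs table and the low-level DynamoDB client defined above (the route and result-key layout follow the earlier snippets):
# Step 3: Job status lookup (GET /api/v1/jobs/{job_id})
def job_status_handler(event, context):
    """Return the current status of an async document job."""
    job_id = event["pathParameters"]["job_id"]
    result = dynamodb.get_item(
        TableName="DocumentJobs",
        Key={"job_id": {"S": job_id}}
    )
    if "Item" not in result:
        return {"statusCode": 404, "body": json.dumps({"error": "Job not found"})}
    item = result["Item"]
    status = item["status"]["S"]
    return {
        "statusCode": 200,
        "body": json.dumps({
            "job_id": job_id,
            "status": status,
            "submitted_at": item["submitted_at"]["S"],
            # Completed jobs have results at s3://results-bucket/results/{job_id}.json
            "result_key": f"results/{job_id}.json" if status == "COMPLETED" else None
        })
    }
In production the processor is also typically paired with an SQS dead-letter queue so repeatedly failing documents do not block the rest of the batch.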
API Gateway with Request Validation:
# OpenAPI spec for API Gateway
openapi: "3.0.1"
paths:
/v1/chat:
post:
x-amazon-apigateway-request-validator: all
requestBody:
required: true
content:
application/json:
schema:
type: object
required: [messages]
properties:
messages:
type: array
maxItems: 50
items:
type: object
required: [role, content]
properties:
role:
type: string
enum: [user, assistant]
content:
type: string
maxLength: 100000
max_tokens:
type: integer
minimum: 1
maximum: 4096
default: 1024
model:
type: string
enum:
- anthropic.claude-haiku-4-5-20251001
- anthropic.claude-sonnet-4-20250514
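The x-amazon-apigateway-request-validator: all extension assumes a request validator has been defined for the API (declaratively via x-amazon-apigateway-request-validators, or programmatically). If the API is configured with boto3 rather than imported from the spec, the validator can be created like this (a sketch; the REST API ID is a placeholder):
import boto3

apigateway = boto3.client("apigateway")
# Create a validator that checks both the request body (against the schema)
# and request parameters, then reference its ID from the methods to validate
validator = apigateway.create_request_validator(
    restApiId="a1b2c3d4e5",  # placeholder REST API ID
    name="all",
    validateRequestBody=True,
    validateRequestParameters=True
)
print(validator["id"])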
Skill 2.4.2: Develop Real-Time AI Interaction Systems (Streaming)
User Story 18: Real-Time Code Assistant with Streaming Responses
As a developer tools product manager, I want a code assistant that streams explanations and code suggestions in real-time, So that developers see responses progressively instead of waiting 10+ seconds for full completion.
Deep Dive Scenario
Streaming Architecture:
[IDE Plugin / Web UI]
|
|--- WebSocket connection to API Gateway
|
v
[API Gateway: WebSocket API]
|
v
[Lambda: Connection Manager]
|--- On $connect: Store connectionId in DynamoDB
|--- On message: Invoke Bedrock streaming
|--- On $disconnect: Clean up
|
v
[Bedrock: invoke_model_with_response_stream]
|
|--- Chunk 1: "Here's how to" --> [WebSocket: push to client]
|--- Chunk 2: " fix the bug:" --> [WebSocket: push to client]
|--- Chunk 3: "\n```python\n" --> [WebSocket: push to client]
|--- Chunk N: "```" --> [WebSocket: push to client]
|
v
[Client renders progressively as chunks arrive]
Bedrock Streaming API:
def stream_response(event, context):
    """Stream an FM response to a WebSocket client via the Bedrock streaming API."""
    connection_id = event["requestContext"]["connectionId"]
    body = json.loads(event["body"])
    # Management API client used to push frames back over the WebSocket;
    # the endpoint is derived from the connection's domain name and stage
    apigateway_management = boto3.client(
        "apigatewaymanagementapi",
        endpoint_url=f"https://{event['requestContext']['domainName']}/{event['requestContext']['stage']}"
    )
    # Invoke Bedrock with streaming
response = bedrock_runtime.invoke_model_with_response_stream(
modelId="anthropic.claude-sonnet-4-20250514",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 4096,
"messages": body["messages"],
"system": "You are a code assistant. Provide clear explanations and working code."
})
)
# Process stream and forward chunks to WebSocket client
full_response = ""
for event_chunk in response['body']:
chunk = json.loads(event_chunk['chunk']['bytes'])
if chunk['type'] == 'content_block_delta':
text_delta = chunk['delta'].get('text', '')
full_response += text_delta
# Push chunk to client via WebSocket
apigateway_management.post_to_connection(
ConnectionId=connection_id,
Data=json.dumps({
"type": "delta",
"text": text_delta
}).encode()
)
elif chunk['type'] == 'message_delta':
# End of stream - send usage info
apigateway_management.post_to_connection(
ConnectionId=connection_id,
Data=json.dumps({
"type": "complete",
"stop_reason": chunk['delta'].get('stop_reason'),
"usage": chunk.get('usage', {})
}).encode()
)
return {"statusCode": 200}
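The diagram above also shows $connect and $disconnect routes. A minimal connection-manager sketch, assuming a WebSocketConnections DynamoDB table for tracking active connections (table name and attribute names are assumptions; boto3 is imported as in the earlier snippets):
connections_table = boto3.resource("dynamodb").Table("WebSocketConnections")  # assumed table

def on_connect(event, context):
    """$connect route: remember the connection so later pushes can target it."""
    connections_table.put_item(Item={
        "connection_id": event["requestContext"]["connectionId"],
        "connected_at": event["requestContext"]["requestTimeEpoch"]
    })
    return {"statusCode": 200}

def on_disconnect(event, context):
    """$disconnect route: clean up state for the closed connection."""
    connections_table.delete_item(Key={
        "connection_id": event["requestContext"]["connectionId"]
    })
    return {"statusCode": 200}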
Server-Sent Events (SSE) Alternative via API Gateway:
# For clients that support SSE (simpler than WebSockets)
def sse_stream_handler(event, context):
"""Stream via Server-Sent Events using API Gateway chunked transfer."""
body = json.loads(event["body"])
response = bedrock_runtime.invoke_model_with_response_stream(
modelId="anthropic.claude-sonnet-4-20250514",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 4096,
"messages": body["messages"]
})
)
    # Collect chunks and format as SSE. A buffered Lambda proxy response like
    # this returns the whole SSE payload in one shot; for true incremental
    # delivery use Lambda response streaming (function URLs) or a long-lived
    # service behind an ALB, since REST API Gateway buffers proxy responses.
chunks = []
for event_chunk in response['body']:
chunk = json.loads(event_chunk['chunk']['bytes'])
if chunk['type'] == 'content_block_delta':
text = chunk['delta'].get('text', '')
chunks.append(f"data: {json.dumps({'text': text})}\n\n")
chunks.append("data: [DONE]\n\n")
return {
"statusCode": 200,
"headers": {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Transfer-Encoding": "chunked"
},
"body": "".join(chunks),
"isBase64Encoded": False
}
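On the client side, the SSE body is plain text; a small consumer sketch with the requests library (the endpoint URL and payload are placeholders):
import json
import requests

# Stream the SSE response and print text deltas as they arrive
with requests.post(
    "https://api.example.com/v1/assist",  # placeholder endpoint
    json={"messages": [{"role": "user", "content": "Explain list comprehensions"}]},
    stream=True,
) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        if not line or not line.startswith("data: "):
            continue
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break
        print(json.loads(payload)["text"], end="", flush=True)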
Exam-Relevant Points:
- invoke_model_with_response_stream is the Bedrock streaming API
- WebSocket API Gateway enables bidirectional real-time communication
- Server-Sent Events (SSE) are simpler for unidirectional streaming
- Chunked transfer encoding sends response incrementally
- Lambda function URLs support response streaming natively (managed Node.js runtimes; other languages need a custom runtime or buffering)
- Time-to-first-token (TTFT) is the key streaming latency metric
Skill 2.4.3: Create Resilient FM Systems
User Story 19: Fault-Tolerant AI Service Handling 10K RPM
As a reliability engineer, I want an FM-powered service that gracefully handles Bedrock throttling, timeouts, and outages, So that end users experience <0.1% error rate even during degraded conditions.
Deep Dive Scenario
Resilience Patterns Architecture:
[Client Request]
|
v
[API Gateway: Rate Limiting]
|--- Per-client throttling (1000 RPM)
|--- Global throttling (10000 RPM)
|--- Burst capacity for spikes
|
v
[Lambda: Resilient FM Client]
|
|--- [Primary: Bedrock us-east-1]
| |--- Exponential backoff (AWS SDK)
| |--- Timeout: 30 seconds
|
|--- [Fallback 1: Bedrock us-west-2] (on throttling/timeout)
| |--- Cross-region failover
|
|--- [Fallback 2: Cached/Simpler Response] (on all failures)
| |--- Semantic cache lookup
| |--- Graceful degradation message
|
v
[X-Ray Trace: End-to-end observability]
Exponential Backoff with AWS SDK:
from botocore.config import Config
# Configure retry behavior
bedrock_config = Config(
retries={
'max_attempts': 5, # Total attempts including initial
'mode': 'adaptive' # Adaptive: adjusts based on error type
# 'mode': 'standard' # Standard: always uses exponential backoff
# 'mode': 'legacy' # Legacy: basic retry
},
connect_timeout=5, # Connection establishment timeout
read_timeout=60, # Response read timeout
max_pool_connections=25 # Connection pool size
)
bedrock_runtime = boto3.client('bedrock-runtime', config=bedrock_config)
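# For comparison, a sketch of the retry loop the SDK implements for you:
# exponential backoff with full jitter around invoke_model (the function name,
# delay constants, and 20s cap are illustrative assumptions).
import random
import time

def invoke_with_backoff(client, model_id, request_body, max_attempts=5, base_delay=1.0):
    """Retry throttled/unavailable calls with exponential backoff and full jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return client.invoke_model(modelId=model_id, body=request_body)
        except (client.exceptions.ThrottlingException,
                client.exceptions.ServiceUnavailableException):
            if attempt == max_attempts:
                raise
            # Full jitter: sleep a random amount up to base * 2^attempt, capped at 20s
            time.sleep(random.uniform(0, min(base_delay * (2 ** attempt), 20)))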
"""
Adaptive mode retry behavior (illustrative; base delay ~1s with jitter):
Attempt 1: immediate
Attempt 2: wait ~1s (base * 2^0)
Attempt 3: wait ~2s (base * 2^1)
Attempt 4: wait ~4s (base * 2^2)
Attempt 5: wait ~8s (base * 2^3)
For ThrottlingException: adaptive mode also applies client-side rate limiting
For ServiceUnavailable: standard exponential backoff
"""
Full Resilience Implementation:
import boto3
from botocore.config import Config
from aws_xray_sdk.core import xray_recorder
import json
import hashlib
import redis  # data-plane client for ElastiCache for Redis (semantic cache)

# Connection to the semantic cache; the endpoint below is illustrative
redis_client = redis.Redis(
    host="fm-cache.example.use1.cache.amazonaws.com", port=6379, decode_responses=True
)

# Primary and fallback clients (bedrock_config is the Config object defined above)
primary_client = boto3.client('bedrock-runtime', region_name='us-east-1', config=bedrock_config)
fallback_client = boto3.client('bedrock-runtime', region_name='us-west-2', config=bedrock_config)
class ResilientFMClient:
    """FM client with exponential backoff, fallback, and graceful degradation."""
    def __init__(self):
        # The semantic cache is the Redis data plane above; boto3's 'elasticache'
        # client is the control plane and cannot store cache entries.
        self.cache = redis_client
@xray_recorder.capture("invoke_fm")
def invoke(self, messages, model_id, max_tokens=1024):
"""Invoke FM with full resilience chain."""
# Step 1: Check semantic cache
cache_key = self._cache_key(messages)
cached = self._check_cache(cache_key)
if cached:
xray_recorder.current_subsegment().put_annotation("cache_hit", True)
return {"text": cached, "source": "cache"}
# Step 2: Try primary region
try:
xray_recorder.current_subsegment().put_annotation("region", "us-east-1")
result = self._invoke_bedrock(primary_client, model_id, messages, max_tokens)
self._update_cache(cache_key, result["text"])
return {**result, "source": "primary"}
except (
primary_client.exceptions.ThrottlingException,
primary_client.exceptions.ServiceUnavailableException,
) as e:
xray_recorder.current_subsegment().put_annotation("primary_error", str(type(e).__name__))
# Step 3: Try fallback region
try:
xray_recorder.current_subsegment().put_annotation("region", "us-west-2")
result = self._invoke_bedrock(fallback_client, model_id, messages, max_tokens)
self._update_cache(cache_key, result["text"])
return {**result, "source": "fallback_region"}
except Exception as fallback_error:
xray_recorder.current_subsegment().put_annotation("fallback_error", str(type(fallback_error).__name__))
# Step 4: Graceful degradation
return {
"text": "I'm experiencing high demand. Here's a brief response based on cached knowledge.",
"source": "degraded",
"degraded": True
}
def _invoke_bedrock(self, client, model_id, messages, max_tokens):
"""Core Bedrock invocation with proper error handling."""
response = client.invoke_model(
modelId=model_id,
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"messages": messages
})
)
result = json.loads(response['body'].read())
return {
"text": result["content"][0]["text"],
"usage": result["usage"],
"model": model_id
}
def _cache_key(self, messages):
"""Generate cache key from message content."""
content = json.dumps(messages, sort_keys=True)
return hashlib.sha256(content.encode()).hexdigest()
def _check_cache(self, key):
"""Check Redis/ElastiCache for cached response."""
try:
return redis_client.get(f"fm:cache:{key}")
except Exception:
return None
def _update_cache(self, key, response):
"""Cache response with 1-hour TTL."""
try:
redis_client.setex(f"fm:cache:{key}", 3600, response)
except Exception:
pass # Cache miss is acceptable
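# The usage plan illustrated below can also be created programmatically
# (a sketch; the plan name, REST API ID, stage, and API key ID are placeholders):
apigateway = boto3.client("apigateway")

usage_plan = apigateway.create_usage_plan(
    name="fm-service-standard",
    apiStages=[{"apiId": "a1b2c3d4e5", "stage": "prod"}],
    throttle={"rateLimit": 100.0, "burstLimit": 200},
    quota={"limit": 100000, "period": "MONTH"}
)
# Tie a client's API key to the plan so per-client throttling applies
apigateway.create_usage_plan_key(
    usagePlanId=usage_plan["id"],
    keyId="client-api-key-id",
    keyType="API_KEY"
)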
# API Gateway rate limiting configuration
"""
API Gateway Usage Plan:
{
"throttle": {
"rateLimit": 100, # Steady-state requests per second
"burstLimit": 200 # Burst capacity
},
"quota": {
"limit": 100000, # Monthly request quota
"period": "MONTH"
}
}
"""
X-Ray Distributed Tracing:
# X-Ray provides end-to-end trace across:
# API Gateway -> Lambda -> Bedrock -> DynamoDB -> etc.
from aws_xray_sdk.core import xray_recorder, patch_all
patch_all() # Auto-instrument boto3, requests, etc.
@xray_recorder.capture("process_request")
def handler(event, context):
"""X-Ray traces the full request lifecycle."""
# Custom annotations for filtering traces
xray_recorder.current_subsegment().put_annotation("model_id", "claude-sonnet")
xray_recorder.current_subsegment().put_annotation("request_type", "chat")
    # Custom metadata for debugging (request size in characters, not tokens)
    xray_recorder.current_subsegment().put_metadata(
        "request_chars", len(event["body"]),
        "fm_invocation"
    )
# All downstream AWS SDK calls are automatically traced
response = bedrock_runtime.invoke_model(...)
xray_recorder.current_subsegment().put_annotation(
"latency_ms",
response["ResponseMetadata"]["HTTPHeaders"].get("x-amzn-bedrock-invocation-latency")
)
return response
Skill 2.4.4: Develop Intelligent Model Routing Systems
User Story 20: Smart Model Router for Multi-Purpose AI Platform
As a platform architect, I want intelligent routing that sends each request to the optimal FM based on content, complexity, and metrics, So that we maximize quality while minimizing cost and latency across 15 different AI features.
Deep Dive Scenario
Routing Architecture:
[Incoming Request]
|
v
[API Gateway: Request Transformation]
|--- Extract routing metadata (content type, complexity hints, SLA)
|
v
[Routing Decision Engine]
|
|--- [Static Routing] (API Gateway path-based)
| /v1/moderate -> Haiku (always fast/cheap)
| /v1/summarize -> Sonnet (balanced)
| /v1/analyze -> Opus (best quality)
|
|--- [Dynamic Content-Based Routing] (Step Functions)
| Classify content -> Route to specialist FM
| Code -> Code-optimized model
| Legal -> Legal-fine-tuned model
| Math -> Math-specialized model
|
|--- [Metrics-Based Routing] (Lambda + CloudWatch)
| If model latency > threshold -> Route to alternate
| If error rate > threshold -> Route to fallback
| If cost > budget -> Downgrade to cheaper model
|
v
[Selected FM] -> [Response]
Static Routing via API Gateway:
# API Gateway routes different paths to different models
# (aws_proxy integrations use the Lambda invocation URI format)
paths:
  /v1/moderate:
    post:
      x-amazon-apigateway-integration:
        type: aws_proxy
        httpMethod: POST
        uri: arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:123:function:invoke-haiku/invocations
  /v1/summarize:
    post:
      x-amazon-apigateway-integration:
        type: aws_proxy
        httpMethod: POST
        uri: arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:123:function:invoke-sonnet/invocations
  /v1/analyze:
    post:
      x-amazon-apigateway-integration:
        type: aws_proxy
        httpMethod: POST
        uri: arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:123:function:invoke-opus/invocations
Dynamic Content-Based Routing via Step Functions:
{
"StartAt": "ClassifyContent",
"States": {
"ClassifyContent": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:ClassifyContent",
"Comment": "Determine content type and complexity",
"ResultPath": "$.classification",
"Next": "RouteToSpecialist"
},
"RouteToSpecialist": {
"Type": "Choice",
"Choices": [
{
"And": [
{"Variable": "$.classification.domain", "StringEquals": "code"},
{"Variable": "$.classification.complexity", "StringEquals": "high"}
],
"Next": "InvokeCodeSpecialist"
},
{
"Variable": "$.classification.domain",
"StringEquals": "legal",
"Next": "InvokeLegalModel"
},
{
"Variable": "$.classification.domain",
"StringEquals": "math",
"Next": "InvokeMathModel"
},
{
"Variable": "$.classification.complexity",
"StringEquals": "simple",
"Next": "InvokeHaiku"
}
],
"Default": "InvokeSonnet"
},
"InvokeCodeSpecialist": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:InvokeModel",
"Parameters": {
"model_id": "anthropic.claude-sonnet-4-20250514",
"system_prompt": "You are an expert software engineer...",
"request.$": "$.request"
},
"End": true
},
"InvokeLegalModel": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123:function:InvokeModel",
"Parameters": {
"model_id": "arn:aws:sagemaker:us-east-1:123:endpoint/legal-fine-tuned",
"request.$": "$.request"
},
"End": true
    },
    "InvokeMathModel": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123:function:InvokeModel",
      "Parameters": {
        "model_id": "arn:aws:sagemaker:us-east-1:123:endpoint/math-specialized",
        "request.$": "$.request"
      },
      "End": true
    },
    "InvokeHaiku": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123:function:InvokeModel",
      "Parameters": {
        "model_id": "anthropic.claude-haiku-4-5-20251001",
        "request.$": "$.request"
      },
      "End": true
    },
    "InvokeSonnet": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123:function:InvokeModel",
      "Parameters": {
        "model_id": "anthropic.claude-sonnet-4-20250514",
        "request.$": "$.request"
      },
      "End": true
    }
  }
}
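The Choice state depends on a ClassifyContent function that emits domain and complexity labels. One inexpensive approach is to have a small, fast model perform the classification before routing; a sketch (the event shape event["request"]["text"] and the label sets are assumptions mirroring the Choice rules above; bedrock_runtime and json come from the earlier snippets):
def classify_content_handler(event, context):
    """Classify the request so the Choice state can pick a specialist model."""
    prompt = (
        "Classify the following request. Reply with JSON only, for example "
        '{"domain": "code|legal|math|general", "complexity": "simple|high"}.'
        "\n\nRequest: " + event["request"]["text"]
    )
    response = bedrock_runtime.invoke_model(
        modelId="anthropic.claude-haiku-4-5-20251001",  # cheap, fast classifier
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 100,
            "messages": [{"role": "user", "content": prompt}]
        })
    )
    text = json.loads(response["body"].read())["content"][0]["text"]
    # The state's ResultPath places this dict at $.classification for the Choice state
    return json.loads(text)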
Metrics-Based Intelligent Routing:
import boto3
from datetime import datetime, timedelta

class MetricsBasedRouter:
    """Route requests based on real-time model performance metrics."""
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
self.models = {
"primary": "anthropic.claude-sonnet-4-20250514",
"fallback": "anthropic.claude-haiku-4-5-20251001",
"premium": "anthropic.claude-opus-4-20250514"
}
def select_model(self, request):
"""Select model based on real-time metrics and request requirements."""
# Get current metrics for each model
metrics = self._get_model_metrics()
# Rule 1: If primary model latency > SLA, use fallback
if metrics["primary"]["p99_latency_ms"] > request.get("max_latency_ms", 5000):
return self.models["fallback"]
# Rule 2: If primary error rate > threshold, use fallback
if metrics["primary"]["error_rate"] > 0.05:
return self.models["fallback"]
# Rule 3: If daily budget exceeded, downgrade
if metrics["primary"]["daily_cost"] > request.get("daily_budget", 100):
return self.models["fallback"]
# Rule 4: If request explicitly requires high quality
if request.get("quality_requirement") == "highest":
return self.models["premium"]
return self.models["primary"]
def _get_model_metrics(self):
"""Fetch real-time metrics from CloudWatch."""
metrics = {}
for name, model_id in self.models.items():
# Get p99 latency from last 5 minutes
latency = self.cloudwatch.get_metric_statistics(
Namespace="GenAI/ModelMetrics",
MetricName="InvocationLatency",
Dimensions=[{"Name": "ModelId", "Value": model_id}],
StartTime=datetime.utcnow() - timedelta(minutes=5),
EndTime=datetime.utcnow(),
Period=300,
ExtendedStatistics=["p99"]
)
error_rate = self.cloudwatch.get_metric_statistics(
Namespace="GenAI/ModelMetrics",
MetricName="ErrorRate",
Dimensions=[{"Name": "ModelId", "Value": model_id}],
StartTime=datetime.utcnow() - timedelta(minutes=5),
EndTime=datetime.utcnow(),
Period=300,
Statistics=["Average"]
)
            # extract_stat and get_daily_cost are assumed helpers: one pulls the
            # requested statistic out of the get_metric_statistics response, the
            # other looks up the model's spend (e.g., from Cost Explorer or a cost table)
            metrics[name] = {
                "p99_latency_ms": extract_stat(latency, "p99"),
                "error_rate": extract_stat(error_rate, "Average"),
                "daily_cost": get_daily_cost(model_id)
            }
        return metrics
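The router reads from the custom GenAI/ModelMetrics namespace, which implies the invocation path publishes those metrics. A sketch of the emitting side with put_metric_data (the function name is illustrative; metric and dimension names match what _get_model_metrics queries, and boto3 is imported as above):
cloudwatch = boto3.client("cloudwatch")

def publish_invocation_metrics(model_id, latency_ms, is_error):
    """Publish per-model latency and error metrics consumed by MetricsBasedRouter."""
    cloudwatch.put_metric_data(
        Namespace="GenAI/ModelMetrics",
        MetricData=[
            {
                "MetricName": "InvocationLatency",
                "Dimensions": [{"Name": "ModelId", "Value": model_id}],
                "Value": latency_ms,
                "Unit": "Milliseconds"
            },
            {
                "MetricName": "ErrorRate",
                "Dimensions": [{"Name": "ModelId", "Value": model_id}],
                "Value": 1.0 if is_error else 0.0,  # averaging over a period yields the error rate
                "Unit": "None"
            }
        ]
    )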
API Gateway Request Transformation for Routing:
## Mapping template (requestTemplates -> application/json) for model routing
#set($routingHint = $input.path('$.routing_hint'))
#if($routingHint == 'fast')
  #set($modelId = 'anthropic.claude-haiku-4-5-20251001')
#elseif($routingHint == 'balanced')
  #set($modelId = 'anthropic.claude-sonnet-4-20250514')
#else
  #set($modelId = 'anthropic.claude-sonnet-4-20250514')
#end
{"model_id": "$modelId", "request": $input.json('$')}
Routing Strategy Comparison:
| Strategy | Implementation | Latency | Flexibility | Best For |
|---|---|---|---|---|
| Static | API Gateway paths | ~1ms | Low | Known, fixed routing rules |
| Content-Based | Step Functions | ~100ms | High | Domain-specific model selection |
| Metrics-Based | Lambda + CloudWatch | ~50ms | High | Performance optimization |
| Cost-Based | Application code | ~10ms | Medium | Budget management |
| Hybrid | Combination | ~100ms | Highest | Production platforms |
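A hybrid router typically layers these strategies: static paths decide the obvious cases, and a metrics check settles the rest. A compact sketch combining the two (routes, model IDs, and the metrics_router parameter are illustrative; metrics_router could be the MetricsBasedRouter above):
STATIC_ROUTES = {
    "/v1/moderate": "anthropic.claude-haiku-4-5-20251001",
    "/v1/analyze": "anthropic.claude-opus-4-20250514",
}

def hybrid_route(path, request, metrics_router):
    """Static path routing first, metrics-based selection for everything else."""
    if path in STATIC_ROUTES:
        return STATIC_ROUTES[path]
    # Fall back to real-time metrics for paths without a fixed model assignment
    return metrics_router.select_model(request)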
Exam-Relevant Points:
- Static routing: API Gateway path-based, fastest, simplest
- Dynamic routing: Step Functions Choice states for content-based decisions
- Metrics-based routing: CloudWatch metrics drive real-time model selection
- API Gateway request transformations add routing logic at the gateway level
- Exponential backoff in AWS SDK: retries={'mode': 'adaptive'} for smart retry
- Fallback mechanisms: cross-region failover, model downgrade, cached responses
- X-Ray traces across API Gateway -> Lambda -> Bedrock for observability
- Rate limiting at API Gateway prevents overwhelming FM services
- Streaming uses invoke_model_with_response_stream + WebSocket or SSE
- Async processing: SQS decouples request submission from FM processing