
Task 2.4: Implement FM API Integrations

Overview

This task covers building reliable, real-time, and intelligent FM API integrations with synchronous/async patterns, streaming, resilience, and model routing.


Skill 2.4.1: Create Flexible Model Interaction Systems

Core Concepts

  • Synchronous: Lambda/EC2/ECS calls Bedrock API, waits for response
  • Asynchronous: Decouple request from response using SQS, process in background
  • API Gateway: Custom API layer with request validation, transformation, and throttling
  • Language-specific SDKs: boto3 (Python), AWS SDK for Java, JavaScript SDK

User Story 17: Multi-Channel AI Platform with Sync and Async Patterns

As a software architect, I want an AI platform supporting both real-time chat (sync) and batch document processing (async), So that customers get instant responses for chat while document analysis happens efficiently in the background.

Deep Dive Scenario

Company: DocuAI - serves 10K concurrent chat users + processes 50K documents/day

Architecture:

[Channel 1: Real-Time Chat]              [Channel 2: Document Processing]
    |                                         |
    v                                         v
[API Gateway]                           [API Gateway]
    |                                         |
    v                                         v
[Lambda: Sync Handler]                   [Lambda: Async Submitter]
    |                                         |
    v                                         v
[Bedrock API - Synchronous]              [SQS: Document Queue]
    |                                         |
    v                                         v
[Immediate Response]                     [Lambda: Batch Processor]
                                              |
                                              v
                                         [Bedrock API - Batch]
                                              |
                                              v
                                         [S3: Results] --> [SNS: Notification]

Synchronous Pattern - Real-Time Chat:

import boto3
import json
from botocore.config import Config

bedrock_runtime = boto3.client(
    'bedrock-runtime',
    region_name='us-east-1',
    config=Config(
        retries={'max_attempts': 3, 'mode': 'adaptive'},
        connect_timeout=5,
        read_timeout=60
    )
)

def sync_chat_handler(event, context):
    """Synchronous FM invocation for real-time chat.

    Flow: API Gateway -> Lambda -> Bedrock -> Lambda -> API Gateway -> Client
    Total latency budget: <3 seconds
    """
    body = json.loads(event["body"])
    messages = body["messages"]

    # Validate request
    if not messages or len(messages) > 50:
        return {"statusCode": 400, "body": "Invalid message count"}

    # Synchronous Bedrock call - Lambda waits for response
    response = bedrock_runtime.invoke_model(
        modelId=body.get("model", "anthropic.claude-sonnet-4-20250514"),
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": body.get("max_tokens", 1024),
            "messages": messages,
            "system": body.get("system_prompt", "You are a helpful assistant.")
        }),
        contentType="application/json",
        accept="application/json"
    )

    result = json.loads(response['body'].read())

    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({
            "response": result["content"][0]["text"],
            "model": result.get("model"),
            "usage": result["usage"],
            "stop_reason": result["stop_reason"]
        })
    }

Asynchronous Pattern - Document Processing:

# Step 1: Accept request, queue for processing
import os
import uuid
from datetime import datetime

sqs = boto3.client('sqs')
jobs_table = boto3.resource('dynamodb').Table('DocumentJobs')

def async_submit_handler(event, context):
    """Accept document for async processing. Returns immediately with job ID."""

    body = json.loads(event["body"])
    job_id = str(uuid.uuid4())

    # Validate document
    document_key = body["s3_key"]
    if not s3_object_exists(body["s3_bucket"], document_key):
        return {"statusCode": 404, "body": "Document not found"}

    # Queue for processing
    sqs.send_message(
        QueueUrl=os.environ["DOCUMENT_QUEUE_URL"],
        MessageBody=json.dumps({
            "job_id": job_id,
            "s3_bucket": body["s3_bucket"],
            "s3_key": document_key,
            "analysis_type": body.get("analysis_type", "summarize"),
            "callback_url": body.get("callback_url"),
            "submitted_at": datetime.utcnow().isoformat()
        }),
        MessageAttributes={
            "Priority": {"DataType": "String", "StringValue": body.get("priority", "normal")}
        }
    )

    # Store job status (the Table resource accepts plain Python types)
    jobs_table.put_item(
        Item={
            "job_id": job_id,
            "status": "QUEUED",
            "submitted_at": datetime.utcnow().isoformat()
        }
    )

    return {
        "statusCode": 202,  # Accepted
        "body": json.dumps({
            "job_id": job_id,
            "status": "QUEUED",
            "status_url": f"/api/v1/jobs/{job_id}"
        })
    }
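
The status_url above implies a polling endpoint that is not shown. A minimal sketch of the response shaping for GET /api/v1/jobs/{job_id} — the DynamoDB fetch itself (a get_item on the DocumentJobs table) is omitted, and the result_url field is a hypothetical addition:

```python
import json

def job_status_response(item):
    """Shape a DocumentJobs record (or None, if the job is unknown)
    into an API Gateway proxy response.

    `item` is the already-deserialized DynamoDB item; fetching it would be
    a Table('DocumentJobs').get_item(Key={'job_id': ...}) call.
    """
    if item is None:
        return {"statusCode": 404, "body": json.dumps({"error": "Job not found"})}

    body = {"job_id": item["job_id"], "status": item["status"]}
    if item["status"] == "COMPLETED":
        # Hypothetical: point the client at the stored result object
        body["result_url"] = f"/api/v1/jobs/{item['job_id']}/result"
    return {"statusCode": 200, "body": json.dumps(body)}
```

Clients poll this endpoint until the status flips from QUEUED/PROCESSING to COMPLETED or FAILED.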

# Step 2: Process from queue (triggered by SQS)
import requests  # used for the completion callback below

s3 = boto3.client('s3')

def async_processor(event, context):
    """Process documents from SQS queue with Bedrock.

    Helper functions update_job_status() and download_and_extract()
    are assumed to be defined elsewhere in the module.
    """

    for record in event["Records"]:
        job = json.loads(record["body"])
        job_id = job["job_id"]

        try:
            # Update status
            update_job_status(job_id, "PROCESSING")

            # Download document
            document_text = download_and_extract(job["s3_bucket"], job["s3_key"])

            # Process with FM
            response = bedrock_runtime.invoke_model(
                modelId="anthropic.claude-sonnet-4-20250514",
                body=json.dumps({
                    "anthropic_version": "bedrock-2023-05-31",
                    "max_tokens": 4096,
                    "messages": [{
                        "role": "user",
                        "content": f"Analyze this document:\n\n{document_text}"
                    }]
                })
            )

            result = json.loads(response['body'].read())["content"][0]["text"]

            # Store result in S3
            s3.put_object(
                Bucket="results-bucket",
                Key=f"results/{job_id}.json",
                Body=json.dumps({"analysis": result, "job_id": job_id})
            )

            update_job_status(job_id, "COMPLETED")

            # Notify via callback if provided
            if job.get("callback_url"):
                requests.post(job["callback_url"], json={"job_id": job_id, "status": "COMPLETED"})

        except Exception as e:
            update_job_status(job_id, "FAILED", error=str(e))
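
update_job_status() is referenced but never defined. A sketch of one way to implement it against the DocumentJobs table — the builder below constructs the update_item arguments, and the schema (updated_at, error_message attributes) is an assumption:

```python
from datetime import datetime, timezone

def build_status_update(job_id, status, error=None):
    """Build update_item kwargs for the DocumentJobs table (hypothetical schema)."""
    expr = "SET #s = :s, updated_at = :t"
    values = {":s": status, ":t": datetime.now(timezone.utc).isoformat()}
    if error:
        expr += ", error_message = :e"
        values[":e"] = error
    return {
        "Key": {"job_id": job_id},
        "UpdateExpression": expr,
        "ExpressionAttributeNames": {"#s": "status"},  # 'status' is a DynamoDB reserved word
        "ExpressionAttributeValues": values,
    }

# Applied with a boto3 Table resource (assumed initialized at module level):
# def update_job_status(job_id, status, error=None):
#     jobs_table.update_item(**build_status_update(job_id, status, error))
```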

API Gateway with Request Validation:

# OpenAPI spec for API Gateway
openapi: "3.0.1"
paths:
  /v1/chat:
    post:
      x-amazon-apigateway-request-validator: all
      requestBody:
        required: true
        content:
          application/json:
            schema:
              type: object
              required: [messages]
              properties:
                messages:
                  type: array
                  maxItems: 50
                  items:
                    type: object
                    required: [role, content]
                    properties:
                      role:
                        type: string
                        enum: [user, assistant]
                      content:
                        type: string
                        maxLength: 100000
                max_tokens:
                  type: integer
                  minimum: 1
                  maximum: 4096
                  default: 1024
                model:
                  type: string
                  enum:
                    - anthropic.claude-haiku-4-5-20251001
                    - anthropic.claude-sonnet-4-20250514


Skill 2.4.2: Develop Real-Time AI Interaction Systems (Streaming)

User Story 18: Real-Time Code Assistant with Streaming Responses

As a developer tools product manager, I want a code assistant that streams explanations and code suggestions in real-time, So that developers see responses progressively instead of waiting 10+ seconds for full completion.

Deep Dive Scenario

Streaming Architecture:

[IDE Plugin / Web UI]
    |
    |--- WebSocket connection to API Gateway
    |
    v
[API Gateway: WebSocket API]
    |
    v
[Lambda: Connection Manager]
    |--- On $connect: Store connectionId in DynamoDB
    |--- On message: Invoke Bedrock streaming
    |--- On $disconnect: Clean up
    |
    v
[Bedrock: invoke_model_with_response_stream]
    |
    |--- Chunk 1: "Here's how to" --> [WebSocket: push to client]
    |--- Chunk 2: " fix the bug:" --> [WebSocket: push to client]
    |--- Chunk 3: "\n```python\n"  --> [WebSocket: push to client]
    |--- Chunk N: "```"           --> [WebSocket: push to client]
    |
    v
[Client renders progressively as chunks arrive]
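
The connection manager's lifecycle handling can be sketched as a single dispatcher keyed on routeKey; the store/delete helpers (put_item/delete_item on a connections table) are assumed, injected here as callables to keep the sketch self-contained:

```python
def connection_router(event, store=None, delete=None):
    """Dispatch WebSocket lifecycle events from API Gateway.

    store/delete are callables that persist or remove the connectionId
    (e.g. put_item/delete_item on a Connections DynamoDB table).
    """
    route = event["requestContext"]["routeKey"]
    connection_id = event["requestContext"]["connectionId"]

    if route == "$connect":
        if store:
            store(connection_id)
        return {"statusCode": 200}
    if route == "$disconnect":
        if delete:
            delete(connection_id)
        return {"statusCode": 200}
    # $default / custom routes carry messages: hand off to the streaming handler
    return {"statusCode": 200, "action": "message", "connectionId": connection_id}
```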

Bedrock Streaming API:

def stream_response(event, context):
    """Stream FM response via Bedrock streaming API."""

    connection_id = event["requestContext"]["connectionId"]
    body = json.loads(event["body"])

    # Invoke Bedrock with streaming
    response = bedrock_runtime.invoke_model_with_response_stream(
        modelId="anthropic.claude-sonnet-4-20250514",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4096,
            "messages": body["messages"],
            "system": "You are a code assistant. Provide clear explanations and working code."
        })
    )

    # Process stream and forward chunks to WebSocket client
    full_response = ""
    for event_chunk in response['body']:
        chunk = json.loads(event_chunk['chunk']['bytes'])

        if chunk['type'] == 'content_block_delta':
            text_delta = chunk['delta'].get('text', '')
            full_response += text_delta

            # Push chunk to client via WebSocket
            apigateway_management.post_to_connection(
                ConnectionId=connection_id,
                Data=json.dumps({
                    "type": "delta",
                    "text": text_delta
                }).encode()
            )

        elif chunk['type'] == 'message_delta':
            # End of stream - send usage info
            apigateway_management.post_to_connection(
                ConnectionId=connection_id,
                Data=json.dumps({
                    "type": "complete",
                    "stop_reason": chunk['delta'].get('stop_reason'),
                    "usage": chunk.get('usage', {})
                }).encode()
            )

    return {"statusCode": 200}

Server-Sent Events (SSE) Alternative via API Gateway:

# For clients that support SSE (simpler than WebSockets)
def sse_stream_handler(event, context):
    """Stream via Server-Sent Events using API Gateway chunked transfer."""

    body = json.loads(event["body"])

    response = bedrock_runtime.invoke_model_with_response_stream(
        modelId="anthropic.claude-sonnet-4-20250514",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4096,
            "messages": body["messages"]
        })
    )

    # Collect chunks and format as SSE.
    # Note: a standard API Gateway + Lambda proxy response is buffered, so this
    # returns the full SSE body in one shot; for true incremental delivery,
    # use a Lambda function URL with the RESPONSE_STREAM invoke mode.
    chunks = []
    for event_chunk in response['body']:
        chunk = json.loads(event_chunk['chunk']['bytes'])
        if chunk['type'] == 'content_block_delta':
            text = chunk['delta'].get('text', '')
            chunks.append(f"data: {json.dumps({'text': text})}\n\n")

    chunks.append("data: [DONE]\n\n")

    return {
        "statusCode": 200,
        "headers": {
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        },
        "body": "".join(chunks),
        "isBase64Encoded": False
    }
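
On the client side, the SSE body above can be consumed with a small parser; this sketch assumes exactly the data: {"text": ...} framing produced by the handler:

```python
import json

def parse_sse(stream_text):
    """Extract text deltas from an SSE stream formatted as above."""
    deltas = []
    for block in stream_text.split("\n\n"):
        if not block.startswith("data: "):
            continue  # ignore comments and empty events
        payload = block[len("data: "):]
        if payload == "[DONE]":
            break
        deltas.append(json.loads(payload)["text"])
    return deltas
```

"".join(parse_sse(body)) reassembles the full completion.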

Exam-Relevant Points:

  • invoke_model_with_response_stream is the Bedrock streaming API
  • WebSocket API Gateway enables bidirectional real-time communication
  • Server-Sent Events (SSE) are simpler for unidirectional streaming
  • Chunked transfer encoding sends the response incrementally
  • Lambda function URLs support response streaming natively
  • Time-to-first-token (TTFT) is the key streaming latency metric


Skill 2.4.3: Create Resilient FM Systems

User Story 19: Fault-Tolerant AI Service Handling 10K RPM

As a reliability engineer, I want an FM-powered service that gracefully handles Bedrock throttling, timeouts, and outages, So that end users experience <0.1% error rate even during degraded conditions.

Deep Dive Scenario

Resilience Patterns Architecture:

[Client Request]
    |
    v
[API Gateway: Rate Limiting]
    |--- Per-client throttling (1000 RPM)
    |--- Global throttling (10000 RPM)
    |--- Burst capacity for spikes
    |
    v
[Lambda: Resilient FM Client]
    |
    |--- [Primary: Bedrock us-east-1]
    |       |--- Exponential backoff (AWS SDK)
    |       |--- Timeout: 30 seconds
    |
    |--- [Fallback 1: Bedrock us-west-2] (on throttling/timeout)
    |       |--- Cross-region failover
    |
    |--- [Fallback 2: Cached/Simpler Response] (on all failures)
    |       |--- Semantic cache lookup
    |       |--- Graceful degradation message
    |
    v
[X-Ray Trace: End-to-end observability]

Exponential Backoff with AWS SDK:

from botocore.config import Config

# Configure retry behavior
bedrock_config = Config(
    retries={
        'max_attempts': 5,         # Total attempts including the initial call
        'mode': 'adaptive'         # Adaptive: standard retries + client-side rate limiting
        # 'mode': 'standard'       # Standard: exponential backoff with jitter
        # 'mode': 'legacy'         # Legacy: older, less robust retry behavior
    },
    connect_timeout=5,              # Connection establishment timeout
    read_timeout=60,                # Response read timeout
    max_pool_connections=25         # Connection pool size
)

bedrock_runtime = boto3.client('bedrock-runtime', config=bedrock_config)

"""
Adaptive mode retry behavior:
Attempt 1: Immediate
Attempt 2: Wait ~1s (base * 2^1)
Attempt 3: Wait ~2s (base * 2^2)
Attempt 4: Wait ~4s (base * 2^3)
Attempt 5: Wait ~8s (base * 2^4)

For ThrottlingException: SDK adds additional rate-based delay
For ServiceUnavailable: Standard exponential backoff
"""

Full Resilience Implementation:

import boto3
from botocore.config import Config
from aws_xray_sdk.core import xray_recorder
import json
import hashlib

# redis_client: a redis.Redis connection to ElastiCache, initialized elsewhere

# Primary and fallback clients (bedrock_config defined above)
primary_client = boto3.client('bedrock-runtime', region_name='us-east-1', config=bedrock_config)
fallback_client = boto3.client('bedrock-runtime', region_name='us-west-2', config=bedrock_config)

class ResilientFMClient:
    """FM client with exponential backoff, fallback, and graceful degradation."""

    def __init__(self):
        # Responses are cached in ElastiCache for Redis via the module-level
        # redis_client; note the boto3 'elasticache' client is control-plane
        # only and cannot read or write cache entries.
        pass

    @xray_recorder.capture("invoke_fm")
    def invoke(self, messages, model_id, max_tokens=1024):
        """Invoke FM with full resilience chain."""

        # Step 1: Check semantic cache
        cache_key = self._cache_key(messages)
        cached = self._check_cache(cache_key)
        if cached:
            xray_recorder.current_subsegment().put_annotation("cache_hit", True)
            return {"text": cached, "source": "cache"}

        # Step 2: Try primary region
        try:
            xray_recorder.current_subsegment().put_annotation("region", "us-east-1")
            result = self._invoke_bedrock(primary_client, model_id, messages, max_tokens)
            self._update_cache(cache_key, result["text"])
            return {**result, "source": "primary"}

        except (
            primary_client.exceptions.ThrottlingException,
            primary_client.exceptions.ServiceUnavailableException,
        ) as e:
            xray_recorder.current_subsegment().put_annotation("primary_error", str(type(e).__name__))

            # Step 3: Try fallback region
            try:
                xray_recorder.current_subsegment().put_annotation("region", "us-west-2")
                result = self._invoke_bedrock(fallback_client, model_id, messages, max_tokens)
                self._update_cache(cache_key, result["text"])
                return {**result, "source": "fallback_region"}

            except Exception as fallback_error:
                xray_recorder.current_subsegment().put_annotation("fallback_error", str(type(fallback_error).__name__))

                # Step 4: Graceful degradation
                return {
                    "text": "I'm experiencing high demand. Here's a brief response based on cached knowledge.",
                    "source": "degraded",
                    "degraded": True
                }

    def _invoke_bedrock(self, client, model_id, messages, max_tokens):
        """Core Bedrock invocation with proper error handling."""
        response = client.invoke_model(
            modelId=model_id,
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": max_tokens,
                "messages": messages
            })
        )
        result = json.loads(response['body'].read())
        return {
            "text": result["content"][0]["text"],
            "usage": result["usage"],
            "model": model_id
        }

    def _cache_key(self, messages):
        """Generate an exact-match cache key from message content.

        Hashing only matches byte-identical requests; a true semantic cache
        would embed the prompt and do a similarity lookup instead.
        """
        content = json.dumps(messages, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    def _check_cache(self, key):
        """Check Redis/ElastiCache for cached response."""
        try:
            return redis_client.get(f"fm:cache:{key}")
        except Exception:
            return None

    def _update_cache(self, key, response):
        """Cache response with 1-hour TTL."""
        try:
            redis_client.setex(f"fm:cache:{key}", 3600, response)
        except Exception:
            pass  # Failing to write to the cache is acceptable

# API Gateway rate limiting configuration
"""
API Gateway Usage Plan (rateLimit = steady-state requests per second,
burstLimit = burst capacity, quota = monthly request cap):

{
    "throttle": {"rateLimit": 100, "burstLimit": 200},
    "quota": {"limit": 100000, "period": "MONTH"}
}
"""

X-Ray Distributed Tracing:

# X-Ray provides end-to-end trace across:
# API Gateway -> Lambda -> Bedrock -> DynamoDB -> etc.

from aws_xray_sdk.core import xray_recorder, patch_all

patch_all()  # Auto-instrument boto3, requests, etc.

@xray_recorder.capture("process_request")
def handler(event, context):
    """X-Ray traces the full request lifecycle."""

    # Custom annotations for filtering traces
    xray_recorder.current_subsegment().put_annotation("model_id", "claude-sonnet")
    xray_recorder.current_subsegment().put_annotation("request_type", "chat")

    # Custom metadata for debugging
    xray_recorder.current_subsegment().put_metadata(
        "input_tokens", len(event["body"]),
        "fm_invocation"
    )

    # All downstream AWS SDK calls are automatically traced
    response = bedrock_runtime.invoke_model(...)

    xray_recorder.current_subsegment().put_annotation(
        "latency_ms",
        response["ResponseMetadata"]["HTTPHeaders"].get("x-amzn-bedrock-invocation-latency")
    )

    return response


Skill 2.4.4: Develop Intelligent Model Routing Systems

User Story 20: Smart Model Router for Multi-Purpose AI Platform

As a platform architect, I want intelligent routing that sends each request to the optimal FM based on content, complexity, and metrics, So that we maximize quality while minimizing cost and latency across 15 different AI features.

Deep Dive Scenario

Routing Architecture:

[Incoming Request]
    |
    v
[API Gateway: Request Transformation]
    |--- Extract routing metadata (content type, complexity hints, SLA)
    |
    v
[Routing Decision Engine]
    |
    |--- [Static Routing] (API Gateway path-based)
    |       /v1/moderate -> Haiku (always fast/cheap)
    |       /v1/summarize -> Sonnet (balanced)
    |       /v1/analyze -> Opus (best quality)
    |
    |--- [Dynamic Content-Based Routing] (Step Functions)
    |       Classify content -> Route to specialist FM
    |       Code -> Code-optimized model
    |       Legal -> Legal-fine-tuned model
    |       Math -> Math-specialized model
    |
    |--- [Metrics-Based Routing] (Lambda + CloudWatch)
    |       If model latency > threshold -> Route to alternate
    |       If error rate > threshold -> Route to fallback
    |       If cost > budget -> Downgrade to cheaper model
    |
    v
[Selected FM] -> [Response]

Static Routing via API Gateway:

# API Gateway routes different paths to different models
paths:
  /v1/moderate:
    post:
      x-amazon-apigateway-integration:
        type: aws_proxy
        uri: arn:aws:lambda:us-east-1:123:function:invoke-haiku
        httpMethod: POST
  /v1/summarize:
    post:
      x-amazon-apigateway-integration:
        type: aws_proxy
        uri: arn:aws:lambda:us-east-1:123:function:invoke-sonnet
        httpMethod: POST
  /v1/analyze:
    post:
      x-amazon-apigateway-integration:
        type: aws_proxy
        uri: arn:aws:lambda:us-east-1:123:function:invoke-opus
        httpMethod: POST

Dynamic Content-Based Routing via Step Functions:

{
  "StartAt": "ClassifyContent",
  "States": {
    "ClassifyContent": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123:function:ClassifyContent",
      "Comment": "Determine content type and complexity",
      "ResultPath": "$.classification",
      "Next": "RouteToSpecialist"
    },
    "RouteToSpecialist": {
      "Type": "Choice",
      "Choices": [
        {
          "And": [
            {"Variable": "$.classification.domain", "StringEquals": "code"},
            {"Variable": "$.classification.complexity", "StringEquals": "high"}
          ],
          "Next": "InvokeCodeSpecialist"
        },
        {
          "Variable": "$.classification.domain",
          "StringEquals": "legal",
          "Next": "InvokeLegalModel"
        },
        {
          "Variable": "$.classification.domain",
          "StringEquals": "math",
          "Next": "InvokeMathModel"
        },
        {
          "Variable": "$.classification.complexity",
          "StringEquals": "simple",
          "Next": "InvokeHaiku"
        }
      ],
      "Default": "InvokeSonnet"
    },
    "InvokeCodeSpecialist": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123:function:InvokeModel",
      "Parameters": {
        "model_id": "anthropic.claude-sonnet-4-20250514",
        "system_prompt": "You are an expert software engineer...",
        "request.$": "$.request"
      },
      "End": true
    },
    "InvokeLegalModel": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123:function:InvokeModel",
      "Parameters": {
        "model_id": "arn:aws:sagemaker:us-east-1:123:endpoint/legal-fine-tuned",
        "request.$": "$.request"
      },
      "End": true
    }
  }
}
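
The ClassifyContent Lambda referenced above is not shown. A minimal heuristic sketch — a production classifier would more likely ask a small, cheap FM to label the text, and the keyword lists here are illustrative only:

```python
def classify_content(text):
    """Label a request with a domain and complexity for the Choice state."""
    lowered = text.lower()
    if "def " in text or "import " in text or "```" in text:
        domain = "code"
    elif any(k in lowered for k in ("contract", "liability", "clause", "statute")):
        domain = "legal"
    elif any(k in lowered for k in ("integral", "derivative", "theorem", "equation")):
        domain = "math"
    else:
        domain = "general"
    # Crude proxy for complexity; a real system might score structure or length
    complexity = "high" if len(text) > 2000 else "simple"
    return {"domain": domain, "complexity": complexity}

def handler(event, context):
    # ResultPath "$.classification" stores this return value on the state input
    return classify_content(event["request"]["text"])
```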

Metrics-Based Intelligent Routing:

from datetime import datetime, timedelta

class MetricsBasedRouter:
    """Route requests based on real-time model performance metrics."""

    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.models = {
            "primary": "anthropic.claude-sonnet-4-20250514",
            "fallback": "anthropic.claude-haiku-4-5-20251001",
            "premium": "anthropic.claude-opus-4-20250514"
        }

    def select_model(self, request):
        """Select model based on real-time metrics and request requirements."""

        # Get current metrics for each model
        metrics = self._get_model_metrics()

        # Rule 1: If primary model latency > SLA, use fallback
        if metrics["primary"]["p99_latency_ms"] > request.get("max_latency_ms", 5000):
            return self.models["fallback"]

        # Rule 2: If primary error rate > threshold, use fallback
        if metrics["primary"]["error_rate"] > 0.05:
            return self.models["fallback"]

        # Rule 3: If daily budget exceeded, downgrade
        if metrics["primary"]["daily_cost"] > request.get("daily_budget", 100):
            return self.models["fallback"]

        # Rule 4: If request explicitly requires high quality
        if request.get("quality_requirement") == "highest":
            return self.models["premium"]

        return self.models["primary"]

    def _get_model_metrics(self):
        """Fetch real-time metrics from CloudWatch."""
        metrics = {}
        for name, model_id in self.models.items():
            # Get p99 latency from last 5 minutes
            latency = self.cloudwatch.get_metric_statistics(
                Namespace="GenAI/ModelMetrics",
                MetricName="InvocationLatency",
                Dimensions=[{"Name": "ModelId", "Value": model_id}],
                StartTime=datetime.utcnow() - timedelta(minutes=5),
                EndTime=datetime.utcnow(),
                Period=300,
                ExtendedStatistics=["p99"]
            )

            error_rate = self.cloudwatch.get_metric_statistics(
                Namespace="GenAI/ModelMetrics",
                MetricName="ErrorRate",
                Dimensions=[{"Name": "ModelId", "Value": model_id}],
                StartTime=datetime.utcnow() - timedelta(minutes=5),
                EndTime=datetime.utcnow(),
                Period=300,
                Statistics=["Average"]
            )

            metrics[name] = {
                "p99_latency_ms": extract_stat(latency, "p99"),
                "error_rate": extract_stat(error_rate, "Average"),
                "daily_cost": get_daily_cost(model_id)
            }

        return metrics
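
extract_stat() and get_daily_cost() are assumed helpers. extract_stat might look like this — the default-on-empty behavior is an assumption (with no datapoints in the window there is nothing to route on):

```python
def extract_stat(metric_response, stat, default=0.0):
    """Pull one statistic from a CloudWatch get_metric_statistics response.

    Percentile stats (e.g. 'p99') live under each datapoint's
    ExtendedStatistics dict; plain stats (e.g. 'Average') are top-level keys.
    """
    datapoints = metric_response.get("Datapoints", [])
    if not datapoints:
        return default
    latest = max(datapoints, key=lambda d: d["Timestamp"])
    if stat.startswith("p"):
        return latest["ExtendedStatistics"][stat]
    return latest[stat]
```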

API Gateway Request Transformation for Routing:

Mapping template (VTL) attached to the integration's application/json
content type (VTL cannot be embedded as bare strings inside JSON, so the
template is shown directly):

#set($routingHint = $input.path('$.routing_hint'))
#if($routingHint == 'fast')
  #set($modelId = 'anthropic.claude-haiku-4-5-20251001')
#elseif($routingHint == 'balanced')
  #set($modelId = 'anthropic.claude-sonnet-4-20250514')
#else
  #set($modelId = 'anthropic.claude-sonnet-4-20250514')
#end
{"model_id": "$modelId", "request": $input.json('$')}

Routing Strategy Comparison:

Strategy        Implementation         Latency   Flexibility   Best For
Static          API Gateway paths      ~1ms      Low           Known, fixed routing rules
Content-Based   Step Functions         ~100ms    High          Domain-specific model selection
Metrics-Based   Lambda + CloudWatch    ~50ms     High          Performance optimization
Cost-Based      Application code       ~10ms     Medium        Budget management
Hybrid          Combination            ~100ms    Highest       Production platforms

Exam-Relevant Points:

  • Static routing: API Gateway path-based, fastest and simplest
  • Dynamic routing: Step Functions Choice states for content-based decisions
  • Metrics-based routing: CloudWatch metrics drive real-time model selection
  • API Gateway request transformations add routing logic at the gateway level
  • Exponential backoff in the AWS SDK: retries={'mode': 'adaptive'} for smart retries
  • Fallback mechanisms: cross-region failover, model downgrade, cached responses
  • X-Ray traces across API Gateway -> Lambda -> Bedrock for observability
  • Rate limiting at API Gateway prevents overwhelming FM services
  • Streaming uses invoke_model_with_response_stream plus WebSocket or SSE
  • Async processing: SQS decouples request submission from FM processing