Skill 2.3.2 --- Microservice Integration and Webhook Handler Patterns
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Field | Value |
|---|---|
| Certification | AWS Certified AI Practitioner (AIF-C01) |
| Domain | 2 --- Development and Implementation of GenAI Applications |
| Task | 2.3 --- Describe methods to integrate foundation models into applications |
| Skill | 2.3.2 --- Develop integrated AI capabilities to enhance existing applications with GenAI functionality (API Gateway for microservice integrations, Lambda for webhook handlers, EventBridge for event-driven integrations) |
Mind Map --- Microservice, Webhook, and Event-Driven GenAI Integration
mindmap
root((Skill 2.3.2<br/>Microservice & Webhook<br/>FM Integration))
API Gateway Microservice Patterns
REST Endpoints for FM Invocation
Synchronous chat via Haiku fast-path
RAG search via OpenSearch + Sonnet
Product recommendation engine
29-second hard timeout ceiling
WebSocket Streaming
Token-by-token Bedrock streaming
Connection lifecycle management
Binary frame support for images
Integration Types
Lambda proxy --- most common for FM
HTTP proxy to ECS Fargate
AWS service direct --- Bedrock invoke
VPC Link for private microservices
Traffic Management
Usage plans and API key throttling
Stage variables for environment config
Custom domain with Route 53
WAF integration for abuse protection
Lambda Webhook Handlers
Inbound Webhook Processing
Stripe payment callbacks
Publisher catalog sync
Customer review submissions
Third-party manga API updates
Security Patterns
HMAC SHA-256 signature verification
Timestamp drift replay prevention
IP allowlist enforcement
Secret rotation via Secrets Manager
Idempotency Enforcement
DynamoDB conditional writes
TTL-based dedup window cleanup
Idempotency key extraction per source
Outbound Webhook Dispatch
AI-generated summaries to Slack
Order confirmations to CRM
Inventory alerts to suppliers
EventBridge Event-Driven Triggers
AI Event Schemas
MangaAssist.AI.InferenceCompleted
MangaAssist.AI.ModerationFlagged
MangaAssist.Recommendation.Generated
MangaAssist.Chat.MessageReceived
Routing Rules
Content-based detail-type matching
Input transformation for targets
Fan-out to analytics + action targets
Cross-account event forwarding
Reliability
Dead letter queues for failures
Event archive 30-day replay
Schema registry enforcement
Retry with exponential backoff
Sidecar Pattern for GenAI
Envoy Proxy with App Mesh
Traffic splitting for model A/B tests
Circuit breaker on Bedrock calls
Mutual TLS between services
Cloud Map Service Discovery
Health checks for AI endpoints
Weighted routing for gradual rollout
Observability Sidecar
X-Ray distributed tracing
Per-endpoint latency metrics
Structured access logging
Architecture Diagram --- Microservice Integration Topology
MangaAssist Microservice + Webhook + Event Integration Topology
========================================================================================
EXTERNAL SYSTEMS INTERNAL MICROSERVICES
================== ======================
[Stripe Payments] [Mobile / Web Client]
| |
| POST /webhooks/stripe | HTTPS / WSS
v v
+-------------------+ +--------------------+
| Lambda Function | | API Gateway |
| URL: Webhook | | (REST + WebSocket) |
| Handler | +--------+-----------+
+--------+----------+ |
| |
| EventBridge | Lambda Proxy / HTTP Proxy
v v
+-------------------+ +--------------------+
| EventBridge |<----- events --- | ECS Fargate |
| (AI Event Bus) | | Orchestrator |
+--------+----------+ | +----------------+ |
| | | App Container | |
+----------+---------+ | | - routing | |
| | | | | - sessions | |
v v v | | - prompts | |
+----------+ +----------+ +--------+ | +----------------+ |
| Lambda: | | Lambda: | | SQS: | | | Sidecar: | |
| Inference| | Content | | Batch | | | Envoy Proxy | |
| Post- | | Moder- | | Queue | | | - circuit break| |
| Processor| | ation | | | | | - mTLS | |
+----------+ +----------+ +--------+ | | - tracing | |
| +----------------+ |
[Publisher Catalog API] +--------+-----------+
| |
| POST /webhooks/catalog +---------+---------+
v | | |
+-------------------+ +------+--+ +----+----+ +--+-------+
| Lambda Function | | Bedrock | | Open- | | DynamoDB |
| URL: Catalog | | Claude 3| | Search | | Sessions |
| Sync Handler | | Sonnet/ | | Server- | | Products |
+-------------------+ | Haiku | | less | | Orders |
+---------+ +---------+ +----------+
[Customer Reviews]
| +---------------------+
| POST /webhooks/reviews | ElastiCache Redis |
v | - Response cache |
+-------------------+ | - Session cache |
| Lambda Function | | - Rate limit state |
| URL: Review | +---------------------+
| Sentiment Handler |
+-------------------+
========================================================================================
Integration Flows:
[Sync FM] Client -> API GW REST -> Lambda -> Fargate -> Bedrock -> Response (< 3s)
[Stream FM] Client -> API GW WS -> Lambda -> Fargate -> Bedrock Stream -> WS push
[Webhook In] External -> Lambda URL -> HMAC verify -> EventBridge -> Target Lambda
[Event Fan] EventBridge rule -> [Lambda A, Lambda B, SQS C] (parallel fan-out)
[Sidecar] Fargate App -> Envoy Sidecar -> Bedrock (circuit-broken, traced)
1. API Gateway Microservice Integration Patterns for FM
1.1 REST API Direct-to-Bedrock Integration
API Gateway can invoke Bedrock directly through an AWS service integration, with no Lambda in the path. This eliminates Lambda cold-start latency but sacrifices request/response transformation flexibility.
Pattern: API Gateway -> AWS Service Integration -> Bedrock InvokeModel
Pros:
- Zero Lambda cold start
- Lower cost (no Lambda execution charges)
- Simpler architecture
Cons:
- Limited request transformation (VTL templates only)
- No complex orchestration logic
- No session state management
- 29-second timeout still applies
When to use:
- Simple single-turn inference (no session context)
- Internal microservice-to-FM calls where orchestration lives elsewhere
- Cost-optimized batch scenarios
1.2 Lambda Proxy Integration for FM (Primary Pattern)
The dominant pattern for MangaAssist: API Gateway delegates to Lambda, which orchestrates session lookup, prompt assembly, Bedrock invocation, and response formatting.
Flow: Client -> API GW -> Lambda -> [Redis cache check] -> [DynamoDB session] -> Bedrock -> Response
Key design decisions:
- Lambda timeout = 29s (matches API GW hard limit)
- Memory = 1024 MB (more memory = more CPU = faster Bedrock SDK calls)
- Provisioned concurrency for /chat endpoint (latency-critical)
- X-Ray tracing enabled for end-to-end visibility
1.3 HTTP Proxy to ECS Fargate
For long-running or stateful operations where Lambda's execution model is too restrictive, API Gateway routes through a VPC Link to ECS Fargate.
Flow: Client -> API GW -> VPC Link -> NLB -> ECS Fargate -> Bedrock
When to use:
- WebSocket connection management (persistent connections)
- Complex multi-step reasoning that may exceed 29s
- Stateful conversation management with in-memory context
- Connection pooling to downstream services
1.4 Request Mapping Templates for FM Payloads
API Gateway VTL (Velocity Template Language) templates can transform client requests into Bedrock-compatible payloads without Lambda.
## Example: VTL mapping template transforming a simple chat request
## into a Claude 3 Messages API payload
#set($body = $util.parseJson($input.body))
{
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1024,
"messages": [
{
"role": "user",
"content": "$util.escapeJavaScript($body.message)"
}
],
"temperature": 0.3
}
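For local testing, the same transformation can be sketched in Python (the function name is illustrative); json.dumps provides the escaping that $util.escapeJavaScript performs on the API Gateway side:

```python
import json

def build_claude_payload(message: str, max_tokens: int = 1024,
                         temperature: float = 0.3) -> str:
    """Mirror of the VTL mapping template: wrap a chat message in a
    Claude 3 Messages API body. json.dumps performs the escaping that
    $util.escapeJavaScript handles in the VTL version."""
    return json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "messages": [{"role": "user", "content": message}],
        "temperature": temperature,
    })
```

Running the Python version against sample client requests is a quick way to verify the VTL output shape before deploying the mapping template.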
2. Lambda Webhook Handlers for External System Callbacks
2.1 Webhook Handler Architecture
Webhooks are HTTP callbacks from external systems that trigger FM processing. MangaAssist receives webhooks from Stripe (payments), publisher APIs (catalog), and review platforms (customer feedback).
Webhook Security Layers:
========================
Layer 1: Network -> WAF rules, IP allowlist (CloudFront or API GW)
Layer 2: Authentication -> HMAC signature verification (per-source secret)
Layer 3: Freshness -> Timestamp validation (5-minute drift window)
Layer 4: Deduplication -> Idempotency key (DynamoDB conditional write)
Layer 5: Authorization -> Source-specific payload validation
2.2 Lambda Function URL vs API Gateway for Webhooks
+-----------------------------+--------------------+---------------------+
| Criterion | Lambda Function URL| API Gateway |
+-----------------------------+--------------------+---------------------+
| Setup complexity | Minimal | Moderate |
| Cost | Free (Lambda only) | Per-request charge |
| Custom domain | Not natively | Yes (custom domain) |
| WAF integration | Via CloudFront | Native |
| Request validation | In Lambda code | Request validators |
| Throttling | Reserved concur. | Usage plans + keys |
| Multiple routes | Single endpoint | Multiple resources |
| Best for | Single-source hook | Multi-source hooks |
+-----------------------------+--------------------+---------------------+
MangaAssist decision: Lambda Function URL per webhook source
- Stripe -> https://stripe-hook.lambda-url.ap-northeast-1.on.aws/
- Reviews -> https://review-hook.lambda-url.ap-northeast-1.on.aws/
- Catalog -> https://catalog-hook.lambda-url.ap-northeast-1.on.aws/
2.3 Provisioned Concurrency for Latency-Sensitive Webhooks
Stripe webhooks have a 30-second timeout and retry aggressively. Cold starts on webhook handlers can cause failures.
Configuration:
- Provisioned concurrency = 5 (handles burst without cold start)
- Reserved concurrency = 50 (prevents runaway invocations)
- Dead letter queue = SQS (captures failed webhook processing)
- Timeout = 30s (match Stripe's retry window)
Cost: 5 provisioned * 0.5 GB * $0.000004646/GB-sec * 86,400 sec/day = ~$1.00/day
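As a sanity check on that figure, the arithmetic can be wrapped in a small helper (assuming the $0.000004646/GB-second provisioned-concurrency rate quoted above):

```python
def provisioned_concurrency_cost_per_day(
    instances: int,
    memory_mb: int,
    price_per_gb_second: float = 0.000004646,
) -> float:
    """Daily provisioned-concurrency cost: instances x GB x seconds/day x rate."""
    gb = memory_mb / 1024
    return instances * gb * price_per_gb_second * 86_400

# 5 instances at 512 MB works out to roughly $1.00/day
```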
3. EventBridge Event-Driven FM Trigger Patterns
3.1 Event Schema Design for FM Integration
Well-designed event schemas ensure loose coupling between producers and FM consumers.
MangaAssist Event Schema Registry:
===================================
Source: mangaassist.chat
DetailType: MessageReceived
Detail:
session_id: string
user_id: string
message: string (max 4000 chars)
language: string (ja | en)
channel: string (web | mobile | api)
Source: mangaassist.ai
DetailType: InferenceCompleted
Detail:
session_id: string
model_id: string
status: string (success | error | timeout)
metrics:
input_tokens: integer
output_tokens: integer
latency_ms: integer
estimated_cost_usd: string
response_summary: string (max 500 chars)
Source: mangaassist.ai
DetailType: ModerationFlagged
Detail:
session_id: string
content_hash: string (SHA-256)
content_preview: string (max 200 chars)
severity: string (low | medium | high | critical)
flagged_categories: string[]
model_id: string
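A small builder function keeps producers aligned with the registry; this sketch follows the InferenceCompleted schema above (field names from the registry, validation logic added here):

```python
def inference_completed_detail(
    session_id: str,
    model_id: str,
    status: str,
    input_tokens: int,
    output_tokens: int,
    latency_ms: int,
    cost_usd: float,
    response_summary: str = "",
) -> dict:
    """Build an InferenceCompleted event detail matching the schema registry.

    estimated_cost_usd is serialized as a string, per the schema, to avoid
    float drift in downstream consumers; the summary is capped at 500 chars."""
    if status not in ("success", "error", "timeout"):
        raise ValueError(f"invalid status: {status}")
    return {
        "session_id": session_id,
        "model_id": model_id,
        "status": status,
        "metrics": {
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "latency_ms": latency_ms,
            "estimated_cost_usd": f"{cost_usd:.6f}",
        },
        "response_summary": response_summary[:500],
    }
```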
3.2 Event Routing Patterns
Pattern 1: Content-Based Routing
---------------------------------
Rule: Route only high/critical moderation events to the moderation handler
EventPattern:
source: ["mangaassist.ai"]
detail-type: ["ModerationFlagged"]
detail:
severity: ["high", "critical"]
Target: Lambda moderation handler
Pattern 2: Fan-Out to Multiple Consumers
------------------------------------------
Rule: Every InferenceCompleted event goes to BOTH post-processor AND analytics
EventPattern:
source: ["mangaassist.ai"]
detail-type: ["InferenceCompleted"]
Targets:
- Lambda inference post-processor (primary action)
- Lambda analytics processor (metrics + logging)
Pattern 3: Input Transformation
---------------------------------
Rule: Transform catalog events before sending to embedding regeneration
InputTransformer:
InputPathsMap:
product_id: "$.detail.product_id"
title: "$.detail.title"
InputTemplate: '{"action": "regenerate", "product_id": "<product_id>", "title": "<title>"}'
Target: Lambda embedding handler (receives simplified payload)
Pattern 4: Scheduled FM Invocation
------------------------------------
Rule: Every 6 hours, trigger batch recommendation regeneration
ScheduleExpression: rate(6 hours)
Target: Step Functions workflow (iterates all active customers)
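Pattern 1 above, expressed as boto3 calls (a sketch: the rule name and target ARN are placeholders, and the client is injected so the wiring can be unit-tested with a stub instead of a live boto3.client("events")):

```python
import json

def create_moderation_rule(events_client, bus_name: str = "MangaAssist-AI-Events"):
    """Create the content-based routing rule: only high/critical
    ModerationFlagged events reach the moderation handler."""
    pattern = {
        "source": ["mangaassist.ai"],
        "detail-type": ["ModerationFlagged"],
        "detail": {"severity": ["high", "critical"]},
    }
    events_client.put_rule(
        Name="moderation-high-critical",
        EventBusName=bus_name,
        EventPattern=json.dumps(pattern),
        State="ENABLED",
    )
    events_client.put_targets(
        Rule="moderation-high-critical",
        EventBusName=bus_name,
        Targets=[{
            "Id": "moderation-handler",
            # Placeholder ARN for illustration only
            "Arn": "arn:aws:lambda:ap-northeast-1:111122223333:function:moderation-handler",
        }],
    )
    return pattern
```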
3.3 Preventing Infinite Event Loops
A critical concern with event-driven FM triggers is the infinite loop: an FM consumer publishes events that match the same rule that triggered it, re-invoking itself indefinitely and burning inference spend on every cycle.
Anti-Loop Strategies:
======================
1. Source Differentiation
- Producer source: "mangaassist.chat"
- Consumer source: "mangaassist.ai"
- Rule only matches "mangaassist.chat" -> never re-triggers
2. Event Metadata Guards
- Include "processing_stage" in detail
- Rule filters: detail.processing_stage: ["raw"] (not "processed")
- Consumer publishes with processing_stage: "processed"
3. Circuit Breaker via DynamoDB
- Track event chain depth per session_id
- Reject processing if chain_depth > 3
- TTL-based cleanup after 1 hour
4. EventBridge Rule Exclusion
- Rule pattern with $or / prefix matching
- Explicitly exclude events from AI consumers
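Strategy 3 can be sketched with an in-memory store standing in for the DynamoDB table; in production the same check is a conditional update with a TTL attribute handling cleanup (constants and names here are illustrative):

```python
import time
from typing import Optional

MAX_CHAIN_DEPTH = 3
CHAIN_TTL_SECONDS = 3600

def allow_event(store: dict, session_id: str, now: Optional[float] = None) -> bool:
    """Chain-depth circuit breaker: permit at most MAX_CHAIN_DEPTH events per
    session within the TTL window, then break the chain."""
    now = time.time() if now is None else now
    depth, expires = store.get(session_id, (0, now + CHAIN_TTL_SECONDS))
    if now >= expires:
        # TTL elapsed -> start a fresh chain (DynamoDB TTL does this for us)
        depth, expires = 0, now + CHAIN_TTL_SECONDS
    if depth >= MAX_CHAIN_DEPTH:
        return False
    store[session_id] = (depth + 1, expires)
    return True
```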
4. Sidecar Pattern for Adding GenAI to Existing Microservices
4.1 Envoy Sidecar Proxy with AWS App Mesh
The sidecar pattern adds FM capabilities to existing microservices without modifying application code. An Envoy proxy runs alongside the application container in the same ECS task.
ECS Fargate Task Definition:
=============================
Task: MangaAssist-Orchestrator
Container 1: app (application container)
- Port 8080
- Business logic, routing, session management
- Calls localhost:9080 for AI operations
Container 2: envoy-sidecar (App Mesh proxy)
- Port 9080 (inbound from app)
- Port 15000 (admin interface)
- Routes to Bedrock endpoint via service mesh
- Circuit breaker: 5 consecutive failures -> open
- Retry policy: 2 retries, 1s/2s backoff
- Timeout: 10s per request
Container 3: xray-daemon (observability sidecar)
- Port 2000 (UDP)
- Collects traces from both app and envoy
- Forwards to X-Ray service
4.2 Circuit Breaker Configuration for Bedrock Calls
App Mesh Circuit Breaker Settings:
===================================
Outlier Detection:
consecutive_5xx: 5 # 5 failures -> eject endpoint
interval: 10s # check every 10 seconds
base_ejection_time: 30s # ejected for 30 seconds minimum
max_ejection_percent: 50 # never eject more than 50% of endpoints
Connection Pool:
max_connections: 100 # max concurrent connections to Bedrock
max_pending_requests: 50 # max queued requests
max_requests: 200 # max active requests
Timeout Policy:
per_request_timeout: 10s # individual request timeout
idle_timeout: 60s # close idle connections after 60s
Retry Policy:
max_retries: 2
retry_on: ["5xx", "gateway-error", "reset", "connect-failure"]
per_retry_timeout: 5s
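When no mesh sidecar is available, the same semantics can live in application code. This is a minimal client-side analogue of the outlier-detection settings above (5 consecutive failures open the circuit for 30 seconds), not Envoy's exact algorithm:

```python
import time
from typing import Optional

class CircuitBreaker:
    """Open after `failure_threshold` consecutive failures; stay open for
    `ejection_seconds`, then allow a single probe request (half-open)."""

    def __init__(self, failure_threshold: int = 5, ejection_seconds: float = 30.0):
        self.failure_threshold = failure_threshold
        self.ejection_seconds = ejection_seconds
        self.consecutive_failures = 0
        self.opened_at: Optional[float] = None

    def allow_request(self, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        if self.opened_at is None:
            return True
        if now - self.opened_at >= self.ejection_seconds:
            # Ejection window elapsed: go half-open and let one request probe
            self.opened_at = None
            self.consecutive_failures = 0
            return True
        return False

    def record_success(self) -> None:
        self.consecutive_failures = 0
        self.opened_at = None

    def record_failure(self, now: Optional[float] = None) -> None:
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.failure_threshold:
            self.opened_at = time.monotonic() if now is None else now
```

Wrapping each Bedrock call in `allow_request` / `record_failure` gives fast-fail behavior during Bedrock throttling events instead of queueing doomed requests.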
4.3 Traffic Splitting for Model A/B Testing
App Mesh Virtual Router:
=========================
Route: /v1/chat
Match: POST /v1/chat
Action:
WeightedTargets:
- VirtualNode: chat-service-sonnet weight: 20 # 20% traffic to Sonnet
- VirtualNode: chat-service-haiku weight: 80 # 80% traffic to Haiku
Route: /v1/search
Match: POST /v1/search
Action:
WeightedTargets:
- VirtualNode: search-service-v2 weight: 10 # 10% canary
- VirtualNode: search-service-v1 weight: 90 # 90% stable
Monitoring:
- CloudWatch metrics per virtual node
- Compare latency, error rate, user satisfaction
- Automated rollback if error rate > 5%
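The weighted /v1/chat split above can also be shadowed app-side, which is useful for unit-testing routing behavior or running the same A/B split where no mesh is in place (a sketch; weights mirror the 80/20 route):

```python
import random
from typing import Dict, Optional

def pick_model(weights: Optional[Dict[str, int]] = None,
               rng: Optional[random.Random] = None) -> str:
    """Weighted model selection matching the /v1/chat route: 80% of
    requests go to Haiku, 20% to Sonnet."""
    weights = weights or {"haiku": 80, "sonnet": 20}
    rng = rng or random.Random()
    models = list(weights)
    return rng.choices(models, weights=[weights[m] for m in models], k=1)[0]
```

Injecting a seeded `random.Random` makes the split deterministic in tests while production uses the default generator.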
5. Production Code
5.1 WebhookFMHandler --- Webhook-to-FM Bridge with Full Security
"""
WebhookFMHandler: Receives external webhooks, validates security,
and triggers Foundation Model processing via EventBridge.
Deployed as Lambda Function URL for each webhook source.
"""
import json
import hashlib
import hmac
import time
import os
import logging
from datetime import datetime, timezone
from typing import Any, Optional
import boto3
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# --- AWS Clients ---
dynamodb = boto3.resource("dynamodb")
eventbridge = boto3.client("events")
secrets_client = boto3.client("secretsmanager")
bedrock_runtime = boto3.client("bedrock-runtime")
# --- Configuration ---
IDEMPOTENCY_TABLE_NAME = os.environ.get("IDEMPOTENCY_TABLE", "MangaAssist-Webhook-Idempotency")
EVENT_BUS_NAME = os.environ.get("EVENT_BUS_NAME", "MangaAssist-AI-Events")
WEBHOOK_SECRET_ARN = os.environ.get("WEBHOOK_SECRET_ARN", "")
MAX_TIMESTAMP_DRIFT = int(os.environ.get("MAX_TIMESTAMP_DRIFT", "300"))
IDEMPOTENCY_TTL_HOURS = int(os.environ.get("IDEMPOTENCY_TTL_HOURS", "24"))
FM_MODEL_ID = os.environ.get("FM_MODEL_ID", "anthropic.claude-3-haiku-20240307-v1:0")
idempotency_table = dynamodb.Table(IDEMPOTENCY_TABLE_NAME)
# Cached secret to avoid repeated Secrets Manager calls within same invocation
_cached_secrets: Optional[dict] = None
# ============================================================
# Security: HMAC Verification + Replay Prevention
# ============================================================
def _load_webhook_secrets() -> dict:
"""
Load webhook signing secrets from Secrets Manager.
Cached for the lifetime of the Lambda execution environment.
Secret format: {"stripe": "whsec_...", "catalog": "cat_sec_...", "reviews": "rev_..."}
"""
global _cached_secrets
if _cached_secrets is not None:
return _cached_secrets
response = secrets_client.get_secret_value(SecretId=WEBHOOK_SECRET_ARN)
_cached_secrets = json.loads(response["SecretString"])
return _cached_secrets
def verify_webhook_signature(
raw_body: str,
signature_header: str,
source: str,
algorithm: str = "sha256",
) -> bool:
"""
Verify HMAC signature against the raw request body.
Uses constant-time comparison to prevent timing attacks.
"""
secrets = _load_webhook_secrets()
secret = secrets.get(source)
if not secret:
logger.error("No webhook secret configured for source: %s", source)
return False
hash_func = hashlib.sha256 if algorithm == "sha256" else hashlib.sha1
    # Some providers prefix signatures with an algorithm identifier.
    # Stripe-style header: "t=<timestamp>,v1=<signature>". Note that Stripe
    # signs "<timestamp>.<raw_body>", not the body alone.
    sig_value = signature_header
    signed_payload = raw_body
    if "=" in sig_value and "," in sig_value:
        parts = {k: v for k, v in (p.split("=", 1) for p in sig_value.split(","))}
        sig_value = parts.get("v1", sig_value)
        ts = parts.get("t", "")
        if ts:
            signed_payload = f"{ts}.{raw_body}"
    expected = hmac.new(
        key=secret.encode("utf-8"),
        msg=signed_payload.encode("utf-8"),
        digestmod=hash_func,
    ).hexdigest()
    return hmac.compare_digest(expected, sig_value)
def verify_timestamp_freshness(timestamp_value: str) -> bool:
"""
Verify webhook timestamp is within the acceptable drift window.
Prevents replay attacks using captured webhook deliveries.
"""
try:
ts = int(timestamp_value)
except (ValueError, TypeError):
logger.warning("Invalid timestamp value: %s", timestamp_value)
return False
drift = abs(int(time.time()) - ts)
if drift > MAX_TIMESTAMP_DRIFT:
logger.warning("Timestamp drift %ds exceeds max %ds", drift, MAX_TIMESTAMP_DRIFT)
return False
return True
# ============================================================
# Idempotency: DynamoDB Conditional Write Pattern
# ============================================================
def claim_idempotency_key(key: str) -> bool:
"""
Attempt to claim an idempotency key via DynamoDB conditional put.
Returns True if claimed (first time), False if already exists (duplicate).
"""
ttl_epoch = int(time.time()) + (IDEMPOTENCY_TTL_HOURS * 3600)
try:
idempotency_table.put_item(
Item={
"idempotency_key": key,
"claimed_at": datetime.now(timezone.utc).isoformat(),
"status": "processing",
"ttl": ttl_epoch,
},
ConditionExpression="attribute_not_exists(idempotency_key)",
)
return True
except ClientError as e:
if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
logger.info("Duplicate webhook: %s", key)
return False
raise
def complete_idempotency_key(key: str, result: dict) -> None:
"""Mark processing as completed with result summary."""
idempotency_table.update_item(
Key={"idempotency_key": key},
UpdateExpression="SET #s = :done, completed_at = :ts, result_summary = :r",
ExpressionAttributeNames={"#s": "status"},
ExpressionAttributeValues={
":done": "completed",
":ts": datetime.now(timezone.utc).isoformat(),
":r": json.dumps(result, default=str)[:1000],
},
)
def release_idempotency_key(key: str) -> None:
"""Delete key on failure so the webhook can be retried."""
idempotency_table.delete_item(Key={"idempotency_key": key})
# ============================================================
# FM Integration: Quick Sentiment/Summary via Bedrock
# ============================================================
def invoke_fm_for_webhook(prompt: str, max_tokens: int = 512) -> str:
"""
Invoke Bedrock Claude 3 for webhook-triggered FM processing.
Used for real-time sentiment analysis, content moderation, summaries.
"""
response = bedrock_runtime.invoke_model(
modelId=FM_MODEL_ID,
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"temperature": 0.1,
"messages": [{"role": "user", "content": prompt}],
}),
)
    result = json.loads(response["body"].read())
    content = result.get("content") or [{}]  # guard against an empty content list
    return content[0].get("text", "")
# ============================================================
# Webhook Source Processors
# ============================================================
def process_payment_webhook(payload: dict) -> dict:
"""Process Stripe payment completed -> generate AI order confirmation."""
event_type = payload.get("type", "")
data = payload.get("data", {}).get("object", {})
if event_type != "checkout.session.completed":
return {"processed": False, "reason": f"unhandled_type: {event_type}"}
order_id = data.get("metadata", {}).get("order_id", "unknown")
items_json = data.get("metadata", {}).get("items", "[]")
    currency = data.get("currency", "jpy").upper()
    # JPY is a zero-decimal currency in Stripe: amount_total is already in yen
    amount = data.get("amount_total", 0)
    if currency not in ("JPY", "KRW", "VND"):
        amount = amount / 100
# Publish to EventBridge for async AI confirmation generation
eventbridge.put_events(Entries=[{
"Source": "mangaassist.payments",
"DetailType": "PaymentCompleted",
"Detail": json.dumps({
"order_id": order_id,
"customer_email": data.get("customer_email", ""),
"amount": str(amount),
"currency": currency,
"items": items_json,
"action": "generate_confirmation",
"processing_stage": "raw",
}),
"EventBusName": EVENT_BUS_NAME,
}])
logger.info("Payment webhook -> EventBridge: order=%s amount=%s%s", order_id, amount, currency)
return {"processed": True, "order_id": order_id}
def process_review_webhook(payload: dict) -> dict:
"""Process customer review -> real-time sentiment + publish for moderation."""
review_id = payload.get("review_id", "unknown")
review_text = payload.get("text", "")
product_id = payload.get("product_id", "")
if not review_text or not product_id:
return {"processed": False, "reason": "missing_required_fields"}
# Real-time sentiment analysis via Haiku (fast, cheap)
sentiment_prompt = (
f"Analyze the sentiment of this manga product review. "
f"Reply with exactly one word: positive, negative, or neutral.\n\n"
f"Review: {review_text[:1000]}"
)
sentiment = invoke_fm_for_webhook(sentiment_prompt, max_tokens=10).strip().lower()
# Publish enriched event to EventBridge
eventbridge.put_events(Entries=[{
"Source": "mangaassist.reviews",
"DetailType": "ReviewSubmitted",
"Detail": json.dumps({
"review_id": review_id,
"product_id": product_id,
"review_text": review_text[:2000],
"rating": payload.get("rating", 0),
"customer_id": payload.get("customer_id", ""),
"ai_sentiment": sentiment,
"actions": ["content_moderation", "response_generation"],
"processing_stage": "raw",
}),
"EventBusName": EVENT_BUS_NAME,
}])
logger.info("Review webhook -> sentiment=%s review_id=%s", sentiment, review_id)
return {"processed": True, "review_id": review_id, "sentiment": sentiment}
def process_catalog_webhook(payload: dict) -> dict:
"""Process publisher catalog update -> trigger embedding regeneration."""
update_type = payload.get("type", "")
items = payload.get("items", [])
if not items:
return {"processed": False, "reason": "empty_items"}
    # EventBridge accepts at most 10 entries per PutEvents call, so publish
    # in chunks of 10 rather than silently dropping items beyond the first 10
    published = 0
    failed = 0
    for start in range(0, len(items), 10):
        entries = [
            {
                "Source": "mangaassist.catalog",
                "DetailType": f"CatalogUpdate.{update_type}",
                "Detail": json.dumps({
                    "product_id": item.get("id", ""),
                    "title": item.get("title", ""),
                    "update_type": update_type,
                    "data": item,
                    "actions": ["regenerate_embedding", "update_search_index"],
                    "processing_stage": "raw",
                }),
                "EventBusName": EVENT_BUS_NAME,
            }
            for item in items[start:start + 10]
        ]
        response = eventbridge.put_events(Entries=entries)
        chunk_failed = response.get("FailedEntryCount", 0)
        failed += chunk_failed
        published += len(entries) - chunk_failed
    if failed:
        logger.error("Failed to publish %d/%d catalog events", failed, len(items))
    return {
        "processed": True,
        "update_type": update_type,
        "published": published,
        "failed": failed,
    }
# ============================================================
# Source Configuration Table
# ============================================================
SOURCE_CONFIG = {
"stripe": {
"handler": process_payment_webhook,
"sig_header": "stripe-signature",
"ts_header": None,
"algorithm": "sha256",
"id_field": "id",
},
"reviews": {
"handler": process_review_webhook,
"sig_header": "x-webhook-signature",
"ts_header": "x-webhook-timestamp",
"algorithm": "sha256",
"id_field": "review_id",
},
"catalog": {
"handler": process_catalog_webhook,
"sig_header": "x-catalog-signature",
"ts_header": "x-catalog-timestamp",
"algorithm": "sha256",
"id_field": "batch_id",
},
}
# ============================================================
# Lambda Entry Point
# ============================================================
def handler(event: dict, context: Any) -> dict:
"""
WebhookFMHandler Lambda entry point.
Deployed as Lambda Function URL. Path suffix determines source.
POST https://<fn-url>.lambda-url.ap-northeast-1.on.aws/{source}
"""
path = event.get("rawPath", event.get("path", ""))
source = path.strip("/").split("/")[-1] if path else ""
headers = {k.lower(): v for k, v in event.get("headers", {}).items()}
raw_body = event.get("body", "")
logger.info("Webhook received: source=%s request_id=%s", source, context.aws_request_id)
# --- Validate source ---
if source not in SOURCE_CONFIG:
return _resp(404, {"error": f"Unknown source: {source}"})
cfg = SOURCE_CONFIG[source]
# --- Step 1: HMAC signature verification ---
signature = headers.get(cfg["sig_header"], "")
if not signature:
return _resp(401, {"error": "Missing signature header"})
if not verify_webhook_signature(raw_body, signature, source, cfg["algorithm"]):
return _resp(403, {"error": "Invalid signature"})
# --- Step 2: Timestamp freshness (replay prevention) ---
if cfg["ts_header"]:
ts_val = headers.get(cfg["ts_header"], "")
if not verify_timestamp_freshness(ts_val):
return _resp(403, {"error": "Stale timestamp"})
# --- Step 3: Parse payload ---
try:
payload = json.loads(raw_body) if isinstance(raw_body, str) else raw_body
except json.JSONDecodeError:
return _resp(400, {"error": "Invalid JSON"})
# --- Step 4: Idempotency ---
id_value = payload.get(cfg["id_field"], context.aws_request_id)
idem_key = f"{source}:{id_value}"
if not claim_idempotency_key(idem_key):
return _resp(200, {"status": "already_processed", "key": idem_key})
# --- Step 5: Process ---
try:
result = cfg["handler"](payload)
complete_idempotency_key(idem_key, result)
return _resp(200, {"status": "processed", **result})
except Exception as e:
release_idempotency_key(idem_key)
logger.exception("Processing failed: %s", e)
return _resp(500, {"error": "Processing failed"})
def _resp(code: int, body: dict) -> dict:
return {
"statusCode": code,
"headers": {"Content-Type": "application/json"},
"body": json.dumps(body, default=str),
}
5.2 MicroserviceFMProxy --- Sidecar-Aware FM Client for ECS Fargate
"""
MicroserviceFMProxy: A production FM client for ECS Fargate microservices.
Integrates with the Envoy sidecar for circuit breaking, retries, and tracing.
Provides caching, model routing, and cost tracking.
"""
import json
import time
import hashlib
import os
import logging
from typing import Optional
from dataclasses import dataclass, field
import boto3
import redis
from botocore.config import Config as BotoConfig
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# --- Configuration ---
REDIS_ENDPOINT = os.environ.get("ELASTICACHE_ENDPOINT", "localhost")
REDIS_PORT = int(os.environ.get("ELASTICACHE_PORT", "6379"))
EVENT_BUS_NAME = os.environ.get("EVENT_BUS_NAME", "MangaAssist-AI-Events")
CACHE_TTL_SECONDS = int(os.environ.get("FM_CACHE_TTL", "300"))
# Bedrock client with retry and timeout config
bedrock_config = BotoConfig(
region_name="ap-northeast-1",
retries={"max_attempts": 2, "mode": "adaptive"},
read_timeout=15,
connect_timeout=5,
)
bedrock_runtime = boto3.client("bedrock-runtime", config=bedrock_config)
eventbridge_client = boto3.client("events")
# Redis connection pool (shared across requests in the same container)
redis_pool = redis.ConnectionPool(
host=REDIS_ENDPOINT,
port=REDIS_PORT,
max_connections=20,
socket_timeout=2,
socket_connect_timeout=1,
retry_on_timeout=True,
decode_responses=True,
)
@dataclass
class FMResponse:
"""Structured response from Foundation Model invocation."""
text: str
model_id: str
input_tokens: int
output_tokens: int
latency_ms: int
cached: bool = False
estimated_cost_usd: float = 0.0
stop_reason: str = ""
@dataclass
class ModelConfig:
"""Configuration for a specific model variant."""
model_id: str
max_tokens: int = 1024
temperature: float = 0.3
top_p: float = 0.9
input_cost_per_1m: float = 0.25 # Haiku default
output_cost_per_1m: float = 1.25 # Haiku default
# Pre-defined model configurations
MODELS = {
"haiku": ModelConfig(
model_id="anthropic.claude-3-haiku-20240307-v1:0",
max_tokens=1024,
temperature=0.3,
input_cost_per_1m=0.25,
output_cost_per_1m=1.25,
),
"sonnet": ModelConfig(
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
max_tokens=2048,
temperature=0.5,
input_cost_per_1m=3.0,
output_cost_per_1m=15.0,
),
}
class MicroserviceFMProxy:
"""
FM proxy for ECS Fargate microservices.
Features:
- Response caching via ElastiCache Redis
- Automatic model selection (Haiku for simple, Sonnet for complex)
- Cost tracking and budget enforcement
- EventBridge event publishing for analytics
- Circuit breaker awareness (respects Envoy sidecar signals)
"""
def __init__(self, default_model: str = "haiku"):
self.redis_client = redis.Redis(connection_pool=redis_pool)
self.default_model = default_model
self._request_count = 0
self._total_cost = 0.0
# -------------------------------------------------------
# Public: Invoke FM with caching and routing
# -------------------------------------------------------
def invoke(
self,
prompt: str,
model: str = "",
system_prompt: str = "",
session_id: str = "",
skip_cache: bool = False,
max_tokens: Optional[int] = None,
temperature: Optional[float] = None,
) -> FMResponse:
"""
Invoke Foundation Model with automatic caching, routing, and tracking.
Args:
prompt: User message or prompt text.
model: Model key ("haiku" or "sonnet"). Defaults to self.default_model.
system_prompt: Optional system prompt for context.
session_id: Session ID for event correlation.
skip_cache: Bypass cache for this request.
max_tokens: Override default max tokens.
temperature: Override default temperature.
Returns:
FMResponse with text, metrics, and cost.
"""
model_key = model or self.default_model
config = MODELS.get(model_key, MODELS["haiku"])
effective_max_tokens = max_tokens or config.max_tokens
effective_temperature = temperature if temperature is not None else config.temperature
# --- Cache check ---
cache_key = None
if not skip_cache:
cache_key = self._build_cache_key(prompt, system_prompt, config.model_id)
cached = self._get_from_cache(cache_key)
if cached:
logger.info("Cache hit: model=%s key=%s", model_key, cache_key[:16])
return cached
# --- Invoke Bedrock ---
start_time = time.monotonic()
messages = [{"role": "user", "content": prompt}]
body = {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": effective_max_tokens,
"temperature": effective_temperature,
"top_p": config.top_p,
"messages": messages,
}
if system_prompt:
body["system"] = system_prompt
try:
response = bedrock_runtime.invoke_model(
modelId=config.model_id,
contentType="application/json",
accept="application/json",
body=json.dumps(body),
)
        except ClientError as e:
            error_code = e.response["Error"]["Code"]
            logger.error("Bedrock invocation failed: %s - %s", error_code, str(e))
            # ThrottlingException propagates so the circuit breaker / caller
            # can back off and retry; other client errors surface unchanged.
            raise
elapsed_ms = int((time.monotonic() - start_time) * 1000)
# --- Parse response ---
        result_body = json.loads(response["body"].read())
        # Guard against an empty content list, which would raise IndexError
        content_blocks = result_body.get("content") or [{}]
        text = content_blocks[0].get("text", "")
usage = result_body.get("usage", {})
input_tokens = usage.get("input_tokens", 0)
output_tokens = usage.get("output_tokens", 0)
stop_reason = result_body.get("stop_reason", "")
cost = (
(input_tokens * config.input_cost_per_1m / 1_000_000)
+ (output_tokens * config.output_cost_per_1m / 1_000_000)
)
fm_response = FMResponse(
text=text,
model_id=config.model_id,
input_tokens=input_tokens,
output_tokens=output_tokens,
latency_ms=elapsed_ms,
cached=False,
estimated_cost_usd=cost,
stop_reason=stop_reason,
)
# --- Cache response ---
if cache_key and stop_reason == "end_turn":
self._put_to_cache(cache_key, fm_response)
# --- Publish metrics event ---
self._publish_inference_event(session_id, fm_response)
# --- Track cost ---
self._request_count += 1
self._total_cost += cost
logger.info(
"FM invoked: model=%s tokens=%d/%d latency=%dms cost=$%.6f",
model_key, input_tokens, output_tokens, elapsed_ms, cost,
)
return fm_response
def select_model_for_complexity(self, prompt: str) -> str:
"""
Auto-select model based on prompt complexity heuristics.
Haiku for simple queries, Sonnet for complex reasoning.
"""
word_count = len(prompt.split())
has_analysis_keywords = any(
kw in prompt.lower()
for kw in ["analyze", "compare", "explain why", "summarize", "recommend based on"]
)
if word_count > 200 or has_analysis_keywords:
return "sonnet"
return "haiku"
def get_stats(self) -> dict:
"""Return runtime statistics for this proxy instance."""
return {
"total_requests": self._request_count,
"total_cost_usd": round(self._total_cost, 6),
"avg_cost_per_request": round(self._total_cost / max(self._request_count, 1), 6),
}
# -------------------------------------------------------
# Private: Cache operations
# -------------------------------------------------------
def _build_cache_key(self, prompt: str, system_prompt: str, model_id: str) -> str:
content = f"{model_id}:{system_prompt}:{prompt}"
return f"fm:cache:{hashlib.sha256(content.encode()).hexdigest()}"
def _get_from_cache(self, key: str) -> Optional[FMResponse]:
try:
data = self.redis_client.get(key)
if data:
obj = json.loads(data)
return FMResponse(cached=True, **obj)
except (redis.RedisError, json.JSONDecodeError) as e:
logger.warning("Cache read failed: %s", e)
return None
def _put_to_cache(self, key: str, response: FMResponse) -> None:
try:
data = {
"text": response.text,
"model_id": response.model_id,
"input_tokens": response.input_tokens,
"output_tokens": response.output_tokens,
"latency_ms": response.latency_ms,
"estimated_cost_usd": response.estimated_cost_usd,
"stop_reason": response.stop_reason,
}
self.redis_client.setex(key, CACHE_TTL_SECONDS, json.dumps(data))
except redis.RedisError as e:
logger.warning("Cache write failed: %s", e)
# -------------------------------------------------------
# Private: Event publishing
# -------------------------------------------------------
def _publish_inference_event(self, session_id: str, response: FMResponse) -> None:
try:
eventbridge_client.put_events(Entries=[{
"Source": "mangaassist.ai",
"DetailType": "InferenceCompleted",
"Detail": json.dumps({
"session_id": session_id,
"model_id": response.model_id,
"status": "success",
"metrics": {
"input_tokens": response.input_tokens,
"output_tokens": response.output_tokens,
"latency_ms": response.latency_ms,
"estimated_cost_usd": response.estimated_cost_usd,
},
"stop_reason": response.stop_reason,
"processing_stage": "completed",
}),
"EventBusName": EVENT_BUS_NAME,
}])
except Exception as e:
logger.warning("Event publish failed (non-fatal): %s", e)
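The per-request cost computed in `invoke()` is worth internalizing for the exam. A standalone sketch of the same arithmetic, using the Haiku and Sonnet pricing quoted in this section (`estimate_cost` is an illustrative helper, not part of the proxy):

```python
# Illustrative helper (not part of FMProxy): per-request USD cost from token
# counts, using the Bedrock pricing figures quoted in this section.
PRICING_PER_1M = {
    "haiku": (0.25, 1.25),   # (input, output) USD per 1M tokens
    "sonnet": (3.00, 15.00),
}

def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    in_rate, out_rate = PRICING_PER_1M[model]
    return (input_tokens * in_rate + output_tokens * out_rate) / 1_000_000

# A typical chat turn: ~1,000 prompt tokens in, ~500 tokens out.
haiku_cost = estimate_cost("haiku", 1000, 500)    # 0.000875 USD
sonnet_cost = estimate_cost("sonnet", 1000, 500)  # 0.0105 USD
```

At the MangaAssist target of 1M messages/day, that turn profile is roughly $875/day on Haiku versus $10,500/day on Sonnet, which is why the proxy's automatic model selection matters as much as caching.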
5.3 EventDrivenFMTrigger --- EventBridge Consumer that Invokes FM
"""
EventDrivenFMTrigger: Lambda function triggered by EventBridge rules.
Consumes events and invokes Foundation Models for asynchronous AI processing.
Handles inference post-processing, content moderation, and confirmation generation.
"""
import json
import os
import time
import logging
from datetime import datetime, timezone
from typing import Any
import boto3
from botocore.config import Config as BotoConfig
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# --- AWS Clients ---
bedrock_config = BotoConfig(
retries={"max_attempts": 3, "mode": "adaptive"},
read_timeout=30,
connect_timeout=5,
)
bedrock_runtime = boto3.client("bedrock-runtime", config=bedrock_config)
dynamodb = boto3.resource("dynamodb")
eventbridge = boto3.client("events")
ses_client = boto3.client("ses")
# --- Configuration ---
SESSIONS_TABLE = os.environ.get("DYNAMODB_SESSIONS_TABLE", "MangaAssist-Sessions")
PRODUCTS_TABLE = os.environ.get("DYNAMODB_PRODUCTS_TABLE", "MangaAssist-Products")
EVENT_BUS_NAME = os.environ.get("EVENT_BUS_NAME", "MangaAssist-AI-Events")
SENDER_EMAIL = os.environ.get("SENDER_EMAIL", "noreply@mangaassist.example.com")
SONNET_MODEL = "anthropic.claude-3-sonnet-20240229-v1:0"
HAIKU_MODEL = "anthropic.claude-3-haiku-20240307-v1:0"
MAX_EVENT_CHAIN_DEPTH = int(os.environ.get("MAX_CHAIN_DEPTH", "3"))
sessions_table = dynamodb.Table(SESSIONS_TABLE)
products_table = dynamodb.Table(PRODUCTS_TABLE)
# ============================================================
# Anti-Loop: Event Chain Depth Tracking
# ============================================================
def check_chain_depth(event_detail: dict) -> int:
"""
Check and increment event processing chain depth.
Prevents infinite loops where FM output triggers new events
that re-invoke FM processing.
Returns current depth. Raises ValueError if max depth exceeded.
"""
depth = event_detail.get("_chain_depth", 0)
if depth >= MAX_EVENT_CHAIN_DEPTH:
raise ValueError(
f"Event chain depth {depth} exceeds maximum {MAX_EVENT_CHAIN_DEPTH}. "
f"Possible infinite loop detected."
)
return depth + 1
def add_chain_metadata(detail: dict, current_depth: int, source_event_id: str) -> dict:
"""Add chain tracking metadata to outbound event detail."""
detail["_chain_depth"] = current_depth
detail["_source_event_id"] = source_event_id
detail["_processed_at"] = datetime.now(timezone.utc).isoformat()
detail["processing_stage"] = "processed"
return detail
# ============================================================
# FM Invocation Helpers
# ============================================================
def invoke_claude(
prompt: str,
model_id: str = HAIKU_MODEL,
system_prompt: str = "",
max_tokens: int = 1024,
temperature: float = 0.3,
) -> dict:
"""
Invoke Claude 3 via Bedrock and return structured result.
Used for all event-driven FM processing.
"""
start = time.monotonic()
body = {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"temperature": temperature,
"messages": [{"role": "user", "content": prompt}],
}
if system_prompt:
body["system"] = system_prompt
response = bedrock_runtime.invoke_model(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=json.dumps(body),
)
result = json.loads(response["body"].read())
elapsed_ms = int((time.monotonic() - start) * 1000)
usage = result.get("usage", {})
return {
        "text": (result.get("content") or [{}])[0].get("text", ""),
"input_tokens": usage.get("input_tokens", 0),
"output_tokens": usage.get("output_tokens", 0),
"latency_ms": elapsed_ms,
"model_id": model_id,
"stop_reason": result.get("stop_reason", ""),
}
# ============================================================
# Event Handlers by DetailType
# ============================================================
def handle_payment_completed(detail: dict, chain_depth: int) -> dict:
"""
Generate AI-powered order confirmation email.
Triggered by: mangaassist.payments / PaymentCompleted
"""
order_id = detail.get("order_id", "unknown")
customer_email = detail.get("customer_email", "")
amount = detail.get("amount", "0")
currency = detail.get("currency", "JPY")
items = detail.get("items", "[]")
if isinstance(items, str):
try:
items = json.loads(items)
except json.JSONDecodeError:
items = []
# Build confirmation prompt
items_text = "\n".join(
f"- {item.get('title', 'Unknown')} (x{item.get('qty', 1)})"
for item in items
) if items else "- Order items (details in confirmation email)"
prompt = (
f"Generate a friendly, professional order confirmation message for a Japanese manga store.\n"
f"Order ID: {order_id}\n"
f"Amount: {amount} {currency}\n"
f"Items:\n{items_text}\n\n"
f"Write in a warm tone. Include the order ID, total amount, and a thank-you message. "
f"Keep it under 200 words. Write in English with a light manga/anime flavor."
)
result = invoke_claude(prompt, model_id=HAIKU_MODEL, max_tokens=512, temperature=0.5)
    # Send email via SES; track success so the returned action is accurate
    email_sent = False
    if customer_email:
        try:
            ses_client.send_email(
                Source=SENDER_EMAIL,
                Destination={"ToAddresses": [customer_email]},
                Message={
                    "Subject": {"Data": f"MangaAssist Order Confirmed! #{order_id}"},
                    "Body": {"Text": {"Data": result["text"]}},
                },
            )
            email_sent = True
            logger.info("Confirmation email sent: order=%s email=%s", order_id, customer_email)
        except ClientError as e:
            logger.error("SES send failed: %s", e)
    action = "confirmation_sent" if email_sent else "confirmation_generated"
    return {"action": action, "order_id": order_id, **result}
def handle_review_submitted(detail: dict, chain_depth: int) -> dict:
"""
Perform AI content moderation and sentiment analysis on customer review.
Triggered by: mangaassist.reviews / ReviewSubmitted
"""
review_id = detail.get("review_id", "unknown")
review_text = detail.get("review_text", "")
product_id = detail.get("product_id", "")
if not review_text:
return {"action": "skipped", "reason": "empty_review"}
# Content moderation check
moderation_prompt = (
f"You are a content moderator for a Japanese manga store. "
f"Analyze this customer review for inappropriate content.\n\n"
f"Review: {review_text}\n\n"
f"Respond with a JSON object:\n"
f'{{"safe": true/false, "severity": "none/low/medium/high/critical", '
f'"categories": ["list of flagged categories"], '
f'"reason": "brief explanation"}}'
)
moderation_result = invoke_claude(
moderation_prompt, model_id=HAIKU_MODEL, max_tokens=256, temperature=0.1
)
    # Parse moderation response. Note this falls back to "safe" on a parse
    # failure (fail-open); a stricter design would route parse errors to
    # human review instead of publishing the review.
    try:
        moderation = json.loads(moderation_result["text"])
    except json.JSONDecodeError:
        moderation = {"safe": True, "severity": "none", "categories": [], "reason": "parse_error"}
# If flagged, publish ModerationFlagged event
if not moderation.get("safe", True):
severity = moderation.get("severity", "medium")
eventbridge.put_events(Entries=[{
"Source": "mangaassist.ai",
"DetailType": "ModerationFlagged",
"Detail": json.dumps(add_chain_metadata(
{
"review_id": review_id,
"product_id": product_id,
"severity": severity,
"categories": moderation.get("categories", []),
"reason": moderation.get("reason", ""),
"content_preview": review_text[:200],
},
chain_depth,
review_id,
)),
"EventBusName": EVENT_BUS_NAME,
}])
logger.warning("Review flagged: review_id=%s severity=%s", review_id, severity)
return {
"action": "moderation_completed",
"review_id": review_id,
"safe": moderation.get("safe", True),
"severity": moderation.get("severity", "none"),
**moderation_result,
}
def handle_catalog_update(detail: dict, chain_depth: int) -> dict:
"""
Generate product description and tags using FM for catalog updates.
Triggered by: mangaassist.catalog / CatalogUpdate.*
"""
product_id = detail.get("product_id", "")
title = detail.get("title", "")
data = detail.get("data", {})
update_type = detail.get("update_type", "")
if update_type == "discontinued":
# No need to generate new descriptions for discontinued items
return {"action": "skipped", "reason": "discontinued_product", "product_id": product_id}
# Generate enhanced product description
prompt = (
f"Generate a compelling product description for this manga item.\n"
f"Title: {title}\n"
f"Author: {data.get('author', 'Unknown')}\n"
f"Genre: {data.get('genre', 'Unknown')}\n"
f"Volume: {data.get('volume', 'N/A')}\n"
f"Synopsis: {data.get('synopsis', 'No synopsis available')}\n\n"
f"Write a 2-3 sentence description that would appeal to manga fans. "
f"Also provide 5 search tags as a comma-separated list on a new line starting with 'Tags: '."
)
result = invoke_claude(prompt, model_id=HAIKU_MODEL, max_tokens=300, temperature=0.6)
# Update product in DynamoDB with AI-generated description
try:
products_table.update_item(
Key={"product_id": product_id},
UpdateExpression=(
"SET ai_description = :desc, ai_updated_at = :ts, ai_model = :model"
),
ExpressionAttributeValues={
":desc": result["text"],
":ts": datetime.now(timezone.utc).isoformat(),
":model": result["model_id"],
},
)
logger.info("Product description updated: %s", product_id)
except ClientError as e:
logger.error("DynamoDB update failed for %s: %s", product_id, e)
return {"action": "description_generated", "product_id": product_id, **result}
# ============================================================
# Event Router
# ============================================================
EVENT_HANDLERS = {
"PaymentCompleted": handle_payment_completed,
"ReviewSubmitted": handle_review_submitted,
"CatalogUpdate.new_release": handle_catalog_update,
"CatalogUpdate.price_change": handle_catalog_update,
}
# ============================================================
# Lambda Entry Point
# ============================================================
def handler(event: dict, context: Any) -> dict:
"""
EventDrivenFMTrigger Lambda entry point.
Invoked by EventBridge rules. Routes events to appropriate FM handler.
"""
detail_type = event.get("detail-type", "")
detail = event.get("detail", {})
source = event.get("source", "")
event_id = event.get("id", "unknown")
logger.info(
"Event received: source=%s type=%s id=%s",
source, detail_type, event_id,
)
# --- Anti-loop check ---
try:
chain_depth = check_chain_depth(detail)
except ValueError as e:
logger.error("Infinite loop prevented: %s", e)
return {"status": "rejected", "reason": str(e)}
# --- Route to handler ---
handler_fn = EVENT_HANDLERS.get(detail_type)
if not handler_fn:
logger.warning("No handler for detail_type: %s", detail_type)
return {"status": "unhandled", "detail_type": detail_type}
try:
result = handler_fn(detail, chain_depth)
logger.info("Event processed: type=%s result=%s", detail_type, result.get("action"))
return {"status": "success", **result}
except ClientError as e:
logger.exception("AWS service error: %s", e)
return {"status": "error", "error": str(e)}
except Exception as e:
logger.exception("Unhandled error processing event: %s", e)
raise # Let Lambda retry via EventBridge DLQ
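The anti-loop guard above can be exercised without any AWS dependencies. A minimal standalone mirror of `check_chain_depth` (same `_chain_depth` convention, same default limit of 3) shows how an event that keeps re-triggering FM processing is eventually rejected:

```python
# Minimal standalone mirror of the chain-depth guard above (no AWS needed).
MAX_EVENT_CHAIN_DEPTH = 3

def check_chain_depth(event_detail: dict) -> int:
    depth = event_detail.get("_chain_depth", 0)
    if depth >= MAX_EVENT_CHAIN_DEPTH:
        raise ValueError(f"Event chain depth {depth} exceeds maximum {MAX_EVENT_CHAIN_DEPTH}")
    return depth + 1

# Simulate a handler that re-publishes a follow-up event on every cycle.
detail = {}
hops = 0
try:
    while True:
        new_depth = check_chain_depth(detail)
        detail = {"_chain_depth": new_depth}  # what add_chain_metadata would stamp
        hops += 1
except ValueError:
    pass
# Depths 0, 1, and 2 pass; the event stamped with depth 3 is rejected,
# so the chain stops after 3 hops.
```

Without the stamped depth, a `ModerationFlagged` event whose consumer re-invoked the FM and re-published could loop forever; the counter bounds the damage to `MAX_EVENT_CHAIN_DEPTH` invocations.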
Key Takeaways for the AIP-C01 Exam
| Concept | What to Remember |
|---|---|
| API Gateway 29s timeout | Hard limit for REST API synchronous responses; use WebSocket for streaming or async pattern for long FM calls |
| Lambda Function URL | Simpler than API Gateway for single-purpose webhooks; built-in HTTPS, no additional cost |
| HMAC webhook verification | Use hmac.compare_digest() for constant-time comparison; never use == for signature comparison |
| Idempotency pattern | DynamoDB conditional write (attribute_not_exists) + TTL cleanup; prevents duplicate FM invocations on webhook retry |
| EventBridge event size | Max 256 KB per event; truncate FM responses before publishing |
| EventBridge fan-out | Up to 5 targets per rule; each target gets a copy of the event for parallel processing |
| Event loop prevention | Use processing_stage field, source differentiation, or chain depth counter to prevent infinite loops |
| Sidecar pattern | Envoy proxy in same ECS task; adds circuit breaking, mTLS, tracing without modifying app code |
| Circuit breaker | Open after N consecutive failures; prevents cascading failures when Bedrock is throttled or unavailable |
| Model A/B testing | App Mesh traffic splitting with weighted virtual nodes; monitor per-node metrics in CloudWatch |
| Response caching | Redis cache keyed by hash(model + system_prompt + user_prompt); TTL 5 minutes; saves cost on repeated queries |
| Cost tracking | Haiku $0.25/$1.25 per 1M tokens; Sonnet $3/$15 per 1M tokens; track per-request in EventBridge events |
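The HMAC-verification and replay-prevention rows above combine into one small verifier. A hedged sketch using only the standard library — the `timestamp.body` payload layout and the 300-second drift window are illustrative conventions; Stripe and other providers each define their own exact scheme:

```python
import hmac
import hashlib
import time
from typing import Optional

REPLAY_WINDOW_SECONDS = 300  # illustrative drift tolerance

def verify_webhook(secret: bytes, body: bytes, signature_hex: str,
                   timestamp: int, now: Optional[int] = None) -> bool:
    """Verify an HMAC-SHA256 webhook signature computed over 'timestamp.body'."""
    now = int(time.time()) if now is None else now
    # Replay prevention: reject timestamps outside the drift window.
    if abs(now - timestamp) > REPLAY_WINDOW_SECONDS:
        return False
    signed_payload = f"{timestamp}.".encode() + body
    expected = hmac.new(secret, signed_payload, hashlib.sha256).hexdigest()
    # Constant-time comparison -- never use == on signatures.
    return hmac.compare_digest(expected, signature_hex)
```

A correctly signed request inside the window verifies; the same signature replayed hours later fails the timestamp check before the HMAC is even compared.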
Quick Reference: Integration Pattern Decision Matrix
| Scenario | API Gateway REST | Lambda Function URL | EventBridge |
|---|---|---|---|
| Client-facing chat API | YES (primary) | No | No |
| External webhook receiver | Possible | YES (simpler) | No |
| Async FM processing | No | No | YES (primary) |
| Multi-consumer fan-out | No | No | YES (up to 5 targets/rule) |
| Real-time streaming | WebSocket API | No | No |
| Internal microservice call | HTTP proxy | No | Possible |
| Batch FM invocation | No | No | YES + SQS + Lambda |
| Model A/B testing | App Mesh split | No | No |
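One matrix row deserves a concrete check: EventBridge entries are capped at 256 KB, so long FM responses must be truncated before `put_events`. A standalone sketch — the 200,000-byte budget is an illustrative headroom figure, and a production version would measure the full entry (source, detail-type, detail), not just the detail:

```python
import json

MAX_DETAIL_BYTES = 200_000  # illustrative headroom under the 256 KB entry limit

def truncate_detail(detail: dict, text_field: str = "text") -> dict:
    """Shrink the FM text field until the serialized detail fits the budget."""
    encoded = json.dumps(detail).encode("utf-8")
    if len(encoded) <= MAX_DETAIL_BYTES:
        return detail
    overshoot = len(encoded) - MAX_DETAIL_BYTES
    text = detail.get(text_field, "")
    detail = dict(detail)  # copy so the caller's dict is untouched
    # Cut the overshoot plus a margin for the suffix and the flag field.
    detail[text_field] = text[: max(0, len(text) - overshoot - 64)] + "...[truncated]"
    detail["truncated"] = True
    return detail
```

Publishing untruncated Sonnet output (which can run thousands of tokens) is a common way to hit `PutEvents` size rejections; truncating and flagging keeps the analytics event flowing while signaling that the full text lives elsewhere (e.g., DynamoDB or S3).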