Skill 2.3.2 --- Integrated AI Architecture: Microservice, Webhook, and Event-Driven GenAI Patterns
MangaAssist context: a Japanese manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M input/output tokens, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Targets: a useful answer in under 3 seconds, at a scale of 1M messages/day.
Mind Map --- Integrated AI Capabilities for GenAI-Enhanced Applications
Integrated AI Capabilities (Skill 2.3.2)
|
+-- 1. Microservice Integration via API Gateway
| +-- REST API endpoints for FM invocation
| | +-- /chat (synchronous, Haiku fast-path)
| | +-- /generate (async, Sonnet complex reasoning)
| | +-- /search (RAG pipeline via OpenSearch)
| | +-- /recommend (product recommendation engine)
| +-- WebSocket API for streaming responses
| | +-- $connect / $disconnect lifecycle
| | +-- sendMessage route for real-time chat
| | +-- Token-by-token streaming from Bedrock
| +-- API Gateway configuration patterns
| | +-- Usage plans and API keys for rate limiting
| | +-- Request/response mapping templates
| | +-- Custom authorizers (Lambda / Cognito)
| | +-- Stage variables for environment promotion
| +-- Integration types
| +-- Lambda proxy integration (most common)
| +-- HTTP proxy to ECS Fargate services
| +-- AWS service integration (direct Bedrock)
| +-- VPC Link for private microservices
|
+-- 2. Lambda Webhook Handlers
| +-- Inbound webhook processing
| | +-- Stripe payment events -> order confirmation AI
| | +-- GitHub/CI events -> deployment notification AI
| | +-- Third-party manga API updates -> catalog sync
| | +-- Customer review submissions -> sentiment analysis
| +-- Outbound webhook dispatch
| | +-- AI-generated summaries to Slack/Discord
| | +-- Order status updates via webhook callbacks
| | +-- Inventory alerts to supplier systems
| +-- Webhook security patterns
| | +-- HMAC signature verification
| | +-- Replay attack prevention (timestamp validation)
| | +-- IP allowlist + secret rotation
| | +-- Idempotency key handling (DynamoDB conditional writes)
| +-- Lambda configuration for webhooks
| +-- Function URL (simpler than API Gateway for single-purpose)
| +-- Provisioned concurrency for latency-sensitive hooks
| +-- Dead-letter queues for failed processing
| +-- Layers for shared webhook utilities
|
+-- 3. EventBridge Event-Driven Patterns
| +-- AI event schemas
| | +-- MangaAssist.Chat.MessageReceived
| | +-- MangaAssist.AI.InferenceCompleted
| | +-- MangaAssist.AI.ModerationFlagged
| | +-- MangaAssist.Recommendation.Generated
| +-- Event routing rules
| | +-- Content-based filtering (detail-type matching)
| | +-- Input transformation for target services
| | +-- Fan-out to multiple targets (analytics + action)
| | +-- Cross-account event forwarding
| +-- Integration targets
| | +-- Lambda (real-time processing)
| | +-- SQS (buffered batch processing)
| | +-- Step Functions (complex AI workflows)
| | +-- SNS (notification fan-out)
| | +-- Kinesis Data Firehose (analytics pipeline)
| +-- Reliability patterns
| +-- DLQ for failed event delivery
| +-- Retry policies (exponential backoff)
| +-- Archive and replay for debugging
| +-- Schema registry for contract enforcement
|
+-- 4. Service Mesh Integration
| +-- AWS App Mesh with ECS Fargate
| | +-- Envoy sidecar proxies
| | +-- Traffic splitting for AI model A/B testing
| | +-- Circuit breaker on Bedrock integration
| | +-- Mutual TLS between microservices
| +-- Service discovery
| | +-- AWS Cloud Map for dynamic endpoints
| | +-- Health checks for AI service availability
| | +-- Weighted routing for gradual AI rollout
| +-- Observability
| +-- Distributed tracing (X-Ray)
| +-- Metrics collection per AI endpoint
| +-- Access logging for audit trails
|
+-- 5. Feature Flag-Controlled AI Rollout
+-- AWS AppConfig integration
| +-- Feature flags for AI model selection
| +-- Percentage-based rollout (10% -> 50% -> 100%)
| +-- User segment targeting (premium vs free tier)
| +-- Emergency kill switch for AI features
+-- Deployment strategies
| +-- Canary deployment with CloudWatch alarms
| +-- Blue/green for model version switching
| +-- Shadow mode (dual-invoke, compare results)
+-- Configuration patterns
+-- Model parameters per feature flag
+-- Prompt templates versioned via AppConfig
+-- Cost guardrails (token budget per flag)
+-- Fallback behavior on flag evaluation failure
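The circuit-breaker idea from the service-mesh branch can also live in application code when an Envoy sidecar is not in the request path. A minimal sketch — class name, thresholds, and the fallback pattern are illustrative, not part of the MangaAssist stack:

```python
import time


class CircuitBreaker:
    """Minimal circuit breaker: opens after N consecutive failures, probes again after a cooldown."""

    def __init__(self, failure_threshold: int = 5, cooldown_seconds: float = 30.0):
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self.failures = 0
        self.opened_at = None  # monotonic timestamp when the circuit opened

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True
        # Half-open: allow a probe request once the cooldown has elapsed
        return (time.monotonic() - self.opened_at) >= self.cooldown_seconds

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()


def invoke_with_breaker(breaker: CircuitBreaker, invoke_fn, fallback_fn):
    """Call invoke_fn (e.g. a Bedrock InvokeModel wrapper); use fallback_fn when the circuit is open."""
    if not breaker.allow_request():
        return fallback_fn()  # e.g. serve a cached or canned response
    try:
        result = invoke_fn()
        breaker.record_success()
        return result
    except Exception:
        breaker.record_failure()
        return fallback_fn()
```

A typical fallback for the chat path is a cached ElastiCache response or a "try again shortly" message, which keeps the 3-second budget even while Bedrock is throttling.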
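The feature-flag branch can be made concrete with a small evaluation helper. In production the flag document would come from the AWS AppConfig Data API (`start_configuration_session` / `get_latest_configuration`); the flag shape, helper names, and sticky-bucketing scheme below are assumptions for illustration:

```python
import hashlib


def bucket_user(user_id: str) -> int:
    """Deterministically map a user to a 0-99 bucket, so rollout percentage is sticky per user."""
    digest = hashlib.sha256(user_id.encode("utf-8")).hexdigest()
    return int(digest, 16) % 100


def select_model(flags: dict, user_id: str, user_tier: str) -> str:
    """Pick a Bedrock model id from a feature-flag document.

    `flags` mimics a (hypothetical) AppConfig feature-flag payload:
    {"sonnet_rollout": {"enabled": bool, "percentage": int, "tiers": [...]},
     "kill_switch": {"enabled": bool}}
    """
    haiku = "anthropic.claude-3-haiku-20240307-v1:0"
    sonnet = "anthropic.claude-3-sonnet-20240229-v1:0"
    # Emergency kill switch: force the cheap fast-path model everywhere
    if flags.get("kill_switch", {}).get("enabled"):
        return haiku
    rollout = flags.get("sonnet_rollout", {})
    if not rollout.get("enabled"):
        return haiku
    # Segment targeting, e.g. premium tier only
    if user_tier not in rollout.get("tiers", ["premium", "free"]):
        return haiku
    # Percentage rollout: 10 -> 50 -> 100 means raising this number over time
    if bucket_user(user_id) < rollout.get("percentage", 0):
        return sonnet
    return haiku
```

Hashing the user id (rather than rolling a random number per request) keeps each user on one model variant, which matters when comparing per-variant quality metrics.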
Architecture Diagram --- GenAI Microservice Integration
MangaAssist Integrated AI Architecture
========================================================================================
[Mobile/Web Client]
|
| HTTPS / WSS
v
+------------------+ +-------------------+ +---------------------+
| API Gateway | | API Gateway | | CloudFront |
| (REST API) | | (WebSocket API) | | (Static + API Edge) |
+--------+---------+ +--------+----------+ +----------+----------+
| | |
| Lambda Proxy | Lambda Route |
v v v
+------------------+ +-------------------+ +---------------------+
| Lambda: | | Lambda: | | S3: Static Assets |
| REST Handler | | WS Handler | | (manga images, |
| - /chat | | - $connect | | UI bundles) |
| - /search | | - sendMessage | +---------------------+
| - /recommend | | - $disconnect |
+--------+---------+ +--------+----------+
| |
| +----------------------+
| |
v v
+---------------------------+
| ECS Fargate: |
| Orchestrator Service | +---------------------+
| - Request routing |<----->| ElastiCache Redis |
| - Session management | | - Session cache |
| - Prompt engineering | | - Response cache |
| - Response formatting | | - Rate limit state |
+--------+------------------+ +---------------------+
|
+-------------------+-------------------+
| | |
v v v
+-----------------+ +-----------------+ +------------------+
| Bedrock | | OpenSearch | | DynamoDB |
| Claude 3 Sonnet | | Serverless | | - Sessions table |
| Claude 3 Haiku | | - Vector store | | - Products table |
| (FM invocation) | | - RAG retrieval | | - Orders table |
+-----------------+ +-----------------+ +------------------+
|
v
+---------------------------+ +---------------------+
| EventBridge | | Lambda: |
| (AI Event Bus) |------>| Webhook Handlers |
| - InferenceCompleted | | - Payment hooks |
| - ModerationFlagged | | - Review hooks |
| - RecommendationGenerated | | - Catalog sync |
+--------+------------------+ +---------------------+
|
+-------------------+-------------------+
| | |
v v v
+-----------------+ +-----------------+ +------------------+
| SQS: Batch | | Step Functions: | | Kinesis Firehose:|
| Processing | | Complex AI | | Analytics |
| Queue | | Workflows | | Pipeline |
+-----------------+ +-----------------+ +------------------+
|
v
+------------------+
| S3: AI Analytics |
| Data Lake |
+------------------+
========================================================================================
Event Flow Patterns:
[Sync Path] Client -> API GW -> Lambda -> Fargate -> Bedrock -> Response (< 3s)
[Async Path] Client -> API GW -> Lambda -> EventBridge -> SQS -> Batch Lambda
[Stream Path] Client -> WS API GW -> Lambda -> Fargate -> Bedrock Stream -> WS push
[Hook Path] External -> Lambda URL -> Validate -> EventBridge -> Target Lambda
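The [Stream Path] can be sketched as a handler that relays Bedrock stream chunks to one WebSocket connection through the API Gateway Management API. The endpoint URL format and helper names are assumptions, and error handling for stale connections (`GoneException`) is omitted:

```python
import json


def extract_text_delta(chunk_bytes: bytes) -> str:
    """Pull the text delta out of one Bedrock Messages-API stream chunk (empty string if none)."""
    event = json.loads(chunk_bytes)
    if event.get("type") == "content_block_delta":
        return event.get("delta", {}).get("text", "")
    return ""


def stream_reply(connection_id: str, prompt: str, endpoint_url: str) -> None:
    """Stream a Haiku reply token-by-token to a WebSocket client.

    endpoint_url is the API Gateway Management API endpoint, e.g.
    https://{api-id}.execute-api.ap-northeast-1.amazonaws.com/prod (assumed).
    """
    import boto3  # imported lazily so the parser above stays importable without AWS deps

    bedrock = boto3.client("bedrock-runtime", region_name="ap-northeast-1")
    ws = boto3.client("apigatewaymanagementapi", endpoint_url=endpoint_url)
    response = bedrock.invoke_model_with_response_stream(
        modelId="anthropic.claude-3-haiku-20240307-v1:0",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,
            "messages": [{"role": "user", "content": prompt}],
        }),
    )
    for event in response["body"]:
        text = extract_text_delta(event["chunk"]["bytes"])
        if text:
            # Push each text delta to the client as soon as it arrives
            ws.post_to_connection(
                ConnectionId=connection_id,
                Data=json.dumps({"type": "token", "text": text}).encode("utf-8"),
            )
    ws.post_to_connection(
        ConnectionId=connection_id,
        Data=json.dumps({"type": "done"}).encode("utf-8"),
    )
```

Streaming is what makes the under-3-second target feel met even for long answers: the first tokens reach the client in well under a second, while the full completion may take longer.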
Production Python Code
1. API Gateway REST Endpoint Configuration (CDK)
"""
API Gateway REST API configuration for MangaAssist GenAI integration.
Defines REST endpoints, authorizers, throttling, and Lambda integrations.
"""
import json
from aws_cdk import (
Stack,
Duration,
RemovalPolicy,
CfnOutput,
aws_apigateway as apigw,
aws_lambda as _lambda,
aws_iam as iam,
aws_logs as logs,
)
from constructs import Construct
class MangaAssistApiGatewayStack(Stack):
"""CDK Stack for MangaAssist REST API Gateway with GenAI endpoints."""
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# --- Access Log Group ---
api_log_group = logs.LogGroup(
self, "MangaAssistApiLogs",
            retention=logs.RetentionDays.ONE_MONTH,  # closest RetentionDays value to 30 days
removal_policy=RemovalPolicy.DESTROY,
)
# --- REST API Definition ---
api = apigw.RestApi(
self, "MangaAssistApi",
rest_api_name="MangaAssist-GenAI-API",
description="MangaAssist chatbot REST API with GenAI endpoints",
deploy_options=apigw.StageOptions(
stage_name="prod",
throttling_rate_limit=1000, # 1000 req/sec global
throttling_burst_limit=2000, # burst to 2000
logging_level=apigw.MethodLoggingLevel.INFO,
access_log_destination=apigw.LogGroupLogDestination(api_log_group),
access_log_format=apigw.AccessLogFormat.json_with_standard_fields(
caller=True,
http_method=True,
ip=True,
protocol=True,
request_time=True,
resource_path=True,
response_length=True,
status=True,
user=True,
),
tracing_enabled=True, # X-Ray tracing
metrics_enabled=True,
),
default_cors_preflight_options=apigw.CorsOptions(
allow_origins=["https://mangaassist.example.com"],
allow_methods=["GET", "POST", "OPTIONS"],
allow_headers=["Content-Type", "Authorization", "X-Api-Key"],
max_age=Duration.hours(1),
),
endpoint_types=[apigw.EndpointType.REGIONAL],
)
# --- Usage Plan and API Key (rate limiting per client) ---
usage_plan = api.add_usage_plan(
"MangaAssistUsagePlan",
name="MangaAssist-Standard",
description="Standard plan: 10K requests/day per API key",
throttle=apigw.ThrottleSettings(
rate_limit=100, # 100 req/sec per key
burst_limit=200,
),
quota=apigw.QuotaSettings(
limit=10_000,
period=apigw.Period.DAY,
),
)
api_key = api.add_api_key(
"MangaAssistDefaultKey",
api_key_name="manga-assist-default-key",
)
usage_plan.add_api_key(api_key)
# --- Lambda Authorizer (JWT validation) ---
authorizer_fn = _lambda.Function(
self, "AuthorizerFn",
runtime=_lambda.Runtime.PYTHON_3_12,
handler="authorizer.handler",
code=_lambda.Code.from_asset("lambda/authorizer"),
timeout=Duration.seconds(10),
memory_size=256,
environment={
"COGNITO_USER_POOL_ID": "ap-northeast-1_XXXXXXXX",
"COGNITO_CLIENT_ID": "xxxxxxxxxxxxxxxxxxxxxxxxxx",
},
)
token_authorizer = apigw.TokenAuthorizer(
self, "MangaAssistAuthorizer",
handler=authorizer_fn,
results_cache_ttl=Duration.minutes(5),
identity_source="method.request.header.Authorization",
)
# --- Lambda Functions for GenAI Endpoints ---
# Chat endpoint (synchronous, optimized for Haiku fast-path)
chat_fn = _lambda.Function(
self, "ChatFn",
runtime=_lambda.Runtime.PYTHON_3_12,
handler="chat.handler",
code=_lambda.Code.from_asset("lambda/chat"),
timeout=Duration.seconds(29), # API GW max = 29s
memory_size=1024,
environment={
"BEDROCK_MODEL_ID": "anthropic.claude-3-haiku-20240307-v1:0",
"BEDROCK_FALLBACK_MODEL_ID": "anthropic.claude-3-sonnet-20240229-v1:0",
"DYNAMODB_SESSIONS_TABLE": "MangaAssist-Sessions",
"ELASTICACHE_ENDPOINT": "manga-cache.xxxxx.apne1.cache.amazonaws.com",
"MAX_TOKENS": "1024",
"TEMPERATURE": "0.3",
},
tracing=_lambda.Tracing.ACTIVE,
)
# Search endpoint (RAG pipeline via OpenSearch)
search_fn = _lambda.Function(
self, "SearchFn",
runtime=_lambda.Runtime.PYTHON_3_12,
handler="search.handler",
code=_lambda.Code.from_asset("lambda/search"),
timeout=Duration.seconds(29),
memory_size=1024,
environment={
"OPENSEARCH_ENDPOINT": "https://xxxxx.ap-northeast-1.aoss.amazonaws.com",
"OPENSEARCH_INDEX": "manga-products-vectors",
"BEDROCK_EMBED_MODEL_ID": "amazon.titan-embed-text-v2:0",
"BEDROCK_MODEL_ID": "anthropic.claude-3-sonnet-20240229-v1:0",
"TOP_K": "5",
},
tracing=_lambda.Tracing.ACTIVE,
)
# Recommend endpoint (product recommendations)
recommend_fn = _lambda.Function(
self, "RecommendFn",
runtime=_lambda.Runtime.PYTHON_3_12,
handler="recommend.handler",
code=_lambda.Code.from_asset("lambda/recommend"),
timeout=Duration.seconds(29),
memory_size=512,
environment={
"DYNAMODB_PRODUCTS_TABLE": "MangaAssist-Products",
"BEDROCK_MODEL_ID": "anthropic.claude-3-haiku-20240307-v1:0",
"MAX_RECOMMENDATIONS": "10",
},
tracing=_lambda.Tracing.ACTIVE,
)
# Grant Bedrock invoke permissions
bedrock_policy = iam.PolicyStatement(
actions=["bedrock:InvokeModel", "bedrock:InvokeModelWithResponseStream"],
resources=["arn:aws:bedrock:ap-northeast-1::foundation-model/*"],
)
for fn in [chat_fn, search_fn, recommend_fn]:
fn.add_to_role_policy(bedrock_policy)
# --- API Resources and Methods ---
# /v1 base path
v1 = api.root.add_resource("v1")
# /v1/chat POST
chat_resource = v1.add_resource("chat")
chat_resource.add_method(
"POST",
            # Proxy integration: the Lambda returns the full HTTP response itself,
            # so integration-response mappings do not apply and are omitted here.
            apigw.LambdaIntegration(chat_fn, proxy=True),
authorizer=token_authorizer,
authorization_type=apigw.AuthorizationType.CUSTOM,
api_key_required=True,
method_responses=[
apigw.MethodResponse(status_code="200"),
apigw.MethodResponse(status_code="504"),
],
request_parameters={
"method.request.header.Authorization": True,
"method.request.header.X-Api-Key": True,
},
)
# /v1/search POST
search_resource = v1.add_resource("search")
search_resource.add_method(
"POST",
apigw.LambdaIntegration(search_fn, proxy=True),
authorizer=token_authorizer,
authorization_type=apigw.AuthorizationType.CUSTOM,
api_key_required=True,
)
# /v1/recommend POST
recommend_resource = v1.add_resource("recommend")
recommend_resource.add_method(
"POST",
apigw.LambdaIntegration(recommend_fn, proxy=True),
authorizer=token_authorizer,
authorization_type=apigw.AuthorizationType.CUSTOM,
api_key_required=True,
)
# --- Associate Usage Plan with API Stage ---
usage_plan.add_api_stage(
stage=api.deployment_stage,
throttle=[
apigw.ThrottlingPerMethod(
method=chat_resource.node.find_child("POST"),
throttle=apigw.ThrottleSettings(
rate_limit=500, # /chat gets higher limit
burst_limit=1000,
),
),
],
)
# --- Outputs ---
CfnOutput(self, "ApiUrl", value=api.url, description="API Gateway URL")
CfnOutput(
self, "ApiKeyId",
value=api_key.key_id,
description="API Key ID (retrieve value from console)",
)
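The stack above wires in `authorizer.handler` without showing it. A minimal sketch of that token authorizer follows; the JWT check here is a shape-only placeholder — real validation must verify the token signature against the Cognito JWKS (e.g. with PyJWT or python-jose), which is not shown:

```python
def generate_policy(principal_id: str, effect: str, method_arn: str, context: dict = None) -> dict:
    """Build the IAM policy document API Gateway expects back from a token authorizer."""
    return {
        "principalId": principal_id,
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [{
                "Action": "execute-api:Invoke",
                "Effect": effect,
                "Resource": method_arn,
            }],
        },
        # Key/value pairs here become $context.authorizer.* in the backend integration
        "context": context or {},
    }


def handler(event: dict, context: object) -> dict:
    """Entry point for the TokenAuthorizer (authorizer.handler in the CDK stack)."""
    token = event.get("authorizationToken", "")
    method_arn = event["methodArn"]
    if not token.startswith("Bearer "):
        return generate_policy("anonymous", "Deny", method_arn)
    jwt = token[len("Bearer "):]
    # Placeholder check only: a JWT has three dot-separated segments.
    # claims = verify_against_cognito_jwks(jwt)  # assumed helper, not shown
    if jwt.count(".") != 2:
        return generate_policy("anonymous", "Deny", method_arn)
    return generate_policy("user", "Allow", method_arn, {"tier": "standard"})
```

Because the stack sets `results_cache_ttl` to 5 minutes, the Allow/Deny decision for a given Authorization header is cached by API Gateway, so the authorizer Lambda is not invoked on every request.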
2. Lambda Webhook Handler with HMAC Verification and Idempotency
"""
Lambda webhook handler for MangaAssist.
Handles inbound webhooks from external services (payment, review, catalog).
Implements HMAC signature verification, replay prevention, and idempotency.
"""
import json
import hashlib
import hmac
import time
import os
import logging
from datetime import datetime, timezone
from typing import Any
import boto3
from botocore.exceptions import ClientError
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# --- Clients ---
dynamodb = boto3.resource("dynamodb")
eventbridge = boto3.client("events")
secrets_manager = boto3.client("secretsmanager")
# --- Configuration ---
IDEMPOTENCY_TABLE = os.environ["IDEMPOTENCY_TABLE"] # DynamoDB table
EVENT_BUS_NAME = os.environ["EVENT_BUS_NAME"] # EventBridge bus
WEBHOOK_SECRET_ARN = os.environ["WEBHOOK_SECRET_ARN"] # Secrets Manager ARN
MAX_TIMESTAMP_DRIFT_SECONDS = 300 # 5-minute replay window
IDEMPOTENCY_TTL_HOURS = 24 # 24h dedup window
idempotency_table = dynamodb.Table(IDEMPOTENCY_TABLE)
# ============================================================
# Webhook Signature Verification
# ============================================================
def get_webhook_secret(source: str) -> str:
"""
Retrieve webhook signing secret from Secrets Manager.
Secrets are stored as JSON: {"stripe": "whsec_...", "github": "ghsec_..."}.
"""
response = secrets_manager.get_secret_value(SecretId=WEBHOOK_SECRET_ARN)
secrets = json.loads(response["SecretString"])
secret = secrets.get(source)
if not secret:
raise ValueError(f"No webhook secret configured for source: {source}")
return secret
def verify_hmac_signature(
payload: str,
signature: str,
secret: str,
algorithm: str = "sha256",
) -> bool:
"""
Verify HMAC signature of webhook payload.
Supports SHA-256 (Stripe, generic) and SHA-1 (GitHub legacy).
Uses constant-time comparison to prevent timing attacks.
"""
hash_func = hashlib.sha256 if algorithm == "sha256" else hashlib.sha1
expected = hmac.new(
key=secret.encode("utf-8"),
msg=payload.encode("utf-8"),
digestmod=hash_func,
).hexdigest()
# Constant-time comparison prevents timing side-channel attacks
return hmac.compare_digest(expected, signature)
def verify_timestamp(timestamp_str: str) -> bool:
"""
Verify webhook timestamp is within acceptable drift window.
Prevents replay attacks using stale webhook deliveries.
"""
try:
webhook_ts = int(timestamp_str)
except (ValueError, TypeError):
logger.warning("Invalid timestamp format: %s", timestamp_str)
return False
current_ts = int(time.time())
drift = abs(current_ts - webhook_ts)
if drift > MAX_TIMESTAMP_DRIFT_SECONDS:
logger.warning(
"Timestamp drift too large: %d seconds (max %d)",
drift,
MAX_TIMESTAMP_DRIFT_SECONDS,
)
return False
return True
# ============================================================
# Idempotency via DynamoDB Conditional Writes
# ============================================================
def check_and_claim_idempotency(idempotency_key: str) -> bool:
"""
Attempt to claim an idempotency key using DynamoDB conditional write.
Returns True if this is the first time we've seen this key (claimed).
Returns False if the key was already processed (duplicate delivery).
TTL-based cleanup: entries expire after IDEMPOTENCY_TTL_HOURS.
"""
ttl = int(time.time()) + (IDEMPOTENCY_TTL_HOURS * 3600)
try:
idempotency_table.put_item(
Item={
"idempotency_key": idempotency_key,
"claimed_at": datetime.now(timezone.utc).isoformat(),
"status": "processing",
"ttl": ttl,
},
ConditionExpression="attribute_not_exists(idempotency_key)",
)
logger.info("Claimed idempotency key: %s", idempotency_key)
return True
except ClientError as e:
if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
logger.info("Duplicate webhook delivery detected: %s", idempotency_key)
return False
raise
def mark_idempotency_complete(idempotency_key: str, result: dict) -> None:
"""Mark an idempotency entry as successfully processed."""
idempotency_table.update_item(
Key={"idempotency_key": idempotency_key},
UpdateExpression="SET #s = :status, completed_at = :ts, result_summary = :r",
ExpressionAttributeNames={"#s": "status"},
ExpressionAttributeValues={
":status": "completed",
":ts": datetime.now(timezone.utc).isoformat(),
":r": json.dumps(result, default=str),
},
)
def mark_idempotency_failed(idempotency_key: str, error: str) -> None:
"""Mark an idempotency entry as failed (allows retry)."""
idempotency_table.delete_item(Key={"idempotency_key": idempotency_key})
logger.warning("Removed failed idempotency key for retry: %s (%s)", idempotency_key, error)
# ============================================================
# Webhook Source Handlers
# ============================================================
def handle_stripe_payment(payload: dict) -> dict:
"""
Process Stripe payment webhook.
On successful payment, trigger AI-generated order confirmation via EventBridge.
"""
event_type = payload.get("type", "")
data = payload.get("data", {}).get("object", {})
if event_type == "checkout.session.completed":
order_id = data.get("metadata", {}).get("order_id", "unknown")
customer_email = data.get("customer_email", "")
        currency = data.get("currency", "jpy").upper()
        # JPY is a zero-decimal currency in Stripe, so amount_total is already in yen;
        # only decimal currencies need the minor-units-to-units conversion.
        raw_amount = data.get("amount_total", 0)
        amount = raw_amount if currency == "JPY" else raw_amount / 100
# Publish to EventBridge for AI-generated order confirmation
event_detail = {
"order_id": order_id,
"customer_email": customer_email,
"amount": str(amount),
"currency": currency,
"items": data.get("metadata", {}).get("items", "[]"),
"action": "generate_confirmation",
}
eventbridge.put_events(
Entries=[{
"Source": "mangaassist.payments",
"DetailType": "PaymentCompleted",
"Detail": json.dumps(event_detail),
"EventBusName": EVENT_BUS_NAME,
}],
)
logger.info("Payment event published for order: %s", order_id)
return {"processed": True, "order_id": order_id, "action": "confirmation_triggered"}
elif event_type == "charge.refunded":
# Handle refund notification
charge_id = data.get("id", "unknown")
eventbridge.put_events(
Entries=[{
"Source": "mangaassist.payments",
"DetailType": "RefundProcessed",
"Detail": json.dumps({"charge_id": charge_id, "amount": str(data.get("amount_refunded", 0))}),
"EventBusName": EVENT_BUS_NAME,
}],
)
return {"processed": True, "charge_id": charge_id, "action": "refund_notification"}
logger.info("Unhandled Stripe event type: %s", event_type)
return {"processed": False, "reason": f"unhandled_event_type: {event_type}"}
def handle_review_submission(payload: dict) -> dict:
"""
Process customer review submission webhook.
Triggers AI sentiment analysis and moderation via EventBridge.
"""
review_id = payload.get("review_id", "unknown")
product_id = payload.get("product_id", "")
review_text = payload.get("text", "")
rating = payload.get("rating", 0)
customer_id = payload.get("customer_id", "")
if not review_text or not product_id:
return {"processed": False, "reason": "missing_required_fields"}
# Publish to EventBridge for AI moderation + sentiment analysis
event_detail = {
"review_id": review_id,
"product_id": product_id,
"review_text": review_text[:2000], # Truncate to avoid oversized events
"rating": rating,
"customer_id": customer_id,
"actions": ["sentiment_analysis", "content_moderation", "response_generation"],
}
eventbridge.put_events(
Entries=[{
"Source": "mangaassist.reviews",
"DetailType": "ReviewSubmitted",
"Detail": json.dumps(event_detail),
"EventBusName": EVENT_BUS_NAME,
}],
)
logger.info("Review event published for review_id=%s product_id=%s", review_id, product_id)
return {"processed": True, "review_id": review_id, "actions_triggered": 3}
def handle_catalog_update(payload: dict) -> dict:
"""
Process manga catalog update webhook from publisher API.
Triggers embedding regeneration and search index update.
"""
update_type = payload.get("type", "") # new_release | price_change | discontinued
items = payload.get("items", [])
if not items:
return {"processed": False, "reason": "empty_items_list"}
    # EventBridge accepts at most 10 entries per put_events call, so publish in
    # chunks of 10 instead of silently dropping items beyond the first batch
    failed_count = 0
    published = 0
    for start in range(0, len(items), 10):
        entries = [
            {
                "Source": "mangaassist.catalog",
                "DetailType": f"CatalogUpdate.{update_type}",
                "Detail": json.dumps({
                    "product_id": item.get("id", ""),
                    "title": item.get("title", ""),
                    "update_type": update_type,
                    "data": item,
                    "actions": ["regenerate_embedding", "update_search_index", "notify_subscribers"],
                }),
                "EventBusName": EVENT_BUS_NAME,
            }
            for item in items[start:start + 10]
        ]
        response = eventbridge.put_events(Entries=entries)
        batch_failed = response.get("FailedEntryCount", 0)
        failed_count += batch_failed
        published += len(entries) - batch_failed
    if failed_count > 0:
        logger.error("Failed to publish %d/%d catalog events", failed_count, len(items))
    return {
        "processed": True,
        "update_type": update_type,
        "total_items": len(items),
        "published": published,
        "failed": failed_count,
    }
# ============================================================
# Source Routing Table
# ============================================================
WEBHOOK_HANDLERS = {
"stripe": {
"handler": handle_stripe_payment,
"signature_header": "Stripe-Signature",
"timestamp_header": None, # Stripe embeds timestamp in signature
"algorithm": "sha256",
"extract_idempotency_key": lambda e: e.get("body", {}).get("id", ""),
},
"reviews": {
"handler": handle_review_submission,
"signature_header": "X-Webhook-Signature",
"timestamp_header": "X-Webhook-Timestamp",
"algorithm": "sha256",
        "extract_idempotency_key": lambda e: e.get("review_id", ""),
},
"catalog": {
"handler": handle_catalog_update,
"signature_header": "X-Catalog-Signature",
"timestamp_header": "X-Catalog-Timestamp",
"algorithm": "sha256",
        "extract_idempotency_key": lambda e: e.get("batch_id", ""),
},
}
# ============================================================
# Lambda Entry Point
# ============================================================
def handler(event: dict, context: Any) -> dict:
"""
Main Lambda handler for webhook processing.
Routes webhooks by source, verifies signatures, enforces idempotency.
Invoked via Lambda Function URL:
POST https://<function-url>.lambda-url.ap-northeast-1.on.aws/{source}
Path parameter 'source' determines which handler processes the webhook.
"""
logger.info("Webhook event received: requestId=%s", context.aws_request_id)
try:
# Extract source from path
path = event.get("rawPath", event.get("path", ""))
source = path.strip("/").split("/")[-1] if path else ""
if source not in WEBHOOK_HANDLERS:
logger.warning("Unknown webhook source: %s", source)
return _response(404, {"error": f"Unknown webhook source: {source}"})
config = WEBHOOK_HANDLERS[source]
headers = event.get("headers", {})
raw_body = event.get("body", "")
# --- Step 1: Verify HMAC signature ---
signature = headers.get(config["signature_header"].lower(), "")
if not signature:
logger.warning("Missing signature header for source: %s", source)
return _response(401, {"error": "Missing webhook signature"})
secret = get_webhook_secret(source)
if not verify_hmac_signature(raw_body, signature, secret, config["algorithm"]):
logger.warning("Invalid signature for source: %s", source)
return _response(403, {"error": "Invalid webhook signature"})
# --- Step 2: Verify timestamp (replay prevention) ---
ts_header = config.get("timestamp_header")
if ts_header:
timestamp = headers.get(ts_header.lower(), "")
if not verify_timestamp(timestamp):
return _response(403, {"error": "Webhook timestamp outside acceptable window"})
# --- Step 3: Parse body ---
try:
payload = json.loads(raw_body) if isinstance(raw_body, str) else raw_body
except json.JSONDecodeError:
return _response(400, {"error": "Invalid JSON payload"})
# --- Step 4: Idempotency check ---
        raw_key = config["extract_idempotency_key"](payload)
        # Fall back to the request id when the payload carries no natural key
        idempotency_key = f"{source}:{raw_key}" if raw_key else f"{source}:{context.aws_request_id}"
if not check_and_claim_idempotency(idempotency_key):
return _response(200, {"status": "already_processed", "idempotency_key": idempotency_key})
# --- Step 5: Process webhook ---
try:
result = config["handler"](payload)
mark_idempotency_complete(idempotency_key, result)
logger.info("Webhook processed successfully: source=%s key=%s", source, idempotency_key)
return _response(200, {"status": "processed", **result})
except Exception as processing_error:
mark_idempotency_failed(idempotency_key, str(processing_error))
raise
except Exception as e:
logger.exception("Webhook processing failed: %s", str(e))
return _response(500, {"error": "Internal processing error"})
def _response(status_code: int, body: dict) -> dict:
"""Build Lambda Function URL / API Gateway proxy response."""
return {
"statusCode": status_code,
"headers": {"Content-Type": "application/json"},
"body": json.dumps(body, default=str),
}
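For completeness, the sending side: a client of the `reviews` webhook would sign the raw body as below (the helper name is illustrative). Note that because the handler above verifies the signature and the timestamp independently, a stronger scheme would bind the timestamp into the signed string, Stripe-style, so an attacker cannot pair an old valid signature with a fresh timestamp:

```python
import hashlib
import hmac
import json
import time


def sign_webhook(raw_body: str, secret: str, timestamp: int = None) -> dict:
    """Produce the headers the 'reviews' webhook handler expects.

    The signature is a hex-encoded HMAC-SHA256 of the raw request body,
    matching verify_hmac_signature on the receiving side.
    """
    ts = timestamp if timestamp is not None else int(time.time())
    signature = hmac.new(
        key=secret.encode("utf-8"),
        msg=raw_body.encode("utf-8"),
        digestmod=hashlib.sha256,
    ).hexdigest()
    return {
        "Content-Type": "application/json",
        "X-Webhook-Signature": signature,
        "X-Webhook-Timestamp": str(ts),
    }


# Example: the exact body string must be sent unmodified, since any
# re-serialization (key order, whitespace) would invalidate the signature.
body = json.dumps({"review_id": "r-1", "product_id": "p-9", "text": "great", "rating": 5})
headers = sign_webhook(body, "topsecret")
```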
3. EventBridge Rule Configuration and AI Event Publishing
"""
EventBridge configuration for MangaAssist AI event-driven integration.
Defines event bus, rules, targets, and event publishing utilities.
CDK infrastructure + runtime event publisher.
"""
import json
import os
import time
import logging
from datetime import datetime, timezone
from typing import Optional
import boto3
from aws_cdk import (
Stack,
Duration,
RemovalPolicy,
aws_events as events,
aws_events_targets as targets,
aws_lambda as _lambda,
aws_sqs as sqs,
aws_iam as iam,
aws_logs as logs,
)
from constructs import Construct
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# ============================================================
# CDK: EventBridge Infrastructure
# ============================================================
class MangaAssistEventBridgeStack(Stack):
"""CDK Stack for MangaAssist EventBridge event-driven AI integration."""
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# --- Custom Event Bus ---
ai_event_bus = events.EventBus(
self, "MangaAssistAIBus",
event_bus_name="MangaAssist-AI-Events",
)
# Event archive for replay/debugging (30-day retention)
events.CfnArchive(
self, "AIEventArchive",
source_arn=ai_event_bus.event_bus_arn,
archive_name="manga-assist-ai-archive",
retention_days=30,
event_pattern={
"source": [{"prefix": "mangaassist."}],
},
)
# --- Dead Letter Queue for failed deliveries ---
event_dlq = sqs.Queue(
self, "EventDLQ",
queue_name="MangaAssist-EventBridge-DLQ",
retention_period=Duration.days(14),
visibility_timeout=Duration.seconds(300),
)
# --- Lambda: AI Inference Post-Processor ---
inference_processor_fn = _lambda.Function(
self, "InferenceProcessorFn",
runtime=_lambda.Runtime.PYTHON_3_12,
handler="inference_processor.handler",
code=_lambda.Code.from_asset("lambda/inference_processor"),
timeout=Duration.seconds(60),
memory_size=512,
environment={
"DYNAMODB_TABLE": "MangaAssist-Sessions",
"NOTIFICATION_TOPIC_ARN": "arn:aws:sns:ap-northeast-1:123456789012:MangaAssist-Notifications",
},
)
# --- Lambda: Content Moderation Handler ---
moderation_fn = _lambda.Function(
self, "ModerationFn",
runtime=_lambda.Runtime.PYTHON_3_12,
handler="moderation.handler",
code=_lambda.Code.from_asset("lambda/moderation"),
timeout=Duration.seconds(30),
memory_size=512,
environment={
"BEDROCK_MODEL_ID": "anthropic.claude-3-haiku-20240307-v1:0",
"MODERATION_THRESHOLD": "0.8",
"QUARANTINE_TABLE": "MangaAssist-Quarantined",
},
)
# --- Lambda: Embedding Regeneration Handler ---
embedding_fn = _lambda.Function(
self, "EmbeddingFn",
runtime=_lambda.Runtime.PYTHON_3_12,
handler="embedding.handler",
code=_lambda.Code.from_asset("lambda/embedding"),
timeout=Duration.minutes(5),
memory_size=1024,
environment={
"OPENSEARCH_ENDPOINT": "https://xxxxx.ap-northeast-1.aoss.amazonaws.com",
"BEDROCK_EMBED_MODEL_ID": "amazon.titan-embed-text-v2:0",
"OPENSEARCH_INDEX": "manga-products-vectors",
},
)
# --- Lambda: Analytics Event Processor ---
analytics_fn = _lambda.Function(
self, "AnalyticsFn",
runtime=_lambda.Runtime.PYTHON_3_12,
handler="analytics.handler",
code=_lambda.Code.from_asset("lambda/analytics"),
timeout=Duration.seconds(30),
memory_size=256,
environment={
"FIREHOSE_STREAM_NAME": "MangaAssist-AI-Analytics",
},
)
# ============================================================
# EventBridge Rules
# ============================================================
# Rule 1: AI Inference Completed -> Post-processing
# ============================================================
events.Rule(
    self, "InferenceCompletedRule",
    rule_name="MangaAssist-InferenceCompleted",
    event_bus=ai_event_bus,
    description="Route completed AI inferences to post-processor",
    event_pattern=events.EventPattern(
        source=["mangaassist.ai"],
        detail_type=["InferenceCompleted"],
        detail={
            "status": ["success"],
            "model_id": [{"prefix": "anthropic.claude"}],
        },
    ),
    targets=[
        targets.LambdaFunction(
            inference_processor_fn,
            dead_letter_queue=event_dlq,
            retry_attempts=2,
            max_event_age=Duration.hours(1),
        ),
        targets.LambdaFunction(
            analytics_fn,
            dead_letter_queue=event_dlq,
            retry_attempts=2,
        ),
    ],
)

# Rule 2: Content Moderation Flagged -> Quarantine + Alert
events.Rule(
    self, "ModerationFlaggedRule",
    rule_name="MangaAssist-ModerationFlagged",
    event_bus=ai_event_bus,
    description="Route flagged content to moderation handler",
    event_pattern=events.EventPattern(
        source=["mangaassist.ai"],
        detail_type=["ModerationFlagged"],
        detail={
            "severity": ["high", "critical"],
        },
    ),
    targets=[
        targets.LambdaFunction(
            moderation_fn,
            dead_letter_queue=event_dlq,
            retry_attempts=3,
        ),
    ],
)

# Rule 3: Catalog Update -> Embedding Regeneration
events.Rule(
    self, "CatalogUpdateRule",
    rule_name="MangaAssist-CatalogUpdate",
    event_bus=ai_event_bus,
    description="Regenerate embeddings on catalog changes",
    event_pattern=events.EventPattern(
        source=["mangaassist.catalog"],
        detail_type=[
            "CatalogUpdate.new_release",
            "CatalogUpdate.price_change",
            "CatalogUpdate.discontinued",
        ],
    ),
    targets=[
        targets.LambdaFunction(
            embedding_fn,
            dead_letter_queue=event_dlq,
            retry_attempts=2,
            max_event_age=Duration.hours(6),
        ),
    ],
)

# Rule 4: Payment Completed -> AI Confirmation Generator
events.Rule(
    self, "PaymentCompletedRule",
    rule_name="MangaAssist-PaymentCompleted",
    event_bus=ai_event_bus,
    description="Generate AI order confirmations after payment",
    event_pattern=events.EventPattern(
        source=["mangaassist.payments"],
        detail_type=["PaymentCompleted"],
    ),
    targets=[
        targets.LambdaFunction(
            inference_processor_fn,
            dead_letter_queue=event_dlq,
            retry_attempts=3,
        ),
    ],
)

# Rule 5: Review Submitted -> Sentiment Analysis + Moderation
events.Rule(
    self, "ReviewSubmittedRule",
    rule_name="MangaAssist-ReviewSubmitted",
    event_bus=ai_event_bus,
    description="Analyze and moderate customer reviews with AI",
    event_pattern=events.EventPattern(
        source=["mangaassist.reviews"],
        detail_type=["ReviewSubmitted"],
    ),
    targets=[
        targets.LambdaFunction(moderation_fn, dead_letter_queue=event_dlq),
        targets.LambdaFunction(analytics_fn, dead_letter_queue=event_dlq),
    ],
)

# Rule 6: Catch-all for analytics (every mangaassist event)
events.Rule(
    self, "AllEventsAnalyticsRule",
    rule_name="MangaAssist-AllEvents-Analytics",
    event_bus=ai_event_bus,
    description="Send all MangaAssist events to analytics pipeline",
    event_pattern=events.EventPattern(
        # Match.prefix() is the CDK-idiomatic way to express a prefix
        # filter on a top-level field like source.
        source=events.Match.prefix("mangaassist."),
    ),
    targets=[
        targets.LambdaFunction(analytics_fn, dead_letter_queue=event_dlq),
    ],
)
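At runtime, EventBridge evaluates each rule's pattern against the raw event JSON (where CDK's `detail_type` argument becomes the `detail-type` key). The toy matcher below is an illustrative sketch of those matching semantics, not EventBridge's actual implementation; it handles only exact string values and `{"prefix": ...}` filters, which is all Rule 1 needs.

```python
# Minimal sketch of EventBridge-style pattern matching (illustrative only).
# A field matches when its value equals one of the listed strings or
# satisfies a {"prefix": ...} filter; nested dicts ("detail") recurse.
def matches(pattern: dict, event: dict) -> bool:
    for key, allowed in pattern.items():
        value = event.get(key)
        if isinstance(allowed, dict):  # nested pattern, e.g. "detail"
            if not isinstance(value, dict) or not matches(allowed, value):
                return False
        else:  # list of exact strings and/or {"prefix": ...} filters
            ok = False
            for candidate in allowed:
                if isinstance(candidate, dict) and "prefix" in candidate:
                    ok = isinstance(value, str) and value.startswith(candidate["prefix"])
                else:
                    ok = value == candidate
                if ok:
                    break
            if not ok:
                return False
    return True

# The pattern from Rule 1 above, as EventBridge sees it:
rule_1_pattern = {
    "source": ["mangaassist.ai"],
    "detail-type": ["InferenceCompleted"],
    "detail": {
        "status": ["success"],
        "model_id": [{"prefix": "anthropic.claude"}],
    },
}

event = {
    "source": "mangaassist.ai",
    "detail-type": "InferenceCompleted",
    "detail": {"status": "success", "model_id": "anthropic.claude-3-haiku-20240307-v1:0"},
}

print(matches(rule_1_pattern, event))                                       # True
print(matches(rule_1_pattern, {**event, "source": "mangaassist.reviews"}))  # False
```

Note that the catch-all Rule 6 and Rule 1 both match the sample event: EventBridge delivers a copy to every rule whose pattern matches, which is exactly the fan-out behavior the analytics pipeline relies on.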
# ============================================================
# Runtime: Event Publisher Utility
# ============================================================
# Imports needed by the runtime utilities below.
import hashlib
import json
import logging
import os
import time
from datetime import datetime, timezone
from typing import Optional

import boto3

logger = logging.getLogger(__name__)


class MangaAssistEventPublisher:
"""
Utility class for publishing AI events to EventBridge.
Used by ECS Fargate orchestrator and Lambda functions.
"""
def __init__(self, event_bus_name: Optional[str] = None):
self.client = boto3.client("events")
self.event_bus_name = event_bus_name or os.environ.get(
"EVENT_BUS_NAME", "MangaAssist-AI-Events"
)
def publish_inference_completed(
self,
session_id: str,
model_id: str,
input_tokens: int,
output_tokens: int,
latency_ms: int,
response_summary: str,
) -> dict:
"""Publish an InferenceCompleted event after FM invocation."""
detail = {
"session_id": session_id,
"model_id": model_id,
"status": "success",
"metrics": {
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"latency_ms": latency_ms,
"estimated_cost_usd": self._estimate_cost(model_id, input_tokens, output_tokens),
},
"response_summary": response_summary[:500],
"timestamp": datetime.now(timezone.utc).isoformat(),
}
return self._put_event("mangaassist.ai", "InferenceCompleted", detail)
def publish_moderation_flagged(
self,
session_id: str,
content: str,
severity: str,
categories: list[str],
model_id: str,
) -> dict:
"""Publish a ModerationFlagged event when content fails moderation."""
detail = {
"session_id": session_id,
"content_hash": hashlib.sha256(content.encode()).hexdigest(),
"content_preview": content[:200],
"severity": severity, # low | medium | high | critical
"flagged_categories": categories,
"model_id": model_id,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
return self._put_event("mangaassist.ai", "ModerationFlagged", detail)
def publish_recommendation_generated(
self,
customer_id: str,
product_ids: list[str],
strategy: str,
model_id: str,
) -> dict:
"""Publish a RecommendationGenerated event."""
detail = {
"customer_id": customer_id,
"recommended_product_ids": product_ids,
"strategy": strategy,
"model_id": model_id,
"count": len(product_ids),
"timestamp": datetime.now(timezone.utc).isoformat(),
}
return self._put_event("mangaassist.ai", "RecommendationGenerated", detail)
def _put_event(self, source: str, detail_type: str, detail: dict) -> dict:
"""Put a single event to EventBridge."""
try:
response = self.client.put_events(
Entries=[{
"Source": source,
"DetailType": detail_type,
"Detail": json.dumps(detail, default=str),
"EventBusName": self.event_bus_name,
}],
)
failed = response.get("FailedEntryCount", 0)
if failed > 0:
logger.error(
"Failed to publish event: source=%s type=%s errors=%s",
source,
detail_type,
response.get("Entries", []),
)
return response
except Exception as e:
logger.exception("EventBridge publish failed: %s", str(e))
raise
@staticmethod
def _estimate_cost(model_id: str, input_tokens: int, output_tokens: int) -> str:
"""
Estimate cost for Bedrock Claude 3 invocation.
Sonnet: $3/$15 per 1M tokens (input/output)
Haiku: $0.25/$1.25 per 1M tokens (input/output)
"""
if "sonnet" in model_id.lower():
cost = (input_tokens * 3.0 / 1_000_000) + (output_tokens * 15.0 / 1_000_000)
elif "haiku" in model_id.lower():
cost = (input_tokens * 0.25 / 1_000_000) + (output_tokens * 1.25 / 1_000_000)
else:
cost = 0.0
return f"{cost:.6f}"
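To make the `_estimate_cost` arithmetic concrete, here is a worked example for a hypothetical chat turn of 2,000 input tokens (prompt plus RAG context) and 500 output tokens:

```python
# Worked example of the pricing arithmetic above, using the quoted rates
# (Sonnet $3/$15, Haiku $0.25/$1.25 per 1M input/output tokens).
def estimate_cost(input_rate: float, output_rate: float,
                  input_tokens: int, output_tokens: int) -> float:
    return (input_tokens * input_rate + output_tokens * output_rate) / 1_000_000

# Hypothetical chat turn: 2,000 input tokens, 500 output tokens.
sonnet = estimate_cost(3.0, 15.0, 2_000, 500)
haiku = estimate_cost(0.25, 1.25, 2_000, 500)

print(f"Sonnet: ${sonnet:.6f}")          # $0.013500
print(f"Haiku:  ${haiku:.6f}")           # $0.001125
print(f"Ratio:  {sonnet / haiku:.0f}x")  # 12x
```

At the target scale of 1M messages/day, that 12x gap is roughly $13,500/day on Sonnet versus about $1,125/day on Haiku for these token counts, which is why the Haiku fast-path is the default and Sonnet is reserved for complex reasoning.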
# ============================================================
# Feature Flag Integration (AppConfig)
# ============================================================
import hashlib
class FeatureFlagController:
    """
    AWS AppConfig feature flag controller for AI feature rollout.
    Controls which AI features are active, model selection, and rollout percentage.
    """

    def __init__(self):
        self.appconfig_client = boto3.client("appconfigdata")
        self.app_id = os.environ.get("APPCONFIG_APP_ID", "")
        self.env_id = os.environ.get("APPCONFIG_ENV_ID", "")
        self.config_profile_id = os.environ.get("APPCONFIG_PROFILE_ID", "")
        self._config_cache: dict = {}
        self._cache_token: Optional[str] = None
        self._cache_expiry: float = 0

    def get_flags(self) -> dict:
        """
        Retrieve current feature flags from AppConfig.
        Caches locally for 30 seconds to reduce API calls.
        """
        now = time.time()
        if self._config_cache and now < self._cache_expiry:
            return self._config_cache
        try:
            if not self._cache_token:
                session_response = self.appconfig_client.start_configuration_session(
                    ApplicationIdentifier=self.app_id,
                    EnvironmentIdentifier=self.env_id,
                    ConfigurationProfileIdentifier=self.config_profile_id,
                    RequiredMinimumPollIntervalInSeconds=30,
                )
                self._cache_token = session_response["InitialConfigurationToken"]
            config_response = self.appconfig_client.get_latest_configuration(
                ConfigurationToken=self._cache_token,
            )
            self._cache_token = config_response["NextPollConfigurationToken"]
            # An empty body means the configuration is unchanged since the last poll.
            content = config_response["Configuration"].read()
            if content:
                self._config_cache = json.loads(content)
            self._cache_expiry = now + 30
            return self._config_cache
        except Exception as e:
            # Drop the session token so the next call starts a fresh session;
            # a stale or expired token would otherwise fail every poll.
            self._cache_token = None
            logger.warning("AppConfig fetch failed, using cached config: %s", str(e))
            return self._config_cache

    def is_enabled(self, flag_name: str, user_id: str = "") -> bool:
        """
        Check if a feature flag is enabled for a given user.
        Supports percentage-based rollout using consistent hashing.
        """
        flags = self.get_flags()
        flag = flags.get(flag_name, {})
        if not flag.get("enabled", False):
            return False
        rollout_pct = flag.get("rollout_percentage", 100)
        if rollout_pct >= 100:
            return True
        if rollout_pct <= 0:
            return False
        # Consistent hash: the same user always gets the same result for the same flag
        hash_input = f"{flag_name}:{user_id}".encode()
        hash_value = int(hashlib.md5(hash_input).hexdigest(), 16) % 100
        return hash_value < rollout_pct

    def get_model_id(self, flag_name: str, user_id: str = "") -> str:
        """
        Get the model ID configured for a feature flag.
        Falls back to Haiku if the flag is disabled or missing.
        """
        if not self.is_enabled(flag_name, user_id):
            return "anthropic.claude-3-haiku-20240307-v1:0"
        flags = self.get_flags()
        flag = flags.get(flag_name, {})
        return flag.get("model_id", "anthropic.claude-3-haiku-20240307-v1:0")
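The percentage-rollout behavior can be sketched without AppConfig at all. The flag document below uses the key shape `get_flags()` expects (`enabled`, `rollout_percentage`, `model_id`); the flag name `sonnet_recommendations` and its values are hypothetical examples.

```python
# Sketch of consistent-hash rollout, standalone (no AppConfig call).
import hashlib
import json

# Hypothetical flag document in the shape get_flags() returns.
flags = json.loads("""
{
  "sonnet_recommendations": {
    "enabled": true,
    "rollout_percentage": 25,
    "model_id": "anthropic.claude-3-sonnet-20240229-v1:0"
  }
}
""")

def is_enabled(flag_name: str, user_id: str, flags: dict) -> bool:
    # Same bucketing logic as FeatureFlagController.is_enabled above.
    flag = flags.get(flag_name, {})
    if not flag.get("enabled", False):
        return False
    pct = flag.get("rollout_percentage", 100)
    bucket = int(hashlib.md5(f"{flag_name}:{user_id}".encode()).hexdigest(), 16) % 100
    return bucket < pct

users = [f"user-{i}" for i in range(10_000)]
enabled = sum(is_enabled("sonnet_recommendations", u, flags) for u in users)
print(f"{enabled} of {len(users)} users fall in the 25% cohort")

# Deterministic: the same user always lands in the same cohort,
# so raising rollout_percentage only ever adds users, never flips them out.
assert is_enabled("sonnet_recommendations", "user-42", flags) == \
       is_enabled("sonnet_recommendations", "user-42", flags)
```

Because MD5 buckets are roughly uniform, the enabled count lands close to 2,500 of 10,000; the exact figure is stable run-to-run since the hash is deterministic.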
Key Takeaways for the AIP-C01 Exam
| Concept | What to Remember |
|---|---|
| API Gateway integration types | Lambda proxy (most common), HTTP proxy, AWS service, VPC Link |
| API Gateway timeout limit | 29 seconds max for REST API; use WebSocket for streaming |
| Lambda Function URL | Simpler than API Gateway for single-purpose webhooks; built-in HTTPS |
| Webhook security | HMAC signature verification + timestamp validation + idempotency |
| Idempotency pattern | DynamoDB conditional writes with TTL-based cleanup |
| EventBridge event size | Max 256 KB per event; truncate FM responses before publishing |
| EventBridge targets | Up to 5 targets per rule; each gets a copy of the event |
| EventBridge retry | Built-in retry with exponential backoff; DLQ for permanent failures |
| Feature flags | AppConfig for AI rollout; consistent hashing for user-level percentage |
| Cost estimation | Sonnet $3/$15 per 1M input/output; Haiku $0.25/$1.25 per 1M |
| Fargate orchestrator | Central service for routing, session management, prompt engineering |
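The webhook-security row above (HMAC signature verification plus timestamp validation) can be sketched as follows. The function name, parameter layout, and `timestamp.body` signing scheme are illustrative, loosely modeled on Stripe-style signing rather than any specific provider's contract.

```python
# Sketch: HMAC webhook verification with replay protection (illustrative).
import hashlib
import hmac
import time

def verify_webhook(secret: bytes, timestamp: str, body: bytes,
                   signature: str, tolerance_s: int = 300) -> bool:
    # Reject stale deliveries first: a replayed request carries an old timestamp.
    if abs(time.time() - int(timestamp)) > tolerance_s:
        return False
    # Sign "timestamp.body" so the timestamp itself is tamper-proof.
    expected = hmac.new(secret, f"{timestamp}.".encode() + body,
                        hashlib.sha256).hexdigest()
    # Constant-time comparison prevents timing attacks on the signature.
    return hmac.compare_digest(expected, signature)

secret = b"whsec_demo"  # hypothetical shared secret
ts = str(int(time.time()))
body = b'{"event": "payment.completed"}'
good_sig = hmac.new(secret, f"{ts}.".encode() + body, hashlib.sha256).hexdigest()

print(verify_webhook(secret, ts, body, good_sig))            # True
print(verify_webhook(secret, ts, body, "f" * 64))            # False (bad signature)
print(verify_webhook(secret, "1000000000", body, good_sig))  # False (stale timestamp)
```

Signature and timestamp checks stop forgery and replay; the idempotency row (DynamoDB conditional writes) still matters for legitimate duplicate deliveries that pass both checks.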
Quick Reference: Integration Selection Guide
Decision: Which integration pattern for your GenAI use case?
Need real-time response < 3s?
    YES --> API Gateway REST + Lambda + Bedrock (sync path)
Need token-by-token streaming?
    YES --> API Gateway WebSocket + ECS Fargate + Bedrock streaming
Can processing be deferred?
    YES --> EventBridge + SQS + Lambda (async path)
Receiving external system callbacks?
    YES --> Lambda Function URL or API Gateway + webhook handler
Need a complex multi-step AI workflow?
    YES --> EventBridge + Step Functions (orchestrated async)
Need to fan out AI results to multiple consumers?
    YES --> EventBridge rules with multiple targets
Need gradual AI feature rollout?
    YES --> AppConfig feature flags + consistent hashing
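The guide above, condensed into a hypothetical routing helper. The requirement keys are illustrative shorthand, and the precedence order (streaming before workflow before fan-out, with the synchronous path as the default) is one reasonable reading of the guide, not a prescribed ranking.

```python
# Illustrative helper mapping requirements to the patterns in the guide.
def choose_integration(needs: set) -> str:
    if "streaming" in needs:
        return "API Gateway WebSocket + ECS Fargate + Bedrock streaming"
    if "multi_step_workflow" in needs:
        return "EventBridge + Step Functions (orchestrated async)"
    if "external_callback" in needs:
        return "Lambda Function URL or API Gateway + webhook handler"
    if "fan_out" in needs:
        return "EventBridge rules with multiple targets"
    if "deferrable" in needs:
        return "EventBridge + SQS + Lambda (async path)"
    # Default: the synchronous < 3s fast path.
    return "API Gateway REST + Lambda + Bedrock (sync path)"

print(choose_integration({"streaming"}))
print(choose_integration({"deferrable", "fan_out"}))  # fan-out wins: multiple consumers
print(choose_integration(set()))                      # default synchronous fast path
```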