
Skill 2.3.2 --- Integrated AI Architecture: Microservice, Webhook, and Event-Driven GenAI Patterns

MangaAssist context: Japanese manga store chatbot on AWS, built on Bedrock Claude 3 (Sonnet at $3/$15 per 1M input/output tokens, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, and ElastiCache Redis. Target: a useful answer in under 3 seconds at a scale of 1M messages/day.
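
The pricing and volume figures above imply a daily Bedrock bill that depends heavily on how traffic is split between Haiku and Sonnet. The sketch below works through that arithmetic. The per-message token counts and the 90/10 routing split are illustrative assumptions, not MangaAssist measurements.

```python
# Per-1M-token prices in USD, taken from the MangaAssist context above.
PRICES = {
    "haiku": {"input": 0.25, "output": 1.25},
    "sonnet": {"input": 3.00, "output": 15.00},
}


def message_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost in USD of a single model invocation."""
    price = PRICES[model]
    return (input_tokens * price["input"] + output_tokens * price["output"]) / 1_000_000


def daily_cost(messages_per_day: int, haiku_share: float,
               input_tokens: int, output_tokens: int) -> float:
    """Blended daily cost when `haiku_share` of messages use Haiku and the rest Sonnet."""
    haiku = message_cost("haiku", input_tokens, output_tokens)
    sonnet = message_cost("sonnet", input_tokens, output_tokens)
    per_message = haiku_share * haiku + (1.0 - haiku_share) * sonnet
    return messages_per_day * per_message


if __name__ == "__main__":
    # ASSUMPTIONS (hypothetical): 1,500 input and 300 output tokens per message,
    # with 90% of traffic served by the Haiku fast path.
    total = daily_cost(1_000_000, haiku_share=0.9, input_tokens=1500, output_tokens=300)
    print(f"Estimated Bedrock spend: ${total:,.2f}/day")
```

Under those assumptions a Haiku message costs $0.00075 and a Sonnet message $0.009, so the blended figure comes to roughly $1,575 per day. Sending every message to Sonnet would cost about $9,000 per day, which is why the fast-path routing matters at this scale.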


Mind Map --- Integrated AI Capabilities for GenAI-Enhanced Applications

Integrated AI Capabilities (Skill 2.3.2)
|
+-- 1. Microservice Integration via API Gateway
|   +-- REST API endpoints for FM invocation
|   |   +-- /chat (synchronous, Haiku fast-path)
|   |   +-- /generate (async, Sonnet complex reasoning)
|   |   +-- /search (RAG pipeline via OpenSearch)
|   |   +-- /recommend (product recommendation engine)
|   +-- WebSocket API for streaming responses
|   |   +-- $connect / $disconnect lifecycle
|   |   +-- sendMessage route for real-time chat
|   |   +-- Token-by-token streaming from Bedrock
|   +-- API Gateway configuration patterns
|   |   +-- Usage plans and API keys for rate limiting
|   |   +-- Request/response mapping templates
|   |   +-- Custom authorizers (Lambda / Cognito)
|   |   +-- Stage variables for environment promotion
|   +-- Integration types
|       +-- Lambda proxy integration (most common)
|       +-- HTTP proxy to ECS Fargate services
|       +-- AWS service integration (direct Bedrock)
|       +-- VPC Link for private microservices
|
+-- 2. Lambda Webhook Handlers
|   +-- Inbound webhook processing
|   |   +-- Stripe payment events -> order confirmation AI
|   |   +-- GitHub/CI events -> deployment notification AI
|   |   +-- Third-party manga API updates -> catalog sync
|   |   +-- Customer review submissions -> sentiment analysis
|   +-- Outbound webhook dispatch
|   |   +-- AI-generated summaries to Slack/Discord
|   |   +-- Order status updates via webhook callbacks
|   |   +-- Inventory alerts to supplier systems
|   +-- Webhook security patterns
|   |   +-- HMAC signature verification
|   |   +-- Replay attack prevention (timestamp validation)
|   |   +-- IP allowlist + secret rotation
|   |   +-- Idempotency key handling (DynamoDB conditional writes)
|   +-- Lambda configuration for webhooks
|       +-- Function URL (simpler than API Gateway for single-purpose)
|       +-- Provisioned concurrency for latency-sensitive hooks
|       +-- Dead-letter queues for failed processing
|       +-- Layers for shared webhook utilities
|
+-- 3. EventBridge Event-Driven Patterns
|   +-- AI event schemas
|   |   +-- MangaAssist.Chat.MessageReceived
|   |   +-- MangaAssist.AI.InferenceCompleted
|   |   +-- MangaAssist.AI.ModerationFlagged
|   |   +-- MangaAssist.Recommendation.Generated
|   +-- Event routing rules
|   |   +-- Content-based filtering (detail-type matching)
|   |   +-- Input transformation for target services
|   |   +-- Fan-out to multiple targets (analytics + action)
|   |   +-- Cross-account event forwarding
|   +-- Integration targets
|   |   +-- Lambda (real-time processing)
|   |   +-- SQS (buffered batch processing)
|   |   +-- Step Functions (complex AI workflows)
|   |   +-- SNS (notification fan-out)
|   |   +-- Kinesis Data Firehose (analytics pipeline)
|   +-- Reliability patterns
|       +-- DLQ for failed event delivery
|       +-- Retry policies (exponential backoff)
|       +-- Archive and replay for debugging
|       +-- Schema registry for contract enforcement
|
+-- 4. Service Mesh Integration
|   +-- AWS App Mesh with ECS Fargate
|   |   +-- Envoy sidecar proxies
|   |   +-- Traffic splitting for AI model A/B testing
|   |   +-- Circuit breaker on Bedrock integration
|   |   +-- Mutual TLS between microservices
|   +-- Service discovery
|   |   +-- AWS Cloud Map for dynamic endpoints
|   |   +-- Health checks for AI service availability
|   |   +-- Weighted routing for gradual AI rollout
|   +-- Observability
|       +-- Distributed tracing (X-Ray)
|       +-- Metrics collection per AI endpoint
|       +-- Access logging for audit trails
|
+-- 5. Feature Flag-Controlled AI Rollout
    +-- AWS AppConfig integration
    |   +-- Feature flags for AI model selection
    |   +-- Percentage-based rollout (10% -> 50% -> 100%)
    |   +-- User segment targeting (premium vs free tier)
    |   +-- Emergency kill switch for AI features
    +-- Deployment strategies
    |   +-- Canary deployment with CloudWatch alarms
    |   +-- Blue/green for model version switching
    |   +-- Shadow mode (dual-invoke, compare results)
    +-- Configuration patterns
        +-- Model parameters per feature flag
        +-- Prompt templates versioned via AppConfig
        +-- Cost guardrails (token budget per flag)
        +-- Fallback behavior on flag evaluation failure
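
Branch 5 of the mind map lists percentage-based rollout, an emergency kill switch, and fallback behavior when flag evaluation fails. A minimal sketch of how those three ideas might combine is shown below. The flag document shape (`ai_enabled`, `sonnet_rollout_percent`) and the function names are hypothetical; in practice the flag document would be fetched from AppConfig rather than passed in directly.

```python
import hashlib
from typing import Optional

DEFAULT_MODEL = "anthropic.claude-3-haiku-20240307-v1:0"     # safe fallback
CANDIDATE_MODEL = "anthropic.claude-3-sonnet-20240229-v1:0"  # model being rolled out


def rollout_bucket(user_id: str, flag_name: str) -> int:
    """Map a user to a stable bucket in [0, 100) so rollout decisions are sticky."""
    digest = hashlib.sha256(f"{flag_name}:{user_id}".encode("utf-8")).hexdigest()
    return int(digest[:8], 16) % 100


def select_model(user_id: str, flags: Optional[dict]) -> Optional[str]:
    """
    Return the model ID to use, or None when the kill switch disables AI.
    `flags` uses a hypothetical shape:
        {"ai_enabled": bool, "sonnet_rollout_percent": int}
    """
    try:
        if not flags["ai_enabled"]:
            return None                      # emergency kill switch
        percent = int(flags["sonnet_rollout_percent"])
    except (KeyError, TypeError, ValueError):
        # Flag evaluation failed (missing or malformed config): use the default
        return DEFAULT_MODEL

    if rollout_bucket(user_id, "sonnet_rollout") < percent:
        return CANDIDATE_MODEL
    return DEFAULT_MODEL


if __name__ == "__main__":
    flags = {"ai_enabled": True, "sonnet_rollout_percent": 10}
    print(select_model("user-123", flags))
```

Hashing the user ID rather than drawing a random number keeps each user in the same bucket across requests. It also means that raising the percentage from 10 to 50 only adds users to the candidate group and never removes anyone already in it.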

Architecture Diagram --- GenAI Microservice Integration

                              MangaAssist Integrated AI Architecture
    ========================================================================================

    [Mobile/Web Client]
           |
           | HTTPS / WSS
           v
    +------------------+       +-------------------+       +---------------------+
    | API Gateway      |       | API Gateway       |       | CloudFront          |
    | (REST API)       |       | (WebSocket API)   |       | (Static + API Edge) |
    +--------+---------+       +--------+----------+       +----------+----------+
             |                          |                              |
             | Lambda Proxy             | Lambda Route                 |
             v                          v                              v
    +------------------+       +-------------------+       +---------------------+
    | Lambda:          |       | Lambda:           |       | S3: Static Assets   |
    | REST Handler     |       | WS Handler        |       | (manga images,      |
    | - /chat          |       | - $connect        |       |  UI bundles)        |
    | - /search        |       | - sendMessage     |       +---------------------+
    | - /recommend     |       | - $disconnect     |
    +--------+---------+       +--------+----------+
             |                          |
             |   +----------------------+
             |   |
             v   v
    +---------------------------+
    | ECS Fargate:              |
    | Orchestrator Service      |       +---------------------+
    | - Request routing         |<----->| ElastiCache Redis   |
    | - Session management      |       | - Session cache     |
    | - Prompt engineering      |       | - Response cache    |
    | - Response formatting     |       | - Rate limit state  |
    +--------+------------------+       +---------------------+
             |
             +-------------------+-------------------+
             |                   |                   |
             v                   v                   v
    +-----------------+ +-----------------+ +------------------+
    | Bedrock         | | OpenSearch      | | DynamoDB         |
    | Claude 3 Sonnet | | Serverless      | | - Sessions table |
    | Claude 3 Haiku  | | - Vector store  | | - Products table |
    | (FM invocation) | | - RAG retrieval | | - Orders table   |
    +-----------------+ +-----------------+ +------------------+
             |
             v
    +---------------------------+       +---------------------+
    | EventBridge               |       | Lambda:             |
    | (AI Event Bus)            |------>| Webhook Handlers    |
    | - InferenceCompleted      |       | - Payment hooks     |
    | - ModerationFlagged       |       | - Review hooks      |
    | - RecommendationGenerated |       | - Catalog sync      |
    +--------+------------------+       +---------------------+
             |
             +-------------------+-------------------+
             |                   |                   |
             v                   v                   v
    +-----------------+ +-----------------+ +------------------+
    | SQS: Batch      | | Step Functions: | | Kinesis Firehose:|
    | Processing      | | Complex AI      | | Analytics        |
    | Queue           | | Workflows       | | Pipeline         |
    +-----------------+ +-----------------+ +------------------+
                                                     |
                                                     v
                                            +-------------------+
                                            | S3: AI Analytics  |
                                            | Data Lake         |
                                            +-------------------+

    ========================================================================================
    Event Flow Patterns:

    [Sync Path]  Client -> API GW -> Lambda -> Fargate -> Bedrock -> Response (< 3s)
    [Async Path] Client -> API GW -> Lambda -> EventBridge -> SQS -> Batch Lambda
    [Stream Path] Client -> WS API GW -> Lambda -> Fargate -> Bedrock Stream -> WS push
    [Hook Path]  External -> Lambda URL -> Validate -> EventBridge -> Target Lambda

Production Python Code

1. API Gateway REST Endpoint Configuration (CDK)

"""
API Gateway REST API configuration for MangaAssist GenAI integration.
Defines REST endpoints, authorizers, throttling, and Lambda integrations.
"""

import json
from aws_cdk import (
    Stack,
    Duration,
    RemovalPolicy,
    CfnOutput,
    aws_apigateway as apigw,
    aws_lambda as _lambda,
    aws_iam as iam,
    aws_logs as logs,
    aws_wafv2 as wafv2,
)
from constructs import Construct


class MangaAssistApiGatewayStack(Stack):
    """CDK Stack for MangaAssist REST API Gateway with GenAI endpoints."""

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # --- Access Log Group ---
        api_log_group = logs.LogGroup(
            self, "MangaAssistApiLogs",
            retention=logs.RetentionDays.THIRTY_DAYS,
            removal_policy=RemovalPolicy.DESTROY,
        )

        # --- REST API Definition ---
        api = apigw.RestApi(
            self, "MangaAssistApi",
            rest_api_name="MangaAssist-GenAI-API",
            description="MangaAssist chatbot REST API with GenAI endpoints",
            deploy_options=apigw.StageOptions(
                stage_name="prod",
                throttling_rate_limit=1000,       # 1000 req/sec global
                throttling_burst_limit=2000,       # burst to 2000
                logging_level=apigw.MethodLoggingLevel.INFO,
                access_log_destination=apigw.LogGroupLogDestination(api_log_group),
                access_log_format=apigw.AccessLogFormat.json_with_standard_fields(
                    caller=True,
                    http_method=True,
                    ip=True,
                    protocol=True,
                    request_time=True,
                    resource_path=True,
                    response_length=True,
                    status=True,
                    user=True,
                ),
                tracing_enabled=True,              # X-Ray tracing
                metrics_enabled=True,
            ),
            default_cors_preflight_options=apigw.CorsOptions(
                allow_origins=["https://mangaassist.example.com"],
                allow_methods=["GET", "POST", "OPTIONS"],
                allow_headers=["Content-Type", "Authorization", "X-Api-Key"],
                max_age=Duration.hours(1),
            ),
            endpoint_types=[apigw.EndpointType.REGIONAL],
        )

        # --- Usage Plan and API Key (rate limiting per client) ---
        usage_plan = api.add_usage_plan(
            "MangaAssistUsagePlan",
            name="MangaAssist-Standard",
            description="Standard plan: 10K requests/day per API key",
            throttle=apigw.ThrottleSettings(
                rate_limit=100,                    # 100 req/sec per key
                burst_limit=200,
            ),
            quota=apigw.QuotaSettings(
                limit=10_000,
                period=apigw.Period.DAY,
            ),
        )

        api_key = api.add_api_key(
            "MangaAssistDefaultKey",
            api_key_name="manga-assist-default-key",
        )
        usage_plan.add_api_key(api_key)

        # --- Lambda Authorizer (JWT validation) ---
        authorizer_fn = _lambda.Function(
            self, "AuthorizerFn",
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler="authorizer.handler",
            code=_lambda.Code.from_asset("lambda/authorizer"),
            timeout=Duration.seconds(10),
            memory_size=256,
            environment={
                "COGNITO_USER_POOL_ID": "ap-northeast-1_XXXXXXXX",
                "COGNITO_CLIENT_ID": "xxxxxxxxxxxxxxxxxxxxxxxxxx",
            },
        )

        token_authorizer = apigw.TokenAuthorizer(
            self, "MangaAssistAuthorizer",
            handler=authorizer_fn,
            results_cache_ttl=Duration.minutes(5),
            identity_source="method.request.header.Authorization",
        )

        # --- Lambda Functions for GenAI Endpoints ---

        # Chat endpoint (synchronous, optimized for Haiku fast-path)
        chat_fn = _lambda.Function(
            self, "ChatFn",
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler="chat.handler",
            code=_lambda.Code.from_asset("lambda/chat"),
            timeout=Duration.seconds(29),          # API GW default integration timeout = 29s
            memory_size=1024,
            environment={
                "BEDROCK_MODEL_ID": "anthropic.claude-3-haiku-20240307-v1:0",
                "BEDROCK_FALLBACK_MODEL_ID": "anthropic.claude-3-sonnet-20240229-v1:0",
                "DYNAMODB_SESSIONS_TABLE": "MangaAssist-Sessions",
                "ELASTICACHE_ENDPOINT": "manga-cache.xxxxx.apne1.cache.amazonaws.com",
                "MAX_TOKENS": "1024",
                "TEMPERATURE": "0.3",
            },
            tracing=_lambda.Tracing.ACTIVE,
        )

        # Search endpoint (RAG pipeline via OpenSearch)
        search_fn = _lambda.Function(
            self, "SearchFn",
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler="search.handler",
            code=_lambda.Code.from_asset("lambda/search"),
            timeout=Duration.seconds(29),
            memory_size=1024,
            environment={
                "OPENSEARCH_ENDPOINT": "https://xxxxx.ap-northeast-1.aoss.amazonaws.com",
                "OPENSEARCH_INDEX": "manga-products-vectors",
                "BEDROCK_EMBED_MODEL_ID": "amazon.titan-embed-text-v2:0",
                "BEDROCK_MODEL_ID": "anthropic.claude-3-sonnet-20240229-v1:0",
                "TOP_K": "5",
            },
            tracing=_lambda.Tracing.ACTIVE,
        )

        # Recommend endpoint (product recommendations)
        recommend_fn = _lambda.Function(
            self, "RecommendFn",
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler="recommend.handler",
            code=_lambda.Code.from_asset("lambda/recommend"),
            timeout=Duration.seconds(29),
            memory_size=512,
            environment={
                "DYNAMODB_PRODUCTS_TABLE": "MangaAssist-Products",
                "BEDROCK_MODEL_ID": "anthropic.claude-3-haiku-20240307-v1:0",
                "MAX_RECOMMENDATIONS": "10",
            },
            tracing=_lambda.Tracing.ACTIVE,
        )

        # Grant Bedrock invoke permissions
        bedrock_policy = iam.PolicyStatement(
            actions=["bedrock:InvokeModel", "bedrock:InvokeModelWithResponseStream"],
            resources=["arn:aws:bedrock:ap-northeast-1::foundation-model/*"],
        )
        for fn in [chat_fn, search_fn, recommend_fn]:
            fn.add_to_role_policy(bedrock_policy)

        # --- API Resources and Methods ---

        # /v1 base path
        v1 = api.root.add_resource("v1")

        # /v1/chat  POST
        chat_resource = v1.add_resource("chat")
        chat_resource.add_method(
            "POST",
            apigw.LambdaIntegration(
                chat_fn,
                proxy=True,
                integration_responses=[
                    apigw.IntegrationResponse(status_code="200"),
                    apigw.IntegrationResponse(
                        status_code="504",
                        selection_pattern=".*Task timed out.*",
                    ),
                ],
            ),
            authorizer=token_authorizer,
            authorization_type=apigw.AuthorizationType.CUSTOM,
            api_key_required=True,
            method_responses=[
                apigw.MethodResponse(status_code="200"),
                apigw.MethodResponse(status_code="504"),
            ],
            request_parameters={
                "method.request.header.Authorization": True,
                "method.request.header.X-Api-Key": True,
            },
        )

        # /v1/search  POST
        search_resource = v1.add_resource("search")
        search_resource.add_method(
            "POST",
            apigw.LambdaIntegration(search_fn, proxy=True),
            authorizer=token_authorizer,
            authorization_type=apigw.AuthorizationType.CUSTOM,
            api_key_required=True,
        )

        # /v1/recommend  POST
        recommend_resource = v1.add_resource("recommend")
        recommend_resource.add_method(
            "POST",
            apigw.LambdaIntegration(recommend_fn, proxy=True),
            authorizer=token_authorizer,
            authorization_type=apigw.AuthorizationType.CUSTOM,
            api_key_required=True,
        )

        # --- Associate Usage Plan with API Stage ---
        usage_plan.add_api_stage(
            stage=api.deployment_stage,
            throttle=[
                apigw.ThrottlingPerMethod(
                    method=chat_resource.node.find_child("POST"),
                    throttle=apigw.ThrottleSettings(
                        rate_limit=500,            # /chat gets higher limit
                        burst_limit=1000,
                    ),
                ),
            ],
        )

        # --- Outputs ---
        CfnOutput(self, "ApiUrl", value=api.url, description="API Gateway URL")
        CfnOutput(
            self, "ApiKeyId",
            value=api_key.key_id,
            description="API Key ID (retrieve value from console)",
        )
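
The stack above points the token authorizer at `authorizer.handler`, which is not shown. A rough sketch of what that function could return follows. `validate_jwt` is a hypothetical placeholder that rejects everything; real code would verify the Cognito JWT signature, expiry, issuer, and audience. The `tier` context key is also an assumption.

```python
from typing import Any, Optional


def stage_wildcard_arn(method_arn: str) -> str:
    """
    Widen a method ARN to every method in the same stage.
    Input: arn:aws:execute-api:{region}:{account}:{apiId}/{stage}/{verb}/{path}
    """
    api_and_stage = method_arn.split("/")[:2]
    return "/".join(api_and_stage + ["*", "*"])


def build_policy(principal_id: str, effect: str, method_arn: str,
                 context: Optional[dict] = None) -> dict:
    """Build the response document API Gateway expects from a Lambda authorizer."""
    return {
        "principalId": principal_id,
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [{
                "Action": "execute-api:Invoke",
                "Effect": effect,
                "Resource": stage_wildcard_arn(method_arn),
            }],
        },
        # Context values must be strings, numbers, or booleans
        "context": context or {},
    }


def validate_jwt(token: str) -> Optional[dict]:
    """HYPOTHETICAL placeholder: returns claims for a valid token, else None."""
    return None


def handler(event: dict, _context: Any) -> dict:
    """TOKEN authorizer entry point: the event carries authorizationToken and methodArn."""
    token = event.get("authorizationToken", "")
    if token.startswith("Bearer "):
        token = token[len("Bearer "):]
    claims = validate_jwt(token.strip()) if token else None
    if claims is None:
        return build_policy("anonymous", "Deny", event["methodArn"])
    return build_policy(claims["sub"], "Allow", event["methodArn"],
                        context={"tier": str(claims.get("tier", "free"))})
```

The policy uses a stage-wide wildcard resource because the stack caches authorizer results for 5 minutes. A cached policy scoped to the exact method ARN of `/v1/chat` would be reused for the same token on `/v1/search` and deny it.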

2. Lambda Webhook Handler with HMAC Verification and Idempotency

"""
Lambda webhook handler for MangaAssist.
Handles inbound webhooks from external services (payment, review, catalog).
Implements HMAC signature verification, replay prevention, and idempotency.
"""

import json
import hashlib
import hmac
import time
import os
import logging
from datetime import datetime, timezone
from typing import Any

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# --- Clients ---
dynamodb = boto3.resource("dynamodb")
eventbridge = boto3.client("events")
secrets_manager = boto3.client("secretsmanager")

# --- Configuration ---
IDEMPOTENCY_TABLE = os.environ["IDEMPOTENCY_TABLE"]     # DynamoDB table
EVENT_BUS_NAME = os.environ["EVENT_BUS_NAME"]            # EventBridge bus
WEBHOOK_SECRET_ARN = os.environ["WEBHOOK_SECRET_ARN"]    # Secrets Manager ARN
MAX_TIMESTAMP_DRIFT_SECONDS = 300                        # 5-minute replay window
IDEMPOTENCY_TTL_HOURS = 24                               # 24h dedup window

idempotency_table = dynamodb.Table(IDEMPOTENCY_TABLE)


# ============================================================
# Webhook Signature Verification
# ============================================================

def get_webhook_secret(source: str) -> str:
    """
    Retrieve webhook signing secret from Secrets Manager.
    Secrets are stored as JSON: {"stripe": "whsec_...", "github": "ghsec_..."}.
    """
    response = secrets_manager.get_secret_value(SecretId=WEBHOOK_SECRET_ARN)
    secrets = json.loads(response["SecretString"])
    secret = secrets.get(source)
    if not secret:
        raise ValueError(f"No webhook secret configured for source: {source}")
    return secret


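def verify_hmac_signature(
    payload: str,
    signature: str,
    secret: str,
    algorithm: str = "sha256",
) -> bool:
    """
    Verify HMAC signature of webhook payload.
    Supports SHA-256 (Stripe, generic) and SHA-1 (GitHub legacy).
    Uses constant-time comparison to prevent timing attacks.

    Stripe sends "t=<timestamp>,v1=<hex digest>" and signs "<timestamp>.<payload>".
    That header format is parsed here and its embedded timestamp is checked.
    """
    signed_payload = payload
    candidates = [signature]
    if "v1=" in signature:
        # Stripe-style header: comma-separated key=value pairs
        parts = [p.split("=", 1) for p in signature.split(",") if "=" in p]
        timestamp = next((v for k, v in parts if k.strip() == "t"), "")
        candidates = [v for k, v in parts if k.strip() == "v1"]
        if not verify_timestamp(timestamp):
            return False
        signed_payload = f"{timestamp}.{payload}"

    hash_func = hashlib.sha256 if algorithm == "sha256" else hashlib.sha1
    expected = hmac.new(
        key=secret.encode("utf-8"),
        msg=signed_payload.encode("utf-8"),
        digestmod=hash_func,
    ).hexdigest()

    # Constant-time comparison prevents timing side-channel attacks
    return any(hmac.compare_digest(expected, c) for c in candidates)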


def verify_timestamp(timestamp_str: str) -> bool:
    """
    Verify webhook timestamp is within acceptable drift window.
    Prevents replay attacks using stale webhook deliveries.
    """
    try:
        webhook_ts = int(timestamp_str)
    except (ValueError, TypeError):
        logger.warning("Invalid timestamp format: %s", timestamp_str)
        return False

    current_ts = int(time.time())
    drift = abs(current_ts - webhook_ts)

    if drift > MAX_TIMESTAMP_DRIFT_SECONDS:
        logger.warning(
            "Timestamp drift too large: %d seconds (max %d)",
            drift,
            MAX_TIMESTAMP_DRIFT_SECONDS,
        )
        return False

    return True


# ============================================================
# Idempotency via DynamoDB Conditional Writes
# ============================================================

def check_and_claim_idempotency(idempotency_key: str) -> bool:
    """
    Attempt to claim an idempotency key using DynamoDB conditional write.
    Returns True if this is the first time we've seen this key (claimed).
    Returns False if the key was already processed (duplicate delivery).

    TTL-based cleanup: entries expire after IDEMPOTENCY_TTL_HOURS.
    """
    ttl = int(time.time()) + (IDEMPOTENCY_TTL_HOURS * 3600)

    try:
        idempotency_table.put_item(
            Item={
                "idempotency_key": idempotency_key,
                "claimed_at": datetime.now(timezone.utc).isoformat(),
                "status": "processing",
                "ttl": ttl,
            },
            ConditionExpression="attribute_not_exists(idempotency_key)",
        )
        logger.info("Claimed idempotency key: %s", idempotency_key)
        return True

    except ClientError as e:
        if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
            logger.info("Duplicate webhook delivery detected: %s", idempotency_key)
            return False
        raise


def mark_idempotency_complete(idempotency_key: str, result: dict) -> None:
    """Mark an idempotency entry as successfully processed."""
    idempotency_table.update_item(
        Key={"idempotency_key": idempotency_key},
        UpdateExpression="SET #s = :status, completed_at = :ts, result_summary = :r",
        ExpressionAttributeNames={"#s": "status"},
        ExpressionAttributeValues={
            ":status": "completed",
            ":ts": datetime.now(timezone.utc).isoformat(),
            ":r": json.dumps(result, default=str),
        },
    )


def mark_idempotency_failed(idempotency_key: str, error: str) -> None:
    """Mark an idempotency entry as failed (allows retry)."""
    idempotency_table.delete_item(Key={"idempotency_key": idempotency_key})
    logger.warning("Removed failed idempotency key for retry: %s (%s)", idempotency_key, error)


# ============================================================
# Webhook Source Handlers
# ============================================================

def handle_stripe_payment(payload: dict) -> dict:
    """
    Process Stripe payment webhook.
    On successful payment, trigger AI-generated order confirmation via EventBridge.
    """
    event_type = payload.get("type", "")
    data = payload.get("data", {}).get("object", {})

    if event_type == "checkout.session.completed":
        order_id = data.get("metadata", {}).get("order_id", "unknown")
        customer_email = data.get("customer_email", "")
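        currency = data.get("currency", "jpy").upper()
        # Stripe reports amounts in the smallest currency unit. JPY (the default
        # here) is a zero-decimal currency, so only divide for currencies that
        # have minor units. The set below is a partial list, not exhaustive.
        raw_amount = data.get("amount_total") or 0
        amount = raw_amount if currency in {"JPY", "KRW", "VND"} else raw_amount / 100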

        # Publish to EventBridge for AI-generated order confirmation
        event_detail = {
            "order_id": order_id,
            "customer_email": customer_email,
            "amount": str(amount),
            "currency": currency,
            "items": data.get("metadata", {}).get("items", "[]"),
            "action": "generate_confirmation",
        }

        eventbridge.put_events(
            Entries=[{
                "Source": "mangaassist.payments",
                "DetailType": "PaymentCompleted",
                "Detail": json.dumps(event_detail),
                "EventBusName": EVENT_BUS_NAME,
            }],
        )

        logger.info("Payment event published for order: %s", order_id)
        return {"processed": True, "order_id": order_id, "action": "confirmation_triggered"}

    elif event_type == "charge.refunded":
        # Handle refund notification
        charge_id = data.get("id", "unknown")
        eventbridge.put_events(
            Entries=[{
                "Source": "mangaassist.payments",
                "DetailType": "RefundProcessed",
                "Detail": json.dumps({"charge_id": charge_id, "amount": str(data.get("amount_refunded", 0))}),
                "EventBusName": EVENT_BUS_NAME,
            }],
        )
        return {"processed": True, "charge_id": charge_id, "action": "refund_notification"}

    logger.info("Unhandled Stripe event type: %s", event_type)
    return {"processed": False, "reason": f"unhandled_event_type: {event_type}"}


def handle_review_submission(payload: dict) -> dict:
    """
    Process customer review submission webhook.
    Triggers AI sentiment analysis and moderation via EventBridge.
    """
    review_id = payload.get("review_id", "unknown")
    product_id = payload.get("product_id", "")
    review_text = payload.get("text", "")
    rating = payload.get("rating", 0)
    customer_id = payload.get("customer_id", "")

    if not review_text or not product_id:
        return {"processed": False, "reason": "missing_required_fields"}

    # Publish to EventBridge for AI moderation + sentiment analysis
    event_detail = {
        "review_id": review_id,
        "product_id": product_id,
        "review_text": review_text[:2000],  # Truncate to avoid oversized events
        "rating": rating,
        "customer_id": customer_id,
        "actions": ["sentiment_analysis", "content_moderation", "response_generation"],
    }

    eventbridge.put_events(
        Entries=[{
            "Source": "mangaassist.reviews",
            "DetailType": "ReviewSubmitted",
            "Detail": json.dumps(event_detail),
            "EventBusName": EVENT_BUS_NAME,
        }],
    )

    logger.info("Review event published for review_id=%s product_id=%s", review_id, product_id)
    return {"processed": True, "review_id": review_id, "actions_triggered": 3}


def handle_catalog_update(payload: dict) -> dict:
    """
    Process manga catalog update webhook from publisher API.
    Triggers embedding regeneration and search index update.
    """
    update_type = payload.get("type", "")          # new_release | price_change | discontinued
    items = payload.get("items", [])

    if not items:
        return {"processed": False, "reason": "empty_items_list"}

    # Publish in batches of 10 (the EventBridge PutEvents limit per call)
    # so that items beyond the first 10 are not silently dropped.
    batch_size = 10
    failed_count = 0
    for start in range(0, len(items), batch_size):
        entries = []
        for item in items[start:start + batch_size]:
            entries.append({
                "Source": "mangaassist.catalog",
                "DetailType": f"CatalogUpdate.{update_type}",
                "Detail": json.dumps({
                    "product_id": item.get("id", ""),
                    "title": item.get("title", ""),
                    "update_type": update_type,
                    "data": item,
                    "actions": ["regenerate_embedding", "update_search_index", "notify_subscribers"],
                }),
                "EventBusName": EVENT_BUS_NAME,
            })

        response = eventbridge.put_events(Entries=entries)
        failed_count += response.get("FailedEntryCount", 0)

    if failed_count > 0:
        logger.error("Failed to publish %d/%d catalog events", failed_count, len(items))

    return {
        "processed": True,
        "update_type": update_type,
        "total_items": len(items),
        "published": len(items) - failed_count,
        "failed": failed_count,
    }


# ============================================================
# Source Routing Table
# ============================================================

# Each extract_idempotency_key callable receives the parsed JSON payload
# (not the raw Lambda event), so it reads the ID field directly.
WEBHOOK_HANDLERS = {
    "stripe": {
        "handler": handle_stripe_payment,
        "signature_header": "Stripe-Signature",
        "timestamp_header": None,            # Stripe embeds timestamp in signature
        "algorithm": "sha256",
        "extract_idempotency_key": lambda p: p.get("id", ""),
    },
    "reviews": {
        "handler": handle_review_submission,
        "signature_header": "X-Webhook-Signature",
        "timestamp_header": "X-Webhook-Timestamp",
        "algorithm": "sha256",
        "extract_idempotency_key": lambda p: p.get("review_id", ""),
    },
    "catalog": {
        "handler": handle_catalog_update,
        "signature_header": "X-Catalog-Signature",
        "timestamp_header": "X-Catalog-Timestamp",
        "algorithm": "sha256",
        "extract_idempotency_key": lambda p: p.get("batch_id", ""),
    },
}


# ============================================================
# Lambda Entry Point
# ============================================================

def handler(event: dict, context: Any) -> dict:
    """
    Main Lambda handler for webhook processing.
    Routes webhooks by source, verifies signatures, enforces idempotency.

    Invoked via Lambda Function URL:
      POST https://<function-url>.lambda-url.ap-northeast-1.on.aws/{source}

    Path parameter 'source' determines which handler processes the webhook.
    """
    logger.info("Webhook event received: requestId=%s", context.aws_request_id)

    try:
        # Extract source from path
        path = event.get("rawPath", event.get("path", ""))
        source = path.strip("/").split("/")[-1] if path else ""

        if source not in WEBHOOK_HANDLERS:
            logger.warning("Unknown webhook source: %s", source)
            return _response(404, {"error": f"Unknown webhook source: {source}"})

        config = WEBHOOK_HANDLERS[source]
        headers = event.get("headers", {})
        raw_body = event.get("body") or ""  # "body" may be present but None

        # --- Step 1: Verify HMAC signature ---
        signature = headers.get(config["signature_header"].lower(), "")
        if not signature:
            logger.warning("Missing signature header for source: %s", source)
            return _response(401, {"error": "Missing webhook signature"})

        secret = get_webhook_secret(source)
        if not verify_hmac_signature(raw_body, signature, secret, config["algorithm"]):
            logger.warning("Invalid signature for source: %s", source)
            return _response(403, {"error": "Invalid webhook signature"})

        # --- Step 2: Verify timestamp (replay prevention) ---
        ts_header = config.get("timestamp_header")
        if ts_header:
            timestamp = headers.get(ts_header.lower(), "")
            if not verify_timestamp(timestamp):
                return _response(403, {"error": "Webhook timestamp outside acceptable window"})

        # --- Step 3: Parse body ---
        try:
            payload = json.loads(raw_body) if isinstance(raw_body, str) else raw_body
        except json.JSONDecodeError:
            return _response(400, {"error": "Invalid JSON payload"})

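        # --- Step 4: Idempotency check ---
        # Extract the key before formatting: interpolating a missing value into the
        # f-string would yield "<source>:None", which an emptiness check cannot catch.
        extracted_key = config["extract_idempotency_key"](payload)
        if extracted_key:
            idempotency_key = f"{source}:{extracted_key}"
        else:
            # Fallback: the request ID is unique per delivery, so sender retries
            # of the same webhook are not deduplicated on this path.
            idempotency_key = f"{source}:{context.aws_request_id}"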

        if not check_and_claim_idempotency(idempotency_key):
            return _response(200, {"status": "already_processed", "idempotency_key": idempotency_key})

        # --- Step 5: Process webhook ---
        try:
            result = config["handler"](payload)
            mark_idempotency_complete(idempotency_key, result)
            logger.info("Webhook processed successfully: source=%s key=%s", source, idempotency_key)
            return _response(200, {"status": "processed", **result})

        except Exception as processing_error:
            mark_idempotency_failed(idempotency_key, str(processing_error))
            raise

    except Exception as e:
        logger.exception("Webhook processing failed: %s", str(e))
        return _response(500, {"error": "Internal processing error"})


def _response(status_code: int, body: dict) -> dict:
    """Build Lambda Function URL / API Gateway proxy response."""
    return {
        "statusCode": status_code,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(body, default=str),
    }
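
To exercise a handler like the one above locally, the caller has to sign the exact raw body it sends. The following is a minimal sketch, assuming the signature is a hex-encoded HMAC-SHA256 of the body under a shared secret. The header name `x-webhook-signature`, the `reviews` source, and the helpers `sign_body`, `verify`, and `build_signed_event` are hypothetical and are not taken from the handler's actual configuration.

```python
import hashlib
import hmac
import json


def sign_body(raw_body: str, secret: str) -> str:
    """Return the hex HMAC-SHA256 of the raw body (assumed signature format)."""
    return hmac.new(secret.encode(), raw_body.encode(), hashlib.sha256).hexdigest()


def verify(raw_body: str, signature: str, secret: str) -> bool:
    """Recompute the signature and compare in constant time."""
    return hmac.compare_digest(sign_body(raw_body, secret), signature)


def build_signed_event(source: str, payload: dict, secret: str) -> dict:
    """Build a Function-URL-shaped event whose body is signed (hypothetical header)."""
    raw_body = json.dumps(payload, separators=(",", ":"))
    return {
        "rawPath": f"/{source}",
        "headers": {"x-webhook-signature": sign_body(raw_body, secret)},
        "body": raw_body,
        "isBase64Encoded": False,
    }


event = build_signed_event("reviews", {"review_id": "r-123", "rating": 5}, "test-secret")
signature = event["headers"]["x-webhook-signature"]
print(verify(event["body"], signature, "test-secret"))

# Re-serializing the same payload with different whitespace changes the bytes,
# so the original signature no longer matches.
tampered = json.dumps(json.loads(event["body"]), indent=2)
print(verify(tampered, signature, "test-secret"))
```

The second check is the reason the handler verifies the signature against `raw_body` before calling `json.loads`: the signature covers bytes, not the parsed structure.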

3. EventBridge Rule Configuration and AI Event Publishing

"""
EventBridge configuration for MangaAssist AI event-driven integration.
Defines event bus, rules, targets, and event publishing utilities.
CDK infrastructure + runtime event publisher.
"""

import hashlib  # used by MangaAssistEventPublisher.publish_moderation_flagged
import json
import os
import time
import logging
from datetime import datetime, timezone
from typing import Optional

import boto3
from aws_cdk import (
    Stack,
    Duration,
    RemovalPolicy,
    aws_events as events,
    aws_events_targets as targets,
    aws_lambda as _lambda,
    aws_sqs as sqs,
    aws_iam as iam,
    aws_logs as logs,
)
from constructs import Construct

logger = logging.getLogger()
logger.setLevel(logging.INFO)


# ============================================================
# CDK: EventBridge Infrastructure
# ============================================================

class MangaAssistEventBridgeStack(Stack):
    """CDK Stack for MangaAssist EventBridge event-driven AI integration."""

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # --- Custom Event Bus ---
        ai_event_bus = events.EventBus(
            self, "MangaAssistAIBus",
            event_bus_name="MangaAssist-AI-Events",
        )

        # Event archive for replay/debugging (30-day retention)
        events.CfnArchive(
            self, "AIEventArchive",
            source_arn=ai_event_bus.event_bus_arn,
            archive_name="manga-assist-ai-archive",
            retention_days=30,
            event_pattern={
                "source": [{"prefix": "mangaassist."}],
            },
        )

        # --- Dead Letter Queue for failed deliveries ---
        event_dlq = sqs.Queue(
            self, "EventDLQ",
            queue_name="MangaAssist-EventBridge-DLQ",
            retention_period=Duration.days(14),
            visibility_timeout=Duration.seconds(300),
        )

        # --- Lambda: AI Inference Post-Processor ---
        inference_processor_fn = _lambda.Function(
            self, "InferenceProcessorFn",
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler="inference_processor.handler",
            code=_lambda.Code.from_asset("lambda/inference_processor"),
            timeout=Duration.seconds(60),
            memory_size=512,
            environment={
                "DYNAMODB_TABLE": "MangaAssist-Sessions",
                "NOTIFICATION_TOPIC_ARN": "arn:aws:sns:ap-northeast-1:123456789012:MangaAssist-Notifications",
            },
        )

        # --- Lambda: Content Moderation Handler ---
        moderation_fn = _lambda.Function(
            self, "ModerationFn",
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler="moderation.handler",
            code=_lambda.Code.from_asset("lambda/moderation"),
            timeout=Duration.seconds(30),
            memory_size=512,
            environment={
                "BEDROCK_MODEL_ID": "anthropic.claude-3-haiku-20240307-v1:0",
                "MODERATION_THRESHOLD": "0.8",
                "QUARANTINE_TABLE": "MangaAssist-Quarantined",
            },
        )

        # --- Lambda: Embedding Regeneration Handler ---
        embedding_fn = _lambda.Function(
            self, "EmbeddingFn",
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler="embedding.handler",
            code=_lambda.Code.from_asset("lambda/embedding"),
            timeout=Duration.minutes(5),
            memory_size=1024,
            environment={
                "OPENSEARCH_ENDPOINT": "https://xxxxx.ap-northeast-1.aoss.amazonaws.com",
                "BEDROCK_EMBED_MODEL_ID": "amazon.titan-embed-text-v2:0",
                "OPENSEARCH_INDEX": "manga-products-vectors",
            },
        )

        # --- Lambda: Analytics Event Processor ---
        analytics_fn = _lambda.Function(
            self, "AnalyticsFn",
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler="analytics.handler",
            code=_lambda.Code.from_asset("lambda/analytics"),
            timeout=Duration.seconds(30),
            memory_size=256,
            environment={
                "FIREHOSE_STREAM_NAME": "MangaAssist-AI-Analytics",
            },
        )

        # ============================================================
        # EventBridge Rules
        # ============================================================

        # Rule 1: AI Inference Completed -> Post-processing
        events.Rule(
            self, "InferenceCompletedRule",
            rule_name="MangaAssist-InferenceCompleted",
            event_bus=ai_event_bus,
            description="Route completed AI inferences to post-processor",
            event_pattern=events.EventPattern(
                source=["mangaassist.ai"],
                detail_type=["InferenceCompleted"],
                detail={
                    "status": ["success"],
                    "model_id": [{"prefix": "anthropic.claude"}],
                },
            ),
            targets=[
                targets.LambdaFunction(
                    inference_processor_fn,
                    dead_letter_queue=event_dlq,
                    retry_attempts=2,
                    max_event_age=Duration.hours(1),
                ),
                targets.LambdaFunction(
                    analytics_fn,
                    dead_letter_queue=event_dlq,
                    retry_attempts=2,
                ),
            ],
        )

        # Rule 2: Content Moderation Flagged -> Quarantine + Alert
        events.Rule(
            self, "ModerationFlaggedRule",
            rule_name="MangaAssist-ModerationFlagged",
            event_bus=ai_event_bus,
            description="Route flagged content to moderation handler",
            event_pattern=events.EventPattern(
                source=["mangaassist.ai"],
                detail_type=["ModerationFlagged"],
                detail={
                    "severity": ["high", "critical"],
                },
            ),
            targets=[
                targets.LambdaFunction(
                    moderation_fn,
                    dead_letter_queue=event_dlq,
                    retry_attempts=3,
                ),
            ],
        )

        # Rule 3: Catalog Update -> Embedding Regeneration
        events.Rule(
            self, "CatalogUpdateRule",
            rule_name="MangaAssist-CatalogUpdate",
            event_bus=ai_event_bus,
            description="Regenerate embeddings on catalog changes",
            event_pattern=events.EventPattern(
                source=["mangaassist.catalog"],
                detail_type=[
                    "CatalogUpdate.new_release",
                    "CatalogUpdate.price_change",
                    "CatalogUpdate.discontinued",
                ],
            ),
            targets=[
                targets.LambdaFunction(
                    embedding_fn,
                    dead_letter_queue=event_dlq,
                    retry_attempts=2,
                    max_event_age=Duration.hours(6),
                ),
            ],
        )

        # Rule 4: Payment Completed -> AI Confirmation Generator
        events.Rule(
            self, "PaymentCompletedRule",
            rule_name="MangaAssist-PaymentCompleted",
            event_bus=ai_event_bus,
            description="Generate AI order confirmations after payment",
            event_pattern=events.EventPattern(
                source=["mangaassist.payments"],
                detail_type=["PaymentCompleted"],
            ),
            targets=[
                targets.LambdaFunction(
                    inference_processor_fn,
                    dead_letter_queue=event_dlq,
                    retry_attempts=3,
                ),
            ],
        )

        # Rule 5: Review Submitted -> Sentiment Analysis + Moderation
        events.Rule(
            self, "ReviewSubmittedRule",
            rule_name="MangaAssist-ReviewSubmitted",
            event_bus=ai_event_bus,
            description="Analyze and moderate customer reviews with AI",
            event_pattern=events.EventPattern(
                source=["mangaassist.reviews"],
                detail_type=["ReviewSubmitted"],
            ),
            targets=[
                targets.LambdaFunction(moderation_fn, dead_letter_queue=event_dlq),
                targets.LambdaFunction(analytics_fn, dead_letter_queue=event_dlq),
            ],
        )

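        # Rule 6: Catch-all for analytics (every mangaassist event)
        # Note: Rules 1 and 5 also target analytics_fn, so those events arrive twice
        # unless the function deduplicates on the event ID.
        events.Rule(
            self, "AllEventsAnalyticsRule",
            rule_name="MangaAssist-AllEvents-Analytics",
            event_bus=ai_event_bus,
            description="Send all MangaAssist events to analytics pipeline",
            event_pattern=events.EventPattern(
                # EventPattern.source is typed as a list of strings; Match.prefix
                # produces the prefix matcher in that form.
                source=events.Match.prefix("mangaassist."),
            ),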
            targets=[
                targets.LambdaFunction(analytics_fn, dead_letter_queue=event_dlq),
            ],
        )


# ============================================================
# Runtime: Event Publisher Utility
# ============================================================

class MangaAssistEventPublisher:
    """
    Utility class for publishing AI events to EventBridge.
    Used by ECS Fargate orchestrator and Lambda functions.
    """

    def __init__(self, event_bus_name: Optional[str] = None):
        self.client = boto3.client("events")
        self.event_bus_name = event_bus_name or os.environ.get(
            "EVENT_BUS_NAME", "MangaAssist-AI-Events"
        )

    def publish_inference_completed(
        self,
        session_id: str,
        model_id: str,
        input_tokens: int,
        output_tokens: int,
        latency_ms: int,
        response_summary: str,
    ) -> dict:
        """Publish an InferenceCompleted event after FM invocation."""
        detail = {
            "session_id": session_id,
            "model_id": model_id,
            "status": "success",
            "metrics": {
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "latency_ms": latency_ms,
                "estimated_cost_usd": self._estimate_cost(model_id, input_tokens, output_tokens),
            },
            "response_summary": response_summary[:500],
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

        return self._put_event("mangaassist.ai", "InferenceCompleted", detail)

    def publish_moderation_flagged(
        self,
        session_id: str,
        content: str,
        severity: str,
        categories: list[str],
        model_id: str,
    ) -> dict:
        """Publish a ModerationFlagged event when content fails moderation."""
        detail = {
            "session_id": session_id,
            "content_hash": hashlib.sha256(content.encode()).hexdigest(),
            "content_preview": content[:200],
            "severity": severity,          # low | medium | high | critical
            "flagged_categories": categories,
            "model_id": model_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

        return self._put_event("mangaassist.ai", "ModerationFlagged", detail)

    def publish_recommendation_generated(
        self,
        customer_id: str,
        product_ids: list[str],
        strategy: str,
        model_id: str,
    ) -> dict:
        """Publish a RecommendationGenerated event."""
        detail = {
            "customer_id": customer_id,
            "recommended_product_ids": product_ids,
            "strategy": strategy,
            "model_id": model_id,
            "count": len(product_ids),
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

        return self._put_event("mangaassist.ai", "RecommendationGenerated", detail)

    def _put_event(self, source: str, detail_type: str, detail: dict) -> dict:
        """Put a single event to EventBridge."""
        try:
            response = self.client.put_events(
                Entries=[{
                    "Source": source,
                    "DetailType": detail_type,
                    "Detail": json.dumps(detail, default=str),
                    "EventBusName": self.event_bus_name,
                }],
            )

            failed = response.get("FailedEntryCount", 0)
            if failed > 0:
                logger.error(
                    "Failed to publish event: source=%s type=%s errors=%s",
                    source,
                    detail_type,
                    response.get("Entries", []),
                )

            return response

        except Exception as e:
            logger.exception("EventBridge publish failed: %s", str(e))
            raise

    @staticmethod
    def _estimate_cost(model_id: str, input_tokens: int, output_tokens: int) -> str:
        """
        Estimate cost for Bedrock Claude 3 invocation.
        Sonnet: $3/$15 per 1M tokens (input/output)
        Haiku:  $0.25/$1.25 per 1M tokens (input/output)
        """
        if "sonnet" in model_id.lower():
            cost = (input_tokens * 3.0 / 1_000_000) + (output_tokens * 15.0 / 1_000_000)
        elif "haiku" in model_id.lower():
            cost = (input_tokens * 0.25 / 1_000_000) + (output_tokens * 1.25 / 1_000_000)
        else:
            cost = 0.0

        return f"{cost:.6f}"


# ============================================================
# Feature Flag Integration (AppConfig)
# ============================================================

import hashlib


class FeatureFlagController:
    """
    AWS AppConfig feature flag controller for AI feature rollout.
    Controls which AI features are active, model selection, and rollout percentage.
    """

    def __init__(self):
        self.appconfig_client = boto3.client("appconfigdata")
        self.app_id = os.environ.get("APPCONFIG_APP_ID", "")
        self.env_id = os.environ.get("APPCONFIG_ENV_ID", "")
        self.config_profile_id = os.environ.get("APPCONFIG_PROFILE_ID", "")
        self._config_cache: dict = {}
        self._cache_token: Optional[str] = None
        self._cache_expiry: float = 0

    def get_flags(self) -> dict:
        """
        Retrieve current feature flags from AppConfig.
        Caches locally for 30 seconds to reduce API calls.
        """
        now = time.time()
        # Honor the cache window even when the cached config is empty, so we never
        # poll faster than RequiredMinimumPollIntervalInSeconds allows.
        if now < self._cache_expiry:
            return self._config_cache

        try:
            if not self._cache_token:
                session_response = self.appconfig_client.start_configuration_session(
                    ApplicationIdentifier=self.app_id,
                    EnvironmentIdentifier=self.env_id,
                    ConfigurationProfileIdentifier=self.config_profile_id,
                    RequiredMinimumPollIntervalInSeconds=30,
                )
                self._cache_token = session_response["InitialConfigurationToken"]

            config_response = self.appconfig_client.get_latest_configuration(
                ConfigurationToken=self._cache_token,
            )
            self._cache_token = config_response["NextPollConfigurationToken"]

            # An empty body means "unchanged since the last poll": keep the cache.
            content = config_response["Configuration"].read()
            if content:
                self._config_cache = json.loads(content)

            self._cache_expiry = now + 30
            return self._config_cache

        except Exception as e:
            logger.warning("AppConfig fetch failed, using cached config: %s", str(e))
            # Drop the token so the next attempt starts a fresh session (tokens can
            # expire), and back off for one cache window instead of retrying on every call.
            self._cache_token = None
            self._cache_expiry = now + 30
            return self._config_cache

    def is_enabled(self, flag_name: str, user_id: str = "") -> bool:
        """
        Check if a feature flag is enabled for a given user.
        Supports percentage-based rollout using consistent hashing.
        """
        flags = self.get_flags()
        flag = flags.get(flag_name, {})

        if not flag.get("enabled", False):
            return False

        rollout_pct = flag.get("rollout_percentage", 100)
        if rollout_pct >= 100:
            return True
        if rollout_pct <= 0:
            return False

        # Consistent hash: same user always gets same result for same flag
        hash_input = f"{flag_name}:{user_id}".encode()
        hash_value = int(hashlib.md5(hash_input).hexdigest(), 16) % 100
        return hash_value < rollout_pct

    def get_model_id(self, flag_name: str, user_id: str = "") -> str:
        """
        Get the model ID configured for a feature flag.
        Falls back to Haiku if flag is disabled or missing.
        """
        if not self.is_enabled(flag_name, user_id):
            return "anthropic.claude-3-haiku-20240307-v1:0"

        flags = self.get_flags()
        flag = flags.get(flag_name, {})
        return flag.get("model_id", "anthropic.claude-3-haiku-20240307-v1:0")
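
The percentage rollout in `is_enabled` relies on two properties: a given user always lands in the same bucket for a given flag, and raising the percentage only adds users without removing anyone already enabled. The sketch below reproduces the same hashing outside the class so both properties can be checked without AppConfig. The flag name `sonnet_recommendations`, the synthetic user IDs, and the helper names `rollout_bucket` and `in_rollout` are illustrative assumptions.

```python
import hashlib


def rollout_bucket(flag_name: str, user_id: str) -> int:
    """Map (flag, user) to a stable bucket in [0, 100), as is_enabled does."""
    digest = hashlib.md5(f"{flag_name}:{user_id}".encode()).hexdigest()
    return int(digest, 16) % 100


def in_rollout(flag_name: str, user_id: str, rollout_pct: int) -> bool:
    """A user is enabled when their bucket falls below the rollout percentage."""
    return rollout_bucket(flag_name, user_id) < rollout_pct


users = [f"user-{i}" for i in range(10_000)]
at_10 = {u for u in users if in_rollout("sonnet_recommendations", u, 10)}
at_50 = {u for u in users if in_rollout("sonnet_recommendations", u, 50)}

print(f"10% rollout enabled for {len(at_10) / len(users):.1%} of users")
print(f"50% rollout enabled for {len(at_50) / len(users):.1%} of users")

# Monotonic rollout: everyone enabled at 10% is still enabled at 50%.
print("10% cohort retained at 50%:", at_10 <= at_50)
```

Because the flag name is part of the hash input, a user's bucket for one flag is independent of their bucket for another, so the same early cohort is not reused for every experiment.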

Key Takeaways for the AIP-C01 Exam

| Concept | What to Remember |
|---|---|
| API Gateway integration types | Lambda proxy (most common), HTTP proxy, AWS service, VPC Link |
| API Gateway timeout limit | 29-second default integration timeout for REST APIs; use WebSocket for streaming |
| Lambda Function URL | Simpler than API Gateway for single-purpose webhooks; built-in HTTPS |
| Webhook security | HMAC signature verification + timestamp validation + idempotency |
| Idempotency pattern | DynamoDB conditional writes with TTL-based cleanup |
| EventBridge event size | Max 256 KB per event; truncate FM responses before publishing |
| EventBridge targets | Up to 5 targets per rule; each gets a copy of the event |
| EventBridge retry | Built-in retry with exponential backoff; DLQ for permanent failures |
| Feature flags | AppConfig for AI rollout; consistent hashing for user-level percentage |
| Cost estimation | Sonnet $3/$15 per 1M input/output tokens; Haiku $0.25/$1.25 per 1M |
| Fargate orchestrator | Central service for routing, session management, prompt engineering |
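
The event size limit is measured in bytes, not characters, which matters for a Japanese-language store: a character-based slice such as `response_summary[:500]` can occupy about three times as many UTF-8 bytes as the same number of ASCII characters. The sketch below shows one way to trim a field by bytes without splitting a multi-byte character. The helper names, the 4 KB headroom reserved for the other entry fields, and the sample text are assumptions. The size is measured with `ensure_ascii=False`, so a publisher would need to serialize `Detail` the same way for the measurement to hold, since the default `json.dumps` escaping of non-ASCII text is larger.

```python
import json

MAX_EVENT_BYTES = 256 * 1024   # per-event limit quoted in the table above
HEADROOM_BYTES = 4096          # assumed allowance for Source, DetailType, etc.


def truncate_utf8(text: str, max_bytes: int) -> str:
    """Cut text to at most max_bytes of UTF-8 without splitting a character."""
    encoded = text.encode("utf-8")
    if len(encoded) <= max_bytes:
        return text
    # errors="ignore" drops a trailing partial multi-byte sequence, if any.
    return encoded[:max_bytes].decode("utf-8", errors="ignore")


def detail_size(detail: dict) -> int:
    """UTF-8 size of the detail as it would be serialized."""
    return len(json.dumps(detail, ensure_ascii=False).encode("utf-8"))


def fit_detail(detail: dict, field: str) -> dict:
    """Return a copy of detail with detail[field] trimmed to fit the byte budget."""
    budget = MAX_EVENT_BYTES - HEADROOM_BYTES
    fitted = dict(detail)
    overflow = detail_size(fitted) - budget
    if overflow > 0:
        field_bytes = len(fitted[field].encode("utf-8"))
        fitted[field] = truncate_utf8(fitted[field], max(field_bytes - overflow, 0))
    return fitted


summary = "進撃の巨人" * 100_000  # 500,000 characters, 3 UTF-8 bytes each
detail = fit_detail({"session_id": "s-1", "response_summary": summary}, "response_summary")
size = detail_size(detail)
print(size <= MAX_EVENT_BYTES - HEADROOM_BYTES)
```

If the other fields alone exceed the budget, trimming one field cannot fix it; in that case the payload would have to be stored elsewhere (for example in S3) and referenced from the event.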

Quick Reference: Integration Selection Guide

Decision: Which integration pattern for your GenAI use case?

Need real-time response < 3s?
  YES --> API Gateway REST + Lambda + Bedrock (sync path)

Need token-by-token streaming?
  YES --> API Gateway WebSocket + ECS Fargate + Bedrock streaming

Processing can be deferred?
  YES --> EventBridge + SQS + Lambda (async path)

Receiving external system callbacks?
  YES --> Lambda Function URL or API Gateway + webhook handler

Need complex multi-step AI workflow?
  YES --> EventBridge + Step Functions (orchestrated async)

Need to fan out AI results to multiple consumers?
  YES --> EventBridge rules with multiple targets

Need gradual AI feature rollout?
  YES --> AppConfig feature flags + consistent hashing
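
The guide above is not mutually exclusive: one feature can answer YES to several questions and combine the corresponding patterns. A small sketch that encodes the same questions as data makes that explicit. The `UseCase` field names and the `select_patterns` helper are illustrative, not part of the MangaAssist codebase.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class UseCase:
    """Answers to the guide's questions (field names are illustrative)."""
    realtime_under_3s: bool = False
    token_streaming: bool = False
    deferrable: bool = False
    external_callbacks: bool = False
    multi_step_workflow: bool = False
    fan_out: bool = False
    gradual_rollout: bool = False


# (question field, recommended pattern), in the same order as the guide.
GUIDE = [
    ("realtime_under_3s", "API Gateway REST + Lambda + Bedrock (sync path)"),
    ("token_streaming", "API Gateway WebSocket + ECS Fargate + Bedrock streaming"),
    ("deferrable", "EventBridge + SQS + Lambda (async path)"),
    ("external_callbacks", "Lambda Function URL or API Gateway + webhook handler"),
    ("multi_step_workflow", "EventBridge + Step Functions (orchestrated async)"),
    ("fan_out", "EventBridge rules with multiple targets"),
    ("gradual_rollout", "AppConfig feature flags + consistent hashing"),
]


def select_patterns(use_case: UseCase) -> list[str]:
    """Return every pattern whose question is answered YES, in guide order."""
    return [pattern for field, pattern in GUIDE if getattr(use_case, field)]


# Example: a streaming chat feature whose results feed analytics and
# that is rolled out gradually.
chat = UseCase(token_streaming=True, fan_out=True, gradual_rollout=True)
for pattern in select_patterns(chat):
    print(pattern)
```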