
Skill 2.3.2 --- Microservice Integration and Webhook Handler Patterns

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

Field          Value
Certification  AWS Certified AI Practitioner (AIF-C01)
Domain         2 --- Development and Implementation of GenAI Applications
Task           2.3 --- Describe methods to integrate foundation models into applications
Skill          2.3.2 --- Develop integrated AI capabilities to enhance existing applications with GenAI functionality (API Gateway for microservice integrations, Lambda for webhook handlers, EventBridge for event-driven integrations)

Mind Map --- Microservice, Webhook, and Event-Driven GenAI Integration

mindmap
  root((Skill 2.3.2<br/>Microservice & Webhook<br/>FM Integration))
    API Gateway Microservice Patterns
      REST Endpoints for FM Invocation
        Synchronous chat via Haiku fast-path
        RAG search via OpenSearch + Sonnet
        Product recommendation engine
        29-second hard timeout ceiling
      WebSocket Streaming
        Token-by-token Bedrock streaming
        Connection lifecycle management
        Binary frame support for images
      Integration Types
        Lambda proxy --- most common for FM
        HTTP proxy to ECS Fargate
        AWS service direct --- Bedrock invoke
        VPC Link for private microservices
      Traffic Management
        Usage plans and API key throttling
        Stage variables for environment config
        Custom domain with Route 53
        WAF integration for abuse protection
    Lambda Webhook Handlers
      Inbound Webhook Processing
        Stripe payment callbacks
        Publisher catalog sync
        Customer review submissions
        Third-party manga API updates
      Security Patterns
        HMAC SHA-256 signature verification
        Timestamp drift replay prevention
        IP allowlist enforcement
        Secret rotation via Secrets Manager
      Idempotency Enforcement
        DynamoDB conditional writes
        TTL-based dedup window cleanup
        Idempotency key extraction per source
      Outbound Webhook Dispatch
        AI-generated summaries to Slack
        Order confirmations to CRM
        Inventory alerts to suppliers
    EventBridge Event-Driven Triggers
      AI Event Schemas
        MangaAssist.AI.InferenceCompleted
        MangaAssist.AI.ModerationFlagged
        MangaAssist.Recommendation.Generated
        MangaAssist.Chat.MessageReceived
      Routing Rules
        Content-based detail-type matching
        Input transformation for targets
        Fan-out to analytics + action targets
        Cross-account event forwarding
      Reliability
        Dead letter queues for failures
        Event archive 30-day replay
        Schema registry enforcement
        Retry with exponential backoff
    Sidecar Pattern for GenAI
      Envoy Proxy with App Mesh
        Traffic splitting for model A/B tests
        Circuit breaker on Bedrock calls
        Mutual TLS between services
      Cloud Map Service Discovery
        Health checks for AI endpoints
        Weighted routing for gradual rollout
      Observability Sidecar
        X-Ray distributed tracing
        Per-endpoint latency metrics
        Structured access logging

Architecture Diagram --- Microservice Integration Topology

    MangaAssist Microservice + Webhook + Event Integration Topology
    ========================================================================================

    EXTERNAL SYSTEMS                         INTERNAL MICROSERVICES
    ==================                       ======================

    [Stripe Payments]                        [Mobile / Web Client]
          |                                          |
          | POST /webhooks/stripe                    | HTTPS / WSS
          v                                          v
    +-------------------+                  +--------------------+
    | Lambda Function   |                  | API Gateway        |
    | URL: Webhook      |                  | (REST + WebSocket) |
    | Handler           |                  +--------+-----------+
    +--------+----------+                           |
             |                                      |
             | EventBridge                          | Lambda Proxy / HTTP Proxy
             v                                      v
    +-------------------+                  +--------------------+
    | EventBridge       |<----- events --- | ECS Fargate        |
    | (AI Event Bus)    |                  | Orchestrator       |
    +--------+----------+                  | +----------------+ |
             |                             | | App Container  | |
             +----------+---------+        | | - routing      | |
             |          |         |        | | - sessions     | |
             v          v         v        | | - prompts      | |
    +----------+ +----------+ +--------+   | +----------------+ |
    | Lambda:  | | Lambda:  | | SQS:   |   | | Sidecar:       | |
    | Inference| | Content  | | Batch  |   | | Envoy Proxy    | |
    | Post-    | | Moder-   | | Queue  |   | | - circuit break| |
    | Processor| | ation    | |        |   | | - mTLS         | |
    +----------+ +----------+ +--------+   | | - tracing      | |
                                           | +----------------+ |
    [Publisher Catalog API]                +--------+-----------+
          |                                         |
          | POST /webhooks/catalog        +---------+---------+
          v                               |         |         |
    +-------------------+          +------+--+ +----+----+ +--+-------+
    | Lambda Function   |          | Bedrock | | Open-   | | DynamoDB |
    | URL: Catalog      |          | Claude 3| | Search  | | Sessions |
    | Sync Handler      |          | Sonnet/ | | Server- | | Products |
    +-------------------+          | Haiku   | | less    | | Orders   |
                                   +---------+ +---------+ +----------+
    [Customer Reviews]
          |                        +---------------------+
          | POST /webhooks/reviews | ElastiCache Redis   |
          v                        | - Response cache    |
    +-------------------+          | - Session cache     |
    | Lambda Function   |          | - Rate limit state  |
    | URL: Review       |          +---------------------+
    | Sentiment Handler |
    +-------------------+

    ========================================================================================
    Integration Flows:

    [Sync FM]     Client -> API GW REST -> Lambda -> Fargate -> Bedrock -> Response (< 3s)
    [Stream FM]   Client -> API GW WS -> Lambda -> Fargate -> Bedrock Stream -> WS push
    [Webhook In]  External -> Lambda URL -> HMAC verify -> EventBridge -> Target Lambda
    [Event Fan]   EventBridge rule -> [Lambda A, Lambda B, SQS C] (parallel fan-out)
    [Sidecar]     Fargate App -> Envoy Sidecar -> Bedrock (circuit-broken, traced)

1. API Gateway Microservice Integration Patterns for FM

1.1 REST API Direct-to-Bedrock Integration

API Gateway can invoke Bedrock directly without Lambda using AWS service integration. This eliminates cold-start latency but trades off request/response transformation flexibility.

Pattern: API Gateway -> AWS Service Integration -> Bedrock InvokeModel

Pros:
- Zero Lambda cold start
- Lower cost (no Lambda execution charges)
- Simpler architecture

Cons:
- Limited request transformation (VTL templates only)
- No complex orchestration logic
- No session state management
- 29-second timeout still applies

When to use:
- Simple single-turn inference (no session context)
- Internal microservice-to-FM calls where orchestration lives elsewhere
- Cost-optimized batch scenarios

1.2 Lambda Proxy Integration for FM (Primary Pattern)

The dominant pattern for MangaAssist: API Gateway delegates to Lambda, which orchestrates session lookup, prompt assembly, Bedrock invocation, and response formatting.

Flow: Client -> API GW -> Lambda -> [Redis cache check] -> [DynamoDB session] -> Bedrock -> Response

Key design decisions:
- Lambda timeout = 29s (matches API GW hard limit)
- Memory = 1024 MB (more memory = more CPU = faster Bedrock SDK calls)
- Provisioned concurrency for /chat endpoint (latency-critical)
- X-Ray tracing enabled for end-to-end visibility
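The proxy flow above can be sketched as follows. This is a minimal illustration, not the production handler: `cache`, `sessions`, and `invoke_bedrock` are injected stand-ins for the Redis client, the DynamoDB session read, and the Bedrock SDK call, so the orchestration logic is testable without AWS.

```python
import hashlib
import json

def cache_key(session_id: str, message: str) -> str:
    """Deterministic cache key so identical prompts in a session hit the cache."""
    digest = hashlib.sha256(f"{session_id}:{message}".encode("utf-8")).hexdigest()
    return f"fm:resp:{digest[:32]}"

def handler(event: dict, context, *, cache, sessions, invoke_bedrock) -> dict:
    """API GW proxy event -> cache check -> session lookup -> Bedrock -> proxy response."""
    body = json.loads(event.get("body") or "{}")
    session_id = body.get("session_id", "anon")
    message = body.get("message", "")

    key = cache_key(session_id, message)
    cached = cache.get(key)                    # Redis GET (stand-in)
    if cached:
        return {"statusCode": 200, "body": cached}

    history = sessions.get(session_id, [])     # DynamoDB session read (stand-in)
    answer = invoke_bedrock(history + [{"role": "user", "content": message}])
    payload = json.dumps({"answer": answer, "session_id": session_id})
    cache.setex(key, 300, payload)             # cache the response for 5 minutes
    return {"statusCode": 200, "body": payload}
```

In the real handler, `cache` would be a `redis.Redis` client, `sessions` a DynamoDB-backed lookup, and `invoke_bedrock` a wrapper around the `bedrock-runtime` InvokeModel call (as in Section 5).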

1.3 HTTP Proxy to ECS Fargate

For long-running or stateful operations where Lambda's execution model is too restrictive, API Gateway routes through a VPC Link to ECS Fargate.

Flow: Client -> API GW -> VPC Link -> NLB -> ECS Fargate -> Bedrock

When to use:
- WebSocket connection management (persistent connections)
- Complex multi-step reasoning that may exceed 29s
- Stateful conversation management with in-memory context
- Connection pooling to downstream services

1.4 Request Mapping Templates for FM Payloads

API Gateway VTL (Velocity Template Language) templates can transform client requests into Bedrock-compatible payloads without Lambda.

## Example: VTL mapping template transforming a simple chat request
## into a Claude 3 Messages API payload

#set($body = $util.parseJson($input.body))
{
  "anthropic_version": "bedrock-2023-05-31",
  "max_tokens": 1024,
  "messages": [
    {
      "role": "user",
      "content": "$util.escapeJavaScript($body.message)"
    }
  ],
  "temperature": 0.3
}

2. Lambda Webhook Handlers for External System Callbacks

2.1 Webhook Handler Architecture

Webhooks are HTTP callbacks from external systems that trigger FM processing. MangaAssist receives webhooks from Stripe (payments), publisher APIs (catalog), and review platforms (customer feedback).

Webhook Security Layers:
========================

Layer 1: Network       -> WAF rules, IP allowlist (CloudFront or API GW)
Layer 2: Authentication -> HMAC signature verification (per-source secret)
Layer 3: Freshness     -> Timestamp validation (5-minute drift window)
Layer 4: Deduplication -> Idempotency key (DynamoDB conditional write)
Layer 5: Authorization -> Source-specific payload validation
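Layers 2 and 3 can be sketched end to end in a few lines. This is a provider-agnostic scheme modeled on Stripe's `t=...,v1=...` header convention, not any vendor's exact format; both function names are illustrative.

```python
import hashlib
import hmac
import time

def sign(secret: str, body: str, ts: int) -> str:
    """Sender side: HMAC-SHA256 over '<timestamp>.<body>', Stripe-style header."""
    mac = hmac.new(secret.encode(), f"{ts}.{body}".encode(), hashlib.sha256).hexdigest()
    return f"t={ts},v1={mac}"

def verify(secret: str, body: str, header: str, max_drift: int = 300) -> bool:
    """Receiver side: Layer 3 (freshness) then Layer 2 (authenticity)."""
    parts = dict(p.split("=", 1) for p in header.split(","))
    if abs(int(time.time()) - int(parts["t"])) > max_drift:
        return False                                   # replayed or stale delivery
    expected = hmac.new(
        secret.encode(), f"{parts['t']}.{body}".encode(), hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, parts["v1"])  # constant-time comparison
```

Because the timestamp is inside the signed payload, an attacker cannot refresh a captured delivery without invalidating the signature.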

2.2 Lambda Function URL vs API Gateway for Webhooks

+-----------------------------+--------------------+---------------------+
| Criterion                   | Lambda Function URL| API Gateway         |
+-----------------------------+--------------------+---------------------+
| Setup complexity            | Minimal            | Moderate            |
| Cost                        | Free (Lambda only) | Per-request charge  |
| Custom domain               | Not natively       | Yes (custom domain) |
| WAF integration             | Via CloudFront     | Native              |
| Request validation          | In Lambda code     | Request validators  |
| Throttling                  | Reserved concur.   | Usage plans + keys  |
| Multiple routes             | Single endpoint    | Multiple resources  |
| Best for                    | Single-source hook | Multi-source hooks  |
+-----------------------------+--------------------+---------------------+

MangaAssist decision: one Lambda Function URL per webhook source (hostnames below are illustrative --- real Function URLs use generated URL IDs)
- Stripe  -> https://stripe-hook.lambda-url.ap-northeast-1.on.aws/
- Reviews -> https://review-hook.lambda-url.ap-northeast-1.on.aws/
- Catalog -> https://catalog-hook.lambda-url.ap-northeast-1.on.aws/

2.3 Provisioned Concurrency for Latency-Sensitive Webhooks

Stripe webhooks have a 30-second timeout and retry aggressively. Cold starts on webhook handlers can cause failures.

Configuration:
- Provisioned concurrency = 5 (handles burst without cold start)
- Reserved concurrency = 50 (prevents runaway invocations)
- Dead letter queue = SQS (captures failed webhook processing)
- Timeout = 30s (match Stripe's retry window)

Cost: 5 provisioned * 0.5 GB * 86,400 sec/day * $0.000004646/GB-sec = ~$1.00/day
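The provisioned-concurrency arithmetic generalizes to a one-line helper (the per-GB-second rate shown is the one used in this section and varies by region; the function name is illustrative):

```python
def provisioned_concurrency_cost_per_day(
    units: int,
    memory_mb: int,
    price_per_gb_second: float = 0.000004646,  # region-dependent rate
) -> float:
    """Daily cost of keeping `units` execution environments warm at `memory_mb`."""
    gb = memory_mb / 1024
    return units * gb * 86_400 * price_per_gb_second
```

For the configuration above, `provisioned_concurrency_cost_per_day(5, 512)` comes out to roughly one dollar per day.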

3. EventBridge Event-Driven FM Trigger Patterns

3.1 Event Schema Design for FM Integration

Well-designed event schemas ensure loose coupling between producers and FM consumers.

MangaAssist Event Schema Registry:
===================================

Source: mangaassist.chat
  DetailType: MessageReceived
  Detail:
    session_id: string
    user_id: string
    message: string (max 4000 chars)
    language: string (ja | en)
    channel: string (web | mobile | api)

Source: mangaassist.ai
  DetailType: InferenceCompleted
  Detail:
    session_id: string
    model_id: string
    status: string (success | error | timeout)
    metrics:
      input_tokens: integer
      output_tokens: integer
      latency_ms: integer
      estimated_cost_usd: string
    response_summary: string (max 500 chars)

Source: mangaassist.ai
  DetailType: ModerationFlagged
  Detail:
    session_id: string
    content_hash: string (SHA-256)
    content_preview: string (max 200 chars)
    severity: string (low | medium | high | critical)
    flagged_categories: string[]
    model_id: string
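A producer-side helper that builds a PutEvents entry conforming to the `InferenceCompleted` schema above (the helper name is hypothetical; a real producer would pass a list of these entries to `events_client.put_events(Entries=[...])`):

```python
import json

def inference_completed_entry(
    session_id: str,
    model_id: str,
    status: str,
    metrics: dict,
    response_summary: str,
    bus: str = "MangaAssist-AI-Events",
) -> dict:
    """Build one PutEvents entry matching the InferenceCompleted schema."""
    assert status in ("success", "error", "timeout")
    return {
        "Source": "mangaassist.ai",
        "DetailType": "InferenceCompleted",
        "Detail": json.dumps({
            "session_id": session_id,
            "model_id": model_id,
            "status": status,
            "metrics": metrics,
            "response_summary": response_summary[:500],  # schema cap: 500 chars
        }),
        "EventBusName": bus,
    }
```

Centralizing entry construction like this keeps producers from drifting away from the registered schema.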

3.2 Event Routing Patterns

Pattern 1: Content-Based Routing
---------------------------------
Rule: Route only high/critical moderation events to the moderation handler
EventPattern:
  source: ["mangaassist.ai"]
  detail-type: ["ModerationFlagged"]
  detail:
    severity: ["high", "critical"]
Target: Lambda moderation handler

Pattern 2: Fan-Out to Multiple Consumers
------------------------------------------
Rule: Every InferenceCompleted event goes to BOTH post-processor AND analytics
EventPattern:
  source: ["mangaassist.ai"]
  detail-type: ["InferenceCompleted"]
Targets:
  - Lambda inference post-processor (primary action)
  - Lambda analytics processor (metrics + logging)

Pattern 3: Input Transformation
---------------------------------
Rule: Transform catalog events before sending to embedding regeneration
InputTransformer:
  InputPathsMap:
    product_id: "$.detail.product_id"
    title: "$.detail.title"
  InputTemplate: '{"action": "regenerate", "product_id": "<product_id>", "title": "<title>"}'
Target: Lambda embedding handler (receives simplified payload)

Pattern 4: Scheduled FM Invocation
------------------------------------
Rule: Every 6 hours, trigger batch recommendation regeneration
ScheduleExpression: rate(6 hours)
Target: Step Functions workflow (iterates all active customers)
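To make Pattern 1 concrete: the rule pattern is a plain JSON document, and its semantics can be sketched with a toy matcher. EventBridge itself does the matching in production; the `matches` function below implements only the exact-value subset used here, purely for illustration. The pattern dict is what you would serialize into `put_rule(..., EventPattern=json.dumps(MODERATION_PATTERN))`.

```python
import json

# Pattern 1 rule: only high/critical ModerationFlagged events match
MODERATION_PATTERN = {
    "source": ["mangaassist.ai"],
    "detail-type": ["ModerationFlagged"],
    "detail": {"severity": ["high", "critical"]},
}

def matches(pattern: dict, event: dict) -> bool:
    """Toy matcher for the exact-value subset of EventBridge patterns:
    a leaf list means 'the event's value must be one of these';
    a nested dict recurses into the corresponding sub-object."""
    for key, cond in pattern.items():
        value = event.get(key)
        if isinstance(cond, dict):
            if not isinstance(value, dict) or not matches(cond, value):
                return False
        elif value not in cond:
            return False
    return True
```

Unmentioned event fields are ignored, which is exactly what gives EventBridge rules their loose coupling: producers can add fields without breaking existing consumers.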

3.3 Preventing Infinite Event Loops

A critical concern with event-driven FM triggers is the feedback loop: if an FM consumer publishes events that re-trigger the same consumer, inference invocations (and their costs) compound indefinitely.

Anti-Loop Strategies:
======================

1. Source Differentiation
   - Producer source: "mangaassist.chat"
   - Consumer source: "mangaassist.ai"
   - Rule only matches "mangaassist.chat" -> never re-triggers

2. Event Metadata Guards
   - Include "processing_stage" in detail
   - Rule filters: detail.processing_stage: ["raw"] (not "processed")
   - Consumer publishes with processing_stage: "processed"

3. Circuit Breaker via DynamoDB
   - Track event chain depth per session_id
   - Reject processing if chain_depth > 3
   - TTL-based cleanup after 1 hour

4. EventBridge Rule Exclusion
   - Rule pattern with $or / prefix matching
   - Explicitly exclude events from AI consumers
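Strategy 3 (the chain-depth circuit breaker) can be sketched with an injected `store` standing in for the DynamoDB table; in production the increment would be an atomic `update_item` with an `ADD` expression and the `expires_at` field would be a DynamoDB TTL attribute.

```python
import time

MAX_CHAIN_DEPTH = 3

def should_process(store: dict, session_id: str, ttl_seconds: int = 3600) -> bool:
    """Increment the per-session chain depth; refuse once it exceeds the cap.
    An expired entry resets the chain, mirroring the 1-hour TTL cleanup."""
    now = time.time()
    entry = store.get(session_id)
    if entry is None or entry["expires_at"] < now:
        entry = {"depth": 0, "expires_at": now + ttl_seconds}
    entry["depth"] += 1
    store[session_id] = entry
    return entry["depth"] <= MAX_CHAIN_DEPTH
```

Each consumer calls `should_process` before doing FM work, so even a misconfigured rule that re-triggers itself stops after a bounded number of hops.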

4. Sidecar Pattern for Adding GenAI to Existing Microservices

4.1 Envoy Sidecar Proxy with AWS App Mesh

The sidecar pattern adds FM capabilities to existing microservices without modifying application code. An Envoy proxy runs alongside the application container in the same ECS task.

ECS Fargate Task Definition:
=============================

Task: MangaAssist-Orchestrator
  Container 1: app (application container)
    - Port 8080
    - Business logic, routing, session management
    - Calls localhost:9080 for AI operations

  Container 2: envoy-sidecar (App Mesh proxy)
    - Port 9080 (inbound from app)
    - Port 15000 (admin interface)
    - Routes to Bedrock endpoint via service mesh
    - Circuit breaker: 5 consecutive failures -> open
    - Retry policy: 2 retries, 1s/2s backoff
    - Timeout: 10s per request

  Container 3: xray-daemon (observability sidecar)
    - Port 2000 (UDP)
    - Collects traces from both app and envoy
    - Forwards to X-Ray service

4.2 Circuit Breaker Configuration for Bedrock Calls

App Mesh Circuit Breaker Settings:
===================================

Outlier Detection:
  consecutive_5xx: 5        # 5 failures -> eject endpoint
  interval: 10s             # check every 10 seconds
  base_ejection_time: 30s   # ejected for 30 seconds minimum
  max_ejection_percent: 50  # never eject more than 50% of endpoints

Connection Pool:
  max_connections: 100       # max concurrent connections to Bedrock
  max_pending_requests: 50   # max queued requests
  max_requests: 200          # max active requests

Timeout Policy:
  per_request_timeout: 10s   # individual request timeout
  idle_timeout: 60s          # close idle connections after 60s

Retry Policy:
  max_retries: 2
  retry_on: ["5xx", "gateway-error", "reset", "connect-failure"]
  per_retry_timeout: 5s

4.3 Traffic Splitting for Model A/B Testing

App Mesh Virtual Router:
=========================

Route: /v1/chat
  Match: POST /v1/chat
  Action:
    WeightedTargets:
      - VirtualNode: chat-service-sonnet   weight: 20   # 20% traffic to Sonnet
      - VirtualNode: chat-service-haiku    weight: 80   # 80% traffic to Haiku

Route: /v1/search
  Match: POST /v1/search
  Action:
    WeightedTargets:
      - VirtualNode: search-service-v2     weight: 10   # 10% canary
      - VirtualNode: search-service-v1     weight: 90   # 90% stable

Monitoring:
  - CloudWatch metrics per virtual node
  - Compare latency, error rate, user satisfaction
  - Automated rollback if error rate > 5%
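The weighted-target selection the virtual router performs can be illustrated with a tiny simulator. Envoy does this in the data plane; `pick_target` and `ROUTES` are hypothetical names, shown only to build intuition for the weights above.

```python
import random

# Mirrors the /v1/chat weighted targets above
ROUTES = [("chat-service-sonnet", 20), ("chat-service-haiku", 80)]

def pick_target(routes, rng=random.random) -> str:
    """Choose a virtual node proportionally to its weight."""
    total = sum(weight for _, weight in routes)
    r = rng() * total
    upto = 0
    for name, weight in routes:
        upto += weight
        if r < upto:
            return name
    return routes[-1][0]  # guard against the floating-point edge at r == total
```

Shifting traffic during a rollout is then just editing the weights, with no change to callers.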

5. Production Code

5.1 WebhookFMHandler --- Webhook-to-FM Bridge with Full Security

"""
WebhookFMHandler: Receives external webhooks, validates security,
and triggers Foundation Model processing via EventBridge.
Deployed as Lambda Function URL for each webhook source.
"""

import json
import hashlib
import hmac
import time
import os
import logging
from datetime import datetime, timezone
from typing import Any, Optional

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# --- AWS Clients ---
dynamodb = boto3.resource("dynamodb")
eventbridge = boto3.client("events")
secrets_client = boto3.client("secretsmanager")
bedrock_runtime = boto3.client("bedrock-runtime")

# --- Configuration ---
IDEMPOTENCY_TABLE_NAME = os.environ.get("IDEMPOTENCY_TABLE", "MangaAssist-Webhook-Idempotency")
EVENT_BUS_NAME = os.environ.get("EVENT_BUS_NAME", "MangaAssist-AI-Events")
WEBHOOK_SECRET_ARN = os.environ.get("WEBHOOK_SECRET_ARN", "")
MAX_TIMESTAMP_DRIFT = int(os.environ.get("MAX_TIMESTAMP_DRIFT", "300"))
IDEMPOTENCY_TTL_HOURS = int(os.environ.get("IDEMPOTENCY_TTL_HOURS", "24"))
FM_MODEL_ID = os.environ.get("FM_MODEL_ID", "anthropic.claude-3-haiku-20240307-v1:0")

idempotency_table = dynamodb.Table(IDEMPOTENCY_TABLE_NAME)

# Cached secrets to avoid repeated Secrets Manager calls; persists across
# invocations for the lifetime of the execution environment
_cached_secrets: Optional[dict] = None


# ============================================================
# Security: HMAC Verification + Replay Prevention
# ============================================================

def _load_webhook_secrets() -> dict:
    """
    Load webhook signing secrets from Secrets Manager.
    Cached for the lifetime of the Lambda execution environment.
    Secret format: {"stripe": "whsec_...", "catalog": "cat_sec_...", "reviews": "rev_..."}
    """
    global _cached_secrets
    if _cached_secrets is not None:
        return _cached_secrets

    response = secrets_client.get_secret_value(SecretId=WEBHOOK_SECRET_ARN)
    _cached_secrets = json.loads(response["SecretString"])
    return _cached_secrets


def verify_webhook_signature(
    raw_body: str,
    signature_header: str,
    source: str,
    algorithm: str = "sha256",
) -> bool:
    """
    Verify HMAC signature against the raw request body.
    Uses constant-time comparison to prevent timing attacks.
    """
    secrets = _load_webhook_secrets()
    secret = secrets.get(source)
    if not secret:
        logger.error("No webhook secret configured for source: %s", source)
        return False

    hash_func = hashlib.sha256 if algorithm == "sha256" else hashlib.sha1

    # Some providers wrap the signature in a structured header.
    # Stripe-style: "t=<timestamp>,v1=<signature>", where the signature
    # covers "<timestamp>.<raw_body>", not the body alone.
    # Generic providers send just the hex digest over the raw body.
    sig_value = signature_header
    signed_payload = raw_body
    if "=" in sig_value and "," in sig_value:
        parts = {k: v for k, v in (p.split("=", 1) for p in sig_value.split(","))}
        sig_value = parts.get("v1", sig_value)
        if "t" in parts:
            signed_payload = f"{parts['t']}.{raw_body}"

    expected = hmac.new(
        key=secret.encode("utf-8"),
        msg=signed_payload.encode("utf-8"),
        digestmod=hash_func,
    ).hexdigest()

    return hmac.compare_digest(expected, sig_value)


def verify_timestamp_freshness(timestamp_value: str) -> bool:
    """
    Verify webhook timestamp is within the acceptable drift window.
    Prevents replay attacks using captured webhook deliveries.
    """
    try:
        ts = int(timestamp_value)
    except (ValueError, TypeError):
        logger.warning("Invalid timestamp value: %s", timestamp_value)
        return False

    drift = abs(int(time.time()) - ts)
    if drift > MAX_TIMESTAMP_DRIFT:
        logger.warning("Timestamp drift %ds exceeds max %ds", drift, MAX_TIMESTAMP_DRIFT)
        return False
    return True


# ============================================================
# Idempotency: DynamoDB Conditional Write Pattern
# ============================================================

def claim_idempotency_key(key: str) -> bool:
    """
    Attempt to claim an idempotency key via DynamoDB conditional put.
    Returns True if claimed (first time), False if already exists (duplicate).
    """
    ttl_epoch = int(time.time()) + (IDEMPOTENCY_TTL_HOURS * 3600)
    try:
        idempotency_table.put_item(
            Item={
                "idempotency_key": key,
                "claimed_at": datetime.now(timezone.utc).isoformat(),
                "status": "processing",
                "ttl": ttl_epoch,
            },
            ConditionExpression="attribute_not_exists(idempotency_key)",
        )
        return True
    except ClientError as e:
        if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
            logger.info("Duplicate webhook: %s", key)
            return False
        raise


def complete_idempotency_key(key: str, result: dict) -> None:
    """Mark processing as completed with result summary."""
    idempotency_table.update_item(
        Key={"idempotency_key": key},
        UpdateExpression="SET #s = :done, completed_at = :ts, result_summary = :r",
        ExpressionAttributeNames={"#s": "status"},
        ExpressionAttributeValues={
            ":done": "completed",
            ":ts": datetime.now(timezone.utc).isoformat(),
            ":r": json.dumps(result, default=str)[:1000],
        },
    )


def release_idempotency_key(key: str) -> None:
    """Delete key on failure so the webhook can be retried."""
    idempotency_table.delete_item(Key={"idempotency_key": key})


# ============================================================
# FM Integration: Quick Sentiment/Summary via Bedrock
# ============================================================

def invoke_fm_for_webhook(prompt: str, max_tokens: int = 512) -> str:
    """
    Invoke Bedrock Claude 3 for webhook-triggered FM processing.
    Used for real-time sentiment analysis, content moderation, summaries.
    """
    response = bedrock_runtime.invoke_model(
        modelId=FM_MODEL_ID,
        contentType="application/json",
        accept="application/json",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": max_tokens,
            "temperature": 0.1,
            "messages": [{"role": "user", "content": prompt}],
        }),
    )

    result = json.loads(response["body"].read())
    content = result.get("content") or [{}]  # guard against an empty content list
    return content[0].get("text", "")


# ============================================================
# Webhook Source Processors
# ============================================================

def process_payment_webhook(payload: dict) -> dict:
    """Process Stripe payment completed -> generate AI order confirmation."""
    event_type = payload.get("type", "")
    data = payload.get("data", {}).get("object", {})

    if event_type != "checkout.session.completed":
        return {"processed": False, "reason": f"unhandled_type: {event_type}"}

    order_id = data.get("metadata", {}).get("order_id", "unknown")
    items_json = data.get("metadata", {}).get("items", "[]")
    currency = data.get("currency", "jpy").upper()
    # Stripe reports zero-decimal currencies (e.g. JPY) in whole units,
    # other currencies in the smallest unit (e.g. USD cents)
    amount = data.get("amount_total", 0)
    if currency not in ("JPY", "KRW", "VND"):
        amount = amount / 100

    # Publish to EventBridge for async AI confirmation generation
    eventbridge.put_events(Entries=[{
        "Source": "mangaassist.payments",
        "DetailType": "PaymentCompleted",
        "Detail": json.dumps({
            "order_id": order_id,
            "customer_email": data.get("customer_email", ""),
            "amount": str(amount),
            "currency": currency,
            "items": items_json,
            "action": "generate_confirmation",
            "processing_stage": "raw",
        }),
        "EventBusName": EVENT_BUS_NAME,
    }])

    logger.info("Payment webhook -> EventBridge: order=%s amount=%s%s", order_id, amount, currency)
    return {"processed": True, "order_id": order_id}


def process_review_webhook(payload: dict) -> dict:
    """Process customer review -> real-time sentiment + publish for moderation."""
    review_id = payload.get("review_id", "unknown")
    review_text = payload.get("text", "")
    product_id = payload.get("product_id", "")

    if not review_text or not product_id:
        return {"processed": False, "reason": "missing_required_fields"}

    # Real-time sentiment analysis via Haiku (fast, cheap)
    sentiment_prompt = (
        f"Analyze the sentiment of this manga product review. "
        f"Reply with exactly one word: positive, negative, or neutral.\n\n"
        f"Review: {review_text[:1000]}"
    )
    sentiment = invoke_fm_for_webhook(sentiment_prompt, max_tokens=10).strip().lower()

    # Publish enriched event to EventBridge
    eventbridge.put_events(Entries=[{
        "Source": "mangaassist.reviews",
        "DetailType": "ReviewSubmitted",
        "Detail": json.dumps({
            "review_id": review_id,
            "product_id": product_id,
            "review_text": review_text[:2000],
            "rating": payload.get("rating", 0),
            "customer_id": payload.get("customer_id", ""),
            "ai_sentiment": sentiment,
            "actions": ["content_moderation", "response_generation"],
            "processing_stage": "raw",
        }),
        "EventBusName": EVENT_BUS_NAME,
    }])

    logger.info("Review webhook -> sentiment=%s review_id=%s", sentiment, review_id)
    return {"processed": True, "review_id": review_id, "sentiment": sentiment}


def process_catalog_webhook(payload: dict) -> dict:
    """Process publisher catalog update -> trigger embedding regeneration."""
    update_type = payload.get("type", "")
    items = payload.get("items", [])

    if not items:
        return {"processed": False, "reason": "empty_items"}

    # Build one entry per item, then publish in chunks of 10
    # (EventBridge accepts at most 10 entries per PutEvents call)
    entries = []
    for item in items:
        entries.append({
            "Source": "mangaassist.catalog",
            "DetailType": f"CatalogUpdate.{update_type}",
            "Detail": json.dumps({
                "product_id": item.get("id", ""),
                "title": item.get("title", ""),
                "update_type": update_type,
                "data": item,
                "actions": ["regenerate_embedding", "update_search_index"],
                "processing_stage": "raw",
            }),
            "EventBusName": EVENT_BUS_NAME,
        })

    failed = 0
    for i in range(0, len(entries), 10):
        response = eventbridge.put_events(Entries=entries[i:i + 10])
        failed += response.get("FailedEntryCount", 0)
    if failed:
        logger.error("Failed to publish %d/%d catalog events", failed, len(entries))

    return {
        "processed": True,
        "update_type": update_type,
        "published": len(entries) - failed,
        "failed": failed,
    }


# ============================================================
# Source Configuration Table
# ============================================================

SOURCE_CONFIG = {
    "stripe": {
        "handler": process_payment_webhook,
        "sig_header": "stripe-signature",
        "ts_header": None,
        "algorithm": "sha256",
        "id_field": "id",
    },
    "reviews": {
        "handler": process_review_webhook,
        "sig_header": "x-webhook-signature",
        "ts_header": "x-webhook-timestamp",
        "algorithm": "sha256",
        "id_field": "review_id",
    },
    "catalog": {
        "handler": process_catalog_webhook,
        "sig_header": "x-catalog-signature",
        "ts_header": "x-catalog-timestamp",
        "algorithm": "sha256",
        "id_field": "batch_id",
    },
}


# ============================================================
# Lambda Entry Point
# ============================================================

def handler(event: dict, context: Any) -> dict:
    """
    WebhookFMHandler Lambda entry point.
    Deployed as Lambda Function URL. Path suffix determines source.
    POST https://<fn-url>.lambda-url.ap-northeast-1.on.aws/{source}
    """
    path = event.get("rawPath", event.get("path", ""))
    source = path.strip("/").split("/")[-1] if path else ""
    headers = {k.lower(): v for k, v in event.get("headers", {}).items()}
    raw_body = event.get("body", "") or ""
    if event.get("isBase64Encoded"):
        import base64  # Function URLs base64-encode non-text bodies
        raw_body = base64.b64decode(raw_body).decode("utf-8")

    logger.info("Webhook received: source=%s request_id=%s", source, context.aws_request_id)

    # --- Validate source ---
    if source not in SOURCE_CONFIG:
        return _resp(404, {"error": f"Unknown source: {source}"})

    cfg = SOURCE_CONFIG[source]

    # --- Step 1: HMAC signature verification ---
    signature = headers.get(cfg["sig_header"], "")
    if not signature:
        return _resp(401, {"error": "Missing signature header"})

    if not verify_webhook_signature(raw_body, signature, source, cfg["algorithm"]):
        return _resp(403, {"error": "Invalid signature"})

    # --- Step 2: Timestamp freshness (replay prevention) ---
    if cfg["ts_header"]:
        ts_val = headers.get(cfg["ts_header"], "")
        if not verify_timestamp_freshness(ts_val):
            return _resp(403, {"error": "Stale timestamp"})

    # --- Step 3: Parse payload ---
    try:
        payload = json.loads(raw_body) if isinstance(raw_body, str) else raw_body
    except json.JSONDecodeError:
        return _resp(400, {"error": "Invalid JSON"})

    # --- Step 4: Idempotency ---
    id_value = payload.get(cfg["id_field"], context.aws_request_id)
    idem_key = f"{source}:{id_value}"

    if not claim_idempotency_key(idem_key):
        return _resp(200, {"status": "already_processed", "key": idem_key})

    # --- Step 5: Process ---
    try:
        result = cfg["handler"](payload)
        complete_idempotency_key(idem_key, result)
        return _resp(200, {"status": "processed", **result})
    except Exception as e:
        release_idempotency_key(idem_key)
        logger.exception("Processing failed: %s", e)
        return _resp(500, {"error": "Processing failed"})


def _resp(code: int, body: dict) -> dict:
    return {
        "statusCode": code,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(body, default=str),
    }

5.2 MicroserviceFMProxy --- Sidecar-Aware FM Client for ECS Fargate

"""
MicroserviceFMProxy: A production FM client for ECS Fargate microservices.
Integrates with the Envoy sidecar for circuit breaking, retries, and tracing.
Provides caching, model routing, and cost tracking.
"""

import json
import time
import hashlib
import os
import logging
from typing import Optional
from dataclasses import dataclass

import boto3
import redis
from botocore.config import Config as BotoConfig
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# --- Configuration ---
REDIS_ENDPOINT = os.environ.get("ELASTICACHE_ENDPOINT", "localhost")
REDIS_PORT = int(os.environ.get("ELASTICACHE_PORT", "6379"))
EVENT_BUS_NAME = os.environ.get("EVENT_BUS_NAME", "MangaAssist-AI-Events")
CACHE_TTL_SECONDS = int(os.environ.get("FM_CACHE_TTL", "300"))

# Bedrock client with retry and timeout config
bedrock_config = BotoConfig(
    region_name="ap-northeast-1",
    retries={"max_attempts": 2, "mode": "adaptive"},
    read_timeout=15,
    connect_timeout=5,
)
bedrock_runtime = boto3.client("bedrock-runtime", config=bedrock_config)
eventbridge_client = boto3.client("events")

# Redis connection pool (shared across requests in the same container)
redis_pool = redis.ConnectionPool(
    host=REDIS_ENDPOINT,
    port=REDIS_PORT,
    max_connections=20,
    socket_timeout=2,
    socket_connect_timeout=1,
    retry_on_timeout=True,
    decode_responses=True,
)


@dataclass
class FMResponse:
    """Structured response from Foundation Model invocation."""
    text: str
    model_id: str
    input_tokens: int
    output_tokens: int
    latency_ms: int
    cached: bool = False
    estimated_cost_usd: float = 0.0
    stop_reason: str = ""


@dataclass
class ModelConfig:
    """Configuration for a specific model variant."""
    model_id: str
    max_tokens: int = 1024
    temperature: float = 0.3
    top_p: float = 0.9
    input_cost_per_1m: float = 0.25   # Haiku default
    output_cost_per_1m: float = 1.25  # Haiku default


# Pre-defined model configurations
MODELS = {
    "haiku": ModelConfig(
        model_id="anthropic.claude-3-haiku-20240307-v1:0",
        max_tokens=1024,
        temperature=0.3,
        input_cost_per_1m=0.25,
        output_cost_per_1m=1.25,
    ),
    "sonnet": ModelConfig(
        model_id="anthropic.claude-3-sonnet-20240229-v1:0",
        max_tokens=2048,
        temperature=0.5,
        input_cost_per_1m=3.0,
        output_cost_per_1m=15.0,
    ),
}


class MicroserviceFMProxy:
    """
    FM proxy for ECS Fargate microservices.
    Features:
    - Response caching via ElastiCache Redis
    - Automatic model selection (Haiku for simple, Sonnet for complex)
    - Cost tracking and budget enforcement
    - EventBridge event publishing for analytics
    - Circuit breaker awareness (respects Envoy sidecar signals)
    """

    def __init__(self, default_model: str = "haiku"):
        self.redis_client = redis.Redis(connection_pool=redis_pool)
        self.default_model = default_model
        self._request_count = 0
        self._total_cost = 0.0

    # -------------------------------------------------------
    # Public: Invoke FM with caching and routing
    # -------------------------------------------------------

    def invoke(
        self,
        prompt: str,
        model: str = "",
        system_prompt: str = "",
        session_id: str = "",
        skip_cache: bool = False,
        max_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
    ) -> FMResponse:
        """
        Invoke Foundation Model with automatic caching, routing, and tracking.

        Args:
            prompt: User message or prompt text.
            model: Model key ("haiku" or "sonnet"). Defaults to self.default_model.
            system_prompt: Optional system prompt for context.
            session_id: Session ID for event correlation.
            skip_cache: Bypass cache for this request.
            max_tokens: Override default max tokens.
            temperature: Override default temperature.

        Returns:
            FMResponse with text, metrics, and cost.
        """
        model_key = model or self.default_model
        config = MODELS.get(model_key, MODELS["haiku"])

        effective_max_tokens = max_tokens or config.max_tokens
        effective_temperature = temperature if temperature is not None else config.temperature

        # --- Cache check ---
        cache_key = None
        if not skip_cache:
            cache_key = self._build_cache_key(prompt, system_prompt, config.model_id)
            cached = self._get_from_cache(cache_key)
            if cached:
                logger.info("Cache hit: model=%s key=%s", model_key, cache_key[:16])
                return cached

        # --- Invoke Bedrock ---
        start_time = time.monotonic()

        messages = [{"role": "user", "content": prompt}]
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": effective_max_tokens,
            "temperature": effective_temperature,
            "top_p": config.top_p,
            "messages": messages,
        }
        if system_prompt:
            body["system"] = system_prompt

        try:
            response = bedrock_runtime.invoke_model(
                modelId=config.model_id,
                contentType="application/json",
                accept="application/json",
                body=json.dumps(body),
            )
        except ClientError as e:
            error_code = e.response["Error"]["Code"]
            logger.error("Bedrock invocation failed: %s - %s", error_code, str(e))
            # Re-raise so the caller and the Envoy sidecar's circuit breaker see
            # the failure; ThrottlingException in particular should trip the breaker.
            raise

        elapsed_ms = int((time.monotonic() - start_time) * 1000)

        # --- Parse response ---
        result_body = json.loads(response["body"].read())
        text = (result_body.get("content") or [{}])[0].get("text", "")
        usage = result_body.get("usage", {})
        input_tokens = usage.get("input_tokens", 0)
        output_tokens = usage.get("output_tokens", 0)
        stop_reason = result_body.get("stop_reason", "")

        cost = (
            (input_tokens * config.input_cost_per_1m / 1_000_000)
            + (output_tokens * config.output_cost_per_1m / 1_000_000)
        )

        fm_response = FMResponse(
            text=text,
            model_id=config.model_id,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            latency_ms=elapsed_ms,
            cached=False,
            estimated_cost_usd=cost,
            stop_reason=stop_reason,
        )

        # --- Cache response ---
        if cache_key and stop_reason == "end_turn":
            self._put_to_cache(cache_key, fm_response)

        # --- Publish metrics event ---
        self._publish_inference_event(session_id, fm_response)

        # --- Track cost ---
        self._request_count += 1
        self._total_cost += cost

        logger.info(
            "FM invoked: model=%s tokens=%d/%d latency=%dms cost=$%.6f",
            model_key, input_tokens, output_tokens, elapsed_ms, cost,
        )

        return fm_response

    def select_model_for_complexity(self, prompt: str) -> str:
        """
        Auto-select model based on prompt complexity heuristics.
        Haiku for simple queries, Sonnet for complex reasoning.
        """
        word_count = len(prompt.split())
        has_analysis_keywords = any(
            kw in prompt.lower()
            for kw in ["analyze", "compare", "explain why", "summarize", "recommend based on"]
        )

        if word_count > 200 or has_analysis_keywords:
            return "sonnet"
        return "haiku"

    def get_stats(self) -> dict:
        """Return runtime statistics for this proxy instance."""
        return {
            "total_requests": self._request_count,
            "total_cost_usd": round(self._total_cost, 6),
            "avg_cost_per_request": round(self._total_cost / max(self._request_count, 1), 6),
        }

    # -------------------------------------------------------
    # Private: Cache operations
    # -------------------------------------------------------

    def _build_cache_key(self, prompt: str, system_prompt: str, model_id: str) -> str:
        content = f"{model_id}:{system_prompt}:{prompt}"
        return f"fm:cache:{hashlib.sha256(content.encode()).hexdigest()}"

    def _get_from_cache(self, key: str) -> Optional[FMResponse]:
        try:
            data = self.redis_client.get(key)
            if data:
                obj = json.loads(data)
                return FMResponse(cached=True, **obj)
        except (redis.RedisError, json.JSONDecodeError) as e:
            logger.warning("Cache read failed: %s", e)
        return None

    def _put_to_cache(self, key: str, response: FMResponse) -> None:
        try:
            data = {
                "text": response.text,
                "model_id": response.model_id,
                "input_tokens": response.input_tokens,
                "output_tokens": response.output_tokens,
                "latency_ms": response.latency_ms,
                "estimated_cost_usd": response.estimated_cost_usd,
                "stop_reason": response.stop_reason,
            }
            self.redis_client.setex(key, CACHE_TTL_SECONDS, json.dumps(data))
        except redis.RedisError as e:
            logger.warning("Cache write failed: %s", e)

    # -------------------------------------------------------
    # Private: Event publishing
    # -------------------------------------------------------

    def _publish_inference_event(self, session_id: str, response: FMResponse) -> None:
        try:
            eventbridge_client.put_events(Entries=[{
                "Source": "mangaassist.ai",
                "DetailType": "InferenceCompleted",
                "Detail": json.dumps({
                    "session_id": session_id,
                    "model_id": response.model_id,
                    "status": "success",
                    "metrics": {
                        "input_tokens": response.input_tokens,
                        "output_tokens": response.output_tokens,
                        "latency_ms": response.latency_ms,
                        "estimated_cost_usd": response.estimated_cost_usd,
                    },
                    "stop_reason": response.stop_reason,
                    "processing_stage": "completed",
                }),
                "EventBusName": EVENT_BUS_NAME,
            }])
        except Exception as e:
            logger.warning("Event publish failed (non-fatal): %s", e)

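The cost figure that `invoke()` attaches to each `FMResponse` is plain token arithmetic over the pricing table. A quick sanity check of the formula using the Haiku and Sonnet rates above:

```python
def estimate_cost(input_tokens: int, output_tokens: int,
                  input_per_1m: float, output_per_1m: float) -> float:
    """Mirrors the invoke() cost formula: tokens x rate / 1,000,000."""
    return (input_tokens * input_per_1m / 1_000_000
            + output_tokens * output_per_1m / 1_000_000)

# Haiku ($0.25 / $1.25 per 1M tokens), typical chat turn: 800 in, 200 out
haiku_cost = estimate_cost(800, 200, 0.25, 1.25)   # 0.0002 + 0.00025 = 0.00045
# Sonnet ($3 / $15 per 1M tokens), same traffic: 12x the Haiku cost
sonnet_cost = estimate_cost(800, 200, 3.0, 15.0)   # 0.0024 + 0.0030 = 0.0054
```

At the 1M messages/day target, that 12x gap is why the proxy defaults to Haiku and routes up to Sonnet only when `select_model_for_complexity()` flags the prompt.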
5.3 EventDrivenFMTrigger --- EventBridge Consumer that Invokes FM

"""
EventDrivenFMTrigger: Lambda function triggered by EventBridge rules.
Consumes events and invokes Foundation Models for asynchronous AI processing.
Handles inference post-processing, content moderation, and confirmation generation.
"""

import json
import os
import time
import logging
from datetime import datetime, timezone
from typing import Any

import boto3
from botocore.config import Config as BotoConfig
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# --- AWS Clients ---
bedrock_config = BotoConfig(
    retries={"max_attempts": 3, "mode": "adaptive"},
    read_timeout=30,
    connect_timeout=5,
)
bedrock_runtime = boto3.client("bedrock-runtime", config=bedrock_config)
dynamodb = boto3.resource("dynamodb")
eventbridge = boto3.client("events")
ses_client = boto3.client("ses")

# --- Configuration ---
SESSIONS_TABLE = os.environ.get("DYNAMODB_SESSIONS_TABLE", "MangaAssist-Sessions")
PRODUCTS_TABLE = os.environ.get("DYNAMODB_PRODUCTS_TABLE", "MangaAssist-Products")
EVENT_BUS_NAME = os.environ.get("EVENT_BUS_NAME", "MangaAssist-AI-Events")
SENDER_EMAIL = os.environ.get("SENDER_EMAIL", "noreply@mangaassist.example.com")
SONNET_MODEL = "anthropic.claude-3-sonnet-20240229-v1:0"
HAIKU_MODEL = "anthropic.claude-3-haiku-20240307-v1:0"
MAX_EVENT_CHAIN_DEPTH = int(os.environ.get("MAX_CHAIN_DEPTH", "3"))

sessions_table = dynamodb.Table(SESSIONS_TABLE)
products_table = dynamodb.Table(PRODUCTS_TABLE)


# ============================================================
# Anti-Loop: Event Chain Depth Tracking
# ============================================================

def check_chain_depth(event_detail: dict) -> int:
    """
    Check and increment event processing chain depth.
    Prevents infinite loops where FM output triggers new events
    that re-invoke FM processing.
    Returns current depth. Raises ValueError if max depth exceeded.
    """
    depth = event_detail.get("_chain_depth", 0)
    if depth >= MAX_EVENT_CHAIN_DEPTH:
        raise ValueError(
            f"Event chain depth {depth} exceeds maximum {MAX_EVENT_CHAIN_DEPTH}. "
            f"Possible infinite loop detected."
        )
    return depth + 1


def add_chain_metadata(detail: dict, current_depth: int, source_event_id: str) -> dict:
    """Add chain tracking metadata to outbound event detail."""
    detail["_chain_depth"] = current_depth
    detail["_source_event_id"] = source_event_id
    detail["_processed_at"] = datetime.now(timezone.utc).isoformat()
    detail["processing_stage"] = "processed"
    return detail


# ============================================================
# FM Invocation Helpers
# ============================================================

def invoke_claude(
    prompt: str,
    model_id: str = HAIKU_MODEL,
    system_prompt: str = "",
    max_tokens: int = 1024,
    temperature: float = 0.3,
) -> dict:
    """
    Invoke Claude 3 via Bedrock and return structured result.
    Used for all event-driven FM processing.
    """
    start = time.monotonic()

    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "temperature": temperature,
        "messages": [{"role": "user", "content": prompt}],
    }
    if system_prompt:
        body["system"] = system_prompt

    response = bedrock_runtime.invoke_model(
        modelId=model_id,
        contentType="application/json",
        accept="application/json",
        body=json.dumps(body),
    )

    result = json.loads(response["body"].read())
    elapsed_ms = int((time.monotonic() - start) * 1000)

    usage = result.get("usage", {})
    return {
        "text": (result.get("content") or [{}])[0].get("text", ""),
        "input_tokens": usage.get("input_tokens", 0),
        "output_tokens": usage.get("output_tokens", 0),
        "latency_ms": elapsed_ms,
        "model_id": model_id,
        "stop_reason": result.get("stop_reason", ""),
    }


# ============================================================
# Event Handlers by DetailType
# ============================================================

def handle_payment_completed(detail: dict, chain_depth: int) -> dict:
    """
    Generate AI-powered order confirmation email.
    Triggered by: mangaassist.payments / PaymentCompleted
    """
    order_id = detail.get("order_id", "unknown")
    customer_email = detail.get("customer_email", "")
    amount = detail.get("amount", "0")
    currency = detail.get("currency", "JPY")
    items = detail.get("items", "[]")

    if isinstance(items, str):
        try:
            items = json.loads(items)
        except json.JSONDecodeError:
            items = []

    # Build confirmation prompt
    items_text = "\n".join(
        f"- {item.get('title', 'Unknown')} (x{item.get('qty', 1)})"
        for item in items
    ) if items else "- Order items (details in confirmation email)"

    prompt = (
        f"Generate a friendly, professional order confirmation message for a Japanese manga store.\n"
        f"Order ID: {order_id}\n"
        f"Amount: {amount} {currency}\n"
        f"Items:\n{items_text}\n\n"
        f"Write in a warm tone. Include the order ID, total amount, and a thank-you message. "
        f"Keep it under 200 words. Write in English with a light manga/anime flavor."
    )

    result = invoke_claude(prompt, model_id=HAIKU_MODEL, max_tokens=512, temperature=0.5)

    # Send email via SES
    if customer_email:
        try:
            ses_client.send_email(
                Source=SENDER_EMAIL,
                Destination={"ToAddresses": [customer_email]},
                Message={
                    "Subject": {"Data": f"MangaAssist Order Confirmed! #{order_id}"},
                    "Body": {"Text": {"Data": result["text"]}},
                },
            )
            logger.info("Confirmation email sent: order=%s email=%s", order_id, customer_email)
        except ClientError as e:
            logger.error("SES send failed: %s", e)

    return {"action": "confirmation_sent", "order_id": order_id, **result}


def handle_review_submitted(detail: dict, chain_depth: int) -> dict:
    """
    Perform AI content moderation and sentiment analysis on customer review.
    Triggered by: mangaassist.reviews / ReviewSubmitted
    """
    review_id = detail.get("review_id", "unknown")
    review_text = detail.get("review_text", "")
    product_id = detail.get("product_id", "")

    if not review_text:
        return {"action": "skipped", "reason": "empty_review"}

    # Content moderation check
    moderation_prompt = (
        f"You are a content moderator for a Japanese manga store. "
        f"Analyze this customer review for inappropriate content.\n\n"
        f"Review: {review_text}\n\n"
        f"Respond with a JSON object:\n"
        f'{{"safe": true/false, "severity": "none/low/medium/high/critical", '
        f'"categories": ["list of flagged categories"], '
        f'"reason": "brief explanation"}}'
    )

    moderation_result = invoke_claude(
        moderation_prompt, model_id=HAIKU_MODEL, max_tokens=256, temperature=0.1
    )

    # Parse moderation response. Fail closed on malformed output: flag the
    # review for human follow-up instead of silently treating it as safe.
    try:
        moderation = json.loads(moderation_result["text"])
    except json.JSONDecodeError:
        moderation = {
            "safe": False,
            "severity": "low",
            "categories": ["parse_error"],
            "reason": "Moderation output was not valid JSON",
        }

    # If flagged, publish ModerationFlagged event
    if not moderation.get("safe", True):
        severity = moderation.get("severity", "medium")
        eventbridge.put_events(Entries=[{
            "Source": "mangaassist.ai",
            "DetailType": "ModerationFlagged",
            "Detail": json.dumps(add_chain_metadata(
                {
                    "review_id": review_id,
                    "product_id": product_id,
                    "severity": severity,
                    "categories": moderation.get("categories", []),
                    "reason": moderation.get("reason", ""),
                    "content_preview": review_text[:200],
                },
                chain_depth,
                review_id,
            )),
            "EventBusName": EVENT_BUS_NAME,
        }])
        logger.warning("Review flagged: review_id=%s severity=%s", review_id, severity)

    return {
        "action": "moderation_completed",
        "review_id": review_id,
        "safe": moderation.get("safe", True),
        "severity": moderation.get("severity", "none"),
        **moderation_result,
    }


def handle_catalog_update(detail: dict, chain_depth: int) -> dict:
    """
    Generate product description and tags using FM for catalog updates.
    Triggered by: mangaassist.catalog / CatalogUpdate.*
    """
    product_id = detail.get("product_id", "")
    title = detail.get("title", "")
    data = detail.get("data", {})
    update_type = detail.get("update_type", "")

    if update_type == "discontinued":
        # No need to generate new descriptions for discontinued items
        return {"action": "skipped", "reason": "discontinued_product", "product_id": product_id}

    # Generate enhanced product description
    prompt = (
        f"Generate a compelling product description for this manga item.\n"
        f"Title: {title}\n"
        f"Author: {data.get('author', 'Unknown')}\n"
        f"Genre: {data.get('genre', 'Unknown')}\n"
        f"Volume: {data.get('volume', 'N/A')}\n"
        f"Synopsis: {data.get('synopsis', 'No synopsis available')}\n\n"
        f"Write a 2-3 sentence description that would appeal to manga fans. "
        f"Also provide 5 search tags as a comma-separated list on a new line starting with 'Tags: '."
    )

    result = invoke_claude(prompt, model_id=HAIKU_MODEL, max_tokens=300, temperature=0.6)

    # Update product in DynamoDB with AI-generated description
    try:
        products_table.update_item(
            Key={"product_id": product_id},
            UpdateExpression=(
                "SET ai_description = :desc, ai_updated_at = :ts, ai_model = :model"
            ),
            ExpressionAttributeValues={
                ":desc": result["text"],
                ":ts": datetime.now(timezone.utc).isoformat(),
                ":model": result["model_id"],
            },
        )
        logger.info("Product description updated: %s", product_id)
    except ClientError as e:
        logger.error("DynamoDB update failed for %s: %s", product_id, e)

    return {"action": "description_generated", "product_id": product_id, **result}


# ============================================================
# Event Router
# ============================================================

EVENT_HANDLERS = {
    "PaymentCompleted": handle_payment_completed,
    "ReviewSubmitted": handle_review_submitted,
    "CatalogUpdate.new_release": handle_catalog_update,
    "CatalogUpdate.price_change": handle_catalog_update,
}


# ============================================================
# Lambda Entry Point
# ============================================================

def handler(event: dict, context: Any) -> dict:
    """
    EventDrivenFMTrigger Lambda entry point.
    Invoked by EventBridge rules. Routes events to appropriate FM handler.
    """
    detail_type = event.get("detail-type", "")
    detail = event.get("detail", {})
    source = event.get("source", "")
    event_id = event.get("id", "unknown")

    logger.info(
        "Event received: source=%s type=%s id=%s",
        source, detail_type, event_id,
    )

    # --- Anti-loop check ---
    try:
        chain_depth = check_chain_depth(detail)
    except ValueError as e:
        logger.error("Infinite loop prevented: %s", e)
        return {"status": "rejected", "reason": str(e)}

    # --- Route to handler ---
    handler_fn = EVENT_HANDLERS.get(detail_type)
    if not handler_fn:
        logger.warning("No handler for detail_type: %s", detail_type)
        return {"status": "unhandled", "detail_type": detail_type}

    try:
        result = handler_fn(detail, chain_depth)
        logger.info("Event processed: type=%s result=%s", detail_type, result.get("action"))
        return {"status": "success", **result}

    except ClientError as e:
        logger.exception("AWS service error: %s", e)
        return {"status": "error", "error": str(e)}

    except Exception as e:
        logger.exception("Unhandled error processing event: %s", e)
        raise  # Re-raise so Lambda's async retry policy runs; exhausted events land in the configured DLQ

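The chain-depth guard can be exercised without any AWS dependencies. This standalone sketch uses the same logic as `check_chain_depth` (with the default maximum depth of 3) and replays an event through the guard until it trips:

```python
MAX_EVENT_CHAIN_DEPTH = 3  # mirrors the MAX_CHAIN_DEPTH default above

def check_chain_depth(detail: dict) -> int:
    """Same guard as the Lambda: reject at the ceiling, else return depth + 1."""
    depth = detail.get("_chain_depth", 0)
    if depth >= MAX_EVENT_CHAIN_DEPTH:
        raise ValueError(f"Event chain depth {depth} exceeds maximum {MAX_EVENT_CHAIN_DEPTH}")
    return depth + 1

# Simulate FM output that keeps re-triggering the same rule.
detail: dict = {}
hops = 0
try:
    while True:
        detail["_chain_depth"] = check_chain_depth(detail)
        hops += 1
except ValueError:
    pass  # loop broken: depth went 1 -> 2 -> 3, the fourth attempt is rejected
```

The depth counter survives only because every outbound event carries `_chain_depth` forward via `add_chain_metadata`; drop that metadata anywhere in the chain and the guard resets to zero.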
Key Takeaways for the AIP-C01 Exam

| Concept | What to Remember |
| --- | --- |
| API Gateway 29s timeout | Hard limit for REST API synchronous responses; use WebSocket for streaming or an async pattern for long FM calls |
| Lambda Function URL | Simpler than API Gateway for single-purpose webhooks; built-in HTTPS, no additional cost |
| HMAC webhook verification | Use `hmac.compare_digest()` for constant-time comparison; never use `==` for signature comparison |
| Idempotency pattern | DynamoDB conditional write (`attribute_not_exists`) + TTL cleanup; prevents duplicate FM invocations on webhook retry |
| EventBridge event size | Max 256 KB per event; truncate FM responses before publishing |
| EventBridge fan-out | Up to 5 targets per rule; each target gets a copy of the event for parallel processing |
| Event loop prevention | Use a `processing_stage` field, source differentiation, or a chain-depth counter to prevent infinite loops |
| Sidecar pattern | Envoy proxy in the same ECS task; adds circuit breaking, mTLS, and tracing without modifying app code |
| Circuit breaker | Open after N consecutive failures; prevents cascading failures when Bedrock is throttled or unavailable |
| Model A/B testing | App Mesh traffic splitting with weighted virtual nodes; monitor per-node metrics in CloudWatch |
| Response caching | Redis cache keyed by hash(model + system_prompt + user_prompt); TTL 5 minutes; saves cost on repeated queries |
| Cost tracking | Haiku $0.25/$1.25 per 1M tokens; Sonnet $3/$15 per 1M tokens; track per-request cost in EventBridge events |

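The 256 KB entry limit makes long FM outputs a real hazard when republishing results as events. A hedged truncation sketch; the 200,000-byte budget below is an assumption chosen to leave headroom for the event envelope, not an AWS-documented constant:

```python
import json

MAX_DETAIL_BYTES = 200_000  # assumed budget: headroom under the 256 KB entry limit

def truncate_fm_text(detail: dict, key: str = "text") -> dict:
    """Halve the FM text field until the serialized detail fits the budget."""
    for _ in range(32):  # bounded: each pass halves the text
        if len(json.dumps(detail).encode("utf-8")) <= MAX_DETAIL_BYTES:
            break
        detail[key] = detail[key][: len(detail[key]) // 2] + "...[truncated]"
    return detail

detail = truncate_fm_text({"text": "A" * 500_000, "model_id": "haiku"})
fits = len(json.dumps(detail).encode("utf-8")) <= MAX_DETAIL_BYTES
```

When truncation would lose too much, the usual escape hatch is to store the full text somewhere durable (S3 or DynamoDB) and put only a pointer in the event detail.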
Quick Reference: Integration Pattern Decision Matrix

+----------------------------+------------------+----------------+-------------------+
| Scenario                   | API Gateway REST | Lambda Fn URL  | EventBridge       |
+----------------------------+------------------+----------------+-------------------+
| Client-facing chat API     | YES (primary)    | No             | No                |
| External webhook receiver  | Possible         | YES (simpler)  | No                |
| Async FM processing        | No               | No             | YES (primary)     |
| Multi-consumer fan-out     | No               | No             | YES (up to 5)     |
| Real-time streaming        | WebSocket API    | No             | No                |
| Internal microservice call | HTTP proxy       | No             | Possible          |
| Batch FM invocation        | No               | No             | YES + SQS + Lambda|
| Model A/B testing          | App Mesh split   | No             | No                |
+----------------------------+------------------+----------------+-------------------+