
Flexible Model Interaction Architecture

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Attribute | Value |
| --- | --- |
| Certification | AWS Certified AI Practitioner (AIF-C01) |
| Domain | 2 — Implementation and Integration of Foundation Models |
| Task | 2.4 — Design model interaction systems for generative AI applications |
| Skill | 2.4.1 — Create flexible model interaction systems (Bedrock APIs for synchronous requests, language-specific SDKs and SQS for asynchronous processing, API Gateway for custom API clients with request validation) |

1. Interaction Patterns Mindmap

mindmap
  root((Flexible Model<br/>Interaction))
    Synchronous
      InvokeModel API
        Single-turn Q&A
        Real-time chat
        Inline recommendations
      InvokeModelWithResponseStream
        Streaming chat
        Progressive rendering
        Time-to-first-token optimization
      Converse API
        Multi-turn conversations
        Tool use / function calling
        System prompts
    Asynchronous
      SQS + Lambda
        Batch manga analysis
        Bulk content moderation
        Scheduled summarization
      SQS + ECS Tasks
        Long-running generation
        Multi-model pipelines
        Heavy image analysis
      EventBridge + Step Functions
        Orchestrated workflows
        Retry with backoff
        Fan-out processing
    Batch
      Bedrock Batch Inference
        Catalog enrichment
        Nightly re-ranking
        Bulk translation JP→EN
      S3 Input/Output
        JSONL payloads
        Result collection
        Cost optimization at 50% discount
    Custom APIs
      API Gateway REST
        Request validation
        Rate limiting
        API key management
      API Gateway WebSocket
        Persistent connections
        Server-push responses
        Real-time streaming
      API Gateway HTTP
        Low-latency proxy
        JWT authorization
        Minimal overhead

2. Architecture Flowchart — MangaAssist Sync/Async Request Flows

flowchart TB
    subgraph Clients["Client Layer"]
        WEB["Web App<br/>(React SPA)"]
        MOB["Mobile App<br/>(React Native)"]
        PART["Partner API<br/>(REST Clients)"]
    end

    subgraph Gateway["API Gateway Layer"]
        WS["WebSocket API<br/>(wss://manga.api)"]
        REST["REST API<br/>(https://manga.api/v1)"]
        VALID["Request Validator<br/>(JSON Schema)"]
        MAPPING["VTL Mapping<br/>Templates"]
    end

    subgraph Sync["Synchronous Path"]
        ECS["ECS Fargate<br/>Orchestrator"]
        CACHE["ElastiCache Redis<br/>Response Cache"]
        BEDROCK_S["Bedrock InvokeModel<br/>(Claude 3 Haiku)"]
        BEDROCK_STREAM["Bedrock InvokeModelWithResponseStream<br/>(Claude 3 Sonnet)"]
    end

    subgraph Async["Asynchronous Path"]
        SQS_STD["SQS Standard Queue<br/>(manga-fm-requests)"]
        SQS_FIFO["SQS FIFO Queue<br/>(manga-fm-ordered.fifo)"]
        DLQ["Dead Letter Queue<br/>(manga-fm-dlq)"]
        LAMBDA_P["Lambda Processor<br/>(batch-fm-worker)"]
        ECS_TASK["ECS Fargate Task<br/>(heavy-analysis)"]
        SNS["SNS Topic<br/>(fm-results)"]
    end

    subgraph Data["Data Layer"]
        DDB["DynamoDB<br/>Sessions & Products"]
        OS["OpenSearch Serverless<br/>Vector Store"]
        S3["S3 Bucket<br/>Batch Results"]
    end

    WEB -->|"WebSocket connect"| WS
    MOB -->|"WebSocket connect"| WS
    PART -->|"REST POST"| REST

    REST --> VALID
    VALID --> MAPPING
    MAPPING -->|"Sync request"| ECS
    WS -->|"Sync message"| ECS
    MAPPING -->|"Async request"| SQS_STD

    ECS --> CACHE
    CACHE -->|"Cache miss"| BEDROCK_S
    ECS -->|"Streaming"| BEDROCK_STREAM
    ECS --> DDB
    ECS --> OS

    SQS_STD --> LAMBDA_P
    SQS_FIFO --> ECS_TASK
    SQS_STD -->|"3 failures"| DLQ
    LAMBDA_P --> BEDROCK_S
    ECS_TASK --> BEDROCK_STREAM
    LAMBDA_P --> S3
    ECS_TASK --> S3
    LAMBDA_P --> SNS
    ECS_TASK --> SNS
    SNS -->|"Notify client"| WS

    style Clients fill:#e1f5fe,stroke:#0288d1
    style Gateway fill:#fff3e0,stroke:#f57c00
    style Sync fill:#e8f5e9,stroke:#388e3c
    style Async fill:#fce4ec,stroke:#c62828
    style Data fill:#f3e5f5,stroke:#7b1fa2

3. Bedrock InvokeModel API Patterns Across Compute Services

3.1 Core API Surface

Amazon Bedrock exposes four primary APIs for foundation model interaction:

| API | Use Case | Latency Profile | MangaAssist Usage |
| --- | --- | --- | --- |
| InvokeModel | Single synchronous request | 1-10s depending on model | Quick manga lookups with Haiku |
| InvokeModelWithResponseStream | Streaming synchronous request | TTFT 200-800ms | Chat conversations with Sonnet |
| Converse / ConverseStream | Multi-turn with tool use | 1-12s depending on model | Complex recommendation flows |
| CreateModelInvocationJob | Batch asynchronous processing | Minutes to hours | Nightly catalog enrichment |
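
The Converse API deserves a brief example because, unlike InvokeModel, it uses a provider-agnostic request shape: the same messages/system/inferenceConfig structure works across model families without the Anthropic-specific JSON body used elsewhere in this section. A minimal single-turn sketch, reusing the MangaAssist model IDs from above:

"""
Minimal Converse API sketch for MangaAssist.
Uses the provider-agnostic message format instead of the Anthropic
Messages API body required by InvokeModel.
"""
import boto3

bedrock_runtime = boto3.client("bedrock-runtime", region_name="us-east-1")

response = bedrock_runtime.converse(
    modelId="anthropic.claude-3-haiku-20240307-v1:0",
    system=[{"text": "You are MangaAssist, a helpful assistant for a Japanese manga store."}],
    messages=[
        {"role": "user", "content": [{"text": "Recommend three shonen series similar to Naruto."}]}
    ],
    inferenceConfig={"maxTokens": 512, "temperature": 0.3, "topP": 0.9}
)

# Converse responses are normalized across providers
output_text = response["output"]["message"]["content"][0]["text"]
usage = response["usage"]             # {"inputTokens": ..., "outputTokens": ..., "totalTokens": ...}
stop_reason = response["stopReason"]  # e.g., "end_turn"

For multi-turn tool use, the same call accepts a toolConfig parameter and returns toolUse content blocks, which keeps the recommendation-flow orchestration code model-agnostic.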

3.2 InvokeModel from Lambda

"""
Lambda function for synchronous Bedrock InvokeModel calls.
Used for quick, single-turn manga queries via Haiku.
"""
import json
import time
import boto3
from botocore.config import Config

# Configure retry behavior for Lambda's short-lived execution
bedrock_config = Config(
    region_name="us-east-1",
    retries={
        "max_attempts": 2,        # Limited retries for Lambda timeout budget
        "mode": "adaptive"        # Adaptive retry with token bucket
    },
    read_timeout=30,              # 30s read timeout for model response
    connect_timeout=5             # 5s connection timeout
)

bedrock_runtime = boto3.client(
    "bedrock-runtime",
    config=bedrock_config
)


def lambda_handler(event, context):
    """
    Handle synchronous manga query via Bedrock InvokeModel.

    Expected event:
    {
        "query": "What manga series are similar to One Piece?",
        "model_id": "anthropic.claude-3-haiku-20240307-v1:0",
        "max_tokens": 512
    }
    """
    start_time = time.time()

    query = event.get("query", "")
    model_id = event.get("model_id", "anthropic.claude-3-haiku-20240307-v1:0")
    max_tokens = event.get("max_tokens", 512)

    # Build the request body per Anthropic Messages API format
    request_body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "messages": [
            {
                "role": "user",
                "content": query
            }
        ],
        "system": (
            "You are MangaAssist, a helpful assistant for a Japanese manga store. "
            "Provide concise, accurate recommendations and information about manga series, "
            "authors, and genres. Respond in the same language as the user's query."
        ),
        "temperature": 0.3,       # Lower temperature for factual manga info
        "top_p": 0.9
    })

    try:
        response = bedrock_runtime.invoke_model(
            modelId=model_id,
            contentType="application/json",
            accept="application/json",
            body=request_body
        )

        response_body = json.loads(response["body"].read())

        elapsed_ms = int((time.time() - start_time) * 1000)

        return {
            "statusCode": 200,
            "body": {
                "response": response_body["content"][0]["text"],
                "model": model_id,
                "input_tokens": response_body["usage"]["input_tokens"],
                "output_tokens": response_body["usage"]["output_tokens"],
                "latency_ms": elapsed_ms,
                "stop_reason": response_body["stop_reason"]
            }
        }

    except bedrock_runtime.exceptions.ThrottlingException as e:
        return {
            "statusCode": 429,
            "body": {"error": "Model throttled", "detail": str(e)}
        }
    except bedrock_runtime.exceptions.ModelTimeoutException as e:
        return {
            "statusCode": 504,
            "body": {"error": "Model timeout", "detail": str(e)}
        }
    except Exception as e:
        return {
            "statusCode": 500,
            "body": {"error": "InvokeModel failed", "detail": str(e)}
        }

3.3 InvokeModelWithResponseStream from ECS Fargate

"""
ECS Fargate service for streaming Bedrock responses.
Streams tokens back to the client via WebSocket for real-time chat.
"""
import json
import os
import asyncio
from typing import Callable, Optional

import boto3
from botocore.config import Config


class BedrockStreamingClient:
    """
    Manages streaming invocations from ECS Fargate to Bedrock.
    Optimized for long-running container lifecycle with connection pooling.
    """

    def __init__(self, region: str = "us-east-1"):
        self._config = Config(
            region_name=region,
            retries={
                "max_attempts": 3,        # More retries for long-running ECS tasks
                "mode": "adaptive"
            },
            read_timeout=120,             # 2 min for streaming responses
            connect_timeout=10,
            max_pool_connections=50       # Connection pool for high concurrency
        )
        self._client = boto3.client("bedrock-runtime", config=self._config)

    async def stream_manga_response(
        self,
        messages: list,
        system_prompt: str,
        model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
        max_tokens: int = 2048,
        on_token: Optional[Callable] = None
    ) -> dict:
        """
        Stream a response from Bedrock, yielding tokens as they arrive.

        Args:
            messages: List of conversation messages in Anthropic format
            system_prompt: System prompt for MangaAssist behavior
            model_id: Bedrock model identifier
            max_tokens: Maximum tokens to generate
            on_token: Async callback invoked with each token chunk

        Returns:
            Complete response metadata including usage statistics
        """
        request_body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": max_tokens,
            "messages": messages,
            "system": system_prompt,
            "temperature": 0.7,
            "top_p": 0.95,
            "stop_sequences": ["</answer>"]
        })

        # InvokeModelWithResponseStream returns an EventStream
        response = self._client.invoke_model_with_response_stream(
            modelId=model_id,
            contentType="application/json",
            accept="application/json",
            body=request_body
        )

        full_text = []
        input_tokens = 0
        output_tokens = 0
        stop_reason = None

        # Process the event stream
        event_stream = response["body"]
        for event in event_stream:
            chunk = event.get("chunk")
            if chunk:
                chunk_data = json.loads(chunk["bytes"].decode("utf-8"))

                # Handle different event types in the stream
                event_type = chunk_data.get("type")

                if event_type == "message_start":
                    usage = chunk_data.get("message", {}).get("usage", {})
                    input_tokens = usage.get("input_tokens", 0)

                elif event_type == "content_block_delta":
                    delta = chunk_data.get("delta", {})
                    if delta.get("type") == "text_delta":
                        token_text = delta["text"]
                        full_text.append(token_text)

                        # Stream token back to caller (e.g., WebSocket)
                        if on_token:
                            await on_token(token_text)

                elif event_type == "message_delta":
                    usage = chunk_data.get("usage", {})
                    output_tokens = usage.get("output_tokens", 0)
                    stop_reason = chunk_data.get("delta", {}).get("stop_reason")

                elif event_type == "message_stop":
                    pass  # Stream complete

        return {
            "text": "".join(full_text),
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "stop_reason": stop_reason,
            "model_id": model_id
        }


class MangaAssistStreamHandler:
    """
    WebSocket handler that bridges API Gateway WebSocket to Bedrock streaming.
    Runs on ECS Fargate as a persistent service.
    """

    def __init__(self):
        self.bedrock = BedrockStreamingClient()
        self.system_prompt = (
            "You are MangaAssist, a knowledgeable assistant for a Japanese manga store. "
            "You help customers discover manga series, provide detailed information about "
            "authors, genres, and story arcs. You understand both Japanese and English "
            "manga terminology. When recommending series, consider the user's reading "
            "history and preferences. Format recommendations with title (Japanese and "
            "romanized), author, genre, and a brief description."
        )

    async def handle_message(self, websocket_connection_id: str, message: dict):
        """
        Process an incoming WebSocket message and stream the FM response back.
        """
        messages = message.get("messages", [])
        model_preference = message.get("model", "sonnet")

        model_map = {
            "sonnet": "anthropic.claude-3-sonnet-20240229-v1:0",
            "haiku": "anthropic.claude-3-haiku-20240307-v1:0"
        }
        model_id = model_map.get(model_preference, model_map["sonnet"])

        tokens_sent = 0

        async def send_token(token: str):
            nonlocal tokens_sent
            tokens_sent += 1
            # Send token chunk back through API Gateway WebSocket
            await self._post_to_connection(
                websocket_connection_id,
                {
                    "type": "token",
                    "data": token,
                    "sequence": tokens_sent
                }
            )

        result = await self.bedrock.stream_manga_response(
            messages=messages,
            system_prompt=self.system_prompt,
            model_id=model_id,
            on_token=send_token
        )

        # Send completion message
        await self._post_to_connection(
            websocket_connection_id,
            {
                "type": "complete",
                "usage": {
                    "input_tokens": result["input_tokens"],
                    "output_tokens": result["output_tokens"]
                },
                "stop_reason": result["stop_reason"]
            }
        )

        return result

    async def _post_to_connection(self, connection_id: str, data: dict):
        """Post data back to a WebSocket connection via the API Gateway Management API."""
        # Lazily create the management client. The stage-specific callback URL
        # (https://{api-id}.execute-api.{region}.amazonaws.com/{stage}) is assumed
        # to be supplied via the WEBSOCKET_API_ENDPOINT environment variable.
        if not hasattr(self, "_apigw"):
            self._apigw = boto3.client(
                "apigatewaymanagementapi",
                endpoint_url=os.environ["WEBSOCKET_API_ENDPOINT"]
            )
        # boto3 is synchronous, so run the call in a worker thread to keep the
        # event loop free for other streaming connections
        await asyncio.to_thread(
            self._apigw.post_to_connection,
            ConnectionId=connection_id,
            Data=json.dumps(data).encode("utf-8")
        )

3.4 InvokeModel from EC2 (Long-Running Analysis)

"""
EC2-based heavy manga analysis worker.
Used for tasks that exceed Lambda/ECS timeout limits or need GPU-adjacent processing.
"""
import json
import logging
import boto3
from botocore.config import Config
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = logging.getLogger(__name__)


class EC2BedrockWorker:
    """
    Long-running Bedrock worker on EC2 with connection pooling
    and concurrent request management.
    """

    def __init__(self, region: str = "us-east-1", max_concurrent: int = 10):
        self._config = Config(
            region_name=region,
            retries={
                "max_attempts": 5,         # Aggressive retries for batch work
                "mode": "adaptive"
            },
            read_timeout=300,              # 5 min timeout for complex analysis
            connect_timeout=10,
            max_pool_connections=max_concurrent
        )
        self._client = boto3.client("bedrock-runtime", config=self._config)
        self._executor = ThreadPoolExecutor(max_workers=max_concurrent)

    def analyze_manga_catalog(self, manga_items: list) -> list:
        """
        Analyze a batch of manga items concurrently using Bedrock.

        Args:
            manga_items: List of manga metadata dicts to analyze

        Returns:
            List of analysis results with enriched metadata
        """
        futures = {}
        for item in manga_items:
            future = self._executor.submit(self._analyze_single, item)
            futures[future] = item["manga_id"]

        results = []
        for future in as_completed(futures):
            manga_id = futures[future]
            try:
                result = future.result(timeout=60)
                results.append({"manga_id": manga_id, "status": "success", **result})
            except Exception as e:
                logger.error(f"Analysis failed for {manga_id}: {e}")
                results.append({"manga_id": manga_id, "status": "error", "error": str(e)})

        return results

    def _analyze_single(self, manga_item: dict) -> dict:
        """Analyze a single manga item with Bedrock."""
        prompt = self._build_analysis_prompt(manga_item)

        request_body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,
            "messages": [{"role": "user", "content": prompt}],
            "system": (
                "You are a manga catalog analysis system. Analyze the provided manga "
                "metadata and generate structured enrichment data including genre tags, "
                "theme keywords, reading level, and similar series recommendations. "
                "Respond in valid JSON format only."
            ),
            "temperature": 0.1   # Very low temperature for consistent structured output
        })

        response = self._client.invoke_model(
            modelId="anthropic.claude-3-haiku-20240307-v1:0",  # Haiku for cost efficiency
            contentType="application/json",
            accept="application/json",
            body=request_body
        )

        response_body = json.loads(response["body"].read())
        analysis = json.loads(response_body["content"][0]["text"])

        return {
            "analysis": analysis,
            "tokens_used": {
                "input": response_body["usage"]["input_tokens"],
                "output": response_body["usage"]["output_tokens"]
            }
        }

    def _build_analysis_prompt(self, manga_item: dict) -> str:
        return (
            f"Analyze the following manga entry and provide enrichment data:\n\n"
            f"Title: {manga_item.get('title', 'Unknown')}\n"
            f"Title (JP): {manga_item.get('title_jp', 'Unknown')}\n"
            f"Author: {manga_item.get('author', 'Unknown')}\n"
            f"Publisher: {manga_item.get('publisher', 'Unknown')}\n"
            f"Year: {manga_item.get('year', 'Unknown')}\n"
            f"Volumes: {manga_item.get('volumes', 'Unknown')}\n"
            f"Synopsis: {manga_item.get('synopsis', 'No synopsis available')}\n\n"
            f"Return JSON with: genre_tags, theme_keywords, reading_level, "
            f"similar_series (list of 5), content_warnings, target_demographic."
        )
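
3.5 Bedrock Batch Inference (CreateModelInvocationJob)

The Batch branch of the mindmap (nightly catalog enrichment, bulk JP→EN translation) maps to CreateModelInvocationJob, which reads JSONL records from S3 and writes results back to S3 at the 50% discount noted above. A minimal submission sketch; the bucket names and IAM role ARN are placeholders, and the role must grant Bedrock read/write access to both S3 locations:

"""
Submit a Bedrock batch inference job for nightly catalog enrichment.
Each input line is JSON: {"recordId": "...", "modelInput": {<Anthropic request body>}}
"""
import boto3

# Batch jobs use the control-plane "bedrock" client, not "bedrock-runtime"
bedrock = boto3.client("bedrock", region_name="us-east-1")

response = bedrock.create_model_invocation_job(
    jobName="manga-catalog-enrichment-nightly",
    modelId="anthropic.claude-3-haiku-20240307-v1:0",
    roleArn="arn:aws:iam::123456789012:role/manga-batch-inference-role",  # placeholder
    inputDataConfig={
        "s3InputDataConfig": {"s3Uri": "s3://manga-batch-input/catalog/batch.jsonl"}
    },
    outputDataConfig={
        "s3OutputDataConfig": {"s3Uri": "s3://manga-batch-output/catalog/"}
    }
)

job_arn = response["jobArn"]

# Poll until the job reaches Completed or Failed
status = bedrock.get_model_invocation_job(jobIdentifier=job_arn)["status"]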

4. AWS SDK Integration

4.1 Python boto3 — BedrockSyncInvoker

"""
Production-ready synchronous Bedrock invoker with caching,
metrics, and intelligent model routing for MangaAssist.
"""
import json
import time
import hashlib
import logging
from typing import Optional
from dataclasses import dataclass

import boto3
import redis
from botocore.config import Config
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


@dataclass
class InvocationResult:
    """Structured result from a Bedrock invocation."""
    text: str
    model_id: str
    input_tokens: int
    output_tokens: int
    latency_ms: int
    cache_hit: bool = False
    stop_reason: str = "end_turn"
    estimated_cost_usd: float = 0.0


@dataclass
class ModelPricing:
    """Per-million-token pricing for a Bedrock model."""
    input_per_million: float
    output_per_million: float


# MangaAssist model pricing table
MODEL_PRICING = {
    "anthropic.claude-3-sonnet-20240229-v1:0": ModelPricing(3.00, 15.00),
    "anthropic.claude-3-haiku-20240307-v1:0": ModelPricing(0.25, 1.25),
}


class BedrockSyncInvoker:
    """
    Synchronous Bedrock invoker with response caching, model routing,
    cost tracking, and comprehensive error handling.

    Features:
    - Redis response caching with TTL
    - Automatic model fallback (Sonnet -> Haiku on throttle)
    - Cost estimation per invocation
    - CloudWatch metrics emission
    - Request deduplication

    Usage:
        invoker = BedrockSyncInvoker(
            redis_host="manga-cache.abc123.use1.cache.amazonaws.com",
            cache_ttl_seconds=300
        )
        result = invoker.invoke(
            prompt="Recommend 5 action manga similar to Naruto",
            model_id="anthropic.claude-3-sonnet-20240229-v1:0"
        )
        print(result.text)
        print(f"Cost: ${result.estimated_cost_usd:.6f}")
    """

    def __init__(
        self,
        region: str = "us-east-1",
        redis_host: Optional[str] = None,
        redis_port: int = 6379,
        cache_ttl_seconds: int = 300,
        enable_fallback: bool = True
    ):
        self._bedrock_config = Config(
            region_name=region,
            retries={"max_attempts": 3, "mode": "adaptive"},
            read_timeout=60,
            connect_timeout=5,
            max_pool_connections=25
        )
        self._client = boto3.client("bedrock-runtime", config=self._bedrock_config)
        self._cloudwatch = boto3.client("cloudwatch", region_name=region)

        # Redis cache setup
        self._cache = None
        self._cache_ttl = cache_ttl_seconds
        if redis_host:
            self._cache = redis.Redis(
                host=redis_host,
                port=redis_port,
                decode_responses=True,
                socket_timeout=2,
                socket_connect_timeout=2,
                retry_on_timeout=True
            )

        self._enable_fallback = enable_fallback

        # MangaAssist system prompt
        self._system_prompt = (
            "You are MangaAssist, a helpful assistant for a Japanese manga store. "
            "You have deep knowledge of manga series, authors, genres, and Japanese "
            "publishing. You can recommend series based on user preferences, explain "
            "plot details without spoilers, and help with purchasing decisions. "
            "Always be respectful of Japanese culture and naming conventions. "
            "Use romanized Japanese titles alongside English titles when relevant."
        )

    def invoke(
        self,
        prompt: str,
        model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
        max_tokens: int = 1024,
        temperature: float = 0.5,
        system_prompt: Optional[str] = None,
        use_cache: bool = True,
        conversation_history: Optional[list] = None
    ) -> InvocationResult:
        """
        Invoke Bedrock synchronously with caching and fallback.

        Args:
            prompt: User prompt text
            model_id: Bedrock model ID
            max_tokens: Maximum response tokens
            temperature: Sampling temperature (0.0-1.0)
            system_prompt: Override default system prompt
            use_cache: Whether to check/populate cache
            conversation_history: Previous messages for multi-turn

        Returns:
            InvocationResult with response text and metadata
        """
        start_time = time.time()
        sys_prompt = system_prompt or self._system_prompt

        # Build messages list
        messages = []
        if conversation_history:
            messages.extend(conversation_history)
        messages.append({"role": "user", "content": prompt})

        # Check cache for single-turn queries
        cache_key = None
        if use_cache and self._cache and not conversation_history:
            cache_key = self._compute_cache_key(prompt, model_id, max_tokens, temperature)
            cached = self._get_cached_response(cache_key)
            if cached:
                elapsed_ms = int((time.time() - start_time) * 1000)
                self._emit_metric("CacheHit", 1)
                return InvocationResult(
                    text=cached["text"],
                    model_id=model_id,
                    input_tokens=cached.get("input_tokens", 0),
                    output_tokens=cached.get("output_tokens", 0),
                    latency_ms=elapsed_ms,
                    cache_hit=True,
                    stop_reason=cached.get("stop_reason", "end_turn"),
                    estimated_cost_usd=0.0  # No cost for cached responses
                )

        # Attempt invocation with optional fallback
        try:
            result = self._invoke_model(messages, sys_prompt, model_id, max_tokens, temperature)
        except ClientError as e:
            error_code = e.response["Error"]["Code"]
            if error_code == "ThrottlingException" and self._enable_fallback:
                logger.warning(f"Throttled on {model_id}, falling back to Haiku")
                self._emit_metric("FallbackTriggered", 1)
                fallback_model = "anthropic.claude-3-haiku-20240307-v1:0"
                result = self._invoke_model(
                    messages, sys_prompt, fallback_model, max_tokens, temperature
                )
                model_id = fallback_model
            else:
                raise

        elapsed_ms = int((time.time() - start_time) * 1000)

        # Calculate cost
        pricing = MODEL_PRICING.get(model_id, ModelPricing(0, 0))
        cost = (
            (result["input_tokens"] / 1_000_000) * pricing.input_per_million +
            (result["output_tokens"] / 1_000_000) * pricing.output_per_million
        )

        # Cache the response
        if cache_key and self._cache:
            self._set_cached_response(cache_key, result)

        # Emit metrics
        self._emit_metric("InvocationLatency", elapsed_ms, "Milliseconds")
        self._emit_metric("InputTokens", result["input_tokens"])
        self._emit_metric("OutputTokens", result["output_tokens"])
        self._emit_metric("EstimatedCostUSD", cost * 100, "None")  # Cents for precision

        return InvocationResult(
            text=result["text"],
            model_id=model_id,
            input_tokens=result["input_tokens"],
            output_tokens=result["output_tokens"],
            latency_ms=elapsed_ms,
            cache_hit=False,
            stop_reason=result["stop_reason"],
            estimated_cost_usd=cost
        )

    def _invoke_model(
        self,
        messages: list,
        system_prompt: str,
        model_id: str,
        max_tokens: int,
        temperature: float
    ) -> dict:
        """Execute the raw Bedrock InvokeModel call."""
        request_body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": max_tokens,
            "messages": messages,
            "system": system_prompt,
            "temperature": temperature,
            "top_p": 0.95
        })

        response = self._client.invoke_model(
            modelId=model_id,
            contentType="application/json",
            accept="application/json",
            body=request_body
        )

        body = json.loads(response["body"].read())
        return {
            "text": body["content"][0]["text"],
            "input_tokens": body["usage"]["input_tokens"],
            "output_tokens": body["usage"]["output_tokens"],
            "stop_reason": body["stop_reason"]
        }

    def _compute_cache_key(
        self, prompt: str, model_id: str, max_tokens: int, temperature: float
    ) -> str:
        """Generate a deterministic cache key from request parameters."""
        key_material = f"{prompt}|{model_id}|{max_tokens}|{temperature}"
        return f"manga:fm:sync:{hashlib.sha256(key_material.encode()).hexdigest()[:16]}"

    def _get_cached_response(self, key: str) -> Optional[dict]:
        """Retrieve a cached response from Redis."""
        try:
            data = self._cache.get(key)
            if data:
                return json.loads(data)
        except redis.RedisError as e:
            logger.warning(f"Cache read error: {e}")
        return None

    def _set_cached_response(self, key: str, result: dict):
        """Store a response in Redis with TTL."""
        try:
            self._cache.setex(key, self._cache_ttl, json.dumps(result))
        except redis.RedisError as e:
            logger.warning(f"Cache write error: {e}")

    def _emit_metric(self, name: str, value: float, unit: str = "Count"):
        """Emit a CloudWatch metric for monitoring."""
        try:
            self._cloudwatch.put_metric_data(
                Namespace="MangaAssist/Bedrock",
                MetricData=[{
                    "MetricName": name,
                    "Value": value,
                    "Unit": unit,
                    "Dimensions": [
                        {"Name": "Service", "Value": "MangaAssist"},
                        {"Name": "Component", "Value": "BedrockSyncInvoker"}
                    ]
                }]
            )
        except Exception as e:
            logger.debug(f"Metric emission failed: {e}")

4.2 JavaScript SDK v3 Integration

/**
 * MangaAssist Bedrock client using AWS SDK for JavaScript v3.
 * Used by the Node.js API layer for lightweight request handling.
 */
import {
  BedrockRuntimeClient,
  InvokeModelCommand,
  InvokeModelWithResponseStreamCommand,
} from "@aws-sdk/client-bedrock-runtime";
import { NodeHttpHandler } from "@smithy/node-http-handler";
import { Agent } from "https";

const MANGA_SYSTEM_PROMPT = `You are MangaAssist, a helpful assistant for a Japanese manga store.
You provide recommendations, answer questions about manga series, and help customers
find their next great read. You understand both Japanese and English terminology.`;

/**
 * Configuration for the Bedrock client with optimized retry behavior.
 */
const bedrockClient = new BedrockRuntimeClient({
  region: "us-east-1",
  maxAttempts: 3,
  requestHandler: new NodeHttpHandler({
    requestTimeout: 30000,      // 30s total request timeout
    httpsAgent: new Agent({
      maxSockets: 50,           // Connection pool size
      keepAlive: true,
      keepAliveMsecs: 60000,
    }),
  }),
});

/**
 * Invoke Bedrock synchronously for a manga query.
 *
 * @param {string} prompt - User's manga-related query
 * @param {Object} options - Invocation options
 * @param {string} options.modelId - Bedrock model identifier
 * @param {number} options.maxTokens - Maximum response tokens
 * @param {number} options.temperature - Sampling temperature
 * @returns {Promise<Object>} Response with text and usage metadata
 */
export async function invokeMangaQuery(prompt, options = {}) {
  const {
    modelId = "anthropic.claude-3-haiku-20240307-v1:0",
    maxTokens = 512,
    temperature = 0.3,
  } = options;

  const requestBody = JSON.stringify({
    anthropic_version: "bedrock-2023-05-31",
    max_tokens: maxTokens,
    messages: [{ role: "user", content: prompt }],
    system: MANGA_SYSTEM_PROMPT,
    temperature,
  });

  const command = new InvokeModelCommand({
    modelId,
    contentType: "application/json",
    accept: "application/json",
    body: requestBody,
  });

  const startTime = Date.now();
  const response = await bedrockClient.send(command);

  const responseBody = JSON.parse(new TextDecoder().decode(response.body));
  const latencyMs = Date.now() - startTime;

  return {
    text: responseBody.content[0].text,
    modelId,
    inputTokens: responseBody.usage.input_tokens,
    outputTokens: responseBody.usage.output_tokens,
    stopReason: responseBody.stop_reason,
    latencyMs,
  };
}

/**
 * Stream a Bedrock response, yielding tokens as they arrive.
 * Used for real-time chat over WebSocket.
 *
 * @param {Array} messages - Conversation message history
 * @param {Object} options - Stream options
 * @yields {string} Individual token chunks
 * @returns {AsyncGenerator<string>} Token stream
 */
export async function* streamMangaResponse(messages, options = {}) {
  const {
    modelId = "anthropic.claude-3-sonnet-20240229-v1:0",
    maxTokens = 2048,
    temperature = 0.7,
  } = options;

  const requestBody = JSON.stringify({
    anthropic_version: "bedrock-2023-05-31",
    max_tokens: maxTokens,
    messages,
    system: MANGA_SYSTEM_PROMPT,
    temperature,
  });

  const command = new InvokeModelWithResponseStreamCommand({
    modelId,
    contentType: "application/json",
    accept: "application/json",
    body: requestBody,
  });

  const response = await bedrockClient.send(command);

  for await (const event of response.body) {
    // Skip events without a chunk payload (e.g., stream-level exceptions)
    if (!event.chunk?.bytes) continue;
    const chunk = JSON.parse(new TextDecoder().decode(event.chunk.bytes));

    if (chunk.type === "content_block_delta" && chunk.delta?.type === "text_delta") {
      yield chunk.delta.text;
    }
  }
}

5. SQS-Based Async Processing for Batch Manga Analysis

"""
Async FM processor using SQS for decoupled, scalable manga analysis.
Handles batch requests that do not require real-time responses.
"""
import json
import time
import uuid
import logging
from typing import Optional
from dataclasses import dataclass

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


@dataclass
class AsyncFMRequest:
    """Represents an asynchronous FM processing request."""
    request_id: str
    prompt: str
    model_id: str
    max_tokens: int
    callback_url: Optional[str]
    metadata: dict
    priority: str = "normal"       # normal | high | low
    created_at: float = 0.0

    def to_sqs_message(self) -> dict:
        """Serialize to SQS message format."""
        return {
            "request_id": self.request_id,
            "prompt": self.prompt,
            "model_id": self.model_id,
            "max_tokens": self.max_tokens,
            "callback_url": self.callback_url,
            "metadata": self.metadata,
            "priority": self.priority,
            "created_at": self.created_at or time.time()
        }


class AsyncFMProcessor:
    """
    Asynchronous FM processor that uses SQS to decouple request
    submission from processing. Supports priority routing, DLQ handling,
    and callback notifications.

    Architecture:

        Producer          SQS Queues              Consumer
        -------          ----------              --------
        submit() ------> manga-fm-high --------> Lambda/ECS
        submit() ------> manga-fm-normal ------> Lambda/ECS
        submit() ------> manga-fm-low ---------> Lambda/ECS
                         manga-fm-dlq <---------- (failures)

    Usage:
        processor = AsyncFMProcessor()

        # Submit a request
        request_id = processor.submit(
            prompt="Analyze the art style evolution in Berserk volumes 1-40",
            model_id="anthropic.claude-3-sonnet-20240229-v1:0",
            priority="normal",
            callback_url="https://manga.api/webhooks/fm-result"
        )

        # Check status
        status = processor.get_status(request_id)
    """

    QUEUE_MAP = {
        "high": "manga-fm-high-priority",
        "normal": "manga-fm-normal",
        "low": "manga-fm-low-priority"
    }
    DLQ_NAME = "manga-fm-dlq"

    def __init__(self, region: str = "us-east-1"):
        self._sqs = boto3.client("sqs", region_name=region)
        self._dynamodb = boto3.resource("dynamodb", region_name=region)
        self._status_table = self._dynamodb.Table("manga-fm-request-status")

        # Resolve queue URLs
        self._queue_urls = {}
        for priority, queue_name in self.QUEUE_MAP.items():
            resp = self._sqs.get_queue_url(QueueName=queue_name)
            self._queue_urls[priority] = resp["QueueUrl"]

    def submit(
        self,
        prompt: str,
        model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
        max_tokens: int = 2048,
        priority: str = "normal",
        callback_url: Optional[str] = None,
        metadata: Optional[dict] = None
    ) -> str:
        """
        Submit an async FM request to the appropriate SQS queue.

        Returns:
            request_id for tracking the request status
        """
        request_id = str(uuid.uuid4())

        request = AsyncFMRequest(
            request_id=request_id,
            prompt=prompt,
            model_id=model_id,
            max_tokens=max_tokens,
            callback_url=callback_url,
            metadata=metadata or {},
            priority=priority,
            created_at=time.time()
        )

        queue_url = self._queue_urls.get(priority, self._queue_urls["normal"])

        # Send to SQS with message attributes for filtering
        send_kwargs = {
            "QueueUrl": queue_url,
            "MessageBody": json.dumps(request.to_sqs_message()),
            "MessageAttributes": {
                "RequestId": {
                    "StringValue": request_id,
                    "DataType": "String"
                },
                "ModelId": {
                    "StringValue": model_id,
                    "DataType": "String"
                },
                "Priority": {
                    "StringValue": priority,
                    "DataType": "String"
                }
            }
        }
        # FIFO-only parameters must be omitted entirely for standard queues;
        # boto3 rejects None values for these fields
        if queue_url.endswith(".fifo"):
            send_kwargs["MessageGroupId"] = f"manga-fm-{priority}"
            send_kwargs["MessageDeduplicationId"] = request_id
        self._sqs.send_message(**send_kwargs)

        # Track status in DynamoDB
        self._status_table.put_item(Item={
            "request_id": request_id,
            "status": "queued",
            "priority": priority,
            "model_id": model_id,
            "created_at": int(request.created_at),
            "ttl": int(request.created_at) + 86400  # 24hr TTL
        })

        logger.info(f"Submitted async request {request_id} to {priority} queue")
        return request_id

    def get_status(self, request_id: str) -> dict:
        """Check the status of an async FM request."""
        try:
            response = self._status_table.get_item(Key={"request_id": request_id})
            return response.get("Item", {"status": "not_found"})
        except ClientError as e:
            logger.error(f"Status lookup failed: {e}")
            return {"status": "error", "detail": str(e)}

    def submit_batch(self, requests: list) -> list:
        """
        Submit multiple async FM requests efficiently using SQS batch operations.

        Args:
            requests: List of dicts with prompt, model_id, max_tokens, etc.

        Returns:
            List of request IDs
        """
        request_ids = []

        # Group by priority for batch sending
        by_priority = {"high": [], "normal": [], "low": []}
        for req in requests:
            priority = req.get("priority", "normal")
            request_id = str(uuid.uuid4())
            request_ids.append(request_id)

            fm_request = AsyncFMRequest(
                request_id=request_id,
                prompt=req["prompt"],
                model_id=req.get("model_id", "anthropic.claude-3-haiku-20240307-v1:0"),
                max_tokens=req.get("max_tokens", 1024),
                callback_url=req.get("callback_url"),
                metadata=req.get("metadata", {}),
                priority=priority,
                created_at=time.time()
            )
            by_priority[priority].append(fm_request)

        # Send in batches of 10 (SQS limit)
        for priority, batch_requests in by_priority.items():
            if not batch_requests:
                continue

            queue_url = self._queue_urls.get(priority, self._queue_urls["normal"])

            for i in range(0, len(batch_requests), 10):
                batch = batch_requests[i:i+10]
                entries = []
                for req in batch:
                    entries.append({
                        "Id": req.request_id[:80],  # SQS ID length limit
                        "MessageBody": json.dumps(req.to_sqs_message()),
                        "MessageAttributes": {
                            "RequestId": {
                                "StringValue": req.request_id,
                                "DataType": "String"
                            },
                            "ModelId": {
                                "StringValue": req.model_id,
                                "DataType": "String"
                            }
                        }
                    })

                response = self._sqs.send_message_batch(
                    QueueUrl=queue_url,
                    Entries=entries
                )

                failed = response.get("Failed", [])
                if failed:
                    logger.error(f"Batch send failures: {failed}")

        logger.info(f"Submitted batch of {len(request_ids)} requests")
        return request_ids
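
Section 5 shows only the producer side. A minimal consumer sketch follows, assuming a Lambda event source mapping on the queue with ReportBatchItemFailures enabled, so only failed messages are retried (and routed to manga-fm-dlq once the redrive policy's maxReceiveCount of 3 is exceeded):

"""
Lambda consumer sketch for the manga-fm-* queues.
Invokes Bedrock per message and reports per-message failures back to SQS.
"""
import json
import boto3

bedrock_runtime = boto3.client("bedrock-runtime")
status_table = boto3.resource("dynamodb").Table("manga-fm-request-status")


def lambda_handler(event, context):
    batch_item_failures = []

    for record in event["Records"]:
        try:
            request = json.loads(record["body"])
            response = bedrock_runtime.invoke_model(
                modelId=request["model_id"],
                contentType="application/json",
                accept="application/json",
                body=json.dumps({
                    "anthropic_version": "bedrock-2023-05-31",
                    "max_tokens": request["max_tokens"],
                    "messages": [{"role": "user", "content": request["prompt"]}]
                })
            )
            result = json.loads(response["body"].read())
            status_table.update_item(
                Key={"request_id": request["request_id"]},
                UpdateExpression="SET #s = :s, result_text = :r",
                ExpressionAttributeNames={"#s": "status"},
                ExpressionAttributeValues={
                    ":s": "completed",
                    ":r": result["content"][0]["text"]
                }
            )
        except Exception:
            # Report only this message as failed; SQS re-delivers it until
            # maxReceiveCount is reached, then moves it to the DLQ
            batch_item_failures.append({"itemIdentifier": record["messageId"]})

    return {"batchItemFailures": batch_item_failures}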

6. API Gateway Custom API with Request Validation

6.1 APIGatewayFMProxy

"""
API Gateway FM Proxy — handles REST API requests, validates them,
transforms payloads, and routes to the appropriate Bedrock model.
Deployed as a Lambda authorizer + integration backend.
"""
import json
import re
import time
import uuid
import logging

import boto3
import jsonschema
from botocore.config import Config

logger = logging.getLogger(__name__)


# --- Request Validation Schemas ---

MANGA_QUERY_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "required": ["query"],
    "properties": {
        "query": {
            "type": "string",
            "minLength": 1,
            "maxLength": 4000,
            "description": "User query text (supports Japanese Unicode)"
        },
        "model": {
            "type": "string",
            "enum": ["sonnet", "haiku"],
            "default": "haiku"
        },
        "max_tokens": {
            "type": "integer",
            "minimum": 1,
            "maximum": 4096,
            "default": 512
        },
        "temperature": {
            "type": "number",
            "minimum": 0.0,
            "maximum": 1.0,
            "default": 0.5
        },
        "conversation_id": {
            "type": "string",
            "pattern": "^[a-f0-9-]{36}$"
        },
        "language": {
            "type": "string",
            "enum": ["en", "ja", "auto"],
            "default": "auto"
        }
    },
    "additionalProperties": False
}

BATCH_ANALYSIS_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "required": ["items"],
    "properties": {
        "items": {
            "type": "array",
            "minItems": 1,
            "maxItems": 100,
            "items": {
                "type": "object",
                "required": ["manga_id", "analysis_type"],
                "properties": {
                    "manga_id": {
                        "type": "string",
                        "pattern": "^MNG-[A-Z0-9]{8}$"
                    },
                    "analysis_type": {
                        "type": "string",
                        "enum": [
                            "genre_classification",
                            "content_summary",
                            "similar_series",
                            "reading_level",
                            "full_enrichment"
                        ]
                    },
                    "additional_context": {
                        "type": "string",
                        "maxLength": 1000
                    }
                }
            }
        },
        "callback_url": {
            "type": "string",
            "format": "uri",
            "pattern": "^https://"
        },
        "priority": {
            "type": "string",
            "enum": ["high", "normal", "low"],
            "default": "normal"
        }
    }
}


class RequestValidator:
    """
    Validates incoming API Gateway requests against JSON schemas.
    Handles Unicode-aware validation for Japanese manga titles and content.
    """

    SCHEMAS = {
        "manga_query": MANGA_QUERY_SCHEMA,
        "batch_analysis": BATCH_ANALYSIS_SCHEMA
    }

    def __init__(self):
        self._validators = {}
        for name, schema in self.SCHEMAS.items():
            self._validators[name] = jsonschema.Draft7Validator(schema)

    def validate_request(self, schema_name: str, body: dict) -> dict:
        """
        Validate a request body against a named schema.

        Args:
            schema_name: Key from SCHEMAS dict
            body: Parsed request body

        Returns:
            Dict with 'valid' bool and optional 'errors' list

        Raises:
            KeyError: If schema_name is not registered
        """
        validator = self._validators.get(schema_name)
        if not validator:
            raise KeyError(f"Unknown schema: {schema_name}")

        errors = []
        for error in validator.iter_errors(body):
            errors.append({
                "path": ".".join(str(p) for p in error.absolute_path) or "$",
                "message": error.message,
                "validator": error.validator
            })

        return {
            "valid": len(errors) == 0,
            "errors": errors
        }

    def sanitize_manga_query(self, query: str) -> str:
        """
        Sanitize a manga query while preserving Japanese Unicode characters.
        Strips characters outside an allow-list of Latin, kana, kanji, and
        punctuation ranges (control characters, emoji, etc.). This is
        character-level filtering only; it does not neutralize prompt content.
        """
        # Allow: Latin, CJK Unified, Hiragana, Katakana, punctuation, digits, spaces
        allowed_pattern = (
            r'[^\u0020-\u007E'     # Basic Latin printable
            r'\u3000-\u303F'       # CJK Symbols and Punctuation
            r'\u3040-\u309F'       # Hiragana
            r'\u30A0-\u30FF'       # Katakana
            r'\u4E00-\u9FFF'       # CJK Unified Ideographs
            r'\uFF00-\uFFEF'       # Halfwidth and Fullwidth Forms
            r'\u2000-\u206F'       # General Punctuation
            r']'
        )
        sanitized = re.sub(allowed_pattern, '', query)
        return sanitized.strip()


class APIGatewayFMProxy:
    """
    Lambda handler that serves as the integration backend for
    API Gateway REST API endpoints. Routes requests to Bedrock
    after validation and transformation.

    Endpoints:
        POST /v1/manga/query      -> Synchronous FM invocation
        POST /v1/manga/batch      -> Async batch submission
        GET  /v1/manga/status/:id -> Check async request status
    """

    MODEL_MAP = {
        "sonnet": "anthropic.claude-3-sonnet-20240229-v1:0",
        "haiku": "anthropic.claude-3-haiku-20240307-v1:0"
    }

    def __init__(self):
        self._validator = RequestValidator()
        self._bedrock = boto3.client(
            "bedrock-runtime",
            config=Config(
                retries={"max_attempts": 3, "mode": "adaptive"},
                read_timeout=30
            )
        )
        self._sqs = boto3.client("sqs")
        self._dynamodb = boto3.resource("dynamodb")
        self._status_table = self._dynamodb.Table("manga-fm-request-status")

    def handle(self, event: dict, context) -> dict:
        """
        Main Lambda handler for API Gateway proxy integration.
        Routes based on HTTP method and resource path.
        """
        http_method = event.get("httpMethod", "")
        resource = event.get("resource", "")

        route_key = f"{http_method} {resource}"

        routes = {
            "POST /v1/manga/query": self._handle_sync_query,
            "POST /v1/manga/batch": self._handle_batch_submit,
            "GET /v1/manga/status/{request_id}": self._handle_status_check
        }

        handler = routes.get(route_key)
        if not handler:
            return self._response(404, {"error": "Not found", "path": resource})

        try:
            return handler(event, context)
        except Exception as e:
            logger.exception(f"Unhandled error in {route_key}")
            return self._response(500, {"error": "Internal server error"})

    def _handle_sync_query(self, event: dict, context) -> dict:
        """Handle synchronous manga query via Bedrock."""
        try:
            body = json.loads(event.get("body", "{}"))
        except json.JSONDecodeError:
            return self._response(400, {"error": "Invalid JSON body"})

        # Validate request
        validation = self._validator.validate_request("manga_query", body)
        if not validation["valid"]:
            return self._response(400, {
                "error": "Validation failed",
                "details": validation["errors"]
            })

        # Sanitize query
        query = self._validator.sanitize_manga_query(body["query"])
        model_key = body.get("model", "haiku")
        model_id = self.MODEL_MAP.get(model_key, self.MODEL_MAP["haiku"])
        max_tokens = body.get("max_tokens", 512)
        temperature = body.get("temperature", 0.5)

        # Invoke Bedrock
        request_body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": max_tokens,
            "messages": [{"role": "user", "content": query}],
            "system": (
                "You are MangaAssist. Provide helpful, concise manga recommendations "
                "and information. Respond in the same language as the query."
            ),
            "temperature": temperature
        })

        response = self._bedrock.invoke_model(
            modelId=model_id,
            contentType="application/json",
            accept="application/json",
            body=request_body
        )

        result = json.loads(response["body"].read())

        return self._response(200, {
            "response": result["content"][0]["text"],
            "model": model_key,
            "usage": {
                "input_tokens": result["usage"]["input_tokens"],
                "output_tokens": result["usage"]["output_tokens"]
            },
            "stop_reason": result["stop_reason"]
        })

    def _handle_batch_submit(self, event: dict, context) -> dict:
        """Handle batch analysis submission to SQS."""
        try:
            body = json.loads(event.get("body", "{}"))
        except json.JSONDecodeError:
            return self._response(400, {"error": "Invalid JSON body"})

        validation = self._validator.validate_request("batch_analysis", body)
        if not validation["valid"]:
            return self._response(400, {
                "error": "Validation failed",
                "details": validation["errors"]
            })

        # Submit each item to SQS as an individual message; request IDs are
        # generated UUIDs so they match the status endpoint's path pattern
        priority = body.get("priority", "normal")
        queue_name = {
            "high": "manga-fm-high-priority",
            "normal": "manga-fm-normal",
            "low": "manga-fm-low-priority"
        }.get(priority, "manga-fm-normal")
        queue_url = self._sqs.get_queue_url(QueueName=queue_name)["QueueUrl"]

        request_ids = []
        for item in body["items"]:
            request_id = str(uuid.uuid4())
            request_ids.append(request_id)
            self._sqs.send_message(
                QueueUrl=queue_url,
                MessageBody=json.dumps({
                    "request_id": request_id,
                    "manga_id": item["manga_id"],
                    "analysis_type": item["analysis_type"],
                    "additional_context": item.get("additional_context"),
                    "callback_url": body.get("callback_url")
                })
            )
            self._status_table.put_item(Item={
                "request_id": request_id,
                "status": "queued",
                "manga_id": item["manga_id"],
                "created_at": int(time.time())
            })

        return self._response(202, {
            "message": "Batch submitted",
            "item_count": len(body["items"]),
            "request_ids": request_ids
        })

    def _handle_status_check(self, event: dict, context) -> dict:
        """Check async request status from DynamoDB."""
        request_id = (event.get("pathParameters") or {}).get("request_id", "")

        if not request_id:
            return self._response(400, {"error": "request_id is required"})

        try:
            response = self._status_table.get_item(Key={"request_id": request_id})
            item = response.get("Item")
            if not item:
                return self._response(404, {"error": "Request not found"})
            return self._response(200, item)
        except Exception as e:
            return self._response(500, {"error": str(e)})

    def _response(self, status_code: int, body: dict) -> dict:
        """Build API Gateway proxy response."""
        return {
            "statusCode": status_code,
            "headers": {
                "Content-Type": "application/json",
                "Access-Control-Allow-Origin": "*",
                "Access-Control-Allow-Methods": "GET,POST,OPTIONS",
                "X-Request-Id": str(__import__("uuid").uuid4())
            },
            "body": json.dumps(body, default=str)
        }

7. API Gateway OpenAPI Specification

# MangaAssist API Gateway OpenAPI Specification
# Defines the REST API with request validation models
openapi: "3.0.1"
info:
  title: "MangaAssist FM API"
  description: "Flexible model interaction API for MangaAssist manga store chatbot"
  version: "1.0.0"
  contact:
    name: "MangaAssist Platform Team"

servers:
  - url: "https://{api-id}.execute-api.us-east-1.amazonaws.com/prod"
    variables:
      api-id:
        default: "abc123xyz"

paths:
  /v1/manga/query:
    post:
      summary: "Synchronous manga query"
      description: "Submit a manga-related query for real-time FM response"
      operationId: "syncMangaQuery"
      tags:
        - Synchronous
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/MangaQueryRequest"
            examples:
              japanese_query:
                summary: "Query in Japanese"
                value:
                  query: "ワンピースに似たアクション漫画を教えてください"
                  model: "sonnet"
                  language: "ja"
                  max_tokens: 1024
              english_query:
                summary: "Query in English"
                value:
                  query: "What are the best seinen manga from 2023?"
                  model: "haiku"
                  max_tokens: 512
      responses:
        "200":
          description: "Successful FM response"
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/MangaQueryResponse"
        "400":
          description: "Validation error"
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/ErrorResponse"
        "429":
          description: "Rate limited or model throttled"
        "500":
          description: "Internal server error"
      x-amazon-apigateway-request-validator: "validate-body"
      x-amazon-apigateway-integration:
        type: "aws_proxy"
        httpMethod: "POST"
        uri: "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:123456789012:function:manga-fm-proxy/invocations"

  /v1/manga/batch:
    post:
      summary: "Submit batch analysis"
      description: "Submit multiple manga items for async FM analysis"
      operationId: "submitBatchAnalysis"
      tags:
        - Asynchronous
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/BatchAnalysisRequest"
      responses:
        "202":
          description: "Batch accepted"
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/BatchSubmitResponse"
        "400":
          description: "Validation error"
      x-amazon-apigateway-request-validator: "validate-body"
      x-amazon-apigateway-integration:
        type: "aws_proxy"
        httpMethod: "POST"
        uri: "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:123456789012:function:manga-fm-proxy/invocations"

  /v1/manga/status/{request_id}:
    get:
      summary: "Check async request status"
      description: "Get the processing status of an async FM request"
      operationId: "getRequestStatus"
      tags:
        - Asynchronous
      parameters:
        - name: request_id
          in: path
          required: true
          schema:
            type: string
            pattern: "^[a-f0-9-]{36}$"
      responses:
        "200":
          description: "Status found"
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/RequestStatusResponse"
        "404":
          description: "Request not found"
      x-amazon-apigateway-integration:
        type: "aws_proxy"
        httpMethod: "POST"
        uri: "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:123456789012:function:manga-fm-proxy/invocations"

components:
  schemas:
    MangaQueryRequest:
      type: object
      required:
        - query
      properties:
        query:
          type: string
          minLength: 1
          maxLength: 4000
          description: "User query (supports Japanese Unicode)"
        model:
          type: string
          enum: ["sonnet", "haiku"]
          default: "haiku"
        max_tokens:
          type: integer
          minimum: 1
          maximum: 4096
          default: 512
        temperature:
          type: number
          minimum: 0.0
          maximum: 1.0
          default: 0.5
        conversation_id:
          type: string
          pattern: "^[a-f0-9-]{36}$"
        language:
          type: string
          enum: ["en", "ja", "auto"]
          default: "auto"

    MangaQueryResponse:
      type: object
      properties:
        response:
          type: string
        model:
          type: string
        usage:
          type: object
          properties:
            input_tokens:
              type: integer
            output_tokens:
              type: integer
        stop_reason:
          type: string

    BatchAnalysisRequest:
      type: object
      required:
        - items
      properties:
        items:
          type: array
          minItems: 1
          maxItems: 100
          items:
            type: object
            required:
              - manga_id
              - analysis_type
            properties:
              manga_id:
                type: string
                pattern: "^MNG-[A-Z0-9]{8}$"
              analysis_type:
                type: string
                enum:
                  - genre_classification
                  - content_summary
                  - similar_series
                  - reading_level
                  - full_enrichment
              additional_context:
                type: string
                maxLength: 1000
        callback_url:
          type: string
          format: uri
          pattern: "^https://"
        priority:
          type: string
          enum: ["high", "normal", "low"]
          default: "normal"

    BatchSubmitResponse:
      type: object
      properties:
        message:
          type: string
        item_count:
          type: integer
        request_ids:
          type: array
          items:
            type: string

    RequestStatusResponse:
      type: object
      properties:
        request_id:
          type: string
        status:
          type: string
          enum: ["queued", "processing", "completed", "failed"]
        created_at:
          type: integer
        completed_at:
          type: integer
        result:
          type: object

    ErrorResponse:
      type: object
      properties:
        error:
          type: string
        details:
          type: array
          items:
            type: object
            properties:
              path:
                type: string
              message:
                type: string

x-amazon-apigateway-request-validators:
  validate-body:
    validateRequestBody: true
    validateRequestParameters: false
  validate-all:
    validateRequestBody: true
    validateRequestParameters: true
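
A minimal client sketch for the query route that the MangaQueryRequest schema validates; the base URL comes from the architecture above, while the API key value and error handling are illustrative assumptions:

"""Sketch: call the request-validated query endpoint and handle the
documented status codes. Requires the requests library; the API key
below is a placeholder."""
import requests

API_BASE = "https://manga.api"          # REST API host from the architecture
API_KEY = "REPLACE_WITH_PARTNER_KEY"    # placeholder credential


def query_manga(query: str, model: str = "haiku", max_tokens: int = 512) -> dict:
    resp = requests.post(
        f"{API_BASE}/v1/manga/query",
        headers={"X-Api-Key": API_KEY, "Content-Type": "application/json"},
        json={"query": query, "model": model, "max_tokens": max_tokens},
        timeout=30,
    )
    if resp.status_code == 400:
        # Rejected by the API Gateway request validator before reaching Lambda
        raise ValueError(f"Validation error: {resp.json()}")
    if resp.status_code == 429:
        raise RuntimeError("Rate limited or model throttled; retry with backoff")
    resp.raise_for_status()
    return resp.json()  # shape follows MangaQueryResponse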

8. VTL Mapping Templates for API Gateway

Note: mapping templates only run on non-proxy (type "aws") integrations. The aws_proxy integrations in the OpenAPI definition above pass the request straight through to Lambda, so the templates below apply to routes configured without proxy integration that need custom request/response shaping.

8.1 Request Mapping Template (Input Transformation)

## Request mapping template: transforms API Gateway request into Lambda input
## Handles Unicode content in manga queries safely
{
    "httpMethod": "$context.httpMethod",
    "resource": "$context.resourcePath",
    "pathParameters": {
#foreach($key in $input.params().path.keySet())
        "$key": "$util.escapeJavaScript($input.params().path.get($key))"#if($foreach.hasNext),#end

#end
    },
    "queryStringParameters": {
#foreach($key in $input.params().querystring.keySet())
        "$key": "$util.escapeJavaScript($input.params().querystring.get($key))"#if($foreach.hasNext),#end

#end
    },
    "headers": {
        "Content-Type": "$input.params().header.get('Content-Type')",
        "X-Api-Key": "$input.params().header.get('X-Api-Key')",
        "X-Request-Id": "$context.requestId",
        "X-Source-Ip": "$context.identity.sourceIp"
    },
    "body": $input.json('$'),
    "requestContext": {
        "requestId": "$context.requestId",
        "stage": "$context.stage",
        "apiId": "$context.apiId",
        "requestTime": "$context.requestTime",
        "requestTimeEpoch": $context.requestTimeEpoch
    }
}
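
Fed a sample POST, the template above yields a Lambda event roughly shaped like this sketch (every value is an illustrative placeholder):

# Illustrative shape of the event the request mapping template emits;
# field names follow the template, all values are placeholders.
sample_lambda_event = {
    "httpMethod": "POST",
    "resource": "/v1/manga/query",
    "pathParameters": {},
    "queryStringParameters": {},
    "headers": {
        "Content-Type": "application/json",
        "X-Api-Key": "partner-key-placeholder",
        "X-Request-Id": "3f8a1c2e-7b4d-4abc-8def-0123456789ab",
        "X-Source-Ip": "203.0.113.10",
    },
    "body": {"query": "ワンピースに似たアクション漫画を教えてください", "model": "sonnet"},
    "requestContext": {
        "requestId": "3f8a1c2e-7b4d-4abc-8def-0123456789ab",
        "stage": "prod",
        "apiId": "a1b2c3d4e5",
        "requestTime": "25/Mar/2024:12:00:00 +0000",
        "requestTimeEpoch": 1711368000000,
    },
}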

8.2 Response Mapping Template (Output Transformation)

## Response mapping template: transforms Lambda output for API Gateway response
## Uses $input.json() for string fields so values are JSON-escaped
## (keeps Japanese Unicode and embedded quotes intact)
#set($response = $input.path('$'))
#set($statusCode = $response.statusCode)

#if($statusCode == 200)
{
    "response": $input.json('$.body.response'),
    "metadata": {
        "model": $input.json('$.body.model'),
        "usage": {
            "input_tokens": $response.body.usage.input_tokens,
            "output_tokens": $response.body.usage.output_tokens
        },
        "request_id": "$context.requestId",
        "timestamp": "$context.requestTime"
    }
}
#elseif($statusCode == 429)
#set($context.responseOverride.status = 429)
{
    "error": "Rate limited",
    "message": "Too many requests. Please retry after a short delay.",
    "retry_after_seconds": 5
}
#else
{
    "error": $input.json('$.body.error'),
    "request_id": "$context.requestId"
}
#end
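
Because the template reads $response.statusCode and $response.body.*, the non-proxy Lambda behind it must return that envelope with body as a structured object, not a JSON string. A minimal handler sketch under that assumption (names and values are illustrative):

"""Sketch of a non-proxy Lambda whose return shape matches the response
mapping template above ($response.statusCode, $response.body.*).
All values are illustrative."""

def lambda_handler(event, context):
    # (invoke Bedrock and build the answer here)
    return {
        "statusCode": 200,
        "body": {
            "response": "「進撃の巨人」がおすすめです。ダークなアクションが魅力です。",
            "model": "anthropic.claude-3-haiku-20240307-v1:0",
            "usage": {"input_tokens": 512, "output_tokens": 287},
        },
    }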

9. Converse API for Multi-Turn with Tool Use

"""
Bedrock Converse API integration for MangaAssist multi-turn conversations
with tool use (function calling) for dynamic data lookups.
"""
import boto3
from botocore.config import Config


class MangaConverseClient:
    """
    Multi-turn conversation client using Bedrock Converse API.
    Supports tool use for live manga catalog lookups and order status checks.
    """

    TOOL_CONFIG = {
        "tools": [
            {
                "toolSpec": {
                    "name": "search_manga_catalog",
                    "description": (
                        "Search the manga catalog by title, author, genre, or keyword. "
                        "Returns matching manga entries with metadata."
                    ),
                    "inputSchema": {
                        "json": {
                            "type": "object",
                            "properties": {
                                "query": {
                                    "type": "string",
                                    "description": "Search query text"
                                },
                                "genre": {
                                    "type": "string",
                                    "description": "Optional genre filter",
                                    "enum": [
                                        "shonen", "seinen", "shojo", "josei",
                                        "kodomo", "isekai", "mecha", "horror"
                                    ]
                                },
                                "max_results": {
                                    "type": "integer",
                                    "description": "Maximum results to return (1-20)",
                                    "default": 5
                                }
                            },
                            "required": ["query"]
                        }
                    }
                }
            },
            {
                "toolSpec": {
                    "name": "get_order_status",
                    "description": "Check the status of a customer's manga order by order ID.",
                    "inputSchema": {
                        "json": {
                            "type": "object",
                            "properties": {
                                "order_id": {
                                    "type": "string",
                                    "description": "The order ID to look up",
                                    "pattern": "^ORD-[A-Z0-9]{8}$"
                                }
                            },
                            "required": ["order_id"]
                        }
                    }
                }
            },
            {
                "toolSpec": {
                    "name": "check_manga_availability",
                    "description": "Check stock availability and pricing for a specific manga.",
                    "inputSchema": {
                        "json": {
                            "type": "object",
                            "properties": {
                                "manga_id": {
                                    "type": "string",
                                    "description": "The manga ID to check"
                                },
                                "volume": {
                                    "type": "integer",
                                    "description": "Specific volume number (optional)"
                                }
                            },
                            "required": ["manga_id"]
                        }
                    }
                }
            }
        ]
    }

    def __init__(self, region: str = "us-east-1"):
        config = Config(
            region_name=region,
            retries={"max_attempts": 3, "mode": "adaptive"},
            read_timeout=60
        )
        self._client = boto3.client("bedrock-runtime", config=config)
        self._model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
        self._system_prompt = [
            {
                "text": (
                    "You are MangaAssist, a knowledgeable assistant for a Japanese "
                    "manga store. Use the available tools to look up real-time "
                    "catalog data, check order statuses, and verify availability. "
                    "Always use tools when the user asks about specific manga, "
                    "orders, or stock. Respond naturally in the user's language."
                )
            }
        ]

    def converse(self, messages: list, tool_handlers: dict | None = None) -> dict:
        """
        Execute a multi-turn conversation with automatic tool use handling.

        Args:
            messages: Conversation history in Converse API format
            tool_handlers: Dict mapping tool names to handler functions

        Returns:
            Final response with conversation history
        """
        tool_handlers = tool_handlers or {}
        max_tool_rounds = 5  # Prevent infinite tool loops

        for round_num in range(max_tool_rounds):
            response = self._client.converse(
                modelId=self._model_id,
                messages=messages,
                system=self._system_prompt,
                toolConfig=self.TOOL_CONFIG,
                inferenceConfig={
                    "maxTokens": 2048,
                    "temperature": 0.5,
                    "topP": 0.95
                }
            )

            stop_reason = response["stopReason"]
            assistant_message = response["output"]["message"]
            messages.append(assistant_message)

            if stop_reason == "end_turn":
                # Model finished responding — return final text
                return {
                    "messages": messages,
                    "response": self._extract_text(assistant_message),
                    "usage": response["usage"],
                    "stop_reason": stop_reason,
                    "tool_rounds": round_num
                }

            elif stop_reason == "tool_use":
                # Model wants to call tools — execute them and continue
                tool_results = []
                for block in assistant_message["content"]:
                    if block.get("toolUse"):
                        tool_use = block["toolUse"]
                        tool_name = tool_use["name"]
                        tool_input = tool_use["input"]
                        tool_use_id = tool_use["toolUseId"]

                        handler = tool_handlers.get(tool_name)
                        if handler:
                            try:
                                result = handler(tool_input)
                                tool_results.append({
                                    "toolResult": {
                                        "toolUseId": tool_use_id,
                                        "content": [{"json": result}]
                                    }
                                })
                            except Exception as e:
                                tool_results.append({
                                    "toolResult": {
                                        "toolUseId": tool_use_id,
                                        "content": [{"text": f"Error: {str(e)}"}],
                                        "status": "error"
                                    }
                                })
                        else:
                            tool_results.append({
                                "toolResult": {
                                    "toolUseId": tool_use_id,
                                    "content": [{"text": f"Tool {tool_name} not available"}],
                                    "status": "error"
                                }
                            })

                # Append tool results as user message and continue loop
                messages.append({"role": "user", "content": tool_results})

            else:
                # Unexpected stop reason (max_tokens, etc.)
                return {
                    "messages": messages,
                    "response": self._extract_text(assistant_message),
                    "usage": response["usage"],
                    "stop_reason": stop_reason,
                    "tool_rounds": round_num
                }

        return {
            "messages": messages,
            "response": "Maximum tool interaction rounds reached.",
            "usage": {},
            "stop_reason": "max_tool_rounds",
            "tool_rounds": max_tool_rounds
        }

    def _extract_text(self, message: dict) -> str:
        """Extract text content from a Converse API message."""
        texts = []
        for block in message.get("content", []):
            if block.get("text"):
                texts.append(block["text"])
        return "\n".join(texts)
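
A usage sketch wiring handler functions to the tool names declared in TOOL_CONFIG; the handler bodies are placeholders (in MangaAssist they would query OpenSearch Serverless and DynamoDB):

# Usage sketch: handler return values are placeholders.
def search_manga_catalog(tool_input: dict) -> dict:
    return {"results": [{"manga_id": "MNG-AB12CD34", "title": "One Piece"}]}

def get_order_status(tool_input: dict) -> dict:
    return {"order_id": tool_input["order_id"], "status": "shipped"}

client = MangaConverseClient()
result = client.converse(
    messages=[{"role": "user", "content": [{"text": "ORD-XY98ZW76の注文状況を教えてください"}]}],
    tool_handlers={
        "search_manga_catalog": search_manga_catalog,
        "get_order_status": get_order_status,
    },
)
print(result["response"], "| tool rounds:", result["tool_rounds"])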

10. Quick Reference — Interaction Pattern Selection

| Pattern | When to Use | Latency | Cost | MangaAssist Example |
|---------|-------------|---------|------|---------------------|
| InvokeModel (sync) | Single-turn, < 30s response needed | 1-10s | Per-request | Quick manga info lookup |
| InvokeModelWithResponseStream | Chat UI, progressive display | TTFT 200-800ms | Per-request | Real-time conversation |
| Converse / ConverseStream | Multi-turn with tools | 1-12s per turn | Per-request | Catalog search + recommendation |
| SQS + Lambda (async) | Batch work, cost-tolerant | Minutes | Per-request | Nightly catalog enrichment |
| SQS + ECS (async) | Long-running, heavy analysis | Minutes-hours | Per-request | Full series analysis |
| API Gateway REST + validation | External partner access | Proxy overhead ~50ms | Per-request + API GW | Partner manga data API |
| API Gateway WebSocket | Persistent real-time chat | Connection setup ~200ms | Per-message + connection | Customer chat sessions |
| Batch Inference Job | Massive offline processing | Hours | 50% discount | Monthly full-catalog reindex |

11. IAM Policies for Model Interaction

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "BedrockInvokeModels",
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeModelWithResponseStream"
            ],
            "Resource": [
                "arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
                "arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-haiku-20240307-v1:0"
            ]
        },
        {
            "Sid": "BedrockBatchInference",
            "Effect": "Allow",
            "Action": [
                "bedrock:CreateModelInvocationJob",
                "bedrock:GetModelInvocationJob",
                "bedrock:ListModelInvocationJobs",
                "bedrock:StopModelInvocationJob"
            ],
            "Resource": "*"
        },
        {
            "Sid": "SQSAsyncQueues",
            "Effect": "Allow",
            "Action": [
                "sqs:SendMessage",
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl"
            ],
            "Resource": [
                "arn:aws:sqs:us-east-1:123456789012:manga-fm-*"
            ]
        }
    ]
}

Note: the Converse and ConverseStream operations are authorized through the bedrock:InvokeModel and bedrock:InvokeModelWithResponseStream actions; Bedrock does not define separate IAM actions for the Converse API, so the first statement above covers all four call paths.

12. Cost Estimation Model

Daily volume: 1,000,000 messages
Routing split: 80% Haiku, 20% Sonnet
Average tokens per request: 500 input, 300 output

Haiku (800,000 requests/day):
  Input:  800,000 * 500 / 1,000,000 * $0.25  = $100.00
  Output: 800,000 * 300 / 1,000,000 * $1.25   = $300.00
  Haiku daily: $400.00

Sonnet (200,000 requests/day):
  Input:  200,000 * 500 / 1,000,000 * $3.00   = $300.00
  Output: 200,000 * 300 / 1,000,000 * $15.00  = $900.00
  Sonnet daily: $1,200.00

Total FM cost/day:  $1,600.00
Total FM cost/month: ~$48,000.00

A 15% cache hit rate avoids roughly 15% of FM spend: ~$7,200/month.
Shifting nightly jobs to Batch Inference (50% discount) adds further savings that scale with batch volume.
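
The same arithmetic as a small Python sketch, so the routing split and token assumptions are easy to vary (function and constant names are illustrative):

# Cost model sketch using the per-1M-token prices quoted for
# Claude 3 Haiku and Sonnet.
PRICES = {  # model: (input $/1M tokens, output $/1M tokens)
    "haiku": (0.25, 1.25),
    "sonnet": (3.00, 15.00),
}

def daily_fm_cost(haiku_reqs: int = 800_000, sonnet_reqs: int = 200_000,
                  avg_in: int = 500, avg_out: int = 300) -> float:
    cost = 0.0
    for reqs, (in_price, out_price) in (
        (haiku_reqs, PRICES["haiku"]),
        (sonnet_reqs, PRICES["sonnet"]),
    ):
        cost += reqs * avg_in / 1_000_000 * in_price    # input-token spend
        cost += reqs * avg_out / 1_000_000 * out_price  # output-token spend
    return cost

assert daily_fm_cost() == 1600.0  # $1,600/day, ~$48,000/month at 30 days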

End of File 1 — Flexible Model Interaction Architecture