
Flexible Model Interaction Architecture

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Dimension | Value |
| --- | --- |
| Certification | AWS Certified AI Practitioner (AIF-C01) |
| Task | 2.4 — Select and implement FM API integration patterns |
| Skill | 2.4.1 — Flexible Model Interaction |
| This File | 01 — Model Interaction Architecture (sync/async patterns, multi-environment Bedrock client, API Gateway validation) |

Skill Scope

Skill 2.4.1 covers the foundational patterns for invoking Foundation Models through Amazon Bedrock's API surface. This includes synchronous InvokeModel for real-time chat, asynchronous invocation via SQS for batch and deferred workloads, and API Gateway request validation to enforce schema compliance before requests reach backend services. For MangaAssist, this means designing a multi-tier interaction model where lightweight queries (greeting, FAQ) use fast synchronous paths while complex operations (manga synopsis generation, bulk catalog enrichment) route through asynchronous queues.


Mind Map

mindmap
  root((Skill 2.4.1<br/>Flexible Model<br/>Interaction))
    Synchronous Patterns
      Bedrock InvokeModel API
      Request/Response lifecycle
      Timeout management (29s API GW)
      Connection pooling
      Multi-region client config
    Asynchronous Patterns
      SQS Standard/FIFO queues
      Dead-letter queues
      Callback notification (SNS/EventBridge)
      Batch processing windows
      Retry with backoff
    API Gateway Validation
      Request body models (JSON Schema)
      Parameter validation
      Header enforcement
      Japanese text encoding checks
      Rate limiting per client
    Multi-Environment Bedrock Client
      Cross-region failover
      Model ID abstraction
      Credential chain management
      Connection reuse
      Inference profile selection
    MangaAssist Integration
      Chat sync path (< 3s)
      Catalog enrichment async path
      WebSocket to sync bridge
      Input sanitization
      Cost-aware model selection

1. Synchronous Interaction Architecture

1.1 End-to-End Request Flow

When a MangaAssist user sends a chat message, the synchronous path must deliver a response within the 3-second SLA. The architecture keeps the critical path short: API Gateway receives the WebSocket frame, the ECS orchestrator assembles the prompt, calls Bedrock InvokeModel, and streams the response back.

sequenceDiagram
    participant User as Manga Reader
    participant APIGW as API Gateway<br/>WebSocket
    participant ECS as ECS Fargate<br/>Orchestrator
    participant Cache as ElastiCache<br/>Redis
    participant Bedrock as Amazon Bedrock<br/>Claude 3
    participant DDB as DynamoDB<br/>Sessions

    User->>APIGW: sendMessage (JP text)
    APIGW->>APIGW: Request validation<br/>(JSON Schema)
    APIGW->>ECS: Route to container
    ECS->>Cache: Check prompt cache
    alt Cache Hit
        Cache-->>ECS: Cached response
        ECS-->>APIGW: Return cached
    else Cache Miss
        ECS->>DDB: Load session context
        DDB-->>ECS: Conversation history
        ECS->>Bedrock: InvokeModel (Claude 3 Sonnet)
        Bedrock-->>ECS: Complete response
        ECS->>Cache: Store response (TTL 300s)
        ECS->>DDB: Update session
    end
    ECS-->>APIGW: JSON response
    APIGW-->>User: WebSocket frame
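
The alt/else branches above can be sketched as a small handler. This is a hypothetical helper (`handle_chat_turn` is not part of the MangaAssist codebase), with an in-memory dict standing in for Redis and a stub callable standing in for Bedrock:

```python
import hashlib
import json
import time
from typing import Callable, Dict, Tuple


def handle_chat_turn(
    message: str,
    session_history: list,
    cache: Dict[str, Tuple[str, float]],
    invoke_fn: Callable[[dict], str],
    ttl_seconds: float = 300.0,
) -> Tuple[str, bool]:
    """Return (response, cache_hit), mirroring the alt/else branches in the diagram."""
    body = {"messages": session_history + [{"role": "user", "content": message}]}
    key = hashlib.sha256(
        json.dumps(body, sort_keys=True, ensure_ascii=False).encode("utf-8")
    ).hexdigest()[:16]

    entry = cache.get(key)
    if entry and (time.time() - entry[1]) < ttl_seconds:
        return entry[0], True           # Cache hit: skip Bedrock entirely

    response = invoke_fn(body)          # Cache miss: call the model
    cache[key] = (response, time.time())
    return response, False


cache: Dict[str, Tuple[str, float]] = {}
reply1, hit1 = handle_chat_turn("ワンピースの最新刊は?", [], cache, lambda b: "stub reply")
reply2, hit2 = handle_chat_turn("ワンピースの最新刊は?", [], cache, lambda b: "stub reply")
```

The second identical turn returns from the cache without touching the model, which is what keeps repeated FAQ-style questions well inside the 3-second SLA.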

1.2 Multi-Environment Bedrock Client

The Bedrock client must handle multiple environments (dev, staging, prod), support cross-region failover, and manage connection pooling for high-throughput scenarios.

"""
Multi-environment Bedrock client for MangaAssist.
Handles region failover, connection pooling, and model abstraction.
"""
import os
import time
import logging
from typing import Optional, Dict, Any
from dataclasses import dataclass, field

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError, EndpointConnectionError

logger = logging.getLogger(__name__)


@dataclass
class ModelSpec:
    """Defines a Bedrock model with its operational parameters."""
    model_id: str
    max_tokens: int
    temperature: float
    cost_per_1k_input: float
    cost_per_1k_output: float
    timeout_seconds: int = 25
    supports_japanese: bool = True


# MangaAssist model catalog — keeps model IDs in one place
MODEL_CATALOG: Dict[str, ModelSpec] = {
    "sonnet": ModelSpec(
        model_id="anthropic.claude-3-sonnet-20240229-v1:0",
        max_tokens=4096,
        temperature=0.3,
        cost_per_1k_input=0.003,
        cost_per_1k_output=0.015,
        timeout_seconds=25,
    ),
    "haiku": ModelSpec(
        model_id="anthropic.claude-3-haiku-20240307-v1:0",
        max_tokens=2048,
        temperature=0.2,
        cost_per_1k_input=0.00025,
        cost_per_1k_output=0.00125,
        timeout_seconds=10,
    ),
}


@dataclass
class RegionConfig:
    """Configuration for a single AWS region endpoint."""
    region: str
    priority: int
    is_healthy: bool = True
    last_failure: float = 0.0
    failure_count: int = 0
    cooldown_seconds: float = 60.0


class BedrockClientManager:
    """
    Manages Bedrock runtime clients across multiple regions with
    automatic failover, connection pooling, and health tracking.

    Usage:
        manager = BedrockClientManager(environment="prod")
        response = manager.invoke_model("sonnet", prompt_body)
    """

    def __init__(
        self,
        environment: str = "prod",
        primary_region: str = "us-east-1",
        failover_regions: Optional[list] = None,
    ):
        self.environment = environment
        self.regions = self._build_region_list(primary_region, failover_regions)
        self._clients: Dict[str, Any] = {}
        self._boto_config = Config(
            retries={"max_attempts": 2, "mode": "adaptive"},
            connect_timeout=5,
            read_timeout=30,
            max_pool_connections=25,
        )
        logger.info(
            "BedrockClientManager initialized | env=%s | regions=%s",
            environment,
            [r.region for r in self.regions],
        )

    def _build_region_list(
        self, primary: str, failover: Optional[list]
    ) -> list:
        """Build prioritized region list."""
        regions = [RegionConfig(region=primary, priority=0)]
        for idx, region in enumerate(failover or ["us-west-2", "ap-northeast-1"]):
            regions.append(RegionConfig(region=region, priority=idx + 1))
        return sorted(regions, key=lambda r: r.priority)

    def _get_client(self, region: str):
        """Get or create a Bedrock runtime client for the given region."""
        if region not in self._clients:
            self._clients[region] = boto3.client(
                "bedrock-runtime",
                region_name=region,
                config=self._boto_config,
            )
            logger.info("Created Bedrock client for region=%s", region)
        return self._clients[region]

    def _is_region_available(self, region_config: RegionConfig) -> bool:
        """Check if a region has recovered from failures."""
        if region_config.is_healthy:
            return True
        elapsed = time.time() - region_config.last_failure
        if elapsed >= region_config.cooldown_seconds:
            region_config.is_healthy = True
            region_config.failure_count = 0
            logger.info("Region %s marked healthy after cooldown", region_config.region)
            return True
        return False

    def _mark_region_failed(self, region_config: RegionConfig) -> None:
        """Record a failure for a region."""
        region_config.is_healthy = False
        region_config.last_failure = time.time()
        region_config.failure_count += 1
        # Exponential cooldown: 60s, 120s, 240s, ... capped at 600s
        region_config.cooldown_seconds = min(
            60 * (2 ** (region_config.failure_count - 1)), 600
        )
        logger.warning(
            "Region %s marked unhealthy | failures=%d | cooldown=%.0fs",
            region_config.region,
            region_config.failure_count,
            region_config.cooldown_seconds,
        )

    def invoke_model(
        self,
        model_key: str,
        body: dict,
        override_timeout: Optional[int] = None,
    ) -> dict:
        """
        Invoke a Bedrock model with automatic region failover.

        Args:
            model_key: Key from MODEL_CATALOG (e.g., "sonnet", "haiku")
            body: The request body for the model
            override_timeout: Optional timeout override in seconds

        Returns:
            Parsed response dictionary from the model

        Raises:
            RuntimeError: If all regions are exhausted
        """
        import json

        spec = MODEL_CATALOG[model_key]
        body.setdefault("max_tokens", spec.max_tokens)
        body.setdefault("temperature", spec.temperature)
        body.setdefault("anthropic_version", "bedrock-2023-05-31")

        # invoke_model has no per-call timeout parameter; the client Config's
        # read_timeout governs. This value is advisory budget information
        # that callers can use when allocating the sync-path time budget.
        timeout = override_timeout or spec.timeout_seconds
        errors = []

        for region_config in self.regions:
            if not self._is_region_available(region_config):
                continue

            client = self._get_client(region_config.region)
            try:
                start = time.time()
                response = client.invoke_model(
                    modelId=spec.model_id,
                    contentType="application/json",
                    accept="application/json",
                    body=json.dumps(body),
                )
                latency_ms = (time.time() - start) * 1000
                result = json.loads(response["body"].read())

                logger.info(
                    "Bedrock invocation success | model=%s | region=%s | latency=%.0fms",
                    model_key,
                    region_config.region,
                    latency_ms,
                )
                return result

            except ClientError as exc:
                error_code = exc.response["Error"]["Code"]
                logger.error(
                    "Bedrock ClientError | region=%s | code=%s | msg=%s",
                    region_config.region,
                    error_code,
                    exc.response["Error"]["Message"],
                )
                # Only throttle/service-side errors should trip failover; a
                # validation error would fail identically in every region.
                if error_code in (
                    "ThrottlingException",
                    "ServiceUnavailableException",
                    "InternalServerException",
                    "ModelTimeoutException",
                ):
                    self._mark_region_failed(region_config)
                    errors.append(f"{region_config.region}: {error_code}")
                else:
                    raise
            except EndpointConnectionError as exc:
                self._mark_region_failed(region_config)
                errors.append(f"{region_config.region}: ConnectionError")
                logger.error(
                    "Bedrock connection failed | region=%s | error=%s",
                    region_config.region,
                    str(exc),
                )

        raise RuntimeError(
            f"All Bedrock regions exhausted for model={model_key}. "
            f"Errors: {'; '.join(errors)}"
        )

    def get_healthy_regions(self) -> list:
        """Return list of currently healthy regions."""
        return [r.region for r in self.regions if self._is_region_available(r)]

    def get_cost_estimate(self, model_key: str, input_tokens: int, output_tokens: int) -> float:
        """Estimate cost for a single invocation in USD."""
        spec = MODEL_CATALOG[model_key]
        return (
            (input_tokens / 1000) * spec.cost_per_1k_input
            + (output_tokens / 1000) * spec.cost_per_1k_output
        )
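
The pricing arithmetic behind get_cost_estimate can be checked standalone. `cost_estimate` below is a hypothetical restatement using the catalog's per-1K rates:

```python
# Per-1K-token rates mirroring MODEL_CATALOG above:
# Sonnet $0.003/$0.015, Haiku $0.00025/$0.00125 (input/output).
PRICES = {
    "sonnet": (0.003, 0.015),
    "haiku": (0.00025, 0.00125),
}


def cost_estimate(model_key: str, input_tokens: int, output_tokens: int) -> float:
    """Estimate the USD cost of one invocation from token counts."""
    cost_in, cost_out = PRICES[model_key]
    return (input_tokens / 1000) * cost_in + (output_tokens / 1000) * cost_out


# A typical chat turn: ~1,200 input tokens (system prompt + history), ~300 output.
sonnet_cost = cost_estimate("sonnet", 1200, 300)  # 0.0036 + 0.0045 = 0.0081 USD
haiku_cost = cost_estimate("haiku", 1200, 300)    # 0.0003 + 0.000375 = 0.000675 USD
```

At 1M messages/day with this token profile, all-Sonnet comes to roughly $8,100/day versus about $675/day on Haiku, a 12x spread that motivates routing simple turns to the cheaper model.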

1.3 Request Body Construction

Building the correct request body for Claude 3 on Bedrock requires the Messages API format. MangaAssist must handle Japanese text, conversation context from DynamoDB, and system prompts.

"""
Request body builder for MangaAssist Bedrock invocations.
Handles Japanese text, session context, and system prompts.
"""
import json
import hashlib
from typing import List, Dict, Optional


MANGA_SYSTEM_PROMPT = """You are MangaAssist, a helpful chatbot for a Japanese manga store.
You help customers find manga, answer questions about series and authors,
provide recommendations, and assist with orders. Always respond in the
language the customer uses. For Japanese customers, use polite keigo (敬語)
by default. You have access to the store's catalog and can reference
specific titles, ISBNs, and pricing."""


def build_claude3_body(
    user_message: str,
    conversation_history: Optional[List[Dict]] = None,
    system_prompt: str = MANGA_SYSTEM_PROMPT,
    max_tokens: int = 4096,
    temperature: float = 0.3,
) -> dict:
    """
    Build a Claude 3 Messages API request body.

    Args:
        user_message: Current user input (may be Japanese)
        conversation_history: Prior turns from DynamoDB session
        system_prompt: System instruction for the model
        max_tokens: Maximum generation length
        temperature: Sampling temperature

    Returns:
        Dictionary ready for json.dumps() and Bedrock InvokeModel
    """
    messages = []

    # Restore conversation context (last 10 turns to stay within context window)
    if conversation_history:
        for turn in conversation_history[-10:]:
            messages.append({
                "role": turn["role"],
                "content": turn["content"],
            })

    # Add current user message
    messages.append({
        "role": "user",
        "content": user_message,
    })

    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "temperature": temperature,
        "system": system_prompt,
        "messages": messages,
    }
    return body


def compute_cache_key(body: dict) -> str:
    """Generate a deterministic cache key for a request body."""
    # Exclude temperature from cache key — same prompt should hit cache
    stable = {
        "system": body.get("system", ""),
        "messages": body.get("messages", []),
        "max_tokens": body.get("max_tokens", 4096),
    }
    payload = json.dumps(stable, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
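
A quick check of the cache-key behavior: `key_of` below mirrors compute_cache_key, so bodies differing only in temperature share a key while a max_tokens change does not:

```python
import hashlib
import json


def key_of(body: dict) -> str:
    """Same stable-field hashing as compute_cache_key above."""
    stable = {
        "system": body.get("system", ""),
        "messages": body.get("messages", []),
        "max_tokens": body.get("max_tokens", 4096),
    }
    payload = json.dumps(stable, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]


base = {
    "system": "You are MangaAssist.",
    "messages": [{"role": "user", "content": "進撃の巨人は何巻までありますか?"}],
    "max_tokens": 1024,
    "temperature": 0.3,
}
warmer = {**base, "temperature": 0.9}

assert key_of(base) == key_of(warmer)                        # temperature excluded
assert key_of({**base, "max_tokens": 2048}) != key_of(base)  # max_tokens included
```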

2. API Gateway Request Validation

2.1 Validation Architecture

API Gateway models enforce schema constraints before the request reaches ECS. This prevents malformed or oversized payloads from consuming compute resources and protects against injection.

flowchart TD
    subgraph "API Gateway Layer"
        A[Incoming Request] --> B{Method Request<br/>Validation}
        B -->|Valid| C[Integration Request]
        B -->|Invalid| D[400 Bad Request<br/>+ error detail]

        subgraph "Validation Rules"
            V1[Body JSON Schema]
            V2[Query Parameter Types]
            V3[Header Presence]
            V4[Content-Length < 64KB]
        end

        B --- V1
        B --- V2
        B --- V3
        B --- V4
    end

    C --> E[ECS Fargate<br/>Orchestrator]
    E --> F{Secondary<br/>Validation}
    F -->|Pass| G[Bedrock Invocation]
    F -->|Fail| H[422 Unprocessable<br/>Entity]

2.2 JSON Schema Model for Chat Messages

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "title": "MangaAssistChatRequest",
  "type": "object",
  "required": ["action", "data"],
  "properties": {
    "action": {
      "type": "string",
      "enum": ["sendMessage", "getHistory", "clearSession"]
    },
    "data": {
      "type": "object",
      "required": ["message"],
      "properties": {
        "message": {
          "type": "string",
          "minLength": 1,
          "maxLength": 4000,
          "description": "User message text (supports Japanese UTF-8)"
        },
        "sessionId": {
          "type": "string",
          "pattern": "^[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$"
        },
        "preferredLanguage": {
          "type": "string",
          "enum": ["ja", "en", "zh"],
          "default": "ja"
        },
        "modelPreference": {
          "type": "string",
          "enum": ["auto", "fast", "quality"],
          "default": "auto"
        }
      }
    },
    "metadata": {
      "type": "object",
      "properties": {
        "clientVersion": {
          "type": "string"
        },
        "timestamp": {
          "type": "integer"
        }
      }
    }
  }
}
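
The same constraints can be re-asserted in the ECS layer as a defense-in-depth mirror of the Gateway model. `check_chat_request` is a hypothetical stdlib-only sketch of the core checks:

```python
import re

ACTIONS = {"sendMessage", "getHistory", "clearSession"}
UUID_V4 = re.compile(
    r"^[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$"
)


def check_chat_request(payload: dict) -> list:
    """Return a list of violations; an empty list means the payload passes."""
    errors = []
    if payload.get("action") not in ACTIONS:
        errors.append("action must be one of " + ", ".join(sorted(ACTIONS)))
    data = payload.get("data")
    if not isinstance(data, dict):
        return errors + ["data object is required"]
    message = data.get("message")
    if not isinstance(message, str) or not (1 <= len(message) <= 4000):
        errors.append("data.message must be a string of 1-4000 characters")
    session_id = data.get("sessionId")
    if session_id is not None and not UUID_V4.match(session_id):
        errors.append("data.sessionId must be a lowercase UUIDv4")
    return errors


ok = check_chat_request({"action": "sendMessage", "data": {"message": "こんにちは"}})
bad = check_chat_request({"action": "deleteAll", "data": {"message": ""}})
```

Note the 4,000 here counts characters, matching the schema's maxLength; the byte-level cap is enforced separately in validate_chat_message.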

2.3 Server-Side Validation in ECS

"""
Server-side request validation for MangaAssist.
Runs in ECS after API Gateway schema validation as a defense-in-depth layer.
"""
import re
import unicodedata
from typing import Tuple, Optional


# Patterns that should never appear in user input
INJECTION_PATTERNS = [
    r"<script\b",
    r"javascript:",
    r"\{\{.*\}\}",           # Template injection
    r"__import__",           # Python code injection
    r"os\.system",
    r"subprocess\.",
]

# Maximum byte size for a single message (after UTF-8 encoding)
MAX_MESSAGE_BYTES = 16_384  # 16 KB


def validate_chat_message(message: str) -> Tuple[bool, Optional[str]]:
    """
    Validate a chat message beyond API Gateway schema checks.

    Returns:
        (is_valid, error_reason) tuple
    """
    # 1. Check byte length (Japanese chars are 3 bytes each in UTF-8)
    byte_len = len(message.encode("utf-8"))
    if byte_len > MAX_MESSAGE_BYTES:
        return False, f"Message exceeds {MAX_MESSAGE_BYTES} bytes (got {byte_len})"

    # 2. Check for null bytes
    if "\x00" in message:
        return False, "Message contains null bytes"

    # 3. Check for injection patterns
    for pattern in INJECTION_PATTERNS:
        if re.search(pattern, message, re.IGNORECASE):
            return False, f"Message contains disallowed pattern: {pattern}"

    # 4. Verify text is renderable (no excessive control characters)
    control_count = sum(
        1 for ch in message
        if unicodedata.category(ch).startswith("C") and ch not in ("\n", "\r", "\t")
    )
    if control_count > len(message) * 0.05:
        return False, f"Message has too many control characters ({control_count})"

    # 5. Check for mixed-script anomalies (optional heuristic)
    has_cjk = bool(re.search(r"[\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff]", message))
    has_cyrillic = bool(re.search(r"[\u0400-\u04ff]", message))
    if has_cjk and has_cyrillic:
        return False, "Unusual script combination detected"

    return True, None


def sanitize_for_prompt(message: str) -> str:
    """
    Sanitize user input before inclusion in a model prompt.
    Strips control characters but preserves Japanese text.
    """
    # Remove zero-width characters that could hide content
    cleaned = re.sub(r"[\u200b-\u200f\u2028-\u202f\ufeff]", "", message)
    # Normalize Unicode (NFC for Japanese)
    cleaned = unicodedata.normalize("NFC", cleaned)
    # Strip leading/trailing whitespace
    cleaned = cleaned.strip()
    return cleaned


def estimate_token_count(text: str) -> int:
    """
    Rough token estimate for Claude 3.
    Japanese text averages ~1.5 tokens per character.
    English averages ~0.75 tokens per word.
    """
    jp_chars = len(re.findall(r"[\u3000-\u9fff]", text))
    en_words = len(re.findall(r"[a-zA-Z]+", text))
    other_chars = len(text) - jp_chars - sum(len(w) for w in re.findall(r"[a-zA-Z]+", text))

    return int(jp_chars * 1.5 + en_words * 0.75 + other_chars * 0.5)

3. Synchronous Invocation Deep Dive

3.1 Timeout Budget Allocation

The 3-second SLA requires careful budget allocation across the synchronous path.

gantt
    title Sync Path Time Budget (3000ms total)
    dateFormat X
    axisFormat %Lms

    section Network
    API GW → ECS           :a1, 0, 50
    ECS → Bedrock          :a2, 50, 100

    section Processing
    Session load (DynamoDB) :b1, 100, 200
    Prompt construction     :b2, 200, 250
    Cache check (Redis)     :b3, 250, 270

    section Model Inference
    Bedrock InvokeModel     :crit, c1, 270, 2600

    section Response
    Cache store + DDB write :d1, 2600, 2750
    Response serialization  :d2, 2750, 2800
    Network return          :d3, 2800, 3000
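
One way to make this budget explicit in code is a small deadline tracker (`DeadlineBudget` is a hypothetical helper): each stage checks remaining headroom before committing to a slow call.

```python
import time


class DeadlineBudget:
    """Tracks how much of the request's wall-clock budget remains."""

    def __init__(self, total_ms: float = 3000.0):
        self.start = time.monotonic()
        self.total_ms = total_ms

    def remaining_ms(self) -> float:
        elapsed = (time.monotonic() - self.start) * 1000
        return max(0.0, self.total_ms - elapsed)

    def can_afford(self, stage_ms: float) -> bool:
        """True if a stage estimated at stage_ms still fits in the budget."""
        return self.remaining_ms() >= stage_ms


budget = DeadlineBudget(3000)
# Orchestrator logic: if the ~2,330 ms inference window from the chart above
# no longer fits, skip Bedrock and serve a cached or fallback answer instead.
if budget.can_afford(2330):
    pass  # invoke Bedrock here
```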

3.2 Connection Pooling Strategy

"""
Connection pool configuration for high-throughput Bedrock invocations.
Tuned for 1M messages/day ≈ 12 TPS average, 50 TPS peak.
"""
import os

from botocore.config import Config


def create_production_config() -> Config:
    """
    Production Bedrock client configuration.

    Key parameters:
    - max_pool_connections=50: Supports peak of 50 concurrent requests
    - connect_timeout=3: Fail fast on connection issues
    - read_timeout=28: Just under API Gateway's 29s limit
    - adaptive retries: AWS SDK handles throttle backoff
    """
    return Config(
        region_name="us-east-1",
        retries={
            "max_attempts": 3,
            "mode": "adaptive",
        },
        connect_timeout=3,
        read_timeout=28,
        max_pool_connections=50,
        tcp_keepalive=True,
    )


def create_development_config() -> Config:
    """Development config with verbose logging and short timeouts."""
    return Config(
        region_name="us-east-1",
        retries={
            "max_attempts": 1,
            "mode": "standard",
        },
        connect_timeout=5,
        read_timeout=30,
        max_pool_connections=5,
    )


# Environment-to-config factory map: build the Config once per ECS task and reuse it
_CONFIGS = {
    "prod": create_production_config,
    "staging": create_production_config,
    "dev": create_development_config,
}


def get_config(environment: str = None) -> Config:
    """Retrieve configuration for the current environment."""
    env = environment or os.environ.get("MANGA_ENV", "dev")
    factory = _CONFIGS.get(env, create_development_config)
    return factory()

4. Asynchronous Pattern Overview

4.1 When to Use Async

| Scenario | Pattern | Reason |
| --- | --- | --- |
| Live chat reply | Sync | User waiting, 3s SLA |
| Manga synopsis generation (batch) | Async (SQS) | No user waiting, cost-optimize with batching |
| Catalog enrichment (1000+ items) | Async (SQS FIFO) | Ordered processing, no timeout pressure |
| Recommendation pre-computation | Async (EventBridge) | Scheduled, no latency requirement |
| Moderation check on uploaded image | Async (SQS) | Can process after upload acknowledgment |
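
A producer for the FIFO enrichment queue might build its message like this. The builder is a sketch (the payload body shape is hypothetical), but MessageGroupId and MessageDeduplicationId are the standard SQS FIFO send_message parameters:

```python
import hashlib
import json


def build_enrichment_message(isbn: str, task: str, priority: int = 5) -> dict:
    """Build kwargs for sqs.send_message(QueueUrl=..., **payload) on a FIFO queue."""
    body = {"isbn": isbn, "task": task, "priority": priority}
    body_json = json.dumps(body, ensure_ascii=False, sort_keys=True)
    return {
        "MessageBody": body_json,
        # One group per ISBN keeps per-title ordering without serializing the whole queue.
        "MessageGroupId": f"isbn-{isbn}",
        # Content-based dedup guard against producer retries resubmitting the same job.
        "MessageDeduplicationId": hashlib.sha256(body_json.encode("utf-8")).hexdigest(),
    }


msg = build_enrichment_message("9784088820756", "generate_synopsis")
```

Scoping MessageGroupId to the ISBN is the design choice that lets the consumer process different titles in parallel while still applying a title's enrichment steps in order.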

4.2 Async Architecture Overview

flowchart LR
    subgraph "Producers"
        P1[Admin Console] -->|Catalog jobs| Q1
        P2[Scheduled EventBridge] -->|Daily enrichment| Q1
        P3[ECS Orchestrator] -->|Deferred tasks| Q2
    end

    subgraph "SQS Queues"
        Q1[manga-enrichment<br/>FIFO Queue]
        Q2[deferred-inference<br/>Standard Queue]
        Q1 --> DLQ1[enrichment-dlq]
        Q2 --> DLQ2[inference-dlq]
    end

    subgraph "Consumers"
        Q1 --> C1[Lambda Consumer<br/>Batch Size 10]
        Q2 --> C2[ECS Consumer<br/>Long Poll]
        C1 --> Bedrock[Amazon Bedrock]
        C2 --> Bedrock
    end

    subgraph "Results"
        C1 --> DDB[(DynamoDB<br/>enriched catalog)]
        C2 --> SNS[SNS Notification]
    end

5. Infrastructure as Code

5.1 CDK Stack for API Gateway Validation

"""
CDK stack snippet — API Gateway with request validation for MangaAssist.
"""
from aws_cdk import (
    Stack,
    aws_apigateway as apigw,
    CfnOutput,
)
from constructs import Construct


class MangaAssistApiStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs):
        super().__init__(scope, construct_id, **kwargs)

        # REST API with request validation
        api = apigw.RestApi(
            self, "MangaAssistApi",
            rest_api_name="manga-assist-api",
            description="MangaAssist chatbot REST API",
            deploy_options=apigw.StageOptions(
                stage_name="prod",
                throttling_rate_limit=100,
                throttling_burst_limit=200,
                logging_level=apigw.MethodLoggingLevel.INFO,
                metrics_enabled=True,
            ),
        )

        # Request validator
        validator = apigw.RequestValidator(
            self, "ChatValidator",
            rest_api=api,
            request_validator_name="chat-body-validator",
            validate_request_body=True,
            validate_request_parameters=True,
        )

        # Chat message model
        chat_model = api.add_model(
            "ChatRequestModel",
            content_type="application/json",
            model_name="ChatRequest",
            schema=apigw.JsonSchema(
                schema=apigw.JsonSchemaVersion.DRAFT4,
                title="ChatRequest",
                type=apigw.JsonSchemaType.OBJECT,
                required=["action", "data"],
                properties={
                    "action": apigw.JsonSchema(
                        type=apigw.JsonSchemaType.STRING,
                        enum=["sendMessage", "getHistory", "clearSession"],
                    ),
                    "data": apigw.JsonSchema(
                        type=apigw.JsonSchemaType.OBJECT,
                        required=["message"],
                        properties={
                            "message": apigw.JsonSchema(
                                type=apigw.JsonSchemaType.STRING,
                                min_length=1,
                                max_length=4000,
                            ),
                        },
                    ),
                },
            ),
        )

        # Chat resource with validation
        chat_resource = api.root.add_resource("chat")
        chat_resource.add_method(
            "POST",
            # Integration would point to ECS via ALB
            apigw.HttpIntegration(
                "http://internal-manga-alb.us-east-1.elb.amazonaws.com/chat",
                http_method="POST",
            ),
            request_models={"application/json": chat_model},
            request_validator=validator,
        )

        CfnOutput(self, "ApiUrl", value=api.url)

6. Multi-Environment Configuration

6.1 Environment-Aware Configuration

flowchart TD
    subgraph "Configuration Hierarchy"
        ENV[Environment Variable<br/>MANGA_ENV] --> LOADER[Config Loader]
        SSM[SSM Parameter Store<br/>/manga-assist/{env}/] --> LOADER
        SECRET[Secrets Manager<br/>manga-assist-{env}] --> LOADER
    end

    LOADER --> APP_CONFIG[Application Config]

    APP_CONFIG --> CLIENT[Bedrock Client<br/>Manager]
    APP_CONFIG --> POOL[Connection Pool<br/>Settings]
    APP_CONFIG --> TIMEOUT[Timeout<br/>Budgets]
    APP_CONFIG --> FEATURE[Feature<br/>Flags]

"""
Environment-aware configuration loader for MangaAssist.
Loads from SSM Parameter Store with local fallback for development.
"""
import os
import json
import logging
from dataclasses import dataclass
from typing import Dict, Any, Optional
from functools import lru_cache

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


@dataclass
class MangaAssistConfig:
    """Centralized configuration for MangaAssist services."""
    environment: str
    primary_region: str
    failover_regions: list
    default_model: str
    sync_timeout_ms: int
    async_queue_url: str
    cache_ttl_seconds: int
    max_conversation_turns: int
    enable_prompt_caching: bool
    enable_fallback_model: bool
    log_level: str

    @classmethod
    def from_dict(cls, data: dict) -> "MangaAssistConfig":
        return cls(
            environment=data.get("environment", "dev"),
            primary_region=data.get("primary_region", "us-east-1"),
            failover_regions=data.get("failover_regions", ["us-west-2"]),
            default_model=data.get("default_model", "sonnet"),
            sync_timeout_ms=data.get("sync_timeout_ms", 3000),
            async_queue_url=data.get("async_queue_url", ""),
            cache_ttl_seconds=data.get("cache_ttl_seconds", 300),
            max_conversation_turns=data.get("max_conversation_turns", 10),
            enable_prompt_caching=data.get("enable_prompt_caching", True),
            enable_fallback_model=data.get("enable_fallback_model", True),
            log_level=data.get("log_level", "INFO"),
        )


@lru_cache(maxsize=1)
def load_config(environment: Optional[str] = None) -> MangaAssistConfig:
    """
    Load configuration from SSM Parameter Store.
    Falls back to environment variables for local development.
    """
    env = environment or os.environ.get("MANGA_ENV", "dev")

    if env == "dev" and not os.environ.get("AWS_DEFAULT_REGION"):
        logger.info("Loading local development config")
        return MangaAssistConfig.from_dict({"environment": "dev"})

    try:
        ssm = boto3.client("ssm")
        param_path = f"/manga-assist/{env}/"

        paginator = ssm.get_paginator("get_parameters_by_path")
        params = {}
        for page in paginator.paginate(
            Path=param_path,
            Recursive=True,
            WithDecryption=True,
        ):
            for param in page["Parameters"]:
                key = param["Name"].replace(param_path, "")
                value = param["Value"]
                # Auto-parse JSON values
                try:
                    value = json.loads(value)
                except (json.JSONDecodeError, TypeError):
                    pass
                params[key] = value

        logger.info("Loaded %d config params from SSM for env=%s", len(params), env)
        return MangaAssistConfig.from_dict(params)

    except ClientError as exc:
        logger.error("Failed to load SSM config: %s", exc)
        raise

Key Takeaways

| # | Takeaway |
| --- | --- |
| 1 | Bedrock InvokeModel is synchronous and blocks until the full response is ready — budget timeouts so the 25s model read timeout plus connection overhead stays within API Gateway's 29s integration limit. |
| 2 | Multi-region failover with exponential cooldown prevents cascading failures — track per-region health and automatically route around outages. |
| 3 | API Gateway request validation (JSON Schema models) rejects malformed payloads before they reach compute, saving ECS capacity and reducing attack surface. |
| 4 | Connection pooling (50 pool connections for 50 TPS peak) avoids TCP handshake overhead — reuse the Bedrock client across requests in the ECS task. |
| 5 | Japanese text handling requires NFC normalization, UTF-8 byte-length checks (3 bytes per CJK char), and CJK-aware token estimation (~1.5 tokens/char). |
| 6 | Cost awareness belongs in the client layer — MODEL_CATALOG encodes per-token pricing so the orchestrator can make routing decisions. |
| 7 | Defense in depth: API Gateway schema validation + ECS server-side validation + prompt sanitization — each layer catches what the prior missed. |
| 8 | Environment configuration via SSM Parameter Store with lru_cache avoids repeated API calls while supporting per-environment tuning. |