Flexible Model Interaction Architecture
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Dimension | Value |
|---|---|
| Certification | AWS Certified AI Practitioner (AIF-C01) |
| Task | 2.4 — Select and implement FM API integration patterns |
| Skill | 2.4.1 — Flexible Model Interaction |
| This File | 01 — Model Interaction Architecture (sync/async patterns, multi-environment Bedrock client, API Gateway validation) |
Skill Scope
Skill 2.4.1 covers the foundational patterns for invoking Foundation Models through Amazon Bedrock's API surface. This includes synchronous InvokeModel for real-time chat, asynchronous invocation via SQS for batch and deferred workloads, and API Gateway request validation to enforce schema compliance before requests reach backend services. For MangaAssist, this means designing a multi-tier interaction model where lightweight queries (greeting, FAQ) use fast synchronous paths while complex operations (manga synopsis generation, bulk catalog enrichment) route through asynchronous queues.
Mind Map
mindmap
root((Skill 2.4.1<br/>Flexible Model<br/>Interaction))
Synchronous Patterns
Bedrock InvokeModel API
Request/Response lifecycle
Timeout management (29s API GW)
Connection pooling
Multi-region client config
Asynchronous Patterns
SQS Standard/FIFO queues
Dead-letter queues
Callback notification (SNS/EventBridge)
Batch processing windows
Retry with backoff
API Gateway Validation
Request body models (JSON Schema)
Parameter validation
Header enforcement
Japanese text encoding checks
Rate limiting per client
Multi-Environment Bedrock Client
Cross-region failover
Model ID abstraction
Credential chain management
Connection reuse
Inference profile selection
MangaAssist Integration
Chat sync path (< 3s)
Catalog enrichment async path
WebSocket to sync bridge
Input sanitization
Cost-aware model selection
1. Synchronous Interaction Architecture
1.1 End-to-End Request Flow
When a MangaAssist user sends a chat message, the synchronous path must deliver a response within the 3-second SLA. The architecture keeps the critical path short: API Gateway receives the WebSocket frame, the ECS orchestrator assembles the prompt, calls Bedrock InvokeModel, and returns the complete response over the same WebSocket connection.
sequenceDiagram
participant User as Manga Reader
participant APIGW as API Gateway<br/>WebSocket
participant ECS as ECS Fargate<br/>Orchestrator
participant Cache as ElastiCache<br/>Redis
participant Bedrock as Amazon Bedrock<br/>Claude 3
participant DDB as DynamoDB<br/>Sessions
User->>APIGW: sendMessage (JP text)
APIGW->>APIGW: Request validation<br/>(JSON Schema)
APIGW->>ECS: Route to container
ECS->>Cache: Check prompt cache
alt Cache Hit
Cache-->>ECS: Cached response
ECS-->>APIGW: Return cached
else Cache Miss
ECS->>DDB: Load session context
DDB-->>ECS: Conversation history
ECS->>Bedrock: InvokeModel (Claude 3 Sonnet)
Bedrock-->>ECS: Complete response
ECS->>Cache: Store response (TTL 300s)
ECS->>DDB: Update session
end
ECS-->>APIGW: JSON response
APIGW-->>User: WebSocket frame
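The critical path in the diagram can be sketched as an orchestrator handler with its dependencies injected (cache, session store, model client), which keeps the flow testable without AWS calls. All names here are illustrative assumptions, not MangaAssist's actual code:

```python
from typing import Callable, List, Optional

def handle_chat_message(
    message: str,
    session_id: str,
    cache_get: Callable[[str], Optional[str]],
    cache_set: Callable[[str, str, int], None],
    load_history: Callable[[str], List[dict]],
    invoke: Callable[[dict], dict],
    cache_ttl: int = 300,
) -> dict:
    """Critical path: cache check -> session load -> model call -> cache store."""
    cache_key = f"chat:{session_id}:{message}"
    cached = cache_get(cache_key)
    if cached is not None:
        return {"source": "cache", "reply": cached}
    history = load_history(session_id)
    body = {"messages": history + [{"role": "user", "content": message}]}
    result = invoke(body)
    reply = result["content"][0]["text"]
    cache_set(cache_key, reply, cache_ttl)
    return {"source": "model", "reply": reply}
```

In production the injected callables would wrap ElastiCache, DynamoDB, and the Bedrock client manager; in tests they can be plain dictionaries and lambdas.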
1.2 Multi-Environment Bedrock Client
The Bedrock client must handle multiple environments (dev, staging, prod), support cross-region failover, and manage connection pooling for high-throughput scenarios.
"""
Multi-environment Bedrock client for MangaAssist.
Handles region failover, connection pooling, and model abstraction.
"""
import os
import time
import logging
from typing import Optional, Dict, Any
from dataclasses import dataclass, field
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError, EndpointConnectionError
logger = logging.getLogger(__name__)
@dataclass
class ModelSpec:
"""Defines a Bedrock model with its operational parameters."""
model_id: str
max_tokens: int
temperature: float
cost_per_1k_input: float
cost_per_1k_output: float
timeout_seconds: int = 25
supports_japanese: bool = True
# MangaAssist model catalog — keeps model IDs in one place
MODEL_CATALOG: Dict[str, ModelSpec] = {
"sonnet": ModelSpec(
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
max_tokens=4096,
temperature=0.3,
cost_per_1k_input=0.003,
cost_per_1k_output=0.015,
timeout_seconds=25,
),
"haiku": ModelSpec(
model_id="anthropic.claude-3-haiku-20240307-v1:0",
max_tokens=2048,
temperature=0.2,
cost_per_1k_input=0.00025,
cost_per_1k_output=0.00125,
timeout_seconds=10,
),
}
@dataclass
class RegionConfig:
"""Configuration for a single AWS region endpoint."""
region: str
priority: int
is_healthy: bool = True
last_failure: float = 0.0
failure_count: int = 0
cooldown_seconds: float = 60.0
class BedrockClientManager:
"""
Manages Bedrock runtime clients across multiple regions with
automatic failover, connection pooling, and health tracking.
Usage:
manager = BedrockClientManager(environment="prod")
response = manager.invoke_model("sonnet", prompt_body)
"""
def __init__(
self,
environment: str = "prod",
primary_region: str = "us-east-1",
failover_regions: Optional[list] = None,
):
self.environment = environment
self.regions = self._build_region_list(primary_region, failover_regions)
self._clients: Dict[str, Any] = {}
self._boto_config = Config(
retries={"max_attempts": 2, "mode": "adaptive"},
connect_timeout=5,
read_timeout=30,
max_pool_connections=25,
)
logger.info(
"BedrockClientManager initialized | env=%s | regions=%s",
environment,
[r.region for r in self.regions],
)
def _build_region_list(
self, primary: str, failover: Optional[list]
) -> list:
"""Build prioritized region list."""
regions = [RegionConfig(region=primary, priority=0)]
for idx, region in enumerate(failover or ["us-west-2", "ap-northeast-1"]):
regions.append(RegionConfig(region=region, priority=idx + 1))
return sorted(regions, key=lambda r: r.priority)
def _get_client(self, region: str):
"""Get or create a Bedrock runtime client for the given region."""
if region not in self._clients:
self._clients[region] = boto3.client(
"bedrock-runtime",
region_name=region,
config=self._boto_config,
)
logger.info("Created Bedrock client for region=%s", region)
return self._clients[region]
def _is_region_available(self, region_config: RegionConfig) -> bool:
"""Check if a region has recovered from failures."""
if region_config.is_healthy:
return True
elapsed = time.time() - region_config.last_failure
if elapsed >= region_config.cooldown_seconds:
region_config.is_healthy = True
region_config.failure_count = 0
logger.info("Region %s marked healthy after cooldown", region_config.region)
return True
return False
def _mark_region_failed(self, region_config: RegionConfig) -> None:
"""Record a failure for a region."""
region_config.is_healthy = False
region_config.last_failure = time.time()
region_config.failure_count += 1
        # Exponential cooldown: 60s, 120s, 240s, ..., capped at 600s
        region_config.cooldown_seconds = min(
            60 * (2 ** (region_config.failure_count - 1)), 600
        )
logger.warning(
"Region %s marked unhealthy | failures=%d | cooldown=%.0fs",
region_config.region,
region_config.failure_count,
region_config.cooldown_seconds,
)
def invoke_model(
self,
model_key: str,
body: dict,
override_timeout: Optional[int] = None,
) -> dict:
"""
Invoke a Bedrock model with automatic region failover.
Args:
model_key: Key from MODEL_CATALOG (e.g., "sonnet", "haiku")
body: The request body for the model
override_timeout: Optional timeout override in seconds
Returns:
Parsed response dictionary from the model
Raises:
RuntimeError: If all regions are exhausted
"""
import json
spec = MODEL_CATALOG[model_key]
body.setdefault("max_tokens", spec.max_tokens)
body.setdefault("temperature", spec.temperature)
body.setdefault("anthropic_version", "bedrock-2023-05-31")
timeout = override_timeout or spec.timeout_seconds
errors = []
for region_config in self.regions:
if not self._is_region_available(region_config):
continue
client = self._get_client(region_config.region)
try:
start = time.time()
response = client.invoke_model(
modelId=spec.model_id,
contentType="application/json",
accept="application/json",
body=json.dumps(body),
)
latency_ms = (time.time() - start) * 1000
result = json.loads(response["body"].read())
logger.info(
"Bedrock invocation success | model=%s | region=%s | latency=%.0fms",
model_key,
region_config.region,
latency_ms,
)
return result
except ClientError as exc:
error_code = exc.response["Error"]["Code"]
self._mark_region_failed(region_config)
errors.append(f"{region_config.region}: {error_code}")
logger.error(
"Bedrock ClientError | region=%s | code=%s | msg=%s",
region_config.region,
error_code,
exc.response["Error"]["Message"],
)
except EndpointConnectionError as exc:
self._mark_region_failed(region_config)
errors.append(f"{region_config.region}: ConnectionError")
logger.error(
"Bedrock connection failed | region=%s | error=%s",
region_config.region,
str(exc),
)
raise RuntimeError(
f"All Bedrock regions exhausted for model={model_key}. "
f"Errors: {'; '.join(errors)}"
)
def get_healthy_regions(self) -> list:
"""Return list of currently healthy regions."""
return [r.region for r in self.regions if self._is_region_available(r)]
def get_cost_estimate(self, model_key: str, input_tokens: int, output_tokens: int) -> float:
"""Estimate cost for a single invocation in USD."""
spec = MODEL_CATALOG[model_key]
return (
(input_tokens / 1000) * spec.cost_per_1k_input
+ (output_tokens / 1000) * spec.cost_per_1k_output
)
1.3 Request Body Construction
Building the correct request body for Claude 3 on Bedrock requires the Messages API format. MangaAssist must handle Japanese text, conversation context from DynamoDB, and system prompts.
"""
Request body builder for MangaAssist Bedrock invocations.
Handles Japanese text, session context, and system prompts.
"""
import json
import hashlib
from typing import List, Dict, Optional
MANGA_SYSTEM_PROMPT = """You are MangaAssist, a helpful chatbot for a Japanese manga store.
You help customers find manga, answer questions about series and authors,
provide recommendations, and assist with orders. Always respond in the
language the customer uses. For Japanese customers, use polite keigo (敬語)
by default. You have access to the store's catalog and can reference
specific titles, ISBNs, and pricing."""
def build_claude3_body(
user_message: str,
conversation_history: Optional[List[Dict]] = None,
system_prompt: str = MANGA_SYSTEM_PROMPT,
max_tokens: int = 4096,
temperature: float = 0.3,
) -> dict:
"""
Build a Claude 3 Messages API request body.
Args:
user_message: Current user input (may be Japanese)
conversation_history: Prior turns from DynamoDB session
system_prompt: System instruction for the model
max_tokens: Maximum generation length
temperature: Sampling temperature
Returns:
Dictionary ready for json.dumps() and Bedrock InvokeModel
"""
messages = []
    # Restore conversation context (last 10 messages to bound prompt size)
if conversation_history:
for turn in conversation_history[-10:]:
messages.append({
"role": turn["role"],
"content": turn["content"],
})
# Add current user message
messages.append({
"role": "user",
"content": user_message,
})
body = {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"temperature": temperature,
"system": system_prompt,
"messages": messages,
}
return body
def compute_cache_key(body: dict) -> str:
"""Generate a deterministic cache key for a request body."""
# Exclude temperature from cache key — same prompt should hit cache
stable = {
"system": body.get("system", ""),
"messages": body.get("messages", []),
"max_tokens": body.get("max_tokens", 4096),
}
payload = json.dumps(stable, sort_keys=True, ensure_ascii=False)
return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
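Two properties of this cache key are worth verifying: determinism (the same conversation always yields the same key, regardless of temperature) and Japanese-text stability via `ensure_ascii=False`. A standalone sketch of the same hashing approach:

```python
import hashlib
import json

def cache_key(system: str, messages: list, max_tokens: int = 4096) -> str:
    """Deterministic key over the fields that determine the cached answer."""
    payload = json.dumps(
        {"system": system, "messages": messages, "max_tokens": max_tokens},
        sort_keys=True,
        ensure_ascii=False,  # keep Japanese text as-is rather than \uXXXX escapes
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]

msgs = [{"role": "user", "content": "進撃の巨人はありますか？"}]
k1 = cache_key("You are MangaAssist.", msgs)
k2 = cache_key("You are MangaAssist.", msgs)  # identical inputs → identical key
```

Because the input text is NFC-normalized before prompting (see `sanitize_for_prompt` below), visually identical Japanese strings hash to the same key.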
2. API Gateway Request Validation
2.1 Validation Architecture
API Gateway models enforce schema constraints before the request reaches ECS. This prevents malformed or oversized payloads from consuming compute resources and protects against injection.
flowchart TD
subgraph "API Gateway Layer"
A[Incoming Request] --> B{Method Request<br/>Validation}
B -->|Valid| C[Integration Request]
B -->|Invalid| D[400 Bad Request<br/>+ error detail]
subgraph "Validation Rules"
V1[Body JSON Schema]
V2[Query Parameter Types]
V3[Header Presence]
V4[Content-Length < 64KB]
end
B --- V1
B --- V2
B --- V3
B --- V4
end
C --> E[ECS Fargate<br/>Orchestrator]
E --> F{Secondary<br/>Validation}
F -->|Pass| G[Bedrock Invocation]
F -->|Fail| H[422 Unprocessable<br/>Entity]
2.2 JSON Schema Model for Chat Messages
{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "MangaAssistChatRequest",
"type": "object",
"required": ["action", "data"],
"properties": {
"action": {
"type": "string",
"enum": ["sendMessage", "getHistory", "clearSession"]
},
"data": {
"type": "object",
"required": ["message"],
"properties": {
"message": {
"type": "string",
"minLength": 1,
"maxLength": 4000,
"description": "User message text (supports Japanese UTF-8)"
},
"sessionId": {
"type": "string",
"pattern": "^[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$"
},
"preferredLanguage": {
"type": "string",
"enum": ["ja", "en", "zh"],
"default": "ja"
},
"modelPreference": {
"type": "string",
"enum": ["auto", "fast", "quality"],
"default": "auto"
}
}
},
"metadata": {
"type": "object",
"properties": {
"clientVersion": {
"type": "string"
},
"timestamp": {
"type": "integer"
}
}
}
}
}
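To make the rejection behavior concrete, here is a stdlib-only check that mirrors the schema's two hot constraints (the `action` enum and the `message` length bounds). API Gateway enforces the full model in production; this function is purely illustrative:

```python
import json

ALLOWED_ACTIONS = {"sendMessage", "getHistory", "clearSession"}

def gateway_would_accept(raw: str) -> bool:
    """Mirror the schema's required fields, action enum, and message length."""
    try:
        req = json.loads(raw)
    except json.JSONDecodeError:
        return False
    if req.get("action") not in ALLOWED_ACTIONS:
        return False
    data = req.get("data")
    if not isinstance(data, dict):
        return False
    msg = data.get("message")
    return isinstance(msg, str) and 1 <= len(msg) <= 4000

ok = gateway_would_accept('{"action": "sendMessage", "data": {"message": "ワンピース最新刊は？"}}')
bad = gateway_would_accept('{"action": "deleteAll", "data": {"message": "hi"}}')
```

Note that `maxLength` counts characters, not bytes; the byte-level cap is enforced later by the ECS validator.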
2.3 Server-Side Validation in ECS
"""
Server-side request validation for MangaAssist.
Runs in ECS after API Gateway schema validation as a defense-in-depth layer.
"""
import re
import unicodedata
from typing import Tuple, Optional
# Patterns that should never appear in user input
INJECTION_PATTERNS = [
r"<script\b",
r"javascript:",
r"\{\{.*\}\}", # Template injection
r"__import__", # Python code injection
r"os\.system",
r"subprocess\.",
]
# Maximum byte size for a single message (after UTF-8 encoding)
MAX_MESSAGE_BYTES = 16_384 # 16 KB
def validate_chat_message(message: str) -> Tuple[bool, Optional[str]]:
"""
Validate a chat message beyond API Gateway schema checks.
Returns:
(is_valid, error_reason) tuple
"""
# 1. Check byte length (Japanese chars are 3 bytes each in UTF-8)
byte_len = len(message.encode("utf-8"))
if byte_len > MAX_MESSAGE_BYTES:
return False, f"Message exceeds {MAX_MESSAGE_BYTES} bytes (got {byte_len})"
# 2. Check for null bytes
if "\x00" in message:
return False, "Message contains null bytes"
# 3. Check for injection patterns
for pattern in INJECTION_PATTERNS:
if re.search(pattern, message, re.IGNORECASE):
return False, f"Message contains disallowed pattern: {pattern}"
# 4. Verify text is renderable (no excessive control characters)
control_count = sum(
1 for ch in message
if unicodedata.category(ch).startswith("C") and ch not in ("\n", "\r", "\t")
)
if control_count > len(message) * 0.05:
return False, f"Message has too many control characters ({control_count})"
# 5. Check for mixed-script anomalies (optional heuristic)
has_cjk = bool(re.search(r"[\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff]", message))
has_cyrillic = bool(re.search(r"[\u0400-\u04ff]", message))
if has_cjk and has_cyrillic:
return False, "Unusual script combination detected"
return True, None
def sanitize_for_prompt(message: str) -> str:
"""
Sanitize user input before inclusion in a model prompt.
Strips control characters but preserves Japanese text.
"""
# Remove zero-width characters that could hide content
cleaned = re.sub(r"[\u200b-\u200f\u2028-\u202f\ufeff]", "", message)
# Normalize Unicode (NFC for Japanese)
cleaned = unicodedata.normalize("NFC", cleaned)
# Strip leading/trailing whitespace
cleaned = cleaned.strip()
return cleaned
def estimate_token_count(text: str) -> int:
    """
    Rough token estimate for Claude 3.
    Japanese text averages ~1.5 tokens per character.
    English averages ~1.3 tokens per word (1 token ≈ 0.75 words).
    """
    jp_chars = len(re.findall(r"[\u3000-\u9fff]", text))
    en_words = re.findall(r"[a-zA-Z]+", text)
    other_chars = len(text) - jp_chars - sum(len(w) for w in en_words)
    return int(jp_chars * 1.5 + len(en_words) * 1.3 + other_chars * 0.5)
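The character and byte limits interact: the API Gateway schema caps `message` at 4,000 characters, while the ECS validator caps it at 16 KB of UTF-8. A quick check that a worst-case Japanese message fits under both:

```python
MAX_MESSAGE_BYTES = 16_384  # same limit as the server-side validator above

jp = "漫" * 4000   # 4,000 kanji — the schema's maxLength cap
en = "a" * 4000    # 4,000 ASCII characters for comparison

jp_bytes = len(jp.encode("utf-8"))  # BMP CJK characters are 3 bytes each in UTF-8
en_bytes = len(en.encode("utf-8"))  # ASCII is 1 byte each

# A maxLength-sized Japanese message (~12 KB) still fits under the 16 KB byte
# cap, so the two limits are mutually consistent.
```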
3. Synchronous Invocation Deep Dive
3.1 Timeout Budget Allocation
The 3-second SLA requires careful budget allocation across the synchronous path.
gantt
title Sync Path Time Budget (3000ms total)
dateFormat X
axisFormat %Lms
section Network
API GW → ECS :a1, 0, 50
ECS → Bedrock :a2, 50, 100
section Processing
Session load (DynamoDB) :b1, 100, 200
Prompt construction :b2, 200, 250
Cache check (Redis) :b3, 250, 270
section Model Inference
Bedrock InvokeModel :crit, c1, 270, 2600
section Response
Cache store + DDB write :d1, 2600, 2750
Response serialization :d2, 2750, 2800
Network return :d3, 2800, 3000
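One way to enforce this budget at runtime is a deadline object threaded through the call chain, with each stage checking remaining time before it starts. This is a sketch; MangaAssist's actual enforcement mechanism is not specified here:

```python
import time

class Deadline:
    """Tracks remaining time against a fixed millisecond budget."""

    def __init__(self, budget_ms: float):
        self._expires = time.monotonic() + budget_ms / 1000.0

    def remaining_ms(self) -> float:
        return max(0.0, (self._expires - time.monotonic()) * 1000.0)

    def require(self, needed_ms: float) -> None:
        """Raise before starting a stage that cannot fit in the remaining budget."""
        if self.remaining_ms() < needed_ms:
            raise TimeoutError(f"budget exhausted: need {needed_ms:.0f}ms")

# Mirroring the gantt: reserve the large window for model inference.
deadline = Deadline(3000)
deadline.require(100)    # session load
deadline.require(2330)   # Bedrock inference window
```

Failing fast here lets the orchestrator return a graceful "try again" message instead of letting API Gateway sever the connection at its own timeout.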
3.2 Connection Pooling Strategy
"""
Connection pool configuration for high-throughput Bedrock invocations.
Tuned for 1M messages/day ≈ 12 TPS average, 50 TPS peak.
"""
import os

from botocore.config import Config
def create_production_config() -> Config:
"""
Production Bedrock client configuration.
Key parameters:
- max_pool_connections=50: Supports peak of 50 concurrent requests
- connect_timeout=3: Fail fast on connection issues
- read_timeout=28: Just under API Gateway's 29s limit
- adaptive retries: AWS SDK handles throttle backoff
"""
return Config(
region_name="us-east-1",
retries={
"max_attempts": 3,
"mode": "adaptive",
},
connect_timeout=3,
read_timeout=28,
max_pool_connections=50,
tcp_keepalive=True,
)
def create_development_config() -> Config:
"""Development config with verbose logging and short timeouts."""
return Config(
region_name="us-east-1",
retries={
"max_attempts": 1,
"mode": "standard",
},
connect_timeout=5,
read_timeout=30,
max_pool_connections=5,
)
# Per-environment Config factories — build once per ECS task and share
_CONFIGS = {
"prod": create_production_config,
"staging": create_production_config,
"dev": create_development_config,
}
def get_config(environment=None) -> Config:
"""Retrieve configuration for the current environment."""
env = environment or os.environ.get("MANGA_ENV", "dev")
factory = _CONFIGS.get(env, create_development_config)
return factory()
4. Asynchronous Pattern Overview
4.1 When to Use Async
| Scenario | Pattern | Reason |
|---|---|---|
| Live chat reply | Sync | User waiting, 3s SLA |
| Manga synopsis generation (batch) | Async (SQS) | No user waiting, cost-optimize with batching |
| Catalog enrichment (1000+ items) | Async (SQS FIFO) | Ordered processing, no timeout pressure |
| Recommendation pre-computation | Async (EventBridge) | Scheduled, no latency requirement |
| Moderation check on uploaded image | Async (SQS) | Can process after upload acknowledgment |
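A similar decision table governs model choice on the synchronous path. The sketch below maps the request schema's `modelPreference` values ("auto", "fast", "quality") onto MODEL_CATALOG keys; the 40-character threshold for "auto" is an assumption for illustration, not a measured cutoff:

```python
def select_model(preference: str, message: str) -> str:
    """Map a client modelPreference to a MODEL_CATALOG key ("haiku" or "sonnet")."""
    if preference == "fast":
        return "haiku"    # greetings, FAQ — latency and cost win
    if preference == "quality":
        return "sonnet"   # synopsis requests, nuanced recommendations
    # "auto": short queries go to Haiku; longer or complex ones to Sonnet.
    return "haiku" if len(message) <= 40 else "sonnet"
```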
4.2 Async Architecture Overview
flowchart LR
subgraph "Producers"
P1[Admin Console] -->|Catalog jobs| Q1
P2[Scheduled EventBridge] -->|Daily enrichment| Q1
P3[ECS Orchestrator] -->|Deferred tasks| Q2
end
subgraph "SQS Queues"
Q1[manga-enrichment<br/>FIFO Queue]
Q2[deferred-inference<br/>Standard Queue]
Q1 --> DLQ1[enrichment-dlq]
Q2 --> DLQ2[inference-dlq]
end
subgraph "Consumers"
Q1 --> C1[Lambda Consumer<br/>Batch Size 10]
Q2 --> C2[ECS Consumer<br/>Long Poll]
C1 --> Bedrock[Amazon Bedrock]
C2 --> Bedrock
end
subgraph "Results"
C1 --> DDB[(DynamoDB<br/>enriched catalog)]
C2 --> SNS[SNS Notification]
end
5. Infrastructure as Code
5.1 CDK Stack for API Gateway Validation
"""
CDK stack snippet — API Gateway with request validation for MangaAssist.
"""
from aws_cdk import (
Stack,
aws_apigateway as apigw,
CfnOutput,
)
from constructs import Construct
class MangaAssistApiStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs):
super().__init__(scope, construct_id, **kwargs)
# REST API with request validation
api = apigw.RestApi(
self, "MangaAssistApi",
rest_api_name="manga-assist-api",
description="MangaAssist chatbot REST API",
deploy_options=apigw.StageOptions(
stage_name="prod",
throttling_rate_limit=100,
throttling_burst_limit=200,
logging_level=apigw.MethodLoggingLevel.INFO,
metrics_enabled=True,
),
)
# Request validator
validator = apigw.RequestValidator(
self, "ChatValidator",
rest_api=api,
request_validator_name="chat-body-validator",
validate_request_body=True,
validate_request_parameters=True,
)
# Chat message model
chat_model = api.add_model(
"ChatRequestModel",
content_type="application/json",
model_name="ChatRequest",
schema=apigw.JsonSchema(
schema=apigw.JsonSchemaVersion.DRAFT4,
title="ChatRequest",
type=apigw.JsonSchemaType.OBJECT,
required=["action", "data"],
properties={
"action": apigw.JsonSchema(
type=apigw.JsonSchemaType.STRING,
enum=["sendMessage", "getHistory", "clearSession"],
),
"data": apigw.JsonSchema(
type=apigw.JsonSchemaType.OBJECT,
required=["message"],
properties={
"message": apigw.JsonSchema(
type=apigw.JsonSchemaType.STRING,
min_length=1,
max_length=4000,
),
},
),
},
),
)
# Chat resource with validation
chat_resource = api.root.add_resource("chat")
chat_resource.add_method(
"POST",
# Integration would point to ECS via ALB
apigw.HttpIntegration(
"http://internal-manga-alb.us-east-1.elb.amazonaws.com/chat",
),
request_models={"application/json": chat_model},
request_validator=validator,
)
CfnOutput(self, "ApiUrl", value=api.url)
6. Multi-Environment Configuration
6.1 Environment-Aware Configuration
flowchart TD
subgraph "Configuration Hierarchy"
ENV[Environment Variable<br/>MANGA_ENV] --> LOADER[Config Loader]
SSM[SSM Parameter Store<br/>/manga-assist/{env}/] --> LOADER
SECRET[Secrets Manager<br/>manga-assist-{env}] --> LOADER
end
LOADER --> APP_CONFIG[Application Config]
APP_CONFIG --> CLIENT[Bedrock Client<br/>Manager]
APP_CONFIG --> POOL[Connection Pool<br/>Settings]
APP_CONFIG --> TIMEOUT[Timeout<br/>Budgets]
APP_CONFIG --> FEATURE[Feature<br/>Flags]
"""
Environment-aware configuration loader for MangaAssist.
Loads from SSM Parameter Store with local fallback for development.
"""
import os
import json
import logging
from dataclasses import dataclass
from typing import Dict, Any, Optional
from functools import lru_cache
import boto3
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
@dataclass
class MangaAssistConfig:
"""Centralized configuration for MangaAssist services."""
environment: str
primary_region: str
failover_regions: list
default_model: str
sync_timeout_ms: int
async_queue_url: str
cache_ttl_seconds: int
max_conversation_turns: int
enable_prompt_caching: bool
enable_fallback_model: bool
log_level: str
@classmethod
def from_dict(cls, data: dict) -> "MangaAssistConfig":
return cls(
environment=data.get("environment", "dev"),
primary_region=data.get("primary_region", "us-east-1"),
failover_regions=data.get("failover_regions", ["us-west-2"]),
default_model=data.get("default_model", "sonnet"),
sync_timeout_ms=data.get("sync_timeout_ms", 3000),
async_queue_url=data.get("async_queue_url", ""),
cache_ttl_seconds=data.get("cache_ttl_seconds", 300),
max_conversation_turns=data.get("max_conversation_turns", 10),
enable_prompt_caching=data.get("enable_prompt_caching", True),
enable_fallback_model=data.get("enable_fallback_model", True),
log_level=data.get("log_level", "INFO"),
)
@lru_cache(maxsize=1)
def load_config(environment: Optional[str] = None) -> MangaAssistConfig:
"""
Load configuration from SSM Parameter Store.
Falls back to environment variables for local development.
"""
env = environment or os.environ.get("MANGA_ENV", "dev")
if env == "dev" and not os.environ.get("AWS_DEFAULT_REGION"):
logger.info("Loading local development config")
return MangaAssistConfig.from_dict({"environment": "dev"})
try:
ssm = boto3.client("ssm")
param_path = f"/manga-assist/{env}/"
paginator = ssm.get_paginator("get_parameters_by_path")
params = {}
for page in paginator.paginate(
Path=param_path,
Recursive=True,
WithDecryption=True,
):
for param in page["Parameters"]:
key = param["Name"].replace(param_path, "")
value = param["Value"]
# Auto-parse JSON values
try:
value = json.loads(value)
except (json.JSONDecodeError, TypeError):
pass
params[key] = value
logger.info("Loaded %d config params from SSM for env=%s", len(params), env)
return MangaAssistConfig.from_dict(params)
except ClientError as exc:
logger.error("Failed to load SSM config: %s", exc)
raise
Key Takeaways
| # | Takeaway |
|---|---|
| 1 | Bedrock InvokeModel is synchronous and blocks until the full response is ready — budget timeouts carefully: a 25s model read timeout plus ~3s of orchestration overhead stays under API Gateway's 29s integration limit. |
| 2 | Multi-region failover with exponential cooldown prevents cascading failures — track per-region health and automatically route around outages. |
| 3 | API Gateway request validation (JSON Schema models) rejects malformed payloads before they reach compute, saving ECS capacity and reducing attack surface. |
| 4 | Connection pooling (50 pool connections for 50 TPS peak) avoids TCP handshake overhead — reuse the Bedrock client across requests in the ECS task. |
| 5 | Japanese text handling requires NFC normalization, UTF-8 byte-length checks (3 bytes per char), and CJK-aware token estimation (~1.5 tokens/char). |
| 6 | Cost awareness belongs in the client layer — MODEL_CATALOG encodes per-token pricing so the orchestrator can make routing decisions. |
| 7 | Defense in depth: API Gateway schema validation + ECS server-side validation + prompt sanitization — each layer catches what the prior missed. |
| 8 | Environment configuration via SSM Parameter Store with lru_cache avoids repeated API calls while supporting per-environment tuning. |