Flexible Model Interaction Architecture
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Attribute | Value |
|---|---|
| Certification | AWS Certified AI Practitioner (AIF-C01) |
| Domain | 2 — Implementation and Integration of Foundation Models |
| Task | 2.4 — Design model interaction systems for generative AI applications |
| Skill | 2.4.1 — Create flexible model interaction systems (Bedrock APIs for synchronous requests, language-specific SDKs and SQS for asynchronous processing, API Gateway for custom API clients with request validation) |
1. Interaction Patterns Mindmap
mindmap
root((Flexible Model<br/>Interaction))
Synchronous
InvokeModel API
Single-turn Q&A
Real-time chat
Inline recommendations
InvokeModelWithResponseStream
Streaming chat
Progressive rendering
Time-to-first-token optimization
Converse API
Multi-turn conversations
Tool use / function calling
System prompts
Asynchronous
SQS + Lambda
Batch manga analysis
Bulk content moderation
Scheduled summarization
SQS + ECS Tasks
Long-running generation
Multi-model pipelines
Heavy image analysis
EventBridge + Step Functions
Orchestrated workflows
Retry with backoff
Fan-out processing
Batch
Bedrock Batch Inference
Catalog enrichment
Nightly re-ranking
Bulk translation JP→EN
S3 Input/Output
JSONL payloads
Result collection
Cost optimization at 50% discount
Custom APIs
API Gateway REST
Request validation
Rate limiting
API key management
API Gateway WebSocket
Persistent connections
Server-push responses
Real-time streaming
API Gateway HTTP
Low-latency proxy
JWT authorization
Minimal overhead
2. Architecture Flowchart — MangaAssist Sync/Async Request Flows
flowchart TB
subgraph Clients["Client Layer"]
WEB["Web App<br/>(React SPA)"]
MOB["Mobile App<br/>(React Native)"]
PART["Partner API<br/>(REST Clients)"]
end
subgraph Gateway["API Gateway Layer"]
WS["WebSocket API<br/>(wss://manga.api)"]
REST["REST API<br/>(https://manga.api/v1)"]
VALID["Request Validator<br/>(JSON Schema)"]
MAPPING["VTL Mapping<br/>Templates"]
end
subgraph Sync["Synchronous Path"]
ECS["ECS Fargate<br/>Orchestrator"]
CACHE["ElastiCache Redis<br/>Response Cache"]
BEDROCK_S["Bedrock InvokeModel<br/>(Claude 3 Haiku)"]
BEDROCK_STREAM["Bedrock InvokeModelWithResponseStream<br/>(Claude 3 Sonnet)"]
end
subgraph Async["Asynchronous Path"]
SQS_STD["SQS Standard Queue<br/>(manga-fm-requests)"]
SQS_FIFO["SQS FIFO Queue<br/>(manga-fm-ordered.fifo)"]
DLQ["Dead Letter Queue<br/>(manga-fm-dlq)"]
LAMBDA_P["Lambda Processor<br/>(batch-fm-worker)"]
ECS_TASK["ECS Fargate Task<br/>(heavy-analysis)"]
SNS["SNS Topic<br/>(fm-results)"]
end
subgraph Data["Data Layer"]
DDB["DynamoDB<br/>Sessions & Products"]
OS["OpenSearch Serverless<br/>Vector Store"]
S3["S3 Bucket<br/>Batch Results"]
end
WEB -->|"WebSocket connect"| WS
MOB -->|"WebSocket connect"| WS
PART -->|"REST POST"| REST
REST --> VALID
VALID --> MAPPING
MAPPING -->|"Sync request"| ECS
WS -->|"Sync message"| ECS
MAPPING -->|"Async request"| SQS_STD
ECS --> CACHE
CACHE -->|"Cache miss"| BEDROCK_S
ECS -->|"Streaming"| BEDROCK_STREAM
ECS --> DDB
ECS --> OS
SQS_STD --> LAMBDA_P
SQS_FIFO --> ECS_TASK
SQS_STD -->|"3 failures"| DLQ
LAMBDA_P --> BEDROCK_S
ECS_TASK --> BEDROCK_STREAM
LAMBDA_P --> S3
ECS_TASK --> S3
LAMBDA_P --> SNS
ECS_TASK --> SNS
SNS -->|"Notify client"| WS
style Clients fill:#e1f5fe,stroke:#0288d1
style Gateway fill:#fff3e0,stroke:#f57c00
style Sync fill:#e8f5e9,stroke:#388e3c
style Async fill:#fce4ec,stroke:#c62828
style Data fill:#f3e5f5,stroke:#7b1fa2
3. Bedrock InvokeModel API Patterns Across Compute Services
3.1 Core API Surface
Amazon Bedrock provides four primary invocation APIs for foundation model interaction:
| API | Use Case | Latency Profile | MangaAssist Usage |
|---|---|---|---|
| InvokeModel | Single synchronous request | 1-10s depending on model | Quick manga lookups with Haiku |
| InvokeModelWithResponseStream | Streaming synchronous request | TTFT 200-800ms | Chat conversations with Sonnet |
| Converse / ConverseStream | Multi-turn with tool use | 1-12s depending on model | Complex recommendation flows |
| CreateModelInvocationJob | Batch asynchronous processing | Minutes to hours | Nightly catalog enrichment |
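The first three APIs are exercised in the code samples throughout this section; CreateModelInvocationJob has no sample below, so the following is a minimal sketch of starting a batch job. The job name, role ARN, and S3 URIs are illustrative placeholders:
"""
Minimal Bedrock batch inference sketch — names and ARNs are illustrative.
"""
import boto3

# Batch jobs live on the control-plane client ("bedrock", not "bedrock-runtime")
bedrock = boto3.client("bedrock", region_name="us-east-1")

# Input: a JSONL file of Anthropic Messages API request bodies in S3.
# Output: results written under the output prefix when the job completes.
response = bedrock.create_model_invocation_job(
    jobName="manga-catalog-enrichment-nightly",
    modelId="anthropic.claude-3-haiku-20240307-v1:0",
    roleArn="arn:aws:iam::123456789012:role/manga-batch-inference",
    inputDataConfig={
        "s3InputDataConfig": {"s3Uri": "s3://manga-batch/input/catalog.jsonl"}
    },
    outputDataConfig={
        "s3OutputDataConfig": {"s3Uri": "s3://manga-batch/output/"}
    }
)
print(response["jobArn"])  # Poll with get_model_invocation_job until completed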
3.2 InvokeModel from Lambda
"""
Lambda function for synchronous Bedrock InvokeModel calls.
Used for quick, single-turn manga queries via Haiku.
"""
import json
import time
import boto3
from botocore.config import Config
# Configure retry behavior for Lambda's short-lived execution
bedrock_config = Config(
region_name="us-east-1",
retries={
"max_attempts": 2, # Limited retries for Lambda timeout budget
"mode": "adaptive" # Adaptive retry with token bucket
},
read_timeout=30, # 30s read timeout for model response
connect_timeout=5 # 5s connection timeout
)
bedrock_runtime = boto3.client(
"bedrock-runtime",
config=bedrock_config
)
def lambda_handler(event, context):
"""
Handle synchronous manga query via Bedrock InvokeModel.
Expected event:
{
"query": "What manga series are similar to One Piece?",
"model_id": "anthropic.claude-3-haiku-20240307-v1:0",
"max_tokens": 512
}
"""
start_time = time.time()
query = event.get("query", "")
model_id = event.get("model_id", "anthropic.claude-3-haiku-20240307-v1:0")
max_tokens = event.get("max_tokens", 512)
# Build the request body per Anthropic Messages API format
request_body = json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"messages": [
{
"role": "user",
"content": query
}
],
"system": (
"You are MangaAssist, a helpful assistant for a Japanese manga store. "
"Provide concise, accurate recommendations and information about manga series, "
"authors, and genres. Respond in the same language as the user's query."
),
"temperature": 0.3, # Lower temperature for factual manga info
"top_p": 0.9
})
try:
response = bedrock_runtime.invoke_model(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=request_body
)
response_body = json.loads(response["body"].read())
elapsed_ms = int((time.time() - start_time) * 1000)
return {
"statusCode": 200,
"body": {
"response": response_body["content"][0]["text"],
"model": model_id,
"input_tokens": response_body["usage"]["input_tokens"],
"output_tokens": response_body["usage"]["output_tokens"],
"latency_ms": elapsed_ms,
"stop_reason": response_body["stop_reason"]
}
}
except bedrock_runtime.exceptions.ThrottlingException as e:
return {
"statusCode": 429,
"body": {"error": "Model throttled", "detail": str(e)}
}
except bedrock_runtime.exceptions.ModelTimeoutException as e:
return {
"statusCode": 504,
"body": {"error": "Model timeout", "detail": str(e)}
}
except Exception as e:
return {
"statusCode": 500,
"body": {"error": "InvokeModel failed", "detail": str(e)}
}
3.3 InvokeModelWithResponseStream from ECS Fargate
"""
ECS Fargate service for streaming Bedrock responses.
Streams tokens back to the client via WebSocket for real-time chat.
"""
import json
import os
import asyncio
import boto3
from botocore.config import Config
class BedrockStreamingClient:
"""
Manages streaming invocations from ECS Fargate to Bedrock.
Optimized for long-running container lifecycle with connection pooling.
"""
def __init__(self, region: str = "us-east-1"):
self._config = Config(
region_name=region,
retries={
"max_attempts": 3, # More retries for long-running ECS tasks
"mode": "adaptive"
},
read_timeout=120, # 2 min for streaming responses
connect_timeout=10,
max_pool_connections=50 # Connection pool for high concurrency
)
self._client = boto3.client("bedrock-runtime", config=self._config)
async def stream_manga_response(
self,
messages: list,
system_prompt: str,
model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
max_tokens: int = 2048,
on_token: callable = None
) -> dict:
"""
Stream a response from Bedrock, yielding tokens as they arrive.
Args:
messages: List of conversation messages in Anthropic format
system_prompt: System prompt for MangaAssist behavior
model_id: Bedrock model identifier
max_tokens: Maximum tokens to generate
on_token: Async callback invoked with each token chunk
Returns:
Complete response metadata including usage statistics
"""
request_body = json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"messages": messages,
"system": system_prompt,
"temperature": 0.7,
"top_p": 0.95,
"stop_sequences": ["</answer>"]
})
# InvokeModelWithResponseStream returns an EventStream
response = self._client.invoke_model_with_response_stream(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=request_body
)
full_text = []
input_tokens = 0
output_tokens = 0
stop_reason = None
# Process the event stream
event_stream = response["body"]
for event in event_stream:
chunk = event.get("chunk")
if chunk:
chunk_data = json.loads(chunk["bytes"].decode("utf-8"))
# Handle different event types in the stream
event_type = chunk_data.get("type")
if event_type == "message_start":
usage = chunk_data.get("message", {}).get("usage", {})
input_tokens = usage.get("input_tokens", 0)
elif event_type == "content_block_delta":
delta = chunk_data.get("delta", {})
if delta.get("type") == "text_delta":
token_text = delta["text"]
full_text.append(token_text)
# Stream token back to caller (e.g., WebSocket)
if on_token:
await on_token(token_text)
elif event_type == "message_delta":
usage = chunk_data.get("usage", {})
output_tokens = usage.get("output_tokens", 0)
stop_reason = chunk_data.get("delta", {}).get("stop_reason")
elif event_type == "message_stop":
pass # Stream complete
return {
"text": "".join(full_text),
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"stop_reason": stop_reason,
"model_id": model_id
}
class MangaAssistStreamHandler:
"""
WebSocket handler that bridges API Gateway WebSocket to Bedrock streaming.
Runs on ECS Fargate as a persistent service.
"""
    def __init__(self):
        self.bedrock = BedrockStreamingClient()
        self._apigw = None  # API Gateway Management API client, created lazily
self.system_prompt = (
"You are MangaAssist, a knowledgeable assistant for a Japanese manga store. "
"You help customers discover manga series, provide detailed information about "
"authors, genres, and story arcs. You understand both Japanese and English "
"manga terminology. When recommending series, consider the user's reading "
"history and preferences. Format recommendations with title (Japanese and "
"romanized), author, genre, and a brief description."
)
async def handle_message(self, websocket_connection_id: str, message: dict):
"""
Process an incoming WebSocket message and stream the FM response back.
"""
messages = message.get("messages", [])
model_preference = message.get("model", "sonnet")
model_map = {
"sonnet": "anthropic.claude-3-sonnet-20240229-v1:0",
"haiku": "anthropic.claude-3-haiku-20240307-v1:0"
}
model_id = model_map.get(model_preference, model_map["sonnet"])
tokens_sent = 0
async def send_token(token: str):
nonlocal tokens_sent
tokens_sent += 1
# Send token chunk back through API Gateway WebSocket
await self._post_to_connection(
websocket_connection_id,
{
"type": "token",
"data": token,
"sequence": tokens_sent
}
)
result = await self.bedrock.stream_manga_response(
messages=messages,
system_prompt=self.system_prompt,
model_id=model_id,
on_token=send_token
)
# Send completion message
await self._post_to_connection(
websocket_connection_id,
{
"type": "complete",
"usage": {
"input_tokens": result["input_tokens"],
"output_tokens": result["output_tokens"]
},
"stop_reason": result["stop_reason"]
}
)
return result
    async def _post_to_connection(self, connection_id: str, data: dict):
        """Post data back to a WebSocket connection via the API Gateway Management API."""
        # Assumes WEBSOCKET_API_ENDPOINT holds the deployed stage URL,
        # e.g. https://{api-id}.execute-api.us-east-1.amazonaws.com/prod
        if self._apigw is None:
            self._apigw = boto3.client(
                "apigatewaymanagementapi",
                endpoint_url=os.environ["WEBSOCKET_API_ENDPOINT"]
            )
        # post_to_connection is a blocking boto3 call; run it off the event loop
        await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: self._apigw.post_to_connection(
                ConnectionId=connection_id,
                Data=json.dumps(data).encode("utf-8")
            )
        )
3.4 InvokeModel from EC2 (Long-Running Analysis)
"""
EC2-based heavy manga analysis worker.
Used for tasks that exceed Lambda/ECS timeout limits or need GPU-adjacent processing.
"""
import json
import logging
import boto3
from botocore.config import Config
from concurrent.futures import ThreadPoolExecutor, as_completed
logger = logging.getLogger(__name__)
class EC2BedrockWorker:
"""
Long-running Bedrock worker on EC2 with connection pooling
and concurrent request management.
"""
def __init__(self, region: str = "us-east-1", max_concurrent: int = 10):
self._config = Config(
region_name=region,
retries={
"max_attempts": 5, # Aggressive retries for batch work
"mode": "adaptive"
},
read_timeout=300, # 5 min timeout for complex analysis
connect_timeout=10,
max_pool_connections=max_concurrent
)
self._client = boto3.client("bedrock-runtime", config=self._config)
self._executor = ThreadPoolExecutor(max_workers=max_concurrent)
def analyze_manga_catalog(self, manga_items: list) -> list:
"""
Analyze a batch of manga items concurrently using Bedrock.
Args:
manga_items: List of manga metadata dicts to analyze
Returns:
List of analysis results with enriched metadata
"""
futures = {}
for item in manga_items:
future = self._executor.submit(self._analyze_single, item)
futures[future] = item["manga_id"]
results = []
for future in as_completed(futures):
manga_id = futures[future]
try:
result = future.result(timeout=60)
results.append({"manga_id": manga_id, "status": "success", **result})
except Exception as e:
logger.error(f"Analysis failed for {manga_id}: {e}")
results.append({"manga_id": manga_id, "status": "error", "error": str(e)})
return results
def _analyze_single(self, manga_item: dict) -> dict:
"""Analyze a single manga item with Bedrock."""
prompt = self._build_analysis_prompt(manga_item)
request_body = json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1024,
"messages": [{"role": "user", "content": prompt}],
"system": (
"You are a manga catalog analysis system. Analyze the provided manga "
"metadata and generate structured enrichment data including genre tags, "
"theme keywords, reading level, and similar series recommendations. "
"Respond in valid JSON format only."
),
"temperature": 0.1 # Very low temperature for consistent structured output
})
response = self._client.invoke_model(
modelId="anthropic.claude-3-haiku-20240307-v1:0", # Haiku for cost efficiency
contentType="application/json",
accept="application/json",
body=request_body
)
response_body = json.loads(response["body"].read())
analysis = json.loads(response_body["content"][0]["text"])
return {
"analysis": analysis,
"tokens_used": {
"input": response_body["usage"]["input_tokens"],
"output": response_body["usage"]["output_tokens"]
}
}
def _build_analysis_prompt(self, manga_item: dict) -> str:
return (
f"Analyze the following manga entry and provide enrichment data:\n\n"
f"Title: {manga_item.get('title', 'Unknown')}\n"
f"Title (JP): {manga_item.get('title_jp', 'Unknown')}\n"
f"Author: {manga_item.get('author', 'Unknown')}\n"
f"Publisher: {manga_item.get('publisher', 'Unknown')}\n"
f"Year: {manga_item.get('year', 'Unknown')}\n"
f"Volumes: {manga_item.get('volumes', 'Unknown')}\n"
f"Synopsis: {manga_item.get('synopsis', 'No synopsis available')}\n\n"
f"Return JSON with: genre_tags, theme_keywords, reading_level, "
f"similar_series (list of 5), content_warnings, target_demographic."
)
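A usage sketch for the worker above; the catalog item is illustrative:
worker = EC2BedrockWorker(max_concurrent=10)
results = worker.analyze_manga_catalog([
    {
        "manga_id": "MNG-AKIRA001",  # illustrative ID
        "title": "Akira",
        "title_jp": "アキラ",
        "author": "Katsuhiro Otomo",
        "publisher": "Kodansha",
        "year": 1982,
        "volumes": 6,
        "synopsis": "Neo-Tokyo rebuilds after a mysterious explosion destroys the city."
    }
])
print(results[0]["status"])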
4. AWS SDK Integration
4.1 Python boto3 — BedrockSyncInvoker
"""
Production-ready synchronous Bedrock invoker with caching,
metrics, and intelligent model routing for MangaAssist.
"""
import json
import time
import hashlib
import logging
from typing import Optional
from dataclasses import dataclass, field
import boto3
import redis
from botocore.config import Config
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
@dataclass
class InvocationResult:
"""Structured result from a Bedrock invocation."""
text: str
model_id: str
input_tokens: int
output_tokens: int
latency_ms: int
cache_hit: bool = False
stop_reason: str = "end_turn"
estimated_cost_usd: float = 0.0
@dataclass
class ModelPricing:
"""Per-million-token pricing for a Bedrock model."""
input_per_million: float
output_per_million: float
# MangaAssist model pricing table
MODEL_PRICING = {
"anthropic.claude-3-sonnet-20240229-v1:0": ModelPricing(3.00, 15.00),
"anthropic.claude-3-haiku-20240307-v1:0": ModelPricing(0.25, 1.25),
}
class BedrockSyncInvoker:
"""
Synchronous Bedrock invoker with response caching, model routing,
cost tracking, and comprehensive error handling.
Features:
- Redis response caching with TTL
- Automatic model fallback (Sonnet -> Haiku on throttle)
- Cost estimation per invocation
- CloudWatch metrics emission
- Request deduplication
Usage:
invoker = BedrockSyncInvoker(
redis_host="manga-cache.abc123.use1.cache.amazonaws.com",
cache_ttl_seconds=300
)
result = invoker.invoke(
prompt="Recommend 5 action manga similar to Naruto",
model_id="anthropic.claude-3-sonnet-20240229-v1:0"
)
print(result.text)
print(f"Cost: ${result.estimated_cost_usd:.6f}")
"""
def __init__(
self,
region: str = "us-east-1",
redis_host: Optional[str] = None,
redis_port: int = 6379,
cache_ttl_seconds: int = 300,
enable_fallback: bool = True
):
self._bedrock_config = Config(
region_name=region,
retries={"max_attempts": 3, "mode": "adaptive"},
read_timeout=60,
connect_timeout=5,
max_pool_connections=25
)
self._client = boto3.client("bedrock-runtime", config=self._bedrock_config)
self._cloudwatch = boto3.client("cloudwatch", region_name=region)
# Redis cache setup
self._cache = None
self._cache_ttl = cache_ttl_seconds
if redis_host:
self._cache = redis.Redis(
host=redis_host,
port=redis_port,
decode_responses=True,
socket_timeout=2,
socket_connect_timeout=2,
retry_on_timeout=True
)
self._enable_fallback = enable_fallback
# MangaAssist system prompt
self._system_prompt = (
"You are MangaAssist, a helpful assistant for a Japanese manga store. "
"You have deep knowledge of manga series, authors, genres, and Japanese "
"publishing. You can recommend series based on user preferences, explain "
"plot details without spoilers, and help with purchasing decisions. "
"Always be respectful of Japanese culture and naming conventions. "
"Use romanized Japanese titles alongside English titles when relevant."
)
def invoke(
self,
prompt: str,
model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
max_tokens: int = 1024,
temperature: float = 0.5,
system_prompt: Optional[str] = None,
use_cache: bool = True,
conversation_history: Optional[list] = None
) -> InvocationResult:
"""
Invoke Bedrock synchronously with caching and fallback.
Args:
prompt: User prompt text
model_id: Bedrock model ID
max_tokens: Maximum response tokens
temperature: Sampling temperature (0.0-1.0)
system_prompt: Override default system prompt
use_cache: Whether to check/populate cache
conversation_history: Previous messages for multi-turn
Returns:
InvocationResult with response text and metadata
"""
start_time = time.time()
sys_prompt = system_prompt or self._system_prompt
# Build messages list
messages = []
if conversation_history:
messages.extend(conversation_history)
messages.append({"role": "user", "content": prompt})
# Check cache for single-turn queries
cache_key = None
if use_cache and self._cache and not conversation_history:
cache_key = self._compute_cache_key(prompt, model_id, max_tokens, temperature)
cached = self._get_cached_response(cache_key)
if cached:
elapsed_ms = int((time.time() - start_time) * 1000)
self._emit_metric("CacheHit", 1)
return InvocationResult(
text=cached["text"],
model_id=model_id,
input_tokens=cached.get("input_tokens", 0),
output_tokens=cached.get("output_tokens", 0),
latency_ms=elapsed_ms,
cache_hit=True,
stop_reason=cached.get("stop_reason", "end_turn"),
estimated_cost_usd=0.0 # No cost for cached responses
)
# Attempt invocation with optional fallback
try:
result = self._invoke_model(messages, sys_prompt, model_id, max_tokens, temperature)
except ClientError as e:
error_code = e.response["Error"]["Code"]
if error_code == "ThrottlingException" and self._enable_fallback:
logger.warning(f"Throttled on {model_id}, falling back to Haiku")
self._emit_metric("FallbackTriggered", 1)
fallback_model = "anthropic.claude-3-haiku-20240307-v1:0"
result = self._invoke_model(
messages, sys_prompt, fallback_model, max_tokens, temperature
)
model_id = fallback_model
else:
raise
elapsed_ms = int((time.time() - start_time) * 1000)
# Calculate cost
pricing = MODEL_PRICING.get(model_id, ModelPricing(0, 0))
cost = (
(result["input_tokens"] / 1_000_000) * pricing.input_per_million +
(result["output_tokens"] / 1_000_000) * pricing.output_per_million
)
# Cache the response
if cache_key and self._cache:
self._set_cached_response(cache_key, result)
# Emit metrics
self._emit_metric("InvocationLatency", elapsed_ms, "Milliseconds")
self._emit_metric("InputTokens", result["input_tokens"])
self._emit_metric("OutputTokens", result["output_tokens"])
self._emit_metric("EstimatedCostUSD", cost * 100, "None") # Cents for precision
return InvocationResult(
text=result["text"],
model_id=model_id,
input_tokens=result["input_tokens"],
output_tokens=result["output_tokens"],
latency_ms=elapsed_ms,
cache_hit=False,
stop_reason=result["stop_reason"],
estimated_cost_usd=cost
)
def _invoke_model(
self,
messages: list,
system_prompt: str,
model_id: str,
max_tokens: int,
temperature: float
) -> dict:
"""Execute the raw Bedrock InvokeModel call."""
request_body = json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"messages": messages,
"system": system_prompt,
"temperature": temperature,
"top_p": 0.95
})
response = self._client.invoke_model(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=request_body
)
body = json.loads(response["body"].read())
return {
"text": body["content"][0]["text"],
"input_tokens": body["usage"]["input_tokens"],
"output_tokens": body["usage"]["output_tokens"],
"stop_reason": body["stop_reason"]
}
def _compute_cache_key(
self, prompt: str, model_id: str, max_tokens: int, temperature: float
) -> str:
"""Generate a deterministic cache key from request parameters."""
key_material = f"{prompt}|{model_id}|{max_tokens}|{temperature}"
return f"manga:fm:sync:{hashlib.sha256(key_material.encode()).hexdigest()[:16]}"
def _get_cached_response(self, key: str) -> Optional[dict]:
"""Retrieve a cached response from Redis."""
try:
data = self._cache.get(key)
if data:
return json.loads(data)
except redis.RedisError as e:
logger.warning(f"Cache read error: {e}")
return None
def _set_cached_response(self, key: str, result: dict):
"""Store a response in Redis with TTL."""
try:
self._cache.setex(key, self._cache_ttl, json.dumps(result))
except redis.RedisError as e:
logger.warning(f"Cache write error: {e}")
def _emit_metric(self, name: str, value: float, unit: str = "Count"):
"""Emit a CloudWatch metric for monitoring."""
try:
self._cloudwatch.put_metric_data(
Namespace="MangaAssist/Bedrock",
MetricData=[{
"MetricName": name,
"Value": value,
"Unit": unit,
"Dimensions": [
{"Name": "Service", "Value": "MangaAssist"},
{"Name": "Component", "Value": "BedrockSyncInvoker"}
]
}]
)
except Exception as e:
logger.debug(f"Metric emission failed: {e}")
4.2 JavaScript SDK v3 Integration
/**
* MangaAssist Bedrock client using AWS SDK for JavaScript v3.
* Used by the Node.js API layer for lightweight request handling.
*/
import {
BedrockRuntimeClient,
InvokeModelCommand,
InvokeModelWithResponseStreamCommand,
} from "@aws-sdk/client-bedrock-runtime";
const MANGA_SYSTEM_PROMPT = `You are MangaAssist, a helpful assistant for a Japanese manga store.
You provide recommendations, answer questions about manga series, and help customers
find their next great read. You understand both Japanese and English terminology.`;
/**
* Configuration for the Bedrock client with optimized retry behavior.
*/
const bedrockClient = new BedrockRuntimeClient({
region: "us-east-1",
maxAttempts: 3,
requestHandler: {
requestTimeout: 30000, // 30s total request timeout
httpsAgent: {
maxSockets: 50, // Connection pool size
keepAlive: true,
keepAliveMsecs: 60000,
},
},
});
/**
* Invoke Bedrock synchronously for a manga query.
*
* @param {string} prompt - User's manga-related query
* @param {Object} options - Invocation options
* @param {string} options.modelId - Bedrock model identifier
* @param {number} options.maxTokens - Maximum response tokens
* @param {number} options.temperature - Sampling temperature
* @returns {Promise<Object>} Response with text and usage metadata
*/
export async function invokeMangaQuery(prompt, options = {}) {
const {
modelId = "anthropic.claude-3-haiku-20240307-v1:0",
maxTokens = 512,
temperature = 0.3,
} = options;
const requestBody = JSON.stringify({
anthropic_version: "bedrock-2023-05-31",
max_tokens: maxTokens,
messages: [{ role: "user", content: prompt }],
system: MANGA_SYSTEM_PROMPT,
temperature,
});
const command = new InvokeModelCommand({
modelId,
contentType: "application/json",
accept: "application/json",
body: requestBody,
});
const startTime = Date.now();
const response = await bedrockClient.send(command);
const responseBody = JSON.parse(new TextDecoder().decode(response.body));
const latencyMs = Date.now() - startTime;
return {
text: responseBody.content[0].text,
modelId,
inputTokens: responseBody.usage.input_tokens,
outputTokens: responseBody.usage.output_tokens,
stopReason: responseBody.stop_reason,
latencyMs,
};
}
/**
* Stream a Bedrock response, yielding tokens as they arrive.
* Used for real-time chat over WebSocket.
*
* @param {Array} messages - Conversation message history
* @param {Object} options - Stream options
* @yields {string} Individual token chunks
* @returns {AsyncGenerator<string>} Token stream
*/
export async function* streamMangaResponse(messages, options = {}) {
const {
modelId = "anthropic.claude-3-sonnet-20240229-v1:0",
maxTokens = 2048,
temperature = 0.7,
} = options;
const requestBody = JSON.stringify({
anthropic_version: "bedrock-2023-05-31",
max_tokens: maxTokens,
messages,
system: MANGA_SYSTEM_PROMPT,
temperature,
});
const command = new InvokeModelWithResponseStreamCommand({
modelId,
contentType: "application/json",
accept: "application/json",
body: requestBody,
});
const response = await bedrockClient.send(command);
  for await (const event of response.body) {
    // Skip non-chunk events (e.g., internal stream metadata)
    if (!event.chunk?.bytes) continue;
    const chunk = JSON.parse(new TextDecoder().decode(event.chunk.bytes));
    if (chunk.type === "content_block_delta" && chunk.delta?.type === "text_delta") {
      yield chunk.delta.text;
    }
  }
}
5. SQS-Based Async Processing for Batch Manga Analysis
"""
Async FM processor using SQS for decoupled, scalable manga analysis.
Handles batch requests that do not require real-time responses.
"""
import json
import time
import uuid
import logging
from typing import Optional
from dataclasses import dataclass
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
@dataclass
class AsyncFMRequest:
"""Represents an asynchronous FM processing request."""
request_id: str
prompt: str
model_id: str
max_tokens: int
callback_url: Optional[str]
metadata: dict
priority: str = "normal" # normal | high | low
created_at: float = 0.0
def to_sqs_message(self) -> dict:
"""Serialize to SQS message format."""
return {
"request_id": self.request_id,
"prompt": self.prompt,
"model_id": self.model_id,
"max_tokens": self.max_tokens,
"callback_url": self.callback_url,
"metadata": self.metadata,
"priority": self.priority,
"created_at": self.created_at or time.time()
}
class AsyncFMProcessor:
"""
Asynchronous FM processor that uses SQS to decouple request
submission from processing. Supports priority routing, DLQ handling,
and callback notifications.
Architecture:
Producer SQS Queues Consumer
------- ---------- --------
submit() ------> manga-fm-high --------> Lambda/ECS
submit() ------> manga-fm-normal ------> Lambda/ECS
submit() ------> manga-fm-low ----------> Lambda/ECS
manga-fm-dlq <---------- (failures)
Usage:
processor = AsyncFMProcessor()
# Submit a request
request_id = processor.submit(
prompt="Analyze the art style evolution in Berserk volumes 1-40",
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
priority="normal",
callback_url="https://manga.api/webhooks/fm-result"
)
# Check status
status = processor.get_status(request_id)
"""
QUEUE_MAP = {
"high": "manga-fm-high-priority",
"normal": "manga-fm-normal",
"low": "manga-fm-low-priority"
}
DLQ_NAME = "manga-fm-dlq"
def __init__(self, region: str = "us-east-1"):
self._sqs = boto3.client("sqs", region_name=region)
self._dynamodb = boto3.resource("dynamodb", region_name=region)
self._status_table = self._dynamodb.Table("manga-fm-request-status")
# Resolve queue URLs
self._queue_urls = {}
for priority, queue_name in self.QUEUE_MAP.items():
resp = self._sqs.get_queue_url(QueueName=queue_name)
self._queue_urls[priority] = resp["QueueUrl"]
def submit(
self,
prompt: str,
model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
max_tokens: int = 2048,
priority: str = "normal",
callback_url: Optional[str] = None,
metadata: Optional[dict] = None
) -> str:
"""
Submit an async FM request to the appropriate SQS queue.
Returns:
request_id for tracking the request status
"""
request_id = str(uuid.uuid4())
request = AsyncFMRequest(
request_id=request_id,
prompt=prompt,
model_id=model_id,
max_tokens=max_tokens,
callback_url=callback_url,
metadata=metadata or {},
priority=priority,
created_at=time.time()
)
queue_url = self._queue_urls.get(priority, self._queue_urls["normal"])
        # Send to SQS with message attributes for filtering
        send_kwargs = {
            "QueueUrl": queue_url,
            "MessageBody": json.dumps(request.to_sqs_message()),
            "MessageAttributes": {
                "RequestId": {"StringValue": request_id, "DataType": "String"},
                "ModelId": {"StringValue": model_id, "DataType": "String"},
                "Priority": {"StringValue": priority, "DataType": "String"}
            }
        }
        # MessageGroupId/MessageDeduplicationId are FIFO-only parameters;
        # boto3 rejects None for them, so set them only for FIFO queues.
        if queue_url.endswith(".fifo"):
            send_kwargs["MessageGroupId"] = f"manga-fm-{priority}"
            send_kwargs["MessageDeduplicationId"] = request_id
        self._sqs.send_message(**send_kwargs)
# Track status in DynamoDB
self._status_table.put_item(Item={
"request_id": request_id,
"status": "queued",
"priority": priority,
"model_id": model_id,
"created_at": int(request.created_at),
"ttl": int(request.created_at) + 86400 # 24hr TTL
})
logger.info(f"Submitted async request {request_id} to {priority} queue")
return request_id
def get_status(self, request_id: str) -> dict:
"""Check the status of an async FM request."""
try:
response = self._status_table.get_item(Key={"request_id": request_id})
return response.get("Item", {"status": "not_found"})
except ClientError as e:
logger.error(f"Status lookup failed: {e}")
return {"status": "error", "detail": str(e)}
def submit_batch(self, requests: list) -> list:
"""
Submit multiple async FM requests efficiently using SQS batch operations.
Args:
requests: List of dicts with prompt, model_id, max_tokens, etc.
Returns:
List of request IDs
"""
request_ids = []
# Group by priority for batch sending
by_priority = {"high": [], "normal": [], "low": []}
for req in requests:
priority = req.get("priority", "normal")
request_id = str(uuid.uuid4())
request_ids.append(request_id)
fm_request = AsyncFMRequest(
request_id=request_id,
prompt=req["prompt"],
model_id=req.get("model_id", "anthropic.claude-3-haiku-20240307-v1:0"),
max_tokens=req.get("max_tokens", 1024),
callback_url=req.get("callback_url"),
metadata=req.get("metadata", {}),
priority=priority,
created_at=time.time()
)
by_priority[priority].append(fm_request)
# Send in batches of 10 (SQS limit)
for priority, batch_requests in by_priority.items():
if not batch_requests:
continue
queue_url = self._queue_urls.get(priority, self._queue_urls["normal"])
for i in range(0, len(batch_requests), 10):
batch = batch_requests[i:i+10]
entries = []
for req in batch:
entries.append({
"Id": req.request_id[:80], # SQS ID length limit
"MessageBody": json.dumps(req.to_sqs_message()),
"MessageAttributes": {
"RequestId": {
"StringValue": req.request_id,
"DataType": "String"
},
"ModelId": {
"StringValue": req.model_id,
"DataType": "String"
}
}
})
response = self._sqs.send_message_batch(
QueueUrl=queue_url,
Entries=entries
)
failed = response.get("Failed", [])
if failed:
logger.error(f"Batch send failures: {failed}")
logger.info(f"Submitted batch of {len(request_ids)} requests")
return request_ids
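The processor above covers the producer side only. Below is a minimal consumer sketch, assuming a Lambda function attached to the queues through an SQS event source mapping with ReportBatchItemFailures enabled; callback delivery is stubbed:
"""
Minimal SQS consumer sketch for the async FM pipeline (producer above).
Assumes an SQS event source mapping on the manga-fm-* queues.
"""
import json
import time
import boto3

bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")
status_table = boto3.resource("dynamodb").Table("manga-fm-request-status")

def lambda_handler(event, context):
    # Partial batch responses let SQS retry only the failed messages
    failures = []
    for record in event["Records"]:
        msg = json.loads(record["body"])
        try:
            response = bedrock.invoke_model(
                modelId=msg["model_id"],
                contentType="application/json",
                accept="application/json",
                body=json.dumps({
                    "anthropic_version": "bedrock-2023-05-31",
                    "max_tokens": msg["max_tokens"],
                    "messages": [{"role": "user", "content": msg["prompt"]}]
                })
            )
            body = json.loads(response["body"].read())
            status_table.update_item(
                Key={"request_id": msg["request_id"]},
                UpdateExpression="SET #s = :s, completed_at = :t, #r = :r",
                ExpressionAttributeNames={"#s": "status", "#r": "result"},
                ExpressionAttributeValues={
                    ":s": "completed",
                    ":t": int(time.time()),
                    ":r": {"text": body["content"][0]["text"]}
                }
            )
            # Callback delivery (msg["callback_url"]) would be invoked here
        except Exception:
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}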
6. API Gateway Custom API with Request Validation
6.1 APIGatewayFMProxy
"""
API Gateway FM Proxy — handles REST API requests, validates them,
transforms payloads, and routes to the appropriate Bedrock model.
Deployed as a Lambda authorizer + integration backend.
"""
import json
import re
import uuid
import logging
from typing import Optional
import boto3
import jsonschema
from botocore.config import Config
logger = logging.getLogger(__name__)
# --- Request Validation Schemas ---
MANGA_QUERY_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["query"],
"properties": {
"query": {
"type": "string",
"minLength": 1,
"maxLength": 4000,
"description": "User query text (supports Japanese Unicode)"
},
"model": {
"type": "string",
"enum": ["sonnet", "haiku"],
"default": "haiku"
},
"max_tokens": {
"type": "integer",
"minimum": 1,
"maximum": 4096,
"default": 512
},
"temperature": {
"type": "number",
"minimum": 0.0,
"maximum": 1.0,
"default": 0.5
},
"conversation_id": {
"type": "string",
"pattern": "^[a-f0-9-]{36}$"
},
"language": {
"type": "string",
"enum": ["en", "ja", "auto"],
"default": "auto"
}
},
"additionalProperties": False
}
BATCH_ANALYSIS_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["items"],
"properties": {
"items": {
"type": "array",
"minItems": 1,
"maxItems": 100,
"items": {
"type": "object",
"required": ["manga_id", "analysis_type"],
"properties": {
"manga_id": {
"type": "string",
"pattern": "^MNG-[A-Z0-9]{8}$"
},
"analysis_type": {
"type": "string",
"enum": [
"genre_classification",
"content_summary",
"similar_series",
"reading_level",
"full_enrichment"
]
},
"additional_context": {
"type": "string",
"maxLength": 1000
}
}
}
},
"callback_url": {
"type": "string",
"format": "uri",
"pattern": "^https://"
},
"priority": {
"type": "string",
"enum": ["high", "normal", "low"],
"default": "normal"
}
}
}
class RequestValidator:
"""
Validates incoming API Gateway requests against JSON schemas.
Handles Unicode-aware validation for Japanese manga titles and content.
"""
SCHEMAS = {
"manga_query": MANGA_QUERY_SCHEMA,
"batch_analysis": BATCH_ANALYSIS_SCHEMA
}
def __init__(self):
self._validators = {}
for name, schema in self.SCHEMAS.items():
self._validators[name] = jsonschema.Draft7Validator(schema)
def validate_request(self, schema_name: str, body: dict) -> dict:
"""
Validate a request body against a named schema.
Args:
schema_name: Key from SCHEMAS dict
body: Parsed request body
Returns:
Dict with 'valid' bool and optional 'errors' list
Raises:
KeyError: If schema_name is not registered
"""
validator = self._validators.get(schema_name)
if not validator:
raise KeyError(f"Unknown schema: {schema_name}")
errors = []
for error in validator.iter_errors(body):
errors.append({
"path": ".".join(str(p) for p in error.absolute_path) or "$",
"message": error.message,
"validator": error.validator
})
return {
"valid": len(errors) == 0,
"errors": errors
}
def sanitize_manga_query(self, query: str) -> str:
"""
Sanitize a manga query while preserving Japanese Unicode characters.
Removes potentially harmful content but keeps valid CJK characters.
"""
# Allow: Latin, CJK Unified, Hiragana, Katakana, punctuation, digits, spaces
allowed_pattern = (
r'[^\u0020-\u007E' # Basic Latin printable
r'\u3000-\u303F' # CJK Symbols and Punctuation
r'\u3040-\u309F' # Hiragana
r'\u30A0-\u30FF' # Katakana
r'\u4E00-\u9FFF' # CJK Unified Ideographs
r'\uFF00-\uFFEF' # Halfwidth and Fullwidth Forms
r'\u2000-\u206F' # General Punctuation
r']'
)
sanitized = re.sub(allowed_pattern, '', query)
return sanitized.strip()
class APIGatewayFMProxy:
"""
Lambda handler that serves as the integration backend for
API Gateway REST API endpoints. Routes requests to Bedrock
after validation and transformation.
Endpoints:
POST /v1/manga/query -> Synchronous FM invocation
POST /v1/manga/batch -> Async batch submission
GET /v1/manga/status/:id -> Check async request status
"""
MODEL_MAP = {
"sonnet": "anthropic.claude-3-sonnet-20240229-v1:0",
"haiku": "anthropic.claude-3-haiku-20240307-v1:0"
}
def __init__(self):
self._validator = RequestValidator()
        self._bedrock = boto3.client(
            "bedrock-runtime",
            config=Config(
                retries={"max_attempts": 3, "mode": "adaptive"},
                read_timeout=30
            )
        )
self._sqs = boto3.client("sqs")
self._dynamodb = boto3.resource("dynamodb")
self._status_table = self._dynamodb.Table("manga-fm-request-status")
def handle(self, event: dict, context) -> dict:
"""
Main Lambda handler for API Gateway proxy integration.
Routes based on HTTP method and resource path.
"""
http_method = event.get("httpMethod", "")
resource = event.get("resource", "")
route_key = f"{http_method} {resource}"
routes = {
"POST /v1/manga/query": self._handle_sync_query,
"POST /v1/manga/batch": self._handle_batch_submit,
"GET /v1/manga/status/{request_id}": self._handle_status_check
}
handler = routes.get(route_key)
if not handler:
return self._response(404, {"error": "Not found", "path": resource})
try:
return handler(event, context)
except Exception as e:
logger.exception(f"Unhandled error in {route_key}")
return self._response(500, {"error": "Internal server error"})
def _handle_sync_query(self, event: dict, context) -> dict:
"""Handle synchronous manga query via Bedrock."""
try:
body = json.loads(event.get("body", "{}"))
except json.JSONDecodeError:
return self._response(400, {"error": "Invalid JSON body"})
# Validate request
validation = self._validator.validate_request("manga_query", body)
if not validation["valid"]:
return self._response(400, {
"error": "Validation failed",
"details": validation["errors"]
})
# Sanitize query
query = self._validator.sanitize_manga_query(body["query"])
model_key = body.get("model", "haiku")
model_id = self.MODEL_MAP.get(model_key, self.MODEL_MAP["haiku"])
max_tokens = body.get("max_tokens", 512)
temperature = body.get("temperature", 0.5)
# Invoke Bedrock
request_body = json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"messages": [{"role": "user", "content": query}],
"system": (
"You are MangaAssist. Provide helpful, concise manga recommendations "
"and information. Respond in the same language as the query."
),
"temperature": temperature
})
response = self._bedrock.invoke_model(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=request_body
)
result = json.loads(response["body"].read())
return self._response(200, {
"response": result["content"][0]["text"],
"model": model_key,
"usage": {
"input_tokens": result["usage"]["input_tokens"],
"output_tokens": result["usage"]["output_tokens"]
},
"stop_reason": result["stop_reason"]
})
def _handle_batch_submit(self, event: dict, context) -> dict:
"""Handle batch analysis submission to SQS."""
try:
body = json.loads(event.get("body", "{}"))
except json.JSONDecodeError:
return self._response(400, {"error": "Invalid JSON body"})
validation = self._validator.validate_request("batch_analysis", body)
if not validation["valid"]:
return self._response(400, {
"error": "Validation failed",
"details": validation["errors"]
})
        # Enqueue for async processing. In production each item would be
        # submitted to SQS (e.g., via AsyncFMProcessor.submit_batch from
        # Section 5); this sketch only echoes the manga IDs as request IDs.
        request_ids = []
        for item in body["items"]:
            request_ids.append(item["manga_id"])
return self._response(202, {
"message": "Batch submitted",
"item_count": len(body["items"]),
"request_ids": request_ids
})
def _handle_status_check(self, event: dict, context) -> dict:
"""Check async request status from DynamoDB."""
request_id = event.get("pathParameters", {}).get("request_id", "")
if not request_id:
return self._response(400, {"error": "request_id is required"})
try:
response = self._status_table.get_item(Key={"request_id": request_id})
item = response.get("Item")
if not item:
return self._response(404, {"error": "Request not found"})
return self._response(200, item)
except Exception as e:
return self._response(500, {"error": str(e)})
def _response(self, status_code: int, body: dict) -> dict:
"""Build API Gateway proxy response."""
return {
"statusCode": status_code,
"headers": {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET,POST,OPTIONS",
"X-Request-Id": str(__import__("uuid").uuid4())
},
"body": json.dumps(body, default=str)
}
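A thin module-level handler wires the proxy class to Lambda; instantiating once per execution environment reuses the clients and compiled validators across warm invocations:
# Instantiated once per execution environment
_proxy = APIGatewayFMProxy()

def lambda_handler(event, context):
    """Entry point for the API Gateway proxy integration."""
    return _proxy.handle(event, context)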
7. API Gateway OpenAPI Specification
# MangaAssist API Gateway OpenAPI Specification
# Defines the REST API with request validation models
openapi: "3.0.1"
info:
title: "MangaAssist FM API"
description: "Flexible model interaction API for MangaAssist manga store chatbot"
version: "1.0.0"
contact:
name: "MangaAssist Platform Team"
servers:
- url: "https://{api-id}.execute-api.us-east-1.amazonaws.com/prod"
variables:
api-id:
default: "abc123xyz"
paths:
/v1/manga/query:
post:
summary: "Synchronous manga query"
description: "Submit a manga-related query for real-time FM response"
operationId: "syncMangaQuery"
tags:
- Synchronous
requestBody:
required: true
content:
application/json:
schema:
$ref: "#/components/schemas/MangaQueryRequest"
examples:
japanese_query:
summary: "Query in Japanese"
value:
query: "ワンピースに似たアクション漫画を教えてください"
model: "sonnet"
language: "ja"
max_tokens: 1024
english_query:
summary: "Query in English"
value:
query: "What are the best seinen manga from 2023?"
model: "haiku"
max_tokens: 512
responses:
"200":
description: "Successful FM response"
content:
application/json:
schema:
$ref: "#/components/schemas/MangaQueryResponse"
"400":
description: "Validation error"
content:
application/json:
schema:
$ref: "#/components/schemas/ErrorResponse"
"429":
description: "Rate limited or model throttled"
"500":
description: "Internal server error"
x-amazon-apigateway-request-validator: "validate-body"
x-amazon-apigateway-integration:
type: "aws_proxy"
httpMethod: "POST"
uri: "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:123456789012:function:manga-fm-proxy/invocations"
/v1/manga/batch:
post:
summary: "Submit batch analysis"
description: "Submit multiple manga items for async FM analysis"
operationId: "submitBatchAnalysis"
tags:
- Asynchronous
requestBody:
required: true
content:
application/json:
schema:
$ref: "#/components/schemas/BatchAnalysisRequest"
responses:
"202":
description: "Batch accepted"
content:
application/json:
schema:
$ref: "#/components/schemas/BatchSubmitResponse"
"400":
description: "Validation error"
x-amazon-apigateway-request-validator: "validate-body"
x-amazon-apigateway-integration:
type: "aws_proxy"
httpMethod: "POST"
uri: "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:123456789012:function:manga-fm-proxy/invocations"
/v1/manga/status/{request_id}:
get:
summary: "Check async request status"
description: "Get the processing status of an async FM request"
operationId: "getRequestStatus"
tags:
- Asynchronous
parameters:
- name: request_id
in: path
required: true
schema:
type: string
pattern: "^[a-f0-9-]{36}$"
responses:
"200":
description: "Status found"
content:
application/json:
schema:
$ref: "#/components/schemas/RequestStatusResponse"
"404":
description: "Request not found"
x-amazon-apigateway-integration:
type: "aws_proxy"
httpMethod: "POST"
uri: "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:123456789012:function:manga-fm-proxy/invocations"
components:
schemas:
MangaQueryRequest:
type: object
required:
- query
properties:
query:
type: string
minLength: 1
maxLength: 4000
description: "User query (supports Japanese Unicode)"
model:
type: string
enum: ["sonnet", "haiku"]
default: "haiku"
max_tokens:
type: integer
minimum: 1
maximum: 4096
default: 512
temperature:
type: number
minimum: 0.0
maximum: 1.0
default: 0.5
conversation_id:
type: string
pattern: "^[a-f0-9-]{36}$"
language:
type: string
enum: ["en", "ja", "auto"]
default: "auto"
MangaQueryResponse:
type: object
properties:
response:
type: string
model:
type: string
usage:
type: object
properties:
input_tokens:
type: integer
output_tokens:
type: integer
stop_reason:
type: string
BatchAnalysisRequest:
type: object
required:
- items
properties:
items:
type: array
minItems: 1
maxItems: 100
items:
type: object
required:
- manga_id
- analysis_type
properties:
manga_id:
type: string
pattern: "^MNG-[A-Z0-9]{8}$"
analysis_type:
type: string
enum:
- genre_classification
- content_summary
- similar_series
- reading_level
- full_enrichment
additional_context:
type: string
maxLength: 1000
callback_url:
type: string
format: uri
pattern: "^https://"
priority:
type: string
enum: ["high", "normal", "low"]
default: "normal"
BatchSubmitResponse:
type: object
properties:
message:
type: string
item_count:
type: integer
request_ids:
type: array
items:
type: string
RequestStatusResponse:
type: object
properties:
request_id:
type: string
status:
type: string
enum: ["queued", "processing", "completed", "failed"]
created_at:
type: integer
completed_at:
type: integer
result:
type: object
ErrorResponse:
type: object
properties:
error:
type: string
details:
type: array
items:
type: object
properties:
path:
type: string
message:
type: string
x-amazon-apigateway-request-validators:
validate-body:
validateRequestBody: true
validateRequestParameters: false
validate-all:
validateRequestBody: true
validateRequestParameters: true
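One way to deploy this spec (a sketch; the file name and stage are illustrative) is boto3's import_rest_api, which creates the API together with its embedded validators and integrations in one call:
import boto3

apigw = boto3.client("apigateway", region_name="us-east-1")

# Import the OpenAPI document above (saved as manga-fm-api.yaml)
with open("manga-fm-api.yaml", "rb") as f:
    api = apigw.import_rest_api(failOnWarnings=True, body=f.read())

# Deploy to the stage referenced in the servers block
apigw.create_deployment(restApiId=api["id"], stageName="prod")
print(f"REST API ID: {api['id']}")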
8. VTL Mapping Templates for API Gateway
8.1 Request Mapping Template (Input Transformation)
## Request mapping template: transforms API Gateway request into Lambda input
## Handles Unicode content in manga queries safely
#set($inputRoot = $input.path('$'))
{
"httpMethod": "$context.httpMethod",
"resource": "$context.resourcePath",
"pathParameters": {
#foreach($key in $input.params().path.keySet())
"$key": "$util.escapeJavaScript($input.params().path.get($key))"#if($foreach.hasNext),#end
#end
},
"queryStringParameters": {
#foreach($key in $input.params().querystring.keySet())
"$key": "$util.escapeJavaScript($input.params().querystring.get($key))"#if($foreach.hasNext),#end
#end
},
"headers": {
"Content-Type": "$input.params().header.get('Content-Type')",
"X-Api-Key": "$input.params().header.get('X-Api-Key')",
"X-Request-Id": "$context.requestId",
"X-Source-Ip": "$context.identity.sourceIp"
},
"body": $input.json('$'),
"requestContext": {
"requestId": "$context.requestId",
"stage": "$context.stage",
"apiId": "$context.apiId",
"requestTime": "$context.requestTime",
"requestTimeEpoch": $context.requestTimeEpoch
}
}
8.2 Response Mapping Template (Output Transformation)
## Response mapping template: transforms Lambda output for API Gateway response
## Adds standard headers and wraps response body
#set($response = $input.path('$'))
#set($statusCode = $response.statusCode)
#if($statusCode == 200)
{
"response": $response.body.response,
"metadata": {
"model": "$response.body.model",
"usage": {
"input_tokens": $response.body.usage.input_tokens,
"output_tokens": $response.body.usage.output_tokens
},
"request_id": "$context.requestId",
"timestamp": "$context.requestTime"
}
}
#elseif($statusCode == 429)
#set($context.responseOverride.status = 429)
{
"error": "Rate limited",
"message": "Too many requests. Please retry after a short delay.",
"retry_after_seconds": 5
}
#else
{
"error": $response.body.error,
"request_id": "$context.requestId"
}
#end
9. Converse API for Multi-Turn with Tool Use
"""
Bedrock Converse API integration for MangaAssist multi-turn conversations
with tool use (function calling) for dynamic data lookups.
"""
import json
import boto3
from botocore.config import Config
class MangaConverseClient:
"""
Multi-turn conversation client using Bedrock Converse API.
Supports tool use for live manga catalog lookups and order status checks.
"""
TOOL_CONFIG = {
"tools": [
{
"toolSpec": {
"name": "search_manga_catalog",
"description": (
"Search the manga catalog by title, author, genre, or keyword. "
"Returns matching manga entries with metadata."
),
"inputSchema": {
"json": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query text"
},
"genre": {
"type": "string",
"description": "Optional genre filter",
"enum": [
"shonen", "seinen", "shojo", "josei",
"kodomo", "isekai", "mecha", "horror"
]
},
"max_results": {
"type": "integer",
"description": "Maximum results to return (1-20)",
"default": 5
}
},
"required": ["query"]
}
}
}
},
{
"toolSpec": {
"name": "get_order_status",
"description": "Check the status of a customer's manga order by order ID.",
"inputSchema": {
"json": {
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "The order ID to look up",
"pattern": "^ORD-[A-Z0-9]{8}$"
}
},
"required": ["order_id"]
}
}
}
},
{
"toolSpec": {
"name": "check_manga_availability",
"description": "Check stock availability and pricing for a specific manga.",
"inputSchema": {
"json": {
"type": "object",
"properties": {
"manga_id": {
"type": "string",
"description": "The manga ID to check"
},
"volume": {
"type": "integer",
"description": "Specific volume number (optional)"
}
},
"required": ["manga_id"]
}
}
}
}
]
}
def __init__(self, region: str = "us-east-1"):
config = Config(
region_name=region,
retries={"max_attempts": 3, "mode": "adaptive"},
read_timeout=60
)
self._client = boto3.client("bedrock-runtime", config=config)
self._model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
self._system_prompt = [
{
"text": (
"You are MangaAssist, a knowledgeable assistant for a Japanese "
"manga store. Use the available tools to look up real-time "
"catalog data, check order statuses, and verify availability. "
"Always use tools when the user asks about specific manga, "
"orders, or stock. Respond naturally in the user's language."
)
}
]
def converse(self, messages: list, tool_handlers: dict = None) -> dict:
"""
Execute a multi-turn conversation with automatic tool use handling.
Args:
messages: Conversation history in Converse API format
tool_handlers: Dict mapping tool names to handler functions
Returns:
Final response with conversation history
"""
tool_handlers = tool_handlers or {}
max_tool_rounds = 5 # Prevent infinite tool loops
for round_num in range(max_tool_rounds):
response = self._client.converse(
modelId=self._model_id,
messages=messages,
system=self._system_prompt,
toolConfig=self.TOOL_CONFIG,
inferenceConfig={
"maxTokens": 2048,
"temperature": 0.5,
"topP": 0.95
}
)
stop_reason = response["stopReason"]
assistant_message = response["output"]["message"]
messages.append(assistant_message)
if stop_reason == "end_turn":
# Model finished responding — return final text
return {
"messages": messages,
"response": self._extract_text(assistant_message),
"usage": response["usage"],
"stop_reason": stop_reason,
"tool_rounds": round_num
}
elif stop_reason == "tool_use":
# Model wants to call tools — execute them and continue
tool_results = []
for block in assistant_message["content"]:
if block.get("toolUse"):
tool_use = block["toolUse"]
tool_name = tool_use["name"]
tool_input = tool_use["input"]
tool_use_id = tool_use["toolUseId"]
handler = tool_handlers.get(tool_name)
if handler:
try:
result = handler(tool_input)
tool_results.append({
"toolResult": {
"toolUseId": tool_use_id,
"content": [{"json": result}]
}
})
except Exception as e:
tool_results.append({
"toolResult": {
"toolUseId": tool_use_id,
"content": [{"text": f"Error: {str(e)}"}],
"status": "error"
}
})
else:
tool_results.append({
"toolResult": {
"toolUseId": tool_use_id,
"content": [{"text": f"Tool {tool_name} not available"}],
"status": "error"
}
})
# Append tool results as user message and continue loop
messages.append({"role": "user", "content": tool_results})
else:
# Unexpected stop reason (max_tokens, etc.)
return {
"messages": messages,
"response": self._extract_text(assistant_message),
"usage": response["usage"],
"stop_reason": stop_reason,
"tool_rounds": round_num
}
return {
"messages": messages,
"response": "Maximum tool interaction rounds reached.",
"usage": {},
"stop_reason": "max_tool_rounds",
"tool_rounds": max_tool_rounds
}
def _extract_text(self, message: dict) -> str:
"""Extract text content from a Converse API message."""
texts = []
for block in message.get("content", []):
if block.get("text"):
texts.append(block["text"])
return "\n".join(texts)
10. Quick Reference — Interaction Pattern Selection
| Pattern | When to Use | Latency | Cost | MangaAssist Example |
|---|---|---|---|---|
| InvokeModel sync | Single-turn, < 30s response needed | 1-10s | Per-request | Quick manga info lookup |
| InvokeModelWithResponseStream | Chat UI, progressive display | TTFT 200-800ms | Per-request | Real-time conversation |
| Converse / ConverseStream | Multi-turn with tools | 1-12s per turn | Per-request | Catalog search + recommendation |
| SQS + Lambda async | Batch work, cost-tolerant | Minutes | Per-request | Nightly catalog enrichment |
| SQS + ECS async | Long-running, heavy analysis | Minutes-hours | Per-request | Full series analysis |
| API Gateway REST + validation | External partner access | Proxy overhead ~50ms | Per-request + API GW | Partner manga data API |
| API Gateway WebSocket | Persistent real-time chat | Connection setup ~200ms | Per-message + connection | Customer chat sessions |
| Batch Inference Job | Massive offline processing | Hours | 50% discount | Monthly full-catalog reindex |
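The table's decision logic reduces to a small routing helper (an illustrative sketch; the thresholds encode the heuristics above, not hard service limits):
def select_interaction_pattern(
    needs_streaming: bool,
    needs_tools: bool,
    is_realtime: bool,
    item_count: int = 1,
) -> str:
    """Map request characteristics to the interaction patterns above."""
    if item_count > 1000:
        return "bedrock_batch_inference"   # massive offline work, 50% discount
    if item_count > 1:
        return "sqs_async"                 # batch work, latency-tolerant
    if needs_tools:
        return "converse"                  # multi-turn with tool use
    if needs_streaming and is_realtime:
        return "invoke_model_with_response_stream"
    return "invoke_model"                  # single-turn sync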
11. IAM Policies for Model Interaction
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "BedrockInvokeModels",
"Effect": "Allow",
"Action": [
"bedrock:InvokeModel",
"bedrock:InvokeModelWithResponseStream"
],
"Resource": [
"arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
"arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-haiku-20240307-v1:0"
]
},
{
"Sid": "BedrockBatchInference",
"Effect": "Allow",
"Action": [
"bedrock:CreateModelInvocationJob",
"bedrock:GetModelInvocationJob",
"bedrock:ListModelInvocationJobs",
"bedrock:StopModelInvocationJob"
],
"Resource": "*"
},
{
"Sid": "SQSAsyncQueues",
"Effect": "Allow",
"Action": [
"sqs:SendMessage",
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:GetQueueUrl"
],
"Resource": [
"arn:aws:sqs:us-east-1:123456789012:manga-fm-*"
]
}
]
}
Note: Converse and ConverseStream are authorized through the bedrock:InvokeModel and bedrock:InvokeModelWithResponseStream actions; there is no separate bedrock:Converse IAM action, so the first statement also covers the Converse API.
12. Cost Estimation Model
Daily volume: 1,000,000 messages
Routing split: 80% Haiku, 20% Sonnet
Average tokens per request: 500 input, 300 output
Haiku (800,000 requests/day):
Input: 800,000 * 500 / 1,000,000 * $0.25 = $100.00
Output: 800,000 * 300 / 1,000,000 * $1.25 = $300.00
Haiku daily: $400.00
Sonnet (200,000 requests/day):
Input: 200,000 * 500 / 1,000,000 * $3.00 = $300.00
Output: 200,000 * 300 / 1,000,000 * $15.00 = $900.00
Sonnet daily: $1,200.00
Total FM cost/day: $1,600.00
Total FM cost/month: ~$48,000.00
A 15% cache hit rate saves: ~$7,200/month
Batch inference at the 50% discount for nightly jobs: additional, workload-dependent savings
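A sketch reproducing the arithmetic above, handy for re-running the model when the routing split or token averages change:
PRICING = {  # USD per 1M tokens (input, output)
    "haiku": (0.25, 1.25),
    "sonnet": (3.00, 15.00),
}

def daily_fm_cost(messages_per_day: int, split: dict,
                  avg_in: int = 500, avg_out: int = 300) -> float:
    """Estimate daily FM spend given a model routing split."""
    total = 0.0
    for model, share in split.items():
        reqs = messages_per_day * share
        in_price, out_price = PRICING[model]
        total += reqs * avg_in / 1_000_000 * in_price
        total += reqs * avg_out / 1_000_000 * out_price
    return total

cost = daily_fm_cost(1_000_000, {"haiku": 0.8, "sonnet": 0.2})
print(f"${cost:,.2f}/day, ${cost * 30:,.2f}/month")  # $1,600.00/day, $48,000.00/month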
End of File 1 — Flexible Model Interaction Architecture