Accessible Interface Architecture
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
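The pricing and scale figures above imply very different daily costs depending on model routing. A back-of-envelope sketch (the 2,000-input/500-output token profile per message is an illustrative assumption, not a measured figure):

```python
# Hedged sketch: estimate daily Bedrock cost at 1M messages/day.
# Per-message token counts are illustrative assumptions.
PRICES_PER_1M = {  # (input, output) USD per 1M tokens, from the context above
    "sonnet": (3.00, 15.00),
    "haiku": (0.25, 1.25),
}

def cost_per_message(model: str, input_tokens: int, output_tokens: int) -> float:
    """USD cost of a single request for the given token counts."""
    in_price, out_price = PRICES_PER_1M[model]
    return input_tokens * in_price / 1e6 + output_tokens * out_price / 1e6

# Assumed profile: ~2,000 input tokens (prompt + RAG context), ~500 output tokens.
sonnet_daily = cost_per_message("sonnet", 2000, 500) * 1_000_000
haiku_daily = cost_per_message("haiku", 2000, 500) * 1_000_000
print(f"Sonnet-only: ${sonnet_daily:,.0f}/day, Haiku-only: ${haiku_daily:,.0f}/day")
```

Under these assumptions an all-Sonnet deployment runs roughly $13,500/day versus roughly $1,125/day for Haiku, which is why the flows below route classification and formatting to Haiku and reserve Sonnet for recommendation generation.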
Skill Mapping
| Field | Value |
|---|---|
| Domain | 2 — Implementation & Integration |
| Task | 2.5 — Application Integration Patterns |
| Skill | 2.5.2 — Accessible AI Interfaces |
| Focus | Amplify chat widget, API-first design, Prompt Flows visual builder |
| MangaAssist Relevance | Amplify-hosted React chat frontend, OpenAPI-first API contract, Bedrock Prompt Flows for no-code prompt iteration |
Mind Map
mindmap
root((Accessible AI Interface Architecture))
Amplify Chat Widget
React Component Library
Chat bubble UI
Markdown rendering
Japanese IME support
WebSocket Integration
API Gateway connection
Reconnection logic
Connection state machine
Hosting & Deployment
Amplify Hosting CI/CD
CloudFront CDN
Custom domain + SSL
Authentication
Cognito User Pool
Social login (LINE, Google)
Guest access tokens
API-First Design
OpenAPI 3.1 Specification
Chat endpoint schemas
WebSocket message schemas
Error response catalog
Contract-First Development
Schema validation middleware
Auto-generated SDKs
API versioning strategy
Documentation
Interactive API explorer
Code samples (JS, Python, cURL)
Rate limit documentation
Bedrock Prompt Flows
Visual Builder
Drag-and-drop nodes
Flow branching logic
Variable binding
Flow Types
Prompt node
Condition node
Iterator node
Lambda node
MangaAssist Flows
Genre classifier
Recommendation chain
FAQ routing
Version Management
Flow aliases
Canary deployment
Rollback strategy
Accessibility Standards
WCAG 2.1 AA Compliance
Keyboard navigation
Screen reader support
Color contrast 4.5:1
Internationalization
Japanese primary
English secondary
RTL awareness
Mobile Responsiveness
Touch targets ≥44px
Viewport scaling
Offline graceful degradation
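The 4.5:1 contrast requirement in the mind map is directly computable. A minimal sketch of the WCAG 2.1 relative-luminance formula (the sample colors are illustrative, not MangaAssist's actual palette):

```python
# Hedged sketch: WCAG 2.1 contrast ratio between two 8-bit sRGB colors.
def _channel(c8: int) -> float:
    """Linearize one 8-bit sRGB channel per the WCAG 2.1 definition."""
    c = c8 / 255.0
    return c / 12.92 if c <= 0.03928 else ((c + 0.055) / 1.055) ** 2.4

def luminance(rgb: tuple[int, int, int]) -> float:
    """Relative luminance of an sRGB color."""
    r, g, b = rgb
    return 0.2126 * _channel(r) + 0.7152 * _channel(g) + 0.0722 * _channel(b)

def contrast_ratio(fg: tuple[int, int, int], bg: tuple[int, int, int]) -> float:
    """Contrast ratio (lighter + 0.05) / (darker + 0.05), range 1..21."""
    l1, l2 = sorted((luminance(fg), luminance(bg)), reverse=True)
    return (l1 + 0.05) / (l2 + 0.05)

print(contrast_ratio((0, 0, 0), (255, 255, 255)))        # black on white: 21:1 max
print(contrast_ratio((0x76, 0x76, 0x76), (255, 255, 255)))  # gray near the AA line
```

A check like this fits naturally into a frontend CI step so a theme change can't silently drop body text below the AA threshold.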
Architecture Overview
MangaAssist's frontend architecture uses AWS Amplify to host a React-based chat interface that communicates with the backend via API Gateway WebSocket. The API contract is defined spec-first in OpenAPI 3.1, and Bedrock Prompt Flows let non-developer team members iterate on prompt engineering without code changes.
Full-Stack Architecture
graph TB
subgraph Frontend["Amplify Frontend"]
REACT[React Chat App<br/>TypeScript + Tailwind]
COMPONENTS[Chat Components<br/>Message bubbles, Input bar<br/>Typing indicator]
WS_CLIENT[WebSocket Client<br/>Reconnect + Backpressure]
AUTH_UI[Auth UI<br/>Cognito Hosted UI<br/>LINE / Google SSO]
end
subgraph CDN["Edge Layer"]
CF[CloudFront CDN<br/>ap-northeast-1]
AMPLIFY_HOST[Amplify Hosting<br/>CI/CD from Git]
end
subgraph API["API Layer"]
APIGW_WS[API Gateway<br/>WebSocket API]
APIGW_REST[API Gateway<br/>REST API + OpenAPI]
COGNITO[Cognito User Pool<br/>JWT Tokens]
end
subgraph Backend["Backend Services"]
ORCHESTRATOR[ECS Fargate<br/>Orchestrator]
PROMPT_FLOWS[Bedrock Prompt Flows<br/>Genre Classifier<br/>Recommendation Chain]
BEDROCK[Bedrock Runtime<br/>Claude 3 Sonnet/Haiku]
end
subgraph Data["Data Layer"]
DYNAMO[DynamoDB]
OPENSEARCH[OpenSearch Serverless]
REDIS[ElastiCache Redis]
end
REACT --> CF
CF --> AMPLIFY_HOST
REACT --> WS_CLIENT
WS_CLIENT --> APIGW_WS
AUTH_UI --> COGNITO
COGNITO --> APIGW_WS
COGNITO --> APIGW_REST
APIGW_WS --> ORCHESTRATOR
APIGW_REST --> ORCHESTRATOR
ORCHESTRATOR --> PROMPT_FLOWS
PROMPT_FLOWS --> BEDROCK
ORCHESTRATOR --> DYNAMO
ORCHESTRATOR --> OPENSEARCH
ORCHESTRATOR --> REDIS
style REACT fill:#61dafb,color:#000
style BEDROCK fill:#ff9900,color:#000
style PROMPT_FLOWS fill:#8b5cf6,color:#fff
style CF fill:#f38020,color:#000
Amplify Chat Widget Architecture
React Chat Component Structure
graph TB
subgraph App["MangaAssist Chat App"]
PROVIDER[ChatProvider<br/>Context + State]
LAYOUT[ChatLayout<br/>Responsive shell]
end
subgraph Components["Chat Components"]
HEADER[ChatHeader<br/>Title, connection status]
MESSAGES[MessageList<br/>Virtualized scroll]
BUBBLE[MessageBubble<br/>User / Assistant]
TYPING[TypingIndicator<br/>Animated dots]
INPUT[ChatInput<br/>Japanese IME aware]
SUGGESTIONS[SuggestionChips<br/>Quick actions]
end
subgraph Hooks["Custom Hooks"]
USE_WS[useWebSocket<br/>Connection management]
USE_CHAT[useChat<br/>Message state]
USE_AUTH[useAuth<br/>Cognito tokens]
USE_STREAM[useStreamResponse<br/>Chunk assembly]
end
PROVIDER --> LAYOUT
LAYOUT --> HEADER
LAYOUT --> MESSAGES
MESSAGES --> BUBBLE
MESSAGES --> TYPING
LAYOUT --> INPUT
LAYOUT --> SUGGESTIONS
INPUT --> USE_CHAT
USE_CHAT --> USE_WS
USE_WS --> USE_AUTH
BUBBLE --> USE_STREAM
style PROVIDER fill:#61dafb,color:#000
style USE_WS fill:#10b981,color:#000
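The job of the useStreamResponse hook, reassembling streamed chunks into one displayable message, can be sketched language-neutrally in Python. The event shape with type/index/text fields is an assumption about the WebSocket protocol, not a documented contract:

```python
# Hedged sketch: assemble streamed response chunks, tolerating out-of-order
# delivery by sorting on the chunk index before joining.
def assemble_stream(events: list[dict]) -> str:
    """Collect 'chunk' events and join their text once 'done' is seen."""
    chunks: dict[int, str] = {}
    for event in events:
        if event.get("type") == "chunk":
            chunks[event["index"]] = event["text"]
        elif event.get("type") == "done":
            break
    return "".join(text for _, text in sorted(chunks.items()))

events = [
    {"type": "chunk", "index": 0, "text": "ONE PIECEは"},
    {"type": "chunk", "index": 2, "text": "おすすめです。"},
    {"type": "chunk", "index": 1, "text": "冒険好きに"},
    {"type": "done"},
]
print(assemble_stream(events))  # chunks re-ordered by index before joining
```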
Amplify Chat Widget Implementation
"""
MangaAssist Amplify Backend Configuration
CDK infrastructure for Amplify-hosted chat frontend.
"""
import json

from aws_cdk import (
    Stack,
    Duration,
    RemovalPolicy,
    CfnOutput,
    aws_amplify as amplify,
    aws_cognito as cognito,
)
from constructs import Construct
class MangaAssistFrontendStack(Stack):
"""CDK Stack for MangaAssist Amplify frontend."""
def __init__(self, scope: Construct, id: str, **kwargs):
super().__init__(scope, id, **kwargs)
# --- Cognito User Pool ---
user_pool = cognito.UserPool(
self, "MangaAssistUserPool",
user_pool_name="manga-assist-users",
self_sign_up_enabled=True,
sign_in_aliases=cognito.SignInAliases(email=True),
auto_verify=cognito.AutoVerifiedAttrs(email=True),
password_policy=cognito.PasswordPolicy(
min_length=8,
require_lowercase=True,
require_digits=True,
require_symbols=False,
),
account_recovery=cognito.AccountRecovery.EMAIL_ONLY,
removal_policy=RemovalPolicy.RETAIN,
)
# LINE Login identity provider
line_provider = cognito.UserPoolIdentityProviderOidc(
self, "LineProvider",
user_pool=user_pool,
name="LINE",
client_id="{{resolve:ssm:/manga-assist/line-client-id}}",
client_secret="{{resolve:ssm:/manga-assist/line-client-secret}}",
issuer_url="https://access.line.me",
scopes=["openid", "profile", "email"],
attribute_mapping=cognito.AttributeMapping(
email=cognito.ProviderAttribute.other("email"),
fullname=cognito.ProviderAttribute.other("name"),
),
)
# App client
app_client = user_pool.add_client(
"MangaAssistWebClient",
user_pool_client_name="manga-web",
auth_flows=cognito.AuthFlow(
user_srp=True,
custom=True,
),
o_auth=cognito.OAuthSettings(
flows=cognito.OAuthFlows(authorization_code_grant=True),
scopes=[cognito.OAuthScope.OPENID, cognito.OAuthScope.PROFILE],
callback_urls=[
"https://manga-assist.example.com/callback",
"http://localhost:3000/callback",
],
logout_urls=[
"https://manga-assist.example.com/",
"http://localhost:3000/",
],
),
supported_identity_providers=[
cognito.UserPoolClientIdentityProvider.COGNITO,
cognito.UserPoolClientIdentityProvider.custom("LINE"),
],
access_token_validity=Duration.hours(1),
id_token_validity=Duration.hours(1),
refresh_token_validity=Duration.days(30),
)
        # The client references the LINE provider by name only, so make the
        # deployment ordering explicit to avoid a create-time race.
        app_client.node.add_dependency(line_provider)
# --- Amplify App ---
amplify_app = amplify.CfnApp(
self, "MangaAssistAmplifyApp",
name="manga-assist-chat",
repository="https://github.com/manga-corp/manga-assist-frontend",
access_token="{{resolve:secretsmanager:github-token}}",
build_spec=json.dumps({
"version": 1,
"frontend": {
"phases": {
"preBuild": {
"commands": ["npm ci"]
},
"build": {
"commands": ["npm run build"]
},
},
"artifacts": {
"baseDirectory": "build",
"files": ["**/*"],
},
"cache": {
"paths": ["node_modules/**/*"],
},
},
}),
environment_variables=[
amplify.CfnApp.EnvironmentVariableProperty(
name="REACT_APP_WS_ENDPOINT",
value="wss://ws-api.manga-assist.example.com",
),
amplify.CfnApp.EnvironmentVariableProperty(
name="REACT_APP_COGNITO_USER_POOL_ID",
value=user_pool.user_pool_id,
),
amplify.CfnApp.EnvironmentVariableProperty(
name="REACT_APP_COGNITO_CLIENT_ID",
value=app_client.user_pool_client_id,
),
amplify.CfnApp.EnvironmentVariableProperty(
name="REACT_APP_REGION",
value="ap-northeast-1",
),
],
custom_rules=[
amplify.CfnApp.CustomRuleProperty(
source="</^[^.]+$|\\.(?!(css|gif|ico|jpg|js|png|txt|svg|woff|woff2|ttf|map|json)$)([^.]+$)/>",
target="/index.html",
status="200",
),
],
)
# Production branch
amplify.CfnBranch(
self, "MainBranch",
app_id=amplify_app.attr_app_id,
branch_name="main",
enable_auto_build=True,
stage="PRODUCTION",
)
# Outputs
CfnOutput(self, "UserPoolId", value=user_pool.user_pool_id)
CfnOutput(self, "AppClientId", value=app_client.user_pool_client_id)
CfnOutput(
self, "AmplifyAppUrl",
value=f"https://main.{amplify_app.attr_default_domain}",
)
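The custom_rules regex above is the standard Amplify SPA rewrite: any request path without a file extension, or with an extension outside the allow-list, falls through to index.html so client-side routing works. A quick sanity check of that pattern (stripped of Amplify's `</ ... />` delimiters):

```python
import re

# The SPA rewrite pattern from the CfnApp custom rule above.
SPA_REWRITE = re.compile(
    r"^[^.]+$|\.(?!(css|gif|ico|jpg|js|png|txt|svg|woff|woff2|ttf|map|json)$)([^.]+$)"
)

def rewrites_to_index(path: str) -> bool:
    """True if Amplify would serve index.html for this request path."""
    return SPA_REWRITE.search(path) is not None

print(rewrites_to_index("/chat"))             # extensionless route -> SPA
print(rewrites_to_index("/manga/one-piece"))  # extensionless route -> SPA
print(rewrites_to_index("/static/main.js"))   # allow-listed asset -> served as-is
```

Note that extensions outside the list (for example `.html`) also rewrite to index.html, which is the intended SPA behavior.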
WebSocket Client Hook
"""
MangaAssist WebSocket Client Logic (Python equivalent of the React hook).
Demonstrates the reconnection state machine and backpressure handling.
"""
import json
import logging
import asyncio
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, Optional
logger = logging.getLogger(__name__)
class ConnectionState(Enum):
DISCONNECTED = "disconnected"
CONNECTING = "connecting"
CONNECTED = "connected"
RECONNECTING = "reconnecting"
FAILED = "failed"
@dataclass
class WebSocketConfig:
"""WebSocket connection configuration."""
url: str
auth_token: str
session_id: str
max_reconnect_attempts: int = 5
reconnect_base_delay: float = 1.0
reconnect_max_delay: float = 30.0
ping_interval: float = 30.0
connection_timeout: float = 10.0
class MangaAssistWebSocketClient:
"""
WebSocket client with reconnection state machine.
Python reference implementation of the React useWebSocket hook.
"""
def __init__(
self,
config: WebSocketConfig,
on_message: Callable[[dict], None],
on_state_change: Callable[[ConnectionState], None],
):
self.config = config
self.on_message = on_message
self.on_state_change = on_state_change
self.state = ConnectionState.DISCONNECTED
self.reconnect_attempts = 0
self._ws = None
self._ping_task = None
def _set_state(self, new_state: ConnectionState) -> None:
old = self.state
self.state = new_state
logger.info(f"WebSocket state: {old.value} → {new_state.value}")
self.on_state_change(new_state)
async def connect(self) -> None:
"""Establish WebSocket connection."""
import websockets
self._set_state(ConnectionState.CONNECTING)
url = (
f"{self.config.url}"
f"?sessionId={self.config.session_id}"
f"&token={self.config.auth_token}"
)
try:
self._ws = await asyncio.wait_for(
websockets.connect(url),
timeout=self.config.connection_timeout,
)
self._set_state(ConnectionState.CONNECTED)
self.reconnect_attempts = 0
# Start ping keepalive
self._ping_task = asyncio.create_task(self._ping_loop())
# Start message listener
await self._listen()
except asyncio.TimeoutError:
logger.error("WebSocket connection timeout")
await self._handle_disconnect()
except Exception as e:
logger.error(f"WebSocket connection failed: {e}")
await self._handle_disconnect()
async def _listen(self) -> None:
"""Listen for incoming messages."""
try:
async for raw_message in self._ws:
try:
data = json.loads(raw_message)
msg_type = data.get("type")
if msg_type == "backpressure":
await self._handle_backpressure(data)
elif msg_type == "pong":
pass # Keepalive response
else:
self.on_message(data)
except json.JSONDecodeError:
logger.warning(f"Non-JSON message: {raw_message[:100]}")
except Exception as e:
logger.error(f"WebSocket listener error: {e}")
await self._handle_disconnect()
async def send_message(self, message: str) -> bool:
"""Send a chat message."""
if self.state != ConnectionState.CONNECTED:
logger.warning(f"Cannot send in state {self.state.value}")
return False
payload = json.dumps({
"action": "chat",
"message": message,
"sessionId": self.config.session_id,
}, ensure_ascii=False)
try:
await self._ws.send(payload)
return True
except Exception as e:
logger.error(f"Send failed: {e}")
await self._handle_disconnect()
return False
async def _handle_disconnect(self) -> None:
"""Handle disconnection with exponential backoff reconnection."""
if self._ping_task:
self._ping_task.cancel()
if self.reconnect_attempts >= self.config.max_reconnect_attempts:
self._set_state(ConnectionState.FAILED)
return
self._set_state(ConnectionState.RECONNECTING)
self.reconnect_attempts += 1
# Exponential backoff with jitter
delay = min(
self.config.reconnect_base_delay * (2 ** (self.reconnect_attempts - 1)),
self.config.reconnect_max_delay,
)
import random
delay *= random.uniform(0.5, 1.5)
logger.info(
f"Reconnecting in {delay:.1f}s "
f"(attempt {self.reconnect_attempts}/{self.config.max_reconnect_attempts})"
)
await asyncio.sleep(delay)
await self.connect()
async def _handle_backpressure(self, data: dict) -> None:
"""Handle server backpressure signal."""
action = data.get("action")
retry_after = data.get("retryAfter", 5)
if action == "pause":
logger.warning(
f"Server backpressure: pausing for {retry_after}s"
)
# Notify UI to show "server busy" message
self.on_message({
"type": "system",
"text": data.get("message_ja", "サーバーが混雑しています"),
})
await asyncio.sleep(retry_after)
async def _ping_loop(self) -> None:
"""Send periodic pings to keep connection alive."""
while self.state == ConnectionState.CONNECTED:
try:
await self._ws.send(json.dumps({"action": "ping"}))
await asyncio.sleep(self.config.ping_interval)
except Exception:
break
async def disconnect(self) -> None:
"""Gracefully close the connection."""
if self._ping_task:
self._ping_task.cancel()
if self._ws:
await self._ws.close()
self._set_state(ConnectionState.DISCONNECTED)
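The reconnect delays produced by the defaults above (base 1.0s, cap 30s, 5 attempts) are worth seeing concretely. Jitter aside, the deterministic part of the schedule is:

```python
# Hedged sketch: the deterministic backoff schedule from the WebSocketConfig
# defaults above (the ±50% jitter is applied on top at runtime).
def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff delay for a 1-based attempt number, capped."""
    return min(base * (2 ** (attempt - 1)), cap)

schedule = [backoff_delay(n) for n in range(1, 6)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

A hypothetical sixth attempt would compute 32s and be capped at 30s, but with max_reconnect_attempts=5 the client transitions to FAILED first.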
API-First Design with OpenAPI
OpenAPI Specification Structure
graph LR
subgraph Spec["OpenAPI 3.1 Specification"]
INFO[Info<br/>Title, Version, Contact]
PATHS[Paths<br/>/chat, /sessions, /health]
SCHEMAS[Schemas<br/>Request/Response models]
SECURITY[Security<br/>Bearer JWT, API Key]
WS_SPEC[AsyncAPI Extension<br/>WebSocket messages]
end
subgraph Consumers["Spec Consumers"]
SDK_GEN[SDK Generator<br/>openapi-generator]
DOCS[API Docs<br/>Swagger UI / Redoc]
VALIDATE[Request Validator<br/>API Gateway]
MOCK[Mock Server<br/>Prism]
TEST[Contract Tests<br/>Schemathesis]
end
INFO --> DOCS
PATHS --> SDK_GEN
PATHS --> VALIDATE
SCHEMAS --> MOCK
SCHEMAS --> TEST
SECURITY --> SDK_GEN
WS_SPEC --> SDK_GEN
style Spec fill:#85ea2d,color:#000
style SDK_GEN fill:#ff6b35,color:#fff
OpenAPI Contract Definition
"""
MangaAssist OpenAPI specification generator.
Produces the contract that drives SDK generation, validation, and documentation.
"""
import json
import yaml
def generate_openapi_spec() -> dict:
"""Generate the MangaAssist OpenAPI 3.1 specification."""
return {
"openapi": "3.1.0",
"info": {
"title": "MangaAssist Chat API",
"version": "2.0.0",
"description": (
"AI-powered manga recommendation chatbot API. "
"Supports both REST and WebSocket for real-time streaming."
),
"contact": {
"name": "MangaAssist API Team",
"email": "api@manga-assist.example.com",
},
},
"servers": [
{
"url": "https://api.manga-assist.example.com/v2",
"description": "Production",
},
{
"url": "https://api-staging.manga-assist.example.com/v2",
"description": "Staging",
},
],
"paths": {
"/chat": {
"post": {
"operationId": "sendChatMessage",
"summary": "Send a chat message",
"description": (
"Send a message to the MangaAssist chatbot. "
"Returns a streamed or complete response depending "
"on the Accept header."
),
"tags": ["Chat"],
"security": [{"bearerAuth": []}, {"apiKey": []}],
"requestBody": {
"required": True,
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/ChatRequest"},
"examples": {
"japanese": {
"summary": "Japanese manga recommendation",
"value": {
"sessionId": "sess_abc123",
"message": "鬼滅の刃みたいなマンガを教えてください",
"language": "ja",
},
},
"english": {
"summary": "English query",
"value": {
"sessionId": "sess_abc123",
"message": "Recommend manga similar to Attack on Titan",
"language": "en",
},
},
},
}
},
},
"responses": {
"200": {
"description": "Successful response",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/ChatResponse"},
},
},
},
"400": {
"description": "Invalid request",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/ErrorResponse"},
},
},
},
"429": {
"description": "Rate limited or quota exceeded",
"headers": {
"Retry-After": {
"schema": {"type": "integer"},
"description": "Seconds to wait before retrying",
},
},
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/ErrorResponse"},
},
},
},
},
},
},
"/sessions/{sessionId}": {
"get": {
"operationId": "getSession",
"summary": "Get session details",
"tags": ["Sessions"],
"security": [{"bearerAuth": []}],
"parameters": [
{
"name": "sessionId",
"in": "path",
"required": True,
"schema": {"type": "string"},
}
],
"responses": {
"200": {
"description": "Session details",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/SessionResponse"},
},
},
},
},
},
},
"/health": {
"get": {
"operationId": "healthCheck",
"summary": "Health check",
"tags": ["System"],
"responses": {
"200": {
"description": "Service is healthy",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/HealthResponse"},
},
},
},
},
},
},
},
"components": {
"schemas": {
"ChatRequest": {
"type": "object",
"required": ["sessionId", "message"],
"properties": {
"sessionId": {
"type": "string",
"description": "Session identifier",
"example": "sess_abc123",
},
"message": {
"type": "string",
"maxLength": 5000,
"description": "User message in Japanese or English",
},
"language": {
"type": "string",
"enum": ["ja", "en"],
"default": "ja",
},
"maxTokens": {
"type": "integer",
"minimum": 50,
"maximum": 2048,
"default": 1024,
},
"idempotencyKey": {
"type": "string",
"description": "Optional key for request deduplication",
},
},
},
"ChatResponse": {
"type": "object",
"properties": {
"success": {"type": "boolean"},
"data": {
"type": "object",
"properties": {
"sessionId": {"type": "string"},
"text": {"type": "string"},
},
},
"metadata": {
"type": "object",
"properties": {
"model": {"type": "string"},
"tokensUsed": {
"type": "object",
"properties": {
"input": {"type": "integer"},
"output": {"type": "integer"},
},
},
"latencyMs": {"type": "integer"},
},
},
},
},
"ErrorResponse": {
"type": "object",
"properties": {
"success": {"type": "boolean", "enum": [False]},
"error": {
"type": "object",
"properties": {
"code": {"type": "string"},
"message": {"type": "string"},
"retryAfter": {"type": "integer"},
},
},
},
},
"SessionResponse": {
"type": "object",
"properties": {
"sessionId": {"type": "string"},
"turnCount": {"type": "integer"},
"tokensUsed": {
"type": "object",
"properties": {
"input": {"type": "integer"},
"output": {"type": "integer"},
},
},
"createdAt": {"type": "string", "format": "date-time"},
},
},
"HealthResponse": {
"type": "object",
"properties": {
"status": {"type": "string", "enum": ["ok", "degraded", "down"]},
"services": {
"type": "object",
"properties": {
"bedrock": {"type": "string"},
"dynamodb": {"type": "string"},
"opensearch": {"type": "string"},
"redis": {"type": "string"},
},
},
},
},
},
"securitySchemes": {
"bearerAuth": {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT",
"description": "Cognito JWT token",
},
"apiKey": {
"type": "apiKey",
"in": "header",
"name": "X-API-Key",
"description": "Partner API key",
},
},
},
}
def export_spec(fmt: str = "yaml") -> str:
    """Export the OpenAPI spec as YAML or JSON."""
    spec = generate_openapi_spec()
    if fmt == "yaml":
        return yaml.dump(spec, default_flow_style=False, allow_unicode=True)
    return json.dumps(spec, indent=2, ensure_ascii=False)
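In production the API Gateway request validator enforces these schemas, but the ChatRequest constraints are simple enough to spot-check by hand. A minimal validator mirroring the spec's required fields, maxLength, enum, and bounds (this helper is illustrative, not part of the deployed stack):

```python
# Hedged sketch: validate a ChatRequest payload against the constraints
# declared in the OpenAPI schema above.
def validate_chat_request(body: dict) -> list[str]:
    """Return a list of violations; an empty list means the payload is valid."""
    errors = []
    for field in ("sessionId", "message"):
        if field not in body:
            errors.append(f"missing required field: {field}")
    if len(body.get("message", "")) > 5000:
        errors.append("message exceeds maxLength 5000")
    if body.get("language", "ja") not in ("ja", "en"):
        errors.append("language must be 'ja' or 'en'")
    max_tokens = body.get("maxTokens", 1024)
    if not 50 <= max_tokens <= 2048:
        errors.append("maxTokens must be between 50 and 2048")
    return errors

print(validate_chat_request({"sessionId": "sess_abc123", "message": "進撃の巨人が好き"}))
print(validate_chat_request({"message": "hi", "language": "fr", "maxTokens": 10}))
```

Keeping a check like this in contract tests (Schemathesis in the diagram above generates the same cases automatically) catches drift between the spec and the orchestrator.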
Bedrock Prompt Flows Architecture
Bedrock Prompt Flows allow the MangaAssist content team to iterate on prompts and conversation routing without deploying code. The visual builder defines flows that the orchestrator invokes at runtime.
Prompt Flow Design
graph TB
subgraph Input["Flow Input"]
USER_MSG[User Message]
SESSION[Session Context]
RAG[RAG Results]
end
subgraph GenreClassifier["Genre Classifier Flow"]
CLASSIFY[Prompt Node<br/>Classify intent + genre<br/>Model: Haiku]
COND_GENRE{Condition Node<br/>Genre detected?}
FAQ_CHECK{Condition Node<br/>Is FAQ?}
end
subgraph Recommendation["Recommendation Flow"]
REC_PROMPT[Prompt Node<br/>Generate recommendations<br/>Model: Sonnet]
FORMAT[Prompt Node<br/>Format for JP audience<br/>Model: Haiku]
end
subgraph FAQ["FAQ Flow"]
FAQ_LOOKUP[Lambda Node<br/>DynamoDB FAQ lookup]
FAQ_FORMAT[Prompt Node<br/>Natural language answer<br/>Model: Haiku]
end
subgraph Fallback["General Chat Flow"]
GENERAL[Prompt Node<br/>General conversation<br/>Model: Haiku]
end
USER_MSG --> CLASSIFY
SESSION --> CLASSIFY
RAG --> REC_PROMPT
CLASSIFY --> COND_GENRE
COND_GENRE -->|Yes: Shonen, Shoujo, etc.| REC_PROMPT
COND_GENRE -->|No| FAQ_CHECK
FAQ_CHECK -->|Yes| FAQ_LOOKUP
FAQ_CHECK -->|No| GENERAL
REC_PROMPT --> FORMAT
FAQ_LOOKUP --> FAQ_FORMAT
style CLASSIFY fill:#8b5cf6,color:#fff
style REC_PROMPT fill:#ff9900,color:#000
style FAQ_LOOKUP fill:#3b48cc,color:#fff
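The condition nodes in the diagram reduce to a small routing function over the classifier's output. A sketch of that decision; the 0.7 confidence threshold and the "general_chat" fallback key are assumed tuning values, not documented ones:

```python
# Hedged sketch: route a classified message to one of the flows shown above.
# The confidence threshold is an assumption, not a documented value.
def route_flow(classification: dict, threshold: float = 0.7) -> str:
    """Map classifier output to a flow key (names mirror the diagram)."""
    intent = classification.get("intent")
    confidence = classification.get("confidence", 0.0)
    if intent == "recommendation" and confidence >= threshold:
        return "recommendation"
    if intent == "faq" and confidence >= threshold:
        return "faq_router"
    return "general_chat"  # low confidence or general chat -> fallback flow

print(route_flow({"intent": "recommendation", "genre": "shonen", "confidence": 0.92}))
print(route_flow({"intent": "recommendation", "genre": "unknown", "confidence": 0.4}))
```

Routing low-confidence classifications to the cheap general-chat flow is a deliberate cost guard: a wrong Sonnet recommendation costs more than a Haiku clarifying question.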
Prompt Flow Invoker
"""
MangaAssist Bedrock Prompt Flows Integration
Invokes visual Prompt Flows for genre classification and recommendation generation.
"""
import json
import logging

import boto3
from botocore.config import Config
logger = logging.getLogger(__name__)
bedrock_agent = boto3.client(
"bedrock-agent-runtime",
config=Config(
region_name="ap-northeast-1",
retries={"max_attempts": 2, "mode": "adaptive"},
read_timeout=30,
),
)
class PromptFlowInvoker:
"""
Invoke Bedrock Prompt Flows for MangaAssist use cases.
Uses flow aliases for safe canary deployment.
"""
# Flow IDs and aliases — configured via SSM Parameter Store in production
FLOWS = {
"genre_classifier": {
"flow_id": "FLOW_GENRE_CLASSIFIER_ID",
"alias_id": "ALIAS_PROD", # Points to latest validated version
},
"recommendation": {
"flow_id": "FLOW_RECOMMENDATION_ID",
"alias_id": "ALIAS_PROD",
},
"faq_router": {
"flow_id": "FLOW_FAQ_ROUTER_ID",
"alias_id": "ALIAS_PROD",
},
}
def invoke_genre_classifier(
self, user_message: str, conversation_context: str = ""
) -> dict:
"""
Classify the user's intent and preferred manga genre.
Returns:
{
"intent": "recommendation" | "faq" | "general",
"genre": "shonen" | "shoujo" | "seinen" | ...,
"confidence": 0.0-1.0
}
"""
flow_config = self.FLOWS["genre_classifier"]
inputs = [
{
"content": {
"document": {
"user_message": user_message,
"conversation_context": conversation_context,
}
},
"nodeName": "FlowInput",
"nodeOutputName": "document",
}
]
try:
response = bedrock_agent.invoke_flow(
flowIdentifier=flow_config["flow_id"],
flowAliasIdentifier=flow_config["alias_id"],
inputs=inputs,
)
# Process the response stream
result = self._collect_flow_response(response)
# Parse structured output
classification = json.loads(result)
logger.info(
f"Genre classification: intent={classification.get('intent')}, "
f"genre={classification.get('genre')}, "
f"confidence={classification.get('confidence')}"
)
return classification
except Exception as e:
logger.error(f"Genre classifier flow failed: {e}")
# Fallback: treat as general query
return {
"intent": "general",
"genre": "unknown",
"confidence": 0.0,
}
def invoke_recommendation_flow(
self,
user_message: str,
genre: str,
rag_results: list[str],
conversation_context: str = "",
) -> str:
"""
Generate a manga recommendation using the Prompt Flow.
The flow handles prompt construction, model selection, and formatting.
"""
flow_config = self.FLOWS["recommendation"]
inputs = [
{
"content": {
"document": {
"user_message": user_message,
"genre": genre,
"rag_context": "\n".join(rag_results[:3]),
"conversation_context": conversation_context,
}
},
"nodeName": "FlowInput",
"nodeOutputName": "document",
}
]
try:
response = bedrock_agent.invoke_flow(
flowIdentifier=flow_config["flow_id"],
flowAliasIdentifier=flow_config["alias_id"],
inputs=inputs,
)
return self._collect_flow_response(response)
except Exception as e:
logger.error(f"Recommendation flow failed: {e}")
raise
def _collect_flow_response(self, response: dict) -> str:
"""Collect and assemble flow response from event stream."""
result_parts = []
for event in response.get("responseStream", []):
if "flowOutputEvent" in event:
output = event["flowOutputEvent"]
content = output.get("content", {})
if "document" in content:
doc = content["document"]
if isinstance(doc, str):
result_parts.append(doc)
elif isinstance(doc, dict):
result_parts.append(json.dumps(doc, ensure_ascii=False))
elif "flowCompletionEvent" in event:
completion = event["flowCompletionEvent"]
status = completion.get("completionReason")
if status != "SUCCESS":
logger.warning(f"Flow completed with status: {status}")
return "".join(result_parts)
class PromptFlowVersionManager:
"""
Manage Prompt Flow versions and aliases for safe deployment.
"""
def __init__(self):
self.bedrock_agent_client = boto3.client(
"bedrock-agent",
config=Config(region_name="ap-northeast-1"),
)
def create_canary_deployment(
self, flow_id: str, new_version: str, canary_percentage: int = 10
) -> dict:
"""
Deploy a new Prompt Flow version with canary traffic split.
Routes canary_percentage of traffic to new version, rest to current.
"""
# Get current production alias routing
alias_response = self.bedrock_agent_client.get_flow_alias(
flowIdentifier=flow_id,
aliasIdentifier="ALIAS_PROD",
)
current_version = alias_response.get("routingConfiguration", [{}])[0].get(
"flowVersion", "1"
)
# Update alias with canary routing
routing_config = [
{
"flowVersion": current_version,
"weight": 100 - canary_percentage,
},
{
"flowVersion": new_version,
"weight": canary_percentage,
},
]
self.bedrock_agent_client.update_flow_alias(
flowIdentifier=flow_id,
aliasIdentifier="ALIAS_PROD",
name="Production",
routingConfiguration=routing_config,
)
logger.info(
f"Canary deployment: {canary_percentage}% to v{new_version}, "
f"{100 - canary_percentage}% to v{current_version}"
)
return {
"flow_id": flow_id,
"canary_version": new_version,
"stable_version": current_version,
"canary_percentage": canary_percentage,
}
def promote_canary(self, flow_id: str, version: str) -> None:
"""Promote canary version to 100% production traffic."""
self.bedrock_agent_client.update_flow_alias(
flowIdentifier=flow_id,
aliasIdentifier="ALIAS_PROD",
name="Production",
routingConfiguration=[
{"flowVersion": version, "weight": 100},
],
)
logger.info(f"Promoted v{version} to 100% production")
def rollback(self, flow_id: str, stable_version: str) -> None:
"""Roll back to stable version."""
self.bedrock_agent_client.update_flow_alias(
flowIdentifier=flow_id,
aliasIdentifier="ALIAS_PROD",
name="Production",
routingConfiguration=[
{"flowVersion": stable_version, "weight": 100},
],
)
logger.warning(f"Rolled back to v{stable_version}")
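Weighted alias routing happens server-side and is applied per request, so a single session can flip between prompt versions mid-conversation during a canary. If sticky assignment matters, a deterministic hash bucket on the session ID is a common client-side complement; this is a sketch, not part of the Bedrock API:

```python
import hashlib

# Hedged sketch: deterministic canary bucketing by session ID, so a session
# always sees the same flow version for the duration of a canary.
def in_canary(session_id: str, canary_percentage: int = 10) -> bool:
    """Stable 0-99 bucket derived from a SHA-256 of the session ID."""
    bucket = int(hashlib.sha256(session_id.encode()).hexdigest(), 16) % 100
    return bucket < canary_percentage

sample = [f"sess_{i}" for i in range(1000)]
share = sum(in_canary(s) for s in sample) / len(sample)
print(f"canary share over 1000 sessions: {share:.1%}")  # close to 10%
```

The orchestrator would then invoke the canary alias for bucketed sessions and the stable alias for everyone else, instead of relying on weighted routing alone.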
Connection State Machine
stateDiagram-v2
[*] --> Disconnected
Disconnected --> Connecting: User opens chat
Connecting --> Connected: WebSocket open
Connecting --> Reconnecting: Connection timeout
Connected --> Disconnected: User closes chat
Connected --> Reconnecting: Connection lost
Connected --> Connected: Message sent/received
Reconnecting --> Connecting: Backoff delay elapsed
Reconnecting --> Failed: Max attempts reached
Failed --> Connecting: User clicks retry
Failed --> Disconnected: User abandons
note right of Connected
Active state:
- Messages flowing
- Ping/pong keepalive
- Backpressure responsive
end note
note right of Reconnecting
Exponential backoff:
1s, 2s, 4s, 8s, 16s
with ±50% jitter
end note
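The transitions in the diagram above can be encoded as a table, which makes the client logic testable without a live socket. The event names here are illustrative labels for the diagram's edges:

```python
# Hedged sketch: the connection state machine as a transition table.
TRANSITIONS = {
    ("disconnected", "open_chat"): "connecting",
    ("connecting", "ws_open"): "connected",
    ("connecting", "timeout"): "reconnecting",
    ("connected", "close_chat"): "disconnected",
    ("connected", "connection_lost"): "reconnecting",
    ("reconnecting", "backoff_elapsed"): "connecting",
    ("reconnecting", "max_attempts"): "failed",
    ("failed", "retry_clicked"): "connecting",
    ("failed", "abandoned"): "disconnected",
}

def next_state(state: str, event: str) -> str:
    """Return the next state, or raise on an illegal transition."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"illegal transition: {event!r} in state {state!r}")

# Walk the happy path, then a train-tunnel disconnect and recovery.
state = "disconnected"
for event in ("open_chat", "ws_open", "connection_lost", "backoff_elapsed", "ws_open"):
    state = next_state(state, event)
print(state)  # back in "connected" after a transparent reconnect
```

Because every edge is data, a unit test can assert that no undocumented transition exists, which is exactly the failure mode of ad-hoc retry-on-disconnect code.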
Key Takeaways
| # | Takeaway | MangaAssist Application |
|---|---|---|
| 1 | Amplify accelerates frontend delivery — Amplify Hosting provides Git-triggered CI/CD, CDN distribution, and custom domain with SSL, eliminating manual deployment pipelines for the chat widget. | MangaAssist frontend deploys automatically on every main branch push, with preview deployments for feature branches. |
| 2 | API-first design prevents integration drift — OpenAPI 3.1 spec drives SDK generation, request validation, mock servers, and contract tests from a single source of truth. | Partner integrations auto-generate TypeScript and Python SDKs from the spec; breaking changes are caught by contract tests in CI. |
| 3 | Prompt Flows enable non-developer iteration — The MangaAssist content team uses the visual builder to refine genre classification and recommendation prompts without code changes. | New manga genre support (e.g., adding "Isekai" classifier) goes from content team idea to production in hours, not sprint cycles. |
| 4 | WebSocket reconnection needs a state machine — Simple retry-on-disconnect is insufficient; a proper state machine handles connecting, connected, reconnecting, and failed states with backoff. | Mobile users on Tokyo trains experience frequent disconnects; the state machine reconnects transparently within 2-5 seconds. |
| 5 | Cognito + LINE SSO matches the Japanese market — LINE is the dominant messaging app in Japan; social login via LINE reduces friction compared to email/password. | MangaAssist registration converts at 3x higher rate with LINE SSO versus email-only signup. |
| 6 | Prompt Flow versioning enables canary deployment — Flow aliases support weighted routing, allowing 10% canary traffic to a new prompt version before full rollout. | A bad prompt version that generates irrelevant recommendations is caught at 10% traffic and rolled back before affecting most users. |
| 7 | Accessibility is a baseline, not a feature — WCAG 2.1 AA compliance (keyboard nav, screen readers, 4.5:1 contrast) ensures the chat widget works for all users including those with disabilities. | Japanese accessibility laws (JIS X 8341-3) require AA compliance; MangaAssist meets this from day one. |