GenAI API Design Patterns
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Field | Value |
|---|---|
| Domain | 2 — Implementation & Integration |
| Task | 2.5 — Application Integration Patterns |
| Skill | 2.5.1 — FM API Interfaces |
| Focus | Conversation APIs, token budget enforcement, idempotent retries |
| MangaAssist Relevance | Production API patterns for multi-turn manga recommendation conversations with cost controls |
Mind Map
mindmap
root((GenAI API Design Patterns))
Conversation API Design
Multi-Turn Protocol
Turn-based message format
Role alternation enforcement
System prompt injection
Context Management
Sliding window
Summarization fallback
Priority-based retention
Response Formatting
Structured JSON responses
Markdown rendering
Bilingual JP/EN support
Token Budget Enforcement
Pre-Request Validation
Input token estimation
Context window math
Rejection vs truncation
In-Flight Monitoring
Stream token counter
Budget breach detection
Graceful stop
Post-Request Accounting
Actual vs estimated reconciliation
Per-user daily caps
Alerting thresholds
Idempotent Retry Strategies
Request Fingerprinting
Content hash generation
Timestamp windowing
Session-scoped dedup
Retry Policies
Exponential backoff with jitter
Max retry ceiling
Error classification
Circuit Breaker
Failure rate tracking
Open/half-open/closed states
Fallback responses
Model Selection Router
Query Classification
Complexity scoring
Intent detection
Language detection
Cost-Aware Routing
Haiku for simple queries
Sonnet for complex reasoning
Fallback chains
A/B Testing
Traffic splitting
Quality metric comparison
Cost-quality tradeoff
Error Handling
Bedrock Errors
ThrottlingException
ModelTimeoutException
ValidationException
Graceful Degradation
Cached response fallback
Reduced quality mode
Maintenance messages
Client Communication
Typed error codes
Retry-After headers
User-friendly messages
Conversation API Design
Multi-Turn Message Protocol
MangaAssist follows a strict message protocol that ensures Claude 3 receives well-formed conversation history while enforcing business rules.
sequenceDiagram
participant User
participant API as API Layer
participant Validator as Request Validator
participant Budget as Token Budget
participant Context as Context Builder
participant Router as Model Router
participant Bedrock as Amazon Bedrock
User->>API: {"message": "鬼滅の刃みたいなマンガは?", "sessionId": "abc123"}
API->>Validator: Validate request schema
Validator->>Validator: Check required fields, sanitize input
Validator->>Budget: Estimate input tokens
Budget->>Budget: Check user daily quota remaining
Budget-->>Validator: OK (2,847 tokens remaining today)
Validator->>Context: Build conversation context
Context->>Context: Load session history (last 10 turns)
Context->>Context: Apply sliding window (≤2,400 token history budget)
Context->>Context: Inject system prompt + RAG results
Context->>Router: Route to model
Router->>Router: Classify query complexity
Router-->>Bedrock: Haiku (simple recommendation)
Bedrock-->>API: Streaming response
API-->>User: Streamed chunks + metadata
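Every turn ultimately reaches Bedrock in the Claude Messages API format: a top-level system string plus a strictly alternating user/assistant list that starts and ends with a user turn. A minimal sketch of the request body (values illustrative):
# Claude Messages API body as sent to Bedrock (sketch; values illustrative)
body = {
    "anthropic_version": "bedrock-2023-05-31",
    "max_tokens": 1024,
    # The system prompt travels as a top-level field, never as a message turn
    "system": "あなたはMangaAssistです。...",
    "messages": [
        {"role": "user", "content": "おすすめのマンガは?"},
        {"role": "assistant", "content": "どんなジャンルがお好きですか?"},
        {"role": "user", "content": "鬼滅の刃みたいなマンガは?"},
    ],
}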
Conversation API Handler
"""
MangaAssist Conversation API
Handles multi-turn chat with token budget enforcement and model routing.
"""
import json
import time
import hashlib
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import boto3
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
class QueryComplexity(Enum):
"""Query complexity levels for model routing."""
SIMPLE = "simple" # Greetings, yes/no, simple lookups
MODERATE = "moderate" # Recommendations, comparisons
COMPLEX = "complex" # Multi-step reasoning, analysis, creative
class ModelChoice(Enum):
HAIKU = "anthropic.claude-3-haiku-20240307-v1:0"
SONNET = "anthropic.claude-3-sonnet-20240229-v1:0"
# Cost per 1M tokens
MODEL_COSTS = {
ModelChoice.HAIKU: {"input": 0.25, "output": 1.25},
ModelChoice.SONNET: {"input": 3.00, "output": 15.00},
}
@dataclass
class ConversationRequest:
"""Validated conversation request."""
session_id: str
user_id: str
message: str
language: str = "ja"
idempotency_key: Optional[str] = None
max_tokens: Optional[int] = None
stream: bool = True
@dataclass
class ConversationResponse:
"""Structured conversation response."""
session_id: str
message_id: str
text: str
model_used: str
input_tokens: int
output_tokens: int
cost_usd: float
latency_ms: int
cached: bool = False
@dataclass
class UserQuota:
"""Per-user daily token quota."""
daily_input_limit: int = 500_000
daily_output_limit: int = 200_000
daily_cost_limit_usd: float = 5.00
used_input_tokens: int = 0
used_output_tokens: int = 0
used_cost_usd: float = 0.0
@property
def input_remaining(self) -> int:
return max(0, self.daily_input_limit - self.used_input_tokens)
@property
def output_remaining(self) -> int:
return max(0, self.daily_output_limit - self.used_output_tokens)
@property
def cost_remaining(self) -> float:
return max(0.0, self.daily_cost_limit_usd - self.used_cost_usd)
def can_proceed(self, estimated_input: int, estimated_output: int) -> bool:
if self.used_input_tokens + estimated_input > self.daily_input_limit:
return False
if self.used_output_tokens + estimated_output > self.daily_output_limit:
return False
return True
class ConversationAPI:
"""
Production conversation API with token budgets and model routing.
"""
SYSTEM_PROMPT_JA = """あなたはMangaAssistです。日本のマンガストアのAIアシスタントとして、
お客様のマンガ選びをお手伝いします。
ルール:
- 丁寧な日本語で回答してください
- マンガのジャンル、作者、ストーリーに基づいた推薦を行います
- 在庫情報や価格は正確な情報のみ提供してください
- 不明な場合は正直にお伝えし、カスタマーサポートへの問い合わせを促してください
- 回答は簡潔に、300文字以内を目安にしてください"""
SYSTEM_PROMPT_EN = """You are MangaAssist, an AI assistant for a Japanese manga store.
You help customers discover and choose manga.
Rules:
- Provide recommendations based on genres, authors, and story preferences
- Only share accurate inventory and pricing information
- If uncertain, be honest and suggest contacting customer support
- Keep responses concise, around 150 words"""
# Keywords that indicate complex queries requiring Sonnet
COMPLEX_INDICATORS = [
"比較", "分析", "違い", "なぜ", "理由", "おすすめ理由",
"compare", "analyze", "difference", "explain why",
"ランキング", "トップ", "ベスト",
"似ている", "みたいな", "のような",
]
SIMPLE_INDICATORS = [
"こんにちは", "ありがとう", "はい", "いいえ",
"hello", "thanks", "yes", "no",
"在庫", "価格", "値段", "stock", "price",
]
def __init__(self, redis_client, session_manager):
        self.redis = redis_client  # Assumes redis.Redis(decode_responses=True)
self.session_manager = session_manager
self.bedrock = boto3.client(
"bedrock-runtime",
region_name="ap-northeast-1",
)
def validate_request(self, raw_body: dict) -> ConversationRequest:
"""
Validate and normalize incoming request.
Raises ValueError for invalid requests.
"""
message = raw_body.get("message", "").strip()
if not message:
raise ValueError("Message cannot be empty")
if len(message) > 5000:
raise ValueError("Message exceeds 5000 character limit")
session_id = raw_body.get("sessionId")
if not session_id:
raise ValueError("sessionId is required")
# Generate idempotency key if not provided
idempotency_key = raw_body.get("idempotencyKey")
if not idempotency_key:
ts_window = int(time.time()) // 5
idempotency_key = hashlib.sha256(
f"{session_id}:{message}:{ts_window}".encode()
).hexdigest()[:16]
        # Detect language with a rough heuristic: code points above U+3000
        # cover hiragana, katakana, and CJK ideographs
        jp_chars = sum(1 for c in message if ord(c) > 0x3000)
        language = "ja" if jp_chars > len(message) * 0.2 else "en"
return ConversationRequest(
session_id=session_id,
user_id=raw_body.get("userId", "anonymous"),
message=message,
language=language,
idempotency_key=idempotency_key,
max_tokens=raw_body.get("maxTokens"),
stream=raw_body.get("stream", True),
)
def classify_complexity(self, message: str) -> QueryComplexity:
"""
Classify query complexity to determine model routing.
Simple queries go to Haiku (cheap), complex to Sonnet (capable).
"""
message_lower = message.lower()
# Check for simple patterns
for indicator in self.SIMPLE_INDICATORS:
if indicator in message_lower and len(message) < 50:
return QueryComplexity.SIMPLE
# Check for complex patterns
complex_score = sum(
1 for ind in self.COMPLEX_INDICATORS if ind in message_lower
)
if complex_score >= 2 or len(message) > 500:
return QueryComplexity.COMPLEX
if complex_score >= 1 or len(message) > 200:
return QueryComplexity.MODERATE
return QueryComplexity.SIMPLE
def select_model(self, complexity: QueryComplexity) -> ModelChoice:
"""Map complexity to model choice."""
routing = {
QueryComplexity.SIMPLE: ModelChoice.HAIKU,
QueryComplexity.MODERATE: ModelChoice.HAIKU, # Haiku handles moderate well
QueryComplexity.COMPLEX: ModelChoice.SONNET,
}
return routing[complexity]
def check_idempotency(self, key: str) -> Optional[ConversationResponse]:
"""Check if this request was already processed."""
cached = self.redis.get(f"idemp_resp:{key}")
if cached:
data = json.loads(cached)
return ConversationResponse(
session_id=data["session_id"],
message_id=data["message_id"],
text=data["text"],
model_used=data["model_used"],
input_tokens=data["input_tokens"],
output_tokens=data["output_tokens"],
cost_usd=data["cost_usd"],
latency_ms=data["latency_ms"],
cached=True,
)
return None
def get_user_quota(self, user_id: str) -> UserQuota:
"""Load user's daily quota from Redis."""
today = time.strftime("%Y-%m-%d")
key = f"quota:{user_id}:{today}"
data = self.redis.hgetall(key)
if not data:
return UserQuota()
return UserQuota(
used_input_tokens=int(data.get("input_tokens", 0)),
used_output_tokens=int(data.get("output_tokens", 0)),
used_cost_usd=float(data.get("cost_usd", 0.0)),
)
def update_user_quota(
self, user_id: str, input_tokens: int, output_tokens: int, cost_usd: float
) -> None:
"""Update user's daily usage counters."""
today = time.strftime("%Y-%m-%d")
key = f"quota:{user_id}:{today}"
pipe = self.redis.pipeline()
pipe.hincrby(key, "input_tokens", input_tokens)
pipe.hincrby(key, "output_tokens", output_tokens)
pipe.hincrbyfloat(key, "cost_usd", cost_usd)
        pipe.expire(key, 86400)  # Date-scoped key lives 24h past its last update
pipe.execute()
def build_prompt(
self,
request: ConversationRequest,
context_history: list[dict],
rag_results: list[str],
) -> tuple[str, list[dict]]:
"""
Build the full prompt with system instructions, RAG context, and history.
Returns (system_prompt, messages).
"""
system = (
self.SYSTEM_PROMPT_JA if request.language == "ja"
else self.SYSTEM_PROMPT_EN
)
# Inject RAG results into system prompt if available
if rag_results:
rag_context = "\n\n".join(rag_results[:3]) # Top 3 results
system += f"\n\n参考情報:\n{rag_context}"
# Build messages from history + current message
messages = list(context_history)
messages.append({
"role": "user",
"content": request.message,
})
return system, messages
def calculate_cost(
self, model: ModelChoice, input_tokens: int, output_tokens: int
) -> float:
"""Calculate request cost in USD."""
rates = MODEL_COSTS[model]
input_cost = (input_tokens / 1_000_000) * rates["input"]
output_cost = (output_tokens / 1_000_000) * rates["output"]
return input_cost + output_cost
async def handle_chat(self, raw_body: dict) -> ConversationResponse:
"""
Main entry point for chat requests.
Validates, budgets, routes, invokes, and tracks.
"""
start = time.time()
# 1. Validate request
request = self.validate_request(raw_body)
# 2. Check idempotency
cached_response = self.check_idempotency(request.idempotency_key)
if cached_response:
logger.info(f"Idempotent cache hit: {request.idempotency_key}")
return cached_response
        # 3. Check user quota (coarse pre-check; actuals reconciled post-request)
        quota = self.get_user_quota(request.user_id)
        estimated_output = 300  # Rough output estimate for the pre-check
        if not quota.can_proceed(500, estimated_output):  # 500 = rough input estimate
raise QuotaExceededException(
f"Daily quota exceeded. Remaining input: {quota.input_remaining}, "
f"output: {quota.output_remaining}"
)
# 4. Load session and build context
session = self.session_manager.get_or_create_session(
request.session_id, request.user_id
)
context_history = self.session_manager.get_context_for_prompt(session)
# 5. Classify and route
complexity = self.classify_complexity(request.message)
model = self.select_model(complexity)
# 6. Build prompt
rag_results = await self._retrieve_rag_context(request.message)
system_prompt, messages = self.build_prompt(
request, context_history, rag_results
)
# 7. Invoke Bedrock
response_text, input_tokens, output_tokens = await self._invoke_bedrock(
model=model,
system_prompt=system_prompt,
messages=messages,
max_tokens=request.max_tokens or 1024,
)
# 8. Calculate cost and update quota
cost = self.calculate_cost(model, input_tokens, output_tokens)
self.update_user_quota(request.user_id, input_tokens, output_tokens, cost)
# 9. Update session
self.session_manager.add_turn(
session, "user", request.message, token_count=input_tokens
)
self.session_manager.add_turn(
session, "assistant", response_text,
token_count=output_tokens, model_id=model.value,
)
latency_ms = int((time.time() - start) * 1000)
# 10. Build response
response = ConversationResponse(
session_id=request.session_id,
message_id=hashlib.sha256(
f"{request.session_id}:{time.time()}".encode()
).hexdigest()[:12],
text=response_text,
model_used=model.value,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost_usd=cost,
latency_ms=latency_ms,
)
# 11. Cache for idempotency
self._cache_response(request.idempotency_key, response)
logger.info(
"Chat completed",
extra={
"model": model.name,
"complexity": complexity.value,
"latency_ms": latency_ms,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"cost_usd": f"{cost:.6f}",
},
)
return response
    async def _retrieve_rag_context(self, query: str) -> list[str]:
        """Retrieve relevant manga information from OpenSearch."""
        # Implementation calls OpenSearch Serverless vector search and returns
        # top-k relevant product descriptions and reviews
        return []  # Stub: empty list keeps build_prompt's contract intact
async def _invoke_bedrock(
self,
model: ModelChoice,
system_prompt: str,
messages: list[dict],
max_tokens: int,
) -> tuple[str, int, int]:
"""Invoke Bedrock and return (text, input_tokens, output_tokens)."""
# Implementation calls Bedrock InvokeModel or InvokeModelWithResponseStream
pass
def _cache_response(self, idempotency_key: str, response: ConversationResponse):
"""Cache response for idempotency."""
data = {
"session_id": response.session_id,
"message_id": response.message_id,
"text": response.text,
"model_used": response.model_used,
"input_tokens": response.input_tokens,
"output_tokens": response.output_tokens,
"cost_usd": response.cost_usd,
"latency_ms": response.latency_ms,
}
self.redis.setex(
f"idemp_resp:{idempotency_key}",
30, # 30-second dedup window
json.dumps(data, ensure_ascii=False),
)
class QuotaExceededException(Exception):
"""Raised when user exceeds daily token quota."""
pass
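The _retrieve_rag_context and _invoke_bedrock stubs above deliberately elide their implementations. A minimal non-streaming sketch of the Bedrock call, assuming synchronous boto3 pushed off the event loop with asyncio.to_thread (a production path would use InvokeModelWithResponseStream to start streaming within the 3-second target):
import asyncio
import json
import boto3
_bedrock = boto3.client("bedrock-runtime", region_name="ap-northeast-1")
def _invoke_sync(
    model_id: str, system_prompt: str, messages: list[dict], max_tokens: int
) -> tuple[str, int, int]:
    """Blocking Bedrock call returning (text, input_tokens, output_tokens)."""
    body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "system": system_prompt,
        "messages": messages,
    })
    resp = _bedrock.invoke_model(modelId=model_id, body=body)
    payload = json.loads(resp["body"].read())
    text = "".join(b["text"] for b in payload["content"] if b["type"] == "text")
    usage = payload["usage"]  # Actual token counts reported by Bedrock
    return text, usage["input_tokens"], usage["output_tokens"]
async def invoke_bedrock(model, system_prompt, messages, max_tokens):
    # Wrapper so an async handler like handle_chat can await the blocking call
    return await asyncio.to_thread(
        _invoke_sync, model.value, system_prompt, messages, max_tokens
    )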
Token Budget Enforcement Architecture
Token budgets are MangaAssist's cost firewall. Without strict enforcement, a single misbehaving client or a prompt-injected runaway loop could run up thousands of dollars in Bedrock charges within hours.
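A back-of-the-envelope check makes the stakes concrete. Assume a single unthrottled client at the 50 RPS peak, every request hitting Sonnet at the per-request caps from the BudgetConfig below (4,000 input / 1,024 output tokens):
# Hypothetical runaway-client cost: all-Sonnet, no budget enforcement
RPS = 50
INPUT_TOKENS, OUTPUT_TOKENS = 4_000, 1_024  # Per-request caps from BudgetConfig
IN_RATE, OUT_RATE = 3.00, 15.00  # Sonnet USD per 1M tokens
per_request = INPUT_TOKENS / 1e6 * IN_RATE + OUTPUT_TOKENS / 1e6 * OUT_RATE
print(f"${per_request:.4f} per request")  # ~$0.0274
print(f"${per_request * RPS * 3600:,.0f} per hour")  # ~$4,925/hour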
Token Budget Flow
graph TB
subgraph Pre_Request["Pre-Request Phase"]
ESTIMATE[Estimate Input Tokens<br/>JP: ~1 token/1.5 chars<br/>EN: ~1 token/4 chars]
CHECK_QUOTA[Check User Daily Quota<br/>Redis counter]
CHECK_CONTEXT[Check Context Window<br/>History + RAG + Prompt ≤ 180K]
TRUNCATE[Truncate if Needed<br/>Drop oldest turns first]
end
subgraph In_Flight["In-Flight Phase"]
STREAM_COUNT[Count Output Tokens<br/>From stream events]
BUDGET_CHECK[Check Against Cap<br/>max_tokens parameter]
FORCE_STOP[Force Stop if Budget<br/>Exceeded 110% of cap]
end
subgraph Post_Request["Post-Request Phase"]
ACTUAL[Get Actual Counts<br/>From message_delta event]
RECONCILE[Reconcile Estimate<br/>vs Actual]
UPDATE_QUOTA[Update User Quota<br/>Redis HINCRBY]
EMIT_METRIC[Emit CloudWatch<br/>Cost Metric]
end
ESTIMATE --> CHECK_QUOTA
CHECK_QUOTA -->|Quota OK| CHECK_CONTEXT
CHECK_QUOTA -->|Exceeded| REJECT[Reject Request<br/>429 Quota Exceeded]
CHECK_CONTEXT --> TRUNCATE
TRUNCATE --> STREAM_COUNT
STREAM_COUNT --> BUDGET_CHECK
BUDGET_CHECK --> FORCE_STOP
FORCE_STOP --> ACTUAL
ACTUAL --> RECONCILE
RECONCILE --> UPDATE_QUOTA
UPDATE_QUOTA --> EMIT_METRIC
style REJECT fill:#dc3545,color:#fff
style FORCE_STOP fill:#ffc107,color:#000
Token Budget Enforcer
"""
MangaAssist Token Budget Enforcer
Multi-layer token budget enforcement: pre-request, in-flight, and post-request.
"""
import time
import logging
from dataclasses import dataclass
from typing import Optional
logger = logging.getLogger(__name__)
@dataclass
class BudgetConfig:
"""Token budget configuration."""
# Per-request limits
max_input_tokens: int = 4000
max_output_tokens: int = 1024
    max_context_window: int = 180_000  # Conservative cap under Claude 3's 200K window
# Per-user daily limits
daily_input_limit: int = 500_000
daily_output_limit: int = 200_000
daily_cost_limit_usd: float = 5.00
# System overhead
system_prompt_tokens: int = 300
rag_context_tokens: int = 800
safety_margin_tokens: int = 500
# In-flight controls
output_overshoot_tolerance: float = 1.10 # Allow 10% overshoot
@property
def available_for_history(self) -> int:
"""Tokens available for conversation history."""
return (
self.max_input_tokens
- self.system_prompt_tokens
- self.rag_context_tokens
- self.safety_margin_tokens
)
class TokenBudgetEnforcer:
"""Enforces token budgets at multiple levels."""
def __init__(self, redis_client, config: Optional[BudgetConfig] = None):
self.redis = redis_client
self.config = config or BudgetConfig()
def pre_request_check(
self, user_id: str, estimated_input_tokens: int
) -> dict:
"""
Pre-request validation. Returns budget allocation or rejection.
"""
# Check user daily quota
today = time.strftime("%Y-%m-%d")
quota_key = f"quota:{user_id}:{today}"
used = self.redis.hgetall(quota_key)
used_input = int(used.get("input_tokens", 0))
used_output = int(used.get("output_tokens", 0))
used_cost = float(used.get("cost_usd", 0.0))
if used_input + estimated_input_tokens > self.config.daily_input_limit:
return {
"allowed": False,
"reason": "daily_input_limit_exceeded",
"remaining_input": self.config.daily_input_limit - used_input,
}
if used_cost >= self.config.daily_cost_limit_usd:
return {
"allowed": False,
"reason": "daily_cost_limit_exceeded",
"remaining_cost_usd": self.config.daily_cost_limit_usd - used_cost,
}
# Check input doesn't exceed context window
if estimated_input_tokens > self.config.max_context_window:
return {
"allowed": False,
"reason": "context_window_exceeded",
"max_allowed": self.config.max_context_window,
}
# Calculate allowed output tokens
allowed_output = min(
self.config.max_output_tokens,
self.config.daily_output_limit - used_output,
)
        return {
            "allowed": True,
            # Floor of 100 tokens lets the request complete even when the daily
            # output budget is nearly spent (may overshoot the cap slightly)
            "allocated_output_tokens": max(allowed_output, 100),
            "remaining_daily_input": self.config.daily_input_limit - used_input,
            "remaining_daily_output": self.config.daily_output_limit - used_output,
        }
def truncate_history(
self, history: list[dict], max_tokens: Optional[int] = None
) -> list[dict]:
"""
Truncate conversation history to fit within token budget.
Keeps most recent turns, drops oldest first.
Always preserves at least the last user-assistant pair.
"""
max_tokens = max_tokens or self.config.available_for_history
# Estimate tokens per turn
turns_with_tokens = []
for turn in history:
text = turn.get("content", "")
# Japanese: ~1 token per 1.5 chars; English: ~1 token per 4 chars
jp_chars = sum(1 for c in text if ord(c) > 0x3000)
en_chars = len(text) - jp_chars
estimated = int(jp_chars / 1.5) + (en_chars // 4) + 5 # +5 overhead
turns_with_tokens.append((turn, estimated))
        # Walk backward, accumulating tokens until the budget is spent
        result = []
        total = 0
        for turn, tokens in reversed(turns_with_tokens):
            if total + tokens > max_tokens and len(result) >= 2:
                break
            result.insert(0, turn)
            total += tokens
        # Claude requires the message list to start with a user turn, so drop
        # any assistant turn left dangling at the front by truncation
        while result and result[0].get("role") == "assistant":
            result.pop(0)
logger.info(
f"History truncated: {len(history)} -> {len(result)} turns, "
f"~{total} tokens"
)
return result
def in_flight_check(
self, output_tokens_so_far: int, allocated_output: int
) -> bool:
"""
Check during streaming if we should force-stop.
Returns True if streaming should continue.
"""
max_allowed = int(allocated_output * self.config.output_overshoot_tolerance)
return output_tokens_so_far < max_allowed
def post_request_reconcile(
self,
user_id: str,
estimated_input: int,
actual_input: int,
actual_output: int,
model_id: str,
cost_usd: float,
) -> dict:
"""
Post-request: reconcile estimates with actuals and update quotas.
"""
drift = abs(estimated_input - actual_input) / max(actual_input, 1)
if drift > 0.20:
logger.warning(
f"Token estimation drift: {drift:.1%} "
f"(estimated={estimated_input}, actual={actual_input})"
)
# Update daily quota
today = time.strftime("%Y-%m-%d")
quota_key = f"quota:{user_id}:{today}"
pipe = self.redis.pipeline()
pipe.hincrby(quota_key, "input_tokens", actual_input)
pipe.hincrby(quota_key, "output_tokens", actual_output)
pipe.hincrbyfloat(quota_key, "cost_usd", cost_usd)
pipe.hincrby(quota_key, "request_count", 1)
pipe.expire(quota_key, 86400)
pipe.execute()
return {
"estimation_drift": drift,
"actual_input": actual_input,
"actual_output": actual_output,
"cost_usd": cost_usd,
}
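A sketch of how the enforcer slots into the request path. It assumes a redis-py client created with decode_responses=True (so the hgetall calls above return strings) and an illustrative session_history list:
import redis
r = redis.Redis(decode_responses=True)
enforcer = TokenBudgetEnforcer(r)
verdict = enforcer.pre_request_check("u_123", estimated_input_tokens=2_400)
if not verdict["allowed"]:
    # Map verdict["reason"] onto a 429 QUOTA_EXCEEDED error envelope
    raise RuntimeError(verdict["reason"])
history = enforcer.truncate_history(session_history)  # Oldest turns drop first
max_tokens = verdict["allocated_output_tokens"]  # Pass to Bedrock as output cap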
Idempotent Retry Strategy
Retry Decision Tree
graph TB
ERROR[Error Received]
CLASSIFY{Classify Error}
RETRYABLE[Retryable Errors]
NON_RETRYABLE[Non-Retryable]
THROTTLE[ThrottlingException]
TIMEOUT[ModelTimeoutException]
SERVER[ServiceUnavailable 503]
NETWORK[ConnectionError]
VALIDATION[ValidationException]
ACCESS[AccessDeniedException]
BADREQ[BadRequestException]
QUOTA[QuotaExceeded]
CHECK_RETRY{Retry Count<br/>< Max?}
CALC_DELAY[Calculate Delay<br/>base × 2^attempt + jitter]
CHECK_CB{Circuit Breaker<br/>Status?}
WAIT[Wait with Backoff]
RETRY[Retry Request]
FAIL[Return Error<br/>to Client]
FALLBACK[Try Fallback<br/>Haiku if Sonnet failed]
ERROR --> CLASSIFY
CLASSIFY --> RETRYABLE
CLASSIFY --> NON_RETRYABLE
RETRYABLE --> THROTTLE
RETRYABLE --> TIMEOUT
RETRYABLE --> SERVER
RETRYABLE --> NETWORK
NON_RETRYABLE --> VALIDATION
NON_RETRYABLE --> ACCESS
NON_RETRYABLE --> BADREQ
NON_RETRYABLE --> QUOTA
NON_RETRYABLE --> FAIL
THROTTLE --> CHECK_RETRY
TIMEOUT --> CHECK_RETRY
SERVER --> CHECK_RETRY
NETWORK --> CHECK_RETRY
CHECK_RETRY -->|Yes| CALC_DELAY
CHECK_RETRY -->|No| CHECK_CB
CALC_DELAY --> WAIT
WAIT --> RETRY
CHECK_CB -->|Open| FALLBACK
CHECK_CB -->|Closed| FAIL
style RETRYABLE fill:#ffc107,color:#000
style NON_RETRYABLE fill:#dc3545,color:#fff
style FALLBACK fill:#28a745,color:#fff
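With the defaults in the controller below (0.5 s base delay doubling per attempt, 8 s cap, ±50% jitter), the three retry waits land roughly on this schedule:
| Retry | Base delay | With ±50% jitter |
|---|---|---|
| 1st | 0.5 s | 0.25–0.75 s |
| 2nd | 1.0 s | 0.5–1.5 s |
| 3rd | 2.0 s | 1.0–3.0 s |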
Retry Controller with Circuit Breaker
"""
MangaAssist Retry Controller
Exponential backoff with jitter, circuit breaker, and model fallback.
"""
import time
import random
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Optional, Any
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing recovery
@dataclass
class RetryConfig:
"""Retry behavior configuration."""
max_retries: int = 3
base_delay_seconds: float = 0.5
max_delay_seconds: float = 8.0
jitter_range: float = 0.5 # ±50% jitter
timeout_seconds: float = 25.0
# Circuit breaker settings
failure_threshold: int = 5 # Failures before opening circuit
recovery_timeout_seconds: float = 30.0 # Time before half-open
success_threshold: int = 2 # Successes to close circuit
RETRYABLE_ERRORS = {
    "ThrottlingException",
    "ModelTimeoutException",
    "ServiceUnavailableException",
    "InternalServerException",
    "ModelStreamErrorException",
    "ModelNotReadyException",  # Bedrock recommends retrying this one as well
}
NON_RETRYABLE_ERRORS = {
    "ValidationException",
    "AccessDeniedException",
    "ResourceNotFoundException",
}
class CircuitBreaker:
"""
Circuit breaker for Bedrock API calls.
Tracks failures per model and opens circuit when threshold is exceeded.
"""
def __init__(self, redis_client, config: RetryConfig):
self.redis = redis_client
self.config = config
def get_state(self, model_id: str) -> CircuitState:
"""Get current circuit state for a model."""
key = f"circuit:{model_id}"
data = self.redis.hgetall(key)
if not data:
return CircuitState.CLOSED
state = data.get("state", "closed")
failures = int(data.get("failures", 0))
last_failure = float(data.get("last_failure", 0))
if state == "open":
# Check if recovery timeout has elapsed
elapsed = time.time() - last_failure
if elapsed >= self.config.recovery_timeout_seconds:
self._set_state(model_id, CircuitState.HALF_OPEN)
return CircuitState.HALF_OPEN
return CircuitState.OPEN
if failures >= self.config.failure_threshold:
self._set_state(model_id, CircuitState.OPEN)
return CircuitState.OPEN
return CircuitState(state)
def record_success(self, model_id: str) -> None:
"""Record a successful call."""
key = f"circuit:{model_id}"
state = self.get_state(model_id)
if state == CircuitState.HALF_OPEN:
successes = self.redis.hincrby(key, "half_open_successes", 1)
if successes >= self.config.success_threshold:
self._set_state(model_id, CircuitState.CLOSED)
self.redis.delete(key)
elif state == CircuitState.CLOSED:
# Decay failure counter on success
self.redis.hincrby(key, "failures", -1)
failures = int(self.redis.hget(key, "failures") or 0)
if failures <= 0:
self.redis.delete(key)
def record_failure(self, model_id: str) -> None:
"""Record a failed call."""
key = f"circuit:{model_id}"
pipe = self.redis.pipeline()
pipe.hincrby(key, "failures", 1)
pipe.hset(key, "last_failure", str(time.time()))
pipe.expire(key, 300) # Auto-cleanup after 5 min
pipe.execute()
state = self.get_state(model_id)
if state == CircuitState.HALF_OPEN:
self._set_state(model_id, CircuitState.OPEN)
def _set_state(self, model_id: str, state: CircuitState) -> None:
key = f"circuit:{model_id}"
self.redis.hset(key, "state", state.value)
self.redis.expire(key, 300)
class RetryController:
"""
Manages retries with exponential backoff, jitter, and circuit breaker.
Supports model fallback (Sonnet -> Haiku).
"""
MODEL_FALLBACK = {
"anthropic.claude-3-sonnet-20240229-v1:0": "anthropic.claude-3-haiku-20240307-v1:0",
}
def __init__(
self,
redis_client,
config: Optional[RetryConfig] = None,
):
self.config = config or RetryConfig()
self.circuit_breaker = CircuitBreaker(redis_client, self.config)
def calculate_delay(self, attempt: int) -> float:
"""Calculate backoff delay with jitter."""
base = self.config.base_delay_seconds * (2 ** attempt)
capped = min(base, self.config.max_delay_seconds)
# Add jitter: ±jitter_range of the delay
jitter = capped * self.config.jitter_range
delay = capped + random.uniform(-jitter, jitter)
return max(0.1, delay) # Minimum 100ms
def execute_with_retry(
self,
func: Callable,
model_id: str,
*args,
**kwargs,
) -> Any:
"""
Execute a function with retry logic and circuit breaker.
Args:
func: The function to call (e.g., bedrock invoke)
model_id: The model being called
*args, **kwargs: Passed to func
Returns:
The function's return value
Raises:
Last exception if all retries exhausted
"""
# Check circuit breaker
state = self.circuit_breaker.get_state(model_id)
if state == CircuitState.OPEN:
fallback = self.MODEL_FALLBACK.get(model_id)
if fallback:
logger.warning(
f"Circuit open for {model_id}, falling back to {fallback}"
)
return self._try_fallback(func, fallback, *args, **kwargs)
raise CircuitOpenException(
f"Circuit breaker open for {model_id}"
)
last_exception = None
for attempt in range(self.config.max_retries + 1):
try:
                result = func(*args, model_id=model_id, **kwargs)
self.circuit_breaker.record_success(model_id)
if attempt > 0:
logger.info(
f"Succeeded on retry {attempt} for {model_id}"
)
return result
except ClientError as e:
error_code = e.response["Error"]["Code"]
last_exception = e
if error_code in NON_RETRYABLE_ERRORS:
logger.error(
f"Non-retryable error {error_code}: {e}"
)
raise
if error_code in RETRYABLE_ERRORS:
self.circuit_breaker.record_failure(model_id)
if attempt < self.config.max_retries:
delay = self.calculate_delay(attempt)
logger.warning(
f"Retryable error {error_code} on attempt "
f"{attempt + 1}/{self.config.max_retries + 1}. "
f"Retrying in {delay:.2f}s"
)
time.sleep(delay)
else:
# All retries exhausted, try fallback
fallback = self.MODEL_FALLBACK.get(model_id)
if fallback:
return self._try_fallback(
func, fallback, *args, **kwargs
)
else:
logger.error(f"Unknown error code {error_code}: {e}")
raise
except Exception as e:
last_exception = e
self.circuit_breaker.record_failure(model_id)
if attempt < self.config.max_retries:
delay = self.calculate_delay(attempt)
logger.warning(
f"Unexpected error on attempt "
f"{attempt + 1}: {e}. Retrying in {delay:.2f}s"
)
time.sleep(delay)
raise last_exception
def _try_fallback(
self, func: Callable, fallback_model: str, *args, **kwargs
) -> Any:
"""Attempt request with fallback model."""
logger.info(f"Attempting fallback to {fallback_model}")
try:
            result = func(*args, model_id=fallback_model, **kwargs)
self.circuit_breaker.record_success(fallback_model)
return result
except Exception as e:
self.circuit_breaker.record_failure(fallback_model)
raise
class CircuitOpenException(Exception):
"""Raised when circuit breaker is open and no fallback available."""
pass
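Wiring the controller around an invoke function is then a single call. A sketch, where invoke_claude is a hypothetical stand-in that must accept model_id as a keyword argument (execute_with_retry passes it that way):
controller = RetryController(redis_client)  # Assumes a configured redis client
def invoke_claude(payload: dict, model_id: str) -> dict:
    # Hypothetical stand-in for the Bedrock call being retried
    ...
result = controller.execute_with_retry(
    invoke_claude,
    "anthropic.claude-3-sonnet-20240229-v1:0",
    payload={"messages": [{"role": "user", "content": "おすすめは?"}]},
)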
Model Selection Router
graph LR
subgraph Input["Incoming Query"]
MSG[User Message]
end
subgraph Classification["Query Classification"]
INTENT[Intent Detection]
COMPLEXITY[Complexity Scoring<br/>Keywords + Length]
LANG[Language Detection<br/>JP / EN]
end
subgraph Routing["Model Router"]
CACHE_CHECK{Cache Hit?}
SIMPLE_Q{Simple?}
COMPLEX_Q{Complex?}
end
subgraph Models["Bedrock Models"]
REDIS_CACHE[Redis Cache<br/>$0.00/request]
HAIKU[Claude 3 Haiku<br/>$0.25/$1.25 per 1M]
SONNET[Claude 3 Sonnet<br/>$3/$15 per 1M]
end
MSG --> INTENT
MSG --> COMPLEXITY
MSG --> LANG
INTENT --> CACHE_CHECK
COMPLEXITY --> CACHE_CHECK
CACHE_CHECK -->|Yes| REDIS_CACHE
CACHE_CHECK -->|No| SIMPLE_Q
SIMPLE_Q -->|Yes| HAIKU
SIMPLE_Q -->|No| COMPLEX_Q
COMPLEX_Q -->|Yes| SONNET
COMPLEX_Q -->|No| HAIKU
style REDIS_CACHE fill:#28a745,color:#fff
style HAIKU fill:#17a2b8,color:#fff
style SONNET fill:#ff9900,color:#000
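The Redis cache node in the diagram assumes an exact-match response cache keyed on the normalized query. A sketch (a production system might layer embedding-based semantic matching on top):
import hashlib
def response_cache_key(message: str, language: str) -> str:
    # Collapse case and whitespace so trivially different phrasings collide
    normalized = " ".join(message.lower().split())
    digest = hashlib.sha256(f"{language}:{normalized}".encode()).hexdigest()[:24]
    return f"resp_cache:{digest}"
# Usage: redis.setex(response_cache_key(msg, "ja"), 3600, response_json)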
API Response Format Standards
"""
MangaAssist API Response Format
Standardized response envelopes for all API endpoints.
"""
import json
import time
from dataclasses import dataclass, asdict
from typing import Optional, Any
@dataclass
class APIResponse:
"""Standard API response envelope."""
success: bool
data: Optional[Any] = None
error: Optional[dict] = None
metadata: Optional[dict] = None
def to_dict(self) -> dict:
result = {"success": self.success}
if self.data is not None:
result["data"] = self.data
if self.error is not None:
result["error"] = self.error
if self.metadata is not None:
result["metadata"] = self.metadata
return result
def to_json(self) -> str:
return json.dumps(self.to_dict(), ensure_ascii=False)
@dataclass
class StreamChunk:
"""Individual streaming chunk sent via WebSocket."""
type: str # "chunk", "done", "error", "metadata"
text: Optional[str] = None
tokens_used: Optional[dict] = None
error_code: Optional[str] = None
message: Optional[str] = None
sequence: int = 0
def to_json(self) -> str:
data = {"type": self.type}
if self.text is not None:
data["text"] = self.text
if self.tokens_used is not None:
data["tokensUsed"] = self.tokens_used
if self.error_code is not None:
data["errorCode"] = self.error_code
if self.message is not None:
data["message"] = self.message
data["seq"] = self.sequence
return json.dumps(data, ensure_ascii=False)
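# Example chunk sequence for one streamed reply over the WebSocket
# (illustrative values):
#   {"type": "chunk", "text": "「呪術廻戦」は", "seq": 0}
#   {"type": "chunk", "text": "いかがでしょうか。", "seq": 1}
#   {"type": "done", "tokensUsed": {"input": 812, "output": 143}, "seq": 2}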
# Error code catalog
ERROR_CODES = {
"QUOTA_EXCEEDED": {
"status": 429,
"message_ja": "本日のご利用上限に達しました。明日またお試しください。",
"message_en": "Daily usage limit reached. Please try again tomorrow.",
"retry_after": 86400,
},
"RATE_LIMITED": {
"status": 429,
"message_ja": "リクエストが多すぎます。少々お待ちください。",
"message_en": "Too many requests. Please wait a moment.",
"retry_after": 5,
},
"MODEL_TIMEOUT": {
"status": 504,
"message_ja": "応答に時間がかかっています。もう一度お試しください。",
"message_en": "Response timed out. Please try again.",
"retry_after": 2,
},
"INVALID_REQUEST": {
"status": 400,
"message_ja": "リクエストが不正です。",
"message_en": "Invalid request format.",
"retry_after": 0,
},
"SESSION_EXPIRED": {
"status": 410,
"message_ja": "セッションが期限切れです。新しいチャットを開始してください。",
"message_en": "Session expired. Please start a new chat.",
"retry_after": 0,
},
"INTERNAL_ERROR": {
"status": 500,
"message_ja": "内部エラーが発生しました。しばらくしてからお試しください。",
"message_en": "An internal error occurred. Please try again later.",
"retry_after": 10,
},
}
def build_error_response(
error_code: str, language: str = "ja", details: str = ""
) -> APIResponse:
"""Build a standardized error response."""
error_info = ERROR_CODES.get(error_code, ERROR_CODES["INTERNAL_ERROR"])
message_key = f"message_{language}"
message = error_info.get(message_key, error_info["message_en"])
return APIResponse(
success=False,
error={
"code": error_code,
"message": message,
"details": details,
"retryAfter": error_info["retry_after"],
},
metadata={
"timestamp": int(time.time()),
"statusCode": error_info["status"],
},
)
def build_chat_response(
text: str,
session_id: str,
input_tokens: int,
output_tokens: int,
model_used: str,
latency_ms: int,
) -> APIResponse:
"""Build a standardized chat response."""
return APIResponse(
success=True,
data={
"sessionId": session_id,
"text": text,
},
metadata={
"model": model_used,
"tokensUsed": {
"input": input_tokens,
"output": output_tokens,
},
"latencyMs": latency_ms,
"timestamp": int(time.time()),
},
)
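Putting the envelope to work for a throttled Japanese-language request (illustrative output; the timestamp will differ):
resp = build_error_response("RATE_LIMITED", language="ja")
print(resp.to_json())
# {"success": false,
#  "error": {"code": "RATE_LIMITED",
#            "message": "リクエストが多すぎます。少々お待ちください。",
#            "details": "", "retryAfter": 5},
#  "metadata": {"timestamp": 1700000000, "statusCode": 429}}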
Key Takeaways
| # | Takeaway | MangaAssist Application |
|---|---|---|
| 1 | Classify queries to route models — Simple greetings and lookups go to Haiku ($0.25/1M input); complex multi-step reasoning goes to Sonnet ($3/1M input). At a 70/30 Haiku/Sonnet split this saves ~64% vs all-Sonnet, since Haiku costs 1/12 of Sonnet on both input and output. | MangaAssist uses keyword + length heuristics to classify incoming Japanese text and route to the cheapest capable model. |
| 2 | Enforce token budgets at three layers — Pre-request estimation rejects over-budget inputs; in-flight monitoring stops runaway outputs; post-request reconciliation catches estimation drift. | Each MangaAssist user gets 500K input tokens/day (~$1.50 on Sonnet); exceeding it triggers a friendly Japanese-language quota message. |
| 3 | Idempotency keys use content hashing — SHA-256 of session ID + message + 5-second time window creates natural dedup that absorbs rapid retries while rarely colliding with genuinely new requests. | WebSocket reconnects and double-taps do not generate duplicate Bedrock invocations or double-count token usage. |
| 4 | Circuit breakers with model fallback — After Sonnet accumulates 5 failures the circuit opens and traffic routes to Haiku automatically; after a 30-second recovery timeout the breaker half-opens to test recovery. | MangaAssist degrades gracefully: users get slightly less nuanced recommendations instead of errors. |
| 5 | Exponential backoff with jitter prevents thundering herds — Random jitter (±50%) on retry delays prevents synchronized retries from hundreds of concurrent requests. | At 50 RPS peak, a Bedrock throttling event does not cascade into a retry storm that compounds the throttling. |
| 6 | Conversation context uses a sliding token window — Rather than fixed turn counts, the session manager walks backward from the most recent turn, summing tokens until the budget is reached. | A conversation with short turns keeps more history; one with long manga descriptions naturally trims further back. |
| 7 | Bilingual error messages improve UX — Error codes map to both Japanese and English user-facing messages with appropriate retry-after guidance. | Japanese users see "応答に時間がかかっています" instead of cryptic HTTP error codes. |