Scenarios and Runbooks — FM API Interfaces
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Field | Value |
|---|---|
| Domain | 2 — Implementation & Integration |
| Task | 2.5 — Application Integration Patterns |
| Skill | 2.5.1 — FM API Interfaces |
| Focus | 29s API Gateway timeout, token quota exhausted, retry storm, session corruption, API version mismatch |
| MangaAssist Relevance | Production incident scenarios for the WebSocket + Bedrock streaming pipeline |
Mind Map
mindmap
root((FM API Scenarios & Runbooks))
Scenario 1
29s API Gateway Timeout
REST endpoint timeout
Long RAG retrieval
Cold start cascade
Scenario 2
Token Quota Exhausted
Burst traffic spike
Prompt injection attack
Missing budget enforcement
Scenario 3
Retry Storm
Bedrock throttling cascade
No backoff on client
Circuit breaker failure
Scenario 4
Session Corruption
Race condition on write
Stale Redis cache
DynamoDB conditional failure
Scenario 5
API Version Mismatch
Bedrock API change
Client SDK drift
Response format break
Scenario 1: 29-Second API Gateway Timeout
Problem
During peak manga release hours (Wednesday 00:00 JST, when new chapters drop), the REST /chat endpoint starts returning 504 Gateway Timeout errors. Users submitting complex recommendation queries via the REST fallback API (used when WebSocket connections fail) receive no response. CloudWatch shows API Gateway integration latency exceeding 29 seconds.
Detection
graph TB
subgraph Monitoring["Detection Chain"]
CW_ALARM[CloudWatch Alarm<br/>IntegrationLatency P95 > 20s]
API_METRIC[API Gateway Metrics<br/>5XX Error Rate > 5%]
X_RAY[X-Ray Trace<br/>Shows 29.0s segments]
USER_REPORT["User Reports<br/>チャットが応答しない (chat not responding)"]
end
subgraph Timeline["Incident Timeline"]
T0["T+0: Release hour traffic spike<br/>5x normal request rate"]
T1["T+2min: OpenSearch cold queries<br/>P95 latency 8s → 18s"]
T2["T+5min: Bedrock queue depth<br/>adds 10-15s"]
T3["T+7min: Combined latency 28s+<br/>API Gateway 504s begin"]
T4["T+10min: CW alarm fires<br/>PagerDuty notification"]
end
CW_ALARM --> T4
API_METRIC --> T3
X_RAY --> T2
USER_REPORT --> T3
T0 --> T1 --> T2 --> T3 --> T4
style T3 fill:#dc3545,color:#fff
style T4 fill:#ffc107,color:#000
Root Cause
The 29-second integration timeout is API Gateway's default hard limit for REST APIs (since mid-2024 it can be raised for Regional REST APIs via a service-quota request, at the cost of reduced account-level throttle quota, but the default applied here). The incident chain:
- Wednesday release spike: 5x traffic at 00:00 JST when new manga chapters are published
- OpenSearch cold queries: Vector similarity searches against newly-indexed content miss the cache, taking 8-18 seconds instead of the normal 2-3 seconds
- Bedrock queuing: The Sonnet model experiences queuing under the traffic burst, adding 10-15 seconds
- Combined latency breaches 29 seconds: RAG (18s) + Bedrock (15s) = 33s, exceeding the hard limit
Resolution
"""
MangaAssist REST Timeout Prevention
Multi-pronged approach to stay under the 29-second API Gateway limit.
"""
import asyncio
import time
import logging
from typing import Optional
import boto3
logger = logging.getLogger(__name__)
# --- Fix 1: Async parallel execution for RAG + Bedrock prep ---
async def handle_chat_with_parallel_rag(
message: str, session_id: str, timeout: float = 25.0
) -> dict:
"""
Handle chat request with parallelized operations to stay under 29s.
Key insight: RAG retrieval and session loading happen in parallel,
not sequentially.
"""
start = time.time()
# Execute independent operations in parallel
session_task = asyncio.create_task(load_session(session_id))
rag_task = asyncio.create_task(retrieve_rag_context(message, timeout=8.0))
classification_task = asyncio.create_task(classify_query(message))
# Wait for all with a combined timeout
try:
session, rag_results, complexity = await asyncio.wait_for(
asyncio.gather(session_task, rag_task, classification_task),
timeout=10.0, # Hard cap on parallel phase
)
    except asyncio.TimeoutError:
        # wait_for cancels the gather (and the tasks inside it), so only
        # read a task's result if it completed before cancellation.
        logger.warning("Parallel phase timeout — proceeding without RAG")
        session = (
            session_task.result()
            if session_task.done() and not session_task.cancelled()
            else None
        )
        rag_results = []
        complexity = "simple"
# Bedrock call with remaining time budget
elapsed = time.time() - start
bedrock_timeout = min(25.0 - elapsed, 15.0) # Leave 4s buffer
if bedrock_timeout < 3.0:
# Not enough time for Bedrock — return cached/default response
return _fallback_response(message, session_id)
# Route to faster model if time is tight
model = "haiku" if bedrock_timeout < 8.0 or complexity == "simple" else "sonnet"
response = await invoke_bedrock_with_timeout(
message=message,
rag_context=rag_results,
session=session,
model=model,
timeout=bedrock_timeout,
)
return response
# --- Fix 2: Pre-warm OpenSearch indexes before release time ---
def prewarm_opensearch_for_release(release_titles: list[str]) -> None:
"""
Pre-warm OpenSearch vector cache 30 minutes before known release times.
Called by EventBridge scheduled rule at 23:30 JST every Wednesday.
"""
import opensearchpy
from sentence_transformers import SentenceTransformer
    client = opensearchpy.OpenSearch(
        hosts=[{"host": "manga-vectors.ap-northeast-1.aoss.amazonaws.com", "port": 443}],
        use_ssl=True,
        # OpenSearch Serverless also requires SigV4 auth in practice
        # (e.g. opensearchpy.AWSV4SignerAuth); omitted here for brevity.
    )
model = SentenceTransformer("intfloat/multilingual-e5-large")
# Generate queries users are likely to ask about new releases
prewarm_queries = []
for title in release_titles:
prewarm_queries.extend([
f"{title} 新刊",
f"{title} おすすめ",
f"{title} みたいなマンガ",
f"{title} 似ている作品",
])
for query in prewarm_queries:
embedding = model.encode(query).tolist()
client.search(
index="manga-vectors",
body={
"query": {
"knn": {
"embedding": {
"vector": embedding,
"k": 5,
}
}
},
"size": 5,
},
)
logger.info(f"Pre-warmed: {query}")
# --- Fix 3: Migrate REST to WebSocket for long requests ---
REST_TIMEOUT_THRESHOLD = 20 # seconds
def should_upgrade_to_websocket(estimated_latency: float) -> bool:
"""
Recommend WebSocket upgrade when estimated latency approaches REST limit.
Returns True if the client should reconnect via WebSocket.
"""
return estimated_latency > REST_TIMEOUT_THRESHOLD
# Helper stubs for the parallel execution example
async def load_session(session_id: str) -> dict:
"""Load session from Redis/DynamoDB."""
pass
async def retrieve_rag_context(message: str, timeout: float) -> list[str]:
"""Retrieve from OpenSearch with timeout."""
pass
async def classify_query(message: str) -> str:
"""Classify query complexity."""
pass
async def invoke_bedrock_with_timeout(
message: str, rag_context: list, session: dict,
model: str, timeout: float
) -> dict:
"""Invoke Bedrock with strict timeout."""
pass
def _fallback_response(message: str, session_id: str) -> dict:
"""Return a graceful fallback when time budget is exhausted."""
return {
"text": "申し訳ございません、現在アクセスが集中しております。"
"少々お待ちいただくか、もう一度お試しください。",
"fallback": True,
"sessionId": session_id,
}
Prevention
- Pre-warm OpenSearch indexes 30 minutes before known traffic spikes (Wednesday 23:30 JST via EventBridge rule)
- Parallelize independent operations — session load, RAG retrieval, and query classification execute concurrently
- Dynamic model downgrade — switch to Haiku when remaining time budget is tight
- Migrate REST callers to WebSocket — WebSocket routes still have a 29-second integration timeout, but responses can be pushed asynchronously via the @connections API, so long-running generations are not bound by it (the idle timeout is 10 minutes)
- CloudWatch alarm on P95 integration latency > 20s to trigger proactive scaling
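The P95 latency alarm in the list above can be expressed as `put_metric_alarm` parameters. A sketch with placeholder API and alarm names (pass to a boto3 CloudWatch client together with an `AlarmActions` SNS target):

```python
# Hypothetical alarm definition for the REST /chat integration latency.
# API name, alarm name, and thresholds are illustrative assumptions.
ALARM_PARAMS = {
    "AlarmName": "mangaassist-rest-integration-latency-p95",
    "Namespace": "AWS/ApiGateway",
    "MetricName": "IntegrationLatency",
    "Dimensions": [{"Name": "ApiName", "Value": "mangaassist-rest"}],
    "ExtendedStatistic": "p95",       # percentile stats use ExtendedStatistic
    "Period": 60,
    "EvaluationPeriods": 3,           # 3 consecutive minutes above threshold
    "Threshold": 20_000.0,            # IntegrationLatency is reported in milliseconds
    "ComparisonOperator": "GreaterThanThreshold",
    "TreatMissingData": "notBreaching",
}

# boto3.client("cloudwatch").put_metric_alarm(**ALARM_PARAMS, AlarmActions=[sns_arn])
```

Alarming at 20s rather than 29s leaves a few minutes of headroom to scale or shed load before 504s begin.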
Scenario 2: Token Quota Exhausted
Problem
A MangaAssist power user (or a bot exploiting the API) exhausts their daily token quota within 2 hours, then receives cryptic 429 errors for the rest of the day. Customer support receives complaints: "チャットが使えなくなった" (the chat stopped working). The daily cost dashboard shows one user consuming $47 in a single day — 10x the expected per-user limit.
Detection
graph TB
subgraph Signals["Detection Signals"]
REDIS[Redis Quota Counter<br/>user:quota:2024-01-15<br/>input_tokens: 4,200,000]
CW_COST[CloudWatch Cost Metric<br/>Single user > $10/day]
RATE[Request Rate<br/>500 req/min from one user]
SUPPORT["Support Ticket<br/>チャットが使えない (chat unusable)"]
end
subgraph Analysis["Root Cause Analysis"]
A1[Automated script<br/>hitting API in loop]
A2[Prompt injection<br/>requesting max tokens]
A3[No output token cap<br/>on admin API key]
end
REDIS --> A1
CW_COST --> A2
RATE --> A3
SUPPORT --> A1
style CW_COST fill:#dc3545,color:#fff
style RATE fill:#ffc107,color:#000
Root Cause
Three contributing factors:
- No per-user rate limiting — The API only had global rate limits, not per-user
- Missing output token cap — Admin API keys bypassed the `max_tokens` parameter, allowing 4096-token responses
- No progressive warnings — Users hit a hard wall at 100% quota with no warning at 80% or 90%
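The $47/day figure follows directly from the pricing in the MangaAssist context. A minimal cost-accounting sketch (the model keys and `PRICES` table layout are illustrative assumptions):

```python
# Per-request cost at the Claude 3 Bedrock prices from the context ($/1M tokens).
PRICES = {
    "sonnet": {"input": 3.00, "output": 15.00},
    "haiku": {"input": 0.25, "output": 1.25},
}

def request_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost of a single request in USD at per-1M-token pricing."""
    p = PRICES[model]
    return (input_tokens / 1_000_000) * p["input"] + (output_tokens / 1_000_000) * p["output"]

# An uncapped 4096-token Sonnet response with a 2,000-token prompt:
cost = request_cost_usd("sonnet", 2_000, 4_096)
# ≈ $0.0674 per request — roughly 700 such requests reach the observed $47
```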
Resolution
"""
MangaAssist Quota Management with Progressive Warnings and Rate Limiting
"""
import time
import logging
from dataclasses import dataclass
from typing import Optional, Tuple
logger = logging.getLogger(__name__)
@dataclass
class QuotaTier:
"""Configurable quota tier."""
name: str
daily_input_tokens: int
daily_output_tokens: int
daily_cost_limit_usd: float
max_requests_per_minute: int
max_tokens_per_request: int
# Warning thresholds (percentage of daily limit)
warn_at_80: bool = True
warn_at_90: bool = True
hard_limit_at_100: bool = True
# Default tiers
QUOTA_TIERS = {
"free": QuotaTier(
name="free",
daily_input_tokens=100_000,
daily_output_tokens=50_000,
daily_cost_limit_usd=1.00,
max_requests_per_minute=10,
max_tokens_per_request=512,
),
"standard": QuotaTier(
name="standard",
daily_input_tokens=500_000,
daily_output_tokens=200_000,
daily_cost_limit_usd=5.00,
max_requests_per_minute=30,
max_tokens_per_request=1024,
),
"premium": QuotaTier(
name="premium",
daily_input_tokens=2_000_000,
daily_output_tokens=1_000_000,
daily_cost_limit_usd=20.00,
max_requests_per_minute=60,
max_tokens_per_request=2048,
),
}
class QuotaManager:
"""
Per-user quota management with progressive warnings and rate limiting.
"""
    def __init__(self, redis_client):
        # Assumes a redis client created with decode_responses=True so
        # hgetall/hget return str rather than bytes.
        self.redis = redis_client
def check_and_enforce(
self, user_id: str, tier_name: str, estimated_tokens: int
) -> Tuple[bool, Optional[str], dict]:
"""
Check quota and rate limit.
Returns:
(allowed, warning_message, quota_status)
"""
tier = QUOTA_TIERS.get(tier_name, QUOTA_TIERS["free"])
# Check per-minute rate limit
rate_ok, rate_remaining = self._check_rate_limit(user_id, tier)
if not rate_ok:
return False, None, {
"blocked_reason": "rate_limit",
"retry_after_seconds": 60,
"message_ja": "リクエストが多すぎます。1分後にお試しください。",
}
# Check daily quota
today = time.strftime("%Y-%m-%d")
quota_key = f"quota:{user_id}:{today}"
usage = self.redis.hgetall(quota_key)
used_input = int(usage.get("input_tokens", 0))
used_output = int(usage.get("output_tokens", 0))
used_cost = float(usage.get("cost_usd", 0.0))
input_pct = (used_input / tier.daily_input_tokens) * 100
output_pct = (used_output / tier.daily_output_tokens) * 100
cost_pct = (used_cost / tier.daily_cost_limit_usd) * 100
max_pct = max(input_pct, output_pct, cost_pct)
status = {
"input_used": used_input,
"input_limit": tier.daily_input_tokens,
"input_pct": round(input_pct, 1),
"output_used": used_output,
"output_limit": tier.daily_output_tokens,
"output_pct": round(output_pct, 1),
"cost_used_usd": round(used_cost, 4),
"cost_limit_usd": tier.daily_cost_limit_usd,
"rate_remaining": rate_remaining,
}
# Hard limit
if max_pct >= 100:
return False, None, {
**status,
"blocked_reason": "daily_quota",
"message_ja": "本日のご利用上限に達しました。明日またお越しください。",
"message_en": "Daily quota reached. Please come back tomorrow.",
}
# Progressive warnings
warning = None
if tier.warn_at_90 and max_pct >= 90:
warning = (
f"本日の利用量が90%に達しました。残り約{int(tier.daily_input_tokens - used_input):,}トークンです。"
)
elif tier.warn_at_80 and max_pct >= 80:
warning = (
f"本日の利用量が80%を超えました。ご利用はあと{int(tier.daily_input_tokens - used_input):,}トークンまでです。"
)
return True, warning, status
def _check_rate_limit(
self, user_id: str, tier: QuotaTier
) -> Tuple[bool, int]:
"""
Sliding window rate limit per user.
Returns (allowed, remaining_requests).
"""
window_key = f"rate:{user_id}:{int(time.time()) // 60}"
pipe = self.redis.pipeline()
pipe.incr(window_key)
pipe.expire(window_key, 120) # Keep for 2 minutes
results = pipe.execute()
current_count = results[0]
remaining = max(0, tier.max_requests_per_minute - current_count)
return current_count <= tier.max_requests_per_minute, remaining
def record_usage(
self,
user_id: str,
input_tokens: int,
output_tokens: int,
cost_usd: float,
) -> None:
"""Record token usage after a request completes."""
today = time.strftime("%Y-%m-%d")
quota_key = f"quota:{user_id}:{today}"
pipe = self.redis.pipeline()
pipe.hincrby(quota_key, "input_tokens", input_tokens)
pipe.hincrby(quota_key, "output_tokens", output_tokens)
pipe.hincrbyfloat(quota_key, "cost_usd", cost_usd)
pipe.hincrby(quota_key, "request_count", 1)
pipe.expire(quota_key, 172800) # 48h expiry (covers timezone edge)
pipe.execute()
# Check if we should alert ops
used_cost = float(self.redis.hget(quota_key, "cost_usd") or 0)
if used_cost > 10.0:
logger.warning(
f"High-cost user detected: {user_id} spent ${used_cost:.2f} today"
)
Prevention
- Per-user rate limiting via Redis sliding window counters (10-60 req/min by tier)
- Progressive quota warnings at 80% and 90% thresholds sent as metadata in responses
- Hard output token cap on all API keys, including admin (max 2048 tokens)
- Anomaly detection alarm when single-user cost exceeds $10/day
- Graceful quota exhaustion messages in Japanese with clear "try again tomorrow" guidance
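The anomaly-detection alarm above needs a per-user cost metric to fire on. A sketch of publishing one through a passed-in CloudWatch client (the namespace, metric, and dimension names are assumptions):

```python
def publish_user_cost(cw_client, user_id: str, cost_usd: float) -> None:
    """Publish per-user daily cost so a '>$10/day' CloudWatch alarm can fire."""
    cw_client.put_metric_data(
        Namespace="MangaAssist/Quota",            # illustrative namespace
        MetricData=[{
            "MetricName": "UserDailyCostUSD",
            "Dimensions": [{"Name": "UserId", "Value": user_id}],
            "Value": cost_usd,
            "Unit": "None",
        }],
    )
```

Note that a dimension per user creates high-cardinality custom metrics; at 1M messages/day it may be cheaper to publish only users who cross a threshold, or to emit structured logs and alarm on a metric filter instead.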
Scenario 3: Retry Storm
Problem
Amazon Bedrock experiences a 2-minute regional throttling event on Claude 3 Sonnet. MangaAssist's retry logic, running across 50 ECS Fargate tasks, begins retrying simultaneously with identical exponential backoff timing. The retry storm amplifies the original throttling 3x, extends the outage from 2 minutes to 15 minutes, and generates 450,000 failed requests.
Detection
graph TB
subgraph Metrics["Metric Cascade"]
THROTTLE[Bedrock ThrottlingException<br/>rate: 200/sec → 600/sec]
RETRY_COUNT[Retry Counter<br/>0 → 150,000 in 2 min]
CPU[ECS CPU Utilization<br/>30% → 85%]
QUEUE[Request Queue Depth<br/>0 → 50,000]
TIMEOUT[Client Timeout Rate<br/>0.1% → 45%]
end
subgraph Timeline["Storm Timeline"]
T0["T+0: Bedrock throttling begins<br/>ThrottlingException 200/sec"]
T1["T+30s: All 50 tasks retry simultaneously<br/>synchronized at 1s backoff"]
T2["T+60s: Second retry wave<br/>2s backoff, still synchronized"]
T3["T+120s: Bedrock recovers, but<br/>retry queue overwhelms again"]
T4["T+900s: Queue finally drains<br/>normal operation resumes"]
end
THROTTLE --> T0
RETRY_COUNT --> T1
CPU --> T2
QUEUE --> T3
TIMEOUT --> T4
T0 --> T1 --> T2 --> T3 --> T4
style T1 fill:#dc3545,color:#fff
style T2 fill:#dc3545,color:#fff
style T3 fill:#ffc107,color:#000
Root Cause
The retry implementation had three critical flaws:
- No jitter on backoff delays — All 50 tasks calculated identical retry times (1s, 2s, 4s, 8s), creating synchronized retry waves
- No circuit breaker — Even after 100+ failures, each task kept retrying individually
- Client-side retries compounded server-side retries — The WebSocket client had its own retry logic, doubling the request volume
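The synchronization flaw is easy to see in isolation. A sketch comparing fixed backoff with the full-jitter formula used in the fix (the numbers are illustrative):

```python
import random

def no_jitter_delay(attempt: int, base: float = 1.0) -> float:
    """Deterministic exponential backoff — every task computes the same delay."""
    return base * (2 ** attempt)

def full_jitter_delay(attempt: int, base: float = 0.5, cap: float = 16.0) -> float:
    """Full jitter: uniform over (0, min(base * 2^attempt, cap))."""
    return random.uniform(0.0, min(base * (2 ** attempt), cap))

# Simulate 50 tasks computing their first retry delay:
fixed = {no_jitter_delay(1) for _ in range(50)}                  # collapses to one value
jittered = {round(full_jitter_delay(1), 3) for _ in range(50)}   # spreads across (0, 1.0]
```

With no jitter every task lands on the same instant (a synchronized wave); with full jitter the fleet's retries smear across the whole interval.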
Resolution
"""
MangaAssist Anti-Retry-Storm Implementation
Coordinated retry with jitter, circuit breaker, and client backpressure.
"""
import time
import random
import logging
from typing import Optional
logger = logging.getLogger(__name__)
class CoordinatedRetryManager:
"""
Retry manager that prevents retry storms through:
1. Full jitter on backoff delays
2. Global circuit breaker (shared via Redis)
3. Client backpressure signaling
4. Retry budget per time window
"""
    def __init__(self, redis_client):
        # Assumes a redis client created with decode_responses=True so
        # hget/hgetall return str rather than bytes.
        self.redis = redis_client
def should_retry(
self, model_id: str, attempt: int, max_attempts: int = 3
) -> tuple[bool, float]:
"""
Determine if a retry should be attempted.
Returns (should_retry, delay_seconds).
Uses GLOBAL retry budget to prevent storm across all tasks.
"""
if attempt >= max_attempts:
return False, 0.0
# Check global circuit breaker
circuit_state = self._get_global_circuit(model_id)
if circuit_state == "open":
logger.warning(f"Global circuit OPEN for {model_id} — no retry")
return False, 0.0
# Check global retry budget (max 100 retries per 10-second window)
window = int(time.time()) // 10
budget_key = f"retry_budget:{model_id}:{window}"
current = self.redis.incr(budget_key)
self.redis.expire(budget_key, 30)
if current > 100: # Global retry budget exhausted
logger.warning(
f"Global retry budget exhausted for {model_id} "
f"({current} retries in window)"
)
return False, 0.0
        # Full jitter: delay = uniform(0, min(base * 2^attempt, cap)),
        # with a small 0.1s floor so retries never fire back-to-back.
        # Randomized delays keep all 50 tasks desynchronized.
        max_delay = min(0.5 * (2 ** attempt), 16.0)
        delay = random.uniform(0.1, max_delay)
return True, delay
def record_failure(self, model_id: str) -> None:
"""Record a failure and potentially open circuit."""
window = int(time.time()) // 30 # 30-second failure window
fail_key = f"failures:{model_id}:{window}"
pipe = self.redis.pipeline()
pipe.incr(fail_key)
pipe.expire(fail_key, 60)
results = pipe.execute()
failure_count = results[0]
if failure_count >= 50: # 50 failures in 30s = open circuit
self._open_global_circuit(model_id)
def record_success(self, model_id: str) -> None:
"""Record success — may close circuit if in half-open."""
circuit_key = f"circuit_global:{model_id}"
state = self.redis.hget(circuit_key, "state")
if state == "half_open":
successes = self.redis.hincrby(circuit_key, "half_open_success", 1)
if successes >= 3:
self.redis.delete(circuit_key)
logger.info(f"Global circuit CLOSED for {model_id}")
def _get_global_circuit(self, model_id: str) -> str:
"""Get global circuit breaker state."""
circuit_key = f"circuit_global:{model_id}"
data = self.redis.hgetall(circuit_key)
if not data:
return "closed"
state = data.get("state", "closed")
opened_at = float(data.get("opened_at", 0))
if state == "open" and time.time() - opened_at > 30:
# Transition to half-open after 30 seconds
self.redis.hset(circuit_key, "state", "half_open")
return "half_open"
return state
def _open_global_circuit(self, model_id: str) -> None:
"""Open the global circuit breaker."""
circuit_key = f"circuit_global:{model_id}"
self.redis.hset(circuit_key, mapping={
"state": "open",
"opened_at": str(time.time()),
})
self.redis.expire(circuit_key, 120)
logger.critical(
f"GLOBAL CIRCUIT OPEN for {model_id} — all retries halted"
)
def get_client_backpressure_signal(self, model_id: str) -> dict:
"""
Generate backpressure signal for WebSocket clients.
Tells clients to stop retrying and wait.
"""
circuit_state = self._get_global_circuit(model_id)
if circuit_state == "open":
return {
"type": "backpressure",
"action": "pause",
"retryAfter": 30,
"message_ja": "サーバーが混雑しています。30秒後に自動的に再接続します。",
}
elif circuit_state == "half_open":
return {
"type": "backpressure",
"action": "slow",
"retryAfter": 5,
"message_ja": "接続を回復中です。しばらくお待ちください。",
}
return {"type": "backpressure", "action": "none"}
Prevention
- Full jitter on backoff — `random(0, base * 2^attempt)` ensures zero synchronization across tasks
- Global circuit breaker in Redis — shared across all 50 ECS tasks, not per-task
- Global retry budget — max 100 retries per model per 10-second window across the fleet
- Client backpressure signals — WebSocket sends `pause` messages telling clients to stop retrying for 30 seconds
- Separate Bedrock throughput provisioning for peak hours (Wednesday 00:00 JST)
Scenario 4: Session Corruption
Problem
Users report garbled conversations: the assistant responds to questions they never asked, or previous conversation context is missing. Investigation reveals that when a user switches from mobile to desktop, both devices briefly hold active WebSocket connections to the same session. Concurrent writes to DynamoDB create a last-writer-wins race condition, and the Redis cache serves stale data.
Detection
graph TB
subgraph Symptoms["User-Reported Symptoms"]
S1["Assistant answers wrong question"]
S2["Conversation history missing turns"]
S3["Duplicate messages in history"]
S4["Session suddenly resets"]
end
subgraph Investigation["Investigation Signals"]
DDB_WRITES[DynamoDB Write Metrics<br/>ConditionalCheckFailedRequests > 0]
REDIS_STALE[Redis TTL Check<br/>Cache age > DDB lastModified]
CONN_COUNT[Connection Count<br/>2+ active connections per session]
XRAY_RACE[X-Ray Traces<br/>Overlapping PutItem calls]
end
S1 --> REDIS_STALE
S2 --> DDB_WRITES
S3 --> CONN_COUNT
S4 --> XRAY_RACE
style S1 fill:#dc3545,color:#fff
style CONN_COUNT fill:#ffc107,color:#000
Root Cause
- No optimistic concurrency on DynamoDB session writes — `PutItem` overwrites without version checking
- Redis cache not invalidated on DynamoDB updates from a different connection
- No single-writer enforcement — multiple connections writing to the same session simultaneously
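The conditional-write idea in the resolution reduces to a few lines. An in-memory sketch of version-checked writes, where a plain dict stands in for DynamoDB:

```python
# Two devices both read version 1, then race; only the first write succeeds.
store = {"s1": {"history": ["turn-1"], "version": 1}}

def versioned_write(session_id: str, history: list, expected_version: int) -> bool:
    """Succeed only if the stored version matches the caller's expectation."""
    item = store[session_id]
    if item["version"] != expected_version:   # mirrors ConditionExpression "#ver = :expected_ver"
        return False                          # ConditionalCheckFailedException in DynamoDB
    store[session_id] = {"history": history, "version": expected_version + 1}
    return True

assert versioned_write("s1", ["turn-1", "mobile-turn"], expected_version=1)       # mobile wins
assert not versioned_write("s1", ["turn-1", "desktop-turn"], expected_version=1)  # desktop must reload
```

The losing writer reloads the session, merges its turn, and retries at the new version instead of silently clobbering the other device's write.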
Resolution
"""
MangaAssist Session Corruption Fix
Optimistic concurrency control + single-writer enforcement.
"""
import time
import json
import logging
from typing import Optional
import boto3
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
dynamodb = boto3.resource("dynamodb")
sessions_table = dynamodb.Table("MangaAssist-Sessions")
class ConcurrentSessionManager:
"""
Session manager with optimistic concurrency control.
Uses DynamoDB version numbers and Redis cache invalidation.
"""
    def __init__(self, redis_client):
        # Assumes a redis client created with decode_responses=True so
        # GET returns str (the lock-holder comparisons rely on this).
        self.redis = redis_client
def acquire_write_lock(
self, session_id: str, connection_id: str, ttl: int = 10
) -> bool:
"""
Acquire exclusive write lock for a session.
Only one connection can write at a time.
Uses Redis SET NX (set-if-not-exists).
"""
lock_key = f"session_lock:{session_id}"
acquired = self.redis.set(
lock_key, connection_id, nx=True, ex=ttl
)
if not acquired:
# Check if WE already hold the lock
current_holder = self.redis.get(lock_key)
if current_holder == connection_id:
self.redis.expire(lock_key, ttl)
return True
logger.warning(
f"Session {session_id} write lock held by {current_holder}, "
f"requested by {connection_id}"
)
return False
return True
    def release_write_lock(self, session_id: str, connection_id: str) -> None:
        """Release write lock only if we hold it."""
        # Note: GET-then-DELETE is not atomic; a Lua script (compare token,
        # then DEL) would close the remaining small race window.
        lock_key = f"session_lock:{session_id}"
        current = self.redis.get(lock_key)
        if current == connection_id:
            self.redis.delete(lock_key)
def update_session_safe(
self,
session_id: str,
connection_id: str,
conversation_history: list[dict],
expected_version: int,
) -> tuple[bool, int]:
"""
Update session with optimistic concurrency control.
Uses DynamoDB ConditionExpression to ensure version matches.
Returns (success, new_version).
"""
# Try to acquire write lock
if not self.acquire_write_lock(session_id, connection_id):
return False, expected_version
try:
new_version = expected_version + 1
sessions_table.update_item(
Key={"sessionId": session_id},
UpdateExpression=(
"SET conversationHistory = :history, "
"lastActivity = :ts, "
"activeConnectionId = :connId, "
"#ver = :new_ver"
),
ConditionExpression="#ver = :expected_ver",
ExpressionAttributeNames={"#ver": "version"},
ExpressionAttributeValues={
":history": json.dumps(conversation_history, ensure_ascii=False),
":ts": int(time.time()),
":connId": connection_id,
":new_ver": new_version,
":expected_ver": expected_version,
},
)
# Invalidate Redis cache so other connections fetch fresh data
self._invalidate_cache(session_id)
# Update cache with new data
self._update_cache(
session_id, conversation_history, new_version
)
return True, new_version
except ClientError as e:
if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
logger.warning(
f"Version conflict on session {session_id}: "
f"expected version {expected_version}"
)
# Reload from DynamoDB to get current version
return False, self._get_current_version(session_id)
raise
finally:
self.release_write_lock(session_id, connection_id)
def handle_device_switch(
self, session_id: str, old_connection_id: str, new_connection_id: str
) -> dict:
"""
Handle graceful device switch.
Notifies old connection and transfers session ownership.
"""
# Force-release lock from old connection
lock_key = f"session_lock:{session_id}"
current_holder = self.redis.get(lock_key)
if current_holder == old_connection_id:
self.redis.delete(lock_key)
# Update active connection in DynamoDB
try:
sessions_table.update_item(
Key={"sessionId": session_id},
UpdateExpression=(
"SET activeConnectionId = :new_conn, "
"lastDeviceSwitch = :ts"
),
ExpressionAttributeValues={
":new_conn": new_connection_id,
":ts": int(time.time()),
},
)
except ClientError as e:
logger.error(f"Device switch failed: {e}")
raise
# Invalidate cache to force fresh load
self._invalidate_cache(session_id)
return {
"status": "transferred",
"sessionId": session_id,
"newConnectionId": new_connection_id,
}
def _invalidate_cache(self, session_id: str) -> None:
"""Remove stale cache entry."""
self.redis.delete(f"session:{session_id}")
def _update_cache(
self, session_id: str, history: list[dict], version: int
) -> None:
"""Update Redis cache with versioned data."""
data = {
"conversationHistory": history,
"version": version,
"cachedAt": time.time(),
}
self.redis.setex(
f"session:{session_id}",
1800,
json.dumps(data, ensure_ascii=False),
)
def _get_current_version(self, session_id: str) -> int:
"""Get current version from DynamoDB."""
response = sessions_table.get_item(
Key={"sessionId": session_id},
ProjectionExpression="#ver",
ExpressionAttributeNames={"#ver": "version"},
)
return int(response.get("Item", {}).get("version", 0))
Prevention
- Optimistic concurrency with DynamoDB `ConditionExpression` on a version counter
- Distributed write locks via Redis `SET NX` to enforce single-writer per session
- Cache invalidation on every write — delete Redis cache before updating, then repopulate
- Graceful device switch protocol — old connection receives a "session transferred" message and stops writing
- Connection count monitoring — CloudWatch alarm when a session has 2+ concurrent connections
Scenario 5: API Version Mismatch
Problem
After a Bedrock service update, Claude 3 Sonnet responses start including a new usage field structure in the message_delta event. The MangaAssist stream parser expects usage.output_tokens as an integer but receives a nested object usage.output_tokens.total. All token counting breaks, quotas are not enforced, and the cost dashboard shows zero usage while actual costs accumulate.
Detection
graph TB
subgraph Symptoms["Visible Symptoms"]
S1["CloudWatch: token_count metric = 0<br/>for all requests"]
S2["Quota system: no users blocked<br/>despite heavy traffic"]
S3["Cost dashboard: $0.00 daily<br/>but AWS bill shows $2,400"]
S4["Logs: KeyError 'output_tokens'<br/>in stream parser"]
end
subgraph Root["Root Cause Chain"]
R1["Bedrock API update<br/>Changed usage response format"]
R2["Stream parser expects int<br/>Gets nested object"]
R3["Exception caught silently<br/>Defaults to 0 tokens"]
R4["All downstream tracking<br/>reports zero usage"]
end
S1 --> R3
S2 --> R4
S3 --> R4
S4 --> R2
R1 --> R2 --> R3 --> R4
style S3 fill:#dc3545,color:#fff
style R1 fill:#ffc107,color:#000
Root Cause
- Brittle response parsing — The stream parser accessed `chunk["usage"]["output_tokens"]` directly without defensive type checking
- Silent exception handling — `except Exception: pass` in the token counting code masked the KeyError
- No integration test against actual Bedrock responses — unit tests used mocked fixtures that did not reflect the API change
- No schema validation on Bedrock response events
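The missing regression coverage is cheap to add: record real response fixtures and assert the parser handles every known usage shape. A sketch, where `parse_output_tokens` is a minimal stand-in rather than the production parser:

```python
def parse_output_tokens(usage: dict) -> int:
    """Minimal stand-in for defensive token extraction."""
    value = usage.get("output_tokens", 0)
    if isinstance(value, dict):                      # nested post-update shape
        value = value.get("total", value.get("value", 0))
    try:
        return int(value)
    except (TypeError, ValueError):
        return 0

# Fixtures mirroring the incident: flat int (old), nested object (new), missing.
FIXTURES = [
    ({"output_tokens": 342}, 342),
    ({"output_tokens": {"total": 342}}, 342),
    ({}, 0),
]
for usage, expected in FIXTURES:
    assert parse_output_tokens(usage) == expected
```

Because the fixtures are recorded from live responses rather than hand-written mocks, a schema change breaks this test instead of silently zeroing the metrics.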
Resolution
"""
MangaAssist Resilient Bedrock Response Parser
Handles API version changes gracefully with defensive parsing.
"""
import json
import logging
from typing import Optional, Any
logger = logging.getLogger(__name__)
class BedrockResponseParser:
"""
Resilient parser for Bedrock streaming response events.
Handles schema changes gracefully with fallback extraction.
"""
# Known response format versions
KNOWN_FORMATS = {"bedrock-2023-05-31", "bedrock-2024-01-01"}
def extract_usage(self, chunk_data: dict) -> dict:
"""
Extract token usage from a message_delta or message_start event.
Handles multiple API response formats defensively.
"""
usage = chunk_data.get("usage", {})
if not usage:
# Try alternate locations
usage = chunk_data.get("message", {}).get("usage", {})
input_tokens = self._extract_token_count(usage, "input_tokens")
output_tokens = self._extract_token_count(usage, "output_tokens")
# Validate: tokens should be non-negative integers
if input_tokens < 0 or output_tokens < 0:
logger.error(
f"Invalid token counts: input={input_tokens}, output={output_tokens}"
)
input_tokens = max(0, input_tokens)
output_tokens = max(0, output_tokens)
# Anomaly detection: flag suspiciously high or zero counts
if output_tokens == 0 and chunk_data.get("type") == "message_delta":
logger.warning(
"Zero output tokens in message_delta — possible schema change. "
f"Raw usage: {json.dumps(usage)}"
)
return {
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"raw_usage": usage, # Preserve raw data for debugging
}
def _extract_token_count(self, usage: dict, field: str) -> int:
"""
Extract token count from usage dict, handling multiple formats.
Supported formats:
- {"output_tokens": 342} (original)
- {"output_tokens": {"total": 342}} (nested)
- {"output_tokens": {"value": 342}} (alternate nested)
- {"outputTokens": 342} (camelCase variant)
"""
value = usage.get(field)
if value is None:
# Try camelCase variant
camel = self._to_camel_case(field)
value = usage.get(camel)
if value is None:
return 0
# Direct integer
if isinstance(value, (int, float)):
return int(value)
# Nested object with "total" or "value"
if isinstance(value, dict):
for key in ("total", "value", "count"):
if key in value:
return int(value[key])
# Unknown nested format — log and return 0
logger.error(
f"Unknown nested format for {field}: {json.dumps(value)}"
)
return 0
# String that might be numeric
if isinstance(value, str):
try:
return int(value)
except ValueError:
logger.error(f"Non-numeric string for {field}: {value}")
return 0
logger.error(f"Unexpected type for {field}: {type(value).__name__}")
return 0
def _to_camel_case(self, snake_str: str) -> str:
"""Convert snake_case to camelCase."""
parts = snake_str.split("_")
return parts[0] + "".join(p.capitalize() for p in parts[1:])
def validate_stream_event(self, event_data: dict) -> bool:
"""
Validate a stream event against expected schema.
Returns True if the event is recognized, False if unknown.
"""
known_types = {
"message_start", "content_block_start", "content_block_delta",
"content_block_stop", "message_delta", "message_stop",
}
event_type = event_data.get("type")
if event_type not in known_types:
logger.warning(
f"Unknown stream event type: {event_type}. "
f"Keys: {list(event_data.keys())}"
)
return False
return True
def parse_stream_event(self, raw_bytes: bytes) -> Optional[dict]:
"""
Parse a raw stream event with error handling.
Returns parsed dict or None if unparseable.
"""
try:
data = json.loads(raw_bytes)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse stream event: {e}")
return None
if not self.validate_stream_event(data):
# Still return it — caller may know how to handle new types
pass
return data
def setup_integration_health_check(redis_client) -> None:
"""
Periodic health check that validates Bedrock response format.
Run every 5 minutes via ECS task scheduler.
"""
import boto3
from botocore.config import Config
bedrock = boto3.client(
"bedrock-runtime",
config=Config(region_name="ap-northeast-1"),
)
test_body = json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 10,
"messages": [{"role": "user", "content": [{"type": "text", "text": "Hi"}]}],
})
try:
response = bedrock.invoke_model_with_response_stream(
modelId="anthropic.claude-3-haiku-20240307-v1:0",
contentType="application/json",
accept="application/json",
body=test_body,
)
parser = BedrockResponseParser()
found_usage = False
for event in response["body"]:
chunk = event.get("chunk")
if not chunk:
continue
data = json.loads(chunk["bytes"])
if data.get("type") == "message_delta":
usage = parser.extract_usage(data)
if usage["output_tokens"] > 0:
found_usage = True
else:
# Alert: possible schema change
logger.critical(
"HEALTH CHECK FAILED: message_delta returned 0 output tokens. "
f"Raw: {json.dumps(data)}"
)
redis_client.set("bedrock_schema_alert", "true", ex=3600)
if found_usage:
redis_client.set("bedrock_health", "ok", ex=600)
logger.info("Bedrock integration health check: OK")
else:
redis_client.set("bedrock_health", "degraded", ex=600)
logger.warning("Bedrock integration health check: DEGRADED")
except Exception as e:
redis_client.set("bedrock_health", "error", ex=600)
logger.error(f"Bedrock health check failed: {e}")
Prevention
- Defensive parsing — Never assume field types; check for int, dict, and string variants
- Never silently swallow exceptions in token counting — log at ERROR level and alert
- Integration health check every 5 minutes that validates actual Bedrock response format
- Schema change alerts — CloudWatch alarm when the health check detects unexpected formats
- Raw response logging (sampled) — Store 1% of raw Bedrock responses in S3 for forensic analysis
- Pin the `anthropic_version` request-body parameter in all Bedrock requests to maintain API compatibility
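The sampled raw-response logging in the list above can be sketched as follows (the bucket name and key layout are assumptions, and a production version would batch writes rather than issue one PutObject per response):

```python
import json
import random
import time

def maybe_log_raw_response(s3_client, raw_event: dict, sample_rate: float = 0.01) -> bool:
    """Store a sampled raw Bedrock event in S3 for forensic analysis.

    Returns True if this event was sampled and written.
    """
    if random.random() >= sample_rate:
        return False
    # Date-partitioned key so Athena/S3 Select queries stay cheap.
    key = f"bedrock-raw/{time.strftime('%Y/%m/%d')}/{int(time.time() * 1000)}.json"
    s3_client.put_object(
        Bucket="mangaassist-forensics",           # illustrative bucket name
        Key=key,
        Body=json.dumps(raw_event, ensure_ascii=False).encode("utf-8"),
    )
    return True
```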
Key Takeaways
| # | Takeaway | MangaAssist Application |
|---|---|---|
| 1 | The 29-second REST timeout is architectural — Design around it with WebSocket for streaming and parallel operations to compress latency within the limit. | MangaAssist uses WebSocket for all user-facing chat; REST serves only as a degraded fallback plus health checks and admin APIs. |
| 2 | Token quotas need progressive warnings — Hard walls with no warning create terrible UX. Warn at 80% and 90% so users can self-regulate. | Japanese-language quota warnings are sent as metadata in chat responses: "本日の利用量が90%に達しました". |
| 3 | Retry storms are worse than the original failure — Synchronized retries across a fleet amplify throttling. Full jitter + global retry budgets are essential. | Global Redis-based circuit breaker and retry budget prevent MangaAssist's 50 ECS tasks from creating synchronized retry waves. |
| 4 | Session writes need optimistic concurrency — Multi-device users create race conditions that corrupt conversation history. DynamoDB conditional writes + version counters fix this. | Device switch protocol notifies old connections and transfers write ownership before the new device starts writing. |
| 5 | Never trust API response schemas to be stable — Parse defensively, validate types, and run integration health checks against live APIs every few minutes. | Bedrock response parser handles int, nested dict, and string formats for token counts; health check runs every 5 minutes. |