Scenarios and Runbooks — Accessible AI Interfaces
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Field | Value |
|---|---|
| Domain | 2 — Implementation & Integration |
| Task | 2.5 — Application Integration Patterns |
| Skill | 2.5.2 — Accessible AI Interfaces |
| Focus | Amplify WebSocket failing on mobile, OpenAPI spec drift, Prompt Flow timeout, stale cached response, unexpected prompt chains |
| MangaAssist Relevance | Production incidents in the frontend, API contract, and prompt flow layers |
Mind Map
mindmap
root((Accessible AI Scenarios & Runbooks))
Scenario 1
Amplify WebSocket Failing on Mobile
iOS Safari WebSocket bugs
Carrier NAT timeout
Service worker interference
Scenario 2
OpenAPI Spec Drift
Spec vs implementation mismatch
Missing new fields
Breaking partner SDKs
Scenario 3
Prompt Flow Timeout
Complex flow chain
Lambda cold start in flow
Bedrock queue depth
Scenario 4
Stale Cached Response
Redis serving old data
CDN caching API responses
Cache invalidation failure
Scenario 5
Unexpected Prompt Chains
Recursive flow invocation
Condition node infinite loop
Token budget explosion
Scenario 1: Amplify WebSocket Failing on Mobile
Problem
MangaAssist mobile users on Japanese carriers (NTT Docomo, au, SoftBank) report intermittent chat failures. The chat widget shows "接続中..." (Connecting...) indefinitely, then falls back to "接続できませんでした" (Connection failed). The issue affects roughly 15% of mobile sessions, primarily on iOS Safari. Desktop Chrome and Firefox work fine.
Detection
graph TB
subgraph Signals["Detection Signals"]
ANALYTICS[Amplify Analytics<br/>Mobile connection success: 85%<br/>Desktop connection success: 99.5%]
CW_WS[API Gateway WebSocket<br/>ConnectCount stable<br/>DisconnectCount spike on mobile]
SUPPORT["Support Tickets<br/>iOS users: 接続できない (cannot connect)"]
RUM[Real User Monitoring<br/>iOS Safari TTFB: 8.2s avg<br/>Chrome mobile: 1.1s avg]
end
subgraph RootCause["Root Cause Analysis"]
RC1["iOS Safari WebSocket<br/>idle timeout: 60s vs 10min"]
RC2["Carrier NAT tables<br/>drop idle TCP after 30s"]
RC3["Service worker<br/>intercepting WS upgrade"]
RC4["Mixed content<br/>wss:// blocked by CSP"]
end
ANALYTICS --> RC1
CW_WS --> RC2
SUPPORT --> RC3
RUM --> RC4
style ANALYTICS fill:#dc3545,color:#fff
style RC2 fill:#ffc107,color:#000
Root Cause
Three compounding issues specific to mobile networks in Japan:
- Japanese carrier NAT timeout — NTT Docomo and au aggressively recycle NAT table entries, dropping idle TCP connections after 30 seconds (vs. typical 5-minute timeout on WiFi)
- iOS Safari WebSocket handling — Safari on iOS has stricter WebSocket idle detection and terminates connections that do not receive data frames within 60 seconds
- Service worker interference — The Amplify PWA service worker was intercepting the WebSocket upgrade request, causing it to fail silently on some iOS versions
Resolution
"""
MangaAssist Mobile WebSocket Fix
Addresses carrier NAT timeout, iOS Safari quirks, and service worker interference.
"""
import json
import time
import logging
from dataclasses import dataclass
from typing import Optional
logger = logging.getLogger(__name__)
@dataclass
class MobileWebSocketConfig:
"""Optimized WebSocket config for Japanese mobile carriers."""
# Ping interval shorter than carrier NAT timeout (30s)
ping_interval_seconds: int = 20
# Pong timeout — if no pong in 5s, assume connection dead
pong_timeout_seconds: int = 5
# Reconnection settings
reconnect_base_delay: float = 0.5
reconnect_max_delay: float = 10.0
max_reconnect_attempts: int = 10
# iOS Safari specific
force_close_on_background: bool = True
reconnect_on_foreground: bool = True
# Disable service worker for WebSocket paths
service_worker_ws_bypass: bool = True
class MobileOptimizedWebSocket:
"""
WebSocket client optimized for Japanese mobile carriers.
Addresses NAT timeout, iOS Safari, and service worker issues.
"""
def __init__(self, config: MobileWebSocketConfig):
self.config = config
self._last_pong = 0
self._ping_task = None
self._visibility_listener = None
def get_connection_params(self) -> dict:
"""
Generate WebSocket connection parameters optimized for mobile.
"""
return {
# Use binary frames for ping/pong (more reliable on mobile)
"ping_type": "application", # Application-level ping, not WebSocket frame
"ping_interval": self.config.ping_interval_seconds,
"protocols": ["manga-assist-v2"],
"headers": {
"X-Client-Type": "mobile",
"X-Carrier-Hint": "jp-mobile",
},
}
def create_ping_handler(self) -> dict:
"""
Application-level ping handler.
Uses JSON messages rather than WebSocket ping frames
because some Japanese carrier proxies strip WS control frames.
"""
return {
"type": "ping_config",
"interval_ms": self.config.ping_interval_seconds * 1000,
"timeout_ms": self.config.pong_timeout_seconds * 1000,
"message": json.dumps({"action": "ping", "ts": int(time.time())}),
"expected_response": "pong",
"on_timeout": "reconnect",
}
def get_service_worker_config(self) -> dict:
"""
Service worker configuration that bypasses WebSocket URLs.
Prevents the SW from intercepting WS upgrade requests.
"""
return {
"navigationPreload": True,
"bypassPatterns": [
# Never intercept WebSocket connections
r"wss?://.*manga-assist\.example\.com.*",
# Never intercept API Gateway Management API callbacks
r"https://.*execute-api.*amazonaws\.com.*",
],
"cachingStrategy": {
"/": "network-first",
"/static/*": "cache-first",
"/api/*": "network-only",
},
}
def get_visibility_handler_config(self) -> dict:
"""
Page visibility change handler for iOS Safari.
iOS aggressively suspends background tabs, killing WS connections.
We proactively close and reconnect on visibility change.
"""
return {
"on_hidden": {
"action": "close_gracefully" if self.config.force_close_on_background else "keep_alive",
"save_state": True,
"reason": "Proactive close to avoid zombie connections",
},
"on_visible": {
"action": "reconnect" if self.config.reconnect_on_foreground else "check_connection",
"delay_ms": 100,
"restore_state": True,
},
}
def generate_lambda_authorizer_mobile_aware() -> str:
"""
Lambda authorizer that recognizes mobile clients
and adjusts connection TTL accordingly.
"""
return '''
import json
def handler(event, context):
"""
Mobile-aware WebSocket authorizer.
Sets shorter connection idle timeout for mobile clients.
"""
headers = event.get("headers", {})
client_type = headers.get("x-client-type", "desktop")
query = event.get("queryStringParameters", {}) or {}
token = query.get("token", "")
# Validate token (simplified)
if not token:
return generate_policy("anonymous", "Deny", event)
# Set context for downstream handlers
context_data = {
"clientType": client_type,
"idleTimeoutMs": "30000" if client_type == "mobile" else "600000",
"pingRequired": "true" if client_type == "mobile" else "false",
}
policy = generate_policy("user123", "Allow", event)
policy["context"] = context_data
return policy
def generate_policy(principal_id, effect, event):
return {
"principalId": principal_id,
"policyDocument": {
"Version": "2012-10-17",
"Statement": [{
"Action": "execute-api:Invoke",
"Effect": effect,
"Resource": event["methodArn"],
}],
},
}
'''
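The configuration above covers the server and service-worker side; the client still has to drive the application-level ping loop. Below is a minimal smoke-test sketch using the third-party websockets library, with a placeholder staging URL and an assumed {"action": "pong"} reply format; it is a verification aid, not the production client.
"""Smoke test: keep a mobile-profile connection alive via application-level pings."""
import asyncio
import json
import time

import websockets  # pip install websockets

STAGING_WSS_URL = "wss://staging.manga-assist.example.com/ws?token=TEST"  # placeholder endpoint

async def ping_smoke_test(interval: float = 20.0, duration: float = 120.0) -> None:
    # Subprotocol mirrors get_connection_params(); adjust to the real value.
    async with websockets.connect(STAGING_WSS_URL, subprotocols=["manga-assist-v2"]) as ws:
        start = time.time()
        while time.time() - start < duration:
            # JSON ping, not a WebSocket control frame, so carrier proxies cannot strip it
            await ws.send(json.dumps({"action": "ping", "ts": int(time.time())}))
            # 5s matches pong_timeout_seconds in MobileWebSocketConfig
            reply = await asyncio.wait_for(ws.recv(), timeout=5)
            assert json.loads(reply).get("action") == "pong", f"unexpected reply: {reply}"
            await asyncio.sleep(interval)

if __name__ == "__main__":
    asyncio.run(ping_smoke_test())
Running this for two minutes from a device on carrier data (not WiFi) is enough to confirm that the 20-second ping interval survives the 30-second NAT window.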
Prevention
- Application-level pings every 20 seconds — Shorter than the 30-second carrier NAT timeout
- Service worker bypass for all WebSocket URLs — prevents SW from intercepting upgrade requests
- Proactive close/reconnect on page visibility change — avoids zombie connections when iOS suspends the tab
- Mobile-specific CloudWatch dashboard with carrier-level success rate breakdowns
- Real User Monitoring (RUM) via CloudWatch RUM to track actual connection success rates by device/carrier
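The dashboard and RUM items above need per-carrier data points to aggregate. A minimal sketch of emitting a connection-success metric from the orchestrator with boto3 follows; the namespace, dimension names, and carrier detection are assumptions, not existing MangaAssist names.
"""Emit carrier-level WebSocket connection metrics for the mobile dashboard."""
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="ap-northeast-1")

def record_connection_result(carrier: str, platform: str, success: bool) -> None:
    # One data point per $connect attempt; the dashboard graphs the average
    # (i.e. the success rate) per Carrier/Platform combination.
    cloudwatch.put_metric_data(
        Namespace="MangaAssist/WebSocket",  # assumed custom namespace
        MetricData=[{
            "MetricName": "ConnectionSuccess",
            "Dimensions": [
                {"Name": "Carrier", "Value": carrier},    # e.g. docomo / au / softbank
                {"Name": "Platform", "Value": platform},  # e.g. ios-safari / android-chrome
            ],
            "Value": 1.0 if success else 0.0,
            "Unit": "Count",
        }],
    )
An alarm on the per-carrier average dropping below roughly 0.9 gives earlier warning than support tickets did in this incident.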
Scenario 2: OpenAPI Spec Drift
Problem
A partner integrating with MangaAssist's REST API reports that their auto-generated TypeScript SDK is throwing type errors. The /chat response now includes a metadata.quotaRemaining field that was added by the backend team but never reflected in the OpenAPI spec. The partner's strict TypeScript types reject the response as invalid.
Detection
graph TB
subgraph Signals["Detection Signals"]
PARTNER[Partner Bug Report<br/>TypeScript SDK type error]
SCHEMATHESIS[Contract Test<br/>Schemathesis found<br/>undocumented fields]
DIFF[API Diff Check<br/>Response body mismatch]
REVIEW[Code Review Gap<br/>Backend PR merged<br/>without spec update]
end
subgraph Drift["Drift Analysis"]
D1["New field: metadata.quotaRemaining<br/>Added in backend code"]
D2["OpenAPI spec: not updated<br/>Schema missing field"]
D3["SDK generator: stale types<br/>Partner gets TypeScript errors"]
D4["API Gateway: no validation<br/>Pass-through mode"]
end
PARTNER --> D3
SCHEMATHESIS --> D1
DIFF --> D2
REVIEW --> D4
style PARTNER fill:#dc3545,color:#fff
style REVIEW fill:#ffc107,color:#000
Root Cause
- No spec-code synchronization check in the CI pipeline — backend changes could be merged without updating the OpenAPI spec
- API Gateway in pass-through mode — no request validation enabled, and nothing else in the pipeline checked responses against the spec
- No automated spec-drift detection — Schemathesis contract tests were only run manually
Resolution
"""
MangaAssist OpenAPI Spec Drift Prevention
Automated pipeline to detect and prevent API spec drift.
"""
import json
import subprocess
import sys
import logging
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
class SpecDriftDetector:
"""
Detects drift between OpenAPI spec and actual API responses.
Runs as a CI step and a periodic production check.
"""
def __init__(self, spec_path: str, api_base_url: str):
self.spec_path = spec_path
self.api_base_url = api_base_url
def run_ci_checks(self) -> dict:
"""
CI pipeline checks to prevent spec drift.
Returns dict with check results.
"""
results = {}
# Check 1: Validate spec is valid OpenAPI
results["spec_valid"] = self._validate_spec()
# Check 2: Check for breaking changes vs previous version
results["breaking_changes"] = self._check_breaking_changes()
# Check 3: Verify all API routes in code have spec entries
results["route_coverage"] = self._check_route_coverage()
# Check 4: Run contract tests against staging
results["contract_tests"] = self._run_contract_tests()
results["all_passed"] = all(
r.get("passed", False) for r in results.values()
)
return results
def _validate_spec(self) -> dict:
"""Validate OpenAPI spec with spectral."""
result = subprocess.run(
[
"npx", "@stoplight/spectral-cli", "lint",
self.spec_path,
"--ruleset", ".spectral.yml",
],
capture_output=True, text=True,
)
return {
"passed": result.returncode == 0,
"output": result.stdout,
"errors": result.stderr,
}
def _check_breaking_changes(self) -> dict:
"""Check for breaking changes using oasdiff."""
previous = self.spec_path.replace(".yaml", ".previous.yaml")
if not Path(previous).exists():
return {"passed": True, "message": "No previous spec to compare"}
result = subprocess.run(
[
"oasdiff", "breaking",
"--base", previous,
"--revision", self.spec_path,
"--format", "json",
"--fail-on", "ERR",
],
capture_output=True, text=True,
)
return {
"passed": result.returncode == 0,
"output": result.stdout,
}
def _check_route_coverage(self) -> dict:
"""
Verify all Flask/FastAPI routes have corresponding OpenAPI paths.
Scans source code for route decorators and cross-references spec.
"""
import yaml
with open(self.spec_path) as f:
spec = yaml.safe_load(f)
spec_paths = set(spec.get("paths", {}).keys())
# Scan source for route definitions (FastAPI pattern)
source_routes = set()
for py_file in Path("src").rglob("*.py"):
content = py_file.read_text()
import re
for match in re.finditer(
r'@(?:app|router)\.(get|post|put|delete)\(\s*["\']([^"\']+)',
content,
):
route = match.group(2)
source_routes.add(route)
undocumented = source_routes - spec_paths
orphaned = spec_paths - source_routes
return {
"passed": len(undocumented) == 0,
"undocumented_routes": list(undocumented),
"orphaned_spec_paths": list(orphaned),
}
def _run_contract_tests(self) -> dict:
"""Run Schemathesis contract tests against staging."""
result = subprocess.run(
[
"schemathesis", "run",
self.spec_path,
"--base-url", self.api_base_url,
"--checks", "all",
"--max-examples", "20",
"--hypothesis-suppress-health-check", "too_slow",
],
capture_output=True, text=True, timeout=180,
)
return {
"passed": result.returncode == 0,
"output": result.stdout,
}
def create_git_hook_script() -> str:
"""
Git pre-commit hook that blocks commits changing API routes
without corresponding OpenAPI spec updates.
"""
return """#!/bin/bash
# Pre-commit hook: Check for API spec drift
SPEC_FILE="api/manga-assist-api.yaml"
API_FILES=$(git diff --cached --name-only | grep -E "^src/.*routes.*\\.py$")
if [ -n "$API_FILES" ]; then
SPEC_CHANGED=$(git diff --cached --name-only | grep "$SPEC_FILE")
if [ -z "$SPEC_CHANGED" ]; then
echo "ERROR: API route files changed but OpenAPI spec was not updated."
echo ""
echo "Changed route files:"
echo "$API_FILES"
echo ""
echo "Please update $SPEC_FILE to match your API changes."
echo "Run 'npm run spec:validate' to check the spec."
exit 1
fi
fi
exit 0
"""
Prevention
- Pre-commit hook blocks route file changes without corresponding spec updates
- CI pipeline runs Spectral validation, oasdiff breaking-change detection, route coverage, and Schemathesis contract tests
- Response schema validation in the orchestrator (API Gateway request validators only cover requests), flagging undocumented fields before they reach partners (see the sketch after this list)
- Weekly automated spec-drift scan against production API
- Partner SDK auto-rebuild triggered by spec changes, with breaking-change notifications
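A minimal sketch of the in-process response check referenced above: it validates outgoing /chat bodies against the schema in the spec, which flags undocumented fields such as metadata.quotaRemaining. Assumptions: the 200-response schema for POST /chat is inline (no $ref indirection) and sets additionalProperties: false; the path and helper names are illustrative.
"""Validate outgoing /chat responses against the OpenAPI response schema."""
import jsonschema
import yaml

SPEC_PATH = "api/manga-assist-api.yaml"

def load_chat_response_schema() -> dict:
    """Extract the 200-response schema for POST /chat from the spec."""
    with open(SPEC_PATH) as f:
        spec = yaml.safe_load(f)
    return (
        spec["paths"]["/chat"]["post"]["responses"]["200"]
        ["content"]["application/json"]["schema"]
    )

CHAT_RESPONSE_SCHEMA = load_chat_response_schema()

def validate_chat_response(body: dict) -> None:
    """Raise when the implementation returns fields the spec does not document."""
    try:
        jsonschema.validate(instance=body, schema=CHAT_RESPONSE_SCHEMA)
    except jsonschema.ValidationError as exc:
        # In staging: fail loudly. In production: log and emit a drift metric instead.
        raise RuntimeError(f"Spec drift detected in /chat response: {exc.message}") from exc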
Scenario 3: Prompt Flow Timeout
Problem
The MangaAssist recommendation Prompt Flow starts timing out for queries involving the "Isekai" genre. The flow has a two-node chain (classification then recommendation), and the total execution time exceeds the 30-second Bedrock Agent timeout. Users see: "申し訳ございません、推薦の生成に失敗しました" (Sorry, recommendation generation failed).
Detection
graph TB
subgraph Signals["Detection Signals"]
FLOW_METRIC[Prompt Flow Metrics<br/>Isekai queries: 95% timeout<br/>Other genres: 2% timeout]
BEDROCK_LATENCY[Bedrock Latency<br/>Sonnet P95: 18s<br/>Normal P95: 6s]
RAG_SIZE[RAG Context Size<br/>Isekai: 4,200 tokens avg<br/>Other: 1,800 tokens avg]
FLOW_LOGS[Flow Execution Logs<br/>ClassifyIntent: 3s<br/>GenerateRec: timeout at 27s]
end
subgraph RootCause["Root Cause Chain"]
R1["Isekai genre recently added<br/>2,000+ new titles indexed"]
R2["RAG returns larger context<br/>4,200 tokens vs 1,800 avg"]
R3["Sonnet prompt is longer<br/>More tokens = slower generation"]
R4["Two-node chain:<br/>3s classify + 27s recommend = 30s timeout"]
end
FLOW_METRIC --> R1
RAG_SIZE --> R2
BEDROCK_LATENCY --> R3
FLOW_LOGS --> R4
style FLOW_METRIC fill:#dc3545,color:#fff
style R4 fill:#ffc107,color:#000
Root Cause
- New Isekai genre explosion — 2,000+ titles recently indexed, making RAG retrieval return much larger context (4,200 tokens vs. 1,800 average)
- Two-node serial chain — Classification (3s) + Recommendation (up to 27s) approaches the 30s timeout
- No per-genre token limits on RAG results — The RAG retrieval returned all matching results without limiting by genre
Resolution
"""
MangaAssist Prompt Flow Timeout Fix
RAG result limiting, flow optimization, and timeout-aware architecture.
"""
import json
import time
import logging
from typing import Optional
logger = logging.getLogger(__name__)
class OptimizedFlowInvoker:
"""
Prompt Flow invoker with timeout awareness and RAG optimization.
"""
# Per-genre RAG token limits
RAG_TOKEN_LIMITS = {
"isekai": 1500, # Large catalog — limit strictly
"shonen": 2000,
"shoujo": 2000,
"seinen": 2000,
"josei": 1800,
"horror": 1500,
"sports": 1500,
"default": 1800,
}
FLOW_TIMEOUT = 25 # seconds (5s buffer before 30s hard limit)
CLASSIFY_BUDGET = 4 # seconds for classification
RECOMMEND_BUDGET = 20 # seconds for recommendation
def __init__(self, bedrock_agent_client, redis_client):
self.bedrock = bedrock_agent_client
self.redis = redis_client
def invoke_with_timeout_awareness(
self,
user_message: str,
session_context: str,
rag_results: list[str],
genre_hint: Optional[str] = None,
) -> dict:
"""
Invoke the recommendation flow with timeout-aware optimizations.
"""
start = time.time()
# Step 1: If we already know the genre, skip classification
if genre_hint:
genre = genre_hint
intent = "recommendation"
classify_time = 0
else:
classify_start = time.time()
classification = self._classify_with_timeout(
user_message, session_context
)
classify_time = time.time() - classify_start
genre = classification.get("genre", "unknown")
intent = classification.get("intent", "general")
if intent != "recommendation":
return {"intent": intent, "genre": genre}
# Step 2: Limit RAG results based on genre
token_limit = self.RAG_TOKEN_LIMITS.get(
genre, self.RAG_TOKEN_LIMITS["default"]
)
trimmed_rag = self._trim_rag_results(rag_results, token_limit)
# Step 3: Calculate remaining time budget
elapsed = time.time() - start
remaining = self.FLOW_TIMEOUT - elapsed
if remaining < 5:
# Not enough time — use cached recommendation or Haiku fallback
logger.warning(
f"Insufficient time for recommendation: {remaining:.1f}s remaining"
)
return self._get_cached_or_fallback(user_message, genre)
# Step 4: Invoke recommendation with adjusted timeout
recommendation = self._generate_recommendation(
user_message=user_message,
genre=genre,
rag_context="\n".join(trimmed_rag),
session_context=session_context,
timeout=remaining - 2, # 2s buffer
)
total_time = time.time() - start
logger.info(
f"Flow completed: genre={genre}, classify={classify_time:.1f}s, "
f"total={total_time:.1f}s"
)
return recommendation
def _classify_with_timeout(
self, message: str, context: str
) -> dict:
"""Classify intent with strict timeout."""
try:
# Use Haiku for fast classification
# Direct Bedrock call instead of Prompt Flow for speed
import boto3
bedrock_runtime = boto3.client("bedrock-runtime", region_name="ap-northeast-1")
body = json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 150,
"messages": [{"role": "user", "content": [{"type": "text", "text": (
f"Classify this manga query. Respond with JSON only.\n"
f"Message: {message}\n"
f'Output: {{"intent": "recommendation|faq|general", '
f'"genre": "shonen|shoujo|seinen|josei|isekai|horror|sports|unknown"}}'
)}]}],
"temperature": 0.1,
})
response = bedrock_runtime.invoke_model(
modelId="anthropic.claude-3-haiku-20240307-v1:0",
body=body,
)
result = json.loads(response["body"].read())
text = result["content"][0]["text"]
return json.loads(text)
except Exception as e:
logger.error(f"Classification failed: {e}")
return {"intent": "general", "genre": "unknown"}
def _trim_rag_results(
self, results: list[str], max_tokens: int
) -> list[str]:
"""Trim RAG results to fit within token budget."""
trimmed = []
total_tokens = 0
for result in results:
# Estimate tokens (JP text: ~1 token per 1.5 chars)
est_tokens = int(len(result) / 1.5)
if total_tokens + est_tokens > max_tokens:
break
trimmed.append(result)
total_tokens += est_tokens
if len(trimmed) < len(results):
logger.info(
f"RAG trimmed: {len(results)} -> {len(trimmed)} results, "
f"~{total_tokens} tokens (limit: {max_tokens})"
)
return trimmed
def _generate_recommendation(
self, user_message: str, genre: str, rag_context: str,
session_context: str, timeout: float,
) -> dict:
"""Generate recommendation with timeout."""
# Implementation invokes Bedrock with timeout
pass
def _get_cached_or_fallback(self, message: str, genre: str) -> dict:
"""Return cached recommendation or quick Haiku fallback."""
# Check cache for similar recent queries
cache_key = f"rec_cache:{genre}:{hash(message) % 10000}"
cached = self.redis.get(cache_key)
if cached:
return {"text": cached, "cached": True}
# Quick Haiku fallback
return {
"text": f"「{genre}」ジャンルのおすすめ作品をお探しですね。"
"少々お待ちください。詳しいおすすめは次のメッセージでお伝えします。",
"fallback": True,
}
Prevention
- Per-genre RAG token limits — cap context size per genre to prevent prompt bloat
- Time-budget-aware flow execution — track elapsed time and adapt strategy (skip classification if genre is known, use Haiku fallback if time is short)
- Direct Bedrock call for classification instead of Prompt Flow — saves ~2 seconds of flow orchestration overhead
- Cached recommendations for popular queries in Redis with 5-minute TTL
- Genre-specific monitoring dashboard — CloudWatch alarm when any genre's P95 latency exceeds 15 seconds
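A sketch of the genre-level alarm from the last item, assuming the orchestrator publishes a FlowLatency metric (in seconds) with a Genre dimension to a custom namespace; the metric name, namespace, and SNS topic are illustrative.
"""Create a CloudWatch alarm on per-genre P95 Prompt Flow latency."""
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="ap-northeast-1")

def create_genre_latency_alarm(genre: str) -> None:
    cloudwatch.put_metric_alarm(
        AlarmName=f"manga-assist-flow-p95-latency-{genre}",
        Namespace="MangaAssist/PromptFlow",  # assumed custom namespace
        MetricName="FlowLatency",            # seconds, emitted once per request
        Dimensions=[{"Name": "Genre", "Value": genre}],
        ExtendedStatistic="p95",             # percentile statistics use ExtendedStatistic
        Period=300,
        EvaluationPeriods=3,                 # 15 minutes of sustained slowness
        Threshold=15.0,
        ComparisonOperator="GreaterThanThreshold",
        TreatMissingData="notBreaching",
        AlarmActions=["arn:aws:sns:ap-northeast-1:123456789012:manga-assist-alerts"],  # placeholder ARN
    )

for g in ("isekai", "shonen", "shoujo", "seinen", "josei", "horror", "sports"):
    create_genre_latency_alarm(g)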
Scenario 4: Stale Cached Response
Problem
Users report receiving outdated manga availability information. A title that went out of stock 2 hours ago still shows as "在庫あり" (In stock) in the chatbot's responses. Investigation reveals that Redis is serving cached Bedrock responses that reference stale product data, and the CloudFront CDN is also caching API responses due to a misconfigured Cache-Control header.
Detection
graph TB
subgraph Symptoms["User Symptoms"]
S1["Chat says 在庫あり<br/>but product page shows 在庫なし"]
S2["Prices in chat response<br/>differ from catalog"]
S3["New releases not appearing<br/>in recommendations"]
end
subgraph CacheStack["Cache Layer Analysis"]
CDN[CloudFront CDN<br/>Cache-Control: max-age=3600<br/>on /chat endpoint ERROR]
REDIS_RESP[Redis Response Cache<br/>TTL: 1 hour for chat responses<br/>Not invalidated on product change]
REDIS_RAG[Redis RAG Cache<br/>OpenSearch results cached 30min<br/>Not invalidated on reindex]
DYNAMO_CACHE[DynamoDB DAX<br/>Item cache TTL: 5 min<br/>Query cache TTL: 1 min]
end
S1 --> CDN
S1 --> REDIS_RESP
S2 --> REDIS_RAG
S3 --> REDIS_RAG
style CDN fill:#dc3545,color:#fff
style REDIS_RESP fill:#ffc107,color:#000
Root Cause
- CloudFront caching API responses — A misconfigured Cache-Control header (max-age=3600 instead of no-cache) on the chat endpoint caused CloudFront to cache dynamic responses for 1 hour
- Redis response cache not inventory-aware — Chat responses were cached with a flat 1-hour TTL, regardless of whether they referenced product inventory
- RAG cache not invalidated on product updates — OpenSearch query results were cached for 30 minutes and not invalidated when products were updated or delisted
Resolution
"""
MangaAssist Cache Invalidation Strategy
Multi-layer cache management with event-driven invalidation.
"""
import json
import time
import hashlib
import logging
from typing import Optional
import boto3
logger = logging.getLogger(__name__)
class CacheManager:
"""
Multi-layer cache manager with event-driven invalidation.
Handles Redis response cache, RAG cache, and CloudFront.
"""
def __init__(self, redis_client, cloudfront_client=None):
self.redis = redis_client
self.cloudfront = cloudfront_client or boto3.client("cloudfront")
self.distribution_id = "E1234567890ABC"
def cache_chat_response(
self,
session_id: str,
message: str,
response_text: str,
referenced_products: list[str],
ttl: int = 300, # 5 minutes, not 1 hour
) -> None:
"""
Cache a chat response with product-aware metadata.
Short TTL (5 min) and product dependency tracking.
"""
cache_key = self._make_cache_key(session_id, message)
cache_entry = {
"text": response_text,
"referenced_products": referenced_products,
"cached_at": int(time.time()),
}
# Store response with short TTL
self.redis.setex(
f"chat_cache:{cache_key}",
ttl,
json.dumps(cache_entry, ensure_ascii=False),
)
# Track which cache entries reference which products
# This enables targeted invalidation when products change
for product_id in referenced_products:
self.redis.sadd(f"product_cache_refs:{product_id}", cache_key)
self.redis.expire(f"product_cache_refs:{product_id}", ttl + 60)
def get_cached_response(
self, session_id: str, message: str
) -> Optional[str]:
"""Get cached response if still valid."""
cache_key = self._make_cache_key(session_id, message)
cached = self.redis.get(f"chat_cache:{cache_key}")
if not cached:
return None
entry = json.loads(cached)
# Verify referenced products haven't been invalidated
for product_id in entry.get("referenced_products", []):
if self.redis.get(f"product_invalidated:{product_id}"):
# Product changed — cache is stale
self.redis.delete(f"chat_cache:{cache_key}")
logger.info(
f"Cache invalidated: product {product_id} changed"
)
return None
return entry["text"]
def invalidate_product_caches(self, product_id: str) -> int:
"""
Invalidate all caches referencing a specific product.
Called by the product update event handler.
"""
# Mark product as recently changed
self.redis.setex(
f"product_invalidated:{product_id}", 600, "1"
)
# Find and delete all chat cache entries referencing this product
cache_refs = self.redis.smembers(f"product_cache_refs:{product_id}")
deleted = 0
for cache_key in cache_refs:
if self.redis.delete(f"chat_cache:{cache_key}"):
deleted += 1
self.redis.delete(f"product_cache_refs:{product_id}")
# Also invalidate RAG cache for this product
self.redis.delete(f"rag_cache:product:{product_id}")
logger.info(
f"Product {product_id} cache invalidation: "
f"{deleted} chat caches, 1 RAG cache"
)
return deleted
def invalidate_cloudfront_paths(self, paths: list[str]) -> str:
"""Create CloudFront invalidation for specific paths."""
response = self.cloudfront.create_invalidation(
DistributionId=self.distribution_id,
InvalidationBatch={
"Paths": {
"Quantity": len(paths),
"Items": paths,
},
"CallerReference": f"manga-{int(time.time())}",
},
)
invalidation_id = response["Invalidation"]["Id"]
logger.info(f"CloudFront invalidation created: {invalidation_id}")
return invalidation_id
def _make_cache_key(self, session_id: str, message: str) -> str:
"""Generate deterministic cache key."""
return hashlib.sha256(
f"{session_id}:{message}".encode()
).hexdigest()[:16]
def handle_product_update_event(event: dict, context) -> dict:
"""
DynamoDB Stream handler for product table updates.
Triggers cache invalidation when products change.
"""
import redis
r = redis.Redis(
host="manga-assist-cache.xxxxx.apne1.cache.amazonaws.com",
port=6379, decode_responses=True,
)
cache_mgr = CacheManager(r)
invalidated = 0
for record in event.get("Records", []):
if record["eventName"] in ("MODIFY", "REMOVE"):
product_id = record["dynamodb"]["Keys"]["productId"]["S"]
invalidated += cache_mgr.invalidate_product_caches(product_id)
# Fix: Set correct Cache-Control header on API Gateway
# This should already be configured, but we verify
return {
"statusCode": 200,
"headers": {
"Cache-Control": "no-store, no-cache, must-revalidate",
},
"body": json.dumps({"invalidated": invalidated}),
}
def fix_cloudfront_cache_policy() -> dict:
"""
Fix CloudFront cache behavior for the chat API.
Chat API responses must NEVER be cached at the CDN layer.
"""
cloudfront = boto3.client("cloudfront")
# Create a no-cache policy for API endpoints
policy = cloudfront.create_cache_policy(
CachePolicyConfig={
"Name": "MangaAssist-API-NoCache",
"DefaultTTL": 0,
"MaxTTL": 0,
"MinTTL": 0,
"ParametersInCacheKeyAndForwardedToOrigin": {
"EnableAcceptEncodingGzip": True,
"EnableAcceptEncodingBrotli": True,
"HeadersConfig": {"HeaderBehavior": "none"},
"CookiesConfig": {"CookieBehavior": "none"},
"QueryStringsConfig": {"QueryStringBehavior": "none"},
},
}
)
return {"policy_id": policy["CachePolicy"]["Id"]}
Prevention
- CloudFront cache policy with TTL=0 for all API endpoints — dynamic responses must never be CDN-cached
- Product-aware cache keys — chat responses track which products they reference for targeted invalidation
- DynamoDB Streams trigger cache invalidation Lambda on product updates
- Short default TTL (5 minutes, not 1 hour) for all chat response caches
- Cache-Control: no-store header enforced on all chat API responses
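A minimal sketch of enforcing the no-store header at the origin, so even a future CDN misconfiguration has nothing cacheable to latch onto. It assumes the orchestrator is a FastAPI app (plausible given the route scanning in Scenario 2, but an assumption), and the path prefixes are illustrative.
"""Force no-store headers on every dynamic chat API response at the origin."""
from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def enforce_no_store(request: Request, call_next):
    response = await call_next(request)
    # Static assets are served from S3/CloudFront, not this app,
    # so everything passing through here is dynamic and must not be cached.
    if request.url.path.startswith(("/chat", "/api")):
        response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate"
        response.headers["Pragma"] = "no-cache"
    return response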
Scenario 5: Unexpected Prompt Chains
Problem
The MangaAssist Prompt Flow for complex recommendations starts producing extremely long, expensive responses. A condition-node bug causes the flow to loop through the recommendation node 3 times instead of once, ballooning token usage and cost: one user's query generates $0.45 in Bedrock costs (normally ~$0.015) and takes 45 seconds.
Detection
graph TB
subgraph Signals["Detection Signals"]
TOKEN_SPIKE[Token Usage Metric<br/>Single request: 12,000 output tokens<br/>Normal: 400 output tokens]
COST_ALERT[Cost Alert<br/>$0.45 per request<br/>Normal: $0.015]
LATENCY[Latency Spike<br/>45 seconds<br/>Normal: 3 seconds]
FLOW_TRACE[Flow Execution Trace<br/>GenerateRec node invoked 3x]
end
subgraph Bug["Root Cause: Condition Bug"]
COND[Condition Node<br/>$.genre != 'unknown']
LOOP["Always true after first pass<br/>Genre is set to 'isekai'<br/>Loop: Recommend → Classify → Recommend"]
EDGE["Edge configuration<br/>Missing exit condition<br/>Default routes back to start"]
end
TOKEN_SPIKE --> COND
COST_ALERT --> LOOP
FLOW_TRACE --> EDGE
style TOKEN_SPIKE fill:#dc3545,color:#fff
style LOOP fill:#ffc107,color:#000
Root Cause
- Condition node default branch pointed back to the classification node instead of the output node
- No loop detection in the flow execution — Bedrock Prompt Flows allow cycles if configured
- No per-flow token budget — the flow could consume unlimited tokens across multiple node invocations
Resolution
"""
MangaAssist Prompt Flow Safety Guards
Prevents runaway prompt chains, loops, and token budget explosions.
"""
import json
import time
import logging
from dataclasses import dataclass
from typing import Optional
import boto3
logger = logging.getLogger(__name__)
@dataclass
class FlowSafetyConfig:
"""Safety limits for Prompt Flow execution."""
max_node_invocations: int = 5
max_total_tokens: int = 5000
max_execution_time_seconds: float = 20.0
max_cost_per_flow_usd: float = 0.10
enable_loop_detection: bool = True
class SafeFlowInvoker:
"""
Invoke Prompt Flows with safety guards against loops and runaway costs.
"""
def __init__(
self,
bedrock_agent_client,
redis_client,
config: Optional[FlowSafetyConfig] = None,
):
self.bedrock = bedrock_agent_client
self.redis = redis_client
self.config = config or FlowSafetyConfig()
def invoke_flow_safe(
self,
flow_id: str,
alias_id: str,
inputs: list[dict],
request_id: str,
) -> dict:
"""
Invoke a Prompt Flow with safety guards.
Monitors token usage, execution time, and node invocation count.
"""
start = time.time()
node_invocations = 0
total_tokens = 0
total_cost = 0.0
visited_nodes = []
try:
response = self.bedrock.invoke_flow(
flowIdentifier=flow_id,
flowAliasIdentifier=alias_id,
inputs=inputs,
)
result_parts = []
for event in response.get("responseStream", []):
# Time guard
elapsed = time.time() - start
if elapsed > self.config.max_execution_time_seconds:
logger.error(
f"Flow timeout: {elapsed:.1f}s > "
f"{self.config.max_execution_time_seconds}s"
)
self._record_safety_violation(
request_id, "timeout", elapsed
)
break
if "flowTraceEvent" in event:
trace = event["flowTraceEvent"]
node_name = trace.get("nodeName", "unknown")
visited_nodes.append(node_name)
node_invocations += 1
# Node invocation guard
if node_invocations > self.config.max_node_invocations:
logger.error(
f"Max node invocations exceeded: "
f"{node_invocations} > {self.config.max_node_invocations}. "
f"Visited: {visited_nodes}"
)
self._record_safety_violation(
request_id, "max_nodes", node_invocations
)
break
# Loop detection
if self.config.enable_loop_detection:
if self._detect_loop(visited_nodes):
logger.error(
f"Loop detected in flow: {visited_nodes}"
)
self._record_safety_violation(
request_id, "loop", visited_nodes
)
break
# Track token usage from trace events
usage = trace.get("usage", {})
tokens = usage.get("output_tokens", 0)
total_tokens += tokens
if total_tokens > self.config.max_total_tokens:
logger.error(
f"Token budget exceeded: {total_tokens} > "
f"{self.config.max_total_tokens}"
)
self._record_safety_violation(
request_id, "token_budget", total_tokens
)
break
elif "flowOutputEvent" in event:
output = event["flowOutputEvent"]
content = output.get("content", {})
if "document" in content:
result_parts.append(str(content["document"]))
elif "flowCompletionEvent" in event:
pass # Normal completion
elapsed = time.time() - start
return {
"text": "".join(result_parts),
"metrics": {
"node_invocations": node_invocations,
"total_tokens": total_tokens,
"execution_time_seconds": round(elapsed, 2),
"visited_nodes": visited_nodes,
},
}
except Exception as e:
logger.error(f"Flow invocation failed: {e}")
raise
def _detect_loop(self, visited: list[str]) -> bool:
"""
Detect loops in node visitation pattern.
A loop is detected if a sequence of 2+ nodes repeats.
"""
if len(visited) < 4:
return False
# Check for repeated pairs (A→B→A→B)
for pattern_len in range(2, len(visited) // 2 + 1):
pattern = visited[-pattern_len:]
preceding = visited[-(pattern_len * 2):-pattern_len]
if pattern == preceding:
return True
# Check for single-node repetition (A→A→A)
if len(visited) >= 3:
last_three = visited[-3:]
if len(set(last_three)) == 1:
return True
return False
def _record_safety_violation(
self, request_id: str, violation_type: str, details
) -> None:
"""Record safety violation for monitoring and alerting."""
violation = {
"request_id": request_id,
"type": violation_type,
"details": str(details),
"timestamp": int(time.time()),
}
# Store in Redis for dashboard
self.redis.lpush(
"flow_safety_violations",
json.dumps(violation),
)
self.redis.ltrim("flow_safety_violations", 0, 999)
# Increment violation counter for alerting
self.redis.incr(f"safety_violations:{violation_type}")
self.redis.expire(f"safety_violations:{violation_type}", 3600)
def validate_flow_definition(flow_def: dict) -> list[str]:
"""
Static analysis of a flow definition to detect potential issues.
Run before deploying a new flow version.
"""
issues = []
nodes = {n["name"]: n for n in flow_def.get("nodes", [])}
connections = flow_def.get("connections", [])
# Build adjacency list
adj = {}
for conn in connections:
src = conn["source"]
tgt = conn["target"]
adj.setdefault(src, []).append(tgt)
# Check 1: Detect cycles using DFS
def has_cycle(node, visited, rec_stack):
visited.add(node)
rec_stack.add(node)
for neighbor in adj.get(node, []):
if neighbor not in visited:
if has_cycle(neighbor, visited, rec_stack):
return True
elif neighbor in rec_stack:
return True
rec_stack.discard(node)
return False
visited = set()
for node_name in nodes:
if node_name not in visited:
if has_cycle(node_name, visited, set()):
issues.append(
f"CRITICAL: Cycle detected in flow graph involving node '{node_name}'"
)
# Check 2: Verify all condition branches have targets
for node in flow_def.get("nodes", []):
if node.get("type") == "Condition":
outputs = {o["name"] for o in node.get("outputs", [])}
connected = {
conn["sourceOutput"]
for conn in connections
if conn["source"] == node["name"]
}
missing = outputs - connected
if missing:
issues.append(
f"WARNING: Condition node '{node['name']}' has unconnected "
f"branches: {missing}"
)
# Check 3: Verify output node is reachable
output_nodes = [n["name"] for n in flow_def["nodes"] if n["type"] == "Output"]
for out in output_nodes:
incoming = [c for c in connections if c["target"] == out]
if not incoming:
issues.append(
f"ERROR: Output node '{out}' has no incoming connections"
)
return issues
Prevention
- Static analysis of flow definitions before deployment — detect cycles, unconnected branches, and unreachable outputs (CI wiring sketched after this list)
- Runtime loop detection — pattern matching on node visitation sequence (A-B-A-B detection)
- Per-flow token budget — hard limit of 5,000 tokens across all nodes in a single invocation
- Max node invocation count — stop execution after 5 node visits
- Safety violation dashboard with CloudWatch alarm on violation rate > 0
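A sketch of the CI wiring for the static-analysis item above, assuming the flow definition is exported as JSON next to the code and the validator lives in a flow_safety module; both names are illustrative.
"""CI gate: fail the build when the exported flow definition has structural issues."""
import json
import sys

from flow_safety import validate_flow_definition  # assumed module containing the validator above

FLOW_DEFINITION_PATH = "flows/recommendation-flow.json"  # assumed export path

def main() -> int:
    with open(FLOW_DEFINITION_PATH) as f:
        flow_def = json.load(f)
    issues = validate_flow_definition(flow_def)
    for issue in issues:
        print(issue)
    # Block the deploy on CRITICAL/ERROR findings; warnings are advisory.
    blocking = [i for i in issues if i.startswith(("CRITICAL", "ERROR"))]
    return 1 if blocking else 0

if __name__ == "__main__":
    sys.exit(main())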
Key Takeaways
| # | Takeaway | MangaAssist Application |
|---|---|---|
| 1 | Mobile WebSocket needs carrier-aware keepalive — Japanese carrier NAT tables time out after 30 seconds; application-level pings at 20-second intervals are required. | MangaAssist mobile users on Docomo, au, and SoftBank maintain stable connections through aggressive keepalive. |
| 2 | Spec drift is a process failure, not a code failure — Pre-commit hooks and CI checks that block route changes without spec updates prevent drift at the source. | Partners auto-regenerate SDKs on spec changes; breaking changes require manual API team approval before merge. |
| 3 | Prompt Flow timeouts need time-budget awareness — Track elapsed time across flow nodes and adapt strategy (skip steps, use fallback models) as the budget shrinks. | Isekai genre queries skip the classification node when genre is known, saving 3 seconds of the 25-second budget. |
| 4 | Dynamic API responses must never be CDN-cached — CloudFront Cache-Control: no-store is mandatory for chat endpoints; only static assets should be cached. | A single misconfigured header caused 2 hours of stale inventory data in chat responses for all users. |
| 5 | Prompt Flows need static analysis + runtime guards — Cycle detection before deployment plus loop detection and token budgets during execution prevent runaway costs. | The flow validator runs in CI; the runtime guard has a hard stop at 5 node invocations and 5,000 tokens per flow execution. |