Rate Limiting, Fallback Mechanisms, and FM Observability
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Attribute | Value |
|---|---|
| Certification | AWS Certified AI Practitioner (AIF-C01) |
| Domain | 2 — Implementation and Integration of Foundation Models |
| Task | 2.4 — Design resilient and scalable FM-based applications |
| Skill | 2.4.3 — Create resilient FM systems to ensure reliable operations |
| Focus Areas | API Gateway usage plans, multi-tier fallback, X-Ray service maps, CloudWatch dashboards |
1. API Gateway Usage Plans and Throttle Settings
Usage Plan Architecture for MangaAssist
API Gateway usage plans control who can access the MangaAssist API and at what rate. For a WebSocket API serving 1M messages/day, the usage plan architecture must balance three competing concerns:
- Protection — Prevent any single user or integration from overwhelming Bedrock
- Fairness — Ensure all manga readers get responsive service during peak hours
- Cost control — Prevent runaway FM costs from misbehaving clients
Usage Plan Tiers
┌─────────────────────────────────────────────────────────────┐
│ MangaAssist Usage Plans │
├─────────────────────────────────────────────────────────────┤
│ │
│ Free Tier (Anonymous Browsers) │
│ ├── Rate: 2 req/s │
│ ├── Burst: 5 │
│ ├── Quota: 100 messages/day │
│ └── Model: Haiku only (cost-effective) │
│ │
│ Registered Tier (Logged-in Users) │
│ ├── Rate: 5 req/s │
│ ├── Burst: 15 │
│ ├── Quota: 1,000 messages/day │
│ └── Model: Sonnet + Haiku fallback │
│ │
│ Premium Tier (Paid Subscribers) │
│ ├── Rate: 15 req/s │
│ ├── Burst: 40 │
│ ├── Quota: 10,000 messages/day │
│ └── Model: Sonnet priority + extended context │
│ │
│ Internal Tier (Admin / Analytics) │
│ ├── Rate: 50 req/s │
│ ├── Burst: 100 │
│ ├── Quota: Unlimited │
│ └── Model: Sonnet with full tool access │
│ │
└─────────────────────────────────────────────────────────────┘
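API Gateway enforces each tier's rate/burst pair with token-bucket semantics: the bucket holds up to `burst` tokens, refills at `rate` tokens per second, and each request consumes one token. The sketch below is an illustrative model of that behavior, not the service's actual implementation:

```python
class TokenBucket:
    """Token-bucket model of an API Gateway throttle (rate + burst)."""

    def __init__(self, rate: float, burst: int):
        self.rate = rate            # tokens refilled per second (steady-state req/s)
        self.burst = burst          # bucket capacity (maximum burst size)
        self.tokens = float(burst)  # bucket starts full
        self.last = 0.0             # timestamp of the last refill

    def allow(self, now: float) -> bool:
        # Refill proportionally to elapsed time, capped at the burst size.
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True             # request passes (200 OK)
        return False                # request throttled (429 Too Many Requests)


# Free tier: 2 req/s steady-state, burst of 5. Six simultaneous requests:
# the 5 burst tokens absorb the first 5, the 6th is throttled.
bucket = TokenBucket(rate=2.0, burst=5)
results = [bucket.allow(now=0.0) for _ in range(6)]
# results == [True, True, True, True, True, False]
```

This is why the burst limit matters for chat UIs: a user who fires three quick messages stays within burst even though they momentarily exceed the steady-state rate.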
Throttle Behavior Under Load
Scenario: Evening surge — 8 PM JST, manga readers checking new weekly releases
Normal traffic: ~800 req/s
Surge traffic: ~3,500 req/s (weekly Shonen Jump release day)
API Gateway response (per second, assuming a stage-level rate limit of 2,000 req/s with a burst capacity of 2,000):
Requests 1-2000: → 200 OK (within the steady-state rate limit)
Requests 2001-4000: → 200 OK (burst capacity absorbs the spike, briefly)
Requests 4001+: → 429 Too Many Requests
Response headers for throttled requests (API Gateway itself returns only the 429 status; Retry-After and the X-RateLimit-* headers shown here are conventions the orchestrator can add):
HTTP/1.1 429 Too Many Requests
Retry-After: 1
X-RateLimit-Limit: 2000
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1711900800
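On the client or orchestrator side, a 429 should be retried after the `Retry-After` interval, falling back to jittered exponential backoff when the header is absent. A minimal sketch; the `send` callable and the dict-shaped response are assumptions of this example, not a specific HTTP library's API:

```python
import time
import random
from typing import Any, Callable, Dict


def request_with_retry(
    send: Callable[[], Dict[str, Any]],
    max_attempts: int = 4,
    base_delay: float = 0.5,
) -> Dict[str, Any]:
    """Retry on 429, honoring Retry-After when present, else jittered backoff."""
    for attempt in range(max_attempts):
        response = send()
        if response["status"] != 429:
            return response
        if attempt == max_attempts - 1:
            break  # out of attempts: surface the 429 to the caller
        retry_after = response.get("headers", {}).get("Retry-After")
        if retry_after is not None:
            delay = float(retry_after)
        else:
            # Full jitter: random delay in [0, base * 2^attempt]
            delay = random.uniform(0, base_delay * (2 ** attempt))
        time.sleep(delay)
    return response


# Demo with a stub transport: first call throttled, second succeeds.
responses = iter([
    {"status": 429, "headers": {"Retry-After": "0"}},
    {"status": 200, "headers": {}},
])
result = request_with_retry(lambda: next(responses))
# result == {"status": 200, "headers": {}}
```

Honoring `Retry-After` keeps well-behaved clients from hammering the API during the evening surge, which is exactly when the throttle is protecting Bedrock capacity.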
WAF Integration for DDoS Protection
WAF Rule Groups for MangaAssist:
┌─────────────────────────────────────────┐
│ Rule 1: IP Rate Limiting │
│ Action: Block │
│ Threshold: 100 requests/5 min per IP │
│ Scope: All routes │
├─────────────────────────────────────────┤
│ Rule 2: Geographic Restriction │
│ Action: Allow │
│ Regions: JP, US, EU (primary markets) │
│ Block: Known bot regions │
├─────────────────────────────────────────┤
│ Rule 3: Request Size Limit │
│ Action: Block │
│ Max body: 8 KB (manga queries) │
│ Max URL: 2 KB │
├─────────────────────────────────────────┤
│ Rule 4: SQL/XSS Protection │
│ Action: Block │
│ AWS Managed Rules: SQLi + XSS │
└─────────────────────────────────────────┘
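Rule 1 maps directly to a WAFv2 rate-based rule statement. A sketch of the rule definition as it could be passed to `wafv2.create_web_acl` (the rule and metric names are illustrative):

```python
# WAFv2 rate-based rule for Rule 1: block any single IP exceeding
# 100 requests per 5-minute window (WAFv2's Limit is counted per
# trailing 5-minute window, aggregated by source IP).
ip_rate_limit_rule = {
    "Name": "MangaAssist-IPRateLimit",
    "Priority": 1,
    "Statement": {
        "RateBasedStatement": {
            "Limit": 100,              # requests per 5-minute window per IP
            "AggregateKeyType": "IP",
        }
    },
    "Action": {"Block": {}},
    "VisibilityConfig": {
        "SampledRequestsEnabled": True,
        "CloudWatchMetricsEnabled": True,
        "MetricName": "MangaAssistIPRateLimit",
    },
}

# Attached to a regional Web ACL (REGIONAL scope is required for
# API Gateway) via:
#
#   boto3.client("wafv2").create_web_acl(
#       Name="MangaAssist-WebACL",
#       Scope="REGIONAL",
#       DefaultAction={"Allow": {}},
#       Rules=[ip_rate_limit_rule],
#       VisibilityConfig={...},
#   )
```

The managed rule groups in Rule 4 (SQLi/XSS) are added to the same `Rules` list as `ManagedRuleGroupStatement` entries rather than hand-written statements.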
Code: UsagePlanManager
"""
UsagePlanManager — Manages API Gateway usage plans for MangaAssist.
Handles creation, update, and monitoring of usage plans across tiers
(free, registered, premium, internal). Integrates with CloudWatch for
throttle monitoring and automatic tier adjustment.
Architecture:
API Gateway Usage Plan → API Key → Stage + Route Throttle → CloudWatch Metrics
Each tier maps to different Bedrock model access:
- Free: Haiku only (cost-effective for anonymous browsing)
- Registered: Sonnet primary, Haiku fallback
- Premium: Sonnet priority with extended context window
- Internal: Full Sonnet access with all tools enabled
"""
import logging
from dataclasses import dataclass
from typing import Optional, Dict, Any, List
from datetime import datetime, timezone
from enum import Enum
import boto3
logger = logging.getLogger("mangaassist.usageplan")
class UsageTier(Enum):
"""Usage tiers for MangaAssist API access."""
FREE = "free"
REGISTERED = "registered"
PREMIUM = "premium"
INTERNAL = "internal"
@dataclass
class TierConfig:
"""Configuration for a usage plan tier."""
tier: UsageTier
rate_limit: float # Steady-state requests per second
burst_limit: int # Maximum burst capacity
daily_quota: int # Maximum requests per day
monthly_quota: int # Maximum requests per month
allowed_models: List[str] # Which Bedrock models this tier can access
max_context_tokens: int # Maximum context window (input tokens)
priority: int # Priority for resource allocation (1=highest)
description: str = ""
@property
def quota_per_second_avg(self) -> float:
"""Average allowed requests per second based on daily quota."""
return self.daily_quota / 86400
# Default tier configurations for MangaAssist
DEFAULT_TIERS: Dict[UsageTier, TierConfig] = {
UsageTier.FREE: TierConfig(
tier=UsageTier.FREE,
rate_limit=2.0,
burst_limit=5,
daily_quota=100,
monthly_quota=2000,
allowed_models=["anthropic.claude-3-haiku-20240307-v1:0"],
max_context_tokens=2048,
priority=4,
description="Free tier for anonymous manga store browsers",
),
UsageTier.REGISTERED: TierConfig(
tier=UsageTier.REGISTERED,
rate_limit=5.0,
burst_limit=15,
daily_quota=1000,
monthly_quota=25000,
allowed_models=[
"anthropic.claude-3-sonnet-20240229-v1:0",
"anthropic.claude-3-haiku-20240307-v1:0",
],
max_context_tokens=4096,
priority=3,
description="Registered user tier with Sonnet access",
),
UsageTier.PREMIUM: TierConfig(
tier=UsageTier.PREMIUM,
rate_limit=15.0,
burst_limit=40,
daily_quota=10000,
monthly_quota=250000,
allowed_models=[
"anthropic.claude-3-sonnet-20240229-v1:0",
"anthropic.claude-3-haiku-20240307-v1:0",
],
max_context_tokens=8192,
priority=2,
description="Premium subscriber tier with priority Sonnet access",
),
UsageTier.INTERNAL: TierConfig(
tier=UsageTier.INTERNAL,
rate_limit=50.0,
burst_limit=100,
daily_quota=1000000,
monthly_quota=30000000,
allowed_models=[
"anthropic.claude-3-sonnet-20240229-v1:0",
"anthropic.claude-3-haiku-20240307-v1:0",
],
max_context_tokens=16384,
priority=1,
description="Internal tier for admin and analytics",
),
}
@dataclass
class UsagePlanResult:
"""Result of a usage plan operation."""
plan_id: str
tier: UsageTier
api_key_id: Optional[str] = None
api_key_value: Optional[str] = None
status: str = "created"
class UsagePlanManager:
"""
Manages the full lifecycle of API Gateway usage plans for MangaAssist.
Responsibilities:
1. Create and configure usage plans for each tier
2. Generate and assign API keys to plans
3. Monitor usage and throttle metrics
4. Auto-adjust throttle settings based on demand
5. Publish usage metrics to CloudWatch
Usage:
manager = UsagePlanManager(api_id="abc123", stage="prod")
# Create all tier plans
plans = manager.create_all_usage_plans()
# Assign a user to a tier
key = manager.create_api_key(
user_id="user_123",
tier=UsageTier.REGISTERED,
)
# Check usage
usage = manager.get_usage_stats(
plan_id=plans[UsageTier.REGISTERED].plan_id,
)
"""
def __init__(
self,
api_id: str,
stage: str = "prod",
region: str = "ap-northeast-1",
tiers: Optional[Dict[UsageTier, TierConfig]] = None,
):
self.api_id = api_id
self.stage = stage
self.region = region
self.tiers = tiers or DEFAULT_TIERS
self._apigw = boto3.client("apigateway", region_name=region)
self._cloudwatch = boto3.client("cloudwatch", region_name=region)
self._plan_cache: Dict[UsageTier, str] = {}
def create_usage_plan(self, tier: UsageTier) -> UsagePlanResult:
"""
Create an API Gateway usage plan for the specified tier.
Args:
tier: The usage tier to create a plan for.
Returns:
UsagePlanResult with the plan ID and configuration.
"""
config = self.tiers[tier]
plan = self._apigw.create_usage_plan(
name=f"MangaAssist-{tier.value.title()}",
description=config.description,
apiStages=[
{
"apiId": self.api_id,
"stage": self.stage,
"throttle": {
# NOTE: usage plans with API keys apply to REST APIs; for the
# WebSocket API, the equivalent per-route throttles are set on
# the stage. Route names are shown here to illustrate intent.
# Per-route throttle for the sendMessage route
"sendMessage": {
"burstLimit": config.burst_limit,
"rateLimit": config.rate_limit,
},
# Connection establishment gets lower limits
"$connect": {
"burstLimit": max(1, config.burst_limit // 5),
"rateLimit": max(1.0, config.rate_limit / 5),
},
# Default route uses standard limits
"$default": {
"burstLimit": config.burst_limit,
"rateLimit": config.rate_limit,
},
},
}
],
throttle={
"burstLimit": config.burst_limit,
"rateLimit": config.rate_limit,
},
quota={
"limit": config.monthly_quota,
"offset": 0,
"period": "MONTH",
},
tags={
"Service": "MangaAssist",
"Tier": tier.value,
"Environment": self.stage,
},
)
plan_id = plan["id"]
self._plan_cache[tier] = plan_id
logger.info(
f"Created usage plan [{plan_id}] for tier={tier.value} "
f"rate={config.rate_limit} burst={config.burst_limit}"
)
return UsagePlanResult(plan_id=plan_id, tier=tier)
def create_all_usage_plans(self) -> Dict[UsageTier, UsagePlanResult]:
"""Create usage plans for all defined tiers."""
results = {}
for tier in self.tiers:
results[tier] = self.create_usage_plan(tier)
return results
def create_api_key(
self,
user_id: str,
tier: UsageTier,
customer_name: Optional[str] = None,
) -> UsagePlanResult:
"""
Create an API key and associate it with a usage plan.
Args:
user_id: Unique user identifier.
tier: The tier to assign the user to.
customer_name: Optional display name.
Returns:
UsagePlanResult with key details.
"""
plan_id = self._plan_cache.get(tier)
if not plan_id:
result = self.create_usage_plan(tier)
plan_id = result.plan_id
key = self._apigw.create_api_key(
name=f"MangaAssist-{user_id}",
description=f"API key for {customer_name or user_id} ({tier.value} tier)",
enabled=True,
tags={
"UserId": user_id,
"Tier": tier.value,
"Service": "MangaAssist",
},
)
self._apigw.create_usage_plan_key(
usagePlanId=plan_id,
keyId=key["id"],
keyType="API_KEY",
)
logger.info(
f"Created API key [{key['id']}] for user={user_id} tier={tier.value}"
)
return UsagePlanResult(
plan_id=plan_id,
tier=tier,
api_key_id=key["id"],
api_key_value=key["value"],
)
def get_usage_stats(
self, plan_id: str, key_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Get today's usage for a usage plan, optionally scoped to a single API key.
"""
today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
params: Dict[str, Any] = {
"usagePlanId": plan_id,
"startDate": today,
"endDate": today,
}
if key_id:
params["keyId"] = key_id
usage = self._apigw.get_usage(**params)
return {
"plan_id": plan_id,
"usage_data": usage.get("items", {}),
"position": usage.get("position"),
}
def update_throttle_for_load(
self, tier: UsageTier, load_factor: float
) -> Dict[str, Any]:
"""
Dynamically adjust throttle settings based on current load.
Args:
tier: The tier to adjust.
load_factor: Current load as fraction of capacity (0.0-1.0+).
If load_factor > 0.8 and this is a lower-priority tier, reduce
its limits to protect higher-priority tiers.
"""
config = self.tiers[tier]
plan_id = self._plan_cache.get(tier)
if not plan_id:
logger.warning(f"No plan ID cached for tier {tier.value}")
return {"error": "Plan not found"}
adjusted_rate = config.rate_limit
adjusted_burst = config.burst_limit
if load_factor > 0.9 and config.priority >= 3:
# Under extreme load, reduce low-priority tiers by 50%
adjusted_rate = config.rate_limit * 0.5
adjusted_burst = config.burst_limit // 2
logger.warning(
f"Reducing tier {tier.value} to rate={adjusted_rate} "
f"burst={adjusted_burst} (load={load_factor:.1%})"
)
elif load_factor > 0.8 and config.priority >= 3:
# Under high load, reduce low-priority tiers by 25%
adjusted_rate = config.rate_limit * 0.75
adjusted_burst = int(config.burst_limit * 0.75)
self._apigw.update_usage_plan(
usagePlanId=plan_id,
patchOperations=[
{
"op": "replace",
"path": "/throttle/rateLimit",
"value": str(adjusted_rate),
},
{
"op": "replace",
"path": "/throttle/burstLimit",
"value": str(adjusted_burst),
},
],
)
self._publish_throttle_adjustment_metric(tier, load_factor, adjusted_rate)
return {
"tier": tier.value,
"original_rate": config.rate_limit,
"adjusted_rate": adjusted_rate,
"original_burst": config.burst_limit,
"adjusted_burst": adjusted_burst,
"load_factor": load_factor,
}
def _publish_throttle_adjustment_metric(
self, tier: UsageTier, load_factor: float, new_rate: float
):
"""Publish throttle adjustment events to CloudWatch."""
try:
self._cloudwatch.put_metric_data(
Namespace="MangaAssist/UsagePlans",
MetricData=[
{
"MetricName": "ThrottleAdjustment",
"Dimensions": [
{"Name": "Tier", "Value": tier.value},
{"Name": "Service", "Value": "MangaAssist"},
],
"Timestamp": datetime.now(timezone.utc),
"Value": new_rate,
"Unit": "Count/Second",
},
{
"MetricName": "LoadFactor",
"Dimensions": [
{"Name": "Service", "Value": "MangaAssist"},
],
"Timestamp": datetime.now(timezone.utc),
"Value": load_factor,
"Unit": "None",
},
],
)
except Exception as e:
logger.warning(f"Failed to publish throttle adjustment metric: {e}")
2. Multi-Tier Fallback: Sonnet to Haiku to Cached to Static
Fallback Decision Matrix
| Condition | Primary Action | Fallback Action | User Experience |
|---|---|---|---|
| Sonnet healthy | Invoke Sonnet | N/A | Full capability — rich recommendations, nuanced answers |
| Sonnet throttled | Retry with backoff | Fall back to Haiku | Slightly simpler answers, faster response |
| Sonnet + Haiku down | N/A | Serve from Redis cache | Previous answer for similar query (may be stale) |
| All models + cache miss | N/A | Static FAQ match | Basic scripted answer for common questions |
| Complete service failure | N/A | Graceful message | Friendly acknowledgment + alternative contact |
Fallback Tier Quality Characteristics
Tier 1 — Claude 3 Sonnet (Confidence: 1.0)
├── Understands complex manga recommendations with nuance
├── Can reason about series relationships and genre preferences
├── Handles multi-turn conversations about order issues
├── Provides personalized suggestions based on reading history
└── Cost: $3/$15 per 1M tokens (input/output)
Tier 2 — Claude 3 Haiku (Confidence: 0.7)
├── Handles straightforward questions well
├── Good for order status, shipping, basic recommendations
├── May miss nuance in complex preference discussions
├── Faster response time (200-500ms vs 1-3s)
└── Cost: $0.25/$1.25 per 1M tokens (12x cheaper input)
Tier 3 — Redis Cache (Confidence: 0.5)
├── Instant response (<10ms)
├── Only works for queries similar to previous ones
├── Risk of stale data (prices, availability)
├── No personalization — generic cached answer
└── Cost: ~$0.0001 per lookup
Tier 4 — Static FAQ (Confidence: 0.3)
├── Pre-written answers for common categories
├── No dynamic content or personalization
├── Always available (in-memory)
├── Limited to ~20 FAQ categories
└── Cost: $0 per lookup
Tier 5 — Graceful Degradation (Confidence: 0.1)
├── User-friendly "temporarily unavailable" message
├── Provides alternative contact methods
├── Directs user to browse catalog directly
├── Never leaves user with an error page
└── Cost: $0
Stale Cache Detection
One of the biggest risks with cache-based fallback is serving stale data. For MangaAssist, stale data manifests as:
- Wrong prices — Manga volume sale ended but cached response still quotes discounted price
- Wrong availability — "In stock" for a volume that sold out
- Wrong release dates — Pre-order dates changed but cached response has old date
Cache Staleness Mitigation:
┌───────────────────────────────────────────────┐
│ Cache Entry Structure │
│ ├── key: sha256(normalized_query)[:16] │
│ ├── response: "Full FM-generated response" │
│ ├── created_at: 1711900800 │
│ ├── ttl: 3600 (1 hour) │
│ ├── contains_pricing: true │
│ ├── contains_availability: true │
│ ├── product_ids: ["manga_001", "manga_002"] │
│ └── staleness_risk: "high" │
├───────────────────────────────────────────────┤
│ Staleness Rules: │
│ ├── Pricing data: TTL = 15 minutes │
│ ├── Availability data: TTL = 5 minutes │
│ ├── Recommendations: TTL = 4 hours │
│ ├── General info: TTL = 24 hours │
│ └── FAQ-type queries: TTL = 7 days │
└───────────────────────────────────────────────┘
Code: TieredFallbackEngine
"""
TieredFallbackEngine — Advanced multi-tier fallback with cache-awareness,
staleness detection, and quality tracking for MangaAssist.
Extends the basic FallbackOrchestrator with:
- Staleness detection for cached responses (pricing, availability)
- Quality score tracking per tier for adaptive routing
- Warm-up strategy to pre-populate cache during low-traffic hours
- Metrics publishing for fallback tier usage patterns
- User notification when serving degraded responses
"""
import time
import json
import logging
import hashlib
import re
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List
from datetime import datetime, timezone
from enum import Enum
import boto3
import redis
logger = logging.getLogger("mangaassist.tiered_fallback")
class ContentCategory(Enum):
"""Content categories with different staleness tolerances."""
PRICING = "pricing"
AVAILABILITY = "availability"
RECOMMENDATION = "recommendation"
ORDER_STATUS = "order_status"
GENERAL_INFO = "general_info"
FAQ = "faq"
# TTL settings per content category (in seconds)
CATEGORY_TTL: Dict[ContentCategory, int] = {
ContentCategory.PRICING: 900, # 15 minutes
ContentCategory.AVAILABILITY: 300, # 5 minutes
ContentCategory.RECOMMENDATION: 14400, # 4 hours
ContentCategory.ORDER_STATUS: 60, # 1 minute (near real-time)
ContentCategory.GENERAL_INFO: 86400, # 24 hours
ContentCategory.FAQ: 604800, # 7 days
}
@dataclass
class CacheEntry:
"""
Rich cache entry with staleness metadata.
Attributes:
response: The cached response text.
created_at: Unix timestamp when the response was generated.
category: Content category for TTL calculation.
product_ids: Product IDs referenced in the response.
model_id: Which model generated this response.
confidence: Original confidence score.
query_hash: Hash of the original query.
"""
response: str
created_at: float
category: ContentCategory
product_ids: List[str] = field(default_factory=list)
model_id: str = ""
confidence: float = 1.0
query_hash: str = ""
@property
def age_seconds(self) -> float:
return time.time() - self.created_at
@property
def ttl(self) -> int:
return CATEGORY_TTL.get(self.category, 3600)
@property
def is_stale(self) -> bool:
return self.age_seconds > self.ttl
@property
def staleness_factor(self) -> float:
"""0.0 = fresh, 1.0 = at TTL, >1.0 = past TTL."""
return self.age_seconds / self.ttl
def to_json(self) -> str:
return json.dumps({
"response": self.response,
"created_at": self.created_at,
"category": self.category.value,
"product_ids": self.product_ids,
"model_id": self.model_id,
"confidence": self.confidence,
"query_hash": self.query_hash,
})
@classmethod
def from_json(cls, data: str) -> "CacheEntry":
parsed = json.loads(data)
return cls(
response=parsed["response"],
created_at=parsed["created_at"],
category=ContentCategory(parsed["category"]),
product_ids=parsed.get("product_ids", []),
model_id=parsed.get("model_id", ""),
confidence=parsed.get("confidence", 1.0),
query_hash=parsed.get("query_hash", ""),
)
class ContentClassifier:
"""
Classifies user queries into content categories for cache TTL decisions.
Uses keyword matching for speed (runs on every request).
For production, could be replaced with a lightweight classifier model.
"""
CATEGORY_KEYWORDS: Dict[ContentCategory, List[str]] = {
ContentCategory.PRICING: [
"price", "cost", "how much", "discount", "sale", "coupon",
"expensive", "cheap", "deal", "offer", "yen", "dollar",
],
ContentCategory.AVAILABILITY: [
"in stock", "available", "out of stock", "sold out",
"restock", "back in stock", "inventory", "pre-order",
],
ContentCategory.ORDER_STATUS: [
"order", "tracking", "shipped", "delivery", "package",
"arrive", "estimated", "where is my",
],
ContentCategory.RECOMMENDATION: [
"recommend", "suggest", "similar", "like", "best",
"popular", "trending", "new release", "what should",
],
ContentCategory.FAQ: [
"return policy", "shipping policy", "payment method",
"hours", "contact", "support", "membership", "account",
],
}
@classmethod
def classify(cls, query: str) -> ContentCategory:
"""Classify a query into a content category."""
query_lower = query.lower()
best_category = ContentCategory.GENERAL_INFO
best_score = 0
for category, keywords in cls.CATEGORY_KEYWORDS.items():
score = sum(1 for kw in keywords if kw in query_lower)
if score > best_score:
best_score = score
best_category = category
return best_category
@classmethod
def extract_product_ids(cls, text: str) -> List[str]:
"""Extract product IDs from response text for cache invalidation."""
patterns = [
r"ISBN[:\s-]*([\d-]{10,17})",
r"MANGA[_-](\d{3,8})",
r"product[_/]id[:\s]*(\w{6,12})",
]
ids = []
for pattern in patterns:
ids.extend(re.findall(pattern, text, re.IGNORECASE))
return list(set(ids))
@dataclass
class FallbackTierMetrics:
"""Per-tier metrics for quality tracking."""
invocations: int = 0
successes: int = 0
failures: int = 0
total_latency_ms: float = 0.0
stale_serves: int = 0
user_satisfaction_signals: List[float] = field(default_factory=list)
@property
def success_rate(self) -> float:
return self.successes / max(1, self.invocations)
@property
def avg_latency_ms(self) -> float:
return self.total_latency_ms / max(1, self.invocations)
class TieredFallbackEngine:
"""
Advanced fallback engine with content-aware caching and staleness detection.
Improvements over basic FallbackOrchestrator:
1. Content-aware TTLs (pricing cached 15 min, FAQs cached 7 days)
2. Staleness warnings when serving cached responses near TTL expiry
3. Quality tracking per tier for adaptive routing decisions
4. Cache warm-up during low-traffic periods
5. Degradation notifications appended to cached/static responses
Usage:
engine = TieredFallbackEngine(
backoff_client=client,
redis_client=redis.Redis(...),
)
response = engine.get_response(
user_query="How much is volume 1 of One Piece?",
user_id="user_123",
)
if response.is_degraded:
# Optionally show degradation indicator in UI
pass
"""
STALENESS_WARNING_TEMPLATE = (
"\n\n---\n*Note: This information may not reflect the very latest "
"updates. For the most current {category} details, please check "
"our catalog directly or ask me again shortly.*"
)
def __init__(
self,
backoff_client,
redis_client: redis.Redis,
sonnet_model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
haiku_model_id: str = "anthropic.claude-3-haiku-20240307-v1:0",
region: str = "ap-northeast-1",
):
self.backoff_client = backoff_client
self.redis = redis_client
self.sonnet_model_id = sonnet_model_id
self.haiku_model_id = haiku_model_id
self.tier_metrics: Dict[str, FallbackTierMetrics] = {
"sonnet": FallbackTierMetrics(),
"haiku": FallbackTierMetrics(),
"cached": FallbackTierMetrics(),
"static": FallbackTierMetrics(),
"graceful": FallbackTierMetrics(),
}
self._cloudwatch = boto3.client("cloudwatch", region_name=region)
self._classifier = ContentClassifier()
def _query_hash(self, query: str) -> str:
"""Generate a cache key from a normalized query."""
normalized = query.lower().strip()
return f"manga:cache:{hashlib.sha256(normalized.encode()).hexdigest()[:16]}"
def _get_cached(self, query: str) -> Optional[CacheEntry]:
"""Retrieve a cache entry with staleness metadata."""
try:
key = self._query_hash(query)
raw = self.redis.get(key)
if raw:
return CacheEntry.from_json(raw.decode("utf-8"))
except Exception as e:
logger.warning(f"Cache read failed: {e}")
return None
def _store_cached(
self,
query: str,
response: str,
category: ContentCategory,
model_id: str,
confidence: float,
):
"""Store a response in cache with rich metadata."""
try:
key = self._query_hash(query)
entry = CacheEntry(
response=response,
created_at=time.time(),
category=category,
product_ids=ContentClassifier.extract_product_ids(response),
model_id=model_id,
confidence=confidence,
query_hash=key,
)
ttl = CATEGORY_TTL.get(category, 3600)
self.redis.setex(key, ttl, entry.to_json())
except Exception as e:
logger.warning(f"Cache write failed: {e}")
def _build_body(self, query: str, history: List[Dict]) -> Dict:
"""Build a Bedrock request body (Anthropic Messages format)."""
messages = []
for msg in history[-10:]:
messages.append({
"role": msg["role"],
"content": [{"type": "text", "text": msg["content"]}],
})
messages.append(
{"role": "user", "content": [{"type": "text", "text": query}]}
)
return {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1024,
"system": (
"You are MangaAssist, a helpful assistant for a Japanese manga store. "
"Help customers find manga, answer questions about orders, and provide "
"recommendations. Always be friendly and knowledgeable about manga."
),
"messages": messages,
}
def get_response(
self,
user_query: str,
user_id: str = "anonymous",
conversation_history: Optional[List[Dict]] = None,
) -> Dict[str, Any]:
"""
Get a response using the tiered fallback with content-aware caching.
Returns a dict with:
- content: The response text
- tier: Which tier served the response
- confidence: Quality confidence score
- is_degraded: Whether this is a fallback response
- staleness_warning: Optional warning about data freshness
- latency_ms: Response time
"""
history = conversation_history or []
category = ContentClassifier.classify(user_query)
body = self._build_body(user_query, history)
# Tier 1: Claude 3 Sonnet
try:
start = time.monotonic()
result = self.backoff_client.invoke_model(
model_id=self.sonnet_model_id, body=body
)
elapsed = (time.monotonic() - start) * 1000
content = result["content"][0]["text"]
self._store_cached(
user_query, content, category, self.sonnet_model_id, 1.0
)
self._record_metric("sonnet", elapsed, True)
return {
"content": content,
"tier": "sonnet",
"confidence": 1.0,
"is_degraded": False,
"staleness_warning": None,
"latency_ms": round(elapsed, 1),
"category": category.value,
}
except Exception as e:
logger.warning(f"Sonnet failed: {e}")
self._record_metric("sonnet", 0, False)
# Tier 2: Claude 3 Haiku
try:
start = time.monotonic()
result = self.backoff_client.invoke_model(
model_id=self.haiku_model_id, body=body
)
elapsed = (time.monotonic() - start) * 1000
content = result["content"][0]["text"]
self._store_cached(
user_query, content, category, self.haiku_model_id, 0.7
)
self._record_metric("haiku", elapsed, True)
return {
"content": content,
"tier": "haiku",
"confidence": 0.7,
"is_degraded": True,
"staleness_warning": None,
"latency_ms": round(elapsed, 1),
"category": category.value,
}
except Exception as e:
logger.warning(f"Haiku failed: {e}")
self._record_metric("haiku", 0, False)
# Tier 3: Cache with staleness detection
start = time.monotonic()
cached = self._get_cached(user_query)
elapsed = (time.monotonic() - start) * 1000
if cached:
staleness_warning = None
confidence = 0.5
if cached.is_stale:
staleness_warning = self.STALENESS_WARNING_TEMPLATE.format(
category=category.value
)
confidence = 0.3
self.tier_metrics["cached"].stale_serves += 1
logger.warning(
f"Serving stale cache: age={cached.age_seconds:.0f}s "
f"ttl={cached.ttl}s category={category.value}"
)
elif cached.staleness_factor > 0.8:
staleness_warning = self.STALENESS_WARNING_TEMPLATE.format(
category=category.value
)
confidence = 0.4
self._record_metric("cached", elapsed, True)
return {
"content": cached.response,
"tier": "cached",
"confidence": confidence,
"is_degraded": True,
"staleness_warning": staleness_warning,
"latency_ms": round(elapsed, 1),
"category": category.value,
"cache_age_seconds": round(cached.age_seconds, 0),
}
# Tier 4: Static FAQ
faq_answer = self._match_faq(user_query)
if faq_answer:
self._record_metric("static", 0, True)
return {
"content": faq_answer,
"tier": "static",
"confidence": 0.3,
"is_degraded": True,
"staleness_warning": None,
"latency_ms": 0,
"category": category.value,
}
# Tier 5: Graceful degradation
self._record_metric("graceful", 0, True)
return {
"content": (
"I'm having a brief moment of difficulty, but I'm still here! "
"You can browse our manga catalog directly at mangaassist.jp/catalog "
"or try asking me again in just a moment. For urgent order issues, "
"our support team is available at support@mangaassist.jp."
),
"tier": "graceful",
"confidence": 0.1,
"is_degraded": True,
"staleness_warning": None,
"latency_ms": 0,
"category": category.value,
}
def _match_faq(self, query: str) -> Optional[str]:
"""Simple FAQ matcher for static responses."""
faqs = {
"shipping": (
"Standard shipping takes 3-5 business days within Japan. "
"International shipping is 7-14 business days."
),
"return": (
"Unopened manga can be returned within 30 days for a full refund. "
"Damaged items can be exchanged."
),
"payment": (
"We accept Visa, Mastercard, JCB, PayPay, Line Pay, "
"and convenience store payment."
),
}
query_lower = query.lower()
for key, answer in faqs.items():
if key in query_lower:
return answer
return None
def _record_metric(self, tier: str, latency_ms: float, success: bool):
"""Record tier usage metrics."""
metrics = self.tier_metrics[tier]
metrics.invocations += 1
if success:
metrics.successes += 1
else:
metrics.failures += 1
metrics.total_latency_ms += latency_ms
def warm_cache(self, common_queries: List[str]):
"""
Pre-populate cache with responses for common queries.
Run this during low-traffic hours (2 AM - 6 AM JST) to ensure
cache hits during the next day's peak hours.
"""
logger.info(f"Starting cache warm-up for {len(common_queries)} queries")
warmed = 0
for query in common_queries:
cached = self._get_cached(query)
if cached and not cached.is_stale:
continue # Already cached and fresh
try:
category = ContentClassifier.classify(query)
body = self._build_body(query, [])
result = self.backoff_client.invoke_model(
model_id=self.sonnet_model_id, body=body
)
content = result["content"][0]["text"]
self._store_cached(
query, content, category, self.sonnet_model_id, 1.0
)
warmed += 1
time.sleep(0.5) # Rate limit warm-up requests
except Exception as e:
logger.warning(f"Cache warm-up failed for '{query[:50]}': {e}")
logger.info(f"Cache warm-up complete: {warmed}/{len(common_queries)} warmed")
def get_tier_report(self) -> Dict[str, Any]:
"""Generate a report of fallback tier usage for dashboard display."""
report = {}
total = sum(m.invocations for m in self.tier_metrics.values())
for tier_name, metrics in self.tier_metrics.items():
pct = (metrics.invocations / max(1, total)) * 100
report[tier_name] = {
"invocations": metrics.invocations,
"percentage": round(pct, 1),
"success_rate": round(metrics.success_rate * 100, 1),
"avg_latency_ms": round(metrics.avg_latency_ms, 1),
"stale_serves": metrics.stale_serves,
}
report["total_requests"] = total
report["degradation_rate"] = round(
(1 - self.tier_metrics["sonnet"].invocations / max(1, total)) * 100, 1
)
return report
3. X-Ray Service Map and FM Call Annotations
Service Map Configuration
The X-Ray service map for MangaAssist visualizes the full request flow and highlights bottlenecks. The service map is automatically generated from trace data — no manual configuration needed beyond enabling X-Ray on each service.
flowchart LR
subgraph ServiceMap["X-Ray Service Map — MangaAssist"]
Client["Client<br/>1M msgs/day<br/>avg 2.3s E2E"]
APIGW["API Gateway<br/>WebSocket<br/>avg 15ms<br/>0.01% error"]
ECS["ECS Fargate<br/>Orchestrator<br/>avg 2.1s<br/>0.5% error"]
Bedrock["Bedrock Runtime<br/>Claude 3 Sonnet<br/>avg 2.0s<br/>0.3% throttle"]
OpenSearch["OpenSearch<br/>Serverless<br/>avg 45ms<br/>0.01% error"]
Redis["ElastiCache<br/>Redis<br/>avg 3ms<br/>0.001% error"]
DDB["DynamoDB<br/>Sessions<br/>avg 8ms<br/>0.001% error"]
end
Client -->|"100%"| APIGW
APIGW -->|"100%"| ECS
ECS -->|"85%"| Bedrock
ECS -->|"90%"| OpenSearch
ECS -->|"100%"| Redis
ECS -->|"100%"| DDB
style Bedrock fill:#f3e5f5,stroke:#6a1b9a,stroke-width:3px
style ECS fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
Subsegment Annotations for FM Calls
X-Ray annotations are indexed and searchable, making them essential for filtering traces. For MangaAssist FM calls, we annotate:
Searchable Annotations (indexed):
├── model_id: "anthropic.claude-3-sonnet-20240229-v1:0"
├── fallback_tier: "sonnet" | "haiku" | "cached" | "static" | "graceful"
├── user_id: "user_abc123" (anonymized)
├── cache_hit: true | false
├── error_code: "ThrottlingException" | "ServiceUnavailable" | null
├── is_throttled: true | false
├── content_category: "pricing" | "recommendation" | "faq" | ...
└── is_degraded: true | false
Non-Indexed Metadata (stored but not searchable):
├── input_tokens: 450
├── output_tokens: 380
├── estimated_cost_usd: 0.007050
├── conversation_length: 5
├── cache_key: "manga:cache:abc123"
├── retry_count: 2
└── total_latency_ms: 2150
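The orchestrator can enforce this split before touching the X-Ray SDK. The sketch below is illustrative (the `INDEXED_KEYS` set and `split_trace_fields` helper are not part of any AWS API); with `aws_xray_sdk`, the first dict would feed `subsegment.put_annotation(...)` and the second `subsegment.put_metadata(...)`:

```python
from typing import Any, Dict, Tuple

# Illustrative: the keys MangaAssist chooses to index as X-Ray annotations.
INDEXED_KEYS = {
    "model_id", "fallback_tier", "user_id", "cache_hit",
    "error_code", "is_throttled", "content_category", "is_degraded",
}

def split_trace_fields(fields: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split per-request fields into (annotations, metadata).

    X-Ray only indexes annotation values that are strings, numbers, or
    booleans, so anything else (e.g. a None error_code) falls through
    to metadata even when the key is in INDEXED_KEYS.
    """
    annotations: Dict[str, Any] = {}
    metadata: Dict[str, Any] = {}
    for key, value in fields.items():
        if key in INDEXED_KEYS and isinstance(value, (str, int, float, bool)):
            annotations[key] = value
        else:
            metadata[key] = value
    return annotations, metadata
```

Keeping the indexed set small matters: X-Ray caps annotations per trace, and every extra indexed key adds to what the filter-expression engine must search.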
X-Ray Filter Expressions for MangaAssist
# Find all throttled Bedrock calls
annotation.is_throttled = true AND annotation.model_id BEGINSWITH "anthropic"
# Find all degraded responses (served from fallback tiers)
annotation.is_degraded = true AND annotation.fallback_tier != "sonnet"
# Find slow FM calls (over 3 second SLA)
responsetime > 3 AND service("MangaAssist")
# Find all errors in the Bedrock segment
fault = true AND service("Bedrock Runtime")
# Find all pricing queries that hit cache
annotation.content_category = "pricing" AND annotation.cache_hit = true
# Trace a specific user's requests
annotation.user_id = "user_abc123"
X-Ray Groups for MangaAssist
Group: MangaAssist-Errors
Filter: fault = true OR error = true
Insight: Enabled
Notification: SNS → On-Call
Group: MangaAssist-Slow
Filter: responsetime > 3
Insight: Enabled
Notification: SNS → Slack Channel
Group: MangaAssist-Degraded
Filter: annotation.is_degraded = true
Insight: Enabled
Notification: CloudWatch Alarm
Group: MangaAssist-Throttled
Filter: annotation.is_throttled = true
Insight: Enabled
Notification: CloudWatch Alarm + Auto-Scale
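These groups can be created with the X-Ray `CreateGroup` API. A sketch under stated assumptions: `group_params` and `create_groups` are illustrative helpers, `InsightsConfiguration` is the real API shape, and the notification targets listed above (SNS, CloudWatch alarms) are wired separately rather than through `CreateGroup`:

```python
def group_params(name: str, filter_expression: str) -> dict:
    """Build the CreateGroup request parameters for one group."""
    return {
        "GroupName": name,
        "FilterExpression": filter_expression,
        "InsightsConfiguration": {
            "InsightsEnabled": True,
            "NotificationsEnabled": True,
        },
    }

# The four MangaAssist groups from the table above.
GROUPS = [
    ("MangaAssist-Errors", "fault = true OR error = true"),
    ("MangaAssist-Slow", "responsetime > 3"),
    ("MangaAssist-Degraded", "annotation.is_degraded = true"),
    ("MangaAssist-Throttled", "annotation.is_throttled = true"),
]

def create_groups(region: str = "ap-northeast-1") -> None:
    """Create (or error on existing) all MangaAssist X-Ray groups."""
    import boto3  # imported here so group_params stays dependency-free
    client = boto3.client("xray", region_name=region)
    for name, expr in GROUPS:
        client.create_group(**group_params(name, expr))
```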
4. CloudWatch Dashboards for FM Health
Dashboard Layout
┌─────────────────────────────────────────────────────────────────────┐
│ MangaAssist FM Health Dashboard │
├──────────────────────────────┬──────────────────────────────────────┤
│ FM Invocation Rate │ Response Latency (p50/p95/p99) │
│ ████████████ 11.6 rps │ p50: 1.8s p95: 2.8s p99: 4.2s │
│ [Line chart — 1hr window] │ [Line chart — 1hr window] │
├──────────────────────────────┼──────────────────────────────────────┤
│ Error Rate by Type │ Fallback Tier Distribution │
│ Throttle: 0.3% │ Sonnet: 85% Haiku: 10% │
│ Timeout: 0.1% │ Cache: 4% Static: 0.8% │
│ Other: 0.05% │ Graceful: 0.2% │
│ [Stacked area chart] │ [Pie chart] │
├──────────────────────────────┼──────────────────────────────────────┤
│ Token Usage (Input/Output) │ Estimated Cost (Hourly) │
│ Input: 520K tokens/hr      │ Sonnet: $1.56 + $6.30 = $7.86/hr │
│ Output: 420K tokens/hr     │ Haiku: $0.015 + $0.06 = $0.075/hr│
│ [Area chart — 6hr window]  │ Total: $7.94/hr (~$190/day)      │
├──────────────────────────────┼──────────────────────────────────────┤
│ Circuit Breaker Status │ Cache Hit Rate │
│ State: CLOSED (healthy) │ Overall: 34% │
│ Failures: 0/5 threshold │ Pricing: 45% Recs: 28% FAQ: 62% │
│ [Status indicator] │ [Bar chart by category] │
├──────────────────────────────┼──────────────────────────────────────┤
│ Rate Limit Utilization │ Active Connections │
│ Global: 58% of 2000 rps │ WebSocket: 12,450 active │
│ Per-user avg: 0.8 rps │ Peak today: 18,200 │
│ [Gauge chart] │ [Line chart — 24hr window] │
└──────────────────────────────┴──────────────────────────────────────┘
Code: FMHealthDashboard
"""
FMHealthDashboard — CloudWatch dashboard configuration for MangaAssist FM health.
Creates and manages a comprehensive monitoring dashboard covering:
- FM invocation rates and latency percentiles
- Error rates by type (throttle, timeout, model error)
- Fallback tier distribution
- Token usage and cost estimation
- Circuit breaker state
- Cache hit rates and staleness metrics
- Rate limit utilization
The dashboard is defined as CloudWatch Dashboard JSON and can be deployed
via CloudFormation, CDK, or the API directly.
"""
import json
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, timezone
import boto3
logger = logging.getLogger("mangaassist.dashboard")
class FMHealthDashboard:
"""
Creates and manages the MangaAssist FM Health CloudWatch Dashboard.
Usage:
dashboard = FMHealthDashboard(region="ap-northeast-1")
dashboard.create_dashboard()
dashboard.create_alarms()
"""
NAMESPACE = "MangaAssist/FMHealth"
DASHBOARD_NAME = "MangaAssist-FM-Health"
def __init__(self, region: str = "ap-northeast-1"):
self.region = region
self._cloudwatch = boto3.client("cloudwatch", region_name=region)
def create_dashboard(self) -> Dict[str, Any]:
"""Create the comprehensive FM health dashboard."""
body = {
"widgets": [
self._header_widget(),
self._invocation_rate_widget(),
self._latency_widget(),
self._error_rate_widget(),
self._fallback_distribution_widget(),
self._token_usage_widget(),
self._cost_estimation_widget(),
self._circuit_breaker_widget(),
self._cache_hit_rate_widget(),
self._rate_limit_widget(),
self._active_connections_widget(),
self._stale_cache_widget(),
]
}
response = self._cloudwatch.put_dashboard(
DashboardName=self.DASHBOARD_NAME,
DashboardBody=json.dumps(body),
)
logger.info(f"Dashboard '{self.DASHBOARD_NAME}' created/updated")
return response
def _header_widget(self) -> Dict:
"""Dashboard header with service overview."""
return {
"type": "text",
"x": 0, "y": 0, "width": 24, "height": 2,
"properties": {
"markdown": (
"# MangaAssist FM Health Dashboard\n"
"Real-time monitoring of Foundation Model calls, "
"fallback tiers, and system resilience. "
"**Target SLA: useful response in under 3 seconds.**"
),
"background": "transparent",
},
}
def _invocation_rate_widget(self) -> Dict:
"""FM invocation rate over time."""
return {
"type": "metric",
"x": 0, "y": 2, "width": 12, "height": 6,
"properties": {
"title": "FM Invocation Rate",
"metrics": [
[self.NAMESPACE, "InvocationCount", "Model", "Sonnet",
{"stat": "Sum", "period": 60, "label": "Sonnet"}],
[self.NAMESPACE, "InvocationCount", "Model", "Haiku",
{"stat": "Sum", "period": 60, "label": "Haiku"}],
[self.NAMESPACE, "InvocationCount", "Model", "Total",
{"stat": "Sum", "period": 60, "label": "Total"}],
],
"view": "timeSeries",
"stacked": False,
"region": self.region,
"period": 60,
"yAxis": {"left": {"label": "Requests/min", "min": 0}},
},
}
def _latency_widget(self) -> Dict:
"""Response latency percentiles."""
return {
"type": "metric",
"x": 12, "y": 2, "width": 12, "height": 6,
"properties": {
"title": "Response Latency (p50 / p95 / p99)",
"metrics": [
[self.NAMESPACE, "Latency", "Service", "MangaAssist",
{"stat": "p50", "period": 60, "label": "p50"}],
[self.NAMESPACE, "Latency", "Service", "MangaAssist",
{"stat": "p95", "period": 60, "label": "p95"}],
[self.NAMESPACE, "Latency", "Service", "MangaAssist",
{"stat": "p99", "period": 60, "label": "p99"}],
],
"view": "timeSeries",
"region": self.region,
"period": 60,
"annotations": {
"horizontal": [
{"label": "SLA Target (3s)", "value": 3000,
"color": "#d62728"},
]
},
"yAxis": {"left": {"label": "Milliseconds", "min": 0}},
},
}
def _error_rate_widget(self) -> Dict:
"""Error rates by type."""
return {
"type": "metric",
"x": 0, "y": 8, "width": 12, "height": 6,
"properties": {
"title": "Error Rate by Type",
"metrics": [
[self.NAMESPACE, "ErrorCount", "ErrorType", "Throttle",
{"stat": "Sum", "period": 60, "label": "Throttle"}],
[self.NAMESPACE, "ErrorCount", "ErrorType", "Timeout",
{"stat": "Sum", "period": 60, "label": "Timeout"}],
[self.NAMESPACE, "ErrorCount", "ErrorType", "ModelError",
{"stat": "Sum", "period": 60, "label": "ModelError"}],
[self.NAMESPACE, "ErrorCount", "ErrorType", "ServiceError",
{"stat": "Sum", "period": 60, "label": "ServiceError"}],
],
"view": "timeSeries",
"stacked": True,
"region": self.region,
"period": 60,
"yAxis": {"left": {"label": "Errors/min", "min": 0}},
},
}
def _fallback_distribution_widget(self) -> Dict:
"""Fallback tier distribution."""
return {
"type": "metric",
"x": 12, "y": 8, "width": 12, "height": 6,
"properties": {
"title": "Fallback Tier Distribution",
"metrics": [
[self.NAMESPACE, "TierCount", "Tier", "Sonnet",
{"stat": "Sum", "period": 300, "label": "Sonnet (Primary)"}],
[self.NAMESPACE, "TierCount", "Tier", "Haiku",
{"stat": "Sum", "period": 300, "label": "Haiku (Fallback)"}],
[self.NAMESPACE, "TierCount", "Tier", "Cached",
{"stat": "Sum", "period": 300, "label": "Cached"}],
[self.NAMESPACE, "TierCount", "Tier", "Static",
{"stat": "Sum", "period": 300, "label": "Static FAQ"}],
[self.NAMESPACE, "TierCount", "Tier", "Graceful",
{"stat": "Sum", "period": 300, "label": "Graceful Msg"}],
],
"view": "pie",
"region": self.region,
"period": 300,
},
}
def _token_usage_widget(self) -> Dict:
"""Token usage tracking."""
return {
"type": "metric",
"x": 0, "y": 14, "width": 12, "height": 6,
"properties": {
"title": "Token Usage (Input / Output)",
"metrics": [
[self.NAMESPACE, "InputTokens", "Model", "Sonnet",
{"stat": "Sum", "period": 300, "label": "Sonnet Input"}],
[self.NAMESPACE, "OutputTokens", "Model", "Sonnet",
{"stat": "Sum", "period": 300, "label": "Sonnet Output"}],
[self.NAMESPACE, "InputTokens", "Model", "Haiku",
{"stat": "Sum", "period": 300, "label": "Haiku Input"}],
[self.NAMESPACE, "OutputTokens", "Model", "Haiku",
{"stat": "Sum", "period": 300, "label": "Haiku Output"}],
],
"view": "timeSeries",
"stacked": True,
"region": self.region,
"period": 300,
"yAxis": {"left": {"label": "Tokens / 5 min"}},
},
}
def _cost_estimation_widget(self) -> Dict:
"""Estimated FM cost."""
return {
"type": "metric",
"x": 12, "y": 14, "width": 12, "height": 6,
"properties": {
"title": "Estimated FM Cost (USD/hour)",
"metrics": [
[self.NAMESPACE, "EstimatedCostUSD", "Model", "Sonnet",
{"stat": "Sum", "period": 3600, "label": "Sonnet $/hr"}],
[self.NAMESPACE, "EstimatedCostUSD", "Model", "Haiku",
{"stat": "Sum", "period": 3600, "label": "Haiku $/hr"}],
[self.NAMESPACE, "EstimatedCostUSD", "Model", "Total",
{"stat": "Sum", "period": 3600, "label": "Total $/hr"}],
],
"view": "timeSeries",
"region": self.region,
"period": 3600,
"annotations": {
"horizontal": [
{"label": "Budget Alert ($15/hr)", "value": 15,
"color": "#d62728"},
]
},
"yAxis": {"left": {"label": "USD/hour", "min": 0}},
},
}
def _circuit_breaker_widget(self) -> Dict:
"""Circuit breaker status."""
return {
"type": "metric",
"x": 0, "y": 20, "width": 8, "height": 6,
"properties": {
"title": "Circuit Breaker Status",
"metrics": [
[self.NAMESPACE, "CircuitBreakerState", "Service", "Bedrock",
{"stat": "Maximum", "period": 60, "label": "State (0=Closed, 1=Open)"}],
[self.NAMESPACE, "CircuitBreakerFailures", "Service", "Bedrock",
{"stat": "Sum", "period": 60, "label": "Failure Count"}],
],
"view": "timeSeries",
"region": self.region,
"period": 60,
"annotations": {
"horizontal": [
{"label": "Threshold (5)", "value": 5, "color": "#d62728"},
]
},
},
}
def _cache_hit_rate_widget(self) -> Dict:
"""Cache hit rate by category."""
return {
"type": "metric",
"x": 8, "y": 20, "width": 8, "height": 6,
"properties": {
"title": "Cache Hit Rate by Category",
"metrics": [
[self.NAMESPACE, "CacheHitRate", "Category", "Pricing",
{"stat": "Average", "period": 300, "label": "Pricing"}],
[self.NAMESPACE, "CacheHitRate", "Category", "Recommendation",
{"stat": "Average", "period": 300, "label": "Recommendations"}],
[self.NAMESPACE, "CacheHitRate", "Category", "FAQ",
{"stat": "Average", "period": 300, "label": "FAQ"}],
[self.NAMESPACE, "CacheHitRate", "Category", "General",
{"stat": "Average", "period": 300, "label": "General"}],
],
"view": "bar",
"region": self.region,
"period": 300,
"yAxis": {"left": {"label": "Hit Rate %", "min": 0, "max": 100}},
},
}
def _rate_limit_widget(self) -> Dict:
"""Rate limit utilization."""
return {
"type": "metric",
"x": 16, "y": 20, "width": 8, "height": 6,
"properties": {
"title": "Rate Limit Utilization",
"metrics": [
[self.NAMESPACE, "RateLimitUtilization", "Tier", "Global",
{"stat": "Average", "period": 60, "label": "Global"}],
[self.NAMESPACE, "ThrottledRequests", "Tier", "Global",
{"stat": "Sum", "period": 60, "label": "Throttled/min"}],
],
"view": "timeSeries",
"region": self.region,
"period": 60,
"annotations": {
"horizontal": [
{"label": "Warning (80%)", "value": 80, "color": "#ff9800"},
{"label": "Critical (95%)", "value": 95, "color": "#d62728"},
]
},
},
}
def _active_connections_widget(self) -> Dict:
"""Active WebSocket connections."""
return {
"type": "metric",
"x": 0, "y": 26, "width": 12, "height": 6,
"properties": {
"title": "Active WebSocket Connections",
"metrics": [
["AWS/ApiGateway", "ConnectCount", "ApiId", "PLACEHOLDER",
{"stat": "Sum", "period": 60, "label": "New Connections"}],
["AWS/ApiGateway", "MessageCount", "ApiId", "PLACEHOLDER",
{"stat": "Sum", "period": 60, "label": "Messages"}],
],
"view": "timeSeries",
"region": self.region,
"period": 60,
},
}
def _stale_cache_widget(self) -> Dict:
"""Stale cache serves."""
return {
"type": "metric",
"x": 12, "y": 26, "width": 12, "height": 6,
"properties": {
"title": "Stale Cache Serves",
"metrics": [
[self.NAMESPACE, "StaleCacheServes", "Category", "Pricing",
{"stat": "Sum", "period": 300, "label": "Pricing (15min TTL)"}],
[self.NAMESPACE, "StaleCacheServes", "Category", "Availability",
{"stat": "Sum", "period": 300, "label": "Availability (5min TTL)"}],
[self.NAMESPACE, "StaleCacheServes", "Category", "Recommendation",
{"stat": "Sum", "period": 300, "label": "Recommendation (4hr TTL)"}],
],
"view": "timeSeries",
"stacked": True,
"region": self.region,
"period": 300,
},
}
def create_alarms(self) -> List[Dict[str, Any]]:
"""
Create CloudWatch alarms for FM health monitoring.
Alarm tiers:
- CRITICAL: Immediate page (PagerDuty/SNS)
- WARNING: Slack notification
- INFO: Dashboard annotation only
"""
alarms = []
# CRITICAL: High error rate (>5% over 5 minutes)
alarms.append(self._create_alarm(
name="MangaAssist-FM-HighErrorRate",
description="FM error rate exceeded 5% over 5 minutes",
metric_name="ErrorRate",
threshold=5.0,
comparison="GreaterThanThreshold",
evaluation_periods=5,
period=60,
statistic="Average",
alarm_actions=["arn:aws:sns:ap-northeast-1:ACCOUNT:MangaAssist-Critical"],
))
# CRITICAL: Circuit breaker open
alarms.append(self._create_alarm(
name="MangaAssist-FM-CircuitBreakerOpen",
description="Circuit breaker opened — Bedrock may be unhealthy",
metric_name="CircuitBreakerState",
threshold=0.5,
comparison="GreaterThanThreshold",
evaluation_periods=1,
period=60,
statistic="Maximum",
alarm_actions=["arn:aws:sns:ap-northeast-1:ACCOUNT:MangaAssist-Critical"],
))
# WARNING: High degradation rate (>20% non-Sonnet)
alarms.append(self._create_alarm(
name="MangaAssist-FM-HighDegradation",
description="More than 20% of responses coming from fallback tiers",
metric_name="DegradationRate",
threshold=20.0,
comparison="GreaterThanThreshold",
evaluation_periods=3,
period=300,
statistic="Average",
alarm_actions=["arn:aws:sns:ap-northeast-1:ACCOUNT:MangaAssist-Warning"],
))
# WARNING: p99 latency above SLA (>3s)
alarms.append(self._create_alarm(
name="MangaAssist-FM-HighLatency",
description="p99 latency exceeded 3 second SLA target",
metric_name="Latency",
threshold=3000,
comparison="GreaterThanThreshold",
evaluation_periods=3,
period=60,
statistic="p99",
alarm_actions=["arn:aws:sns:ap-northeast-1:ACCOUNT:MangaAssist-Warning"],
))
# WARNING: Cost spike (>$15/hr)
alarms.append(self._create_alarm(
name="MangaAssist-FM-CostSpike",
description="Estimated FM cost exceeded $15/hour budget",
metric_name="EstimatedCostUSD",
threshold=15.0,
comparison="GreaterThanThreshold",
evaluation_periods=2,
period=3600,
statistic="Sum",
alarm_actions=["arn:aws:sns:ap-northeast-1:ACCOUNT:MangaAssist-Warning"],
))
# INFO: Rate limit utilization above 80%
alarms.append(self._create_alarm(
name="MangaAssist-FM-RateLimitHigh",
description="Rate limit utilization above 80%",
metric_name="RateLimitUtilization",
threshold=80.0,
comparison="GreaterThanThreshold",
evaluation_periods=5,
period=60,
statistic="Average",
alarm_actions=["arn:aws:sns:ap-northeast-1:ACCOUNT:MangaAssist-Info"],
))
return alarms
def _create_alarm(
self,
name: str,
description: str,
metric_name: str,
threshold: float,
comparison: str,
evaluation_periods: int,
period: int,
statistic: str,
alarm_actions: List[str],
) -> Dict[str, Any]:
"""Create a single CloudWatch alarm."""
        # Percentile statistics (p50/p95/p99) must go in ExtendedStatistic;
        # Statistic and ExtendedStatistic are mutually exclusive in
        # put_metric_alarm, so build the kwargs conditionally.
        kwargs: Dict[str, Any] = {
            "AlarmName": name,
            "AlarmDescription": description,
            "Namespace": self.NAMESPACE,
            "MetricName": metric_name,
            "Dimensions": [
                {"Name": "Service", "Value": "MangaAssist"},
            ],
            "Period": period,
            "EvaluationPeriods": evaluation_periods,
            "Threshold": threshold,
            "ComparisonOperator": comparison,
            "AlarmActions": alarm_actions,
            "OKActions": alarm_actions,
            "TreatMissingData": "notBreaching",
            "Tags": [
                {"Key": "Service", "Value": "MangaAssist"},
                {"Key": "Environment", "Value": "production"},
            ],
        }
        if statistic.startswith("p"):
            kwargs["ExtendedStatistic"] = statistic
        else:
            kwargs["Statistic"] = statistic
        self._cloudwatch.put_metric_alarm(**kwargs)
        logger.info(f"Created alarm: {name} (threshold={threshold})")
        return {"name": name, "threshold": threshold, "status": "created"}
def publish_fm_metrics(
self,
model: str,
latency_ms: float,
input_tokens: int,
output_tokens: int,
fallback_tier: str,
is_error: bool = False,
error_type: Optional[str] = None,
cache_hit: bool = False,
):
"""
Publish a batch of FM metrics from a single request.
Called after every FM invocation (or fallback) to maintain
real-time dashboard accuracy.
"""
now = datetime.now(timezone.utc)
metrics = [
{
"MetricName": "InvocationCount",
"Dimensions": [{"Name": "Model", "Value": model}],
"Timestamp": now,
"Value": 1,
"Unit": "Count",
},
{
"MetricName": "Latency",
"Dimensions": [{"Name": "Service", "Value": "MangaAssist"}],
"Timestamp": now,
"Value": latency_ms,
"Unit": "Milliseconds",
},
{
"MetricName": "InputTokens",
"Dimensions": [{"Name": "Model", "Value": model}],
"Timestamp": now,
"Value": input_tokens,
"Unit": "Count",
},
{
"MetricName": "OutputTokens",
"Dimensions": [{"Name": "Model", "Value": model}],
"Timestamp": now,
"Value": output_tokens,
"Unit": "Count",
},
{
"MetricName": "TierCount",
"Dimensions": [{"Name": "Tier", "Value": fallback_tier}],
"Timestamp": now,
"Value": 1,
"Unit": "Count",
},
]
if is_error and error_type:
metrics.append({
"MetricName": "ErrorCount",
"Dimensions": [{"Name": "ErrorType", "Value": error_type}],
"Timestamp": now,
"Value": 1,
"Unit": "Count",
})
# Estimate cost
pricing = {
"Sonnet": {"input": 3.0, "output": 15.0},
"Haiku": {"input": 0.25, "output": 1.25},
}
model_pricing = pricing.get(model, pricing["Sonnet"])
cost = (
(input_tokens / 1_000_000) * model_pricing["input"]
+ (output_tokens / 1_000_000) * model_pricing["output"]
)
metrics.append({
"MetricName": "EstimatedCostUSD",
"Dimensions": [{"Name": "Model", "Value": model}],
"Timestamp": now,
"Value": cost,
"Unit": "None",
})
try:
self._cloudwatch.put_metric_data(
Namespace=self.NAMESPACE,
MetricData=metrics,
)
except Exception as e:
logger.warning(f"Failed to publish FM metrics: {e}")
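The per-request cost arithmetic inside `publish_fm_metrics` is worth being able to check in isolation. A standalone sketch using the same published prices (Sonnet $3/$15, Haiku $0.25/$1.25 per 1M input/output tokens); `estimate_request_cost` is an illustrative helper, not part of the class above:

```python
# Published Bedrock prices, USD per 1M tokens (input/output).
PRICING_USD_PER_1M = {
    "Sonnet": {"input": 3.0, "output": 15.0},
    "Haiku": {"input": 0.25, "output": 1.25},
}

def estimate_request_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Estimated USD cost of a single FM call, rounded to 6 decimals."""
    p = PRICING_USD_PER_1M[model]
    cost = (
        (input_tokens / 1_000_000) * p["input"]
        + (output_tokens / 1_000_000) * p["output"]
    )
    return round(cost, 6)
```

For the 450-input / 380-output Sonnet call shown in the X-Ray metadata earlier, this yields the same $0.007050 recorded there, which is a cheap sanity check that dashboard cost widgets and trace metadata agree.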
Summary: Observability Stack Integration
┌──────────────────────────────────────────────────────────────┐
│ MangaAssist Observability │
├──────────────┬──────────────┬──────────────┬─────────────────┤
│ X-Ray │ CloudWatch │ CloudWatch │ SNS / PagerDuty│
│ Tracing │ Metrics │ Logs │ Alerting │
├──────────────┼──────────────┼──────────────┼─────────────────┤
│ Distributed │ FM latency │ Structured │ Critical: │
│ traces │ percentiles │ JSON logs │ Error >5% │
│ across all │ │ │ Circuit open │
│ services │ Error rates │ Correlation │ │
│ │ by type │ IDs │ Warning: │
│ Service map │ │ │ Degradation │
│ visualization│ Token usage │ Log Insights │ >20% │
│ │ │ queries │ p99 >3s │
│ Subsegment │ Fallback │ │ Cost >$15/hr │
│ annotations │ tier counts │ Request/ │ │
│ for FM calls │ │ response │ Info: │
│ │ Cost │ payloads │ Rate limit │
│ Filter │ estimation │ │ >80% │
│ expressions │ │ Error stack │ │
│ │ Cache hit │ traces │ Auto-remediate: │
│ Groups & │ rates │ │ Scale ECS │
│ insights │ │ │ Adjust throttle│
└──────────────┴──────────────┴──────────────┴─────────────────┘
Key Takeaways
| Component | Purpose | MangaAssist Config |
|---|---|---|
| Usage Plans | Server-side throttle per tier | Free: 2 rps, Registered: 5 rps, Premium: 15 rps |
| Tiered Fallback | Content-aware degradation | Sonnet -> Haiku -> Cache (category-aware TTL) -> FAQ -> Graceful |
| X-Ray Tracing | Distributed request flow | 5% sampling, 100% error capture, FM annotations |
| CloudWatch Dashboard | Real-time FM health | 12 widgets: latency, errors, tiers, cost, circuit breaker |
| Alarms | Proactive incident response | 6 alarms across critical, warning, info tiers |
| Staleness Detection | Cache quality assurance | Pricing: 15min TTL, Availability: 5min, Recommendations: 4hr |