
Rate Limiting, Fallback Mechanisms, and FM Observability

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

Attribute      Value
Certification  AWS Certified AI Practitioner (AIF-C01)
Domain         2 — Implementation and Integration of Foundation Models
Task           2.4 — Design resilient and scalable FM-based applications
Skill          2.4.3 — Create resilient FM systems to ensure reliable operations
Focus Areas    API Gateway usage plans, multi-tier fallback, X-Ray service maps, CloudWatch dashboards

1. API Gateway Usage Plans and Throttle Settings

Usage Plan Architecture for MangaAssist

API Gateway usage plans control who can access the MangaAssist API and at what rate. For a WebSocket API serving 1M messages/day, the usage plan architecture must balance three competing concerns:

  1. Protection — Prevent any single user or integration from overwhelming Bedrock
  2. Fairness — Ensure all manga readers get responsive service during peak hours
  3. Cost control — Prevent runaway FM costs from misbehaving clients

Usage Plan Tiers

┌─────────────────────────────────────────────────────────────┐
│                   MangaAssist Usage Plans                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Free Tier (Anonymous Browsers)                             │
│  ├── Rate:  2 req/s                                         │
│  ├── Burst: 5                                               │
│  ├── Quota: 100 messages/day                                │
│  └── Model: Haiku only (cost-effective)                     │
│                                                             │
│  Registered Tier (Logged-in Users)                          │
│  ├── Rate:  5 req/s                                         │
│  ├── Burst: 15                                              │
│  ├── Quota: 1,000 messages/day                              │
│  └── Model: Sonnet + Haiku fallback                         │
│                                                             │
│  Premium Tier (Paid Subscribers)                            │
│  ├── Rate:  15 req/s                                        │
│  ├── Burst: 40                                              │
│  ├── Quota: 10,000 messages/day                             │
│  └── Model: Sonnet priority + extended context              │
│                                                             │
│  Internal Tier (Admin / Analytics)                          │
│  ├── Rate:  50 req/s                                        │
│  ├── Burst: 100                                             │
│  ├── Quota: Unlimited                                       │
│  └── Model: Sonnet with full tool access                    │
│                                                             │
└─────────────────────────────────────────────────────────────┘
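API Gateway throttling follows a token-bucket model: the rate limit sets the steady refill rate and the burst limit sets the bucket capacity. A minimal model of those semantics (illustrative Python, not an AWS API call):

```python
# Illustrative token-bucket model of API Gateway throttling.
# rate = steady-state requests/second (refill), burst = bucket capacity.
class TokenBucket:
    def __init__(self, rate: float, burst: int):
        self.rate = rate
        self.burst = burst
        self.tokens = float(burst)  # bucket starts full
        self.last = 0.0

    def allow(self, now: float) -> bool:
        """Return True if a request at time `now` is admitted, else throttled (429)."""
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

# Free tier: 2 req/s steady, burst of 5.
bucket = TokenBucket(rate=2.0, burst=5)
# 10 requests arriving at the same instant: the first 5 drain the burst
# capacity, the remaining 5 are throttled.
results = [bucket.allow(now=0.0) for _ in range(10)]
print(results.count(True))  # 5 admitted
```

This is why a tier's burst limit briefly absorbs traffic above the steady rate, and why sustained traffic above the rate limit always ends in 429s once the bucket empties.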

Throttle Behavior Under Load

Scenario: Evening surge — 8 PM JST, manga readers checking new weekly releases

Normal traffic:  ~800 req/s
Surge traffic:   ~3,500 req/s (weekly Shonen Jump release day)

API Gateway response:
  Requests 1-2000:    → 200 OK (within rate limit)
  Requests 2001-4000: → 200 OK (burst capacity absorbs)
  Requests 4001+:     → 429 Too Many Requests

Response headers for throttled requests:
  HTTP/1.1 429 Too Many Requests
  Retry-After: 1
  X-RateLimit-Limit: 2000
  X-RateLimit-Remaining: 0
  X-RateLimit-Reset: 1711900800
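Clients that receive a 429 should wait for the advertised Retry-After interval instead of retrying immediately. A sketch of that client-side behavior; the `send` callable and response shape are assumptions for illustration, and a real client would use an HTTP library:

```python
import time

def send_with_retry(send, request, max_attempts: int = 3, sleep=time.sleep):
    """Call `send(request)` and retry on 429, honoring the Retry-After header.

    `send` must return an object with `.status` and `.headers` (an assumed
    interface); `sleep` is injectable so the backoff can be tested.
    """
    for _ in range(max_attempts):
        response = send(request)
        if response.status != 429:
            return response
        # Retry-After is in seconds; default to 1s if the header is absent.
        delay = float(response.headers.get("Retry-After", "1"))
        sleep(delay)
    return response  # still throttled after max_attempts
```

Injecting `sleep` keeps the backoff testable; a production client would also cap the total wait so a throttled request fails within the 3-second latency target.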

WAF Integration for DDoS Protection

WAF Rule Groups for MangaAssist:
  ┌─────────────────────────────────────────┐
  │ Rule 1: IP Rate Limiting                │
  │   Action: Block                         │
  │   Threshold: 100 requests/5 min per IP  │
  │   Scope: All routes                     │
  ├─────────────────────────────────────────┤
  │ Rule 2: Geographic Restriction          │
  │   Action: Allow                         │
  │   Regions: JP, US, EU (primary markets) │
  │   Block: Known bot regions              │
  ├─────────────────────────────────────────┤
  │ Rule 3: Request Size Limit              │
  │   Action: Block                         │
  │   Max body: 8 KB (manga queries)        │
  │   Max URL: 2 KB                         │
  ├─────────────────────────────────────────┤
  │ Rule 4: SQL/XSS Protection              │
  │   Action: Block                         │
  │   AWS Managed Rules: SQLi + XSS         │
  └─────────────────────────────────────────┘
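Rule 1 above corresponds to a WAFv2 rate-based rule, which counts requests per source IP over a 5-minute window. A sketch of how that rule might look as an entry in the `Rules` list passed to a `wafv2` web ACL call (the rule name and metric settings are illustrative; the API call itself is omitted):

```python
# Rule 1 (IP rate limiting) expressed as a WAFv2 rule entry.
# Rate-based statements count requests per IP over a 5-minute window,
# matching the "100 requests/5 min per IP" threshold above.
ip_rate_rule = {
    "Name": "MangaAssist-IPRateLimit",  # illustrative name
    "Priority": 1,
    "Statement": {
        "RateBasedStatement": {
            "Limit": 100,               # requests per 5-minute window
            "AggregateKeyType": "IP",
        }
    },
    "Action": {"Block": {}},
    "VisibilityConfig": {
        "SampledRequestsEnabled": True,
        "CloudWatchMetricsEnabled": True,
        "MetricName": "MangaAssistIPRateLimit",
    },
}

# This dict would go in the Rules list of a wafv2 create_web_acl /
# update_web_acl call; the web ACL is then associated with the API stage.
```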

Code: UsagePlanManager

"""
UsagePlanManager — Manages API Gateway usage plans for MangaAssist.

Handles creation, update, and monitoring of usage plans across tiers
(free, registered, premium, internal). Integrates with CloudWatch for
throttle monitoring and automatic tier adjustment.

Architecture:
  API Gateway Usage Plan → API Key → Stage + Route Throttle → CloudWatch Metrics

Each tier maps to different Bedrock model access:
  - Free: Haiku only (cost-effective for anonymous browsing)
  - Registered: Sonnet primary, Haiku fallback
  - Premium: Sonnet priority with extended context window
  - Internal: Full Sonnet access with all tools enabled
"""

import logging
import json
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List
from datetime import datetime, timezone
from enum import Enum

import boto3

logger = logging.getLogger("mangaassist.usageplan")


class UsageTier(Enum):
    """Usage tiers for MangaAssist API access."""
    FREE = "free"
    REGISTERED = "registered"
    PREMIUM = "premium"
    INTERNAL = "internal"


@dataclass
class TierConfig:
    """Configuration for a usage plan tier."""
    tier: UsageTier
    rate_limit: float           # Steady-state requests per second
    burst_limit: int            # Maximum burst capacity
    daily_quota: int            # Maximum requests per day
    monthly_quota: int          # Maximum requests per month
    allowed_models: List[str]   # Which Bedrock models this tier can access
    max_context_tokens: int     # Maximum context window (input tokens)
    priority: int               # Priority for resource allocation (1=highest)
    description: str = ""

    @property
    def quota_per_second_avg(self) -> float:
        """Average allowed requests per second based on daily quota."""
        return self.daily_quota / 86400


# Default tier configurations for MangaAssist
DEFAULT_TIERS: Dict[UsageTier, TierConfig] = {
    UsageTier.FREE: TierConfig(
        tier=UsageTier.FREE,
        rate_limit=2.0,
        burst_limit=5,
        daily_quota=100,
        monthly_quota=2000,
        allowed_models=["anthropic.claude-3-haiku-20240307-v1:0"],
        max_context_tokens=2048,
        priority=4,
        description="Free tier for anonymous manga store browsers",
    ),
    UsageTier.REGISTERED: TierConfig(
        tier=UsageTier.REGISTERED,
        rate_limit=5.0,
        burst_limit=15,
        daily_quota=1000,
        monthly_quota=25000,
        allowed_models=[
            "anthropic.claude-3-sonnet-20240229-v1:0",
            "anthropic.claude-3-haiku-20240307-v1:0",
        ],
        max_context_tokens=4096,
        priority=3,
        description="Registered user tier with Sonnet access",
    ),
    UsageTier.PREMIUM: TierConfig(
        tier=UsageTier.PREMIUM,
        rate_limit=15.0,
        burst_limit=40,
        daily_quota=10000,
        monthly_quota=250000,
        allowed_models=[
            "anthropic.claude-3-sonnet-20240229-v1:0",
            "anthropic.claude-3-haiku-20240307-v1:0",
        ],
        max_context_tokens=8192,
        priority=2,
        description="Premium subscriber tier with priority Sonnet access",
    ),
    UsageTier.INTERNAL: TierConfig(
        tier=UsageTier.INTERNAL,
        rate_limit=50.0,
        burst_limit=100,
        daily_quota=1000000,
        monthly_quota=30000000,
        allowed_models=[
            "anthropic.claude-3-sonnet-20240229-v1:0",
            "anthropic.claude-3-haiku-20240307-v1:0",
        ],
        max_context_tokens=16384,
        priority=1,
        description="Internal tier for admin and analytics",
    ),
}


@dataclass
class UsagePlanResult:
    """Result of a usage plan operation."""
    plan_id: str
    tier: UsageTier
    api_key_id: Optional[str] = None
    api_key_value: Optional[str] = None
    status: str = "created"


class UsagePlanManager:
    """
    Manages the full lifecycle of API Gateway usage plans for MangaAssist.

    Responsibilities:
      1. Create and configure usage plans for each tier
      2. Generate and assign API keys to plans
      3. Monitor usage and throttle metrics
      4. Auto-adjust throttle settings based on demand
      5. Publish usage metrics to CloudWatch

    Usage:
        manager = UsagePlanManager(api_id="abc123", stage="prod")

        # Create all tier plans
        plans = manager.create_all_usage_plans()

        # Assign a user to a tier
        key = manager.create_api_key(
            user_id="user_123",
            tier=UsageTier.REGISTERED,
        )

        # Check usage
        usage = manager.get_usage_stats(
            plan_id=plans[UsageTier.REGISTERED].plan_id,
        )
    """

    def __init__(
        self,
        api_id: str,
        stage: str = "prod",
        region: str = "ap-northeast-1",
        tiers: Optional[Dict[UsageTier, TierConfig]] = None,
    ):
        self.api_id = api_id
        self.stage = stage
        self.region = region
        self.tiers = tiers or DEFAULT_TIERS

        self._apigw = boto3.client("apigateway", region_name=region)
        self._cloudwatch = boto3.client("cloudwatch", region_name=region)

        self._plan_cache: Dict[UsageTier, str] = {}

    def create_usage_plan(self, tier: UsageTier) -> UsagePlanResult:
        """
        Create an API Gateway usage plan for the specified tier.

        Args:
            tier: The usage tier to create a plan for.

        Returns:
            UsagePlanResult with the plan ID and configuration.
        """
        config = self.tiers[tier]

        plan = self._apigw.create_usage_plan(
            name=f"MangaAssist-{tier.value.title()}",
            description=config.description,
            apiStages=[
                {
                    "apiId": self.api_id,
                    "stage": self.stage,
                    "throttle": {
                        # Per-route throttle for the sendMessage route.
                        # NOTE: usage plans and API keys are a REST API feature;
                        # for WebSocket APIs, routes like $connect are throttled
                        # via API Gateway v2 stage route settings in practice.
                        "sendMessage": {
                            "burstLimit": config.burst_limit,
                            "rateLimit": config.rate_limit,
                        },
                        # Connection establishment gets lower limits
                        "$connect": {
                            "burstLimit": max(1, config.burst_limit // 5),
                            "rateLimit": max(1.0, config.rate_limit / 5),
                        },
                        # Default route uses standard limits
                        "$default": {
                            "burstLimit": config.burst_limit,
                            "rateLimit": config.rate_limit,
                        },
                    },
                }
            ],
            throttle={
                "burstLimit": config.burst_limit,
                "rateLimit": config.rate_limit,
            },
            quota={
                "limit": config.monthly_quota,
                "offset": 0,
                "period": "MONTH",
            },
            tags={
                "Service": "MangaAssist",
                "Tier": tier.value,
                "Environment": self.stage,
            },
        )

        plan_id = plan["id"]
        self._plan_cache[tier] = plan_id

        logger.info(
            f"Created usage plan [{plan_id}] for tier={tier.value} "
            f"rate={config.rate_limit} burst={config.burst_limit}"
        )

        return UsagePlanResult(plan_id=plan_id, tier=tier)

    def create_all_usage_plans(self) -> Dict[UsageTier, UsagePlanResult]:
        """Create usage plans for all defined tiers."""
        results = {}
        for tier in self.tiers:
            results[tier] = self.create_usage_plan(tier)
        return results

    def create_api_key(
        self,
        user_id: str,
        tier: UsageTier,
        customer_name: Optional[str] = None,
    ) -> UsagePlanResult:
        """
        Create an API key and associate it with a usage plan.

        Args:
            user_id: Unique user identifier.
            tier: The tier to assign the user to.
            customer_name: Optional display name.

        Returns:
            UsagePlanResult with key details.
        """
        plan_id = self._plan_cache.get(tier)
        if not plan_id:
            result = self.create_usage_plan(tier)
            plan_id = result.plan_id

        key = self._apigw.create_api_key(
            name=f"MangaAssist-{user_id}",
            description=f"API key for {customer_name or user_id} ({tier.value} tier)",
            enabled=True,
            tags={
                "UserId": user_id,
                "Tier": tier.value,
                "Service": "MangaAssist",
            },
        )

        self._apigw.create_usage_plan_key(
            usagePlanId=plan_id,
            keyId=key["id"],
            keyType="API_KEY",
        )

        logger.info(
            f"Created API key [{key['id']}] for user={user_id} tier={tier.value}"
        )

        return UsagePlanResult(
            plan_id=plan_id,
            tier=tier,
            api_key_id=key["id"],
            api_key_value=key["value"],
        )

    def get_usage_stats(
        self, plan_id: str, key_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Get usage statistics for a usage plan.

        Returns daily and monthly usage with remaining quota.
        """
        usage = self._apigw.get_usage(
            usagePlanId=plan_id,
            startDate=datetime.now(timezone.utc).strftime("%Y-%m-%d"),
            endDate=datetime.now(timezone.utc).strftime("%Y-%m-%d"),
        )

        return {
            "plan_id": plan_id,
            "usage_data": usage.get("items", {}),
            "position": usage.get("position"),
        }

    def update_throttle_for_load(
        self, tier: UsageTier, load_factor: float
    ) -> Dict[str, Any]:
        """
        Dynamically adjust throttle settings based on current load.

        Args:
            tier: The tier to adjust.
            load_factor: Current load as fraction of capacity (0.0-1.0+).

        If load_factor > 0.8 and this is a lower-priority tier, reduce
        its limits to protect higher-priority tiers.
        """
        config = self.tiers[tier]
        plan_id = self._plan_cache.get(tier)

        if not plan_id:
            logger.warning(f"No plan ID cached for tier {tier.value}")
            return {"error": "Plan not found"}

        adjusted_rate = config.rate_limit
        adjusted_burst = config.burst_limit

        if load_factor > 0.9 and config.priority >= 3:
            # Under extreme load, reduce low-priority tiers by 50%
            adjusted_rate = config.rate_limit * 0.5
            adjusted_burst = config.burst_limit // 2
            logger.warning(
                f"Reducing tier {tier.value} to rate={adjusted_rate} "
                f"burst={adjusted_burst} (load={load_factor:.1%})"
            )
        elif load_factor > 0.8 and config.priority >= 3:
            # Under high load, reduce low-priority tiers by 25%
            adjusted_rate = config.rate_limit * 0.75
            adjusted_burst = int(config.burst_limit * 0.75)

        self._apigw.update_usage_plan(
            usagePlanId=plan_id,
            patchOperations=[
                {
                    "op": "replace",
                    "path": "/throttle/rateLimit",
                    "value": str(adjusted_rate),
                },
                {
                    "op": "replace",
                    "path": "/throttle/burstLimit",
                    "value": str(adjusted_burst),
                },
            ],
        )

        self._publish_throttle_adjustment_metric(tier, load_factor, adjusted_rate)

        return {
            "tier": tier.value,
            "original_rate": config.rate_limit,
            "adjusted_rate": adjusted_rate,
            "original_burst": config.burst_limit,
            "adjusted_burst": adjusted_burst,
            "load_factor": load_factor,
        }

    def _publish_throttle_adjustment_metric(
        self, tier: UsageTier, load_factor: float, new_rate: float
    ):
        """Publish throttle adjustment events to CloudWatch."""
        try:
            self._cloudwatch.put_metric_data(
                Namespace="MangaAssist/UsagePlans",
                MetricData=[
                    {
                        "MetricName": "ThrottleAdjustment",
                        "Dimensions": [
                            {"Name": "Tier", "Value": tier.value},
                            {"Name": "Service", "Value": "MangaAssist"},
                        ],
                        "Timestamp": datetime.now(timezone.utc),
                        "Value": new_rate,
                        "Unit": "Count/Second",
                    },
                    {
                        "MetricName": "LoadFactor",
                        "Dimensions": [
                            {"Name": "Service", "Value": "MangaAssist"},
                        ],
                        "Timestamp": datetime.now(timezone.utc),
                        "Value": load_factor,
                        "Unit": "None",
                    },
                ],
            )
        except Exception as e:
            logger.warning(f"Failed to publish throttle adjustment metric: {e}")
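The shedding thresholds in update_throttle_for_load can be hard to see among the API calls. Extracted standalone (same numbers as the method above; only low-priority tiers, priority >= 3, are reduced):

```python
def adjusted_limits(rate: float, burst: int, priority: int, load_factor: float):
    """Mirror of the shedding rules in update_throttle_for_load."""
    if load_factor > 0.9 and priority >= 3:
        return rate * 0.5, burst // 2           # extreme load: cut 50%
    if load_factor > 0.8 and priority >= 3:
        return rate * 0.75, int(burst * 0.75)   # high load: cut 25%
    return rate, burst                          # premium/internal untouched

# Free tier (priority 4) at 95% load drops from 2 req/s to 1 req/s:
print(adjusted_limits(2.0, 5, priority=4, load_factor=0.95))  # (1.0, 2)
# Premium (priority 2) keeps its full limits at the same load:
print(adjusted_limits(15.0, 40, priority=2, load_factor=0.95))  # (15.0, 40)
```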

2. Multi-Tier Fallback: Sonnet to Haiku to Cached to Static

Fallback Decision Matrix

Condition                 Primary Action      Fallback Action         User Experience
Sonnet healthy            Invoke Sonnet       N/A                     Full capability — rich recommendations, nuanced answers
Sonnet throttled          Retry with backoff  Fall to Haiku           Slightly simpler answers, faster response
Sonnet + Haiku down       N/A                 Serve from Redis cache  Previous answer for similar query (may be stale)
All models + cache miss   N/A                 Static FAQ match        Basic scripted answer for common questions
Complete service failure  N/A                 Graceful message        Friendly acknowledgment + alternative contact
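The matrix can be sketched as a small decision function (a simplified model of the tier selection; the retry-with-backoff step and confidence scoring are omitted, and the production path is the engine implemented later in this section):

```python
def choose_tier(sonnet_ok: bool, haiku_ok: bool,
                cache_hit: bool, faq_match: bool) -> str:
    """Map the availability conditions from the matrix to a serving tier."""
    if sonnet_ok:
        return "sonnet"     # primary: full capability
    if haiku_ok:
        return "haiku"      # Sonnet throttled/down: simpler, faster answers
    if cache_hit:
        return "cached"     # both models down: previous similar answer
    if faq_match:
        return "static"     # cache miss: scripted answer for common questions
    return "graceful"       # complete failure: friendly degradation message

print(choose_tier(False, False, True, False))  # cached
```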

Fallback Tier Quality Characteristics

Tier 1 — Claude 3 Sonnet (Confidence: 1.0)
  ├── Understands complex manga recommendations with nuance
  ├── Can reason about series relationships and genre preferences
  ├── Handles multi-turn conversations about order issues
  ├── Provides personalized suggestions based on reading history
  └── Cost: $3/$15 per 1M tokens (input/output)

Tier 2 — Claude 3 Haiku (Confidence: 0.7)
  ├── Handles straightforward questions well
  ├── Good for order status, shipping, basic recommendations
  ├── May miss nuance in complex preference discussions
  ├── Faster response time (200-500ms vs 1-3s)
  └── Cost: $0.25/$1.25 per 1M tokens (12x cheaper than Sonnet)

Tier 3 — Redis Cache (Confidence: 0.5)
  ├── Instant response (<10ms)
  ├── Only works for queries similar to previous ones
  ├── Risk of stale data (prices, availability)
  ├── No personalization — generic cached answer
  └── Cost: ~$0.0001 per lookup

Tier 4 — Static FAQ (Confidence: 0.3)
  ├── Pre-written answers for common categories
  ├── No dynamic content or personalization
  ├── Always available (in-memory)
  ├── Limited to ~20 FAQ categories
  └── Cost: $0 per lookup

Tier 5 — Graceful Degradation (Confidence: 0.1)
  ├── User-friendly "temporarily unavailable" message
  ├── Provides alternative contact methods
  ├── Directs user to browse catalog directly
  ├── Never leaves user with an error page
  └── Cost: $0
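The cost gap between tiers is what makes fallback economical at 1M messages/day. A quick estimate, assuming a typical exchange of roughly 500 input and 300 output tokens per message (the token counts are assumptions):

```python
def cost_per_message(input_tokens: int, output_tokens: int,
                     in_price: float, out_price: float) -> float:
    """Dollar cost of one message given per-1M-token prices."""
    return (input_tokens * in_price + output_tokens * out_price) / 1_000_000

sonnet = cost_per_message(500, 300, 3.00, 15.00)  # $0.0060 per message
haiku  = cost_per_message(500, 300, 0.25, 1.25)   # $0.0005 per message

# At 1M messages/day: ~$6,000/day all-Sonnet vs ~$500/day all-Haiku,
# which is why routing simple queries (or falling back) to Haiku matters.
print(f"Sonnet: ${sonnet:.4f}/msg, Haiku: ${haiku:.4f}/msg")
```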

Stale Cache Detection

One of the biggest risks with cache-based fallback is serving stale data. For MangaAssist, stale data manifests as:

  • Wrong prices — Manga volume sale ended but cached response still quotes discounted price
  • Wrong availability — "In stock" for a volume that sold out
  • Wrong release dates — Pre-order dates changed but cached response has old date

Cache Staleness Mitigation:
  ┌───────────────────────────────────────────────┐
  │ Cache Entry Structure                          │
  │ ├── key: sha256(normalized_query)[:16]         │
  │ ├── response: "Full FM-generated response"     │
  │ ├── created_at: 1711900800                     │
  │ ├── ttl: 3600 (1 hour)                         │
  │ ├── contains_pricing: true                     │
  │ ├── contains_availability: true                │
  │ ├── product_ids: ["manga_001", "manga_002"]    │
  │ └── staleness_risk: "high"                     │
  ├───────────────────────────────────────────────┤
  │ Staleness Rules:                               │
  │ ├── Pricing data: TTL = 15 minutes             │
  │ ├── Availability data: TTL = 5 minutes         │
  │ ├── Recommendations: TTL = 4 hours             │
  │ ├── General info: TTL = 24 hours               │
  │ └── FAQ-type queries: TTL = 7 days             │
  └───────────────────────────────────────────────┘
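The staleness_risk flag in the cache entry structure above can be derived from the content flags and the entry's age relative to its TTL. A sketch with assumed risk thresholds (the 0.5 and 0.75 cutoffs are illustrative, not part of the engine below):

```python
def staleness_risk(contains_pricing: bool, contains_availability: bool,
                   age_seconds: float, ttl_seconds: int) -> str:
    """Classify an entry's staleness risk from content flags and age.

    Thresholds are assumed: availability goes stale fastest, pricing is
    risky past half its TTL, everything degrades as it nears expiry.
    """
    factor = age_seconds / ttl_seconds
    if contains_availability or (contains_pricing and factor > 0.5):
        return "high"
    if contains_pricing or factor > 0.75:
        return "medium"
    return "low"

# Pricing + availability entry at half of a 1-hour TTL:
print(staleness_risk(True, True, age_seconds=1800, ttl_seconds=3600))  # high
```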

Code: TieredFallbackEngine

"""
TieredFallbackEngine — Advanced multi-tier fallback with cache-awareness,
staleness detection, and quality tracking for MangaAssist.

Extends the basic FallbackOrchestrator with:
  - Staleness detection for cached responses (pricing, availability)
  - Quality score tracking per tier for adaptive routing
  - Warm-up strategy to pre-populate cache during low-traffic hours
  - Metrics publishing for fallback tier usage patterns
  - User notification when serving degraded responses
"""

import time
import json
import logging
import hashlib
import re
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List, Tuple
from datetime import datetime, timezone
from enum import Enum

import boto3
import redis

logger = logging.getLogger("mangaassist.tiered_fallback")


class ContentCategory(Enum):
    """Content categories with different staleness tolerances."""
    PRICING = "pricing"
    AVAILABILITY = "availability"
    RECOMMENDATION = "recommendation"
    ORDER_STATUS = "order_status"
    GENERAL_INFO = "general_info"
    FAQ = "faq"


# TTL settings per content category (in seconds)
CATEGORY_TTL: Dict[ContentCategory, int] = {
    ContentCategory.PRICING: 900,           # 15 minutes
    ContentCategory.AVAILABILITY: 300,      # 5 minutes
    ContentCategory.RECOMMENDATION: 14400,  # 4 hours
    ContentCategory.ORDER_STATUS: 60,       # 1 minute (near real-time)
    ContentCategory.GENERAL_INFO: 86400,    # 24 hours
    ContentCategory.FAQ: 604800,            # 7 days
}


@dataclass
class CacheEntry:
    """
    Rich cache entry with staleness metadata.

    Attributes:
        response: The cached response text.
        created_at: Unix timestamp when the response was generated.
        category: Content category for TTL calculation.
        product_ids: Product IDs referenced in the response.
        model_id: Which model generated this response.
        confidence: Original confidence score.
        query_hash: Hash of the original query.
    """
    response: str
    created_at: float
    category: ContentCategory
    product_ids: List[str] = field(default_factory=list)
    model_id: str = ""
    confidence: float = 1.0
    query_hash: str = ""

    @property
    def age_seconds(self) -> float:
        return time.time() - self.created_at

    @property
    def ttl(self) -> int:
        return CATEGORY_TTL.get(self.category, 3600)

    @property
    def is_stale(self) -> bool:
        return self.age_seconds > self.ttl

    @property
    def staleness_factor(self) -> float:
        """0.0 = fresh, 1.0 = at TTL, >1.0 = past TTL."""
        return self.age_seconds / self.ttl

    def to_json(self) -> str:
        return json.dumps({
            "response": self.response,
            "created_at": self.created_at,
            "category": self.category.value,
            "product_ids": self.product_ids,
            "model_id": self.model_id,
            "confidence": self.confidence,
            "query_hash": self.query_hash,
        })

    @classmethod
    def from_json(cls, data: str) -> "CacheEntry":
        parsed = json.loads(data)
        return cls(
            response=parsed["response"],
            created_at=parsed["created_at"],
            category=ContentCategory(parsed["category"]),
            product_ids=parsed.get("product_ids", []),
            model_id=parsed.get("model_id", ""),
            confidence=parsed.get("confidence", 1.0),
            query_hash=parsed.get("query_hash", ""),
        )


class ContentClassifier:
    """
    Classifies user queries into content categories for cache TTL decisions.

    Uses keyword matching for speed (runs on every request).
    For production, could be replaced with a lightweight classifier model.
    """

    CATEGORY_KEYWORDS: Dict[ContentCategory, List[str]] = {
        ContentCategory.PRICING: [
            "price", "cost", "how much", "discount", "sale", "coupon",
            "expensive", "cheap", "deal", "offer", "yen", "dollar",
        ],
        ContentCategory.AVAILABILITY: [
            "in stock", "available", "out of stock", "sold out",
            "restock", "back in stock", "inventory", "pre-order",
        ],
        ContentCategory.ORDER_STATUS: [
            "order", "tracking", "shipped", "delivery", "package",
            "arrive", "estimated", "where is my",
        ],
        ContentCategory.RECOMMENDATION: [
            "recommend", "suggest", "similar", "like", "best",
            "popular", "trending", "new release", "what should",
        ],
        ContentCategory.FAQ: [
            "return policy", "shipping policy", "payment method",
            "hours", "contact", "support", "membership", "account",
        ],
    }

    @classmethod
    def classify(cls, query: str) -> ContentCategory:
        """Classify a query into a content category."""
        query_lower = query.lower()
        best_category = ContentCategory.GENERAL_INFO
        best_score = 0

        for category, keywords in cls.CATEGORY_KEYWORDS.items():
            score = sum(1 for kw in keywords if kw in query_lower)
            if score > best_score:
                best_score = score
                best_category = category

        return best_category

    @classmethod
    def extract_product_ids(cls, text: str) -> List[str]:
        """Extract product IDs from response text for cache invalidation."""
        patterns = [
            r"ISBN[:\s-]*([\d-]{10,17})",
            r"MANGA[_-](\d{3,8})",
            r"product[_/]id[:\s]*(\w{6,12})",
        ]
        ids = []
        for pattern in patterns:
            ids.extend(re.findall(pattern, text, re.IGNORECASE))
        return list(set(ids))


@dataclass
class FallbackTierMetrics:
    """Per-tier metrics for quality tracking."""
    invocations: int = 0
    successes: int = 0
    failures: int = 0
    total_latency_ms: float = 0.0
    stale_serves: int = 0
    user_satisfaction_signals: List[float] = field(default_factory=list)

    @property
    def success_rate(self) -> float:
        return self.successes / max(1, self.invocations)

    @property
    def avg_latency_ms(self) -> float:
        return self.total_latency_ms / max(1, self.invocations)


class TieredFallbackEngine:
    """
    Advanced fallback engine with content-aware caching and staleness detection.

    Improvements over basic FallbackOrchestrator:
      1. Content-aware TTLs (pricing cached 15 min, FAQs cached 7 days)
      2. Staleness warnings when serving cached responses near TTL expiry
      3. Quality tracking per tier for adaptive routing decisions
      4. Cache warm-up during low-traffic periods
      5. Degradation notifications appended to cached/static responses

    Usage:
        engine = TieredFallbackEngine(
            backoff_client=client,
            redis_client=redis.Redis(...),
        )

        response = engine.get_response(
            user_query="How much is volume 1 of One Piece?",
            user_id="user_123",
        )
        if response.is_degraded:
            # Optionally show degradation indicator in UI
            pass
    """

    STALENESS_WARNING_TEMPLATE = (
        "\n\n---\n*Note: This information may not reflect the very latest "
        "updates. For the most current {category} details, please check "
        "our catalog directly or ask me again shortly.*"
    )

    def __init__(
        self,
        backoff_client,
        redis_client: redis.Redis,
        sonnet_model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
        haiku_model_id: str = "anthropic.claude-3-haiku-20240307-v1:0",
        region: str = "ap-northeast-1",
    ):
        self.backoff_client = backoff_client
        self.redis = redis_client
        self.sonnet_model_id = sonnet_model_id
        self.haiku_model_id = haiku_model_id

        self.tier_metrics: Dict[str, FallbackTierMetrics] = {
            "sonnet": FallbackTierMetrics(),
            "haiku": FallbackTierMetrics(),
            "cached": FallbackTierMetrics(),
            "static": FallbackTierMetrics(),
            "graceful": FallbackTierMetrics(),
        }

        self._cloudwatch = boto3.client("cloudwatch", region_name=region)
        self._classifier = ContentClassifier()

    def _query_hash(self, query: str) -> str:
        """Generate a cache key from a normalized query."""
        normalized = query.lower().strip()
        return f"manga:cache:{hashlib.sha256(normalized.encode()).hexdigest()[:16]}"

    def _get_cached(self, query: str) -> Optional[CacheEntry]:
        """Retrieve a cache entry with staleness metadata."""
        try:
            key = self._query_hash(query)
            raw = self.redis.get(key)
            if raw:
                return CacheEntry.from_json(raw.decode("utf-8"))
        except Exception as e:
            logger.warning(f"Cache read failed: {e}")
        return None

    def _store_cached(
        self,
        query: str,
        response: str,
        category: ContentCategory,
        model_id: str,
        confidence: float,
    ):
        """Store a response in cache with rich metadata."""
        try:
            key = self._query_hash(query)
            entry = CacheEntry(
                response=response,
                created_at=time.time(),
                category=category,
                product_ids=ContentClassifier.extract_product_ids(response),
                model_id=model_id,
                confidence=confidence,
                query_hash=key,
            )
            ttl = CATEGORY_TTL.get(category, 3600)
            self.redis.setex(key, ttl, entry.to_json())
        except Exception as e:
            logger.warning(f"Cache write failed: {e}")

    def _build_body(self, query: str, history: List[Dict]) -> Dict:
        """Build a Bedrock InvokeModel request body (Anthropic Messages format)."""
        messages = []
        for msg in history[-10:]:
            messages.append({
                "role": msg["role"],
                # Native Anthropic content blocks require an explicit type.
                "content": [{"type": "text", "text": msg["content"]}],
            })
        messages.append({"role": "user", "content": [{"type": "text", "text": query}]})

        return {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,
            "system": (
                "You are MangaAssist, a helpful assistant for a Japanese manga store. "
                "Help customers find manga, answer questions about orders, and provide "
                "recommendations. Always be friendly and knowledgeable about manga."
            ),
            "messages": messages,
        }

    def get_response(
        self,
        user_query: str,
        user_id: str = "anonymous",
        conversation_history: Optional[List[Dict]] = None,
    ) -> Dict[str, Any]:
        """
        Get a response using the tiered fallback with content-aware caching.

        Returns a dict with:
          - content: The response text
          - tier: Which tier served the response
          - confidence: Quality confidence score
          - is_degraded: Whether this is a fallback response
          - staleness_warning: Optional warning about data freshness
          - latency_ms: Response time
        """
        history = conversation_history or []
        category = ContentClassifier.classify(user_query)
        body = self._build_body(user_query, history)

        # Tier 1: Claude 3 Sonnet
        try:
            start = time.monotonic()
            result = self.backoff_client.invoke_model(
                model_id=self.sonnet_model_id, body=body
            )
            elapsed = (time.monotonic() - start) * 1000
            content = result["content"][0]["text"]

            self._store_cached(
                user_query, content, category, self.sonnet_model_id, 1.0
            )
            self._record_metric("sonnet", elapsed, True)

            return {
                "content": content,
                "tier": "sonnet",
                "confidence": 1.0,
                "is_degraded": False,
                "staleness_warning": None,
                "latency_ms": round(elapsed, 1),
                "category": category.value,
            }
        except Exception as e:
            logger.warning(f"Sonnet failed: {e}")
            self._record_metric("sonnet", 0, False)

        # Tier 2: Claude 3 Haiku
        try:
            start = time.monotonic()
            result = self.backoff_client.invoke_model(
                model_id=self.haiku_model_id, body=body
            )
            elapsed = (time.monotonic() - start) * 1000
            content = result["content"][0]["text"]

            self._store_cached(
                user_query, content, category, self.haiku_model_id, 0.7
            )
            self._record_metric("haiku", elapsed, True)

            return {
                "content": content,
                "tier": "haiku",
                "confidence": 0.7,
                "is_degraded": True,
                "staleness_warning": None,
                "latency_ms": round(elapsed, 1),
                "category": category.value,
            }
        except Exception as e:
            logger.warning(f"Haiku failed: {e}")
            self._record_metric("haiku", 0, False)

        # Tier 3: Cache with staleness detection
        start = time.monotonic()
        cached = self._get_cached(user_query)
        elapsed = (time.monotonic() - start) * 1000

        if cached:
            staleness_warning = None
            confidence = 0.5

            if cached.is_stale:
                staleness_warning = self.STALENESS_WARNING_TEMPLATE.format(
                    category=category.value
                )
                confidence = 0.3
                self.tier_metrics["cached"].stale_serves += 1
                logger.warning(
                    f"Serving stale cache: age={cached.age_seconds:.0f}s "
                    f"ttl={cached.ttl}s category={category.value}"
                )
            elif cached.staleness_factor > 0.8:
                staleness_warning = self.STALENESS_WARNING_TEMPLATE.format(
                    category=category.value
                )
                confidence = 0.4

            self._record_metric("cached", elapsed, True)

            return {
                "content": cached.response,
                "tier": "cached",
                "confidence": confidence,
                "is_degraded": True,
                "staleness_warning": staleness_warning,
                "latency_ms": round(elapsed, 1),
                "category": category.value,
                "cache_age_seconds": round(cached.age_seconds, 0),
            }

        # Tier 4: Static FAQ
        faq_answer = self._match_faq(user_query)
        if faq_answer:
            self._record_metric("static", 0, True)
            return {
                "content": faq_answer,
                "tier": "static",
                "confidence": 0.3,
                "is_degraded": True,
                "staleness_warning": None,
                "latency_ms": 0,
                "category": category.value,
            }

        # Tier 5: Graceful degradation
        self._record_metric("graceful", 0, True)
        return {
            "content": (
                "I'm having a brief moment of difficulty, but I'm still here! "
                "You can browse our manga catalog directly at mangaassist.jp/catalog "
                "or try asking me again in just a moment. For urgent order issues, "
                "our support team is available at support@mangaassist.jp."
            ),
            "tier": "graceful",
            "confidence": 0.1,
            "is_degraded": True,
            "staleness_warning": None,
            "latency_ms": 0,
            "category": category.value,
        }

    def _match_faq(self, query: str) -> Optional[str]:
        """Simple FAQ matcher for static responses."""
        faqs = {
            "shipping": (
                "Standard shipping takes 3-5 business days within Japan. "
                "International shipping is 7-14 business days."
            ),
            "return": (
                "Unopened manga can be returned within 30 days for a full refund. "
                "Damaged items can be exchanged."
            ),
            "payment": (
                "We accept Visa, Mastercard, JCB, PayPay, Line Pay, "
                "and convenience store payment."
            ),
        }
        query_lower = query.lower()
        for key, answer in faqs.items():
            if key in query_lower:
                return answer
        return None

    def _record_metric(self, tier: str, latency_ms: float, success: bool):
        """Record tier usage metrics."""
        metrics = self.tier_metrics[tier]
        metrics.invocations += 1
        if success:
            metrics.successes += 1
        else:
            metrics.failures += 1
        metrics.total_latency_ms += latency_ms

    def warm_cache(self, common_queries: List[str]):
        """
        Pre-populate cache with responses for common queries.

        Run this during low-traffic hours (2 AM - 6 AM JST) to ensure
        cache hits during the next day's peak hours.
        """
        logger.info(f"Starting cache warm-up for {len(common_queries)} queries")
        warmed = 0

        for query in common_queries:
            cached = self._get_cached(query)
            if cached and not cached.is_stale:
                continue  # Already cached and fresh

            try:
                category = ContentClassifier.classify(query)
                body = self._build_body(query, [])

                result = self.backoff_client.invoke_model(
                    model_id=self.sonnet_model_id, body=body
                )
                content = result["content"][0]["text"]
                self._store_cached(
                    query, content, category, self.sonnet_model_id, 1.0
                )
                warmed += 1
                time.sleep(0.5)  # Rate limit warm-up requests

            except Exception as e:
                logger.warning(f"Cache warm-up failed for '{query[:50]}': {e}")

        logger.info(f"Cache warm-up complete: {warmed}/{len(common_queries)} warmed")

    def get_tier_report(self) -> Dict[str, Any]:
        """Generate a report of fallback tier usage for dashboard display."""
        report = {}
        total = sum(m.invocations for m in self.tier_metrics.values())

        for tier_name, metrics in self.tier_metrics.items():
            pct = (metrics.invocations / max(1, total)) * 100
            report[tier_name] = {
                "invocations": metrics.invocations,
                "percentage": round(pct, 1),
                "success_rate": round(metrics.success_rate * 100, 1),
                "avg_latency_ms": round(metrics.avg_latency_ms, 1),
                "stale_serves": metrics.stale_serves,
            }

        report["total_requests"] = total
        report["degradation_rate"] = round(
            (1 - self.tier_metrics["sonnet"].invocations / max(1, total)) * 100, 1
        )
        return report
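The `degradation_rate` in the report above feeds the `DegradationRate` metric that the CloudWatch alarms in section 4 evaluate. A minimal sketch of publishing it (the `degradation_metric` and `publish_degradation` helpers are illustrative, not part of the class):

```python
from typing import Any, Dict


def degradation_metric(report: Dict[str, Any]) -> Dict[str, Any]:
    """Shape a tier report's degradation rate as a PutMetricData datum."""
    return {
        "MetricName": "DegradationRate",
        "Dimensions": [{"Name": "Service", "Value": "MangaAssist"}],
        "Value": report["degradation_rate"],
        "Unit": "Percent",
    }


def publish_degradation(report: Dict[str, Any],
                        region: str = "ap-northeast-1") -> None:
    """Push the datum to the MangaAssist/FMHealth namespace."""
    import boto3  # local import keeps degradation_metric testable offline
    boto3.client("cloudwatch", region_name=region).put_metric_data(
        Namespace="MangaAssist/FMHealth",
        MetricData=[degradation_metric(report)],
    )
```

Calling `publish_degradation(client.get_tier_report())` on a schedule keeps the alarm's view in sync with the orchestrator's own tier accounting.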

3. X-Ray Service Map and FM Call Annotations

Service Map Configuration

The X-Ray service map for MangaAssist visualizes the full request flow and highlights bottlenecks. The service map is automatically generated from trace data — no manual configuration needed beyond enabling X-Ray on each service.

flowchart LR
    subgraph ServiceMap["X-Ray Service Map — MangaAssist"]
        Client["Client<br/>1M msgs/day<br/>avg 2.3s E2E"]
        APIGW["API Gateway<br/>WebSocket<br/>avg 15ms<br/>0.01% error"]
        ECS["ECS Fargate<br/>Orchestrator<br/>avg 2.1s<br/>0.5% error"]
        Bedrock["Bedrock Runtime<br/>Claude 3 Sonnet<br/>avg 2.0s<br/>0.3% throttle"]
        OpenSearch["OpenSearch<br/>Serverless<br/>avg 45ms<br/>0.01% error"]
        Redis["ElastiCache<br/>Redis<br/>avg 3ms<br/>0.001% error"]
        DDB["DynamoDB<br/>Sessions<br/>avg 8ms<br/>0.001% error"]
    end

    Client -->|"100%"| APIGW
    APIGW -->|"100%"| ECS
    ECS -->|"85%"| Bedrock
    ECS -->|"90%"| OpenSearch
    ECS -->|"100%"| Redis
    ECS -->|"100%"| DDB

    style Bedrock fill:#f3e5f5,stroke:#6a1b9a,stroke-width:3px
    style ECS fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px

Subsegment Annotations for FM Calls

X-Ray annotations are indexed and searchable, making them essential for filtering traces. For MangaAssist FM calls, we annotate:

Searchable Annotations (indexed):
  ├── model_id: "anthropic.claude-3-sonnet-20240229-v1:0"
  ├── fallback_tier: "sonnet" | "haiku" | "cached" | "static" | "graceful"
  ├── user_id: "user_abc123" (anonymized)
  ├── cache_hit: true | false
  ├── error_code: "ThrottlingException" | "ServiceUnavailable" | null
  ├── is_throttled: true | false
  ├── content_category: "pricing" | "recommendation" | "faq" | ...
  └── is_degraded: true | false

Non-Indexed Metadata (stored but not searchable):
  ├── input_tokens: 450
  ├── output_tokens: 380
  ├── estimated_cost_usd: 0.007050
  ├── conversation_length: 5
  ├── cache_key: "manga:cache:abc123"
  ├── retry_count: 2
  └── total_latency_ms: 2150
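The indexed/non-indexed split can be enforced in the orchestrator before writing the subsegment. A pure-Python sketch (the `ANNOTATION_KEYS` allow-list and `split_trace_fields` helper are illustrative; with `aws_xray_sdk`, each resulting pair would go through `subsegment.put_annotation(...)` or `subsegment.put_metadata(...)`):

```python
from typing import Any, Dict, Tuple

# Keys promoted to indexed X-Ray annotations. Annotations must be flat
# string / number / boolean values; everything else belongs in metadata.
ANNOTATION_KEYS = {
    "model_id", "fallback_tier", "user_id", "cache_hit",
    "error_code", "is_throttled", "content_category", "is_degraded",
}


def split_trace_fields(
    fields: Dict[str, Any],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split FM-call fields into (annotations, metadata) for a subsegment."""
    annotations: Dict[str, Any] = {}
    metadata: Dict[str, Any] = {}
    for key, value in fields.items():
        # A null error_code is not annotation-safe, so it falls to metadata.
        if key in ANNOTATION_KEYS and isinstance(value, (str, int, float, bool)):
            annotations[key] = value
        else:
            metadata[key] = value
    return annotations, metadata
```

Keeping the allow-list small matters: only annotations are indexed for filter expressions, and every extra indexed key adds query-time cost without improving searchability.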

X-Ray Filter Expressions for MangaAssist

# Find all throttled Bedrock calls
annotation.is_throttled = true AND annotation.model_id BEGINSWITH "anthropic"

# Find all degraded responses (served from fallback tiers)
annotation.is_degraded = true AND annotation.fallback_tier != "sonnet"

# Find slow FM calls (over 3 second SLA)
responsetime > 3 AND service("MangaAssist")

# Find all errors in the Bedrock segment
fault = true AND service("Bedrock Runtime")

# Find all pricing queries that hit cache
annotation.content_category = "pricing" AND annotation.cache_hit = true

# Trace a specific user's requests
annotation.user_id = "user_abc123"
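These expressions can also be run programmatically, e.g. from a nightly degradation report. A sketch using the X-Ray `GetTraceSummaries` API via boto3 (the `degraded_filter` and `recent_degraded_traces` helper names are illustrative):

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Iterator


def degraded_filter(tier: str) -> str:
    """Filter expression for degraded responses served by a given tier."""
    return f'annotation.is_degraded = true AND annotation.fallback_tier = "{tier}"'


def recent_degraded_traces(tier: str, minutes: int = 15) -> Iterator[Dict[str, Any]]:
    """Yield trace summaries for degraded responses over the last N minutes."""
    import boto3  # local import keeps degraded_filter usable offline
    xray = boto3.client("xray", region_name="ap-northeast-1")
    end = datetime.now(timezone.utc)
    paginator = xray.get_paginator("get_trace_summaries")
    for page in paginator.paginate(
        StartTime=end - timedelta(minutes=minutes),
        EndTime=end,
        FilterExpression=degraded_filter(tier),
    ):
        yield from page["TraceSummaries"]
```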

X-Ray Groups for MangaAssist

Group: MangaAssist-Errors
  Filter: fault = true OR error = true
  Insight: Enabled
  Notification: SNS → On-Call

Group: MangaAssist-Slow
  Filter: responsetime > 3
  Insight: Enabled
  Notification: SNS → Slack Channel

Group: MangaAssist-Degraded
  Filter: annotation.is_degraded = true
  Insight: Enabled
  Notification: CloudWatch Alarm

Group: MangaAssist-Throttled
  Filter: annotation.is_throttled = true
  Insight: Enabled
  Notification: CloudWatch Alarm + Auto-Scale
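The groups above map directly onto the X-Ray `CreateGroup` API. A deployment sketch (the `XRAY_GROUPS` list and `deploy_groups` function are illustrative; re-running against existing groups raises, so idempotent code would fall back to `update_group`, and the SNS/alarm wiring is configured separately):

```python
from typing import Dict, List

# One entry per group in the table above.
XRAY_GROUPS: List[Dict[str, str]] = [
    {"GroupName": "MangaAssist-Errors",
     "FilterExpression": "fault = true OR error = true"},
    {"GroupName": "MangaAssist-Slow",
     "FilterExpression": "responsetime > 3"},
    {"GroupName": "MangaAssist-Degraded",
     "FilterExpression": "annotation.is_degraded = true"},
    {"GroupName": "MangaAssist-Throttled",
     "FilterExpression": "annotation.is_throttled = true"},
]


def deploy_groups(region: str = "ap-northeast-1") -> None:
    """Create each X-Ray group with Insights and notifications enabled."""
    import boto3  # local import keeps the definitions testable offline
    xray = boto3.client("xray", region_name=region)
    for group in XRAY_GROUPS:
        xray.create_group(
            InsightsConfiguration={
                "InsightsEnabled": True,
                "NotificationsEnabled": True,
            },
            **group,
        )
```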

4. CloudWatch Dashboards for FM Health

Dashboard Layout

┌─────────────────────────────────────────────────────────────────────┐
│                    MangaAssist FM Health Dashboard                   │
├──────────────────────────────┬──────────────────────────────────────┤
│  FM Invocation Rate          │  Response Latency (p50/p95/p99)     │
│  ████████████ 11.6 rps      │  p50: 1.8s  p95: 2.8s  p99: 4.2s  │
│  [Line chart — 1hr window]  │  [Line chart — 1hr window]          │
├──────────────────────────────┼──────────────────────────────────────┤
│  Error Rate by Type          │  Fallback Tier Distribution          │
│  Throttle: 0.3%             │  Sonnet: 85%  Haiku: 10%            │
│  Timeout: 0.1%              │  Cache: 4%   Static: 0.8%           │
│  Other: 0.05%               │  Graceful: 0.2%                     │
│  [Stacked area chart]       │  [Pie chart]                        │
├──────────────────────────────┼──────────────────────────────────────┤
│  Token Usage (Input/Output)  │  Estimated Cost (Hourly)            │
│  Input: 520K tokens/hr      │  Sonnet: $1.56 + $6.30 = $7.86/hr  │
│  Output: 420K tokens/hr     │  Haiku:  $0.015 + $0.06 = $0.075/hr│
│  [Area chart — 6hr window]  │  Total: $7.94/hr (~$191/day)        │
├──────────────────────────────┼──────────────────────────────────────┤
│  Circuit Breaker Status      │  Cache Hit Rate                     │
│  State: CLOSED (healthy)    │  Overall: 34%                       │
│  Failures: 0/5 threshold    │  Pricing: 45%  Recs: 28%  FAQ: 62% │
│  [Status indicator]         │  [Bar chart by category]            │
├──────────────────────────────┼──────────────────────────────────────┤
│  Rate Limit Utilization      │  Active Connections                  │
│  Global: 58% of 2000 rps   │  WebSocket: 12,450 active           │
│  Per-user avg: 0.8 rps     │  Peak today: 18,200                 │
│  [Gauge chart]              │  [Line chart — 24hr window]         │
└──────────────────────────────┴──────────────────────────────────────┘

Code: FMHealthDashboard

"""
FMHealthDashboard — CloudWatch dashboard configuration for MangaAssist FM health.

Creates and manages a comprehensive monitoring dashboard covering:
  - FM invocation rates and latency percentiles
  - Error rates by type (throttle, timeout, model error)
  - Fallback tier distribution
  - Token usage and cost estimation
  - Circuit breaker state
  - Cache hit rates and staleness metrics
  - Rate limit utilization

The dashboard is defined as CloudWatch Dashboard JSON and can be deployed
via CloudFormation, CDK, or the API directly.
"""

import json
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, timezone

import boto3

logger = logging.getLogger("mangaassist.dashboard")


class FMHealthDashboard:
    """
    Creates and manages the MangaAssist FM Health CloudWatch Dashboard.

    Usage:
        dashboard = FMHealthDashboard(region="ap-northeast-1")
        dashboard.create_dashboard()
        dashboard.create_alarms()
    """

    NAMESPACE = "MangaAssist/FMHealth"
    DASHBOARD_NAME = "MangaAssist-FM-Health"

    def __init__(self, region: str = "ap-northeast-1"):
        self.region = region
        self._cloudwatch = boto3.client("cloudwatch", region_name=region)

    def create_dashboard(self) -> Dict[str, Any]:
        """Create the comprehensive FM health dashboard."""
        body = {
            "widgets": [
                self._header_widget(),
                self._invocation_rate_widget(),
                self._latency_widget(),
                self._error_rate_widget(),
                self._fallback_distribution_widget(),
                self._token_usage_widget(),
                self._cost_estimation_widget(),
                self._circuit_breaker_widget(),
                self._cache_hit_rate_widget(),
                self._rate_limit_widget(),
                self._active_connections_widget(),
                self._stale_cache_widget(),
            ]
        }

        response = self._cloudwatch.put_dashboard(
            DashboardName=self.DASHBOARD_NAME,
            DashboardBody=json.dumps(body),
        )

        logger.info(f"Dashboard '{self.DASHBOARD_NAME}' created/updated")
        return response

    def _header_widget(self) -> Dict:
        """Dashboard header with service overview."""
        return {
            "type": "text",
            "x": 0, "y": 0, "width": 24, "height": 2,
            "properties": {
                "markdown": (
                    "# MangaAssist FM Health Dashboard\n"
                    "Real-time monitoring of Foundation Model calls, "
                    "fallback tiers, and system resilience. "
                    "**Target SLA: useful response in under 3 seconds.**"
                ),
                "background": "transparent",
            },
        }

    def _invocation_rate_widget(self) -> Dict:
        """FM invocation rate over time."""
        return {
            "type": "metric",
            "x": 0, "y": 2, "width": 12, "height": 6,
            "properties": {
                "title": "FM Invocation Rate",
                "metrics": [
                    [self.NAMESPACE, "InvocationCount", "Model", "Sonnet",
                     {"stat": "Sum", "period": 60, "label": "Sonnet"}],
                    [self.NAMESPACE, "InvocationCount", "Model", "Haiku",
                     {"stat": "Sum", "period": 60, "label": "Haiku"}],
                    [self.NAMESPACE, "InvocationCount", "Model", "Total",
                     {"stat": "Sum", "period": 60, "label": "Total"}],
                ],
                "view": "timeSeries",
                "stacked": False,
                "region": self.region,
                "period": 60,
                "yAxis": {"left": {"label": "Requests/min", "min": 0}},
            },
        }

    def _latency_widget(self) -> Dict:
        """Response latency percentiles."""
        return {
            "type": "metric",
            "x": 12, "y": 2, "width": 12, "height": 6,
            "properties": {
                "title": "Response Latency (p50 / p95 / p99)",
                "metrics": [
                    [self.NAMESPACE, "Latency", "Service", "MangaAssist",
                     {"stat": "p50", "period": 60, "label": "p50"}],
                    [self.NAMESPACE, "Latency", "Service", "MangaAssist",
                     {"stat": "p95", "period": 60, "label": "p95"}],
                    [self.NAMESPACE, "Latency", "Service", "MangaAssist",
                     {"stat": "p99", "period": 60, "label": "p99"}],
                ],
                "view": "timeSeries",
                "region": self.region,
                "period": 60,
                "annotations": {
                    "horizontal": [
                        {"label": "SLA Target (3s)", "value": 3000,
                         "color": "#d62728"},
                    ]
                },
                "yAxis": {"left": {"label": "Milliseconds", "min": 0}},
            },
        }

    def _error_rate_widget(self) -> Dict:
        """Error rates by type."""
        return {
            "type": "metric",
            "x": 0, "y": 8, "width": 12, "height": 6,
            "properties": {
                "title": "Error Rate by Type",
                "metrics": [
                    [self.NAMESPACE, "ErrorCount", "ErrorType", "Throttle",
                     {"stat": "Sum", "period": 60, "label": "Throttle"}],
                    [self.NAMESPACE, "ErrorCount", "ErrorType", "Timeout",
                     {"stat": "Sum", "period": 60, "label": "Timeout"}],
                    [self.NAMESPACE, "ErrorCount", "ErrorType", "ModelError",
                     {"stat": "Sum", "period": 60, "label": "ModelError"}],
                    [self.NAMESPACE, "ErrorCount", "ErrorType", "ServiceError",
                     {"stat": "Sum", "period": 60, "label": "ServiceError"}],
                ],
                "view": "timeSeries",
                "stacked": True,
                "region": self.region,
                "period": 60,
                "yAxis": {"left": {"label": "Errors/min", "min": 0}},
            },
        }

    def _fallback_distribution_widget(self) -> Dict:
        """Fallback tier distribution."""
        return {
            "type": "metric",
            "x": 12, "y": 8, "width": 12, "height": 6,
            "properties": {
                "title": "Fallback Tier Distribution",
                "metrics": [
                    [self.NAMESPACE, "TierCount", "Tier", "Sonnet",
                     {"stat": "Sum", "period": 300, "label": "Sonnet (Primary)"}],
                    [self.NAMESPACE, "TierCount", "Tier", "Haiku",
                     {"stat": "Sum", "period": 300, "label": "Haiku (Fallback)"}],
                    [self.NAMESPACE, "TierCount", "Tier", "Cached",
                     {"stat": "Sum", "period": 300, "label": "Cached"}],
                    [self.NAMESPACE, "TierCount", "Tier", "Static",
                     {"stat": "Sum", "period": 300, "label": "Static FAQ"}],
                    [self.NAMESPACE, "TierCount", "Tier", "Graceful",
                     {"stat": "Sum", "period": 300, "label": "Graceful Msg"}],
                ],
                "view": "pie",
                "region": self.region,
                "period": 300,
            },
        }

    def _token_usage_widget(self) -> Dict:
        """Token usage tracking."""
        return {
            "type": "metric",
            "x": 0, "y": 14, "width": 12, "height": 6,
            "properties": {
                "title": "Token Usage (Input / Output)",
                "metrics": [
                    [self.NAMESPACE, "InputTokens", "Model", "Sonnet",
                     {"stat": "Sum", "period": 300, "label": "Sonnet Input"}],
                    [self.NAMESPACE, "OutputTokens", "Model", "Sonnet",
                     {"stat": "Sum", "period": 300, "label": "Sonnet Output"}],
                    [self.NAMESPACE, "InputTokens", "Model", "Haiku",
                     {"stat": "Sum", "period": 300, "label": "Haiku Input"}],
                    [self.NAMESPACE, "OutputTokens", "Model", "Haiku",
                     {"stat": "Sum", "period": 300, "label": "Haiku Output"}],
                ],
                "view": "timeSeries",
                "stacked": True,
                "region": self.region,
                "period": 300,
                "yAxis": {"left": {"label": "Tokens / 5 min"}},
            },
        }

    def _cost_estimation_widget(self) -> Dict:
        """Estimated FM cost."""
        return {
            "type": "metric",
            "x": 12, "y": 14, "width": 12, "height": 6,
            "properties": {
                "title": "Estimated FM Cost (USD/hour)",
                "metrics": [
                    [self.NAMESPACE, "EstimatedCostUSD", "Model", "Sonnet",
                     {"stat": "Sum", "period": 3600, "label": "Sonnet $/hr"}],
                    [self.NAMESPACE, "EstimatedCostUSD", "Model", "Haiku",
                     {"stat": "Sum", "period": 3600, "label": "Haiku $/hr"}],
                    [self.NAMESPACE, "EstimatedCostUSD", "Model", "Total",
                     {"stat": "Sum", "period": 3600, "label": "Total $/hr"}],
                ],
                "view": "timeSeries",
                "region": self.region,
                "period": 3600,
                "annotations": {
                    "horizontal": [
                        {"label": "Budget Alert ($15/hr)", "value": 15,
                         "color": "#d62728"},
                    ]
                },
                "yAxis": {"left": {"label": "USD/hour", "min": 0}},
            },
        }

    def _circuit_breaker_widget(self) -> Dict:
        """Circuit breaker status."""
        return {
            "type": "metric",
            "x": 0, "y": 20, "width": 8, "height": 6,
            "properties": {
                "title": "Circuit Breaker Status",
                "metrics": [
                    [self.NAMESPACE, "CircuitBreakerState", "Service", "Bedrock",
                     {"stat": "Maximum", "period": 60, "label": "State (0=Closed, 1=Open)"}],
                    [self.NAMESPACE, "CircuitBreakerFailures", "Service", "Bedrock",
                     {"stat": "Sum", "period": 60, "label": "Failure Count"}],
                ],
                "view": "timeSeries",
                "region": self.region,
                "period": 60,
                "annotations": {
                    "horizontal": [
                        {"label": "Threshold (5)", "value": 5, "color": "#d62728"},
                    ]
                },
            },
        }

    def _cache_hit_rate_widget(self) -> Dict:
        """Cache hit rate by category."""
        return {
            "type": "metric",
            "x": 8, "y": 20, "width": 8, "height": 6,
            "properties": {
                "title": "Cache Hit Rate by Category",
                "metrics": [
                    [self.NAMESPACE, "CacheHitRate", "Category", "Pricing",
                     {"stat": "Average", "period": 300, "label": "Pricing"}],
                    [self.NAMESPACE, "CacheHitRate", "Category", "Recommendation",
                     {"stat": "Average", "period": 300, "label": "Recommendations"}],
                    [self.NAMESPACE, "CacheHitRate", "Category", "FAQ",
                     {"stat": "Average", "period": 300, "label": "FAQ"}],
                    [self.NAMESPACE, "CacheHitRate", "Category", "General",
                     {"stat": "Average", "period": 300, "label": "General"}],
                ],
                "view": "bar",
                "region": self.region,
                "period": 300,
                "yAxis": {"left": {"label": "Hit Rate %", "min": 0, "max": 100}},
            },
        }

    def _rate_limit_widget(self) -> Dict:
        """Rate limit utilization."""
        return {
            "type": "metric",
            "x": 16, "y": 20, "width": 8, "height": 6,
            "properties": {
                "title": "Rate Limit Utilization",
                "metrics": [
                    [self.NAMESPACE, "RateLimitUtilization", "Tier", "Global",
                     {"stat": "Average", "period": 60, "label": "Global"}],
                    [self.NAMESPACE, "ThrottledRequests", "Tier", "Global",
                     {"stat": "Sum", "period": 60, "label": "Throttled/min"}],
                ],
                "view": "timeSeries",
                "region": self.region,
                "period": 60,
                "annotations": {
                    "horizontal": [
                        {"label": "Warning (80%)", "value": 80, "color": "#ff9800"},
                        {"label": "Critical (95%)", "value": 95, "color": "#d62728"},
                    ]
                },
            },
        }

    def _active_connections_widget(self) -> Dict:
        """Active WebSocket connections."""
        return {
            "type": "metric",
            "x": 0, "y": 26, "width": 12, "height": 6,
            "properties": {
                "title": "Active WebSocket Connections",
                "metrics": [
                    ["AWS/ApiGateway", "ConnectCount", "ApiId", "PLACEHOLDER",
                     {"stat": "Sum", "period": 60, "label": "New Connections"}],
                    ["AWS/ApiGateway", "MessageCount", "ApiId", "PLACEHOLDER",
                     {"stat": "Sum", "period": 60, "label": "Messages"}],
                ],
                "view": "timeSeries",
                "region": self.region,
                "period": 60,
            },
        }

    def _stale_cache_widget(self) -> Dict:
        """Stale cache serves."""
        return {
            "type": "metric",
            "x": 12, "y": 26, "width": 12, "height": 6,
            "properties": {
                "title": "Stale Cache Serves",
                "metrics": [
                    [self.NAMESPACE, "StaleCacheServes", "Category", "Pricing",
                     {"stat": "Sum", "period": 300, "label": "Pricing (15min TTL)"}],
                    [self.NAMESPACE, "StaleCacheServes", "Category", "Availability",
                     {"stat": "Sum", "period": 300, "label": "Availability (5min TTL)"}],
                    [self.NAMESPACE, "StaleCacheServes", "Category", "Recommendation",
                     {"stat": "Sum", "period": 300, "label": "Recommendation (4hr TTL)"}],
                ],
                "view": "timeSeries",
                "stacked": True,
                "region": self.region,
                "period": 300,
            },
        }

    def create_alarms(self) -> List[Dict[str, Any]]:
        """
        Create CloudWatch alarms for FM health monitoring.

        Alarm tiers:
          - CRITICAL: Immediate page (PagerDuty/SNS)
          - WARNING: Slack notification
          - INFO: Dashboard annotation only
        """
        alarms = []

        # CRITICAL: High error rate (>5% over 5 minutes)
        alarms.append(self._create_alarm(
            name="MangaAssist-FM-HighErrorRate",
            description="FM error rate exceeded 5% over 5 minutes",
            metric_name="ErrorRate",
            threshold=5.0,
            comparison="GreaterThanThreshold",
            evaluation_periods=5,
            period=60,
            statistic="Average",
            alarm_actions=["arn:aws:sns:ap-northeast-1:ACCOUNT:MangaAssist-Critical"],
        ))

        # CRITICAL: Circuit breaker open
        alarms.append(self._create_alarm(
            name="MangaAssist-FM-CircuitBreakerOpen",
            description="Circuit breaker opened — Bedrock may be unhealthy",
            metric_name="CircuitBreakerState",
            threshold=0.5,
            comparison="GreaterThanThreshold",
            evaluation_periods=1,
            period=60,
            statistic="Maximum",
            alarm_actions=["arn:aws:sns:ap-northeast-1:ACCOUNT:MangaAssist-Critical"],
        ))

        # WARNING: High degradation rate (>20% non-Sonnet)
        alarms.append(self._create_alarm(
            name="MangaAssist-FM-HighDegradation",
            description="More than 20% of responses coming from fallback tiers",
            metric_name="DegradationRate",
            threshold=20.0,
            comparison="GreaterThanThreshold",
            evaluation_periods=3,
            period=300,
            statistic="Average",
            alarm_actions=["arn:aws:sns:ap-northeast-1:ACCOUNT:MangaAssist-Warning"],
        ))

        # WARNING: p99 latency above SLA (>3s)
        alarms.append(self._create_alarm(
            name="MangaAssist-FM-HighLatency",
            description="p99 latency exceeded 3 second SLA target",
            metric_name="Latency",
            threshold=3000,
            comparison="GreaterThanThreshold",
            evaluation_periods=3,
            period=60,
            statistic="p99",
            alarm_actions=["arn:aws:sns:ap-northeast-1:ACCOUNT:MangaAssist-Warning"],
        ))

        # WARNING: Cost spike (>$15/hr)
        alarms.append(self._create_alarm(
            name="MangaAssist-FM-CostSpike",
            description="Estimated FM cost exceeded $15/hour budget",
            metric_name="EstimatedCostUSD",
            threshold=15.0,
            comparison="GreaterThanThreshold",
            evaluation_periods=2,
            period=3600,
            statistic="Sum",
            alarm_actions=["arn:aws:sns:ap-northeast-1:ACCOUNT:MangaAssist-Warning"],
        ))

        # INFO: Rate limit utilization above 80%
        alarms.append(self._create_alarm(
            name="MangaAssist-FM-RateLimitHigh",
            description="Rate limit utilization above 80%",
            metric_name="RateLimitUtilization",
            threshold=80.0,
            comparison="GreaterThanThreshold",
            evaluation_periods=5,
            period=60,
            statistic="Average",
            alarm_actions=["arn:aws:sns:ap-northeast-1:ACCOUNT:MangaAssist-Info"],
        ))

        return alarms

    def _create_alarm(
        self,
        name: str,
        description: str,
        metric_name: str,
        threshold: float,
        comparison: str,
        evaluation_periods: int,
        period: int,
        statistic: str,
        alarm_actions: List[str],
    ) -> Dict[str, Any]:
        """Create a single CloudWatch alarm."""
        # put_metric_alarm accepts either Statistic (SampleCount, Average,
        # Sum, Minimum, Maximum) or ExtendedStatistic (percentiles such as
        # p99) -- passing both, or passing None for either, raises a
        # boto3 validation error, so build the kwargs conditionally.
        alarm_kwargs: Dict[str, Any] = {
            "AlarmName": name,
            "AlarmDescription": description,
            "Namespace": self.NAMESPACE,
            "MetricName": metric_name,
            "Dimensions": [
                {"Name": "Service", "Value": "MangaAssist"},
            ],
            "Period": period,
            "EvaluationPeriods": evaluation_periods,
            "Threshold": threshold,
            "ComparisonOperator": comparison,
            "AlarmActions": alarm_actions,
            "OKActions": alarm_actions,
            "TreatMissingData": "notBreaching",
            "Tags": [
                {"Key": "Service", "Value": "MangaAssist"},
                {"Key": "Environment", "Value": "production"},
            ],
        }
        if statistic.startswith("p"):
            alarm_kwargs["ExtendedStatistic"] = statistic
        else:
            alarm_kwargs["Statistic"] = statistic

        self._cloudwatch.put_metric_alarm(**alarm_kwargs)

        logger.info(f"Created alarm: {name} (threshold={threshold})")
        return {"name": name, "threshold": threshold, "status": "created"}

    def publish_fm_metrics(
        self,
        model: str,
        latency_ms: float,
        input_tokens: int,
        output_tokens: int,
        fallback_tier: str,
        is_error: bool = False,
        error_type: Optional[str] = None,
        cache_hit: bool = False,
    ):
        """
        Publish a batch of FM metrics from a single request.

        Called after every FM invocation (or fallback) to maintain
        real-time dashboard accuracy.
        """
        now = datetime.now(timezone.utc)
        metrics = [
            {
                "MetricName": "InvocationCount",
                "Dimensions": [{"Name": "Model", "Value": model}],
                "Timestamp": now,
                "Value": 1,
                "Unit": "Count",
            },
            {
                "MetricName": "Latency",
                "Dimensions": [{"Name": "Service", "Value": "MangaAssist"}],
                "Timestamp": now,
                "Value": latency_ms,
                "Unit": "Milliseconds",
            },
            {
                "MetricName": "InputTokens",
                "Dimensions": [{"Name": "Model", "Value": model}],
                "Timestamp": now,
                "Value": input_tokens,
                "Unit": "Count",
            },
            {
                "MetricName": "OutputTokens",
                "Dimensions": [{"Name": "Model", "Value": model}],
                "Timestamp": now,
                "Value": output_tokens,
                "Unit": "Count",
            },
            {
                "MetricName": "TierCount",
                "Dimensions": [{"Name": "Tier", "Value": fallback_tier}],
                "Timestamp": now,
                "Value": 1,
                "Unit": "Count",
            },
        ]

        if is_error and error_type:
            metrics.append({
                "MetricName": "ErrorCount",
                "Dimensions": [{"Name": "ErrorType", "Value": error_type}],
                "Timestamp": now,
                "Value": 1,
                "Unit": "Count",
            })

        # Record cache hits as a 0/1 value; averaging this metric over a
        # period yields the cache hit rate shown on the dashboard.
        metrics.append({
            "MetricName": "CacheHit",
            "Dimensions": [{"Name": "Service", "Value": "MangaAssist"}],
            "Timestamp": now,
            "Value": 1 if cache_hit else 0,
            "Unit": "Count",
        })

        # Estimate cost
        pricing = {
            "Sonnet": {"input": 3.0, "output": 15.0},
            "Haiku": {"input": 0.25, "output": 1.25},
        }
        model_pricing = pricing.get(model, pricing["Sonnet"])
        cost = (
            (input_tokens / 1_000_000) * model_pricing["input"]
            + (output_tokens / 1_000_000) * model_pricing["output"]
        )
        metrics.append({
            "MetricName": "EstimatedCostUSD",
            "Dimensions": [{"Name": "Model", "Value": model}],
            "Timestamp": now,
            "Value": cost,
            "Unit": "None",
        })

        try:
            self._cloudwatch.put_metric_data(
                Namespace=self.NAMESPACE,
                MetricData=metrics,
            )
        except Exception as e:
            logger.warning(f"Failed to publish FM metrics: {e}")

Summary: Observability Stack Integration

┌──────────────────────────────────────────────────────────────┐
│                  MangaAssist Observability                    │
├──────────────┬──────────────┬──────────────┬─────────────────┤
│  X-Ray       │  CloudWatch  │  CloudWatch  │  SNS / PagerDuty│
│  Tracing     │  Metrics     │  Logs        │  Alerting       │
├──────────────┼──────────────┼──────────────┼─────────────────┤
│ Distributed  │ FM latency   │ Structured   │ Critical:       │
│ traces       │ percentiles  │ JSON logs    │  Error >5%      │
│ across all   │              │              │  Circuit open   │
│ services     │ Error rates  │ Correlation  │                 │
│              │ by type      │ IDs          │ Warning:        │
│ Service map  │              │              │  Degradation    │
│ visualization│ Token usage  │ Log Insights │  >20%           │
│              │              │ queries      │  p99 >3s        │
│ Subsegment   │ Fallback     │              │  Cost >$15/hr   │
│ annotations  │ tier counts  │ Request/     │                 │
│ for FM calls │              │ response     │ Info:           │
│              │ Cost         │ payloads     │  Rate limit     │
│ Filter       │ estimation   │              │  >80%           │
│ expressions  │              │ Error stack  │                 │
│              │ Cache hit    │ traces       │ Auto-remediate: │
│ Groups &     │ rates        │              │  Scale ECS      │
│ insights     │              │              │  Adjust throttle│
└──────────────┴──────────────┴──────────────┴─────────────────┘
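The Logs Insights column above can be made concrete with a query over the structured JSON logs. A hedged sketch (the field name `fallback_tier` and the `"primary"` tier label are assumptions about the log schema, not a confirmed contract):

```
fields @timestamp, fallback_tier
| filter ispresent(fallback_tier) and fallback_tier != "primary"
| stats count(*) as degraded_responses by bin(5m)
```

Comparing `degraded_responses` against total request volume per bin gives the same degradation rate the `MangaAssist-FM-Degradation` alarm watches, which is useful when drilling into which 5-minute windows tripped it.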

Key Takeaways

| Component | Purpose | MangaAssist Config |
| --- | --- | --- |
| Usage Plans | Server-side throttling per tier | Free: 2 rps, Registered: 5 rps, Premium: 15 rps |
| Tiered Fallback | Content-aware degradation | Sonnet -> Haiku -> Cache (category-aware TTL) -> FAQ -> Graceful |
| X-Ray Tracing | Distributed request flow | 5% sampling, 100% error capture, FM annotations |
| CloudWatch Dashboard | Real-time FM health | 12 widgets: latency, errors, tiers, cost, circuit breaker |
| Alarms | Proactive incident response | 6 alarms across critical, warning, and info tiers |
| Staleness Detection | Cache quality assurance | Pricing: 15 min TTL, Availability: 5 min, Recommendations: 4 hr |
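The staleness targets in the last row reduce to a small lookup. A minimal sketch (the category keys and the 30-minute default for unlisted categories are assumptions for illustration):

```python
from typing import Dict

# Category-aware cache TTLs in seconds, per the staleness targets above.
CACHE_TTL_SECONDS: Dict[str, int] = {
    "pricing": 15 * 60,            # prices change often: 15 min
    "availability": 5 * 60,        # stock status is volatile: 5 min
    "recommendations": 4 * 60 * 60,  # taste shifts slowly: 4 hr
}


def ttl_for(category: str, default: int = 30 * 60) -> int:
    """Return the freshness window for a cached answer category."""
    return CACHE_TTL_SECONDS.get(category.lower(), default)
```

Keying TTLs by content category rather than using one global value is what lets the cache tier stay in the fallback chain without serving stale prices.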