
Intelligent Model Routing Architecture

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Field | Value |
| --- | --- |
| Certification | AWS Certified AI Practitioner (AIF-C01) |
| Domain | 2 — Implementation and Integration of Foundation Models |
| Task | 2.4 — Design model deployment and inference strategies |
| Skill | 2.4.4 — Develop intelligent model routing systems to optimize model selection |
| Coverage | Static routing configurations, Step Functions for dynamic content-based routing to specialized FMs, intelligent model routing based on metrics, API Gateway with request transformations for routing logic |

1. Intelligent Model Routing — Mindmap

mindmap
  root((Intelligent Model Routing))
    Static Routing
      Intent-Model Mapping
        Simple FAQ → Haiku
        Complex Analysis → Sonnet
        Translation → Specialized FM
      Configuration Tables
        DynamoDB Routing Table
        Environment Variables
        Feature Flags
      Rule-Based Selection
        Regex Pattern Matching
        Keyword Detection
        User Tier Mapping
    Dynamic Routing
      Content-Based Analysis
        Complexity Scoring
        Token Length Estimation
        Language Detection
      Step Functions Orchestration
        State Machine Workflows
        Parallel Model Invocation
        Conditional Branching
      Real-Time Adaptation
        Load Balancing
        Failover Chains
        Circuit Breaker Patterns
    Metric-Based Selection
      Latency Tracking
        P50/P95/P99 Percentiles
        Time-to-First-Token
        End-to-End Response Time
      Cost Optimization
        Token Usage Tracking
        Budget Allocation
        Cost-per-Query Scoring
      Quality Scores
        User Satisfaction Ratings
        Hallucination Detection
        Relevance Scoring
    API Gateway Routing
      Request Transformation
        Header Injection
        Body Modification
        Query Parameter Routing
      Lambda Authorizers
        Token Validation
        Route Determination
        Context Enrichment
      Integration Mappings
        VTL Templates
        Response Shaping
        Error Routing

2. Architecture Flowchart — MangaAssist Routing Decision Flow

flowchart TB
    subgraph Ingress["API Gateway WebSocket"]
        WS[WebSocket Connection]
        RT[Request Transformer]
        LA[Lambda Authorizer]
    end

    subgraph Router["ECS Fargate — Routing Orchestrator"]
        RC[Route Controller]
        SS[Static Router]
        DS[Dynamic Router]
        MS[Metric Selector]
    end

    subgraph Analysis["Content Analysis Pipeline"]
        CX[Complexity Scorer]
        LD[Language Detector]
        TE[Token Estimator]
        IC[Intent Classifier]
    end

    subgraph StepFn["Step Functions — Dynamic Routing Workflow"]
        SF1[Analyze Content]
        SF2[Check Metrics]
        SF3[Select Model]
        SF4[Invoke FM]
        SF5[Evaluate Response]
    end

    subgraph Models["Bedrock Foundation Models"]
        HAIKU[Claude 3 Haiku<br/>$0.25/$1.25 per 1M]
        SONNET[Claude 3 Sonnet<br/>$3/$15 per 1M]
        SPEC[Specialized FM<br/>Translation/Summarization]
    end

    subgraph Metrics["Metric Collection"]
        CW[CloudWatch Metrics]
        DDB[DynamoDB Metrics Table]
        REDIS[ElastiCache Redis<br/>Real-Time Counters]
    end

    subgraph Storage["Routing Configuration"]
        RTABLE[DynamoDB Routing Table]
        CACHE[Redis Config Cache]
        FEAT[Feature Flags]
    end

    WS --> RT
    RT --> LA
    LA --> RC

    RC --> SS
    RC --> DS
    RC --> MS

    SS --> RTABLE
    SS --> CACHE

    DS --> CX
    DS --> LD
    DS --> TE
    DS --> IC

    IC -->|Complex Query| SF1
    SF1 --> SF2
    SF2 --> SF3
    SF3 --> SF4
    SF4 --> SF5

    SF3 -->|Simple| HAIKU
    SF3 -->|Complex| SONNET
    SF3 -->|Specialized| SPEC

    MS --> CW
    MS --> DDB
    MS --> REDIS

    SF5 -->|Log| CW
    SF5 -->|Store| DDB

    style HAIKU fill:#4CAF50,color:#fff
    style SONNET fill:#2196F3,color:#fff
    style SPEC fill:#FF9800,color:#fff
    style RC fill:#9C27B0,color:#fff

3. Static Routing Configurations

Static routing maps known intents, user tiers, and request types directly to foundation models without any runtime content analysis, making it the fastest routing path: zero analysis overhead per request.

3.1 Intent-to-Model Mapping Table

| Intent Category | Sub-Intent | Routed Model | Rationale |
| --- | --- | --- | --- |
| Simple FAQ | Store hours, shipping info, return policy | Haiku | Low complexity, cost-efficient |
| Product Search | Title lookup, author search, genre browse | Haiku | Structured query, fast response needed |
| Product Detail | Synopsis generation, similar recommendations | Sonnet | Requires nuanced understanding |
| Translation | JP→EN manga title translation | Haiku | Short text, pattern-based |
| Translation | Full synopsis JP→EN with cultural context | Sonnet | Needs cultural nuance |
| Order Support | Order status, tracking info | Haiku | Template-based responses |
| Complex Analysis | Manga comparison, thematic analysis | Sonnet | Deep reasoning required |
| Content Advisory | Age rating explanation, content warnings | Sonnet | Sensitivity requires quality |
| Greeting/Chitchat | Hello, thanks, goodbye | Haiku | Minimal generation needed |
| Escalation | Complaint, refund dispute | Sonnet | Customer satisfaction critical |

3.2 DynamoDB Routing Table Schema

Table: MangaAssist-RoutingConfig
├── PK: ROUTE#<intent_category>
├── SK: CONFIG#<sub_intent>
├── model_id: string (e.g., "anthropic.claude-3-haiku-20240307-v1:0")
├── fallback_model_id: string
├── max_tokens: number
├── temperature: number
├── priority: number (1=highest)
├── enabled: boolean
├── ttl: number (epoch seconds)
├── updated_at: string (ISO 8601)
└── updated_by: string
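
The schema above can be exercised with a small seeding helper. This is a sketch: the item builder mirrors the PK/SK layout and attribute names shown, but `build_route_item`, the example values, and the "seed-script" author are illustrative, not part of the production codebase.

```python
import time
from typing import Any, Dict


def build_route_item(intent: str, sub_intent: str, model_id: str,
                     fallback: str, max_tokens: int = 1024) -> Dict[str, Any]:
    """Build one routing-rule item matching the PK/SK schema above."""
    return {
        "PK": f"ROUTE#{intent}",
        "SK": f"CONFIG#{sub_intent}",
        "model_id": model_id,
        "fallback_model_id": fallback,
        "max_tokens": max_tokens,
        "temperature": "0.3",  # DynamoDB numbers are Decimal; a string avoids float-precision errors
        "priority": 1,
        "enabled": True,
        "ttl": 0,
        "updated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "updated_by": "seed-script",
    }


item = build_route_item(
    "simple_faq", "store_hours",
    "anthropic.claude-3-haiku-20240307-v1:0",
    "anthropic.claude-3-sonnet-20240229-v1:0",
)
# To write it (requires AWS credentials):
# boto3.resource("dynamodb").Table("MangaAssist-RoutingConfig").put_item(Item=item)
```

Keeping item construction separate from the `put_item` call makes the schema easy to unit-test without AWS access.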

3.3 Static Router Implementation

"""
static_router.py — MangaAssist Static Model Router

Maps intents and request metadata directly to pre-configured model selections.
Uses DynamoDB routing table with Redis caching for sub-millisecond lookups.
"""

import json
import time
import logging
from typing import Optional, Dict, Any, Tuple
from dataclasses import dataclass, field
from enum import Enum

import boto3
from botocore.config import Config as BotoConfig

logger = logging.getLogger(__name__)


class ModelTier(Enum):
    """Available model tiers with their Bedrock model IDs and pricing."""
    HAIKU = "anthropic.claude-3-haiku-20240307-v1:0"
    SONNET = "anthropic.claude-3-sonnet-20240229-v1:0"

    @property
    def input_cost_per_1m(self) -> float:
        costs = {
            "HAIKU": 0.25,
            "SONNET": 3.00,
        }
        return costs[self.name]

    @property
    def output_cost_per_1m(self) -> float:
        costs = {
            "HAIKU": 1.25,
            "SONNET": 15.00,
        }
        return costs[self.name]


@dataclass
class RouteConfig:
    """Configuration for a single routing rule."""
    intent_category: str
    sub_intent: str
    model_id: str
    fallback_model_id: str
    max_tokens: int = 1024
    temperature: float = 0.3
    priority: int = 5
    enabled: bool = True
    ttl: int = 0
    updated_at: str = ""
    updated_by: str = "system"

    @property
    def cache_key(self) -> str:
        return f"route:{self.intent_category}:{self.sub_intent}"


@dataclass
class RoutingDecision:
    """Result of a routing decision."""
    model_id: str
    fallback_model_id: str
    max_tokens: int
    temperature: float
    route_source: str  # "local_cache", "redis_cache", "dynamodb", "hardcoded_default"
    decision_time_ms: float
    intent_category: str
    sub_intent: str
    metadata: Dict[str, Any] = field(default_factory=dict)


class StaticRouter:
    """
    Static model router for MangaAssist.

    Performs zero-analysis routing based on pre-configured intent-to-model
    mappings stored in DynamoDB with Redis caching. Designed for sub-millisecond
    routing decisions on known intent categories.

    Architecture:
        1. Check Redis cache for routing config (< 0.1ms)
        2. Fall back to DynamoDB lookup (< 5ms)
        3. Fall back to hardcoded defaults (< 0.01ms)

    Usage:
        router = StaticRouter(
            table_name="MangaAssist-RoutingConfig",
            redis_client=redis_conn,
        )
        decision = router.route(intent="product_search", sub_intent="title_lookup")
        # decision.model_id → "anthropic.claude-3-haiku-20240307-v1:0"
    """

    # Hardcoded defaults — last resort if DynamoDB and Redis both fail
    DEFAULT_ROUTES: Dict[str, str] = {
        "simple_faq": ModelTier.HAIKU.value,
        "product_search": ModelTier.HAIKU.value,
        "product_detail": ModelTier.SONNET.value,
        "translation_simple": ModelTier.HAIKU.value,
        "translation_complex": ModelTier.SONNET.value,
        "order_support": ModelTier.HAIKU.value,
        "complex_analysis": ModelTier.SONNET.value,
        "content_advisory": ModelTier.SONNET.value,
        "greeting": ModelTier.HAIKU.value,
        "escalation": ModelTier.SONNET.value,
    }

    FALLBACK_MODEL = ModelTier.HAIKU.value

    def __init__(
        self,
        table_name: str = "MangaAssist-RoutingConfig",
        redis_client=None,
        cache_ttl_seconds: int = 300,
        region: str = "ap-northeast-1",
    ):
        self.table_name = table_name
        self.redis = redis_client
        self.cache_ttl = cache_ttl_seconds

        boto_config = BotoConfig(
            region_name=region,
            retries={"max_attempts": 2, "mode": "adaptive"},
        )
        self.dynamodb = boto3.resource("dynamodb", config=boto_config)
        self.table = self.dynamodb.Table(table_name)

        # Local in-memory cache for ultra-fast lookups
        self._local_cache: Dict[str, Tuple[RouteConfig, float]] = {}
        self._local_cache_ttl = 60  # seconds

        logger.info(
            "StaticRouter initialized | table=%s | cache_ttl=%ds",
            table_name,
            cache_ttl_seconds,
        )

    def route(
        self,
        intent: str,
        sub_intent: str = "default",
        user_tier: str = "standard",
        metadata: Optional[Dict[str, Any]] = None,
    ) -> RoutingDecision:
        """
        Resolve a routing decision for the given intent.

        Lookup order:
            1. Local in-memory cache
            2. Redis distributed cache
            3. DynamoDB routing table
            4. Hardcoded defaults

        Args:
            intent: The classified intent category (e.g., "product_search")
            sub_intent: Optional sub-intent for finer routing
            user_tier: User tier for priority routing (e.g., "premium")
            metadata: Additional context for the routing decision

        Returns:
            RoutingDecision with selected model and parameters
        """
        start = time.monotonic()
        cache_key = f"route:{intent}:{sub_intent}"

        # --- Layer 1: Local in-memory cache ---
        config = self._check_local_cache(cache_key)
        if config:
            elapsed = (time.monotonic() - start) * 1000
            logger.debug("Route resolved from local cache | key=%s | %.2fms", cache_key, elapsed)
            return self._build_decision(config, "local_cache", elapsed, intent, sub_intent, metadata)

        # --- Layer 2: Redis distributed cache ---
        config = self._check_redis_cache(cache_key)
        if config:
            self._set_local_cache(cache_key, config)
            elapsed = (time.monotonic() - start) * 1000
            logger.debug("Route resolved from Redis | key=%s | %.2fms", cache_key, elapsed)
            return self._build_decision(config, "redis_cache", elapsed, intent, sub_intent, metadata)

        # --- Layer 3: DynamoDB lookup ---
        config = self._lookup_dynamodb(intent, sub_intent)
        if config:
            self._set_local_cache(cache_key, config)
            self._set_redis_cache(cache_key, config)
            elapsed = (time.monotonic() - start) * 1000
            logger.debug("Route resolved from DynamoDB | key=%s | %.2fms", cache_key, elapsed)
            return self._build_decision(config, "dynamodb", elapsed, intent, sub_intent, metadata)

        # --- Layer 4: Hardcoded defaults ---
        elapsed = (time.monotonic() - start) * 1000
        logger.warning("Route falling back to defaults | intent=%s | %.2fms", intent, elapsed)
        return self._build_default_decision(intent, sub_intent, elapsed, metadata)

    def _check_local_cache(self, key: str) -> Optional[RouteConfig]:
        """Check local in-memory cache with TTL expiry."""
        if key in self._local_cache:
            config, cached_at = self._local_cache[key]
            if time.time() - cached_at < self._local_cache_ttl:
                return config
            del self._local_cache[key]
        return None

    def _set_local_cache(self, key: str, config: RouteConfig) -> None:
        """Store route config in local cache."""
        self._local_cache[key] = (config, time.time())

    def _check_redis_cache(self, key: str) -> Optional[RouteConfig]:
        """Check Redis distributed cache."""
        if not self.redis:
            return None
        try:
            data = self.redis.get(key)
            if data:
                parsed = json.loads(data)
                return RouteConfig(**parsed)
        except Exception as e:
            logger.warning("Redis cache check failed | key=%s | error=%s", key, e)
        return None

    def _set_redis_cache(self, key: str, config: RouteConfig) -> None:
        """Store route config in Redis with TTL."""
        if not self.redis:
            return
        try:
            data = json.dumps({
                "intent_category": config.intent_category,
                "sub_intent": config.sub_intent,
                "model_id": config.model_id,
                "fallback_model_id": config.fallback_model_id,
                "max_tokens": config.max_tokens,
                "temperature": config.temperature,
                "priority": config.priority,
                "enabled": config.enabled,
            })
            self.redis.setex(key, self.cache_ttl, data)
        except Exception as e:
            logger.warning("Redis cache set failed | key=%s | error=%s", key, e)

    def _lookup_dynamodb(self, intent: str, sub_intent: str) -> Optional[RouteConfig]:
        """Query DynamoDB for routing configuration."""
        try:
            response = self.table.get_item(
                Key={
                    "PK": f"ROUTE#{intent}",
                    "SK": f"CONFIG#{sub_intent}",
                },
                ConsistentRead=False,
            )
            item = response.get("Item")
            if item and item.get("enabled", True):
                return RouteConfig(
                    intent_category=intent,
                    sub_intent=sub_intent,
                    model_id=item["model_id"],
                    fallback_model_id=item.get("fallback_model_id", self.FALLBACK_MODEL),
                    max_tokens=int(item.get("max_tokens", 1024)),
                    temperature=float(item.get("temperature", 0.3)),
                    priority=int(item.get("priority", 5)),
                    enabled=bool(item.get("enabled", True)),
                )
        except Exception as e:
            logger.error("DynamoDB lookup failed | intent=%s | error=%s", intent, e)
        return None

    def _build_decision(
        self,
        config: RouteConfig,
        source: str,
        elapsed_ms: float,
        intent: str,
        sub_intent: str,
        metadata: Optional[Dict[str, Any]],
    ) -> RoutingDecision:
        """Build a RoutingDecision from a RouteConfig."""
        return RoutingDecision(
            model_id=config.model_id,
            fallback_model_id=config.fallback_model_id,
            max_tokens=config.max_tokens,
            temperature=config.temperature,
            route_source=source,
            decision_time_ms=elapsed_ms,
            intent_category=intent,
            sub_intent=sub_intent,
            metadata=metadata or {},
        )

    def _build_default_decision(
        self,
        intent: str,
        sub_intent: str,
        elapsed_ms: float,
        metadata: Optional[Dict[str, Any]],
    ) -> RoutingDecision:
        """Build a RoutingDecision from hardcoded defaults."""
        model_id = self.DEFAULT_ROUTES.get(intent, self.FALLBACK_MODEL)
        return RoutingDecision(
            model_id=model_id,
            fallback_model_id=self.FALLBACK_MODEL,
            max_tokens=1024,
            temperature=0.3,
            route_source="hardcoded_default",
            decision_time_ms=elapsed_ms,
            intent_category=intent,
            sub_intent=sub_intent,
            metadata=metadata or {},
        )

    def bulk_load_routes(self) -> int:
        """
        Pre-load all routing configs into local and Redis caches.
        Call during ECS task startup for warm cache.

        Returns:
            Number of routes loaded
        """
        count = 0
        scan_kwargs: Dict[str, Any] = {
            "FilterExpression": "begins_with(PK, :prefix)",
            "ExpressionAttributeValues": {":prefix": "ROUTE#"},
        }
        try:
            while True:  # paginate: Scan returns at most 1 MB per page
                response = self.table.scan(**scan_kwargs)
                for item in response.get("Items", []):
                    intent = item["PK"].replace("ROUTE#", "")
                    sub_intent = item["SK"].replace("CONFIG#", "")
                    config = RouteConfig(
                        intent_category=intent,
                        sub_intent=sub_intent,
                        model_id=item["model_id"],
                        fallback_model_id=item.get("fallback_model_id", self.FALLBACK_MODEL),
                        max_tokens=int(item.get("max_tokens", 1024)),
                        temperature=float(item.get("temperature", 0.3)),
                        priority=int(item.get("priority", 5)),
                        enabled=bool(item.get("enabled", True)),
                    )
                    key = config.cache_key
                    self._set_local_cache(key, config)
                    self._set_redis_cache(key, config)
                    count += 1

                last_key = response.get("LastEvaluatedKey")
                if not last_key:
                    break
                scan_kwargs["ExclusiveStartKey"] = last_key

            logger.info("Bulk loaded %d routes into cache", count)
        except Exception as e:
            logger.error("Bulk load failed | error=%s", e)
        return count

    def invalidate_route(self, intent: str, sub_intent: str = "default") -> None:
        """Invalidate a specific route from all cache layers."""
        key = f"route:{intent}:{sub_intent}"
        self._local_cache.pop(key, None)
        if self.redis:
            try:
                self.redis.delete(key)
            except Exception as e:
                logger.warning("Redis invalidation failed | key=%s | error=%s", key, e)
        logger.info("Route invalidated | intent=%s | sub_intent=%s", intent, sub_intent)

4. Step Functions Dynamic Routing

Dynamic routing uses AWS Step Functions to orchestrate multi-step content analysis before selecting a model. This path handles queries where the optimal model is not obvious from the intent alone.

4.1 Dynamic Routing Decision Tree

flowchart TD
    START([Incoming Query]) --> CLASSIFY[Classify Intent]
    CLASSIFY -->|Known Simple| STATIC[Static Route → Haiku]
    CLASSIFY -->|Known Complex| SONNET_DIRECT[Static Route → Sonnet]
    CLASSIFY -->|Ambiguous| ANALYZE[Dynamic Analysis]

    ANALYZE --> SCORE[Compute Complexity Score]
    SCORE --> CHECK_SCORE{Score >= 7?}

    CHECK_SCORE -->|Yes| CHECK_BUDGET{Budget Available?}
    CHECK_BUDGET -->|Yes| ROUTE_SONNET[Route → Sonnet]
    CHECK_BUDGET -->|No| ROUTE_HAIKU_WARN[Route → Haiku + Log Warning]

    CHECK_SCORE -->|No| CHECK_LATENCY{Latency Budget?}
    CHECK_LATENCY -->|Tight < 1s| ROUTE_HAIKU[Route → Haiku]
    CHECK_LATENCY -->|Normal| CHECK_QUALITY{Quality Priority?}
    CHECK_QUALITY -->|High| ROUTE_SONNET2[Route → Sonnet]
    CHECK_QUALITY -->|Normal| ROUTE_HAIKU2[Route → Haiku]

    STATIC --> INVOKE[Invoke Bedrock FM]
    SONNET_DIRECT --> INVOKE
    ROUTE_SONNET --> INVOKE
    ROUTE_HAIKU_WARN --> INVOKE
    ROUTE_HAIKU --> INVOKE
    ROUTE_SONNET2 --> INVOKE
    ROUTE_HAIKU2 --> INVOKE

    INVOKE --> EVAL[Evaluate Response Quality]
    EVAL --> LOG[Log Metrics + Decision]
    LOG --> RESPOND([Return Response])

    style ROUTE_SONNET fill:#2196F3,color:#fff
    style ROUTE_SONNET2 fill:#2196F3,color:#fff
    style SONNET_DIRECT fill:#2196F3,color:#fff
    style ROUTE_HAIKU fill:#4CAF50,color:#fff
    style ROUTE_HAIKU2 fill:#4CAF50,color:#fff
    style ROUTE_HAIKU_WARN fill:#FF9800,color:#fff
    style STATIC fill:#4CAF50,color:#fff
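
The decision tree above maps naturally onto a Step Functions Choice state. Below is a minimal Amazon States Language sketch expressed as a Python dict: the state names, Lambda ARN, and the `$.analysis.complexity_score` path are illustrative assumptions, and a real workflow would also pass the request body into the Bedrock integration and add budget/latency Choice rules.

```python
import json

# Sketch of an ASL definition for the routing workflow (threshold 7 matches
# the "Score >= 7?" branch in the decision tree).
ROUTING_STATE_MACHINE = {
    "Comment": "MangaAssist dynamic routing (sketch)",
    "StartAt": "AnalyzeContent",
    "States": {
        "AnalyzeContent": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:analyze-content",
            "ResultPath": "$.analysis",
            "Next": "SelectModel",
        },
        "SelectModel": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": "$.analysis.complexity_score",
                    "NumericGreaterThanEquals": 7,
                    "Next": "InvokeSonnet",
                },
            ],
            "Default": "InvokeHaiku",
        },
        "InvokeSonnet": {
            "Type": "Task",
            "Resource": "arn:aws:states:::bedrock:invokeModel",
            "Parameters": {"ModelId": "anthropic.claude-3-sonnet-20240229-v1:0"},
            "Next": "EvaluateResponse",
        },
        "InvokeHaiku": {
            "Type": "Task",
            "Resource": "arn:aws:states:::bedrock:invokeModel",
            "Parameters": {"ModelId": "anthropic.claude-3-haiku-20240307-v1:0"},
            "Next": "EvaluateResponse",
        },
        "EvaluateResponse": {"Type": "Pass", "End": True},
    },
}

definition_json = json.dumps(ROUTING_STATE_MACHINE)
```

Keeping the definition as a dict lets the same structure be unit-tested and then deployed via `stepfunctions.create_state_machine(definition=definition_json, ...)`.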

4.2 Dynamic Model Router Implementation

"""
dynamic_router.py — MangaAssist Dynamic Content-Based Model Router

Analyzes incoming query content in real time to determine the optimal
foundation model. Uses complexity scoring, language detection, token
estimation, and budget awareness.
"""

import re
import time
import logging
from typing import Dict, Any, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum

logger = logging.getLogger(__name__)


class ComplexityLevel(Enum):
    """Query complexity classification levels."""
    TRIVIAL = 1    # Greeting, yes/no questions
    SIMPLE = 3     # Single-fact lookups, basic FAQ
    MODERATE = 5   # Multi-step queries, comparisons
    COMPLEX = 7    # Analysis, reasoning, multi-turn context
    EXPERT = 9     # Deep domain expertise, creative generation


@dataclass
class ContentAnalysis:
    """Result of content analysis for routing decisions."""
    complexity_score: float          # 0-10 scale
    complexity_level: ComplexityLevel
    estimated_input_tokens: int
    estimated_output_tokens: int
    language_detected: str           # ISO 639-1 code
    contains_japanese: bool
    requires_cultural_context: bool
    query_type: str                  # "factual", "analytical", "creative", "conversational"
    entity_count: int
    has_comparison: bool
    has_negation: bool
    multi_turn_depth: int
    analysis_time_ms: float


@dataclass
class RoutingContext:
    """Full context for making a routing decision."""
    content_analysis: ContentAnalysis
    user_tier: str
    session_query_count: int
    daily_budget_remaining_usd: float
    latency_budget_ms: int
    quality_priority: str  # "cost", "balanced", "quality"
    previous_model_used: Optional[str] = None


@dataclass
class DynamicRoutingDecision:
    """Result of dynamic routing analysis."""
    model_id: str
    fallback_model_id: str
    max_tokens: int
    temperature: float
    reasoning: str
    confidence: float  # 0-1
    estimated_cost_usd: float
    estimated_latency_ms: int
    route_path: str
    decision_time_ms: float
    content_analysis: ContentAnalysis
    metadata: Dict[str, Any] = field(default_factory=dict)


class DynamicModelRouter:
    """
    Dynamic content-based model router for MangaAssist.

    Performs real-time content analysis to determine the optimal model
    for each query. Balances cost, latency, and quality based on
    configurable policies and real-time budget constraints.

    Decision Factors:
        - Complexity score (vocabulary, structure, entities)
        - Token estimation (input size → output prediction)
        - Language detection (Japanese content needs higher quality)
        - Budget constraints (daily spend limits per tier)
        - Latency requirements (tight budgets favor Haiku)
        - Quality priority (user tier, query importance)

    Usage:
        router = DynamicModelRouter(budget_tracker=tracker)
        analysis = router.analyze_content(query, conversation_history)
        decision = router.route(analysis, user_context)
    """

    HAIKU_MODEL = "anthropic.claude-3-haiku-20240307-v1:0"
    SONNET_MODEL = "anthropic.claude-3-sonnet-20240229-v1:0"

    # Cost per 1M tokens
    HAIKU_INPUT_COST = 0.25
    HAIKU_OUTPUT_COST = 1.25
    SONNET_INPUT_COST = 3.00
    SONNET_OUTPUT_COST = 15.00

    # Complexity indicators
    COMPLEX_PATTERNS = [
        r"\b(compare|contrast|analyze|explain why|what if)\b",
        r"\b(difference between|similarities|pros and cons)\b",
        r"\b(recommend|suggest|which .+ should)\b",
        r"\b(history of|evolution of|influence on)\b",
        r"\b(theme|symbolism|metaphor|narrative)\b",
    ]

    SIMPLE_PATTERNS = [
        r"\b(what is|where is|when did|how much|price of)\b",
        r"\b(yes|no|ok|thanks|hello|hi|bye)\b",
        r"\b(order status|tracking|shipping|return)\b",
    ]

    JAPANESE_PATTERN = re.compile(r"[\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FFF]")
    CULTURAL_KEYWORDS = [
        "manga", "anime", "otaku", "shounen", "shoujo", "seinen", "josei",
        "isekai", "mecha", "slice of life", "tankoubon", "mangaka",
    ]

    def __init__(
        self,
        budget_tracker=None,
        complexity_threshold_sonnet: float = 6.5,
        max_haiku_tokens: int = 2048,
        max_sonnet_tokens: int = 4096,
    ):
        self.budget_tracker = budget_tracker
        self.sonnet_threshold = complexity_threshold_sonnet
        self.max_haiku_tokens = max_haiku_tokens
        self.max_sonnet_tokens = max_sonnet_tokens

        # Compile patterns once
        self._complex_re = [re.compile(p, re.IGNORECASE) for p in self.COMPLEX_PATTERNS]
        self._simple_re = [re.compile(p, re.IGNORECASE) for p in self.SIMPLE_PATTERNS]

        logger.info(
            "DynamicModelRouter initialized | sonnet_threshold=%.1f",
            complexity_threshold_sonnet,
        )

    def analyze_content(
        self,
        query: str,
        conversation_history: Optional[List[Dict[str, str]]] = None,
    ) -> ContentAnalysis:
        """
        Analyze query content to determine complexity and characteristics.

        Args:
            query: The user's current message
            conversation_history: Previous messages in the session

        Returns:
            ContentAnalysis with all scoring dimensions
        """
        start = time.monotonic()
        history = conversation_history or []

        # --- Token estimation ---
        input_tokens = self._estimate_tokens(query)
        context_tokens = sum(self._estimate_tokens(m.get("content", "")) for m in history)
        total_input = input_tokens + context_tokens
        estimated_output = min(input_tokens * 3, 2048)

        # --- Language detection ---
        has_japanese = bool(self.JAPANESE_PATTERN.search(query))
        language = "ja" if has_japanese else "en"

        # --- Cultural context ---
        requires_cultural = any(kw in query.lower() for kw in self.CULTURAL_KEYWORDS)

        # --- Complexity scoring ---
        complexity = self._compute_complexity(query, history)

        # --- Query type classification ---
        query_type = self._classify_query_type(query)

        # --- Entity counting ---
        entity_count = self._count_entities(query)

        # --- Structural features ---
        has_comparison = bool(re.search(r"\b(compare|vs|versus|better|worse|difference)\b", query, re.IGNORECASE))
        has_negation = bool(re.search(r"\b(not|never|don't|doesn't|isn't|aren't|won't)\b", query, re.IGNORECASE))

        elapsed = (time.monotonic() - start) * 1000

        level = self._score_to_level(complexity)

        return ContentAnalysis(
            complexity_score=complexity,
            complexity_level=level,
            estimated_input_tokens=total_input,
            estimated_output_tokens=estimated_output,
            language_detected=language,
            contains_japanese=has_japanese,
            requires_cultural_context=requires_cultural,
            query_type=query_type,
            entity_count=entity_count,
            has_comparison=has_comparison,
            has_negation=has_negation,
            multi_turn_depth=len(history),
            analysis_time_ms=elapsed,
        )

    def route(
        self,
        analysis: ContentAnalysis,
        context: RoutingContext,
    ) -> DynamicRoutingDecision:
        """
        Make a routing decision based on content analysis and context.

        Decision Logic:
            1. If complexity >= threshold AND budget allows → Sonnet
            2. If Japanese cultural content AND quality priority → Sonnet
            3. If latency budget tight (< 1000ms) → Haiku
            4. If daily budget nearly exhausted → Haiku
            5. Default → Haiku (cost optimization)

        Args:
            analysis: Content analysis results
            context: User and system context

        Returns:
            DynamicRoutingDecision with full reasoning
        """
        start = time.monotonic()
        reasoning_parts: List[str] = []

        # --- Decision scoring ---
        sonnet_score = 0.0
        haiku_score = 0.0

        # Complexity factor (weight: 40%)
        if analysis.complexity_score >= self.sonnet_threshold:
            sonnet_score += 4.0
            reasoning_parts.append(
                f"Complexity {analysis.complexity_score:.1f} >= threshold {self.sonnet_threshold}"
            )
        else:
            haiku_score += 4.0
            reasoning_parts.append(
                f"Complexity {analysis.complexity_score:.1f} < threshold {self.sonnet_threshold}"
            )

        # Cultural context factor (weight: 20%)
        if analysis.requires_cultural_context and analysis.contains_japanese:
            sonnet_score += 2.0
            reasoning_parts.append("Japanese cultural content detected — quality boost")
        else:
            haiku_score += 1.0

        # Latency factor (weight: 20%)
        if context.latency_budget_ms < 1000:
            haiku_score += 2.0
            reasoning_parts.append(f"Tight latency budget ({context.latency_budget_ms}ms) — favoring Haiku")
        elif context.latency_budget_ms < 2000:
            haiku_score += 1.0

        # Budget factor (weight: 20%)
        est_sonnet_cost = self._estimate_cost(
            analysis.estimated_input_tokens,
            analysis.estimated_output_tokens,
            self.SONNET_INPUT_COST,
            self.SONNET_OUTPUT_COST,
        )
        if context.daily_budget_remaining_usd < est_sonnet_cost * 10:
            haiku_score += 2.0
            reasoning_parts.append("Budget nearly exhausted — routing to Haiku")

        # Quality priority override
        if context.quality_priority == "quality":
            sonnet_score += 1.5
            reasoning_parts.append("Quality priority set — boosting Sonnet score")
        elif context.quality_priority == "cost":
            haiku_score += 1.5
            reasoning_parts.append("Cost priority set — boosting Haiku score")

        # Premium user tier boost
        if context.user_tier == "premium":
            sonnet_score += 1.0
            reasoning_parts.append("Premium user tier — Sonnet preference")

        # --- Final decision ---
        if sonnet_score > haiku_score:
            model_id = self.SONNET_MODEL
            fallback = self.HAIKU_MODEL
            max_tokens = self.max_sonnet_tokens
            est_cost = est_sonnet_cost
            est_latency = 2000
        else:
            model_id = self.HAIKU_MODEL
            fallback = self.SONNET_MODEL
            max_tokens = self.max_haiku_tokens
            est_cost = self._estimate_cost(
                analysis.estimated_input_tokens,
                analysis.estimated_output_tokens,
                self.HAIKU_INPUT_COST,
                self.HAIKU_OUTPUT_COST,
            )
            est_latency = 800

        confidence = abs(sonnet_score - haiku_score) / max(sonnet_score + haiku_score, 1)
        elapsed = (time.monotonic() - start) * 1000

        decision = DynamicRoutingDecision(
            model_id=model_id,
            fallback_model_id=fallback,
            max_tokens=max_tokens,
            temperature=0.3 if analysis.query_type == "factual" else 0.7,
            reasoning=" | ".join(reasoning_parts),
            confidence=min(confidence, 1.0),
            estimated_cost_usd=est_cost,
            estimated_latency_ms=est_latency,
            route_path="dynamic_content_analysis",
            decision_time_ms=elapsed,
            content_analysis=analysis,
        )

        logger.info(
            "Dynamic route decision | model=%s | score=S:%.1f/H:%.1f | confidence=%.2f | %.2fms",
            model_id.split(".")[-1][:20],
            sonnet_score,
            haiku_score,
            confidence,
            elapsed,
        )

        return decision

    def _compute_complexity(
        self, query: str, history: List[Dict[str, str]]
    ) -> float:
        """
        Compute a 0-10 complexity score for the query.

        Scoring dimensions:
            - Vocabulary complexity (word length, rare words)
            - Structural complexity (sentence count, question depth)
            - Pattern matching (complex vs simple patterns)
            - Context dependency (multi-turn depth)
            - Entity density
        """
        score = 0.0
        words = query.split()
        word_count = len(words)

        # Word count factor
        if word_count > 50:
            score += 2.0
        elif word_count > 20:
            score += 1.0
        elif word_count < 5:
            score -= 1.0

        # Average word length (proxy for vocabulary complexity)
        if words:
            avg_len = sum(len(w) for w in words) / len(words)
            if avg_len > 7:
                score += 1.5
            elif avg_len > 5:
                score += 0.5

        # Complex pattern matches
        complex_matches = sum(1 for p in self._complex_re if p.search(query))
        score += min(complex_matches * 1.0, 3.0)

        # Simple pattern penalty
        simple_matches = sum(1 for p in self._simple_re if p.search(query))
        score -= min(simple_matches * 0.5, 2.0)

        # Multi-turn depth bonus
        score += min(len(history) * 0.3, 2.0)

        # Question complexity
        question_marks = query.count("?")
        if question_marks > 2:
            score += 1.0

        # Shift the raw score by a +3.0 baseline (a neutral query maps to 3.0), then clamp to 0-10
        return max(0.0, min(10.0, score + 3.0))

    def _classify_query_type(self, query: str) -> str:
        """Classify query into factual, analytical, creative, or conversational."""
        q_lower = query.lower()
        if any(kw in q_lower for kw in ["analyze", "compare", "why", "how does", "explain"]):
            return "analytical"
        if any(kw in q_lower for kw in ["write", "create", "generate", "story", "describe"]):
            return "creative"
        # Match greetings as whole words so "hi" does not fire inside "this" or "shipping"
        tokens = set(re.findall(r"[a-z]+", q_lower))
        if tokens & {"hi", "hello", "thanks", "bye"} or "how are you" in q_lower:
            return "conversational"
        return "factual"

    def _count_entities(self, query: str) -> int:
        """Count approximate entity mentions (capitalized words, quoted terms)."""
        caps = re.findall(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b", query)
        quoted = re.findall(r'"[^"]+"|\'[^\']+\'', query)
        return len(caps) + len(quoted)

    def _estimate_tokens(self, text: str) -> int:
        """Estimate token count (rough: 1 token per 4 characters for English, 1.5 per char for Japanese)."""
        jp_chars = len(self.JAPANESE_PATTERN.findall(text))
        en_chars = len(text) - jp_chars
        return int(en_chars / 4 + jp_chars / 1.5)

    def _estimate_cost(
        self, input_tokens: int, output_tokens: int,
        input_price: float, output_price: float,
    ) -> float:
        """Estimate cost in USD."""
        return (input_tokens * input_price + output_tokens * output_price) / 1_000_000

    @staticmethod
    def _score_to_level(score: float) -> ComplexityLevel:
        """Map numeric score to complexity level."""
        if score <= 2:
            return ComplexityLevel.TRIVIAL
        if score <= 4:
            return ComplexityLevel.SIMPLE
        if score <= 6:
            return ComplexityLevel.MODERATE
        if score <= 8:
            return ComplexityLevel.COMPLEX
        return ComplexityLevel.EXPERT
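The token heuristic above can be checked in isolation. This sketch re-implements `_estimate_tokens` with an assumed `JAPANESE_PATTERN` (hiragana, katakana, and common kanji ranges — the class's actual pattern is defined elsewhere in the module):

```python
import re

# Assumed character ranges: hiragana, katakana, common CJK ideographs
JAPANESE_PATTERN = re.compile(r"[\u3040-\u30ff\u4e00-\u9fff]")

def estimate_tokens(text: str) -> int:
    """~4 English characters per token, ~1.5 Japanese characters per token."""
    jp_chars = len(JAPANESE_PATTERN.findall(text))
    en_chars = len(text) - jp_chars
    return int(en_chars / 4 + jp_chars / 1.5)

print(estimate_tokens("What is One Piece about?"))  # 24 English chars -> 6 tokens
print(estimate_tokens("ワンピース"))                  # 5 kana chars -> 3 tokens
```

Japanese text therefore costs roughly 2.5x more tokens per character than English, which matters when estimating Sonnet costs for JP-language manga queries.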

5. Metric-Based Model Selection

Metric-based routing uses real-time performance data to dynamically adjust model selection. If Sonnet latency spikes, the system can temporarily route more queries to Haiku. If Haiku quality drops for certain query types, the system shifts to Sonnet.
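As a minimal sketch of this idea (the thresholds and the linear ramp below are illustrative, not part of the MangaAssist implementation): shift a growing share of traffic to Haiku as Sonnet's p95 latency climbs past the 3-second target.

```python
def haiku_traffic_share(
    sonnet_p95_ms: float,
    base_share: float = 0.6,    # normal fraction of traffic on Haiku (assumed)
    target_ms: float = 3000.0,  # the 3 s end-to-end latency target
) -> float:
    """Linearly shift traffic toward Haiku as Sonnet's p95 exceeds the target."""
    if sonnet_p95_ms <= target_ms:
        return base_share
    # At 2x the target latency (or worse), route everything to Haiku
    overload = min((sonnet_p95_ms - target_ms) / target_ms, 1.0)
    return min(1.0, base_share + 0.4 * overload)
```

The same shape works in reverse for quality: if Haiku's quality score for a query type falls below a floor, reduce its share for that type.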

5.1 Metrics Collection Architecture

flowchart LR
    subgraph Sources["Metric Sources"]
        BED[Bedrock InvokeModel<br/>Latency + Tokens]
        CW[CloudWatch<br/>P50/P95/P99]
        USR[User Feedback<br/>Thumbs Up/Down]
        QA[Quality Eval<br/>Relevance Scores]
    end

    subgraph Collection["Collection Pipeline"]
        KDS[Kinesis Data Stream]
        FH[Firehose]
        LAMBDA[Lambda Aggregator]
    end

    subgraph Storage["Metric Storage"]
        TS[DynamoDB<br/>Time-Series Metrics]
        RED[ElastiCache Redis<br/>Real-Time Counters]
        S3[S3<br/>Historical Archive]
    end

    subgraph Selector["Metric-Based Selector"]
        AGG[Metric Aggregator]
        RANK[Model Ranker]
        DECIDE[Route Decision]
    end

    BED --> KDS
    CW --> LAMBDA
    USR --> KDS
    QA --> KDS

    KDS --> FH
    KDS --> LAMBDA

    LAMBDA --> TS
    LAMBDA --> RED
    FH --> S3

    RED --> AGG
    TS --> AGG
    AGG --> RANK
    RANK --> DECIDE

5.2 Metric-Based Selector Implementation

"""
metric_selector.py — MangaAssist Metric-Based Model Selector

Uses real-time performance metrics to dynamically rank and select models.
Factors: latency percentiles, cost tracking, quality scores, error rates.
"""

import time
import json
import logging
from typing import Dict, Any, List, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)


@dataclass
class ModelMetrics:
    """Real-time metrics for a single model."""
    model_id: str
    p50_latency_ms: float = 0.0
    p95_latency_ms: float = 0.0
    p99_latency_ms: float = 0.0
    avg_input_tokens: float = 0.0
    avg_output_tokens: float = 0.0
    error_rate_pct: float = 0.0
    throttle_rate_pct: float = 0.0
    quality_score: float = 0.0       # 0-1 scale
    user_satisfaction: float = 0.0    # 0-1 scale
    cost_per_query_usd: float = 0.0
    queries_last_hour: int = 0
    last_updated: str = ""


@dataclass
class MetricWeight:
    """Configurable weights for multi-objective model ranking."""
    latency: float = 0.30
    cost: float = 0.25
    quality: float = 0.30
    reliability: float = 0.15

    def validate(self) -> bool:
        total = self.latency + self.cost + self.quality + self.reliability
        return abs(total - 1.0) < 0.01


@dataclass
class ModelRanking:
    """Ranked model with composite score breakdown."""
    model_id: str
    composite_score: float
    latency_score: float
    cost_score: float
    quality_score: float
    reliability_score: float
    rank: int
    metrics: ModelMetrics


class MetricBasedSelector:
    """
    Metric-based model selector for MangaAssist.

    Maintains real-time performance metrics for all available models
    and ranks them using configurable multi-objective scoring. Supports
    dynamic weight adjustment based on system conditions (e.g., increase
    cost weight during budget pressure, increase latency weight during
    peak hours).

    Scoring Formula:
        composite = (latency_score * w_latency) +
                    (cost_score * w_cost) +
                    (quality_score * w_quality) +
                    (reliability_score * w_reliability)

    All individual scores are normalized to 0-1 where 1 = best.

    Usage:
        selector = MetricBasedSelector(redis_client=redis_conn)
        ranking = selector.rank_models(query_type="analytical")
        best = ranking[0]  # Highest composite score
    """

    MODELS = [
        "anthropic.claude-3-haiku-20240307-v1:0",
        "anthropic.claude-3-sonnet-20240229-v1:0",
    ]

    # Baseline expectations for normalization
    LATENCY_BASELINE_MS = 3000.0     # 3s target
    COST_BASELINE_USD = 0.001        # $0.001 per query target
    QUALITY_BASELINE = 0.8           # 80% quality target
    ERROR_RATE_THRESHOLD = 5.0       # 5% error rate threshold

    def __init__(
        self,
        redis_client=None,
        dynamodb_table=None,
        weights: Optional[MetricWeight] = None,
        metric_window_minutes: int = 15,
    ):
        self.redis = redis_client
        self.dynamodb_table = dynamodb_table
        self.weights = weights or MetricWeight()
        self.metric_window = metric_window_minutes

        if not self.weights.validate():
            raise ValueError("Metric weights must sum to 1.0")

        # In-memory metric cache
        self._metrics_cache: Dict[str, Tuple[ModelMetrics, float]] = {}
        self._cache_ttl = 30  # seconds

        logger.info(
            "MetricBasedSelector initialized | weights=L:%.2f/C:%.2f/Q:%.2f/R:%.2f | window=%dm",
            self.weights.latency,
            self.weights.cost,
            self.weights.quality,
            self.weights.reliability,
            metric_window_minutes,
        )

    def get_model_metrics(self, model_id: str) -> ModelMetrics:
        """
        Retrieve current metrics for a model.
        Checks local cache → Redis → DynamoDB.
        """
        # Local cache check
        if model_id in self._metrics_cache:
            metrics, cached_at = self._metrics_cache[model_id]
            if time.time() - cached_at < self._cache_ttl:
                return metrics

        # Redis check
        if self.redis:
            try:
                key = f"metrics:{model_id}"
                data = self.redis.hgetall(key)
                if data:
                    metrics = ModelMetrics(
                        model_id=model_id,
                        p50_latency_ms=float(data.get(b"p50_latency_ms", 0)),
                        p95_latency_ms=float(data.get(b"p95_latency_ms", 0)),
                        p99_latency_ms=float(data.get(b"p99_latency_ms", 0)),
                        avg_input_tokens=float(data.get(b"avg_input_tokens", 0)),
                        avg_output_tokens=float(data.get(b"avg_output_tokens", 0)),
                        error_rate_pct=float(data.get(b"error_rate_pct", 0)),
                        throttle_rate_pct=float(data.get(b"throttle_rate_pct", 0)),
                        quality_score=float(data.get(b"quality_score", 0)),
                        user_satisfaction=float(data.get(b"user_satisfaction", 0)),
                        cost_per_query_usd=float(data.get(b"cost_per_query_usd", 0)),
                        queries_last_hour=int(data.get(b"queries_last_hour", 0)),
                        last_updated=data.get(b"last_updated", b"").decode(),
                    )
                    self._metrics_cache[model_id] = (metrics, time.time())
                    return metrics
            except Exception as e:
                logger.warning("Redis metrics fetch failed | model=%s | error=%s", model_id, e)

        # Return empty metrics as fallback
        return ModelMetrics(model_id=model_id)

    def rank_models(
        self,
        query_type: str = "general",
        override_weights: Optional[MetricWeight] = None,
    ) -> List[ModelRanking]:
        """
        Rank all models by composite score.

        Args:
            query_type: Optional query type for weight adjustment
            override_weights: Override default weights for this ranking

        Returns:
            List of ModelRanking sorted by composite_score descending
        """
        weights = override_weights or self._adjust_weights_for_query(query_type)
        rankings: List[ModelRanking] = []

        all_metrics = {m: self.get_model_metrics(m) for m in self.MODELS}

        for model_id, metrics in all_metrics.items():
            latency_score = self._score_latency(metrics)
            cost_score = self._score_cost(metrics)
            quality_score = self._score_quality(metrics)
            reliability_score = self._score_reliability(metrics)

            composite = (
                latency_score * weights.latency
                + cost_score * weights.cost
                + quality_score * weights.quality
                + reliability_score * weights.reliability
            )

            rankings.append(ModelRanking(
                model_id=model_id,
                composite_score=composite,
                latency_score=latency_score,
                cost_score=cost_score,
                quality_score=quality_score,
                reliability_score=reliability_score,
                rank=0,
                metrics=metrics,
            ))

        # Sort by composite score descending
        rankings.sort(key=lambda r: r.composite_score, reverse=True)
        for i, r in enumerate(rankings):
            r.rank = i + 1

        if rankings:
            logger.info(
                "Model ranking | #1=%s (%.3f) | #2=%s (%.3f)",
                rankings[0].model_id.split(".")[-1][:15],
                rankings[0].composite_score,
                rankings[1].model_id.split(".")[-1][:15] if len(rankings) > 1 else "N/A",
                rankings[1].composite_score if len(rankings) > 1 else 0,
            )

        return rankings

    def select_best_model(
        self,
        query_type: str = "general",
        exclude_models: Optional[List[str]] = None,
    ) -> ModelRanking:
        """Select the highest-ranked model, optionally excluding some."""
        rankings = self.rank_models(query_type)
        exclude = set(exclude_models or [])
        for r in rankings:
            if r.model_id not in exclude:
                return r
        return rankings[0]  # Fallback to best even if excluded

    def record_metric(
        self,
        model_id: str,
        latency_ms: float,
        input_tokens: int,
        output_tokens: int,
        error: bool = False,
        quality_score: Optional[float] = None,
    ) -> None:
        """
        Record a new metric data point for incremental aggregation.
        Pushes to Redis for real-time and DynamoDB for historical.
        """
        if not self.redis:
            return

        try:
            pipe = self.redis.pipeline()
            key = f"metrics:{model_id}"
            ts_key = f"metrics_ts:{model_id}:{int(time.time()) // 60}"

            # Record a latency sample (member=timestamp, score=latency) in a
            # sorted set so percentiles can be computed over the window
            pipe.zadd(f"latency_samples:{model_id}", {str(time.time()): latency_ms})

            # Increment counters
            pipe.hincrby(key, "total_queries", 1)
            if error:
                pipe.hincrby(key, "error_count", 1)
            if quality_score is not None:
                pipe.hset(key, "quality_score", str(quality_score))

            pipe.hset(key, "last_updated", datetime.utcnow().isoformat())
            pipe.expire(key, 3600)
            pipe.execute()

        except Exception as e:
            logger.warning("Metric recording failed | model=%s | error=%s", model_id, e)

    def update_weights(self, new_weights: MetricWeight) -> None:
        """Dynamically update scoring weights (e.g., during budget pressure)."""
        if not new_weights.validate():
            raise ValueError("Weights must sum to 1.0")
        old = self.weights
        self.weights = new_weights
        logger.info(
            "Weights updated | L:%.2f%.2f C:%.2f%.2f Q:%.2f%.2f R:%.2f%.2f",
            old.latency, new_weights.latency,
            old.cost, new_weights.cost,
            old.quality, new_weights.quality,
            old.reliability, new_weights.reliability,
        )

    def _score_latency(self, metrics: ModelMetrics) -> float:
        """Score latency: lower is better, normalized to 0-1."""
        if metrics.p95_latency_ms <= 0:
            return 0.5  # No data — neutral score
        ratio = metrics.p95_latency_ms / self.LATENCY_BASELINE_MS
        return max(0.0, min(1.0, 1.0 - (ratio - 1.0) * 0.5))

    def _score_cost(self, metrics: ModelMetrics) -> float:
        """Score cost: lower is better, normalized to 0-1."""
        if metrics.cost_per_query_usd <= 0:
            return 0.5
        ratio = metrics.cost_per_query_usd / self.COST_BASELINE_USD
        return max(0.0, min(1.0, 1.0 - (ratio - 1.0) * 0.3))

    def _score_quality(self, metrics: ModelMetrics) -> float:
        """Score quality: higher is better, already 0-1."""
        if metrics.quality_score <= 0:
            return 0.5
        return min(1.0, metrics.quality_score / self.QUALITY_BASELINE)

    def _score_reliability(self, metrics: ModelMetrics) -> float:
        """Score reliability: lower error rate is better."""
        if metrics.error_rate_pct <= 0:
            return 0.9  # Assume good if no data
        ratio = metrics.error_rate_pct / self.ERROR_RATE_THRESHOLD
        return max(0.0, min(1.0, 1.0 - ratio))

    def _adjust_weights_for_query(self, query_type: str) -> MetricWeight:
        """Adjust weights based on query type characteristics."""
        adjustments = {
            "factual": MetricWeight(latency=0.25, cost=0.25, quality=0.35, reliability=0.15),
            "analytical": MetricWeight(latency=0.20, cost=0.15, quality=0.50, reliability=0.15),
            "creative": MetricWeight(latency=0.15, cost=0.20, quality=0.50, reliability=0.15),
            "conversational": MetricWeight(latency=0.40, cost=0.30, quality=0.15, reliability=0.15),
        }
        return adjustments.get(query_type, self.weights)
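To make the composite formula concrete, here is a standalone recomputation with the default weights and made-up per-model scores (the metric values are illustrative, not measured):

```python
# Default weights from MetricWeight; the per-model scores below are invented
# for illustration (each already normalized to 0-1, where 1 = best).
weights = {"latency": 0.30, "cost": 0.25, "quality": 0.30, "reliability": 0.15}
haiku = {"latency": 0.95, "cost": 0.90, "quality": 0.60, "reliability": 0.90}
sonnet = {"latency": 0.55, "cost": 0.40, "quality": 0.95, "reliability": 0.90}

def composite(scores: dict) -> float:
    """Weighted sum, matching the scoring formula in the class docstring."""
    return sum(scores[k] * weights[k] for k in weights)

# Under the default weights, Haiku's latency/cost edge outweighs
# Sonnet's quality edge for this particular metric snapshot.
print(f"haiku={composite(haiku):.3f} sonnet={composite(sonnet):.3f}")
```

Switching to the "analytical" weight profile (quality 0.50) would flip this ranking, which is exactly the behavior `_adjust_weights_for_query` encodes.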

6. API Gateway Request Transformation for Routing

API Gateway request transformations allow routing logic to execute at the edge, before reaching the ECS Fargate orchestrator. This reduces latency for simple routing decisions and offloads work from the compute layer.
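One common carrier for these edge decisions is the Lambda authorizer: keys placed in its `context` map surface to the integration request as `$context.authorizer.<key>`, so routing metadata computed during authorization rides along with the request. A minimal sketch of the response shape (the field values are illustrative):

```python
def build_authorizer_response(principal_id: str, method_arn: str,
                              route_target: str) -> dict:
    """Allow-policy authorizer response carrying routing context downstream."""
    return {
        "principalId": principal_id,
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [{
                "Action": "execute-api:Invoke",
                "Effect": "Allow",
                "Resource": method_arn,
            }],
        },
        # Authorizer context values must be flat strings/numbers/booleans —
        # nested objects are not supported.
        "context": {"routeTarget": route_target, "routingVersion": "v2"},
    }
```

Because the context is computed once per connection (for WebSocket APIs, at `$connect`), it is best suited to slow-changing signals like user tier rather than per-message intent.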

6.1 API Gateway Routing Architecture

flowchart LR
    subgraph Client["Client Layer"]
        APP[MangaAssist App]
    end

    subgraph APIGW["API Gateway"]
        WS[WebSocket API]
        AUTH[Lambda Authorizer]
        VTL[VTL Mapping Template]
        INT[Integration Request]
    end

    subgraph Routes["Route Targets"]
        FAST[ECS Fast Path<br/>Haiku Direct]
        FULL[ECS Full Router<br/>Dynamic Analysis]
        PREM[ECS Premium Path<br/>Sonnet Direct]
    end

    APP -->|WebSocket| WS
    WS --> AUTH
    AUTH -->|Context| VTL
    VTL -->|Transform| INT
    INT -->|simple_intent| FAST
    INT -->|complex_intent| FULL
    INT -->|premium_user| PREM

6.2 API Gateway Route Transformer Implementation

"""
api_gateway_transformer.py — MangaAssist API Gateway Route Transformation

Generates VTL mapping templates and Lambda authorizer logic for
API Gateway-level routing decisions. Enables edge-based routing
without reaching the ECS orchestrator for simple cases.
"""

import json
import time
import logging
from typing import Dict, Any, List
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class TransformationRule:
    """A single routing transformation rule for API Gateway."""
    rule_id: str
    condition_field: str       # e.g., "intent", "user_tier", "message_length"
    condition_operator: str    # "equals", "contains", "greater_than", "regex"
    condition_value: str
    target_route: str          # Route key for API Gateway
    priority: int = 5
    enabled: bool = True
    description: str = ""


@dataclass
class TransformationResult:
    """Result of applying transformation rules."""
    target_route: str
    matched_rule_id: str
    transformed_headers: Dict[str, str]
    transformed_body: Dict[str, Any]
    routing_metadata: Dict[str, Any]


class APIGatewayRouteTransformer:
    """
    API Gateway request transformation engine for MangaAssist.

    Generates and applies routing transformations at the API Gateway
    level using VTL templates and Lambda authorizer context. Enables
    fast-path routing for simple intents without full orchestrator
    involvement.

    Capabilities:
        - Header-based routing (X-Intent-Category, X-User-Tier)
        - Body inspection for routing signals
        - VTL template generation for API Gateway mappings
        - Lambda authorizer context enrichment
        - Route key generation for WebSocket API

    Usage:
        transformer = APIGatewayRouteTransformer()
        transformer.add_rule(TransformationRule(...))
        result = transformer.transform(request_event)
    """

    DEFAULT_ROUTE = "full_router"

    def __init__(self):
        self.rules: List[TransformationRule] = []
        self._rules_sorted = False

        # Pre-load standard MangaAssist routing rules
        self._load_default_rules()

        logger.info("APIGatewayRouteTransformer initialized with %d rules", len(self.rules))

    def _load_default_rules(self) -> None:
        """Load default routing rules for MangaAssist."""
        defaults = [
            TransformationRule(
                rule_id="greeting_fast",
                condition_field="intent",
                condition_operator="equals",
                condition_value="greeting",
                target_route="fast_haiku",
                priority=1,
                description="Greetings go directly to Haiku fast path",
            ),
            TransformationRule(
                rule_id="simple_faq_fast",
                condition_field="intent",
                condition_operator="equals",
                condition_value="simple_faq",
                target_route="fast_haiku",
                priority=2,
                description="Simple FAQ to Haiku fast path",
            ),
            TransformationRule(
                rule_id="order_status_fast",
                condition_field="intent",
                condition_operator="equals",
                condition_value="order_status",
                target_route="fast_haiku",
                priority=2,
                description="Order status lookups to Haiku fast path",
            ),
            TransformationRule(
                rule_id="premium_user_sonnet",
                condition_field="user_tier",
                condition_operator="equals",
                condition_value="premium",
                target_route="premium_sonnet",
                priority=3,
                description="Premium users get Sonnet by default",
            ),
            TransformationRule(
                rule_id="long_message_dynamic",
                condition_field="message_length",
                condition_operator="greater_than",
                condition_value="500",
                target_route="full_router",
                priority=4,
                description="Long messages need dynamic analysis",
            ),
            TransformationRule(
                rule_id="complex_intent_dynamic",
                condition_field="intent",
                condition_operator="equals",
                condition_value="complex_analysis",
                target_route="full_router",
                priority=2,
                description="Complex analysis goes to full dynamic router",
            ),
        ]
        self.rules.extend(defaults)
        self._rules_sorted = False

    def add_rule(self, rule: TransformationRule) -> None:
        """Add a transformation rule."""
        self.rules.append(rule)
        self._rules_sorted = False
        logger.info("Rule added | id=%s | target=%s", rule.rule_id, rule.target_route)

    def remove_rule(self, rule_id: str) -> bool:
        """Remove a rule by ID."""
        initial = len(self.rules)
        self.rules = [r for r in self.rules if r.rule_id != rule_id]
        removed = len(self.rules) < initial
        if removed:
            self._rules_sorted = False
            logger.info("Rule removed | id=%s", rule_id)
        return removed

    def transform(self, event: Dict[str, Any]) -> TransformationResult:
        """
        Apply transformation rules to an incoming API Gateway event.

        Args:
            event: API Gateway WebSocket event (requestContext + body)

        Returns:
            TransformationResult with routing decision and transformed payload
        """
        if not self._rules_sorted:
            self.rules.sort(key=lambda r: r.priority)
            self._rules_sorted = True

        # Extract routing signals from event
        signals = self._extract_signals(event)

        # Evaluate rules in priority order
        for rule in self.rules:
            if not rule.enabled:
                continue
            if self._evaluate_rule(rule, signals):
                return self._build_result(rule, event, signals)

        # No rule matched — default route
        return TransformationResult(
            target_route=self.DEFAULT_ROUTE,
            matched_rule_id="default",
            transformed_headers=self._build_routing_headers(self.DEFAULT_ROUTE, signals),
            transformed_body=self._build_routing_body(event, signals),
            routing_metadata={"signals": signals, "matched": False},
        )

    def generate_vtl_template(self) -> str:
        """
        Generate a VTL (Velocity Template Language) mapping template
        for API Gateway integration request transformation.

        Returns:
            VTL template string for API Gateway configuration
        """
        vtl_lines = [
            '## MangaAssist API Gateway Routing VTL Template',
            '## Auto-generated — do not edit manually',
            '#set($inputRoot = $util.parseJson($input.body))',
            '## Use $params (not $context) to avoid shadowing the reserved $context variable',
            '#set($params = $input.params())',
            '#set($intent = $inputRoot.intent)',
            '#set($userTier = $params.header.get("X-User-Tier"))',
            '#set($messageLength = $inputRoot.message.length())',
            '',
            '## Determine route based on intent and context',
            '#if($intent == "greeting" || $intent == "simple_faq" || $intent == "order_status")',
            '  #set($route = "fast_haiku")',
            '  #set($modelId = "anthropic.claude-3-haiku-20240307-v1:0")',
            '  #set($maxTokens = 512)',
            '#elseif($userTier == "premium")',
            '  #set($route = "premium_sonnet")',
            '  #set($modelId = "anthropic.claude-3-sonnet-20240229-v1:0")',
            '  #set($maxTokens = 4096)',
            '#elseif($intent == "complex_analysis" || $messageLength > 500)',
            '  #set($route = "full_router")',
            '  #set($modelId = "dynamic")',
            '  #set($maxTokens = 4096)',
            '#else',
            '  #set($route = "full_router")',
            '  #set($modelId = "dynamic")',
            '  #set($maxTokens = 2048)',
            '#end',
            '',
            '{',
            '  "route": "$route",',
            '  "modelId": "$modelId",',
            '  "maxTokens": $maxTokens,',
            '  "originalIntent": "$intent",',
            '  "userTier": "$userTier",',
            '  "message": "$util.escapeJavaScript($inputRoot.message)",',
            '  "sessionId": "$inputRoot.sessionId",',
            '  "timestamp": "$params.header.get(\'X-Request-Time\')"',
            '}',
        ]
        return "\n".join(vtl_lines)

    def generate_lambda_authorizer_context(
        self, event: Dict[str, Any]
    ) -> Dict[str, str]:
        """
        Generate context to return from a Lambda authorizer
        that includes routing metadata for downstream processing.

        The authorizer context is available to the integration request
        via $context.authorizer.<key>.
        """
        signals = self._extract_signals(event)
        result = self.transform(event)

        return {
            "routeTarget": result.target_route,
            "matchedRule": result.matched_rule_id,
            "detectedIntent": signals.get("intent", "unknown"),
            "userTier": signals.get("user_tier", "standard"),
            "messageLength": str(signals.get("message_length", 0)),
            "routingVersion": "v2",
        }

    def _extract_signals(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """Extract routing signals from the API Gateway event."""
        body = event.get("body", {})
        if isinstance(body, str):
            try:
                body = json.loads(body)
            except json.JSONDecodeError:
                body = {}

        headers = event.get("headers", {})
        request_context = event.get("requestContext", {})
        authorizer = request_context.get("authorizer", {})

        message = body.get("message", "")

        return {
            "intent": body.get("intent", authorizer.get("intent", "unknown")),
            "user_tier": headers.get("X-User-Tier", authorizer.get("userTier", "standard")),
            "message_length": len(message),
            "session_id": body.get("sessionId", ""),
            "connection_id": request_context.get("connectionId", ""),
            "has_history": bool(body.get("history")),
            "language_hint": headers.get("Accept-Language", "en"),
        }

    def _evaluate_rule(self, rule: TransformationRule, signals: Dict[str, Any]) -> bool:
        """Evaluate a single rule against extracted signals."""
        value = signals.get(rule.condition_field)
        if value is None:
            return False

        if rule.condition_operator == "equals":
            return str(value) == rule.condition_value
        elif rule.condition_operator == "contains":
            return rule.condition_value in str(value)
        elif rule.condition_operator == "greater_than":
            try:
                return float(value) > float(rule.condition_value)
            except (ValueError, TypeError):
                return False
        elif rule.condition_operator == "regex":
            import re
            return bool(re.search(rule.condition_value, str(value)))

        return False

    def _build_result(
        self,
        rule: TransformationRule,
        event: Dict[str, Any],
        signals: Dict[str, Any],
    ) -> TransformationResult:
        """Build a transformation result from a matched rule."""
        return TransformationResult(
            target_route=rule.target_route,
            matched_rule_id=rule.rule_id,
            transformed_headers=self._build_routing_headers(rule.target_route, signals),
            transformed_body=self._build_routing_body(event, signals),
            routing_metadata={
                "signals": signals,
                "matched": True,
                "rule_priority": rule.priority,
                "rule_description": rule.description,
            },
        )

    def _build_routing_headers(
        self, route: str, signals: Dict[str, Any]
    ) -> Dict[str, str]:
        """Build HTTP headers for the routed request."""
        return {
            "X-Route-Target": route,
            "X-Route-Version": "v2",
            "X-Detected-Intent": signals.get("intent", "unknown"),
            "X-User-Tier": signals.get("user_tier", "standard"),
            "X-Message-Length": str(signals.get("message_length", 0)),
        }

    def _build_routing_body(
        self, event: Dict[str, Any], signals: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Build the transformed request body with routing context."""
        body = event.get("body", {})
        if isinstance(body, str):
            try:
                body = json.loads(body)
            except json.JSONDecodeError:
                body = {}

        body["_routing"] = {
            "signals": signals,
            "timestamp": int(time.time() * 1000) if hasattr(time, 'time') else 0,
        }
        return body

7. Step Functions ASL — Dynamic Routing Workflow

{
  "Comment": "MangaAssist Dynamic Model Routing Workflow",
  "StartAt": "AnalyzeContent",
  "States": {
    "AnalyzeContent": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:MangaAssist-AnalyzeContent",
      "Parameters": {
        "query.$": "$.message",
        "session_id.$": "$.sessionId",
        "conversation_history.$": "$.history"
      },
      "ResultPath": "$.analysis",
      "TimeoutSeconds": 5,
      "Retry": [
        {
          "ErrorEquals": ["Lambda.ServiceException", "Lambda.TooManyRequestsException"],
          "IntervalSeconds": 1,
          "MaxAttempts": 2,
          "BackoffRate": 2
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "ResultPath": "$.error",
          "Next": "FallbackRouting"
        }
      ],
      "Next": "CheckComplexity"
    },

    "CheckComplexity": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.analysis.complexity_score",
          "NumericGreaterThanEquals": 7,
          "Next": "CheckBudget"
        },
        {
          "Variable": "$.analysis.requires_cultural_context",
          "BooleanEquals": true,
          "Next": "CheckBudget"
        },
        {
          "Variable": "$.analysis.complexity_score",
          "NumericLessThan": 3,
          "Next": "RouteToHaiku"
        }
      ],
      "Default": "CollectMetrics"
    },

    "CheckBudget": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:MangaAssist-CheckBudget",
      "Parameters": {
        "user_id.$": "$.userId",
        "estimated_input_tokens.$": "$.analysis.estimated_input_tokens",
        "estimated_output_tokens.$": "$.analysis.estimated_output_tokens",
        "model_tier": "sonnet"
      },
      "ResultPath": "$.budget",
      "TimeoutSeconds": 3,
      "Next": "BudgetDecision"
    },

    "BudgetDecision": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.budget.approved",
          "BooleanEquals": true,
          "Next": "RouteToSonnet"
        }
      ],
      "Default": "RouteToHaiku"
    },

    "CollectMetrics": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:MangaAssist-CollectMetrics",
      "Parameters": {
        "models": ["anthropic.claude-3-haiku-20240307-v1:0", "anthropic.claude-3-sonnet-20240229-v1:0"],
        "query_type.$": "$.analysis.query_type"
      },
      "ResultPath": "$.metrics",
      "TimeoutSeconds": 3,
      "Retry": [
        {
          "ErrorEquals": ["States.TaskFailed"],
          "IntervalSeconds": 1,
          "MaxAttempts": 1
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "ResultPath": "$.metricsError",
          "Next": "RouteToHaiku"
        }
      ],
      "Next": "MetricBasedSelection"
    },

    "MetricBasedSelection": {
      "Type": "Choice",
      "Choices": [
        {
          "And": [
            {
              "Variable": "$.metrics.best_model",
              "StringEquals": "sonnet"
            },
            {
              "Variable": "$.analysis.complexity_score",
              "NumericGreaterThanEquals": 5
            }
          ],
          "Next": "RouteToSonnet"
        }
      ],
      "Default": "RouteToHaiku"
    },

    "RouteToSonnet": {
      "Type": "Task",
      "Resource": "arn:aws:states:::bedrock:invokeModel",
      "Parameters": {
        "ModelId": "anthropic.claude-3-sonnet-20240229-v1:0",
        "ContentType": "application/json",
        "Accept": "application/json",
        "Body": {
          "anthropic_version": "bedrock-2023-05-31",
          "max_tokens.$": "$.analysis.max_tokens",
          "messages": [
            {
              "role": "user",
              "content.$": "$.message"
            }
          ],
          "temperature": 0.3
        }
      },
      "ResultPath": "$.modelResponse",
      "TimeoutSeconds": 30,
      "Retry": [
        {
          "ErrorEquals": ["Bedrock.ThrottlingException"],
          "IntervalSeconds": 2,
          "MaxAttempts": 3,
          "BackoffRate": 2
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "ResultPath": "$.modelError",
          "Next": "FallbackRouting"
        }
      ],
      "Next": "LogDecision"
    },

    "RouteToHaiku": {
      "Type": "Task",
      "Resource": "arn:aws:states:::bedrock:invokeModel",
      "Parameters": {
        "ModelId": "anthropic.claude-3-haiku-20240307-v1:0",
        "ContentType": "application/json",
        "Accept": "application/json",
        "Body": {
          "anthropic_version": "bedrock-2023-05-31",
          "max_tokens": 1024,
          "messages": [
            {
              "role": "user",
              "content.$": "$.message"
            }
          ],
          "temperature": 0.3
        }
      },
      "ResultPath": "$.modelResponse",
      "TimeoutSeconds": 15,
      "Retry": [
        {
          "ErrorEquals": ["Bedrock.ThrottlingException"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "ResultPath": "$.modelError",
          "Next": "FallbackRouting"
        }
      ],
      "Next": "LogDecision"
    },

    "FallbackRouting": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:MangaAssist-FallbackResponse",
      "Parameters": {
        "error.$": "$.error",
        "original_message.$": "$.message",
        "session_id.$": "$.sessionId"
      },
      "ResultPath": "$.modelResponse",
      "Next": "LogDecision"
    },

    "LogDecision": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "LogToCloudWatch",
          "States": {
            "LogToCloudWatch": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:MangaAssist-LogRouting",
              "Parameters": {
                "decision_data.$": "$"
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "UpdateMetrics",
          "States": {
            "UpdateMetrics": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:MangaAssist-UpdateMetrics",
              "Parameters": {
                "model_used.$": "$.modelResponse.model",
                "latency_ms.$": "$.modelResponse.latency_ms",
                "tokens.$": "$.modelResponse.usage"
              },
              "End": true
            }
          }
        }
      ],
      "ResultPath": "$.logging",
      "Next": "ReturnResponse"
    },

    "ReturnResponse": {
      "Type": "Pass",
      "Parameters": {
        "response.$": "$.modelResponse.content",
        "model_used.$": "$.modelResponse.model",
        "session_id.$": "$.sessionId"
      },
      "End": true
    }
  }
}
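The workflow above is meant to run as an Express workflow (see the design decisions table), so the orchestrator can invoke it synchronously with `start_sync_execution` and get the routed response back in one call. A sketch, with the input keys (`message`, `sessionId`, `userId`, `history`) taken from the ASL and the client passed in so it can be stubbed:

```python
import json


def build_execution_input(message, session_id, user_id, history=None):
    """Assemble the input document the state machine expects
    ($.message, $.sessionId, $.userId, $.history)."""
    return json.dumps({
        "message": message,
        "sessionId": session_id,
        "userId": user_id,
        "history": history or [],
    })


def route_message(sfn_client, state_machine_arn, message, session_id, user_id):
    """Synchronously execute the Express routing workflow.

    sfn_client is a boto3 Step Functions client (or a stub with the same
    interface); start_sync_execution is only available for Express workflows.
    """
    result = sfn_client.start_sync_execution(
        stateMachineArn=state_machine_arn,
        input=build_execution_input(message, session_id, user_id),
    )
    if result["status"] != "SUCCEEDED":
        raise RuntimeError(f"Routing workflow failed: {result.get('error')}")
    return json.loads(result["output"])
```

In production the client would come from `boto3.client("stepfunctions")`; passing it in keeps the routing logic testable without AWS credentials.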

8. Cost Optimization Matrix

| Routing Path | Model | Avg Input Tokens | Avg Output Tokens | Cost/Query | Queries/Day | Daily Cost |
|---|---|---|---|---|---|---|
| Fast Haiku (greeting/FAQ) | Haiku | 200 | 150 | $0.000238 | 600,000 | $142.50 |
| Dynamic → Haiku | Haiku | 500 | 400 | $0.000625 | 250,000 | $156.25 |
| Dynamic → Sonnet | Sonnet | 800 | 600 | $0.011400 | 100,000 | $1,140.00 |
| Premium → Sonnet | Sonnet | 600 | 500 | $0.009300 | 50,000 | $465.00 |
| **Totals** | | | | | **1,000,000** | **$1,903.75** |

Without routing (all Sonnet): 1,000,000 queries × ~$0.01 avg = ~$10,000/day.

Savings from intelligent routing: ~81% ($10,000 → $1,903.75 per day).
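The per-query and daily figures in the matrix follow directly from the Bedrock token prices given in the context ($3/$15 per 1M tokens for Sonnet, $0.25/$1.25 for Haiku). A short arithmetic check:

```python
# Claude 3 pricing in USD per 1M tokens: (input rate, output rate).
PRICING = {"haiku": (0.25, 1.25), "sonnet": (3.00, 15.00)}


def cost_per_query(model, input_tokens, output_tokens):
    """Per-query cost in USD for average input/output token counts."""
    in_rate, out_rate = PRICING[model]
    return (input_tokens * in_rate + output_tokens * out_rate) / 1_000_000


# Rows of the matrix: (model, avg input, avg output, queries/day).
paths = [
    ("haiku", 200, 150, 600_000),   # fast Haiku (greeting/FAQ)
    ("haiku", 500, 400, 250_000),   # dynamic -> Haiku
    ("sonnet", 800, 600, 100_000),  # dynamic -> Sonnet
    ("sonnet", 600, 500, 50_000),   # premium -> Sonnet
]

daily_cost = sum(cost_per_query(m, i, o) * q for m, i, o, q in paths)
```

Running this reproduces the $1,903.75/day total, confirming the ~81% saving against an all-Sonnet baseline of ~$10,000/day.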


9. Key Design Decisions Summary

| Decision | Choice | Reasoning |
|---|---|---|
| Default model | Haiku | 85% of queries are simple enough for Haiku at 12x lower cost |
| Static cache layer | Redis + local in-memory | Sub-millisecond routing for known intents — critical for 3s target |
| Dynamic router trigger | Ambiguous intents only | Avoid analysis overhead for clear-cut routing decisions |
| Step Functions for routing | Express workflows | Standard workflows too slow; Express gives < 100ms state transitions |
| Metric collection | Redis counters + DynamoDB | Redis for real-time (< 1ms reads); DynamoDB for durable history |
| API Gateway routing | VTL + Lambda authorizer | Edge-level routing avoids ECS round-trip for 60% of queries |
| Fallback chain | Sonnet → Haiku → static | Graceful degradation: if premium model fails, still serve response |
| Budget tracking | Per-user daily limits | Prevents runaway Sonnet costs from single-user abuse |
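The Sonnet → Haiku → static fallback chain in the table can be sketched as an ordered-attempt loop. The two callables stand in for the real `bedrock-runtime` invocations, and the static message is a placeholder:

```python
STATIC_FALLBACK = "Sorry, something went wrong. Please try again shortly."


def answer_with_fallback(query, invoke_sonnet, invoke_haiku):
    """Try Sonnet, then Haiku, then a canned static response.

    invoke_* are callables taking the query and returning a reply string;
    in production they would wrap Bedrock InvokeModel calls.
    """
    for model_name, invoke in (("sonnet", invoke_sonnet), ("haiku", invoke_haiku)):
        try:
            return model_name, invoke(query)
        except Exception:
            continue  # degrade to the next tier
    return "static", STATIC_FALLBACK
```

Returning the tier name alongside the reply lets the metrics pipeline record how often each level of the chain is actually exercised.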

10. References

| Resource | Link |
|---|---|
| Amazon Bedrock Model Invocation | https://docs.aws.amazon.com/bedrock/latest/userguide/model-invocation.html |
| AWS Step Functions Express Workflows | https://docs.aws.amazon.com/step-functions/latest/dg/concepts-standard-vs-express.html |
| API Gateway Request Mapping Templates | https://docs.aws.amazon.com/apigateway/latest/developerguide/request-response-data-mappings.html |
| Claude 3 Model Card | https://docs.anthropic.com/en/docs/about-claude/models |
| API Gateway WebSocket APIs | https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-websocket-api.html |
| ElastiCache Best Practices | https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/BestPractices.html |
| DynamoDB Design Patterns | https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/bp-general-nosql-design.html |