Intelligent Model Routing Architecture
MangaAssist context: Japanese manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M input/output tokens, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: a useful answer in under 3 seconds at 1M messages/day.
Skill Mapping
| Field | Value |
|---|---|
| Certification | AWS AI Practitioner (AIF-C01) |
| Domain | 2 — Implementation and Integration of Foundation Models |
| Task | 2.4 — Design model deployment and inference strategies |
| Skill | 2.4.4 — Develop intelligent model routing systems to optimize model selection |
| Coverage | Static routing configurations, Step Functions for dynamic content-based routing to specialized FMs, intelligent model routing based on metrics, API Gateway with request transformations for routing logic |
1. Intelligent Model Routing — Mindmap
mindmap
root((Intelligent Model Routing))
Static Routing
Intent-Model Mapping
Simple FAQ → Haiku
Complex Analysis → Sonnet
Translation → Specialized FM
Configuration Tables
DynamoDB Routing Table
Environment Variables
Feature Flags
Rule-Based Selection
Regex Pattern Matching
Keyword Detection
User Tier Mapping
Dynamic Routing
Content-Based Analysis
Complexity Scoring
Token Length Estimation
Language Detection
Step Functions Orchestration
State Machine Workflows
Parallel Model Invocation
Conditional Branching
Real-Time Adaptation
Load Balancing
Failover Chains
Circuit Breaker Patterns
Metric-Based Selection
Latency Tracking
P50/P95/P99 Percentiles
Time-to-First-Token
End-to-End Response Time
Cost Optimization
Token Usage Tracking
Budget Allocation
Cost-per-Query Scoring
Quality Scores
User Satisfaction Ratings
Hallucination Detection
Relevance Scoring
API Gateway Routing
Request Transformation
Header Injection
Body Modification
Query Parameter Routing
Lambda Authorizers
Token Validation
Route Determination
Context Enrichment
Integration Mappings
VTL Templates
Response Shaping
Error Routing
2. Architecture Flowchart — MangaAssist Routing Decision Flow
flowchart TB
subgraph Ingress["API Gateway WebSocket"]
WS[WebSocket Connection]
RT[Request Transformer]
LA[Lambda Authorizer]
end
subgraph Router["ECS Fargate — Routing Orchestrator"]
RC[Route Controller]
SS[Static Router]
DS[Dynamic Router]
MS[Metric Selector]
end
subgraph Analysis["Content Analysis Pipeline"]
CX[Complexity Scorer]
LD[Language Detector]
TE[Token Estimator]
IC[Intent Classifier]
end
subgraph StepFn["Step Functions — Dynamic Routing Workflow"]
SF1[Analyze Content]
SF2[Check Metrics]
SF3[Select Model]
SF4[Invoke FM]
SF5[Evaluate Response]
end
subgraph Models["Bedrock Foundation Models"]
HAIKU[Claude 3 Haiku<br/>$0.25/$1.25 per 1M]
SONNET[Claude 3 Sonnet<br/>$3/$15 per 1M]
SPEC[Specialized FM<br/>Translation/Summarization]
end
subgraph Metrics["Metric Collection"]
CW[CloudWatch Metrics]
DDB[DynamoDB Metrics Table]
REDIS[ElastiCache Redis<br/>Real-Time Counters]
end
subgraph Storage["Routing Configuration"]
RTABLE[DynamoDB Routing Table]
CACHE[Redis Config Cache]
FEAT[Feature Flags]
end
WS --> RT
RT --> LA
LA --> RC
RC --> SS
RC --> DS
RC --> MS
SS --> RTABLE
SS --> CACHE
DS --> CX
DS --> LD
DS --> TE
DS --> IC
IC -->|Complex Query| SF1
SF1 --> SF2
SF2 --> SF3
SF3 --> SF4
SF4 --> SF5
SF3 -->|Simple| HAIKU
SF3 -->|Complex| SONNET
SF3 -->|Specialized| SPEC
MS --> CW
MS --> DDB
MS --> REDIS
SF5 -->|Log| CW
SF5 -->|Store| DDB
style HAIKU fill:#4CAF50,color:#fff
style SONNET fill:#2196F3,color:#fff
style SPEC fill:#FF9800,color:#fff
style RC fill:#9C27B0,color:#fff
3. Static Routing Configurations
Static routing maps known intents, user tiers, and request types directly to foundation models without runtime analysis. This is the fastest routing path — zero analysis overhead.
3.1 Intent-to-Model Mapping Table
| Intent Category | Sub-Intent | Routed Model | Rationale |
|---|---|---|---|
| Simple FAQ | Store hours, shipping info, return policy | Haiku | Low complexity, cost-efficient |
| Product Search | Title lookup, author search, genre browse | Haiku | Structured query, fast response needed |
| Product Detail | Synopsis generation, similar recommendations | Sonnet | Requires nuanced understanding |
| Translation | JP→EN manga title translation | Haiku | Short text, pattern-based |
| Translation | Full synopsis JP→EN with cultural context | Sonnet | Needs cultural nuance |
| Order Support | Order status, tracking info | Haiku | Template-based responses |
| Complex Analysis | Manga comparison, thematic analysis | Sonnet | Deep reasoning required |
| Content Advisory | Age rating explanation, content warnings | Sonnet | Sensitivity requires quality |
| Greeting/Chitchat | Hello, thanks, goodbye | Haiku | Minimal generation needed |
| Escalation | Complaint, refund dispute | Sonnet | Customer satisfaction critical |
3.2 DynamoDB Routing Table Schema
Table: MangaAssist-RoutingConfig
├── PK: ROUTE#<intent_category>
├── SK: CONFIG#<sub_intent>
├── model_id: string (e.g., "anthropic.claude-3-haiku-20240307-v1:0")
├── fallback_model_id: string
├── max_tokens: number
├── temperature: number
├── priority: number (1=highest)
├── enabled: boolean
├── ttl: number (epoch seconds)
├── updated_at: string (ISO 8601)
└── updated_by: string
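A minimal seeding sketch for this table with boto3; the item values and the ISO date are illustrative, and note that boto3 requires Decimal rather than float for DynamoDB number attributes:
from decimal import Decimal
import time
import boto3

dynamodb = boto3.resource("dynamodb", region_name="ap-northeast-1")
table = dynamodb.Table("MangaAssist-RoutingConfig")

# Route simple FAQ to Haiku with Sonnet as fallback.
table.put_item(Item={
    "PK": "ROUTE#simple_faq",
    "SK": "CONFIG#default",
    "model_id": "anthropic.claude-3-haiku-20240307-v1:0",
    "fallback_model_id": "anthropic.claude-3-sonnet-20240229-v1:0",
    "max_tokens": 512,
    "temperature": Decimal("0.3"),          # Decimal, not float, for DynamoDB
    "priority": 1,
    "enabled": True,
    "ttl": int(time.time()) + 86400,        # expires in 24h if table TTL is on
    "updated_at": "2024-06-01T00:00:00Z",
    "updated_by": "routing-admin",
})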
3.3 Static Router Implementation
"""
static_router.py — MangaAssist Static Model Router
Maps intents and request metadata directly to pre-configured model selections.
Uses DynamoDB routing table with Redis caching for sub-millisecond lookups.
"""
import json
import time
import logging
from typing import Optional, Dict, Any, Tuple
from dataclasses import dataclass, field
from enum import Enum
import boto3
from botocore.config import Config as BotoConfig
logger = logging.getLogger(__name__)
class ModelTier(Enum):
"""Available model tiers with their Bedrock model IDs and pricing."""
HAIKU = "anthropic.claude-3-haiku-20240307-v1:0"
SONNET = "anthropic.claude-3-sonnet-20240229-v1:0"
@property
def input_cost_per_1m(self) -> float:
costs = {
"HAIKU": 0.25,
"SONNET": 3.00,
}
return costs[self.name]
@property
def output_cost_per_1m(self) -> float:
costs = {
"HAIKU": 1.25,
"SONNET": 15.00,
}
return costs[self.name]
@dataclass
class RouteConfig:
"""Configuration for a single routing rule."""
intent_category: str
sub_intent: str
model_id: str
fallback_model_id: str
max_tokens: int = 1024
temperature: float = 0.3
priority: int = 5
enabled: bool = True
ttl: int = 0
updated_at: str = ""
updated_by: str = "system"
@property
def cache_key(self) -> str:
return f"route:{self.intent_category}:{self.sub_intent}"
@dataclass
class RoutingDecision:
"""Result of a routing decision."""
model_id: str
fallback_model_id: str
max_tokens: int
temperature: float
route_source: str # "static", "cache", "default"
decision_time_ms: float
intent_category: str
sub_intent: str
metadata: Dict[str, Any] = field(default_factory=dict)
class StaticRouter:
"""
Static model router for MangaAssist.
Performs zero-analysis routing based on pre-configured intent-to-model
mappings stored in DynamoDB with Redis caching. Designed for sub-millisecond
routing decisions on known intent categories.
Architecture:
1. Check Redis cache for routing config (< 0.1ms)
2. Fall back to DynamoDB lookup (< 5ms)
3. Fall back to hardcoded defaults (< 0.01ms)
Usage:
router = StaticRouter(
table_name="MangaAssist-RoutingConfig",
redis_client=redis_conn,
)
decision = router.route(intent="product_search", sub_intent="title_lookup")
# decision.model_id → "anthropic.claude-3-haiku-20240307-v1:0"
"""
# Hardcoded defaults — last resort if DynamoDB and Redis both fail
DEFAULT_ROUTES: Dict[str, str] = {
"simple_faq": ModelTier.HAIKU.value,
"product_search": ModelTier.HAIKU.value,
"product_detail": ModelTier.SONNET.value,
"translation_simple": ModelTier.HAIKU.value,
"translation_complex": ModelTier.SONNET.value,
"order_support": ModelTier.HAIKU.value,
"complex_analysis": ModelTier.SONNET.value,
"content_advisory": ModelTier.SONNET.value,
"greeting": ModelTier.HAIKU.value,
"escalation": ModelTier.SONNET.value,
}
FALLBACK_MODEL = ModelTier.HAIKU.value
def __init__(
self,
table_name: str = "MangaAssist-RoutingConfig",
redis_client=None,
cache_ttl_seconds: int = 300,
region: str = "ap-northeast-1",
):
self.table_name = table_name
self.redis = redis_client
self.cache_ttl = cache_ttl_seconds
boto_config = BotoConfig(
region_name=region,
retries={"max_attempts": 2, "mode": "adaptive"},
)
self.dynamodb = boto3.resource("dynamodb", config=boto_config)
self.table = self.dynamodb.Table(table_name)
# Local in-memory cache for ultra-fast lookups
self._local_cache: Dict[str, Tuple[RouteConfig, float]] = {}
self._local_cache_ttl = 60 # seconds
logger.info(
"StaticRouter initialized | table=%s | cache_ttl=%ds",
table_name,
cache_ttl_seconds,
)
def route(
self,
intent: str,
sub_intent: str = "default",
user_tier: str = "standard",
metadata: Optional[Dict[str, Any]] = None,
) -> RoutingDecision:
"""
Resolve a routing decision for the given intent.
Lookup order:
1. Local in-memory cache
2. Redis distributed cache
3. DynamoDB routing table
4. Hardcoded defaults
Args:
intent: The classified intent category (e.g., "product_search")
sub_intent: Optional sub-intent for finer routing
user_tier: User tier for priority routing (e.g., "premium")
metadata: Additional context for the routing decision
Returns:
RoutingDecision with selected model and parameters
"""
start = time.monotonic()
cache_key = f"route:{intent}:{sub_intent}"
# --- Layer 1: Local in-memory cache ---
config = self._check_local_cache(cache_key)
if config:
elapsed = (time.monotonic() - start) * 1000
logger.debug("Route resolved from local cache | key=%s | %.2fms", cache_key, elapsed)
return self._build_decision(config, "local_cache", elapsed, intent, sub_intent, metadata)
# --- Layer 2: Redis distributed cache ---
config = self._check_redis_cache(cache_key)
if config:
self._set_local_cache(cache_key, config)
elapsed = (time.monotonic() - start) * 1000
logger.debug("Route resolved from Redis | key=%s | %.2fms", cache_key, elapsed)
return self._build_decision(config, "redis_cache", elapsed, intent, sub_intent, metadata)
# --- Layer 3: DynamoDB lookup ---
config = self._lookup_dynamodb(intent, sub_intent)
if config:
self._set_local_cache(cache_key, config)
self._set_redis_cache(cache_key, config)
elapsed = (time.monotonic() - start) * 1000
logger.debug("Route resolved from DynamoDB | key=%s | %.2fms", cache_key, elapsed)
return self._build_decision(config, "dynamodb", elapsed, intent, sub_intent, metadata)
# --- Layer 4: Hardcoded defaults ---
elapsed = (time.monotonic() - start) * 1000
logger.warning("Route falling back to defaults | intent=%s | %.2fms", intent, elapsed)
return self._build_default_decision(intent, sub_intent, elapsed, metadata)
def _check_local_cache(self, key: str) -> Optional[RouteConfig]:
"""Check local in-memory cache with TTL expiry."""
if key in self._local_cache:
config, cached_at = self._local_cache[key]
if time.time() - cached_at < self._local_cache_ttl:
return config
del self._local_cache[key]
return None
def _set_local_cache(self, key: str, config: RouteConfig) -> None:
"""Store route config in local cache."""
self._local_cache[key] = (config, time.time())
def _check_redis_cache(self, key: str) -> Optional[RouteConfig]:
"""Check Redis distributed cache."""
if not self.redis:
return None
try:
data = self.redis.get(key)
if data:
parsed = json.loads(data)
return RouteConfig(**parsed)
except Exception as e:
logger.warning("Redis cache check failed | key=%s | error=%s", key, e)
return None
def _set_redis_cache(self, key: str, config: RouteConfig) -> None:
"""Store route config in Redis with TTL."""
if not self.redis:
return
try:
data = json.dumps({
"intent_category": config.intent_category,
"sub_intent": config.sub_intent,
"model_id": config.model_id,
"fallback_model_id": config.fallback_model_id,
"max_tokens": config.max_tokens,
"temperature": config.temperature,
"priority": config.priority,
"enabled": config.enabled,
})
self.redis.setex(key, self.cache_ttl, data)
except Exception as e:
logger.warning("Redis cache set failed | key=%s | error=%s", key, e)
def _lookup_dynamodb(self, intent: str, sub_intent: str) -> Optional[RouteConfig]:
"""Query DynamoDB for routing configuration."""
try:
response = self.table.get_item(
Key={
"PK": f"ROUTE#{intent}",
"SK": f"CONFIG#{sub_intent}",
},
ConsistentRead=False,
)
item = response.get("Item")
if item and item.get("enabled", True):
return RouteConfig(
intent_category=intent,
sub_intent=sub_intent,
model_id=item["model_id"],
fallback_model_id=item.get("fallback_model_id", self.FALLBACK_MODEL),
max_tokens=int(item.get("max_tokens", 1024)),
temperature=float(item.get("temperature", 0.3)),
priority=int(item.get("priority", 5)),
enabled=bool(item.get("enabled", True)),
)
except Exception as e:
logger.error("DynamoDB lookup failed | intent=%s | error=%s", intent, e)
return None
def _build_decision(
self,
config: RouteConfig,
source: str,
elapsed_ms: float,
intent: str,
sub_intent: str,
metadata: Optional[Dict[str, Any]],
) -> RoutingDecision:
"""Build a RoutingDecision from a RouteConfig."""
return RoutingDecision(
model_id=config.model_id,
fallback_model_id=config.fallback_model_id,
max_tokens=config.max_tokens,
temperature=config.temperature,
route_source=source,
decision_time_ms=elapsed_ms,
intent_category=intent,
sub_intent=sub_intent,
metadata=metadata or {},
)
def _build_default_decision(
self,
intent: str,
sub_intent: str,
elapsed_ms: float,
metadata: Optional[Dict[str, Any]],
) -> RoutingDecision:
"""Build a RoutingDecision from hardcoded defaults."""
model_id = self.DEFAULT_ROUTES.get(intent, self.FALLBACK_MODEL)
return RoutingDecision(
model_id=model_id,
fallback_model_id=self.FALLBACK_MODEL,
max_tokens=1024,
temperature=0.3,
route_source="hardcoded_default",
decision_time_ms=elapsed_ms,
intent_category=intent,
sub_intent=sub_intent,
metadata=metadata or {},
)
def bulk_load_routes(self) -> int:
"""
Pre-load all routing configs into local and Redis caches.
Call during ECS task startup for warm cache.
Returns:
Number of routes loaded
"""
count = 0
try:
response = self.table.scan(
FilterExpression="begins_with(PK, :prefix)",
ExpressionAttributeValues={":prefix": "ROUTE#"},
)
for item in response.get("Items", []):
intent = item["PK"].replace("ROUTE#", "")
sub_intent = item["SK"].replace("CONFIG#", "")
config = RouteConfig(
intent_category=intent,
sub_intent=sub_intent,
model_id=item["model_id"],
fallback_model_id=item.get("fallback_model_id", self.FALLBACK_MODEL),
max_tokens=int(item.get("max_tokens", 1024)),
temperature=float(item.get("temperature", 0.3)),
priority=int(item.get("priority", 5)),
enabled=bool(item.get("enabled", True)),
)
key = config.cache_key
self._set_local_cache(key, config)
self._set_redis_cache(key, config)
count += 1
logger.info("Bulk loaded %d routes into cache", count)
except Exception as e:
logger.error("Bulk load failed | error=%s", e)
return count
def invalidate_route(self, intent: str, sub_intent: str = "default") -> None:
"""Invalidate a specific route from all cache layers."""
key = f"route:{intent}:{sub_intent}"
self._local_cache.pop(key, None)
if self.redis:
try:
self.redis.delete(key)
except Exception as e:
logger.warning("Redis invalidation failed | key=%s | error=%s", key, e)
logger.info("Route invalidated | intent=%s | sub_intent=%s", intent, sub_intent)
4. Step Functions Dynamic Routing
Dynamic routing uses AWS Step Functions to orchestrate multi-step content analysis before selecting a model. This path handles queries where the optimal model is not obvious from the intent alone.
4.1 Dynamic Routing Decision Tree
flowchart TD
START([Incoming Query]) --> CLASSIFY[Classify Intent]
CLASSIFY -->|Known Simple| STATIC[Static Route → Haiku]
CLASSIFY -->|Known Complex| SONNET_DIRECT[Static Route → Sonnet]
CLASSIFY -->|Ambiguous| ANALYZE[Dynamic Analysis]
ANALYZE --> SCORE[Compute Complexity Score]
SCORE --> CHECK_SCORE{Score >= 7?}
CHECK_SCORE -->|Yes| CHECK_BUDGET{Budget Available?}
CHECK_BUDGET -->|Yes| ROUTE_SONNET[Route → Sonnet]
CHECK_BUDGET -->|No| ROUTE_HAIKU_WARN[Route → Haiku + Log Warning]
CHECK_SCORE -->|No| CHECK_LATENCY{Latency Budget?}
CHECK_LATENCY -->|Tight < 1s| ROUTE_HAIKU[Route → Haiku]
CHECK_LATENCY -->|Normal| CHECK_QUALITY{Quality Priority?}
CHECK_QUALITY -->|High| ROUTE_SONNET2[Route → Sonnet]
CHECK_QUALITY -->|Normal| ROUTE_HAIKU2[Route → Haiku]
STATIC --> INVOKE[Invoke Bedrock FM]
SONNET_DIRECT --> INVOKE
ROUTE_SONNET --> INVOKE
ROUTE_HAIKU_WARN --> INVOKE
ROUTE_HAIKU --> INVOKE
ROUTE_SONNET2 --> INVOKE
ROUTE_HAIKU2 --> INVOKE
INVOKE --> EVAL[Evaluate Response Quality]
EVAL --> LOG[Log Metrics + Decision]
LOG --> RESPOND([Return Response])
style ROUTE_SONNET fill:#2196F3,color:#fff
style ROUTE_SONNET2 fill:#2196F3,color:#fff
style SONNET_DIRECT fill:#2196F3,color:#fff
style ROUTE_HAIKU fill:#4CAF50,color:#fff
style ROUTE_HAIKU2 fill:#4CAF50,color:#fff
style ROUTE_HAIKU_WARN fill:#FF9800,color:#fff
style STATIC fill:#4CAF50,color:#fff
4.2 Dynamic Model Router Implementation
"""
dynamic_router.py — MangaAssist Dynamic Content-Based Model Router
Analyzes incoming query content in real time to determine the optimal
foundation model. Uses complexity scoring, language detection, token
estimation, and budget awareness.
"""
import re
import time
import logging
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from enum import Enum
logger = logging.getLogger(__name__)
class ComplexityLevel(Enum):
"""Query complexity classification levels."""
TRIVIAL = 1 # Greeting, yes/no questions
SIMPLE = 3 # Single-fact lookups, basic FAQ
MODERATE = 5 # Multi-step queries, comparisons
COMPLEX = 7 # Analysis, reasoning, multi-turn context
EXPERT = 9 # Deep domain expertise, creative generation
@dataclass
class ContentAnalysis:
"""Result of content analysis for routing decisions."""
complexity_score: float # 0-10 scale
complexity_level: ComplexityLevel
estimated_input_tokens: int
estimated_output_tokens: int
language_detected: str # ISO 639-1 code
contains_japanese: bool
requires_cultural_context: bool
query_type: str # "factual", "analytical", "creative", "conversational"
entity_count: int
has_comparison: bool
has_negation: bool
multi_turn_depth: int
analysis_time_ms: float
@dataclass
class RoutingContext:
"""Full context for making a routing decision."""
content_analysis: ContentAnalysis
user_tier: str
session_query_count: int
daily_budget_remaining_usd: float
latency_budget_ms: int
quality_priority: str # "cost", "balanced", "quality"
previous_model_used: Optional[str] = None
@dataclass
class DynamicRoutingDecision:
"""Result of dynamic routing analysis."""
model_id: str
fallback_model_id: str
max_tokens: int
temperature: float
reasoning: str
confidence: float # 0-1
estimated_cost_usd: float
estimated_latency_ms: int
route_path: str
decision_time_ms: float
content_analysis: ContentAnalysis
metadata: Dict[str, Any] = field(default_factory=dict)
class DynamicModelRouter:
"""
Dynamic content-based model router for MangaAssist.
Performs real-time content analysis to determine the optimal model
for each query. Balances cost, latency, and quality based on
configurable policies and real-time budget constraints.
Decision Factors:
- Complexity score (vocabulary, structure, entities)
- Token estimation (input size → output prediction)
- Language detection (Japanese content needs higher quality)
- Budget constraints (daily spend limits per tier)
- Latency requirements (tight budgets favor Haiku)
- Quality priority (user tier, query importance)
Usage:
router = DynamicModelRouter(budget_tracker=tracker)
analysis = router.analyze_content(query, conversation_history)
decision = router.route(analysis, user_context)
"""
HAIKU_MODEL = "anthropic.claude-3-haiku-20240307-v1:0"
SONNET_MODEL = "anthropic.claude-3-sonnet-20240229-v1:0"
# Cost per 1M tokens
HAIKU_INPUT_COST = 0.25
HAIKU_OUTPUT_COST = 1.25
SONNET_INPUT_COST = 3.00
SONNET_OUTPUT_COST = 15.00
# Complexity indicators
COMPLEX_PATTERNS = [
r"\b(compare|contrast|analyze|explain why|what if)\b",
r"\b(difference between|similarities|pros and cons)\b",
r"\b(recommend|suggest|which .+ should)\b",
r"\b(history of|evolution of|influence on)\b",
r"\b(theme|symbolism|metaphor|narrative)\b",
]
SIMPLE_PATTERNS = [
r"\b(what is|where is|when did|how much|price of)\b",
r"\b(yes|no|ok|thanks|hello|hi|bye)\b",
r"\b(order status|tracking|shipping|return)\b",
]
JAPANESE_PATTERN = re.compile(r"[\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FFF]")
CULTURAL_KEYWORDS = [
"manga", "anime", "otaku", "shounen", "shoujo", "seinen", "josei",
"isekai", "mecha", "slice of life", "tankoubon", "mangaka",
]
def __init__(
self,
budget_tracker=None,
complexity_threshold_sonnet: float = 6.5,
max_haiku_tokens: int = 2048,
max_sonnet_tokens: int = 4096,
):
self.budget_tracker = budget_tracker
self.sonnet_threshold = complexity_threshold_sonnet
self.max_haiku_tokens = max_haiku_tokens
self.max_sonnet_tokens = max_sonnet_tokens
# Compile patterns once
self._complex_re = [re.compile(p, re.IGNORECASE) for p in self.COMPLEX_PATTERNS]
self._simple_re = [re.compile(p, re.IGNORECASE) for p in self.SIMPLE_PATTERNS]
logger.info(
"DynamicModelRouter initialized | sonnet_threshold=%.1f",
complexity_threshold_sonnet,
)
def analyze_content(
self,
query: str,
conversation_history: Optional[List[Dict[str, str]]] = None,
) -> ContentAnalysis:
"""
Analyze query content to determine complexity and characteristics.
Args:
query: The user's current message
conversation_history: Previous messages in the session
Returns:
ContentAnalysis with all scoring dimensions
"""
start = time.monotonic()
history = conversation_history or []
# --- Token estimation ---
input_tokens = self._estimate_tokens(query)
context_tokens = sum(self._estimate_tokens(m.get("content", "")) for m in history)
total_input = input_tokens + context_tokens
estimated_output = min(input_tokens * 3, 2048)
# --- Language detection ---
has_japanese = bool(self.JAPANESE_PATTERN.search(query))
language = "ja" if has_japanese else "en"
# --- Cultural context ---
requires_cultural = any(kw in query.lower() for kw in self.CULTURAL_KEYWORDS)
# --- Complexity scoring ---
complexity = self._compute_complexity(query, history)
# --- Query type classification ---
query_type = self._classify_query_type(query)
# --- Entity counting ---
entity_count = self._count_entities(query)
# --- Structural features ---
has_comparison = bool(re.search(r"\b(compare|vs|versus|better|worse|difference)\b", query, re.IGNORECASE))
has_negation = bool(re.search(r"\b(not|never|don't|doesn't|isn't|aren't|won't)\b", query, re.IGNORECASE))
elapsed = (time.monotonic() - start) * 1000
level = self._score_to_level(complexity)
return ContentAnalysis(
complexity_score=complexity,
complexity_level=level,
estimated_input_tokens=total_input,
estimated_output_tokens=estimated_output,
language_detected=language,
contains_japanese=has_japanese,
requires_cultural_context=requires_cultural,
query_type=query_type,
entity_count=entity_count,
has_comparison=has_comparison,
has_negation=has_negation,
multi_turn_depth=len(history),
analysis_time_ms=elapsed,
)
def route(
self,
analysis: ContentAnalysis,
context: RoutingContext,
) -> DynamicRoutingDecision:
"""
Make a routing decision based on content analysis and context.
Decision Logic:
1. If complexity >= threshold AND budget allows → Sonnet
2. If Japanese cultural content AND quality priority → Sonnet
3. If latency budget tight (< 1000ms) → Haiku
4. If daily budget nearly exhausted → Haiku
5. Default → Haiku (cost optimization)
Args:
analysis: Content analysis results
context: User and system context
Returns:
DynamicRoutingDecision with full reasoning
"""
start = time.monotonic()
reasoning_parts: List[str] = []
# --- Decision scoring ---
sonnet_score = 0.0
haiku_score = 0.0
# Complexity factor (weight: 40%)
if analysis.complexity_score >= self.sonnet_threshold:
sonnet_score += 4.0
reasoning_parts.append(
f"Complexity {analysis.complexity_score:.1f} >= threshold {self.sonnet_threshold}"
)
else:
haiku_score += 4.0
reasoning_parts.append(
f"Complexity {analysis.complexity_score:.1f} < threshold {self.sonnet_threshold}"
)
# Cultural context factor (weight: 20%)
if analysis.requires_cultural_context and analysis.contains_japanese:
sonnet_score += 2.0
reasoning_parts.append("Japanese cultural content detected — quality boost")
else:
haiku_score += 1.0
# Latency factor (weight: 20%)
if context.latency_budget_ms < 1000:
haiku_score += 2.0
reasoning_parts.append(f"Tight latency budget ({context.latency_budget_ms}ms) — favoring Haiku")
elif context.latency_budget_ms < 2000:
haiku_score += 1.0
# Budget factor (weight: 20%)
est_sonnet_cost = self._estimate_cost(
analysis.estimated_input_tokens,
analysis.estimated_output_tokens,
self.SONNET_INPUT_COST,
self.SONNET_OUTPUT_COST,
)
if context.daily_budget_remaining_usd < est_sonnet_cost * 10:
haiku_score += 2.0
reasoning_parts.append("Budget nearly exhausted — routing to Haiku")
# Quality priority override
if context.quality_priority == "quality":
sonnet_score += 1.5
reasoning_parts.append("Quality priority set — boosting Sonnet score")
elif context.quality_priority == "cost":
haiku_score += 1.5
reasoning_parts.append("Cost priority set — boosting Haiku score")
# Premium user tier boost
if context.user_tier == "premium":
sonnet_score += 1.0
reasoning_parts.append("Premium user tier — Sonnet preference")
# --- Final decision ---
if sonnet_score > haiku_score:
model_id = self.SONNET_MODEL
fallback = self.HAIKU_MODEL
max_tokens = self.max_sonnet_tokens
est_cost = est_sonnet_cost
est_latency = 2000
else:
model_id = self.HAIKU_MODEL
fallback = self.SONNET_MODEL
max_tokens = self.max_haiku_tokens
est_cost = self._estimate_cost(
analysis.estimated_input_tokens,
analysis.estimated_output_tokens,
self.HAIKU_INPUT_COST,
self.HAIKU_OUTPUT_COST,
)
est_latency = 800
confidence = abs(sonnet_score - haiku_score) / max(sonnet_score + haiku_score, 1)
elapsed = (time.monotonic() - start) * 1000
decision = DynamicRoutingDecision(
model_id=model_id,
fallback_model_id=fallback,
max_tokens=max_tokens,
temperature=0.3 if analysis.query_type == "factual" else 0.7,
reasoning=" | ".join(reasoning_parts),
confidence=min(confidence, 1.0),
estimated_cost_usd=est_cost,
estimated_latency_ms=est_latency,
route_path="dynamic_content_analysis",
decision_time_ms=elapsed,
content_analysis=analysis,
)
logger.info(
"Dynamic route decision | model=%s | score=S:%.1f/H:%.1f | confidence=%.2f | %.2fms",
model_id.split(".")[-1][:20],
sonnet_score,
haiku_score,
confidence,
elapsed,
)
return decision
def _compute_complexity(
self, query: str, history: List[Dict[str, str]]
) -> float:
"""
Compute a 0-10 complexity score for the query.
Scoring dimensions:
- Vocabulary complexity (word length, rare words)
- Structural complexity (sentence count, question depth)
- Pattern matching (complex vs simple patterns)
- Context dependency (multi-turn depth)
- Entity density
"""
score = 0.0
words = query.split()
word_count = len(words)
# Word count factor
if word_count > 50:
score += 2.0
elif word_count > 20:
score += 1.0
elif word_count < 5:
score -= 1.0
# Average word length (proxy for vocabulary complexity)
if words:
avg_len = sum(len(w) for w in words) / len(words)
if avg_len > 7:
score += 1.5
elif avg_len > 5:
score += 0.5
# Complex pattern matches
complex_matches = sum(1 for p in self._complex_re if p.search(query))
score += min(complex_matches * 1.0, 3.0)
# Simple pattern penalty
simple_matches = sum(1 for p in self._simple_re if p.search(query))
score -= min(simple_matches * 0.5, 2.0)
# Multi-turn depth bonus
score += min(len(history) * 0.3, 2.0)
# Question complexity
question_marks = query.count("?")
if question_marks > 2:
score += 1.0
# Clamp to 0-10
return max(0.0, min(10.0, score + 3.0))
def _classify_query_type(self, query: str) -> str:
"""Classify query into factual, analytical, creative, or conversational."""
q_lower = query.lower()
if any(kw in q_lower for kw in ["analyze", "compare", "why", "how does", "explain"]):
return "analytical"
if any(kw in q_lower for kw in ["write", "create", "generate", "story", "describe"]):
return "creative"
if any(kw in q_lower for kw in ["hi", "hello", "thanks", "bye", "how are you"]):
return "conversational"
return "factual"
def _count_entities(self, query: str) -> int:
"""Count approximate entity mentions (capitalized words, quoted terms)."""
caps = re.findall(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b", query)
quoted = re.findall(r'"[^"]+"|\'[^\']+\'', query)
return len(caps) + len(quoted)
def _estimate_tokens(self, text: str) -> int:
"""Estimate token count (rough: 1 token per 4 characters for English, 1.5 per char for Japanese)."""
jp_chars = len(self.JAPANESE_PATTERN.findall(text))
en_chars = len(text) - jp_chars
return int(en_chars / 4 + jp_chars / 1.5)
def _estimate_cost(
self, input_tokens: int, output_tokens: int,
input_price: float, output_price: float,
) -> float:
"""Estimate cost in USD."""
return (input_tokens * input_price + output_tokens * output_price) / 1_000_000
@staticmethod
def _score_to_level(score: float) -> ComplexityLevel:
"""Map numeric score to complexity level."""
if score <= 2:
return ComplexityLevel.TRIVIAL
if score <= 4:
return ComplexityLevel.SIMPLE
if score <= 6:
return ComplexityLevel.MODERATE
if score <= 8:
return ComplexityLevel.COMPLEX
return ComplexityLevel.EXPERT
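A short usage sketch for the dynamic router; the budget, latency, and tier values below are assumed for illustration:
router = DynamicModelRouter(complexity_threshold_sonnet=6.5)

analysis = router.analyze_content(
    "Compare the themes of Berserk and Vagabond: which handles ambition better?",
)
context = RoutingContext(
    content_analysis=analysis,
    user_tier="standard",
    session_query_count=4,
    daily_budget_remaining_usd=120.0,   # assumed remaining daily budget
    latency_budget_ms=2500,
    quality_priority="balanced",
)
decision = router.route(analysis, context)
print(decision.model_id, f"confidence={decision.confidence:.2f}")
print(decision.reasoning)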
5. Metric-Based Model Selection
Metric-based routing uses real-time performance data to dynamically adjust model selection. If Sonnet latency spikes, the system can temporarily route more queries to Haiku. If Haiku quality drops for certain query types, the system shifts to Sonnet.
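For example, P50/P95/P99 can be derived from the latency_samples sorted set that record_metric() (section 5.2) writes; a minimal sketch assuming redis-py:
import redis

def latency_percentiles(r: redis.Redis, model_id: str) -> dict:
    """Rough P50/P95/P99 over stored samples (zset score = latency in ms)."""
    samples = sorted(
        score for _, score in
        r.zrange(f"latency_samples:{model_id}", 0, -1, withscores=True)
    )
    if not samples:
        return {"p50": 0.0, "p95": 0.0, "p99": 0.0}

    def pick(p: float) -> float:
        # Nearest-rank percentile, clamped to the last sample
        return samples[min(int(len(samples) * p), len(samples) - 1)]

    return {"p50": pick(0.50), "p95": pick(0.95), "p99": pick(0.99)}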
5.1 Metrics Collection Architecture
flowchart LR
subgraph Sources["Metric Sources"]
BED[Bedrock InvokeModel<br/>Latency + Tokens]
CW[CloudWatch<br/>P50/P95/P99]
USR[User Feedback<br/>Thumbs Up/Down]
QA[Quality Eval<br/>Relevance Scores]
end
subgraph Collection["Collection Pipeline"]
KDS[Kinesis Data Stream]
FH[Firehose]
LAMBDA[Lambda Aggregator]
end
subgraph Storage["Metric Storage"]
TS[DynamoDB<br/>Time-Series Metrics]
RED[ElastiCache Redis<br/>Real-Time Counters]
S3[S3<br/>Historical Archive]
end
subgraph Selector["Metric-Based Selector"]
AGG[Metric Aggregator]
RANK[Model Ranker]
DECIDE[Route Decision]
end
BED --> KDS
CW --> LAMBDA
USR --> KDS
QA --> KDS
KDS --> FH
KDS --> LAMBDA
LAMBDA --> TS
LAMBDA --> RED
FH --> S3
RED --> AGG
TS --> AGG
AGG --> RANK
RANK --> DECIDE
5.2 Metric-Based Selector Implementation
"""
metric_selector.py — MangaAssist Metric-Based Model Selector
Uses real-time performance metrics to dynamically rank and select models.
Factors: latency percentiles, cost tracking, quality scores, error rates.
"""
import time
import logging
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime
logger = logging.getLogger(__name__)
@dataclass
class ModelMetrics:
"""Real-time metrics for a single model."""
model_id: str
p50_latency_ms: float = 0.0
p95_latency_ms: float = 0.0
p99_latency_ms: float = 0.0
avg_input_tokens: float = 0.0
avg_output_tokens: float = 0.0
error_rate_pct: float = 0.0
throttle_rate_pct: float = 0.0
quality_score: float = 0.0 # 0-1 scale
user_satisfaction: float = 0.0 # 0-1 scale
cost_per_query_usd: float = 0.0
queries_last_hour: int = 0
last_updated: str = ""
@dataclass
class MetricWeight:
"""Configurable weights for multi-objective model ranking."""
latency: float = 0.30
cost: float = 0.25
quality: float = 0.30
reliability: float = 0.15
def validate(self) -> bool:
total = self.latency + self.cost + self.quality + self.reliability
return abs(total - 1.0) < 0.01
@dataclass
class ModelRanking:
"""Ranked model with composite score breakdown."""
model_id: str
composite_score: float
latency_score: float
cost_score: float
quality_score: float
reliability_score: float
rank: int
metrics: ModelMetrics
class MetricBasedSelector:
"""
Metric-based model selector for MangaAssist.
Maintains real-time performance metrics for all available models
and ranks them using configurable multi-objective scoring. Supports
dynamic weight adjustment based on system conditions (e.g., increase
cost weight during budget pressure, increase latency weight during
peak hours).
Scoring Formula:
composite = (latency_score * w_latency) +
(cost_score * w_cost) +
(quality_score * w_quality) +
(reliability_score * w_reliability)
All individual scores are normalized to 0-1 where 1 = best.
Usage:
selector = MetricBasedSelector(redis_client=redis_conn)
ranking = selector.rank_models(query_type="analytical")
best = ranking[0] # Highest composite score
"""
MODELS = [
"anthropic.claude-3-haiku-20240307-v1:0",
"anthropic.claude-3-sonnet-20240229-v1:0",
]
# Baseline expectations for normalization
LATENCY_BASELINE_MS = 3000.0 # 3s target
COST_BASELINE_USD = 0.001 # $0.001 per query target
QUALITY_BASELINE = 0.8 # 80% quality target
ERROR_RATE_THRESHOLD = 5.0 # 5% error rate threshold
def __init__(
self,
redis_client=None,
dynamodb_table=None,
weights: Optional[MetricWeight] = None,
metric_window_minutes: int = 15,
):
self.redis = redis_client
self.dynamodb_table = dynamodb_table
self.weights = weights or MetricWeight()
self.metric_window = metric_window_minutes
if not self.weights.validate():
raise ValueError("Metric weights must sum to 1.0")
# In-memory metric cache
self._metrics_cache: Dict[str, Tuple[ModelMetrics, float]] = {}
self._cache_ttl = 30 # seconds
logger.info(
"MetricBasedSelector initialized | weights=L:%.2f/C:%.2f/Q:%.2f/R:%.2f | window=%dm",
self.weights.latency,
self.weights.cost,
self.weights.quality,
self.weights.reliability,
metric_window_minutes,
)
def get_model_metrics(self, model_id: str) -> ModelMetrics:
"""
Retrieve current metrics for a model.
Checks local cache → Redis → DynamoDB.
"""
# Local cache check
if model_id in self._metrics_cache:
metrics, cached_at = self._metrics_cache[model_id]
if time.time() - cached_at < self._cache_ttl:
return metrics
# Redis check
if self.redis:
try:
key = f"metrics:{model_id}"
data = self.redis.hgetall(key)
if data:
metrics = ModelMetrics(
model_id=model_id,
p50_latency_ms=float(data.get(b"p50_latency_ms", 0)),
p95_latency_ms=float(data.get(b"p95_latency_ms", 0)),
p99_latency_ms=float(data.get(b"p99_latency_ms", 0)),
avg_input_tokens=float(data.get(b"avg_input_tokens", 0)),
avg_output_tokens=float(data.get(b"avg_output_tokens", 0)),
error_rate_pct=float(data.get(b"error_rate_pct", 0)),
throttle_rate_pct=float(data.get(b"throttle_rate_pct", 0)),
quality_score=float(data.get(b"quality_score", 0)),
user_satisfaction=float(data.get(b"user_satisfaction", 0)),
cost_per_query_usd=float(data.get(b"cost_per_query_usd", 0)),
queries_last_hour=int(data.get(b"queries_last_hour", 0)),
last_updated=data.get(b"last_updated", b"").decode(),
)
self._metrics_cache[model_id] = (metrics, time.time())
return metrics
except Exception as e:
logger.warning("Redis metrics fetch failed | model=%s | error=%s", model_id, e)
# Return empty metrics as fallback
return ModelMetrics(model_id=model_id)
def rank_models(
self,
query_type: str = "general",
override_weights: Optional[MetricWeight] = None,
) -> List[ModelRanking]:
"""
Rank all models by composite score.
Args:
query_type: Optional query type for weight adjustment
override_weights: Override default weights for this ranking
Returns:
List of ModelRanking sorted by composite_score descending
"""
weights = override_weights or self._adjust_weights_for_query(query_type)
rankings: List[ModelRanking] = []
all_metrics = {m: self.get_model_metrics(m) for m in self.MODELS}
for model_id, metrics in all_metrics.items():
latency_score = self._score_latency(metrics)
cost_score = self._score_cost(metrics)
quality_score = self._score_quality(metrics)
reliability_score = self._score_reliability(metrics)
composite = (
latency_score * weights.latency
+ cost_score * weights.cost
+ quality_score * weights.quality
+ reliability_score * weights.reliability
)
rankings.append(ModelRanking(
model_id=model_id,
composite_score=composite,
latency_score=latency_score,
cost_score=cost_score,
quality_score=quality_score,
reliability_score=reliability_score,
rank=0,
metrics=metrics,
))
# Sort by composite score descending
rankings.sort(key=lambda r: r.composite_score, reverse=True)
for i, r in enumerate(rankings):
r.rank = i + 1
if rankings:
logger.info(
"Model ranking | #1=%s (%.3f) | #2=%s (%.3f)",
rankings[0].model_id.split(".")[-1][:15],
rankings[0].composite_score,
rankings[1].model_id.split(".")[-1][:15] if len(rankings) > 1 else "N/A",
rankings[1].composite_score if len(rankings) > 1 else 0,
)
return rankings
def select_best_model(
self,
query_type: str = "general",
exclude_models: Optional[List[str]] = None,
) -> ModelRanking:
"""Select the highest-ranked model, optionally excluding some."""
rankings = self.rank_models(query_type)
exclude = set(exclude_models or [])
for r in rankings:
if r.model_id not in exclude:
return r
return rankings[0] # Fallback to best even if excluded
def record_metric(
self,
model_id: str,
latency_ms: float,
input_tokens: int,
output_tokens: int,
error: bool = False,
quality_score: Optional[float] = None,
) -> None:
"""
Record a new metric data point for incremental aggregation.
Pushes to Redis for real-time and DynamoDB for historical.
"""
if not self.redis:
return
        try:
            pipe = self.redis.pipeline()
            key = f"metrics:{model_id}"
            # Latency samples go into a sorted set (member=timestamp, score=latency)
            # so percentiles can be computed over the window
            pipe.zadd(f"latency_samples:{model_id}", {str(time.time()): latency_ms})
            # Increment counters, including token totals for cost aggregation
            pipe.hincrby(key, "total_queries", 1)
            pipe.hincrby(key, "total_input_tokens", input_tokens)
            pipe.hincrby(key, "total_output_tokens", output_tokens)
            if error:
                pipe.hincrby(key, "error_count", 1)
            if quality_score is not None:
                pipe.hset(key, "quality_score", str(quality_score))
            pipe.hset(key, "last_updated", datetime.utcnow().isoformat())
            pipe.expire(key, 3600)
            pipe.execute()
except Exception as e:
logger.warning("Metric recording failed | model=%s | error=%s", model_id, e)
def update_weights(self, new_weights: MetricWeight) -> None:
"""Dynamically update scoring weights (e.g., during budget pressure)."""
if not new_weights.validate():
raise ValueError("Weights must sum to 1.0")
old = self.weights
self.weights = new_weights
logger.info(
"Weights updated | L:%.2f→%.2f C:%.2f→%.2f Q:%.2f→%.2f R:%.2f→%.2f",
old.latency, new_weights.latency,
old.cost, new_weights.cost,
old.quality, new_weights.quality,
old.reliability, new_weights.reliability,
)
def _score_latency(self, metrics: ModelMetrics) -> float:
"""Score latency: lower is better, normalized to 0-1."""
if metrics.p95_latency_ms <= 0:
return 0.5 # No data — neutral score
ratio = metrics.p95_latency_ms / self.LATENCY_BASELINE_MS
return max(0.0, min(1.0, 1.0 - (ratio - 1.0) * 0.5))
def _score_cost(self, metrics: ModelMetrics) -> float:
"""Score cost: lower is better, normalized to 0-1."""
if metrics.cost_per_query_usd <= 0:
return 0.5
ratio = metrics.cost_per_query_usd / self.COST_BASELINE_USD
return max(0.0, min(1.0, 1.0 - (ratio - 1.0) * 0.3))
def _score_quality(self, metrics: ModelMetrics) -> float:
"""Score quality: higher is better, already 0-1."""
if metrics.quality_score <= 0:
return 0.5
return min(1.0, metrics.quality_score / self.QUALITY_BASELINE)
def _score_reliability(self, metrics: ModelMetrics) -> float:
"""Score reliability: lower error rate is better."""
if metrics.error_rate_pct <= 0:
return 0.9 # Assume good if no data
ratio = metrics.error_rate_pct / self.ERROR_RATE_THRESHOLD
return max(0.0, min(1.0, 1.0 - ratio))
def _adjust_weights_for_query(self, query_type: str) -> MetricWeight:
"""Adjust weights based on query type characteristics."""
adjustments = {
"factual": MetricWeight(latency=0.25, cost=0.25, quality=0.35, reliability=0.15),
"analytical": MetricWeight(latency=0.20, cost=0.15, quality=0.50, reliability=0.15),
"creative": MetricWeight(latency=0.15, cost=0.20, quality=0.50, reliability=0.15),
"conversational": MetricWeight(latency=0.40, cost=0.30, quality=0.15, reliability=0.15),
}
return adjustments.get(query_type, self.weights)
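A usage sketch showing ranking plus a budget-pressure weight shift; redis_conn and the weight values are illustrative assumptions, not tuned settings:
selector = MetricBasedSelector(redis_client=redis_conn)

# Normal operation: rank per query type, pick the best model not excluded.
best = selector.select_best_model(query_type="analytical")
print(best.model_id, f"composite={best.composite_score:.3f}")

# Under budget pressure, shift weight toward cost (weights must sum to 1.0).
selector.update_weights(MetricWeight(latency=0.25, cost=0.40, quality=0.20, reliability=0.15))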
6. API Gateway Request Transformation for Routing
API Gateway request transformations allow routing logic to execute at the edge, before reaching the ECS Fargate orchestrator. This reduces latency for simple routing decisions and offloads work from the compute layer.
6.1 API Gateway Routing Architecture
flowchart LR
subgraph Client["Client Layer"]
APP[MangaAssist App]
end
subgraph APIGW["API Gateway"]
WS[WebSocket API]
AUTH[Lambda Authorizer]
VTL[VTL Mapping Template]
INT[Integration Request]
end
subgraph Routes["Route Targets"]
FAST[ECS Fast Path<br/>Haiku Direct]
FULL[ECS Full Router<br/>Dynamic Analysis]
PREM[ECS Premium Path<br/>Sonnet Direct]
end
APP -->|WebSocket| WS
WS --> AUTH
AUTH -->|Context| VTL
VTL -->|Transform| INT
INT -->|simple_intent| FAST
INT -->|complex_intent| FULL
INT -->|premium_user| PREM
6.2 API Gateway Route Transformer Implementation
"""
api_gateway_transformer.py — MangaAssist API Gateway Route Transformation
Generates VTL mapping templates and Lambda authorizer logic for
API Gateway-level routing decisions. Enables edge-based routing
without reaching the ECS orchestrator for simple cases.
"""
import json
import re
import time
import logging
from typing import Dict, Any, List
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class TransformationRule:
"""A single routing transformation rule for API Gateway."""
rule_id: str
condition_field: str # e.g., "intent", "user_tier", "message_length"
condition_operator: str # "equals", "contains", "greater_than", "regex"
condition_value: str
target_route: str # Route key for API Gateway
priority: int = 5
enabled: bool = True
description: str = ""
@dataclass
class TransformationResult:
"""Result of applying transformation rules."""
target_route: str
matched_rule_id: str
transformed_headers: Dict[str, str]
transformed_body: Dict[str, Any]
routing_metadata: Dict[str, Any]
class APIGatewayRouteTransformer:
"""
API Gateway request transformation engine for MangaAssist.
Generates and applies routing transformations at the API Gateway
level using VTL templates and Lambda authorizer context. Enables
fast-path routing for simple intents without full orchestrator
involvement.
Capabilities:
- Header-based routing (X-Intent-Category, X-User-Tier)
- Body inspection for routing signals
- VTL template generation for API Gateway mappings
- Lambda authorizer context enrichment
- Route key generation for WebSocket API
Usage:
transformer = APIGatewayRouteTransformer()
transformer.add_rule(TransformationRule(...))
result = transformer.transform(request_event)
"""
DEFAULT_ROUTE = "full_router"
def __init__(self):
self.rules: List[TransformationRule] = []
self._rules_sorted = False
# Pre-load standard MangaAssist routing rules
self._load_default_rules()
logger.info("APIGatewayRouteTransformer initialized with %d rules", len(self.rules))
def _load_default_rules(self) -> None:
"""Load default routing rules for MangaAssist."""
defaults = [
TransformationRule(
rule_id="greeting_fast",
condition_field="intent",
condition_operator="equals",
condition_value="greeting",
target_route="fast_haiku",
priority=1,
description="Greetings go directly to Haiku fast path",
),
TransformationRule(
rule_id="simple_faq_fast",
condition_field="intent",
condition_operator="equals",
condition_value="simple_faq",
target_route="fast_haiku",
priority=2,
description="Simple FAQ to Haiku fast path",
),
TransformationRule(
rule_id="order_status_fast",
condition_field="intent",
condition_operator="equals",
condition_value="order_status",
target_route="fast_haiku",
priority=2,
description="Order status lookups to Haiku fast path",
),
TransformationRule(
rule_id="premium_user_sonnet",
condition_field="user_tier",
condition_operator="equals",
condition_value="premium",
target_route="premium_sonnet",
priority=3,
description="Premium users get Sonnet by default",
),
TransformationRule(
rule_id="long_message_dynamic",
condition_field="message_length",
condition_operator="greater_than",
condition_value="500",
target_route="full_router",
priority=4,
description="Long messages need dynamic analysis",
),
TransformationRule(
rule_id="complex_intent_dynamic",
condition_field="intent",
condition_operator="equals",
condition_value="complex_analysis",
target_route="full_router",
priority=2,
description="Complex analysis goes to full dynamic router",
),
]
self.rules.extend(defaults)
self._rules_sorted = False
def add_rule(self, rule: TransformationRule) -> None:
"""Add a transformation rule."""
self.rules.append(rule)
self._rules_sorted = False
logger.info("Rule added | id=%s | target=%s", rule.rule_id, rule.target_route)
def remove_rule(self, rule_id: str) -> bool:
"""Remove a rule by ID."""
initial = len(self.rules)
self.rules = [r for r in self.rules if r.rule_id != rule_id]
removed = len(self.rules) < initial
if removed:
self._rules_sorted = False
logger.info("Rule removed | id=%s", rule_id)
return removed
def transform(self, event: Dict[str, Any]) -> TransformationResult:
"""
Apply transformation rules to an incoming API Gateway event.
Args:
event: API Gateway WebSocket event (requestContext + body)
Returns:
TransformationResult with routing decision and transformed payload
"""
if not self._rules_sorted:
self.rules.sort(key=lambda r: r.priority)
self._rules_sorted = True
# Extract routing signals from event
signals = self._extract_signals(event)
# Evaluate rules in priority order
for rule in self.rules:
if not rule.enabled:
continue
if self._evaluate_rule(rule, signals):
return self._build_result(rule, event, signals)
# No rule matched — default route
return TransformationResult(
target_route=self.DEFAULT_ROUTE,
matched_rule_id="default",
transformed_headers=self._build_routing_headers(self.DEFAULT_ROUTE, signals),
transformed_body=self._build_routing_body(event, signals),
routing_metadata={"signals": signals, "matched": False},
)
def generate_vtl_template(self) -> str:
"""
Generate a VTL (Velocity Template Language) mapping template
for API Gateway integration request transformation.
Returns:
VTL template string for API Gateway configuration
"""
vtl_lines = [
'## MangaAssist API Gateway Routing VTL Template',
'## Auto-generated — do not edit manually',
'#set($inputRoot = $util.parseJson($input.body))',
            '#set($params = $input.params())',
'#set($intent = $inputRoot.intent)',
            '#set($userTier = $params.header.get("X-User-Tier"))',
'#set($messageLength = $inputRoot.message.length())',
'',
'## Determine route based on intent and context',
'#if($intent == "greeting" || $intent == "simple_faq" || $intent == "order_status")',
' #set($route = "fast_haiku")',
' #set($modelId = "anthropic.claude-3-haiku-20240307-v1:0")',
' #set($maxTokens = 512)',
'#elseif($userTier == "premium")',
' #set($route = "premium_sonnet")',
' #set($modelId = "anthropic.claude-3-sonnet-20240229-v1:0")',
' #set($maxTokens = 4096)',
'#elseif($intent == "complex_analysis" || $messageLength > 500)',
' #set($route = "full_router")',
' #set($modelId = "dynamic")',
' #set($maxTokens = 4096)',
'#else',
' #set($route = "full_router")',
' #set($modelId = "dynamic")',
' #set($maxTokens = 2048)',
'#end',
'',
'{',
' "route": "$route",',
' "modelId": "$modelId",',
' "maxTokens": $maxTokens,',
' "originalIntent": "$intent",',
' "userTier": "$userTier",',
' "message": "$util.escapeJavaScript($inputRoot.message)",',
' "sessionId": "$inputRoot.sessionId",',
            ' "timestamp": "$params.header.get(\'X-Request-Time\')"',
'}',
]
return "\n".join(vtl_lines)
def generate_lambda_authorizer_context(
self, event: Dict[str, Any]
) -> Dict[str, str]:
"""
Generate context to return from a Lambda authorizer
that includes routing metadata for downstream processing.
The authorizer context is available to the integration request
via $context.authorizer.<key>.
"""
signals = self._extract_signals(event)
result = self.transform(event)
return {
"routeTarget": result.target_route,
"matchedRule": result.matched_rule_id,
"detectedIntent": signals.get("intent", "unknown"),
"userTier": signals.get("user_tier", "standard"),
"messageLength": str(signals.get("message_length", 0)),
"routingVersion": "v2",
}
def _extract_signals(self, event: Dict[str, Any]) -> Dict[str, Any]:
"""Extract routing signals from the API Gateway event."""
body = event.get("body", {})
if isinstance(body, str):
try:
body = json.loads(body)
except json.JSONDecodeError:
body = {}
headers = event.get("headers", {})
request_context = event.get("requestContext", {})
authorizer = request_context.get("authorizer", {})
message = body.get("message", "")
return {
"intent": body.get("intent", authorizer.get("intent", "unknown")),
"user_tier": headers.get("X-User-Tier", authorizer.get("userTier", "standard")),
"message_length": len(message),
"session_id": body.get("sessionId", ""),
"connection_id": request_context.get("connectionId", ""),
"has_history": bool(body.get("history")),
"language_hint": headers.get("Accept-Language", "en"),
}
def _evaluate_rule(self, rule: TransformationRule, signals: Dict[str, Any]) -> bool:
"""Evaluate a single rule against extracted signals."""
value = signals.get(rule.condition_field)
if value is None:
return False
if rule.condition_operator == "equals":
return str(value) == rule.condition_value
elif rule.condition_operator == "contains":
return rule.condition_value in str(value)
elif rule.condition_operator == "greater_than":
try:
return float(value) > float(rule.condition_value)
except (ValueError, TypeError):
return False
        elif rule.condition_operator == "regex":
            return bool(re.search(rule.condition_value, str(value)))
return False
def _build_result(
self,
rule: TransformationRule,
event: Dict[str, Any],
signals: Dict[str, Any],
) -> TransformationResult:
"""Build a transformation result from a matched rule."""
return TransformationResult(
target_route=rule.target_route,
matched_rule_id=rule.rule_id,
transformed_headers=self._build_routing_headers(rule.target_route, signals),
transformed_body=self._build_routing_body(event, signals),
routing_metadata={
"signals": signals,
"matched": True,
"rule_priority": rule.priority,
"rule_description": rule.description,
},
)
def _build_routing_headers(
self, route: str, signals: Dict[str, Any]
) -> Dict[str, str]:
"""Build HTTP headers for the routed request."""
return {
"X-Route-Target": route,
"X-Route-Version": "v2",
"X-Detected-Intent": signals.get("intent", "unknown"),
"X-User-Tier": signals.get("user_tier", "standard"),
"X-Message-Length": str(signals.get("message_length", 0)),
}
def _build_routing_body(
self, event: Dict[str, Any], signals: Dict[str, Any]
) -> Dict[str, Any]:
"""Build the transformed request body with routing context."""
body = event.get("body", {})
if isinstance(body, str):
try:
body = json.loads(body)
except json.JSONDecodeError:
body = {}
body["_routing"] = {
"signals": signals,
"timestamp": int(time.time() * 1000) if hasattr(time, 'time') else 0,
}
return body
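A quick check of the rule engine against a synthetic WebSocket event (the payload shape mirrors what _extract_signals() expects; the values are made up):
import json

transformer = APIGatewayRouteTransformer()

event = {
    "headers": {"X-User-Tier": "standard"},
    "requestContext": {"connectionId": "abc123", "authorizer": {}},
    "body": json.dumps({"intent": "greeting", "message": "こんにちは!", "sessionId": "sess-42"}),
}
result = transformer.transform(event)
assert result.target_route == "fast_haiku"       # matched by the greeting_fast rule
print(result.matched_rule_id, result.transformed_headers["X-Route-Target"])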
7. Step Functions ASL — Dynamic Routing Workflow
{
"Comment": "MangaAssist Dynamic Model Routing Workflow",
"StartAt": "AnalyzeContent",
"States": {
"AnalyzeContent": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:MangaAssist-AnalyzeContent",
"Parameters": {
"query.$": "$.message",
"session_id.$": "$.sessionId",
"conversation_history.$": "$.history"
},
"ResultPath": "$.analysis",
"TimeoutSeconds": 5,
"Retry": [
{
"ErrorEquals": ["Lambda.ServiceException", "Lambda.TooManyRequestsException"],
"IntervalSeconds": 1,
"MaxAttempts": 2,
"BackoffRate": 2
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.error",
"Next": "FallbackRouting"
}
],
"Next": "CheckComplexity"
},
"CheckComplexity": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.analysis.complexity_score",
"NumericGreaterThanEquals": 7,
"Next": "CheckBudget"
},
{
"Variable": "$.analysis.requires_cultural_context",
"BooleanEquals": true,
"Next": "CheckBudget"
},
{
"Variable": "$.analysis.complexity_score",
"NumericLessThan": 3,
"Next": "RouteToHaiku"
}
],
"Default": "CollectMetrics"
},
"CheckBudget": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:MangaAssist-CheckBudget",
"Parameters": {
"user_id.$": "$.userId",
"estimated_input_tokens.$": "$.analysis.estimated_input_tokens",
"estimated_output_tokens.$": "$.analysis.estimated_output_tokens",
"model_tier": "sonnet"
},
"ResultPath": "$.budget",
"TimeoutSeconds": 3,
"Next": "BudgetDecision"
},
"BudgetDecision": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.budget.approved",
"BooleanEquals": true,
"Next": "RouteToSonnet"
}
],
"Default": "RouteToHaiku"
},
"CollectMetrics": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:MangaAssist-CollectMetrics",
"Parameters": {
"models": ["anthropic.claude-3-haiku-20240307-v1:0", "anthropic.claude-3-sonnet-20240229-v1:0"],
"query_type.$": "$.analysis.query_type"
},
"ResultPath": "$.metrics",
"TimeoutSeconds": 3,
"Retry": [
{
"ErrorEquals": ["States.TaskFailed"],
"IntervalSeconds": 1,
"MaxAttempts": 1
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.metricsError",
"Next": "RouteToHaiku"
}
],
"Next": "MetricBasedSelection"
},
"MetricBasedSelection": {
"Type": "Choice",
"Choices": [
{
"And": [
{
"Variable": "$.metrics.best_model",
"StringEquals": "sonnet"
},
{
"Variable": "$.analysis.complexity_score",
"NumericGreaterThanEquals": 5
}
],
"Next": "RouteToSonnet"
}
],
"Default": "RouteToHaiku"
},
"RouteToSonnet": {
"Type": "Task",
"Resource": "arn:aws:states:::bedrock:invokeModel",
"Parameters": {
"ModelId": "anthropic.claude-3-sonnet-20240229-v1:0",
"ContentType": "application/json",
"Accept": "application/json",
"Body": {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens.$": "$.analysis.max_tokens",
"messages": [
{
"role": "user",
"content.$": "$.message"
}
],
"temperature": 0.3
}
},
"ResultPath": "$.modelResponse",
"TimeoutSeconds": 30,
"Retry": [
{
"ErrorEquals": ["Bedrock.ThrottlingException"],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.modelError",
"Next": "FallbackRouting"
}
],
"Next": "LogDecision"
},
"RouteToHaiku": {
"Type": "Task",
"Resource": "arn:aws:states:::bedrock:invokeModel",
"Parameters": {
"ModelId": "anthropic.claude-3-haiku-20240307-v1:0",
"ContentType": "application/json",
"Accept": "application/json",
"Body": {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1024,
"messages": [
{
"role": "user",
"content.$": "$.message"
}
],
"temperature": 0.3
}
},
"ResultPath": "$.modelResponse",
"TimeoutSeconds": 15,
"Retry": [
{
"ErrorEquals": ["Bedrock.ThrottlingException"],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.modelError",
"Next": "FallbackRouting"
}
],
"Next": "LogDecision"
},
"FallbackRouting": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:MangaAssist-FallbackResponse",
"Parameters": {
"error.$": "$.error",
"original_message.$": "$.message",
"session_id.$": "$.sessionId"
},
"ResultPath": "$.modelResponse",
"Next": "LogDecision"
},
"LogDecision": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "LogToCloudWatch",
"States": {
"LogToCloudWatch": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:MangaAssist-LogRouting",
"Parameters": {
"decision_data.$": "$"
},
"End": true
}
}
},
{
"StartAt": "UpdateMetrics",
"States": {
"UpdateMetrics": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT:function:MangaAssist-UpdateMetrics",
"Parameters": {
"model_used.$": "$.modelResponse.model",
"latency_ms.$": "$.modelResponse.latency_ms",
"tokens.$": "$.modelResponse.usage"
},
"End": true
}
}
}
],
"ResultPath": "$.logging",
"Next": "ReturnResponse"
},
"ReturnResponse": {
"Type": "Pass",
"Parameters": {
"response.$": "$.modelResponse.content",
"model_used.$": "$.modelResponse.model",
"session_id.$": "$.sessionId"
},
"End": true
}
}
}
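Because the design calls for Express workflows (section 9), the orchestrator can run this state machine synchronously; a minimal boto3 sketch with a placeholder ARN:
import json
import boto3

sfn = boto3.client("stepfunctions", region_name="ap-northeast-1")

# start_sync_execution is only supported for Express state machines.
response = sfn.start_sync_execution(
    stateMachineArn="arn:aws:states:ap-northeast-1:ACCOUNT:stateMachine:MangaAssist-DynamicRouting",
    input=json.dumps({
        "message": "Which seinen titles are closest to Vinland Saga?",
        "sessionId": "sess-42",
        "userId": "user-7",
        "history": [],
    }),
)
result = json.loads(response["output"])   # response, model_used, session_id per ReturnResponse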
8. Cost Optimization Matrix
| Routing Path | Model | Avg Input Tokens | Avg Output Tokens | Cost/Query | Queries/Day | Daily Cost |
|---|---|---|---|---|---|---|
| Fast Haiku (greeting/FAQ) | Haiku | 200 | 150 | $0.000238 | 600,000 | $142.50 |
| Dynamic → Haiku | Haiku | 500 | 400 | $0.000625 | 250,000 | $156.25 |
| Dynamic → Sonnet | Sonnet | 800 | 600 | $0.011400 | 100,000 | $1,140.00 |
| Premium → Sonnet | Sonnet | 600 | 500 | $0.009300 | 50,000 | $465.00 |
| Totals | — | — | — | — | 1,000,000 | $1,903.75 |
Without routing (all traffic to Sonnet at an assumed average of $0.01 per query): 1M queries × $0.01 = $10,000/day.
Savings from intelligent routing: ≈81% ($1,903.75 vs. $10,000), as verified below.
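The per-query figures follow directly from the token prices; a quick arithmetic check in Python (the $0.01 all-Sonnet average is the stated assumption above, not a measured value):
def cost_per_query(input_tokens: int, output_tokens: int, in_price: float, out_price: float) -> float:
    """Prices are USD per 1M tokens, matching the Bedrock rates above."""
    return (input_tokens * in_price + output_tokens * out_price) / 1_000_000

fast_haiku = cost_per_query(200, 150, 0.25, 1.25)     # $0.0002375 per query
dyn_haiku = cost_per_query(500, 400, 0.25, 1.25)      # $0.000625
dyn_sonnet = cost_per_query(800, 600, 3.00, 15.00)    # $0.0114
prem_sonnet = cost_per_query(600, 500, 3.00, 15.00)   # $0.0093

daily = (fast_haiku * 600_000 + dyn_haiku * 250_000
         + dyn_sonnet * 100_000 + prem_sonnet * 50_000)
assert round(daily, 2) == 1903.75                     # matches the Totals row
print(f"savings vs all-Sonnet: {1 - daily / 10_000:.0%}")   # 81%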
9. Key Design Decisions Summary
| Decision | Choice | Reasoning |
|---|---|---|
| Default model | Haiku | 85% of queries are simple enough for Haiku at 12x lower cost |
| Static cache layer | Redis + local in-memory | Sub-millisecond routing for known intents — critical for 3s target |
| Dynamic router trigger | Ambiguous intents only | Avoid analysis overhead for clear-cut routing decisions |
| Step Functions for routing | Express workflows | Standard workflows too slow; Express gives < 100ms state transitions |
| Metric collection | Redis counters + DynamoDB | Redis for real-time (< 1ms reads); DynamoDB for durable history |
| API Gateway routing | VTL + Lambda authorizer | Edge-level routing avoids ECS round-trip for 60% of queries |
| Fallback chain | Sonnet → Haiku → static | Graceful degradation: if the premium model fails, still serve a response (see the sketch below) |
| Budget tracking | Per-user daily limits | Prevents runaway Sonnet costs from single-user abuse |
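The Sonnet → Haiku → static fallback chain can be sketched as a try-in-order loop around bedrock-runtime; the model IDs match this section, while the error handling is a simplified assumption:
import json
import boto3
from botocore.exceptions import ClientError

bedrock = boto3.client("bedrock-runtime", region_name="ap-northeast-1")

FALLBACK_CHAIN = [
    "anthropic.claude-3-sonnet-20240229-v1:0",
    "anthropic.claude-3-haiku-20240307-v1:0",
]
STATIC_FALLBACK = "Sorry, MangaAssist is busy right now. Please try again shortly."

def invoke_with_fallback(message: str, max_tokens: int = 1024) -> str:
    """Try each model in order; fall back to a static response if all fail."""
    for model_id in FALLBACK_CHAIN:
        try:
            resp = bedrock.invoke_model(
                modelId=model_id,
                contentType="application/json",
                accept="application/json",
                body=json.dumps({
                    "anthropic_version": "bedrock-2023-05-31",
                    "max_tokens": max_tokens,
                    "messages": [{"role": "user", "content": message}],
                }),
            )
            return json.loads(resp["body"].read())["content"][0]["text"]
        except ClientError:
            continue    # throttled or failed: degrade to the next model
    return STATIC_FALLBACK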
10. References
| Resource | Link |
|---|---|
| Amazon Bedrock Model Invocation | https://docs.aws.amazon.com/bedrock/latest/userguide/model-invocation.html |
| AWS Step Functions Express Workflows | https://docs.aws.amazon.com/step-functions/latest/dg/concepts-standard-vs-express.html |
| API Gateway Request Mapping Templates | https://docs.aws.amazon.com/apigateway/latest/developerguide/request-response-data-mappings.html |
| Claude 3 Model Card | https://docs.anthropic.com/en/docs/about-claude/models |
| API Gateway WebSocket APIs | https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-websocket-api.html |
| ElastiCache Best Practices | https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/BestPractices.html |
| DynamoDB Design Patterns | https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/bp-general-nosql-design.html |