# Model Coordination Systems — Scenarios and Runbooks
**MangaAssist context:** JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
## Skill Mapping
| Certification | Domain | Task | Skill | This File |
|---|---|---|---|---|
| AWS AIP-C01 | Domain 2 — Implementation & Integration | Task 2.1 — Select and implement FM integration approaches | Skill 2.1.4 — Design model coordination systems | Scenarios and runbooks for ensemble failures, router misclassification, aggregation timeout, cost explosion, and embedding drift |
**Skill scope:** Five production incident scenarios covering the failure modes of multi-model coordination in a high-traffic GenAI chatbot — conflicting ensemble recommendations, router misclassifying queries, aggregation timeout, cost explosion from unnecessary Sonnet routing, and embedding drift. Each with detection, root cause analysis, resolution steps, and prevention measures.
## Scenario Overview
| # | Scenario | Severity | Blast Radius | Detection Time | Resolution Time |
|---|---|---|---|---|---|
| 1 | Conflicting ensemble recommendations | P2 | 12% of recommendation queries | ~15 min (quality score drop) | 1-2 hours |
| 2 | Router misclassifying queries | P1 | 25% of all queries | ~5 min (latency spike) | 30-60 min |
| 3 | Aggregation timeout | P1 | 100% of ensemble queries | ~1 min (error rate spike) | 15-30 min |
| 4 | Cost explosion from unnecessary Sonnet routing | P2 | Budget (not quality) | ~2 hours (billing alert) | 30 min |
| 5 | Embedding drift degrading ensemble quality | P3 | Gradual — 5-10% quality loss | ~1 week (weekly drift check) | 4-8 hours |
## Scenario 1 — Conflicting Ensemble Recommendations

### Problem Statement
A customer asks: "I loved Death Note, what should I read next?"
The 3-model ensemble produces conflicting recommendations:

- Haiku suggests "Monster" (psychological thriller match)
- Sonnet suggests "Code Geass" (strategic genius theme match)
- RAG (OpenSearch) returns "Bakuman" (same author — Tsugumi Ohba)
The quality scorer rates all three above 0.70, and the response merger produces an incoherent answer that mixes all three recommendations without clear reasoning. The customer receives: "You might enjoy Monster, Code Geass, and Bakuman because they share themes of..." — a generic response that fails to explain why each was chosen, reducing trust and engagement.
**Business impact:** Customer satisfaction drops on recommendation queries. Conversion rate for "similar manga" recommendations falls from 8.2% to 5.1% over three days.
### Detection

```mermaid
graph TD
    A[Ensemble produces 3+<br/>divergent recommendations] --> B{Quality scores<br/>within 0.05 of each other?}
    B -->|Yes — no clear winner| C[Log: ensemble_conflict_detected]
    C --> D[CloudWatch Metric:<br/>ensemble.conflict_rate]
    D --> E{conflict_rate > 15%<br/>of ensemble queries?}
    E -->|Yes| F[CloudWatch Alarm:<br/>EnsembleConflictRateHigh]
    F --> G[PagerDuty Alert]
    B -->|No — clear winner| H[Normal aggregation<br/>no conflict]
    E -->|No| I[Within normal range<br/>some conflict is expected]
    style F fill:#e76f51,stroke:#f4a261,color:#fff
```
**Key metrics to monitor:**

| Metric | Normal | Alarm Threshold | Source |
|---|---|---|---|
| `ensemble.conflict_rate` | < 10% | > 15% of ensemble queries | Aggregator logs |
| `ensemble.quality_spread` | > 0.15 | < 0.05 spread among top 3 | Quality scorer |
| `recommendation.ctr` | > 7% | < 5% | Frontend analytics |
| `recommendation.feedback_negative` | < 8% | > 15% | User thumbs down |
### Root Cause Analysis

```mermaid
graph TD
    CONF[Ensemble Conflict<br/>Detected] --> Q1{Are models using<br/>different criteria?}
    Q1 -->|Yes| RC1[Root Cause: No shared<br/>recommendation framework<br/>Models optimize different signals]
    Q1 -->|No| Q2{Is RAG returning<br/>author-based matches<br/>instead of theme-based?}
    Q2 -->|Yes| RC2[Root Cause: OpenSearch index<br/>over-indexes author metadata<br/>vs thematic embeddings]
    Q2 -->|No| Q3{Is the merger strategy<br/>inappropriate for<br/>multi-option responses?}
    Q3 -->|Yes| RC3[Root Cause: Union merge<br/>concatenates without ranking<br/>needs selection or refinement]
    Q3 -->|No| RC4[Root Cause: Quality scorer<br/>not differentiating between<br/>complementary vs conflicting responses]
    RC1 --> FIX1[Add shared recommendation<br/>criteria in system prompt]
    RC2 --> FIX2[Rebalance OpenSearch<br/>scoring: theme > author]
    RC3 --> FIX3[Switch merger to REFINEMENT<br/>for recommendation queries]
    RC4 --> FIX4[Add conflict dimension<br/>to quality scorer]
    style CONF fill:#e76f51,stroke:#f4a261,color:#fff
    style RC3 fill:#264653,stroke:#2a9d8f,color:#fff
```
In this scenario: Root Cause 3 is primary. The union merge strategy blindly combines all responses when a refinement strategy (Sonnet synthesizes the best recommendation with supporting evidence) would produce a coherent single recommendation.
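The refinement path hands all candidate answers to Sonnet with a synthesis instruction. A minimal sketch of such a prompt builder — the `build_refinement_prompt` helper and its wording are illustrative assumptions, not the production merger:

```python
def build_refinement_prompt(query: str, candidates: list[dict]) -> str:
    """Fold candidate responses into one synthesis prompt for Sonnet.

    candidates: [{"model_id": str, "response": str}, ...]
    """
    numbered = "\n".join(
        f"{i + 1}. [{c['model_id']}] {c['response']}"
        for i, c in enumerate(candidates)
    )
    return (
        "A customer asked: " + query + "\n\n"
        "Candidate recommendations from different sources:\n"
        + numbered
        + "\n\nSynthesize ONE coherent recommendation: choose the best match, "
        "explain why it fits the customer's stated taste, and mention at most "
        "one alternative."
    )
```

The synthesized prompt is then sent as a single Sonnet call, replacing the blind concatenation the union merge produced.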
### Resolution

#### Step 1 — Switch merge strategy for recommendations (5 minutes)

```python
"""
Emergency: change merge strategy for recommendation queries
from UNION to REFINEMENT. Deploy via feature flag in Redis.
"""
import logging

import redis

logger = logging.getLogger(__name__)


def switch_recommendation_merge_strategy(
    redis_client: redis.Redis,
    new_strategy: str = "refinement",
) -> None:
    """
    Update merge strategy for recommendation queries via feature flag.
    ECS tasks read this flag at the start of each ensemble execution.
    """
    flag_key = "feature_flags:ensemble:recommendation_merge_strategy"
    redis_client.set(flag_key, new_strategy)
    logger.info("Switched recommendation merge strategy to: %s", new_strategy)


def get_merge_strategy(
    redis_client: redis.Redis,
    query_type: str,
) -> str:
    """Read current merge strategy from feature flag."""
    flag_key = f"feature_flags:ensemble:{query_type}_merge_strategy"
    strategy = redis_client.get(flag_key)
    if strategy:
        return strategy.decode()
    # Defaults per query type
    defaults = {
        "recommendation": "refinement",
        "factual": "selection",
        "creative": "summarization",
        "general": "selection",
    }
    return defaults.get(query_type, "selection")
```
#### Step 2 — Add recommendation framework to system prompt (30 minutes)

```python
"""
Update the ensemble system prompt to include a shared
recommendation framework so all models use consistent criteria.
"""
RECOMMENDATION_SYSTEM_PROMPT = """You are a manga recommendation expert for MangaAssist.

When recommending manga similar to a title the customer mentions, follow this framework:
1. PRIMARY: Match by genre and theme (e.g., psychological thriller, action, romance)
2. SECONDARY: Match by narrative style (episodic vs. arc-based, dark vs. lighthearted)
3. TERTIARY: Match by author or artist (same creator's other works)

Always explain WHY you are recommending each title:
- "Because you enjoyed the psychological tension in Death Note, you'll love Monster's slow-burn suspense."

Do NOT list multiple recommendations without clear reasoning for each.
Limit to 2-3 recommendations maximum, ranked by relevance.

Respond with JSON:
{
  "recommendations": [
    {"title": "...", "reason": "...", "match_type": "genre|style|author", "confidence": 0.0-1.0}
  ]
}
"""
```
#### Step 3 — Add conflict detection to the aggregator (1-2 hours)

```python
"""
Detect when ensemble responses conflict and trigger
refinement merge instead of union merge automatically.
"""
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class ConflictAnalysis:
    """Analysis of whether ensemble responses conflict."""
    is_conflicting: bool
    conflict_score: float  # 0.0 = full agreement, 1.0 = full disagreement
    recommended_strategy: str
    reason: str


class EnsembleConflictDetector:
    """
    Detects when ensemble responses are divergent rather than
    complementary, and recommends the appropriate merge strategy.
    """

    CONFLICT_THRESHOLD = 0.6

    def analyze_responses(
        self,
        responses: list[dict],
        quality_scores: list[float],
    ) -> ConflictAnalysis:
        """
        Determine if responses conflict based on quality score spread
        and response text similarity.

        responses: [{"model_id": str, "response": str, "entities": list}, ...]
        quality_scores: [float, ...] aligned with responses
        """
        if len(responses) < 2:
            return ConflictAnalysis(
                is_conflicting=False,
                conflict_score=0.0,
                recommended_strategy="selection",
                reason="Single response, no conflict possible",
            )
        # Check quality score spread
        score_spread = max(quality_scores) - min(quality_scores)
        scores_close = score_spread < 0.10
        # Check entity overlap (e.g., recommended titles)
        all_entities = [set(r.get("entities", [])) for r in responses]
        if all_entities and any(all_entities):
            union = set().union(*all_entities)
            intersection = (
                set.intersection(*all_entities) if all(all_entities) else set()
            )
            entity_overlap = len(intersection) / len(union) if union else 1.0
        else:
            entity_overlap = 0.5  # Unknown
        # Conflict score: high when scores are close but entities differ
        conflict_score = (1.0 - entity_overlap) * (1.0 if scores_close else 0.5)
        if conflict_score > self.CONFLICT_THRESHOLD:
            return ConflictAnalysis(
                is_conflicting=True,
                conflict_score=conflict_score,
                recommended_strategy="refinement",
                reason=f"Entity overlap={entity_overlap:.2f}, score_spread={score_spread:.2f}",
            )
        return ConflictAnalysis(
            is_conflicting=False,
            conflict_score=conflict_score,
            recommended_strategy="union" if scores_close else "selection",
            reason=f"Entity overlap={entity_overlap:.2f}, score_spread={score_spread:.2f}",
        )
```
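Worked through on Scenario 1's data, the detector's core arithmetic reduces to a Jaccard-style overlap damped by score spread. A standalone sketch of that formula (function name illustrative):

```python
def conflict_score(entity_sets: list[set], quality_scores: list[float]) -> float:
    """1.0 = equally-scored but disjoint answers; 0.0 = full agreement."""
    union = set().union(*entity_sets)
    intersection = set.intersection(*entity_sets) if all(entity_sets) else set()
    overlap = len(intersection) / len(union) if union else 1.0
    scores_close = (max(quality_scores) - min(quality_scores)) < 0.10
    return (1.0 - overlap) * (1.0 if scores_close else 0.5)


# Scenario 1: three disjoint picks, near-identical quality scores
score = conflict_score(
    [{"Monster"}, {"Code Geass"}, {"Bakuman"}],
    [0.74, 0.72, 0.71],
)
# score == 1.0 — above the 0.6 threshold, so the aggregator
# should switch to a refinement merge
```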
### Prevention

- Default merge strategy for recommendation queries should be `REFINEMENT`, not `UNION`
- Add a shared recommendation framework to all model system prompts
- Monitor `ensemble.conflict_rate` and alert at 15%
- Run a weekly A/B test comparing merge strategies to validate quality
## Scenario 2 — Router Misclassifying Queries

### Problem Statement
The Haiku-based complexity router starts misclassifying 25% of complex Japanese-language queries as "simple," sending them to Haiku-only processing instead of the Sonnet ensemble. Customers asking nuanced questions like "この漫画の文化的な背景を教えてください" ("Tell me about the cultural background of this manga") receive shallow, generic answers.
**Business impact:** Japanese-language CSAT drops from 4.2 to 3.4 out of 5. Customer escalation rate for JP queries increases 3x.
### Detection

```mermaid
graph TD
    A[Query arrives in Japanese] --> B[Router classifies<br/>complexity = 0.25 'simple']
    B --> C[Routes to Haiku Only]
    C --> D[Haiku gives shallow response]
    D --> E[Customer gives thumbs down]
    E --> F{Negative feedback rate<br/>for Haiku-only JP queries?}
    F -->|> 20%| G[CloudWatch Alarm:<br/>RouterMisclassificationJP]
    G --> H[PagerDuty Alert]
    F -->|< 20%| I[Normal range]
    style G fill:#e76f51,stroke:#f4a261,color:#fff
```
**Key metrics:**

| Metric | Normal | Alarm | Source |
|---|---|---|---|
| `router.jp_simple_pct` | 35-40% | > 55% | Router classification logs |
| `haiku_only.jp_negative_feedback` | < 12% | > 20% | User feedback |
| `router.complexity_score_mean_jp` | 0.55 | < 0.40 | Router logs |
| `escalation.jp_rate` | < 5% | > 12% | Support ticket system |
### Root Cause Analysis

```mermaid
graph TD
    MISS[Router Misclassification<br/>JP Queries] --> Q1{Is the classification<br/>prompt English-only?}
    Q1 -->|Yes| RC1[Root Cause: Classification prompt<br/>lacks Japanese examples<br/>and complexity criteria]
    Q1 -->|No| Q2{Did Haiku model<br/>version change recently?}
    Q2 -->|Yes| RC2[Root Cause: New Haiku version<br/>has different JP complexity<br/>assessment behavior]
    Q2 -->|No| Q3{Are JP queries<br/>longer on average?}
    Q3 -->|Yes| RC3[Root Cause: Token-count heuristic<br/>in router biased toward<br/>JP character density]
    Q3 -->|No| RC4[Root Cause: Complexity training<br/>data under-represents<br/>Japanese query patterns]
    RC1 --> FIX1[Add JP classification examples<br/>and bilingual complexity criteria]
    RC2 --> FIX2[Pin Haiku model version<br/>add regression test suite]
    RC3 --> FIX3[Normalize token count<br/>by language before routing]
    RC4 --> FIX4[Augment training data<br/>with JP complexity labels]
    style MISS fill:#e76f51,stroke:#f4a261,color:#fff
    style RC1 fill:#264653,stroke:#2a9d8f,color:#fff
```
### Resolution

#### Step 1 — Emergency: bypass router for JP queries (5 minutes)

```python
"""
Emergency override: route all Japanese queries to the Cascade
strategy instead of relying on the complexity router.
"""
import logging

import redis

logger = logging.getLogger(__name__)


def enable_jp_cascade_override(redis_client: redis.Redis) -> None:
    """
    Set feature flag to bypass complexity router for Japanese queries.
    All JP queries go through Cascade (Haiku first, Sonnet escalation).
    """
    redis_client.set("feature_flags:router:jp_override_strategy", "cascade")
    redis_client.set("feature_flags:router:jp_override_enabled", "true")
    logger.info("JP cascade override ENABLED — all JP queries bypass router")


def check_jp_override(
    redis_client: redis.Redis,
    detected_language: str,
) -> str | None:
    """Check if JP override is active before routing."""
    if detected_language != "ja":
        return None
    enabled = redis_client.get("feature_flags:router:jp_override_enabled")
    if enabled and enabled.decode() == "true":
        strategy = redis_client.get("feature_flags:router:jp_override_strategy")
        return strategy.decode() if strategy else "cascade"
    return None
```
#### Step 2 — Fix the classification prompt (30 minutes)

```python
"""
Updated classification prompt with bilingual complexity criteria
and Japanese-specific examples.
"""
BILINGUAL_COMPLEXITY_PROMPT = """Classify the complexity of the following customer query for a manga store chatbot.

Complexity levels:
- SIMPLE (0.0-0.3): Factual lookups, yes/no questions, greetings
  EN: "What's the price of One Piece vol 108?" / "Do you ship to Canada?"
  JP: "ワンピース108巻の値段は?" / "カナダに発送しますか?"
- MEDIUM (0.3-0.7): Comparisons, basic recommendations, multi-step questions
  EN: "What's the difference between the regular and deluxe edition?"
  JP: "通常版とデラックス版の違いは何ですか?"
- COMPLEX (0.7-1.0): Cultural context, thematic analysis, nuanced recommendations, opinions
  EN: "How does the cultural context of post-war Japan influence the themes in Barefoot Gen?"
  JP: "この漫画の文化的な背景を教えてください" / "鬼滅の刃のテーマ的な深みについて教えて"

IMPORTANT: Japanese queries that ask about 背景 (background), 文化 (culture), テーマ (theme),
意味 (meaning), 影響 (influence), or 比較 (comparison) are almost always COMPLEX.

Query: {query}

Respond with JSON only: {{"complexity": 0.0-1.0, "reasoning": "brief explanation"}}"""
```
#### Step 3 — Add language-aware complexity normalization (1 hour)

```python
"""
Normalize complexity scores by language to account for
systematic underestimation of Japanese query complexity.
"""
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)

# Calibration offsets learned from evaluation data
LANGUAGE_COMPLEXITY_OFFSETS = {
    "ja": 0.15,  # JP queries systematically underscored by 0.15
    "en": 0.0,   # English is baseline
    "zh": 0.10,  # Chinese queries slightly underscored
}

JP_COMPLEXITY_KEYWORDS = [
    "背景", "文化", "テーマ", "意味", "影響", "比較",
    "解釈", "象徴", "分析", "考察", "歴史", "社会",
]


@dataclass
class NormalizedComplexity:
    """Complexity score with language normalization applied."""
    raw_score: float
    language: str
    offset: float
    keyword_boost: float
    final_score: float


def normalize_complexity(
    raw_score: float,
    query: str,
    language: str,
) -> NormalizedComplexity:
    """Apply language-specific calibration offset and keyword boosting."""
    offset = LANGUAGE_COMPLEXITY_OFFSETS.get(language, 0.0)
    # Keyword boost for JP complex indicators
    keyword_boost = 0.0
    if language == "ja":
        matches = sum(1 for kw in JP_COMPLEXITY_KEYWORDS if kw in query)
        keyword_boost = min(0.20, matches * 0.08)
    final_score = min(1.0, raw_score + offset + keyword_boost)
    return NormalizedComplexity(
        raw_score=raw_score,
        language=language,
        offset=offset,
        keyword_boost=keyword_boost,
        final_score=round(final_score, 3),
    )
```
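Applied to the query from the problem statement, the calibration arithmetic works out as follows (raw score of 0.25 taken from the detection diagram; the offsets and cap mirror the values above):

```python
JP_OFFSET = 0.15  # language calibration offset for "ja"
KEYWORDS = ["背景", "文化", "テーマ", "意味", "影響", "比較"]

query = "この漫画の文化的な背景を教えてください"
raw_score = 0.25  # the router's original "simple" verdict

matches = sum(1 for kw in KEYWORDS if kw in query)  # 文化 and 背景 both hit
keyword_boost = min(0.20, matches * 0.08)           # capped boost: 0.16
final_score = min(1.0, raw_score + JP_OFFSET + keyword_boost)
# ≈ 0.56: the query is promoted out of Haiku-only into cascade territory
```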
### Prevention
- Include bilingual (JP + EN) examples in all classification prompts
- Add language-aware complexity normalization to the router
- Pin Haiku model version and run regression tests on version updates
- Monitor per-language routing distributions with CloudWatch dashboards
## Scenario 3 — Aggregation Timeout

### Problem Statement

The ensemble aggregator's `asyncio.gather` call times out at 3 seconds because Sonnet response latency spikes to 4.5 seconds during a Bedrock service degradation event. All ensemble queries fail with timeout errors, and customers see "Sorry, I'm having trouble processing your request."
**Business impact:** 100% of complex queries fail for 45 minutes. Error rate jumps from 0.1% to 38%. Customer abandonment rate spikes.
### Detection

```mermaid
graph TD
    A[asyncio.gather raises<br/>TimeoutError at 3s] --> B[ECS task logs<br/>aggregation_timeout error]
    B --> C[CloudWatch Metric:<br/>ensemble.timeout_count]
    C --> D{timeout_count > 50<br/>in 5 minutes?}
    D -->|Yes| E[CloudWatch Alarm:<br/>EnsembleTimeoutCritical]
    E --> F[PagerDuty P1 Alert]
    D -->|No| G[Sporadic timeout<br/>log and continue]
    H[Bedrock latency CloudWatch<br/>shows Sonnet p99 = 4.5s] --> I{Sonnet p99 > 3s?}
    I -->|Yes| J[CloudWatch Alarm:<br/>BedrockSonnetLatencyHigh]
    J --> F
    style E fill:#e76f51,stroke:#f4a261,color:#fff
    style J fill:#e76f51,stroke:#f4a261,color:#fff
```
**Key metrics:**

| Metric | Normal | Alarm | Source |
|---|---|---|---|
| `ensemble.timeout_count` | < 5/hr | > 50 in 5 min | ECS application logs |
| `ensemble.timeout_rate` | < 0.5% | > 5% | timeout / total ensemble |
| `bedrock.sonnet_p99_latency` | < 2s | > 3s | Bedrock CloudWatch |
| `customer.error_rate` | < 0.5% | > 5% | API Gateway 5xx |
### Root Cause Analysis

```mermaid
graph TD
    TO[Aggregation Timeout] --> Q1{Which model<br/>timed out?}
    Q1 -->|Sonnet| Q2{Is Bedrock reporting<br/>service issues?}
    Q2 -->|Yes| RC1[Root Cause: Bedrock Sonnet<br/>service degradation<br/>Latency spike across region]
    Q2 -->|No| Q3{Is the prompt<br/>unusually long?}
    Q3 -->|Yes| RC2[Root Cause: Prompt bloat<br/>RAG context too large<br/>exceeding token limit]
    Q3 -->|No| RC3[Root Cause: ECS task<br/>resource exhaustion<br/>CPU/memory pressure]
    Q1 -->|OpenSearch| RC4[Root Cause: OpenSearch<br/>collection throttling<br/>or index not available]
    Q1 -->|All models| RC5[Root Cause: Network issue<br/>VPC endpoint or NAT<br/>gateway congestion]
    RC1 --> FIX1[Enable cascade fallback<br/>Use Haiku-only when<br/>Sonnet is degraded]
    style TO fill:#e76f51,stroke:#f4a261,color:#fff
    style RC1 fill:#264653,stroke:#2a9d8f,color:#fff
```
### Resolution

#### Step 1 — Enable graceful degradation (2 minutes)

```python
"""
Emergency: enable graceful degradation mode.
When Sonnet times out, the aggregator proceeds with
whatever results are available (Haiku + RAG).
"""
import asyncio
import json
import logging
import time
from dataclasses import dataclass, field

import boto3

logger = logging.getLogger(__name__)


@dataclass
class PartialEnsembleResult:
    """Result when some models time out."""
    responses: list[dict] = field(default_factory=list)
    timed_out_models: list[str] = field(default_factory=list)
    is_partial: bool = False
    total_latency_ms: float = 0.0


class ResilientEnsembleAggregator:
    """
    Ensemble aggregator with per-model timeout and graceful degradation.

    Instead of asyncio.gather with a single timeout, each model
    gets its own timeout. If Sonnet times out, Haiku + RAG results
    are used with a quality warning flag.
    """

    MODEL_TIMEOUTS = {
        "anthropic.claude-3-sonnet-20240229-v1:0": 2.5,  # seconds
        "anthropic.claude-3-haiku-20240307-v1:0": 1.0,
        "opensearch_rag": 1.5,
    }
    MINIMUM_RESPONSES = 1  # At least 1 model must respond

    def __init__(self, bedrock_client=None):
        self.bedrock = bedrock_client or boto3.client(
            "bedrock-runtime", region_name="ap-northeast-1"
        )

    async def invoke_with_timeout(
        self,
        model_id: str,
        query: str,
        system_prompt: str,
    ) -> dict | None:
        """Invoke a single model with per-model timeout."""
        timeout = self.MODEL_TIMEOUTS.get(model_id, 2.0)
        try:
            return await asyncio.wait_for(
                self._invoke_model(model_id, query, system_prompt),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            logger.warning("Model %s timed out after %.1fs", model_id, timeout)
            return None
        except Exception as e:
            logger.error("Model %s failed: %s", model_id, str(e))
            return None

    async def aggregate_with_degradation(
        self,
        query: str,
        system_prompt: str,
        model_ids: list[str],
    ) -> PartialEnsembleResult:
        """
        Run ensemble with graceful degradation.
        Proceeds with partial results if some models fail.
        """
        start = time.monotonic()
        # Run all models concurrently; each coroutine enforces its own
        # timeout, so the slowest budget bounds total latency
        gathered = await asyncio.gather(
            *(
                self.invoke_with_timeout(model_id, query, system_prompt)
                for model_id in model_ids
            )
        )
        responses = []
        timed_out = []
        for model_id, result in zip(model_ids, gathered):
            if result is not None:
                responses.append(result)
            else:
                timed_out.append(model_id)
        elapsed = (time.monotonic() - start) * 1000
        if len(responses) < self.MINIMUM_RESPONSES:
            logger.error(
                "Ensemble critically degraded: %d/%d models responded",
                len(responses), len(model_ids),
            )
            # Last resort: Haiku call with extended timeout
            fallback = await self._emergency_haiku_fallback(query, system_prompt)
            if fallback:
                responses.append(fallback)
        return PartialEnsembleResult(
            responses=responses,
            timed_out_models=timed_out,
            is_partial=len(timed_out) > 0,
            total_latency_ms=elapsed,
        )

    async def _invoke_model(
        self, model_id: str, query: str, system_prompt: str
    ) -> dict:
        """Invoke a Bedrock model."""
        body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,
            "temperature": 0.2,
            "system": system_prompt,
            "messages": [{"role": "user", "content": query}],
        })
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: self.bedrock.invoke_model(
                modelId=model_id,
                body=body,
                contentType="application/json",
                accept="application/json",
            ),
        )
        result = json.loads(response["body"].read())
        return {
            "model_id": model_id,
            "response": result["content"][0]["text"],
            "usage": result.get("usage", {}),
        }

    async def _emergency_haiku_fallback(
        self, query: str, system_prompt: str
    ) -> dict | None:
        """Last-resort Haiku call with extended timeout."""
        try:
            return await asyncio.wait_for(
                self._invoke_model(
                    "anthropic.claude-3-haiku-20240307-v1:0",
                    query, system_prompt,
                ),
                timeout=5.0,
            )
        except Exception as e:
            logger.critical("Emergency Haiku fallback failed: %s", e)
            return None
```
#### Step 2 — Add circuit breaker for Sonnet (30 minutes)

```python
"""
Circuit breaker pattern: automatically disable Sonnet invocation
when its error/timeout rate exceeds threshold, and periodically
probe to check if it has recovered.
"""
import logging
import time
from dataclasses import dataclass
from enum import Enum

import redis

logger = logging.getLogger(__name__)


class CircuitState(str, Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Sonnet disabled
    HALF_OPEN = "half_open"  # Probing for recovery


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 10       # Failures before opening
    recovery_timeout_s: int = 60      # Seconds before probing
    probe_success_threshold: int = 3  # Successful probes to close


class ModelCircuitBreaker:
    """
    Redis-backed circuit breaker for Bedrock model invocations.

    States:
    - CLOSED: normal operation, counting failures
    - OPEN: model disabled, timer running
    - HALF_OPEN: probing with single requests
    """

    def __init__(
        self,
        redis_client: redis.Redis,
        model_id: str,
        config: CircuitBreakerConfig | None = None,
    ):
        self.redis = redis_client
        self.model_id = model_id
        self.config = config or CircuitBreakerConfig()
        self._prefix = f"circuit_breaker:{model_id}"

    def should_allow_request(self) -> bool:
        """Check if a request should be allowed through."""
        state = self._get_state()
        if state == CircuitState.OPEN:
            opened_at = float(self.redis.get(f"{self._prefix}:opened_at") or 0)
            if time.time() - opened_at > self.config.recovery_timeout_s:
                self._set_state(CircuitState.HALF_OPEN)
                logger.info(
                    "Circuit breaker %s: OPEN -> HALF_OPEN (probing)", self.model_id
                )
                return True
            return False
        # CLOSED and HALF_OPEN both allow requests through
        return True

    def record_success(self) -> None:
        """Record a successful invocation."""
        state = self._get_state()
        if state == CircuitState.HALF_OPEN:
            probe_count = self.redis.incr(f"{self._prefix}:probe_successes")
            if probe_count >= self.config.probe_success_threshold:
                self._set_state(CircuitState.CLOSED)
                self.redis.delete(f"{self._prefix}:failure_count")
                self.redis.delete(f"{self._prefix}:probe_successes")
                logger.info("Circuit breaker %s: HALF_OPEN -> CLOSED", self.model_id)
        elif state == CircuitState.CLOSED:
            # Reset failure count on success
            self.redis.set(f"{self._prefix}:failure_count", 0)

    def record_failure(self) -> None:
        """Record a failed invocation."""
        state = self._get_state()
        if state == CircuitState.HALF_OPEN:
            self._set_state(CircuitState.OPEN)
            self.redis.set(f"{self._prefix}:opened_at", str(time.time()))
            self.redis.delete(f"{self._prefix}:probe_successes")
            logger.warning(
                "Circuit breaker %s: HALF_OPEN -> OPEN (probe failed)", self.model_id
            )
        elif state == CircuitState.CLOSED:
            failures = self.redis.incr(f"{self._prefix}:failure_count")
            if failures >= self.config.failure_threshold:
                self._set_state(CircuitState.OPEN)
                self.redis.set(f"{self._prefix}:opened_at", str(time.time()))
                logger.warning(
                    "Circuit breaker %s: CLOSED -> OPEN (%d failures)",
                    self.model_id, failures,
                )

    def _get_state(self) -> CircuitState:
        raw = self.redis.get(f"{self._prefix}:state")
        if raw:
            return CircuitState(raw.decode())
        return CircuitState.CLOSED

    def _set_state(self, state: CircuitState) -> None:
        self.redis.set(f"{self._prefix}:state", state.value)
```
### Prevention

- Use per-model timeouts instead of a single `asyncio.gather` timeout
- Implement a circuit breaker for each Bedrock model endpoint
- Set up CloudWatch alarms on `bedrock.sonnet_p99_latency`
- Design for graceful degradation: always return something to the customer
## Scenario 4 — Cost Explosion from Unnecessary Sonnet Routing

### Problem Statement
After a deployment that lowered the complexity classification threshold from 0.7 to 0.5, 65% of queries are now routed to Sonnet or full ensemble (up from 25%). Daily Bedrock spend jumps from $5,000 to $14,200. The quality improvement is negligible — only +2% quality score for a 184% cost increase.
**Business impact:** Monthly budget blown in 11 days. Finance team flags the anomaly.
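The jump is consistent with back-of-envelope arithmetic using the per-route costs assumed by the cost guardrail later in this scenario (≈$0.0003 Haiku-only, ≈$0.003 cascade, ≈$0.018 full ensemble per query). The traffic-mix fractions below are illustrative assumptions, which is why the figures land near — not exactly on — the $5,000 → $14,200 billing numbers:

```python
DAILY_QUERIES = 1_000_000
COST = {"haiku": 0.0003, "cascade": 0.003, "ensemble": 0.018}  # $/query


def daily_cost(mix: dict[str, float]) -> float:
    """mix maps route name -> fraction of daily traffic sent down that route."""
    return sum(DAILY_QUERIES * frac * COST[route] for route, frac in mix.items())


before = daily_cost({"haiku": 0.60, "cascade": 0.15, "ensemble": 0.25})
after = daily_cost({"haiku": 0.20, "cascade": 0.15, "ensemble": 0.65})
increase_pct = (after - before) / before * 100
# before ≈ $5,130/day, after ≈ $12,210/day — the same order of jump
# that the billing alert caught
```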
### Detection

```mermaid
graph TD
    A[Bedrock daily spend<br/>computed hourly] --> B{Spend > 120%<br/>of daily budget?}
    B -->|Yes at hour 14| C[CloudWatch Alarm:<br/>BedrockBudgetExceeded]
    C --> D[SNS to finance +<br/>engineering on-call]
    E[Sonnet invocation count<br/>per hour] --> F{Sonnet queries > 60%<br/>of total?}
    F -->|Yes| G[CloudWatch Alarm:<br/>SonnetRoutingExcessive]
    G --> D
    H[Cost-per-query metric<br/>rolling average] --> I{Avg cost/query > $0.01?}
    I -->|Yes| J[CloudWatch Alarm:<br/>CostPerQueryHigh]
    J --> D
    style C fill:#e76f51,stroke:#f4a261,color:#fff
    style G fill:#e76f51,stroke:#f4a261,color:#fff
```
### Root Cause Analysis

```mermaid
graph TD
    COST[Cost Explosion<br/>$14,200/day] --> Q1{When did spend<br/>increase start?}
    Q1 --> Q2{Correlates with<br/>a deployment?}
    Q2 -->|Yes| Q3{What changed in<br/>the deployment?}
    Q3 --> RC1[Root Cause: Complexity threshold<br/>lowered from 0.7 to 0.5<br/>65% of queries now 'complex']
    Q2 -->|No| Q4{Did query volume<br/>increase?}
    Q4 -->|Yes| RC2[Root Cause: Viral traffic spike<br/>manga trending on social media]
    Q4 -->|No| RC3[Root Cause: Routing logic bug<br/>fallback always hits Sonnet]
    RC1 --> FIX1[Revert threshold to 0.7<br/>Add cost-impact review<br/>to deployment checklist]
    style COST fill:#e76f51,stroke:#f4a261,color:#fff
    style RC1 fill:#264653,stroke:#2a9d8f,color:#fff
```
### Resolution

#### Step 1 — Emergency: revert complexity threshold (2 minutes)

```python
"""
Emergency: restore complexity routing threshold via Redis
feature flag. No deployment required.
"""
import logging

import redis

logger = logging.getLogger(__name__)


def revert_complexity_threshold(
    redis_client: redis.Redis,
    threshold: float = 0.7,
) -> None:
    """Restore the complexity threshold for Sonnet routing."""
    redis_client.set("config:router:complexity_threshold_medium", str(0.3))
    redis_client.set("config:router:complexity_threshold_complex", str(threshold))
    logger.info("Reverted complexity threshold to: %.2f", threshold)


def enable_cost_ceiling(
    redis_client: redis.Redis,
    daily_ceiling_usd: float = 6000.0,
    per_query_ceiling_usd: float = 0.015,
) -> None:
    """
    Set hard cost ceilings. The aggregator checks these before
    selecting ensemble strategy.
    """
    redis_client.set("config:budget:daily_ceiling_usd", str(daily_ceiling_usd))
    redis_client.set("config:budget:per_query_ceiling_usd", str(per_query_ceiling_usd))
    logger.info(
        "Cost ceilings set: daily=$%.0f per_query=$%.3f",
        daily_ceiling_usd, per_query_ceiling_usd,
    )
```
Step 2 — Add cost-impact guardrail to deployment pipeline (1 hour)
"""
Pre-deployment cost impact estimator.
Runs the new configuration against a sample of historical queries
and computes projected daily cost. Blocks deployment if cost
increase exceeds threshold.
"""
import json
import logging
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class CostImpactEstimate:
"""Projected cost impact of a configuration change."""
current_daily_cost: float
projected_daily_cost: float
cost_change_pct: float
quality_change_pct: float
cost_per_quality_point: float
approved: bool
rejection_reason: str = ""
class DeploymentCostGuardrail:
"""
Estimates cost impact of routing configuration changes
before they are deployed to production.
"""
MAX_COST_INCREASE_PCT = 30.0 # Block if > 30% cost increase
MIN_QUALITY_IMPROVEMENT = 5.0 # Require >= 5% quality gain
def estimate_impact(
self,
current_config: dict,
new_config: dict,
sample_queries: list[dict],
) -> CostImpactEstimate:
"""
Compare cost and quality between current and new configurations.
sample_queries: [{"query": str, "complexity": float, "intent": str}, ...]
"""
current_cost = self._estimate_cost(current_config, sample_queries)
new_cost = self._estimate_cost(new_config, sample_queries)
cost_change = ((new_cost - current_cost) / current_cost * 100) if current_cost > 0 else 0
quality_change = self._estimate_quality_change(current_config, new_config, sample_queries)
# Cost per quality point
cost_delta = new_cost - current_cost
cpqp = cost_delta / quality_change if quality_change > 0 else float("inf")
approved = True
reason = ""
if cost_change > self.MAX_COST_INCREASE_PCT:
if quality_change < self.MIN_QUALITY_IMPROVEMENT:
approved = False
reason = (
f"Cost increase of {cost_change:.1f}% with only "
f"{quality_change:.1f}% quality improvement. "
f"Requires >= {self.MIN_QUALITY_IMPROVEMENT}% quality gain."
)
return CostImpactEstimate(
current_daily_cost=current_cost,
projected_daily_cost=new_cost,
cost_change_pct=round(cost_change, 1),
quality_change_pct=round(quality_change, 1),
cost_per_quality_point=round(cpqp, 2),
approved=approved,
rejection_reason=reason,
)
def _estimate_cost(
self, config: dict, queries: list[dict]
) -> float:
"""Estimate daily cost for a configuration across sample queries."""
threshold_complex = config.get("complexity_threshold_complex", 0.7)
threshold_medium = config.get("complexity_threshold_medium", 0.3)
haiku_count = 0
cascade_count = 0
ensemble_count = 0
for q in queries:
complexity = q["complexity"]
if complexity < threshold_medium:
haiku_count += 1
elif complexity < threshold_complex:
cascade_count += 1
else:
ensemble_count += 1
scale_factor = 1_000_000 / len(queries) # Scale to 1M/day
daily_cost = (
haiku_count * 0.0003 * scale_factor
+ cascade_count * 0.003 * scale_factor
+ ensemble_count * 0.018 * scale_factor
)
return round(daily_cost, 2)
def _estimate_quality_change(
self, current: dict, new: dict, queries: list[dict]
) -> float:
"""Estimate quality improvement percentage."""
# Simplified: more Sonnet usage = marginally higher quality
current_sonnet_pct = sum(
1 for q in queries
if q["complexity"] >= current.get("complexity_threshold_complex", 0.7)
) / len(queries)
new_sonnet_pct = sum(
1 for q in queries
if q["complexity"] >= new.get("complexity_threshold_complex", 0.7)
) / len(queries)
        # Diminishing returns: roughly 2% quality gain for every extra 10%
        # of traffic routed to Sonnet (i.e., 0.2% quality per 1% of traffic)
        delta_pct = (new_sonnet_pct - current_sonnet_pct) * 100
        quality_gain = delta_pct * 0.2
return round(quality_gain, 1)
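As a sanity check on the unit costs in `_estimate_cost`, the same arithmetic can be applied by hand to a hypothetical traffic mix at the 1M messages/day target. The 70/20/10 split below is illustrative, not a measured distribution:

```python
# Per-query unit costs from _estimate_cost (USD):
# Haiku-only $0.0003, cascade $0.003, full ensemble $0.018.
TIER_COST = {"haiku": 0.0003, "cascade": 0.003, "ensemble": 0.018}

def daily_cost(mix: dict[str, float], total_queries: int = 1_000_000) -> float:
    """Daily cost for a traffic mix given as tier -> fraction of queries."""
    return round(
        sum(TIER_COST[tier] * frac * total_queries for tier, frac in mix.items()),
        2,
    )

# Hypothetical baseline split: 70% simple, 20% cascade, 10% ensemble.
baseline = daily_cost({"haiku": 0.70, "cascade": 0.20, "ensemble": 0.10})  # $2,610/day

# Lowering the complex threshold so 20% of traffic hits the ensemble shifts
# only 10% of queries but adds well over $1,000/day.
shifted = daily_cost({"haiku": 0.65, "cascade": 0.15, "ensemble": 0.20})  # $4,245/day
```

This is why the guardrail compares cost delta against quality delta: a small threshold change moves a disproportionate share of spend into the most expensive tier.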
Prevention
- Add `DeploymentCostGuardrail` to the CI/CD pipeline as a mandatory check
- Set daily and per-query cost ceilings via Redis feature flags
- Create a CloudWatch dashboard showing cost-per-query trends
- Require cost-impact analysis for any routing threshold change
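The cost-ceiling bullet can be enforced with a lightweight request-time check backed by a Redis flag. The key name, ceiling values, and helper functions below are an illustrative sketch, not the production implementation:

```python
import json

# Hypothetical flag payload, stored at e.g. "flags:cost_guardrail" in Redis.
DEFAULT_FLAGS = {"daily_ceiling_usd": 5000.0, "per_query_ceiling_usd": 0.02}

def load_flags(redis_client=None, key: str = "flags:cost_guardrail") -> dict:
    """Read guardrail flags from Redis, falling back to safe defaults."""
    if redis_client is not None:
        raw = redis_client.get(key)
        if raw:
            return {**DEFAULT_FLAGS, **json.loads(raw)}
    return dict(DEFAULT_FLAGS)

def allow_tier(estimated_query_cost: float, spent_today: float, flags: dict) -> bool:
    """Deny an expensive routing tier once either ceiling would be exceeded."""
    if estimated_query_cost > flags["per_query_ceiling_usd"]:
        return False
    return spent_today + estimated_query_cost <= flags["daily_ceiling_usd"]
```

When `allow_tier` returns `False` the router can fall back to the cascade or Haiku-only tier instead of rejecting the query, so the ceiling degrades cost rather than availability.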
Scenario 5 — Embedding Drift Degrading Ensemble Quality
Problem Statement
Over four weeks, the Titan Embeddings V2 model on Bedrock received a silent minor-version update. The new embeddings differ slightly from the reference vectors used by the semantic cache and the OpenSearch RAG index. The ensemble's RAG component now returns less relevant results, and the semantic cache hit rate drops from 28% to 19% while the false-positive rate rises.
Business impact: gradual quality degradation. Customer satisfaction slowly declines, and cache cost savings shrink by $900/day. There is no single incident — the decline only shows up in the weekly metrics review.
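The $900/day figure is consistent with simple arithmetic on the hit-rate drop, assuming roughly 1M messages/day and an average avoided model-call cost of about $0.01 per cache hit (both assumed values, not measurements):

```python
def lost_cache_savings(daily_msgs: int, old_hit_rate: float, new_hit_rate: float,
                       avoided_cost_per_hit: float) -> float:
    """Daily savings lost when the semantic-cache hit rate declines."""
    lost_hits = daily_msgs * (old_hit_rate - new_hit_rate)
    return round(lost_hits * avoided_cost_per_hit, 2)

# 28% -> 19% at 1M messages/day: 90,000 extra model calls per day.
lost = lost_cache_savings(1_000_000, 0.28, 0.19, 0.01)  # ≈ $900/day
```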
Detection
graph TD
A[Weekly Drift Check<br/>EventBridge scheduled] --> B[Lambda: embed 1000<br/>reference queries]
B --> C[Compare to stored<br/>reference vectors]
C --> D{Mean cosine drift<br/>> 0.05?}
D -->|Yes| E[CloudWatch Alarm:<br/>EmbeddingDriftDetected]
E --> F[SNS to ML team]
D -->|No| G[Drift within<br/>normal range]
H[Cache hit rate<br/>weekly trend] --> I{Hit rate dropped<br/>> 5 percentage points?}
I -->|Yes| J[CloudWatch Alarm:<br/>CacheHitRateDecline]
J --> F
style E fill:#e76f51,stroke:#f4a261,color:#fff
style J fill:#e76f51,stroke:#f4a261,color:#fff
Key metrics:
| Metric | Normal | Alarm | Source |
|---|---|---|---|
| `embedding.drift_mean_cosine` | < 0.03 | > 0.05 | Weekly Lambda check |
| `cache.hit_rate_l2` | > 25% | < 20% | Redis cache metrics |
| `cache.false_positive_rate` | < 0.5% | > 1.0% | User feedback + logs |
| `rag.retrieval_relevance_score` | > 0.80 | < 0.72 | Offline evaluation |
Root Cause Analysis
graph TD
DRIFT[Embedding Drift<br/>Detected] --> Q1{Was there a Bedrock<br/>model version update?}
Q1 -->|Yes - minor bump| RC1[Root Cause: Titan Embeddings<br/>minor version update changed<br/>vector space characteristics]
Q1 -->|No| Q2{Did query distribution<br/>change significantly?}
Q2 -->|Yes| RC2[Root Cause: New manga titles<br/>or seasonal trends shifted<br/>query distribution]
Q2 -->|No| Q3{Are reference vectors<br/>too old?}
Q3 -->|Yes| RC3[Root Cause: Reference set<br/>no longer representative<br/>of current query patterns]
RC1 --> FIX1[Re-embed all cache entries<br/>and re-index OpenSearch<br/>with current model]
style DRIFT fill:#e76f51,stroke:#f4a261,color:#fff
style RC1 fill:#264653,stroke:#2a9d8f,color:#fff
Resolution
Step 1 — Confirm drift severity (15 minutes)
"""
Run a targeted drift check comparing current embeddings
against stored reference vectors for critical query categories.
"""
import json
import logging
import math
import boto3
logger = logging.getLogger(__name__)
def run_targeted_drift_check(
bedrock_client,
s3_client,
reference_bucket: str = "mangaassist-ml-artifacts",
reference_key: str = "embeddings/reference_vectors.json",
model_id: str = "amazon.titan-embed-text-v2:0",
) -> dict:
"""
Quick drift check on 100 critical reference queries.
Returns drift statistics by category.
"""
# Load reference vectors
obj = s3_client.get_object(Bucket=reference_bucket, Key=reference_key)
references = json.loads(obj["Body"].read())
category_drifts: dict[str, list[float]] = {}
for ref in references[:100]: # Quick check on first 100
text = ref["text"]
category = ref.get("category", "general")
ref_vec = ref["vector"]
# Generate current embedding
body = json.dumps({
"inputText": text,
"dimensions": 1024,
"normalize": True,
})
response = bedrock_client.invoke_model(
modelId=model_id,
body=body,
contentType="application/json",
accept="application/json",
)
current_vec = json.loads(response["body"].read())["embedding"]
# Cosine distance
dot = sum(a * b for a, b in zip(ref_vec, current_vec))
norm_a = math.sqrt(sum(a * a for a in ref_vec))
norm_b = math.sqrt(sum(b * b for b in current_vec))
distance = 1.0 - (dot / (norm_a * norm_b)) if norm_a and norm_b else 1.0
category_drifts.setdefault(category, []).append(distance)
results = {}
for cat, drifts in category_drifts.items():
results[cat] = {
"mean_drift": round(sum(drifts) / len(drifts), 6),
"max_drift": round(max(drifts), 6),
"sample_count": len(drifts),
}
logger.info("Drift check results: %s", json.dumps(results, indent=2))
return results
Step 2 — Re-index OpenSearch and rebuild cache (4-8 hours)
"""
Full re-embedding pipeline: re-index OpenSearch vectors and
rebuild the semantic cache with current Titan Embeddings.
"""
import json
import logging
import time
from dataclasses import dataclass
import boto3
logger = logging.getLogger(__name__)
@dataclass
class ReindexProgress:
"""Progress tracking for re-indexing."""
total_documents: int = 0
processed: int = 0
failed: int = 0
elapsed_seconds: float = 0.0
@property
def pct_complete(self) -> float:
return (self.processed / self.total_documents * 100) if self.total_documents else 0.0
class EmbeddingReindexer:
"""
Re-embeds all documents in OpenSearch and rebuilds the
semantic cache after an embedding drift event.
Runs as a Step Functions workflow to handle the multi-hour
re-indexing process with checkpointing.
"""
BATCH_SIZE = 100
def __init__(
self,
bedrock_client=None,
opensearch_client=None,
redis_client=None,
model_id: str = "amazon.titan-embed-text-v2:0",
):
self.bedrock = bedrock_client or boto3.client(
"bedrock-runtime", region_name="ap-northeast-1"
)
self.opensearch = opensearch_client
self.redis = redis_client
self.model_id = model_id
def reindex_opensearch_batch(
self, documents: list[dict], index_name: str
) -> int:
"""
Re-embed and update a batch of OpenSearch documents.
Returns count of successfully updated documents.
"""
updated = 0
for doc in documents:
try:
# Generate new embedding
body = json.dumps({
"inputText": doc["text"],
"dimensions": 1024,
"normalize": True,
})
response = self.bedrock.invoke_model(
modelId=self.model_id,
body=body,
contentType="application/json",
accept="application/json",
)
new_vector = json.loads(response["body"].read())["embedding"]
# Update OpenSearch document
self.opensearch.update(
index=index_name,
id=doc["id"],
body={"doc": {"embedding": new_vector}},
)
updated += 1
except Exception as e:
logger.error("Failed to re-embed doc %s: %s", doc["id"], e)
return updated
def rebuild_semantic_cache(self) -> int:
"""
Flush and rebuild the semantic cache with current embeddings.
"""
if not self.redis:
return 0
# Get all cache entries
cursor = 0
rebuilt = 0
while True:
cursor, keys = self.redis.scan(cursor, match="sc:*", count=500)
for key in keys:
try:
query_text = self.redis.hget(key, "query_text")
if not query_text:
continue
# Re-embed
body = json.dumps({
"inputText": query_text.decode(),
"dimensions": 1024,
"normalize": True,
})
response = self.bedrock.invoke_model(
modelId=self.model_id,
body=body,
contentType="application/json",
accept="application/json",
)
new_vector = json.loads(response["body"].read())["embedding"]
# Update the embedding field in Redis
import struct
blob = struct.pack(f"{len(new_vector)}f", *new_vector)
self.redis.hset(key, "embedding", blob)
rebuilt += 1
except Exception as e:
logger.error("Failed to rebuild cache key %s: %s", key, e)
if cursor == 0:
break
logger.info("Rebuilt %d semantic cache entries", rebuilt)
return rebuilt
def update_reference_vectors(
self, s3_client, reference_queries: list[dict],
bucket: str = "mangaassist-ml-artifacts",
key: str = "embeddings/reference_vectors.json",
) -> None:
"""Store new reference vectors after re-indexing."""
new_references = []
for rq in reference_queries:
body = json.dumps({
"inputText": rq["text"],
"dimensions": 1024,
"normalize": True,
})
response = self.bedrock.invoke_model(
modelId=self.model_id,
body=body,
contentType="application/json",
accept="application/json",
)
vector = json.loads(response["body"].read())["embedding"]
new_references.append({
"text": rq["text"],
"category": rq.get("category", "general"),
"vector": vector,
"timestamp": int(time.time()),
})
s3_client.put_object(
Bucket=bucket,
Key=key,
Body=json.dumps(new_references),
ContentType="application/json",
)
logger.info("Updated %d reference vectors in S3", len(new_references))
Prevention
- Run weekly embedding drift checks via EventBridge + Lambda
- Pin Titan Embeddings model version where possible
- Alert on cache hit rate decline (leading indicator of drift)
- Automate re-indexing pipeline to run when drift exceeds threshold
- Store embedding model version in every cache entry and OpenSearch document
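The last prevention point can be as simple as stamping a model-version field on every write and treating mismatched entries as cache misses at read time. The field name below is an illustrative choice:

```python
# Pin the embedding model version and bump it deliberately during re-indexing.
EMBED_MODEL_VERSION = "amazon.titan-embed-text-v2:0"

def tag_cache_entry(entry: dict, model_version: str = EMBED_MODEL_VERSION) -> dict:
    """Attach the embedding model version to a cache entry before writing."""
    return {**entry, "embed_model_version": model_version}

def is_stale(entry: dict, model_version: str = EMBED_MODEL_VERSION) -> bool:
    """Entries embedded under a different (or missing) model version are stale."""
    return entry.get("embed_model_version") != model_version
```

On a version bump, stale entries are simply skipped (a miss) instead of returning vectors from an incompatible embedding space, which turns a silent drift problem into an explicit, observable re-embedding backlog.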
Cross-Scenario Decision Tree
graph TD
START[Model Coordination<br/>Incident Detected] --> TYPE{What type of<br/>incident?}
TYPE -->|Quality degradation| QUAL{Sudden or<br/>gradual?}
QUAL -->|Sudden| CONFLICT[Scenario 1: Ensemble Conflict<br/>or Scenario 2: Router Misclass]
QUAL -->|Gradual| DRIFT[Scenario 5: Embedding Drift]
TYPE -->|Errors/timeouts| ERR{Which component<br/>failing?}
ERR -->|Aggregation layer| TIMEOUT[Scenario 3: Aggregation Timeout]
ERR -->|Single model| CB[Check Circuit Breaker<br/>See Scenario 3 resolution]
TYPE -->|Cost anomaly| COST{Quality improved<br/>proportionally?}
COST -->|No| EXPLOSION[Scenario 4: Cost Explosion]
COST -->|Yes| EXPECTED[Expected cost increase<br/>validate with guardrail]
CONFLICT --> FIX_MERGE[Fix: Switch merge strategy<br/>+ add conflict detection]
DRIFT --> FIX_REINDEX[Fix: Re-index + rebuild cache<br/>+ update reference vectors]
TIMEOUT --> FIX_DEGRADE[Fix: Graceful degradation<br/>+ circuit breaker]
EXPLOSION --> FIX_REVERT[Fix: Revert threshold<br/>+ add cost guardrail]
style START fill:#264653,stroke:#2a9d8f,color:#fff
style TIMEOUT fill:#e76f51,stroke:#f4a261,color:#fff
style EXPLOSION fill:#e76f51,stroke:#f4a261,color:#fff
Runbook Summary Table
| Scenario | First Response (< 5 min) | Root Fix (< 4 hr) | Prevention |
|---|---|---|---|
| 1. Ensemble Conflict | Switch merge strategy to REFINEMENT via Redis flag | Add conflict detector + shared recommendation framework | Monitor conflict_rate, default REFINEMENT for recommendations |
| 2. Router Misclassification | Enable JP cascade override via Redis flag | Fix classification prompt + add language normalization | Bilingual prompts, pin model version, regression tests |
| 3. Aggregation Timeout | Enable graceful degradation (partial results) | Add per-model circuit breaker | Per-model timeouts, Bedrock latency alarms, fallback paths |
| 4. Cost Explosion | Revert complexity threshold via Redis config | Add deployment cost guardrail to CI/CD | Cost ceilings, cost-per-query dashboard, mandatory cost review |
| 5. Embedding Drift | N/A (gradual, detected weekly) | Re-index OpenSearch + rebuild cache + update references | Weekly drift checks, cache hit rate monitoring, version pinning |
Key Takeaways
- Feature flags in Redis enable instant mitigation — all five scenarios use Redis-based feature flags for an immediate response without a code deployment, cutting MTTR from hours to minutes.
- Graceful degradation is mandatory for ensemble systems — when one model in the ensemble fails, the system must still return a response using the available models rather than failing entirely.
- Circuit breakers prevent cascade failures — when Sonnet is degraded, the circuit breaker automatically routes to Haiku, preventing timeout propagation across the entire system.
- Cost guardrails belong in the deployment pipeline — routing threshold changes should require a cost-impact analysis before reaching production, not after finance discovers a budget overshoot.
- Embedding drift is the silent quality killer — unlike outages, which are immediately visible, embedding drift degrades quality gradually over weeks; only proactive monitoring catches it.
- Language-aware routing prevents systematic bias — English-trained classifiers systematically score Japanese queries too low on complexity; language-specific calibration offsets correct this.
- Conflict detection improves merge quality — when ensemble models disagree, the system should detect the conflict and switch to a refinement strategy rather than naively concatenating divergent responses.
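As a minimal illustration of the circuit-breaker takeaway, a consecutive-failure breaker that sheds load from Sonnet to Haiku might look like the sketch below; the thresholds, cooldown, and model names are illustrative:

```python
import time

class ModelCircuitBreaker:
    """Open after N consecutive failures; route to the fallback while open."""

    def __init__(self, failure_threshold: int = 5, cooldown_seconds: float = 30.0):
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self.consecutive_failures = 0
        self.opened_at: float | None = None

    def choose_model(self, primary: str = "claude-3-sonnet",
                     fallback: str = "claude-3-haiku") -> str:
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.cooldown_seconds:
                return fallback          # breaker open: shed load to the fallback
            self.opened_at = None        # cooldown elapsed: half-open, retry primary
            self.consecutive_failures = 0
        return primary

    def record(self, success: bool) -> None:
        """Record the outcome of a call to the primary model."""
        if success:
            self.consecutive_failures = 0
            return
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.failure_threshold:
            self.opened_at = time.monotonic()
```

In production, the breaker state would live in Redis so every Fargate task observes the same open/closed decision rather than each task discovering the degradation independently.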