
Model Coordination Systems — Scenarios and Runbooks

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Certification | Domain | Task | Skill | This File |
| --- | --- | --- | --- | --- |
| AWS AIP-C01 | Domain 2 — Implementation & Integration | Task 2.1 — Select and implement FM integration approaches | Skill 2.1.4 — Design model coordination systems | Scenarios and runbooks for ensemble failures, router misclassification, aggregation timeout, cost explosion, and embedding drift |

Skill scope: Five production incident scenarios covering the failure modes of multi-model coordination in a high-traffic GenAI chatbot — conflicting ensemble recommendations, router misclassification of queries, aggregation timeout, cost explosion from unnecessary Sonnet routing, and embedding drift. Each scenario includes detection, root cause analysis, resolution steps, and prevention measures.


Scenario Overview

| # | Scenario | Severity | Blast Radius | Detection Time | Resolution Time |
| --- | --- | --- | --- | --- | --- |
| 1 | Conflicting ensemble recommendations | P2 | 12% of recommendation queries | ~15 min (quality score drop) | 1-2 hours |
| 2 | Router misclassifying queries | P1 | 25% of all queries | ~5 min (latency spike) | 30-60 min |
| 3 | Aggregation timeout | P1 | 100% of ensemble queries | ~1 min (error rate spike) | 15-30 min |
| 4 | Cost explosion from unnecessary Sonnet routing | P2 | Budget (not quality) | ~2 hours (billing alert) | 30 min |
| 5 | Embedding drift degrading ensemble quality | P3 | Gradual — 5-10% quality loss | ~1 week (weekly drift check) | 4-8 hours |

Scenario 1 — Conflicting Ensemble Recommendations

Problem Statement

A customer asks: "I loved Death Note, what should I read next?"

The 3-model ensemble produces conflicting recommendations:

  • Haiku suggests "Monster" (psychological thriller match)
  • Sonnet suggests "Code Geass" (strategic genius theme match)
  • RAG (OpenSearch) returns "Bakuman" (same author — Tsugumi Ohba)

The quality scorer rates all three above 0.70, and the response merger produces an incoherent answer that mixes all three recommendations without clear reasoning. The customer receives: "You might enjoy Monster, Code Geass, and Bakuman because they share themes of..." — a generic response that fails to explain why each was chosen, reducing trust and engagement.

Business impact: Customer satisfaction drops on recommendation queries. Conversion rate for "similar manga" recommendations falls from 8.2% to 5.1% over three days.

Detection

graph TD
    A[Ensemble produces 3+<br/>divergent recommendations] --> B{Quality scores<br/>within 0.05 of each other?}
    B -->|Yes — no clear winner| C[Log: ensemble_conflict_detected]
    C --> D[CloudWatch Metric:<br/>ensemble.conflict_rate]
    D --> E{conflict_rate > 15%<br/>of ensemble queries?}
    E -->|Yes| F[CloudWatch Alarm:<br/>EnsembleConflictRateHigh]
    F --> G[PagerDuty Alert]

    B -->|No — clear winner| H[Normal aggregation<br/>no conflict]

    E -->|No| I[Within normal range<br/>some conflict is expected]

    style F fill:#e76f51,stroke:#f4a261,color:#fff

Key metrics to monitor:

| Metric | Normal | Alarm Threshold | Source |
| --- | --- | --- | --- |
| ensemble.conflict_rate | < 10% | > 15% of ensemble queries | Aggregator logs |
| ensemble.quality_spread | 0.05-0.15 | < 0.05 spread among top 3 | Quality scorer |
| recommendation.ctr | > 7% | < 5% | Frontend analytics |
| recommendation.feedback_negative | < 8% | > 15% | User thumbs down |

Note that quality_spread alarms low, not high: per the detection flow above, near-identical scores across divergent responses mean there is no clear winner.
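
The conflict_rate metric does not exist until the aggregator emits it. A minimal sketch of that emission, assuming a custom MangaAssist/Ensemble CloudWatch namespace (the namespace and metric name are assumptions, not confirmed configuration):

"""
Sketch: emit one 0/1 conflict datapoint per ensemble query.
Averaging these datapoints over the alarm's evaluation window
yields the conflict rate. Namespace/metric name are assumptions.
"""
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="ap-northeast-1")


def emit_conflict_datapoint(is_conflict: bool) -> None:
    """Publish 1.0 for a conflicting ensemble result, 0.0 otherwise."""
    cloudwatch.put_metric_data(
        Namespace="MangaAssist/Ensemble",  # assumed namespace
        MetricData=[{
            "MetricName": "conflict_rate",
            "Value": 1.0 if is_conflict else 0.0,
            "Unit": "Count",
        }],
    )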

Root Cause Analysis

graph TD
    CONF[Ensemble Conflict<br/>Detected] --> Q1{Are models using<br/>different criteria?}
    Q1 -->|Yes| RC1[Root Cause: No shared<br/>recommendation framework<br/>Models optimize different signals]
    Q1 -->|No| Q2{Is RAG returning<br/>author-based matches<br/>instead of theme-based?}
    Q2 -->|Yes| RC2[Root Cause: OpenSearch index<br/>over-indexes author metadata<br/>vs thematic embeddings]
    Q2 -->|No| Q3{Is the merger strategy<br/>inappropriate for<br/>multi-option responses?}
    Q3 -->|Yes| RC3[Root Cause: Union merge<br/>concatenates without ranking<br/>needs selection or refinement]
    Q3 -->|No| RC4[Root Cause: Quality scorer<br/>not differentiating between<br/>complementary vs conflicting responses]

    RC1 --> FIX1[Add shared recommendation<br/>criteria in system prompt]
    RC2 --> FIX2[Rebalance OpenSearch<br/>scoring: theme > author]
    RC3 --> FIX3[Switch merger to REFINEMENT<br/>for recommendation queries]
    RC4 --> FIX4[Add conflict dimension<br/>to quality scorer]

    style CONF fill:#e76f51,stroke:#f4a261,color:#fff
    style RC3 fill:#264653,stroke:#2a9d8f,color:#fff

In this scenario: Root Cause 3 is primary. The union merge strategy blindly combines all responses when a refinement strategy (Sonnet synthesizes the best recommendation with supporting evidence) would produce a coherent single recommendation.

Resolution

Step 1 — Switch merge strategy for recommendations (5 minutes)

"""
Emergency: change merge strategy for recommendation queries
from UNION to REFINEMENT. Deploy via feature flag in Redis.
"""
import redis
import json
import logging

logger = logging.getLogger(__name__)


def switch_recommendation_merge_strategy(
    redis_client: redis.Redis,
    new_strategy: str = "refinement",
) -> None:
    """
    Update merge strategy for recommendation queries via feature flag.
    ECS tasks read this flag at the start of each ensemble execution.
    """
    flag_key = "feature_flags:ensemble:recommendation_merge_strategy"
    redis_client.set(flag_key, new_strategy)
    logger.info("Switched recommendation merge strategy to: %s", new_strategy)


def get_merge_strategy(
    redis_client: redis.Redis,
    query_type: str,
) -> str:
    """Read current merge strategy from feature flag."""
    flag_key = f"feature_flags:ensemble:{query_type}_merge_strategy"
    strategy = redis_client.get(flag_key)
    if strategy:
        return strategy.decode()
    # Defaults per query type
    defaults = {
        "recommendation": "refinement",
        "factual": "selection",
        "creative": "summarization",
        "general": "selection",
    }
    return defaults.get(query_type, "selection")

Step 2 — Add recommendation framework to system prompt (30 minutes)

"""
Update the ensemble system prompt to include a shared
recommendation framework so all models use consistent criteria.
"""

RECOMMENDATION_SYSTEM_PROMPT = """You are a manga recommendation expert for MangaAssist.

When recommending manga similar to a title the customer mentions, follow this framework:
1. PRIMARY: Match by genre and theme (e.g., psychological thriller, action, romance)
2. SECONDARY: Match by narrative style (episodic vs. arc-based, dark vs. lighthearted)
3. TERTIARY: Match by author or artist (same creator's other works)

Always explain WHY you are recommending each title:
- "Because you enjoyed the psychological tension in Death Note, you'll love Monster's slow-burn suspense."

Do NOT list multiple recommendations without clear reasoning for each.
Limit to 2-3 recommendations maximum, ranked by relevance.

Respond with JSON:
{
    "recommendations": [
        {"title": "...", "reason": "...", "match_type": "genre|style|author", "confidence": 0.0-1.0}
    ]
}
"""

Step 3 — Add conflict detection to the aggregator (1-2 hours)

"""
Detect when ensemble responses conflict and trigger
refinement merge instead of union merge automatically.
"""

import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class ConflictAnalysis:
    """Analysis of whether ensemble responses conflict."""
    is_conflicting: bool
    conflict_score: float  # 0.0 = full agreement, 1.0 = full disagreement
    recommended_strategy: str
    reason: str


class EnsembleConflictDetector:
    """
    Detects when ensemble responses are divergent rather than
    complementary, and recommends the appropriate merge strategy.
    """

    CONFLICT_THRESHOLD = 0.6

    def analyze_responses(
        self,
        responses: list[dict],
        quality_scores: list[float],
    ) -> ConflictAnalysis:
        """
        Determine if responses conflict based on quality score spread
        and response text similarity.

        responses: [{"model_id": str, "response": str, "entities": list}, ...]
        quality_scores: [float, ...] aligned with responses
        """
        if len(responses) < 2:
            return ConflictAnalysis(
                is_conflicting=False,
                conflict_score=0.0,
                recommended_strategy="selection",
                reason="Single response, no conflict possible",
            )

        # Check quality score spread
        score_spread = max(quality_scores) - min(quality_scores)
        scores_close = score_spread < 0.10

        # Check entity overlap (e.g., recommended titles)
        all_entities = [set(r.get("entities", [])) for r in responses]
        if all_entities and any(all_entities):
            union = set().union(*all_entities)
            intersection = set.intersection(*all_entities) if all(all_entities) else set()
            entity_overlap = len(intersection) / len(union) if union else 1.0
        else:
            entity_overlap = 0.5  # Unknown

        # Conflict score: high when scores are close but entities differ
        conflict_score = (1.0 - entity_overlap) * (1.0 if scores_close else 0.5)

        if conflict_score > self.CONFLICT_THRESHOLD:
            return ConflictAnalysis(
                is_conflicting=True,
                conflict_score=conflict_score,
                recommended_strategy="refinement",
                reason=f"Entity overlap={entity_overlap:.2f}, score_spread={score_spread:.2f}",
            )

        return ConflictAnalysis(
            is_conflicting=False,
            conflict_score=conflict_score,
            recommended_strategy="union" if scores_close else "selection",
            reason=f"Entity overlap={entity_overlap:.2f}, score_spread={score_spread:.2f}",
        )
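
Applied to the incident above (three near-identical scores, zero overlap among recommended titles), the detector flags the conflict and recommends refinement:

detector = EnsembleConflictDetector()

analysis = detector.analyze_responses(
    responses=[
        {"model_id": "haiku", "response": "...", "entities": ["Monster"]},
        {"model_id": "sonnet", "response": "...", "entities": ["Code Geass"]},
        {"model_id": "opensearch_rag", "response": "...", "entities": ["Bakuman"]},
    ],
    quality_scores=[0.74, 0.72, 0.71],
)
# entity_overlap = 0/3 = 0.0, score_spread = 0.03 -> conflict_score = 1.0
assert analysis.is_conflicting
assert analysis.recommended_strategy == "refinement"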

Prevention

  • Default merge strategy for recommendation queries should be REFINEMENT, not UNION
  • Add shared recommendation framework to all model system prompts
  • Monitor ensemble.conflict_rate and alert at 15%
  • Run weekly A/B test comparing merge strategies to validate quality

Scenario 2 — Router Misclassifying Queries

Problem Statement

The Haiku-based complexity router starts misclassifying 25% of complex Japanese-language queries as "simple," sending them to Haiku-only processing instead of the Sonnet ensemble. Customers asking nuanced questions like "この漫画の文化的な背景を教えてください" ("Tell me about the cultural background of this manga") receive shallow, generic answers.

Business impact: Japanese-language CSAT drops from 4.2 to 3.4 out of 5. Customer escalation rate for JP queries increases 3x.

Detection

graph TD
    A[Query arrives in Japanese] --> B[Router classifies<br/>complexity = 0.25 'simple']
    B --> C[Routes to Haiku Only]
    C --> D[Haiku gives shallow response]
    D --> E[Customer gives thumbs down]
    E --> F{Negative feedback rate<br/>for Haiku-only JP queries?}
    F -->|> 20%| G[CloudWatch Alarm:<br/>RouterMisclassificationJP]
    G --> H[PagerDuty Alert]

    F -->|< 20%| I[Normal range]

    style G fill:#e76f51,stroke:#f4a261,color:#fff

Key metrics:

| Metric | Normal | Alarm | Source |
| --- | --- | --- | --- |
| router.jp_simple_pct | 35-40% | > 55% | Router classification logs |
| haiku_only.jp_negative_feedback | < 12% | > 20% | User feedback |
| router.complexity_score_mean_jp | 0.55 | < 0.40 | Router logs |
| escalation.jp_rate | < 5% | > 12% | Support ticket system |

Root Cause Analysis

graph TD
    MISS[Router Misclassification<br/>JP Queries] --> Q1{Is the classification<br/>prompt English-only?}
    Q1 -->|Yes| RC1[Root Cause: Classification prompt<br/>lacks Japanese examples<br/>and complexity criteria]
    Q1 -->|No| Q2{Did Haiku model<br/>version change recently?}
    Q2 -->|Yes| RC2[Root Cause: New Haiku version<br/>has different JP complexity<br/>assessment behavior]
    Q2 -->|No| Q3{Are JP queries<br/>longer on average?}
    Q3 -->|Yes| RC3[Root Cause: Token-count heuristic<br/>in router biased toward<br/>JP character density]
    Q3 -->|No| RC4[Root Cause: Complexity training<br/>data under-represents<br/>Japanese query patterns]

    RC1 --> FIX1[Add JP classification examples<br/>and bilingual complexity criteria]
    RC2 --> FIX2[Pin Haiku model version<br/>add regression test suite]
    RC3 --> FIX3[Normalize token count<br/>by language before routing]
    RC4 --> FIX4[Augment training data<br/>with JP complexity labels]

    style MISS fill:#e76f51,stroke:#f4a261,color:#fff
    style RC1 fill:#264653,stroke:#2a9d8f,color:#fff

Resolution

Step 1 — Emergency: bypass router for JP queries (5 minutes)

"""
Emergency override: route all Japanese queries to Cascade
strategy instead of relying on the complexity router.
"""
import redis
import logging

logger = logging.getLogger(__name__)


def enable_jp_cascade_override(redis_client: redis.Redis) -> None:
    """
    Set feature flag to bypass complexity router for Japanese queries.
    All JP queries go through Cascade (Haiku first, Sonnet escalation).
    """
    redis_client.set(
        "feature_flags:router:jp_override_strategy", "cascade"
    )
    redis_client.set(
        "feature_flags:router:jp_override_enabled", "true"
    )
    logger.info("JP cascade override ENABLED — all JP queries bypass router")


def check_jp_override(
    redis_client: redis.Redis,
    detected_language: str,
) -> str | None:
    """Check if JP override is active before routing."""
    if detected_language != "ja":
        return None

    enabled = redis_client.get("feature_flags:router:jp_override_enabled")
    if enabled and enabled.decode() == "true":
        strategy = redis_client.get("feature_flags:router:jp_override_strategy")
        return strategy.decode() if strategy else "cascade"

    return None

Step 2 — Fix the classification prompt (30 minutes)

"""
Updated classification prompt with bilingual complexity criteria
and Japanese-specific examples.
"""

BILINGUAL_COMPLEXITY_PROMPT = """Classify the complexity of the following customer query for a manga store chatbot.

Complexity levels:
- SIMPLE (0.0-0.3): Factual lookups, yes/no questions, greetings
  EN: "What's the price of One Piece vol 108?" / "Do you ship to Canada?"
  JP: "ワンピース108巻の値段は?" / "カナダに発送しますか?"

- MEDIUM (0.3-0.7): Comparisons, basic recommendations, multi-step questions
  EN: "What's the difference between the regular and deluxe edition?"
  JP: "通常版とデラックス版の違いは何ですか?"

- COMPLEX (0.7-1.0): Cultural context, thematic analysis, nuanced recommendations, opinions
  EN: "How does the cultural context of post-war Japan influence the themes in Barefoot Gen?"
  JP: "この漫画の文化的な背景を教えてください" / "鬼滅の刃のテーマ的な深みについて教えて"

IMPORTANT: Japanese queries that ask about 背景 (background), 文化 (culture), テーマ (theme),
意味 (meaning), 影響 (influence), or 比較 (comparison) are almost always COMPLEX.

Query: {query}

Respond with JSON only: {{"complexity": 0.0-1.0, "reasoning": "brief explanation"}}"""
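
A sketch of how the Haiku-based router might invoke this prompt and parse the score. The request body follows the Bedrock Anthropic messages format used elsewhere in this file; the helper itself is illustrative:

"""
Sketch: classify query complexity with Haiku using the bilingual prompt.
"""
import json

import boto3

bedrock = boto3.client("bedrock-runtime", region_name="ap-northeast-1")


def classify_complexity(query: str) -> float:
    """Return a 0.0-1.0 complexity score parsed from Haiku's JSON reply."""
    body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 200,
        "temperature": 0.0,  # deterministic classification
        "messages": [{
            "role": "user",
            "content": BILINGUAL_COMPLEXITY_PROMPT.format(query=query),
        }],
    })
    response = bedrock.invoke_model(
        modelId="anthropic.claude-3-haiku-20240307-v1:0",
        body=body,
        contentType="application/json",
        accept="application/json",
    )
    result = json.loads(response["body"].read())
    parsed = json.loads(result["content"][0]["text"])
    return float(parsed["complexity"])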

Step 3 — Add language-aware complexity normalization (1 hour)

"""
Normalize complexity scores by language to account for
systematic underestimation of Japanese query complexity.
"""

import json
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)

# Calibration offsets learned from evaluation data
LANGUAGE_COMPLEXITY_OFFSETS = {
    "ja": 0.15,   # JP queries systematically underscored by 0.15
    "en": 0.0,    # English is baseline
    "zh": 0.10,   # Chinese queries slightly underscored
}

JP_COMPLEXITY_KEYWORDS = [
    "背景", "文化", "テーマ", "意味", "影響", "比較",
    "解釈", "象徴", "分析", "考察", "歴史", "社会",
]


@dataclass
class NormalizedComplexity:
    """Complexity score with language normalization applied."""
    raw_score: float
    language: str
    offset: float
    keyword_boost: float
    final_score: float


def normalize_complexity(
    raw_score: float,
    query: str,
    language: str,
) -> NormalizedComplexity:
    """
    Apply language-specific calibration offset and keyword boosting.
    """
    offset = LANGUAGE_COMPLEXITY_OFFSETS.get(language, 0.0)

    # Keyword boost for JP complex indicators
    keyword_boost = 0.0
    if language == "ja":
        matches = sum(1 for kw in JP_COMPLEXITY_KEYWORDS if kw in query)
        keyword_boost = min(0.20, matches * 0.08)

    final_score = min(1.0, raw_score + offset + keyword_boost)

    return NormalizedComplexity(
        raw_score=raw_score,
        language=language,
        offset=offset,
        keyword_boost=keyword_boost,
        final_score=round(final_score, 3),
    )
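
Worked example using the incident query: a raw score of 0.25 on a query containing both 文化 and 背景 gains the 0.15 JP offset plus a 0.16 keyword boost (2 matches x 0.08), landing at 0.56, medium rather than simple:

result = normalize_complexity(
    raw_score=0.25,
    query="この漫画の文化的な背景を教えてください",
    language="ja",
)
# offset=0.15, keyword_boost=0.16, final_score=0.56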

Prevention

  • Include bilingual (JP + EN) examples in all classification prompts
  • Add language-aware complexity normalization to the router
  • Pin Haiku model version and run regression tests on version updates
  • Monitor per-language routing distributions with CloudWatch dashboards

Scenario 3 — Aggregation Timeout

Problem Statement

The ensemble aggregator's asyncio.gather call times out at 3 seconds because Sonnet response latency spikes to 4.5 seconds during a Bedrock service degradation event. All ensemble queries fail with timeout errors, and customers see "Sorry, I'm having trouble processing your request."

Business impact: 100% of complex queries fail for 45 minutes. Error rate jumps from 0.1% to 38%. Customer abandonment rate spikes.

Detection

graph TD
    A[asyncio.gather raises<br/>TimeoutError at 3s] --> B[ECS task logs<br/>aggregation_timeout error]
    B --> C[CloudWatch Metric:<br/>ensemble.timeout_count]
    C --> D{timeout_count > 50<br/>in 5 minutes?}
    D -->|Yes| E[CloudWatch Alarm:<br/>EnsembleTimeoutCritical]
    E --> F[PagerDuty P1 Alert]

    D -->|No| G[Sporadic timeout<br/>log and continue]

    H[Bedrock latency CloudWatch<br/>shows Sonnet p99 = 4.5s] --> I{Sonnet p99 > 3s?}
    I -->|Yes| J[CloudWatch Alarm:<br/>BedrockSonnetLatencyHigh]
    J --> F

    style E fill:#e76f51,stroke:#f4a261,color:#fff
    style J fill:#e76f51,stroke:#f4a261,color:#fff

Key metrics:

| Metric | Normal | Alarm | Source |
| --- | --- | --- | --- |
| ensemble.timeout_count | < 5/hr | > 50 in 5 min | ECS application logs |
| ensemble.timeout_rate | < 0.5% | > 5% | Derived (timeouts / total ensemble queries) |
| bedrock.sonnet_p99_latency | < 2s | > 3s | Bedrock CloudWatch |
| customer.error_rate | < 0.5% | > 5% | API Gateway 5xx |
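
A sketch of how the EnsembleTimeoutCritical alarm might be defined with boto3. The namespace and SNS topic ARN are placeholders; the thresholds come from the detection diagram:

"""
Sketch: define the timeout alarm from the detection flow above.
"""
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="ap-northeast-1")

cloudwatch.put_metric_alarm(
    AlarmName="EnsembleTimeoutCritical",
    Namespace="MangaAssist/Ensemble",  # assumed namespace
    MetricName="timeout_count",
    Statistic="Sum",
    Period=300,  # 5-minute window
    EvaluationPeriods=1,
    Threshold=50.0,  # > 50 timeouts in 5 minutes
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="notBreaching",
    AlarmActions=["arn:aws:sns:ap-northeast-1:123456789012:oncall-p1"],  # placeholder ARN
)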

Root Cause Analysis

graph TD
    TO[Aggregation Timeout] --> Q1{Which model<br/>timed out?}
    Q1 -->|Sonnet| Q2{Is Bedrock reporting<br/>service issues?}
    Q2 -->|Yes| RC1[Root Cause: Bedrock Sonnet<br/>service degradation<br/>Latency spike across region]
    Q2 -->|No| Q3{Is the prompt<br/>unusually long?}
    Q3 -->|Yes| RC2[Root Cause: Prompt bloat<br/>RAG context too large<br/>exceeding token limit]
    Q3 -->|No| RC3[Root Cause: ECS task<br/>resource exhaustion<br/>CPU/memory pressure]

    Q1 -->|OpenSearch| RC4[Root Cause: OpenSearch<br/>collection throttling<br/>or index not available]

    Q1 -->|All models| RC5[Root Cause: Network issue<br/>VPC endpoint or NAT<br/>gateway congestion]

    RC1 --> FIX1[Enable cascade fallback<br/>Use Haiku-only when<br/>Sonnet is degraded]

    style TO fill:#e76f51,stroke:#f4a261,color:#fff
    style RC1 fill:#264653,stroke:#2a9d8f,color:#fff

Resolution

Step 1 — Enable graceful degradation (2 minutes)

"""
Emergency: enable graceful degradation mode.
When Sonnet times out, the aggregator proceeds with
whatever results are available (Haiku + RAG).
"""
import asyncio
import json
import logging
import time
from dataclasses import dataclass, field

import boto3

logger = logging.getLogger(__name__)


@dataclass
class PartialEnsembleResult:
    """Result when some models time out."""
    responses: list[dict] = field(default_factory=list)
    timed_out_models: list[str] = field(default_factory=list)
    is_partial: bool = False
    total_latency_ms: float = 0.0


class ResilientEnsembleAggregator:
    """
    Ensemble aggregator with per-model timeout and graceful degradation.

    Instead of asyncio.gather with a single timeout, each model
    gets its own timeout. If Sonnet times out, Haiku + RAG results
    are used with a quality warning flag.
    """

    MODEL_TIMEOUTS = {
        "anthropic.claude-3-sonnet-20240229-v1:0": 2.5,   # seconds
        "anthropic.claude-3-haiku-20240307-v1:0": 1.0,
        "opensearch_rag": 1.5,
    }

    MINIMUM_RESPONSES = 1  # At least 1 model must respond

    def __init__(self, bedrock_client=None):
        self.bedrock = bedrock_client or boto3.client(
            "bedrock-runtime", region_name="ap-northeast-1"
        )

    async def invoke_with_timeout(
        self,
        model_id: str,
        query: str,
        system_prompt: str,
    ) -> dict | None:
        """Invoke a single model with per-model timeout."""
        timeout = self.MODEL_TIMEOUTS.get(model_id, 2.0)

        try:
            result = await asyncio.wait_for(
                self._invoke_model(model_id, query, system_prompt),
                timeout=timeout,
            )
            return result
        except asyncio.TimeoutError:
            logger.warning(
                "Model %s timed out after %.1fs", model_id, timeout
            )
            return None
        except Exception as e:
            logger.error("Model %s failed: %s", model_id, str(e))
            return None

    async def aggregate_with_degradation(
        self,
        query: str,
        system_prompt: str,
        model_ids: list[str],
    ) -> PartialEnsembleResult:
        """
        Run ensemble with graceful degradation.
        Proceeds with partial results if some models fail.
        """
        start = time.monotonic()

        # Launch all model calls concurrently; each task enforces its own
        # per-model timeout. Awaiting in the loop below only collects
        # results instead of serializing the calls.
        tasks = {
            model_id: asyncio.create_task(
                self.invoke_with_timeout(model_id, query, system_prompt)
            )
            for model_id in model_ids
        }

        results = {}
        for model_id, task in tasks.items():
            results[model_id] = await task

        responses = []
        timed_out = []

        for model_id, result in results.items():
            if result is not None:
                responses.append(result)
            else:
                timed_out.append(model_id)

        elapsed = (time.monotonic() - start) * 1000

        if len(responses) < self.MINIMUM_RESPONSES:
            logger.error(
                "Ensemble critically degraded: %d/%d models responded",
                len(responses), len(model_ids),
            )
            # Last resort: synchronous Haiku call with extended timeout
            fallback = await self._emergency_haiku_fallback(query, system_prompt)
            if fallback:
                responses.append(fallback)

        return PartialEnsembleResult(
            responses=responses,
            timed_out_models=timed_out,
            is_partial=len(timed_out) > 0,
            total_latency_ms=elapsed,
        )

    async def _invoke_model(
        self, model_id: str, query: str, system_prompt: str
    ) -> dict:
        """Invoke a Bedrock model."""
        body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,
            "temperature": 0.2,
            "system": system_prompt,
            "messages": [{"role": "user", "content": query}],
        })

        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: self.bedrock.invoke_model(
                modelId=model_id,
                body=body,
                contentType="application/json",
                accept="application/json",
            ),
        )

        result = json.loads(response["body"].read())
        return {
            "model_id": model_id,
            "response": result["content"][0]["text"],
            "usage": result.get("usage", {}),
        }

    async def _emergency_haiku_fallback(
        self, query: str, system_prompt: str
    ) -> dict | None:
        """Last-resort Haiku call with extended timeout."""
        try:
            return await asyncio.wait_for(
                self._invoke_model(
                    "anthropic.claude-3-haiku-20240307-v1:0",
                    query, system_prompt,
                ),
                timeout=5.0,
            )
        except Exception as e:
            logger.critical("Emergency Haiku fallback failed: %s", e)
            return None

Step 2 — Add circuit breaker for Sonnet (30 minutes)

"""
Circuit breaker pattern: automatically disable Sonnet invocation
when its error/timeout rate exceeds threshold, and periodically
probe to check if it has recovered.
"""
import time
import logging
from dataclasses import dataclass
from enum import Enum

import redis

logger = logging.getLogger(__name__)


class CircuitState(str, Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Sonnet disabled
    HALF_OPEN = "half_open"  # Probing for recovery


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 10      # Failures before opening
    recovery_timeout_s: int = 60     # Seconds before probing
    probe_success_threshold: int = 3  # Successful probes to close


class ModelCircuitBreaker:
    """
    Redis-backed circuit breaker for Bedrock model invocations.

    States:
    - CLOSED: normal operation, counting failures
    - OPEN: model disabled, timer running
    - HALF_OPEN: probing with single requests
    """

    def __init__(
        self,
        redis_client: redis.Redis,
        model_id: str,
        config: CircuitBreakerConfig | None = None,
    ):
        self.redis = redis_client
        self.model_id = model_id
        self.config = config or CircuitBreakerConfig()
        self._prefix = f"circuit_breaker:{model_id}"

    def should_allow_request(self) -> bool:
        """Check if a request should be allowed through."""
        state = self._get_state()

        if state == CircuitState.CLOSED:
            return True

        if state == CircuitState.OPEN:
            opened_at = float(self.redis.get(f"{self._prefix}:opened_at") or 0)
            if time.time() - opened_at > self.config.recovery_timeout_s:
                self._set_state(CircuitState.HALF_OPEN)
                logger.info("Circuit breaker %s: OPEN -> HALF_OPEN (probing)", self.model_id)
                return True
            return False

        if state == CircuitState.HALF_OPEN:
            return True

        return True

    def record_success(self) -> None:
        """Record a successful invocation."""
        state = self._get_state()

        if state == CircuitState.HALF_OPEN:
            probe_count = self.redis.incr(f"{self._prefix}:probe_successes")
            if probe_count >= self.config.probe_success_threshold:
                self._set_state(CircuitState.CLOSED)
                self.redis.delete(f"{self._prefix}:failure_count")
                self.redis.delete(f"{self._prefix}:probe_successes")
                logger.info("Circuit breaker %s: HALF_OPEN -> CLOSED", self.model_id)

        elif state == CircuitState.CLOSED:
            # Reset failure count on success
            self.redis.set(f"{self._prefix}:failure_count", 0)

    def record_failure(self) -> None:
        """Record a failed invocation."""
        state = self._get_state()

        if state == CircuitState.HALF_OPEN:
            self._set_state(CircuitState.OPEN)
            self.redis.set(f"{self._prefix}:opened_at", str(time.time()))
            self.redis.delete(f"{self._prefix}:probe_successes")
            logger.warning("Circuit breaker %s: HALF_OPEN -> OPEN (probe failed)", self.model_id)

        elif state == CircuitState.CLOSED:
            failures = self.redis.incr(f"{self._prefix}:failure_count")
            if failures >= self.config.failure_threshold:
                self._set_state(CircuitState.OPEN)
                self.redis.set(f"{self._prefix}:opened_at", str(time.time()))
                logger.warning(
                    "Circuit breaker %s: CLOSED -> OPEN (%d failures)",
                    self.model_id, failures,
                )

    def _get_state(self) -> CircuitState:
        raw = self.redis.get(f"{self._prefix}:state")
        if raw:
            return CircuitState(raw.decode())
        return CircuitState.CLOSED

    def _set_state(self, state: CircuitState) -> None:
        self.redis.set(f"{self._prefix}:state", state.value)
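
A sketch of how the aggregator might gate each Sonnet invocation through the breaker; invoke_sonnet is a stand-in for the Bedrock call shown in Step 1:

async def guarded_sonnet_call(breaker: ModelCircuitBreaker, invoke_sonnet):
    """Gate one Sonnet invocation; None tells callers to fall back to Haiku + RAG."""
    if not breaker.should_allow_request():
        return None  # circuit open: skip Sonnet entirely

    try:
        result = await invoke_sonnet()
        breaker.record_success()
        return result
    except Exception:
        breaker.record_failure()
        return None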

Prevention

  • Use per-model timeouts instead of a single asyncio.gather timeout
  • Implement circuit breaker for each Bedrock model endpoint
  • Set up CloudWatch alarms on bedrock.sonnet_p99_latency
  • Design for graceful degradation: always return something to the customer

Scenario 4 — Cost Explosion from Unnecessary Sonnet Routing

Problem Statement

After a deployment that lowered the complexity classification threshold from 0.7 to 0.5, 65% of queries are now routed to Sonnet or full ensemble (up from 25%). Daily Bedrock spend jumps from $5,000 to $14,200. The quality improvement is negligible — only +2% quality score for a 184% cost increase.

Business impact: Monthly budget blown in 11 days. Finance team flags the anomaly.

Detection

graph TD
    A[Bedrock daily spend<br/>computed hourly] --> B{Spend > 120%<br/>of daily budget?}
    B -->|Yes at hour 14| C[CloudWatch Alarm:<br/>BedrockBudgetExceeded]
    C --> D[SNS to finance +<br/>engineering on-call]

    E[Sonnet invocation count<br/>per hour] --> F{Sonnet queries > 60%<br/>of total?}
    F -->|Yes| G[CloudWatch Alarm:<br/>SonnetRoutingExcessive]
    G --> D

    H[Cost-per-query metric<br/>rolling average] --> I{Avg cost/query > $0.01?}
    I -->|Yes| J[CloudWatch Alarm:<br/>CostPerQueryHigh]
    J --> D

    style C fill:#e76f51,stroke:#f4a261,color:#fff
    style G fill:#e76f51,stroke:#f4a261,color:#fff
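
The cost-per-query metric in the diagram can be derived from Bedrock usage metadata and the Claude 3 prices listed in the MangaAssist context at the top of this file; a minimal sketch:

"""
Sketch: per-invocation cost from the 'usage' block in a model response,
using the Claude 3 prices from the context section above.
"""

# $ per 1M tokens (input, output)
PRICES_PER_1M_TOKENS = {
    "anthropic.claude-3-sonnet-20240229-v1:0": (3.00, 15.00),
    "anthropic.claude-3-haiku-20240307-v1:0": (0.25, 1.25),
}


def query_cost_usd(model_id: str, usage: dict) -> float:
    """Cost of one invocation from Bedrock usage metadata."""
    input_price, output_price = PRICES_PER_1M_TOKENS[model_id]
    return (
        usage.get("input_tokens", 0) * input_price
        + usage.get("output_tokens", 0) * output_price
    ) / 1_000_000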

Root Cause Analysis

graph TD
    COST[Cost Explosion<br/>$14,200/day] --> Q1{When did spend<br/>increase start?}
    Q1 --> Q2{Correlates with<br/>a deployment?}
    Q2 -->|Yes| Q3{What changed in<br/>the deployment?}
    Q3 --> RC1[Root Cause: Complexity threshold<br/>lowered from 0.7 to 0.5<br/>65% of queries now 'complex']

    Q2 -->|No| Q4{Did query volume<br/>increase?}
    Q4 -->|Yes| RC2[Root Cause: Viral traffic spike<br/>manga trending on social media]
    Q4 -->|No| RC3[Root Cause: Routing logic bug<br/>fallback always hits Sonnet]

    RC1 --> FIX1[Revert threshold to 0.7<br/>Add cost-impact review<br/>to deployment checklist]

    style COST fill:#e76f51,stroke:#f4a261,color:#fff
    style RC1 fill:#264653,stroke:#2a9d8f,color:#fff

Resolution

Step 1 — Emergency: revert complexity threshold (2 minutes)

"""
Emergency: restore complexity routing threshold via Redis
feature flag. No deployment required.
"""
import redis
import logging

logger = logging.getLogger(__name__)


def revert_complexity_threshold(
    redis_client: redis.Redis,
    complex_threshold: float = 0.7,
    medium_threshold: float = 0.3,
) -> None:
    """Restore the complexity thresholds for cascade and Sonnet routing."""
    redis_client.set("config:router:complexity_threshold_medium", str(medium_threshold))
    redis_client.set("config:router:complexity_threshold_complex", str(complex_threshold))
    logger.info(
        "Reverted complexity thresholds: medium=%.2f complex=%.2f",
        medium_threshold, complex_threshold,
    )


def enable_cost_ceiling(
    redis_client: redis.Redis,
    daily_ceiling_usd: float = 6000.0,
    per_query_ceiling_usd: float = 0.015,
) -> None:
    """
    Set hard cost ceilings. The aggregator checks these before
    selecting ensemble strategy.
    """
    redis_client.set("config:budget:daily_ceiling_usd", str(daily_ceiling_usd))
    redis_client.set("config:budget:per_query_ceiling_usd", str(per_query_ceiling_usd))
    logger.info(
        "Cost ceilings set: daily=$%.0f per_query=$%.3f",
        daily_ceiling_usd, per_query_ceiling_usd,
    )
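
A sketch of the per-query ceiling check the aggregator could run before committing to an ensemble strategy; the downgrade-to-Haiku behavior is an assumption, not confirmed logic:

def choose_affordable_strategy(
    redis_client: redis.Redis,
    preferred_strategy: str,
    estimated_cost_usd: float,
) -> str:
    """Downgrade to haiku_only when the cost estimate exceeds the ceiling."""
    raw = redis_client.get("config:budget:per_query_ceiling_usd")
    ceiling = float(raw) if raw else 0.015
    if estimated_cost_usd > ceiling:
        logger.warning(
            "Estimated cost $%.4f exceeds ceiling $%.4f; downgrading to haiku_only",
            estimated_cost_usd, ceiling,
        )
        return "haiku_only"
    return preferred_strategy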

Step 2 — Add cost-impact guardrail to deployment pipeline (1 hour)

"""
Pre-deployment cost impact estimator.

Runs the new configuration against a sample of historical queries
and computes projected daily cost. Blocks deployment if cost
increase exceeds threshold.
"""
import json
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class CostImpactEstimate:
    """Projected cost impact of a configuration change."""
    current_daily_cost: float
    projected_daily_cost: float
    cost_change_pct: float
    quality_change_pct: float
    cost_per_quality_point: float
    approved: bool
    rejection_reason: str = ""


class DeploymentCostGuardrail:
    """
    Estimates cost impact of routing configuration changes
    before they are deployed to production.
    """

    MAX_COST_INCREASE_PCT = 30.0  # Block if > 30% cost increase
    MIN_QUALITY_IMPROVEMENT = 5.0  # Require >= 5% quality gain

    def estimate_impact(
        self,
        current_config: dict,
        new_config: dict,
        sample_queries: list[dict],
    ) -> CostImpactEstimate:
        """
        Compare cost and quality between current and new configurations.

        sample_queries: [{"query": str, "complexity": float, "intent": str}, ...]
        """
        current_cost = self._estimate_cost(current_config, sample_queries)
        new_cost = self._estimate_cost(new_config, sample_queries)

        cost_change = ((new_cost - current_cost) / current_cost * 100) if current_cost > 0 else 0
        quality_change = self._estimate_quality_change(current_config, new_config, sample_queries)

        # Cost per quality point
        cost_delta = new_cost - current_cost
        cpqp = cost_delta / quality_change if quality_change > 0 else float("inf")

        approved = True
        reason = ""

        if cost_change > self.MAX_COST_INCREASE_PCT:
            if quality_change < self.MIN_QUALITY_IMPROVEMENT:
                approved = False
                reason = (
                    f"Cost increase of {cost_change:.1f}% with only "
                    f"{quality_change:.1f}% quality improvement. "
                    f"Requires >= {self.MIN_QUALITY_IMPROVEMENT}% quality gain."
                )

        return CostImpactEstimate(
            current_daily_cost=current_cost,
            projected_daily_cost=new_cost,
            cost_change_pct=round(cost_change, 1),
            quality_change_pct=round(quality_change, 1),
            cost_per_quality_point=round(cpqp, 2),
            approved=approved,
            rejection_reason=reason,
        )

    def _estimate_cost(
        self, config: dict, queries: list[dict]
    ) -> float:
        """Estimate daily cost for a configuration across sample queries."""
        threshold_complex = config.get("complexity_threshold_complex", 0.7)
        threshold_medium = config.get("complexity_threshold_medium", 0.3)

        haiku_count = 0
        cascade_count = 0
        ensemble_count = 0

        for q in queries:
            complexity = q["complexity"]
            if complexity < threshold_medium:
                haiku_count += 1
            elif complexity < threshold_complex:
                cascade_count += 1
            else:
                ensemble_count += 1

        scale_factor = 1_000_000 / len(queries)  # Scale to 1M/day

        daily_cost = (
            haiku_count * 0.0003 * scale_factor
            + cascade_count * 0.003 * scale_factor
            + ensemble_count * 0.018 * scale_factor
        )

        return round(daily_cost, 2)

    def _estimate_quality_change(
        self, current: dict, new: dict, queries: list[dict]
    ) -> float:
        """Estimate quality improvement percentage."""
        # Simplified: more Sonnet usage = marginally higher quality
        current_sonnet_pct = sum(
            1 for q in queries
            if q["complexity"] >= current.get("complexity_threshold_complex", 0.7)
        ) / len(queries)

        new_sonnet_pct = sum(
            1 for q in queries
            if q["complexity"] >= new.get("complexity_threshold_complex", 0.7)
        ) / len(queries)

        # Diminishing returns: each 10% more Sonnet usage = ~2% quality
        delta_pct = (new_sonnet_pct - current_sonnet_pct) * 100
        quality_gain = delta_pct * 0.2  # 20% efficiency

        return round(quality_gain, 1)
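
Usage sketch: evaluating the incident's threshold change against a historical sample before deployment. historical_sample is a placeholder for labeled queries pulled from production logs:

guardrail = DeploymentCostGuardrail()

estimate = guardrail.estimate_impact(
    current_config={"complexity_threshold_complex": 0.7, "complexity_threshold_medium": 0.3},
    new_config={"complexity_threshold_complex": 0.5, "complexity_threshold_medium": 0.3},
    sample_queries=historical_sample,  # placeholder: labeled queries from logs
)
print(
    f"cost: {estimate.cost_change_pct:+.1f}% "
    f"quality: {estimate.quality_change_pct:+.1f}% approved: {estimate.approved}"
)
if not estimate.approved:
    raise SystemExit(estimate.rejection_reason)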

Prevention

  • Add DeploymentCostGuardrail to CI/CD pipeline as a mandatory check
  • Set daily and per-query cost ceilings via Redis feature flags
  • Create CloudWatch dashboard showing cost-per-query trending
  • Require cost-impact analysis for any routing threshold changes

Scenario 5 — Embedding Drift Degrading Ensemble Quality

Problem Statement

Bedrock's Titan Embeddings V2 model receives a silent minor version bump. The new embeddings differ slightly from the reference vectors used by the semantic cache and the OpenSearch RAG index. Over four weeks, the ensemble's RAG component returns progressively less relevant results, and the semantic cache hit rate drops from 28% to 19% while the false positive rate increases.

Business impact: Gradual quality degradation. Customer satisfaction slowly drops. Cache cost-savings decrease by $900/day. No single incident — the decline is only visible in weekly metrics review.

Detection

graph TD
    A[Weekly Drift Check<br/>EventBridge scheduled] --> B[Lambda: embed 1000<br/>reference queries]
    B --> C[Compare to stored<br/>reference vectors]
    C --> D{Mean cosine drift<br/>> 0.05?}
    D -->|Yes| E[CloudWatch Alarm:<br/>EmbeddingDriftDetected]
    E --> F[SNS to ML team]

    D -->|No| G[Drift within<br/>normal range]

    H[Cache hit rate<br/>weekly trend] --> I{Hit rate dropped<br/>> 5 percentage points?}
    I -->|Yes| J[CloudWatch Alarm:<br/>CacheHitRateDecline]
    J --> F

    style E fill:#e76f51,stroke:#f4a261,color:#fff
    style J fill:#e76f51,stroke:#f4a261,color:#fff

Key metrics:

| Metric | Normal | Alarm | Source |
| --- | --- | --- | --- |
| embedding.drift_mean_cosine | < 0.03 | > 0.05 | Weekly Lambda check |
| cache.hit_rate_l2 | > 25% | < 20% | Redis cache metrics |
| cache.false_positive_rate | < 0.5% | > 1.0% | User feedback + logs |
| rag.retrieval_relevance_score | > 0.80 | < 0.72 | Offline evaluation |

Root Cause Analysis

graph TD
    DRIFT[Embedding Drift<br/>Detected] --> Q1{Was there a Bedrock<br/>model version update?}
    Q1 -->|Yes - minor bump| RC1[Root Cause: Titan Embeddings<br/>minor version update changed<br/>vector space characteristics]
    Q1 -->|No| Q2{Did query distribution<br/>change significantly?}
    Q2 -->|Yes| RC2[Root Cause: New manga titles<br/>or seasonal trends shifted<br/>query distribution]
    Q2 -->|No| Q3{Are reference vectors<br/>too old?}
    Q3 -->|Yes| RC3[Root Cause: Reference set<br/>no longer representative<br/>of current query patterns]

    RC1 --> FIX1[Re-embed all cache entries<br/>and re-index OpenSearch<br/>with current model]

    style DRIFT fill:#e76f51,stroke:#f4a261,color:#fff
    style RC1 fill:#264653,stroke:#2a9d8f,color:#fff

Resolution

Step 1 — Confirm drift severity (15 minutes)

"""
Run a targeted drift check comparing current embeddings
against stored reference vectors for critical query categories.
"""
import json
import logging
import math

import boto3

logger = logging.getLogger(__name__)


def run_targeted_drift_check(
    bedrock_client,
    s3_client,
    reference_bucket: str = "mangaassist-ml-artifacts",
    reference_key: str = "embeddings/reference_vectors.json",
    model_id: str = "amazon.titan-embed-text-v2:0",
) -> dict:
    """
    Quick drift check on 100 critical reference queries.
    Returns drift statistics by category.
    """
    # Load reference vectors
    obj = s3_client.get_object(Bucket=reference_bucket, Key=reference_key)
    references = json.loads(obj["Body"].read())

    category_drifts: dict[str, list[float]] = {}

    for ref in references[:100]:  # Quick check on first 100
        text = ref["text"]
        category = ref.get("category", "general")
        ref_vec = ref["vector"]

        # Generate current embedding
        body = json.dumps({
            "inputText": text,
            "dimensions": 1024,
            "normalize": True,
        })
        response = bedrock_client.invoke_model(
            modelId=model_id,
            body=body,
            contentType="application/json",
            accept="application/json",
        )
        current_vec = json.loads(response["body"].read())["embedding"]

        # Cosine distance
        dot = sum(a * b for a, b in zip(ref_vec, current_vec))
        norm_a = math.sqrt(sum(a * a for a in ref_vec))
        norm_b = math.sqrt(sum(b * b for b in current_vec))
        distance = 1.0 - (dot / (norm_a * norm_b)) if norm_a and norm_b else 1.0

        category_drifts.setdefault(category, []).append(distance)

    results = {}
    for cat, drifts in category_drifts.items():
        results[cat] = {
            "mean_drift": round(sum(drifts) / len(drifts), 6),
            "max_drift": round(max(drifts), 6),
            "sample_count": len(drifts),
        }

    logger.info("Drift check results: %s", json.dumps(results, indent=2))
    return results
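
The weekly Lambda can call this check and publish the headline number as the embedding.drift_mean_cosine metric from the table above. The handler shape and namespace are assumptions:

"""
Sketch: weekly EventBridge-triggered Lambda that runs the drift check
and publishes the mean drift metric.
"""
import boto3


def lambda_handler(event, context):
    bedrock = boto3.client("bedrock-runtime", region_name="ap-northeast-1")
    s3 = boto3.client("s3", region_name="ap-northeast-1")
    cloudwatch = boto3.client("cloudwatch", region_name="ap-northeast-1")

    results = run_targeted_drift_check(bedrock, s3)
    overall_mean = sum(r["mean_drift"] for r in results.values()) / len(results)

    cloudwatch.put_metric_data(
        Namespace="MangaAssist/Embeddings",  # assumed namespace
        MetricData=[{
            "MetricName": "drift_mean_cosine",
            "Value": overall_mean,
            "Unit": "None",
        }],
    )
    return {"mean_drift": overall_mean, "by_category": results}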

Step 2 — Re-index OpenSearch and rebuild cache (4-8 hours)

"""
Full re-embedding pipeline: re-index OpenSearch vectors and
rebuild the semantic cache with current Titan Embeddings.
"""
import json
import logging
import struct
import time
from dataclasses import dataclass

import boto3

logger = logging.getLogger(__name__)


@dataclass
class ReindexProgress:
    """Progress tracking for re-indexing."""
    total_documents: int = 0
    processed: int = 0
    failed: int = 0
    elapsed_seconds: float = 0.0

    @property
    def pct_complete(self) -> float:
        return (self.processed / self.total_documents * 100) if self.total_documents else 0.0


class EmbeddingReindexer:
    """
    Re-embeds all documents in OpenSearch and rebuilds the
    semantic cache after an embedding drift event.

    Runs as a Step Functions workflow to handle the multi-hour
    re-indexing process with checkpointing.
    """

    BATCH_SIZE = 100

    def __init__(
        self,
        bedrock_client=None,
        opensearch_client=None,
        redis_client=None,
        model_id: str = "amazon.titan-embed-text-v2:0",
    ):
        self.bedrock = bedrock_client or boto3.client(
            "bedrock-runtime", region_name="ap-northeast-1"
        )
        self.opensearch = opensearch_client
        self.redis = redis_client
        self.model_id = model_id

    def reindex_opensearch_batch(
        self, documents: list[dict], index_name: str
    ) -> int:
        """
        Re-embed and update a batch of OpenSearch documents.
        Returns count of successfully updated documents.
        """
        updated = 0
        for doc in documents:
            try:
                # Generate new embedding
                body = json.dumps({
                    "inputText": doc["text"],
                    "dimensions": 1024,
                    "normalize": True,
                })
                response = self.bedrock.invoke_model(
                    modelId=self.model_id,
                    body=body,
                    contentType="application/json",
                    accept="application/json",
                )
                new_vector = json.loads(response["body"].read())["embedding"]

                # Update OpenSearch document
                self.opensearch.update(
                    index=index_name,
                    id=doc["id"],
                    body={"doc": {"embedding": new_vector}},
                )
                updated += 1

            except Exception as e:
                logger.error("Failed to re-embed doc %s: %s", doc["id"], e)

        return updated

    def rebuild_semantic_cache(self) -> int:
        """
        Flush and rebuild the semantic cache with current embeddings.
        """
        if not self.redis:
            return 0

        # Get all cache entries
        cursor = 0
        rebuilt = 0
        while True:
            cursor, keys = self.redis.scan(cursor, match="sc:*", count=500)
            for key in keys:
                try:
                    query_text = self.redis.hget(key, "query_text")
                    if not query_text:
                        continue

                    # Re-embed
                    body = json.dumps({
                        "inputText": query_text.decode(),
                        "dimensions": 1024,
                        "normalize": True,
                    })
                    response = self.bedrock.invoke_model(
                        modelId=self.model_id,
                        body=body,
                        contentType="application/json",
                        accept="application/json",
                    )
                    new_vector = json.loads(response["body"].read())["embedding"]

                    # Update the embedding field in Redis (struct imported above)
                    blob = struct.pack(f"{len(new_vector)}f", *new_vector)
                    self.redis.hset(key, "embedding", blob)
                    rebuilt += 1

                except Exception as e:
                    logger.error("Failed to rebuild cache key %s: %s", key, e)

            if cursor == 0:
                break

        logger.info("Rebuilt %d semantic cache entries", rebuilt)
        return rebuilt

    def update_reference_vectors(
        self, s3_client, reference_queries: list[dict],
        bucket: str = "mangaassist-ml-artifacts",
        key: str = "embeddings/reference_vectors.json",
    ) -> None:
        """Store new reference vectors after re-indexing."""
        new_references = []
        for rq in reference_queries:
            body = json.dumps({
                "inputText": rq["text"],
                "dimensions": 1024,
                "normalize": True,
            })
            response = self.bedrock.invoke_model(
                modelId=self.model_id,
                body=body,
                contentType="application/json",
                accept="application/json",
            )
            vector = json.loads(response["body"].read())["embedding"]
            new_references.append({
                "text": rq["text"],
                "category": rq.get("category", "general"),
                "vector": vector,
                "timestamp": int(time.time()),
            })

        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps(new_references),
            ContentType="application/json",
        )
        logger.info("Updated %d reference vectors in S3", len(new_references))

Prevention

  • Run weekly embedding drift checks via EventBridge + Lambda
  • Pin Titan Embeddings model version where possible
  • Alert on cache hit rate decline (leading indicator of drift)
  • Automate re-indexing pipeline to run when drift exceeds threshold
  • Store embedding model version in every cache entry and OpenSearch document, as sketched below
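
A minimal sketch of that version stamping, so stale vectors can be recognized and treated as misses on read; field names are assumptions:

"""
Sketch: stamp the embedding model version next to each stored vector.
"""

EMBEDDING_MODEL_ID = "amazon.titan-embed-text-v2:0"


def stamp_cache_entry(redis_client, key: str, embedding_blob: bytes) -> None:
    """Write the vector together with the model that produced it."""
    redis_client.hset(key, mapping={
        "embedding": embedding_blob,
        "embedding_model": EMBEDDING_MODEL_ID,
    })


def is_entry_current(redis_client, key: str) -> bool:
    """On read, treat entries from a different embedding model as cache misses."""
    stored = redis_client.hget(key, "embedding_model")
    return stored is not None and stored.decode() == EMBEDDING_MODEL_ID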

Cross-Scenario Decision Tree

graph TD
    START[Model Coordination<br/>Incident Detected] --> TYPE{What type of<br/>incident?}

    TYPE -->|Quality degradation| QUAL{Sudden or<br/>gradual?}
    QUAL -->|Sudden| CONFLICT[Scenario 1: Ensemble Conflict<br/>or Scenario 2: Router Misclass]
    QUAL -->|Gradual| DRIFT[Scenario 5: Embedding Drift]

    TYPE -->|Errors/timeouts| ERR{Which component<br/>failing?}
    ERR -->|Aggregation layer| TIMEOUT[Scenario 3: Aggregation Timeout]
    ERR -->|Single model| CB[Check Circuit Breaker<br/>See Scenario 3 resolution]

    TYPE -->|Cost anomaly| COST{Quality improved<br/>proportionally?}
    COST -->|No| EXPLOSION[Scenario 4: Cost Explosion]
    COST -->|Yes| EXPECTED[Expected cost increase<br/>validate with guardrail]

    CONFLICT --> FIX_MERGE[Fix: Switch merge strategy<br/>+ add conflict detection]
    DRIFT --> FIX_REINDEX[Fix: Re-index + rebuild cache<br/>+ update reference vectors]
    TIMEOUT --> FIX_DEGRADE[Fix: Graceful degradation<br/>+ circuit breaker]
    EXPLOSION --> FIX_REVERT[Fix: Revert threshold<br/>+ add cost guardrail]

    style START fill:#264653,stroke:#2a9d8f,color:#fff
    style TIMEOUT fill:#e76f51,stroke:#f4a261,color:#fff
    style EXPLOSION fill:#e76f51,stroke:#f4a261,color:#fff

Runbook Summary Table

| Scenario | First Response (< 5 min) | Root Fix (< 4 hr) | Prevention |
| --- | --- | --- | --- |
| 1. Ensemble Conflict | Switch merge strategy to REFINEMENT via Redis flag | Add conflict detector + shared recommendation framework | Monitor conflict_rate, default REFINEMENT for recommendations |
| 2. Router Misclassification | Enable JP cascade override via Redis flag | Fix classification prompt + add language normalization | Bilingual prompts, pin model version, regression tests |
| 3. Aggregation Timeout | Enable graceful degradation (partial results) | Add per-model circuit breaker | Per-model timeouts, Bedrock latency alarms, fallback paths |
| 4. Cost Explosion | Revert complexity threshold via Redis config | Add deployment cost guardrail to CI/CD | Cost ceilings, cost-per-query dashboard, mandatory cost review |
| 5. Embedding Drift | N/A (gradual, detected weekly) | Re-index OpenSearch + rebuild cache + update references | Weekly drift checks, cache hit rate monitoring, version pinning |

Key Takeaways

  1. Feature flags in Redis enable instant mitigation — all five scenarios use Redis-based feature flags for immediate response without code deployment, cutting MTTR from hours to minutes.

  2. Graceful degradation is mandatory for ensemble systems — when one model in the ensemble fails, the system must still return a response using available models rather than failing entirely.

  3. Circuit breakers prevent cascade failures — when Sonnet is degraded, the circuit breaker automatically routes to Haiku, preventing timeout propagation across the entire system.

  4. Cost guardrails belong in the deployment pipeline — routing threshold changes should require cost-impact analysis before reaching production, not after finance discovers a budget overshoot.

  5. Embedding drift is the silent quality killer — unlike outages which are immediately visible, embedding drift degrades quality gradually over weeks; only proactive monitoring catches it.

  6. Language-aware routing prevents systematic bias — Japanese queries are systematically underscored for complexity by English-trained classifiers; language-specific calibration offsets fix this.

  7. Conflict detection improves merge quality — when ensemble models disagree, the system should detect the conflict and switch to a refinement strategy rather than naively concatenating divergent responses.