
Model Ensemble Aggregation and Selection Frameworks

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Field | Value |
|---|---|
| Certification | AWS AI Practitioner (AIP-C01) |
| Domain | 2 — Fundamentals of Generative AI |
| Task | 2.1 — Explain the concepts and techniques for building GenAI applications |
| Skill | 2.1.4 — Create sophisticated model coordination systems to optimize performance across multiple capabilities (specialized FMs, custom aggregation logic for model ensembles, model selection frameworks) |
| Focus | Ensemble aggregation patterns, model selection routing, and output merging strategies |

1. Mindmap — Ensemble, Aggregation, and Selection

mindmap
  root((Model Coordination<br/>Systems))
    Ensemble Patterns
      Voting
        Majority Vote
        Weighted Vote
        Unanimous Agreement
      Averaging
        Simple Average
        Weighted Average
        Confidence-Weighted
      Stacking
        Meta-Learner
        Cross-Validation Folds
        Feature Augmentation
      Mixture of Experts
        Gating Network
        Expert Specialization
        Sparse Activation
    Selection Frameworks
      Cost-Based Routing
        Token Budget Allocation
        Tier Classification
        Spend Caps
      Quality-Based Routing
        Complexity Scoring
        Domain Detection
        Confidence Thresholds
      Latency Constraints
        SLA Tiers
        Timeout Fallback
        Parallel Dispatch
      Hybrid Routing
        Multi-Objective Optimization
        Pareto Frontier
        Dynamic Weighting
    Aggregation Logic
      Output Merging
        Concatenation
        Summarization
        Ranked Selection
      Conflict Resolution
        Confidence Comparison
        Source Priority
        Human Escalation
      Quality Scoring
        Coherence Check
        Factual Consistency
        Relevance Score
    Production Concerns
      Caching
        Response Cache
        Embedding Cache
        Routing Cache
      Monitoring
        Model Accuracy Drift
        Cost Tracking
        Latency Percentiles
      Scaling
        Horizontal Dispatch
        Rate Limiting
        Backpressure

2. Model Ensemble Patterns

2.1 Pattern Overview

Model ensembles combine outputs from multiple foundation models to produce results that are more accurate, robust, or cost-efficient than any single model alone. In the MangaAssist context, the two primary models — Sonnet (high quality, higher cost) and Haiku (fast, economical) — form a natural two-tier ensemble.

2.2 Voting Ensembles

Voting ensembles query multiple models and select the final answer based on agreement.

| Voting Type | Mechanism | MangaAssist Use Case |
|---|---|---|
| Majority Vote | Pick the answer chosen by >50% of models | Product classification (genre tagging) |
| Weighted Vote | Each model's vote is weighted by confidence or quality score | Manga recommendation ranking |
| Unanimous Agreement | Require all models to agree; escalate otherwise | Content moderation decisions |

When to use voting in MangaAssist: Voting works best for discrete classification tasks — genre tagging, sentiment detection, or content safety flags. For free-form text generation (the majority of chatbot responses), voting is less applicable since outputs are rarely identical.
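
A minimal sketch of majority voting for a discrete label such as genre, assuming each model is prompted to return a single genre string (the labels below are illustrative):

from collections import Counter

def majority_vote(labels: list) -> tuple:
    """Return the most common label and its agreement ratio across models."""
    counts = Counter(label.strip().lower() for label in labels)
    winner, votes = counts.most_common(1)[0]
    return winner, votes / len(labels)

# Illustrative: two models agree on "shonen", one dissents
genre, agreement = majority_vote(["Shonen", "shonen", "Seinen"])
# genre == "shonen", agreement ≈ 0.67, so not a unanimous decision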

2.3 Weighted Averaging

Weighted averaging blends numerical or scored outputs from multiple models.

| Averaging Type | Formula | Best For |
|---|---|---|
| Simple Average | (score_A + score_B) / 2 | Equal-trust scenarios |
| Weighted Average | w_A * score_A + w_B * score_B | When one model is known to be better for this task |
| Confidence-Weighted | conf_A * score_A + conf_B * score_B | Dynamic weighting per request |

MangaAssist application: When ranking manga recommendations, Sonnet may produce relevance scores for top-10 results and Haiku may produce its own ranking. A weighted average (e.g., 0.7 * Sonnet + 0.3 * Haiku) produces a blended ranking that leverages Sonnet's quality while incorporating Haiku's perspective.
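
A small sketch of that blended ranking, assuming each model returns a per-title relevance score in the 0.0 to 1.0 range (titles, scores, and the 0.7/0.3 split are illustrative):

def blend_rankings(sonnet_scores: dict, haiku_scores: dict,
                   w_sonnet: float = 0.7, w_haiku: float = 0.3) -> list:
    """Blend per-title relevance scores and return titles sorted best-first."""
    titles = set(sonnet_scores) | set(haiku_scores)
    blended = {
        title: w_sonnet * sonnet_scores.get(title, 0.0)
               + w_haiku * haiku_scores.get(title, 0.0)
        for title in titles
    }
    return sorted(blended, key=blended.get, reverse=True)

blend_rankings({"Berserk": 0.95, "Vinland Saga": 0.80},
               {"Berserk": 0.85, "Vinland Saga": 0.90})
# ['Berserk', 'Vinland Saga']  (blended scores 0.92 vs 0.83)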

2.4 Stacking (Meta-Learner)

Stacking uses a secondary model (or lightweight classifier) that takes the outputs of base models as features and produces the final prediction.

┌───────────┐    ┌──────────┐
│  Sonnet   │    │  Haiku   │
│  Output   │    │  Output  │
└─────┬─────┘    └─────┬────┘
      │                │
      ▼                ▼
  ┌────────────────────────┐
  │      Meta-Learner      │
  │  (lightweight model or │
  │   rule-based combiner) │
  └───────────┬────────────┘
              │
              ▼
       Final Response

In MangaAssist, the meta-learner could be a simple rule engine hosted on ECS Fargate that selects the better response based on length, relevance keywords, and Japanese language correctness.
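
A minimal sketch of such a rule-based combiner, assuming it receives the original query plus both candidate responses; the rules and point values are illustrative, not a production scoring scheme:

def rule_based_meta_learner(query: str, sonnet_text: str, haiku_text: str) -> str:
    """Pick between two candidate responses using cheap, deterministic rules."""
    def score(text: str) -> float:
        s = 0.0
        if 50 < len(text) < 2000:          # reasonable length
            s += 1.0
        query_is_jp = any("\u3040" <= ch <= "\u9fff" for ch in query)
        text_is_jp = any("\u3040" <= ch <= "\u9fff" for ch in text)
        if query_is_jp and text_is_jp:      # answered in the user's language
            s += 1.0
        # crude relevance signal: keyword overlap with the query
        s += 0.1 * len(set(query.lower().split()) & set(text.lower().split()))
        return s
    return sonnet_text if score(sonnet_text) >= score(haiku_text) else haiku_text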

2.5 Mixture of Experts (MoE)

MoE routes each request to a specialized "expert" model based on the request characteristics.

| Component | Role | MangaAssist Example |
|---|---|---|
| Gating Network | Classifies the request and selects experts | Intent classifier on ECS |
| Expert Models | Specialized for specific domains | Sonnet for creative writing, Haiku for FAQ |
| Sparse Activation | Only 1-2 experts handle each request | Only the selected model is invoked (saves cost) |

Key advantage: MoE avoids calling all models for every request. For MangaAssist at 1M messages/day, this means most simple FAQ queries go to Haiku ($0.25/1M input tokens) while complex creative queries go to Sonnet ($3/1M input tokens), producing massive cost savings.
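
A minimal gating sketch consistent with the table above; the keyword list and length threshold are assumptions, and a fuller heuristic appears as _classify_request_complexity in Section 5.1:

SONNET = "anthropic.claude-3-sonnet-20240229-v1:0"
HAIKU = "anthropic.claude-3-haiku-20240307-v1:0"

def route_to_expert(prompt: str) -> str:
    """Return the Bedrock model ID of the single expert that handles this prompt."""
    creative = any(kw in prompt.lower() for kw in
                   ("compare", "analyze", "review", "比較", "分析", "レビュー", "おすすめ"))
    long_prompt = len(prompt.split()) > 150
    # Sparse activation: exactly one model is invoked per request
    return SONNET if creative or long_prompt else HAIKU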


3. Model Selection Frameworks

3.1 Cost-Based Routing

Cost-based routing directs requests to the cheapest model that can handle them adequately.

| Request Tier | Model | Est. Cost per Request | Example Queries |
|---|---|---|---|
| Simple (< 100 tokens, FAQ) | Haiku | ~$0.00005 | "What are your store hours?" |
| Medium (100-500 tokens, product info) | Haiku | ~$0.0002 | "Tell me about One Piece Volume 104" |
| Complex (> 500 tokens, creative/analysis) | Sonnet | ~$0.005 | "Write a review comparing Berserk and Vinland Saga art styles" |
| Critical (safety, compliance) | Sonnet | ~$0.008 | Content moderation, refund policy decisions |

Daily cost at 1M messages (assuming 70% simple, 20% medium, 8% complex, 2% critical):

| Tier | Volume | Cost/Request | Daily Cost |
|---|---|---|---|
| Simple | 700,000 | $0.00005 | $35.00 |
| Medium | 200,000 | $0.0002 | $40.00 |
| Complex | 80,000 | $0.005 | $400.00 |
| Critical | 20,000 | $0.008 | $160.00 |
| Total | 1,000,000 | | $635.00 |

Contrast with sending everything to Sonnet: 1M * $0.005 avg = $5,000/day. Cost-based routing saves ~87%.
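
A quick sketch reproducing that arithmetic; the per-request costs and traffic mix are the assumptions from the tables above, not measured values:

tiers = {
    "simple":   (700_000, 0.00005),
    "medium":   (200_000, 0.0002),
    "complex":  (80_000, 0.005),
    "critical": (20_000, 0.008),
}

routed_daily = sum(volume * cost for volume, cost in tiers.values())  # $635.00
all_sonnet_daily = 1_000_000 * 0.005                                  # $5,000.00
savings_pct = (1 - routed_daily / all_sonnet_daily) * 100             # ~87.3%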

3.2 Quality-Based Routing

Quality-based routing uses request complexity analysis to select the model most likely to produce a satisfactory answer.

Complexity Scoring Dimensions:

| Dimension | Low (Haiku) | Medium (Haiku + verify) | High (Sonnet) |
|---|---|---|---|
| Token count | < 50 input tokens | 50-200 tokens | > 200 tokens |
| Language mix | Single language | Some mixing | Heavy JP/EN code-switching |
| Domain specificity | General FAQ | Product catalog | Creative, cultural, comparative |
| Reasoning depth | Factual lookup | Single-step inference | Multi-step reasoning |
| Output format | Short text | Structured list | Long-form prose |

3.3 Latency-Constrained Routing

With a 3-second SLA, latency is a hard constraint.

| Model | Avg Latency (p50) | p95 Latency | p99 Latency |
|---|---|---|---|
| Haiku | 200ms | 500ms | 800ms |
| Sonnet | 800ms | 1,800ms | 2,500ms |
| Sonnet (long output) | 1,500ms | 2,800ms | 3,500ms |

Routing rules under latency constraints (a minimal timeout-fallback sketch follows the list):

1. If estimated output > 500 tokens and current Sonnet p95 > 2.5s, route to Haiku
2. If the real-time latency monitor shows Sonnet degradation, auto-failover to Haiku
3. For WebSocket streaming responses, start with Haiku for an instant first token, then optionally upgrade
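
A minimal timeout-fallback sketch using the ModelEnsemble helper from Section 5.1; the 2,600 ms figure matches the LatencyBudget default in Section 5.2, and cleanly cancelling an in-flight Bedrock call needs more care than shown here:

import asyncio

async def invoke_with_latency_fallback(ensemble, prompt: str, budget_ms: float = 2600):
    """Try Sonnet within the inference budget; fall back to Haiku on timeout."""
    try:
        return await asyncio.wait_for(
            ensemble._invoke_single(ensemble.MODEL_SONNET, prompt, 1024, 0.3),
            timeout=budget_ms / 1000,
        )
    except asyncio.TimeoutError:
        # Sonnet blew the budget for this request; answer quickly with Haiku
        return await ensemble._invoke_single(ensemble.MODEL_HAIKU, prompt, 1024, 0.3)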

3.4 Hybrid Multi-Objective Routing

In production, routing decisions balance cost, quality, and latency simultaneously.

Score(model, request) = w_quality * Q(model, request)
                      + w_cost    * (1 - C(model, request) / C_max)
                      + w_latency * (1 - L(model, request) / L_max)

Where the weights are tunable per routing mode (a worked example follows the list):

- Cost-sensitive mode: w_quality=0.3, w_cost=0.5, w_latency=0.2
- Quality-first mode: w_quality=0.6, w_cost=0.1, w_latency=0.3
- Latency-critical mode: w_quality=0.2, w_cost=0.2, w_latency=0.6
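
A worked sketch of the formula; the quality values, the $0.01 cost ceiling, and the 2,600 ms latency ceiling are illustrative assumptions (they roughly track the profiles in Section 5.2):

def hybrid_score(q, cost, latency_ms, w_quality, w_cost, w_latency,
                 c_max=0.01, l_max=2600.0):
    """Score(model, request) as defined above; higher is better."""
    return (w_quality * q
            + w_cost * (1 - cost / c_max)
            + w_latency * (1 - latency_ms / l_max))

# Complex request: assume Sonnet Q=0.95 ($0.005, 800 ms) and Haiku Q=0.63 ($0.00005, 200 ms)
hybrid_score(0.95, 0.005, 800, 0.6, 0.1, 0.3)     # Sonnet, quality-first mode: ~0.83
hybrid_score(0.63, 0.00005, 200, 0.6, 0.1, 0.3)   # Haiku, quality-first mode: ~0.75
hybrid_score(0.95, 0.005, 800, 0.3, 0.5, 0.2)     # Sonnet, cost-sensitive mode: ~0.67
hybrid_score(0.63, 0.00005, 200, 0.3, 0.5, 0.2)   # Haiku, cost-sensitive mode: ~0.87

The same request flips from Sonnet to Haiku as the weights shift from quality toward cost.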


4. Aggregation Logic — Combining Sonnet + Haiku Outputs

4.1 When to Aggregate vs. Select

| Strategy | When to Use | MangaAssist Example |
|---|---|---|
| Select one | Outputs are complete alternatives | Chatbot response generation |
| Merge/concatenate | Outputs cover different aspects | Haiku provides facts, Sonnet adds commentary |
| Rank and pick | Both produce lists | Recommendation ranking |
| Summarize both | Need consensus view | Product comparison from two perspectives |
| Conflict resolution | Outputs contradict | Sonnet says "in stock," Haiku says "out of stock" |

4.2 Aggregation Strategies for Multilingual Outputs

MangaAssist handles Japanese and English. Aggregation must account for:

| Challenge | Impact | Mitigation |
|---|---|---|
| Character encoding | Garbled text if UTF-8 is not enforced | Enforce UTF-8 at every boundary |
| Token counting | JP characters use more tokens | Use byte-pair-encoding-aware counting |
| Mixed-script merging | EN/JP sentences interleaved awkwardly | Detect the primary language, merge in that language |
| Honorific consistency | One model uses -san, the other uses -sama | Normalize to the user preference from the session |

4.3 Output Quality Scoring for Aggregation

When aggregating outputs, each response needs a quality score to weight its contribution.

| Quality Dimension | Measurement | Weight |
|---|---|---|
| Relevance | Cosine similarity between query embedding and response embedding | 0.30 |
| Coherence | Perplexity score or language model confidence | 0.25 |
| Completeness | Does it answer all parts of the query? (checklist match) | 0.20 |
| Factual grounding | Can claims be traced to RAG sources? | 0.15 |
| Tone appropriateness | Matches expected formality level | 0.10 |
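
A minimal sketch of the weighted sum using the weights above; the per-dimension scores are made-up inputs, and Section 5.3 shows one heuristic way to estimate them:

QUALITY_WEIGHTS = {"relevance": 0.30, "coherence": 0.25, "completeness": 0.20,
                   "factual": 0.15, "tone": 0.10}

def overall_quality(scores: dict) -> float:
    """Weighted sum of per-dimension scores, each in the range 0.0 to 1.0."""
    return sum(QUALITY_WEIGHTS[dim] * scores.get(dim, 0.0) for dim in QUALITY_WEIGHTS)

overall_quality({"relevance": 0.8, "coherence": 0.9, "completeness": 0.7,
                 "factual": 0.6, "tone": 1.0})   # 0.795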

5. Production Python Code

5.1 ModelEnsemble Class

"""
ModelEnsemble: Coordinates multiple FM invocations and combines results.
Designed for MangaAssist's Sonnet + Haiku two-model ensemble on AWS Bedrock.
"""

import asyncio
import time
import json
import hashlib
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional

import boto3

logger = logging.getLogger(__name__)


class EnsembleStrategy(Enum):
    VOTING = "voting"
    WEIGHTED_AVERAGE = "weighted_average"
    STACKING = "stacking"
    MIXTURE_OF_EXPERTS = "mixture_of_experts"
    PARALLEL_BEST = "parallel_best"


@dataclass
class ModelResponse:
    """Response from a single model invocation."""
    model_id: str
    content: str
    confidence: float
    latency_ms: float
    input_tokens: int
    output_tokens: int
    cost_usd: float
    metadata: dict = field(default_factory=dict)


@dataclass
class EnsembleResult:
    """Combined result from the ensemble."""
    final_content: str
    strategy_used: EnsembleStrategy
    model_responses: list
    total_latency_ms: float
    total_cost_usd: float
    quality_score: float
    metadata: dict = field(default_factory=dict)


class ModelEnsemble:
    """
    Orchestrates ensemble inference across Sonnet and Haiku on AWS Bedrock.

    Supports multiple ensemble strategies:
    - Voting: Each model votes, majority wins (for classification tasks).
    - Weighted Average: Blend numeric scores from multiple models.
    - Stacking: Feed base model outputs into a meta-learner.
    - Mixture of Experts: Route to the best model per request.
    - Parallel Best: Query both, pick the best response by quality score.
    """

    # Bedrock model identifiers
    MODEL_SONNET = "anthropic.claude-3-sonnet-20240229-v1:0"
    MODEL_HAIKU = "anthropic.claude-3-haiku-20240307-v1:0"

    # Cost per 1M tokens (USD)
    COST_TABLE = {
        MODEL_SONNET: {"input": 3.00, "output": 15.00},
        MODEL_HAIKU: {"input": 0.25, "output": 1.25},
    }

    def __init__(
        self,
        bedrock_client=None,
        redis_client=None,
        default_strategy: EnsembleStrategy = EnsembleStrategy.PARALLEL_BEST,
        cache_ttl_seconds: int = 300,
    ):
        self.bedrock = bedrock_client or boto3.client(
            "bedrock-runtime", region_name="ap-northeast-1"
        )
        self.redis = redis_client
        self.default_strategy = default_strategy
        self.cache_ttl = cache_ttl_seconds
        self._invocation_count = {"sonnet": 0, "haiku": 0}

    # ------------------------------------------------------------------
    # Core ensemble entry point
    # ------------------------------------------------------------------

    async def ensemble_invoke(
        self,
        prompt: str,
        strategy: Optional[EnsembleStrategy] = None,
        models: Optional[list] = None,
        weights: Optional[dict] = None,
        max_tokens: int = 1024,
        temperature: float = 0.3,
    ) -> EnsembleResult:
        """
        Run the ensemble pipeline with the specified strategy.

        Args:
            prompt: The user query / assembled prompt.
            strategy: Ensemble strategy to use (defaults to instance default).
            models: List of model IDs to include. Defaults to [Sonnet, Haiku].
            weights: Model weights for weighted strategies. e.g. {MODEL_SONNET: 0.7}.
            max_tokens: Max output tokens per model.
            temperature: Sampling temperature.

        Returns:
            EnsembleResult with the final merged response and metadata.
        """
        strategy = strategy or self.default_strategy
        models = models or [self.MODEL_SONNET, self.MODEL_HAIKU]
        weights = weights or {self.MODEL_SONNET: 0.7, self.MODEL_HAIKU: 0.3}

        # Check cache first
        cached = self._check_cache(prompt, strategy)
        if cached:
            logger.info("Ensemble cache hit for prompt hash")
            return cached

        start_time = time.monotonic()

        if strategy == EnsembleStrategy.VOTING:
            result = await self._voting_ensemble(
                prompt, models, max_tokens, temperature
            )
        elif strategy == EnsembleStrategy.WEIGHTED_AVERAGE:
            result = await self._weighted_average_ensemble(
                prompt, models, weights, max_tokens, temperature
            )
        elif strategy == EnsembleStrategy.STACKING:
            result = await self._stacking_ensemble(
                prompt, models, max_tokens, temperature
            )
        elif strategy == EnsembleStrategy.MIXTURE_OF_EXPERTS:
            result = await self._moe_ensemble(
                prompt, max_tokens, temperature
            )
        elif strategy == EnsembleStrategy.PARALLEL_BEST:
            result = await self._parallel_best_ensemble(
                prompt, models, max_tokens, temperature
            )
        else:
            raise ValueError(f"Unknown ensemble strategy: {strategy}")

        result.total_latency_ms = (time.monotonic() - start_time) * 1000
        result.strategy_used = strategy

        # Store in cache
        self._store_cache(prompt, strategy, result)

        logger.info(
            "Ensemble complete: strategy=%s latency=%.0fms cost=$%.6f",
            strategy.value,
            result.total_latency_ms,
            result.total_cost_usd,
        )
        return result

    # ------------------------------------------------------------------
    # Strategy implementations
    # ------------------------------------------------------------------

    async def _voting_ensemble(
        self, prompt, models, max_tokens, temperature
    ) -> EnsembleResult:
        """Query all models; pick the majority-agreed answer (classification)."""
        responses = await self._invoke_all_parallel(
            prompt, models, max_tokens, temperature
        )

        # For classification: extract the "label" from each response
        votes = {}
        for resp in responses:
            label = self._extract_classification_label(resp.content)
            votes[label] = votes.get(label, 0) + 1

        # Majority vote
        winning_label = max(votes, key=votes.get)
        winning_response = next(
            r for r in responses
            if self._extract_classification_label(r.content) == winning_label
        )

        total_cost = sum(r.cost_usd for r in responses)

        return EnsembleResult(
            final_content=winning_response.content,
            strategy_used=EnsembleStrategy.VOTING,
            model_responses=responses,
            total_latency_ms=0.0,
            total_cost_usd=total_cost,
            quality_score=votes[winning_label] / len(responses),
            metadata={"votes": votes, "winning_label": winning_label},
        )

    async def _weighted_average_ensemble(
        self, prompt, models, weights, max_tokens, temperature
    ) -> EnsembleResult:
        """Blend scored outputs using model-specific weights."""
        responses = await self._invoke_all_parallel(
            prompt, models, max_tokens, temperature
        )

        # Score each response on quality dimensions
        scored = []
        for resp in responses:
            quality = self._score_response_quality(prompt, resp.content)
            weight = weights.get(resp.model_id, 0.5)
            scored.append((resp, quality, weight))

        # Weighted selection: pick the response with highest weighted score
        best_resp, best_quality, _ = max(
            scored, key=lambda x: x[1] * x[2]
        )
        total_cost = sum(r.cost_usd for r in responses)

        return EnsembleResult(
            final_content=best_resp.content,
            strategy_used=EnsembleStrategy.WEIGHTED_AVERAGE,
            model_responses=responses,
            total_latency_ms=0.0,
            total_cost_usd=total_cost,
            quality_score=best_quality,
            metadata={
                "scores": [
                    {
                        "model": r.model_id,
                        "quality": q,
                        "weight": w,
                        "weighted_score": q * w,
                    }
                    for r, q, w in scored
                ]
            },
        )

    async def _stacking_ensemble(
        self, prompt, models, max_tokens, temperature
    ) -> EnsembleResult:
        """
        Two-stage stacking: base models generate responses, then a
        meta-learner (Haiku, cheap) picks or synthesizes the best answer.
        """
        # Stage 1: Base model responses
        base_responses = await self._invoke_all_parallel(
            prompt, models, max_tokens, temperature
        )

        # Stage 2: Meta-learner synthesizes
        meta_prompt = self._build_meta_learner_prompt(prompt, base_responses)
        meta_response = await self._invoke_single(
            self.MODEL_HAIKU, meta_prompt, max_tokens, temperature=0.1
        )

        total_cost = sum(r.cost_usd for r in base_responses) + meta_response.cost_usd
        all_responses = base_responses + [meta_response]

        return EnsembleResult(
            final_content=meta_response.content,
            strategy_used=EnsembleStrategy.STACKING,
            model_responses=all_responses,
            total_latency_ms=0.0,
            total_cost_usd=total_cost,
            quality_score=self._score_response_quality(
                prompt, meta_response.content
            ),
            metadata={"meta_model": self.MODEL_HAIKU},
        )

    async def _moe_ensemble(
        self, prompt, max_tokens, temperature
    ) -> EnsembleResult:
        """
        Mixture of Experts: classify the request, route to the best model.
        Only one model is invoked — this is the most cost-efficient strategy.
        """
        # Gate: classify the request complexity
        complexity = self._classify_request_complexity(prompt)

        if complexity in ("simple", "medium"):
            selected_model = self.MODEL_HAIKU
        else:
            selected_model = self.MODEL_SONNET

        response = await self._invoke_single(
            selected_model, prompt, max_tokens, temperature
        )

        return EnsembleResult(
            final_content=response.content,
            strategy_used=EnsembleStrategy.MIXTURE_OF_EXPERTS,
            model_responses=[response],
            total_latency_ms=0.0,
            total_cost_usd=response.cost_usd,
            quality_score=response.confidence,
            metadata={
                "complexity": complexity,
                "routed_to": selected_model,
            },
        )

    async def _parallel_best_ensemble(
        self, prompt, models, max_tokens, temperature
    ) -> EnsembleResult:
        """
        Query all models in parallel. Pick the highest-quality response.
        Use when quality matters most and budget allows.
        """
        responses = await self._invoke_all_parallel(
            prompt, models, max_tokens, temperature
        )

        # Score each and pick the best
        scored = [
            (resp, self._score_response_quality(prompt, resp.content))
            for resp in responses
        ]
        best_resp, best_score = max(scored, key=lambda x: x[1])
        total_cost = sum(r.cost_usd for r in responses)

        return EnsembleResult(
            final_content=best_resp.content,
            strategy_used=EnsembleStrategy.PARALLEL_BEST,
            model_responses=responses,
            total_latency_ms=0.0,
            total_cost_usd=total_cost,
            quality_score=best_score,
            metadata={
                "scores": {r.model_id: s for r, s in scored},
                "selected_model": best_resp.model_id,
            },
        )

    # ------------------------------------------------------------------
    # Model invocation helpers
    # ------------------------------------------------------------------

    async def _invoke_single(
        self, model_id: str, prompt: str, max_tokens: int, temperature: float
    ) -> ModelResponse:
        """Invoke a single Bedrock model and return a ModelResponse."""
        start = time.monotonic()

        body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": max_tokens,
            "temperature": temperature,
            "messages": [{"role": "user", "content": prompt}],
        })

        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: self.bedrock.invoke_model(
                modelId=model_id,
                contentType="application/json",
                accept="application/json",
                body=body,
            ),
        )

        result = json.loads(response["body"].read())
        latency = (time.monotonic() - start) * 1000

        content = result["content"][0]["text"]
        input_tokens = result["usage"]["input_tokens"]
        output_tokens = result["usage"]["output_tokens"]
        cost = self._calculate_cost(model_id, input_tokens, output_tokens)

        # Track invocation counts
        short_name = "sonnet" if "sonnet" in model_id else "haiku"
        self._invocation_count[short_name] += 1

        return ModelResponse(
            model_id=model_id,
            content=content,
            confidence=self._estimate_confidence(result),
            latency_ms=latency,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost_usd=cost,
        )

    async def _invoke_all_parallel(
        self, prompt: str, models: list, max_tokens: int, temperature: float
    ) -> list:
        """Invoke all models concurrently and collect responses."""
        tasks = [
            self._invoke_single(model_id, prompt, max_tokens, temperature)
            for model_id in models
        ]
        return await asyncio.gather(*tasks)

    # ------------------------------------------------------------------
    # Scoring and classification helpers
    # ------------------------------------------------------------------

    def _classify_request_complexity(self, prompt: str) -> str:
        """
        Fast, heuristic-based complexity classifier for MoE gating.
        No model invocation needed — runs in <1ms.
        """
        token_estimate = len(prompt.split())
        has_japanese = any("\u3040" <= ch <= "\u9fff" for ch in prompt)
        multi_sentence = prompt.count("?") > 1 or prompt.count("。") > 1
        creative_keywords = [
            "compare", "analyze", "review", "recommend", "explain why",
            "比較", "分析", "レビュー", "おすすめ", "なぜ",
        ]
        is_creative = any(kw in prompt.lower() for kw in creative_keywords)

        score = 0
        if token_estimate > 50:
            score += 1
        if token_estimate > 150:
            score += 1
        if has_japanese:
            score += 1
        if multi_sentence:
            score += 1
        if is_creative:
            score += 2

        if score <= 1:
            return "simple"
        elif score <= 3:
            return "medium"
        else:
            return "complex"

    def _score_response_quality(self, prompt: str, response: str) -> float:
        """
        Lightweight quality scoring for response selection.
        Returns a 0.0 to 1.0 score.
        """
        score = 0.0

        # Length appropriateness (not too short, not excessively long)
        resp_len = len(response)
        if 50 < resp_len < 2000:
            score += 0.25
        elif resp_len >= 2000:
            score += 0.15

        # Contains relevant keywords from the prompt
        prompt_words = set(prompt.lower().split())
        response_words = set(response.lower().split())
        overlap = len(prompt_words & response_words) / max(len(prompt_words), 1)
        score += min(overlap * 0.5, 0.25)

        # Structural quality indicators
        if any(marker in response for marker in ["1.", "- ", "* ", "**"]):
            score += 0.15  # Has structure / formatting

        # Japanese content quality (if prompt was Japanese)
        has_jp_prompt = any("\u3040" <= ch <= "\u9fff" for ch in prompt)
        has_jp_response = any("\u3040" <= ch <= "\u9fff" for ch in response)
        if has_jp_prompt and has_jp_response:
            score += 0.20  # Responded in the appropriate language

        # Coherence: no obvious truncation
        if response.rstrip().endswith((".", "!", "?", "。", "！", "？")):
            score += 0.15  # Ends with proper punctuation

        return min(score, 1.0)

    def _extract_classification_label(self, content: str) -> str:
        """Extract a classification label from a model response."""
        # Simple extraction: take the first line, strip whitespace
        first_line = content.strip().split("\n")[0].strip()
        return first_line.lower()

    def _estimate_confidence(self, bedrock_result: dict) -> float:
        """Estimate model confidence from Bedrock response metadata."""
        stop_reason = bedrock_result.get("stop_reason", "end_turn")
        if stop_reason == "end_turn":
            return 0.9
        elif stop_reason == "max_tokens":
            return 0.5  # Truncated — lower confidence
        return 0.7

    # ------------------------------------------------------------------
    # Cost calculation
    # ------------------------------------------------------------------

    def _calculate_cost(
        self, model_id: str, input_tokens: int, output_tokens: int
    ) -> float:
        """Calculate USD cost for a single invocation."""
        rates = self.COST_TABLE.get(model_id, {"input": 3.0, "output": 15.0})
        input_cost = (input_tokens / 1_000_000) * rates["input"]
        output_cost = (output_tokens / 1_000_000) * rates["output"]
        return input_cost + output_cost

    # ------------------------------------------------------------------
    # Caching
    # ------------------------------------------------------------------

    def _cache_key(self, prompt: str, strategy: EnsembleStrategy) -> str:
        """Generate a deterministic cache key."""
        raw = f"{strategy.value}:{prompt}"
        return f"ensemble:{hashlib.sha256(raw.encode()).hexdigest()[:16]}"

    def _check_cache(
        self, prompt: str, strategy: EnsembleStrategy
    ) -> Optional[EnsembleResult]:
        """Check Redis cache for a previous ensemble result."""
        if not self.redis:
            return None
        key = self._cache_key(prompt, strategy)
        cached = self.redis.get(key)
        if cached:
            data = json.loads(cached)
            # strategy_used was serialized as a plain string; restore the enum
            data["strategy_used"] = EnsembleStrategy(data["strategy_used"])
            return EnsembleResult(**data)
        return None

    def _store_cache(
        self, prompt: str, strategy: EnsembleStrategy, result: EnsembleResult
    ) -> None:
        """Store ensemble result in Redis cache."""
        if not self.redis:
            return
        key = self._cache_key(prompt, strategy)
        # Store only essential fields for cache
        cache_data = json.dumps({
            "final_content": result.final_content,
            "strategy_used": result.strategy_used.value,
            "model_responses": [],
            "total_latency_ms": result.total_latency_ms,
            "total_cost_usd": result.total_cost_usd,
            "quality_score": result.quality_score,
            "metadata": {"cached": True},
        })
        self.redis.setex(key, self.cache_ttl, cache_data)

    # ------------------------------------------------------------------
    # Meta-learner prompt builder for stacking
    # ------------------------------------------------------------------

    def _build_meta_learner_prompt(
        self, original_prompt: str, base_responses: list
    ) -> str:
        """Build a prompt for the meta-learner in the stacking strategy."""
        responses_text = "\n\n".join(
            f"--- Response from {r.model_id} (confidence: {r.confidence:.2f}) ---\n{r.content}"
            for r in base_responses
        )
        return (
            f"You are a response quality judge. Given the original question and "
            f"multiple candidate responses, synthesize the best possible answer.\n\n"
            f"Original question: {original_prompt}\n\n"
            f"Candidate responses:\n{responses_text}\n\n"
            f"Provide the best synthesized answer. If one response is clearly "
            f"superior, use it directly. If both have strengths, combine them."
        )

    def get_invocation_stats(self) -> dict:
        """Return cumulative invocation counts and estimated costs."""
        return {
            "invocation_counts": dict(self._invocation_count),
            "total_invocations": sum(self._invocation_count.values()),
        }
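
A minimal usage sketch for the orchestrator; the prompt is a placeholder, Bedrock credentials are assumed to be available in the environment, and error handling is omitted:

async def handle_message(prompt: str) -> str:
    ensemble = ModelEnsemble(default_strategy=EnsembleStrategy.MIXTURE_OF_EXPERTS)
    result = await ensemble.ensemble_invoke(prompt, max_tokens=512)
    logger.info("routed_to=%s cost=$%.6f",
                result.metadata.get("routed_to"), result.total_cost_usd)
    return result.final_content

if __name__ == "__main__":
    # "Tell me the synopsis of One Piece Volume 104"
    print(asyncio.run(handle_message("ワンピース104巻のあらすじを教えて")))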

5.2 SelectionFramework Class

"""
SelectionFramework: Multi-objective model selection for MangaAssist.
Routes each request to the optimal model based on cost, quality, and latency.
"""

import time
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Optional

logger = logging.getLogger(__name__)


class RoutingMode(Enum):
    COST_OPTIMIZED = "cost_optimized"
    QUALITY_FIRST = "quality_first"
    LATENCY_CRITICAL = "latency_critical"
    BALANCED = "balanced"


@dataclass
class RoutingDecision:
    """The result of a model selection decision."""
    selected_model: str
    routing_mode: RoutingMode
    complexity_score: float
    estimated_cost_usd: float
    estimated_latency_ms: float
    confidence: float
    reason: str
    fallback_model: Optional[str] = None


@dataclass
class LatencyBudget:
    """Latency constraints for routing decisions."""
    total_budget_ms: float = 3000.0  # MangaAssist 3-second SLA
    rag_retrieval_ms: float = 200.0
    preprocessing_ms: float = 100.0
    postprocessing_ms: float = 100.0

    @property
    def model_budget_ms(self) -> float:
        """Time budget remaining for model inference."""
        overhead = self.rag_retrieval_ms + self.preprocessing_ms + self.postprocessing_ms
        return self.total_budget_ms - overhead  # Typically ~2600ms


class SelectionFramework:
    """
    Routes requests to the optimal Bedrock model based on multi-objective
    scoring across cost, quality, and latency dimensions.

    Architecture:
    - Request arrives at ECS Fargate orchestrator
    - SelectionFramework analyzes request characteristics
    - Returns a RoutingDecision with selected model + fallback
    - Orchestrator invokes the selected model via Bedrock
    """

    MODEL_SONNET = "anthropic.claude-3-sonnet-20240229-v1:0"
    MODEL_HAIKU = "anthropic.claude-3-haiku-20240307-v1:0"

    # Model profiles: latency (p50) in ms, cost per avg request
    MODEL_PROFILES = {
        MODEL_SONNET: {
            "latency_p50_ms": 800,
            "latency_p95_ms": 1800,
            "latency_p99_ms": 2500,
            "cost_per_1k_input": 0.003,
            "cost_per_1k_output": 0.015,
            "quality_baseline": 0.92,
        },
        MODEL_HAIKU: {
            "latency_p50_ms": 200,
            "latency_p95_ms": 500,
            "latency_p99_ms": 800,
            "quality_baseline": 0.78,
            "cost_per_1k_input": 0.00025,
            "cost_per_1k_output": 0.00125,
        },
    }

    # Weight presets for each routing mode
    MODE_WEIGHTS = {
        RoutingMode.COST_OPTIMIZED: {
            "quality": 0.25, "cost": 0.55, "latency": 0.20
        },
        RoutingMode.QUALITY_FIRST: {
            "quality": 0.60, "cost": 0.10, "latency": 0.30
        },
        RoutingMode.LATENCY_CRITICAL: {
            "quality": 0.20, "cost": 0.15, "latency": 0.65
        },
        RoutingMode.BALANCED: {
            "quality": 0.40, "cost": 0.30, "latency": 0.30
        },
    }

    def __init__(
        self,
        default_mode: RoutingMode = RoutingMode.BALANCED,
        latency_budget: Optional[LatencyBudget] = None,
        redis_client=None,
    ):
        self.default_mode = default_mode
        self.latency_budget = latency_budget or LatencyBudget()
        self.redis = redis_client
        self._routing_history: list = []
        self._live_latency = {
            self.MODEL_SONNET: {"p50": 800, "p95": 1800},
            self.MODEL_HAIKU: {"p50": 200, "p95": 500},
        }

    def select_model(
        self,
        prompt: str,
        mode: Optional[RoutingMode] = None,
        user_tier: str = "standard",
        session_context: Optional[dict] = None,
    ) -> RoutingDecision:
        """
        Analyze the request and select the optimal model.

        Args:
            prompt: User query text.
            mode: Override routing mode (uses default if not specified).
            user_tier: User subscription tier ("free", "standard", "premium").
            session_context: Optional session metadata from DynamoDB.

        Returns:
            RoutingDecision with the selected model and reasoning.
        """
        mode = mode or self.default_mode

        # Step 1: Classify request complexity
        complexity = self._compute_complexity_score(prompt, session_context)

        # Step 2: Apply user tier overrides
        if user_tier == "premium":
            mode = RoutingMode.QUALITY_FIRST
        elif user_tier == "free":
            mode = RoutingMode.COST_OPTIMIZED

        # Step 3: Score each model
        weights = self.MODE_WEIGHTS[mode]
        model_scores = {}

        for model_id, profile in self.MODEL_PROFILES.items():
            q_score = self._quality_score(model_id, complexity)
            c_score = self._cost_score(model_id, prompt)
            l_score = self._latency_score(model_id)

            total = (
                weights["quality"] * q_score
                + weights["cost"] * c_score
                + weights["latency"] * l_score
            )
            model_scores[model_id] = {
                "total": total,
                "quality": q_score,
                "cost": c_score,
                "latency": l_score,
            }

        # Step 4: Select the winning model
        selected = max(model_scores, key=lambda m: model_scores[m]["total"])
        fallback = min(model_scores, key=lambda m: model_scores[m]["total"])

        # Step 5: Latency hard constraint check
        model_budget = self.latency_budget.model_budget_ms
        live_p95 = self._live_latency.get(selected, {}).get("p95", 2000)
        if live_p95 > model_budget:
            logger.warning(
                "Selected model %s p95 latency %dms exceeds budget %dms, "
                "falling back to %s",
                selected, live_p95, model_budget, fallback,
            )
            selected, fallback = fallback, selected

        # Step 6: Build decision
        est_cost = self._estimate_request_cost(selected, prompt)
        est_latency = self._live_latency.get(selected, {}).get("p50", 500)

        scores = model_scores[selected]
        decision = RoutingDecision(
            selected_model=selected,
            routing_mode=mode,
            complexity_score=complexity,
            estimated_cost_usd=est_cost,
            estimated_latency_ms=est_latency,
            confidence=scores["total"],
            reason=self._build_reason(selected, scores, complexity, mode),
            fallback_model=fallback,
        )

        self._routing_history.append(decision)
        self._emit_routing_metric(decision)

        return decision

    # ------------------------------------------------------------------
    # Scoring functions
    # ------------------------------------------------------------------

    def _compute_complexity_score(
        self, prompt: str, session_context: Optional[dict] = None
    ) -> float:
        """
        Compute a 0.0 to 1.0 complexity score for the request.
        Higher complexity favors Sonnet; lower favors Haiku.
        """
        score = 0.0
        token_est = len(prompt.split())

        # Token length factor
        if token_est > 100:
            score += 0.2
        if token_est > 250:
            score += 0.15

        # Japanese content factor
        jp_chars = sum(1 for ch in prompt if "\u3040" <= ch <= "\u9fff")
        if jp_chars > 10:
            score += 0.15

        # Multi-turn conversation depth
        if session_context:
            turn_count = session_context.get("turn_count", 0)
            if turn_count > 5:
                score += 0.1
            if turn_count > 10:
                score += 0.1

        # Creative / analytical keywords
        creative_indicators = [
            "compare", "analyze", "explain why", "write a review",
            "比較", "分析", "なぜ", "レビュー", "おすすめ",
        ]
        if any(kw in prompt.lower() for kw in creative_indicators):
            score += 0.25

        # Explicit format requests (lists, tables, structured output)
        format_indicators = ["list", "table", "step by step", "一覧", "表"]
        if any(kw in prompt.lower() for kw in format_indicators):
            score += 0.1

        return min(score, 1.0)

    def _quality_score(self, model_id: str, complexity: float) -> float:
        """Quality score: higher-capability models score better on complex tasks."""
        baseline = self.MODEL_PROFILES[model_id]["quality_baseline"]
        # Sonnet's advantage grows with complexity
        if "sonnet" in model_id:
            return baseline + (complexity * 0.08)
        # Haiku is fine for simple tasks but degrades on complex ones
        return baseline - (complexity * 0.15)

    def _cost_score(self, model_id: str, prompt: str) -> float:
        """Cost score: cheaper models score higher (inverted cost)."""
        est_cost = self._estimate_request_cost(model_id, prompt)
        # Normalize: Haiku ~$0.00005, Sonnet ~$0.005
        # Use log scale for fair comparison
        if est_cost <= 0:
            return 1.0
        # Lower cost = higher score
        max_cost = 0.01  # $0.01 ceiling
        return max(0.0, 1.0 - (est_cost / max_cost))

    def _latency_score(self, model_id: str) -> float:
        """Latency score based on live metrics."""
        budget = self.latency_budget.model_budget_ms
        live_p50 = self._live_latency.get(model_id, {}).get("p50", 1000)
        # Score based on how much budget headroom remains
        ratio = live_p50 / budget
        return max(0.0, 1.0 - ratio)

    def _estimate_request_cost(self, model_id: str, prompt: str) -> float:
        """Estimate cost for a single request."""
        profile = self.MODEL_PROFILES[model_id]
        input_tokens = len(prompt.split()) * 1.3  # rough estimate
        output_tokens = 200  # average assumption
        input_cost = (input_tokens / 1000) * profile["cost_per_1k_input"]
        output_cost = (output_tokens / 1000) * profile["cost_per_1k_output"]
        return input_cost + output_cost

    # ------------------------------------------------------------------
    # Live latency tracking
    # ------------------------------------------------------------------

    def update_live_latency(
        self, model_id: str, latency_ms: float
    ) -> None:
        """Update the exponential moving average of model latency."""
        alpha = 0.1  # Smoothing factor
        if model_id not in self._live_latency:
            self._live_latency[model_id] = {"p50": latency_ms, "p95": latency_ms * 2}
            return

        current = self._live_latency[model_id]
        current["p50"] = alpha * latency_ms + (1 - alpha) * current["p50"]
        # Rough p95 estimation: track the high-water mark with decay
        if latency_ms > current["p95"]:
            current["p95"] = alpha * latency_ms + (1 - alpha) * current["p95"]
        else:
            current["p95"] = 0.98 * current["p95"]  # Slow decay

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _build_reason(
        self, model_id: str, scores: dict, complexity: float, mode: RoutingMode
    ) -> str:
        """Build a human-readable reason for the routing decision."""
        model_name = "Sonnet" if "sonnet" in model_id else "Haiku"
        return (
            f"Selected {model_name} under {mode.value} mode "
            f"(complexity={complexity:.2f}, "
            f"quality={scores['quality']:.2f}, "
            f"cost={scores['cost']:.2f}, "
            f"latency={scores['latency']:.2f}, "
            f"total={scores['total']:.2f})"
        )

    def _emit_routing_metric(self, decision: RoutingDecision) -> None:
        """Emit routing decision metrics for monitoring."""
        if self.redis:
            key = f"routing:metrics:{int(time.time()) // 60}"
            model_name = (
                "sonnet" if "sonnet" in decision.selected_model else "haiku"
            )
            self.redis.hincrby(key, f"count:{model_name}", 1)
            self.redis.expire(key, 3600)

    def get_routing_stats(self) -> dict:
        """Return routing statistics from the current session."""
        if not self._routing_history:
            return {"total_decisions": 0}

        sonnet_count = sum(
            1 for d in self._routing_history if "sonnet" in d.selected_model
        )
        haiku_count = len(self._routing_history) - sonnet_count

        return {
            "total_decisions": len(self._routing_history),
            "sonnet_selections": sonnet_count,
            "haiku_selections": haiku_count,
            "sonnet_pct": round(sonnet_count / len(self._routing_history) * 100, 1),
            "avg_complexity": round(
                sum(d.complexity_score for d in self._routing_history)
                / len(self._routing_history),
                3,
            ),
            "avg_estimated_cost": round(
                sum(d.estimated_cost_usd for d in self._routing_history)
                / len(self._routing_history),
                6,
            ),
        }
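
A short usage sketch; the query, user tier, and turn count are placeholders, and in production the latency fed back via update_live_latency would come from the measured Bedrock response time:

framework = SelectionFramework(default_mode=RoutingMode.BALANCED)
decision = framework.select_model(
    "BerserkとVinland Sagaの作画を比較して",  # "compare the art styles of Berserk and Vinland Saga"
    user_tier="standard",
    session_context={"turn_count": 3},
)
print(decision.selected_model, decision.reason)

# Feed the observed latency back so future routing reflects live conditions
framework.update_live_latency(decision.selected_model, latency_ms=950)
print(framework.get_routing_stats())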

5.3 AggregationEngine Class

"""
AggregationEngine: Merges, ranks, and resolves outputs from multiple FMs.
Handles multilingual (JP/EN) output merging for MangaAssist.
"""

import re
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional

logger = logging.getLogger(__name__)


class AggregationMethod(Enum):
    SELECT_BEST = "select_best"
    MERGE_COMPLEMENTARY = "merge_complementary"
    RANKED_FUSION = "ranked_fusion"
    CONFLICT_RESOLUTION = "conflict_resolution"
    SUMMARIZE = "summarize"


class OutputLanguage(Enum):
    JAPANESE = "ja"
    ENGLISH = "en"
    MIXED = "mixed"


@dataclass
class ScoredOutput:
    """A model output with associated quality scores."""
    model_id: str
    content: str
    relevance_score: float = 0.0
    coherence_score: float = 0.0
    completeness_score: float = 0.0
    factual_score: float = 0.0
    tone_score: float = 0.0
    language: OutputLanguage = OutputLanguage.ENGLISH
    overall_score: float = 0.0


@dataclass
class AggregationResult:
    """Result of the aggregation process."""
    final_content: str
    method_used: AggregationMethod
    source_outputs: list
    quality_score: float
    language: OutputLanguage
    conflict_detected: bool = False
    conflict_details: Optional[str] = None
    metadata: dict = field(default_factory=dict)


class AggregationEngine:
    """
    Combines outputs from multiple models into a single coherent response.

    Handles:
    - Quality-based selection (pick the best)
    - Complementary merging (combine non-overlapping parts)
    - Ranked fusion (merge ranked lists like recommendations)
    - Conflict resolution (when models disagree)
    - Multilingual output normalization (JP/EN)
    """

    # Quality dimension weights
    QUALITY_WEIGHTS = {
        "relevance": 0.30,
        "coherence": 0.25,
        "completeness": 0.20,
        "factual": 0.15,
        "tone": 0.10,
    }

    def __init__(self, preferred_language: OutputLanguage = OutputLanguage.JAPANESE):
        self.preferred_language = preferred_language
        self._aggregation_history: list = []

    def aggregate(
        self,
        outputs: list,
        query: str,
        method: Optional[AggregationMethod] = None,
        user_language: Optional[OutputLanguage] = None,
    ) -> AggregationResult:
        """
        Main aggregation entry point.

        Args:
            outputs: List of (model_id, content) tuples from ensemble models.
            query: The original user query.
            method: Aggregation method. Auto-detected if not specified.
            user_language: Preferred output language.

        Returns:
            AggregationResult with the merged response.
        """
        lang = user_language or self.preferred_language

        # Step 1: Score all outputs
        scored = [
            self._score_output(model_id, content, query)
            for model_id, content in outputs
        ]

        # Step 2: Auto-detect aggregation method if not specified
        if method is None:
            method = self._auto_detect_method(scored, query)

        # Step 3: Detect conflicts
        conflict_detected, conflict_details = self._detect_conflicts(scored)

        if conflict_detected and method != AggregationMethod.CONFLICT_RESOLUTION:
            logger.warning(
                "Conflict detected between model outputs: %s", conflict_details
            )
            method = AggregationMethod.CONFLICT_RESOLUTION

        # Step 4: Execute aggregation
        if method == AggregationMethod.SELECT_BEST:
            result = self._select_best(scored, lang)
        elif method == AggregationMethod.MERGE_COMPLEMENTARY:
            result = self._merge_complementary(scored, query, lang)
        elif method == AggregationMethod.RANKED_FUSION:
            result = self._ranked_fusion(scored, lang)
        elif method == AggregationMethod.CONFLICT_RESOLUTION:
            result = self._resolve_conflict(scored, query, lang)
        elif method == AggregationMethod.SUMMARIZE:
            result = self._summarize_outputs(scored, query, lang)
        else:
            result = self._select_best(scored, lang)

        result.method_used = method
        result.conflict_detected = conflict_detected
        result.conflict_details = conflict_details

        # Step 5: Normalize language in final output
        result.final_content = self._normalize_language(
            result.final_content, lang
        )

        self._aggregation_history.append(result)
        return result

    # ------------------------------------------------------------------
    # Scoring
    # ------------------------------------------------------------------

    def _score_output(
        self, model_id: str, content: str, query: str
    ) -> ScoredOutput:
        """Score a single model output across quality dimensions."""
        language = self._detect_language(content)

        relevance = self._compute_relevance(query, content)
        coherence = self._compute_coherence(content)
        completeness = self._compute_completeness(query, content)
        factual = self._compute_factual_grounding(content)
        tone = self._compute_tone_score(content)

        overall = (
            self.QUALITY_WEIGHTS["relevance"] * relevance
            + self.QUALITY_WEIGHTS["coherence"] * coherence
            + self.QUALITY_WEIGHTS["completeness"] * completeness
            + self.QUALITY_WEIGHTS["factual"] * factual
            + self.QUALITY_WEIGHTS["tone"] * tone
        )

        return ScoredOutput(
            model_id=model_id,
            content=content,
            relevance_score=relevance,
            coherence_score=coherence,
            completeness_score=completeness,
            factual_score=factual,
            tone_score=tone,
            language=language,
            overall_score=overall,
        )

    def _compute_relevance(self, query: str, content: str) -> float:
        """Measure response relevance to the query using keyword overlap."""
        query_terms = set(query.lower().split())
        content_terms = set(content.lower().split())
        if not query_terms:
            return 0.5
        overlap = len(query_terms & content_terms)
        return min(overlap / len(query_terms), 1.0)

    def _compute_coherence(self, content: str) -> float:
        """Measure response coherence via structural indicators."""
        score = 0.5  # baseline

        # Ends with proper punctuation
        if content.rstrip().endswith((".", "!", "?", "。", "！", "？")):
            score += 0.2

        # Has paragraph/sentence structure
        sentences = re.split(r"[.!?。！？]+", content)
        if 2 <= len(sentences) <= 20:
            score += 0.2

        # No obvious repetition
        words = content.lower().split()
        if len(words) > 0:
            unique_ratio = len(set(words)) / len(words)
            if unique_ratio > 0.5:
                score += 0.1

        return min(score, 1.0)

    def _compute_completeness(self, query: str, content: str) -> float:
        """Check if the response addresses all parts of the query."""
        # Count question marks / key phrases as sub-questions
        sub_questions = max(
            query.count("?") + query.count("?"), 1
        )
        # Rough heuristic: longer responses are more likely complete
        content_length = len(content)
        length_factor = min(content_length / (sub_questions * 100), 1.0)
        return length_factor

    def _compute_factual_grounding(self, content: str) -> float:
        """Check if the response references verifiable facts."""
        score = 0.5
        # Contains specific numbers, titles, or proper nouns
        if re.search(r"\d+", content):
            score += 0.15
        if re.search(r"「.+?」", content):  # Japanese quotation marks
            score += 0.15
        if re.search(r'"[^"]+?"', content):  # English quotation marks
            score += 0.1
        # References to manga titles, volumes, authors
        manga_indicators = ["巻", "volume", "vol.", "chapter", "話"]
        if any(ind in content.lower() for ind in manga_indicators):
            score += 0.1
        return min(score, 1.0)

    def _compute_tone_score(self, content: str) -> float:
        """Evaluate tone appropriateness for a manga store chatbot."""
        score = 0.6  # baseline — assume adequate
        # Friendly indicators
        friendly = ["!", "!", "ぜひ", "おすすめ", "楽しい", "great", "enjoy", "love"]
        if any(f in content for f in friendly):
            score += 0.2
        # Too formal or robotic
        robotic = [
            "as an AI", "I cannot", "I'm sorry, but",
            "申し訳ございません", "対応できません",
        ]
        if any(r in content for r in robotic):
            score -= 0.2
        return max(0.0, min(score, 1.0))

    # ------------------------------------------------------------------
    # Aggregation strategies
    # ------------------------------------------------------------------

    def _select_best(
        self, scored: list, lang: OutputLanguage
    ) -> AggregationResult:
        """Pick the single best-scoring output."""
        best = max(scored, key=lambda s: s.overall_score)
        return AggregationResult(
            final_content=best.content,
            method_used=AggregationMethod.SELECT_BEST,
            source_outputs=scored,
            quality_score=best.overall_score,
            language=best.language,
        )

    def _merge_complementary(
        self, scored: list, query: str, lang: OutputLanguage
    ) -> AggregationResult:
        """Merge non-overlapping parts from multiple outputs."""
        # Sort by overall score descending
        sorted_outputs = sorted(scored, key=lambda s: s.overall_score, reverse=True)

        primary = sorted_outputs[0]
        merged_content = primary.content

        for secondary in sorted_outputs[1:]:
            # Extract sentences not already covered in primary
            primary_sentences = set(
                s.strip() for s in re.split(r"[.!?。！？]+", primary.content) if s.strip()
            )
            secondary_sentences = [
                s.strip()
                for s in re.split(r"[.!?。！？]+", secondary.content)
                if s.strip() and s.strip() not in primary_sentences
            ]

            if secondary_sentences:
                joiner = "。" if lang == OutputLanguage.JAPANESE else ". "
                merged_content += "\n\n" + joiner.join(secondary_sentences)

        avg_score = sum(s.overall_score for s in scored) / len(scored)
        return AggregationResult(
            final_content=merged_content,
            method_used=AggregationMethod.MERGE_COMPLEMENTARY,
            source_outputs=scored,
            quality_score=min(avg_score * 1.1, 1.0),  # Boost for combined info
            language=lang,
            metadata={"merge_sources": len(scored)},
        )

    def _ranked_fusion(
        self, scored: list, lang: OutputLanguage
    ) -> AggregationResult:
        """Merge ranked lists (e.g., manga recommendations) using RRF."""
        # Reciprocal Rank Fusion (RRF) for combining ranked lists
        k = 60  # Standard RRF constant

        # Extract ranked items from each output
        all_items = {}
        for output in scored:
            items = self._extract_ranked_items(output.content)
            for rank, item in enumerate(items, 1):
                item_key = item.lower().strip()
                if item_key not in all_items:
                    all_items[item_key] = {"display": item, "rrf_score": 0.0}
                all_items[item_key]["rrf_score"] += 1.0 / (k + rank)

        # Sort by RRF score descending
        fused_ranking = sorted(
            all_items.values(), key=lambda x: x["rrf_score"], reverse=True
        )

        # Format as numbered list
        result_items = [
            f"{i+1}. {item['display']}" for i, item in enumerate(fused_ranking[:10])
        ]
        merged = "\n".join(result_items)

        return AggregationResult(
            final_content=merged,
            method_used=AggregationMethod.RANKED_FUSION,
            source_outputs=scored,
            quality_score=max(s.overall_score for s in scored),
            language=lang,
            metadata={"fused_items": len(fused_ranking)},
        )

    def _resolve_conflict(
        self, scored: list, query: str, lang: OutputLanguage
    ) -> AggregationResult:
        """
        Handle conflicting outputs. Strategy:
        1. Trust the higher-scoring model
        2. Add a caveat about uncertainty
        3. Flag for human review if the gap is small
        """
        sorted_outputs = sorted(scored, key=lambda s: s.overall_score, reverse=True)
        best = sorted_outputs[0]
        runner_up = sorted_outputs[1] if len(sorted_outputs) > 1 else None

        score_gap = (
            best.overall_score - runner_up.overall_score if runner_up else 1.0
        )

        if score_gap < 0.1:
            # Close call — add uncertainty caveat
            caveat = (
                "\n\n※ この情報は確認中です。正確な詳細はスタッフにお問い合わせください。"
                if lang == OutputLanguage.JAPANESE
                else "\n\nNote: This information is being verified. "
                "Please check with our staff for exact details."
            )
            content = best.content + caveat
            needs_review = True
        else:
            content = best.content
            needs_review = False

        return AggregationResult(
            final_content=content,
            method_used=AggregationMethod.CONFLICT_RESOLUTION,
            source_outputs=scored,
            quality_score=best.overall_score,
            language=best.language,
            conflict_detected=True,
            conflict_details=(
                f"Score gap: {score_gap:.3f}, "
                f"best={best.model_id} ({best.overall_score:.3f}), "
                f"runner_up={runner_up.model_id if runner_up else 'N/A'}"
            ),
            metadata={"needs_human_review": needs_review, "score_gap": score_gap},
        )

    def _summarize_outputs(
        self, scored: list, query: str, lang: OutputLanguage
    ) -> AggregationResult:
        """Create a summary that captures the consensus from all outputs."""
        # Concatenate all outputs with source labels; in production this combined
        # text would be fed to a (cheap) model to produce the actual summary.
        combined = "\n\n".join(
            f"[{s.model_id}]: {s.content}" for s in scored
        )
        # Stub behaviour: return the best single output and record the size of
        # the combined text so a summarization step can be slotted in later.
        best = max(scored, key=lambda s: s.overall_score)
        return AggregationResult(
            final_content=best.content,
            method_used=AggregationMethod.SUMMARIZE,
            source_outputs=scored,
            quality_score=best.overall_score,
            language=best.language,
            metadata={"source_count": len(scored), "combined_length": len(combined)},
        )

    # ------------------------------------------------------------------
    # Language handling
    # ------------------------------------------------------------------

    def _detect_language(self, content: str) -> OutputLanguage:
        """Detect primary language of the content."""
        jp_chars = sum(1 for ch in content if "\u3040" <= ch <= "\u9fff")
        en_chars = sum(1 for ch in content if "a" <= ch.lower() <= "z")
        total = jp_chars + en_chars
        if total == 0:
            return OutputLanguage.MIXED
        jp_ratio = jp_chars / total
        if jp_ratio > 0.6:
            return OutputLanguage.JAPANESE
        elif jp_ratio < 0.2:
            return OutputLanguage.ENGLISH
        return OutputLanguage.MIXED

    def _normalize_language(
        self, content: str, target: OutputLanguage
    ) -> str:
        """
        Normalize language-specific formatting in the output.
        Does not translate — just ensures consistent formatting.
        """
        if target == OutputLanguage.JAPANESE:
            # Ensure Japanese punctuation
            content = content.replace("...", "…")
            # Ensure full-width numbers in context of Japanese text
            # (lightweight normalization, not full translation)
        elif target == OutputLanguage.ENGLISH:
            # Ensure English punctuation conventions
            content = content.replace("。", ". ").replace("、", ", ")
            content = content.replace("！", "!").replace("？", "?")
        return content

    # ------------------------------------------------------------------
    # Utilities
    # ------------------------------------------------------------------

    def _auto_detect_method(
        self, scored: list, query: str
    ) -> AggregationMethod:
        """Auto-detect the best aggregation method for this scenario."""
        # If query asks for a list/ranking, use ranked fusion
        ranking_keywords = [
            "top", "best", "recommend", "ranking",
            "おすすめ", "ランキング", "一覧", "ベスト",
        ]
        if any(kw in query.lower() for kw in ranking_keywords):
            return AggregationMethod.RANKED_FUSION

        # If outputs are very different in content, merge them
        if len(scored) >= 2:
            overlap = self._content_overlap(scored[0].content, scored[1].content)
            if overlap < 0.3:
                return AggregationMethod.MERGE_COMPLEMENTARY

        # Default: select best
        return AggregationMethod.SELECT_BEST

    def _detect_conflicts(self, scored: list) -> tuple:
        """Detect if model outputs contain contradictory information."""
        if len(scored) < 2:
            return False, None

        # Check for contradictory signals
        contradiction_pairs = [
            ("in stock", "out of stock"),
            ("在庫あり", "在庫なし"),
            ("available", "unavailable"),
            ("yes", "no"),
            ("recommended", "not recommended"),
        ]

        for i, output_a in enumerate(scored):
            text_a = output_a.content.lower()
            for output_b in scored[i + 1:]:
                text_b = output_b.content.lower()
                for pos, neg in contradiction_pairs:
                    # The negative phrase often contains the positive one as a
                    # substring (e.g. "unavailable" contains "available"), so
                    # only treat the positive as asserted when the negative is
                    # absent from the same output.
                    a_pos = pos in text_a and neg not in text_a
                    a_neg = neg in text_a
                    b_pos = pos in text_b and neg not in text_b
                    b_neg = neg in text_b

                    if (a_pos and b_neg) or (a_neg and b_pos):
                        detail = (
                            f"Contradiction: {output_a.model_id} says "
                            f"'{pos if a_pos else neg}' but "
                            f"{output_b.model_id} says "
                            f"'{neg if a_pos else pos}'"
                        )
                        return True, detail

        return False, None

    def _content_overlap(self, content_a: str, content_b: str) -> float:
        """Compute word-level overlap between two texts."""
        words_a = set(content_a.lower().split())
        words_b = set(content_b.lower().split())
        if not words_a or not words_b:
            return 0.0
        intersection = len(words_a & words_b)
        union = len(words_a | words_b)
        return intersection / union if union > 0 else 0.0

    def _extract_ranked_items(self, content: str) -> list:
        """Extract numbered or bulleted list items from model output."""
        items = []
        for line in content.split("\n"):
            line = line.strip()
            # Match "1. Item", "- Item", "* Item", "・Item"
            match = re.match(r"^(?:\d+[.)]\s*|[-*・]\s*)(.+)$", line)
            if match:
                items.append(match.group(1).strip())
        return items

    def get_aggregation_stats(self) -> dict:
        """Return aggregation statistics."""
        if not self._aggregation_history:
            return {"total_aggregations": 0}

        method_counts = {}
        conflict_count = 0
        for result in self._aggregation_history:
            method = result.method_used.value
            method_counts[method] = method_counts.get(method, 0) + 1
            if result.conflict_detected:
                conflict_count += 1

        return {
            "total_aggregations": len(self._aggregation_history),
            "method_distribution": method_counts,
            "conflict_rate": round(
                conflict_count / len(self._aggregation_history), 3
            ),
            "avg_quality": round(
                sum(r.quality_score for r in self._aggregation_history)
                / len(self._aggregation_history),
                3,
            ),
        }

6. Comparison Tables

6.1 Ensemble Strategy Comparison

| Strategy | Models Invoked | Latency | Cost | Quality | Best For |
|---|---|---|---|---|---|
| Voting | All (2) | Parallel (bounded by slowest) | 2x | Moderate | Classification tasks (genre, sentiment) |
| Weighted Average | All (2) | Parallel (bounded by slowest) | 2x | Good | Scoring / ranking tasks |
| Stacking | All + meta-learner (3) | Sequential (base + meta) | ~2.5x | High | Complex responses needing synthesis |
| MoE | 1 (selected expert) | Single model | 1x | Good-High | General routing (most cost-efficient) |
| Parallel Best | All (2) | Parallel (bounded by slowest) | 2x | Highest | Quality-critical responses |

6.2 Selection Framework Mode Comparison

| Mode | Quality Weight | Cost Weight | Latency Weight | Sonnet Usage | Daily Cost (1M msgs) |
|---|---|---|---|---|---|
| Cost Optimized | 0.25 | 0.55 | 0.20 | ~5% | ~$300 |
| Balanced | 0.40 | 0.30 | 0.30 | ~20% | ~$635 |
| Quality First | 0.60 | 0.10 | 0.30 | ~50% | ~$2,700 |
| Latency Critical | 0.20 | 0.15 | 0.65 | ~8% | ~$400 |
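
To make the weight columns concrete, the sketch below shows one way a weighted-sum router could turn per-request estimates into a model choice. The budget constants, the per-query quality estimates, and the budget-based normalization are illustrative assumptions, not the SelectionFramework implementation described earlier.

```python
# Illustrative multi-objective scoring sketch (assumed budgets and estimates).
MODE_WEIGHTS = {
    "cost_optimized":   {"quality": 0.25, "cost": 0.55, "latency": 0.20},
    "balanced":         {"quality": 0.40, "cost": 0.30, "latency": 0.30},
    "quality_first":    {"quality": 0.60, "cost": 0.10, "latency": 0.30},
    "latency_critical": {"quality": 0.20, "cost": 0.15, "latency": 0.65},
}

COST_BUDGET_USD = 0.01   # assumed per-message spend cap
LATENCY_BUDGET_S = 3.0   # end-to-end SLA from the MangaAssist target


def weighted_score(est_quality: float, est_cost: float, est_latency: float,
                   mode: str) -> float:
    """Higher is better: quality rewards high estimates, cost and latency
    reward headroom against their budgets."""
    w = MODE_WEIGHTS[mode]
    return (
        w["quality"] * est_quality
        + w["cost"] * max(0.0, 1 - est_cost / COST_BUDGET_USD)
        + w["latency"] * max(0.0, 1 - est_latency / LATENCY_BUDGET_S)
    )


# Example: a complex recommendation query where Haiku's expected quality is
# assumed to drop while Sonnet's stays high (illustrative numbers only).
candidates = {
    "haiku":  {"est_quality": 0.55, "est_cost": 0.00005, "est_latency": 0.9},
    "sonnet": {"est_quality": 0.92, "est_cost": 0.005,   "est_latency": 2.4},
}
for mode in ("cost_optimized", "quality_first"):
    winner = max(candidates, key=lambda m: weighted_score(mode=mode, **candidates[m]))
    print(mode, "->", winner)  # cost_optimized -> haiku, quality_first -> sonnet
```

With per-query quality estimates, the same weights reproduce the behavior in the table: cost-optimized traffic stays on Haiku, while quality-first routing sends complex queries to Sonnet.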

6.3 Aggregation Method Comparison

| Method | When to Use | Handles Conflicts | Output Quality | Latency Impact |
|---|---|---|---|---|
| Select Best | Single best answer needed | No (picks one) | Depends on best model | None |
| Merge Complementary | Models cover different aspects | Partial | Higher (combined info) | Minimal processing |
| Ranked Fusion | Recommendation / list queries | N/A | High (consensus ranking) | Minimal processing |
| Conflict Resolution | Models contradict each other | Yes | Moderate (flags uncertainty) | May add caveat |
| Summarize | Need consensus from all models | Partial | High | Needs summarization step |

7. Cost Analysis at Scale

7.1 Monthly Cost Projection (1M messages/day, 30 days)

| Strategy | Model Invocations/day | Daily Cost | Monthly Cost | Quality Rating |
|---|---|---|---|---|
| Haiku Only | 1M Haiku | $50 | $1,500 | Adequate (78%) |
| MoE Routing (recommended) | 900K Haiku + 100K Sonnet | $550 | $16,500 | Good (85%) |
| Balanced Routing | 800K Haiku + 200K Sonnet | $1,050 | $31,500 | Very Good (88%) |
| Parallel Best | 1M Haiku + 1M Sonnet | $5,050 | $151,500 | Excellent (93%) |
| Sonnet Only | 1M Sonnet | $5,000 | $150,000 | Excellent (92%) |

Key insight: MoE routing delivers 85% quality at $16,500/month, an 89% cost reduction compared to Sonnet-only ($150K/month) for a seven-point drop in the quality rating. This is the recommended default for MangaAssist.
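
The blended figures follow directly from the table's per-model unit costs ($50 per 1M Haiku messages, $5,000 per 1M Sonnet messages; the token assumptions behind those unit costs come from the earlier cost model). A quick sanity check:

```python
# Per-message costs implied by the table above (assumed unit costs).
HAIKU_PER_MSG = 0.00005   # $50 per 1M messages
SONNET_PER_MSG = 0.005    # $5,000 per 1M messages

def daily_cost(haiku_msgs: int, sonnet_msgs: int) -> float:
    return haiku_msgs * HAIKU_PER_MSG + sonnet_msgs * SONNET_PER_MSG

moe = daily_cost(900_000, 100_000)       # 45 + 500 = 545   -> table rounds to ~$550/day
balanced = daily_cost(800_000, 200_000)  # 40 + 1,000 = 1,040 -> ~$1,050/day
monthly_moe = moe * 30        # 16,350; rounding daily to $550 gives the table's $16,500
monthly_bal = balanced * 30   # 31,200; rounding daily to $1,050 gives the table's $31,500
```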

7.2 Break-Even Analysis

| Quality Target | Cheapest Strategy | Monthly Cost | Cost per Message |
|---|---|---|---|
| 78%+ (baseline) | Haiku Only | $1,500 | $0.00005 |
| 85%+ (good) | MoE Routing | $16,500 | $0.00055 |
| 88%+ (very good) | Balanced Routing | $31,500 | $0.00105 |
| 92%+ (excellent) | Sonnet Only | $150,000 | $0.00500 |
| 93%+ (quality ceiling) | Parallel Best | $151,500 | $0.00505 |

8. Key Takeaways

  1. MoE routing is the most cost-effective strategy for MangaAssist — it achieves 85% quality at ~11% of the Sonnet-only cost by routing simple queries to Haiku and complex queries to Sonnet.

  2. Parallel Best is the quality ceiling — querying both models and selecting the best response yields the highest quality but doubles cost. Reserve this for premium-tier users.

  3. Stacking adds a meta-learner layer — feeding both model outputs into a cheap Haiku call for synthesis can improve complex responses, but adds sequential latency (not ideal under 3-second SLA).

  4. Aggregation must handle JP/EN mixing — the AggregationEngine normalizes language, detects conflicts, and ensures consistent tone for the manga store context.

  5. Selection framework weights are tunable per user tier — free users get cost-optimized routing, premium users get quality-first, and the system adapts in real-time to latency conditions.

  6. Live latency tracking prevents SLA violations — the SelectionFramework monitors actual Bedrock response times and automatically fails over to Haiku when Sonnet latency exceeds the 2.6-second model budget.

  7. Conflict detection prevents contradictory answers — the AggregationEngine flags disagreements (e.g., stock status) and either defers to the higher-quality model or adds an uncertainty caveat.

  8. Caching at the ensemble level dramatically reduces cost: storing ensemble results in ElastiCache Redis avoids redundant multi-model invocations for repeated queries (common in FAQ-heavy chatbot traffic). A minimal cache sketch follows this list.

  9. Cost routing alone saves ~87% versus uniform Sonnet — even without quality scoring, simply routing by request tier (simple/medium/complex) yields $635/day vs. $5,000/day.

  10. Monitor ensemble drift over time — model quality, latency profiles, and traffic patterns change. Re-evaluate routing weights monthly and adjust the complexity classifier as needed.
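
To make takeaway 8 concrete, here is a minimal sketch of an ensemble-level response cache backed by ElastiCache Redis. The key scheme, TTL, and endpoint are illustrative assumptions; in practice the orchestrator would check this cache before dispatching any Bedrock calls and write the aggregated content back on a miss.

```python
import hashlib
import json

import redis  # redis-py client; points at the ElastiCache Redis endpoint in production

r = redis.Redis(host="localhost", port=6379, decode_responses=True)  # assumed endpoint

CACHE_TTL_SECONDS = 3600  # assumption: FAQ-style answers stay valid for an hour


def _cache_key(query: str, lang: str) -> str:
    """Normalize lightly so trivially different phrasings hit the same entry."""
    normalized = " ".join(query.lower().split())
    digest = hashlib.sha256(f"{lang}:{normalized}".encode("utf-8")).hexdigest()
    return f"ensemble:response:{digest}"


def get_cached_response(query: str, lang: str) -> dict | None:
    """Return a previously aggregated ensemble result, or None on a cache miss."""
    raw = r.get(_cache_key(query, lang))
    return json.loads(raw) if raw else None


def cache_response(query: str, lang: str, final_content: str, quality_score: float) -> None:
    """Store the aggregated result so repeated queries skip multi-model invocations."""
    payload = json.dumps({"content": final_content, "quality": quality_score})
    r.setex(_cache_key(query, lang), CACHE_TTL_SECONDS, payload)
```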