Model Ensemble Aggregation and Selection Frameworks
MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.
Skill Mapping
| Field | Value |
|---|---|
| Certification | AWS AI Practitioner (AIP-C01) |
| Domain | 2 — Fundamentals of Generative AI |
| Task | 2.1 — Explain the concepts and techniques for building GenAI applications |
| Skill | 2.1.4 — Create sophisticated model coordination systems to optimize performance across multiple capabilities (specialized FMs, custom aggregation logic for model ensembles, model selection frameworks) |
| Focus | Ensemble aggregation patterns, model selection routing, and output merging strategies |
1. Mindmap — Ensemble, Aggregation, and Selection
```mermaid
mindmap
  root((Model Coordination<br/>Systems))
    Ensemble Patterns
      Voting
        Majority Vote
        Weighted Vote
        Unanimous Agreement
      Averaging
        Simple Average
        Weighted Average
        Confidence-Weighted
      Stacking
        Meta-Learner
        Cross-Validation Folds
        Feature Augmentation
      Mixture of Experts
        Gating Network
        Expert Specialization
        Sparse Activation
    Selection Frameworks
      Cost-Based Routing
        Token Budget Allocation
        Tier Classification
        Spend Caps
      Quality-Based Routing
        Complexity Scoring
        Domain Detection
        Confidence Thresholds
      Latency Constraints
        SLA Tiers
        Timeout Fallback
        Parallel Dispatch
      Hybrid Routing
        Multi-Objective Optimization
        Pareto Frontier
        Dynamic Weighting
    Aggregation Logic
      Output Merging
        Concatenation
        Summarization
        Ranked Selection
      Conflict Resolution
        Confidence Comparison
        Source Priority
        Human Escalation
      Quality Scoring
        Coherence Check
        Factual Consistency
        Relevance Score
    Production Concerns
      Caching
        Response Cache
        Embedding Cache
        Routing Cache
      Monitoring
        Model Accuracy Drift
        Cost Tracking
        Latency Percentiles
      Scaling
        Horizontal Dispatch
        Rate Limiting
        Backpressure
```
2. Model Ensemble Patterns
2.1 Pattern Overview
Model ensembles combine outputs from multiple foundation models to produce results that are more accurate, robust, or cost-efficient than any single model alone. In the MangaAssist context, the two primary models — Sonnet (high quality, higher cost) and Haiku (fast, economical) — form a natural two-tier ensemble.
2.2 Voting Ensembles
Voting ensembles query multiple models and select the final answer based on agreement.
| Voting Type | Mechanism | MangaAssist Use Case |
|---|---|---|
| Majority Vote | Pick the answer chosen by >50% of models | Product classification (genre tagging) |
| Weighted Vote | Each model's vote is weighted by confidence or quality score | Manga recommendation ranking |
| Unanimous Agreement | Require all models to agree; escalate otherwise | Content moderation decisions |
When to use voting in MangaAssist: Voting works best for discrete classification tasks — genre tagging, sentiment detection, or content safety flags. For free-form text generation (the majority of chatbot responses), voting is less applicable since outputs are rarely identical.
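A minimal sketch of the three voting modes over classification labels (the label values and the three-classifier setup are illustrative, not part of the MangaAssist stack):

```python
from collections import Counter

def majority_vote(labels: list[str]) -> str:
    """Pick the label chosen by the most models (first seen wins on a tie)."""
    return Counter(labels).most_common(1)[0][0]

def weighted_vote(weighted_labels: dict[str, float]) -> str:
    """weighted_labels maps label -> summed model weight; highest total wins."""
    return max(weighted_labels, key=weighted_labels.get)

def unanimous_or_escalate(labels: list[str]) -> str:
    """Require full agreement; otherwise flag the request for escalation."""
    return labels[0] if len(set(labels)) == 1 else "ESCALATE"

# Illustrative genre-tagging and moderation votes:
print(majority_vote(["shonen", "shonen", "seinen"]))    # -> shonen
print(weighted_vote({"shonen": 0.7, "seinen": 0.3}))    # -> shonen
print(unanimous_or_escalate(["safe", "unsafe"]))        # -> ESCALATE
```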
2.3 Weighted Averaging
Weighted averaging blends numerical or scored outputs from multiple models.
| Averaging Type | Formula | Best For |
|---|---|---|
| Simple Average | (score_A + score_B) / 2 | Equal-trust scenarios |
| Weighted Average | w_A * score_A + w_B * score_B | When one model is known to be better for this task |
| Confidence-Weighted | conf_A * score_A + conf_B * score_B | Dynamic weighting per request |
MangaAssist application: When ranking manga recommendations, Sonnet may produce relevance scores for top-10 results and Haiku may produce its own ranking. A weighted average (e.g., 0.7 * Sonnet + 0.3 * Haiku) produces a blended ranking that leverages Sonnet's quality while incorporating Haiku's perspective.
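A minimal sketch of the 0.7/0.3 blend described above; the titles and scores are illustrative:

```python
def blend_rankings(sonnet_scores: dict, haiku_scores: dict,
                   w_sonnet: float = 0.7, w_haiku: float = 0.3) -> list:
    """Blend per-title relevance scores and rank titles by the weighted sum.
    Titles scored by only one model default to 0.0 for the other."""
    titles = set(sonnet_scores) | set(haiku_scores)
    blended = {
        t: w_sonnet * sonnet_scores.get(t, 0.0) + w_haiku * haiku_scores.get(t, 0.0)
        for t in titles
    }
    return sorted(blended, key=blended.get, reverse=True)

# Illustrative relevance scores for three titles:
print(blend_rankings(
    {"Berserk": 0.9, "One Piece": 0.6},
    {"Berserk": 0.7, "One Piece": 0.8, "Naruto": 0.5},
))  # -> ['Berserk', 'One Piece', 'Naruto']
```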
2.4 Stacking (Meta-Learner)
Stacking uses a secondary model (or lightweight classifier) that takes the outputs of base models as features and produces the final prediction.
```
┌──────────┐     ┌──────────┐
│  Sonnet  │     │  Haiku   │
│  Output  │     │  Output  │
└────┬─────┘     └────┬─────┘
     │                │
     ▼                ▼
┌──────────────────────────┐
│       Meta-Learner       │
│  (lightweight model or   │
│   rule-based combiner)   │
└────────────┬─────────────┘
             │
             ▼
      Final Response
```
In MangaAssist, the meta-learner could be a simple rule engine hosted on ECS Fargate that selects the better response based on length, relevance keywords, and Japanese language correctness.
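A minimal sketch of what that rule engine might look like; the specific rules, thresholds, and scoring weights are illustrative assumptions, not the production logic:

```python
def rule_based_combiner(query: str, candidates: list[str]) -> str:
    """Toy meta-learner: score each candidate response on three simple
    rules and return the winner."""
    def score(text: str) -> float:
        s = 0.0
        if 50 < len(text) < 2000:                      # plausible length band
            s += 1.0
        q_words = set(query.lower().split())           # keyword overlap
        s += len(q_words & set(text.lower().split())) / max(len(q_words), 1)
        q_jp = any("\u3040" <= ch <= "\u30ff" for ch in query)
        t_jp = any("\u3040" <= ch <= "\u30ff" for ch in text)
        if q_jp == t_jp:                               # answers in query's language
            s += 1.0
        return s
    return max(candidates, key=score)
```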
2.5 Mixture of Experts (MoE)
MoE routes each request to a specialized "expert" model based on the request characteristics.
| Component | Role | MangaAssist Example |
|---|---|---|
| Gating Network | Classifies the request and selects experts | Intent classifier on ECS |
| Expert Models | Specialized for specific domains | Sonnet for creative writing, Haiku for FAQ |
| Sparse Activation | Only 1-2 experts handle each request | Only the selected model is invoked (saves cost) |
Key advantage: MoE avoids calling all models for every request. For MangaAssist at 1M messages/day, this means most simple FAQ queries go to Haiku ($0.25/1M input tokens) while complex creative queries go to Sonnet ($3/1M input tokens), cutting costs substantially; Section 3.1 quantifies the savings.
3. Model Selection Frameworks
3.1 Cost-Based Routing
Cost-based routing directs requests to the cheapest model that can handle them adequately.
| Request Tier | Model | Est. Cost per Request | Example Queries |
|---|---|---|---|
| Simple (< 100 tokens, FAQ) | Haiku | ~$0.00005 | "What are your store hours?" |
| Medium (100-500 tokens, product info) | Haiku | ~$0.0002 | "Tell me about One Piece Volume 104" |
| Complex (> 500 tokens, creative/analysis) | Sonnet | ~$0.005 | "Write a review comparing Berserk and Vinland Saga art styles" |
| Critical (safety, compliance) | Sonnet | ~$0.008 | Content moderation, refund policy decisions |
Daily cost at 1M messages (assuming 70% simple, 20% medium, 8% complex, 2% critical):
| Tier | Volume | Cost/Request | Daily Cost |
|---|---|---|---|
| Simple | 700,000 | $0.00005 | $35.00 |
| Medium | 200,000 | $0.0002 | $40.00 |
| Complex | 80,000 | $0.005 | $400.00 |
| Critical | 20,000 | $0.008 | $160.00 |
| Total | 1,000,000 | n/a | $635.00 |
Contrast with sending everything to Sonnet: 1M * $0.005 avg = $5,000/day. Cost-based routing saves ~87%.
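This tier arithmetic can be checked in a few lines; the volume shares and per-request costs are taken directly from the two tables above:

```python
# Reproduce the daily-cost table: volume share x cost-per-request per tier.
TIERS = {
    "simple":   {"share": 0.70, "cost": 0.00005},
    "medium":   {"share": 0.20, "cost": 0.0002},
    "complex":  {"share": 0.08, "cost": 0.005},
    "critical": {"share": 0.02, "cost": 0.008},
}
DAILY_MESSAGES = 1_000_000

daily = sum(t["share"] * DAILY_MESSAGES * t["cost"] for t in TIERS.values())
sonnet_only = DAILY_MESSAGES * 0.005
print(f"routed: ${daily:,.2f}/day")                     # $635.00
print(f"sonnet-only: ${sonnet_only:,.2f}/day")          # $5,000.00
print(f"savings: {1 - daily / sonnet_only:.0%}")        # 87%
```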
3.2 Quality-Based Routing
Quality-based routing uses request complexity analysis to select the model most likely to produce a satisfactory answer.
Complexity Scoring Dimensions:
| Dimension | Low (Haiku) | Medium (Haiku+verify) | High (Sonnet) |
|---|---|---|---|
| Token count | < 50 input tokens | 50-200 tokens | > 200 tokens |
| Language mix | Single language | Some mixed | Heavy JP/EN code-switch |
| Domain specificity | General FAQ | Product catalog | Creative, cultural, comparative |
| Reasoning depth | Factual lookup | Single-step inference | Multi-step reasoning |
| Output format | Short text | Structured list | Long-form prose |
3.3 Latency-Constrained Routing
With a 3-second SLA, latency is a hard constraint.
| Model | Avg Latency (p50) | p95 Latency | p99 Latency |
|---|---|---|---|
| Haiku | 200ms | 500ms | 800ms |
| Sonnet | 800ms | 1,800ms | 2,500ms |
| Sonnet (long output) | 1,500ms | 2,800ms | 3,500ms |
Routing rules under latency constraints:
1. If estimated output > 500 tokens and current Sonnet p95 > 2.5s, route to Haiku.
2. If the real-time latency monitor shows Sonnet degradation, auto-failover to Haiku.
3. For WebSocket streaming responses, start with Haiku for an instant first token, then optionally upgrade (see the timeout-fallback sketch below).
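A minimal sketch of the timeout fallback using asyncio.wait_for; the `invoke` callable is a hypothetical async wrapper around the Bedrock call, and the 800 ms reserve matches Haiku's p99 from the table above:

```python
import asyncio

async def invoke_with_fallback(invoke, prompt: str,
                               model_budget_ms: float = 2600.0,
                               haiku_reserve_ms: float = 800.0) -> str:
    """Give Sonnet the latency budget minus a reserve equal to Haiku's p99,
    then fall back to Haiku on timeout. `invoke(model, prompt)` stands in
    for an async Bedrock call wrapper and is a placeholder, not a real API."""
    try:
        return await asyncio.wait_for(
            invoke("sonnet", prompt),
            timeout=(model_budget_ms - haiku_reserve_ms) / 1000,
        )
    except asyncio.TimeoutError:
        # Sonnet blew its slice; Haiku's p99 still fits inside the reserve.
        return await invoke("haiku", prompt)
```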
3.4 Hybrid Multi-Objective Routing
In production, routing decisions balance cost, quality, and latency simultaneously.
Score(model, request) = w_quality * Q(model, request)
+ w_cost * (1 - C(model, request) / C_max)
+ w_latency * (1 - L(model, request) / L_max)
Where weights are tunable:
- Cost-sensitive mode: w_quality=0.3, w_cost=0.5, w_latency=0.2
- Quality-first mode: w_quality=0.6, w_cost=0.1, w_latency=0.3
- Latency-critical mode: w_quality=0.2, w_cost=0.2, w_latency=0.6
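A direct transcription of the score formula; the C_max and L_max normalizers (here $0.01 and 2,600 ms) are illustrative ceilings consistent with the values used in the Section 5.2 code:

```python
def route_score(quality: float, cost: float, latency_ms: float,
                weights: dict, c_max: float = 0.01,
                l_max: float = 2600.0) -> float:
    """Score(model, request) from Section 3.4; higher is better.
    c_max/l_max normalize cost (USD) and latency (ms) into [0, 1]."""
    return (weights["quality"] * quality
            + weights["cost"] * (1 - cost / c_max)
            + weights["latency"] * (1 - latency_ms / l_max))

cost_sensitive = {"quality": 0.3, "cost": 0.5, "latency": 0.2}
# Sonnet: high quality, high cost/latency vs. Haiku: cheaper and faster.
print(route_score(0.92, 0.005, 800, cost_sensitive))    # ~0.66
print(route_score(0.78, 0.00005, 200, cost_sensitive))  # ~0.92
```

Under cost-sensitive weights, Haiku wins even though Sonnet's raw quality is higher, which is exactly the behavior the preset is tuned for.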
4. Aggregation Logic — Combining Sonnet + Haiku Outputs
4.1 When to Aggregate vs. Select
| Strategy | When to Use | MangaAssist Example |
|---|---|---|
| Select one | Outputs are complete alternatives | Chatbot response generation |
| Merge/concatenate | Outputs cover different aspects | Haiku provides facts, Sonnet adds commentary |
| Rank and pick | Both produce lists | Recommendation ranking |
| Summarize both | Need consensus view | Product comparison from two perspectives |
| Conflict resolution | Outputs contradict | Sonnet says "in stock," Haiku says "out of stock" |
4.2 Aggregation Strategies for Multilingual Outputs
MangaAssist handles Japanese and English. Aggregation must account for:
| Challenge | Impact | Mitigation |
|---|---|---|
| Character encoding | Garbled text if UTF-8 not enforced | Enforce UTF-8 at every boundary |
| Token counting | JP characters use more tokens | Use byte-pair encoding aware counting |
| Mixed-script merging | EN/JP sentences interleaved awkwardly | Detect primary language, merge in that language |
| Honorific consistency | One model uses -san, other uses -sama | Normalize to user preference from session |
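A narrow sketch of the honorific normalization in the last row, assuming the preferred honorific has already been read from the user's session (the session lookup itself is not shown):

```python
import re

def normalize_honorifics(text: str, preferred: str = "さん") -> str:
    """Rewrite 様/さま (and romanized -sama) to the session's preferred
    honorific so merged Sonnet/Haiku text reads consistently."""
    text = re.sub(r"(様|さま)", preferred, text)
    roman = "san" if preferred == "さん" else "sama"
    return re.sub(r"-(san|sama)\b", f"-{roman}", text)

print(normalize_honorifics("田中様、Tanaka-sama のご注文です"))
# -> 田中さん、Tanaka-san のご注文です
```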
4.3 Output Quality Scoring for Aggregation
When aggregating outputs, each response needs a quality score to weight its contribution.
| Quality Dimension | Measurement | Weight |
|---|---|---|
| Relevance | Cosine similarity between query embedding and response embedding | 0.30 |
| Coherence | Perplexity score or language model confidence | 0.25 |
| Completeness | Does it answer all parts of the query? (checklist match) | 0.20 |
| Factual grounding | Can claims be traced to RAG sources? | 0.15 |
| Tone appropriateness | Matches expected formality level | 0.10 |
5. Production Python Code
5.1 ModelEnsemble Class
"""
ModelEnsemble: Coordinates multiple FM invocations and combines results.
Designed for MangaAssist's Sonnet + Haiku two-model ensemble on AWS Bedrock.
"""
import asyncio
import time
import json
import hashlib
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import boto3
logger = logging.getLogger(__name__)
class EnsembleStrategy(Enum):
VOTING = "voting"
WEIGHTED_AVERAGE = "weighted_average"
STACKING = "stacking"
MIXTURE_OF_EXPERTS = "mixture_of_experts"
PARALLEL_BEST = "parallel_best"
@dataclass
class ModelResponse:
"""Response from a single model invocation."""
model_id: str
content: str
confidence: float
latency_ms: float
input_tokens: int
output_tokens: int
cost_usd: float
metadata: dict = field(default_factory=dict)
@dataclass
class EnsembleResult:
"""Combined result from the ensemble."""
final_content: str
strategy_used: EnsembleStrategy
model_responses: list
total_latency_ms: float
total_cost_usd: float
quality_score: float
metadata: dict = field(default_factory=dict)
class ModelEnsemble:
"""
Orchestrates ensemble inference across Sonnet and Haiku on AWS Bedrock.
Supports multiple ensemble strategies:
- Voting: Each model votes, majority wins (for classification tasks).
- Weighted Average: Blend numeric scores from multiple models.
- Stacking: Feed base model outputs into a meta-learner.
- Mixture of Experts: Route to the best model per request.
- Parallel Best: Query both, pick the best response by quality score.
"""
# Bedrock model identifiers
MODEL_SONNET = "anthropic.claude-3-sonnet-20240229-v1:0"
MODEL_HAIKU = "anthropic.claude-3-haiku-20240307-v1:0"
# Cost per 1M tokens (USD)
COST_TABLE = {
MODEL_SONNET: {"input": 3.00, "output": 15.00},
MODEL_HAIKU: {"input": 0.25, "output": 1.25},
}
def __init__(
self,
bedrock_client=None,
redis_client=None,
default_strategy: EnsembleStrategy = EnsembleStrategy.PARALLEL_BEST,
cache_ttl_seconds: int = 300,
):
self.bedrock = bedrock_client or boto3.client(
"bedrock-runtime", region_name="ap-northeast-1"
)
self.redis = redis_client
self.default_strategy = default_strategy
self.cache_ttl = cache_ttl_seconds
self._invocation_count = {"sonnet": 0, "haiku": 0}
# ------------------------------------------------------------------
# Core ensemble entry point
# ------------------------------------------------------------------
async def ensemble_invoke(
self,
prompt: str,
strategy: Optional[EnsembleStrategy] = None,
models: Optional[list] = None,
weights: Optional[dict] = None,
max_tokens: int = 1024,
temperature: float = 0.3,
) -> EnsembleResult:
"""
Run the ensemble pipeline with the specified strategy.
Args:
prompt: The user query / assembled prompt.
strategy: Ensemble strategy to use (defaults to instance default).
models: List of model IDs to include. Defaults to [Sonnet, Haiku].
weights: Model weights for weighted strategies. e.g. {MODEL_SONNET: 0.7}.
max_tokens: Max output tokens per model.
temperature: Sampling temperature.
Returns:
EnsembleResult with the final merged response and metadata.
"""
strategy = strategy or self.default_strategy
models = models or [self.MODEL_SONNET, self.MODEL_HAIKU]
weights = weights or {self.MODEL_SONNET: 0.7, self.MODEL_HAIKU: 0.3}
# Check cache first
cached = self._check_cache(prompt, strategy)
if cached:
logger.info("Ensemble cache hit for prompt hash")
return cached
start_time = time.monotonic()
if strategy == EnsembleStrategy.VOTING:
result = await self._voting_ensemble(
prompt, models, max_tokens, temperature
)
elif strategy == EnsembleStrategy.WEIGHTED_AVERAGE:
result = await self._weighted_average_ensemble(
prompt, models, weights, max_tokens, temperature
)
elif strategy == EnsembleStrategy.STACKING:
result = await self._stacking_ensemble(
prompt, models, max_tokens, temperature
)
elif strategy == EnsembleStrategy.MIXTURE_OF_EXPERTS:
result = await self._moe_ensemble(
prompt, max_tokens, temperature
)
elif strategy == EnsembleStrategy.PARALLEL_BEST:
result = await self._parallel_best_ensemble(
prompt, models, max_tokens, temperature
)
else:
raise ValueError(f"Unknown ensemble strategy: {strategy}")
result.total_latency_ms = (time.monotonic() - start_time) * 1000
result.strategy_used = strategy
# Store in cache
self._store_cache(prompt, strategy, result)
logger.info(
"Ensemble complete: strategy=%s latency=%.0fms cost=$%.6f",
strategy.value,
result.total_latency_ms,
result.total_cost_usd,
)
return result
# ------------------------------------------------------------------
# Strategy implementations
# ------------------------------------------------------------------
async def _voting_ensemble(
self, prompt, models, max_tokens, temperature
) -> EnsembleResult:
"""Query all models; pick the majority-agreed answer (classification)."""
responses = await self._invoke_all_parallel(
prompt, models, max_tokens, temperature
)
# For classification: extract the "label" from each response
votes = {}
for resp in responses:
label = self._extract_classification_label(resp.content)
votes[label] = votes.get(label, 0) + 1
# Majority vote
winning_label = max(votes, key=votes.get)
winning_response = next(
r for r in responses
if self._extract_classification_label(r.content) == winning_label
)
total_cost = sum(r.cost_usd for r in responses)
return EnsembleResult(
final_content=winning_response.content,
strategy_used=EnsembleStrategy.VOTING,
model_responses=responses,
total_latency_ms=0.0,
total_cost_usd=total_cost,
quality_score=votes[winning_label] / len(responses),
metadata={"votes": votes, "winning_label": winning_label},
)
async def _weighted_average_ensemble(
self, prompt, models, weights, max_tokens, temperature
) -> EnsembleResult:
"""Blend scored outputs using model-specific weights."""
responses = await self._invoke_all_parallel(
prompt, models, max_tokens, temperature
)
# Score each response on quality dimensions
scored = []
for resp in responses:
quality = self._score_response_quality(prompt, resp.content)
weight = weights.get(resp.model_id, 0.5)
scored.append((resp, quality, weight))
# Weighted selection: pick the response with highest weighted score
best_resp, best_quality, _ = max(
scored, key=lambda x: x[1] * x[2]
)
total_cost = sum(r.cost_usd for r in responses)
return EnsembleResult(
final_content=best_resp.content,
strategy_used=EnsembleStrategy.WEIGHTED_AVERAGE,
model_responses=responses,
total_latency_ms=0.0,
total_cost_usd=total_cost,
quality_score=best_quality,
metadata={
"scores": [
{
"model": r.model_id,
"quality": q,
"weight": w,
"weighted_score": q * w,
}
for r, q, w in scored
]
},
)
async def _stacking_ensemble(
self, prompt, models, max_tokens, temperature
) -> EnsembleResult:
"""
Two-stage stacking: base models generate responses, then a
meta-learner (Haiku, cheap) picks or synthesizes the best answer.
"""
# Stage 1: Base model responses
base_responses = await self._invoke_all_parallel(
prompt, models, max_tokens, temperature
)
# Stage 2: Meta-learner synthesizes
meta_prompt = self._build_meta_learner_prompt(prompt, base_responses)
meta_response = await self._invoke_single(
self.MODEL_HAIKU, meta_prompt, max_tokens, temperature=0.1
)
total_cost = sum(r.cost_usd for r in base_responses) + meta_response.cost_usd
all_responses = base_responses + [meta_response]
return EnsembleResult(
final_content=meta_response.content,
strategy_used=EnsembleStrategy.STACKING,
model_responses=all_responses,
total_latency_ms=0.0,
total_cost_usd=total_cost,
quality_score=self._score_response_quality(
prompt, meta_response.content
),
metadata={"meta_model": self.MODEL_HAIKU},
)
async def _moe_ensemble(
self, prompt, max_tokens, temperature
) -> EnsembleResult:
"""
Mixture of Experts: classify the request, route to the best model.
Only one model is invoked — this is the most cost-efficient strategy.
"""
# Gate: classify the request complexity
complexity = self._classify_request_complexity(prompt)
if complexity in ("simple", "medium"):
selected_model = self.MODEL_HAIKU
else:
selected_model = self.MODEL_SONNET
response = await self._invoke_single(
selected_model, prompt, max_tokens, temperature
)
return EnsembleResult(
final_content=response.content,
strategy_used=EnsembleStrategy.MIXTURE_OF_EXPERTS,
model_responses=[response],
total_latency_ms=0.0,
total_cost_usd=response.cost_usd,
quality_score=response.confidence,
metadata={
"complexity": complexity,
"routed_to": selected_model,
},
)
async def _parallel_best_ensemble(
self, prompt, models, max_tokens, temperature
) -> EnsembleResult:
"""
Query all models in parallel. Pick the highest-quality response.
Use when quality matters most and budget allows.
"""
responses = await self._invoke_all_parallel(
prompt, models, max_tokens, temperature
)
# Score each and pick the best
scored = [
(resp, self._score_response_quality(prompt, resp.content))
for resp in responses
]
best_resp, best_score = max(scored, key=lambda x: x[1])
total_cost = sum(r.cost_usd for r in responses)
return EnsembleResult(
final_content=best_resp.content,
strategy_used=EnsembleStrategy.PARALLEL_BEST,
model_responses=responses,
total_latency_ms=0.0,
total_cost_usd=total_cost,
quality_score=best_score,
metadata={
"scores": {r.model_id: s for r, s in scored},
"selected_model": best_resp.model_id,
},
)
# ------------------------------------------------------------------
# Model invocation helpers
# ------------------------------------------------------------------
async def _invoke_single(
self, model_id: str, prompt: str, max_tokens: int, temperature: float
) -> ModelResponse:
"""Invoke a single Bedrock model and return a ModelResponse."""
start = time.monotonic()
body = json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"temperature": temperature,
"messages": [{"role": "user", "content": prompt}],
})
        loop = asyncio.get_running_loop()
response = await loop.run_in_executor(
None,
lambda: self.bedrock.invoke_model(
modelId=model_id,
contentType="application/json",
accept="application/json",
body=body,
),
)
result = json.loads(response["body"].read())
latency = (time.monotonic() - start) * 1000
content = result["content"][0]["text"]
input_tokens = result["usage"]["input_tokens"]
output_tokens = result["usage"]["output_tokens"]
cost = self._calculate_cost(model_id, input_tokens, output_tokens)
# Track invocation counts
short_name = "sonnet" if "sonnet" in model_id else "haiku"
self._invocation_count[short_name] += 1
return ModelResponse(
model_id=model_id,
content=content,
confidence=self._estimate_confidence(result),
latency_ms=latency,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost_usd=cost,
)
async def _invoke_all_parallel(
self, prompt: str, models: list, max_tokens: int, temperature: float
) -> list:
"""Invoke all models concurrently and collect responses."""
tasks = [
self._invoke_single(model_id, prompt, max_tokens, temperature)
for model_id in models
]
return await asyncio.gather(*tasks)
# ------------------------------------------------------------------
# Scoring and classification helpers
# ------------------------------------------------------------------
def _classify_request_complexity(self, prompt: str) -> str:
"""
Fast, heuristic-based complexity classifier for MoE gating.
No model invocation needed — runs in <1ms.
"""
token_estimate = len(prompt.split())
has_japanese = any("\u3040" <= ch <= "\u9fff" for ch in prompt)
multi_sentence = prompt.count("?") > 1 or prompt.count("。") > 1
creative_keywords = [
"compare", "analyze", "review", "recommend", "explain why",
"比較", "分析", "レビュー", "おすすめ", "なぜ",
]
is_creative = any(kw in prompt.lower() for kw in creative_keywords)
score = 0
if token_estimate > 50:
score += 1
if token_estimate > 150:
score += 1
if has_japanese:
score += 1
if multi_sentence:
score += 1
if is_creative:
score += 2
if score <= 1:
return "simple"
elif score <= 3:
return "medium"
else:
return "complex"
def _score_response_quality(self, prompt: str, response: str) -> float:
"""
Lightweight quality scoring for response selection.
Returns a 0.0 to 1.0 score.
"""
score = 0.0
# Length appropriateness (not too short, not excessively long)
resp_len = len(response)
if 50 < resp_len < 2000:
score += 0.25
elif resp_len >= 2000:
score += 0.15
# Contains relevant keywords from the prompt
prompt_words = set(prompt.lower().split())
response_words = set(response.lower().split())
overlap = len(prompt_words & response_words) / max(len(prompt_words), 1)
score += min(overlap * 0.5, 0.25)
# Structural quality indicators
if any(marker in response for marker in ["1.", "- ", "* ", "**"]):
score += 0.15 # Has structure / formatting
# Japanese content quality (if prompt was Japanese)
has_jp_prompt = any("\u3040" <= ch <= "\u9fff" for ch in prompt)
has_jp_response = any("\u3040" <= ch <= "\u9fff" for ch in response)
if has_jp_prompt and has_jp_response:
score += 0.20 # Responded in the appropriate language
# Coherence: no obvious truncation
if response.rstrip().endswith((".", "!", "?", "。", "!", "?")):
score += 0.15 # Ends with proper punctuation
return min(score, 1.0)
def _extract_classification_label(self, content: str) -> str:
"""Extract a classification label from a model response."""
# Simple extraction: take the first line, strip whitespace
first_line = content.strip().split("\n")[0].strip()
return first_line.lower()
def _estimate_confidence(self, bedrock_result: dict) -> float:
"""Estimate model confidence from Bedrock response metadata."""
stop_reason = bedrock_result.get("stop_reason", "end_turn")
if stop_reason == "end_turn":
return 0.9
elif stop_reason == "max_tokens":
return 0.5 # Truncated — lower confidence
return 0.7
# ------------------------------------------------------------------
# Cost calculation
# ------------------------------------------------------------------
def _calculate_cost(
self, model_id: str, input_tokens: int, output_tokens: int
) -> float:
"""Calculate USD cost for a single invocation."""
rates = self.COST_TABLE.get(model_id, {"input": 3.0, "output": 15.0})
input_cost = (input_tokens / 1_000_000) * rates["input"]
output_cost = (output_tokens / 1_000_000) * rates["output"]
return input_cost + output_cost
# ------------------------------------------------------------------
# Caching
# ------------------------------------------------------------------
def _cache_key(self, prompt: str, strategy: EnsembleStrategy) -> str:
"""Generate a deterministic cache key."""
raw = f"{strategy.value}:{prompt}"
return f"ensemble:{hashlib.sha256(raw.encode()).hexdigest()[:16]}"
    def _check_cache(
        self, prompt: str, strategy: EnsembleStrategy
    ) -> Optional[EnsembleResult]:
        """Check Redis cache for a previous ensemble result."""
        if not self.redis:
            return None
        key = self._cache_key(prompt, strategy)
        cached = self.redis.get(key)
        if cached:
            data = json.loads(cached)
            # Rehydrate the enum: _store_cache persists its string value.
            data["strategy_used"] = EnsembleStrategy(data["strategy_used"])
            return EnsembleResult(**data)
        return None
def _store_cache(
self, prompt: str, strategy: EnsembleStrategy, result: EnsembleResult
) -> None:
"""Store ensemble result in Redis cache."""
if not self.redis:
return
key = self._cache_key(prompt, strategy)
# Store only essential fields for cache
cache_data = json.dumps({
"final_content": result.final_content,
"strategy_used": result.strategy_used.value,
"model_responses": [],
"total_latency_ms": result.total_latency_ms,
"total_cost_usd": result.total_cost_usd,
"quality_score": result.quality_score,
"metadata": {"cached": True},
})
self.redis.setex(key, self.cache_ttl, cache_data)
# ------------------------------------------------------------------
# Meta-learner prompt builder for stacking
# ------------------------------------------------------------------
def _build_meta_learner_prompt(
self, original_prompt: str, base_responses: list
) -> str:
"""Build a prompt for the meta-learner in the stacking strategy."""
responses_text = "\n\n".join(
f"--- Response from {r.model_id} (confidence: {r.confidence:.2f}) ---\n{r.content}"
for r in base_responses
)
return (
f"You are a response quality judge. Given the original question and "
f"multiple candidate responses, synthesize the best possible answer.\n\n"
f"Original question: {original_prompt}\n\n"
f"Candidate responses:\n{responses_text}\n\n"
f"Provide the best synthesized answer. If one response is clearly "
f"superior, use it directly. If both have strengths, combine them."
)
def get_invocation_stats(self) -> dict:
"""Return cumulative invocation counts and estimated costs."""
return {
"invocation_counts": dict(self._invocation_count),
"total_invocations": sum(self._invocation_count.values()),
        }
```
5.2 SelectionFramework Class
"""
SelectionFramework: Multi-objective model selection for MangaAssist.
Routes each request to the optimal model based on cost, quality, and latency.
"""
import time
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Optional
logger = logging.getLogger(__name__)
class RoutingMode(Enum):
COST_OPTIMIZED = "cost_optimized"
QUALITY_FIRST = "quality_first"
LATENCY_CRITICAL = "latency_critical"
BALANCED = "balanced"
@dataclass
class RoutingDecision:
"""The result of a model selection decision."""
selected_model: str
routing_mode: RoutingMode
complexity_score: float
estimated_cost_usd: float
estimated_latency_ms: float
confidence: float
reason: str
fallback_model: Optional[str] = None
@dataclass
class LatencyBudget:
"""Latency constraints for routing decisions."""
total_budget_ms: float = 3000.0 # MangaAssist 3-second SLA
rag_retrieval_ms: float = 200.0
preprocessing_ms: float = 100.0
postprocessing_ms: float = 100.0
@property
def model_budget_ms(self) -> float:
"""Time budget remaining for model inference."""
overhead = self.rag_retrieval_ms + self.preprocessing_ms + self.postprocessing_ms
return self.total_budget_ms - overhead # Typically ~2600ms
class SelectionFramework:
"""
Routes requests to the optimal Bedrock model based on multi-objective
scoring across cost, quality, and latency dimensions.
Architecture:
- Request arrives at ECS Fargate orchestrator
- SelectionFramework analyzes request characteristics
- Returns a RoutingDecision with selected model + fallback
- Orchestrator invokes the selected model via Bedrock
"""
MODEL_SONNET = "anthropic.claude-3-sonnet-20240229-v1:0"
MODEL_HAIKU = "anthropic.claude-3-haiku-20240307-v1:0"
# Model profiles: latency (p50) in ms, cost per avg request
MODEL_PROFILES = {
MODEL_SONNET: {
"latency_p50_ms": 800,
"latency_p95_ms": 1800,
"latency_p99_ms": 2500,
"cost_per_1k_input": 0.003,
"cost_per_1k_output": 0.015,
"quality_baseline": 0.92,
},
MODEL_HAIKU: {
"latency_p50_ms": 200,
"latency_p95_ms": 500,
"latency_p99_ms": 800,
"quality_baseline": 0.78,
"cost_per_1k_input": 0.00025,
"cost_per_1k_output": 0.00125,
},
}
# Weight presets for each routing mode
MODE_WEIGHTS = {
RoutingMode.COST_OPTIMIZED: {
"quality": 0.25, "cost": 0.55, "latency": 0.20
},
RoutingMode.QUALITY_FIRST: {
"quality": 0.60, "cost": 0.10, "latency": 0.30
},
RoutingMode.LATENCY_CRITICAL: {
"quality": 0.20, "cost": 0.15, "latency": 0.65
},
RoutingMode.BALANCED: {
"quality": 0.40, "cost": 0.30, "latency": 0.30
},
}
def __init__(
self,
default_mode: RoutingMode = RoutingMode.BALANCED,
latency_budget: Optional[LatencyBudget] = None,
redis_client=None,
):
self.default_mode = default_mode
self.latency_budget = latency_budget or LatencyBudget()
self.redis = redis_client
self._routing_history: list = []
self._live_latency = {
self.MODEL_SONNET: {"p50": 800, "p95": 1800},
self.MODEL_HAIKU: {"p50": 200, "p95": 500},
}
def select_model(
self,
prompt: str,
mode: Optional[RoutingMode] = None,
user_tier: str = "standard",
session_context: Optional[dict] = None,
) -> RoutingDecision:
"""
Analyze the request and select the optimal model.
Args:
prompt: User query text.
mode: Override routing mode (uses default if not specified).
user_tier: User subscription tier ("free", "standard", "premium").
session_context: Optional session metadata from DynamoDB.
Returns:
RoutingDecision with the selected model and reasoning.
"""
mode = mode or self.default_mode
# Step 1: Classify request complexity
complexity = self._compute_complexity_score(prompt, session_context)
# Step 2: Apply user tier overrides
if user_tier == "premium":
mode = RoutingMode.QUALITY_FIRST
elif user_tier == "free":
mode = RoutingMode.COST_OPTIMIZED
# Step 3: Score each model
weights = self.MODE_WEIGHTS[mode]
model_scores = {}
for model_id, profile in self.MODEL_PROFILES.items():
q_score = self._quality_score(model_id, complexity)
c_score = self._cost_score(model_id, prompt)
l_score = self._latency_score(model_id)
total = (
weights["quality"] * q_score
+ weights["cost"] * c_score
+ weights["latency"] * l_score
)
model_scores[model_id] = {
"total": total,
"quality": q_score,
"cost": c_score,
"latency": l_score,
}
# Step 4: Select the winning model
selected = max(model_scores, key=lambda m: model_scores[m]["total"])
fallback = min(model_scores, key=lambda m: model_scores[m]["total"])
# Step 5: Latency hard constraint check
model_budget = self.latency_budget.model_budget_ms
live_p95 = self._live_latency.get(selected, {}).get("p95", 2000)
if live_p95 > model_budget:
logger.warning(
"Selected model %s p95 latency %dms exceeds budget %dms, "
"falling back to %s",
selected, live_p95, model_budget, fallback,
)
selected, fallback = fallback, selected
# Step 6: Build decision
est_cost = self._estimate_request_cost(selected, prompt)
est_latency = self._live_latency.get(selected, {}).get("p50", 500)
scores = model_scores[selected]
decision = RoutingDecision(
selected_model=selected,
routing_mode=mode,
complexity_score=complexity,
estimated_cost_usd=est_cost,
estimated_latency_ms=est_latency,
confidence=scores["total"],
reason=self._build_reason(selected, scores, complexity, mode),
fallback_model=fallback,
)
self._routing_history.append(decision)
self._emit_routing_metric(decision)
return decision
# ------------------------------------------------------------------
# Scoring functions
# ------------------------------------------------------------------
def _compute_complexity_score(
self, prompt: str, session_context: Optional[dict] = None
) -> float:
"""
Compute a 0.0 to 1.0 complexity score for the request.
Higher complexity favors Sonnet; lower favors Haiku.
"""
score = 0.0
token_est = len(prompt.split())
# Token length factor
if token_est > 100:
score += 0.2
if token_est > 250:
score += 0.15
# Japanese content factor
jp_chars = sum(1 for ch in prompt if "\u3040" <= ch <= "\u9fff")
if jp_chars > 10:
score += 0.15
# Multi-turn conversation depth
if session_context:
turn_count = session_context.get("turn_count", 0)
if turn_count > 5:
score += 0.1
if turn_count > 10:
score += 0.1
# Creative / analytical keywords
creative_indicators = [
"compare", "analyze", "explain why", "write a review",
"比較", "分析", "なぜ", "レビュー", "おすすめ",
]
if any(kw in prompt.lower() for kw in creative_indicators):
score += 0.25
# Explicit format requests (lists, tables, structured output)
format_indicators = ["list", "table", "step by step", "一覧", "表"]
if any(kw in prompt.lower() for kw in format_indicators):
score += 0.1
return min(score, 1.0)
def _quality_score(self, model_id: str, complexity: float) -> float:
"""Quality score: higher-capability models score better on complex tasks."""
baseline = self.MODEL_PROFILES[model_id]["quality_baseline"]
# Sonnet's advantage grows with complexity
if "sonnet" in model_id:
return baseline + (complexity * 0.08)
# Haiku is fine for simple tasks but degrades on complex ones
return baseline - (complexity * 0.15)
def _cost_score(self, model_id: str, prompt: str) -> float:
"""Cost score: cheaper models score higher (inverted cost)."""
est_cost = self._estimate_request_cost(model_id, prompt)
# Normalize: Haiku ~$0.00005, Sonnet ~$0.005
# Use log scale for fair comparison
if est_cost <= 0:
return 1.0
# Lower cost = higher score
max_cost = 0.01 # $0.01 ceiling
return max(0.0, 1.0 - (est_cost / max_cost))
def _latency_score(self, model_id: str) -> float:
"""Latency score based on live metrics."""
budget = self.latency_budget.model_budget_ms
live_p50 = self._live_latency.get(model_id, {}).get("p50", 1000)
# Score based on how much budget headroom remains
ratio = live_p50 / budget
return max(0.0, 1.0 - ratio)
def _estimate_request_cost(self, model_id: str, prompt: str) -> float:
"""Estimate cost for a single request."""
profile = self.MODEL_PROFILES[model_id]
input_tokens = len(prompt.split()) * 1.3 # rough estimate
output_tokens = 200 # average assumption
input_cost = (input_tokens / 1000) * profile["cost_per_1k_input"]
output_cost = (output_tokens / 1000) * profile["cost_per_1k_output"]
return input_cost + output_cost
# ------------------------------------------------------------------
# Live latency tracking
# ------------------------------------------------------------------
def update_live_latency(
self, model_id: str, latency_ms: float
) -> None:
"""Update the exponential moving average of model latency."""
alpha = 0.1 # Smoothing factor
if model_id not in self._live_latency:
self._live_latency[model_id] = {"p50": latency_ms, "p95": latency_ms * 2}
return
current = self._live_latency[model_id]
current["p50"] = alpha * latency_ms + (1 - alpha) * current["p50"]
# Rough p95 estimation: track the high-water mark with decay
if latency_ms > current["p95"]:
current["p95"] = alpha * latency_ms + (1 - alpha) * current["p95"]
else:
current["p95"] = 0.98 * current["p95"] # Slow decay
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _build_reason(
self, model_id: str, scores: dict, complexity: float, mode: RoutingMode
) -> str:
"""Build a human-readable reason for the routing decision."""
model_name = "Sonnet" if "sonnet" in model_id else "Haiku"
return (
f"Selected {model_name} under {mode.value} mode "
f"(complexity={complexity:.2f}, "
f"quality={scores['quality']:.2f}, "
f"cost={scores['cost']:.2f}, "
f"latency={scores['latency']:.2f}, "
f"total={scores['total']:.2f})"
)
def _emit_routing_metric(self, decision: RoutingDecision) -> None:
"""Emit routing decision metrics for monitoring."""
if self.redis:
key = f"routing:metrics:{int(time.time()) // 60}"
model_name = (
"sonnet" if "sonnet" in decision.selected_model else "haiku"
)
self.redis.hincrby(key, f"count:{model_name}", 1)
self.redis.expire(key, 3600)
def get_routing_stats(self) -> dict:
"""Return routing statistics from the current session."""
if not self._routing_history:
return {"total_decisions": 0}
sonnet_count = sum(
1 for d in self._routing_history if "sonnet" in d.selected_model
)
haiku_count = len(self._routing_history) - sonnet_count
return {
"total_decisions": len(self._routing_history),
"sonnet_selections": sonnet_count,
"haiku_selections": haiku_count,
"sonnet_pct": round(sonnet_count / len(self._routing_history) * 100, 1),
"avg_complexity": round(
sum(d.complexity_score for d in self._routing_history)
/ len(self._routing_history),
3,
),
"avg_estimated_cost": round(
sum(d.estimated_cost_usd for d in self._routing_history)
/ len(self._routing_history),
6,
),
        }
```
5.3 AggregationEngine Class
"""
AggregationEngine: Merges, ranks, and resolves outputs from multiple FMs.
Handles multilingual (JP/EN) output merging for MangaAssist.
"""
import re
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
logger = logging.getLogger(__name__)
class AggregationMethod(Enum):
SELECT_BEST = "select_best"
MERGE_COMPLEMENTARY = "merge_complementary"
RANKED_FUSION = "ranked_fusion"
CONFLICT_RESOLUTION = "conflict_resolution"
SUMMARIZE = "summarize"
class OutputLanguage(Enum):
JAPANESE = "ja"
ENGLISH = "en"
MIXED = "mixed"
@dataclass
class ScoredOutput:
"""A model output with associated quality scores."""
model_id: str
content: str
relevance_score: float = 0.0
coherence_score: float = 0.0
completeness_score: float = 0.0
factual_score: float = 0.0
tone_score: float = 0.0
language: OutputLanguage = OutputLanguage.ENGLISH
overall_score: float = 0.0
@dataclass
class AggregationResult:
"""Result of the aggregation process."""
final_content: str
method_used: AggregationMethod
source_outputs: list
quality_score: float
language: OutputLanguage
conflict_detected: bool = False
conflict_details: Optional[str] = None
metadata: dict = field(default_factory=dict)
class AggregationEngine:
"""
Combines outputs from multiple models into a single coherent response.
Handles:
- Quality-based selection (pick the best)
- Complementary merging (combine non-overlapping parts)
- Ranked fusion (merge ranked lists like recommendations)
- Conflict resolution (when models disagree)
- Multilingual output normalization (JP/EN)
"""
# Quality dimension weights
QUALITY_WEIGHTS = {
"relevance": 0.30,
"coherence": 0.25,
"completeness": 0.20,
"factual": 0.15,
"tone": 0.10,
}
def __init__(self, preferred_language: OutputLanguage = OutputLanguage.JAPANESE):
self.preferred_language = preferred_language
self._aggregation_history: list = []
def aggregate(
self,
outputs: list,
query: str,
method: Optional[AggregationMethod] = None,
user_language: Optional[OutputLanguage] = None,
) -> AggregationResult:
"""
Main aggregation entry point.
Args:
outputs: List of (model_id, content) tuples from ensemble models.
query: The original user query.
method: Aggregation method. Auto-detected if not specified.
user_language: Preferred output language.
Returns:
AggregationResult with the merged response.
"""
lang = user_language or self.preferred_language
# Step 1: Score all outputs
scored = [
self._score_output(model_id, content, query)
for model_id, content in outputs
]
# Step 2: Auto-detect aggregation method if not specified
if method is None:
method = self._auto_detect_method(scored, query)
# Step 3: Detect conflicts
conflict_detected, conflict_details = self._detect_conflicts(scored)
if conflict_detected and method != AggregationMethod.CONFLICT_RESOLUTION:
logger.warning(
"Conflict detected between model outputs: %s", conflict_details
)
method = AggregationMethod.CONFLICT_RESOLUTION
# Step 4: Execute aggregation
if method == AggregationMethod.SELECT_BEST:
result = self._select_best(scored, lang)
elif method == AggregationMethod.MERGE_COMPLEMENTARY:
result = self._merge_complementary(scored, query, lang)
elif method == AggregationMethod.RANKED_FUSION:
result = self._ranked_fusion(scored, lang)
elif method == AggregationMethod.CONFLICT_RESOLUTION:
result = self._resolve_conflict(scored, query, lang)
elif method == AggregationMethod.SUMMARIZE:
result = self._summarize_outputs(scored, query, lang)
else:
result = self._select_best(scored, lang)
result.method_used = method
result.conflict_detected = conflict_detected
result.conflict_details = conflict_details
# Step 5: Normalize language in final output
result.final_content = self._normalize_language(
result.final_content, lang
)
self._aggregation_history.append(result)
return result
# ------------------------------------------------------------------
# Scoring
# ------------------------------------------------------------------
def _score_output(
self, model_id: str, content: str, query: str
) -> ScoredOutput:
"""Score a single model output across quality dimensions."""
language = self._detect_language(content)
relevance = self._compute_relevance(query, content)
coherence = self._compute_coherence(content)
completeness = self._compute_completeness(query, content)
factual = self._compute_factual_grounding(content)
tone = self._compute_tone_score(content)
overall = (
self.QUALITY_WEIGHTS["relevance"] * relevance
+ self.QUALITY_WEIGHTS["coherence"] * coherence
+ self.QUALITY_WEIGHTS["completeness"] * completeness
+ self.QUALITY_WEIGHTS["factual"] * factual
+ self.QUALITY_WEIGHTS["tone"] * tone
)
return ScoredOutput(
model_id=model_id,
content=content,
relevance_score=relevance,
coherence_score=coherence,
completeness_score=completeness,
factual_score=factual,
tone_score=tone,
language=language,
overall_score=overall,
)
def _compute_relevance(self, query: str, content: str) -> float:
"""Measure response relevance to the query using keyword overlap."""
query_terms = set(query.lower().split())
content_terms = set(content.lower().split())
if not query_terms:
return 0.5
overlap = len(query_terms & content_terms)
return min(overlap / len(query_terms), 1.0)
def _compute_coherence(self, content: str) -> float:
"""Measure response coherence via structural indicators."""
score = 0.5 # baseline
# Ends with proper punctuation
if content.rstrip().endswith((".", "!", "?", "。", "!", "?")):
score += 0.2
# Has paragraph/sentence structure
sentences = re.split(r"[.!?。!?]+", content)
if 2 <= len(sentences) <= 20:
score += 0.2
# No obvious repetition
words = content.lower().split()
if len(words) > 0:
unique_ratio = len(set(words)) / len(words)
if unique_ratio > 0.5:
score += 0.1
return min(score, 1.0)
def _compute_completeness(self, query: str, content: str) -> float:
"""Check if the response addresses all parts of the query."""
# Count question marks / key phrases as sub-questions
sub_questions = max(
query.count("?") + query.count("?"), 1
)
# Rough heuristic: longer responses are more likely complete
content_length = len(content)
length_factor = min(content_length / (sub_questions * 100), 1.0)
return length_factor
def _compute_factual_grounding(self, content: str) -> float:
"""Check if the response references verifiable facts."""
score = 0.5
# Contains specific numbers, titles, or proper nouns
if re.search(r"\d+", content):
score += 0.15
if re.search(r"「.+?」", content): # Japanese quotation marks
score += 0.15
if re.search(r'"[^"]+?"', content): # English quotation marks
score += 0.1
# References to manga titles, volumes, authors
manga_indicators = ["巻", "volume", "vol.", "chapter", "話"]
if any(ind in content.lower() for ind in manga_indicators):
score += 0.1
return min(score, 1.0)
def _compute_tone_score(self, content: str) -> float:
"""Evaluate tone appropriateness for a manga store chatbot."""
score = 0.6 # baseline — assume adequate
# Friendly indicators
friendly = ["!", "!", "ぜひ", "おすすめ", "楽しい", "great", "enjoy", "love"]
if any(f in content for f in friendly):
score += 0.2
# Too formal or robotic
robotic = [
"as an AI", "I cannot", "I'm sorry, but",
"申し訳ございません", "対応できません",
]
if any(r in content for r in robotic):
score -= 0.2
return max(0.0, min(score, 1.0))
# ------------------------------------------------------------------
# Aggregation strategies
# ------------------------------------------------------------------
def _select_best(
self, scored: list, lang: OutputLanguage
) -> AggregationResult:
"""Pick the single best-scoring output."""
best = max(scored, key=lambda s: s.overall_score)
return AggregationResult(
final_content=best.content,
method_used=AggregationMethod.SELECT_BEST,
source_outputs=scored,
quality_score=best.overall_score,
language=best.language,
)
def _merge_complementary(
self, scored: list, query: str, lang: OutputLanguage
) -> AggregationResult:
"""Merge non-overlapping parts from multiple outputs."""
# Sort by overall score descending
sorted_outputs = sorted(scored, key=lambda s: s.overall_score, reverse=True)
primary = sorted_outputs[0]
merged_content = primary.content
for secondary in sorted_outputs[1:]:
# Extract sentences not already covered in primary
primary_sentences = set(
s.strip() for s in re.split(r"[.!?。!?]+", primary.content) if s.strip()
)
secondary_sentences = [
s.strip()
for s in re.split(r"[.!?。!?]+", secondary.content)
if s.strip() and s.strip() not in primary_sentences
]
if secondary_sentences:
novel_content = "。".join(secondary_sentences) if lang == OutputLanguage.JAPANESE else ". ".join(secondary_sentences)
merged_content += "\n\n" + novel_content
avg_score = sum(s.overall_score for s in scored) / len(scored)
return AggregationResult(
final_content=merged_content,
method_used=AggregationMethod.MERGE_COMPLEMENTARY,
source_outputs=scored,
quality_score=min(avg_score * 1.1, 1.0), # Boost for combined info
language=lang,
metadata={"merge_sources": len(scored)},
)
def _ranked_fusion(
self, scored: list, lang: OutputLanguage
) -> AggregationResult:
"""Merge ranked lists (e.g., manga recommendations) using RRF."""
# Reciprocal Rank Fusion (RRF) for combining ranked lists
k = 60 # Standard RRF constant
# Extract ranked items from each output
all_items = {}
for output in scored:
items = self._extract_ranked_items(output.content)
for rank, item in enumerate(items, 1):
item_key = item.lower().strip()
if item_key not in all_items:
all_items[item_key] = {"display": item, "rrf_score": 0.0}
all_items[item_key]["rrf_score"] += 1.0 / (k + rank)
# Sort by RRF score descending
fused_ranking = sorted(
all_items.values(), key=lambda x: x["rrf_score"], reverse=True
)
# Format as numbered list
result_items = [
f"{i+1}. {item['display']}" for i, item in enumerate(fused_ranking[:10])
]
merged = "\n".join(result_items)
return AggregationResult(
final_content=merged,
method_used=AggregationMethod.RANKED_FUSION,
source_outputs=scored,
quality_score=max(s.overall_score for s in scored),
language=lang,
metadata={"fused_items": len(fused_ranking)},
)
def _resolve_conflict(
self, scored: list, query: str, lang: OutputLanguage
) -> AggregationResult:
"""
Handle conflicting outputs. Strategy:
1. Trust the higher-scoring model
2. Add a caveat about uncertainty
3. Flag for human review if the gap is small
"""
sorted_outputs = sorted(scored, key=lambda s: s.overall_score, reverse=True)
best = sorted_outputs[0]
runner_up = sorted_outputs[1] if len(sorted_outputs) > 1 else None
score_gap = (
best.overall_score - runner_up.overall_score if runner_up else 1.0
)
if score_gap < 0.1:
# Close call — add uncertainty caveat
caveat = (
"\n\n※ この情報は確認中です。正確な詳細はスタッフにお問い合わせください。"
if lang == OutputLanguage.JAPANESE
else "\n\nNote: This information is being verified. "
"Please check with our staff for exact details."
)
content = best.content + caveat
needs_review = True
else:
content = best.content
needs_review = False
return AggregationResult(
final_content=content,
method_used=AggregationMethod.CONFLICT_RESOLUTION,
source_outputs=scored,
quality_score=best.overall_score,
language=best.language,
conflict_detected=True,
conflict_details=(
f"Score gap: {score_gap:.3f}, "
f"best={best.model_id} ({best.overall_score:.3f}), "
f"runner_up={runner_up.model_id if runner_up else 'N/A'}"
),
metadata={"needs_human_review": needs_review, "score_gap": score_gap},
)
def _summarize_outputs(
self, scored: list, query: str, lang: OutputLanguage
) -> AggregationResult:
"""Create a summary that captures the consensus from all outputs."""
# Concatenate all outputs with source labels
combined = "\n\n".join(
f"[{s.model_id}]: {s.content}" for s in scored
)
# In production, this would be fed to a model for summarization.
# For now, pick the best and note the consensus.
best = max(scored, key=lambda s: s.overall_score)
return AggregationResult(
final_content=best.content,
method_used=AggregationMethod.SUMMARIZE,
source_outputs=scored,
quality_score=best.overall_score,
language=best.language,
metadata={"source_count": len(scored)},
)
# ------------------------------------------------------------------
# Language handling
# ------------------------------------------------------------------
def _detect_language(self, content: str) -> OutputLanguage:
"""Detect primary language of the content."""
jp_chars = sum(1 for ch in content if "\u3040" <= ch <= "\u9fff")
en_chars = sum(1 for ch in content if "a" <= ch.lower() <= "z")
total = jp_chars + en_chars
if total == 0:
return OutputLanguage.MIXED
jp_ratio = jp_chars / total
if jp_ratio > 0.6:
return OutputLanguage.JAPANESE
elif jp_ratio < 0.2:
return OutputLanguage.ENGLISH
return OutputLanguage.MIXED
def _normalize_language(
self, content: str, target: OutputLanguage
) -> str:
"""
Normalize language-specific formatting in the output.
Does not translate — just ensures consistent formatting.
"""
if target == OutputLanguage.JAPANESE:
# Ensure Japanese punctuation
content = content.replace("...", "…")
# Ensure full-width numbers in context of Japanese text
# (lightweight normalization, not full translation)
elif target == OutputLanguage.ENGLISH:
# Ensure English punctuation conventions
content = content.replace("。", ". ").replace("、", ", ")
content = content.replace("!", "!").replace("?", "?")
return content
# ------------------------------------------------------------------
# Utilities
# ------------------------------------------------------------------
def _auto_detect_method(
self, scored: list, query: str
) -> AggregationMethod:
"""Auto-detect the best aggregation method for this scenario."""
# If query asks for a list/ranking, use ranked fusion
ranking_keywords = [
"top", "best", "recommend", "ranking",
"おすすめ", "ランキング", "一覧", "ベスト",
]
if any(kw in query.lower() for kw in ranking_keywords):
return AggregationMethod.RANKED_FUSION
# If outputs are very different in content, merge them
if len(scored) >= 2:
overlap = self._content_overlap(scored[0].content, scored[1].content)
if overlap < 0.3:
return AggregationMethod.MERGE_COMPLEMENTARY
# Default: select best
return AggregationMethod.SELECT_BEST
def _detect_conflicts(self, scored: list) -> tuple:
"""Detect if model outputs contain contradictory information."""
if len(scored) < 2:
return False, None
# Check for contradictory signals
contradiction_pairs = [
("in stock", "out of stock"),
("在庫あり", "在庫なし"),
("available", "unavailable"),
("yes", "no"),
("recommended", "not recommended"),
]
for i, output_a in enumerate(scored):
for output_b in scored[i + 1:]:
for pos, neg in contradiction_pairs:
a_has_pos = pos in output_a.content.lower()
b_has_neg = neg in output_b.content.lower()
a_has_neg = neg in output_a.content.lower()
b_has_pos = pos in output_b.content.lower()
if (a_has_pos and b_has_neg) or (a_has_neg and b_has_pos):
detail = (
f"Contradiction: {output_a.model_id} says "
f"'{pos if a_has_pos else neg}' but "
f"{output_b.model_id} says "
f"'{neg if b_has_neg else pos}'"
)
return True, detail
return False, None
def _content_overlap(self, content_a: str, content_b: str) -> float:
"""Compute word-level overlap between two texts."""
words_a = set(content_a.lower().split())
words_b = set(content_b.lower().split())
if not words_a or not words_b:
return 0.0
intersection = len(words_a & words_b)
union = len(words_a | words_b)
return intersection / union if union > 0 else 0.0
def _extract_ranked_items(self, content: str) -> list:
"""Extract numbered or bulleted list items from model output."""
items = []
for line in content.split("\n"):
line = line.strip()
# Match "1. Item", "- Item", "* Item", "・Item"
match = re.match(r"^(?:\d+[.)]\s*|[-*・]\s*)(.+)$", line)
if match:
items.append(match.group(1).strip())
return items
def get_aggregation_stats(self) -> dict:
"""Return aggregation statistics."""
if not self._aggregation_history:
return {"total_aggregations": 0}
method_counts = {}
conflict_count = 0
for result in self._aggregation_history:
method = result.method_used.value
method_counts[method] = method_counts.get(method, 0) + 1
if result.conflict_detected:
conflict_count += 1
return {
"total_aggregations": len(self._aggregation_history),
"method_distribution": method_counts,
"conflict_rate": round(
conflict_count / len(self._aggregation_history), 3
),
"avg_quality": round(
sum(r.quality_score for r in self._aggregation_history)
/ len(self._aggregation_history),
3,
),
        }
```
6. Comparison Tables
6.1 Ensemble Strategy Comparison
| Strategy | Models Invoked | Latency | Cost | Quality | Best For |
|---|---|---|---|---|---|
| Voting | All (2) | Parallel — bounded by slowest | 2x | Moderate | Classification tasks (genre, sentiment) |
| Weighted Average | All (2) | Parallel — bounded by slowest | 2x | Good | Scoring / ranking tasks |
| Stacking | All + meta-learner (3) | Sequential (base + meta) | ~2.5x | High | Complex responses needing synthesis |
| MoE | 1 (selected expert) | Single model | 1x | Good-High | General routing (most cost-efficient) |
| Parallel Best | All (2) | Parallel — bounded by slowest | 2x | Highest | Quality-critical responses |
6.2 Selection Framework Mode Comparison
| Mode | Quality Weight | Cost Weight | Latency Weight | Sonnet Usage | Daily Cost (1M msgs) |
|---|---|---|---|---|---|
| Cost Optimized | 0.25 | 0.55 | 0.20 | ~5% | ~$300 |
| Balanced | 0.40 | 0.30 | 0.30 | ~20% | ~$635 |
| Quality First | 0.60 | 0.10 | 0.30 | ~50% | ~$2,700 |
| Latency Critical | 0.20 | 0.15 | 0.65 | ~8% | ~$400 |
6.3 Aggregation Method Comparison
| Method | When to Use | Handles Conflicts | Output Quality | Latency Impact |
|---|---|---|---|---|
| Select Best | Single best answer needed | No (picks one) | Depends on best model | None |
| Merge Complementary | Models cover different aspects | Partial | Higher (combined info) | Minimal processing |
| Ranked Fusion | Recommendation / list queries | N/A | High (consensus ranking) | Minimal processing |
| Conflict Resolution | Models contradict each other | Yes | Moderate (flags uncertainty) | May add caveat |
| Summarize | Need consensus from all models | Partial | High | Needs summarization step |
7. Cost Analysis at Scale
7.1 Monthly Cost Projection (1M messages/day, 30 days)
| Strategy | Model Invocations/day | Daily Cost | Monthly Cost | Quality Rating |
|---|---|---|---|---|
| Haiku Only | 1M Haiku | $50 | $1,500 | Adequate (78%) |
| MoE Routing (recommended) | 900K Haiku + 100K Sonnet | $550 | $16,500 | Good (85%) |
| Balanced Routing | 800K Haiku + 200K Sonnet | $1,050 | $31,500 | Very Good (88%) |
| Parallel Best | 1M Haiku + 1M Sonnet | $5,050 | $151,500 | Excellent (93%) |
| Sonnet Only | 1M Sonnet | $5,000 | $150,000 | Excellent (92%) |
Key insight: MoE routing delivers 85% quality at $16,500/month, an 89% cost reduction compared to Sonnet-only ($150,000/month), with only a 7-point quality drop. This is the recommended default for MangaAssist.
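These projections follow from the per-request averages in Section 3.1; a quick sketch to reproduce them (the table above rounds daily figures to the nearest $50):

```python
# Reproduce Section 7.1: invocations/day x avg per-request cost x 30 days.
HAIKU_REQ, SONNET_REQ = 0.00005, 0.005  # avg USD per request (Section 3.1)

STRATEGIES = {
    "Haiku Only":       (1_000_000, 0),
    "MoE Routing":      (900_000, 100_000),
    "Balanced Routing": (800_000, 200_000),
    "Parallel Best":    (1_000_000, 1_000_000),
    "Sonnet Only":      (0, 1_000_000),
}

for name, (haiku, sonnet) in STRATEGIES.items():
    daily = haiku * HAIKU_REQ + sonnet * SONNET_REQ
    print(f"{name:<18} ${daily:>8,.0f}/day   ${daily * 30:>10,.0f}/month")
```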
7.2 Break-Even Analysis
| Quality Target | Cheapest Strategy | Monthly Cost | Cost per Message |
|---|---|---|---|
| 78%+ (baseline) | Haiku Only | $1,500 | $0.00005 |
| 85%+ (good) | MoE Routing | $16,500 | $0.00055 |
| 88%+ (very good) | Balanced Routing | $31,500 | $0.00105 |
| 92%+ (excellent) | Parallel Best | $151,500 | $0.00505 |
8. Key Takeaways
- MoE routing is the most cost-effective strategy for MangaAssist: it achieves 85% quality at ~11% of the Sonnet-only cost by routing simple queries to Haiku and complex queries to Sonnet.
- Parallel Best is the quality ceiling: querying both models and selecting the best response yields the highest quality but doubles cost. Reserve it for premium-tier users.
- Stacking adds a meta-learner layer: feeding both model outputs into a cheap Haiku call for synthesis can improve complex responses, but the extra sequential hop adds latency (a poor fit for the 3-second SLA).
- Aggregation must handle JP/EN mixing: the AggregationEngine normalizes language, detects conflicts, and keeps tone consistent for the manga store context.
- Selection framework weights are tunable per user tier: free users get cost-optimized routing, premium users get quality-first, and the system adapts in real time to latency conditions.
- Live latency tracking prevents SLA violations: the SelectionFramework monitors actual Bedrock response times and automatically fails over to Haiku when Sonnet latency exceeds the 2.6-second model budget.
- Conflict detection prevents contradictory answers: the AggregationEngine flags disagreements (e.g., stock status) and either defers to the higher-scoring model or adds an uncertainty caveat.
- Caching at the ensemble level dramatically reduces cost: storing ensemble results in ElastiCache Redis avoids redundant multi-model invocations for repeated queries, which are common in FAQ-heavy chatbot traffic.
- Cost routing alone saves ~87% versus uniform Sonnet: even without quality scoring, simply routing by request tier (simple/medium/complex) yields $635/day vs. $5,000/day.
- Monitor ensemble drift over time: model quality, latency profiles, and traffic patterns change, so re-evaluate routing weights monthly and adjust the complexity classifier as needed.