AWS AIP-C01 Task 4.2 → Skill 4.2.2: Optimize retrieval mechanisms for FM-augmented applications
System: MangaAssist e-commerce chatbot — Bedrock Claude 3 Sonnet/Haiku, OpenSearch Serverless (HNSW k-NN), DynamoDB, ECS Fargate, ElastiCache Redis
Target: Total retrieval latency < 200ms (query preprocessing + search + re-ranking) at p95
Skill Mapping
| AWS AIP-C01 Element | Coverage |
| --- | --- |
| Task 4.2 | Optimize application performance for FM workloads |
| Skill 4.2.2 | Optimize retrieval mechanisms to improve FM-augmented application performance |
| Key Focus | Index optimization, query preprocessing, hybrid search, custom scoring, re-ranking pipelines |
| MangaAssist Context | OpenSearch Serverless stores ~100K manga product embeddings (1536-dim Titan v2); hybrid BM25 + kNN retrieval feeds the Claude 3 RAG pipeline; Japanese + English bilingual queries |
mindmap
root((Retrieval<br/>Performance))
Index Optimization
HNSW Parameters
ef_construction tuning
m (graph connectivity)
ef_search at query time
Segment Merge Policies
Force merge schedule
Max segment size
Field Mapping Optimization
Keyword vs text
doc_values for filters
Exclude unnecessary fields
Shard Sizing
OCU-to-shard ratio
Hot/warm tiering
Query Preprocessing
Query Expansion
Manga term synonyms
Genre abbreviations
Japanese Tokenization
kuromoji analyzer
Reading form normalization
Kanji/Kana/Romaji unification
Synonym Expansion
shonen/shounen/少年
manga/漫画/マンガ
Stopword Removal
Language-aware lists
Domain-specific stops
Hybrid Search
BM25 Keyword Search
Exact title matching
Author name lookup
ISBN/catalog ID
kNN Vector Search
Semantic similarity
Concept-level matching
Cross-language retrieval
Score Fusion
Reciprocal Rank Fusion
Weighted linear combination
Learned combination
Custom Scoring
Recency Boost
New release window
Trending decay
Popularity Boost
Sales volume signal
Rating-weighted score
Intent-Aware Scoring
Browse vs Buy vs Research
Recommendation vs Search
Re-ranking Pipeline
Retrieve Top-50
Cross-Encoder Re-rank
LLM-Based Re-rank
Final Top-5 Selection
Multi-Index Strategy
manga-products index
manga-reviews index
manga-authors index
Federated search orchestration
Architecture — MangaAssist Retrieval Pipeline
graph TB
subgraph QueryIngress["Query Ingress"]
style QueryIngress fill:#1a1a2e,stroke:#16213e,color:#fff
USER["Customer Query<br/>'ワンピースみたいな冒険漫画'"]
APIGW["API Gateway<br/>WebSocket"]
ECS["ECS Fargate<br/>Orchestrator"]
end
subgraph PreprocessingStage["Query Preprocessing (15ms budget)"]
style PreprocessingStage fill:#0f3460,stroke:#16213e,color:#fff
LANG_DETECT["Language Detection<br/>JA / EN / Mixed"]
TOKENIZER["Japanese Tokenizer<br/>kuromoji + custom dict"]
SYNONYM["Synonym Expansion<br/>shonen ↔ 少年"]
QUERY_EXPAND["Query Expansion<br/>Genre + Theme terms"]
INTENT["Intent Classifier<br/>Browse / Buy / Research"]
end
subgraph EmbeddingStage["Embedding (25ms budget)"]
style EmbeddingStage fill:#533483,stroke:#16213e,color:#fff
CACHE_CHECK["ElastiCache Redis<br/>Embedding Cache"]
TITAN["Bedrock Titan<br/>Embeddings v2<br/>1536-dim"]
end
subgraph SearchStage["Hybrid Search (80ms budget)"]
style SearchStage fill:#e94560,stroke:#16213e,color:#fff
BM25["BM25 Keyword Search<br/>Exact titles, authors, ISBNs"]
KNN["kNN Vector Search<br/>HNSW ef_search=256<br/>Semantic similarity"]
FUSION["Score Fusion<br/>RRF (default) or weighted<br/>0.6 vector / 0.4 keyword"]
end
subgraph RerankStage["Re-ranking (60ms budget)"]
style RerankStage fill:#0a1931,stroke:#16213e,color:#fff
TOP50["Top-50 Candidates"]
SCORER["Custom Scorer<br/>Recency + Popularity<br/>+ Intent boost"]
CROSS_ENC["Cross-Encoder Reranker<br/>or Claude Haiku Reranker"]
TOP5["Final Top-5<br/>Context for Claude 3"]
end
subgraph IndexLayer["OpenSearch Serverless"]
style IndexLayer fill:#ff9900,stroke:#16213e,color:#000
IDX_PRODUCTS["manga-products<br/>100K docs, 1536-dim<br/>HNSW m=16, ef=512"]
IDX_REVIEWS["manga-reviews<br/>500K docs<br/>BM25 + kNN"]
IDX_AUTHORS["manga-authors<br/>15K docs<br/>Keyword + kNN"]
end
USER --> APIGW --> ECS
ECS --> LANG_DETECT --> TOKENIZER --> SYNONYM --> QUERY_EXPAND
QUERY_EXPAND --> INTENT
INTENT --> CACHE_CHECK
CACHE_CHECK -->|hit| KNN
CACHE_CHECK -->|miss| TITAN --> KNN
QUERY_EXPAND --> BM25
BM25 --> IDX_PRODUCTS & IDX_REVIEWS & IDX_AUTHORS
KNN --> IDX_PRODUCTS & IDX_REVIEWS & IDX_AUTHORS
IDX_PRODUCTS --> FUSION
IDX_REVIEWS --> FUSION
IDX_AUTHORS --> FUSION
FUSION --> TOP50 --> SCORER --> CROSS_ENC --> TOP5
TOP5 --> ECS
Latency Budget Breakdown
gantt
title Retrieval Pipeline Latency Budget (p95 Target: < 200ms)
dateFormat X
axisFormat %L ms
section Preprocessing
Language Detection :0, 3
Tokenization + Synonyms :3, 10
Query Expansion + Intent :10, 15
section Embedding
Cache Check (Redis) :15, 17
Titan Embedding (miss) :17, 40
section Search
BM25 Keyword (parallel) :40, 90
kNN Vector (parallel) :40, 100
Score Fusion :100, 120
section Re-ranking
Custom Scoring :120, 140
Cross-Encoder Top-50→5 :140, 180
section Delivery
Result Assembly :180, 190
Return to Orchestrator :190, 195
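The stage budgets in the chart above can be sanity-checked in a few lines. This is a sketch, not measurement code; the values are the budget figures read off the Gantt chart, and the stage names are illustrative:

```python
# Per-stage latency budgets (ms) from the Gantt chart above.
STAGE_BUDGETS_MS = {
    "preprocessing": 15,   # language detect + tokenize + expand + intent
    "embedding": 25,       # Redis check + Titan call on cache miss
    "hybrid_search": 80,   # BM25 + kNN in parallel, then fusion
    "re_ranking": 60,      # custom scoring + cross-encoder
    "delivery": 15,        # result assembly + return to orchestrator
}

total = sum(STAGE_BUDGETS_MS.values())
headroom = 200 - total
print(f"total budget: {total}ms, headroom vs 200ms p95 target: {headroom}ms")
```

The budgets sum to 195ms, leaving only 5ms of headroom, which is why every stage below gets its own optimization treatment.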
Index Optimization for OpenSearch Serverless
HNSW Parameter Tuning
HNSW (Hierarchical Navigable Small World) graphs are the backbone of vector search in OpenSearch. Choosing the right parameters directly controls the recall-latency tradeoff.
| Parameter | Description | MangaAssist Value | Rationale |
| --- | --- | --- | --- |
| ef_construction | Graph build-time expansion factor; higher = better recall, slower indexing | 512 | Product catalog updates are batched nightly; we can afford slower indexing for higher recall |
| m | Max bi-directional links per node; higher = better recall, more memory | 16 | Balance between memory cost (OCU) and recall@10 > 0.95 |
| ef_search | Query-time expansion factor; higher = better recall, slower search | 256 | Tuned via benchmark: recall@10 = 0.97 at 35ms p95 per shard |
| space_type | Distance metric | cosinesimil | Titan v2 embeddings are L2-normalized; cosine similarity is standard |
| engine | kNN engine | nmslib | Mature HNSW implementation; faiss is the alternative when efficient filtered search is needed |
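The ef_search value comes from benchmarking approximate results against an exact brute-force scan. A minimal sketch of the recall@k measurement behind that tuning (the helper and the toy IDs are illustrative):

```python
def recall_at_k(ann_ids: list[str], exact_ids: list[str], k: int = 10) -> float:
    """Fraction of the exact (brute-force) top-k that the ANN search also returned."""
    return len(set(ann_ids[:k]) & set(exact_ids[:k])) / k

# Toy check: HNSW missed one document from the exact top-10
exact = [f"manga-{i}" for i in range(10)]
ann = exact[:9] + ["manga-99"]
print(recall_at_k(ann, exact))  # 0.9
```

In practice this is run over a few hundred held-out queries at several ef_search values, picking the smallest value whose mean recall@10 clears the target (here 0.95).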
Index Mapping Configuration
{
"settings": {
"index": {
"knn": true,
"knn.algo_param.ef_search": 256
},
"analysis": {
"analyzer": {
"manga_ja_analyzer": {
"type": "custom",
"tokenizer": "kuromoji_tokenizer",
"filter": [
"kuromoji_baseform",
"kuromoji_part_of_speech",
"ja_stop",
"kuromoji_stemmer",
"lowercase"
]
},
"manga_en_analyzer": {
"type": "custom",
"tokenizer": "standard",
"filter": ["lowercase", "english_stop", "english_stemmer"]
}
}
}
},
"mappings": {
"properties": {
"manga_id": { "type": "keyword" },
"title_ja": {
"type": "text",
"analyzer": "manga_ja_analyzer",
"fields": { "keyword": { "type": "keyword" } }
},
"title_en": {
"type": "text",
"analyzer": "manga_en_analyzer",
"fields": { "keyword": { "type": "keyword" } }
},
"description": {
"type": "text",
"analyzer": "manga_ja_analyzer",
"fields": { "en": { "type": "text", "analyzer": "manga_en_analyzer" } }
},
"description_embedding": {
"type": "knn_vector",
"dimension": 1536,
"method": {
"name": "hnsw",
"space_type": "cosinesimil",
"engine": "nmslib",
"parameters": {
"ef_construction": 512,
"m": 16
}
}
},
"genre": { "type": "keyword" },
"author": {
"type": "text",
"fields": { "keyword": { "type": "keyword" } }
},
"release_date": { "type": "date" },
"avg_rating": { "type": "float" },
"sales_rank": { "type": "integer" },
"price_jpy": { "type": "integer" },
"in_stock": { "type": "boolean" },
"tags": { "type": "keyword" },
"volume_count": { "type": "integer" },
"publisher": { "type": "keyword" },
"demographic": { "type": "keyword" }
}
}
}
Segment Merge Policies
OpenSearch Serverless manages segment merges automatically, but understanding the impact is critical for retrieval latency.
| Policy Aspect | Recommendation | MangaAssist Application |
| --- | --- | --- |
| Force merge after bulk indexing | Serverless does not expose _forcemerge (merges are managed); on provisioned domains, force-merge to 1 segment per shard after bulk loads | Search latency drops ~15% the morning after the nightly catalog sync, once background merges settle |
| Max segment size | Target segments < 5 GB for HNSW indexes | With 100K docs at ~6KB/doc (embedding + metadata), total ~600MB fits easily |
| Refresh interval | 30s is near-real-time; 60s is acceptable for a product catalog | MangaAssist targets 30s — new manga appear within ~30s of the DynamoDB write |
| Avoid small segments | Bulk index with batch sizes of 500-1000 | Prevents segment explosion during catalog imports |
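The 500-1000 batch-size recommendation reduces to a small chunking helper around the bulk API. A sketch (in production each batch would be handed to opensearchpy.helpers.bulk; the catalog list here is a stand-in):

```python
from typing import Iterator

def chunked(docs: list[dict], batch_size: int = 500) -> Iterator[list[dict]]:
    """Yield fixed-size batches so bulk indexing never creates a flood of tiny segments."""
    for i in range(0, len(docs), batch_size):
        yield docs[i:i + batch_size]

# Toy catalog: 1200 docs → three bulk requests
catalog = [{"manga_id": f"m-{i}"} for i in range(1200)]
batches = list(chunked(catalog, 500))
print([len(b) for b in batches])  # [500, 500, 200]
```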
Multi-Index Strategy
graph LR
subgraph Indexes["OpenSearch Serverless Collection"]
IDX1["manga-products<br/>100K docs<br/>Title, Description, Embedding<br/>Genre, Author, Price, Rating"]
IDX2["manga-reviews<br/>500K docs<br/>Review text, Embedding<br/>Rating, Helpfulness"]
IDX3["manga-authors<br/>15K docs<br/>Bio, Embedding<br/>Notable works, Genre specialty"]
end
QUERY["Preprocessed Query"] --> IDX1 & IDX2 & IDX3
IDX1 -->|Top-20 products| MERGE["Federated<br/>Result Merge"]
IDX2 -->|Top-10 reviews| MERGE
IDX3 -->|Top-5 authors| MERGE
MERGE --> RERANK["Re-rank<br/>Top-50 → Top-5"]
| Index | Document Count | Primary Use Case | Search Type | Latency Target |
| --- | --- | --- | --- | --- |
| manga-products | 100K | Product discovery, recommendations | Hybrid (BM25 + kNN) | < 50ms |
| manga-reviews | 500K | Review summarization, sentiment context | Hybrid (BM25 + kNN) | < 60ms |
| manga-authors | 15K | Author information, similar-author discovery | Keyword + kNN | < 30ms |
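The per-index caps shown in the federation diagram (top-20 products, top-10 reviews, top-5 authors) translate to a small merge step before re-ranking. A sketch, with (doc_id, score) tuples standing in for full hits:

```python
def federated_merge(per_index: dict[str, list[tuple[str, float]]],
                    caps: dict[str, int]) -> list[tuple[str, float]]:
    """Cap each index's contribution, then merge all hits by score for re-ranking."""
    merged: list[tuple[str, float]] = []
    for index_name, hits in per_index.items():
        merged.extend(hits[:caps.get(index_name, 10)])
    return sorted(merged, key=lambda h: h[1], reverse=True)

hits = {
    "manga-products": [("p1", 0.9), ("p2", 0.8)],
    "manga-reviews": [("r1", 0.85)],
    "manga-authors": [("a1", 0.7)],
}
caps = {"manga-products": 20, "manga-reviews": 10, "manga-authors": 5}
print([h[0] for h in federated_merge(hits, caps)])  # ['p1', 'r1', 'p2', 'a1']
```

Note the caps assume scores are already comparable across indexes; in practice that holds only after rank-based fusion (RRF), which is one more argument for RRF below.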
Query Preprocessing Pipeline
Japanese Language Handling
MangaAssist serves a Japanese manga audience — queries arrive in Japanese, English, romaji, or mixed scripts. The preprocessing pipeline must normalize all of these forms before search.
graph LR
RAW["Raw Query<br/>'ワンピースみたいな<br/>adventure manga'"] --> DETECT["Language<br/>Detection"]
DETECT --> JA["Japanese Path<br/>kuromoji tokenizer"]
DETECT --> EN["English Path<br/>standard tokenizer"]
DETECT --> MIXED["Mixed Path<br/>split → both paths"]
JA --> NORM["Normalization<br/>全角→半角<br/>カタカナ→ひらがな"]
EN --> STEM["Stemming +<br/>Lowercasing"]
MIXED --> NORM & STEM
NORM --> SYN["Synonym<br/>Expansion"]
STEM --> SYN
SYN --> EXPAND["Query<br/>Expansion"]
EXPAND --> OUTPUT["Processed Query<br/>Tokens + Synonyms<br/>+ Expanded terms"]
Python — QueryPreprocessor
"""
MangaAssist Query Preprocessor
Handles Japanese/English bilingual queries for OpenSearch retrieval.
"""
import re
import unicodedata
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
class QueryLanguage(Enum):
JAPANESE = "ja"
ENGLISH = "en"
MIXED = "mixed"
class UserIntent(Enum):
BROWSE = "browse" # Exploring, no specific title
BUY = "buy" # Looking for a specific product
RESEARCH = "research" # Comparing, reading reviews
RECOMMEND = "recommend" # "Something like X"
@dataclass
class ProcessedQuery:
"""Output of the query preprocessing pipeline."""
original: str
language: QueryLanguage
intent: UserIntent
tokens_ja: list[str] = field(default_factory=list)
tokens_en: list[str] = field(default_factory=list)
synonyms: list[str] = field(default_factory=list)
expanded_terms: list[str] = field(default_factory=list)
normalized_text: str = ""
search_text_bm25: str = "" # For BM25 keyword search
embedding_text: str = "" # For vector embedding
# Manga-domain synonym map: canonical → all variants
MANGA_SYNONYMS: dict[str, list[str]] = {
"shonen": ["shounen", "少年", "しょうねん", "ショウネン"],
"shojo": ["shoujo", "少女", "しょうじょ", "ショウジョ"],
"seinen": ["青年", "せいねん", "セイネン"],
"josei": ["女性", "じょせい", "ジョセイ"],
"isekai": ["異世界", "いせかい", "イセカイ"],
"manga": ["漫画", "マンガ", "まんが"],
"anime": ["アニメ", "あにめ"],
"one piece": ["ワンピース", "わんぴーす"],
"naruto": ["ナルト", "なると"],
"attack on titan": ["進撃の巨人", "しんげきのきょじん"],
"demon slayer": ["鬼滅の刃", "きめつのやいば"],
}
# Intent detection keywords
INTENT_KEYWORDS: dict[UserIntent, list[str]] = {
UserIntent.BUY: [
"buy", "purchase", "order", "price", "在庫", "買う",
"購入", "注文", "値段", "how much", "いくら",
],
UserIntent.RECOMMEND: [
"like", "similar", "recommend", "suggest", "みたいな",
"おすすめ", "似てる", "のような", "っぽい",
],
UserIntent.RESEARCH: [
"review", "compare", "rating", "レビュー", "比較",
"評価", "difference", "違い", "vs",
],
UserIntent.BROWSE: [
"show", "list", "what", "explore", "見せて",
"一覧", "何", "どんな",
],
}
class QueryPreprocessor:
"""
Bilingual query preprocessing for MangaAssist.
Normalizes Japanese/English text, expands synonyms, detects intent.
Latency budget: 15ms total for the full preprocessing pipeline.
"""
def __init__(self, custom_synonyms: Optional[dict] = None):
self.synonyms = {**MANGA_SYNONYMS, **(custom_synonyms or {})}
# Build reverse lookup: any variant → canonical + all variants
self._reverse_synonyms: dict[str, list[str]] = {}
for canonical, variants in self.synonyms.items():
all_forms = [canonical] + variants
for form in all_forms:
self._reverse_synonyms[form.lower()] = all_forms
def process(self, raw_query: str) -> ProcessedQuery:
"""Full preprocessing pipeline: detect → normalize → expand → classify."""
language = self._detect_language(raw_query)
normalized = self._normalize(raw_query)
intent = self._detect_intent(normalized, language)
tokens_ja, tokens_en = self._tokenize(normalized, language)
synonyms = self._expand_synonyms(tokens_ja + tokens_en)
expanded = self._expand_query(tokens_ja + tokens_en, intent)
# Build search-ready text
all_terms = tokens_ja + tokens_en + synonyms + expanded
search_text = " ".join(dict.fromkeys(all_terms)) # dedupe, preserve order
embedding_text = self._build_embedding_text(
normalized, synonyms, intent
)
return ProcessedQuery(
original=raw_query,
language=language,
intent=intent,
tokens_ja=tokens_ja,
tokens_en=tokens_en,
synonyms=synonyms,
expanded_terms=expanded,
normalized_text=normalized,
search_text_bm25=search_text,
embedding_text=embedding_text,
)
# ---- Language Detection ----
def _detect_language(self, text: str) -> QueryLanguage:
"""Detect query language based on Unicode script analysis."""
has_cjk = bool(re.search(r'[\u3000-\u9fff\uf900-\ufaff]', text))
has_latin = bool(re.search(r'[a-zA-Z]{2,}', text))
if has_cjk and has_latin:
return QueryLanguage.MIXED
elif has_cjk:
return QueryLanguage.JAPANESE
return QueryLanguage.ENGLISH
# ---- Normalization ----
def _normalize(self, text: str) -> str:
"""
Normalize Unicode forms for consistent matching.
- NFKC normalization (fullwidth → halfwidth, etc.)
- Strip excess whitespace
"""
text = unicodedata.normalize("NFKC", text)
text = re.sub(r'\s+', ' ', text).strip()
return text
# ---- Tokenization ----
def _tokenize(
self, text: str, language: QueryLanguage
) -> tuple[list[str], list[str]]:
"""
Tokenize based on detected language.
Japanese: character-level bigrams + known terms (production would use kuromoji).
English: whitespace split + lowercasing.
"""
tokens_ja: list[str] = []
tokens_en: list[str] = []
# Split into Japanese and English segments
ja_segments = re.findall(r'[\u3000-\u9fff\uf900-\ufaff]+', text)
en_segments = re.findall(r'[a-zA-Z]+', text)
for seg in ja_segments:
# In production: call kuromoji via OpenSearch _analyze API
# Here: simple character bigrams as fallback
tokens_ja.append(seg)
if len(seg) > 2:
for i in range(len(seg) - 1):
tokens_ja.append(seg[i:i+2])
for seg in en_segments:
tokens_en.append(seg.lower())
return tokens_ja, tokens_en
# ---- Synonym Expansion ----
def _expand_synonyms(self, tokens: list[str]) -> list[str]:
"""Expand tokens using the manga-domain synonym map."""
expanded: list[str] = []
for token in tokens:
key = token.lower()
if key in self._reverse_synonyms:
for syn in self._reverse_synonyms[key]:
if syn.lower() != key:
expanded.append(syn)
return list(dict.fromkeys(expanded)) # dedupe
# ---- Intent Detection ----
def _detect_intent(
self, text: str, language: QueryLanguage
) -> UserIntent:
"""Classify user intent from query keywords."""
text_lower = text.lower()
scores: dict[UserIntent, int] = {intent: 0 for intent in UserIntent}
for intent, keywords in INTENT_KEYWORDS.items():
for kw in keywords:
if kw in text_lower:
scores[intent] += 1
best = max(scores, key=scores.get)
return best if scores[best] > 0 else UserIntent.BROWSE
# ---- Query Expansion ----
def _expand_query(
self, tokens: list[str], intent: UserIntent
) -> list[str]:
"""
Add contextual terms based on intent.
- RECOMMEND: add genre/theme terms
- BUY: add availability terms
- RESEARCH: add review/comparison terms
"""
expansion: list[str] = []
if intent == UserIntent.RECOMMEND:
expansion.extend(["similar", "おすすめ", "recommendation"])
elif intent == UserIntent.BUY:
expansion.extend(["in_stock", "price", "購入可能"])
elif intent == UserIntent.RESEARCH:
expansion.extend(["review", "rating", "レビュー", "評価"])
return expansion
# ---- Embedding Text Construction ----
def _build_embedding_text(
self, normalized: str, synonyms: list[str], intent: UserIntent
) -> str:
"""
Build text optimized for Titan embedding generation.
Prefix with intent context for better semantic matching.
"""
intent_prefix = {
UserIntent.BROWSE: "manga search:",
UserIntent.BUY: "manga product to purchase:",
UserIntent.RECOMMEND: "manga recommendation similar to:",
UserIntent.RESEARCH: "manga review and comparison:",
}
prefix = intent_prefix.get(intent, "manga search:")
synonym_str = " ".join(synonyms[:5]) # limit synonym injection
return f"{prefix} {normalized} {synonym_str}".strip()
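The NFKC step in _normalize above does real work for Japanese queries: it folds fullwidth digits and Latin letters to their ASCII forms and composes halfwidth katakana into fullwidth, so a single analyzer sees a single representation. A standalone demonstration:

```python
import unicodedata

# Fullwidth Latin/digits and the ideographic space fold to ASCII forms
print(unicodedata.normalize("NFKC", "ＯＮＥ　ＰＩＥＣＥ　２３巻"))  # ONE PIECE 23巻

# Halfwidth katakana (common in legacy input methods) composes to fullwidth
print(unicodedata.normalize("NFKC", "ﾜﾝﾋﾟｰｽ"))  # ワンピース
```

Without this step, "２３巻" and "23巻" would tokenize as different terms and exact volume lookups would silently miss.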
Hybrid Search Implementation
Why Hybrid Search for MangaAssist?
| Query Type | Best Search Method | Example |
| --- | --- | --- |
| Exact title lookup | BM25 keyword | "鬼滅の刃 23巻" (Demon Slayer Vol. 23) |
| Semantic concept | kNN vector | "adventure manga with pirates" |
| Author + genre | BM25 keyword | "Eiichiro Oda shonen" |
| Vague recommendation | kNN vector | "something dark and philosophical like Death Note" |
| Mixed specific + vague | Hybrid BM25 + kNN | "manga like ワンピース with good art" |
Pure BM25 fails on semantic queries ("adventure manga with pirates" does not literally match "One Piece"). Pure kNN fails on exact identifiers (ISBN, volume numbers, exact titles). Hybrid search combines both strengths.
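A toy Reciprocal Rank Fusion computation makes the fusion intuition concrete: a document that both retrievers surface outranks one that only a single retriever found, with no score normalization required (k=60 as in the original RRF formulation; the titles are illustrative):

```python
def rrf(rankings: list[list[str]], k: int = 60) -> dict[str, float]:
    """RRF(d) = sum over rankers of 1 / (k + rank(d)), with rank starting at 1."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return scores

bm25_ranking = ["one-piece", "naruto", "bleach"]
knn_ranking = ["kingdom", "one-piece", "vinland-saga"]
scores = rrf([bm25_ranking, knn_ranking])
best = max(scores, key=scores.get)
print(best)  # one-piece: 1/61 + 1/62 beats any single-list 1/61
```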
Python — HybridSearchManager
"""
MangaAssist Hybrid Search Manager
Combines BM25 keyword search + kNN vector search with score fusion.
Target: < 80ms for the search stage at p95.
"""
import time
from dataclasses import dataclass, field
from typing import Any, Optional
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
@dataclass
class SearchResult:
"""A single search result from any search method."""
manga_id: str
title_ja: str
title_en: str
score: float
source: str # "bm25", "knn", or "hybrid"
genre: str = ""
avg_rating: float = 0.0
release_date: str = ""
metadata: dict = field(default_factory=dict)
@dataclass
class HybridSearchResponse:
"""Combined response from hybrid search pipeline."""
results: list[SearchResult]
bm25_count: int
knn_count: int
fusion_method: str
total_latency_ms: float
bm25_latency_ms: float
knn_latency_ms: float
fusion_latency_ms: float
class HybridSearchManager:
"""
Orchestrates hybrid BM25 + kNN search against OpenSearch Serverless.
Architecture:
1. Execute BM25 and kNN queries in parallel
2. Fuse scores using Reciprocal Rank Fusion (RRF) or weighted linear
3. Apply custom business scoring (recency, popularity, intent)
4. Return top-K candidates for re-ranking
Latency budget: 80ms total (BM25 + kNN parallel, then fusion).
"""
def __init__(
self,
opensearch_endpoint: str,
region: str = "ap-northeast-1",
product_index: str = "manga-products",
review_index: str = "manga-reviews",
author_index: str = "manga-authors",
vector_field: str = "description_embedding",
embedding_dimension: int = 1536,
):
self.region = region
self.product_index = product_index
self.review_index = review_index
self.author_index = author_index
self.vector_field = vector_field
self.embedding_dimension = embedding_dimension
# AWS auth for OpenSearch Serverless
credentials = boto3.Session().get_credentials()
self.awsauth = AWS4Auth(
credentials.access_key,
credentials.secret_key,
region,
"aoss",
session_token=credentials.token,
)
self.client = OpenSearch(
hosts=[{"host": opensearch_endpoint, "port": 443}],
http_auth=self.awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
timeout=5,
)
# Bedrock client for Titan embeddings
self.bedrock = boto3.client(
"bedrock-runtime", region_name=region
)
# ---- Main Hybrid Search ----
def hybrid_search(
self,
query_text: str,
query_embedding: list[float],
intent: str = "browse",
top_k: int = 50,
bm25_weight: float = 0.4,
knn_weight: float = 0.6,
fusion_method: str = "rrf",
filters: Optional[dict] = None,
) -> HybridSearchResponse:
"""
Execute hybrid BM25 + kNN search with score fusion.
Args:
query_text: Preprocessed search text for BM25.
query_embedding: 1536-dim vector from Titan Embeddings.
intent: User intent (browse/buy/recommend/research).
top_k: Number of candidates to retrieve before re-ranking.
bm25_weight: Weight for keyword score in linear fusion.
knn_weight: Weight for vector score in linear fusion.
fusion_method: "rrf" or "linear".
filters: Optional OpenSearch filters (genre, price, etc.).
Returns:
HybridSearchResponse with fused results and latency metrics.
"""
t_start = time.perf_counter()
# --- Parallel execution of BM25 and kNN ---
# In production, use asyncio or ThreadPoolExecutor for true parallelism
t_bm25_start = time.perf_counter()
bm25_results = self._bm25_search(
query_text, top_k=top_k, filters=filters
)
bm25_latency = (time.perf_counter() - t_bm25_start) * 1000
t_knn_start = time.perf_counter()
knn_results = self._knn_search(
query_embedding, top_k=top_k, filters=filters
)
knn_latency = (time.perf_counter() - t_knn_start) * 1000
# --- Score fusion ---
t_fusion_start = time.perf_counter()
if fusion_method == "rrf":
fused = self._reciprocal_rank_fusion(
bm25_results, knn_results, k=60
)
else:
fused = self._weighted_linear_fusion(
bm25_results, knn_results,
bm25_weight=bm25_weight,
knn_weight=knn_weight,
)
fusion_latency = (time.perf_counter() - t_fusion_start) * 1000
# --- Apply business scoring ---
scored = self._apply_business_scoring(fused, intent)
# --- Sort and truncate ---
scored.sort(key=lambda r: r.score, reverse=True)
top_results = scored[:top_k]
total_latency = (time.perf_counter() - t_start) * 1000
return HybridSearchResponse(
results=top_results,
bm25_count=len(bm25_results),
knn_count=len(knn_results),
fusion_method=fusion_method,
total_latency_ms=round(total_latency, 2),
bm25_latency_ms=round(bm25_latency, 2),
knn_latency_ms=round(knn_latency, 2),
fusion_latency_ms=round(fusion_latency, 2),
)
# ---- BM25 Keyword Search ----
def _bm25_search(
self,
query_text: str,
top_k: int = 50,
filters: Optional[dict] = None,
) -> list[SearchResult]:
"""
BM25 keyword search across title, description, author fields.
Uses multi_match with cross_fields for best coverage.
"""
must_clause: list[dict] = [
{
"multi_match": {
"query": query_text,
"fields": [
"title_ja^3",
"title_en^3",
"title_ja.keyword^5",
"title_en.keyword^5",
"author^2",
"description",
"tags^1.5",
"genre^1.5",
],
"type": "cross_fields",
"operator": "or",
"minimum_should_match": "30%",
}
}
]
filter_clause = self._build_filters(filters)
body = {
"size": top_k,
"query": {
"bool": {
"must": must_clause,
"filter": filter_clause,
}
},
"_source": [
"manga_id", "title_ja", "title_en", "genre",
"avg_rating", "release_date", "author",
"sales_rank", "price_jpy",
],
}
resp = self.client.search(index=self.product_index, body=body)
return self._parse_results(resp, source="bm25")
# ---- kNN Vector Search ----
def _knn_search(
self,
query_embedding: list[float],
top_k: int = 50,
filters: Optional[dict] = None,
) -> list[SearchResult]:
"""
HNSW k-NN vector search for semantic similarity.
Uses ef_search=256 for high recall at acceptable latency.
Note: the knn query's filter (efficient filtering) requires the
faiss or lucene engine; with nmslib, filters must instead be
applied as a post-filter.
"""
knn_clause: dict[str, Any] = {
self.vector_field: {
"vector": query_embedding,
"k": top_k,
}
}
filter_clause = self._build_filters(filters)
if filter_clause:
knn_clause[self.vector_field]["filter"] = {
"bool": {"filter": filter_clause}
}
body: dict[str, Any] = {
"size": top_k,
"query": {"knn": knn_clause},
"_source": [
"manga_id", "title_ja", "title_en", "genre",
"avg_rating", "release_date", "author",
"sales_rank", "price_jpy",
],
}
resp = self.client.search(index=self.product_index, body=body)
return self._parse_results(resp, source="knn")
# ---- Score Fusion Methods ----
def _reciprocal_rank_fusion(
self,
bm25_results: list[SearchResult],
knn_results: list[SearchResult],
k: int = 60,
) -> list[SearchResult]:
"""
Reciprocal Rank Fusion (RRF) — rank-based fusion that is
robust to score scale differences between BM25 and kNN.
RRF(d) = sum( 1 / (k + rank_i(d)) ) for each ranker i
k=60 is the standard smoothing constant (from the original paper).
"""
rrf_scores: dict[str, float] = {}
result_map: dict[str, SearchResult] = {}
for rank, result in enumerate(bm25_results):
rrf_scores[result.manga_id] = rrf_scores.get(
result.manga_id, 0.0
) + 1.0 / (k + rank + 1)
result_map[result.manga_id] = result
for rank, result in enumerate(knn_results):
rrf_scores[result.manga_id] = rrf_scores.get(
result.manga_id, 0.0
) + 1.0 / (k + rank + 1)
if result.manga_id not in result_map:
result_map[result.manga_id] = result
fused: list[SearchResult] = []
for manga_id, score in rrf_scores.items():
r = result_map[manga_id]
fused.append(SearchResult(
manga_id=r.manga_id,
title_ja=r.title_ja,
title_en=r.title_en,
score=score,
source="hybrid",
genre=r.genre,
avg_rating=r.avg_rating,
release_date=r.release_date,
metadata=r.metadata,
))
return fused
def _weighted_linear_fusion(
self,
bm25_results: list[SearchResult],
knn_results: list[SearchResult],
bm25_weight: float = 0.4,
knn_weight: float = 0.6,
) -> list[SearchResult]:
"""
Weighted linear combination of normalized scores.
Requires min-max normalization to bring BM25 and kNN
scores onto the same [0, 1] scale.
"""
def normalize(results: list[SearchResult]) -> list[SearchResult]:
if not results:
return results
scores = [r.score for r in results]
min_s, max_s = min(scores), max(scores)
rng = max_s - min_s if max_s != min_s else 1.0
for r in results:
r.score = (r.score - min_s) / rng
return results
bm25_norm = normalize(list(bm25_results))
knn_norm = normalize(list(knn_results))
score_map: dict[str, float] = {}
result_map: dict[str, SearchResult] = {}
for r in bm25_norm:
score_map[r.manga_id] = bm25_weight * r.score
result_map[r.manga_id] = r
for r in knn_norm:
score_map[r.manga_id] = score_map.get(
r.manga_id, 0.0
) + knn_weight * r.score
if r.manga_id not in result_map:
result_map[r.manga_id] = r
fused: list[SearchResult] = []
for manga_id, score in score_map.items():
r = result_map[manga_id]
fused.append(SearchResult(
manga_id=r.manga_id,
title_ja=r.title_ja,
title_en=r.title_en,
score=score,
source="hybrid",
genre=r.genre,
avg_rating=r.avg_rating,
release_date=r.release_date,
metadata=r.metadata,
))
return fused
# ---- Business Scoring ----
def _apply_business_scoring(
self,
results: list[SearchResult],
intent: str,
) -> list[SearchResult]:
"""
Apply MangaAssist business rules on top of retrieval scores.
- Recency boost for new releases (last 30 days: +20%)
- Popularity boost for high-rated titles (>= 4.5: +15%)
- Intent-aware adjustments
"""
import datetime
now = datetime.date.today()
thirty_days_ago = now - datetime.timedelta(days=30)
for r in results:
boost = 1.0
# Recency boost
if r.release_date:
try:
release = datetime.date.fromisoformat(r.release_date[:10])
if release >= thirty_days_ago:
boost *= 1.20 # +20% for new releases
except (ValueError, TypeError):
pass
# Popularity boost
if r.avg_rating >= 4.5:
boost *= 1.15 # +15% for highly rated
elif r.avg_rating >= 4.0:
boost *= 1.05 # +5% for well rated
# Intent-aware boost
if intent == "buy":
# Favor in-stock, exact matches
if r.metadata.get("in_stock"):
boost *= 1.10
elif intent == "recommend":
# Favor diversity in genre
boost *= 1.0 # neutral, handled by re-ranker
elif intent == "research":
# Favor titles with many reviews
review_count = r.metadata.get("review_count", 0)
if review_count > 100:
boost *= 1.10
r.score *= boost
return results
# ---- Helpers ----
def _build_filters(
self, filters: Optional[dict]
) -> list[dict]:
"""Build OpenSearch filter clauses from a filter dict."""
if not filters:
return []
clauses: list[dict] = []
if "genre" in filters:
clauses.append({"term": {"genre": filters["genre"]}})
if "in_stock" in filters:
clauses.append({"term": {"in_stock": filters["in_stock"]}})
if "price_max" in filters:
clauses.append(
{"range": {"price_jpy": {"lte": filters["price_max"]}}}
)
if "min_rating" in filters:
clauses.append(
{"range": {"avg_rating": {"gte": filters["min_rating"]}}}
)
if "demographic" in filters:
clauses.append(
{"term": {"demographic": filters["demographic"]}}
)
return clauses
def _parse_results(
self, response: dict, source: str
) -> list[SearchResult]:
"""Parse OpenSearch response into SearchResult list."""
results: list[SearchResult] = []
for hit in response.get("hits", {}).get("hits", []):
src = hit.get("_source", {})
results.append(SearchResult(
manga_id=src.get("manga_id", hit["_id"]),
title_ja=src.get("title_ja", ""),
title_en=src.get("title_en", ""),
score=hit.get("_score", 0.0),
source=source,
genre=src.get("genre", ""),
avg_rating=src.get("avg_rating", 0.0),
release_date=src.get("release_date", ""),
metadata={
"author": src.get("author", ""),
"sales_rank": src.get("sales_rank", 0),
"price_jpy": src.get("price_jpy", 0),
"in_stock": src.get("in_stock", True),
},
))
return results
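The class above runs BM25 and kNN sequentially with a comment noting that production should parallelize. A sketch of that pattern with ThreadPoolExecutor, where sleeping mock callables stand in for the two OpenSearch round trips:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def mock_bm25(query: str) -> list[str]:
    time.sleep(0.05)  # simulate a ~50ms keyword search
    return ["p1", "p2"]

def mock_knn(query: str) -> list[str]:
    time.sleep(0.06)  # simulate a ~60ms vector search
    return ["p3", "p1"]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    bm25_future = pool.submit(mock_bm25, "one piece")
    knn_future = pool.submit(mock_knn, "one piece")
    bm25_hits, knn_hits = bm25_future.result(), knn_future.result()
elapsed_ms = (time.perf_counter() - start) * 1000

# Wall time is ~max(50, 60)ms rather than the sequential ~110ms,
# which is what keeps the 80ms search-stage budget viable.
print(f"{sorted(set(bm25_hits + knn_hits))} in ~{elapsed_ms:.0f}ms")
```

Threads suffice here because both calls are network-bound; an asyncio client would work equally well.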
Re-ranking Pipeline
The re-ranking stage takes the top-50 candidates from hybrid search and distills them to the top-5 most relevant results that will be injected into Claude 3's context window.
graph LR
TOP50["Top-50 from<br/>Hybrid Search"] --> CUSTOM["Custom Scorer<br/>Recency + Rating<br/>+ Intent Boost"]
CUSTOM --> TOP20["Top-20<br/>Candidates"]
TOP20 --> RERANK["Cross-Encoder<br/>or Claude Haiku<br/>Relevance Scoring"]
RERANK --> TOP5["Top-5<br/>Final Results"]
TOP5 --> CONTEXT["Claude 3 Sonnet<br/>RAG Context"]
Re-ranking Strategy Comparison
| Strategy | Latency | Quality | Cost | MangaAssist Use |
| --- | --- | --- | --- | --- |
| No re-ranking (use hybrid score as-is) | 0ms | Baseline | $0 | Not recommended — hybrid scores are noisy |
| Custom heuristic scorer | 5-10ms | +15% NDCG | $0 | Always applied as first pass |
| Cross-encoder model (e.g., ms-marco-MiniLM) | 30-50ms | +25% NDCG | $0 (self-hosted on ECS) | Default re-ranker for top-20 → top-5 |
| Claude Haiku re-ranker | 80-150ms | +35% NDCG | ~$0.001/query | Used for high-value queries (buy intent) |
| Claude Sonnet re-ranker | 200-400ms | +40% NDCG | ~$0.008/query | Too slow — exceeds the 200ms retrieval budget |
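The default path (heuristic scorer to top-20, then cross-encoder to top-5) reduces to a pluggable two-stage funnel. A sketch with a stand-in scorer; a real deployment would call a ms-marco-style cross-encoder or Claude Haiku where overlap_scorer appears:

```python
from typing import Callable

def rerank_funnel(candidates: list[dict],
                  scorer: Callable[[str, dict], float],
                  query: str,
                  mid_k: int = 20, final_k: int = 5) -> list[dict]:
    """Truncate to mid_k by retrieval score, re-score each survivor, keep final_k."""
    shortlist = sorted(candidates, key=lambda c: c["score"], reverse=True)[:mid_k]
    for c in shortlist:
        c["rerank_score"] = scorer(query, c)
    return sorted(shortlist, key=lambda c: c["rerank_score"], reverse=True)[:final_k]

# Stand-in scorer: query/title token overlap (illustrative only)
def overlap_scorer(query: str, cand: dict) -> float:
    q = set(query.lower().split())
    return len(q & set(cand["title"].lower().split())) / max(len(q), 1)

cands = [{"title": "One Piece", "score": 0.7},
         {"title": "Pirate Adventure Saga", "score": 0.9},
         {"title": "Slice of Life Diary", "score": 0.8}]
top = rerank_funnel(cands, overlap_scorer, "pirate adventure manga", final_k=2)
print([c["title"] for c in top])  # ['Pirate Adventure Saga', ...]
```

The funnel shape is the point: the expensive model only ever sees mid_k documents, which is how a 30-50ms re-ranker fits inside the 60ms budget.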
Search Strategy Comparison
| Strategy | Avg Latency (p95) | NDCG@5 | MRR | Best Use Case |
| --- | --- | --- | --- | --- |
| BM25 only | 25ms | 0.62 | 0.58 | Exact title/author/ISBN lookup |
| kNN only | 45ms | 0.71 | 0.67 | Semantic recommendation queries |
| Hybrid (RRF) | 55ms | 0.82 | 0.79 | General-purpose manga discovery |
| Hybrid (weighted linear) | 55ms | 0.80 | 0.77 | When BM25/kNN weight tuning data is available |
| Hybrid + custom scoring | 65ms | 0.86 | 0.83 | Business-aware ranking (recency, popularity) |
| Hybrid + cross-encoder re-rank | 120ms | 0.91 | 0.88 | High-quality retrieval within the 200ms budget |
| Hybrid + Haiku re-rank | 180ms | 0.93 | 0.90 | Maximum quality within budget (buy intent) |
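The NDCG@5 figures above come from offline evaluation against labeled relevance judgments. A minimal NDCG@k implementation of the kind used to produce such numbers (the graded relevance values in the example are illustrative):

```python
import math

def dcg_at_k(relevances: list[float], k: int) -> float:
    """Discounted cumulative gain over the first k results, in rank order."""
    return sum(rel / math.log2(i + 2) for i, rel in enumerate(relevances[:k]))

def ndcg_at_k(relevances: list[float], k: int = 5) -> float:
    """DCG normalized by the DCG of the ideal (descending) ordering."""
    ideal = dcg_at_k(sorted(relevances, reverse=True), k)
    return dcg_at_k(relevances, k) / ideal if ideal > 0 else 0.0

# Graded relevance (0-3) of a returned top-5, in the order the system ranked them
print(round(ndcg_at_k([3, 2, 3, 0, 1]), 3))  # 0.972
```

Averaging this over a labeled query set yields the per-strategy NDCG@5 column; MRR is computed analogously from the rank of the first relevant hit.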
OpenSearch Serverless OCU Capacity Planning
| Metric | Value | Notes |
| --- | --- | --- |
| Index OCUs | 2 (minimum for serverless) | Handles nightly bulk indexing of catalog updates |
| Search OCUs | 4 | Supports ~500 QPS at < 50ms per kNN query |
| Total documents | 100K manga products + 500K reviews + 15K authors | ~615K total across 3 indexes |
| Embedding dimension | 1536 (Titan v2) | ~6KB per vector |
| Storage estimate | ~4 GB (vectors) + ~2 GB (metadata/text) | Well within serverless limits |
| Estimated monthly cost | ~$1,050 (6 OCUs × $0.24/OCU-hr × 730 hrs) + data transfer | Scale OCUs as QPS grows |
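The monthly estimate is straight arithmetic. The rate used is the commonly cited $0.24 per OCU-hour; actual ap-northeast-1 pricing should be checked before budgeting:

```python
ocus = 2 + 4                 # indexing + search OCUs
rate_per_ocu_hour = 0.24     # assumed rate; verify for the deployment region
hours_per_month = 730

monthly = ocus * rate_per_ocu_hour * hours_per_month
print(f"${monthly:,.2f}/month before data transfer")  # $1,051.20/month ...
```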
Key Takeaways
- Hybrid search is non-negotiable for MangaAssist — neither BM25 nor kNN alone handles the full query spectrum (exact titles + semantic recommendations).
- RRF is the safest fusion default — it requires no score normalization and is robust to score scale mismatches between BM25 and kNN.
- Japanese tokenization is a first-class concern — kuromoji analyzer with custom synonym dictionaries prevents catastrophic misses on manga-specific terms.
- Re-ranking buys the most NDCG per millisecond — a cross-encoder on top-20 candidates adds +25% NDCG for only 30-50ms.
- The 200ms retrieval budget is achievable — preprocessing (15ms) + embedding (25ms) + hybrid search (80ms) + re-ranking (60ms) = ~180ms at p95.
- Multi-index federation is worth the complexity — querying products, reviews, and authors separately allows index-specific tuning and avoids one bloated index.