
Retrieval Performance Architecture

AWS AIP-C01 Task 4.2 → Skill 4.2.2: Optimize retrieval mechanisms for FM-augmented applications
System: MangaAssist e-commerce chatbot — Bedrock Claude 3 Sonnet/Haiku, OpenSearch Serverless (HNSW k-NN), DynamoDB, ECS Fargate, ElastiCache Redis
Target: total retrieval latency (query preprocessing + search + re-ranking) < 200ms at p95


Skill Mapping

| AWS AIP-C01 Element | Coverage |
| --- | --- |
| Task 4.2 | Optimize application performance for FM workloads |
| Skill 4.2.2 | Optimize retrieval mechanisms to improve FM-augmented application performance |
| Key Focus | Index optimization, query preprocessing, hybrid search, custom scoring, re-ranking pipelines |
| MangaAssist Context | OpenSearch Serverless stores ~100K manga product embeddings (1536-dim Titan v2); hybrid BM25 + kNN retrieval feeds the Claude 3 RAG pipeline; Japanese + English bilingual queries |

Mind Map — Retrieval Performance Dimensions

mindmap
  root((Retrieval<br/>Performance))
    Index Optimization
      HNSW Parameters
        ef_construction tuning
        m (graph connectivity)
        ef_search at query time
      Segment Merge Policies
        Force merge schedule
        Max segment size
      Field Mapping Optimization
        Keyword vs text
        doc_values for filters
        Exclude unnecessary fields
      Shard Sizing
        OCU-to-shard ratio
        Hot/warm tiering
    Query Preprocessing
      Query Expansion
        Manga term synonyms
        Genre abbreviations
      Japanese Tokenization
        kuromoji analyzer
        Reading form normalization
        Kanji/Kana/Romaji unification
      Synonym Expansion
        shonen/shounen/少年
        manga/漫画/マンガ
      Stopword Removal
        Language-aware lists
        Domain-specific stops
    Hybrid Search
      BM25 Keyword Search
        Exact title matching
        Author name lookup
        ISBN/catalog ID
      kNN Vector Search
        Semantic similarity
        Concept-level matching
        Cross-language retrieval
      Score Fusion
        Reciprocal Rank Fusion
        Weighted linear combination
        Learned combination
    Custom Scoring
      Recency Boost
        New release window
        Trending decay
      Popularity Boost
        Sales volume signal
        Rating-weighted score
      Intent-Aware Scoring
        Browse vs Buy vs Research
        Recommendation vs Search
    Re-ranking Pipeline
      Retrieve Top-50
      Cross-Encoder Re-rank
      LLM-Based Re-rank
      Final Top-5 Selection
    Multi-Index Strategy
      manga-products index
      manga-reviews index
      manga-authors index
      Federated search orchestration

Architecture — MangaAssist Retrieval Pipeline

graph TB
    subgraph QueryIngress["Query Ingress"]
        style QueryIngress fill:#1a1a2e,stroke:#16213e,color:#fff
        USER["Customer Query<br/>'ワンピースみたいな冒険漫画'"]
        APIGW["API Gateway<br/>WebSocket"]
        ECS["ECS Fargate<br/>Orchestrator"]
    end

    subgraph PreprocessingStage["Query Preprocessing (15ms budget)"]
        style PreprocessingStage fill:#0f3460,stroke:#16213e,color:#fff
        LANG_DETECT["Language Detection<br/>JA / EN / Mixed"]
        TOKENIZER["Japanese Tokenizer<br/>kuromoji + custom dict"]
        SYNONYM["Synonym Expansion<br/>shonen ↔ 少年"]
        QUERY_EXPAND["Query Expansion<br/>Genre + Theme terms"]
        INTENT["Intent Classifier<br/>Browse / Buy / Research"]
    end

    subgraph EmbeddingStage["Embedding (25ms budget)"]
        style EmbeddingStage fill:#533483,stroke:#16213e,color:#fff
        CACHE_CHECK["ElastiCache Redis<br/>Embedding Cache"]
        TITAN["Bedrock Titan<br/>Embeddings v2<br/>1536-dim"]
    end

    subgraph SearchStage["Hybrid Search (80ms budget)"]
        style SearchStage fill:#e94560,stroke:#16213e,color:#fff
        BM25["BM25 Keyword Search<br/>Exact titles, authors, ISBNs"]
        KNN["kNN Vector Search<br/>HNSW ef_search=256<br/>Semantic similarity"]
        FUSION["Score Fusion<br/>RRF (k=60) or weighted linear<br/>0.6 vector / 0.4 keyword"]
    end

    subgraph RerankStage["Re-ranking (60ms budget)"]
        style RerankStage fill:#0a1931,stroke:#16213e,color:#fff
        TOP50["Top-50 Candidates"]
        SCORER["Custom Scorer<br/>Recency + Popularity<br/>+ Intent boost"]
        CROSS_ENC["Cross-Encoder Reranker<br/>or Claude Haiku Reranker"]
        TOP5["Final Top-5<br/>Context for Claude 3"]
    end

    subgraph IndexLayer["OpenSearch Serverless"]
        style IndexLayer fill:#ff9900,stroke:#16213e,color:#000
        IDX_PRODUCTS["manga-products<br/>100K docs, 1536-dim<br/>HNSW m=16, ef_construction=512"]
        IDX_REVIEWS["manga-reviews<br/>500K docs<br/>BM25 + kNN"]
        IDX_AUTHORS["manga-authors<br/>15K docs<br/>Keyword + kNN"]
    end

    USER --> APIGW --> ECS
    ECS --> LANG_DETECT --> TOKENIZER --> SYNONYM --> QUERY_EXPAND
    QUERY_EXPAND --> INTENT
    INTENT --> CACHE_CHECK
    CACHE_CHECK -->|hit| KNN
    CACHE_CHECK -->|miss| TITAN --> KNN
    QUERY_EXPAND --> BM25
    BM25 --> IDX_PRODUCTS & IDX_REVIEWS & IDX_AUTHORS
    KNN --> IDX_PRODUCTS & IDX_REVIEWS & IDX_AUTHORS
    IDX_PRODUCTS --> FUSION
    IDX_REVIEWS --> FUSION
    IDX_AUTHORS --> FUSION
    FUSION --> TOP50 --> SCORER --> CROSS_ENC --> TOP5
    TOP5 --> ECS

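The CACHE_CHECK → TITAN path above is a read-through cache: look the query up in Redis first, and only call Titan Embeddings on a miss. A minimal sketch, assuming the Redis and `bedrock-runtime` clients are injected by the caller; the `emb:<model>:<sha256-prefix>` key scheme and the 1-hour TTL are illustrative choices, not part of the source architecture.

```python
import hashlib
import json


def embedding_cache_key(model_id: str, text: str) -> str:
    """Deterministic Redis key for a (model, normalized query text) pair."""
    digest = hashlib.sha256(f"{model_id}:{text}".encode("utf-8")).hexdigest()
    return f"emb:{model_id}:{digest[:32]}"


def get_embedding(redis_client, bedrock_client, text: str,
                  model_id: str = "amazon.titan-embed-text-v2:0",
                  ttl_s: int = 3600) -> list[float]:
    """Read-through cache: return the cached vector on a hit; on a miss,
    call Titan Embeddings, cache the vector with a TTL, and return it."""
    key = embedding_cache_key(model_id, text)
    cached = redis_client.get(key)
    if cached is not None:
        return json.loads(cached)
    resp = bedrock_client.invoke_model(
        modelId=model_id, body=json.dumps({"inputText": text})
    )
    vector = json.loads(resp["body"].read())["embedding"]
    redis_client.setex(key, ttl_s, json.dumps(vector))
    return vector
```

Because repeated queries ("one piece", "おすすめの漫画") dominate chatbot traffic, a hit here skips the ~25ms Titan call entirely, which is why the embedding stage budget assumes the miss path as worst case.
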
Latency Budget Breakdown

gantt
    title Retrieval Pipeline Latency Budget (p95 Target: < 200ms)
    dateFormat X
    axisFormat %L ms

    section Preprocessing
    Language Detection        :0, 3
    Tokenization + Synonyms   :3, 10
    Query Expansion + Intent  :10, 15

    section Embedding
    Cache Check (Redis)       :15, 17
    Titan Embedding (miss)    :17, 40

    section Search
    BM25 Keyword (parallel)   :40, 90
    kNN Vector (parallel)     :40, 100
    Score Fusion              :100, 120

    section Re-ranking
    Custom Scoring            :120, 140
    Cross-Encoder Top-50→5   :140, 180

    section Delivery
    Result Assembly           :180, 190
    Return to Orchestrator    :190, 195

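As a quick arithmetic check, the stage budgets in the chart sum to 195ms on the worst-case (cache-miss) path, leaving only 5ms of headroom against the 200ms p95 target:

```python
# Stage budgets (ms) read off the gantt above; the cache-miss embedding
# path is the worst case, and the sum must stay under the 200ms target.
BUDGETS_MS = {
    "preprocessing": 15,  # 0-15: detect, tokenize, expand, classify
    "embedding": 25,      # 15-40: Redis check + Titan on a miss
    "search": 80,         # 40-120: BM25 and kNN in parallel, then fusion
    "reranking": 60,      # 120-180: custom scoring + cross-encoder
    "delivery": 15,       # 180-195: result assembly + return
}


def headroom_ms(budgets: dict[str, int], target_ms: int = 200) -> int:
    """Slack remaining between the summed stage budgets and the p95 target."""
    return target_ms - sum(budgets.values())
```

With so little slack, any regression in one stage (e.g. a slower re-ranker) must be bought back in another.
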
Index Optimization for OpenSearch Serverless

HNSW Parameter Tuning

HNSW (Hierarchical Navigable Small World) graphs are the backbone of vector search in OpenSearch. Choosing the right parameters directly controls the recall-latency tradeoff.

| Parameter | Description | MangaAssist Value | Rationale |
| --- | --- | --- | --- |
| ef_construction | Graph build-time expansion factor; higher = better recall, slower indexing | 512 | Product catalog updates are batched nightly; we can afford slower indexing for higher recall |
| m | Max bi-directional links per node; higher = better recall, more memory | 16 | Balances memory cost (OCU) against recall@10 > 0.95 |
| ef_search | Query-time expansion factor; higher = better recall, slower search | 256 | Tuned via benchmark: recall@10 = 0.97 at 35ms p95 per shard |
| space_type | Distance metric | cosinesimil | Titan v2 embeddings are L2-normalized; cosine similarity is the standard choice |
| engine | k-NN engine | nmslib | Mature HNSW implementation; faiss is the alternative when filtered search is needed |

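The ef_search value in the table came from benchmarking recall against latency. A minimal harness sketch for that tuning loop — `search_fn` is an assumed wrapper that issues a kNN query with a given ef_search and returns ranked document IDs, and `ground_truth` would come from an exact brute-force scan over the same corpus:

```python
import time


def benchmark_ef_search(search_fn, queries, ground_truth, ef_values, k=10):
    """Measure recall@k and p95 latency for each candidate ef_search value.

    search_fn(query, ef) -> ranked doc IDs (assumed kNN wrapper);
    ground_truth[query] -> exact top-k IDs from a brute-force scan.
    """
    report = {}
    for ef in ef_values:
        latencies, hits, total = [], 0, 0
        for q in queries:
            t0 = time.perf_counter()
            ids = search_fn(q, ef)
            latencies.append((time.perf_counter() - t0) * 1000)
            truth = set(ground_truth[q][:k])
            hits += len(set(ids[:k]) & truth)
            total += len(truth)
        latencies.sort()
        p95 = latencies[int(0.95 * (len(latencies) - 1))]
        report[ef] = {"recall_at_k": hits / total, "p95_ms": round(p95, 3)}
    return report
```

Sweep ef_values (e.g. 64, 128, 256, 512) and pick the smallest value whose recall clears the target — for MangaAssist, 256 was the knee of the curve.
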
Index Mapping Configuration

{
  "settings": {
    "index": {
      "knn": true,
      "knn.algo_param.ef_search": 256,
      "number_of_shards": 4,
      "number_of_replicas": 1
    },
    "analysis": {
      "analyzer": {
        "manga_ja_analyzer": {
          "type": "custom",
          "tokenizer": "kuromoji_tokenizer",
          "filter": [
            "kuromoji_baseform",
            "kuromoji_part_of_speech",
            "ja_stop",
            "kuromoji_stemmer",
            "lowercase"
          ]
        },
        "manga_en_analyzer": {
          "type": "custom",
          "tokenizer": "standard",
          "filter": ["lowercase", "english_stop", "english_stemmer"]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "manga_id": { "type": "keyword" },
      "title_ja": {
        "type": "text",
        "analyzer": "manga_ja_analyzer",
        "fields": { "keyword": { "type": "keyword" } }
      },
      "title_en": {
        "type": "text",
        "analyzer": "manga_en_analyzer",
        "fields": { "keyword": { "type": "keyword" } }
      },
      "description_embedding": {
        "type": "knn_vector",
        "dimension": 1536,
        "method": {
          "name": "hnsw",
          "space_type": "cosinesimil",
          "engine": "nmslib",
          "parameters": {
            "ef_construction": 512,
            "m": 16
          }
        }
      },
      "genre": { "type": "keyword" },
      "author": {
        "type": "text",
        "fields": { "keyword": { "type": "keyword" } }
      },
      "release_date": { "type": "date" },
      "avg_rating": { "type": "float" },
      "sales_rank": { "type": "integer" },
      "price_jpy": { "type": "integer" },
      "in_stock": { "type": "boolean" },
      "tags": { "type": "keyword" },
      "volume_count": { "type": "integer" },
      "publisher": { "type": "keyword" },
      "demographic": { "type": "keyword" }
    }
  }
}

Segment Merge Policies

OpenSearch Serverless manages segment merges automatically, but understanding the impact is critical for retrieval latency.

| Policy Aspect | Recommendation | MangaAssist Application |
| --- | --- | --- |
| Force merge after bulk indexing | On managed domains, trigger _forcemerge down to 1 segment per shard after the nightly catalog sync (Serverless merges automatically and exposes no _forcemerge API) | Reduces search latency by ~15% on the morning after catalog updates |
| Max segment size | Keep segments < 5 GB for HNSW indexes | With 100K docs at ~6 KB/doc (embedding + metadata), the total of ~600 MB fits easily |
| Refresh interval | 30s for near-real-time; 60s is acceptable for a product catalog | MangaAssist uses 30s — new manga appear within 30s of the DynamoDB write |
| Avoid small segments | Bulk index with batch sizes of 500-1000 | Prevents segment explosion during catalog imports |

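On a managed OpenSearch domain, the post-sync force merge from the table is a single REST call. It is I/O-heavy, so schedule it only after bulk writes finish (for MangaAssist, right after the nightly catalog sync):

```
POST /manga-products/_forcemerge?max_num_segments=1
```

On Serverless collections this endpoint is unavailable; merging is handled by the service.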
Multi-Index Strategy

graph LR
    subgraph Indexes["OpenSearch Serverless Collection"]
        IDX1["manga-products<br/>100K docs<br/>Title, Description, Embedding<br/>Genre, Author, Price, Rating"]
        IDX2["manga-reviews<br/>500K docs<br/>Review text, Embedding<br/>Rating, Helpfulness"]
        IDX3["manga-authors<br/>15K docs<br/>Bio, Embedding<br/>Notable works, Genre specialty"]
    end

    QUERY["Preprocessed Query"] --> IDX1 & IDX2 & IDX3
    IDX1 -->|Top-20 products| MERGE["Federated<br/>Result Merge"]
    IDX2 -->|Top-10 reviews| MERGE
    IDX3 -->|Top-5 authors| MERGE
    MERGE --> RERANK["Re-rank<br/>Top-50 → Top-5"]

| Index | Document Count | Primary Use Case | Search Type | Latency Target |
| --- | --- | --- | --- | --- |
| manga-products | 100K | Product discovery, recommendations | Hybrid (BM25 + kNN) | < 50ms |
| manga-reviews | 500K | Review summarization, sentiment context | Hybrid (BM25 + kNN) | < 60ms |
| manga-authors | 15K | Author information, similar-author discovery | Keyword + kNN | < 30ms |

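The fan-out to the three indexes can be issued as a single `_msearch` round trip instead of three sequential calls. A sketch of the NDJSON payload builder, assuming one query per index and the per-index caps from the table; the `match` query body is a simplified placeholder for the full hybrid query:

```python
import json

# Per-index result caps from the table above (products 20, reviews 10, authors 5).
INDEX_CAPS = {"manga-products": 20, "manga-reviews": 10, "manga-authors": 5}


def build_msearch_body(query_text: str, index_caps: dict[str, int]) -> str:
    """Build the NDJSON payload for OpenSearch _msearch: for each index,
    one header line naming the index, then one line with the query body."""
    lines = []
    for index, size in index_caps.items():
        lines.append(json.dumps({"index": index}))
        lines.append(json.dumps({
            "size": size,
            # Placeholder; production would use the hybrid BM25 + kNN query.
            "query": {"match": {"description": query_text}},
        }))
    return "\n".join(lines) + "\n"  # _msearch requires a trailing newline
```

`client.msearch(body=...)` then returns one response per header/body pair, which the federated merge step combines before re-ranking.
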
Query Preprocessing Pipeline

Japanese Language Handling

MangaAssist serves a Japanese manga audience — queries arrive in Japanese, English, romaji, or mixed scripts. The preprocessing pipeline must normalize all of these forms before search.

graph LR
    RAW["Raw Query<br/>'ワンピースみたいな<br/>adventure manga'"] --> DETECT["Language<br/>Detection"]
    DETECT --> JA["Japanese Path<br/>kuromoji tokenizer"]
    DETECT --> EN["English Path<br/>standard tokenizer"]
    DETECT --> MIXED["Mixed Path<br/>split → both paths"]
    JA --> NORM["Normalization<br/>全角→半角<br/>カタカナ→ひらがな"]
    EN --> STEM["Stemming +<br/>Lowercasing"]
    MIXED --> NORM & STEM
    NORM --> SYN["Synonym<br/>Expansion"]
    STEM --> SYN
    SYN --> EXPAND["Query<br/>Expansion"]
    EXPAND --> OUTPUT["Processed Query<br/>Tokens + Synonyms<br/>+ Expanded terms"]

Python — QueryPreprocessor

"""
MangaAssist Query Preprocessor
Handles Japanese/English bilingual queries for OpenSearch retrieval.
"""

import re
import unicodedata
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional



class QueryLanguage(Enum):
    JAPANESE = "ja"
    ENGLISH = "en"
    MIXED = "mixed"


class UserIntent(Enum):
    BROWSE = "browse"          # Exploring, no specific title
    BUY = "buy"                # Looking for a specific product
    RESEARCH = "research"      # Comparing, reading reviews
    RECOMMEND = "recommend"    # "Something like X"


@dataclass
class ProcessedQuery:
    """Output of the query preprocessing pipeline."""
    original: str
    language: QueryLanguage
    intent: UserIntent
    tokens_ja: list[str] = field(default_factory=list)
    tokens_en: list[str] = field(default_factory=list)
    synonyms: list[str] = field(default_factory=list)
    expanded_terms: list[str] = field(default_factory=list)
    normalized_text: str = ""
    search_text_bm25: str = ""        # For BM25 keyword search
    embedding_text: str = ""           # For vector embedding


# Manga-domain synonym map: canonical → all variants
MANGA_SYNONYMS: dict[str, list[str]] = {
    "shonen":   ["shounen", "少年", "しょうねん", "ショウネン"],
    "shojo":    ["shoujo", "少女", "しょうじょ", "ショウジョ"],
    "seinen":   ["青年", "せいねん", "セイネン"],
    "josei":    ["女性", "じょせい", "ジョセイ"],
    "isekai":   ["異世界", "いせかい", "イセカイ"],
    "manga":    ["漫画", "マンガ", "まんが"],
    "anime":    ["アニメ", "あにめ"],
    "one piece": ["ワンピース", "わんぴーす"],
    "naruto":   ["ナルト", "なると"],
    "attack on titan": ["進撃の巨人", "しんげきのきょじん"],
    "demon slayer":    ["鬼滅の刃", "きめつのやいば"],
}

# Intent detection keywords
INTENT_KEYWORDS: dict[UserIntent, list[str]] = {
    UserIntent.BUY: [
        "buy", "purchase", "order", "price", "在庫", "買う",
        "購入", "注文", "値段", "how much", "いくら",
    ],
    UserIntent.RECOMMEND: [
        "like", "similar", "recommend", "suggest", "みたいな",
        "おすすめ", "似てる", "のような", "っぽい",
    ],
    UserIntent.RESEARCH: [
        "review", "compare", "rating", "レビュー", "比較",
        "評価", "difference", "違い", "vs",
    ],
    UserIntent.BROWSE: [
        "show", "list", "what", "explore", "見せて",
        "一覧", "何", "どんな",
    ],
}


class QueryPreprocessor:
    """
    Bilingual query preprocessing for MangaAssist.
    Normalizes Japanese/English text, expands synonyms, detects intent.

    Latency budget: 15ms total for the full preprocessing pipeline.
    """

    def __init__(self, custom_synonyms: Optional[dict] = None):
        self.synonyms = {**MANGA_SYNONYMS, **(custom_synonyms or {})}
        # Build reverse lookup: any variant → canonical + all variants
        self._reverse_synonyms: dict[str, list[str]] = {}
        for canonical, variants in self.synonyms.items():
            all_forms = [canonical] + variants
            for form in all_forms:
                self._reverse_synonyms[form.lower()] = all_forms

    def process(self, raw_query: str) -> ProcessedQuery:
        """Full preprocessing pipeline: detect → normalize → expand → classify."""
        language = self._detect_language(raw_query)
        normalized = self._normalize(raw_query)
        intent = self._detect_intent(normalized, language)
        tokens_ja, tokens_en = self._tokenize(normalized, language)
        synonyms = self._expand_synonyms(tokens_ja + tokens_en)
        expanded = self._expand_query(tokens_ja + tokens_en, intent)

        # Build search-ready text
        all_terms = tokens_ja + tokens_en + synonyms + expanded
        search_text = " ".join(dict.fromkeys(all_terms))  # dedupe, preserve order
        embedding_text = self._build_embedding_text(
            normalized, synonyms, intent
        )

        return ProcessedQuery(
            original=raw_query,
            language=language,
            intent=intent,
            tokens_ja=tokens_ja,
            tokens_en=tokens_en,
            synonyms=synonyms,
            expanded_terms=expanded,
            normalized_text=normalized,
            search_text_bm25=search_text,
            embedding_text=embedding_text,
        )

    # ---- Language Detection ----

    def _detect_language(self, text: str) -> QueryLanguage:
        """Detect query language based on Unicode script analysis."""
        has_cjk = bool(re.search(r'[\u3000-\u9fff\uf900-\ufaff]', text))
        has_latin = bool(re.search(r'[a-zA-Z]{2,}', text))
        if has_cjk and has_latin:
            return QueryLanguage.MIXED
        elif has_cjk:
            return QueryLanguage.JAPANESE
        return QueryLanguage.ENGLISH

    # ---- Normalization ----

    def _normalize(self, text: str) -> str:
        """
        Normalize Unicode forms for consistent matching.
        - NFKC normalization (fullwidth → halfwidth, etc.)
        - Strip excess whitespace
        """
        text = unicodedata.normalize("NFKC", text)
        text = re.sub(r'\s+', ' ', text).strip()
        return text

    # ---- Tokenization ----

    def _tokenize(
        self, text: str, language: QueryLanguage
    ) -> tuple[list[str], list[str]]:
        """
        Tokenize based on detected language.
        Japanese: character-level bigrams + known terms (production would use kuromoji).
        English: whitespace split + lowercasing.
        """
        tokens_ja: list[str] = []
        tokens_en: list[str] = []

        # Split into Japanese and English segments
        ja_segments = re.findall(r'[\u3000-\u9fff\uf900-\ufaff]+', text)
        en_segments = re.findall(r'[a-zA-Z]+', text)

        for seg in ja_segments:
            # In production: call kuromoji via OpenSearch _analyze API
            # Here: simple character bigrams as fallback
            tokens_ja.append(seg)
            if len(seg) > 2:
                for i in range(len(seg) - 1):
                    tokens_ja.append(seg[i:i+2])

        for seg in en_segments:
            tokens_en.append(seg.lower())

        return tokens_ja, tokens_en

    # ---- Synonym Expansion ----

    def _expand_synonyms(self, tokens: list[str]) -> list[str]:
        """Expand tokens using the manga-domain synonym map."""
        expanded: list[str] = []
        for token in tokens:
            key = token.lower()
            if key in self._reverse_synonyms:
                for syn in self._reverse_synonyms[key]:
                    if syn.lower() != key:
                        expanded.append(syn)
        return list(dict.fromkeys(expanded))  # dedupe

    # ---- Intent Detection ----

    def _detect_intent(
        self, text: str, language: QueryLanguage
    ) -> UserIntent:
        """Classify user intent from query keywords."""
        text_lower = text.lower()
        scores: dict[UserIntent, int] = {intent: 0 for intent in UserIntent}
        for intent, keywords in INTENT_KEYWORDS.items():
            for kw in keywords:
                if kw in text_lower:
                    scores[intent] += 1
        best = max(scores, key=scores.get)
        return best if scores[best] > 0 else UserIntent.BROWSE

    # ---- Query Expansion ----

    def _expand_query(
        self, tokens: list[str], intent: UserIntent
    ) -> list[str]:
        """
        Add contextual terms based on intent.
        - RECOMMEND: add genre/theme terms
        - BUY: add availability terms
        - RESEARCH: add review/comparison terms
        """
        expansion: list[str] = []
        if intent == UserIntent.RECOMMEND:
            expansion.extend(["similar", "おすすめ", "recommendation"])
        elif intent == UserIntent.BUY:
            expansion.extend(["in_stock", "price", "購入可能"])
        elif intent == UserIntent.RESEARCH:
            expansion.extend(["review", "rating", "レビュー", "評価"])
        return expansion

    # ---- Embedding Text Construction ----

    def _build_embedding_text(
        self, normalized: str, synonyms: list[str], intent: UserIntent
    ) -> str:
        """
        Build text optimized for Titan embedding generation.
        Prefix with intent context for better semantic matching.
        """
        intent_prefix = {
            UserIntent.BROWSE: "manga search:",
            UserIntent.BUY: "manga product to purchase:",
            UserIntent.RECOMMEND: "manga recommendation similar to:",
            UserIntent.RESEARCH: "manga review and comparison:",
        }
        prefix = intent_prefix.get(intent, "manga search:")
        synonym_str = " ".join(synonyms[:5])  # limit synonym injection
        return f"{prefix} {normalized} {synonym_str}".strip()

Hybrid Search Implementation

Why Hybrid Search for MangaAssist?

| Query Type | Best Search Method | Example |
| --- | --- | --- |
| Exact title lookup | BM25 keyword | "鬼滅の刃 23巻" (Demon Slayer Vol. 23) |
| Semantic concept | kNN vector | "adventure manga with pirates" |
| Author + genre | BM25 keyword | "Eiichiro Oda shonen" |
| Vague recommendation | kNN vector | "something dark and philosophical like Death Note" |
| Mixed specific + vague | Hybrid BM25 + kNN | "manga like ワンピース with good art" |

Pure BM25 fails on semantic queries ("adventure manga with pirates" does not literally match "One Piece"). Pure kNN fails on exact identifiers (ISBN, volume numbers, exact titles). Hybrid search combines both strengths.

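Before the full implementation, the fusion idea in miniature: RRF scores each document by its summed reciprocal ranks, so a title ranked well by both retrievers beats one that tops only a single list, and the incompatible BM25 and kNN score scales never need reconciling. A self-contained toy example (the document IDs are made up):

```python
def rrf(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Fuse ranked lists: score(d) = sum over lists of 1 / (k + rank(d))."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc in enumerate(ranking, start=1):
            scores[doc] = scores.get(doc, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)


bm25 = ["demon_slayer_v23", "one_piece_v1", "naruto_v1"]        # keyword ranking
knn = ["one_piece_v1", "vinland_saga_v1", "demon_slayer_v23"]   # vector ranking
fused = rrf([bm25, knn])
# one_piece_v1 (1/62 + 1/61) edges out demon_slayer_v23 (1/61 + 1/63):
# strong agreement across both rankers beats a single first-place finish.
```

The HybridSearchManager below implements exactly this formula, plus a weighted-linear alternative for cases where calibrated score magnitudes matter.
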
Python — HybridSearchManager

"""
MangaAssist Hybrid Search Manager
Combines BM25 keyword search + kNN vector search with score fusion.
Target: < 80ms for the search stage at p95.
"""

import time
from dataclasses import dataclass, field
from typing import Any, Optional

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth


@dataclass
class SearchResult:
    """A single search result from any search method."""
    manga_id: str
    title_ja: str
    title_en: str
    score: float
    source: str          # "bm25", "knn", or "hybrid"
    genre: str = ""
    avg_rating: float = 0.0
    release_date: str = ""
    metadata: dict = field(default_factory=dict)


@dataclass
class HybridSearchResponse:
    """Combined response from hybrid search pipeline."""
    results: list[SearchResult]
    bm25_count: int
    knn_count: int
    fusion_method: str
    total_latency_ms: float
    bm25_latency_ms: float
    knn_latency_ms: float
    fusion_latency_ms: float


class HybridSearchManager:
    """
    Orchestrates hybrid BM25 + kNN search against OpenSearch Serverless.

    Architecture:
      1. Execute BM25 and kNN queries in parallel
      2. Fuse scores using Reciprocal Rank Fusion (RRF) or weighted linear
      3. Apply custom business scoring (recency, popularity, intent)
      4. Return top-K candidates for re-ranking

    Latency budget: 80ms total (BM25 + kNN parallel, then fusion).
    """

    def __init__(
        self,
        opensearch_endpoint: str,
        region: str = "ap-northeast-1",
        product_index: str = "manga-products",
        review_index: str = "manga-reviews",
        author_index: str = "manga-authors",
        vector_field: str = "description_embedding",
        embedding_dimension: int = 1536,
    ):
        self.region = region
        self.product_index = product_index
        self.review_index = review_index
        self.author_index = author_index
        self.vector_field = vector_field
        self.embedding_dimension = embedding_dimension

        # AWS auth for OpenSearch Serverless
        credentials = boto3.Session().get_credentials()
        self.awsauth = AWS4Auth(
            credentials.access_key,
            credentials.secret_key,
            region,
            "aoss",
            session_token=credentials.token,
        )
        self.client = OpenSearch(
            hosts=[{"host": opensearch_endpoint, "port": 443}],
            http_auth=self.awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
            timeout=5,
        )

        # Bedrock client for Titan embeddings
        self.bedrock = boto3.client(
            "bedrock-runtime", region_name=region
        )

    # ---- Main Hybrid Search ----

    def hybrid_search(
        self,
        query_text: str,
        query_embedding: list[float],
        intent: str = "browse",
        top_k: int = 50,
        bm25_weight: float = 0.4,
        knn_weight: float = 0.6,
        fusion_method: str = "rrf",
        filters: Optional[dict] = None,
    ) -> HybridSearchResponse:
        """
        Execute hybrid BM25 + kNN search with score fusion.

        Args:
            query_text: Preprocessed search text for BM25.
            query_embedding: 1536-dim vector from Titan Embeddings.
            intent: User intent (browse/buy/recommend/research).
            top_k: Number of candidates to retrieve before re-ranking.
            bm25_weight: Weight for keyword score in linear fusion.
            knn_weight: Weight for vector score in linear fusion.
            fusion_method: "rrf" or "linear".
            filters: Optional OpenSearch filters (genre, price, etc.).

        Returns:
            HybridSearchResponse with fused results and latency metrics.
        """
        t_start = time.perf_counter()

        # --- Parallel execution of BM25 and kNN ---
        # In production, use asyncio or ThreadPoolExecutor for true parallelism
        t_bm25_start = time.perf_counter()
        bm25_results = self._bm25_search(
            query_text, top_k=top_k, filters=filters
        )
        bm25_latency = (time.perf_counter() - t_bm25_start) * 1000

        t_knn_start = time.perf_counter()
        knn_results = self._knn_search(
            query_embedding, top_k=top_k, filters=filters
        )
        knn_latency = (time.perf_counter() - t_knn_start) * 1000

        # --- Score fusion ---
        t_fusion_start = time.perf_counter()
        if fusion_method == "rrf":
            fused = self._reciprocal_rank_fusion(
                bm25_results, knn_results, k=60
            )
        else:
            fused = self._weighted_linear_fusion(
                bm25_results, knn_results,
                bm25_weight=bm25_weight,
                knn_weight=knn_weight,
            )
        fusion_latency = (time.perf_counter() - t_fusion_start) * 1000

        # --- Apply business scoring ---
        scored = self._apply_business_scoring(fused, intent)

        # --- Sort and truncate ---
        scored.sort(key=lambda r: r.score, reverse=True)
        top_results = scored[:top_k]

        total_latency = (time.perf_counter() - t_start) * 1000

        return HybridSearchResponse(
            results=top_results,
            bm25_count=len(bm25_results),
            knn_count=len(knn_results),
            fusion_method=fusion_method,
            total_latency_ms=round(total_latency, 2),
            bm25_latency_ms=round(bm25_latency, 2),
            knn_latency_ms=round(knn_latency, 2),
            fusion_latency_ms=round(fusion_latency, 2),
        )

    # ---- BM25 Keyword Search ----

    def _bm25_search(
        self,
        query_text: str,
        top_k: int = 50,
        filters: Optional[dict] = None,
    ) -> list[SearchResult]:
        """
        BM25 keyword search across title, description, author fields.
        Uses multi_match with cross_fields for best coverage.
        """
        must_clause: list[dict] = [
            {
                "multi_match": {
                    "query": query_text,
                    "fields": [
                        "title_ja^3",
                        "title_en^3",
                        "title_ja.keyword^5",
                        "title_en.keyword^5",
                        "author^2",
                        "description",
                        "tags^1.5",
                        "genre^1.5",
                    ],
                    "type": "cross_fields",
                    "operator": "or",
                    "minimum_should_match": "30%",
                }
            }
        ]

        filter_clause = self._build_filters(filters)

        body = {
            "size": top_k,
            "query": {
                "bool": {
                    "must": must_clause,
                    "filter": filter_clause,
                }
            },
            "_source": [
                "manga_id", "title_ja", "title_en", "genre",
                "avg_rating", "release_date", "author",
                "sales_rank", "price_jpy",
            ],
        }

        resp = self.client.search(index=self.product_index, body=body)
        return self._parse_results(resp, source="bm25")

    # ---- kNN Vector Search ----

    def _knn_search(
        self,
        query_embedding: list[float],
        top_k: int = 50,
        filters: Optional[dict] = None,
    ) -> list[SearchResult]:
        """
        HNSW k-NN vector search for semantic similarity.
        Uses ef_search=256 for high recall at acceptable latency.
        """
        knn_clause: dict[str, Any] = {
            self.vector_field: {
                "vector": query_embedding,
                "k": top_k,
            }
        }

        filter_clause = self._build_filters(filters)
        if filter_clause:
            knn_clause[self.vector_field]["filter"] = {
                "bool": {"filter": filter_clause}
            }

        body: dict[str, Any] = {
            "size": top_k,
            "query": {"knn": knn_clause},
            "_source": [
                "manga_id", "title_ja", "title_en", "genre",
                "avg_rating", "release_date", "author",
                "sales_rank", "price_jpy",
            ],
        }

        resp = self.client.search(index=self.product_index, body=body)
        return self._parse_results(resp, source="knn")

    # ---- Score Fusion Methods ----

    def _reciprocal_rank_fusion(
        self,
        bm25_results: list[SearchResult],
        knn_results: list[SearchResult],
        k: int = 60,
    ) -> list[SearchResult]:
        """
        Reciprocal Rank Fusion (RRF) — rank-based fusion that is
        robust to score scale differences between BM25 and kNN.

        RRF(d) = sum( 1 / (k + rank_i(d)) ) for each ranker i

        k=60 is the standard smoothing constant (from the original paper).
        """
        rrf_scores: dict[str, float] = {}
        result_map: dict[str, SearchResult] = {}

        for rank, result in enumerate(bm25_results):
            rrf_scores[result.manga_id] = rrf_scores.get(
                result.manga_id, 0.0
            ) + 1.0 / (k + rank + 1)
            result_map[result.manga_id] = result

        for rank, result in enumerate(knn_results):
            rrf_scores[result.manga_id] = rrf_scores.get(
                result.manga_id, 0.0
            ) + 1.0 / (k + rank + 1)
            if result.manga_id not in result_map:
                result_map[result.manga_id] = result

        fused: list[SearchResult] = []
        for manga_id, score in rrf_scores.items():
            r = result_map[manga_id]
            fused.append(SearchResult(
                manga_id=r.manga_id,
                title_ja=r.title_ja,
                title_en=r.title_en,
                score=score,
                source="hybrid",
                genre=r.genre,
                avg_rating=r.avg_rating,
                release_date=r.release_date,
                metadata=r.metadata,
            ))
        return fused

    def _weighted_linear_fusion(
        self,
        bm25_results: list[SearchResult],
        knn_results: list[SearchResult],
        bm25_weight: float = 0.4,
        knn_weight: float = 0.6,
    ) -> list[SearchResult]:
        """
        Weighted linear combination of normalized scores.
        Requires min-max normalization to bring BM25 and kNN
        scores onto the same [0, 1] scale.
        """
        def normalize(results: list[SearchResult]) -> list[float]:
            """Min-max normalize scores to [0, 1] without mutating inputs."""
            if not results:
                return []
            scores = [r.score for r in results]
            min_s, max_s = min(scores), max(scores)
            rng = max_s - min_s if max_s != min_s else 1.0
            return [(s - min_s) / rng for s in scores]

        bm25_scores = normalize(bm25_results)
        knn_scores = normalize(knn_results)

        score_map: dict[str, float] = {}
        result_map: dict[str, SearchResult] = {}

        for r, s in zip(bm25_results, bm25_scores):
            score_map[r.manga_id] = bm25_weight * s
            result_map[r.manga_id] = r

        for r, s in zip(knn_results, knn_scores):
            score_map[r.manga_id] = score_map.get(
                r.manga_id, 0.0
            ) + knn_weight * s
            if r.manga_id not in result_map:
                result_map[r.manga_id] = r

        fused: list[SearchResult] = []
        for manga_id, score in score_map.items():
            r = result_map[manga_id]
            fused.append(SearchResult(
                manga_id=r.manga_id,
                title_ja=r.title_ja,
                title_en=r.title_en,
                score=score,
                source="hybrid",
                genre=r.genre,
                avg_rating=r.avg_rating,
                release_date=r.release_date,
                metadata=r.metadata,
            ))
        # Sort so downstream top-N slicing sees best candidates first
        fused.sort(key=lambda r: r.score, reverse=True)
        return fused

    # ---- Business Scoring ----

    def _apply_business_scoring(
        self,
        results: list[SearchResult],
        intent: str,
    ) -> list[SearchResult]:
        """
        Apply MangaAssist business rules on top of retrieval scores.
        - Recency boost for new releases (last 30 days: +20%)
        - Popularity boost for high-rated titles (>= 4.5: +15%)
        - Intent-aware adjustments
        """
        import datetime

        now = datetime.date.today()
        thirty_days_ago = now - datetime.timedelta(days=30)

        for r in results:
            boost = 1.0

            # Recency boost
            if r.release_date:
                try:
                    release = datetime.date.fromisoformat(r.release_date[:10])
                    if release >= thirty_days_ago:
                        boost *= 1.20  # +20% for new releases
                except (ValueError, TypeError):
                    pass

            # Popularity boost
            if r.avg_rating >= 4.5:
                boost *= 1.15  # +15% for highly rated
            elif r.avg_rating >= 4.0:
                boost *= 1.05  # +5% for well rated

            # Intent-aware boost
            if intent == "buy":
                # Favor in-stock, exact matches
                if r.metadata.get("in_stock"):
                    boost *= 1.10
            elif intent == "recommend":
                # Favor diversity in genre
                boost *= 1.0  # neutral, handled by re-ranker
            elif intent == "research":
                # Favor titles with many reviews
                review_count = r.metadata.get("review_count", 0)
                if review_count > 100:
                    boost *= 1.10

            r.score *= boost

        return results

    # ---- Helpers ----

    def _build_filters(
        self, filters: Optional[dict]
    ) -> list[dict]:
        """Build OpenSearch filter clauses from a filter dict."""
        if not filters:
            return []
        clauses: list[dict] = []
        if "genre" in filters:
            clauses.append({"term": {"genre": filters["genre"]}})
        if "in_stock" in filters:
            clauses.append({"term": {"in_stock": filters["in_stock"]}})
        if "price_max" in filters:
            clauses.append(
                {"range": {"price_jpy": {"lte": filters["price_max"]}}}
            )
        if "min_rating" in filters:
            clauses.append(
                {"range": {"avg_rating": {"gte": filters["min_rating"]}}}
            )
        if "demographic" in filters:
            clauses.append(
                {"term": {"demographic": filters["demographic"]}}
            )
        return clauses

    def _parse_results(
        self, response: dict, source: str
    ) -> list[SearchResult]:
        """Parse OpenSearch response into SearchResult list."""
        results: list[SearchResult] = []
        for hit in response.get("hits", {}).get("hits", []):
            src = hit.get("_source", {})
            results.append(SearchResult(
                manga_id=src.get("manga_id", hit["_id"]),
                title_ja=src.get("title_ja", ""),
                title_en=src.get("title_en", ""),
                score=hit.get("_score", 0.0),
                source=source,
                genre=src.get("genre", ""),
                avg_rating=src.get("avg_rating", 0.0),
                release_date=src.get("release_date", ""),
                metadata={
                    "author": src.get("author", ""),
                    "sales_rank": src.get("sales_rank", 0),
                    "price_jpy": src.get("price_jpy", 0),
                    "in_stock": src.get("in_stock", True),
                },
            ))
        return results
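The RRF fusion step above can also be sketched in isolation. This is a minimal, self-contained example of the same scoring scheme; the IDs and the k=60 default are illustrative, not MangaAssist production values:

```python
def rrf_fuse(*ranked_lists: list[str], k: int = 60) -> list[tuple[str, float]]:
    """Fuse ranked ID lists: score(d) = sum over lists of 1 / (k + rank + 1)."""
    scores: dict[str, float] = {}
    for ranked in ranked_lists:
        for rank, doc_id in enumerate(ranked):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)

bm25 = ["one-piece", "naruto", "bleach"]         # hypothetical exact-match hits
knn = ["naruto", "one-piece", "jujutsu-kaisen"]  # hypothetical semantic hits
fused = rrf_fuse(bm25, knn)
# Titles appearing in both lists outrank titles found by only one retriever.
```

Because RRF uses only ranks, it never needs the min-max normalization that weighted linear fusion requires.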

Re-ranking Pipeline

The re-ranking stage takes the top-50 candidates from hybrid search and distills them to the top-5 most relevant results that will be injected into Claude 3's context window.

graph LR
    TOP50["Top-50 from<br/>Hybrid Search"] --> CUSTOM["Custom Scorer<br/>Recency + Rating<br/>+ Intent Boost"]
    CUSTOM --> TOP20["Top-20<br/>Candidates"]
    TOP20 --> RERANK["Cross-Encoder<br/>or Claude Haiku<br/>Relevance Scoring"]
    RERANK --> TOP5["Top-5<br/>Final Results"]
    TOP5 --> CONTEXT["Claude 3 Sonnet<br/>RAG Context"]
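The cross-encoder stage in the diagram can be sketched as follows. Here `predict` stands in for a scorer such as sentence-transformers' `CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2").predict` self-hosted on ECS; the candidate dict shape and the choice of text field are assumptions:

```python
from typing import Callable, Sequence

def cross_encoder_rerank(
    query: str,
    candidates: list[dict],
    predict: Callable[[Sequence[tuple[str, str]]], Sequence[float]],
    top_k: int = 5,
) -> list[dict]:
    """Score (query, candidate-text) pairs and keep the top_k candidates."""
    pairs = [(query, c.get("title_en", "")) for c in candidates]
    scores = predict(pairs)
    ranked = sorted(zip(candidates, scores), key=lambda cs: cs[1], reverse=True)
    return [c for c, _ in ranked[:top_k]]
```

Injecting the scorer keeps the ranking logic testable without loading the model, and makes swapping in a different re-ranker a one-line change.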

Re-ranking Strategy Comparison

| Strategy | Latency | Quality | Cost | MangaAssist Use |
|---|---|---|---|---|
| No re-ranking (use hybrid score as-is) | 0ms | Baseline | $0 | Not recommended — hybrid scores are noisy |
| Custom heuristic scorer | 5-10ms | +15% NDCG | $0 | Always applied as first pass |
| Cross-encoder model (e.g., ms-marco-MiniLM) | 30-50ms | +25% NDCG | $0 (self-hosted on ECS) | Default re-ranker for top-20 → top-5 |
| Claude Haiku re-ranker | 80-150ms | +35% NDCG | ~$0.001/query | Used for high-value queries (buy intent) |
| Claude Sonnet re-ranker | 200-400ms | +40% NDCG | ~$0.008/query | Too slow — exceeds 200ms retrieval budget |
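A hedged sketch of the Claude Haiku re-ranker row: the listwise prompt format, the JSON-array response contract, and the helper names are assumptions, though the model ID and request body follow Bedrock's public `InvokeModel` Messages API:

```python
import json

HAIKU_MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"

def build_rerank_prompt(query: str, candidates: list[dict]) -> str:
    """Listwise prompt asking the model for the top-5 candidate indices."""
    lines = [f"Query: {query}", "Candidates:"]
    for i, c in enumerate(candidates):
        lines.append(f"{i}: {c['title_en']} ({c['genre']})")
    lines.append("Return the indices of the 5 most relevant candidates "
                 "as a JSON array, best first. Respond with JSON only.")
    return "\n".join(lines)

def parse_rerank_response(text: str, n_candidates: int) -> list[int]:
    """Parse the model's JSON array, dropping out-of-range indices."""
    indices = json.loads(text)
    return [i for i in indices
            if isinstance(i, int) and 0 <= i < n_candidates][:5]

def rerank_with_haiku(bedrock_runtime, query: str,
                      candidates: list[dict]) -> list[int]:
    body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 100,
        "messages": [{"role": "user",
                      "content": build_rerank_prompt(query, candidates)}],
    })
    resp = bedrock_runtime.invoke_model(modelId=HAIKU_MODEL_ID, body=body)
    text = json.loads(resp["body"].read())["content"][0]["text"]
    return parse_rerank_response(text, len(candidates))
```

Keeping prompt construction and response parsing as pure functions lets them be unit-tested without a live Bedrock call.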

Search Strategy Comparison

| Strategy | Avg Latency (p95) | NDCG@5 | MRR | Best Use Case |
|---|---|---|---|---|
| BM25 only | 25ms | 0.62 | 0.58 | Exact title/author/ISBN lookup |
| kNN only | 45ms | 0.71 | 0.67 | Semantic recommendation queries |
| Hybrid (RRF) | 55ms | 0.82 | 0.79 | General-purpose manga discovery |
| Hybrid (weighted linear) | 55ms | 0.80 | 0.77 | When BM25/kNN weight tuning is available |
| Hybrid + custom scoring | 65ms | 0.86 | 0.83 | Business-aware ranking (recency, popularity) |
| Hybrid + cross-encoder re-rank | 120ms | 0.91 | 0.88 | High-quality retrieval within 200ms budget |
| Hybrid + Haiku re-rank | 180ms | 0.93 | 0.90 | Maximum quality within budget (buy intent) |
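One way to act on these numbers is a simple intent-based router: buy-intent queries take the Haiku path, everything else the cross-encoder path, with a graceful fallback when the budget tightens. The pipeline names and intent labels are illustrative, not a documented MangaAssist component:

```python
def pick_pipeline(intent: str, latency_budget_ms: int = 200) -> str:
    """Choose the richest retrieval pipeline that fits the latency budget."""
    pipelines = {  # name -> approximate p95 latency from the table above
        "hybrid_haiku_rerank": 180,
        "hybrid_cross_encoder_rerank": 120,
        "hybrid_custom_scoring": 65,
    }
    preferred = ("hybrid_haiku_rerank" if intent == "buy"
                 else "hybrid_cross_encoder_rerank")
    if pipelines[preferred] <= latency_budget_ms:
        return preferred
    return "hybrid_custom_scoring"  # degrade gracefully under tight budgets
```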

OpenSearch Serverless OCU Capacity Planning

| Metric | Value | Notes |
|---|---|---|
| Index OCUs | 2 (minimum for serverless) | Handles nightly bulk indexing of catalog updates |
| Search OCUs | 4 | Supports ~500 QPS at < 50ms per kNN query |
| Total documents | 100K manga products + 500K reviews + 15K authors | ~615K total across 3 indexes |
| Embedding dimension | 1536 (Titan v2) | ~6KB per vector |
| Storage estimate | ~4 GB (vectors) + ~2 GB (metadata/text) | Well within serverless limits |
| Estimated monthly cost | ~$1,051 (6 OCUs × $0.24/OCU-hr × 730 hrs) + data transfer | Scale OCUs as QPS grows |
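The compute and storage figures can be sanity-checked from the table's own inputs (OCU count, 1536-dim float32 vectors, ~615K documents) and the published ~$0.24/OCU-hour serverless rate:

```python
# Back-of-envelope check of the OpenSearch Serverless capacity figures.
ocus = 2 + 4                      # index + search OCUs from the table
price_per_ocu_hour = 0.24         # published serverless OCU rate (USD)
hours_per_month = 730
compute_cost = ocus * price_per_ocu_hour * hours_per_month  # ~ $1,051/month

dims, bytes_per_float = 1536, 4   # Titan v2 embeddings as float32
vectors = 615_000                 # products + reviews + authors
vector_storage_gb = vectors * dims * bytes_per_float / 1e9  # ~ 3.8 GB
```

Raw vector bytes come to roughly 3.8 GB, consistent with the ~4 GB estimate once HNSW graph overhead is included.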

Key Takeaways

  1. Hybrid search is non-negotiable for MangaAssist — neither BM25 nor kNN alone handles the full query spectrum (exact titles + semantic recommendations).
  2. RRF is the safest fusion default — it requires no score normalization and is robust to score scale mismatches between BM25 and kNN.
  3. Japanese tokenization is a first-class concern — kuromoji analyzer with custom synonym dictionaries prevents catastrophic misses on manga-specific terms.
  4. Re-ranking buys the most NDCG per millisecond — a cross-encoder on top-20 candidates adds +25% NDCG for only 30-50ms.
  5. The 200ms retrieval budget is achievable — preprocessing (15ms) + embedding (25ms) + hybrid search (80ms) + re-ranking (60ms) = ~180ms at p95.
  6. Multi-index federation is worth the complexity — querying products, reviews, and authors separately allows index-specific tuning and avoids one bloated index.
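Takeaway 3 can be made concrete with an index-settings sketch: kuromoji tokenization plus a manga-specific synonym filter. The field names, filter chain, and synonym entries here are illustrative, not the production MangaAssist mapping:

```python
# Hypothetical OpenSearch index settings for bilingual manga titles,
# using the kuromoji plugin's tokenizer and standard CJK filters.
index_settings = {
    "settings": {
        "analysis": {
            "filter": {
                "manga_synonyms": {
                    "type": "synonym",
                    "synonyms": [
                        "shonen, shounen, 少年",
                        "manga, 漫画, マンガ",
                    ],
                },
            },
            "analyzer": {
                "manga_ja": {
                    "type": "custom",
                    "tokenizer": "kuromoji_tokenizer",
                    "filter": [
                        "kuromoji_baseform",  # verbs/adjectives to base form
                        "cjk_width",          # unify full/half-width chars
                        "lowercase",
                        "manga_synonyms",
                    ],
                },
            },
        }
    },
    "mappings": {
        "properties": {
            "title_ja": {"type": "text", "analyzer": "manga_ja"},
        }
    },
}
```

With this mapping, a query for "shounen" matches documents indexed as 少年, which is exactly the class of catastrophic miss the takeaway warns about.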