
04: Retrieval System Troubleshooting

AIP-C01 Mapping

Task 5.2 → Skill 5.2.4: Diagnose and resolve RAG retrieval quality issues including model response relevance, embedding quality, drift monitoring, vectorization problems, and vector search performance.


User Story

As an ML engineer on the MangaAssist team, I want to systematically diagnose and fix retrieval-augmented generation failures — from embedding quality to search relevance to retrieval-generation alignment — so that the chatbot consistently retrieves the most relevant manga product information, FAQs, and policy documents, and the FM generates answers grounded in the retrieved content.


Acceptance Criteria

  • Embedding drift is detected within 24 hours when the distribution shift exceeds a configurable threshold
  • Retrieval relevance is measured per query: top-K results are scored against ground truth with MRR ≥ 0.7
  • Vectorization failures (encoding errors, dimension mismatches, NaN embeddings) are caught before indexing
  • Chunk quality metrics (size distribution, overlap ratio, metadata completeness) are tracked per ingestion pipeline run
  • Vector search latency P95 < 100ms for OpenSearch Serverless with correct index configuration
  • End-to-end retrieval-generation alignment score (retrieved content vs generated answer) ≥ 0.8

High-Level Design

Retrieval Failure Taxonomy

graph TD
    A[RAG Retrieval<br>Failure] --> B[Embedding<br>Issues]
    A --> C[Indexing<br>Issues]
    A --> D[Search<br>Issues]
    A --> E[Retrieval-Generation<br>Alignment Issues]

    B --> B1[Embedding drift:<br>model updated quietly]
    B --> B2[Dimension mismatch:<br>new model, old index]
    B --> B3[NaN/zero embeddings:<br>encoding failure]
    B --> B4[Language bias:<br>JP text embedded poorly]

    C --> C1[Stale index:<br>new products missing]
    C --> C2[Chunk too large:<br>diluted relevance]
    C --> C3[Chunk too small:<br>missing context]
    C --> C4[Missing metadata:<br>no filtering possible]

    D --> D1[Wrong K value:<br>too few/many results]
    D --> D2[Threshold too strict:<br>good results filtered]
    D --> D3[Index not refreshed:<br>reads stale segments]
    D --> D4[ANN inaccuracy:<br>approximate misses]

    E --> E1[Retrieved but ignored:<br>FM skips context]
    E --> E2[Hallucination despite<br>good retrieval]
    E --> E3[Wrong chunk selected<br>from multiple hits]
    E --> E4[Context formatting<br>confuses FM]

Retrieval Quality Pipeline

flowchart TD
    A[User Query] --> B[Query Embedding]
    B --> C[Vector Search<br>OpenSearch k-NN]
    C --> D[Top-K Results]
    D --> E{Relevance<br>Score Check}

    E -->|All below threshold| F[Fallback: keyword<br>search + re-rank]
    E -->|Some above| G[Filter + Re-rank]

    G --> H[Context Assembly]
    F --> H

    H --> I[FM Invocation<br>with Context]
    I --> J[Response]

    J --> K[Alignment Check:<br>Did FM use context?]
    K -->|Used context| L[Log: grounded response]
    K -->|Ignored context| M[Log: potential<br>hallucination risk]

    subgraph Monitoring
        N[Embedding Drift<br>Monitor]
        O[Index Freshness<br>Checker]
        P[Search Quality<br>Metrics]
        Q[Alignment<br>Scorer]
    end

    B -.-> N
    C -.-> P
    D -.-> O
    K -.-> Q

Embedding Drift Detection Architecture

sequenceDiagram
    participant Sched as CloudWatch Scheduler
    participant Lambda as Drift Monitor Lambda
    participant OS as OpenSearch
    participant Embed as Bedrock Embeddings
    participant CW as CloudWatch Metrics

    Sched->>Lambda: Trigger daily drift check
    Lambda->>OS: Fetch random sample (N=200)
    Lambda->>Lambda: Select anchor set (reference embeddings)

    loop Each sampled document
        Lambda->>Embed: Re-embed document text
        Lambda->>Lambda: Compare new vs stored embedding (cosine distance)
    end

    Lambda->>Lambda: Compute drift statistics (mean, P95, max)
    Lambda->>CW: Emit EmbeddingDriftMean, EmbeddingDriftP95

    alt Drift > threshold
        Lambda->>CW: Emit DriftAlert = 1
        Lambda->>Lambda: Log drifted document IDs for investigation
    end

Low-Level Design

1. Embedding Drift Monitor

Embedding models can change behavior across versions, or the data distribution can shift (new product categories, new manga genres). The drift monitor re-embeds a random sample and compares against stored vectors.

import json
import time
import logging
import random
import math
import boto3
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime

logger = logging.getLogger("mangaassist.retrieval")


@dataclass
class DriftSample:
    """A single document sampled for drift analysis."""
    doc_id: str
    doc_text: str
    stored_embedding: list
    fresh_embedding: Optional[list] = None
    cosine_distance: Optional[float] = None


@dataclass
class DriftReport:
    """Report from a single drift monitoring run."""
    run_id: str
    timestamp: str
    sample_size: int
    mean_drift: float = 0.0
    p50_drift: float = 0.0
    p95_drift: float = 0.0
    max_drift: float = 0.0
    drifted_doc_ids: list = field(default_factory=list)
    drift_threshold: float = 0.15
    alert: bool = False

    def summary(self) -> str:
        status = "ALERT" if self.alert else "OK"
        return (
            f"[{status}] Drift Report {self.run_id}: "
            f"mean={self.mean_drift:.4f}, p95={self.p95_drift:.4f}, "
            f"max={self.max_drift:.4f}, drifted={len(self.drifted_doc_ids)}/{self.sample_size}"
        )


class EmbeddingDriftMonitor:
    """Detects embedding drift by re-embedding sampled documents and comparing.

    Two types of drift:
    1. Model drift: The embedding model was updated (new version behind the API)
    2. Data drift: The document distribution changed (new product categories)

    We detect both by tracking cosine distance between stored and fresh embeddings.
    """

    def __init__(
        self,
        opensearch_client,
        bedrock_client,
        index_name: str = "mangaassist-products",
        embedding_model_id: str = "amazon.titan-embed-text-v2:0",
        sample_size: int = 200,
        drift_threshold: float = 0.15,
    ):
        self.os_client = opensearch_client
        self.bedrock_client = bedrock_client
        self.index_name = index_name
        self.embedding_model_id = embedding_model_id
        self.sample_size = sample_size
        self.drift_threshold = drift_threshold

    def run_drift_check(self) -> DriftReport:
        """Run a full drift monitoring cycle."""
        run_id = f"drift-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"

        # Step 1: Sample documents from OpenSearch
        samples = self._sample_documents()
        if not samples:
            logger.warning("No documents sampled — index may be empty")
            return DriftReport(
                run_id=run_id,
                timestamp=datetime.utcnow().isoformat(),
                sample_size=0,
            )

        # Step 2: Re-embed each document
        for sample in samples:
            sample.fresh_embedding = self._embed_text(sample.doc_text)
            if sample.fresh_embedding and sample.stored_embedding:
                sample.cosine_distance = self._cosine_distance(
                    sample.stored_embedding, sample.fresh_embedding,
                )

        # Step 3: Compute statistics
        distances = [s.cosine_distance for s in samples if s.cosine_distance is not None]
        distances.sort()

        report = DriftReport(
            run_id=run_id,
            timestamp=datetime.utcnow().isoformat(),
            sample_size=len(samples),
            drift_threshold=self.drift_threshold,
        )

        if distances:
            report.mean_drift = sum(distances) / len(distances)
            report.p50_drift = distances[len(distances) // 2]
            report.p95_drift = distances[int(len(distances) * 0.95)]
            report.max_drift = distances[-1]
            report.drifted_doc_ids = [
                s.doc_id for s in samples
                if s.cosine_distance is not None and s.cosine_distance > self.drift_threshold
            ]
            report.alert = report.p95_drift > self.drift_threshold

        logger.info(report.summary())
        return report

    def _sample_documents(self) -> list:
        """Random sample from OpenSearch index."""
        body = {
            "size": self.sample_size,
            "query": {
                "function_score": {
                    "query": {"match_all": {}},
                    "random_score": {"seed": int(time.time())},
                }
            },
            "_source": ["text", "embedding", "doc_id"],
        }

        response = self.os_client.search(index=self.index_name, body=body)

        samples = []
        for hit in response["hits"]["hits"]:
            source = hit["_source"]
            samples.append(DriftSample(
                doc_id=source.get("doc_id", hit["_id"]),
                doc_text=source.get("text", ""),
                stored_embedding=source.get("embedding", []),
            ))
        return samples

    def _embed_text(self, text: str) -> list:
        """Get fresh embedding from Bedrock."""
        try:
            body = json.dumps({"inputText": text[:2000]})  # Truncate to embedding model limit
            response = self.bedrock_client.invoke_model(
                modelId=self.embedding_model_id,
                contentType="application/json",
                accept="application/json",
                body=body,
            )
            result = json.loads(response["body"].read())
            return result.get("embedding", [])
        except Exception as e:
            logger.error("Embedding failed for text (len=%d): %s", len(text), e)
            return []

    @staticmethod
    def _cosine_distance(vec_a: list, vec_b: list) -> float:
        """Compute cosine distance (1 - cosine_similarity). 0 = identical, 2 = opposite."""
        if len(vec_a) != len(vec_b):
            return 2.0  # Flag dimension mismatch as the maximum cosine distance

        dot = sum(a * b for a, b in zip(vec_a, vec_b))
        norm_a = math.sqrt(sum(a * a for a in vec_a))
        norm_b = math.sqrt(sum(b * b for b in vec_b))

        if norm_a == 0 or norm_b == 0:
            return 1.0  # Zero vector — encoding failure

        similarity = dot / (norm_a * norm_b)
        return 1.0 - similarity
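
A minimal sketch of the Lambda entry point from the sequence diagram above, assuming the OpenSearch and Bedrock clients are constructed elsewhere (for example via opensearch-py and boto3); the metric namespace and the get_opensearch_client helper are illustrative, not part of the codebase.

import boto3

cloudwatch = boto3.client("cloudwatch")


def lambda_handler(event, context):
    """Daily drift check: run the monitor and publish metrics for downstream alarms."""
    monitor = EmbeddingDriftMonitor(
        opensearch_client=get_opensearch_client(),      # hypothetical helper
        bedrock_client=boto3.client("bedrock-runtime"),
    )
    report = monitor.run_drift_check()

    # Emit the metrics referenced in the dashboard table further down
    cloudwatch.put_metric_data(
        Namespace="MangaAssist/Retrieval",
        MetricData=[
            {"MetricName": "EmbeddingDriftMean", "Value": report.mean_drift},
            {"MetricName": "EmbeddingDriftP95", "Value": report.p95_drift},
            {"MetricName": "DriftAlert", "Value": 1.0 if report.alert else 0.0},
        ],
    )
    return {"run_id": report.run_id, "alert": report.alert}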

2. Retrieval Quality Diagnostics

Measures how well the retrieval pipeline performs: are we getting the right documents, in the right order, with the right relevance scores?

import logging
import math
from dataclasses import dataclass, field

logger = logging.getLogger("mangaassist.retrieval")


@dataclass
class RetrievalResult:
    """A single document returned by the retrieval pipeline."""
    doc_id: str
    score: float
    text_preview: str
    metadata: dict = field(default_factory=dict)


@dataclass
class RetrievalEvalCase:
    """Ground truth for evaluating retrieval quality."""
    query: str
    intent: str
    relevant_doc_ids: list  # Ordered by relevance (most relevant first)
    irrelevant_doc_ids: list = field(default_factory=list)  # Known irrelevant docs


@dataclass
class RetrievalMetrics:
    """Standard IR metrics for one query."""
    query: str
    precision_at_k: float = 0.0
    recall_at_k: float = 0.0
    mrr: float = 0.0                  # Mean Reciprocal Rank
    ndcg_at_k: float = 0.0            # Normalized Discounted Cumulative Gain
    first_relevant_rank: int = -1     # Rank of first relevant result (-1 = not found)
    retrieved_count: int = 0
    relevant_in_retrieved: int = 0


class RetrievalDiagnostics:
    """Evaluate and diagnose retrieval pipeline quality.

    Key metrics:
    - MRR (Mean Reciprocal Rank): How high is the first relevant result? > 0.7 is good.
    - Precision@K: What fraction of top-K results are relevant? > 0.6 is good.
    - NDCG@K: Are relevant results ordered correctly? > 0.7 is good.
    - Recall@K: What fraction of all relevant docs appear in top-K? > 0.5 is acceptable.
    """

    def __init__(self, opensearch_client, index_name: str, k: int = 5):
        self.os_client = opensearch_client
        self.index_name = index_name
        self.k = k

    def evaluate_query(
        self,
        query_embedding: list,
        eval_case: RetrievalEvalCase,
    ) -> RetrievalMetrics:
        """Evaluate a single query against ground truth."""

        # Run k-NN search
        results = self._search(query_embedding)
        retrieved_ids = [r.doc_id for r in results[:self.k]]

        relevant_set = set(eval_case.relevant_doc_ids)
        relevant_in_top_k = [rid for rid in retrieved_ids if rid in relevant_set]

        metrics = RetrievalMetrics(
            query=eval_case.query,
            retrieved_count=len(retrieved_ids),
            relevant_in_retrieved=len(relevant_in_top_k),
        )

        # Precision@K
        metrics.precision_at_k = len(relevant_in_top_k) / self.k if self.k > 0 else 0

        # Recall@K
        metrics.recall_at_k = (
            len(relevant_in_top_k) / len(relevant_set) if relevant_set else 0
        )

        # MRR
        for rank, doc_id in enumerate(retrieved_ids, 1):
            if doc_id in relevant_set:
                metrics.mrr = 1.0 / rank
                metrics.first_relevant_rank = rank
                break

        # NDCG@K
        metrics.ndcg_at_k = self._compute_ndcg(retrieved_ids, eval_case.relevant_doc_ids)

        return metrics

    def evaluate_suite(self, query_embeddings: list, eval_cases: list) -> dict:
        """Evaluate a full suite of queries and produce an aggregate report."""
        all_metrics = []
        for embedding, case in zip(query_embeddings, eval_cases):
            m = self.evaluate_query(embedding, case)
            all_metrics.append(m)

        avg_mrr = sum(m.mrr for m in all_metrics) / len(all_metrics) if all_metrics else 0
        avg_precision = sum(m.precision_at_k for m in all_metrics) / len(all_metrics) if all_metrics else 0
        avg_ndcg = sum(m.ndcg_at_k for m in all_metrics) / len(all_metrics) if all_metrics else 0
        avg_recall = sum(m.recall_at_k for m in all_metrics) / len(all_metrics) if all_metrics else 0

        # Identify worst-performing queries (diagnostic value)
        worst_by_mrr = sorted(all_metrics, key=lambda m: m.mrr)[:5]

        return {
            "total_queries": len(all_metrics),
            "avg_mrr": round(avg_mrr, 4),
            "avg_precision_at_k": round(avg_precision, 4),
            "avg_ndcg_at_k": round(avg_ndcg, 4),
            "avg_recall_at_k": round(avg_recall, 4),
            "worst_queries": [
                {"query": m.query, "mrr": m.mrr, "first_relevant_rank": m.first_relevant_rank}
                for m in worst_by_mrr
            ],
        }

    def _search(self, query_embedding: list) -> list:
        """Execute k-NN search against OpenSearch."""
        body = {
            "size": self.k,
            "query": {
                "knn": {
                    "embedding": {
                        "vector": query_embedding,
                        "k": self.k,
                    }
                }
            },
        }
        response = self.os_client.search(index=self.index_name, body=body)

        results = []
        for hit in response["hits"]["hits"]:
            results.append(RetrievalResult(
                doc_id=hit["_id"],
                score=hit["_score"],
                text_preview=hit["_source"].get("text", "")[:200],
                metadata=hit["_source"].get("metadata", {}),
            ))
        return results

    @staticmethod
    def _compute_ndcg(retrieved_ids: list, ideal_order: list) -> float:
        """Compute NDCG@K for a ranked list against ideal ordering."""

        relevance_map = {doc_id: len(ideal_order) - rank for rank, doc_id in enumerate(ideal_order)}

        # DCG
        dcg = 0.0
        for rank, doc_id in enumerate(retrieved_ids, 1):
            rel = relevance_map.get(doc_id, 0)
            dcg += rel / math.log2(rank + 1)

        # Ideal DCG
        ideal_rels = sorted(relevance_map.values(), reverse=True)[:len(retrieved_ids)]
        idcg = sum(rel / math.log2(rank + 1) for rank, rel in enumerate(ideal_rels, 1))

        return dcg / idcg if idcg > 0 else 0.0
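
A usage sketch for the evaluation suite, assuming query embeddings come from the same Titan model used at ingestion time; embed_query() is a hypothetical helper wrapping the Bedrock embeddings call, and the doc IDs are illustrative.

eval_cases = [
    RetrievalEvalCase(
        query="action comedy manga recommendations",
        intent="recommendation",
        relevant_doc_ids=["manga-0451", "manga-0712"],
    ),
    RetrievalEvalCase(
        query="return policy for damaged volumes",
        intent="policy",
        relevant_doc_ids=["policy-returns-001"],
    ),
]

diagnostics = RetrievalDiagnostics(opensearch_client, index_name="mangaassist-products", k=5)
query_embeddings = [embed_query(case.query) for case in eval_cases]  # hypothetical helper
report = diagnostics.evaluate_suite(query_embeddings, eval_cases)

if report["avg_mrr"] < 0.7:  # acceptance criterion: MRR >= 0.7
    logger.warning("Retrieval quality below target: %s", report)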

3. Chunk Quality Analyzer

Bad chunking is the most common root cause of retrieval failures: chunks that are too large dilute relevance, while chunks that are too small lose context. This analyzer profiles the chunk size distribution and flags problems.

import logging
from dataclasses import dataclass, field
from collections import Counter

logger = logging.getLogger("mangaassist.retrieval")


@dataclass
class ChunkProfile:
    """Quality profile for a set of indexed chunks."""
    total_chunks: int = 0
    avg_size_chars: float = 0.0
    median_size_chars: float = 0.0
    min_size_chars: int = 0
    max_size_chars: int = 0
    std_dev_size: float = 0.0

    # Quality signals
    too_small_count: int = 0       # < 100 chars — too small to be useful
    too_large_count: int = 0       # > 2000 chars — dilutes relevance
    no_metadata_count: int = 0     # Missing category, source, or timestamp
    duplicate_count: int = 0       # Content-identical chunks
    empty_count: int = 0           # Empty or whitespace-only

    issues: list = field(default_factory=list)

    def health_score(self) -> float:
        """0.0 to 1.0 health score for the chunk collection."""
        if self.total_chunks == 0:
            return 0.0

        penalties = [
            self.too_small_count / self.total_chunks * 0.3,
            self.too_large_count / self.total_chunks * 0.3,
            self.no_metadata_count / self.total_chunks * 0.2,
            self.duplicate_count / self.total_chunks * 0.2,
        ]
        return max(0.0, 1.0 - sum(penalties))


class ChunkQualityAnalyzer:
    """Analyze and diagnose chunk quality in the vector index.

    Common chunk problems in MangaAssist:
    - Product descriptions vary wildly in length (50 chars for simple items, 5000 chars for manga series)
    - FAQ answers contain embedded JSON/HTML that inflates chunk size without adding semantic value
    - Metadata fields (category, ASIN, publish date) are inconsistently populated
    - Duplicate chunks from overlapping ingestion runs
    """

    MIN_CHUNK_SIZE = 100
    MAX_CHUNK_SIZE = 2000
    REQUIRED_METADATA_FIELDS = ["category", "source", "last_updated"]

    def analyze_index(self, opensearch_client, index_name: str, sample_size: int = 1000) -> ChunkProfile:
        """Analyze chunk quality from an OpenSearch index sample."""

        body = {
            "size": sample_size,
            "query": {"match_all": {}},
            "_source": ["text", "metadata", "doc_id"],
        }
        response = opensearch_client.search(index=index_name, body=body)

        chunks = []
        for hit in response["hits"]["hits"]:
            chunks.append({
                "doc_id": hit["_id"],
                "text": hit["_source"].get("text", ""),
                "metadata": hit["_source"].get("metadata", {}),
            })

        return self._profile_chunks(chunks)

    def _profile_chunks(self, chunks: list) -> ChunkProfile:
        profile = ChunkProfile(total_chunks=len(chunks))

        if not chunks:
            return profile

        sizes = []
        text_hashes = Counter()

        for chunk in chunks:
            text = chunk.get("text", "")
            metadata = chunk.get("metadata", {})
            size = len(text)
            sizes.append(size)

            # Content dedup check
            text_hash = hash(text.strip())
            text_hashes[text_hash] += 1

            # Size checks
            if size < self.MIN_CHUNK_SIZE:
                profile.too_small_count += 1
            elif size > self.MAX_CHUNK_SIZE:
                profile.too_large_count += 1

            if not text.strip():
                profile.empty_count += 1

            # Metadata check
            missing = [f for f in self.REQUIRED_METADATA_FIELDS if f not in metadata or not metadata[f]]
            if missing:
                profile.no_metadata_count += 1

        # Duplicates
        profile.duplicate_count = sum(count - 1 for count in text_hashes.values() if count > 1)

        # Size statistics
        sizes.sort()
        profile.avg_size_chars = sum(sizes) / len(sizes)
        profile.median_size_chars = sizes[len(sizes) // 2]
        profile.min_size_chars = sizes[0]
        profile.max_size_chars = sizes[-1]

        mean = profile.avg_size_chars
        variance = sum((s - mean) ** 2 for s in sizes) / len(sizes)
        profile.std_dev_size = variance ** 0.5

        # Issue diagnosis
        if profile.too_small_count > len(chunks) * 0.1:
            profile.issues.append(
                f"{profile.too_small_count} chunks below {self.MIN_CHUNK_SIZE} chars — "
                "consider merging small chunks or increasing minimum chunk size"
            )
        if profile.too_large_count > len(chunks) * 0.1:
            profile.issues.append(
                f"{profile.too_large_count} chunks above {self.MAX_CHUNK_SIZE} chars — "
                "consider splitting with semantic boundaries"
            )
        if profile.duplicate_count > 0:
            profile.issues.append(
                f"{profile.duplicate_count} duplicate chunks — "
                "check ingestion pipeline for idempotency"
            )
        if profile.no_metadata_count > len(chunks) * 0.2:
            profile.issues.append(
                f"{profile.no_metadata_count} chunks missing metadata — "
                "metadata filtering will be unreliable"
            )
        if profile.std_dev_size > profile.avg_size_chars * 0.8:
            profile.issues.append(
                f"High chunk size variance (σ={profile.std_dev_size:.0f}, μ={profile.avg_size_chars:.0f}) — "
                "consider content-type-aware chunking"
            )

        logger.info(
            "Chunk profile: total=%d, health=%.2f, issues=%d",
            profile.total_chunks, profile.health_score(), len(profile.issues),
        )
        return profile
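
A minimal sketch of wiring the analyzer into an ingestion pipeline run, assuming the same opensearch_client as in the earlier blocks; the log_type field matches the Logs Insights query further down, the metric name matches the dashboard table, and the namespace is the same illustrative one used in the drift monitor sketch.

import json

import boto3

analyzer = ChunkQualityAnalyzer()
profile = analyzer.analyze_index(opensearch_client, index_name="mangaassist-products")

# Structured log line picked up by the "chunk_profile" Logs Insights query
logger.info(json.dumps({
    "log_type": "chunk_profile",
    "total_chunks": profile.total_chunks,
    "health_score": round(profile.health_score(), 3),
    "issues": profile.issues,
}))

boto3.client("cloudwatch").put_metric_data(
    Namespace="MangaAssist/Retrieval",
    MetricData=[{"MetricName": "ChunkHealthScore", "Value": profile.health_score()}],
)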

4. Retrieval-Generation Alignment Checker

The final check: even if retrieval found the right documents, did the FM actually use them? This catches the "retrieved but ignored" failure mode, where the model hallucinates instead of grounding its answer in the retrieved context.

import re
import logging
from dataclasses import dataclass, field

logger = logging.getLogger("mangaassist.retrieval")


@dataclass
class AlignmentResult:
    """How well the generated response aligns with retrieved context."""
    alignment_score: float = 0.0    # 0.0 to 1.0
    grounded_claims: int = 0        # Claims traceable to retrieved docs
    ungrounded_claims: int = 0      # Claims not in any retrieved doc
    context_utilization: float = 0.0  # Fraction of retrieved docs referenced
    cited_doc_ids: list = field(default_factory=list)
    details: str = ""


class AlignmentChecker:
    """Checks whether the FM response is grounded in retrieved context.

    Three failure modes:
    1. Context ignored: FM generates a plausible answer without using any retrieved docs
    2. Selective use: FM uses 1 of 5 retrieved docs, missing relevant information from others
    3. Hallucinated extension: FM starts grounded but then adds fabricated details
    """

    def check_alignment(
        self,
        response_text: str,
        retrieved_docs: list,  # list of {"doc_id": str, "text": str}
    ) -> AlignmentResult:
        """Check how well the response aligns with retrieved documents."""

        result = AlignmentResult()

        if not retrieved_docs or not response_text:
            result.details = "No docs or no response to check"
            return result

        # Extract key entities from response
        response_entities = self._extract_entities(response_text)

        # Check each entity against retrieved docs
        doc_matches = {doc["doc_id"]: False for doc in retrieved_docs}
        grounded = 0
        ungrounded = 0

        for entity in response_entities:
            found_in_doc = False
            for doc in retrieved_docs:
                if entity.lower() in doc["text"].lower():
                    doc_matches[doc["doc_id"]] = True
                    found_in_doc = True
                    break

            if found_in_doc:
                grounded += 1
            else:
                ungrounded += 1

        result.grounded_claims = grounded
        result.ungrounded_claims = ungrounded
        result.cited_doc_ids = [doc_id for doc_id, matched in doc_matches.items() if matched]

        total = grounded + ungrounded
        result.alignment_score = grounded / total if total > 0 else 0.0
        result.context_utilization = len(result.cited_doc_ids) / len(retrieved_docs) if retrieved_docs else 0.0

        # Diagnostic summary
        if result.alignment_score < 0.5:
            result.details = (
                f"LOW alignment ({result.alignment_score:.2f}): "
                f"{ungrounded} claims not grounded in context. "
                "Likely hallucination or context formatting issue."
            )
        elif result.context_utilization < 0.4:
            result.details = (
                f"LOW utilization ({result.context_utilization:.2f}): "
                f"Only {len(result.cited_doc_ids)}/{len(retrieved_docs)} docs used. "
                "Check context ordering or relevance of lower-ranked docs."
            )
        else:
            result.details = f"Good alignment ({result.alignment_score:.2f}), utilization ({result.context_utilization:.2f})"

        return result

    def _extract_entities(self, text: str) -> list:
        """Extract key entities from response for grounding check.

        In MangaAssist, key entities are:
        - ASINs (B0XXXXXXXXX)
        - Product names / manga titles
        - Prices
        - Specific claims (dates, quantities, policies)
        """
        entities = []

        # ASINs
        entities.extend(re.findall(r'B0[A-Z0-9]{8}', text))

        # Prices
        entities.extend(re.findall(r'\$\d+\.?\d{0,2}', text))

        # Quoted terms (often product names or manga titles)
        entities.extend(re.findall(r'"([^"]{3,50})"', text))

        # Date references
        entities.extend(re.findall(r'\b\d{4}-\d{2}-\d{2}\b', text))

        # Volume/chapter numbers (manga-specific)
        entities.extend(re.findall(r'(?:Vol(?:ume)?\.?\s*|Chapter\s+)\d+', text, re.IGNORECASE))

        return list(set(entities))
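
A usage sketch on a single sampled interaction; the response text and retrieved docs are illustrative, and the 0.8 threshold mirrors the alignment acceptance criterion.

checker = AlignmentChecker()
result = checker.check_alignment(
    response_text='Volume 12 of "Dandadan" (B0ABCD1234) ships 2024-11-05 for $11.99.',
    retrieved_docs=[
        {"doc_id": "manga-dandadan-v12",
         "text": "Dandadan Volume 12, ASIN B0ABCD1234, releases 2024-11-05 at $11.99."},
        {"doc_id": "policy-shipping",
         "text": "Standard shipping takes 3-5 business days."},
    ],
)

if result.alignment_score < 0.8:
    logger.warning("Potential hallucination risk: %s", result.details)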

5. MangaAssist Scenarios

Scenario A: Stale Manga Catalog Embeddings

Context: A new manga series ("Dandadan") launches and sells 50K units in the first week. Customers ask MangaAssist for recommendations for "action comedy manga like Dandadan" but the chatbot never mentions Dandadan itself.

Detection: The product team reports the gap. Retrieval diagnostics show the Dandadan documents are not in the OpenSearch index because the nightly ingestion pipeline failed silently 3 days ago.

Root Cause: The Lambda ingestion function hit a 15-minute timeout processing a large batch that included 2,000 newly listed manga items plus Dandadan.

Resolution:

  1. Fix: add batch size limits (200 items per invocation) with an SQS-based queue for overflow
  2. Re-run ingestion for the missing batch
  3. Add a CloudWatch alarm on ingestion Lambda errors and timeout rate

Prevention: Add an "index freshness" check — compare the OpenSearch document count against the DynamoDB product catalog count nightly and alert on a > 1% gap.
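
A minimal sketch of that nightly freshness check, assuming opensearch-py and a DynamoDB catalog table (table name illustrative); note that DynamoDB's ItemCount is only refreshed roughly every six hours, which is acceptable for a daily check.

import boto3

def index_freshness_gap(opensearch_client, index_name: str, catalog_table: str) -> float:
    """Return the fraction of catalog items missing from the vector index."""
    indexed = opensearch_client.count(index=index_name)["count"]
    catalog = boto3.client("dynamodb").describe_table(
        TableName=catalog_table
    )["Table"]["ItemCount"]

    if catalog == 0:
        return 0.0
    # A gap above 0.01 (1%) should raise the IndexFreshnessGap warning from the table below
    return max(0.0, (catalog - indexed) / catalog)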

Scenario B: Genre Clustering Issues After Embedding Model Update

Context: The embedding model is upgraded from Titan Embed v1 to v2. After reindexing, users report that searching for "horror manga" returns romance titles.

Detection: Embedding drift monitor shows mean drift of 0.32 (threshold 0.15). Retrieval diagnostics show MRR dropped from 0.78 to 0.45 for genre-specific queries.

Root Cause: Titan Embed v2 produces 1024-dim vectors (v1 was 1536-dim). The OpenSearch index was recreated with the new dimension, but the similarity score threshold was not recalibrated, and the HNSW index parameters (m and ef_construction) tuned for v1 are suboptimal for v2's embedding space.

Resolution:

  1. Reindex with updated HNSW parameters for v2 (ef_construction: 512, m: 32); a mapping sketch follows below
  2. Run the retrieval evaluation suite — MRR recovers to 0.74
  3. Fine-tune the similarity threshold from 0.7 to 0.65 (v2 embeddings have a different score distribution)
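
A sketch of the reindex step, assuming opensearch-py against a standard OpenSearch k-NN index (OpenSearch Serverless vector collections manage some of these settings automatically); the index name is illustrative, the dimension matches Titan Embed v2, and the HNSW parameters are the recalibrated values from step 1.

opensearch_client.indices.create(
    index="mangaassist-products-v2",   # illustrative name for the reindexed collection
    body={
        "settings": {"index": {"knn": True}},
        "mappings": {
            "properties": {
                "text": {"type": "text"},
                "metadata": {"type": "object"},
                "embedding": {
                    "type": "knn_vector",
                    "dimension": 1024,  # Titan Embed v2 output size
                    "method": {
                        "name": "hnsw",
                        "space_type": "cosinesimil",
                        "engine": "nmslib",
                        "parameters": {"ef_construction": 512, "m": 32},
                    },
                },
            }
        },
    },
)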

Scenario C: New Series Not Retrievable Due to Sparse Descriptions

Context: Newly listed manga have only a title and single-sentence description. Retrieval returns them with very low relevance scores because the sparse text produces low-information embeddings.

Detection: Chunk quality analyzer shows 35% of new product chunks are below 100 characters. First-relevant-rank for new series is 8 or worse (outside the top-5 cutoff).

Resolution:

  1. Enrich sparse product listings by appending metadata (genre, author, publisher, series tags) to the chunk text before embedding; a sketch of this step follows below
  2. For products with < 200 chars, concatenate the category description to add semantic context
  3. Add a "minimum enriched text length" check in the ingestion pipeline — products below threshold get queued for manual enrichment
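
A sketch of the enrichment step (field names are illustrative): append structured metadata to sparse product text before embedding, so the vector carries genre and author signal even when the description is a single sentence.

def enrich_chunk_text(product: dict, min_chars: int = 200) -> str:
    """Pad sparse product descriptions with metadata before embedding."""
    text = product.get("description", "").strip()
    if len(text) >= min_chars:
        return text

    extras = [
        f"Genre: {product.get('genre', '')}",
        f"Author: {product.get('author', '')}",
        f"Publisher: {product.get('publisher', '')}",
        f"Series: {product.get('series', '')}",
    ]
    # Keep only fields that are actually populated
    parts = [text] + [e for e in extras if not e.endswith(": ")]
    return " | ".join(p for p in parts if p)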


6. CloudWatch Dashboard and Alerts

| Metric | Alarm Threshold | Action |
| --- | --- | --- |
| EmbeddingDriftP95 | > 0.15 | Alert: investigate model or data shift |
| IndexFreshnessGap | > 1% doc count delta | Warn: ingestion pipeline may be failing |
| RetrievalMRR | < 0.6 (daily eval) | Alert: retrieval quality degraded |
| ChunkHealthScore | < 0.7 | Warn: chunk quality issues in ingestion |
| AlignmentScore | < 0.6 (sampled production) | Alert: FM may be ignoring retrieved context |
| SearchLatencyP95 | > 100ms | Warn: index or query optimization needed |
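
A minimal sketch of one alarm from the table, assuming the metrics are published to the MangaAssist/Retrieval namespace used in the drift monitor sketch above; the SNS topic ARN is a placeholder.

import boto3

boto3.client("cloudwatch").put_metric_alarm(
    AlarmName="mangaassist-embedding-drift-p95",
    Namespace="MangaAssist/Retrieval",
    MetricName="EmbeddingDriftP95",
    Statistic="Maximum",
    Period=86400,                        # one evaluation per daily drift run
    EvaluationPeriods=1,
    Threshold=0.15,
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="notBreaching",
    AlarmActions=["arn:aws:sns:us-east-1:123456789012:mangaassist-retrieval-alerts"],
)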

CloudWatch Logs Insights Queries

Embedding drift trend over time:

fields @timestamp, mean_drift, p95_drift, drifted_doc_count, alert
| filter log_type = "drift_report"
| sort @timestamp desc
| limit 30

Retrieval quality by intent:

fields @timestamp, intent, mrr, precision_at_k, ndcg_at_k
| filter log_type = "retrieval_eval"
| stats avg(mrr) as avg_mrr, avg(precision_at_k) as avg_precision by intent
| sort avg_mrr asc

Chunk quality issues from ingestion:

fields @timestamp, total_chunks, health_score, issues
| filter log_type = "chunk_profile"
| sort @timestamp desc
| limit 10


Intuition Gained

What Mental Model You Build

Retrieval troubleshooting teaches you to reason about the vector space your data lives in — not as an abstract mathematical concept, but as a tangible space where documents cluster, drift, and sometimes hide.

1. The Freshness vs. Quality Tradeoff Instinct: You learn that a retrieval system has two independent failure axes: the data can be fresh but poorly embedded, or well-embedded but stale. These failures look identical to the end user ("the chatbot doesn't know about X") but have completely different fixes. You develop the instinct to check freshness first (it is cheaper to diagnose and fix), then quality.

2. The Embedding Space Mental Model: You start visualizing your data as clusters in high-dimensional space. When retrieval fails, you ask: "Is this document in the right neighborhood?" If a horror manga is clustered with romance, it is an embedding problem. If it is in the right cluster but ranked low, it is a search configuration problem. If it is not in the index at all, it is an ingestion problem.

3. The End-to-End Pipeline Instinct: You learn to trace failures through the full RAG pipeline: data ingestion → chunking → embedding → indexing → query embedding → search → re-ranking → context assembly → FM invocation → response validation. Each stage can fail independently, and the failure at each stage looks different. You stop blaming "the model" for failures that originated three stages earlier.

How This Intuition Guides Future Decisions

  • When selecting an embedding model: You evaluate not just benchmark scores but operational characteristics — dimension size (cost at scale), drift behavior across versions, language coverage, and speed. You run your retrieval evaluation suite against the candidate model before committing.
  • When designing a new knowledge base: You think about chunking strategy on day one. You choose chunk sizes based on the content type (small for FAQs, larger for product narratives), set up metadata schemas upfront, and build the freshness monitoring before it is needed.
  • When a user reports "the chatbot gave a wrong answer": You do not start with the prompt. You check retrieval first: what documents were retrieved? Were they relevant? Were they stale? Only after confirming retrieval is correct do you look at the generation layer.
  • When scaling to a new domain: You know that retrieval quality in one domain (e.g., manga) does not guarantee quality in another (e.g., electronics). Each domain has different vocabulary, different document density, and different query patterns. You plan a domain-specific evaluation suite before launch.