HLD Deep Dive: RAG Pipeline & LLM Response Generation
Questions covered: Q8, Q9, Q18, Q22, Q24, Q25
Interviewer level: Senior Engineer → Principal Engineer
Q8. What is RAG and why is it used?
Short Answer
Retrieval-Augmented Generation: retrieve relevant documents → augment LLM prompt → generate grounded response. Reduces hallucination and keeps responses accurate to real data.
Deep Dive
The problem RAG solves:
Without RAG:
User: "What is the return policy for manga volumes?"
LLM: "You can return manga within 30 days for a full refund."
← WRONG. LLM is guessing based on training data.
Amazon's actual policy may say 15 days, or require unopened product.
With RAG:
1. Retrieve: Find the actual return policy document from knowledge base.
2. Augment: Add it to the prompt: "Context: [actual_policy_text]"
3. Generate: LLM reads the actual policy and summarizes it accurately.
LLM: "Based on Amazon's manga return policy, physical manga volumes can be
returned within 15 days of delivery, unopened."
← CORRECT and grounded.
RAG Architecture in MangaAssist:
┌─────────────────────────────────────────────────────────────────┐
│ OFFLINE (BUILD TIME) │
│ │
│ [Source Documents] │
│ FAQ docs, policies, product descriptions, help articles │
│ │ │
│ ▼ │
│ [Chunker] → splits docs into 300-500 token chunks with overlap│
│ │ │
│ ▼ │
│ [Embedder] → Bedrock Titan Embeddings → 1536-dim vectors │
│ │ │
│ ▼ │
│ [OpenSearch Serverless] → indexed vector store │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ ONLINE (QUERY TIME) │
│ │
│ User Query │
│ │ │
│ ▼ │
│ [Query Embedder] → same Titan Embeddings model → query vector │
│ │ │
│ ▼ │
│ [Vector Search] → OpenSearch kNN → top-K candidates │
│ │ │
│ ▼ │
│ [Reranker] → Cross-encoder reranker → top-N final chunks │
│ │ │
│ ▼ │
│ [Prompt Augmentation] → inject chunks into LLM context │
│ │ │
│ ▼ │
│ [Bedrock LLM] → generates response grounded in retrieved docs │
└─────────────────────────────────────────────────────────────────┘
What goes into the knowledge base:
- Store policies (returns, shipping, pricing)
- Manga series FAQs ("How many volumes in One Piece?")
- Product descriptions for popular series
- Publishing schedule / new releases
- Payment and checkout help articles
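Putting the query-time path together (a minimal sketch: embed_query, vector_search, rerank, and SYSTEM_PROMPT are placeholders for the components in the diagram, and generate_response is the Bedrock call shown under Q9):
async def answer_query(user_query: str) -> str:
    query_vector = embed_query(user_query)                  # Titan Embeddings → 1536-dim vector
    candidates = vector_search(query_vector, top_k=20)      # OpenSearch kNN → top-K chunks
    top_chunks = rerank(user_query, candidates, top_n=5)    # cross-encoder → top-N chunks
    context = "\n\n".join(chunk.text for chunk in top_chunks)
    return await generate_response(
        system_prompt=SYSTEM_PROMPT,                        # grounding instructions (see Q22)
        context=context,
        user_message=user_query,
    )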
Q9. What does Amazon Bedrock provide?
Short Answer
Managed LLM inference (Claude 3.5 Sonnet) + Bedrock Guardrails for content moderation. No model hosting overhead.
Deep Dive
Bedrock's role in the stack:
What Bedrock handles:
✅ Model hosting and serving infrastructure
✅ Auto-scaling inference capacity
✅ High availability (multiple AZs)
✅ API abstraction (swap models without code changes)
✅ Integrated guardrails (content filtering)
✅ Logging to CloudWatch
✅ IAM-based access control
What you still own:
❌ Prompt engineering
❌ RAG pipeline
❌ Response post-processing
❌ Cost optimization (choosing right model, caching)
❌ Evaluation
Model options in Bedrock:
| Model | Strengths | Cost | Latency | Use Case |
|---|---|---|---|---|
| Claude 3.5 Sonnet | Best balance of quality/speed | $$$ | ~1.5s | Recommendations, complex Q&A |
| Claude 3 Haiku | Fast, cheap, good quality | $ | ~0.5s | Simple FAQ, chitchat |
| Amazon Titan Text | AWS-native, privacy | $$ | ~1s | Structured responses |
| Llama 3 (via Bedrock) | Open weights | $ | ~1s | Cost-sensitive cases |
Model tiering strategy (cost optimization):
def select_model(intent: str, complexity_score: float) -> str:
if intent in ["chitchat", "simple_faq"] or complexity_score < 0.3:
return "anthropic.claude-3-haiku" # $0.25/1M input
elif intent in ["recommendation", "product_question"]:
return "anthropic.claude-3-5-sonnet" # $3/1M input
else:
return "anthropic.claude-3-haiku" # Default to cheap
Bedrock API call pattern:
import boto3
import json
bedrock = boto3.client("bedrock-runtime", region_name="ap-northeast-1")
async def generate_response(system_prompt: str, context: str, user_message: str) -> str:
response = bedrock.invoke_model(
modelId="anthropic.claude-3-5-sonnet-20241022-v2:0",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1024,
"system": system_prompt,
"messages": [
{
"role": "user",
"content": f"Context:\n{context}\n\nQuestion: {user_message}"
}
]
})
)
result = json.loads(response["body"].read())
return result["content"][0]["text"]
Streaming response (token-by-token):
async def generate_streaming(system_prompt: str, messages: list):
response = bedrock.invoke_model_with_response_stream(
modelId="anthropic.claude-3-5-sonnet-20241022-v2:0",
        body=json.dumps({...})  # same request-body fields as the invoke_model call above
)
for event in response["body"]:
chunk = json.loads(event["chunk"]["bytes"])
if chunk["type"] == "content_block_delta":
token = chunk["delta"]["text"]
yield token # Stream token to WebSocket connection
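Consuming the stream (a usage sketch assuming a FastAPI WebSocket endpoint relays tokens to the client; the route, SYSTEM_PROMPT, and payload shape are illustrative):
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/chat")
async def chat(websocket: WebSocket):
    await websocket.accept()
    user_message = await websocket.receive_text()
    messages = [{"role": "user", "content": user_message}]
    async for token in generate_streaming(SYSTEM_PROMPT, messages):
        await websocket.send_text(token)   # client renders tokens as they arrive
    await websocket.close()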
Q18. Why OpenSearch Serverless over Pinecone?
Short Answer
AWS-native, no external vendor dependency, supports hybrid retrieval (text + vector), integrates directly with Bedrock Knowledge Bases.
Deep Dive
Comparison:
| Criteria | OpenSearch Serverless | Pinecone | pgvector (RDS) |
|---|---|---|---|
| AWS integration | Native | External API | Native |
| Vendor dependency | None | Pinecone Inc. | None |
| Hybrid search | ✅ BM25 + KNN | ✅ (Hybrid) | Limited |
| Auto-scaling | ✅ | ✅ | Manual |
| Cost model | OCU-based (~$700+/mo) | Per-vector ($0.096/M) | RDS instance |
| Bedrock integration | ✅ Native KB | Via custom connector | Via connector |
| Operational overhead | Low | Very low | Medium |
| Max scale | Very high | Very high | Moderate |
Why "no external vendor dependency" matters at Amazon: - Pinecone is a third-party SaaS. If Pinecone has an outage, your chatbot's RAG pipeline fails. - AWS services have contractual SLAs managed by one vendor (Amazon). - Security teams prefer all data stays within the AWS account boundary. - No additional vendor security review required.
Hybrid retrieval (OpenSearch strength):
BM25 (keyword match):
"What is the return policy for manga?"
→ Matches documents containing "return policy" and "manga"
→ Good for exact-match queries
Vector (semantic match):
"Can I send back something I bought?"
→ Embedding similarity finds documents about returns even without keyword match
→ Good for paraphrased/natural language queries
Hybrid = both scored and combined:
Final score = α × BM25_score + (1-α) × vector_score
α is tuned empirically (often 0.5)
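Client-side fusion sketch (assumes both score sets have already been normalized to [0, 1]; OpenSearch can also combine scores server-side, but the weighting logic is the same):
def hybrid_score(bm25_score: float, vector_score: float, alpha: float = 0.5) -> float:
    return alpha * bm25_score + (1 - alpha) * vector_score

def fuse_results(bm25_hits: dict[str, float], vector_hits: dict[str, float],
                 alpha: float = 0.5, top_n: int = 5) -> list[tuple[str, float]]:
    doc_ids = set(bm25_hits) | set(vector_hits)              # union of both result sets
    scored = [(doc_id, hybrid_score(bm25_hits.get(doc_id, 0.0),
                                    vector_hits.get(doc_id, 0.0), alpha))
              for doc_id in doc_ids]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:top_n]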
Q24. How do you ensure RAG returns relevant, fresh content?
Short Answer
Five-layer quality system: metadata filtering → chunk quality → reranking → freshness scoring → evaluation.
Deep Dive
Layer 1: Metadata Filtering (Pre-retrieval)
# Filter before vector search to narrow the search space
query_filter = {
"bool": {
"filter": [
{"term": {"source_type": "faq"}}, # Only FAQ documents
{"term": {"category": "returns"}}, # Only returns category
{"range": {"last_updated": {"gte": "2025-01-01"}}} # Fresh content only
]
}
}
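Attaching the filter to the vector search (a sketch using opensearch-py; the index name, vector field, and client setup are placeholders, and exact filter placement varies by kNN engine and OpenSearch version):
from opensearchpy import OpenSearch   # auth/host configuration omitted for brevity

def filtered_knn_search(client: OpenSearch, query_vector: list[float]) -> list[dict]:
    body = {
        "size": 20,
        "query": {
            "knn": {
                "embedding": {                 # vector field name (placeholder)
                    "vector": query_vector,
                    "k": 20,
                    "filter": query_filter,    # metadata pre-filter defined above
                }
            }
        },
    }
    response = client.search(index="mangaassist-kb", body=body)   # placeholder index
    return [hit["_source"] for hit in response["hits"]["hits"]]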
Layer 2: Chunk Quality (Data ingestion)
Bad chunking kills RAG quality. Common mistakes:
❌ Bad: Chunk at 512 token boundary (splits mid-sentence)
"...you can return manga volumes if they are unopened. Dig"
"ital products cannot be returned once downloaded."
✅ Good: Chunk at sentence/paragraph boundaries with overlap
Chunk 1: "You can return manga volumes if they are unopened.
Digital products cannot be returned once downloaded.
Returns must be initiated within 15 days of delivery."
Chunk 2: "Digital products cannot be returned once downloaded.
Returns must be initiated within 15 days of delivery.
To start a return, visit your orders page..."
(50-token overlap ensures context isn't split across chunks)
Chunking strategy:
from langchain.text_splitter import RecursiveCharacterTextSplitter

# The plain constructor measures chunk_size in characters; the tiktoken-backed
# constructor sizes chunks in tokens, matching the 400-token target above.
splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=400,       # tokens per chunk
    chunk_overlap=50,     # tokens of overlap
    separators=["\n\n", "\n", ". ", " ", ""]  # split on paragraphs first, then sentences
)
Layer 3: Reranking (Post-retrieval)
Vector search returns top-K candidates (e.g., K=20). Reranking selects the best top-N (e.g., N=5) to pass to the LLM.
Vector Search → 20 candidate chunks
Reranker (cross-encoder) → scores each chunk against the query
→ returns top 5 most relevant
Why reranking?
Vector search uses approximate nearest neighbor - fast but imprecise.
Reranker is slower but more accurate (reads query + chunk together).
Two-stage approach: vector search for recall, reranker for precision.
Reranker model options:
- cross-encoder/ms-marco-MiniLM-L-6-v2 (fast, lightweight)
- Bedrock Rerank (AWS-native, no infra to manage)
- Cohere Rerank (high quality, external vendor)
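Second-stage scoring sketch (assumes the sentence-transformers CrossEncoder with the ms-marco model listed above; passing chunks as plain strings is a simplification):
from sentence_transformers import CrossEncoder

reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank(query: str, candidates: list[str], top_n: int = 5) -> list[str]:
    # The cross-encoder reads query + chunk together and outputs a relevance score
    scores = reranker.predict([(query, chunk) for chunk in candidates])
    ranked = sorted(zip(candidates, scores), key=lambda pair: pair[1], reverse=True)
    return [chunk for chunk, _ in ranked[:top_n]]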
Layer 4: Freshness Scoring
from datetime import datetime

def compute_freshness_score(doc_last_updated: datetime) -> float:
days_old = (datetime.now() - doc_last_updated).days
if days_old <= 7:
return 1.0 # Very fresh
elif days_old <= 30:
return 0.8
elif days_old <= 90:
return 0.5
else:
return 0.2 # Stale — deprioritize
# Combine with vector similarity
final_score = 0.8 * vector_score + 0.2 * freshness_score
Re-indexing pipeline:
- Product descriptions: re-indexed nightly.
- Policies: re-indexed within 1 hour of change (event-driven via S3 change notification; see the sketch below).
- FAQs: re-indexed weekly + manual trigger for urgent changes.
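Event-driven re-index sketch (assumes a Lambda subscribed to S3 ObjectCreated notifications on the policy bucket; chunk_document, embed, and upsert_to_opensearch stand in for the offline ingestion components):
import boto3

s3 = boto3.client("s3")

def handle_policy_update(event, context):
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        text = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
        chunks = chunk_document(text)                    # same chunker as build time
        vectors = [embed(chunk) for chunk in chunks]     # Titan Embeddings
        upsert_to_opensearch(key, chunks, vectors)       # replace this doc's old chunks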
Layer 5: Evaluation
Golden test set: 500 manually labeled query-answer pairs
Query: "What is the return window for manga?"
Expected retrieved chunks: [return_policy.md chunk 3, returns_faq.md chunk 7]
Metrics:
Retrieval Precision@5: Of the top-5 retrieved chunks,
how many are actually relevant? Target: >80%
Retrieval Recall@5: Of all relevant chunks,
how many are in the top-5? Target: >70%
MRR (Mean Reciprocal Rank): Is the best relevant chunk in position 1, 2, 3?
Higher = better. Target: >0.85
Run this evaluation on every RAG pipeline change before deploying.
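Evaluation harness sketch (assumes a golden set of {query, relevant_chunk_ids} records and a retrieve() function returning ranked chunk IDs; both names are placeholders):
def evaluate_retrieval(golden_set: list[dict], k: int = 5) -> dict:
    precisions, recalls, reciprocal_ranks = [], [], []
    for case in golden_set:
        relevant = set(case["relevant_chunk_ids"])
        retrieved = retrieve(case["query"], top_k=k)        # ranked chunk IDs
        hits = [cid for cid in retrieved if cid in relevant]
        precisions.append(len(hits) / k)
        recalls.append(len(hits) / len(relevant) if relevant else 0.0)
        rr = 0.0
        for rank, cid in enumerate(retrieved, start=1):
            if cid in relevant:                             # rank of first relevant chunk
                rr = 1.0 / rank
                break
        reciprocal_ranks.append(rr)
    n = len(golden_set)
    return {"precision@k": sum(precisions) / n,
            "recall@k": sum(recalls) / n,
            "mrr": sum(reciprocal_ranks) / n}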
Q25. Model flexibility — how to support multiple LLMs?
Short Answer
Abstract behind an interface. Use configurable model IDs. Route by intent for A/B testing. Feature flags for rollout.
Deep Dive
Adapter pattern:
from abc import ABC, abstractmethod
class LLMAdapter(ABC):
@abstractmethod
async def generate(self, system_prompt: str, messages: list, **kwargs) -> str:
pass
class BedrockClaudeAdapter(LLMAdapter):
def __init__(self, model_id: str):
self.model_id = model_id # Configurable!
async def generate(self, system_prompt: str, messages: list, **kwargs) -> str:
# Bedrock-specific implementation
response = bedrock.invoke_model(
modelId=self.model_id,
body=json.dumps({"system": system_prompt, "messages": messages, **kwargs})
)
return parse_bedrock_response(response)
class OpenAIAdapter(LLMAdapter):
async def generate(self, system_prompt: str, messages: list, **kwargs) -> str:
# OpenAI-specific implementation (hypothetical future migration)
...
# Orchestrator doesn't care which model it's using
class ResponseGenerator:
def __init__(self, adapter: LLMAdapter):
self.adapter = adapter # Injected at runtime
async def generate(self, *args, **kwargs) -> str:
return await self.adapter.generate(*args, **kwargs)
Intent-based model routing:
MODEL_ROUTING = {
"recommendation": "anthropic.claude-3-5-sonnet", # Best quality
"product_question": "anthropic.claude-3-5-sonnet", # Needs accurate info
"faq": "anthropic.claude-3-haiku", # Fast & cheap
"chitchat": "anthropic.claude-3-haiku", # Minimal LLM use
"order_summary": "amazon.titan-text-lite", # Structured output
}
def get_model_for_intent(intent: str) -> LLMAdapter:
model_id = MODEL_ROUTING.get(intent, "anthropic.claude-3-haiku")
return BedrockClaudeAdapter(model_id=model_id)
A/B testing models:
import random

class ABTestingAdapter(LLMAdapter):
    def __init__(self, control: LLMAdapter, treatment: LLMAdapter,
                 treatment_pct: float = 0.10):
        self.control = control
        self.treatment = treatment
        self.treatment_pct = treatment_pct

    async def generate(self, system_prompt: str, messages: list, **kwargs) -> str:
        use_treatment = random.random() < self.treatment_pct
        adapter = self.treatment if use_treatment else self.control
        response = await adapter.generate(system_prompt, messages, **kwargs)
        # Log which model was used for later comparison
        analytics.log_event("model_selection", {
            "model": getattr(adapter, "model_id", type(adapter).__name__),  # not every adapter exposes model_id
            "group": "treatment" if use_treatment else "control"
        })
        return response
Rollout via feature flags:
# Feature flag config (stored in AWS AppConfig for live updates without deploy)
{
"llm_model": {
"default": "anthropic.claude-3-5-sonnet-20240620",
"overrides": {
"employee_testing": "anthropic.claude-3-5-sonnet-20241022-v2:0", # New version
"beta_users": "anthropic.claude-3-5-sonnet-20241022-v2:0"
}
}
}
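Reading the flags at runtime (a sketch using the AWS AppConfig data API; the application, environment, and profile identifiers are placeholders):
import json
import boto3

appconfig = boto3.client("appconfigdata")

def load_llm_flags() -> dict:
    session = appconfig.start_configuration_session(
        ApplicationIdentifier="mangaassist",            # placeholder
        EnvironmentIdentifier="prod",                   # placeholder
        ConfigurationProfileIdentifier="llm-flags",     # placeholder
    )
    resp = appconfig.get_latest_configuration(
        ConfigurationToken=session["InitialConfigurationToken"]
    )
    return json.loads(resp["Configuration"].read())

def resolve_model_id(flags: dict, user_segment: str) -> str:
    llm_cfg = flags["llm_model"]
    return llm_cfg["overrides"].get(user_segment, llm_cfg["default"])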
Q22. What happens when the LLM hallucinates a product?
Short Answer
ASIN Validation in the Guardrails pipeline cross-checks every product ASIN in the response against the live catalog. If an ASIN doesn't exist, it's removed.
Deep Dive
Types of LLM hallucination in an e-commerce context:
1. Invented product: LLM generates a fake ASIN or title that doesn't exist.
2. Wrong price: LLM states a price from training data that's now outdated.
3. Wrong availability: LLM says "in stock" for an out-of-stock item.
4. Wrong attributes: LLM says "contains volumes 1–12" when it only includes 1–6.
Hallucination prevention layers:
Layer 1: Prompt Grounding
System Prompt:
"You are MangaAssist. ONLY recommend products that appear in the
'Available Products' section below. Do NOT invent, guess, or recall
products from your training data. If you cannot find a suitable
recommendation in the provided list, say so."
Available Products:
[retrieved_product_list from catalog - injected at runtime]
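Assembling the grounded prompt (a sketch; the product fields and formatting are illustrative, not the production template):
def build_grounded_prompt(products: list[dict]) -> str:
    product_lines = "\n".join(
        f"- {p['asin']}: {p['title']} (price: {p['price']}, "
        f"{'in stock' if p['in_stock'] else 'out of stock'})"
        for p in products
    )
    return (
        "You are MangaAssist. ONLY recommend products that appear in the "
        "'Available Products' section below. Do NOT invent, guess, or recall "
        "products from your training data. If you cannot find a suitable "
        "recommendation in the provided list, say so.\n\n"
        f"Available Products:\n{product_lines}"
    )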
Layer 2: ASIN Validation (Post-generation)
import re
async def validate_product_references(llm_response: str) -> str:
# Extract all ASIN-like patterns from response
asin_pattern = r'\b[A-Z0-9]{10}\b'
mentioned_asins = re.findall(asin_pattern, llm_response)
if not mentioned_asins:
return llm_response # No products mentioned, no validation needed
# Batch check against live catalog
valid_asins = await catalog.batch_check_exists(mentioned_asins)
invalid_asins = set(mentioned_asins) - set(valid_asins)
if invalid_asins:
# Log the hallucination for monitoring
logger.error(f"LLM hallucinated ASINs: {invalid_asins}")
cloudwatch.put_metric("HallucinatedASINs", len(invalid_asins))
# Remove the hallucinated product mentions
cleaned_response = remove_invalid_product_references(llm_response, invalid_asins)
# Append a safe note
return cleaned_response + "\n\n*Note: I've adjusted my recommendations to show only currently available products.*"
return llm_response
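One possible implementation of the remove_invalid_product_references helper used above (dropping any sentence that mentions a hallucinated ASIN; replacing the mention with a generic phrase is an alternative policy):
def remove_invalid_product_references(response: str, invalid_asins: set) -> str:
    sentences = re.split(r"(?<=[.!?])\s+", response)
    kept = [s for s in sentences
            if not any(asin in s for asin in invalid_asins)]
    return " ".join(kept)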
Layer 3: Price Verification
async def validate_prices(llm_response: str, product_context: dict) -> str:
# Find price mentions in response (e.g., "$49.99", "¥5,500")
price_mentions = extract_price_mentions(llm_response)
for mention in price_mentions:
product_id = mention.product_id
stated_price = mention.price
actual_price = product_context[product_id]["current_price"]
if abs(stated_price - actual_price) > 0.01: # Price mismatch
# Replace wrong price with correct price
llm_response = llm_response.replace(
f"${stated_price:.2f}",
f"${actual_price:.2f}"
)
return llm_response
Monitoring hallucination rates:
Target: ASIN hallucination rate < 0.1% of responses
Alert: If rate > 0.5%, investigate prompt or consider adding more product context
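Wiring the alert to CloudWatch (a sketch via put_metric_alarm on the HallucinatedASINs metric emitted above; the namespace, SNS topic, and count threshold are placeholders, and a true rate alarm would use metric math against total responses):
import boto3

cloudwatch = boto3.client("cloudwatch")

cloudwatch.put_metric_alarm(
    AlarmName="mangaassist-asin-hallucination",
    Namespace="MangaAssist",                      # placeholder namespace
    MetricName="HallucinatedASINs",
    Statistic="Sum",
    Period=300,                                   # 5-minute windows
    EvaluationPeriods=3,
    Threshold=5,                                  # tune against traffic to approximate the 0.5% rate
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=["arn:aws:sns:ap-northeast-1:123456789012:llm-quality-alerts"],  # placeholder topic
)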