CD-04: RAG Knowledge Base Pipeline
User Story
As a Data Engineer on the MangaAssist AI Chatbot team, I want an automated pipeline that updates the RAG knowledge base (OpenSearch vector indexes, embeddings, chunked documents) whenever source data changes, so that the chatbot always answers from up-to-date product catalogs, policies, and FAQs — without application code deployments or manual re-indexing.
Acceptance Criteria
- Document changes in S3 source bucket automatically trigger the re-indexing pipeline
- Chunking strategy produces consistent, overlapping chunks with metadata preservation
- Embeddings are generated via Amazon Titan Embeddings V2 (primary) with fallback to fine-tuned e5-large
- Blue/green index swap ensures zero-downtime during re-indexing
- Validation queries confirm retrieval quality before index swap (recall@10 >= 95% on golden set)
- Incremental updates process only changed documents (< 5 min for single document change)
- Full re-index completes within 2 hours for the entire knowledge base (~50K documents)
- Stale indexes are cleaned up automatically after 24-hour retention
- Embedding model version changes trigger mandatory full re-index
- Pipeline publishes metrics: documents processed, embedding latency, index size, retrieval recall
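The metrics criterion could be wired to CloudWatch along these lines. This is a minimal sketch: the namespace `MangaAssist/RAGPipeline` and the exact metric names are assumptions, not a settled contract.

```python
from datetime import datetime, timezone

def build_pipeline_metrics(docs_processed: int, embed_latency_ms: float,
                           index_size_docs: int, recall_at_10: float) -> list[dict]:
    """Shape one pipeline run's stats as CloudWatch MetricData entries."""
    now = datetime.now(timezone.utc)
    return [
        {'MetricName': 'DocumentsProcessed', 'Value': docs_processed, 'Unit': 'Count', 'Timestamp': now},
        {'MetricName': 'EmbeddingLatency', 'Value': embed_latency_ms, 'Unit': 'Milliseconds', 'Timestamp': now},
        {'MetricName': 'IndexSize', 'Value': index_size_docs, 'Unit': 'Count', 'Timestamp': now},
        {'MetricName': 'RetrievalRecallAt10', 'Value': recall_at_10, 'Unit': 'None', 'Timestamp': now},
    ]

def publish_pipeline_metrics(metrics: list[dict]) -> None:
    import boto3  # deferred so the builder above stays testable offline
    boto3.client('cloudwatch').put_metric_data(
        Namespace='MangaAssist/RAGPipeline', MetricData=metrics)
```

Keeping the metric-building pure and the `put_metric_data` call separate makes the dashboard contract easy to unit-test.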
High-Level Design
Pipeline Architecture
flowchart TD
subgraph "Data Sources"
A[Product Catalog — S3]
B[Policy Documents — S3]
C[FAQ Database — DynamoDB]
D[Support Tickets — S3]
end
subgraph "Change Detection"
A --> E[S3 Event Notification]
B --> E
C --> F[DynamoDB Streams]
D --> E
E --> G[SQS Queue]
F --> G
end
subgraph "Processing Pipeline"
G --> H{Incremental or Full?}
H -->|Single doc change| I[Incremental Update]
H -->|Embedding model change| J[Full Re-Index]
H -->|Scheduled weekly| J
I --> K[Chunk Changed Document]
J --> L[Chunk All Documents]
K --> M[Generate Embeddings]
L --> M
M --> N[Write to OpenSearch]
end
subgraph "Validation & Swap"
N --> O[Run Golden Set Queries]
O --> P{Recall >= 95%?}
P -->|Yes| Q[Swap Index Alias]
P -->|No| R[Alert + Keep Old Index]
Q --> S[Cleanup Stale Index after 24h]
end
style Q fill:#1B660F,color:#fff
style R fill:#DD344C,color:#fff
Knowledge Base Composition
| Source | Document Count | Update Frequency | Chunk Size | Overlap |
|---|---|---|---|---|
| Product Catalog | ~30,000 | Daily (price/stock changes) | 512 tokens | 64 tokens |
| Policy Documents | ~500 | Monthly | 1024 tokens | 128 tokens |
| FAQ Database | ~5,000 | Weekly | 256 tokens | 32 tokens |
| Support Ticket Summaries | ~15,000 | Daily (new resolutions) | 512 tokens | 64 tokens |
| Total | ~50,000 | — | — | — |
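The per-source parameters above might be resolved in code roughly as follows. The S3 key prefixes are assumptions about the bucket layout, not documented paths.

```python
# Chunking parameters per source, mirroring the composition table.
CHUNK_CONFIG = {
    'catalog/':  {'chunk_size': 512,  'overlap': 64},
    'policies/': {'chunk_size': 1024, 'overlap': 128},
    'faq/':      {'chunk_size': 256,  'overlap': 32},
    'tickets/':  {'chunk_size': 512,  'overlap': 64},
}

def chunk_params_for(s3_key: str) -> dict:
    """Resolve chunking parameters from a document's S3 key prefix."""
    for prefix, params in CHUNK_CONFIG.items():
        if s3_key.startswith(prefix):
            return params
    return {'chunk_size': 512, 'overlap': 64}  # default for unknown sources
```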
Low-Level Design
1. Change Detection — Event-Driven Trigger
# S3 Event → SQS → Lambda (change detector)
import boto3
import json

sfn = boto3.client('stepfunctions')  # created once per Lambda container

def lambda_handler(event, context):
    changed_docs = []
    for record in event['Records']:
        body = json.loads(record['body'])
        for s3_event in body.get('Records', []):
            changed_docs.append({
                'bucket': s3_event['s3']['bucket']['name'],
                'key': s3_event['s3']['object']['key'],
                'event_type': s3_event['eventName'],  # e.g. ObjectCreated:Put, ObjectRemoved:Delete
                'timestamp': s3_event['eventTime'],
            })
    # Batch change (>100 docs) — trigger a full re-index instead of incremental
    mode = 'full' if len(changed_docs) > 100 else 'incremental'
    sfn.start_execution(
        stateMachineArn='arn:aws:states:us-east-1:ACCOUNT:stateMachine:rag-index-pipeline',
        input=json.dumps({
            'mode': mode,
            'changed_documents': changed_docs,
            'trigger': 'event',
        }),
    )
2. Document Chunking Strategy
import re
from dataclasses import dataclass
from typing import Iterator

import tiktoken

@dataclass
class Chunk:
    text: str
    metadata: dict
    chunk_index: int
    total_chunks: int
    token_count: int

class DocumentChunker:
    def __init__(self, chunk_size: int = 512, overlap: int = 64):
        self.chunk_size = chunk_size
        self.overlap = overlap
        # cl100k_base is an encoding name, not a model — use get_encoding
        self.tokenizer = tiktoken.get_encoding("cl100k_base")

    def chunk_document(self, text: str, metadata: dict) -> Iterator[Chunk]:
        """Chunk document with overlap, preserving sentence boundaries."""
        sentences = self._split_into_sentences(text)
        current_chunk_tokens = []
        current_chunk_text = []
        chunk_index = 0
        for sentence in sentences:
            sentence_tokens = self.tokenizer.encode(sentence)
            if len(current_chunk_tokens) + len(sentence_tokens) > self.chunk_size:
                # Yield current chunk
                yield Chunk(
                    text=' '.join(current_chunk_text),
                    metadata={
                        **metadata,
                        'chunk_index': chunk_index,
                        'source_doc': metadata.get('s3_key', ''),
                    },
                    chunk_index=chunk_index,
                    total_chunks=-1,  # Updated after processing
                    token_count=len(current_chunk_tokens),
                )
                chunk_index += 1
                # Carry overlap: keep the last 2 sentences of the current chunk
                overlap_text = current_chunk_text[-2:]
                current_chunk_text = overlap_text
                current_chunk_tokens = self.tokenizer.encode(' '.join(overlap_text))
            current_chunk_text.append(sentence)
            current_chunk_tokens.extend(sentence_tokens)
        # Yield final chunk
        if current_chunk_text:
            yield Chunk(
                text=' '.join(current_chunk_text),
                metadata={**metadata, 'chunk_index': chunk_index},
                chunk_index=chunk_index,
                total_chunks=chunk_index + 1,
                token_count=len(current_chunk_tokens),
            )

    def _split_into_sentences(self, text: str) -> list[str]:
        """Simple sentence splitter preserving paragraph structure."""
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]
3. Embedding Generation — Dual Model Strategy
flowchart TD
A[Chunked Document] --> B{Document Type?}
B -->|Product Catalog| C[Amazon Titan Embeddings V2<br/>via Bedrock]
B -->|Policy/FAQ| C
B -->|Support Tickets| D[Fine-tuned e5-large<br/>via SageMaker]
C --> E[1024-dim vector]
D --> F[768-dim vector + projection to 1024]
E --> G[Write to OpenSearch]
F --> G
import json
import time

import boto3

class EmbeddingGenerator:
    def __init__(self):
        self.bedrock = boto3.client('bedrock-runtime')
        self.sm_runtime = boto3.client('sagemaker-runtime')

    def generate_titan_embedding(self, text: str) -> list[float]:
        """Primary: Amazon Titan Embeddings V2 via Bedrock."""
        response = self.bedrock.invoke_model(
            modelId='amazon.titan-embed-text-v2:0',
            body=json.dumps({
                'inputText': text,
                'dimensions': 1024,
                'normalize': True,
            }),
        )
        return json.loads(response['body'].read())['embedding']

    def generate_e5_embedding(self, text: str) -> list[float]:
        """Specialist: fine-tuned multilingual-e5-large via SageMaker."""
        response = self.sm_runtime.invoke_endpoint(
            EndpointName='mangaassist-e5-large-ft',
            Body=json.dumps({'inputs': f"passage: {text}"}),  # e5 expects a "passage: " prefix
            ContentType='application/json',
        )
        result = json.loads(response['Body'].read())
        # 768-dim output is projected to 1024 via a learned linear layer stored in the model
        return result['embedding_1024']

    def generate_batch(self, chunks: list[dict], model: str = 'titan',
                       delay_s: float = 0.05) -> list[dict]:
        """Batch embedding generation with simple rate limiting."""
        generator = (
            self.generate_titan_embedding if model == 'titan'
            else self.generate_e5_embedding
        )
        results = []
        for chunk in chunks:
            results.append({**chunk, 'embedding': generator(chunk['text'])})
            time.sleep(delay_s)  # crude throttle to stay under Bedrock TPS limits
        return results
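The acceptance criteria name Titan as primary with fallback to the fine-tuned e5 endpoint, which the routing above does not yet implement. One possible shape, written against the `EmbeddingGenerator` methods; the retry count and backoff values are placeholders, not tuned numbers.

```python
import time
from typing import Callable

def embed_with_fallback(text: str,
                        primary: Callable[[str], list[float]],
                        fallback: Callable[[str], list[float]],
                        retries: int = 2, backoff_s: float = 1.0) -> list[float]:
    """Try the primary embedder with exponential backoff on transient
    errors (e.g. Bedrock throttling), then fall back to the secondary."""
    for attempt in range(retries):
        try:
            return primary(text)
        except Exception:  # narrow to ThrottlingException in production
            time.sleep(backoff_s * (2 ** attempt))
    return fallback(text)
```

Usage would be `embed_with_fallback(text, gen.generate_titan_embedding, gen.generate_e5_embedding)`; note the two models embed into different spaces, so fallback-embedded chunks should carry their model version in metadata (see Decision 3).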
4. Blue/Green Index Swap
sequenceDiagram
participant PIPE as Pipeline
participant OS as OpenSearch Serverless
participant APP as Chatbot App
Note over OS: Current: index-v5 (alias "kb-live" → index-v5)
PIPE->>OS: Create index-v6 (new schema if needed)
PIPE->>OS: Bulk index all chunks to index-v6
OS-->>PIPE: Indexing complete
PIPE->>OS: Run validation queries on index-v6
OS-->>PIPE: Recall@10 = 96.2% ✓
PIPE->>OS: Update alias "kb-live" → index-v6
Note over APP: App reads via alias — seamless swap
APP->>OS: Query "kb-live" — now hits index-v6
PIPE->>PIPE: Schedule cleanup: delete index-v5 in 24h
from opensearchpy import OpenSearch

class IndexSwapper:
    def __init__(self, os_client: OpenSearch):
        self.client = os_client
        self.alias = 'kb-live'

    def create_new_index(self, version: int) -> str:
        index_name = f"mangaassist-kb-v{version}"
        self.client.indices.create(
            index=index_name,
            body={
                'settings': {
                    'index.knn': True,
                    'index.knn.algo_param.ef_search': 512,
                    'number_of_shards': 2,
                    'number_of_replicas': 1,
                },
                'mappings': {
                    'properties': {
                        'embedding': {
                            'type': 'knn_vector',
                            'dimension': 1024,
                            'method': {
                                'name': 'hnsw',
                                'space_type': 'cosinesimil',
                                'engine': 'nmslib',
                                'parameters': {
                                    'ef_construction': 512,
                                    'm': 16,
                                },
                            },
                        },
                        'text': {'type': 'text', 'analyzer': 'standard'},
                        'metadata': {'type': 'object'},
                        'source_doc': {'type': 'keyword'},
                        'chunk_index': {'type': 'integer'},
                        'last_updated': {'type': 'date'},
                    },
                },
            },
        )
        return index_name

    def swap_alias(self, new_index: str):
        """Atomic alias swap — zero downtime."""
        # Find current index behind alias
        alias_info = self.client.indices.get_alias(name=self.alias)
        old_index = list(alias_info.keys())[0]
        # Atomic swap: remove + add in a single update_aliases call
        self.client.indices.update_aliases(body={
            'actions': [
                {'remove': {'index': old_index, 'alias': self.alias}},
                {'add': {'index': new_index, 'alias': self.alias}},
            ]
        })
        return old_index  # Caller schedules deletion
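The 24-hour cleanup that `swap_alias` defers to the caller could look like the sketch below. The selection logic is kept pure so it can be tested without a cluster; the caller is assumed to fetch index creation dates (e.g. from the cat indices API) and to hold the name of the index currently behind `kb-live`.

```python
from datetime import datetime, timedelta, timezone

def stale_indexes(indexes: dict[str, datetime], live_index: str,
                  now: datetime, retention_hours: int = 24) -> list[str]:
    """Return versioned KB indexes older than the retention window,
    always excluding the index currently serving the kb-live alias."""
    cutoff = now - timedelta(hours=retention_hours)
    return [
        name for name, created in indexes.items()
        if name != live_index
        and name.startswith('mangaassist-kb-v')  # never touch unrelated indexes
        and created < cutoff
    ]
```

A scheduled Lambda would then call `client.indices.delete(index=name)` for each returned name.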
5. Validation — Golden Set Queries
GOLDEN_SET = [
    {
        'query': 'How do I return a manga book?',
        'expected_docs': ['return-policy-v3.md', 'faq-returns.md'],
        'min_recall_at_10': 1.0,  # Both must appear in top 10
    },
    {
        'query': 'One Piece latest volume price',
        'expected_docs': ['catalog/one-piece-vol-108.json'],
        'min_recall_at_10': 1.0,
    },
    {
        'query': 'shipping time to California',
        'expected_docs': ['shipping-policy.md', 'faq-shipping.md'],
        'min_recall_at_10': 0.5,  # At least 1 of 2
    },
    # ... 50+ golden queries covering all document types
]

def validate_index(os_client, index_name: str) -> dict:
    """Run golden set queries against new index before swap."""
    embedding_gen = EmbeddingGenerator()
    total_recall = 0
    passed = 0
    for test in GOLDEN_SET:
        query_embedding = embedding_gen.generate_titan_embedding(test['query'])
        response = os_client.search(
            index=index_name,
            body={
                'size': 10,
                'query': {
                    'knn': {
                        'embedding': {
                            'vector': query_embedding,
                            'k': 10,
                        },
                    },
                },
            },
        )
        retrieved_docs = [
            hit['_source']['source_doc']
            for hit in response['hits']['hits']
        ]
        hits = sum(1 for doc in test['expected_docs'] if doc in retrieved_docs)
        recall = hits / len(test['expected_docs'])
        total_recall += recall
        if recall >= test['min_recall_at_10']:
            passed += 1
    avg_recall = total_recall / len(GOLDEN_SET)
    pass_rate = passed / len(GOLDEN_SET)
    return {
        'avg_recall_at_10': avg_recall,
        'pass_rate': pass_rate,
        'passed': pass_rate >= 0.95,  # 95% of golden queries must pass
        'total_queries': len(GOLDEN_SET),
    }
Critical Decisions
Decision 1: Index Update Strategy — In-Place Update vs Blue/Green Index Swap
| Criteria (Weight) | In-Place Update | Blue/Green Index Swap |
|---|---|---|
| Zero Downtime (25%) | 6/10 — reads may return partial results during update | 10/10 — alias swap is atomic |
| Data Consistency (25%) | 5/10 — mixed old/new during update window | 10/10 — new index is fully built before swap |
| Cost (15%) | 9/10 — single index | 6/10 — 2x storage during build |
| Rollback Speed (15%) | 3/10 — must rebuild old index | 10/10 — swap alias back instantly |
| Operational Complexity (10%) | 8/10 — simple upsert API | 6/10 — alias management, cleanup job |
| Build Time Impact (10%) | 8/10 — update only changed docs | 5/10 — full rebuild required |
| Weighted Score | 6.2/10 | 8.5/10 |
Decision: Blue/Green Index Swap
Rationale: The chatbot serves customer queries in real-time. An in-place update that returns stale or partial results during the update window is unacceptable — imagine a customer asking about a return policy and getting the old policy because the update hasn't propagated yet. Blue/green ensures every query hits either the fully old or fully new index, never a hybrid. The 2x storage cost (~$50/month for OpenSearch Serverless) is trivial compared to the consistency guarantee.
Decision 2: Re-Embedding Strategy — Batch Full Re-Index vs Incremental Updates
| Criteria | Batch Full Re-Index | Incremental Updates | Hybrid (Current) |
|---|---|---|---|
| Consistency | Perfect — all embeddings from same model version | Risk of mixed model versions | Perfect for full, acceptable for incremental |
| Speed | Slow (2 hours for 50K docs) | Fast (< 5 min per doc) | Best of both |
| Cost | ~$35 per full run (Titan API calls) | ~$0.01 per document | Full: $35/week, Incremental: ~$5/day |
| When to Use | Embedding model update, schema change | Single doc add/update/delete | Incremental for daily changes, full weekly |
Decision: Hybrid — Incremental daily + Full weekly + Full on model change
flowchart TD
A[Change Detected] --> B{What changed?}
B -->|"1-100 documents"| C[Incremental Update]
B -->|"100+ documents"| D[Full Re-Index]
B -->|"Embedding model updated"| D
B -->|"Scheduled weekly"| D
C --> E[Chunk changed docs only]
E --> F[Generate embeddings for chunks]
F --> G[Upsert into LIVE index]
G --> H[Validate affected queries]
D --> I[Chunk ALL documents]
I --> J[Generate ALL embeddings]
J --> K[Build new index]
K --> L[Blue/Green swap]
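The routing above collapses into a small pure function; the 100-document threshold mirrors the change-detector Lambda, and this function could be the decision step shared by both.

```python
def choose_mode(changed_doc_count: int, model_changed: bool,
                scheduled_full: bool) -> str:
    """Decide between incremental upsert into the live index
    and a full blue/green rebuild."""
    if model_changed or scheduled_full or changed_doc_count > 100:
        return 'full'
    return 'incremental'
```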
Decision 3: Embedding Model Versioning — Re-Embed Everything vs Maintain Compatibility
| Scenario | Strategy | Impact |
|---|---|---|
| Titan V2 minor update (same API, same dimensions) | Keep existing embeddings, re-embed on next weekly cycle | Minimal — minor quality variation within index |
| Titan V2 → V3 major update (different embedding space) | Mandatory full re-embed before production use | Expensive ($35) but required — mixed spaces break similarity search |
| Fine-tuned e5 retrained on new data | Mandatory full re-embed of e5-indexed docs only | Only affects support ticket chunks (~15K docs) |
Decision: Version tag all embeddings + mandatory full re-embed on major model changes
Each document chunk stores its embedding_model_version in metadata. The validation pipeline flags any mixed-version index as a warning. Major model changes (Titan V2→V3, e5 retraining) trigger a blocking full re-index before the new model is usable.
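One way the validation pipeline might detect a mixed-version index is a terms aggregation over the version metadata. Treating `embedding_model_version` as an exact-match (keyword) field is an assumption about the index mapping.

```python
# Aggregation body: count chunks per embedding model version.
VERSION_AGG_QUERY = {
    'size': 0,
    'aggs': {
        'model_versions': {
            'terms': {'field': 'metadata.embedding_model_version', 'size': 10}
        }
    },
}

def embedding_versions(agg_response: dict) -> list[str]:
    """Extract the distinct model versions present in an index
    from an OpenSearch aggregation response."""
    buckets = agg_response['aggregations']['model_versions']['buckets']
    return [b['key'] for b in buckets]

def is_mixed_version(agg_response: dict) -> bool:
    return len(embedding_versions(agg_response)) > 1
```

The pipeline would run `client.search(index='kb-live', body=VERSION_AGG_QUERY)` and warn when `is_mixed_version` returns True.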
Tradeoffs
The Debate: Index Freshness vs Compute Cost
graph TD
subgraph "Product Manager"
PM1["Customers see outdated prices"]
PM2["New products must be searchable in < 1 hour"]
PM3["Real-time indexing is table stakes"]
end
subgraph "Architect"
AR1["Real-time indexing is expensive"]
AR2["Embedding API costs scale with frequency"]
AR3["Batch is 10x cheaper than real-time"]
end
subgraph "Data Engineer"
DE1["Incremental is fragile — edge cases everywhere"]
DE2["Full re-index is simple and reliable"]
DE3["Weekly full + daily incremental is the sweet spot"]
end
PM2 ---|"Tension"| AR3
PM3 ---|"Tension"| DE1
AR1 ---|"Agrees with"| DE3
Resolution
| Concern | Solution | Compromise |
|---|---|---|
| PM: New products searchable in < 1 hour | Incremental pipeline triggers on S3 event, processes in < 5 min | Only new/changed docs — doesn't improve existing retrieval quality |
| PM: Prices must be current | Price data stored as metadata (not embedded), updated independently | Embedding stays the same — only metadata refresh |
| Architect: Control embedding costs | Full re-index weekly ($35), incremental daily (~$5/day) = ~$70/week total | Not real-time — batch aggregates changes every 15 min for incremental |
| Data Engineer: Keep it simple | Blue/green for full re-index (reliable), upsert for incremental (fast) | Incremental path has more edge cases (deleted docs, moved docs) |
Key Insight: Embeddings don't change when prices change. The embedding captures the semantic meaning ("One Piece Volume 108, manga, adventure, pirates"). The price is metadata stored alongside the embedding, updateable without re-embedding. This insight saves ~90% of re-indexing costs — most catalog changes are price/stock updates, not content changes.
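A metadata-only price refresh could be a scripted update over every chunk of a catalog document, leaving embeddings untouched. Using `update_by_query` keyed on the `source_doc` field is an assumption about the chosen update path.

```python
def price_update_request(source_doc: str, new_price: float) -> dict:
    """Build an update_by_query body that patches price metadata
    on all chunks of one catalog document, without re-embedding."""
    return {
        'query': {'term': {'source_doc': source_doc}},
        'script': {
            'source': 'ctx._source.metadata.price = params.price',
            'params': {'price': new_price},
        },
    }
```

The caller would run `client.update_by_query(index='kb-live', body=price_update_request('catalog/one-piece-vol-108.json', 11.99))`, which is why this path avoids the embedding API entirely.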