
CD-04: RAG Knowledge Base Pipeline

User Story

As a Data Engineer on the MangaAssist AI Chatbot team, I want an automated pipeline that updates the RAG knowledge base (OpenSearch vector indexes, embeddings, chunked documents) whenever source data changes, so that the chatbot always answers from up-to-date product catalogs, policies, and FAQs, without requiring application code deployments or manual re-indexing.


Acceptance Criteria

  • Document changes in S3 source bucket automatically trigger the re-indexing pipeline
  • Chunking strategy produces consistent, overlapping chunks with metadata preservation
  • Embeddings are generated via Amazon Titan Embeddings V2 (primary) or a fine-tuned e5-large (support-ticket content)
  • Blue/green index swap ensures zero-downtime during re-indexing
  • Validation queries confirm retrieval quality before index swap (recall@10 >= 95% on golden set)
  • Incremental updates process only changed documents (< 5 min for single document change)
  • Full re-index completes within 2 hours for the entire knowledge base (~50K documents)
  • Stale indexes are cleaned up automatically after 24-hour retention
  • Embedding model version changes trigger mandatory full re-index
  • Pipeline publishes metrics: documents processed, embedding latency, index size, retrieval recall

High-Level Design

Pipeline Architecture

flowchart TD
    subgraph "Data Sources"
        A[Product Catalog — S3] 
        B[Policy Documents — S3]
        C[FAQ Database — DynamoDB]
        D[Support Tickets — S3]
    end

    subgraph "Change Detection"
        A --> E[S3 Event Notification]
        B --> E
        C --> F[DynamoDB Streams]
        D --> E
        E --> G[SQS Queue]
        F --> G
    end

    subgraph "Processing Pipeline"
        G --> H{Incremental or Full?}
        H -->|Single doc change| I[Incremental Update]
        H -->|Embedding model change| J[Full Re-Index]
        H -->|Scheduled weekly| J

        I --> K[Chunk Changed Document]
        J --> L[Chunk All Documents]

        K --> M[Generate Embeddings]
        L --> M

        M --> N[Write to OpenSearch]
    end

    subgraph "Validation & Swap"
        N --> O[Run Golden Set Queries]
        O --> P{Recall >= 95%?}
        P -->|Yes| Q[Swap Index Alias]
        P -->|No| R[Alert + Keep Old Index]
        Q --> S[Cleanup Stale Index after 24h]
    end

    style Q fill:#1B660F,color:#fff
    style R fill:#DD344C,color:#fff

Knowledge Base Composition

| Source | Document Count | Update Frequency | Chunk Size | Overlap |
|---|---|---|---|---|
| Product Catalog | ~30,000 | Daily (price/stock changes) | 512 tokens | 64 tokens |
| Policy Documents | ~500 | Monthly | 1024 tokens | 128 tokens |
| FAQ Database | ~5,000 | Weekly | 256 tokens | 32 tokens |
| Support Ticket Summaries | ~15,000 | Daily (new resolutions) | 512 tokens | 64 tokens |
| **Total** | **~50,000** | | | |
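A rough capacity estimate falls out of this table. The sketch below approximates the total chunk count; the avg_tokens figures are illustrative assumptions, while doc counts and chunk/overlap sizes come from the table:

```python
# Back-of-envelope chunk counts per source. avg_tokens values are
# illustrative assumptions; the rest mirrors the composition table.
SOURCES = {
    'product_catalog':  {'docs': 30_000, 'avg_tokens': 800,   'chunk': 512,  'overlap': 64},
    'policy_documents': {'docs': 500,    'avg_tokens': 4_000, 'chunk': 1024, 'overlap': 128},
    'faq_database':     {'docs': 5_000,  'avg_tokens': 300,   'chunk': 256,  'overlap': 32},
    'ticket_summaries': {'docs': 15_000, 'avg_tokens': 600,   'chunk': 512,  'overlap': 64},
}

def estimated_chunks(cfg: dict) -> int:
    stride = cfg['chunk'] - cfg['overlap']     # new tokens contributed per chunk
    per_doc = -(-cfg['avg_tokens'] // stride)  # ceiling division
    return cfg['docs'] * per_doc

total_chunks = sum(estimated_chunks(c) for c in SOURCES.values())
```

Under these assumptions the index holds on the order of 100K chunks, which is what the 2-hour full re-index budget has to cover.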

Low-Level Design

1. Change Detection — Event-Driven Trigger

# S3 Event → SQS → Lambda (change detector)
import boto3
import json

def lambda_handler(event, context):
    sfn = boto3.client('stepfunctions')

    changed_docs = []
    for record in event['Records']:
        body = json.loads(record['body'])
        for s3_event in body.get('Records', []):
            bucket = s3_event['s3']['bucket']['name']
            key = s3_event['s3']['object']['key']
            changed_docs.append({
                'bucket': bucket,
                'key': key,
                'event_type': s3_event['eventName'],  # ObjectCreated, ObjectRemoved
                'timestamp': s3_event['eventTime'],
            })

    if len(changed_docs) > 100:
        # Batch change — trigger full re-index
        mode = 'full'
    else:
        mode = 'incremental'

    response = sfn.start_execution(
        stateMachineArn='arn:aws:states:us-east-1:ACCOUNT:stateMachine:rag-index-pipeline',
        input=json.dumps({
            'mode': mode,
            'changed_documents': changed_docs,
            'trigger': 'event',
        }),
    )
    return {'mode': mode, 'documents': len(changed_docs)}

2. Document Chunking Strategy

from dataclasses import dataclass
from typing import Iterator
import tiktoken

@dataclass
class Chunk:
    text: str
    metadata: dict
    chunk_index: int
    total_chunks: int
    token_count: int

class DocumentChunker:
    def __init__(self, chunk_size: int = 512, overlap: int = 64):
        self.chunk_size = chunk_size
        self.overlap = overlap
        # cl100k_base is an encoding name, so use get_encoding, not encoding_for_model
        self.tokenizer = tiktoken.get_encoding("cl100k_base")

    def chunk_document(self, text: str, metadata: dict) -> Iterator[Chunk]:
        """Chunk document with overlap, preserving sentence boundaries."""
        sentences = self._split_into_sentences(text)
        current_chunk_tokens = []
        current_chunk_text = []
        chunk_index = 0

        for sentence in sentences:
            sentence_tokens = self.tokenizer.encode(sentence)

            if len(current_chunk_tokens) + len(sentence_tokens) > self.chunk_size:
                # Yield current chunk
                yield Chunk(
                    text=' '.join(current_chunk_text),
                    metadata={
                        **metadata,
                        'chunk_index': chunk_index,
                        'source_doc': metadata.get('s3_key', ''),
                    },
                    chunk_index=chunk_index,
                    total_chunks=-1,  # Unknown until the generator is exhausted; caller back-fills
                    token_count=len(current_chunk_tokens),
                )
                chunk_index += 1

                # Approximate overlap: carry the last 2 sentences into the next
                # chunk (a token-exact version would trim to self.overlap tokens)
                overlap_text = current_chunk_text[-2:]
                current_chunk_text = overlap_text
                current_chunk_tokens = self.tokenizer.encode(' '.join(overlap_text))

            current_chunk_text.append(sentence)
            current_chunk_tokens.extend(sentence_tokens)

        # Yield final chunk
        if current_chunk_text:
            yield Chunk(
                text=' '.join(current_chunk_text),
                metadata={**metadata, 'chunk_index': chunk_index},
                chunk_index=chunk_index,
                total_chunks=chunk_index + 1,
                token_count=len(current_chunk_tokens),
            )

    def _split_into_sentences(self, text: str) -> list[str]:
        """Simple sentence splitter preserving paragraph structure."""
        import re
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]
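The overlap behavior of chunk_document can be demonstrated with a simplified stand-in that counts whitespace-separated words instead of tiktoken tokens (a sketch of the same greedy packing, not the production path):

```python
import re

def chunk_with_overlap(text: str, chunk_size: int = 8,
                       overlap_sentences: int = 2) -> list[str]:
    """Greedy sentence packing: close the chunk when the word budget would be
    exceeded, carrying the last overlap_sentences sentences into the next one."""
    sentences = [s.strip() for s in re.split(r'(?<=[.!?])\s+', text) if s.strip()]
    chunks, current = [], []
    for sentence in sentences:
        words = sum(len(s.split()) for s in current) + len(sentence.split())
        if current and words > chunk_size:
            chunks.append(' '.join(current))
            current = current[-overlap_sentences:]  # overlap carried forward
        current.append(sentence)
    if current:
        chunks.append(' '.join(current))
    return chunks

chunks = chunk_with_overlap("One. Two two. Three three three. Four four four four.")
```

The last two sentences of the first chunk reappear at the start of the second, mirroring the sentence-level overlap in DocumentChunker.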

3. Embedding Generation — Dual Model Strategy

flowchart TD
    A[Chunked Document] --> B{Document Type?}
    B -->|Product Catalog| C[Amazon Titan Embeddings V2<br/>via Bedrock]
    B -->|Policy/FAQ| C
    B -->|Support Tickets| D[Fine-tuned e5-large<br/>via SageMaker]

    C --> E[1024-dim vector]
    D --> F[768-dim vector + projection to 1024]

    E --> G[Write to OpenSearch]
    F --> G

import boto3
import json

class EmbeddingGenerator:
    def __init__(self):
        self.bedrock = boto3.client('bedrock-runtime')
        self.sm_runtime = boto3.client('sagemaker-runtime')

    def generate_titan_embedding(self, text: str) -> list[float]:
        """Primary: Amazon Titan Embeddings V2 via Bedrock."""
        response = self.bedrock.invoke_model(
            modelId='amazon.titan-embed-text-v2:0',
            body=json.dumps({
                'inputText': text,
                'dimensions': 1024,
                'normalize': True,
            }),
        )
        return json.loads(response['body'].read())['embedding']

    def generate_e5_embedding(self, text: str) -> list[float]:
        """Specialist: Fine-tuned multilingual-e5-large via SageMaker."""
        response = self.sm_runtime.invoke_endpoint(
            EndpointName='mangaassist-e5-large-ft',
            Body=json.dumps({'inputs': f"passage: {text}"}),
            ContentType='application/json',
        )
        result = json.loads(response['Body'].read())
        # Endpoint applies a learned 768 → 1024 linear projection server-side
        return result['embedding_1024']

    def generate_batch(self, chunks: list[dict], model: str = 'titan') -> list[dict]:
        """Batch embedding generation with simple rate limiting."""
        import time
        results = []
        generator = (
            self.generate_titan_embedding if model == 'titan'
            else self.generate_e5_embedding
        )
        for chunk in chunks:
            embedding = generator(chunk['text'])
            results.append({
                **chunk,
                'embedding': embedding,
            })
            time.sleep(0.05)  # crude throttle to stay under per-model TPS limits
        return results
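Bedrock also throttles bursty callers, so the batch path benefits from retry with exponential backoff. A generic sketch follows; the retryable exception type and delays are placeholders chosen to keep it self-contained (real code would catch botocore's ClientError filtered on the throttling error code):

```python
import random
import time

def with_backoff(fn, max_retries: int = 5, base_delay: float = 0.5,
                 retryable: tuple = (RuntimeError,), sleep=time.sleep):
    """Wrap fn with retry: on a retryable error, wait base_delay * 2^attempt
    (plus jitter) and try again, re-raising after max_retries attempts."""
    def wrapper(*args, **kwargs):
        for attempt in range(max_retries):
            try:
                return fn(*args, **kwargs)
            except retryable:
                if attempt == max_retries - 1:
                    raise
                sleep(base_delay * (2 ** attempt) * (1 + 0.1 * random.random()))
    return wrapper
```

Wrapping generate_titan_embedding with with_backoff would absorb transient throttling responses instead of failing the whole batch.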

4. Blue/Green Index Swap

sequenceDiagram
    participant PIPE as Pipeline
    participant OS as OpenSearch Serverless
    participant APP as Chatbot App

    Note over OS: Current: index-v5 (alias "kb-live" → index-v5)

    PIPE->>OS: Create index-v6 (new schema if needed)
    PIPE->>OS: Bulk index all chunks to index-v6
    OS-->>PIPE: Indexing complete

    PIPE->>OS: Run validation queries on index-v6
    OS-->>PIPE: Recall@10 = 96.2% ✓

    PIPE->>OS: Update alias "kb-live" → index-v6
    Note over APP: App reads via alias — seamless swap
    APP->>OS: Query "kb-live" — now hits index-v6

    PIPE->>PIPE: Schedule cleanup: delete index-v5 in 24h

from opensearchpy import OpenSearch

class IndexSwapper:
    def __init__(self, os_client: OpenSearch):
        self.client = os_client
        self.alias = 'kb-live'

    def create_new_index(self, version: int) -> str:
        index_name = f"mangaassist-kb-v{version}"
        self.client.indices.create(
            index=index_name,
            body={
                'settings': {
                    'index.knn': True,
                    'index.knn.algo_param.ef_search': 512,
                    'number_of_shards': 2,
                    'number_of_replicas': 1,
                },
                'mappings': {
                    'properties': {
                        'embedding': {
                            'type': 'knn_vector',
                            'dimension': 1024,
                            'method': {
                                'name': 'hnsw',
                                'space_type': 'cosinesimil',
                                'engine': 'nmslib',
                                'parameters': {
                                    'ef_construction': 512,
                                    'm': 16,
                                },
                            },
                        },
                        'text': {'type': 'text', 'analyzer': 'standard'},
                        'metadata': {'type': 'object'},
                        'source_doc': {'type': 'keyword'},
                        'chunk_index': {'type': 'integer'},
                        'last_updated': {'type': 'date'},
                    },
                },
            },
        )
        return index_name

    def swap_alias(self, new_index: str):
        """Atomic alias swap — zero downtime."""
        # Find current index behind alias
        alias_info = self.client.indices.get_alias(name=self.alias)
        old_index = list(alias_info.keys())[0]

        # Atomic swap
        self.client.indices.update_aliases(body={
            'actions': [
                {'remove': {'index': old_index, 'alias': self.alias}},
                {'add': {'index': new_index, 'alias': self.alias}},
            ]
        })
        return old_index  # Caller schedules deletion
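Creating index-v6 implies the pipeline knows the current version number. One approach (an illustrative helper, not part of the class above) parses it from the index name currently behind the alias:

```python
import re

def next_version(current_index: str, prefix: str = 'mangaassist-kb-v') -> int:
    """Extract N from '<prefix>N' and return N + 1 for the index being built."""
    match = re.fullmatch(re.escape(prefix) + r'(\d+)', current_index)
    if match is None:
        raise ValueError(f'unexpected index name: {current_index}')
    return int(match.group(1)) + 1
```

The caller would feed it the old_index returned by swap_alias (or the alias lookup) before calling create_new_index.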

5. Validation — Golden Set Queries

GOLDEN_SET = [
    {
        'query': 'How do I return a manga book?',
        'expected_docs': ['return-policy-v3.md', 'faq-returns.md'],
        'min_recall_at_10': 1.0,  # Both must appear in top 10
    },
    {
        'query': 'One Piece latest volume price',
        'expected_docs': ['catalog/one-piece-vol-108.json'],
        'min_recall_at_10': 1.0,
    },
    {
        'query': 'shipping time to California',
        'expected_docs': ['shipping-policy.md', 'faq-shipping.md'],
        'min_recall_at_10': 0.5,  # At least 1 of 2
    },
    # ... 50+ golden queries covering all document types
]

def validate_index(os_client, index_name: str) -> dict:
    """Run golden set queries against new index before swap."""
    embedding_gen = EmbeddingGenerator()
    total_recall = 0
    passed = 0

    for test in GOLDEN_SET:
        query_embedding = embedding_gen.generate_titan_embedding(test['query'])

        response = os_client.search(
            index=index_name,
            body={
                'size': 10,
                'query': {
                    'knn': {
                        'embedding': {
                            'vector': query_embedding,
                            'k': 10,
                        },
                    },
                },
            },
        )

        retrieved_docs = [
            hit['_source']['source_doc']
            for hit in response['hits']['hits']
        ]

        hits = sum(1 for doc in test['expected_docs'] if doc in retrieved_docs)
        recall = hits / len(test['expected_docs'])
        total_recall += recall

        if recall >= test['min_recall_at_10']:
            passed += 1

    avg_recall = total_recall / len(GOLDEN_SET)
    pass_rate = passed / len(GOLDEN_SET)

    return {
        'avg_recall_at_10': avg_recall,
        'pass_rate': pass_rate,
        'passed': pass_rate >= 0.95,  # 95% of golden queries must pass
        'total_queries': len(GOLDEN_SET),
    }
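The recall arithmetic in validate_index can be checked in isolation; the third golden query above works as a worked example:

```python
def recall_at_k(expected: list[str], retrieved: list[str], k: int = 10) -> float:
    """Fraction of expected source docs appearing in the top-k retrieved docs."""
    top_k = set(retrieved[:k])
    return sum(1 for doc in expected if doc in top_k) / len(expected)

# One of the two expected shipping docs is retrieved, so recall is 0.5,
# which still meets that query's min_recall_at_10 of 0.5
shipping_recall = recall_at_k(
    ['shipping-policy.md', 'faq-shipping.md'],
    ['shipping-policy.md', 'catalog/one-piece-vol-108.json'],
)
```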

Critical Decisions

Decision 1: Index Update Strategy — In-Place Update vs Blue/Green Index Swap

| Criteria (Weight) | In-Place Update | Blue/Green Index Swap |
|---|---|---|
| Zero Downtime (25%) | 6/10 — reads may return partial results during update | 10/10 — alias swap is atomic |
| Data Consistency (25%) | 5/10 — mixed old/new during update window | 10/10 — new index is fully built before swap |
| Cost (15%) | 9/10 — single index | 6/10 — 2x storage during build |
| Rollback Speed (15%) | 3/10 — must rebuild old index | 10/10 — swap alias back instantly |
| Operational Complexity (10%) | 8/10 — simple upsert API | 6/10 — alias management, cleanup job |
| Build Time Impact (10%) | 8/10 — update only changed docs | 5/10 — full rebuild required |
| **Weighted Score** | **6.2/10** | **8.5/10** |

Decision: Blue/Green Index Swap

Rationale: The chatbot serves customer queries in real-time. An in-place update that returns stale or partial results during the update window is unacceptable — imagine a customer asking about a return policy and getting the old policy because the update hasn't propagated yet. Blue/green ensures every query hits either the fully old or fully new index, never a hybrid. The 2x storage cost (~$50/month for OpenSearch Serverless) is trivial compared to the consistency guarantee.
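The weighted scores are a dot product of criterion scores and weights, and can be recomputed directly from the rows of the matrix:

```python
WEIGHTS = {
    'zero_downtime': 0.25, 'data_consistency': 0.25, 'cost': 0.15,
    'rollback_speed': 0.15, 'ops_complexity': 0.10, 'build_time': 0.10,
}

SCORES = {
    'in_place':   {'zero_downtime': 6,  'data_consistency': 5,  'cost': 9,
                   'rollback_speed': 3,  'ops_complexity': 8, 'build_time': 8},
    'blue_green': {'zero_downtime': 10, 'data_consistency': 10, 'cost': 6,
                   'rollback_speed': 10, 'ops_complexity': 6, 'build_time': 5},
}

def weighted_score(option: str) -> float:
    """Sum of score * weight across all criteria, rounded to 2 decimals."""
    return round(sum(SCORES[option][c] * w for c, w in WEIGHTS.items()), 2)
```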


Decision 2: Re-Embedding Strategy — Batch Full Re-Index vs Incremental Updates

| Criteria | Batch Full Re-Index | Incremental Updates | Hybrid (Current) |
|---|---|---|---|
| Consistency | Perfect — all embeddings from same model version | Risk of mixed model versions | Perfect for full, acceptable for incremental |
| Speed | Slow (2 hours for 50K docs) | Fast (< 5 min per doc) | Best of both |
| Cost | ~$35 per full run (Titan API calls) | ~$0.01 per document | Full: $35/week, Incremental: ~$5/day |
| When to Use | Embedding model update, schema change | Single doc add/update/delete | Incremental for daily changes, full weekly |

Decision: Hybrid — Incremental daily + Full weekly + Full on model change

flowchart TD
    A[Change Detected] --> B{What changed?}
    B -->|"1-100 documents"| C[Incremental Update]
    B -->|"100+ documents"| D[Full Re-Index]
    B -->|"Embedding model updated"| D
    B -->|"Scheduled weekly"| D

    C --> E[Chunk changed docs only]
    E --> F[Generate embeddings for chunks]
    F --> G[Upsert into LIVE index]
    G --> H[Validate affected queries]

    D --> I[Chunk ALL documents]
    I --> J[Generate ALL embeddings]
    J --> K[Build new index]
    K --> L[Blue/Green swap]
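The routing in this flowchart reduces to a small pure function (the threshold mirrors the change detector's > 100 document cutoff):

```python
def select_mode(changed_docs: int, model_changed: bool = False,
                scheduled_full: bool = False) -> str:
    """Route a detected change to the incremental or full pipeline."""
    if model_changed or scheduled_full or changed_docs > 100:
        return 'full'
    return 'incremental'
```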

Decision 3: Embedding Model Versioning — Re-Embed Everything vs Maintain Compatibility

| Scenario | Strategy | Impact |
|---|---|---|
| Titan V2 minor update (same API, same dimensions) | Keep existing embeddings, re-embed on next weekly cycle | Minimal — minor quality variation within index |
| Titan V2 → V3 major update (different embedding space) | Mandatory full re-embed before production use | Expensive ($35) but required — mixed spaces break similarity search |
| Fine-tuned e5 retrained on new data | Mandatory full re-embed of e5-indexed docs only | Only affects support ticket chunks (~15K docs) |

Decision: Version tag all embeddings + mandatory full re-embed on major model changes

Each document chunk stores its embedding_model_version in metadata. The validation pipeline flags any mixed-version index as a warning. Major model changes (Titan V2→V3, e5 retraining) trigger a blocking full re-index before the new model is usable.
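A minimal sketch of the mixed-version check the validation pipeline performs; the chunk metadata shape is an assumption based on the versioning scheme described above:

```python
def check_embedding_versions(chunks: list[dict]) -> dict:
    """Flag an index whose chunks were embedded by more than one model version."""
    versions = {
        chunk['metadata'].get('embedding_model_version', 'unknown')
        for chunk in chunks
    }
    return {
        'versions': sorted(versions),
        'mixed': len(versions) > 1,  # mixed spaces: similarity scores not comparable
    }
```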


Tradeoffs

The Debate: Index Freshness vs Compute Cost

graph TD
    subgraph "Product Manager"
        PM1["Customers see outdated prices"]
        PM2["New products must be searchable in < 1 hour"]
        PM3["Real-time indexing is table stakes"]
    end

    subgraph "Architect"
        AR1["Real-time indexing is expensive"]
        AR2["Embedding API costs scale with frequency"]
        AR3["Batch is 10x cheaper than real-time"]
    end

    subgraph "Data Engineer"
        DE1["Incremental is fragile — edge cases everywhere"]
        DE2["Full re-index is simple and reliable"]
        DE3["Weekly full + daily incremental is the sweet spot"]
    end

    PM2 ---|"Tension"| AR3
    PM3 ---|"Tension"| DE1
    AR1 ---|"Agrees with"| DE3

Resolution

| Concern | Solution | Compromise |
|---|---|---|
| PM: New products searchable in < 1 hour | Incremental pipeline triggers on S3 event, processes in < 5 min | Only new/changed docs — doesn't improve existing retrieval quality |
| PM: Prices must be current | Price data stored as metadata (not embedded), updated independently | Embedding stays the same — only metadata refresh |
| Architect: Control embedding costs | Full re-index weekly ($35), incremental daily (~$5/day) = ~$70/week total | Not real-time — batch aggregates changes every 15 min for incremental |
| Data Engineer: Keep it simple | Blue/green for full re-index (reliable), upsert for incremental (fast) | Incremental path has more edge cases (deleted docs, moved docs) |

Key Insight: Embeddings don't change when prices change. The embedding captures the semantic meaning ("One Piece Volume 108, manga, adventure, pirates"). The price is metadata stored alongside the embedding, updateable without re-embedding. This insight saves ~90% of re-indexing costs — most catalog changes are price/stock updates, not content changes.
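That insight can be sketched as a field-level diff check that routes a change to a cheap metadata patch or a full re-embed; the field lists below are illustrative assumptions about the catalog schema:

```python
# Illustrative split of catalog fields: those that feed the embedded text
# vs. those stored only as retrievable metadata alongside the vector.
EMBEDDED_FIELDS = {'title', 'description', 'genre'}
METADATA_FIELDS = {'price', 'stock', 'last_updated'}

def update_strategy(changed_fields: set[str]) -> str:
    """Route a document change: re-embed only if embedded content changed."""
    if changed_fields & EMBEDDED_FIELDS:
        return 're_embed'       # semantic content changed, new embedding needed
    if changed_fields & METADATA_FIELDS:
        return 'metadata_only'  # in-place metadata patch, no embedding API cost
    return 'no_op'
```

Under this rule, the common price/stock updates take the metadata_only path, which is where the ~90% cost saving comes from.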