
Vector Store Operational Management Architecture

AWS AIP-C01 Task 4.3 → Skill 4.3.5: Create vector store operational management systems for FM augmentation
System: MangaAssist e-commerce chatbot — Bedrock Claude 3, OpenSearch Serverless (HNSW), DynamoDB, ECS Fargate, API Gateway WebSocket
Vector Store: Amazon OpenSearch Serverless with HNSW k-NN index (1536-dim Titan Embeddings)


Skill Mapping

| AWS AIP-C01 Element | Coverage |
|---|---|
| Task 4.3 | Monitor and optimize inference performance of FMs |
| Skill 4.3.5 | Create vector store operational management systems for FM augmentation |
| Key Focus | Performance monitoring, index optimization, data quality, capacity management for vector stores powering RAG pipelines |
| MangaAssist Context | OpenSearch Serverless stores ~2M product embeddings (manga, figures, merch) used for retrieval-augmented generation with Claude 3 |

Mind Map — Vector Store Operations

mindmap
  root((Vector Store Ops))
    Performance Monitoring
      Query Latency p50/p95/p99
      Throughput QPS
      Concurrent Connections
      Index Scan Efficiency
      Memory Pressure
      Cache Hit Rate
    Index Optimization
      HNSW Parameters
        ef_construction
        m (graph connectivity)
        ef_search
      Segment Compaction
      Auto-Reindex Triggers
      Benchmark-Driven Tuning
      Recall@K Tracking
    Data Quality
      Embedding Freshness
      Stale Content Detection
      Embedding Model Version Tracking
      Clustering Coherence
      Nearest-Neighbor Accuracy
      Drift Detection
    Capacity Management
      Storage Utilization
      OCU Scaling
      Cost per Query
      Growth Projection
      Dimension Optimization
    Operational Reliability
      Health Checks
      Failover Procedures
      Backup and Restore
      Replication Lag
      Disaster Recovery

Architecture — Vector Store Monitoring Framework

graph TB
    subgraph IngestionPipeline["Ingestion Pipeline"]
        style IngestionPipeline fill:#1a1a2e,stroke:#16213e,color:#fff
        DDB["DynamoDB<br/>Product Catalog"] -->|Change Streams| LAMBDA_ETL["Lambda ETL<br/>Extract & Transform"]
        LAMBDA_ETL -->|Text chunks| TITAN_EMB_I["Bedrock Titan<br/>Embeddings v2"]
        TITAN_EMB_I -->|1536-dim vectors| BULK_IDX["OpenSearch Serverless<br/>Bulk Indexing API"]
        BULK_IDX --> OSS_COLL["OpenSearch Serverless<br/>manga-products Collection"]
    end

    subgraph QueryPipeline["Query Pipeline"]
        style QueryPipeline fill:#0f3460,stroke:#16213e,color:#fff
        USER_Q["User Query<br/>via WebSocket"] --> ECS_ORCH["ECS Orchestrator<br/>Fargate Service"]
        ECS_ORCH --> TITAN_EMB_Q["Bedrock Titan<br/>Embedding"]
        TITAN_EMB_Q -->|Query vector| KNN_SEARCH["OpenSearch k-NN<br/>HNSW Search"]
        KNN_SEARCH --> RERANK["Results Reranking<br/>Score Normalization"]
        RERANK --> FM_CTX["Claude 3 Context<br/>Augmented Prompt"]
    end

    subgraph MonitoringLayer["Monitoring Layer"]
        style MonitoringLayer fill:#533483,stroke:#16213e,color:#fff
        CW_METRICS["CloudWatch Metrics<br/>Latency, Errors, OCU Usage"]
        CUSTOM_METRICS["Custom Metrics<br/>Embedding Freshness<br/>Quality Scores<br/>Recall@K"]
        HEALTH_CHK["Automated Health Checks<br/>Every 60 seconds"]
    end

    subgraph OptimizationLayer["Optimization Layer"]
        style OptimizationLayer fill:#e94560,stroke:#16213e,color:#fff
        IDX_OPT["Index Parameter<br/>Optimizer"]
        SEG_COMP["Segment Compaction<br/>Scheduler"]
        EMB_REFRESH["Embedding Refresh<br/>Pipeline"]
    end

    subgraph AlertingLayer["Alerting & Response"]
        style AlertingLayer fill:#0a1931,stroke:#16213e,color:#fff
        CW_ALARM["CloudWatch Alarms<br/>Threshold Breach"]
        SNS_TOPIC["SNS Topic<br/>Notifications"]
        AUTO_SCALE["Auto-Scale OCU<br/>Capacity Policy"]
        RUNBOOK["Runbook Automation<br/>Lambda Remediation"]
    end

    OSS_COLL --> CW_METRICS
    KNN_SEARCH --> CW_METRICS
    KNN_SEARCH --> CUSTOM_METRICS
    BULK_IDX --> CUSTOM_METRICS
    HEALTH_CHK --> CW_METRICS
    HEALTH_CHK --> CUSTOM_METRICS

    CW_METRICS --> IDX_OPT
    CUSTOM_METRICS --> IDX_OPT
    CUSTOM_METRICS --> SEG_COMP
    CUSTOM_METRICS --> EMB_REFRESH
    EMB_REFRESH --> LAMBDA_ETL

    CW_METRICS --> CW_ALARM
    CUSTOM_METRICS --> CW_ALARM
    CW_ALARM --> SNS_TOPIC
    CW_ALARM --> AUTO_SCALE
    CW_ALARM --> RUNBOOK

OpenSearch Serverless for Vector Search — Key Concepts

OCU (OpenSearch Compute Units)

OpenSearch Serverless abstracts infrastructure into OCUs — the fundamental unit of compute and memory:

| Aspect | Detail |
|---|---|
| What is an OCU | A compute unit providing vCPU, memory, and storage I/O capacity |
| Minimum allocation | 2 OCUs for indexing + 2 OCUs for search (always in pairs) |
| Scaling behavior | Auto-scales in OCU increments based on workload; no manual provisioning |
| Cost model | Billed per OCU-hour (~$0.24/OCU-hour); minimum cost = 4 OCUs × 24h |
| Monitoring | CloudWatch SearchOCU and IndexingOCU metrics track utilization |
| MangaAssist impact | Peak hours (8pm–12am JST) drive search OCU from 4→12; indexing steady at 2–4 |

Why OCU monitoring matters: Unlike provisioned OpenSearch where you manage instances, Serverless auto-scales but with cost implications. Unmonitored OCU spikes (e.g., poorly structured queries causing full scans) can 3–4x your daily cost without any visible errors.
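The cost mechanics above can be sketched as a small estimator. The $0.24/OCU-hour rate is illustrative (check current AWS pricing), and the peak-hour OCU profile mirrors the MangaAssist numbers in the table:

```python
def estimate_daily_ocu_cost(search_ocu_hours: float, indexing_ocu_hours: float,
                            rate_per_ocu_hour: float = 0.24) -> float:
    """Estimate daily OpenSearch Serverless cost from accumulated OCU-hours.

    rate_per_ocu_hour is illustrative (~$0.24); confirm against AWS pricing.
    """
    return round((search_ocu_hours + indexing_ocu_hours) * rate_per_ocu_hour, 2)


# Baseline floor: 2 search + 2 indexing OCUs running all 24 hours
baseline = estimate_daily_ocu_cost(2 * 24, 2 * 24)

# Illustrative peak day: search at 4 OCUs for 20h, scaling to 12 for the
# 4-hour evening peak; indexing averaging 3 OCUs all day
peak_day = estimate_daily_ocu_cost(4 * 20 + 12 * 4, 3 * 24)
```

An unmonitored spike that holds search at 12 OCUs all day roughly triples the daily bill, which is why the SearchOCU metric deserves its own alarm.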

HNSW Index Parameters

Hierarchical Navigable Small World (HNSW) is a graph-based approximate nearest neighbor algorithm:

  • ef_construction (build-time): Controls how many neighbors are evaluated when building the graph. Higher values → better recall but slower indexing. Set once at index creation; changing requires full re-index.
  • m (graph edges): Maximum number of connections per node in the graph. Higher values → better recall and search speed but significantly more memory. The sweet spot for 1536-dim vectors is typically 16–48.
  • ef_search (query-time): Controls the search beam width at query time. Can be adjusted per-query. Higher values → better recall but slower search. This is the primary runtime tuning knob.
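These parameters are declared in the index mapping at creation time. A minimal sketch of such a mapping, using field names from the OpenSearch k-NN plugin (the index name, engine choice, and exact settings support vary by OpenSearch Serverless version, so treat this as illustrative):

```python
# k-NN index mapping carrying the HNSW parameters discussed above.
# Would be sent to the collection endpoint, e.g. via opensearch-py:
#   client.indices.create(index="manga-products", body=index_body)
index_body = {
    "settings": {
        "index.knn": True,  # enable k-NN search on this index
    },
    "mappings": {
        "properties": {
            "embedding": {
                "type": "knn_vector",
                "dimension": 1536,           # Titan Embeddings v2 output size
                "method": {
                    "name": "hnsw",
                    "space_type": "cosinesimil",
                    "engine": "faiss",       # engine choice is illustrative
                    "parameters": {
                        "m": 24,             # graph connectivity (build-time)
                        "ef_construction": 256,  # build-time beam width
                    },
                },
            },
            "product_id": {"type": "keyword"},
        }
    },
}
```

Because m and ef_construction are baked into the graph, changing either means creating a new index and re-ingesting all vectors.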

Segment Merging

OpenSearch stores data in immutable segments. As documents are indexed, many small segments accumulate:

  • Problem: Each k-NN search must scan every segment; 50 small segments = 50× the search overhead vs 1 merged segment
  • Solution: Segment compaction (force merge) combines small segments into fewer large ones
  • Trade-off: Merging is I/O-intensive and temporarily increases OCU consumption
  • Schedule for MangaAssist: Run compaction during off-peak (4am–6am JST) when search OCU drops to minimum
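The compaction step maps onto the force-merge API. Note that OpenSearch Serverless manages segment merging internally and may not expose this endpoint; the sketch below reflects the _forcemerge call as it exists on provisioned OpenSearch domains, so treat it as illustrative of the operation being scheduled:

```python
def build_force_merge_request(index: str, max_segments: int = 5) -> dict:
    """Build the force-merge request spec for an index.

    On provisioned OpenSearch this maps to:
        POST /<index>/_forcemerge?max_num_segments=<n>
    Serverless handles merging internally, so this is an illustration of
    the compaction operation, not a guaranteed Serverless API.
    """
    return {
        "method": "POST",
        "path": f"/{index}/_forcemerge",
        "params": {"max_num_segments": max_segments},
    }


# Daily 4am JST job would issue something like:
req = build_force_merge_request("manga-products", max_segments=5)
```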

Index Size, Dimensions, and Memory Relationship

Memory per vector ≈ (dimensions × 4 bytes) + (m × 2 × 4 bytes) + overhead

For MangaAssist: (1536 × 4) + (24 × 2 × 4) + 64 ≈ 6,400 bytes/vector

With 2M vectors: 2,000,000 × 6,400 ≈ 12.8 GB in-memory index

This directly drives OCU requirements since HNSW indices must be memory-resident for fast search.
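The formula above can be expressed directly, reproducing the MangaAssist numbers (the 64-byte per-vector overhead is the rough figure used in the worked example):

```python
def hnsw_memory_bytes(num_vectors: int, dimensions: int, m: int,
                      overhead_per_vector: int = 64) -> int:
    """Approximate resident memory for an HNSW index, per the formula above:
    (dims * 4 bytes float32) + (m * 2 edges * 4 bytes) + per-vector overhead."""
    per_vector = dimensions * 4 + m * 2 * 4 + overhead_per_vector
    return num_vectors * per_vector


bytes_per_vector = hnsw_memory_bytes(1, 1536, 24)              # 6,400 bytes
total_gb = hnsw_memory_bytes(2_000_000, 1536, 24) / 1e9        # ~12.8 GB
```

Re-running this with candidate m values (e.g. 16 vs 48) is a quick sanity check before committing to a re-index.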


Vector Store Comparison Matrix

| Capability | OpenSearch Serverless | Pinecone | pgvector | ChromaDB | Qdrant |
|---|---|---|---|---|---|
| Built-in metrics | ✅ CloudWatch (latency, OCU, errors, 4xx/5xx) | ✅ Dashboard (latency, request count) | ❌ Manual via pg_stat | ❌ None native | ✅ Prometheus endpoint |
| Latency tracking | ✅ p50/p95/p99 via CloudWatch | ✅ p50/p95/p99 in console | ⚠️ Via pg_stat_statements | ❌ Application-level only | ✅ Built-in histograms |
| Auto-scaling | ✅ OCU auto-scale (2–200) | ✅ Serverless auto-scale | ❌ Manual (scale host) | ❌ Single-node | ⚠️ Manual cluster scaling |
| Index optimization | ⚠️ Manual HNSW tuning, force merge | ✅ Fully managed, automatic | ❌ Manual VACUUM, reindex | ⚠️ Basic compaction | ✅ Auto-optimization |
| Embedding drift detection | ❌ Custom implementation needed | ❌ Custom implementation needed | ❌ Custom implementation needed | ❌ Custom implementation needed | ❌ Custom implementation needed |
| Backup/restore | ✅ Snapshot to S3, PITR | ✅ Collections backup | ✅ pg_dump / pg_restore | ⚠️ File-based persist | ✅ Snapshot/restore API |
| Cost tracking | ✅ CloudWatch + Cost Explorer per OCU | ✅ Dashboard shows read/write units | ✅ Standard RDS/instance billing | ✅ Free (self-hosted) | ✅ Self-hosted or cloud billing |
| Alerting integration | ✅ CloudWatch Alarms → SNS → Lambda | ✅ Basic email alerts | ⚠️ Via RDS Events | ❌ None | ✅ Prometheus Alertmanager |
| Max dimensions | 16,000 | 20,000 | 16,000 | Unlimited | 65,536 |
| Approx. cost (2M vectors) | ~$700–1,200/mo | ~$70–200/mo (serverless) | ~$200–500/mo (RDS) | Free (compute cost) | ~$100–400/mo (cloud) |

MangaAssist choice rationale: OpenSearch Serverless selected for native AWS integration (IAM, CloudWatch, VPC), managed HNSW support, and serverless scaling. Trade-off: higher cost than Pinecone but lower operational overhead within AWS ecosystem and stronger CloudWatch integration for unified monitoring.


Three Operational Pillars Deep Dive

Pillar 1: Performance Monitoring

Metrics Hierarchy

| Metric | Target | Warning | Critical | Source |
|---|---|---|---|---|
| Query latency p50 | < 20ms | 20–50ms | > 50ms | CloudWatch custom |
| Query latency p95 | < 80ms | 80–150ms | > 150ms | CloudWatch custom |
| Query latency p99 | < 200ms | 200–500ms | > 500ms | CloudWatch custom |
| Throughput (QPS) | 100–500 | < 100 sustained | < 50 sustained | CloudWatch custom |
| Search OCU utilization | 40–70% | 70–85% | > 85% | CloudWatch SearchOCU |
| Indexing OCU utilization | 30–60% | 60–80% | > 80% | CloudWatch IndexingOCU |
| 4xx error rate | < 0.1% | 0.1–1% | > 1% | CloudWatch 4xx |
| 5xx error rate | 0% | > 0% | > 0.5% | CloudWatch 5xx |
| Cache hit rate | > 60% | 40–60% | < 40% | CloudWatch custom |
| Recall@10 | > 0.90 | 0.85–0.90 | < 0.85 | Nightly benchmark |
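Each row of this table translates into a CloudWatch alarm. A sketch of the p95 latency alarm as put_metric_alarm kwargs (the alarm name, SNS topic ARN, and account ID are hypothetical; the keys follow the boto3 CloudWatch API):

```python
# Alarm on the "critical" p95 band (> 150ms) from the table above.
# Applied with: boto3.client("cloudwatch").put_metric_alarm(**p95_latency_alarm)
p95_latency_alarm = {
    "AlarmName": "vectorstore-query-latency-p95-critical",   # hypothetical name
    "Namespace": "MangaAssist/VectorStore",
    "MetricName": "QueryLatency",
    "ExtendedStatistic": "p95",          # percentile statistics use this field
    "Dimensions": [{"Name": "Collection", "Value": "manga-products"}],
    "Period": 300,                       # 5-minute evaluation windows
    "EvaluationPeriods": 3,              # breach must persist 15 minutes
    "Threshold": 150.0,                  # ms; warning/critical boundary
    "ComparisonOperator": "GreaterThanThreshold",
    "TreatMissingData": "notBreaching",  # quiet hours should not alarm
    # hypothetical topic ARN:
    "AlarmActions": ["arn:aws:sns:ap-northeast-1:123456789012:vectorstore-alerts"],
}
```

Requiring three consecutive breached periods filters out single-query spikes while still paging within the incident-response SLO.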

Query Execution Path with Monitoring Instrumentation

flowchart LR
    A["User Query Arrives"] -->|T0: timestamp| B["Generate Query<br/>Embedding"]
    B -->|T1: embedding_latency_ms| C["Execute k-NN<br/>Search"]
    C -->|T2: knn_latency_ms| D["Score & Rerank<br/>Results"]
    D -->|T3: rerank_latency_ms| E["Return Top-K<br/>Results"]
    E -->|T4: total_latency_ms| F["Emit CloudWatch<br/>Metrics"]

    B -.->|Monitor| M1["Embedding API<br/>latency & errors"]
    C -.->|Monitor| M2["k-NN search latency<br/>segments scanned<br/>scores distribution"]
    D -.->|Monitor| M3["Rerank duration<br/>score delta<br/>results filtered"]
    F -.->|Monitor| M4["Total e2e latency<br/>cache hit/miss<br/>result count"]

    style A fill:#2d3436,stroke:#00b894,color:#fff
    style F fill:#2d3436,stroke:#00b894,color:#fff
    style M1 fill:#6c5ce7,stroke:#a29bfe,color:#fff
    style M2 fill:#6c5ce7,stroke:#a29bfe,color:#fff
    style M3 fill:#6c5ce7,stroke:#a29bfe,color:#fff
    style M4 fill:#6c5ce7,stroke:#a29bfe,color:#fff

Latency Decomposition for MangaAssist

Typical p50 query path breakdown (total ~45ms e2e):

| Stage | p50 Latency | % of Total | Optimization Lever |
|---|---|---|---|
| Embedding generation (Titan) | 15ms | 33% | Request batching, caching repeated queries |
| k-NN search (HNSW) | 12ms | 27% | ef_search tuning, segment compaction |
| Network (VPC internal) | 5ms | 11% | VPC endpoint, same-AZ placement |
| Score normalization + reranking | 8ms | 18% | Algorithm optimization, pre-filtering |
| Serialization + response | 5ms | 11% | Reduce returned fields, compression |
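Capturing this decomposition in the orchestrator can be as simple as a timing context manager around each stage, matching the T0–T4 instrumentation points (the stage names are illustrative; real calls replace the pass statements):

```python
import time
from contextlib import contextmanager

# Per-request stage timings in milliseconds, keyed by stage name
stage_timings = {}


@contextmanager
def timed_stage(name: str):
    """Record wall-clock duration (ms) of a pipeline stage."""
    start = time.perf_counter()
    try:
        yield
    finally:
        stage_timings[name] = (time.perf_counter() - start) * 1000.0


with timed_stage("embedding"):
    pass  # call Bedrock Titan embedding here
with timed_stage("knn_search"):
    pass  # execute the OpenSearch k-NN query here
with timed_stage("rerank"):
    pass  # score normalization and reranking here

total_ms = sum(stage_timings.values())  # feeds the total_latency_ms metric
```

Emitting each stage as its own CloudWatch metric is what makes the percentage breakdown in the table observable over time rather than a one-off profiling result.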

Pillar 2: Automated Index Optimization

HNSW Parameter Tuning Cycle

The continuous tuning loop follows: Benchmark → Measure → Adjust → Validate

  1. Benchmark: Run standardized query set (500 queries with known ground-truth nearest neighbors) against current index configuration
  2. Measure: Calculate recall@10, recall@50, p50/p95 latency, and memory consumption
  3. Adjust: If recall < target, increase m or ef_search; if latency > target, decrease ef_search or run compaction
  4. Validate: Re-run benchmark to confirm improvement; roll back if regression detected
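The Measure step hinges on recall@k against the ground-truth query set. A minimal sketch of that computation (document IDs and the toy query set are illustrative):

```python
from typing import List


def recall_at_k(retrieved: List[str], ground_truth: List[str], k: int = 10) -> float:
    """Fraction of true top-k nearest neighbors present in the retrieved top-k."""
    truth = set(ground_truth[:k])
    hits = sum(1 for doc_id in retrieved[:k] if doc_id in truth)
    return hits / len(truth) if truth else 0.0


# Toy benchmark: one query with known ground truth (retrieved, truth)
queries = {"q1": (["a", "b", "c"], ["a", "c", "d"])}
mean_recall = sum(recall_at_k(r, t, k=3) for r, t in queries.values()) / len(queries)
```

In the real nightly job, the 500-query set's mean recall@10 is published as the Recall@10 CloudWatch metric that the health evaluation consumes.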

Parameter Guidance

ef_construction (set at index creation):

  • Controls neighbor evaluation during graph construction
  • Higher value = denser, more accurate graph = better recall
  • Trade-off: indexing time increases linearly; index build for 2M vectors at ef_construction=256 takes ~45 min vs ~20 min at ef_construction=128
  • MangaAssist setting: 256 (balanced for nightly re-index feasibility)

m (set at index creation):

  • Maximum bi-directional connections per node
  • Higher value = each node connects to more neighbors = better recall AND faster search (more paths to target)
  • Trade-off: memory usage increases substantially; m=48 uses ~2x memory vs m=16
  • MangaAssist setting: 24 (good recall with acceptable memory at 2M vectors)

ef_search (set per query, adjustable at runtime):

  • Search-time beam width; controls how many candidates are evaluated
  • The primary runtime tuning knob — no re-index needed
  • Higher value = better recall but slower queries
  • MangaAssist setting: 256 (default), dynamically increased to 512 for low-confidence queries

Segment Compaction Strategy

| Trigger | Action | Schedule |
|---|---|---|
| Segment count > 20 | Force merge to 5 segments | Daily 4am JST |
| After bulk indexing (>10K docs) | Force merge affected shards | Post-index pipeline step |
| p95 latency increase > 30% | Emergency compaction | On-demand via alarm |
| Weekly maintenance | Full optimization | Sunday 3am JST |

Decision Tree: When to Re-index vs Tune vs Scale

flowchart TD
    A["Performance Degradation<br/>Detected"] --> B{"Recall@10<br/>below target?"}
    B -->|Yes| C{"Current ef_search<br/>< max configured?"}
    C -->|Yes| D["Increase ef_search<br/>(runtime, no downtime)"]
    C -->|No| E{"Segment count<br/>> 20?"}
    E -->|Yes| F["Run Segment<br/>Compaction"]
    E -->|No| G{"m parameter<br/>< 48?"}
    G -->|Yes| H["Schedule Full Re-index<br/>with higher m"]
    G -->|No| I["Scale up OCUs<br/>or optimize queries"]

    B -->|No| J{"Latency p95<br/>above target?"}
    J -->|Yes| K{"Search OCU<br/>> 80%?"}
    K -->|Yes| L["Increase max OCU<br/>capacity policy"]
    K -->|No| M{"Segment count<br/>> 15?"}
    M -->|Yes| F
    M -->|No| N["Decrease ef_search<br/>(trade recall for speed)"]

    J -->|No| O["No action needed<br/>Continue monitoring"]

    style A fill:#e74c3c,stroke:#c0392b,color:#fff
    style D fill:#27ae60,stroke:#2ecc71,color:#fff
    style F fill:#f39c12,stroke:#e67e22,color:#fff
    style H fill:#e74c3c,stroke:#c0392b,color:#fff
    style I fill:#8e44ad,stroke:#9b59b6,color:#fff
    style L fill:#8e44ad,stroke:#9b59b6,color:#fff
    style N fill:#f39c12,stroke:#e67e22,color:#fff
    style O fill:#27ae60,stroke:#2ecc71,color:#fff

Pillar 3: Data Quality Validation

Embedding Freshness Tracking

Every document in the vector store carries metadata:

{
  "document_id": "prod-manga-12345",
  "source_updated_at": "2026-03-29T10:15:00Z",
  "embedding_created_at": "2026-03-29T11:00:00Z",
  "embedding_model_version": "amazon.titan-embed-text-v2:0",
  "source_hash": "sha256:abc123...",
  "staleness_hours": 0.75
}

Freshness policy for MangaAssist:

  • Product descriptions: re-embed within 24 hours of source update
  • Pricing data: re-embed within 1 hour (stored as metadata, not embedded directly)
  • Customer reviews: re-embed within 48 hours
  • Category/tag changes: re-embed within 6 hours (affects retrieval relevance)
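The policy can live as a lookup table the re-embedding scheduler consults (the content-type keys are illustrative names, not fields from the catalog schema):

```python
# Freshness SLAs per content type, in hours (names are illustrative)
REFRESH_SLA_HOURS = {
    "product_description": 24,
    "pricing": 1,            # pricing is stored as metadata, not embedded
    "customer_review": 48,
    "category_tags": 6,
}


def needs_refresh(content_type: str, staleness_hours: float) -> bool:
    """True when a document's embedding has exceeded its freshness SLA.

    Unknown content types fall back to the 24h default SLA.
    """
    return staleness_hours > REFRESH_SLA_HOURS.get(content_type, 24)
```

Keeping the SLAs in one table means a policy change (say, tightening reviews to 24h) is a config edit rather than a code change across the continuous and nightly tracks.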

Stale Content Detection Pipeline

Detection runs on two tracks:

  1. Continuous: DynamoDB Streams trigger Lambda on product updates → compares source_updated_at with embedding_created_at in OpenSearch metadata
  2. Nightly batch: Full scan of all documents, computing staleness distribution and flagging outliers

Embedding Quality Scoring

| Metric | Description | Target | How Measured |
|---|---|---|---|
| Silhouette score | How well embeddings cluster by product category | > 0.4 | Nightly sample of 10K vectors, sklearn silhouette_score |
| Inter-cluster distance | Separation between product category centroids | > 0.3 cosine | Nightly compute of centroids per category |
| Intra-cluster variance | Tightness within a product category | < 0.15 cosine | Standard deviation of distances to centroid |
| Known-pair accuracy | % of known-similar products in each other's top-10 | > 85% | Test set of 500 curated pairs |
| Embedding drift | Cosine distance between old and new embeddings for unchanged content | < 0.05 | Compare after model version update |
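The drift check in the last row reduces to a cosine-distance comparison on re-embedded, unchanged content. A dependency-free sketch (the 0.05 threshold is the target from the table):

```python
import math
from typing import List


def cosine_distance(a: List[float], b: List[float]) -> float:
    """1 - cosine similarity; 0 means identical direction, 2 means opposite."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def drift_exceeded(old_vec: List[float], new_vec: List[float],
                   threshold: float = 0.05) -> bool:
    """Flag drift when re-embedding unchanged content moves its vector
    further than the cosine-distance target."""
    return cosine_distance(old_vec, new_vec) > threshold
```

Running this over a fixed sample of unchanged documents after any embedding-model update gives an early signal that old and new vectors no longer share a usable space.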

Embedding Model Version Tracking

When upgrading the embedding model (e.g., Titan v1 → v2), all embeddings must use the same model version. Mixed-version indices produce meaningless similarity scores because different models map to different vector spaces.

MangaAssist version migration protocol:

  1. Create new collection with new model version embeddings
  2. Run parallel queries against old and new collection for 48 hours
  3. Compare recall and relevance metrics
  4. If new collection meets targets, switch traffic atomically
  5. Retain old collection for 7 days as rollback target
  6. Delete old collection after validation period
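One way to make the traffic switch in step 4 atomic is to have the ECS orchestrator resolve the active collection endpoint from a single SSM parameter, so cutover (and rollback) is one parameter update. A sketch that builds the kwargs for boto3's ssm put_parameter (the parameter name is hypothetical):

```python
def build_cutover_update(new_endpoint: str,
                         param_name: str = "/mangaassist/vector/active-endpoint") -> dict:
    """Kwargs for an atomic collection cutover via SSM Parameter Store.

    Apply with: boto3.client("ssm").put_parameter(**build_cutover_update(...))
    Rollback is the same call with the old collection's endpoint.
    """
    return {
        "Name": param_name,          # hypothetical parameter name
        "Value": new_endpoint,
        "Type": "String",
        "Overwrite": True,           # replace the currently active endpoint
    }
```

Because every orchestrator task reads the same parameter, old and new collections never serve mixed-model traffic at the same time.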

Data Quality Validation Pipeline

flowchart TD
    subgraph NightlyBatch["Nightly Batch Validation (2am JST)"]
        style NightlyBatch fill:#1a1a2e,stroke:#e94560,color:#fff
        NB1["Scan all documents<br/>in collection"] --> NB2["Compute freshness<br/>for each document"]
        NB2 --> NB3["Calculate clustering<br/>metrics (sample 10K)"]
        NB3 --> NB4["Run known-pair<br/>accuracy test"]
        NB4 --> NB5["Check model version<br/>consistency"]
        NB5 --> NB6{"All metrics<br/>within targets?"}
        NB6 -->|Yes| NB7["Publish healthy<br/>status to CloudWatch"]
        NB6 -->|No| NB8["Identify degraded<br/>metrics"]
        NB8 --> NB9["Trigger targeted<br/>re-embedding for<br/>stale documents"]
        NB9 --> NB10["Publish degraded<br/>status + alert"]
    end

    subgraph ContinuousCheck["Continuous Spot-Check"]
        style ContinuousCheck fill:#0f3460,stroke:#e94560,color:#fff
        CC1["DynamoDB Stream<br/>detects product update"] --> CC2["Compare source_updated_at<br/>vs embedding_created_at"]
        CC2 --> CC3{"Staleness ><br/>threshold?"}
        CC3 -->|Yes| CC4["Add to re-embedding<br/>priority queue"]
        CC3 -->|No| CC5["Log freshness metric<br/>continue"]
        CC4 --> CC6["Lambda processes<br/>queue every 5 min"]
        CC6 --> CC7["Re-embed document<br/>via Titan v2"]
        CC7 --> CC8["Update OpenSearch<br/>document in-place"]
    end

    NB10 -.->|Feed into| CC4

HLD: Vector Store Operations Data Model

from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum


class IndexHealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    REINDEXING = "reindexing"
    STALE = "stale"
    CORRUPTED = "corrupted"


class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass
class VectorQueryMetrics:
    """Metrics captured for each vector search query."""
    query_id: str
    collection_name: str
    timestamp: datetime
    latency_ms: float
    results_count: int
    top_score: float
    min_score: float
    query_vector_norm: float
    index_size: int
    ef_search_used: int
    segments_scanned: int = 0
    cache_hit: bool = False


@dataclass
class IndexHealthReport:
    """Periodic health assessment of a vector index."""
    collection_name: str
    timestamp: datetime
    status: IndexHealthStatus
    total_vectors: int
    dimension: int
    index_size_bytes: int
    segment_count: int
    hnsw_m: int
    hnsw_ef_construction: int
    avg_query_latency_ms: float
    p95_query_latency_ms: float
    p99_query_latency_ms: float
    recall_at_10: float
    stale_vector_percentage: float
    embedding_model_version: str
    search_ocu_utilization: float = 0.0
    indexing_ocu_utilization: float = 0.0
    last_reindex_time: Optional[datetime] = None
    last_compaction_time: Optional[datetime] = None
    recommendations: List[str] = field(default_factory=list)


@dataclass
class EmbeddingFreshnessRecord:
    """Tracks freshness of a single document's embedding vs source data."""
    document_id: str
    collection_name: str
    source_updated_at: datetime
    embedding_created_at: datetime
    embedding_model_version: str
    source_hash: str = ""
    staleness_hours: float = 0.0
    needs_refresh: bool = False


@dataclass
class HNSWConfig:
    """Tracks HNSW configuration for a collection with tuning history."""
    collection_name: str
    ef_construction: int
    m: int
    ef_search_default: int
    dimension: int
    distance_metric: str = "cosine"
    last_tuned: Optional[datetime] = None
    tuning_history: List[Dict] = field(default_factory=list)


@dataclass
class CompactionEvent:
    """Record of a segment compaction operation."""
    collection_name: str
    started_at: datetime
    completed_at: Optional[datetime] = None
    segments_before: int = 0
    segments_after: int = 0
    size_before_bytes: int = 0
    size_after_bytes: int = 0
    trigger_reason: str = "scheduled"
    success: bool = False

LLD: Vector Store Operations Monitor

import boto3
import json
from datetime import datetime, timedelta
from typing import Dict, List


class VectorStoreOpsMonitor:
    """
    Monitors OpenSearch Serverless vector store health and performance.

    Publishes custom CloudWatch metrics for query latency, embedding freshness,
    index health, and triggers automated remediation via alarms.
    """

    def __init__(self, config: dict):
        self.cloudwatch = boto3.client('cloudwatch')
        self.opensearch = boto3.client('opensearchserverless')
        self.sns = boto3.client('sns')
        self.namespace = config.get("namespace", "MangaAssist/VectorStore")
        self.collection_name = config["collection_name"]
        self.collection_endpoint = config["collection_endpoint"]
        self.stale_threshold_hours = config.get("stale_threshold_hours", 24)
        self.alert_topic_arn = config.get("alert_topic_arn")
        self._latency_window: List[float] = []
        self._quality_scores: List[float] = []
        self._max_window = 1000

    # ------------------------------------------------------------------
    # Query Performance Monitoring
    # ------------------------------------------------------------------

    def record_query(self, query_id: str, latency_ms: float, results_count: int,
                     top_score: float, min_score: float = 0.0,
                     ef_search_used: int = 256, cache_hit: bool = False):
        """Record a vector query execution and publish CloudWatch metrics."""
        self._latency_window.append(latency_ms)
        if len(self._latency_window) > self._max_window:
            self._latency_window.pop(0)

        dimensions = [{"Name": "Collection", "Value": self.collection_name}]
        metrics = [
            {
                "MetricName": "QueryLatency",
                "Value": latency_ms,
                "Unit": "Milliseconds",
                "Dimensions": dimensions,
                "Timestamp": datetime.utcnow(),
            },
            {
                "MetricName": "QueryCount",
                "Value": 1,
                "Unit": "Count",
                "Dimensions": dimensions,
            },
            {
                "MetricName": "ResultsCount",
                "Value": results_count,
                "Unit": "Count",
                "Dimensions": dimensions,
            },
            {
                "MetricName": "TopScore",
                "Value": top_score,
                "Unit": "None",
                "Dimensions": dimensions,
            },
            {
                "MetricName": "ScoreSpread",
                "Value": max(top_score - min_score, 0.0),  # spread is valid even when min_score is 0
                "Unit": "None",
                "Dimensions": dimensions,
            },
            {
                "MetricName": "CacheHit",
                "Value": 1 if cache_hit else 0,
                "Unit": "Count",
                "Dimensions": dimensions,
            },
        ]
        self.cloudwatch.put_metric_data(Namespace=self.namespace, MetricData=metrics)

        # Anomaly detection: spike relative to recent p95
        if len(self._latency_window) >= 50:
            p95 = sorted(self._latency_window)[int(len(self._latency_window) * 0.95)]
            if latency_ms > p95 * 2:
                self._emit_alert(
                    "query_latency_spike",
                    f"Query {query_id} latency {latency_ms:.0f}ms > 2x p95 ({p95:.0f}ms)",
                    severity="WARNING",
                )

    def get_latency_percentiles(self) -> Dict[str, float]:
        """Calculate current latency percentiles from the sliding window."""
        if not self._latency_window:
            return {"p50": 0, "p95": 0, "p99": 0, "count": 0}
        sorted_latencies = sorted(self._latency_window)
        n = len(sorted_latencies)
        return {
            "p50": sorted_latencies[int(n * 0.50)],
            "p95": sorted_latencies[int(n * 0.95)],
            "p99": sorted_latencies[int(n * 0.99)],
            "count": n,
        }

    # ------------------------------------------------------------------
    # Embedding Freshness Monitoring
    # ------------------------------------------------------------------

    def check_embedding_freshness(self, documents: List[dict]) -> dict:
        """
        Check freshness of embeddings against source data timestamps.

        Each document dict must contain:
          - document_id: str
          - source_updated_at: datetime
          - embedding_created_at: datetime
        """
        stale_count = 0
        total = len(documents)
        stale_docs = []

        for doc in documents:
            source_updated = doc.get("source_updated_at")
            embedding_created = doc.get("embedding_created_at")
            if source_updated and embedding_created:
                staleness = (source_updated - embedding_created).total_seconds() / 3600
                if staleness > self.stale_threshold_hours:
                    stale_count += 1
                    stale_docs.append({
                        "document_id": doc["document_id"],
                        "staleness_hours": round(staleness, 1),
                    })

        stale_pct = (stale_count / total * 100) if total > 0 else 0

        self.cloudwatch.put_metric_data(
            Namespace=self.namespace,
            MetricData=[
                {
                    "MetricName": "StaleEmbeddingPercentage",
                    "Value": stale_pct,
                    "Unit": "Percent",
                    "Dimensions": [
                        {"Name": "Collection", "Value": self.collection_name}
                    ],
                },
                {
                    "MetricName": "StaleEmbeddingCount",
                    "Value": stale_count,
                    "Unit": "Count",
                    "Dimensions": [
                        {"Name": "Collection", "Value": self.collection_name}
                    ],
                },
            ],
        )

        if stale_pct > 10:
            self._emit_alert(
                "stale_embeddings",
                f"{stale_pct:.1f}% embeddings stale (>{self.stale_threshold_hours}h) — "
                f"{stale_count}/{total} documents need re-embedding",
                severity="WARNING" if stale_pct <= 25 else "CRITICAL",
            )

        return {
            "total": total,
            "stale_count": stale_count,
            "stale_pct": round(stale_pct, 2),
            "stale_docs": stale_docs[:50],  # Cap to avoid oversized responses
        }

    # ------------------------------------------------------------------
    # Index Health Evaluation
    # ------------------------------------------------------------------

    def evaluate_index_health(self, recall_at_10: float, avg_latency_ms: float,
                               p95_latency_ms: float, segment_count: int,
                               stale_pct: float, ocu_utilization: float = 0.0) -> dict:
        """Evaluate overall index health and generate prioritized recommendations."""
        recommendations = []
        status = "healthy"

        # Latency checks
        if avg_latency_ms > 100:
            recommendations.append(
                f"HIGH: Avg latency {avg_latency_ms:.0f}ms exceeds 100ms target — "
                "consider increasing OCU capacity or running segment compaction"
            )
            status = "degraded"
        if p95_latency_ms > 200:
            recommendations.append(
                f"HIGH: p95 latency {p95_latency_ms:.0f}ms exceeds 200ms target — "
                "check segment count and ef_search setting"
            )
            status = "degraded"

        # Recall check
        if recall_at_10 < 0.85:
            recommendations.append(
                f"HIGH: Recall@10 {recall_at_10:.2f} below 0.85 target — "
                "increase m parameter (requires re-index) or increase ef_search"
            )
            status = "degraded"

        # Segment count
        if segment_count > 20:
            recommendations.append(
                f"MEDIUM: {segment_count} segments exceeds 20 threshold — "
                "schedule segment compaction to reduce search overhead"
            )

        # Freshness
        if stale_pct > 10:
            recommendations.append(
                f"MEDIUM: {stale_pct:.1f}% embeddings stale — "
                "trigger targeted re-embedding pipeline"
            )
            if status == "healthy":
                status = "stale"

        # OCU utilization
        if ocu_utilization > 85:
            recommendations.append(
                f"HIGH: OCU utilization at {ocu_utilization:.0f}% — "
                "increase max OCU capacity policy to prevent throttling"
            )

        # Critical thresholds
        if recall_at_10 < 0.7 or avg_latency_ms > 500:
            status = "unhealthy"
            recommendations.insert(0,
                "CRITICAL: Consider full re-index with updated HNSW parameters — "
                "current performance severely degraded"
            )

        # Publish health status metric
        status_value = {"healthy": 1, "degraded": 2, "stale": 3, "unhealthy": 4}
        self.cloudwatch.put_metric_data(
            Namespace=self.namespace,
            MetricData=[{
                "MetricName": "IndexHealthStatus",
                "Value": status_value.get(status, 0),
                "Unit": "None",
                "Dimensions": [
                    {"Name": "Collection", "Value": self.collection_name}
                ],
            }],
        )

        return {"status": status, "recommendations": recommendations}

    # ------------------------------------------------------------------
    # OCU Monitoring
    # ------------------------------------------------------------------

    def get_ocu_metrics(self, period_minutes: int = 60) -> dict:
        """Retrieve current OCU utilization from CloudWatch."""
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(minutes=period_minutes)

        search_ocu = self.cloudwatch.get_metric_statistics(
            Namespace="AWS/AOSS",
            MetricName="SearchOCU",
            StartTime=start_time,
            EndTime=end_time,
            Period=300,
            Statistics=["Average", "Maximum"],
            Dimensions=[
                {"Name": "CollectionName", "Value": self.collection_name}
            ],
        )
        indexing_ocu = self.cloudwatch.get_metric_statistics(
            Namespace="AWS/AOSS",
            MetricName="IndexingOCU",
            StartTime=start_time,
            EndTime=end_time,
            Period=300,
            Statistics=["Average", "Maximum"],
            Dimensions=[
                {"Name": "CollectionName", "Value": self.collection_name}
            ],
        )
        return {
            "search_ocu": search_ocu.get("Datapoints", []),
            "indexing_ocu": indexing_ocu.get("Datapoints", []),
        }

    # ------------------------------------------------------------------
    # Alerting
    # ------------------------------------------------------------------

    def _emit_alert(self, alert_type: str, message: str, severity: str = "WARNING"):
        """Emit alert metric and optionally publish to SNS."""
        self.cloudwatch.put_metric_data(
            Namespace=self.namespace,
            MetricData=[{
                "MetricName": "Alert",
                "Value": 1,
                "Unit": "Count",
                "Dimensions": [
                    {"Name": "Collection", "Value": self.collection_name},
                    {"Name": "AlertType", "Value": alert_type},
                    {"Name": "Severity", "Value": severity},
                ],
            }],
        )

        if self.alert_topic_arn:
            self.sns.publish(
                TopicArn=self.alert_topic_arn,
                Subject=f"[{severity}] MangaAssist Vector Store: {alert_type}",
                Message=json.dumps({
                    "alert_type": alert_type,
                    "severity": severity,
                    "collection": self.collection_name,
                    "message": message,
                    "timestamp": datetime.utcnow().isoformat(),
                }),
            )
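
The datapoint lists returned by `get_ocu_metrics` can feed a quick headroom check during capacity reviews. A minimal sketch, assuming a self-imposed `max_ocu` budget ceiling (a hypothetical setting, not an AWS-provided value):

```python
def summarize_ocu(datapoints: list, max_ocu: float) -> dict:
    """Summarize SearchOCU/IndexingOCU datapoints and compute headroom.

    datapoints: the "Datapoints" list from get_metric_statistics
    max_ocu: a capacity ceiling you budget for (hypothetical setting)
    """
    if not datapoints:
        return {"average": 0.0, "peak": 0.0, "headroom_pct": 100.0}
    average = sum(dp["Average"] for dp in datapoints) / len(datapoints)
    peak = max(dp["Maximum"] for dp in datapoints)
    headroom_pct = max(0.0, (1.0 - peak / max_ocu) * 100.0)
    return {"average": average, "peak": peak, "headroom_pct": headroom_pct}

# Synthetic datapoints shaped like the CloudWatch response above
points = [{"Average": 4.2, "Maximum": 6.0}, {"Average": 5.1, "Maximum": 8.0}]
summary = summarize_ocu(points, max_ocu=10.0)
```

A headroom percentage trending toward zero is the signal to raise the OCU budget (or investigate query efficiency) before throttling appears.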

HNSW Parameter Impact Matrix

| Parameter | Range | Impact on Recall | Impact on Latency | Impact on Memory | Impact on Build Time | MangaAssist Default |
|---|---|---|---|---|---|---|
| `ef_construction` | 128–512 | 128→256: +5–8% recall | None (build-time only) | Minimal (build temp memory) | 128: ~20 min; 256: ~45 min; 512: ~90 min (2M vectors) | 256 |
| `m` | 16–64 | 16→24: +3–5% recall; 24→48: +2–3% | 16→24: −10% latency; 24→48: −5% | 16: 1× baseline; 24: 1.4×; 48: 2.5× | Linear increase with `m` | 24 |
| `ef_search` | 100–500 | 100→256: +4–6% recall; 256→512: +1–2% | 100: ~8 ms; 256: ~12 ms; 512: ~22 ms | None (query-time only) | None (query-time only) | 256 (dynamic: 512 for low-confidence queries) |
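
A sketch of a k-NN index mapping that applies the production defaults above. The index field names are illustrative, and the `nmslib` engine / `cosinesimil` space combination is an assumption; check which engines and spaces your OpenSearch version or Serverless collection supports:

```python
# Illustrative k-NN index body applying the HNSW defaults above
# (m=24, ef_construction=256, ef_search=256). Field names, the engine,
# and the space_type are assumptions for this sketch.
index_body = {
    "settings": {
        "index": {
            "knn": True,
            "knn.algo_param.ef_search": 256,  # query-time recall/latency knob
        }
    },
    "mappings": {
        "properties": {
            "product_embedding": {
                "type": "knn_vector",
                "dimension": 1536,  # Titan Embeddings output size
                "method": {
                    "name": "hnsw",
                    "space_type": "cosinesimil",
                    "engine": "nmslib",
                    "parameters": {"m": 24, "ef_construction": 256},
                },
            },
            # Fields the freshness and version checks rely on
            "source_updated_at": {"type": "date"},
            "embedding_created_at": {"type": "date"},
            "embedding_model_version": {"type": "keyword"},
        }
    },
}
# With opensearch-py, this body would be passed to
# client.indices.create(index=..., body=index_body)
```

Keeping `ef_search` in index settings (rather than per-query) makes the dynamic 512 override an explicit settings update, which is easier to audit.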

MangaAssist Tuning Benchmarks (2M vectors, 1536-dim, cosine)

| Configuration | Recall@10 | p50 Latency | p95 Latency | Memory | Notes |
|---|---|---|---|---|---|
| m=16, ef_c=128, ef_s=100 | 0.82 | 6 ms | 14 ms | 10.2 GB | ❌ Recall too low |
| m=24, ef_c=256, ef_s=256 | 0.92 | 12 ms | 28 ms | 12.8 GB | ✅ Production config |
| m=48, ef_c=512, ef_s=256 | 0.95 | 10 ms | 24 ms | 22.1 GB | ⚠️ Memory too high for cost |
| m=24, ef_c=256, ef_s=512 | 0.94 | 20 ms | 42 ms | 12.8 GB | ⚠️ Latency increase for +2% recall |
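
Recall@10 figures like those above come from comparing ANN results against exact (brute-force) nearest neighbors for a ground-truth query set. A minimal sketch of the computation (data shapes are assumptions, not the benchmark harness itself):

```python
def recall_at_k(ann_results: dict, exact_results: dict, k: int = 10) -> float:
    """Mean fraction of exact top-k neighbors the ANN index also returned.

    ann_results / exact_results: query_id -> ordered list of document IDs.
    """
    total = 0.0
    for query_id, exact_ids in exact_results.items():
        truth = set(exact_ids[:k])
        found = set(ann_results.get(query_id, [])[:k])
        total += len(truth & found) / len(truth)
    return total / len(exact_results)

# Two toy queries: one perfect hit, one with 1 of 2 true neighbors found
exact = {"q1": ["a", "b"], "q2": ["c", "d"]}
ann = {"q1": ["a", "b"], "q2": ["c", "x"]}
score = recall_at_k(ann, exact, k=2)  # (1.0 + 0.5) / 2 = 0.75
```

The exact-neighbor lists are the expensive part: they must be recomputed by brute force whenever the corpus or embedding model changes, which is why the nightly benchmark uses a fixed 500-query test set.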

Key Design Decisions

| # | Decision | Choice | Rationale | Trade-off |
|---|---|---|---|---|
| 1 | HNSW defaults | m=24, ef_construction=256, ef_search=256 | Best recall/latency/memory balance at 2M scale | Higher m (48) would improve recall 2–3% but nearly double memory cost |
| 2 | Freshness threshold | 24 hours for product descriptions | Balances re-embedding cost vs content freshness; product descriptions change ~2–3×/week | Prices use metadata (not embedded), so their staleness is less critical for core vector quality |
| 3 | Reindex trigger | Recall@10 < 0.85 OR p95 > 200 ms sustained 15 min | Prevents silent quality degradation; p95 catches tail latency before user impact | Full re-index takes ~45 min; during re-index, queries hit the existing index (stale but functional) |
| 4 | Monitoring granularity | Per-query latency metrics; 60-second health checks | Per-query gives precise percentile tracking; 60 s health checks balance visibility vs CloudWatch cost | ~500 QPS × custom metrics ≈ $15/day CloudWatch cost; acceptable for production |
| 5 | Compaction schedule | Daily 4am JST + post-bulk-index | 4am is lowest traffic; post-bulk compaction prevents segment accumulation after catalog updates | Compaction temporarily spikes OCU usage; the schedule avoids user-facing impact |
| 6 | Quality benchmark frequency | Nightly (recall@K, clustering) + continuous (freshness) | Nightly full benchmark catches drift; continuous freshness checks keep stale content from accumulating | Nightly benchmark requires a 500-query test set maintained by the ML team |
| 7 | Embedding model version policy | All vectors must use the same model version; blue-green migration | Mixed versions produce invalid similarity scores; blue-green avoids downtime | Migration re-embeds all 2M documents (~$40 Titan cost, ~3 hr pipeline); justified only for model upgrades |
| 8 | Cache strategy | LRU cache for top-1000 repeated query embeddings | 30% of queries are repeated (popular products); a cache hit avoids a Titan API call (~15 ms saved) | Cache invalidation on product update; 1000 entries ≈ 6 MB memory on ECS |
| 9 | Alert routing | CloudWatch Alarm → SNS → PagerDuty (critical) / Slack (warning) | Severity-based routing ensures critical issues get immediate attention | Requires maintaining SNS topic subscriptions and the PagerDuty integration |
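
Decision 8's embedding cache can be as small as an `OrderedDict` keyed by normalized query text. A sketch under the assumptions in the table (1000-entry cap, a Titan-call stand-in passed as `embed_fn`):

```python
from collections import OrderedDict

class EmbeddingCache:
    """LRU cache for query embeddings; repeated queries skip the embed call."""

    def __init__(self, embed_fn, max_entries: int = 1000):
        self._embed = embed_fn      # e.g. a Titan Embeddings API wrapper
        self._max = max_entries
        self._cache = OrderedDict() # key -> embedding vector, LRU-ordered

    def get(self, query: str) -> list:
        key = query.strip().lower()       # normalize so trivial variants hit
        if key in self._cache:
            self._cache.move_to_end(key)  # mark as most recently used
            return self._cache[key]
        vector = self._embed(query)
        self._cache[key] = vector
        if len(self._cache) > self._max:
            self._cache.popitem(last=False)  # evict least recently used
        return vector

    def invalidate(self, query: str) -> None:
        """Drop a cached entry, e.g. after a product update."""
        self._cache.pop(query.strip().lower(), None)

calls = []
def fake_embed(text):  # stand-in for the real embedding call
    calls.append(text)
    return [0.0, 1.0]

cache = EmbeddingCache(fake_embed, max_entries=2)
cache.get("One Piece vol. 1")
cache.get("one piece vol. 1")  # normalized duplicate: no second embed call
```

`invalidate` is what ties the cache to the product-update pipeline; without it, a cached embedding for an updated product would keep serving retrieval results against the old description.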

Cross-References

| Topic | Reference | Relevance |
|---|---|---|
| Embedding drift troubleshooting | 04-retrieval-system-troubleshooting.md | Detailed debugging when this monitoring system detects embedding drift |
| RAG pipeline cost optimization | US-06-rag-pipeline-cost-optimization.md | Cost management for the re-embedding and indexing pipelines |
| Performance monitoring | Skills 4.3.1 through 4.3.4 | Broader FM inference monitoring that feeds into vector store performance correlation |
| DynamoDB source monitoring | DynamoDB folder | Source data change detection that triggers the embedding refresh pipeline |
| Security guardrails | Security-Privacy-Guardrails | Access control policies for OpenSearch Serverless collections |
| Model evaluation | Evaluation-Systems-GenAI | Recall@K and retrieval quality metrics feed into end-to-end evaluation |

Key Takeaways

  1. Embedding freshness is a first-class operational metric — stale embeddings cause silent quality degradation that users notice (irrelevant recommendations) but standard infrastructure monitoring misses entirely. Track source_updated_at vs embedding_created_at for every document.

  2. HNSW tuning is continuous, not set-and-forget — as data volume grows and query patterns shift, the optimal ef_search/m balance changes. Nightly benchmarks with a ground-truth query set catch drift before users feel it.

  3. The recall vs latency tradeoff is the core operational tension — higher ef_search improves recall but increases latency; the right balance depends on your SLA. For MangaAssist, we accept p95 < 80ms for recall@10 > 0.90.

  4. Segment count is a hidden performance killer — OpenSearch accumulates small segments during indexing; without scheduled compaction, query latency creeps up silently as k-NN search must scan each segment independently.

  5. Staleness is the most dangerous failure mode because it fails plausibly — unlike a service outage (which triggers alarms), stale embeddings return plausible but wrong results: the chatbot still responds confidently with outdated product information.

  6. Mixed embedding model versions invalidate similarity scores — treat model version consistency as a hard constraint, not a nice-to-have. A single document embedded with v1 in a v2 index corrupts neighbor relationships for all nearby vectors.

  7. OCU cost is the primary operational expense — OpenSearch Serverless auto-scales OCUs based on load, and unoptimized queries (full scans, excessive ef_search) translate directly into a higher OpenSearch Serverless bill. Monitor OCU per query as a cost-efficiency metric.
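
The OCU-per-query metric from takeaway 7 can be derived from the SearchOCU series and a query count over the same window. A sketch with an assumed OCU-hour price (verify current OpenSearch Serverless pricing for your region):

```python
OCU_HOUR_PRICE_USD = 0.24  # assumed rate for this sketch; check your region

def cost_per_query(avg_search_ocu: float, hours: float, query_count: int) -> float:
    """Approximate search cost per query over a window.

    avg_search_ocu: mean SearchOCU over the window (CloudWatch Average)
    hours: window length in hours
    query_count: total queries served in the same window
    """
    if query_count == 0:
        return 0.0
    total_cost_usd = avg_search_ocu * hours * OCU_HOUR_PRICE_USD
    return total_cost_usd / query_count

# e.g. 4 search OCUs sustained for 24 h serving 2M queries
usd_per_query = cost_per_query(avg_search_ocu=4.0, hours=24.0,
                               query_count=2_000_000)
```

Tracked daily, a rising cost per query at flat traffic is an early signal of inefficient queries or segment accumulation, before the monthly bill makes it obvious.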