Vector Store Operational Management Architecture
AWS AIP-C01 Task 4.3 → Skill 4.3.5: Create vector store operational management systems for FM augmentation
System: MangaAssist e-commerce chatbot — Bedrock Claude 3, OpenSearch Serverless (HNSW), DynamoDB, ECS Fargate, API Gateway WebSocket
Vector Store: Amazon OpenSearch Serverless with HNSW k-NN index (1536-dim Titan Embeddings)
Skill Mapping
| AWS AIP-C01 Element | Coverage |
|---|---|
| Task 4.3 | Monitor and optimize inference performance of FMs |
| Skill 4.3.5 | Create vector store operational management systems for FM augmentation |
| Key Focus | Performance monitoring, index optimization, data quality, capacity management for vector stores powering RAG pipelines |
| MangaAssist Context | OpenSearch Serverless stores ~2M product embeddings (manga, figures, merch) used for retrieval-augmented generation with Claude 3 |
Mind Map — Vector Store Operations
mindmap
root((Vector Store Ops))
Performance Monitoring
Query Latency p50/p95/p99
Throughput QPS
Concurrent Connections
Index Scan Efficiency
Memory Pressure
Cache Hit Rate
Index Optimization
HNSW Parameters
ef_construction
m (graph connectivity)
ef_search
Segment Compaction
Auto-Reindex Triggers
Benchmark-Driven Tuning
Recall@K Tracking
Data Quality
Embedding Freshness
Stale Content Detection
Embedding Model Version Tracking
Clustering Coherence
Nearest-Neighbor Accuracy
Drift Detection
Capacity Management
Storage Utilization
OCU Scaling
Cost per Query
Growth Projection
Dimension Optimization
Operational Reliability
Health Checks
Failover Procedures
Backup and Restore
Replication Lag
Disaster Recovery
Architecture — Vector Store Monitoring Framework
graph TB
subgraph IngestionPipeline["Ingestion Pipeline"]
style IngestionPipeline fill:#1a1a2e,stroke:#16213e,color:#fff
DDB["DynamoDB<br/>Product Catalog"] -->|Change Streams| LAMBDA_ETL["Lambda ETL<br/>Extract & Transform"]
LAMBDA_ETL -->|Text chunks| TITAN_EMB_I["Bedrock Titan<br/>Embeddings v2"]
TITAN_EMB_I -->|1536-dim vectors| BULK_IDX["OpenSearch Serverless<br/>Bulk Indexing API"]
BULK_IDX --> OSS_COLL["OpenSearch Serverless<br/>manga-products Collection"]
end
subgraph QueryPipeline["Query Pipeline"]
style QueryPipeline fill:#0f3460,stroke:#16213e,color:#fff
USER_Q["User Query<br/>via WebSocket"] --> ECS_ORCH["ECS Orchestrator<br/>Fargate Service"]
ECS_ORCH --> TITAN_EMB_Q["Bedrock Titan<br/>Embedding"]
TITAN_EMB_Q -->|Query vector| KNN_SEARCH["OpenSearch k-NN<br/>HNSW Search"]
KNN_SEARCH --> RERANK["Results Reranking<br/>Score Normalization"]
RERANK --> FM_CTX["Claude 3 Context<br/>Augmented Prompt"]
end
subgraph MonitoringLayer["Monitoring Layer"]
style MonitoringLayer fill:#533483,stroke:#16213e,color:#fff
CW_METRICS["CloudWatch Metrics<br/>Latency, Errors, OCU Usage"]
CUSTOM_METRICS["Custom Metrics<br/>Embedding Freshness<br/>Quality Scores<br/>Recall@K"]
HEALTH_CHK["Automated Health Checks<br/>Every 60 seconds"]
end
subgraph OptimizationLayer["Optimization Layer"]
style OptimizationLayer fill:#e94560,stroke:#16213e,color:#fff
IDX_OPT["Index Parameter<br/>Optimizer"]
SEG_COMP["Segment Compaction<br/>Scheduler"]
EMB_REFRESH["Embedding Refresh<br/>Pipeline"]
end
subgraph AlertingLayer["Alerting & Response"]
style AlertingLayer fill:#0a1931,stroke:#16213e,color:#fff
CW_ALARM["CloudWatch Alarms<br/>Threshold Breach"]
SNS_TOPIC["SNS Topic<br/>Notifications"]
AUTO_SCALE["Auto-Scale OCU<br/>Capacity Policy"]
RUNBOOK["Runbook Automation<br/>Lambda Remediation"]
end
OSS_COLL --> CW_METRICS
KNN_SEARCH --> CW_METRICS
KNN_SEARCH --> CUSTOM_METRICS
BULK_IDX --> CUSTOM_METRICS
HEALTH_CHK --> CW_METRICS
HEALTH_CHK --> CUSTOM_METRICS
CW_METRICS --> IDX_OPT
CUSTOM_METRICS --> IDX_OPT
CUSTOM_METRICS --> SEG_COMP
CUSTOM_METRICS --> EMB_REFRESH
EMB_REFRESH --> LAMBDA_ETL
CW_METRICS --> CW_ALARM
CUSTOM_METRICS --> CW_ALARM
CW_ALARM --> SNS_TOPIC
CW_ALARM --> AUTO_SCALE
CW_ALARM --> RUNBOOK
OpenSearch Serverless for Vector Search — Key Concepts
OCU (OpenSearch Compute Units)
OpenSearch Serverless abstracts infrastructure into OCUs — the fundamental unit of compute and memory:
| Aspect | Detail |
|---|---|
| What is an OCU | A compute unit providing vCPU, memory, and storage I/O capacity |
| Minimum allocation | 2 OCUs for indexing + 2 OCUs for search (always in pairs) |
| Scaling behavior | Auto-scales in OCU increments based on workload; no manual provisioning |
| Cost model | Billed per OCU-hour (~$0.24/OCU-hour); minimum cost = 4 OCUs × 24h |
| Monitoring | CloudWatch SearchOCU and IndexingOCU metrics track utilization |
| MangaAssist impact | Peak hours (8pm–12am JST) drive search OCU from 4→12; indexing steady at 2–4 |
Why OCU monitoring matters: Unlike provisioned OpenSearch where you manage instances, Serverless auto-scales but with cost implications. Unmonitored OCU spikes (e.g., poorly structured queries causing full scans) can 3–4x your daily cost without any visible errors.
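Using the ~$0.24/OCU-hour figure from the table above, a quick sketch shows how an unnoticed OCU spike compounds into daily cost (the rate is illustrative; check current AWS pricing for your region):

```python
def daily_ocu_cost(search_ocus: float, indexing_ocus: float,
                   rate_per_ocu_hour: float = 0.24) -> float:
    """Estimate daily OpenSearch Serverless cost from average OCU counts.

    rate_per_ocu_hour uses the illustrative ~$0.24 figure from the table
    above, not an authoritative price.
    """
    return (search_ocus + indexing_ocus) * 24 * rate_per_ocu_hour

# Baseline floor: 2 search + 2 indexing OCUs = 4 OCUs * 24h * $0.24 = $23.04/day
baseline = daily_ocu_cost(2, 2)
# An unmonitored spike to 12 search OCUs more than triples the daily bill
spiked = daily_ocu_cost(12, 2)
```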
HNSW Index Parameters
Hierarchical Navigable Small World (HNSW) is a graph-based approximate nearest neighbor algorithm:
- ef_construction (build-time): Controls how many neighbors are evaluated when building the graph. Higher values → better recall but slower indexing. Set once at index creation; changing requires full re-index.
- m (graph edges): Maximum number of connections per node in the graph. Higher values → better recall and search speed but significantly more memory. The sweet spot for 1536-dim vectors is typically 16–48.
- ef_search (query-time): Controls the search beam width at query time. Can be adjusted per-query. Higher values → better recall but slower search. This is the primary runtime tuning knob.
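These three parameters map directly onto the index body OpenSearch expects for a `knn_vector` field. A minimal sketch, assuming the `embedding` field name and the `nmslib` engine (both illustrative choices; Serverless collections accept the same mapping shape):

```python
def hnsw_index_body(dimension: int = 1536, m: int = 24,
                    ef_construction: int = 256) -> dict:
    """Build an OpenSearch index body with an HNSW knn_vector field.

    m and ef_construction are fixed at index creation (changing them
    requires a full re-index); ef_search is supplied at query time.
    The 'embedding' field name and 'nmslib' engine are assumptions.
    """
    return {
        "settings": {"index": {"knn": True}},
        "mappings": {
            "properties": {
                "embedding": {
                    "type": "knn_vector",
                    "dimension": dimension,
                    "method": {
                        "name": "hnsw",
                        "engine": "nmslib",
                        "space_type": "cosinesimil",
                        "parameters": {"m": m, "ef_construction": ef_construction},
                    },
                }
            }
        },
    }
```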
Segment Merging
OpenSearch stores data in immutable segments. As documents are indexed, many small segments accumulate:
- Problem: Each k-NN search must scan every segment; 50 small segments = 50× the search overhead vs 1 merged segment
- Solution: Segment compaction (force merge) combines small segments into fewer large ones
- Trade-off: Merging is I/O-intensive and temporarily increases OCU consumption
- Schedule for MangaAssist: Run compaction during off-peak (4am–6am JST) when search OCU drops to minimum
Index Size, Dimensions, and Memory Relationship
Memory per vector ≈ (dimensions × 4 bytes) + (m × 2 × 4 bytes) + overhead
For MangaAssist: (1536 × 4) + (24 × 2 × 4) + 64 ≈ 6,400 bytes/vector
With 2M vectors: 2,000,000 × 6,400 ≈ 12.8 GB in-memory index
This directly drives OCU requirements since HNSW indices must be memory-resident for fast search.
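The rule of thumb above is easy to turn into a capacity-planning helper (the 64-byte per-vector overhead is the illustrative figure from the text, not a measured constant):

```python
def hnsw_memory_bytes(num_vectors: int, dimensions: int, m: int,
                      per_vector_overhead: int = 64) -> int:
    """Estimate resident memory for an HNSW index using the rule of thumb
    above: (dims * 4 bytes) + (m * 2 * 4 bytes) + fixed per-vector overhead.
    """
    per_vector = dimensions * 4 + m * 2 * 4 + per_vector_overhead
    return num_vectors * per_vector

# MangaAssist: 2M vectors, 1536 dims, m=24 -> 6,400 bytes/vector -> 12.8 GB
total = hnsw_memory_bytes(2_000_000, 1536, 24)
```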
Vector Store Comparison Matrix
| Capability | OpenSearch Serverless | Pinecone | pgvector | ChromaDB | Qdrant |
|---|---|---|---|---|---|
| Built-in metrics | ✅ CloudWatch (latency, OCU, errors, 4xx/5xx) | ✅ Dashboard (latency, request count) | ❌ Manual via pg_stat | ❌ None native | ✅ Prometheus endpoint |
| Latency tracking | ✅ p50/p95/p99 via CloudWatch | ✅ p50/p95/p99 in console | ⚠️ Via pg_stat_statements | ❌ Application-level only | ✅ Built-in histograms |
| Auto-scaling | ✅ OCU auto-scale (2–200) | ✅ Serverless auto-scale | ❌ Manual (scale host) | ❌ Single-node | ⚠️ Manual cluster scaling |
| Index optimization | ⚠️ Manual HNSW tuning, force merge | ✅ Fully managed, automatic | ❌ Manual VACUUM, reindex | ⚠️ Basic compaction | ✅ Auto-optimization |
| Embedding drift detection | ❌ Custom implementation needed | ❌ Custom implementation needed | ❌ Custom implementation needed | ❌ Custom implementation needed | ❌ Custom implementation needed |
| Backup/restore | ✅ Snapshot to S3, PITR | ✅ Collections backup | ✅ pg_dump / pg_restore | ⚠️ File-based persist | ✅ Snapshot/restore API |
| Cost tracking | ✅ CloudWatch + Cost Explorer per OCU | ✅ Dashboard shows read/write units | ✅ Standard RDS/instance billing | ✅ Free (self-hosted) | ✅ Self-hosted or cloud billing |
| Alerting integration | ✅ CloudWatch Alarms → SNS → Lambda | ✅ Basic email alerts | ⚠️ Via RDS Events | ❌ None | ✅ Prometheus Alertmanager |
| Max dimensions | 16,000 | 20,000 | 16,000 | Unlimited | 65,536 |
| Approx. cost (2M vectors) | ~$700–1,200/mo | ~$70–200/mo (serverless) | ~$200–500/mo (RDS) | Free (compute cost) | ~$100–400/mo (cloud) |
MangaAssist choice rationale: OpenSearch Serverless selected for native AWS integration (IAM, CloudWatch, VPC), managed HNSW support, and serverless scaling. Trade-off: higher cost than Pinecone but lower operational overhead within AWS ecosystem and stronger CloudWatch integration for unified monitoring.
Three Operational Pillars Deep Dive
Pillar 1: Performance Monitoring
Metrics Hierarchy
| Metric | Target | Warning | Critical | Source |
|---|---|---|---|---|
| Query latency p50 | < 20ms | 20–50ms | > 50ms | CloudWatch custom |
| Query latency p95 | < 80ms | 80–150ms | > 150ms | CloudWatch custom |
| Query latency p99 | < 200ms | 200–500ms | > 500ms | CloudWatch custom |
| Throughput (QPS) | 100–500 | < 100 sustained | < 50 sustained | CloudWatch custom |
| Search OCU utilization | 40–70% | 70–85% | > 85% | CloudWatch SearchOCU |
| Indexing OCU utilization | 30–60% | 60–80% | > 80% | CloudWatch IndexingOCU |
| 4xx error rate | < 0.1% | 0.1–1% | > 1% | CloudWatch 4xx |
| 5xx error rate | 0% | > 0% | > 0.5% | CloudWatch 5xx |
| Cache hit rate | > 60% | 40–60% | < 40% | CloudWatch custom |
| Recall@10 | > 0.90 | 0.85–0.90 | < 0.85 | Nightly benchmark |
Query Execution Path with Monitoring Instrumentation
flowchart LR
A["User Query Arrives"] -->|T0: timestamp| B["Generate Query<br/>Embedding"]
B -->|T1: embedding_latency_ms| C["Execute k-NN<br/>Search"]
C -->|T2: knn_latency_ms| D["Score & Rerank<br/>Results"]
D -->|T3: rerank_latency_ms| E["Return Top-K<br/>Results"]
E -->|T4: total_latency_ms| F["Emit CloudWatch<br/>Metrics"]
B -.->|Monitor| M1["Embedding API<br/>latency & errors"]
C -.->|Monitor| M2["k-NN search latency<br/>segments scanned<br/>scores distribution"]
D -.->|Monitor| M3["Rerank duration<br/>score delta<br/>results filtered"]
F -.->|Monitor| M4["Total e2e latency<br/>cache hit/miss<br/>result count"]
style A fill:#2d3436,stroke:#00b894,color:#fff
style F fill:#2d3436,stroke:#00b894,color:#fff
style M1 fill:#6c5ce7,stroke:#a29bfe,color:#fff
style M2 fill:#6c5ce7,stroke:#a29bfe,color:#fff
style M3 fill:#6c5ce7,stroke:#a29bfe,color:#fff
style M4 fill:#6c5ce7,stroke:#a29bfe,color:#fff
Latency Decomposition for MangaAssist
Typical p50 query path breakdown (total ~45ms e2e):
| Stage | p50 Latency | % of Total | Optimization Lever |
|---|---|---|---|
| Embedding generation (Titan) | 15ms | 33% | Request batching, caching repeated queries |
| k-NN search (HNSW) | 12ms | 27% | ef_search tuning, segment compaction |
| Network (VPC internal) | 5ms | 11% | VPC endpoint, same-AZ placement |
| Score normalization + reranking | 8ms | 18% | Algorithm optimization, pre-filtering |
| Serialization + response | 5ms | 11% | Reduce returned fields, compression |
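The T0–T4 markers in the flowchart above can be captured with a small per-stage timer; the stage names below mirror the diagram labels but are otherwise illustrative:

```python
import time
from contextlib import contextmanager

class StageTimer:
    """Collect per-stage latencies along the query path (T0..T4 above)."""

    def __init__(self):
        self.stages: dict = {}

    @contextmanager
    def stage(self, name: str):
        start = time.perf_counter()
        try:
            yield
        finally:
            # record elapsed milliseconds for this stage
            self.stages[name] = (time.perf_counter() - start) * 1000

    def total_ms(self) -> float:
        return sum(self.stages.values())

timer = StageTimer()
with timer.stage("embedding_latency_ms"):
    pass  # call Bedrock Titan embedding here
with timer.stage("knn_latency_ms"):
    pass  # execute the k-NN search here
# timer.stages now holds per-stage ms, ready to emit as CloudWatch metrics
```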
Pillar 2: Automated Index Optimization
HNSW Parameter Tuning Cycle
The continuous tuning loop follows: Benchmark → Measure → Adjust → Validate
- Benchmark: Run standardized query set (500 queries with known ground-truth nearest neighbors) against current index configuration
- Measure: Calculate recall@10, recall@50, p50/p95 latency, and memory consumption
- Adjust: If recall < target, increase m or ef_search; if latency > target, decrease ef_search or run compaction
- Validate: Re-run benchmark to confirm improvement; roll back if regression detected
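The Measure step of this loop reduces to a recall@K computation over the ground-truth query set. A minimal sketch (function names are illustrative):

```python
def recall_at_k(retrieved_ids: list, ground_truth_ids: set, k: int = 10) -> float:
    """Fraction of known nearest neighbors present in the top-k results."""
    if not ground_truth_ids:
        return 0.0
    hits = len(set(retrieved_ids[:k]) & ground_truth_ids)
    return hits / min(k, len(ground_truth_ids))

def benchmark_recall(results: dict, truth: dict, k: int = 10) -> float:
    """Average recall@k over a benchmark query set.

    results maps query_id -> ranked doc ids from the index under test;
    truth maps query_id -> set of known nearest neighbors (the 500-query
    ground-truth set described above).
    """
    scores = [recall_at_k(results[q], truth[q], k) for q in truth]
    return sum(scores) / len(scores)
```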
Parameter Guidance
ef_construction (set at index creation):
- Controls neighbor evaluation during graph construction
- Higher value = denser, more accurate graph = better recall
- Trade-off: indexing time increases linearly; index build for 2M vectors at ef_construction=256 takes ~45 min vs ~20 min at ef_construction=128
- MangaAssist setting: 256 (balanced for nightly re-index feasibility)

m (set at index creation):
- Maximum bi-directional connections per node
- Higher value = each node connects to more neighbors = better recall AND faster search (more paths to target)
- Trade-off: memory usage increases substantially; m=48 uses ~2x the memory of m=16
- MangaAssist setting: 24 (good recall with acceptable memory at 2M vectors)

ef_search (set per query, adjustable at runtime):
- Search-time beam width; controls how many candidates are evaluated
- The primary runtime tuning knob — no re-index needed
- Higher value = better recall but slower queries
- MangaAssist setting: 256 (default), dynamically increased to 512 for low-confidence queries
Segment Compaction Strategy
| Trigger | Action | Schedule |
|---|---|---|
| Segment count > 20 | Force merge to 5 segments | Daily 4am JST |
| After bulk indexing (>10K docs) | Force merge affected shards | Post-index pipeline step |
| p95 latency increase > 30% | Emergency compaction | On-demand via alarm |
| Weekly maintenance | Full optimization | Sunday 3am JST |
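The trigger column of this table can be encoded as a simple precedence check, evaluated by the scheduler before each run (thresholds mirror the table; the return labels are illustrative):

```python
def compaction_trigger(segment_count: int, bulk_docs_indexed: int,
                       p95_latency_ms: float, baseline_p95_ms: float):
    """Return which compaction trigger from the schedule table fires, if any.

    Emergency latency-driven compaction takes precedence, then post-bulk
    merges, then the routine segment-count threshold.
    """
    if baseline_p95_ms > 0 and p95_latency_ms > baseline_p95_ms * 1.30:
        return "emergency"            # p95 latency increase > 30%
    if bulk_docs_indexed > 10_000:
        return "post_bulk_index"      # force merge affected shards
    if segment_count > 20:
        return "daily_force_merge"    # merge down to 5 segments
    return None
```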
Decision Tree: When to Re-index vs Tune vs Scale
flowchart TD
A["Performance Degradation<br/>Detected"] --> B{"Recall@10<br/>below target?"}
B -->|Yes| C{"Current ef_search<br/>< max configured?"}
C -->|Yes| D["Increase ef_search<br/>(runtime, no downtime)"]
C -->|No| E{"Segment count<br/>> 20?"}
E -->|Yes| F["Run Segment<br/>Compaction"]
E -->|No| G{"m parameter<br/>< 48?"}
G -->|Yes| H["Schedule Full Re-index<br/>with higher m"]
G -->|No| I["Scale up OCUs<br/>or optimize queries"]
B -->|No| J{"Latency p95<br/>above target?"}
J -->|Yes| K{"Search OCU<br/>> 80%?"}
K -->|Yes| L["Increase max OCU<br/>capacity policy"]
K -->|No| M{"Segment count<br/>> 15?"}
M -->|Yes| F
M -->|No| N["Decrease ef_search<br/>(trade recall for speed)"]
J -->|No| O["No action needed<br/>Continue monitoring"]
style A fill:#e74c3c,stroke:#c0392b,color:#fff
style D fill:#27ae60,stroke:#2ecc71,color:#fff
style F fill:#f39c12,stroke:#e67e22,color:#fff
style H fill:#e74c3c,stroke:#c0392b,color:#fff
style I fill:#8e44ad,stroke:#9b59b6,color:#fff
style L fill:#8e44ad,stroke:#9b59b6,color:#fff
style N fill:#f39c12,stroke:#e67e22,color:#fff
style O fill:#27ae60,stroke:#2ecc71,color:#fff
Pillar 3: Data Quality Validation
Embedding Freshness Tracking
Every document in the vector store carries metadata:
{
"document_id": "prod-manga-12345",
"source_updated_at": "2026-03-29T10:15:00Z",
"embedding_created_at": "2026-03-29T11:00:00Z",
"embedding_model_version": "amazon.titan-embed-text-v2:0",
"source_hash": "sha256:abc123...",
"staleness_hours": 0.75
}
Freshness policy for MangaAssist:
- Product descriptions: re-embed within 24 hours of source update
- Pricing data: re-embed within 1 hour (stored as metadata, not embedded directly)
- Customer reviews: re-embed within 48 hours
- Category/tag changes: re-embed within 6 hours (affects retrieval relevance)
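This per-content-type policy can be expressed as a small SLA lookup used by the refresh pipeline (the key names are illustrative):

```python
# Re-embedding SLA per content type (hours), per the freshness policy above
FRESHNESS_SLA_HOURS = {
    "product_description": 24,
    "pricing": 1,            # stored as metadata, not embedded directly
    "customer_review": 48,
    "category_tag": 6,
}

def needs_reembed(content_type: str, staleness_hours: float) -> bool:
    """True once a document's embedding exceeds its freshness SLA.

    Unknown content types fall back to the strictest (1h) threshold so
    they are never silently left stale.
    """
    sla = FRESHNESS_SLA_HOURS.get(content_type, 1)
    return staleness_hours > sla
```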
Stale Content Detection Pipeline
Detection runs on two tracks:
1. Continuous: DynamoDB Streams trigger Lambda on product updates → compares source_updated_at with embedding_created_at in OpenSearch metadata
2. Nightly batch: Full scan of all documents, computing staleness distribution and flagging outliers
Embedding Quality Scoring
| Metric | Description | Target | How Measured |
|---|---|---|---|
| Silhouette score | How well embeddings cluster by product category | > 0.4 | Nightly sample of 10K vectors, sklearn silhouette_score |
| Inter-cluster distance | Separation between product category centroids | > 0.3 cosine | Nightly compute centroids per category |
| Intra-cluster variance | Tightness within a product category | < 0.15 cosine | Standard deviation of distances to centroid |
| Known-pair accuracy | % of known-similar products in each other's top-10 | > 85% | Test set of 500 curated pairs |
| Embedding drift | Cosine distance between old and new embeddings for unchanged content | < 0.05 | Compare after model version update |
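The embedding-drift row in this table reduces to a cosine-distance comparison between the old and new vectors for unchanged content. A dependency-free sketch:

```python
import math

def cosine_distance(a, b) -> float:
    """1 - cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (norm_a * norm_b)

def drift_exceeded(old_vec, new_vec, threshold: float = 0.05) -> bool:
    """Flag drift when old/new embeddings of *unchanged* content differ
    by more than the 0.05 cosine-distance target from the table above."""
    return cosine_distance(old_vec, new_vec) > threshold
```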
Embedding Model Version Tracking
When upgrading the embedding model (e.g., Titan v1 → v2), all embeddings must use the same model version. Mixed-version indices produce meaningless similarity scores because different models map to different vector spaces.
MangaAssist version migration protocol:
1. Create new collection with new model version embeddings
2. Run parallel queries against old and new collections for 48 hours
3. Compare recall and relevance metrics
4. If the new collection meets targets, switch traffic atomically
5. Retain old collection for 7 days as rollback target
6. Delete old collection after validation period
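Step 4 of the protocol is a go/no-go gate that can be automated; a sketch, assuming illustrative metric keys and thresholds (the 10% p95 regression allowance is an assumption, not from the source):

```python
def approve_cutover(new_metrics: dict, old_metrics: dict,
                    min_recall: float = 0.90,
                    max_p95_regression_pct: float = 10.0) -> bool:
    """Gate the traffic switch: the new collection must meet the recall
    target and must not regress p95 latency beyond the allowed percentage
    relative to the old collection."""
    if new_metrics["recall_at_10"] < min_recall:
        return False
    p95_old, p95_new = old_metrics["p95_ms"], new_metrics["p95_ms"]
    return p95_new <= p95_old * (1 + max_p95_regression_pct / 100)
```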
Data Quality Validation Pipeline
flowchart TD
subgraph NightlyBatch["Nightly Batch Validation (2am JST)"]
style NightlyBatch fill:#1a1a2e,stroke:#e94560,color:#fff
NB1["Scan all documents<br/>in collection"] --> NB2["Compute freshness<br/>for each document"]
NB2 --> NB3["Calculate clustering<br/>metrics (sample 10K)"]
NB3 --> NB4["Run known-pair<br/>accuracy test"]
NB4 --> NB5["Check model version<br/>consistency"]
NB5 --> NB6{"All metrics<br/>within targets?"}
NB6 -->|Yes| NB7["Publish healthy<br/>status to CloudWatch"]
NB6 -->|No| NB8["Identify degraded<br/>metrics"]
NB8 --> NB9["Trigger targeted<br/>re-embedding for<br/>stale documents"]
NB9 --> NB10["Publish degraded<br/>status + alert"]
end
subgraph ContinuousCheck["Continuous Spot-Check"]
style ContinuousCheck fill:#0f3460,stroke:#e94560,color:#fff
CC1["DynamoDB Stream<br/>detects product update"] --> CC2["Compare source_updated_at<br/>vs embedding_created_at"]
CC2 --> CC3{"Staleness ><br/>threshold?"}
CC3 -->|Yes| CC4["Add to re-embedding<br/>priority queue"]
CC3 -->|No| CC5["Log freshness metric<br/>continue"]
CC4 --> CC6["Lambda processes<br/>queue every 5 min"]
CC6 --> CC7["Re-embed document<br/>via Titan v2"]
CC7 --> CC8["Update OpenSearch<br/>document in-place"]
end
NB10 -.->|Feed into| CC4
HLD: Vector Store Operations Data Model
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum
class IndexHealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    REINDEXING = "reindexing"
    STALE = "stale"
    UNHEALTHY = "unhealthy"
    CORRUPTED = "corrupted"
class AlertSeverity(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
@dataclass
class VectorQueryMetrics:
"""Metrics captured for each vector search query."""
query_id: str
collection_name: str
timestamp: datetime
latency_ms: float
results_count: int
top_score: float
min_score: float
query_vector_norm: float
index_size: int
ef_search_used: int
segments_scanned: int = 0
cache_hit: bool = False
@dataclass
class IndexHealthReport:
"""Periodic health assessment of a vector index."""
collection_name: str
timestamp: datetime
status: IndexHealthStatus
total_vectors: int
dimension: int
index_size_bytes: int
segment_count: int
hnsw_m: int
hnsw_ef_construction: int
avg_query_latency_ms: float
p95_query_latency_ms: float
p99_query_latency_ms: float
recall_at_10: float
stale_vector_percentage: float
embedding_model_version: str
search_ocu_utilization: float = 0.0
indexing_ocu_utilization: float = 0.0
last_reindex_time: Optional[datetime] = None
last_compaction_time: Optional[datetime] = None
recommendations: List[str] = field(default_factory=list)
@dataclass
class EmbeddingFreshnessRecord:
"""Tracks freshness of a single document's embedding vs source data."""
document_id: str
collection_name: str
source_updated_at: datetime
embedding_created_at: datetime
embedding_model_version: str
source_hash: str = ""
staleness_hours: float = 0.0
needs_refresh: bool = False
@dataclass
class HNSWConfig:
"""Tracks HNSW configuration for a collection with tuning history."""
collection_name: str
ef_construction: int
m: int
ef_search_default: int
dimension: int
distance_metric: str = "cosine"
last_tuned: Optional[datetime] = None
tuning_history: List[Dict] = field(default_factory=list)
@dataclass
class CompactionEvent:
"""Record of a segment compaction operation."""
collection_name: str
started_at: datetime
completed_at: Optional[datetime] = None
segments_before: int = 0
segments_after: int = 0
size_before_bytes: int = 0
size_after_bytes: int = 0
trigger_reason: str = "scheduled"
success: bool = False
LLD: Vector Store Operations Monitor
import boto3
import time
import json
import statistics
from datetime import datetime, timedelta
from typing import Dict, List, Optional
class VectorStoreOpsMonitor:
"""
Monitors OpenSearch Serverless vector store health and performance.
Publishes custom CloudWatch metrics for query latency, embedding freshness,
index health, and triggers automated remediation via alarms.
"""
def __init__(self, config: dict):
self.cloudwatch = boto3.client('cloudwatch')
self.opensearch = boto3.client('opensearchserverless')
self.sns = boto3.client('sns')
self.namespace = config.get("namespace", "MangaAssist/VectorStore")
self.collection_name = config["collection_name"]
self.collection_endpoint = config["collection_endpoint"]
self.stale_threshold_hours = config.get("stale_threshold_hours", 24)
self.alert_topic_arn = config.get("alert_topic_arn")
self._latency_window: List[float] = []
self._quality_scores: List[float] = []
self._max_window = 1000
# ------------------------------------------------------------------
# Query Performance Monitoring
# ------------------------------------------------------------------
def record_query(self, query_id: str, latency_ms: float, results_count: int,
top_score: float, min_score: float = 0.0,
ef_search_used: int = 256, cache_hit: bool = False):
"""Record a vector query execution and publish CloudWatch metrics."""
self._latency_window.append(latency_ms)
if len(self._latency_window) > self._max_window:
self._latency_window.pop(0)
dimensions = [{"Name": "Collection", "Value": self.collection_name}]
metrics = [
{
"MetricName": "QueryLatency",
"Value": latency_ms,
"Unit": "Milliseconds",
"Dimensions": dimensions,
"Timestamp": datetime.utcnow(),
},
{
"MetricName": "QueryCount",
"Value": 1,
"Unit": "Count",
"Dimensions": dimensions,
},
{
"MetricName": "ResultsCount",
"Value": results_count,
"Unit": "Count",
"Dimensions": dimensions,
},
{
"MetricName": "TopScore",
"Value": top_score,
"Unit": "None",
"Dimensions": dimensions,
},
{
"MetricName": "ScoreSpread",
"Value": top_score - min_score if min_score else 0,
"Unit": "None",
"Dimensions": dimensions,
},
{
"MetricName": "CacheHit",
"Value": 1 if cache_hit else 0,
"Unit": "Count",
"Dimensions": dimensions,
},
]
self.cloudwatch.put_metric_data(Namespace=self.namespace, MetricData=metrics)
# Anomaly detection: spike relative to recent p95
if len(self._latency_window) >= 50:
p95 = sorted(self._latency_window)[int(len(self._latency_window) * 0.95)]
if latency_ms > p95 * 2:
self._emit_alert(
"query_latency_spike",
f"Query {query_id} latency {latency_ms:.0f}ms > 2x p95 ({p95:.0f}ms)",
severity="WARNING",
)
def get_latency_percentiles(self) -> Dict[str, float]:
"""Calculate current latency percentiles from the sliding window."""
if not self._latency_window:
return {"p50": 0, "p95": 0, "p99": 0, "count": 0}
sorted_latencies = sorted(self._latency_window)
n = len(sorted_latencies)
return {
"p50": sorted_latencies[int(n * 0.50)],
"p95": sorted_latencies[int(n * 0.95)],
"p99": sorted_latencies[int(n * 0.99)],
"count": n,
}
# ------------------------------------------------------------------
# Embedding Freshness Monitoring
# ------------------------------------------------------------------
def check_embedding_freshness(self, documents: List[dict]) -> dict:
"""
Check freshness of embeddings against source data timestamps.
Each document dict must contain:
- document_id: str
- source_updated_at: datetime
- embedding_created_at: datetime
"""
stale_count = 0
total = len(documents)
stale_docs = []
for doc in documents:
source_updated = doc.get("source_updated_at")
embedding_created = doc.get("embedding_created_at")
if source_updated and embedding_created:
staleness = (source_updated - embedding_created).total_seconds() / 3600
if staleness > self.stale_threshold_hours:
stale_count += 1
stale_docs.append({
"document_id": doc["document_id"],
"staleness_hours": round(staleness, 1),
})
stale_pct = (stale_count / total * 100) if total > 0 else 0
self.cloudwatch.put_metric_data(
Namespace=self.namespace,
MetricData=[
{
"MetricName": "StaleEmbeddingPercentage",
"Value": stale_pct,
"Unit": "Percent",
"Dimensions": [
{"Name": "Collection", "Value": self.collection_name}
],
},
{
"MetricName": "StaleEmbeddingCount",
"Value": stale_count,
"Unit": "Count",
"Dimensions": [
{"Name": "Collection", "Value": self.collection_name}
],
},
],
)
if stale_pct > 10:
self._emit_alert(
"stale_embeddings",
f"{stale_pct:.1f}% embeddings stale (>{self.stale_threshold_hours}h) — "
f"{stale_count}/{total} documents need re-embedding",
severity="WARNING" if stale_pct <= 25 else "CRITICAL",
)
return {
"total": total,
"stale_count": stale_count,
"stale_pct": round(stale_pct, 2),
"stale_docs": stale_docs[:50], # Cap to avoid oversized responses
}
# ------------------------------------------------------------------
# Index Health Evaluation
# ------------------------------------------------------------------
def evaluate_index_health(self, recall_at_10: float, avg_latency_ms: float,
p95_latency_ms: float, segment_count: int,
stale_pct: float, ocu_utilization: float = 0.0) -> dict:
"""Evaluate overall index health and generate prioritized recommendations."""
recommendations = []
status = "healthy"
# Latency checks
if avg_latency_ms > 100:
recommendations.append(
f"HIGH: Avg latency {avg_latency_ms:.0f}ms exceeds 100ms target — "
"consider increasing OCU capacity or running segment compaction"
)
status = "degraded"
if p95_latency_ms > 200:
recommendations.append(
f"HIGH: p95 latency {p95_latency_ms:.0f}ms exceeds 200ms target — "
"check segment count and ef_search setting"
)
status = "degraded"
# Recall check
if recall_at_10 < 0.85:
recommendations.append(
f"HIGH: Recall@10 {recall_at_10:.2f} below 0.85 target — "
"increase m parameter (requires re-index) or increase ef_search"
)
status = "degraded"
# Segment count
if segment_count > 20:
recommendations.append(
f"MEDIUM: {segment_count} segments exceeds 20 threshold — "
"schedule segment compaction to reduce search overhead"
)
# Freshness
if stale_pct > 10:
recommendations.append(
f"MEDIUM: {stale_pct:.1f}% embeddings stale — "
"trigger targeted re-embedding pipeline"
)
if status == "healthy":
status = "stale"
# OCU utilization
if ocu_utilization > 85:
recommendations.append(
f"HIGH: OCU utilization at {ocu_utilization:.0f}% — "
"increase max OCU capacity policy to prevent throttling"
)
# Critical thresholds
if recall_at_10 < 0.7 or avg_latency_ms > 500:
status = "unhealthy"
recommendations.insert(0,
"CRITICAL: Consider full re-index with updated HNSW parameters — "
"current performance severely degraded"
)
# Publish health status metric
status_value = {"healthy": 1, "degraded": 2, "stale": 3, "unhealthy": 4}
self.cloudwatch.put_metric_data(
Namespace=self.namespace,
MetricData=[{
"MetricName": "IndexHealthStatus",
"Value": status_value.get(status, 0),
"Unit": "None",
"Dimensions": [
{"Name": "Collection", "Value": self.collection_name}
],
}],
)
return {"status": status, "recommendations": recommendations}
# ------------------------------------------------------------------
# OCU Monitoring
# ------------------------------------------------------------------
def get_ocu_metrics(self, period_minutes: int = 60) -> dict:
"""Retrieve current OCU utilization from CloudWatch."""
end_time = datetime.utcnow()
start_time = end_time - timedelta(minutes=period_minutes)
search_ocu = self.cloudwatch.get_metric_statistics(
Namespace="AWS/AOSS",
MetricName="SearchOCU",
StartTime=start_time,
EndTime=end_time,
Period=300,
Statistics=["Average", "Maximum"],
Dimensions=[
{"Name": "CollectionName", "Value": self.collection_name}
],
)
indexing_ocu = self.cloudwatch.get_metric_statistics(
Namespace="AWS/AOSS",
MetricName="IndexingOCU",
StartTime=start_time,
EndTime=end_time,
Period=300,
Statistics=["Average", "Maximum"],
Dimensions=[
{"Name": "CollectionName", "Value": self.collection_name}
],
)
return {
"search_ocu": search_ocu.get("Datapoints", []),
"indexing_ocu": indexing_ocu.get("Datapoints", []),
}
# ------------------------------------------------------------------
# Alerting
# ------------------------------------------------------------------
def _emit_alert(self, alert_type: str, message: str, severity: str = "WARNING"):
"""Emit alert metric and optionally publish to SNS."""
self.cloudwatch.put_metric_data(
Namespace=self.namespace,
MetricData=[{
"MetricName": "Alert",
"Value": 1,
"Unit": "Count",
"Dimensions": [
{"Name": "Collection", "Value": self.collection_name},
{"Name": "AlertType", "Value": alert_type},
{"Name": "Severity", "Value": severity},
],
}],
)
if self.alert_topic_arn:
self.sns.publish(
TopicArn=self.alert_topic_arn,
Subject=f"[{severity}] MangaAssist Vector Store: {alert_type}",
Message=json.dumps({
"alert_type": alert_type,
"severity": severity,
"collection": self.collection_name,
"message": message,
"timestamp": datetime.utcnow().isoformat(),
}),
)
HNSW Parameter Impact Matrix
| Parameter | Range | Impact on Recall | Impact on Latency | Impact on Memory | Impact on Build Time | MangaAssist Default |
|---|---|---|---|---|---|---|
| ef_construction | 128–512 | 128→256: +5–8% recall | No query impact (build-time only) | Minimal (build temp memory) | 128: ~20min, 256: ~45min, 512: ~90min (2M vectors) | 256 |
| m | 16–64 | 16→24: +3–5% recall; 24→48: +2–3% | 16→24: −10% latency; 24→48: −5% | 16: 1x baseline; 24: 1.4x; 48: 2.5x | Linear increase with m | 24 |
| ef_search | 100–500 | 100→256: +4–6% recall; 256→512: +1–2% | 100: ~8ms; 256: ~12ms; 512: ~22ms | No impact (query-time only) | No impact (query-time only) | 256 (dynamic: 512 for low-confidence) |
MangaAssist Tuning Benchmarks (2M vectors, 1536-dim, cosine)
| Configuration | Recall@10 | p50 Latency | p95 Latency | Memory | Notes |
|---|---|---|---|---|---|
| m=16, ef_c=128, ef_s=100 | 0.82 | 6ms | 14ms | 10.2 GB | ❌ Recall too low |
| m=24, ef_c=256, ef_s=256 | 0.92 | 12ms | 28ms | 12.8 GB | ✅ Production config |
| m=48, ef_c=512, ef_s=256 | 0.95 | 10ms | 24ms | 22.1 GB | ⚠️ Memory too high for cost |
| m=24, ef_c=256, ef_s=512 | 0.94 | 20ms | 42ms | 12.8 GB | ⚠️ Latency increase for +2% recall |
Key Design Decisions
| # | Decision | Choice | Rationale | Trade-off |
|---|---|---|---|---|
| 1 | HNSW defaults | m=24, ef_construction=256, ef_search=256 | Best recall/latency/memory balance at 2M scale | Higher m (48) would improve recall 2–3% but nearly double memory cost |
| 2 | Freshness threshold | 24 hours for product descriptions | Balances re-embedding cost vs content freshness; product descriptions change ~2–3x/week | Prices use metadata (not embedded) so staleness is less critical for core vector quality |
| 3 | Reindex trigger | Recall@10 < 0.85 OR p95 > 200ms sustained 15min | Prevent silent quality degradation; p95 catches tail latency before user impact | Full re-index takes ~45min; during re-index queries hit existing index (stale but functional) |
| 4 | Monitoring granularity | Per-query metrics for latency; 60-second health checks | Per-query gives precise p-tile tracking; 60s health checks balance visibility vs CloudWatch cost | ~500 QPS × custom metrics = ~$15/day CloudWatch cost; acceptable for production |
| 5 | Compaction schedule | Daily 4am JST + post-bulk-index | 4am is lowest traffic; post-bulk prevents segment accumulation after catalog updates | Compaction temporarily spikes OCU usage; schedule avoids user-facing impact |
| 6 | Quality benchmark frequency | Nightly (recall@K, clustering) + continuous (freshness) | Nightly full benchmark catches drift; continuous freshness prevents stale content from accumulating | Nightly benchmark requires 500-query test set maintained by ML team |
| 7 | Embedding model version policy | All vectors must use same model version; blue-green migration | Mixed versions produce invalid similarity scores; blue-green avoids downtime | Migration re-embeds all 2M documents (~$40 Titan cost, ~3hr pipeline); justified only for model upgrades |
| 8 | Cache strategy | LRU cache for top-1000 repeated query embeddings | 30% of queries are repeated (popular products); cache hit avoids Titan API call (~15ms saved) | Cache invalidation on product update; 1000 entries ≈ 6MB memory on ECS |
| 9 | Alert routing | CloudWatch Alarm → SNS → PagerDuty (critical) / Slack (warning) | Severity-based routing ensures critical issues get immediate attention | Requires maintaining SNS topic subscriptions and PagerDuty integration |
Cross-References
| Topic | Reference | Relevance |
|---|---|---|
| Embedding drift troubleshooting | 04-retrieval-system-troubleshooting.md | Detailed debugging when embedding drift is detected by this monitoring system |
| RAG pipeline cost optimization | US-06-rag-pipeline-cost-optimization.md | Cost management for the re-embedding and indexing pipelines |
| Performance monitoring | Skill-4.3.1 through 4.3.4 | Broader FM inference monitoring that feeds into vector store performance correlation |
| DynamoDB source monitoring | DynamoDB folder | Source data change detection that triggers the embedding refresh pipeline |
| Security guardrails | Security-Privacy-Guardrails | Access control policies for OpenSearch Serverless collections |
| Model evaluation | Evaluation-Systems-GenAI | Recall@K and retrieval quality metrics feed into end-to-end evaluation |
Key Takeaways
- Embedding freshness is a first-class operational metric — stale embeddings cause silent quality degradation that users notice (irrelevant recommendations) but standard infrastructure monitoring misses entirely. Track source_updated_at vs embedding_created_at for every document.
- HNSW tuning is continuous, not set-and-forget — as data volume grows and query patterns shift, the optimal ef_search/m balance changes. Nightly benchmarks with a ground-truth query set catch drift before users feel it.
- The recall vs latency trade-off is the core operational tension — higher ef_search improves recall but increases latency; the right balance depends on your SLA. For MangaAssist, we accept p95 < 80ms for recall@10 > 0.90.
- Segment count is a hidden performance killer — OpenSearch accumulates small segments during indexing; without scheduled compaction, query latency creeps up silently because k-NN search must scan each segment independently.
- Stale embeddings are the most dangerous failure mode — unlike a service outage (which triggers alarms), stale embeddings return plausible but wrong results: the chatbot still responds confidently with outdated product information.
- Mixed embedding model versions invalidate similarity scores — treat model version consistency as a hard constraint, not a nice-to-have. A single document embedded with v1 in a v2 index corrupts neighbor relationships for all nearby vectors.
- OCU cost is the primary operational expense — OpenSearch Serverless auto-scales OCUs based on load, and unoptimized queries (full scans, excessive ef_search) translate directly into higher OCU bills. Monitor cost per query as an efficiency metric.