Latency Reduction Techniques for FM Systems
MangaAssist is a JP Manga store chatbot running on AWS. The stack includes Bedrock Claude 3 (Sonnet for complex queries, Haiku for simple), OpenSearch Serverless for vector retrieval, DynamoDB for sessions/products/orders, ECS Fargate for orchestration, API Gateway WebSocket for real-time communication, and ElastiCache Redis for semantic caching. The p95 end-to-end latency target is < 2 seconds.
Skill Mapping
| AWS AIP-C01 Skill | Sub-Skill | This File Covers |
|---|---|---|
| 4.2.6 FM System Performance | Connection Management | HTTP/2 multiplexing, connection warm-up on ECS start |
| 4.2.6 FM System Performance | VPC Endpoint Optimization | Private link to Bedrock/DynamoDB/OpenSearch, measured gains |
| 4.2.6 FM System Performance | Payload Optimization | Request/response size minimization, compression |
| 4.2.6 FM System Performance | Async Patterns | Fire-and-forget for logging/analytics, async cache writes |
| 4.2.6 FM System Performance | Prefetching | Predictive context loading, popular product pre-caching |
| 4.2.6 FM System Performance | DNS Optimization | Caching DNS resolution for AWS endpoints |
Deep-Dive: Latency Reduction for MangaAssist
Every millisecond matters when the p95 target is 2,000ms and Bedrock alone can consume 800ms. This document covers the techniques MangaAssist uses to shave latency from every non-FM segment, maximizing the budget available for model inference.
1. Connection Management
HTTP/2 Multiplexing
HTTP/1.1 connections are one-request-at-a-time. When MangaAssist needs to call Bedrock, OpenSearch, and DynamoDB in quick succession, each call blocks until the previous one completes — even on the same connection.
HTTP/2 multiplexes multiple requests over a single TCP connection, eliminating head-of-line blocking at the protocol level.
Impact on MangaAssist:
| Scenario | HTTP/1.1 | HTTP/2 | Savings |
|---|---|---|---|
| 3 parallel Bedrock subsegment calls | 3 connections, 3 TLS handshakes (90ms overhead) | 1 connection, 1 TLS handshake (30ms overhead) | 60ms |
| 5 DynamoDB reads in batch | 5 sequential on 1 connection | 5 multiplexed on 1 connection | ~20ms |
| OpenSearch + DynamoDB parallel | 2 connections, 2 TLS handshakes | 1 or 2 connections, streams multiplexed | ~15ms |
Configuration:
boto3 uses urllib3 under the hood, which supports HTTP/1.1 connection pooling but does not natively support HTTP/2 for AWS service calls. However, the latency savings come from:
- Connection reuse — boto3 keeps connections alive within the pool, so TLS handshakes happen only once per connection lifecycle.
- Concurrent streams — when using `asyncio` with `aioboto3`, concurrent requests to the same endpoint share transport-layer resources.
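A client-configuration sketch of the connection-reuse setup described above, assuming standard boto3/botocore behavior; the pool size and timeout values are illustrative, not MangaAssist's actual settings:

```python
# Client-configuration sketch: shared boto3 clients with a warm pool.
# Pool size and timeouts are illustrative, not MangaAssist's settings.
import boto3
from botocore.config import Config

POOL_CONFIG = Config(
    region_name="ap-northeast-1",
    max_pool_connections=50,   # default is 10; raise for parallel calls
    connect_timeout=2,
    read_timeout=10,
    retries={"max_attempts": 2, "mode": "adaptive"},
    tcp_keepalive=True,        # keep idle pooled connections alive
)

# Create clients once per ECS task (module level) and reuse them.
# A client per request would pay the TLS handshake every time.
bedrock = boto3.client("bedrock-runtime", config=POOL_CONFIG)
dynamodb = boto3.client("dynamodb", config=POOL_CONFIG)
```

Module-level clients are the key detail: the pool, and therefore the warm TLS sessions, live as long as the process does.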
Connection Warm-Up on ECS Task Start
ECS Fargate tasks are ephemeral. When a new task starts (scale-out event or deployment), the first request pays a cold-start tax:
Cold-start penalty breakdown:
DNS resolution .............. 5-15ms
TCP handshake ............... 5-10ms
TLS handshake ............... 15-25ms
SDK initialization .......... 10-20ms
─────────────────────────────────────
Total first-request penalty: 35-70ms per downstream service
With 4 downstream services (Bedrock, OpenSearch, DynamoDB, Redis), the total cold-start penalty can reach 140-280ms on the very first request.
Warm-up strategy:
sequenceDiagram
participant ECS as ECS Task Startup
participant Health as Health Check Endpoint
participant Bedrock as Bedrock
participant OS as OpenSearch
participant DDB as DynamoDB
participant Redis as Redis
Note over ECS: Container starts, app initializes
ECS->>Health: Register health check handler
par Pre-Warm All Connections
ECS->>Bedrock: Minimal invoke (Haiku, 1 token)
ECS->>OS: Ping cluster health
ECS->>DDB: GetItem (dummy key)
ECS->>Redis: PING
end
Bedrock-->>ECS: Connection established + warm
OS-->>ECS: Connection established + warm
DDB-->>ECS: Connection established + warm
Redis-->>ECS: Connection established + warm
Note over ECS: All pools warm. Mark healthy.
ECS->>Health: Return 200 OK (ready for traffic)
Note over Health: ALB starts routing traffic to this task
Key detail: The ECS task does not pass its ALB health check until all connections are warm. This prevents the load balancer from routing traffic to a cold task.
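The warm-up step can be sketched as a helper that runs one minimal call per downstream dependency in parallel and reports per-service timings; the health check handler returns 200 only after it succeeds. The warmer callables are injected so the sketch stays self-contained — in MangaAssist they would be a 1-token Haiku invoke, an OpenSearch health ping, a dummy DynamoDB GetItem, and a Redis PING (the commented names are illustrative):

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable

def warm_connections(warmers: dict[str, Callable[[], None]],
                     timeout_s: float = 5.0) -> dict[str, float]:
    """Run one minimal call per downstream service in parallel.

    Returns per-service warm-up time in ms. Raises if any warmer fails,
    so the task keeps failing its health check until all pools are warm.
    """
    def timed(name: str, fn: Callable[[], None]) -> tuple[str, float]:
        start = time.monotonic()
        fn()  # e.g. 1-token Bedrock invoke, Redis PING, dummy GetItem
        return name, (time.monotonic() - start) * 1000

    with ThreadPoolExecutor(max_workers=len(warmers)) as pool:
        futures = [pool.submit(timed, name, fn) for name, fn in warmers.items()]
        return dict(f.result(timeout=timeout_s) for f in futures)

# Health check handler returns 200 only once this succeeds, e.g.:
# timings = warm_connections({
#     "bedrock": lambda: bedrock.invoke_model(...),  # minimal Haiku call
#     "redis": lambda: redis_client.ping(),
# })
```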
2. VPC Endpoint Optimization
The NAT Gateway Latency Problem
Without VPC endpoints, all traffic from ECS Fargate tasks in a private subnet to AWS services traverses:
graph LR
ECS["ECS Fargate<br/>(Private Subnet)"]
NAT["NAT Gateway<br/>(+10-20ms processing)"]
IGW["Internet Gateway"]
SERVICE["AWS Service<br/>Endpoint"]
ECS -->|"Private IP"| NAT
NAT -->|"Public IP"| IGW
IGW -->|"Public Internet"| SERVICE
style NAT fill:#e74c3c,color:#fff
style ECS fill:#3498db,color:#fff
style SERVICE fill:#ff9900,color:#000
The NAT Gateway adds 10-20ms per request (packet inspection, address translation, connection tracking). For a single MangaAssist request that touches 4+ AWS services, this compounds.
VPC Endpoint Architecture
graph LR
ECS["ECS Fargate<br/>(Private Subnet)"]
VPCE_B["VPC Endpoint<br/>Bedrock<br/>(Interface)"]
VPCE_D["VPC Endpoint<br/>DynamoDB<br/>(Gateway)"]
VPCE_O["VPC Endpoint<br/>OpenSearch<br/>(Interface)"]
VPCE_S["VPC Endpoint<br/>S3<br/>(Gateway)"]
VPCE_CW["VPC Endpoint<br/>CloudWatch<br/>(Interface)"]
BEDROCK["Bedrock Runtime"]
DDB["DynamoDB"]
OS["OpenSearch Serverless"]
S3["S3"]
CW["CloudWatch"]
ECS -->|"Private DNS"| VPCE_B --> BEDROCK
ECS -->|"Route Table"| VPCE_D --> DDB
ECS -->|"Private DNS"| VPCE_O --> OS
ECS -->|"Route Table"| VPCE_S --> S3
ECS -->|"Private DNS"| VPCE_CW --> CW
style ECS fill:#3498db,color:#fff
style VPCE_B fill:#9b59b6,color:#fff
style VPCE_D fill:#9b59b6,color:#fff
style VPCE_O fill:#9b59b6,color:#fff
style VPCE_S fill:#9b59b6,color:#fff
style VPCE_CW fill:#9b59b6,color:#fff
style BEDROCK fill:#ff9900,color:#000
style DDB fill:#ff9900,color:#000
style OS fill:#ff9900,color:#000
style S3 fill:#ff9900,color:#000
style CW fill:#ff9900,color:#000
Measured Latency Improvement
| Service | Via NAT Gateway (p95) | Via VPC Endpoint (p95) | Improvement | Endpoint Type |
|---|---|---|---|---|
| Bedrock Runtime | 45ms network overhead | 30ms network overhead | -15ms (-33%) | Interface |
| DynamoDB | 18ms | 8ms | -10ms (-56%) | Gateway (free) |
| OpenSearch Serverless | 72ms | 60ms | -12ms (-17%) | Interface |
| S3 | 15ms | 8ms | -7ms (-47%) | Gateway (free) |
| CloudWatch Logs | 12ms | 6ms | -6ms (-50%) | Interface |
| Per-request total | ~162ms overhead | ~112ms overhead | -50ms (-31%) | Mixed |
Cost Analysis
| Item | Monthly Cost |
|---|---|
| NAT Gateway (before): data processing @ $0.045/GB, 500GB/month | $22.50 + $32.40 (hourly) = ~$55 |
| VPC Interface Endpoints (3 endpoints x 2 AZs): $0.01/hr each | $43.80 |
| VPC Gateway Endpoints (DynamoDB, S3) | Free |
| Net savings | ~$11/month + 50ms latency |
At MangaAssist scale, the cost difference is negligible. The latency improvement is the primary driver.
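The break-even arithmetic can be reproduced directly from the table's figures (these are the document's estimates, not live AWS pricing; note the table's own numbers imply slightly different hour counts for the two items):

```python
# Monthly cost comparison using the estimates from the table above.
nat_data_processing = 0.045 * 500   # $/GB x 500 GB/month  ~= $22.50
nat_hourly = 0.045 * 720            # hourly charge, ~720 hrs ~= $32.40
nat_total = nat_data_processing + nat_hourly          # ~= $54.90

vpce_interface = 3 * 2 * 0.01 * 730  # 3 endpoints x 2 AZs x $0.01/hr x ~730 hrs
vpce_gateway = 0.0                   # DynamoDB + S3 gateway endpoints are free
vpce_total = vpce_interface + vpce_gateway            # = $43.80

net_savings = nat_total - vpce_total  # ~= $11/month, plus ~50ms per request
```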
3. Payload Optimization
Request Payload Minimization
Every byte sent to Bedrock, OpenSearch, or DynamoDB adds serialization time, network transit time, and deserialization time.
Bedrock prompt payload:
| Approach | Payload Size | Serialization Time |
|---|---|---|
| Full conversation history (20 turns) | ~12 KB | 2.1ms |
| Pruned to last 5 turns + summary | ~3.5 KB | 0.6ms |
| Compressed (gzip) | ~1.2 KB | 0.3ms + 0.2ms decompress |
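A sketch of the pruning step, assuming a chat-turn list in a messages-style shape; the summary here is a static placeholder where a real system would use a cheap Haiku summarization call:

```python
import json

def prune_history(turns: list[dict], keep_last: int = 5) -> list[dict]:
    """Keep the last N turns verbatim; collapse older turns into one
    summary message so the Bedrock payload stays small."""
    if len(turns) <= keep_last:
        return turns
    older, recent = turns[:-keep_last], turns[-keep_last:]
    # Placeholder summary; a real system would summarize with a cheap model
    summary = {
        "role": "user",
        "content": f"[Summary of {len(older)} earlier turns omitted]",
    }
    return [summary] + recent

turns = [{"role": "user", "content": f"turn {i}"} for i in range(20)]
pruned = prune_history(turns)
payload_before = len(json.dumps(turns))
payload_after = len(json.dumps(pruned))
```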
OpenSearch response payload:
| Approach | Response Size | Deserialization Time |
|---|---|---|
| All fields (25 fields per doc, 5 docs) | ~8 KB | 1.8ms |
| Selected fields (6 fields per doc, 5 docs) | ~1.6 KB | 0.4ms |
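The field selection above corresponds to `_source` filtering in the OpenSearch query body. A sketch — the field names and the k-NN query shape are illustrative, not the real index mapping:

```python
# k-NN query body requesting only the fields the prompt actually needs.
# Field names are illustrative, not MangaAssist's real mapping.
def build_search_body(query_vector: list[float], k: int = 5) -> dict:
    return {
        "size": k,
        # _source filtering: return 6 fields instead of all 25,
        # shrinking the response from ~8 KB to ~1.6 KB
        "_source": ["title", "author", "genre", "price", "volume", "rating"],
        "query": {
            "knn": {
                "embedding": {"vector": query_vector, "k": k},
            }
        },
    }

body = build_search_body([0.1, 0.2, 0.3])
```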
Compression for Large Context
When conversation history is long (10+ turns), MangaAssist applies compression before caching and decompresses on retrieval:
Original session context: 15,200 bytes
Gzip compressed: 3,800 bytes (75% reduction)
Compression time: 0.3ms
Decompression time: 0.1ms
Redis storage savings: 75%
Network transit savings: ~2ms per cache operation
Net benefit: 0.4ms compression overhead vs. 2ms network savings = 1.6ms net gain per cache operation.
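A minimal sketch of the compress-before-cache path using stdlib `gzip`; the sample context is synthetic, so the exact ratio will differ from the 75% figure above:

```python
import gzip
import json

def compress_context(context: dict) -> bytes:
    """Gzip a session context before writing it to Redis."""
    return gzip.compress(json.dumps(context).encode("utf-8"))

def decompress_context(blob: bytes) -> dict:
    """Inverse of compress_context, used on cache reads."""
    return json.loads(gzip.decompress(blob).decode("utf-8"))

# Synthetic long conversation, roughly the 10+ turn case described above
context = {"turns": [{"q": "recommend shonen manga please " * 10,
                      "a": "Here are some popular shonen titles... " * 20}
                     for _ in range(12)]}
raw = json.dumps(context).encode("utf-8")
blob = compress_context(context)
ratio = 1 - len(blob) / len(raw)  # fraction of bytes saved
```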
4. Async Patterns
Fire-and-Forget for Non-Critical Operations
MangaAssist classifies operations as critical-path (must complete before response) and non-critical (can complete after response starts streaming).
graph TB
subgraph "Critical Path (blocks response)"
A1[Cache Lookup]
A2[Vector Search]
A3[Session Load]
A4[Bedrock Invoke]
A5[Guardrails Check]
end
subgraph "Non-Critical (fire-and-forget)"
B1[Cache Write]
B2[Session Update]
B3[Analytics Emit]
B4[Audit Log Write]
B5[Feedback Signal]
end
A4 -->|"Response ready"| STREAM[Stream to Client]
A4 -.->|"Async after response starts"| B1
A4 -.->|"Async after response starts"| B2
A4 -.->|"Async after response starts"| B3
A4 -.->|"Async after response starts"| B4
style A1 fill:#e74c3c,color:#fff
style A2 fill:#e74c3c,color:#fff
style A3 fill:#e74c3c,color:#fff
style A4 fill:#e74c3c,color:#fff
style A5 fill:#e74c3c,color:#fff
style B1 fill:#2ecc71,color:#000
style B2 fill:#2ecc71,color:#000
style B3 fill:#2ecc71,color:#000
style B4 fill:#2ecc71,color:#000
style B5 fill:#2ecc71,color:#000
style STREAM fill:#3498db,color:#fff
Latency savings from async patterns:
| Operation | Sync Duration | Async Impact on Critical Path | Savings |
|---|---|---|---|
| Cache write (Redis SET) | 3-5ms | 0ms (fire-and-forget) | 3-5ms |
| Session update (DynamoDB PUT) | 5-10ms | 0ms (fire-and-forget) | 5-10ms |
| Analytics emit (Kinesis PUT) | 3-8ms | 0ms (fire-and-forget) | 3-8ms |
| Audit log (CloudWatch PUT) | 2-5ms | 0ms (fire-and-forget) | 2-5ms |
| Total non-critical work | 13-28ms | 0ms | 13-28ms |
Async Cache Writes — Handling Failures
Fire-and-forget does not mean fire-and-ignore. MangaAssist handles async write failures via:
- Dead letter queue — failed cache writes go to an SQS DLQ for retry
- Metric emission — a `CacheWriteFailure` CloudWatch metric triggers alerts
- Graceful degradation — a missed cache write means the next identical query re-computes (costs more but still works)
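A minimal sketch of this failure policy: the background task catches exceptions, logs them, and queues the payload for retry instead of propagating into the request path (the in-memory list stands in for the SQS DLQ):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)
failed_writes: list[dict] = []  # stand-in for the SQS DLQ

async def _safe(coro, payload: dict):
    """Run a background write; on failure, log and queue for retry
    instead of propagating into the request path."""
    try:
        await coro
    except Exception as e:
        logger.warning("async write failed: %s", e)
        failed_writes.append(payload)  # real system: send to SQS DLQ

def fire_and_forget(coro, payload: dict) -> asyncio.Task:
    """Schedule a non-critical write without awaiting it."""
    return asyncio.create_task(_safe(coro, payload))

async def demo():
    async def ok_write():
        await asyncio.sleep(0)

    async def failing_write():
        raise ConnectionError("redis unavailable")

    t1 = fire_and_forget(ok_write(), {"key": "a"})
    t2 = fire_and_forget(failing_write(), {"key": "b"})
    await asyncio.gather(t1, t2)  # in production, nothing awaits these
    return failed_writes

dlq = asyncio.run(demo())
```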
5. Prefetching
Predictive Context Loading
When a WebSocket connection opens, MangaAssist knows the user's session ID. Before the user sends their first message, the system prefetches:
sequenceDiagram
participant User
participant APIGW as API Gateway
participant ECS as ECS Orchestrator
participant Redis as Redis Cache
participant DDB as DynamoDB
participant OS as OpenSearch
User->>APIGW: WebSocket CONNECT
APIGW->>ECS: $connect event
par Prefetch on Connect
ECS->>Redis: Prefetch session cache
ECS->>DDB: Prefetch session data
ECS->>DDB: Prefetch user preferences
ECS->>OS: Prefetch trending manga (top 10)
end
Redis-->>ECS: Cached session (or miss)
DDB-->>ECS: Session + preferences
OS-->>ECS: Trending products
Note over ECS: Context pre-loaded in memory
User->>APIGW: First message: "recommend something"
APIGW->>ECS: $default route
Note over ECS: Session already loaded!<br/>Skip DynamoDB read (saved 8ms).<br/>Trending products ready (saved 60ms).
ECS->>ECS: Assemble prompt (using prefetched data)
ECS->>ECS: Continue pipeline...
Latency savings on first message: 60-80ms (session + preferences already loaded).
Popular Product Pre-Caching
MangaAssist pre-computes and caches results for the most common queries:
| Query Pattern | Frequency | Pre-cached? | Cache TTL | Savings When Hit |
|---|---|---|---|---|
| "trending manga" / "popular this week" | 22% of queries | Yes | 15 min | ~900ms (skip Bedrock) |
| "new releases" | 15% of queries | Yes | 1 hour | ~900ms |
| Genre browsing ("show me shonen") | 12% of queries | Yes | 30 min | ~700ms |
| Specific title lookup ("One Piece latest") | 8% of queries | Yes (top 100 titles) | 5 min | ~500ms |
| Custom/unique queries | 43% of queries | No | - | 0ms |
Aggregate impact: ~57% of queries benefit from prefetching, with average savings of roughly 800ms per hit (weighted by the frequencies above).
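A sketch of the pre-cache lookup, with naive keyword matching standing in for real intent classification (the patterns and TTLs follow the table above; a real system would classify intent before the lookup):

```python
import time
from typing import Optional

# TTLs in seconds, keyed by normalized query pattern (from the table above)
PRECACHE_TTL = {"trending": 900, "new_releases": 3600, "genre": 1800}
_cache: dict[str, tuple[float, list]] = {}

def classify(query: str) -> Optional[str]:
    """Naive keyword classifier; a stand-in for real intent classification."""
    q = query.lower()
    if "trending" in q or "popular" in q:
        return "trending"
    if "new release" in q:
        return "new_releases"
    if any(g in q for g in ("shonen", "seinen", "shojo")):
        return "genre"
    return None  # unique query — full pipeline, no pre-cache

def precache_lookup(query: str) -> Optional[list]:
    """Return pre-computed results on a fresh hit, else None."""
    pattern = classify(query)
    if pattern is None:
        return None
    entry = _cache.get(pattern)
    if entry and time.monotonic() - entry[0] < PRECACHE_TTL[pattern]:
        return entry[1]  # hit: skip retrieval and Bedrock entirely
    return None

def precache_store(pattern: str, results: list) -> None:
    """Called by the scheduled refresh job (e.g. a Lambda every 15 min)."""
    _cache[pattern] = (time.monotonic(), results)
```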
6. Service Mesh and Hop Minimization
Minimizing Inter-Service Hops
Every network hop adds latency (DNS, TCP, TLS, serialization). MangaAssist minimizes hops by co-locating logic:
graph TB
subgraph "Before: 4 internal hops"
direction TB
R1[Request Handler] --> IC1[Intent Classifier Service]
IC1 --> PE1[Product Enrichment Service]
PE1 --> PA1[Prompt Assembly Service]
PA1 --> BI1[Bedrock Invoker Service]
end
subgraph "After: 1 monolith + direct calls"
direction TB
R2[ECS Orchestrator<br/>Intent + Enrichment + Assembly<br/>all in-process] --> BI2[Bedrock<br/>Direct Call]
end
style R1 fill:#e74c3c,color:#fff
style IC1 fill:#e74c3c,color:#fff
style PE1 fill:#e74c3c,color:#fff
style PA1 fill:#e74c3c,color:#fff
style BI1 fill:#e74c3c,color:#fff
style R2 fill:#2ecc71,color:#000
style BI2 fill:#2ecc71,color:#000
| Architecture | Internal Hops | Per-Hop Overhead | Total Internal Overhead |
|---|---|---|---|
| Microservices (4 internal services) | 3 | 5-8ms | 15-24ms |
| Modular monolith (co-located) | 0 | 0ms | 0ms |
MangaAssist uses a modular monolith pattern: intent classification, product enrichment, and prompt assembly run in-process within the ECS orchestrator. Only external AWS service calls (Bedrock, OpenSearch, DynamoDB, Redis) cross the network.
When to add microservice hops: Only when a component needs independent scaling (e.g., a batch indexing service that runs on a different schedule from the chatbot).
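The modular-monolith shape can be sketched as plain in-process function calls; stage logic is stubbed, since only the zero-hop structure is the point:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class RequestContext:
    query: str
    intent: str = ""
    products: Optional[list] = None
    prompt: str = ""

def classify_intent(ctx: RequestContext) -> RequestContext:
    # Stub: real classification would be a small model or rules engine
    ctx.intent = "recommendation" if "recommend" in ctx.query else "lookup"
    return ctx

def enrich_products(ctx: RequestContext) -> RequestContext:
    ctx.products = ["stub product"]  # real system: OpenSearch results
    return ctx

def assemble_prompt(ctx: RequestContext) -> RequestContext:
    ctx.prompt = f"Intent: {ctx.intent}. Products: {ctx.products}. Q: {ctx.query}"
    return ctx

def handle(query: str) -> RequestContext:
    # Three former "services", now three in-process calls: zero network hops
    ctx = RequestContext(query=query)
    for stage in (classify_intent, enrich_products, assemble_prompt):
        ctx = stage(ctx)
    return ctx  # next step: the one external call, the Bedrock invoke
```

Each stage boundary costs a function call instead of a DNS + TCP + TLS + serialization round trip, which is where the 15-24ms in the table comes from.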
7. DNS Optimization
Caching DNS Resolution for AWS Endpoints
Each AWS service call begins with a DNS resolution. Without caching, the ECS task resolves DNS for every request:
bedrock-runtime.ap-northeast-1.amazonaws.com → DNS lookup → 5-15ms
manga-assist-collection.ap-northeast-1.aoss.amazonaws.com → DNS lookup → 5-15ms
dynamodb.ap-northeast-1.amazonaws.com → DNS lookup → 5-15ms
MangaAssist DNS optimization:
- OS-level DNS cache — ECS Fargate containers run Amazon Linux 2 with `nscd` (Name Service Cache Daemon) configured for a 60-second TTL.
- Application-level reuse — `urllib3` (used by boto3) holds connections open in its pool, so resolved addresses are reused for the lifetime of each pooled connection.
- VPC DNS resolver — Route 53 Resolver in the VPC caches DNS for all VPC endpoints, reducing resolution to < 1ms for cached entries.
| DNS Strategy | First Resolution | Subsequent Resolutions | TTL |
|---|---|---|---|
| No caching | 5-15ms | 5-15ms | - |
| OS-level cache (nscd) | 5-15ms | < 1ms | 60s |
| VPC Resolver + VPC Endpoints | < 1ms (private hosted zone) | < 1ms | 300s |
Total DNS savings per request: ~20-40ms (4+ service calls, each saving roughly 5-10ms on resolution).
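An application-level TTL cache along these lines can be sketched in a few lines (a simplified stand-in for what `nscd` and the VPC resolver provide at other layers):

```python
import socket
import time

class DNSCache:
    """Application-level DNS cache with a TTL, mirroring what nscd
    and the VPC resolver provide at the OS and VPC layers."""

    def __init__(self, ttl_s: float = 60.0):
        self.ttl_s = ttl_s
        self._cache: dict[str, tuple[float, str]] = {}
        self.lookups = 0  # actual resolver calls, for visibility

    def resolve(self, hostname: str) -> str:
        entry = self._cache.get(hostname)
        now = time.monotonic()
        if entry and now - entry[0] < self.ttl_s:
            return entry[1]  # cached: < 1ms instead of 5-15ms
        self.lookups += 1
        ip = socket.gethostbyname(hostname)
        self._cache[hostname] = (now, ip)
        return ip
```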
Python Code — LatencyOptimizer with Prefetching
"""
MangaAssist Latency Optimizer — Prefetching and Async Patterns
Reduces end-to-end latency by pre-loading context on WebSocket connect
and moving non-critical operations off the hot path.
"""
import asyncio
import time
import json
import logging
import hashlib
from dataclasses import dataclass, field
from typing import Optional
import boto3
import redis.asyncio as aioredis
from aiobotocore.session import get_session as aioboto_session
logger = logging.getLogger(__name__)
@dataclass
class PrefetchedContext:
"""Pre-loaded context for a user session."""
session_id: str
session_data: Optional[dict] = None
user_preferences: Optional[dict] = None
trending_products: Optional[list] = None
recent_interactions: Optional[list] = None
prefetch_time_ms: float = 0.0
is_warm: bool = False
@dataclass
class LatencyBudget:
"""Track how much latency budget remains for each request."""
total_budget_ms: float = 2000.0
consumed_ms: float = 0.0
segments: dict = field(default_factory=dict)
@property
def remaining_ms(self) -> float:
return self.total_budget_ms - self.consumed_ms
def consume(self, segment: str, duration_ms: float):
self.segments[segment] = duration_ms
self.consumed_ms += duration_ms
def is_over_budget(self) -> bool:
return self.consumed_ms > self.total_budget_ms
class LatencyOptimizer:
"""
Manages prefetching, async patterns, and latency budget tracking
for MangaAssist requests.
Usage:
optimizer = LatencyOptimizer(redis_url, dynamodb_table, opensearch_host)
# On WebSocket connect — prefetch context
context = await optimizer.prefetch_on_connect(session_id, user_id)
# On message — use prefetched context, track latency budget
budget = LatencyBudget()
response = await optimizer.process_with_budget(
query, context, budget
)
# After response — fire-and-forget post-processing
await optimizer.async_post_process(
session_id, query, response, fire_and_forget=True
)
"""
def __init__(
self,
redis_url: str = "redis://manga-assist-cache.xxxx.apne1.cache.amazonaws.com:6379",
dynamodb_table: str = "MangaAssist-Sessions",
opensearch_host: str = "manga-assist-collection.ap-northeast-1.aoss.amazonaws.com",
region: str = "ap-northeast-1",
):
self.redis_url = redis_url
self.dynamodb_table_name = dynamodb_table
self.opensearch_host = opensearch_host
self.region = region
# Connection pool (initialized lazily)
self._redis: Optional[aioredis.Redis] = None
self._prefetch_cache: dict[str, PrefetchedContext] = {}
# Prefetch config
self.trending_cache_ttl = 900 # 15 minutes
self.session_cache_ttl = 300 # 5 minutes
self.max_prefetch_age_ms = 30_000 # re-prefetch if older than 30s
async def _get_redis(self) -> aioredis.Redis:
"""Lazy-initialize async Redis connection."""
if self._redis is None:
self._redis = aioredis.from_url(
self.redis_url,
decode_responses=True,
socket_timeout=2.0,
socket_connect_timeout=1.0,
)
return self._redis
async def prefetch_on_connect(self, session_id: str,
user_id: str) -> PrefetchedContext:
"""
Called when a WebSocket connection opens. Pre-loads all context
needed for the first user message, so the first query is fast.
"""
start = time.monotonic()
context = PrefetchedContext(session_id=session_id)
try:
# Run all prefetch operations concurrently
results = await asyncio.gather(
self._prefetch_session(session_id),
self._prefetch_preferences(user_id),
self._prefetch_trending(),
self._prefetch_recent_interactions(user_id),
return_exceptions=True,
)
# Unpack results (handle individual failures gracefully)
if not isinstance(results[0], Exception):
context.session_data = results[0]
else:
logger.warning("Session prefetch failed: %s", results[0])
if not isinstance(results[1], Exception):
context.user_preferences = results[1]
else:
logger.warning("Preferences prefetch failed: %s", results[1])
if not isinstance(results[2], Exception):
context.trending_products = results[2]
else:
logger.warning("Trending prefetch failed: %s", results[2])
if not isinstance(results[3], Exception):
context.recent_interactions = results[3]
else:
logger.warning("Recent interactions prefetch failed: %s", results[3])
context.is_warm = True
except Exception as e:
logger.error("Prefetch failed entirely: %s", e)
context.is_warm = False
context.prefetch_time_ms = (time.monotonic() - start) * 1000
        context.prefetched_at = time.monotonic()  # monotonic timestamp for staleness checks
self._prefetch_cache[session_id] = context
logger.info(
"Prefetch complete for session %s in %.1fms (warm=%s)",
session_id, context.prefetch_time_ms, context.is_warm,
)
return context
async def _prefetch_session(self, session_id: str) -> Optional[dict]:
"""Load session data from Redis (fast) or DynamoDB (fallback)."""
r = await self._get_redis()
cached = await r.get(f"session:{session_id}")
if cached:
return json.loads(cached)
# Fallback to DynamoDB
session = aioboto_session()
async with session.create_client("dynamodb", region_name=self.region) as ddb:
resp = await ddb.get_item(
TableName=self.dynamodb_table_name,
Key={"session_id": {"S": session_id}},
)
item = resp.get("Item")
if item:
# Cache in Redis for subsequent requests
session_data = self._deserialize_dynamodb(item)
await r.setex(
f"session:{session_id}",
self.session_cache_ttl,
json.dumps(session_data),
)
return session_data
return None
async def _prefetch_preferences(self, user_id: str) -> Optional[dict]:
"""Load user preferences (language, genre preferences, price range)."""
r = await self._get_redis()
cached = await r.get(f"prefs:{user_id}")
if cached:
return json.loads(cached)
# In production, load from DynamoDB preferences table
return {"language": "ja", "preferred_genres": ["shonen", "seinen"],
"price_max": 2000}
async def _prefetch_trending(self) -> Optional[list]:
"""Load trending manga products (shared across all users)."""
r = await self._get_redis()
cached = await r.get("trending:manga:top10")
if cached:
return json.loads(cached)
# In production, populated by a scheduled Lambda every 15 minutes
return []
async def _prefetch_recent_interactions(self, user_id: str) -> Optional[list]:
"""Load recent interaction history for personalization."""
r = await self._get_redis()
cached = await r.lrange(f"interactions:{user_id}", 0, 9)
if cached:
return [json.loads(item) for item in cached]
return []
    def get_prefetched_context(self, session_id: str) -> Optional[PrefetchedContext]:
        """Retrieve prefetched context for a session (in-memory lookup).

        Returns None if the context is missing, cold, or older than
        max_prefetch_age_ms, so the caller falls back to a fresh load.
        """
        context = self._prefetch_cache.get(session_id)
        if context is None or not context.is_warm:
            return None
        prefetched_at = getattr(context, "prefetched_at", None)
        if prefetched_at is not None:
            age_ms = (time.monotonic() - prefetched_at) * 1000
            if age_ms > self.max_prefetch_age_ms:
                return None  # stale — force a re-prefetch
        return context
async def async_post_process(
self,
session_id: str,
query: str,
response: str,
fire_and_forget: bool = True,
):
"""
Non-critical post-processing: cache write, session update,
analytics. Runs async and does not block response delivery.
"""
tasks = [
self._async_cache_write(session_id, query, response),
self._async_session_update(session_id, query, response),
self._async_analytics_emit(session_id, query, response),
]
if fire_and_forget:
# Schedule tasks but do not await them — they run in background
for task in tasks:
asyncio.create_task(
self._safe_execute(task),
name=f"post-process-{session_id}",
)
else:
# Await all (used in testing or when consistency is required)
await asyncio.gather(*tasks, return_exceptions=True)
async def _safe_execute(self, coro):
"""Execute a coroutine with exception logging (no propagation)."""
try:
await coro
except Exception as e:
logger.error("Async post-process failed: %s", e)
async def _async_cache_write(self, session_id: str,
query: str, response: str):
"""Write query-response pair to semantic cache."""
r = await self._get_redis()
cache_key = f"response:{hashlib.sha256(query.encode()).hexdigest()[:16]}"
await r.setex(cache_key, 3600, json.dumps({
"query": query,
"response": response,
"session_id": session_id,
"timestamp": time.time(),
}))
async def _async_session_update(self, session_id: str,
query: str, response: str):
"""Append interaction to session history."""
r = await self._get_redis()
interaction = json.dumps({
"query": query,
"response_preview": response[:200],
"timestamp": time.time(),
})
await r.rpush(f"session_history:{session_id}", interaction)
await r.expire(f"session_history:{session_id}", 3600)
async def _async_analytics_emit(self, session_id: str,
query: str, response: str):
"""Emit analytics event to Kinesis (simplified)."""
# In production, this would put a record to Kinesis Data Streams
logger.info(
"Analytics: session=%s, query_len=%d, response_len=%d",
session_id, len(query), len(response),
)
@staticmethod
def _deserialize_dynamodb(item: dict) -> dict:
"""Simple DynamoDB item deserializer."""
result = {}
for key, value in item.items():
if "S" in value:
result[key] = value["S"]
elif "N" in value:
result[key] = float(value["N"])
elif "BOOL" in value:
result[key] = value["BOOL"]
elif "L" in value:
result[key] = value["L"]
elif "M" in value:
result[key] = value["M"]
return result
Python Code — VPCEndpointHealthChecker
"""
MangaAssist VPC Endpoint Health Checker
Validates that VPC endpoints are healthy and faster than NAT gateway routes.
Detects misconfigurations and routing issues that silently degrade latency.
"""
import time
import socket
import logging
import json
from dataclasses import dataclass
from typing import Optional
import boto3
logger = logging.getLogger(__name__)
@dataclass
class EndpointHealthResult:
"""Health check result for a single VPC endpoint."""
service: str
endpoint_id: str
dns_resolution_ms: float
tcp_connect_ms: float
is_private_ip: bool
resolved_ip: str
status: str # "healthy", "degraded", "unhealthy"
notes: str = ""
class VPCEndpointHealthChecker:
"""
Validates VPC endpoint health and detects latency regressions.
Checks:
1. DNS resolves to a private IP (not public — would indicate VPC
endpoint misconfiguration)
2. TCP connection time is within expected range for private link
3. Latency is better than NAT gateway baseline
Usage:
checker = VPCEndpointHealthChecker()
results = checker.check_all()
for result in results:
if result.status != "healthy":
alert(result)
"""
# MangaAssist service endpoints and expected behavior
ENDPOINTS = {
"bedrock-runtime": {
"hostname": "bedrock-runtime.ap-northeast-1.amazonaws.com",
"port": 443,
"expected_max_dns_ms": 5.0,
"expected_max_connect_ms": 10.0,
"nat_baseline_ms": 45.0,
},
"dynamodb": {
"hostname": "dynamodb.ap-northeast-1.amazonaws.com",
"port": 443,
"expected_max_dns_ms": 2.0,
"expected_max_connect_ms": 5.0,
"nat_baseline_ms": 18.0,
},
"opensearch": {
"hostname": "manga-assist-collection.ap-northeast-1.aoss.amazonaws.com",
"port": 443,
"expected_max_dns_ms": 5.0,
"expected_max_connect_ms": 10.0,
"nat_baseline_ms": 72.0,
},
"s3": {
"hostname": "s3.ap-northeast-1.amazonaws.com",
"port": 443,
"expected_max_dns_ms": 2.0,
"expected_max_connect_ms": 5.0,
"nat_baseline_ms": 15.0,
},
"cloudwatch-logs": {
"hostname": "logs.ap-northeast-1.amazonaws.com",
"port": 443,
"expected_max_dns_ms": 5.0,
"expected_max_connect_ms": 10.0,
"nat_baseline_ms": 12.0,
},
}
# Private IP ranges (RFC 1918)
PRIVATE_RANGES = [
("10.0.0.0", "10.255.255.255"),
("172.16.0.0", "172.31.255.255"),
("192.168.0.0", "192.168.255.255"),
]
def __init__(self, region: str = "ap-northeast-1"):
self.region = region
self.ec2_client = boto3.client("ec2", region_name=region)
self.cw_client = boto3.client("cloudwatch", region_name=region)
def check_all(self) -> list[EndpointHealthResult]:
"""Run health checks on all VPC endpoints."""
results = []
for service_name, config in self.ENDPOINTS.items():
result = self._check_endpoint(service_name, config)
results.append(result)
self._emit_metric(result)
return results
def _check_endpoint(self, service_name: str,
config: dict) -> EndpointHealthResult:
"""Check a single endpoint's DNS and connectivity."""
hostname = config["hostname"]
port = config["port"]
# Step 1: DNS resolution timing
dns_start = time.monotonic()
try:
resolved_ip = socket.gethostbyname(hostname)
dns_ms = (time.monotonic() - dns_start) * 1000
except socket.gaierror as e:
return EndpointHealthResult(
service=service_name,
endpoint_id="unknown",
dns_resolution_ms=-1,
tcp_connect_ms=-1,
is_private_ip=False,
resolved_ip="UNRESOLVED",
status="unhealthy",
notes=f"DNS resolution failed: {e}",
)
# Step 2: Check if resolved IP is private (VPC endpoint)
is_private = self._is_private_ip(resolved_ip)
# Step 3: TCP connection timing
tcp_start = time.monotonic()
try:
sock = socket.create_connection(
(resolved_ip, port), timeout=5.0
)
tcp_ms = (time.monotonic() - tcp_start) * 1000
sock.close()
except (socket.timeout, ConnectionRefusedError, OSError) as e:
return EndpointHealthResult(
service=service_name,
endpoint_id="unknown",
dns_resolution_ms=dns_ms,
tcp_connect_ms=-1,
is_private_ip=is_private,
resolved_ip=resolved_ip,
status="unhealthy",
notes=f"TCP connection failed: {e}",
)
# Step 4: Determine health status
status = "healthy"
notes = []
if not is_private:
status = "degraded"
notes.append(
f"Resolved to public IP {resolved_ip} — traffic may be "
f"routing via NAT gateway instead of VPC endpoint"
)
if dns_ms > config["expected_max_dns_ms"]:
if status == "healthy":
status = "degraded"
notes.append(
f"DNS resolution slow: {dns_ms:.1f}ms "
f"(expected < {config['expected_max_dns_ms']}ms)"
)
if tcp_ms > config["expected_max_connect_ms"]:
if status == "healthy":
status = "degraded"
notes.append(
f"TCP connect slow: {tcp_ms:.1f}ms "
f"(expected < {config['expected_max_connect_ms']}ms)"
)
total_ms = dns_ms + tcp_ms
if total_ms > config["nat_baseline_ms"]:
status = "unhealthy"
notes.append(
f"Total latency {total_ms:.1f}ms exceeds NAT baseline "
f"{config['nat_baseline_ms']}ms — VPC endpoint may be "
f"misconfigured or not in use"
)
# Look up VPC endpoint ID
endpoint_id = self._find_endpoint_id(service_name)
return EndpointHealthResult(
service=service_name,
endpoint_id=endpoint_id or "not-found",
dns_resolution_ms=round(dns_ms, 2),
tcp_connect_ms=round(tcp_ms, 2),
is_private_ip=is_private,
resolved_ip=resolved_ip,
status=status,
notes="; ".join(notes) if notes else "All checks passed",
)
def _is_private_ip(self, ip: str) -> bool:
"""Check if an IP address is in a private range."""
ip_parts = list(map(int, ip.split(".")))
ip_num = (ip_parts[0] << 24) + (ip_parts[1] << 16) + (
ip_parts[2] << 8) + ip_parts[3]
for start, end in self.PRIVATE_RANGES:
start_parts = list(map(int, start.split(".")))
end_parts = list(map(int, end.split(".")))
start_num = (start_parts[0] << 24) + (start_parts[1] << 16) + (
start_parts[2] << 8) + start_parts[3]
end_num = (end_parts[0] << 24) + (end_parts[1] << 16) + (
end_parts[2] << 8) + end_parts[3]
if start_num <= ip_num <= end_num:
return True
return False
def _find_endpoint_id(self, service_name: str) -> Optional[str]:
"""Look up VPC endpoint ID for a service."""
try:
# Map service names to VPC endpoint service names
service_map = {
"bedrock-runtime": f"com.amazonaws.{self.region}.bedrock-runtime",
"dynamodb": f"com.amazonaws.{self.region}.dynamodb",
"opensearch": f"com.amazonaws.{self.region}.aoss",
"s3": f"com.amazonaws.{self.region}.s3",
"cloudwatch-logs": f"com.amazonaws.{self.region}.logs",
}
vpc_service = service_map.get(service_name)
if not vpc_service:
return None
resp = self.ec2_client.describe_vpc_endpoints(
Filters=[
{"Name": "service-name", "Values": [vpc_service]},
{"Name": "vpc-endpoint-state", "Values": ["available"]},
],
MaxResults=10,
)
endpoints = resp.get("VpcEndpoints", [])
if endpoints:
return endpoints[0]["VpcEndpointId"]
except Exception as e:
logger.warning("Failed to look up VPC endpoint for %s: %s",
service_name, e)
return None
def _emit_metric(self, result: EndpointHealthResult):
"""Emit health metrics to CloudWatch."""
status_value = {"healthy": 1, "degraded": 0.5, "unhealthy": 0}
try:
self.cw_client.put_metric_data(
Namespace="MangaAssist/VPCEndpoints",
MetricData=[
{
"MetricName": "EndpointHealth",
"Value": status_value.get(result.status, 0),
"Unit": "None",
"Dimensions": [
{"Name": "Service", "Value": result.service},
],
},
{
"MetricName": "DNSResolutionLatency",
"Value": max(result.dns_resolution_ms, 0),
"Unit": "Milliseconds",
"Dimensions": [
{"Name": "Service", "Value": result.service},
],
},
{
"MetricName": "TCPConnectLatency",
"Value": max(result.tcp_connect_ms, 0),
"Unit": "Milliseconds",
"Dimensions": [
{"Name": "Service", "Value": result.service},
],
},
{
"MetricName": "IsPrivateIP",
"Value": 1 if result.is_private_ip else 0,
"Unit": "None",
"Dimensions": [
{"Name": "Service", "Value": result.service},
],
},
],
)
except Exception as e:
logger.error("Failed to emit VPC endpoint metrics: %s", e)
def generate_report(self) -> str:
"""Generate a human-readable health report."""
results = self.check_all()
lines = ["MangaAssist VPC Endpoint Health Report", "=" * 45, ""]
for r in results:
emoji_map = {"healthy": "[OK]", "degraded": "[WARN]", "unhealthy": "[FAIL]"}
lines.append(f"{emoji_map.get(r.status, '?')} {r.service}")
lines.append(f" Endpoint: {r.endpoint_id}")
lines.append(f" Resolved IP: {r.resolved_ip} (private={r.is_private_ip})")
lines.append(f" DNS: {r.dns_resolution_ms}ms | TCP: {r.tcp_connect_ms}ms")
lines.append(f" Status: {r.status}")
lines.append(f" Notes: {r.notes}")
lines.append("")
return "\n".join(lines)
Optimized Request Flow — Time Saved at Each Step
sequenceDiagram
participant User
participant APIGW as API Gateway
participant ECS as ECS Orchestrator
participant Redis as Redis<br/>(VPC Endpoint)
participant OS as OpenSearch<br/>(VPC Endpoint)
participant DDB as DynamoDB<br/>(VPC Endpoint)
participant Bedrock as Bedrock<br/>(VPC Endpoint)
participant Guard as Guardrails
Note over User,Guard: Optimized MangaAssist Request Flow
User->>APIGW: WebSocket message
Note right of APIGW: 8ms (was 12ms)<br/>Saved: 4ms (keep-alive)
APIGW->>ECS: Route to task
Note right of ECS: 3ms (was 5ms)<br/>Saved: 2ms (leaner routing)
ECS->>Redis: Cache check (pre-warmed conn)
Note right of Redis: 1.5ms (was 3ms)<br/>Saved: 1.5ms (VPC endpoint)
Redis-->>ECS: Cache miss
par Parallel Retrieval (VPC endpoints)
ECS->>OS: Vector search (filtered, 6 fields)
Note right of OS: 65ms (was 150ms)<br/>Saved: 85ms (filter + source + VPC)
ECS->>DDB: Session load (prefetched!)
Note right of DDB: 0ms (was 12ms)<br/>Saved: 12ms (prefetch hit)
end
OS-->>ECS: Top 5 products
DDB-->>ECS: Already in memory
ECS->>ECS: Prompt assembly (3ms)
Note right of ECS: 3ms (was 5ms)<br/>Saved: 2ms (smaller payload)
ECS->>Bedrock: Invoke model (streaming, pre-warmed)
Note right of Bedrock: 680ms (was 800ms)<br/>Saved: 120ms (VPC + keep-alive + smaller prompt)
Bedrock-->>ECS: Stream tokens
ECS-->>APIGW: Stream to user
APIGW-->>User: First token arrives
par Async Post-Processing (fire-and-forget)
ECS->>Guard: Guardrails (async on complete)
ECS->>Redis: Cache write (fire-and-forget)
ECS->>DDB: Session update (fire-and-forget)
end
Note over User,Guard: Total: ~760ms (was ~1,100ms) — Saved 340ms (31%)
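The parallel-retrieval step in the flow above can be sketched with `asyncio.gather`. This is an illustrative sketch, not MangaAssist's orchestrator code: the two coroutines stand in for the OpenSearch vector search and the DynamoDB session read, with sleep durations scaled up so the overlap is easy to measure.

```python
import asyncio
import time

# Illustrative sketch: the sleeps stand in for the OpenSearch and DynamoDB
# calls (scaled up from 65ms/12ms so the overlap is obvious).

async def vector_search(query: str) -> list[str]:
    await asyncio.sleep(0.2)  # stand-in for the OpenSearch vector search
    return ["top-5 products"]

async def load_session(session_id: str) -> dict:
    await asyncio.sleep(0.2)  # stand-in for the DynamoDB session read
    return {"session_id": session_id}

async def retrieve(query: str, session_id: str):
    # gather() overlaps both awaits: total time ~= the slower call,
    # not the sum of both.
    return await asyncio.gather(vector_search(query), load_session(session_id))

start = time.perf_counter()
products, session = asyncio.run(retrieve("one piece vol 1", "sess-42"))
elapsed = time.perf_counter() - start
print(products, session, f"{elapsed:.2f}s")
```

Run sequentially, the two awaits would cost ~0.4s; gathered, the wall-clock time is roughly that of the slower call alone, which is exactly the 12ms of overlap the diagram credits to parallel retrieval.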
Before/After Latency Waterfall Comparison
Before Optimization
gantt
title Before Optimization — p95: 1,100ms
dateFormat X
axisFormat %L ms
section Edge
API Gateway (12ms) :0, 12
section Orchestration
ECS Routing (5ms) :12, 17
Intent Classify (3ms) :17, 20
section Data (Sequential!)
Redis Cache Check (3ms) :20, 23
DynamoDB Session (12ms) :23, 35
OpenSearch Search (150ms) :35, 185
section FM
Prompt Assembly (5ms) :185, 190
Bedrock Invoke (800ms) :190, 990
section Post (Synchronous!)
Guardrails (40ms) :990, 1030
Cache Write (5ms) :1030, 1035
Session Update (10ms) :1035, 1045
Response Format (5ms) :1045, 1050
WebSocket Send (10ms) :1050, 1060
After Optimization
gantt
title After Optimization — p95: 760ms (31% reduction)
dateFormat X
axisFormat %L ms
section Edge
API Gateway (8ms) :0, 8
section Orchestration
ECS Routing (3ms) :8, 11
section Data (Parallel + Prefetched!)
Redis Check (1.5ms) :11, 13
OpenSearch Filtered (65ms) :13, 78
DynamoDB (prefetched: 0ms) :13, 13
section FM
Prompt Assembly (3ms) :78, 81
Bedrock Invoke (680ms) :81, 761
section Post (Async!)
Stream Starts Immediately :761, 761
Where the 340ms savings come from:
| Optimization | Savings |
|---|---|
| VPC endpoints (all services) | 50ms |
| OpenSearch filter + source selection | 85ms |
| DynamoDB prefetch on connect | 12ms |
| Parallel data retrieval | 12ms (overlap) |
| Async post-processing | 55ms (cache write + session update + guardrails off critical path) |
| Connection pre-warming | 30ms (eliminates cold-start) |
| Smaller prompt payload | 5ms |
| HTTP keep-alive | 15ms |
| DNS caching | 10ms |
| Total (end-to-end p95 delta) | ~340ms |

The per-technique rows are rounded, independently measured estimates that overlap slightly (for example, part of the VPC endpoint saving also shows up inside the Bedrock invoke figure), so they do not sum exactly to the end-to-end delta.
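The DNS-caching row can be realized in a few lines. A minimal sketch, assuming a process-local cache with a short TTL in front of `socket.getaddrinfo` (the TTL value and function name here are illustrative assumptions; a real deployment would also lean on the SDK's connection reuse so resolution happens rarely):

```python
import socket
import time

# Process-local DNS cache sketch: memoize getaddrinfo results for a short
# TTL so repeated calls to the same AWS endpoint skip resolution.
# The 60s TTL is an illustrative assumption, not a measured value.
_DNS_CACHE: dict[tuple[str, int], tuple[float, list]] = {}
_TTL_SECONDS = 60.0

def resolve_cached(host: str, port: int = 443) -> list:
    now = time.monotonic()
    cached = _DNS_CACHE.get((host, port))
    if cached is not None and now - cached[0] < _TTL_SECONDS:
        return cached[1]  # cache hit: no resolver round-trip
    addrs = socket.getaddrinfo(host, port, proto=socket.IPPROTO_TCP)
    _DNS_CACHE[(host, port)] = (now, addrs)
    return addrs
```

A hit returns in microseconds versus the milliseconds a cold resolution can cost. The trade-off: caching past the record's TTL can serve a stale IP after an endpoint failover, which is why the TTL here is kept short.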
Key Takeaways
- VPC endpoints + connection pre-warming are table stakes — deploy them before touching application code. Combined they save 80ms with minimal risk.
- Prefetching on WebSocket connect is a unique advantage — MangaAssist knows the user before the first message arrives. Pre-loading session data and trending products makes the first query as fast as the tenth.
- Async post-processing is the simplest architectural change with the highest impact — moving 55ms of synchronous writes off the critical path requires only wrapping existing calls in `asyncio.create_task`.
- OpenSearch query optimization is the highest single-technique win — 85ms saved by adding filter clauses and limiting returned fields. This is pure application logic; no infrastructure changes needed.
- The modular monolith pattern avoids inter-service hop tax — every internal network call costs 5-8ms. Co-locating intent classification, enrichment, and prompt assembly in the ECS orchestrator saves 15-24ms.
- Latency optimization compounds — each individual technique saves 5-85ms, but combined they reduce p95 from 1,100ms to 760ms. The key is to attack every segment, not just the biggest one.
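The `asyncio.create_task` wrapping from the async post-processing takeaway looks roughly like this. A sketch with placeholder coroutines standing in for the real Redis and DynamoDB writes (names and sleep durations are illustrative, not MangaAssist's actual handlers):

```python
import asyncio

# Fire-and-forget sketch: the answer is returned before the post-processing
# writes finish. write_cache / update_session are placeholders for the real
# Redis SET and DynamoDB UpdateItem calls.

async def write_cache(key: str, value: str) -> None:
    await asyncio.sleep(0.05)  # stand-in for the Redis write

async def update_session(session_id: str, state: dict) -> None:
    await asyncio.sleep(0.05)  # stand-in for the DynamoDB write

async def handle_request(session_id: str, answer: str):
    # Schedule, don't await: ~100ms of writes move off the critical path.
    # Keep references so the tasks aren't garbage-collected mid-flight.
    background = [
        asyncio.create_task(write_cache(f"resp:{session_id}", answer)),
        asyncio.create_task(update_session(session_id, {"last_answer": answer})),
    ]
    return answer, background

async def main() -> str:
    answer, background = await handle_request("sess-42", "streamed answer")
    # In a long-lived service these complete on the event loop; this demo
    # drains them so asyncio.run can exit cleanly.
    await asyncio.gather(*background)
    return answer

result = asyncio.run(main())
print(result)
```

Note the kept references to the created tasks: the event loop holds only weak references, so a task with no strong reference can be garbage-collected before it runs. A production service would track pending tasks centrally and log any that fail.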