
Latency Reduction Techniques for FM Systems

MangaAssist is a JP Manga store chatbot running on AWS. The stack includes Bedrock Claude 3 (Sonnet for complex queries, Haiku for simple), OpenSearch Serverless for vector retrieval, DynamoDB for sessions/products/orders, ECS Fargate for orchestration, API Gateway WebSocket for real-time communication, and ElastiCache Redis for semantic caching. The p95 end-to-end latency target is < 2 seconds.


Skill Mapping

| AWS AIP-C01 Skill | Sub-Skill | This File Covers |
|---|---|---|
| 4.2.6 FM System Performance | Connection Management | HTTP/2 multiplexing, connection warm-up on ECS start |
| 4.2.6 FM System Performance | VPC Endpoint Optimization | Private link to Bedrock/DynamoDB/OpenSearch, measured gains |
| 4.2.6 FM System Performance | Payload Optimization | Request/response size minimization, compression |
| 4.2.6 FM System Performance | Async Patterns | Fire-and-forget for logging/analytics, async cache writes |
| 4.2.6 FM System Performance | Prefetching | Predictive context loading, popular product pre-caching |
| 4.2.6 FM System Performance | DNS Optimization | Caching DNS resolution for AWS endpoints |

Deep-Dive: Latency Reduction for MangaAssist

Every millisecond matters when the p95 target is 2,000ms and Bedrock alone can consume 800ms. This document covers the techniques MangaAssist uses to shave latency from every non-FM segment, maximizing the budget available for model inference.


1. Connection Management

HTTP/2 Multiplexing

An HTTP/1.1 connection carries one request at a time. When MangaAssist issues several calls to the same service in quick succession, each request on a given connection waits for the previous response, so parallelism requires extra connections and extra TLS handshakes.

HTTP/2 multiplexes multiple requests as streams over a single TCP connection, which removes head-of-line blocking at the HTTP layer. A lost TCP packet can still stall every stream on that connection.

Impact on MangaAssist:

| Scenario | HTTP/1.1 | HTTP/2 | Savings |
|---|---|---|---|
| 3 parallel Bedrock subsegment calls | 3 connections, 3 TLS handshakes (90ms overhead) | 1 connection, 1 TLS handshake (30ms overhead) | 60ms |
| 5 DynamoDB reads in batch | 5 sequential on 1 connection | 5 multiplexed on 1 connection | ~20ms |
| OpenSearch + DynamoDB parallel | 2 connections, 2 TLS handshakes | 1 or 2 connections, streams multiplexed | ~15ms |

Configuration:

boto3 uses urllib3 under the hood, which pools HTTP/1.1 connections but does not speak HTTP/2 for AWS service calls, so the HTTP/2 column above is a protocol-level comparison rather than what boto3 delivers. With boto3, the practical savings come from:

  1. Connection reuse: boto3 keeps connections alive within the pool, so the TLS handshake happens only once per connection lifecycle.
  2. Concurrent requests: with asyncio and aioboto3, concurrent calls to the same endpoint draw from a shared pool of keep-alive connections instead of opening a new connection per call.

Connection Warm-Up on ECS Task Start

ECS Fargate tasks are ephemeral. When a new task starts (scale-out event or deployment), the first request pays a cold-start tax:

Cold-start penalty breakdown:
  DNS resolution .............. 5-15ms
  TCP handshake ............... 5-10ms
  TLS handshake ............... 15-25ms
  SDK initialization .......... 10-20ms
  ─────────────────────────────────────
  Total first-request penalty:  35-70ms per downstream service

With 4 downstream services (Bedrock, OpenSearch, DynamoDB, Redis), the total cold-start penalty can reach 140-280ms on the very first request.

Warm-up strategy:

sequenceDiagram
    participant ECS as ECS Task Startup
    participant Health as Health Check Endpoint
    participant Bedrock as Bedrock
    participant OS as OpenSearch
    participant DDB as DynamoDB
    participant Redis as Redis

    Note over ECS: Container starts, app initializes

    ECS->>Health: Register health check handler

    par Pre-Warm All Connections
        ECS->>Bedrock: Minimal invoke (Haiku, 1 token)
        ECS->>OS: Ping cluster health
        ECS->>DDB: GetItem (dummy key)
        ECS->>Redis: PING
    end

    Bedrock-->>ECS: Connection established + warm
    OS-->>ECS: Connection established + warm
    DDB-->>ECS: Connection established + warm
    Redis-->>ECS: Connection established + warm

    Note over ECS: All pools warm. Mark healthy.

    ECS->>Health: Return 200 OK (ready for traffic)

    Note over Health: ALB starts routing traffic to this task

Key detail: The ECS task does not pass its ALB health check until all connections are warm. This prevents the load balancer from routing traffic to a cold task.
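
The readiness gate described above can be sketched as follows. This is a minimal illustration, not MangaAssist's actual startup code: `ReadinessGate` and the probe names are hypothetical, and the probes are injected as plain coroutines so the sketch runs without AWS credentials. In a real task, each probe would be one of the warm-up calls from the diagram (minimal Bedrock invoke, OpenSearch ping, DynamoDB GetItem, Redis PING).

```python
import asyncio
from typing import Awaitable, Callable, Dict

# A probe is any zero-argument coroutine function that raises on failure.
Probe = Callable[[], Awaitable[None]]


class ReadinessGate:
    """Runs warm-up probes concurrently; reports ready only if all succeed."""

    def __init__(self, probes: Dict[str, Probe], timeout_s: float = 5.0):
        self._probes = probes
        self._timeout_s = timeout_s
        self.ready = False
        self.failures: Dict[str, str] = {}

    async def warm_up(self) -> bool:
        names = list(self._probes)
        # Run every probe in parallel, each bounded by the same timeout.
        results = await asyncio.gather(
            *(asyncio.wait_for(self._probes[n](), self._timeout_s) for n in names),
            return_exceptions=True,
        )
        self.failures = {
            name: repr(result)
            for name, result in zip(names, results)
            if isinstance(result, BaseException)
        }
        self.ready = not self.failures
        return self.ready

    def health_status(self) -> int:
        """HTTP status the health check handler would return to the ALB."""
        return 200 if self.ready else 503


async def _placeholder_probe() -> None:
    # Stand-in for a real warm-up call such as a Redis PING.
    await asyncio.sleep(0)


if __name__ == "__main__":
    gate = ReadinessGate({"redis": _placeholder_probe, "dynamodb": _placeholder_probe})
    asyncio.run(gate.warm_up())
    print("health check status:", gate.health_status())
```

Returning 503 until `warm_up()` succeeds is what keeps the load balancer from sending traffic to a cold task. Whether a single failed probe should block readiness entirely, as it does here, is a design choice.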


2. VPC Endpoint Optimization

The NAT Gateway Latency Problem

Without VPC endpoints, all traffic from ECS Fargate tasks in a private subnet to AWS services traverses:

graph LR
    ECS["ECS Fargate<br/>(Private Subnet)"]
    NAT["NAT Gateway<br/>(+10-20ms processing)"]
    IGW["Internet Gateway"]
    SERVICE["AWS Service<br/>Endpoint"]

    ECS -->|"Private IP"| NAT
    NAT -->|"Public IP"| IGW
    IGW -->|"Public Internet"| SERVICE

    style NAT fill:#e74c3c,color:#fff
    style ECS fill:#3498db,color:#fff
    style SERVICE fill:#ff9900,color:#000

The NAT Gateway adds 10-20ms per request (packet inspection, address translation, connection tracking). For a single MangaAssist request that touches 4+ AWS services, this compounds.

VPC Endpoint Architecture

graph LR
    ECS["ECS Fargate<br/>(Private Subnet)"]
    VPCE_B["VPC Endpoint<br/>Bedrock<br/>(Interface)"]
    VPCE_D["VPC Endpoint<br/>DynamoDB<br/>(Gateway)"]
    VPCE_O["VPC Endpoint<br/>OpenSearch<br/>(Interface)"]
    VPCE_S["VPC Endpoint<br/>S3<br/>(Gateway)"]
    VPCE_CW["VPC Endpoint<br/>CloudWatch<br/>(Interface)"]

    BEDROCK["Bedrock Runtime"]
    DDB["DynamoDB"]
    OS["OpenSearch Serverless"]
    S3["S3"]
    CW["CloudWatch"]

    ECS -->|"Private DNS"| VPCE_B --> BEDROCK
    ECS -->|"Route Table"| VPCE_D --> DDB
    ECS -->|"Private DNS"| VPCE_O --> OS
    ECS -->|"Route Table"| VPCE_S --> S3
    ECS -->|"Private DNS"| VPCE_CW --> CW

    style ECS fill:#3498db,color:#fff
    style VPCE_B fill:#9b59b6,color:#fff
    style VPCE_D fill:#9b59b6,color:#fff
    style VPCE_O fill:#9b59b6,color:#fff
    style VPCE_S fill:#9b59b6,color:#fff
    style VPCE_CW fill:#9b59b6,color:#fff
    style BEDROCK fill:#ff9900,color:#000
    style DDB fill:#ff9900,color:#000
    style OS fill:#ff9900,color:#000
    style S3 fill:#ff9900,color:#000
    style CW fill:#ff9900,color:#000

Measured Latency Improvement

| Service | Via NAT Gateway (p95) | Via VPC Endpoint (p95) | Improvement | Endpoint Type |
|---|---|---|---|---|
| Bedrock Runtime | 45ms network overhead | 30ms network overhead | -15ms (-33%) | Interface |
| DynamoDB | 18ms | 8ms | -10ms (-56%) | Gateway (free) |
| OpenSearch Serverless | 72ms | 60ms | -12ms (-17%) | Interface |
| S3 | 15ms | 8ms | -7ms (-47%) | Gateway (free) |
| CloudWatch Logs | 12ms | 6ms | -6ms (-50%) | Interface |
| Per-request total | ~162ms overhead | ~112ms overhead | -50ms (-31%) | |

Cost Analysis

| Item | Monthly Cost |
|---|---|
| NAT Gateway (before): data processing @ $0.045/GB, 500GB/month | $22.50 + $32.40 (hourly) = ~$55 |
| VPC Interface Endpoints (3 endpoints x 2 AZs): $0.01/hr each | $43.80 |
| VPC Gateway Endpoints (DynamoDB, S3) | Free |
| Net savings | ~$11/month + 50ms latency |

At MangaAssist scale, the cost difference is negligible. The latency improvement is the primary driver.
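
The arithmetic behind the cost table can be reproduced with a short sketch. The rates are the ones quoted in the table. The hour counts are assumptions inferred from its totals: $32.40 implies 720 hours at an assumed $0.045/hr NAT rate, and $43.80 implies 730 hours across six endpoint ENIs. Like the table, the sketch leaves out any per-GB data processing charge on interface endpoints.

```python
# Assumed hours per month, back-derived from the table's totals.
NAT_HOURS = 720        # 32.40 / 0.045
ENDPOINT_HOURS = 730   # 43.80 / (3 endpoints * 2 AZs * 0.01)


def nat_monthly_cost(gb_processed: float, per_gb: float = 0.045,
                     hourly: float = 0.045, hours: int = NAT_HOURS) -> float:
    """Data processing charge plus hourly charge for one NAT Gateway."""
    return gb_processed * per_gb + hourly * hours


def interface_endpoint_monthly_cost(endpoints: int, azs: int,
                                    hourly: float = 0.01,
                                    hours: int = ENDPOINT_HOURS) -> float:
    """Hourly charge for interface endpoints, billed per endpoint per AZ."""
    return endpoints * azs * hourly * hours


nat = nat_monthly_cost(500)
vpce = interface_endpoint_monthly_cost(endpoints=3, azs=2)
net_savings = nat - vpce

print(f"NAT: ${nat:.2f}  endpoints: ${vpce:.2f}  net savings: ${net_savings:.2f}")
```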


3. Payload Optimization

Request Payload Minimization

Every byte sent to Bedrock, OpenSearch, or DynamoDB adds serialization time, network transit time, and deserialization time.

Bedrock prompt payload:

| Approach | Payload Size | Serialization Time |
|---|---|---|
| Full conversation history (20 turns) | ~12 KB | 2.1ms |
| Pruned to last 5 turns + summary | ~3.5 KB | 0.6ms |
| Compressed (gzip) | ~1.2 KB | 0.3ms + 0.2ms decompress |
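
The "last 5 turns + summary" row can be illustrated with a small pruning helper. This is a sketch under assumptions: the turn dictionaries and the `summary` entry are hypothetical shapes rather than the Bedrock request schema, and producing the summary text itself is out of scope here.

```python
import json
from typing import Dict, List

Turn = Dict[str, str]


def prune_history(turns: List[Turn], keep_last: int = 5,
                  summary: str = "") -> List[Turn]:
    """Keep the most recent turns and prepend a summary of the rest."""
    if keep_last < 1:
        raise ValueError("keep_last must be at least 1")
    if len(turns) <= keep_last:
        return list(turns)
    head = [{"role": "summary", "content": summary}] if summary else []
    return head + turns[-keep_last:]


def payload_bytes(turns: List[Turn]) -> int:
    """Size of the JSON-serialized history in UTF-8 bytes."""
    return len(json.dumps(turns, ensure_ascii=False).encode("utf-8"))


history = [
    {"role": "user" if i % 2 == 0 else "assistant",
     "content": f"turn {i}: " + "manga recommendation details " * 10}
    for i in range(20)
]
pruned = prune_history(history, keep_last=5, summary="User likes shonen titles.")
print(payload_bytes(history), "->", payload_bytes(pruned), "bytes")
```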

OpenSearch response payload:

| Approach | Response Size | Deserialization Time |
|---|---|---|
| All fields (25 fields per doc, 5 docs) | ~8 KB | 1.8ms |
| Selected fields (6 fields per doc, 5 docs) | ~1.6 KB | 0.4ms |
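
Field selection of this kind is typically expressed through `_source` filtering in the search request body. The sketch below builds such a body for a k-NN query. The vector field name `embedding` and the six document fields are hypothetical, since the source does not list the index mapping.

```python
from typing import Dict, List, Sequence

# Hypothetical field names; the real index mapping is not given in the source.
SELECTED_FIELDS = ("title", "author", "genre", "price", "volume", "product_id")


def build_search_body(query_vector: Sequence[float], k: int = 5,
                      fields: Sequence[str] = SELECTED_FIELDS) -> Dict:
    """Build a k-NN search body that returns only the listed fields."""
    return {
        "size": k,
        # _source filtering: the response omits every field not named here.
        "_source": list(fields),
        "query": {
            "knn": {
                "embedding": {"vector": list(query_vector), "k": k}
            }
        },
    }


body = build_search_body([0.1, 0.2, 0.3])
print(body["_source"])
```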

Compression for Large Context

When conversation history is long (10+ turns), MangaAssist applies compression before caching and decompresses on retrieval:

Original session context: 15,200 bytes
Gzip compressed:           3,800 bytes (75% reduction)
Compression time:          0.3ms
Decompression time:        0.1ms
Redis storage savings:     75%
Network transit savings:   ~2ms per cache operation

Net benefit: 0.4ms compression overhead vs. 2ms network savings = 1.6ms net gain per cache operation.
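
A minimal sketch of the compress-before-cache step, using only the standard library. The function names are hypothetical. If the compressed bytes are stored in Redis, the client must not be created with `decode_responses=True`, because the values are binary.

```python
import gzip
import json


def pack_context(context: dict, level: int = 6) -> bytes:
    """Serialize a session context to compact JSON and gzip it."""
    raw = json.dumps(context, ensure_ascii=False,
                     separators=(",", ":")).encode("utf-8")
    return gzip.compress(raw, compresslevel=level)


def unpack_context(blob: bytes) -> dict:
    """Reverse of pack_context."""
    return json.loads(gzip.decompress(blob).decode("utf-8"))


context = {
    "session_id": "s-123",
    "turns": [
        {"role": "user", "content": "おすすめの少年漫画を教えてください " * 5}
        for _ in range(20)
    ],
}
raw_size = len(json.dumps(context, ensure_ascii=False).encode("utf-8"))
blob = pack_context(context)
print(f"{raw_size} bytes -> {len(blob)} bytes")
```

The ratio depends heavily on how repetitive the content is, so the 75% figure above should be read as a measurement for MangaAssist's own sessions rather than a general expectation.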


4. Async Patterns

Fire-and-Forget for Non-Critical Operations

MangaAssist classifies operations as critical-path (must complete before response) and non-critical (can complete after response starts streaming).

graph TB
    subgraph "Critical Path (blocks response)"
        A1[Cache Lookup]
        A2[Vector Search]
        A3[Session Load]
        A4[Bedrock Invoke]
        A5[Guardrails Check]
    end

    subgraph "Non-Critical (fire-and-forget)"
        B1[Cache Write]
        B2[Session Update]
        B3[Analytics Emit]
        B4[Audit Log Write]
        B5[Feedback Signal]
    end

    A4 -->|"Response ready"| STREAM[Stream to Client]
    A4 -.->|"Async after response starts"| B1
    A4 -.->|"Async after response starts"| B2
    A4 -.->|"Async after response starts"| B3
    A4 -.->|"Async after response starts"| B4

    style A1 fill:#e74c3c,color:#fff
    style A2 fill:#e74c3c,color:#fff
    style A3 fill:#e74c3c,color:#fff
    style A4 fill:#e74c3c,color:#fff
    style A5 fill:#e74c3c,color:#fff
    style B1 fill:#2ecc71,color:#000
    style B2 fill:#2ecc71,color:#000
    style B3 fill:#2ecc71,color:#000
    style B4 fill:#2ecc71,color:#000
    style B5 fill:#2ecc71,color:#000
    style STREAM fill:#3498db,color:#fff

Latency savings from async patterns:

| Operation | Sync Duration | Async Impact on Critical Path | Savings |
|---|---|---|---|
| Cache write (Redis SET) | 3-5ms | 0ms (fire-and-forget) | 3-5ms |
| Session update (DynamoDB PUT) | 5-10ms | 0ms (fire-and-forget) | 5-10ms |
| Analytics emit (Kinesis PUT) | 3-8ms | 0ms (fire-and-forget) | 3-8ms |
| Audit log (CloudWatch PUT) | 2-5ms | 0ms (fire-and-forget) | 2-5ms |
| Total non-critical work | 13-28ms | 0ms | 13-28ms |

Async Cache Writes — Handling Failures

Fire-and-forget does not mean fire-and-ignore. MangaAssist handles async write failures via:

  1. Dead letter queue: failed cache writes go to an SQS DLQ for retry.
  2. Metric emission: a CacheWriteFailure CloudWatch metric triggers alerts.
  3. Graceful degradation: a missed cache write means the next identical query is re-computed (costs more but still works).
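
The failure handling above can be sketched with plain asyncio. `BackgroundTasks` and its `on_failure` hook are hypothetical names. In practice the hook would be where the DLQ send and the metric emission go. The sketch also keeps a strong reference to each task until it finishes, because the event loop holds only weak references and an unreferenced task can be garbage-collected mid-flight.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Optional, Set

logger = logging.getLogger(__name__)

FailureHook = Callable[[str, BaseException], None]


class BackgroundTasks:
    """Fire-and-forget runner that records failures instead of hiding them."""

    def __init__(self, on_failure: Optional[FailureHook] = None):
        self._tasks: Set[asyncio.Task] = set()
        self._on_failure = on_failure
        self.failure_count = 0

    def spawn(self, name: str, coro: Awaitable[None]) -> asyncio.Task:
        task = asyncio.create_task(self._run(name, coro), name=name)
        # Hold a strong reference until the task completes.
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def _run(self, name: str, coro: Awaitable[None]) -> None:
        try:
            await coro
        except Exception as exc:
            self.failure_count += 1
            logger.error("Background task %s failed: %s", name, exc)
            if self._on_failure is not None:
                # Placeholder for "send to DLQ" and "emit metric".
                self._on_failure(name, exc)

    async def drain(self) -> None:
        """Wait for outstanding tasks, e.g. during graceful shutdown."""
        if self._tasks:
            await asyncio.gather(*list(self._tasks))
```

Calling `drain()` when the task receives SIGTERM gives in-flight writes a chance to finish before ECS stops the container.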

5. Prefetching

Predictive Context Loading

When a WebSocket connection opens, MangaAssist knows the user's session ID. Before the user sends their first message, the system prefetches:

sequenceDiagram
    participant User
    participant APIGW as API Gateway
    participant ECS as ECS Orchestrator
    participant Redis as Redis Cache
    participant DDB as DynamoDB
    participant OS as OpenSearch

    User->>APIGW: WebSocket CONNECT
    APIGW->>ECS: $connect event

    par Prefetch on Connect
        ECS->>Redis: Prefetch session cache
        ECS->>DDB: Prefetch session data
        ECS->>DDB: Prefetch user preferences
        ECS->>OS: Prefetch trending manga (top 10)
    end

    Redis-->>ECS: Cached session (or miss)
    DDB-->>ECS: Session + preferences
    OS-->>ECS: Trending products

    Note over ECS: Context pre-loaded in memory

    User->>APIGW: First message: "recommend something"
    APIGW->>ECS: $default route

    Note over ECS: Session already loaded!<br/>Skip DynamoDB read (saved 8ms).<br/>Trending products ready (saved 60ms).

    ECS->>ECS: Assemble prompt (using prefetched data)
    ECS->>ECS: Continue pipeline...

Latency savings on first message: 60-80ms (session + preferences already loaded).

MangaAssist pre-computes and caches results for the most common queries:

| Query Pattern | Frequency | Pre-cached? | Cache TTL | Savings When Hit |
|---|---|---|---|---|
| "trending manga" / "popular this week" | 22% of queries | Yes | 15 min | ~900ms (skip Bedrock) |
| "new releases" | 15% of queries | Yes | 1 hour | ~900ms |
| Genre browsing ("show me shonen") | 12% of queries | Yes | 30 min | ~700ms |
| Specific title lookup ("One Piece latest") | 8% of queries | Yes (top 100 titles) | 5 min | ~500ms |
| Custom/unique queries | 43% of queries | No | - | 0ms |

Aggregate impact: up to ~57% of queries match a pre-cached pattern. Savings per hit average 750ms across the four patterns (unweighted), or roughly 800ms when weighted by query frequency.
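
The aggregate figures follow directly from the table. The sketch below recomputes them and assumes, as the table implicitly does, that every query in a pre-cached pattern is a cache hit. Title lookups outside the top 100 would lower the real numbers.

```python
# (pattern, share of all queries, savings in ms when the cache is hit)
PATTERNS = [
    ("trending / popular this week", 0.22, 900),
    ("new releases", 0.15, 900),
    ("genre browsing", 0.12, 700),
    ("specific title lookup", 0.08, 500),
    ("custom / unique", 0.43, 0),
]

cached = [(share, ms) for _, share, ms in PATTERNS if ms > 0]

# Share of queries that fall into a pre-cached pattern.
coverage = sum(share for share, _ in cached)

# Simple mean of the four per-pattern savings figures.
unweighted_ms = sum(ms for _, ms in cached) / len(cached)

# Mean savings per hit, weighted by how often each pattern occurs.
weighted_ms = sum(share * ms for share, ms in cached) / coverage

# Expected savings per query across all traffic, including uncached queries.
per_query_ms = sum(share * ms for _, share, ms in PATTERNS)

print(f"coverage={coverage:.0%} unweighted={unweighted_ms:.0f}ms "
      f"weighted={weighted_ms:.0f}ms per_query={per_query_ms:.0f}ms")
```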


6. Service Mesh and Hop Minimization

Minimizing Inter-Service Hops

Every network hop adds latency (DNS, TCP, TLS, serialization). MangaAssist minimizes hops by co-locating logic:

graph TB
    subgraph "Before: 4 internal hops"
        direction TB
        R1[Request Handler] --> IC1[Intent Classifier Service]
        IC1 --> PE1[Product Enrichment Service]
        PE1 --> PA1[Prompt Assembly Service]
        PA1 --> BI1[Bedrock Invoker Service]
    end

    subgraph "After: 1 monolith + direct calls"
        direction TB
        R2[ECS Orchestrator<br/>Intent + Enrichment + Assembly<br/>all in-process] --> BI2[Bedrock<br/>Direct Call]
    end

    style R1 fill:#e74c3c,color:#fff
    style IC1 fill:#e74c3c,color:#fff
    style PE1 fill:#e74c3c,color:#fff
    style PA1 fill:#e74c3c,color:#fff
    style BI1 fill:#e74c3c,color:#fff

    style R2 fill:#2ecc71,color:#000
    style BI2 fill:#2ecc71,color:#000

| Architecture | Internal Hops | Per-Hop Overhead | Total Internal Overhead |
|---|---|---|---|
| Microservices (4 internal services) | 3 | 5-8ms | 15-24ms |
| Modular monolith (co-located) | 0 | 0ms | 0ms |

MangaAssist uses a modular monolith pattern: intent classification, product enrichment, and prompt assembly run in-process within the ECS orchestrator. Only external AWS service calls (Bedrock, OpenSearch, DynamoDB, Redis) cross the network.

When to add microservice hops: Only when a component needs independent scaling (e.g., a batch indexing service that runs on a different schedule from the chatbot).


7. DNS Optimization

Caching DNS Resolution for AWS Endpoints

Each new connection to an AWS service begins with a DNS resolution. Without caching or connection reuse, the ECS task pays that lookup repeatedly:

bedrock-runtime.ap-northeast-1.amazonaws.com  →  DNS lookup  →  5-15ms
search-manga-collection.ap-northeast-1.aoss.amazonaws.com  →  DNS lookup  →  5-15ms
dynamodb.ap-northeast-1.amazonaws.com  →  DNS lookup  →  5-15ms

MangaAssist DNS optimization:

  1. OS-level DNS cache: ECS Fargate containers run Amazon Linux 2 with nscd (Name Service Cache Daemon) configured for a 60-second TTL.
  2. Application-level connection reuse: urllib3 (used by boto3) keeps pooled connections open, so a hostname is resolved only when a new connection is created rather than on every request.
  3. VPC DNS resolver: Route 53 Resolver in the VPC caches DNS for all VPC endpoints, reducing resolution to < 1ms for cached entries.

| DNS Strategy | First Resolution | Subsequent Resolutions | TTL |
|---|---|---|---|
| No caching | 5-15ms | 5-15ms | - |
| OS-level cache (nscd) | 5-15ms | < 1ms | 60s |
| VPC Resolver + VPC Endpoints | < 1ms (private hosted zone) | < 1ms | 300s |

Total DNS savings per request: ~15-30ms when the three lookups above are served from cache, each saving 5-10ms. A request that touches additional services saves proportionally more.
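
The effect of a TTL-bounded DNS cache can be illustrated at the application level. This is only a sketch of the caching behaviour that nscd provides at the OS level; it is not how urllib3 or boto3 resolve names, and `TTLResolverCache` is a hypothetical helper. The resolver and clock are injectable so the behaviour can be checked without network access.

```python
import socket
import time
from typing import Callable, Dict, Tuple


class TTLResolverCache:
    """Caches hostname-to-IP lookups for a fixed TTL."""

    def __init__(self, ttl_s: float = 60.0,
                 resolver: Callable[[str], str] = socket.gethostbyname,
                 clock: Callable[[], float] = time.monotonic):
        self._ttl_s = ttl_s
        self._resolver = resolver
        self._clock = clock
        # hostname -> (ip, expiry time on the injected clock)
        self._entries: Dict[str, Tuple[str, float]] = {}

    def resolve(self, hostname: str) -> str:
        now = self._clock()
        entry = self._entries.get(hostname)
        if entry is not None and entry[1] > now:
            return entry[0]  # cache hit: no lookup performed
        ip = self._resolver(hostname)
        self._entries[hostname] = (ip, now + self._ttl_s)
        return ip
```

A short TTL matters here because AWS service endpoints rotate IP addresses. Caching far beyond the record's TTL risks connecting to an address that is no longer served.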


Python Code — LatencyOptimizer with Prefetching

"""
MangaAssist Latency Optimizer — Prefetching and Async Patterns
Reduces end-to-end latency by pre-loading context on WebSocket connect
and moving non-critical operations off the hot path.
"""

import asyncio
import time
import json
import logging
import hashlib
from dataclasses import dataclass, field
from typing import Optional

import boto3
import redis.asyncio as aioredis
from aiobotocore.session import get_session as aioboto_session

logger = logging.getLogger(__name__)


@dataclass
class PrefetchedContext:
    """Pre-loaded context for a user session."""
    session_id: str
    session_data: Optional[dict] = None
    user_preferences: Optional[dict] = None
    trending_products: Optional[list] = None
    recent_interactions: Optional[list] = None
    prefetch_time_ms: float = 0.0
    is_warm: bool = False


@dataclass
class LatencyBudget:
    """Track how much latency budget remains for each request."""
    total_budget_ms: float = 2000.0
    consumed_ms: float = 0.0
    segments: dict = field(default_factory=dict)

    @property
    def remaining_ms(self) -> float:
        return self.total_budget_ms - self.consumed_ms

    def consume(self, segment: str, duration_ms: float):
        self.segments[segment] = duration_ms
        self.consumed_ms += duration_ms

    def is_over_budget(self) -> bool:
        return self.consumed_ms > self.total_budget_ms


class LatencyOptimizer:
    """
    Manages prefetching, async patterns, and latency budget tracking
    for MangaAssist requests.

    Usage:
        optimizer = LatencyOptimizer(redis_url, dynamodb_table, opensearch_host)

        # On WebSocket connect — prefetch context
        context = await optimizer.prefetch_on_connect(session_id, user_id)

        # On message — use prefetched context, track latency budget
        budget = LatencyBudget()
        response = await optimizer.process_with_budget(
            query, context, budget
        )

        # After response — fire-and-forget post-processing
        await optimizer.async_post_process(
            session_id, query, response, fire_and_forget=True
        )
    """

    def __init__(
        self,
        redis_url: str = "redis://manga-assist-cache.xxxx.apne1.cache.amazonaws.com:6379",
        dynamodb_table: str = "MangaAssist-Sessions",
        opensearch_host: str = "manga-assist-collection.ap-northeast-1.aoss.amazonaws.com",
        region: str = "ap-northeast-1",
    ):
        self.redis_url = redis_url
        self.dynamodb_table_name = dynamodb_table
        self.opensearch_host = opensearch_host
        self.region = region

        # Connection pool (initialized lazily)
        self._redis: Optional[aioredis.Redis] = None
        self._prefetch_cache: dict[str, PrefetchedContext] = {}

        # Prefetch config
        self.trending_cache_ttl = 900  # 15 minutes
        self.session_cache_ttl = 300   # 5 minutes
        self.max_prefetch_age_ms = 30_000  # re-prefetch if older than 30s

    async def _get_redis(self) -> aioredis.Redis:
        """Lazy-initialize async Redis connection."""
        if self._redis is None:
            self._redis = aioredis.from_url(
                self.redis_url,
                decode_responses=True,
                socket_timeout=2.0,
                socket_connect_timeout=1.0,
            )
        return self._redis

    async def prefetch_on_connect(self, session_id: str,
                                   user_id: str) -> PrefetchedContext:
        """
        Called when a WebSocket connection opens. Pre-loads all context
        needed for the first user message, so the first query is fast.
        """
        start = time.monotonic()
        context = PrefetchedContext(session_id=session_id)

        try:
            # Run all prefetch operations concurrently
            results = await asyncio.gather(
                self._prefetch_session(session_id),
                self._prefetch_preferences(user_id),
                self._prefetch_trending(),
                self._prefetch_recent_interactions(user_id),
                return_exceptions=True,
            )

            # Unpack results (handle individual failures gracefully)
            if not isinstance(results[0], Exception):
                context.session_data = results[0]
            else:
                logger.warning("Session prefetch failed: %s", results[0])

            if not isinstance(results[1], Exception):
                context.user_preferences = results[1]
            else:
                logger.warning("Preferences prefetch failed: %s", results[1])

            if not isinstance(results[2], Exception):
                context.trending_products = results[2]
            else:
                logger.warning("Trending prefetch failed: %s", results[2])

            if not isinstance(results[3], Exception):
                context.recent_interactions = results[3]
            else:
                logger.warning("Recent interactions prefetch failed: %s", results[3])

            context.is_warm = True

        except Exception as e:
            logger.error("Prefetch failed entirely: %s", e)
            context.is_warm = False

        context.prefetch_time_ms = (time.monotonic() - start) * 1000
        self._prefetch_cache[session_id] = context

        logger.info(
            "Prefetch complete for session %s in %.1fms (warm=%s)",
            session_id, context.prefetch_time_ms, context.is_warm,
        )
        return context

    async def _prefetch_session(self, session_id: str) -> Optional[dict]:
        """Load session data from Redis (fast) or DynamoDB (fallback)."""
        r = await self._get_redis()
        cached = await r.get(f"session:{session_id}")
        if cached:
            return json.loads(cached)

        # Fallback to DynamoDB
        session = aioboto_session()
        async with session.create_client("dynamodb", region_name=self.region) as ddb:
            resp = await ddb.get_item(
                TableName=self.dynamodb_table_name,
                Key={"session_id": {"S": session_id}},
            )
            item = resp.get("Item")
            if item:
                # Cache in Redis for subsequent requests
                session_data = self._deserialize_dynamodb(item)
                await r.setex(
                    f"session:{session_id}",
                    self.session_cache_ttl,
                    json.dumps(session_data),
                )
                return session_data
        return None

    async def _prefetch_preferences(self, user_id: str) -> Optional[dict]:
        """Load user preferences (language, genre preferences, price range)."""
        r = await self._get_redis()
        cached = await r.get(f"prefs:{user_id}")
        if cached:
            return json.loads(cached)
        # In production, load from DynamoDB preferences table
        return {"language": "ja", "preferred_genres": ["shonen", "seinen"],
                "price_max": 2000}

    async def _prefetch_trending(self) -> Optional[list]:
        """Load trending manga products (shared across all users)."""
        r = await self._get_redis()
        cached = await r.get("trending:manga:top10")
        if cached:
            return json.loads(cached)
        # In production, populated by a scheduled Lambda every 15 minutes
        return []

    async def _prefetch_recent_interactions(self, user_id: str) -> Optional[list]:
        """Load recent interaction history for personalization."""
        r = await self._get_redis()
        cached = await r.lrange(f"interactions:{user_id}", 0, 9)
        if cached:
            return [json.loads(item) for item in cached]
        return []

    def get_prefetched_context(self, session_id: str) -> Optional[PrefetchedContext]:
        """Retrieve prefetched context for a session (in-memory lookup)."""
        context = self._prefetch_cache.get(session_id)
        if context and context.is_warm:
            # Staleness is not enforced here: PrefetchedContext stores only the
            # prefetch duration (prefetch_time_ms), not a completion timestamp,
            # so max_prefetch_age_ms cannot be checked without recording one.
            return context
        return None

    async def async_post_process(
        self,
        session_id: str,
        query: str,
        response: str,
        fire_and_forget: bool = True,
    ):
        """
        Non-critical post-processing: cache write, session update,
        analytics. Runs async and does not block response delivery.
        """
        tasks = [
            self._async_cache_write(session_id, query, response),
            self._async_session_update(session_id, query, response),
            self._async_analytics_emit(session_id, query, response),
        ]

        if fire_and_forget:
            # Schedule tasks but do not await them — they run in background
            for task in tasks:
                asyncio.create_task(
                    self._safe_execute(task),
                    name=f"post-process-{session_id}",
                )
        else:
            # Await all (used in testing or when consistency is required)
            await asyncio.gather(*tasks, return_exceptions=True)

    async def _safe_execute(self, coro):
        """Execute a coroutine with exception logging (no propagation)."""
        try:
            await coro
        except Exception as e:
            logger.error("Async post-process failed: %s", e)

    async def _async_cache_write(self, session_id: str,
                                  query: str, response: str):
        """Write query-response pair to semantic cache."""
        r = await self._get_redis()
        cache_key = f"response:{hashlib.sha256(query.encode()).hexdigest()[:16]}"
        await r.setex(cache_key, 3600, json.dumps({
            "query": query,
            "response": response,
            "session_id": session_id,
            "timestamp": time.time(),
        }))

    async def _async_session_update(self, session_id: str,
                                     query: str, response: str):
        """Append interaction to session history."""
        r = await self._get_redis()
        interaction = json.dumps({
            "query": query,
            "response_preview": response[:200],
            "timestamp": time.time(),
        })
        await r.rpush(f"session_history:{session_id}", interaction)
        await r.expire(f"session_history:{session_id}", 3600)

    async def _async_analytics_emit(self, session_id: str,
                                     query: str, response: str):
        """Emit analytics event to Kinesis (simplified)."""
        # In production, this would put a record to Kinesis Data Streams
        logger.info(
            "Analytics: session=%s, query_len=%d, response_len=%d",
            session_id, len(query), len(response),
        )

    @staticmethod
    def _deserialize_dynamodb(item: dict) -> dict:
        """Simple DynamoDB item deserializer."""
        result = {}
        for key, value in item.items():
            if "S" in value:
                result[key] = value["S"]
            elif "N" in value:
                result[key] = float(value["N"])
            elif "BOOL" in value:
                result[key] = value["BOOL"]
            elif "L" in value:
                result[key] = value["L"]
            elif "M" in value:
                result[key] = value["M"]
        return result

Python Code — VPCEndpointHealthChecker

"""
MangaAssist VPC Endpoint Health Checker
Validates that VPC endpoints are healthy and faster than NAT gateway routes.
Detects misconfigurations and routing issues that silently degrade latency.
"""

import time
import socket
import logging
import json
from dataclasses import dataclass
from typing import Optional

import boto3

logger = logging.getLogger(__name__)


@dataclass
class EndpointHealthResult:
    """Health check result for a single VPC endpoint."""
    service: str
    endpoint_id: str
    dns_resolution_ms: float
    tcp_connect_ms: float
    is_private_ip: bool
    resolved_ip: str
    status: str  # "healthy", "degraded", "unhealthy"
    notes: str = ""


class VPCEndpointHealthChecker:
    """
    Validates VPC endpoint health and detects latency regressions.

    Checks:
    1. DNS resolves to a private IP (not public — would indicate VPC
       endpoint misconfiguration)
    2. TCP connection time is within expected range for private link
    3. Latency is better than NAT gateway baseline

    Usage:
        checker = VPCEndpointHealthChecker()
        results = checker.check_all()
        for result in results:
            if result.status != "healthy":
                alert(result)
    """

    # MangaAssist service endpoints and expected behavior
    ENDPOINTS = {
        "bedrock-runtime": {
            "hostname": "bedrock-runtime.ap-northeast-1.amazonaws.com",
            "port": 443,
            "expected_max_dns_ms": 5.0,
            "expected_max_connect_ms": 10.0,
            "nat_baseline_ms": 45.0,
        },
        "dynamodb": {
            "hostname": "dynamodb.ap-northeast-1.amazonaws.com",
            "port": 443,
            "expected_max_dns_ms": 2.0,
            "expected_max_connect_ms": 5.0,
            "nat_baseline_ms": 18.0,
        },
        "opensearch": {
            "hostname": "manga-assist-collection.ap-northeast-1.aoss.amazonaws.com",
            "port": 443,
            "expected_max_dns_ms": 5.0,
            "expected_max_connect_ms": 10.0,
            "nat_baseline_ms": 72.0,
        },
        "s3": {
            "hostname": "s3.ap-northeast-1.amazonaws.com",
            "port": 443,
            "expected_max_dns_ms": 2.0,
            "expected_max_connect_ms": 5.0,
            "nat_baseline_ms": 15.0,
        },
        "cloudwatch-logs": {
            "hostname": "logs.ap-northeast-1.amazonaws.com",
            "port": 443,
            "expected_max_dns_ms": 5.0,
            "expected_max_connect_ms": 10.0,
            "nat_baseline_ms": 12.0,
        },
    }

    # Private IP ranges (RFC 1918)
    PRIVATE_RANGES = [
        ("10.0.0.0", "10.255.255.255"),
        ("172.16.0.0", "172.31.255.255"),
        ("192.168.0.0", "192.168.255.255"),
    ]

    def __init__(self, region: str = "ap-northeast-1"):
        self.region = region
        self.ec2_client = boto3.client("ec2", region_name=region)
        self.cw_client = boto3.client("cloudwatch", region_name=region)

    def check_all(self) -> list[EndpointHealthResult]:
        """Run health checks on all VPC endpoints."""
        results = []
        for service_name, config in self.ENDPOINTS.items():
            result = self._check_endpoint(service_name, config)
            results.append(result)
            self._emit_metric(result)
        return results

    def _check_endpoint(self, service_name: str,
                        config: dict) -> EndpointHealthResult:
        """Check a single endpoint's DNS and connectivity."""
        hostname = config["hostname"]
        port = config["port"]

        # Step 1: DNS resolution timing
        dns_start = time.monotonic()
        try:
            resolved_ip = socket.gethostbyname(hostname)
            dns_ms = (time.monotonic() - dns_start) * 1000
        except socket.gaierror as e:
            return EndpointHealthResult(
                service=service_name,
                endpoint_id="unknown",
                dns_resolution_ms=-1,
                tcp_connect_ms=-1,
                is_private_ip=False,
                resolved_ip="UNRESOLVED",
                status="unhealthy",
                notes=f"DNS resolution failed: {e}",
            )

        # Step 2: Check if resolved IP is private (VPC endpoint)
        is_private = self._is_private_ip(resolved_ip)

        # Step 3: TCP connection timing
        tcp_start = time.monotonic()
        try:
            sock = socket.create_connection(
                (resolved_ip, port), timeout=5.0
            )
            tcp_ms = (time.monotonic() - tcp_start) * 1000
            sock.close()
        except (socket.timeout, ConnectionRefusedError, OSError) as e:
            return EndpointHealthResult(
                service=service_name,
                endpoint_id="unknown",
                dns_resolution_ms=dns_ms,
                tcp_connect_ms=-1,
                is_private_ip=is_private,
                resolved_ip=resolved_ip,
                status="unhealthy",
                notes=f"TCP connection failed: {e}",
            )

        # Step 4: Determine health status
        status = "healthy"
        notes = []

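        # Gateway endpoints (DynamoDB, S3) route via prefix lists in the route
        # table and keep resolving to public IPs, so only Interface endpoints
        # are expected to resolve to a private address.
        is_gateway_endpoint = service_name in ("dynamodb", "s3")
        if not is_private and not is_gateway_endpoint:
            status = "degraded"
            notes.append(
                f"Resolved to public IP {resolved_ip}; traffic may be "
                f"routing via NAT gateway instead of VPC endpoint"
            )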

        if dns_ms > config["expected_max_dns_ms"]:
            if status == "healthy":
                status = "degraded"
            notes.append(
                f"DNS resolution slow: {dns_ms:.1f}ms "
                f"(expected < {config['expected_max_dns_ms']}ms)"
            )

        if tcp_ms > config["expected_max_connect_ms"]:
            if status == "healthy":
                status = "degraded"
            notes.append(
                f"TCP connect slow: {tcp_ms:.1f}ms "
                f"(expected < {config['expected_max_connect_ms']}ms)"
            )

        total_ms = dns_ms + tcp_ms
        if total_ms > config["nat_baseline_ms"]:
            status = "unhealthy"
            notes.append(
                f"Total latency {total_ms:.1f}ms exceeds NAT baseline "
                f"{config['nat_baseline_ms']}ms — VPC endpoint may be "
                f"misconfigured or not in use"
            )

        # Look up VPC endpoint ID
        endpoint_id = self._find_endpoint_id(service_name)

        return EndpointHealthResult(
            service=service_name,
            endpoint_id=endpoint_id or "not-found",
            dns_resolution_ms=round(dns_ms, 2),
            tcp_connect_ms=round(tcp_ms, 2),
            is_private_ip=is_private,
            resolved_ip=resolved_ip,
            status=status,
            notes="; ".join(notes) if notes else "All checks passed",
        )

    def _is_private_ip(self, ip: str) -> bool:
        """Check if an IPv4 address falls inside one of PRIVATE_RANGES."""
        def to_int(addr: str) -> int:
            # Pack a dotted-quad string into a 32-bit integer.
            a, b, c, d = (int(part) for part in addr.split("."))
            return (a << 24) | (b << 16) | (c << 8) | d

        try:
            ip_num = to_int(ip)
        except ValueError:
            # Not a dotted-quad IPv4 address (e.g. IPv6); treat as non-private
            # instead of crashing the health check.
            return False
        return any(
            to_int(start) <= ip_num <= to_int(end)
            for start, end in self.PRIVATE_RANGES
        )

    def _find_endpoint_id(self, service_name: str) -> Optional[str]:
        """Look up VPC endpoint ID for a service."""
        try:
            # Map service names to VPC endpoint service names
            service_map = {
                "bedrock-runtime": f"com.amazonaws.{self.region}.bedrock-runtime",
                "dynamodb": f"com.amazonaws.{self.region}.dynamodb",
                "opensearch": f"com.amazonaws.{self.region}.aoss",
                "s3": f"com.amazonaws.{self.region}.s3",
                "cloudwatch-logs": f"com.amazonaws.{self.region}.logs",
            }
            vpc_service = service_map.get(service_name)
            if not vpc_service:
                return None

            resp = self.ec2_client.describe_vpc_endpoints(
                Filters=[
                    {"Name": "service-name", "Values": [vpc_service]},
                    {"Name": "vpc-endpoint-state", "Values": ["available"]},
                ],
                MaxResults=10,
            )
            endpoints = resp.get("VpcEndpoints", [])
            if endpoints:
                return endpoints[0]["VpcEndpointId"]
        except Exception as e:
            logger.warning("Failed to look up VPC endpoint for %s: %s",
                          service_name, e)
        return None

    def _emit_metric(self, result: EndpointHealthResult):
        """Emit health metrics to CloudWatch."""
        status_value = {"healthy": 1, "degraded": 0.5, "unhealthy": 0}
        try:
            self.cw_client.put_metric_data(
                Namespace="MangaAssist/VPCEndpoints",
                MetricData=[
                    {
                        "MetricName": "EndpointHealth",
                        "Value": status_value.get(result.status, 0),
                        "Unit": "None",
                        "Dimensions": [
                            {"Name": "Service", "Value": result.service},
                        ],
                    },
                    {
                        "MetricName": "DNSResolutionLatency",
                        "Value": max(result.dns_resolution_ms, 0),
                        "Unit": "Milliseconds",
                        "Dimensions": [
                            {"Name": "Service", "Value": result.service},
                        ],
                    },
                    {
                        "MetricName": "TCPConnectLatency",
                        "Value": max(result.tcp_connect_ms, 0),
                        "Unit": "Milliseconds",
                        "Dimensions": [
                            {"Name": "Service", "Value": result.service},
                        ],
                    },
                    {
                        "MetricName": "IsPrivateIP",
                        "Value": 1 if result.is_private_ip else 0,
                        "Unit": "None",
                        "Dimensions": [
                            {"Name": "Service", "Value": result.service},
                        ],
                    },
                ],
            )
        except Exception as e:
            logger.error("Failed to emit VPC endpoint metrics: %s", e)

    def generate_report(self) -> str:
        """Generate a human-readable health report."""
        results = self.check_all()
        lines = ["MangaAssist VPC Endpoint Health Report", "=" * 45, ""]

        # ASCII status tags, built once rather than on every loop iteration
        status_tags = {"healthy": "[OK]", "degraded": "[WARN]", "unhealthy": "[FAIL]"}
        for r in results:
            lines.append(f"{status_tags.get(r.status, '?')} {r.service}")
            lines.append(f"    Endpoint: {r.endpoint_id}")
            lines.append(f"    Resolved IP: {r.resolved_ip} (private={r.is_private_ip})")
            lines.append(f"    DNS: {r.dns_resolution_ms}ms | TCP: {r.tcp_connect_ms}ms")
            lines.append(f"    Status: {r.status}")
            lines.append(f"    Notes: {r.notes}")
            lines.append("")

        return "\n".join(lines)
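
The status rules in Step 4 of the health check can be isolated into a pure function, which makes them easy to unit-test without DNS or sockets. The following is a minimal sketch, not the checker's actual code: `classify`, `is_rfc1918`, and the threshold values in `EXAMPLE_CONFIG` are hypothetical, and it assumes `PRIVATE_RANGES` corresponds to the three RFC 1918 blocks.

```python
import ipaddress

# RFC 1918 private IPv4 blocks (assumed to match the checker's PRIVATE_RANGES).
RFC1918_NETWORKS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
]

# Hypothetical thresholds for illustration; real values live in the service config.
EXAMPLE_CONFIG = {
    "expected_max_dns_ms": 5.0,
    "expected_max_connect_ms": 5.0,
    "nat_baseline_ms": 20.0,
}


def is_rfc1918(ip: str) -> bool:
    """Return True if ip is an IPv4 address inside an RFC 1918 block."""
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        return False  # e.g. the "UNRESOLVED" placeholder
    return any(addr in net for net in RFC1918_NETWORKS)


def classify(resolved_ip, dns_ms, tcp_ms, config=EXAMPLE_CONFIG):
    """Apply the same ordering of checks as Step 4: degrade first, then fail."""
    status, notes = "healthy", []
    if not is_rfc1918(resolved_ip):
        status = "degraded"
        notes.append("public IP")
    if dns_ms > config["expected_max_dns_ms"]:
        status = "degraded"
        notes.append("slow DNS")
    if tcp_ms > config["expected_max_connect_ms"]:
        status = "degraded"
        notes.append("slow TCP connect")
    # Exceeding the NAT baseline overrides any earlier "degraded" verdict.
    if dns_ms + tcp_ms > config["nat_baseline_ms"]:
        status = "unhealthy"
        notes.append("exceeds NAT baseline")
    return status, notes


if __name__ == "__main__":
    print(classify("10.0.1.5", 1.0, 1.0))
    print(classify("52.94.0.1", 1.0, 1.0))
    print(classify("10.0.1.5", 15.0, 10.0))
```

Using `ipaddress` networks rather than `is_private` keeps the check limited to RFC 1918; `is_private` would also accept loopback and link-local addresses.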

Optimized Request Flow — Time Saved at Each Step

sequenceDiagram
    participant User
    participant APIGW as API Gateway
    participant ECS as ECS Orchestrator
    participant Redis as Redis<br/>(in-VPC)
    participant OS as OpenSearch<br/>(VPC Endpoint)
    participant DDB as DynamoDB<br/>(VPC Endpoint)
    participant Bedrock as Bedrock<br/>(VPC Endpoint)
    participant Guard as Guardrails

    Note over User,Guard: Optimized MangaAssist Request Flow

    User->>APIGW: WebSocket message
    Note right of APIGW: 8ms (was 12ms)<br/>Saved: 4ms (keep-alive)

    APIGW->>ECS: Route to task
    Note right of ECS: 3ms overhead<br/>Saved: 0ms (was 3ms)

    ECS->>Redis: Cache check (pre-warmed conn)
    Note right of Redis: 1.5ms (was 3ms)<br/>Saved: 1.5ms (pre-warmed connection)
    Redis-->>ECS: Cache miss

    par Parallel Retrieval (VPC endpoints)
        ECS->>OS: Vector search (filtered, 6 fields)
        Note right of OS: 65ms (was 150ms)<br/>Saved: 85ms (filter + source + VPC)
        ECS->>DDB: Session load (prefetched!)
        Note right of DDB: 0ms (was 12ms)<br/>Saved: 12ms (prefetch hit)
    end

    OS-->>ECS: Top 5 products
    DDB-->>ECS: Already in memory

    ECS->>ECS: Prompt assembly (3ms)
    Note right of ECS: 3ms (was 5ms)<br/>Saved: 2ms (smaller payload)

    ECS->>Bedrock: Invoke model (streaming, pre-warmed)
    Note right of Bedrock: 680ms (was 800ms)<br/>Saved: 120ms (VPC + keep-alive + smaller prompt)

    Bedrock-->>ECS: Stream tokens
    ECS-->>APIGW: Stream to user
    APIGW-->>User: First token arrives

    par Async Post-Processing (fire-and-forget)
        ECS->>Guard: Guardrails (async on complete)
        ECS->>Redis: Cache write (fire-and-forget)
        ECS->>DDB: Session update (fire-and-forget)
    end

    Note over User,Guard: Total: ~760ms (was ~1,100ms) — Saved 340ms (31%)
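
The parallel retrieval step in the diagram can be sketched with `asyncio.gather`. This is an illustration under stated assumptions: `vector_search`, `load_session`, and `retrieve_context` are hypothetical stand-ins, and the sleeps simulate the 65ms OpenSearch and 12ms DynamoDB figures rather than calling real services.

```python
import asyncio
import time


async def vector_search(query: str) -> list[str]:
    """Stand-in for the filtered OpenSearch query (~65ms in the diagram)."""
    await asyncio.sleep(0.065)
    return [f"product-{i}" for i in range(5)]


async def load_session(session_id: str, prefetched: dict) -> dict:
    """Return the prefetched session if present, else simulate a DynamoDB read."""
    if session_id in prefetched:
        return prefetched[session_id]  # prefetch hit: no network call
    await asyncio.sleep(0.012)
    return {"session_id": session_id, "history": []}


async def retrieve_context(query: str, session_id: str, prefetched: dict):
    """Run both lookups concurrently; wall time is roughly the slower of the two."""
    start = time.monotonic()
    products, session = await asyncio.gather(
        vector_search(query),
        load_session(session_id, prefetched),
    )
    elapsed_ms = (time.monotonic() - start) * 1000
    return products, session, elapsed_ms


if __name__ == "__main__":
    cache = {"s-1": {"session_id": "s-1", "history": ["hello"]}}
    products, session, ms = asyncio.run(retrieve_context("one piece", "s-1", cache))
    print(len(products), session["session_id"], f"{ms:.0f}ms")
```

With a prefetch hit the session lookup returns without awaiting, so the measured time is dominated by the vector search alone.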

Before/After Latency Waterfall Comparison

Before Optimization

gantt
    title Before Optimization — p95: 1,100ms
    dateFormat X
    axisFormat %L ms

    section Edge
    API Gateway (12ms)          :0, 12

    section Orchestration
    ECS Routing (5ms)           :12, 17
    Intent Classify (3ms)       :17, 20

    section Data (Sequential!)
    Redis Cache Check (3ms)     :20, 23
    DynamoDB Session (12ms)     :23, 35
    OpenSearch Search (150ms)   :35, 185

    section FM
    Prompt Assembly (5ms)       :185, 190
    Bedrock Invoke (800ms)      :190, 990

    section Post (Synchronous!)
    Guardrails (40ms)           :990, 1030
    Cache Write (5ms)           :1030, 1035
    Session Update (10ms)       :1035, 1045
    Response Format (5ms)       :1045, 1050
    WebSocket Send (10ms)       :1050, 1060

After Optimization

gantt
    title After Optimization — p95: 760ms (31% reduction)
    dateFormat X
    axisFormat %L ms

    section Edge
    API Gateway (8ms)           :0, 8

    section Orchestration
    ECS Routing (3ms)           :8, 11

    section Data (Parallel + Prefetched!)
    Redis Check (1.5ms)         :11, 13
    OpenSearch Filtered (65ms)  :13, 78
    DynamoDB (prefetched: 0ms)  :13, 13

    section FM
    Prompt Assembly (3ms)       :78, 81
    Bedrock Invoke (680ms)      :81, 761

    section Post (Async!)
    Stream Starts Immediately   :761, 761

Where the 340ms savings come from:

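| Optimization | Savings |
|---|---|
| VPC endpoints (all services) | 50ms |
| OpenSearch filter + source selection | 85ms |
| DynamoDB prefetch on connect | 12ms |
| Parallel data retrieval | 12ms (overlap) |
| Async post-processing | 55ms (cache write + session update + guardrails off critical path) |
| Connection pre-warming | 30ms (eliminates cold-start) |
| Smaller prompt payload | 5ms |
| HTTP keep-alive | 15ms |
| DNS caching | 10ms |
| Total (itemized) | 274ms |

The itemized rows sum to 274ms. The ~340ms headline is the difference between the before and after p95 figures (1,100ms and 760ms), so about 66ms of the measured gain is not broken out in this table.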
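
As a quick arithmetic check, the itemized rows and the headline figure can be computed separately and compared. The sketch below uses only numbers that appear in this document; the variable names are illustrative.

```python
# Per-technique savings in milliseconds, copied from the table above.
savings_ms = {
    "VPC endpoints (all services)": 50,
    "OpenSearch filter + source selection": 85,
    "DynamoDB prefetch on connect": 12,
    "Parallel data retrieval": 12,
    "Async post-processing": 55,
    "Connection pre-warming": 30,
    "Smaller prompt payload": 5,
    "HTTP keep-alive": 15,
    "DNS caching": 10,
}

itemized_total = sum(savings_ms.values())

# Headline p95 figures from the before/after waterfalls.
before_p95_ms, after_p95_ms = 1100, 760
headline_savings = before_p95_ms - after_p95_ms
reduction_pct = headline_savings / before_p95_ms * 100

print(f"Itemized total:   {itemized_total}ms")        # 274ms
print(f"Headline savings: {headline_savings}ms")      # 340ms
print(f"Reduction:        {reduction_pct:.1f}%")      # 30.9%
```

Keeping the two totals separate makes it obvious when per-technique estimates and end-to-end measurements drift apart.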

Key Takeaways

  1. VPC endpoints + connection pre-warming are table stakes — deploy them before touching application code. Combined they save 80ms with minimal risk.

  2. Prefetching on WebSocket connect is a unique advantage — MangaAssist knows the user before the first message arrives. Pre-loading session data and trending products makes the first query as fast as the tenth.

  3. Async post-processing is the simplest architectural change for its payoff: moving 55ms of synchronous work (guardrails, cache write, session update) off the critical path requires little more than wrapping existing calls in asyncio.create_task.

  4. OpenSearch query optimization is the highest single-technique win — 85ms saved by adding filter clauses and limiting returned fields. This is pure application logic, no infrastructure changes needed.

  5. The modular monolith pattern avoids inter-service hop tax — every internal network call costs 5-8ms. Co-locating intent classification, enrichment, and prompt assembly in the ECS orchestrator saves 15-24ms.

  6. Latency optimization compounds — each individual technique saves 5-85ms, but combined they reduce p95 from 1,100ms to 760ms. The key is to attack every segment, not just the biggest one.