
FM System Performance Architecture

MangaAssist is a Japanese-manga storefront chatbot running on AWS. The stack includes Bedrock Claude 3 (Sonnet for complex queries, Haiku for simple ones), OpenSearch Serverless for vector retrieval, DynamoDB for sessions, products, and orders, ECS Fargate for orchestration, API Gateway WebSocket for real-time communication, and ElastiCache Redis for semantic caching. The p95 end-to-end latency target is under 2 seconds.


Skill Mapping

| AWS AIP-C01 Skill | Sub-Skill | This File Covers |
| --- | --- | --- |
| 4.2.6 FM System Performance | API Call Profiling | X-Ray trace analysis, per-service latency breakdown, bottleneck detection |
| 4.2.6 FM System Performance | Vector DB Query Optimization | OpenSearch query tuning, filter optimization, source field selection |
| 4.2.6 FM System Performance | LLM Inference Latency Reduction | Connection reuse, keep-alive, pre-warming, async invocation |
| 4.2.6 FM System Performance | Service Communication Patterns | gRPC, connection pooling, circuit breakers, bulkhead isolation |
| 4.2.6 FM System Performance | Network Optimization | VPC endpoints for Bedrock/DynamoDB/OpenSearch, NAT gateway elimination |

Mind Map — System Performance Dimensions

mindmap
  root((4.2.6<br/>System<br/>Performance))
    API Call Profiling
      X-Ray Trace Analysis
      Per-Service Latency Breakdown
      Prompt-Completion Pattern Profiling
      Slow Segment Identification
      Automated Bottleneck Detection
    Vector DB Query Optimization
      OpenSearch Query Tuning
      Filter Optimization
      Source Field Selection
      Shard Routing
      Query Caching
    LLM Inference Latency
      Connection Reuse & Keep-Alive
      Pre-Warming ECS Tasks
      Async Invocation Patterns
      Payload Minimization
      Streaming First Token
    Service Communication
      gRPC Between Internal Services
      Connection Pooling
      Circuit Breakers
      Bulkhead Isolation
      Retry with Jitter
    Network Optimization
      VPC Endpoints for Bedrock
      VPC Endpoints for DynamoDB
      VPC Endpoints for OpenSearch
      NAT Gateway Elimination
      DNS Resolution Caching
    End-to-End Profiling
      Request Lifecycle Tracking
      Latency Budget Allocation
      SLI/SLO Alignment
      Performance Regression Detection

Architecture — Profiling Points Across the MangaAssist Request Path

Every hop in the MangaAssist pipeline is a profiling opportunity. The diagram below marks each X-Ray segment and the corresponding optimization lever.

graph TB
    subgraph "Client Layer"
        CLIENT[Customer Browser<br/>WebSocket Client]
    end

    subgraph "Edge Layer"
        APIGW["API Gateway WebSocket<br/>──────────<br/>X-Ray Segment: api-gateway<br/>Target: < 25ms"]
    end

    subgraph "Orchestration Layer"
        ECS["ECS Fargate Orchestrator<br/>──────────<br/>X-Ray Segment: ecs-orchestrator<br/>Target: < 15ms overhead"]
        ROUTER["Intent Router<br/>──────────<br/>X-Ray Subsegment: intent-routing<br/>Target: < 5ms"]
    end

    subgraph "Data Layer"
        CACHE["ElastiCache Redis<br/>──────────<br/>X-Ray Segment: cache-lookup<br/>Target: < 3ms"]
        DYNAMO["DynamoDB<br/>──────────<br/>X-Ray Segment: dynamo-session<br/>Target: < 8ms"]
        OPENSEARCH["OpenSearch Serverless<br/>──────────<br/>X-Ray Segment: vector-search<br/>Target: < 150ms"]
    end

    subgraph "FM Layer"
        BEDROCK["Bedrock Claude 3<br/>──────────<br/>X-Ray Segment: bedrock-invoke<br/>Target: < 800ms (Sonnet) / < 300ms (Haiku)"]
        GUARD["Bedrock Guardrails<br/>──────────<br/>X-Ray Segment: guardrails-check<br/>Target: < 40ms"]
    end

    subgraph "Network Layer"
        VPCE_BEDROCK["VPC Endpoint<br/>Bedrock<br/>Saves ~15ms vs NAT"]
        VPCE_DYNAMO["VPC Endpoint<br/>DynamoDB<br/>Saves ~10ms vs NAT"]
        VPCE_OS["VPC Endpoint<br/>OpenSearch<br/>Saves ~12ms vs NAT"]
    end

    CLIENT -->|"WSS"| APIGW
    APIGW --> ECS
    ECS --> ROUTER
    ROUTER --> CACHE
    CACHE -->|"miss"| DYNAMO
    CACHE -->|"miss"| OPENSEARCH
    ROUTER --> OPENSEARCH
    OPENSEARCH --> ECS
    DYNAMO --> ECS
    ECS --> BEDROCK
    BEDROCK --> GUARD
    GUARD --> ECS
    ECS -->|"stream"| APIGW
    APIGW -->|"stream"| CLIENT

    ECS -.->|"private link"| VPCE_BEDROCK -.-> BEDROCK
    ECS -.->|"private link"| VPCE_DYNAMO -.-> DYNAMO
    ECS -.->|"private link"| VPCE_OS -.-> OPENSEARCH

    style BEDROCK fill:#ff9900,color:#000
    style OPENSEARCH fill:#ff9900,color:#000
    style DYNAMO fill:#ff9900,color:#000
    style CACHE fill:#2ecc71,color:#000
    style ECS fill:#3498db,color:#fff
    style APIGW fill:#3498db,color:#fff
    style GUARD fill:#e74c3c,color:#fff
    style VPCE_BEDROCK fill:#9b59b6,color:#fff
    style VPCE_DYNAMO fill:#9b59b6,color:#fff
    style VPCE_OS fill:#9b59b6,color:#fff

1. API Call Profiling for Prompt-Completion Patterns

X-Ray Trace Analysis

Every MangaAssist request generates an X-Ray trace that spans the full pipeline: API Gateway to ECS to data stores to Bedrock and back. Profiling focuses on three dimensions:

  1. Segment timing — how long each service hop takes
  2. Subsegment detail — within Bedrock, separate prompt tokenization from generation
  3. Annotation correlation — tag traces with intent type, model used, cache hit/miss
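
The annotations above make traces queryable after the fact. As a sketch (the helper name is illustrative, not from the source), a filter expression for slow cache-miss traces of one intent can be composed and passed to the X-Ray GetTraceSummaries API:

```python
def build_trace_filter(intent: str, min_duration_s: float = 1.0) -> str:
    """Compose an X-Ray filter expression matching slow cache-miss traces
    for one intent, using the annotations tagged at request time."""
    return (
        f'annotation.intent = "{intent}" '
        f'AND annotation.cache_hit = false '
        f'AND duration >= {min_duration_s}'
    )

# Hypothetical usage against the X-Ray API:
# import boto3
# from datetime import datetime, timedelta, timezone
# xray = boto3.client("xray")
# now = datetime.now(timezone.utc)
# slow = xray.get_trace_summaries(
#     StartTime=now - timedelta(minutes=5),
#     EndTime=now,
#     FilterExpression=build_trace_filter("product_recommendation"),
# )
```

Because annotations (unlike metadata) are indexed by X-Ray, this kind of filter runs server-side rather than over downloaded traces.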

Identifying Slow Segments

The most common slow segments in MangaAssist:

| Segment | Typical p50 | Typical p95 | Root Cause When Slow |
| --- | --- | --- | --- |
| bedrock-invoke | 350ms | 800ms | Large prompt, Sonnet for simple queries, throttling |
| vector-search | 60ms | 150ms | Unfiltered queries, returning too many fields, cold OCU |
| dynamo-session | 4ms | 12ms | Hot partition, large session item, strongly consistent read |
| cache-lookup | 1ms | 3ms | Network hop to different AZ, large serialized value |
| guardrails-check | 15ms | 40ms | Long response text, complex policy evaluation |
| api-gateway | 8ms | 25ms | Auth/rate limit overhead, WebSocket frame assembly |

Per-Service Latency Breakdown

gantt
    title MangaAssist p95 Latency Breakdown — Before Optimization
    dateFormat X
    axisFormat %L ms

    section Edge
    API Gateway + Auth        :0, 25

    section Orchestration
    ECS Routing + Session     :25, 50

    section Data Retrieval
    Redis Cache Check         :50, 53
    DynamoDB Session Load     :53, 65
    OpenSearch Vector Search  :65, 215

    section FM Inference
    Bedrock Prompt Assembly   :215, 230
    Bedrock Generation (p95)  :230, 1030

    section Post-Processing
    Guardrails Evaluation     :1030, 1070
    Response Formatting       :1070, 1085

    section Delivery
    WebSocket Stream Start    :1085, 1100

Total p95: ~1,100ms (within the 2s target, but optimizable).


2. Vector Database Query Optimization

OpenSearch Query Tuning

MangaAssist queries OpenSearch Serverless for manga product embeddings. Three optimization axes:

a) Filter Optimization — Reduce Candidate Set Before Vector Comparison

{
  "size": 5,
  "query": {
    "knn": {
      "embedding": {
        "vector": [0.12, -0.34, 0.56],
        "k": 5,
        "filter": {
          "bool": {
            "filter": [
              { "term": { "category": "manga" } },
              { "term": { "language": "ja" } },
              { "term": { "in_stock": true } },
              { "range": { "price": { "lte": 2000 } } }
            ]
          }
        }
      }
    }
  }
}

Applying the filter inside the knn clause (efficient filtering, supported by the lucene and faiss engines) restricts the vector comparison to matching documents, reducing the candidate set from millions to thousands and cutting search latency by 40-60%. Wrapping a top-level bool filter around the knn clause would post-filter instead, which neither shrinks the search space nor guarantees k results.

b) Source Field Selection — Return Only What the LLM Needs

{
  "_source": {
    "includes": ["title", "author", "genre", "price", "synopsis_short", "asin"]
  }
}

MangaAssist product documents contain 25+ fields (full descriptions, images, reviews, metadata). The LLM context only needs 6 fields. Returning only those fields reduces payload by ~80% and cuts deserialization time.

c) Shard Routing — Direct Queries to the Right Shard

For category-specific queries (e.g., "recommend shonen manga") on a provisioned OpenSearch domain, routing can direct a query to only the shards that hold that category; note that OpenSearch Serverless manages shards automatically, so this lever does not apply there. The preference parameter pins a request to specific shards:

{
  "preference": "_shards:0,1",
  "query": { ... }
}

OpenSearch Optimization Impact

| Technique | Before | After | Improvement |
| --- | --- | --- | --- |
| Pre-filtering | 150ms p95 | 85ms p95 | -43% |
| Source field selection | 85ms p95 | 65ms p95 | -24% |
| Query caching (repeated queries) | 65ms p95 | 8ms p95 | -88% |
| Combined | 150ms p95 | 65ms p95 (fresh) / 8ms (cached) | -57% to -95% |
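
The query-caching row above can be sketched as a hash-keyed cache in front of the search call. A plain dict stands in for Redis here, and the TTL and key scheme are illustrative assumptions:

```python
import hashlib
import json
import time

class QueryCache:
    """Caches search responses keyed on a hash of the query body, turning
    repeated identical searches into sub-millisecond lookups."""

    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self._store: dict[str, tuple[float, object]] = {}

    def _key(self, query: dict) -> str:
        # Canonical JSON so logically identical queries share one key.
        return hashlib.sha256(
            json.dumps(query, sort_keys=True).encode()
        ).hexdigest()

    def get_or_search(self, query: dict, search_fn):
        """Return a fresh cached result, or run search_fn and cache it."""
        key = self._key(query)
        hit = self._store.get(key)
        if hit and time.monotonic() - hit[0] < self.ttl:
            return hit[1]
        result = search_fn(query)
        self._store[key] = (time.monotonic(), result)
        return result
```

In production the dict would be replaced by the existing ElastiCache Redis tier so all ECS tasks share one cache.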

3. LLM Inference Latency Reduction

Connection Reuse and Keep-Alive

Establishing a new HTTPS connection to Bedrock costs 30-50ms (DNS + TCP + TLS). MangaAssist reuses connections across requests within each ECS task.

Key configuration:

  - HTTP keep-alive enabled on the Bedrock client
  - Connection idle timeout set to 60 seconds (matches Bedrock's server-side timeout)
  - Maximum connections per ECS task: 10 (matches expected concurrency per container)
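
A sketch of that client configuration as botocore Config keyword arguments (values taken from the list above; `tcp_keepalive` requires a reasonably recent botocore, and the 60-second idle timeout is enforced by the underlying connection pool rather than a Config field):

```python
def bedrock_client_config() -> dict:
    """Keyword arguments for botocore.config.Config implementing the
    keep-alive settings above: up to 10 warm, reusable connections."""
    return {
        "max_pool_connections": 10,  # matches expected per-container concurrency
        "connect_timeout": 2,        # fail fast on connection establishment
        "read_timeout": 30,          # long Sonnet generations still complete
        "tcp_keepalive": True,       # keep idle sockets alive between requests
        "retries": {"max_attempts": 3, "mode": "adaptive"},
    }

# Hypothetical usage:
# import boto3
# from botocore.config import Config
# bedrock = boto3.client("bedrock-runtime",
#                        config=Config(**bedrock_client_config()))
```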

Pre-Warming ECS Tasks

Cold ECS tasks incur ~200ms penalty for the first Bedrock call (connection establishment + SDK initialization). MangaAssist pre-warms by:

  1. Startup probe — on ECS task start, send a minimal Bedrock invoke (Haiku, 10-token prompt)
  2. Connection pool initialization — establish 3 connections during health check window
  3. DNS pre-resolution — resolve Bedrock endpoint DNS during container init
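
Step 3 can be sketched with the standard library alone; resolving each hostname once during container init warms any local DNS cache so the first real request skips that round trip (the hostname in the usage comment is the regional Bedrock runtime endpoint):

```python
import socket

def prewarm_dns(hosts: list[str], port: int = 443) -> dict[str, bool]:
    """Resolve each endpoint hostname during container init.
    Returns per-host success so startup probes can log failures."""
    results: dict[str, bool] = {}
    for host in hosts:
        try:
            socket.getaddrinfo(host, port)
            results[host] = True
        except socket.gaierror:
            results[host] = False
    return results

# Hypothetical usage during ECS task startup:
# prewarm_dns(["bedrock-runtime.ap-northeast-1.amazonaws.com"])
```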

Async Invocation Patterns

Not all work in the MangaAssist pipeline needs to be synchronous:

sequenceDiagram
    participant User
    participant ECS as ECS Orchestrator
    participant Cache as Redis Cache
    participant OS as OpenSearch
    participant Bedrock as Bedrock Claude 3
    participant DDB as DynamoDB
    participant Guard as Guardrails

    User->>ECS: User query

    par Parallel Data Retrieval
        ECS->>Cache: Check semantic cache
        ECS->>OS: Vector search (async)
        ECS->>DDB: Load session (async)
    end

    Cache-->>ECS: Cache miss
    OS-->>ECS: Top 5 products
    DDB-->>ECS: Session context

    ECS->>Bedrock: Invoke model (streaming)

    par Async Post-Processing
        Bedrock-->>ECS: Stream tokens
        ECS-->>User: Stream to client
        ECS->>Guard: Guardrails check (async, on complete response)
        ECS->>Cache: Cache result (fire-and-forget)
        ECS->>DDB: Update session (fire-and-forget)
    end

    Guard-->>ECS: Validation result (logged, not blocking stream)

Latency saved: Cache writes and session updates are fire-and-forget, saving 8-15ms from the critical path.
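
A minimal asyncio sketch of the sequence above, with stub coroutines standing in for the real clients: the parallel retrieval block maps to asyncio.gather, and the fire-and-forget writes map to asyncio.create_task, which schedules work without blocking the reply.

```python
import asyncio

async def handle_query(query: str) -> str:
    """Sketch of the fan-out above: parallel retrieval, then inference,
    with cache/session writes kept off the critical path."""
    # Stub coroutines standing in for the real async clients.
    async def check_cache():
        return None  # cache miss
    async def vector_search():
        return ["Vol. 1"]  # top products
    async def load_session():
        return {"user": "u1"}  # session context
    async def invoke_model(ctx):
        return f"answer using {ctx}"
    async def write_cache(answer):
        pass  # fire-and-forget
    async def update_session(answer):
        pass  # fire-and-forget

    # Parallel data retrieval: all three calls start at once.
    cached, docs, session = await asyncio.gather(
        check_cache(), vector_search(), load_session()
    )
    if cached is not None:
        return cached

    answer = await invoke_model(
        {"query": query, "docs": docs, "session": session}
    )

    # Post-processing is scheduled, not awaited: it never blocks the reply.
    asyncio.create_task(write_cache(answer))
    asyncio.create_task(update_session(answer))
    return answer
```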


4. Service Communication Patterns

gRPC Between Internal Services

For internal MangaAssist microservices (e.g., orchestrator to intent classifier, orchestrator to product enrichment), gRPC provides:

| Aspect | REST/JSON | gRPC/Protobuf | Improvement |
| --- | --- | --- | --- |
| Serialization time | 2.1ms | 0.3ms | 7x faster |
| Payload size | 1,200 bytes | 340 bytes | 3.5x smaller |
| Connection overhead | New TCP per request | Multiplexed HTTP/2 | Persistent |
| Latency (p50) | 8ms | 2ms | 4x faster |

Connection Pooling

MangaAssist maintains connection pools for each downstream service:

| Service | Pool Size per ECS Task | Max Idle | Keep-Alive |
| --- | --- | --- | --- |
| Bedrock | 10 | 60s | Yes |
| OpenSearch | 20 | 120s | Yes |
| DynamoDB | 25 | 300s | Yes |
| Redis | 50 | 600s | Yes |

Circuit Breakers

Each downstream service has a circuit breaker to prevent cascade failures:

stateDiagram-v2
    [*] --> Closed: Normal operation
    Closed --> Open: 5 failures in 30s
    Open --> HalfOpen: After 15s cooldown
    HalfOpen --> Closed: 3 consecutive successes
    HalfOpen --> Open: Any failure

    Closed: All requests pass through
    Open: Requests fail fast (< 1ms)\nReturn cached/fallback response
    HalfOpen: Limited requests (20%) pass through\nTest if service recovered

MangaAssist circuit breaker settings:

| Service | Failure Threshold | Cooldown | Recovery Probes | Fallback |
| --- | --- | --- | --- | --- |
| Bedrock Sonnet | 5 in 30s | 15s | 3 successes | Route to Haiku |
| OpenSearch | 3 in 20s | 10s | 2 successes | Return cached results |
| DynamoDB | 5 in 30s | 10s | 3 successes | Return minimal session |
| Redis | 3 in 10s | 5s | 2 successes | Skip cache (passthrough) |
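
A minimal breaker matching the state diagram above; for brevity this sketch counts consecutive failures rather than the sliding time window in the table, and its defaults mirror the Bedrock Sonnet row:

```python
import time

class CircuitBreaker:
    """Minimal breaker: Closed -> Open after `threshold` failures,
    Open -> HalfOpen after `cooldown` seconds, HalfOpen -> Closed
    after `probes` consecutive successes."""

    def __init__(self, threshold: int = 5, cooldown: float = 15.0,
                 probes: int = 3):
        self.threshold, self.cooldown, self.probes = threshold, cooldown, probes
        self.state = "closed"
        self.failures = 0
        self.successes = 0
        self.opened_at = 0.0

    def allow(self) -> bool:
        """True if a request may pass; Open requests fail fast."""
        if self.state == "open" and \
                time.monotonic() - self.opened_at >= self.cooldown:
            self.state, self.successes = "half_open", 0
        return self.state != "open"

    def record(self, success: bool) -> None:
        """Feed each call's outcome back into the state machine."""
        if success:
            if self.state == "half_open":
                self.successes += 1
                if self.successes >= self.probes:
                    self.state, self.failures = "closed", 0
            else:
                self.failures = 0
        elif self.state == "half_open":
            self._trip()  # any failure in HalfOpen reopens
        else:
            self.failures += 1
            if self.failures >= self.threshold:
                self._trip()

    def _trip(self) -> None:
        self.state = "open"
        self.opened_at = time.monotonic()
        self.failures = 0
```

A production version would also cap HalfOpen traffic at the 20% shown in the diagram; that rate limiting is omitted here.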

Bulkhead Isolation

MangaAssist isolates workloads to prevent one slow service from consuming all resources:

graph TB
    subgraph "ECS Task Thread Pool (100 threads)"
        subgraph "Bedrock Bulkhead (40 threads)"
            B1[Bedrock Call 1]
            B2[Bedrock Call 2]
            BN[Bedrock Call N]
        end
        subgraph "OpenSearch Bulkhead (25 threads)"
            O1[OS Query 1]
            O2[OS Query 2]
        end
        subgraph "DynamoDB Bulkhead (20 threads)"
            D1[DDB Read 1]
            D2[DDB Write 1]
        end
        subgraph "Redis Bulkhead (15 threads)"
            R1[Cache Read 1]
            R2[Cache Write 1]
        end
    end

    style B1 fill:#ff9900,color:#000
    style B2 fill:#ff9900,color:#000
    style BN fill:#ff9900,color:#000
    style O1 fill:#3498db,color:#fff
    style O2 fill:#3498db,color:#fff
    style D1 fill:#2ecc71,color:#000
    style D2 fill:#2ecc71,color:#000
    style R1 fill:#e74c3c,color:#fff
    style R2 fill:#e74c3c,color:#fff

If Bedrock becomes slow, only the Bedrock bulkhead's 40 threads are blocked. OpenSearch, DynamoDB, and Redis bulkheads continue operating normally, ensuring cache hits and simple queries still return fast.
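
A compartment like those in the diagram can be sketched with a bounded semaphore; the class, its fail-fast timeout, and the fallback behavior are illustrative, not the source's implementation:

```python
import threading

class Bulkhead:
    """Caps concurrent calls to one downstream service so a slow
    dependency exhausts only its own compartment, not the whole pool."""

    def __init__(self, name: str, max_concurrent: int):
        self.name = name
        self._sem = threading.BoundedSemaphore(max_concurrent)

    def call(self, fn, *args, timeout: float = 0.5, fallback=None):
        # Fail fast with a fallback instead of queuing when the
        # compartment is saturated.
        if not self._sem.acquire(timeout=timeout):
            return fallback
        try:
            return fn(*args)
        finally:
            self._sem.release()

# Hypothetical compartments mirroring the diagram above:
bedrock_bulkhead = Bulkhead("bedrock", 40)
opensearch_bulkhead = Bulkhead("opensearch", 25)
```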


5. End-to-End Request Profiling

X-Ray Segments for the Full MangaAssist Pipeline

Each request generates a trace with the following segment hierarchy:

Trace: manga-assist-request (e2e)
  |-- Segment: api-gateway-websocket (8-25ms)
  |-- Segment: ecs-orchestrator (2-5ms overhead)
  |   |-- Subsegment: intent-classification (1-3ms)
  |   |-- Subsegment: request-validation (< 1ms)
  |-- Segment: cache-lookup (1-3ms)
  |-- Segment: parallel-data-retrieval
  |   |-- Subsegment: opensearch-vector-search (60-150ms)
  |   |-- Subsegment: dynamodb-session-load (4-12ms)
  |-- Segment: prompt-assembly (2-5ms)
  |-- Segment: bedrock-invoke (200-800ms)
  |   |-- Subsegment: request-serialization (< 1ms)
  |   |-- Subsegment: network-transit (5-15ms)
  |   |-- Subsegment: model-inference (190-780ms)
  |   |-- Subsegment: response-deserialization (< 1ms)
  |-- Segment: guardrails-check (15-40ms)
  |-- Segment: response-streaming (5-15ms)
  |-- Segment: async-postprocessing (fire-and-forget)
      |-- Subsegment: cache-write (2-5ms)
      |-- Subsegment: session-update (3-8ms)
      |-- Subsegment: analytics-emit (1-2ms)

Automated Bottleneck Detection

MangaAssist runs an automated analysis every 5 minutes that:

  1. Queries X-Ray for the last 5 minutes of traces
  2. Identifies the segment with the highest contribution to p95 latency
  3. Compares against the latency budget allocation
  4. Emits a CloudWatch alarm if any segment exceeds its budget by > 20%
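
Steps 2-4 can be sketched as a pure function over already-collected segment durations; step 1's X-Ray query is assumed to have produced the input dict, and the sample budgets come from this file's allocation:

```python
def detect_bottlenecks(durations_by_segment: dict[str, list[float]],
                       budgets_ms: dict[str, float],
                       threshold_pct: float = 20.0) -> list[dict]:
    """Return segments whose p95 exceeds its budget by more than
    threshold_pct, worst offender first."""
    violations = []
    for segment, samples in durations_by_segment.items():
        ordered = sorted(samples)
        p95 = ordered[min(int(len(ordered) * 0.95), len(ordered) - 1)]
        budget = budgets_ms.get(segment)
        if budget and p95 > budget * (1 + threshold_pct / 100):
            violations.append(
                {"segment": segment, "p95_ms": p95, "budget_ms": budget}
            )
    return sorted(violations, key=lambda v: v["p95_ms"], reverse=True)

# Budgets taken from this file's allocation (subset):
BUDGETS_MS = {"vector-search": 150.0, "bedrock-invoke": 800.0}
```

The returned list maps directly onto the CloudWatch alarm in step 4: any non-empty result triggers it.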

6. Network Optimization — VPC Endpoints

Why VPC Endpoints Matter

Without VPC endpoints, traffic from ECS Fargate to AWS services traverses:

ECS Task → NAT Gateway → Internet Gateway → AWS Service Endpoint

This adds 10-20ms per call due to NAT Gateway processing. With VPC endpoints:

ECS Task → VPC Endpoint (private link) → AWS Service Endpoint

MangaAssist VPC Endpoint Configuration

| Service | Endpoint Type | Latency Before (via NAT) | Latency After (VPC Endpoint) | Savings |
| --- | --- | --- | --- | --- |
| Bedrock | Interface | 45ms | 30ms | 15ms |
| DynamoDB | Gateway | 18ms | 8ms | 10ms |
| OpenSearch Serverless | Interface | 72ms | 60ms | 12ms |
| S3 (model artifacts) | Gateway | 15ms | 8ms | 7ms |
| CloudWatch (telemetry) | Interface | 12ms | 6ms | 6ms |

Total savings per request (hitting all services): ~50ms reduction — significant when the p95 budget is 2,000ms.
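
As a sketch, the interface endpoints above can be provisioned through the EC2 API; the helper below only builds the request parameters, and the subnet and security-group IDs are placeholders:

```python
def interface_endpoint_params(vpc_id: str, service: str,
                              region: str = "ap-northeast-1") -> dict:
    """Keyword arguments for ec2.create_vpc_endpoint for one interface
    endpoint with private DNS, so SDK clients need no endpoint-URL change."""
    return {
        "VpcEndpointType": "Interface",
        "VpcId": vpc_id,
        "ServiceName": f"com.amazonaws.{region}.{service}",
        "PrivateDnsEnabled": True,  # default hostnames resolve privately
        "SubnetIds": ["subnet-PLACEHOLDER"],     # one per AZ running ECS tasks
        "SecurityGroupIds": ["sg-PLACEHOLDER"],  # allow 443 from the tasks
    }

# Hypothetical usage:
# import boto3
# ec2 = boto3.client("ec2")
# ec2.create_vpc_endpoint(**interface_endpoint_params("vpc-123",
#                                                     "bedrock-runtime"))
```

Gateway endpoints (S3, DynamoDB) are created differently: they attach to route tables rather than subnets and take no security groups.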

Cost Consideration

Interface VPC endpoints have an hourly cost (~$0.01/hr per endpoint per AZ); gateway endpoints for S3 and DynamoDB are free. For MangaAssist at scale, the cost is negligible compared to the latency improvement and the eliminated NAT Gateway data-processing charges ($0.045/GB).


Python Code — RequestProfiler with X-Ray Integration

"""
MangaAssist Request Profiler — X-Ray Integration
Profiles every segment of the MangaAssist request pipeline and
detects bottlenecks against the latency budget.
"""

import time
import json
import logging
import statistics
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum

import boto3
from aws_xray_sdk.core import xray_recorder, patch_all

logger = logging.getLogger(__name__)

# Patch boto3 and other libraries for automatic X-Ray tracing
patch_all()


class ServiceSegment(str, Enum):
    """Named segments in the MangaAssist request pipeline."""
    API_GATEWAY = "api-gateway"
    ECS_ORCHESTRATOR = "ecs-orchestrator"
    CACHE_LOOKUP = "cache-lookup"
    VECTOR_SEARCH = "vector-search"
    DYNAMO_SESSION = "dynamo-session"
    PROMPT_ASSEMBLY = "prompt-assembly"
    BEDROCK_INVOKE = "bedrock-invoke"
    GUARDRAILS_CHECK = "guardrails-check"
    RESPONSE_STREAM = "response-stream"
    CACHE_WRITE = "cache-write-async"
    SESSION_UPDATE = "session-update-async"


@dataclass
class LatencyBudget:
    """Defines the p95 latency budget for each segment (in ms)."""
    segment: ServiceSegment
    budget_ms: float
    alert_threshold_pct: float = 20.0  # alert if exceeded by this %


# MangaAssist latency budget allocation
LATENCY_BUDGETS = {
    ServiceSegment.API_GATEWAY: LatencyBudget(ServiceSegment.API_GATEWAY, 25),
    ServiceSegment.ECS_ORCHESTRATOR: LatencyBudget(ServiceSegment.ECS_ORCHESTRATOR, 15),
    ServiceSegment.CACHE_LOOKUP: LatencyBudget(ServiceSegment.CACHE_LOOKUP, 3),
    ServiceSegment.VECTOR_SEARCH: LatencyBudget(ServiceSegment.VECTOR_SEARCH, 150),
    ServiceSegment.DYNAMO_SESSION: LatencyBudget(ServiceSegment.DYNAMO_SESSION, 12),
    ServiceSegment.PROMPT_ASSEMBLY: LatencyBudget(ServiceSegment.PROMPT_ASSEMBLY, 5),
    ServiceSegment.BEDROCK_INVOKE: LatencyBudget(ServiceSegment.BEDROCK_INVOKE, 800),
    ServiceSegment.GUARDRAILS_CHECK: LatencyBudget(ServiceSegment.GUARDRAILS_CHECK, 40),
    ServiceSegment.RESPONSE_STREAM: LatencyBudget(ServiceSegment.RESPONSE_STREAM, 15),
}


@dataclass
class SegmentTiming:
    """Recorded timing for a single segment execution."""
    segment: ServiceSegment
    start_ms: float
    end_ms: float
    metadata: dict = field(default_factory=dict)

    @property
    def duration_ms(self) -> float:
        return self.end_ms - self.start_ms


@dataclass
class RequestProfile:
    """Complete profile of a single MangaAssist request."""
    request_id: str
    intent: str
    model_used: str
    cache_hit: bool
    timings: list[SegmentTiming] = field(default_factory=list)
    total_latency_ms: float = 0.0
    bottleneck_segment: Optional[str] = None

    def identify_bottleneck(self) -> Optional[SegmentTiming]:
        """Find the segment that consumed the most time."""
        if not self.timings:
            return None
        bottleneck = max(self.timings, key=lambda t: t.duration_ms)
        self.bottleneck_segment = bottleneck.segment.value
        return bottleneck


class RequestProfiler:
    """
    Profiles MangaAssist requests with X-Ray integration.

    Usage:
        profiler = RequestProfiler()

        # The context managers are nested classes, so the profiler is
        # passed explicitly (attribute access on the instance does not
        # bind it as `self` for a class).
        with profiler.trace_request(profiler, request_id, intent, model) as profile:
            with profiler.trace_segment(profiler, ServiceSegment.CACHE_LOOKUP):
                result = redis_client.get(cache_key)

            with profiler.trace_segment(profiler, ServiceSegment.VECTOR_SEARCH, {"k": 5}):
                docs = opensearch_client.search(query)

            with profiler.trace_segment(profiler, ServiceSegment.BEDROCK_INVOKE, {"model": "sonnet"}):
                response = bedrock_client.invoke_model(body)

        # profile now contains all timings and bottleneck analysis
    """

    def __init__(self, cloudwatch_namespace: str = "MangaAssist/Performance"):
        self.cw_client = boto3.client("cloudwatch")
        self.cw_namespace = cloudwatch_namespace
        self._current_profile: Optional[RequestProfile] = None
        self._profiles_buffer: list[RequestProfile] = []
        self._buffer_limit = 100

    class trace_request:
        """Context manager for tracing a full request lifecycle."""

        def __init__(self, profiler: "RequestProfiler", request_id: str,
                     intent: str, model: str):
            self.profiler = profiler
            self.request_id = request_id
            self.intent = intent
            self.model = model
            self.start_time = None

        def __enter__(self) -> RequestProfile:
            self.start_time = time.monotonic() * 1000
            self.profile = RequestProfile(
                request_id=self.request_id,
                intent=self.intent,
                model_used=self.model,
                cache_hit=False,
            )
            self.profiler._current_profile = self.profile

            # Begin X-Ray segment
            xray_recorder.begin_segment(
                name="manga-assist-request",
                traceid=None,  # auto-generated
            )
            xray_recorder.current_segment().put_annotation("intent", self.intent)
            xray_recorder.current_segment().put_annotation("model", self.model)
            xray_recorder.current_segment().put_annotation("request_id", self.request_id)

            return self.profile

        def __exit__(self, exc_type, exc_val, exc_tb):
            end_time = time.monotonic() * 1000
            self.profile.total_latency_ms = end_time - self.start_time
            self.profile.identify_bottleneck()

            # Close X-Ray segment
            xray_recorder.current_segment().put_metadata(
                "total_latency_ms", self.profile.total_latency_ms
            )
            xray_recorder.current_segment().put_metadata(
                "bottleneck", self.profile.bottleneck_segment
            )
            xray_recorder.end_segment()

            # Emit metrics and buffer the profile
            self.profiler._emit_metrics(self.profile)
            self.profiler._buffer_profile(self.profile)
            self.profiler._current_profile = None

            return False  # do not suppress exceptions

    class trace_segment:
        """Context manager for tracing a single service segment."""

        def __init__(self, profiler: "RequestProfiler",
                     segment: ServiceSegment, metadata: dict = None):
            self.profiler = profiler
            self.segment = segment
            self.metadata = metadata or {}

        def __enter__(self):
            self.start_ms = time.monotonic() * 1000
            xray_recorder.begin_subsegment(self.segment.value)
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            end_ms = time.monotonic() * 1000
            timing = SegmentTiming(
                segment=self.segment,
                start_ms=self.start_ms,
                end_ms=end_ms,
                metadata=self.metadata,
            )

            subsegment = xray_recorder.current_subsegment()
            if subsegment:
                for k, v in self.metadata.items():
                    subsegment.put_metadata(k, v)
                if exc_type:
                    subsegment.add_exception(exc_val, [])

            xray_recorder.end_subsegment()

            if self.profiler._current_profile:
                self.profiler._current_profile.timings.append(timing)

            # Check budget violation
            budget = LATENCY_BUDGETS.get(self.segment)
            if budget and timing.duration_ms > budget.budget_ms * (1 + budget.alert_threshold_pct / 100):
                logger.warning(
                    "Segment %s exceeded budget: %.1fms (budget: %.1fms, threshold: %.0f%%)",
                    self.segment.value, timing.duration_ms,
                    budget.budget_ms, budget.alert_threshold_pct,
                )

            return False

    def _emit_metrics(self, profile: RequestProfile):
        """Emit per-segment latency metrics to CloudWatch."""
        metric_data = []

        # Total latency
        metric_data.append({
            "MetricName": "RequestLatency",
            "Value": profile.total_latency_ms,
            "Unit": "Milliseconds",
            "Dimensions": [
                {"Name": "Intent", "Value": profile.intent},
                {"Name": "Model", "Value": profile.model_used},
            ],
        })

        # Per-segment latency
        for timing in profile.timings:
            metric_data.append({
                "MetricName": f"SegmentLatency_{timing.segment.value}",
                "Value": timing.duration_ms,
                "Unit": "Milliseconds",
                "Dimensions": [
                    {"Name": "Segment", "Value": timing.segment.value},
                ],
            })

        # Bottleneck indicator
        if profile.bottleneck_segment:
            metric_data.append({
                "MetricName": "BottleneckSegment",
                "Value": 1,
                "Unit": "Count",
                "Dimensions": [
                    {"Name": "Segment", "Value": profile.bottleneck_segment},
                ],
            })

        try:
            self.cw_client.put_metric_data(
                Namespace=self.cw_namespace,
                MetricData=metric_data,
            )
        except Exception as e:
            logger.error("Failed to emit metrics: %s", e)

    def _buffer_profile(self, profile: RequestProfile):
        """Buffer profiles for batch analysis."""
        self._profiles_buffer.append(profile)
        if len(self._profiles_buffer) >= self._buffer_limit:
            self._analyze_batch()

    def _analyze_batch(self):
        """Analyze buffered profiles for systemic issues."""
        if not self._profiles_buffer:
            return

        # Per-segment p95 analysis
        segment_latencies: dict[str, list[float]] = {}
        for profile in self._profiles_buffer:
            for timing in profile.timings:
                segment_latencies.setdefault(timing.segment.value, []).append(
                    timing.duration_ms
                )

        budget_violations = []
        for segment_name, latencies in segment_latencies.items():
            sorted_latencies = sorted(latencies)
            p95_idx = int(len(sorted_latencies) * 0.95)
            p95 = sorted_latencies[min(p95_idx, len(sorted_latencies) - 1)]

            try:
                segment_enum = ServiceSegment(segment_name)
                budget = LATENCY_BUDGETS.get(segment_enum)
                if budget and p95 > budget.budget_ms:
                    budget_violations.append({
                        "segment": segment_name,
                        "p95_ms": round(p95, 1),
                        "budget_ms": budget.budget_ms,
                        "overage_pct": round((p95 / budget.budget_ms - 1) * 100, 1),
                    })
            except ValueError:
                pass

        if budget_violations:
            logger.warning(
                "Budget violations detected in last %d requests: %s",
                len(self._profiles_buffer),
                json.dumps(budget_violations, indent=2),
            )

        self._profiles_buffer.clear()


# --- Usage Example ---
def handle_manga_assist_request(request_id: str, user_query: str):
    """Example: Profile a MangaAssist request end-to-end."""
    profiler = RequestProfiler()

    # Determine intent and model (simplified)
    intent = "product_recommendation"
    model = "claude-3-sonnet"

    with profiler.trace_request(profiler, request_id, intent, model) as profile:
        # Cache check
        with profiler.trace_segment(profiler, ServiceSegment.CACHE_LOOKUP):
            cache_result = None  # redis_client.get(cache_key)

        if cache_result:
            profile.cache_hit = True
            return cache_result

        # Parallel data retrieval (simplified as sequential here)
        with profiler.trace_segment(profiler, ServiceSegment.VECTOR_SEARCH, {"k": 5}):
            search_results = []  # opensearch_client.search(query)

        with profiler.trace_segment(profiler, ServiceSegment.DYNAMO_SESSION):
            session_data = {}  # dynamodb_table.get_item(Key={"session_id": sid})

        # Bedrock invocation
        with profiler.trace_segment(profiler, ServiceSegment.BEDROCK_INVOKE, {"model": model}):
            response = ""  # bedrock_client.invoke_model(body=prompt)

        # Guardrails
        with profiler.trace_segment(profiler, ServiceSegment.GUARDRAILS_CHECK):
            validated = True  # guardrails_client.apply_guardrail(response)

        return response

Python Code — ConnectionPoolManager

"""
MangaAssist Connection Pool Manager
Manages persistent, pre-warmed connection pools for all downstream services.
Eliminates per-request connection overhead (30-50ms per cold connection).
"""

import time
import logging
import threading
from dataclasses import dataclass
from typing import Optional
from enum import Enum
from contextlib import contextmanager

import boto3
from botocore.config import Config as BotoConfig
import redis
from opensearchpy import OpenSearch, RequestsHttpConnection

logger = logging.getLogger(__name__)


class DownstreamService(str, Enum):
    """Downstream services in the MangaAssist architecture."""
    BEDROCK = "bedrock-runtime"
    OPENSEARCH = "opensearch"
    DYNAMODB = "dynamodb"
    REDIS = "redis"


@dataclass
class PoolConfig:
    """Configuration for a single connection pool."""
    service: DownstreamService
    max_connections: int
    max_idle_seconds: int
    keep_alive: bool
    connect_timeout_seconds: float
    read_timeout_seconds: float
    retry_max_attempts: int = 3
    retry_mode: str = "adaptive"


# MangaAssist pool configurations
POOL_CONFIGS = {
    DownstreamService.BEDROCK: PoolConfig(
        service=DownstreamService.BEDROCK,
        max_connections=10,
        max_idle_seconds=60,
        keep_alive=True,
        connect_timeout_seconds=2.0,
        read_timeout_seconds=30.0,  # Sonnet can take up to 30s for long responses
        retry_max_attempts=3,
        retry_mode="adaptive",
    ),
    DownstreamService.OPENSEARCH: PoolConfig(
        service=DownstreamService.OPENSEARCH,
        max_connections=20,
        max_idle_seconds=120,
        keep_alive=True,
        connect_timeout_seconds=2.0,
        read_timeout_seconds=5.0,
        retry_max_attempts=2,
    ),
    DownstreamService.DYNAMODB: PoolConfig(
        service=DownstreamService.DYNAMODB,
        max_connections=25,
        max_idle_seconds=300,
        keep_alive=True,
        connect_timeout_seconds=1.0,
        read_timeout_seconds=3.0,
        retry_max_attempts=3,
        retry_mode="adaptive",
    ),
    DownstreamService.REDIS: PoolConfig(
        service=DownstreamService.REDIS,
        max_connections=50,
        max_idle_seconds=600,
        keep_alive=True,
        connect_timeout_seconds=1.0,
        read_timeout_seconds=2.0,
    ),
}


@dataclass
class PoolHealth:
    """Health status of a connection pool."""
    service: str
    active_connections: int
    idle_connections: int
    max_connections: int
    total_requests: int
    total_errors: int
    avg_checkout_ms: float
    last_health_check: float


class ConnectionPoolManager:
    """
    Centralized connection pool manager for all MangaAssist downstream services.

    Features:
    - Pre-warms connections on ECS task startup
    - Monitors pool health and emits CloudWatch metrics
    - Graceful degradation when pools are exhausted
    - Thread-safe connection checkout/checkin

    Usage:
        pool_mgr = ConnectionPoolManager()
        pool_mgr.initialize()  # Call during ECS task startup

        bedrock_client = pool_mgr.get_bedrock_client()
        opensearch_client = pool_mgr.get_opensearch_client()
        dynamodb_table = pool_mgr.get_dynamodb_table("MangaAssist-Sessions")
        redis_client = pool_mgr.get_redis_client()

        # Periodic health check
        health = pool_mgr.get_pool_health()
    """

    def __init__(self, region: str = "ap-northeast-1"):
        self.region = region
        self._bedrock_client = None
        self._opensearch_client = None
        self._dynamodb_resource = None
        self._redis_pool = None
        self._redis_client = None
        self._initialized = False
        self._lock = threading.Lock()
        self._stats = {svc: {"requests": 0, "errors": 0, "checkout_times": []}
                       for svc in DownstreamService}

    def initialize(self):
        """
        Initialize all connection pools. Call once during ECS task startup.
        This pre-warms connections to avoid cold-start latency on first request.
        """
        if self._initialized:
            return

        with self._lock:
            if self._initialized:
                return

            logger.info("Initializing MangaAssist connection pools...")
            start = time.monotonic()

            self._init_bedrock()
            self._init_opensearch()
            self._init_dynamodb()
            self._init_redis()

            elapsed = (time.monotonic() - start) * 1000
            logger.info("All connection pools initialized in %.1fms", elapsed)
            self._initialized = True

    def _init_bedrock(self):
        """Initialize Bedrock client with connection pooling."""
        config = POOL_CONFIGS[DownstreamService.BEDROCK]
        boto_config = BotoConfig(
            region_name=self.region,
            retries={"max_attempts": config.retry_max_attempts, "mode": config.retry_mode},
            max_pool_connections=config.max_connections,
            connect_timeout=config.connect_timeout_seconds,
            read_timeout=config.read_timeout_seconds,
        )
        self._bedrock_client = boto3.client(
            "bedrock-runtime",
            config=boto_config,
        )

        # Pre-warm: send a minimal request to establish connections
        try:
            self._bedrock_client.invoke_model(
                modelId="anthropic.claude-3-haiku-20240307-v1:0",
                contentType="application/json",
                accept="application/json",
                body='{"anthropic_version":"bedrock-2023-05-31","max_tokens":1,"messages":[{"role":"user","content":"hi"}]}',
            )
            logger.info("Bedrock connection pre-warmed successfully")
        except Exception as e:
            logger.warning("Bedrock pre-warm failed (non-critical): %s", e)

    def _init_opensearch(self):
        """Initialize OpenSearch client with connection pooling."""
        config = POOL_CONFIGS[DownstreamService.OPENSEARCH]
        # OpenSearch Serverless requires SigV4-signed requests; http_auth=None
        # would send them unsigned. AWSV4SignerAuth comes from opensearchpy,
        # alongside OpenSearch and RequestsHttpConnection.
        credentials = boto3.Session().get_credentials()
        auth = AWSV4SignerAuth(credentials, self.region, "aoss")
        self._opensearch_client = OpenSearch(
            hosts=[{"host": "manga-assist-collection.ap-northeast-1.aoss.amazonaws.com",
                    "port": 443}],
            http_auth=auth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
            pool_maxsize=config.max_connections,
            timeout=config.read_timeout_seconds,
        )
        logger.info("OpenSearch connection pool initialized (max=%d)", config.max_connections)

    def _init_dynamodb(self):
        """Initialize DynamoDB resource with connection pooling."""
        config = POOL_CONFIGS[DownstreamService.DYNAMODB]
        boto_config = BotoConfig(
            region_name=self.region,
            retries={"max_attempts": config.retry_max_attempts, "mode": config.retry_mode},
            max_pool_connections=config.max_connections,
            connect_timeout=config.connect_timeout_seconds,
            read_timeout=config.read_timeout_seconds,
        )
        self._dynamodb_resource = boto3.resource("dynamodb", config=boto_config)
        logger.info("DynamoDB connection pool initialized (max=%d)", config.max_connections)

    def _init_redis(self):
        """Initialize Redis connection pool."""
        config = POOL_CONFIGS[DownstreamService.REDIS]
        self._redis_pool = redis.ConnectionPool(
            host="manga-assist-cache.xxxx.apne1.cache.amazonaws.com",
            port=6379,
            db=0,
            max_connections=config.max_connections,
            socket_timeout=config.read_timeout_seconds,
            socket_connect_timeout=config.connect_timeout_seconds,
            socket_keepalive=config.keep_alive,
            retry_on_timeout=True,
        )
        self._redis_client = redis.Redis(connection_pool=self._redis_pool)

        # Pre-warm: ping to establish initial connections
        try:
            self._redis_client.ping()
            logger.info("Redis connection pre-warmed successfully")
        except Exception as e:
            logger.warning("Redis pre-warm failed (non-critical): %s", e)

    def get_bedrock_client(self):
        """Return the pooled Bedrock runtime client."""
        self._ensure_initialized()
        self._stats[DownstreamService.BEDROCK]["requests"] += 1
        return self._bedrock_client

    def get_opensearch_client(self) -> OpenSearch:
        """Return the pooled OpenSearch client."""
        self._ensure_initialized()
        self._stats[DownstreamService.OPENSEARCH]["requests"] += 1
        return self._opensearch_client

    def get_dynamodb_table(self, table_name: str):
        """Return a DynamoDB Table resource using the pooled connection."""
        self._ensure_initialized()
        self._stats[DownstreamService.DYNAMODB]["requests"] += 1
        return self._dynamodb_resource.Table(table_name)

    def get_redis_client(self) -> redis.Redis:
        """Return the pooled Redis client."""
        self._ensure_initialized()
        self._stats[DownstreamService.REDIS]["requests"] += 1
        return self._redis_client

    def get_pool_health(self) -> list[PoolHealth]:
        """Return health status for all connection pools."""
        health_reports = []

        # The Redis pool exposes enough internals for basic introspection
        if self._redis_pool:
            health_reports.append(PoolHealth(
                service=DownstreamService.REDIS.value,
                active_connections=len(self._redis_pool._in_use_connections)
                if hasattr(self._redis_pool, "_in_use_connections") else -1,
                idle_connections=len(self._redis_pool._available_connections)
                if hasattr(self._redis_pool, "_available_connections") else -1,
                max_connections=POOL_CONFIGS[DownstreamService.REDIS].max_connections,
                total_requests=self._stats[DownstreamService.REDIS]["requests"],
                total_errors=self._stats[DownstreamService.REDIS]["errors"],
                avg_checkout_ms=0.0,
                last_health_check=time.time(),
            ))

        # For boto3 clients, pool introspection is limited
        for svc in [DownstreamService.BEDROCK, DownstreamService.OPENSEARCH,
                     DownstreamService.DYNAMODB]:
            health_reports.append(PoolHealth(
                service=svc.value,
                active_connections=-1,  # boto3 does not expose this
                idle_connections=-1,
                max_connections=POOL_CONFIGS[svc].max_connections,
                total_requests=self._stats[svc]["requests"],
                total_errors=self._stats[svc]["errors"],
                avg_checkout_ms=0.0,
                last_health_check=time.time(),
            ))

        return health_reports

    def _ensure_initialized(self):
        if not self._initialized:
            raise RuntimeError(
                "ConnectionPoolManager not initialized. Call initialize() first."
            )

    def shutdown(self):
        """Gracefully close all connection pools."""
        logger.info("Shutting down connection pools...")
        if self._redis_pool:
            self._redis_pool.disconnect()
        self._initialized = False
        logger.info("All connection pools closed.")

Optimization Technique Comparison

| Technique | Latency Saved (p95) | Implementation Effort | Risk | Notes |
|---|---|---|---|---|
| VPC endpoints for Bedrock | 15ms | Low (Terraform/CDK) | Very Low | One-time infra change |
| VPC endpoints for DynamoDB | 10ms | Low (Terraform/CDK) | Very Low | Gateway endpoint (free) |
| VPC endpoints for OpenSearch | 12ms | Low (Terraform/CDK) | Very Low | Interface endpoint |
| Connection pool pre-warming | 30-50ms (first req) | Medium (app code) | Low | Reduces cold start |
| HTTP keep-alive to Bedrock | 15-25ms per call | Low (SDK config) | Very Low | Default in latest SDK |
| OpenSearch filter optimization | 65ms | Medium (query tuning) | Low | Requires index analysis |
| OpenSearch source field selection | 20ms | Low (query change) | Very Low | Immediate improvement |
| Async cache writes | 8-15ms | Medium (app refactor) | Medium | Must handle write failures |
| Async session updates | 5-10ms | Medium (app refactor) | Medium | Eventual consistency OK |
| Circuit breakers | 0ms (normal); prevents cascade | Medium (library) | Low | Improves resilience |
| Bulkhead isolation | 0ms (normal); prevents starvation | Medium (threading) | Medium | Requires load testing |
| gRPC for internal services | 6ms per internal call | High (protocol change) | Medium | Only if multiple internal hops |
| DNS resolution caching | 5-10ms per call | Low (OS/container config) | Very Low | Reduces DNS lookups |
| Response streaming | -200ms (perceived) | Medium (API change) | Low | First token arrives earlier |
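The async rows in the table are the kind of change that moves work off the hot path with very little code. A minimal fire-and-forget sketch for async cache writes using a bounded background executor (the function name and key scheme are illustrative assumptions):

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)

# One small executor shared by the app; bounding it means cache writes
# can never starve request-handling threads (a simple bulkhead).
_cache_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="cache-write")


def write_cache_async(redis_client, key: str, value: str, ttl_seconds: int = 300):
    """Schedule a cache write without blocking response delivery.

    Failures are logged, not raised: a lost cache entry only costs a
    future cache miss, never a failed user response.
    """
    def _write():
        try:
            redis_client.setex(key, ttl_seconds, value)
        except Exception as exc:
            logger.warning("Async cache write failed for %s: %s", key, exc)

    return _cache_executor.submit(_write)
```

The returned future can be ignored in the hot path; shutdown hooks or tests can wait on it to confirm the write landed.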

Key Takeaways

  1. Bedrock invocation dominates latency — at 200-800ms p95, it consumes 60-75% of the total latency budget. Focus optimization on reducing everything else so the budget absorbs Bedrock variability.

  2. VPC endpoints are the easiest win — 37ms saved with near-zero risk. Deploy them before touching application code.

  3. Connection pooling and pre-warming eliminate cold-start spikes — the first request on a new ECS task can be 200ms slower without pre-warming.

  4. OpenSearch query optimization has outsized impact — filtering before vector search and returning only needed fields can cut vector retrieval time in half.

  5. Async patterns remove non-critical work from the hot path — cache writes, session updates, and analytics emissions should never block response delivery.

  6. Profile before you optimize — X-Ray traces reveal the actual bottleneck, which is often not where you expect. Invest in profiling infrastructure first.
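In production, takeaway 6 means instrumenting each pipeline stage as an X-Ray subsegment. When tracing is not wired in yet, a pure-Python stand-in gives the same per-stage breakdown; the `StageTimer` class and stage names below are a sketch, not part of the MangaAssist codebase:

```python
import time
from contextlib import contextmanager


class StageTimer:
    """Per-request latency breakdown, keyed by stage name.

    In production these numbers would come from X-Ray subsegments; this
    stand-in yields the same per-stage view for local profiling.
    """

    def __init__(self):
        self.timings_ms: dict[str, float] = {}

    @contextmanager
    def stage(self, name: str):
        start = time.monotonic()
        try:
            yield
        finally:
            # Record wall-clock duration even if the stage raised.
            self.timings_ms[name] = (time.monotonic() - start) * 1000

    def slowest(self):
        """Return (stage, ms) for the biggest contributor, or None if empty."""
        if not self.timings_ms:
            return None
        return max(self.timings_ms.items(), key=lambda kv: kv[1])
```

Typical use wraps each hop (`with timer.stage("vector_search"): ...`, then `"bedrock_invoke"`, `"session_update"`) and logs `timer.slowest()` per request, so the actual bottleneck shows up in data rather than intuition.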