FM System Performance Architecture
MangaAssist is a Japanese manga storefront chatbot running on AWS. The stack includes Bedrock Claude 3 (Sonnet for complex queries, Haiku for simple ones), OpenSearch Serverless for vector retrieval, DynamoDB for sessions/products/orders, ECS Fargate for orchestration, API Gateway WebSocket for real-time communication, and ElastiCache Redis for semantic caching. The p95 end-to-end latency target is < 2 seconds.
Skill Mapping
| AWS AIP-C01 Skill | Sub-Skill | This File Covers |
|---|---|---|
| 4.2.6 FM System Performance | API Call Profiling | X-Ray trace analysis, per-service latency breakdown, bottleneck detection |
| 4.2.6 FM System Performance | Vector DB Query Optimization | OpenSearch query tuning, filter optimization, source field selection |
| 4.2.6 FM System Performance | LLM Inference Latency Reduction | Connection reuse, keep-alive, pre-warming, async invocation |
| 4.2.6 FM System Performance | Service Communication Patterns | gRPC, connection pooling, circuit breakers, bulkhead isolation |
| 4.2.6 FM System Performance | Network Optimization | VPC endpoints for Bedrock/DynamoDB/OpenSearch, NAT gateway elimination |
Mind Map — System Performance Dimensions
mindmap
root((4.2.6<br/>System<br/>Performance))
API Call Profiling
X-Ray Trace Analysis
Per-Service Latency Breakdown
Prompt-Completion Pattern Profiling
Slow Segment Identification
Automated Bottleneck Detection
Vector DB Query Optimization
OpenSearch Query Tuning
Filter Optimization
Source Field Selection
Shard Routing
Query Caching
LLM Inference Latency
Connection Reuse & Keep-Alive
Pre-Warming ECS Tasks
Async Invocation Patterns
Payload Minimization
Streaming First Token
Service Communication
gRPC Between Internal Services
Connection Pooling
Circuit Breakers
Bulkhead Isolation
Retry with Jitter
Network Optimization
VPC Endpoints for Bedrock
VPC Endpoints for DynamoDB
VPC Endpoints for OpenSearch
NAT Gateway Elimination
DNS Resolution Caching
End-to-End Profiling
Request Lifecycle Tracking
Latency Budget Allocation
SLI/SLO Alignment
Performance Regression Detection
Architecture — Profiling Points Across the MangaAssist Request Path
Every hop in the MangaAssist pipeline is a profiling opportunity. The diagram below marks each X-Ray segment and the corresponding optimization lever.
graph TB
subgraph "Client Layer"
CLIENT[Customer Browser<br/>WebSocket Client]
end
subgraph "Edge Layer"
APIGW["API Gateway WebSocket<br/>──────────<br/>X-Ray Segment: api-gateway<br/>Target: < 25ms"]
end
subgraph "Orchestration Layer"
ECS["ECS Fargate Orchestrator<br/>──────────<br/>X-Ray Segment: ecs-orchestrator<br/>Target: < 15ms overhead"]
ROUTER["Intent Router<br/>──────────<br/>X-Ray Subsegment: intent-routing<br/>Target: < 5ms"]
end
subgraph "Data Layer"
CACHE["ElastiCache Redis<br/>──────────<br/>X-Ray Segment: cache-lookup<br/>Target: < 3ms"]
DYNAMO["DynamoDB<br/>──────────<br/>X-Ray Segment: dynamo-session<br/>Target: < 8ms"]
OPENSEARCH["OpenSearch Serverless<br/>──────────<br/>X-Ray Segment: vector-search<br/>Target: < 150ms"]
end
subgraph "FM Layer"
BEDROCK["Bedrock Claude 3<br/>──────────<br/>X-Ray Segment: bedrock-invoke<br/>Target: < 800ms (Sonnet) / < 300ms (Haiku)"]
GUARD["Bedrock Guardrails<br/>──────────<br/>X-Ray Segment: guardrails-check<br/>Target: < 40ms"]
end
subgraph "Network Layer"
VPCE_BEDROCK["VPC Endpoint<br/>Bedrock<br/>Saves ~15ms vs NAT"]
VPCE_DYNAMO["VPC Endpoint<br/>DynamoDB<br/>Saves ~10ms vs NAT"]
VPCE_OS["VPC Endpoint<br/>OpenSearch<br/>Saves ~12ms vs NAT"]
end
CLIENT -->|"WSS"| APIGW
APIGW --> ECS
ECS --> ROUTER
ROUTER --> CACHE
CACHE -->|"miss"| DYNAMO
CACHE -->|"miss"| OPENSEARCH
ROUTER --> OPENSEARCH
OPENSEARCH --> ECS
DYNAMO --> ECS
ECS --> BEDROCK
BEDROCK --> GUARD
GUARD --> ECS
ECS -->|"stream"| APIGW
APIGW -->|"stream"| CLIENT
ECS -.->|"private link"| VPCE_BEDROCK -.-> BEDROCK
ECS -.->|"private link"| VPCE_DYNAMO -.-> DYNAMO
ECS -.->|"private link"| VPCE_OS -.-> OPENSEARCH
style BEDROCK fill:#ff9900,color:#000
style OPENSEARCH fill:#ff9900,color:#000
style DYNAMO fill:#ff9900,color:#000
style CACHE fill:#2ecc71,color:#000
style ECS fill:#3498db,color:#fff
style APIGW fill:#3498db,color:#fff
style GUARD fill:#e74c3c,color:#fff
style VPCE_BEDROCK fill:#9b59b6,color:#fff
style VPCE_DYNAMO fill:#9b59b6,color:#fff
style VPCE_OS fill:#9b59b6,color:#fff
1. API Call Profiling for Prompt-Completion Patterns
X-Ray Trace Analysis
Every MangaAssist request generates an X-Ray trace that spans the full pipeline: API Gateway to ECS to data stores to Bedrock and back. Profiling focuses on three dimensions:
- Segment timing — how long each service hop takes
- Subsegment detail — within Bedrock, separate prompt tokenization from generation
- Annotation correlation — tag traces with intent type, model used, cache hit/miss
Identifying Slow Segments
The most common slow segments in MangaAssist:
| Segment | Typical p50 | Typical p95 | Root Cause When Slow |
|---|---|---|---|
| bedrock-invoke | 350ms | 800ms | Large prompt, Sonnet for simple queries, throttling |
| vector-search | 60ms | 150ms | Unfiltered queries, returning too many fields, cold OCU |
| dynamo-session | 4ms | 12ms | Hot partition, large session item, eventually-consistent read |
| cache-lookup | 1ms | 3ms | Network hop to different AZ, large serialized value |
| guardrails-check | 15ms | 40ms | Long response text, complex policy evaluation |
| api-gateway | 8ms | 25ms | Auth/rate limit overhead, WebSocket frame assembly |
Per-Service Latency Breakdown
gantt
title MangaAssist p95 Latency Breakdown — Before Optimization
dateFormat X
axisFormat %L ms
section Edge
API Gateway + Auth :0, 25
section Orchestration
ECS Routing + Session :25, 50
section Data Retrieval
Redis Cache Check :50, 53
DynamoDB Session Load :53, 65
OpenSearch Vector Search :65, 215
section FM Inference
Bedrock Prompt Assembly :215, 230
Bedrock Generation (p95) :230, 1030
section Post-Processing
Guardrails Evaluation :1030, 1070
Response Formatting :1070, 1085
section Delivery
WebSocket Stream Start :1085, 1100
Total p95: ~1,100ms (within the 2s target, but optimizable).
2. Vector Database Query Optimization
OpenSearch Query Tuning
MangaAssist queries OpenSearch Serverless for manga product embeddings. Three optimization axes:
a) Filter Optimization — Reduce Candidate Set Before Vector Comparison
{
"size": 5,
"query": {
"bool": {
"filter": [
{ "term": { "category": "manga" } },
{ "term": { "language": "ja" } },
{ "term": { "in_stock": true } },
{ "range": { "price": { "lte": 2000 } } }
],
"must": [
{
"knn": {
"embedding": {
"vector": [0.12, -0.34, 0.56],
"k": 5
}
}
}
]
}
}
}
Pre-filtering with bool.filter clauses reduces the vector search space from millions of documents to thousands, cutting search latency by 40-60%. One caveat: depending on the k-NN engine, a top-level bool.filter may be applied as a post-filter after the vector comparison; true pre-filtering can require nesting the filter inside the knn clause itself, so verify the behavior for your collection before banking the savings.
b) Source Field Selection — Return Only What the LLM Needs
{
"_source": {
"includes": ["title", "author", "genre", "price", "synopsis_short", "asin"]
}
}
MangaAssist product documents contain 25+ fields (full descriptions, images, reviews, metadata). The LLM context only needs 6 fields. Returning only those fields reduces payload by ~80% and cuts deserialization time.
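The filter and source-field techniques above combine naturally into a single query body. A small helper sketch (the function name `build_tuned_query` and default values are illustrative; field names follow the examples above):

```python
def build_tuned_query(query_vector, max_price=2000, k=5):
    """Build an OpenSearch k-NN query body that pre-filters candidates
    and trims the returned fields to what the LLM prompt actually needs.
    Adjust field names to match your index mapping."""
    return {
        "size": k,
        # b) Source field selection: only the 6 fields the LLM context uses
        "_source": {"includes": ["title", "author", "genre", "price",
                                 "synopsis_short", "asin"]},
        "query": {
            "bool": {
                # a) Filter optimization: shrink the candidate set first
                "filter": [
                    {"term": {"category": "manga"}},
                    {"term": {"language": "ja"}},
                    {"term": {"in_stock": True}},
                    {"range": {"price": {"lte": max_price}}},
                ],
                "must": [
                    {"knn": {"embedding": {"vector": query_vector, "k": k}}}
                ],
            }
        },
    }
```

The returned dict can be passed as the `body` argument of an opensearch-py `search()` call.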
c) Shard Routing — Direct Queries to the Right Shard
For category-specific queries (e.g., "recommend shonen manga"), a custom routing key assigned at index time lets the search request hit only the shards that hold that category (via the routing request parameter). Alternatively, the preference parameter addresses specific shards directly:
{
"preference": "_shards:0,1",
"query": { ... }
}
Note that OpenSearch Serverless manages shards internally, so explicit routing and shard preference apply to provisioned OpenSearch domains rather than serverless collections.
OpenSearch Optimization Impact
| Technique | Before | After | Improvement |
|---|---|---|---|
| Pre-filtering | 150ms p95 | 85ms p95 | -43% |
| Source field selection | 85ms p95 | 65ms p95 | -24% |
| Query caching (repeated queries) | 65ms p95 | 8ms p95 | -88% |
| Combined | 150ms p95 | 65ms p95 (fresh) / 8ms (cached) | -57% to -95% |
3. LLM Inference Latency Reduction
Connection Reuse and Keep-Alive
Establishing a new HTTPS connection to Bedrock costs 30-50ms (DNS + TCP + TLS). MangaAssist reuses connections across requests within each ECS task.
Key configuration:
- HTTP keep-alive enabled on the Bedrock client
- Connection idle timeout set to 60 seconds (matches Bedrock's server-side timeout)
- Maximum connections per ECS task: 10 (matches expected concurrency per container)
Pre-Warming ECS Tasks
Cold ECS tasks incur ~200ms penalty for the first Bedrock call (connection establishment + SDK initialization). MangaAssist pre-warms by:
- Startup probe — on ECS task start, send a minimal Bedrock invoke (Haiku, 10-token prompt)
- Connection pool initialization — establish 3 connections during health check window
- DNS pre-resolution — resolve Bedrock endpoint DNS during container init
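The DNS pre-resolution step above can be sketched with the standard library alone; the hostname list is caller-supplied (substitute your real regional endpoints), and `prewarm_dns` is an illustrative helper, not an SDK feature:

```python
import socket
import time
import logging

logger = logging.getLogger(__name__)

def prewarm_dns(hostnames):
    """Resolve each endpoint hostname once during container init so the
    first real request does not pay the DNS lookup cost. Returns per-host
    resolution time in ms, or None if resolution failed."""
    timings = {}
    for host in hostnames:
        start = time.monotonic()
        try:
            socket.getaddrinfo(host, 443)
            timings[host] = (time.monotonic() - start) * 1000  # ms
        except socket.gaierror as e:
            logger.warning("DNS pre-resolution failed for %s: %s", host, e)
            timings[host] = None
    return timings
```

Called from the ECS task's startup hook, e.g. `prewarm_dns(["bedrock-runtime.ap-northeast-1.amazonaws.com"])`, before the first user request arrives.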
Async Invocation Patterns
Not all work in the MangaAssist pipeline needs to be synchronous:
sequenceDiagram
participant User
participant ECS as ECS Orchestrator
participant Cache as Redis Cache
participant OS as OpenSearch
participant Bedrock as Bedrock Claude 3
participant DDB as DynamoDB
participant Guard as Guardrails
User->>ECS: User query
par Parallel Data Retrieval
ECS->>Cache: Check semantic cache
ECS->>OS: Vector search (async)
ECS->>DDB: Load session (async)
end
Cache-->>ECS: Cache miss
OS-->>ECS: Top 5 products
DDB-->>ECS: Session context
ECS->>Bedrock: Invoke model (streaming)
par Async Post-Processing
Bedrock-->>ECS: Stream tokens
ECS-->>User: Stream to client
ECS->>Guard: Guardrails check (async, on complete response)
ECS->>Cache: Cache result (fire-and-forget)
ECS->>DDB: Update session (fire-and-forget)
end
Guard-->>ECS: Validation result (logged, not blocking stream)
Latency saved: Cache writes and session updates are fire-and-forget, saving 8-15ms from the critical path.
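The fire-and-forget pattern can be implemented with a small dedicated thread pool so background writes never block the request path. This is a minimal sketch (the pool size of 4 and the helper name `fire_and_forget` are illustrative choices, not part of the MangaAssist codebase):

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)

# Dedicated small pool so background writes cannot starve request threads.
_background_pool = ThreadPoolExecutor(max_workers=4,
                                      thread_name_prefix="fire-and-forget")

def fire_and_forget(fn, *args, **kwargs):
    """Run a non-critical task (cache write, session update, analytics emit)
    off the critical path. Errors are logged, never propagated to the user
    request. Returns the Future for optional inspection in tests."""
    def _wrapped():
        try:
            fn(*args, **kwargs)
        except Exception as e:
            logger.warning("Background task %s failed: %s",
                           getattr(fn, "__name__", "<anon>"), e)
    return _background_pool.submit(_wrapped)
```

Usage after the stream completes, e.g. `fire_and_forget(redis_client.set, cache_key, serialized_response)`; the caller returns immediately.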
4. Service Communication Patterns
gRPC Between Internal Services
For internal MangaAssist microservices (e.g., orchestrator to intent classifier, orchestrator to product enrichment), gRPC provides:
| Aspect | REST/JSON | gRPC/Protobuf | Improvement |
|---|---|---|---|
| Serialization time | 2.1ms | 0.3ms | 7x faster |
| Payload size | 1,200 bytes | 340 bytes | 3.5x smaller |
| Connection overhead | New TCP per request | Multiplexed HTTP/2 | Persistent |
| Latency (p50) | 8ms | 2ms | 4x faster |
Connection Pooling
MangaAssist maintains connection pools for each downstream service:
| Service | Pool Size per ECS Task | Max Idle | Keep-Alive |
|---|---|---|---|
| Bedrock | 10 | 60s | Yes |
| OpenSearch | 20 | 120s | Yes |
| DynamoDB | 25 | 300s | Yes |
| Redis | 50 | 600s | Yes |
Circuit Breakers
Each downstream service has a circuit breaker to prevent cascade failures:
stateDiagram-v2
[*] --> Closed: Normal operation
Closed --> Open: 5 failures in 30s
Open --> HalfOpen: After 15s cooldown
HalfOpen --> Closed: 3 consecutive successes
HalfOpen --> Open: Any failure
Closed: All requests pass through
Open: Requests fail fast (< 1ms)
Open: Return cached/fallback response
HalfOpen: Limited requests (20%) pass through
HalfOpen: Test if service recovered
MangaAssist circuit breaker settings:
| Service | Failure Threshold | Cooldown | Recovery Probes | Fallback |
|---|---|---|---|---|
| Bedrock Sonnet | 5 in 30s | 15s | 3 successes | Route to Haiku |
| OpenSearch | 3 in 20s | 10s | 2 successes | Return cached results |
| DynamoDB | 5 in 30s | 10s | 3 successes | Return minimal session |
| Redis | 3 in 10s | 5s | 2 successes | Skip cache (passthrough) |
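The state machine and settings above can be expressed as a compact class. This is a minimal sketch for illustration (probe throttling in the half-open state is omitted for brevity; the injectable `clock` parameter is an assumption added to make the logic testable):

```python
import time

class CircuitBreaker:
    """Minimal circuit breaker matching the transitions in the state diagram:
    Closed -> Open on N failures in a window, Open -> HalfOpen after cooldown,
    HalfOpen -> Closed after consecutive successes, HalfOpen -> Open on any failure."""
    CLOSED, OPEN, HALF_OPEN = "closed", "open", "half_open"

    def __init__(self, failure_threshold=5, window_s=30, cooldown_s=15,
                 recovery_probes=3, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.window_s = window_s
        self.cooldown_s = cooldown_s
        self.recovery_probes = recovery_probes
        self.clock = clock
        self.state = self.CLOSED
        self.failures = []            # timestamps of recent failures
        self.opened_at = None
        self.successes_in_half_open = 0

    def allow_request(self):
        """Return True if the call may proceed; False means fail fast to fallback."""
        if self.state == self.OPEN:
            if self.clock() - self.opened_at >= self.cooldown_s:
                self.state = self.HALF_OPEN
                self.successes_in_half_open = 0
                return True           # probe request
            return False              # fail fast, use cached/fallback response
        return True

    def record_success(self):
        if self.state == self.HALF_OPEN:
            self.successes_in_half_open += 1
            if self.successes_in_half_open >= self.recovery_probes:
                self.state = self.CLOSED
                self.failures.clear()

    def record_failure(self):
        now = self.clock()
        if self.state == self.HALF_OPEN:
            self._open(now)
            return
        # keep only failures inside the sliding window
        self.failures = [t for t in self.failures if now - t <= self.window_s]
        self.failures.append(now)
        if len(self.failures) >= self.failure_threshold:
            self._open(now)

    def _open(self, now):
        self.state = self.OPEN
        self.opened_at = now
```

The per-service settings from the table map directly onto the constructor, e.g. `CircuitBreaker(failure_threshold=3, window_s=20, cooldown_s=10, recovery_probes=2)` for OpenSearch.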
Bulkhead Isolation
MangaAssist isolates workloads to prevent one slow service from consuming all resources:
graph TB
subgraph "ECS Task Thread Pool (100 threads)"
subgraph "Bedrock Bulkhead (40 threads)"
B1[Bedrock Call 1]
B2[Bedrock Call 2]
BN[Bedrock Call N]
end
subgraph "OpenSearch Bulkhead (25 threads)"
O1[OS Query 1]
O2[OS Query 2]
end
subgraph "DynamoDB Bulkhead (20 threads)"
D1[DDB Read 1]
D2[DDB Write 1]
end
subgraph "Redis Bulkhead (15 threads)"
R1[Cache Read 1]
R2[Cache Write 1]
end
end
style B1 fill:#ff9900,color:#000
style B2 fill:#ff9900,color:#000
style BN fill:#ff9900,color:#000
style O1 fill:#3498db,color:#fff
style O2 fill:#3498db,color:#fff
style D1 fill:#2ecc71,color:#000
style D2 fill:#2ecc71,color:#000
style R1 fill:#e74c3c,color:#fff
style R2 fill:#e74c3c,color:#fff
If Bedrock becomes slow, only the Bedrock bulkhead's 40 threads are blocked. OpenSearch, DynamoDB, and Redis bulkheads continue operating normally, ensuring cache hits and simple queries still return fast.
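One common way to implement the bulkheads above is a bounded semaphore per downstream service: a full bulkhead fails fast instead of queueing behind a slow dependency. A minimal sketch (the `Bulkhead` class and `BULKHEADS` registry are illustrative, with sizes taken from the diagram):

```python
import threading
from contextlib import contextmanager

class BulkheadFullError(Exception):
    """Raised when a bulkhead has no free capacity; callers fall back immediately."""

class Bulkhead:
    """Caps concurrent calls to one downstream service so a slowdown there
    cannot consume the whole thread pool."""
    def __init__(self, name, max_concurrent):
        self.name = name
        self._sem = threading.BoundedSemaphore(max_concurrent)

    @contextmanager
    def acquire(self, timeout=0.0):
        # timeout=0.0 makes this effectively non-blocking: if the bulkhead
        # is full, raise instead of waiting behind the slow dependency.
        if not self._sem.acquire(timeout=timeout):
            raise BulkheadFullError(f"bulkhead '{self.name}' is full")
        try:
            yield
        finally:
            self._sem.release()

# Sizes from the diagram: 40 + 25 + 20 + 15 of the task's 100 threads.
BULKHEADS = {
    "bedrock": Bulkhead("bedrock", 40),
    "opensearch": Bulkhead("opensearch", 25),
    "dynamodb": Bulkhead("dynamodb", 20),
    "redis": Bulkhead("redis", 15),
}
```

Call sites wrap each downstream call, e.g. `with BULKHEADS["bedrock"].acquire(): client.invoke_model(...)`, and catch `BulkheadFullError` to serve the fallback response.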
5. End-to-End Request Profiling
X-Ray Segments for the Full MangaAssist Pipeline
Each request generates a trace with the following segment hierarchy:
Trace: manga-assist-request (e2e)
|-- Segment: api-gateway-websocket (8-25ms)
|-- Segment: ecs-orchestrator (2-5ms overhead)
| |-- Subsegment: intent-classification (1-3ms)
| |-- Subsegment: request-validation (< 1ms)
|-- Segment: cache-lookup (1-3ms)
|-- Segment: parallel-data-retrieval
| |-- Subsegment: opensearch-vector-search (60-150ms)
| |-- Subsegment: dynamodb-session-load (4-12ms)
|-- Segment: prompt-assembly (2-5ms)
|-- Segment: bedrock-invoke (200-800ms)
| |-- Subsegment: request-serialization (< 1ms)
| |-- Subsegment: network-transit (5-15ms)
| |-- Subsegment: model-inference (190-780ms)
| |-- Subsegment: response-deserialization (< 1ms)
|-- Segment: guardrails-check (15-40ms)
|-- Segment: response-streaming (5-15ms)
|-- Segment: async-postprocessing (fire-and-forget)
|-- Subsegment: cache-write (2-5ms)
|-- Subsegment: session-update (3-8ms)
|-- Subsegment: analytics-emit (1-2ms)
Automated Bottleneck Detection
MangaAssist runs an automated analysis every 5 minutes that:
- Queries X-Ray for the last 5 minutes of traces
- Identifies the segment with the highest contribution to p95 latency
- Compares against the latency budget allocation
- Emits a CloudWatch alarm if any segment exceeds its budget by > 20%
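The analysis step of that loop reduces to a pure function once the per-segment samples have been pulled from X-Ray (the fetch via `get_trace_summaries` is omitted here; `detect_bottlenecks` is an illustrative sketch of the budget comparison, not MangaAssist's actual implementation):

```python
def detect_bottlenecks(segment_samples, budgets, overage_threshold_pct=20.0):
    """Given per-segment latency samples (ms) from the last window, return
    the segments whose p95 exceeds its budget by more than the threshold.
    segment_samples maps segment name -> list of latencies in ms;
    budgets maps segment name -> p95 budget in ms."""
    violations = []
    for name, samples in segment_samples.items():
        if name not in budgets or not samples:
            continue
        ordered = sorted(samples)
        p95 = ordered[min(int(len(ordered) * 0.95), len(ordered) - 1)]
        budget = budgets[name]
        if p95 > budget * (1 + overage_threshold_pct / 100):
            violations.append({
                "segment": name,
                "p95_ms": round(p95, 1),
                "budget_ms": budget,
                "overage_pct": round((p95 / budget - 1) * 100, 1),
            })
    # Worst offender first
    return sorted(violations, key=lambda v: v["overage_pct"], reverse=True)
```

A non-empty result is what triggers the CloudWatch alarm in the loop described above.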
6. Network Optimization — VPC Endpoints
Why VPC Endpoints Matter
Without VPC endpoints, traffic from ECS Fargate to AWS services traverses:
ECS Task → NAT Gateway → Internet Gateway → AWS Service Endpoint
This adds 10-20ms per call due to NAT Gateway processing. With VPC endpoints:
ECS Task → VPC Endpoint (private link) → AWS Service Endpoint
MangaAssist VPC Endpoint Configuration
| Service | Endpoint Type | Latency Before (via NAT) | Latency After (VPC Endpoint) | Savings |
|---|---|---|---|---|
| Bedrock | Interface | 45ms | 30ms | 15ms |
| DynamoDB | Gateway | 18ms | 8ms | 10ms |
| OpenSearch Serverless | Interface | 72ms | 60ms | 12ms |
| S3 (model artifacts) | Gateway | 15ms | 8ms | 7ms |
| CloudWatch (telemetry) | Interface | 12ms | 6ms | 6ms |
Total savings per request (hitting all services): ~50ms reduction — significant when the p95 budget is 2,000ms.
Cost Consideration
VPC endpoints have an hourly cost (~$0.01/hr per endpoint per AZ). For MangaAssist at scale, the cost is negligible compared to the latency improvement and the NAT Gateway data processing charges eliminated ($0.045/GB).
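The tradeoff can be estimated with back-of-envelope arithmetic. All prices below are illustrative (the ~$0.01/hr endpoint and $0.045/GB NAT data figures come from the text; the NAT Gateway hourly rate of $0.045 is an assumed value — check current regional AWS pricing before deciding):

```python
def endpoint_cost_tradeoff(n_interface_endpoints, n_azs, monthly_gb,
                           endpoint_hourly=0.01, nat_per_gb=0.045,
                           nat_hourly=0.045, hours=730):
    """Compare monthly cost of interface VPC endpoints against a NAT Gateway
    carrying the same traffic. Gateway endpoints (DynamoDB, S3) are free and
    excluded. Returns a dict of rounded USD figures."""
    endpoint_cost = n_interface_endpoints * n_azs * endpoint_hourly * hours
    nat_cost = nat_hourly * hours + monthly_gb * nat_per_gb
    return {
        "vpc_endpoints_usd": round(endpoint_cost, 2),
        "nat_gateway_usd": round(nat_cost, 2),
        "monthly_savings_usd": round(nat_cost - endpoint_cost, 2),
    }
```

For example, three interface endpoints across two AZs with 500 GB/month of traffic compares endpoint hours against NAT hours plus data processing.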
Python Code — RequestProfiler with X-Ray Integration
"""
MangaAssist Request Profiler — X-Ray Integration
Profiles every segment of the MangaAssist request pipeline and
detects bottlenecks against the latency budget.
"""
import time
import json
import logging
import statistics
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum
import boto3
from aws_xray_sdk.core import xray_recorder, patch_all
from aws_xray_sdk.core.models.subsegment import Subsegment
logger = logging.getLogger(__name__)
# Patch boto3 and other libraries for automatic X-Ray tracing
patch_all()
class ServiceSegment(str, Enum):
"""Named segments in the MangaAssist request pipeline."""
API_GATEWAY = "api-gateway"
ECS_ORCHESTRATOR = "ecs-orchestrator"
CACHE_LOOKUP = "cache-lookup"
VECTOR_SEARCH = "vector-search"
DYNAMO_SESSION = "dynamo-session"
PROMPT_ASSEMBLY = "prompt-assembly"
BEDROCK_INVOKE = "bedrock-invoke"
GUARDRAILS_CHECK = "guardrails-check"
RESPONSE_STREAM = "response-stream"
CACHE_WRITE = "cache-write-async"
SESSION_UPDATE = "session-update-async"
@dataclass
class LatencyBudget:
"""Defines the p95 latency budget for each segment (in ms)."""
segment: ServiceSegment
budget_ms: float
alert_threshold_pct: float = 20.0 # alert if exceeded by this %
# MangaAssist latency budget allocation
LATENCY_BUDGETS = {
ServiceSegment.API_GATEWAY: LatencyBudget(ServiceSegment.API_GATEWAY, 25),
ServiceSegment.ECS_ORCHESTRATOR: LatencyBudget(ServiceSegment.ECS_ORCHESTRATOR, 15),
ServiceSegment.CACHE_LOOKUP: LatencyBudget(ServiceSegment.CACHE_LOOKUP, 3),
ServiceSegment.VECTOR_SEARCH: LatencyBudget(ServiceSegment.VECTOR_SEARCH, 150),
ServiceSegment.DYNAMO_SESSION: LatencyBudget(ServiceSegment.DYNAMO_SESSION, 12),
ServiceSegment.PROMPT_ASSEMBLY: LatencyBudget(ServiceSegment.PROMPT_ASSEMBLY, 5),
ServiceSegment.BEDROCK_INVOKE: LatencyBudget(ServiceSegment.BEDROCK_INVOKE, 800),
ServiceSegment.GUARDRAILS_CHECK: LatencyBudget(ServiceSegment.GUARDRAILS_CHECK, 40),
ServiceSegment.RESPONSE_STREAM: LatencyBudget(ServiceSegment.RESPONSE_STREAM, 15),
}
@dataclass
class SegmentTiming:
"""Recorded timing for a single segment execution."""
segment: ServiceSegment
start_ms: float
end_ms: float
metadata: dict = field(default_factory=dict)
@property
def duration_ms(self) -> float:
return self.end_ms - self.start_ms
@dataclass
class RequestProfile:
"""Complete profile of a single MangaAssist request."""
request_id: str
intent: str
model_used: str
cache_hit: bool
timings: list[SegmentTiming] = field(default_factory=list)
total_latency_ms: float = 0.0
bottleneck_segment: Optional[str] = None
    def identify_bottleneck(self) -> Optional[SegmentTiming]:
"""Find the segment that consumed the most time."""
if not self.timings:
return None
bottleneck = max(self.timings, key=lambda t: t.duration_ms)
self.bottleneck_segment = bottleneck.segment.value
return bottleneck
class RequestProfiler:
"""
Profiles MangaAssist requests with X-Ray integration.
    Usage (trace_request and trace_segment are nested classes, so the
    profiler instance is passed explicitly as the first argument):
        profiler = RequestProfiler()
        with profiler.trace_request(profiler, request_id, intent, model) as profile:
            with profiler.trace_segment(profiler, ServiceSegment.CACHE_LOOKUP):
                result = redis_client.get(cache_key)
            with profiler.trace_segment(profiler, ServiceSegment.VECTOR_SEARCH, {"k": 5}):
                docs = opensearch_client.search(query)
            with profiler.trace_segment(profiler, ServiceSegment.BEDROCK_INVOKE, {"model": "sonnet"}):
                response = bedrock_client.invoke_model(body=body)
        # profile now contains all timings and bottleneck analysis
"""
def __init__(self, cloudwatch_namespace: str = "MangaAssist/Performance"):
self.cw_client = boto3.client("cloudwatch")
self.cw_namespace = cloudwatch_namespace
self._current_profile: Optional[RequestProfile] = None
self._profiles_buffer: list[RequestProfile] = []
self._buffer_limit = 100
class trace_request:
"""Context manager for tracing a full request lifecycle."""
def __init__(self, profiler: "RequestProfiler", request_id: str,
intent: str, model: str):
self.profiler = profiler
self.request_id = request_id
self.intent = intent
self.model = model
self.start_time = None
def __enter__(self) -> RequestProfile:
self.start_time = time.monotonic() * 1000
self.profile = RequestProfile(
request_id=self.request_id,
intent=self.intent,
model_used=self.model,
cache_hit=False,
)
self.profiler._current_profile = self.profile
# Begin X-Ray segment
xray_recorder.begin_segment(
name="manga-assist-request",
traceid=None, # auto-generated
)
xray_recorder.current_segment().put_annotation("intent", self.intent)
xray_recorder.current_segment().put_annotation("model", self.model)
xray_recorder.current_segment().put_annotation("request_id", self.request_id)
return self.profile
def __exit__(self, exc_type, exc_val, exc_tb):
end_time = time.monotonic() * 1000
self.profile.total_latency_ms = end_time - self.start_time
self.profile.identify_bottleneck()
# Close X-Ray segment
xray_recorder.current_segment().put_metadata(
"total_latency_ms", self.profile.total_latency_ms
)
xray_recorder.current_segment().put_metadata(
"bottleneck", self.profile.bottleneck_segment
)
xray_recorder.end_segment()
# Emit metrics and buffer the profile
self.profiler._emit_metrics(self.profile)
self.profiler._buffer_profile(self.profile)
self.profiler._current_profile = None
return False # do not suppress exceptions
class trace_segment:
"""Context manager for tracing a single service segment."""
def __init__(self, profiler: "RequestProfiler",
                 segment: ServiceSegment, metadata: Optional[dict] = None):
self.profiler = profiler
self.segment = segment
self.metadata = metadata or {}
def __enter__(self):
self.start_ms = time.monotonic() * 1000
xray_recorder.begin_subsegment(self.segment.value)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
end_ms = time.monotonic() * 1000
timing = SegmentTiming(
segment=self.segment,
start_ms=self.start_ms,
end_ms=end_ms,
metadata=self.metadata,
)
subsegment = xray_recorder.current_subsegment()
if subsegment:
for k, v in self.metadata.items():
subsegment.put_metadata(k, v)
if exc_type:
subsegment.add_exception(exc_val, [])
xray_recorder.end_subsegment()
if self.profiler._current_profile:
self.profiler._current_profile.timings.append(timing)
# Check budget violation
budget = LATENCY_BUDGETS.get(self.segment)
if budget and timing.duration_ms > budget.budget_ms * (1 + budget.alert_threshold_pct / 100):
logger.warning(
"Segment %s exceeded budget: %.1fms (budget: %.1fms, threshold: %.0f%%)",
self.segment.value, timing.duration_ms,
budget.budget_ms, budget.alert_threshold_pct,
)
return False
def _emit_metrics(self, profile: RequestProfile):
"""Emit per-segment latency metrics to CloudWatch."""
metric_data = []
# Total latency
metric_data.append({
"MetricName": "RequestLatency",
"Value": profile.total_latency_ms,
"Unit": "Milliseconds",
"Dimensions": [
{"Name": "Intent", "Value": profile.intent},
{"Name": "Model", "Value": profile.model_used},
],
})
# Per-segment latency
for timing in profile.timings:
metric_data.append({
"MetricName": f"SegmentLatency_{timing.segment.value}",
"Value": timing.duration_ms,
"Unit": "Milliseconds",
"Dimensions": [
{"Name": "Segment", "Value": timing.segment.value},
],
})
# Bottleneck indicator
if profile.bottleneck_segment:
metric_data.append({
"MetricName": "BottleneckSegment",
"Value": 1,
"Unit": "Count",
"Dimensions": [
{"Name": "Segment", "Value": profile.bottleneck_segment},
],
})
try:
self.cw_client.put_metric_data(
Namespace=self.cw_namespace,
MetricData=metric_data,
)
except Exception as e:
logger.error("Failed to emit metrics: %s", e)
def _buffer_profile(self, profile: RequestProfile):
"""Buffer profiles for batch analysis."""
self._profiles_buffer.append(profile)
if len(self._profiles_buffer) >= self._buffer_limit:
self._analyze_batch()
def _analyze_batch(self):
"""Analyze buffered profiles for systemic issues."""
if not self._profiles_buffer:
return
# Per-segment p95 analysis
segment_latencies: dict[str, list[float]] = {}
for profile in self._profiles_buffer:
for timing in profile.timings:
segment_latencies.setdefault(timing.segment.value, []).append(
timing.duration_ms
)
budget_violations = []
for segment_name, latencies in segment_latencies.items():
sorted_latencies = sorted(latencies)
p95_idx = int(len(sorted_latencies) * 0.95)
p95 = sorted_latencies[min(p95_idx, len(sorted_latencies) - 1)]
try:
segment_enum = ServiceSegment(segment_name)
budget = LATENCY_BUDGETS.get(segment_enum)
if budget and p95 > budget.budget_ms:
budget_violations.append({
"segment": segment_name,
"p95_ms": round(p95, 1),
"budget_ms": budget.budget_ms,
"overage_pct": round((p95 / budget.budget_ms - 1) * 100, 1),
})
except ValueError:
pass
if budget_violations:
logger.warning(
"Budget violations detected in last %d requests: %s",
len(self._profiles_buffer),
json.dumps(budget_violations, indent=2),
)
self._profiles_buffer.clear()
# --- Usage Example ---
def handle_manga_assist_request(request_id: str, user_query: str):
"""Example: Profile a MangaAssist request end-to-end."""
profiler = RequestProfiler()
# Determine intent and model (simplified)
intent = "product_recommendation"
model = "claude-3-sonnet"
    # The nested context-manager classes take the profiler instance explicitly
    with profiler.trace_request(profiler, request_id, intent, model) as profile:
# Cache check
with profiler.trace_segment(profiler, ServiceSegment.CACHE_LOOKUP):
cache_result = None # redis_client.get(cache_key)
if cache_result:
profile.cache_hit = True
return cache_result
# Parallel data retrieval (simplified as sequential here)
with profiler.trace_segment(profiler, ServiceSegment.VECTOR_SEARCH, {"k": 5}):
search_results = [] # opensearch_client.search(query)
with profiler.trace_segment(profiler, ServiceSegment.DYNAMO_SESSION):
session_data = {} # dynamodb_table.get_item(Key={"session_id": sid})
# Bedrock invocation
with profiler.trace_segment(profiler, ServiceSegment.BEDROCK_INVOKE, {"model": model}):
response = "" # bedrock_client.invoke_model(body=prompt)
# Guardrails
with profiler.trace_segment(profiler, ServiceSegment.GUARDRAILS_CHECK):
validated = True # guardrails_client.apply_guardrail(response)
return response
Python Code — ConnectionPoolManager
"""
MangaAssist Connection Pool Manager
Manages persistent, pre-warmed connection pools for all downstream services.
Eliminates per-request connection overhead (30-50ms per cold connection).
"""
import time
import logging
import threading
from dataclasses import dataclass
from typing import Optional
from enum import Enum
from contextlib import contextmanager
import boto3
from botocore.config import Config as BotoConfig
import redis
from opensearchpy import OpenSearch, RequestsHttpConnection
logger = logging.getLogger(__name__)
class DownstreamService(str, Enum):
"""Downstream services in the MangaAssist architecture."""
BEDROCK = "bedrock-runtime"
OPENSEARCH = "opensearch"
DYNAMODB = "dynamodb"
REDIS = "redis"
@dataclass
class PoolConfig:
"""Configuration for a single connection pool."""
service: DownstreamService
max_connections: int
max_idle_seconds: int
keep_alive: bool
connect_timeout_seconds: float
read_timeout_seconds: float
retry_max_attempts: int = 3
retry_mode: str = "adaptive"
# MangaAssist pool configurations
POOL_CONFIGS = {
DownstreamService.BEDROCK: PoolConfig(
service=DownstreamService.BEDROCK,
max_connections=10,
max_idle_seconds=60,
keep_alive=True,
connect_timeout_seconds=2.0,
read_timeout_seconds=30.0, # Sonnet can take up to 30s for long responses
retry_max_attempts=3,
retry_mode="adaptive",
),
DownstreamService.OPENSEARCH: PoolConfig(
service=DownstreamService.OPENSEARCH,
max_connections=20,
max_idle_seconds=120,
keep_alive=True,
connect_timeout_seconds=2.0,
read_timeout_seconds=5.0,
retry_max_attempts=2,
),
DownstreamService.DYNAMODB: PoolConfig(
service=DownstreamService.DYNAMODB,
max_connections=25,
max_idle_seconds=300,
keep_alive=True,
connect_timeout_seconds=1.0,
read_timeout_seconds=3.0,
retry_max_attempts=3,
retry_mode="adaptive",
),
DownstreamService.REDIS: PoolConfig(
service=DownstreamService.REDIS,
max_connections=50,
max_idle_seconds=600,
keep_alive=True,
connect_timeout_seconds=1.0,
read_timeout_seconds=2.0,
),
}
@dataclass
class PoolHealth:
"""Health status of a connection pool."""
service: str
active_connections: int
idle_connections: int
max_connections: int
total_requests: int
total_errors: int
avg_checkout_ms: float
last_health_check: float
class ConnectionPoolManager:
"""
Centralized connection pool manager for all MangaAssist downstream services.
Features:
- Pre-warms connections on ECS task startup
- Monitors pool health and emits CloudWatch metrics
- Graceful degradation when pools are exhausted
- Thread-safe connection checkout/checkin
Usage:
pool_mgr = ConnectionPoolManager()
pool_mgr.initialize() # Call during ECS task startup
bedrock_client = pool_mgr.get_bedrock_client()
opensearch_client = pool_mgr.get_opensearch_client()
dynamodb_table = pool_mgr.get_dynamodb_table("MangaAssist-Sessions")
redis_client = pool_mgr.get_redis_client()
# Periodic health check
health = pool_mgr.get_pool_health()
"""
def __init__(self, region: str = "ap-northeast-1"):
self.region = region
self._bedrock_client = None
self._opensearch_client = None
self._dynamodb_resource = None
self._redis_pool = None
self._redis_client = None
self._initialized = False
self._lock = threading.Lock()
self._stats = {svc: {"requests": 0, "errors": 0, "checkout_times": []}
for svc in DownstreamService}
def initialize(self):
"""
Initialize all connection pools. Call once during ECS task startup.
This pre-warms connections to avoid cold-start latency on first request.
"""
if self._initialized:
return
with self._lock:
if self._initialized:
return
logger.info("Initializing MangaAssist connection pools...")
start = time.monotonic()
self._init_bedrock()
self._init_opensearch()
self._init_dynamodb()
self._init_redis()
elapsed = (time.monotonic() - start) * 1000
logger.info("All connection pools initialized in %.1fms", elapsed)
self._initialized = True
def _init_bedrock(self):
"""Initialize Bedrock client with connection pooling."""
config = POOL_CONFIGS[DownstreamService.BEDROCK]
boto_config = BotoConfig(
region_name=self.region,
retries={"max_attempts": config.retry_max_attempts, "mode": config.retry_mode},
max_pool_connections=config.max_connections,
connect_timeout=config.connect_timeout_seconds,
read_timeout=config.read_timeout_seconds,
)
self._bedrock_client = boto3.client(
"bedrock-runtime",
config=boto_config,
)
# Pre-warm: send a minimal request to establish connections
try:
self._bedrock_client.invoke_model(
modelId="anthropic.claude-3-haiku-20240307-v1:0",
contentType="application/json",
accept="application/json",
body='{"anthropic_version":"bedrock-2023-05-31","max_tokens":1,"messages":[{"role":"user","content":"hi"}]}',
)
logger.info("Bedrock connection pre-warmed successfully")
except Exception as e:
logger.warning("Bedrock pre-warm failed (non-critical): %s", e)
    def _init_opensearch(self):
        """Initialize OpenSearch client with connection pooling and SigV4 auth."""
        from opensearchpy import AWSV4SignerAuth
        config = POOL_CONFIGS[DownstreamService.OPENSEARCH]
        # OpenSearch Serverless authenticates with IAM via SigV4 (service name "aoss")
        credentials = boto3.Session().get_credentials()
        auth = AWSV4SignerAuth(credentials, self.region, "aoss")
        self._opensearch_client = OpenSearch(
            hosts=[{"host": "manga-assist-collection.ap-northeast-1.aoss.amazonaws.com",
                    "port": 443}],
            http_auth=auth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
            pool_maxsize=config.max_connections,
            timeout=config.read_timeout_seconds,
        )
        logger.info("OpenSearch connection pool initialized (max=%d)", config.max_connections)
def _init_dynamodb(self):
"""Initialize DynamoDB resource with connection pooling."""
config = POOL_CONFIGS[DownstreamService.DYNAMODB]
boto_config = BotoConfig(
region_name=self.region,
retries={"max_attempts": config.retry_max_attempts, "mode": config.retry_mode},
max_pool_connections=config.max_connections,
connect_timeout=config.connect_timeout_seconds,
read_timeout=config.read_timeout_seconds,
)
self._dynamodb_resource = boto3.resource("dynamodb", config=boto_config)
logger.info("DynamoDB connection pool initialized (max=%d)", config.max_connections)
def _init_redis(self):
"""Initialize Redis connection pool."""
config = POOL_CONFIGS[DownstreamService.REDIS]
self._redis_pool = redis.ConnectionPool(
host="manga-assist-cache.xxxx.apne1.cache.amazonaws.com",
port=6379,
db=0,
max_connections=config.max_connections,
socket_timeout=config.read_timeout_seconds,
socket_connect_timeout=config.connect_timeout_seconds,
socket_keepalive=config.keep_alive,
retry_on_timeout=True,
)
self._redis_client = redis.Redis(connection_pool=self._redis_pool)
# Pre-warm: ping to establish initial connections
try:
self._redis_client.ping()
logger.info("Redis connection pre-warmed successfully")
except Exception as e:
logger.warning("Redis pre-warm failed (non-critical): %s", e)
    def get_bedrock_client(self):
        """Return the pooled Bedrock runtime client."""
        self._ensure_initialized()
        self._stats[DownstreamService.BEDROCK]["requests"] += 1
        return self._bedrock_client

    def get_opensearch_client(self) -> OpenSearch:
        """Return the pooled OpenSearch client."""
        self._ensure_initialized()
        self._stats[DownstreamService.OPENSEARCH]["requests"] += 1
        return self._opensearch_client

    def get_dynamodb_table(self, table_name: str):
        """Return a DynamoDB Table resource using the pooled connection."""
        self._ensure_initialized()
        self._stats[DownstreamService.DYNAMODB]["requests"] += 1
        return self._dynamodb_resource.Table(table_name)

    def get_redis_client(self) -> redis.Redis:
        """Return the pooled Redis client."""
        self._ensure_initialized()
        self._stats[DownstreamService.REDIS]["requests"] += 1
        return self._redis_client
    def get_pool_health(self) -> list[PoolHealth]:
        """Return health status for all connection pools."""
        health_reports = []
        # The redis-py pool exposes (private) introspection attributes
        if self._redis_pool:
            health_reports.append(PoolHealth(
                service=DownstreamService.REDIS.value,
                active_connections=len(self._redis_pool._in_use_connections)
                if hasattr(self._redis_pool, "_in_use_connections") else -1,
                idle_connections=len(self._redis_pool._available_connections)
                if hasattr(self._redis_pool, "_available_connections") else -1,
                max_connections=POOL_CONFIGS[DownstreamService.REDIS].max_connections,
                total_requests=self._stats[DownstreamService.REDIS]["requests"],
                total_errors=self._stats[DownstreamService.REDIS]["errors"],
                avg_checkout_ms=0.0,
                last_health_check=time.time(),
            ))
        # For boto3 clients, pool introspection is limited
        for svc in [DownstreamService.BEDROCK, DownstreamService.OPENSEARCH,
                    DownstreamService.DYNAMODB]:
            health_reports.append(PoolHealth(
                service=svc.value,
                active_connections=-1,  # boto3 does not expose this
                idle_connections=-1,
                max_connections=POOL_CONFIGS[svc].max_connections,
                total_requests=self._stats[svc]["requests"],
                total_errors=self._stats[svc]["errors"],
                avg_checkout_ms=0.0,
                last_health_check=time.time(),
            ))
        return health_reports
    def _ensure_initialized(self):
        if not self._initialized:
            raise RuntimeError(
                "ConnectionPoolManager not initialized. Call initialize() first."
            )

    def shutdown(self):
        """Gracefully close all connection pools."""
        logger.info("Shutting down connection pools...")
        if self._redis_pool:
            self._redis_pool.disconnect()
        self._initialized = False
        logger.info("All connection pools closed.")
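The pool configs above delegate retries to the AWS SDK (`retries={"max_attempts": ..., "mode": ...}`), but calls that bypass the SDK, such as the Redis cache, need their own retry policy. A minimal stdlib sketch of exponential backoff with full jitter (the helper name and defaults are illustrative, not part of the codebase above):

```python
import random
import time


def retry_with_jitter(fn, max_attempts=4, base_delay=0.05, max_delay=1.0):
    """Retry fn() on any exception, backing off exponentially with full jitter.

    Full jitter (sleeping a uniform random amount up to the backoff cap)
    spreads retries out so concurrent clients do not retry in lockstep.
    """
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * (2 ** attempt))
            time.sleep(random.uniform(0, delay))
```

Full jitter is generally preferred over fixed backoff for fan-in services because synchronized retries can themselves become a traffic spike.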
Optimization Technique Comparison
| Technique | Latency Saved (p95) | Implementation Effort | Risk | Notes |
|---|---|---|---|---|
| VPC endpoints for Bedrock | 15ms | Low (Terraform/CDK) | Very Low | One-time infra change |
| VPC endpoints for DynamoDB | 10ms | Low (Terraform/CDK) | Very Low | Gateway endpoint — free |
| VPC endpoints for OpenSearch | 12ms | Low (Terraform/CDK) | Very Low | Interface endpoint |
| Connection pool pre-warming | 30-50ms (first req) | Medium (app code) | Low | Reduces cold start |
| HTTP keep-alive to Bedrock | 15-25ms per call | Low (SDK config) | Very Low | Default in latest SDK |
| OpenSearch filter optimization | 65ms | Medium (query tuning) | Low | Requires index analysis |
| OpenSearch source field selection | 20ms | Low (query change) | Very Low | Immediate improvement |
| Async cache writes | 8-15ms | Medium (app refactor) | Medium | Must handle write failures |
| Async session updates | 5-10ms | Medium (app refactor) | Medium | Eventual consistency ok |
| Circuit breakers | 0ms (normal) / prevents cascade | Medium (library) | Low | Improves resilience |
| Bulkhead isolation | 0ms (normal) / prevents starvation | Medium (threading) | Medium | Requires load testing |
| gRPC for internal services | 6ms per internal call | High (protocol change) | Medium | Only if multiple internal hops |
| DNS resolution caching | 5-10ms per call | Low (OS/container config) | Very Low | Reduces DNS lookups |
| Response streaming | -200ms (perceived) | Medium (API change) | Low | First token earlier |
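The circuit-breaker row above marks implementation effort as "Medium (library)"; in production you would typically reach for an existing library rather than roll your own. Still, the core state machine is small. A minimal sketch (class and parameter names are illustrative): the breaker opens after a run of consecutive failures, fails fast while open, and half-opens after a cooldown to probe the downstream service.

```python
import time


class CircuitBreaker:
    """Opens after `threshold` consecutive failures, rejects calls while open,
    and half-opens (allows one probe) after `reset_timeout` seconds."""

    def __init__(self, threshold=5, reset_timeout=30.0):
        self.threshold = threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, fn):
        if self.opened_at is not None:
            if time.time() - self.opened_at < self.reset_timeout:
                raise RuntimeError("circuit open: failing fast")
            self.opened_at = None  # cooldown elapsed: half-open, allow a probe
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = time.time()  # trip the breaker
            raise
        self.failures = 0  # success closes the circuit and resets the count
        return result
```

Failing fast is what delivers the "prevents cascade" benefit in the table: while the breaker is open, requests spend microseconds being rejected instead of seconds waiting on a timed-out dependency.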
Key Takeaways
- Bedrock invocation dominates latency — at 200-800ms p95, it consumes 60-75% of the total latency budget. Focus optimization on reducing everything else so the budget absorbs Bedrock variability.
- VPC endpoints are the easiest win — 37ms saved with near-zero risk. Deploy them before touching application code.
- Connection pooling and pre-warming eliminate cold-start spikes — the first request on a new ECS task can be 200ms slower without pre-warming.
- OpenSearch query optimization has outsized impact — filtering before vector search and returning only needed fields can cut vector retrieval time in half.
- Async patterns remove non-critical work from the hot path — cache writes, session updates, and analytics emissions should never block response delivery.
- Profile before you optimize — X-Ray traces reveal the actual bottleneck, which is often not where you expect. Invest in profiling infrastructure first.
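Moving cache writes off the hot path can be sketched with a small dedicated thread pool, so that background work never competes with request handling (a lightweight form of bulkhead isolation). The helper below is illustrative, not part of the codebase above; `setex` is the standard redis-py call, but any client with the same method works:

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)

# Small dedicated pool: background writes cannot starve request threads.
_background = ThreadPoolExecutor(max_workers=4, thread_name_prefix="cache-write")


def write_cache_async(cache, key, value, ttl_seconds=300):
    """Queue a cache write off the hot path; the response never waits on it."""
    def _write():
        try:
            cache.setex(key, ttl_seconds, value)
        except Exception as e:
            # A failed cache write must degrade silently, never reach the user.
            logger.warning("async cache write failed for %s: %s", key, e)
    return _background.submit(_write)
```

The returned future is normally ignored (fire-and-forget); holding onto it is only useful in tests or when draining the pool at shutdown.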