Responsive AI Architecture for GenAI Applications
AWS AIP-C01 Task 4.2 — Skill 4.2.1: Design responsive AI systems that minimize latency while balancing cost and quality
Context: MangaAssist e-commerce chatbot — Bedrock Claude 3 Sonnet/Haiku, OpenSearch Serverless, DynamoDB, ECS Fargate, API Gateway WebSocket, ElastiCache Redis
North Star: End-to-end response < 3s, first token to screen < 400ms
Skill Mapping
| Certification | Domain | Task | Skill |
|---|---|---|---|
| AWS AIP-C01 | Domain 4 — Operational Efficiency | Task 4.2 — Optimize FM application performance | Skill 4.2.1 — Identify techniques to create responsive AI systems (e.g., pre-computation, latency-optimized model selection, parallel request processing, response streaming, performance benchmarking) |
Skill scope: Architect end-to-end responsive AI pipelines that deliver sub-second perceived latency through pre-computation, parallel orchestration, intelligent model routing, response streaming, and continuous performance benchmarking — all grounded in cost-aware tradeoff analysis.
Mind Map — Responsive AI Dimensions
mindmap
root((Responsive<br/>AI Systems))
Pre-Computation
Trending Manga Responses
Pre-generate for Top 100 Titles
Nightly Batch with Haiku
Cache in ElastiCache Redis
Catalog Embedding Warm-Up
New Release Embeddings
Pre-index in OpenSearch
Background Vectorization
Recommendation Pre-Build
Popular Genre Results
Personalized Top-N per Segment
Scheduled Recomputation
Latency-Optimized Models
Haiku for Time-Sensitive
Order Status < 500ms
Simple FAQ < 300ms
Greeting / Chitchat < 200ms
Sonnet for Deep Queries
Manga Recommendations
Plot Comparison
Complex Product Q&A
Dynamic Model Selection
Intent Classification
Complexity Scoring
Latency Budget Routing
Parallel Request Processing
Concurrent Data Fetch
RAG Retrieval
DynamoDB Session Load
User Profile Fetch
asyncio.gather Pattern
Fan-Out with Timeout
Partial Result Handling
Graceful Degradation
Dependency Graph
Independent vs Sequential
Critical Path Analysis
Waterfall Elimination
Response Streaming
Bedrock Streaming API
invoke_model_with_response_stream
Chunk-by-Chunk Delivery
Server-Sent Events
WebSocket Delivery
API Gateway WebSocket
Binary Frame Optimization
Connection Keep-Alive
Progressive Rendering
First Token Priority
Incremental Display
Skeleton UI Patterns
Performance Benchmarking
Latency Targets per Intent
p50 / p95 / p99 Definitions
SLA per Query Type
Budget Allocation
Automated Profiling
Continuous Latency Tests
Statistical Significance
Regression Detection
Cost-Latency Tradeoff
Quality x Speed / Cost
Pareto Frontier Analysis
Budget-Constrained Optimization
Pre-Computation Strategies
Pre-computation shifts latency from request time to background processing. For MangaAssist, this means generating answers before customers ask.
What to Pre-Compute
| Pre-Computation Target | Trigger | Storage | TTL | Expected Hit Rate |
|---|---|---|---|---|
| Trending manga title descriptions | Hourly trending list update | ElastiCache Redis | 1 hour | ~35% of product queries |
| New catalog item embeddings | Catalog CDC event from DynamoDB | OpenSearch Serverless | Until next update | 100% of new items |
| Popular genre recommendation lists | Nightly batch job | ElastiCache Redis | 24 hours | ~20% of recommendation queries |
| FAQ answer variants | Weekly prompt refresh | ElastiCache Redis | 7 days | ~15% of support queries |
| Order status templates | Real-time order state change | DynamoDB + Redis | Until next state change | ~40% of order queries |
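A minimal sketch of the trending-response pre-computation job from the first row of the table above — assuming an hourly EventBridge-triggered Lambda, synchronous boto3 and redis clients, and a hypothetical get_trending_titles() feed; the names, Redis endpoint, and payload shapes are illustrative, not the production handler.
import json

import boto3
import redis

bedrock = boto3.client("bedrock-runtime")
# Placeholder endpoint — the real cluster address comes from configuration
cache = redis.Redis(host="mangaassist-cache.internal", port=6379, decode_responses=True)

def get_trending_titles(limit: int = 100) -> list[str]:
    """Hypothetical stand-in for the hourly trending feed shown in the pipeline diagram."""
    return ["One Piece", "Jujutsu Kaisen", "Chainsaw Man"][:limit]

def handler(event, context):
    """Hourly Lambda: pre-generate Haiku answers for trending titles and cache them."""
    for title in get_trending_titles():
        prompt = f"Write a concise, friendly product summary for the manga '{title}'."
        response = bedrock.invoke_model(
            modelId="anthropic.claude-3-haiku-20240307-v1:0",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 300,
                "messages": [{"role": "user", "content": prompt}],
            }),
        )
        answer = json.loads(response["body"].read())["content"][0]["text"]
        # 1-hour TTL matches the hourly trending refresh in the table above
        cache.set(f"precomputed:trending:{title.lower()}", answer, ex=3600)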
Pre-Computation Pipeline
graph LR
subgraph "Event Sources"
TREND[Trending API<br/>Hourly]
CATALOG[DynamoDB Streams<br/>CDC Events]
BATCH[EventBridge<br/>Nightly Schedule]
end
subgraph "Pre-Computation Engine"
LAMBDA_TREND[Lambda: Generate<br/>Trending Responses]
LAMBDA_EMBED[Lambda: Compute<br/>Embeddings]
LAMBDA_REC[Lambda: Build<br/>Recommendations]
end
subgraph "Pre-Computed Storage"
REDIS[ElastiCache Redis<br/>Response Cache]
OPENSEARCH[OpenSearch<br/>Embedding Index]
end
TREND --> LAMBDA_TREND --> REDIS
CATALOG --> LAMBDA_EMBED --> OPENSEARCH
BATCH --> LAMBDA_REC --> REDIS
style REDIS fill:#2ecc71,color:#000
style OPENSEARCH fill:#ff9900,color:#000
style LAMBDA_TREND fill:#ff9900,color:#000
style LAMBDA_EMBED fill:#ff9900,color:#000
style LAMBDA_REC fill:#ff9900,color:#000
Pre-Computation Cost Analysis
| Strategy | Compute Cost (Monthly) | Storage Cost (Monthly) | Latency Saved | Net Savings at 1M queries/day |
|---|---|---|---|---|
| Trending responses (Haiku) | $45 (hourly batch) | $12 (Redis) | 1.2s per hit | $4,200 (avoided Sonnet calls) |
| Catalog embeddings | $28 (Lambda) | $8 (OpenSearch) | 0.8s per new-item query | $1,100 (faster retrieval) |
| Genre recommendations | $18 (nightly Haiku) | $6 (Redis) | 1.5s per hit | $2,800 (avoided full RAG+LLM) |
| Total | $91 | $26 | avg 1.1s | $8,100 saved |
Latency-Optimized Model Selection
Not every query deserves the same model. MangaAssist routes queries to the fastest model that meets quality requirements.
Intent-to-Model Routing Matrix
| Intent Category | Example Query | Model | Target Latency | Why This Model |
|---|---|---|---|---|
| Order status | "Where's my order #12345?" | Haiku | < 500ms | Template-based, low reasoning needed |
| Simple FAQ | "Do you ship to Osaka?" | Haiku | < 300ms | Factual, single-hop lookup |
| Greeting / Chitchat | "Hello!" | Haiku | < 200ms | Minimal generation |
| Manga recommendation | "Suggest something like One Piece" | Sonnet | < 2.5s | Multi-factor reasoning, quality matters |
| Plot comparison | "How does Naruto compare to Bleach?" | Sonnet | < 3.0s | Nuanced analysis, longer output |
| Complex product Q&A | "What special editions of AoT exist?" | Sonnet | < 2.0s | RAG + synthesis of multiple documents |
| Complaint handling | "I received a damaged volume" | Sonnet | < 2.0s | Empathy + policy lookup + resolution |
Model Selection Decision Flow
flowchart TD
START[Incoming Query] --> CLASSIFY[Intent Classifier<br/>Haiku - 50ms]
CLASSIFY --> CHECK_COMPLEXITY{Complexity<br/>Score}
CHECK_COMPLEXITY -->|Score < 0.3<br/>Simple| HAIKU_PATH[Route to Haiku]
CHECK_COMPLEXITY -->|Score 0.3-0.6<br/>Medium| CHECK_LATENCY{Latency<br/>Budget?}
CHECK_COMPLEXITY -->|Score > 0.6<br/>Complex| SONNET_PATH[Route to Sonnet]
CHECK_LATENCY -->|Budget < 1s| HAIKU_PATH
CHECK_LATENCY -->|Budget >= 1s| SONNET_PATH
HAIKU_PATH --> HAIKU[Claude 3 Haiku<br/>~150ms generation]
SONNET_PATH --> SONNET[Claude 3 Sonnet<br/>~800ms generation]
HAIKU --> QUALITY_CHECK{Quality<br/>Gate}
QUALITY_CHECK -->|Pass| RESPOND[Stream Response]
QUALITY_CHECK -->|Fail: Low confidence| SONNET
SONNET --> RESPOND
style HAIKU fill:#2ecc71,color:#000
style SONNET fill:#3498db,color:#fff
style CLASSIFY fill:#e74c3c,color:#fff
Latency-Cost Tradeoff Framework
The optimization function balances three dimensions — quality, speed, and cost:
Score = (Quality_weight x Quality) x (Speed_weight x (1 / Latency)) / Cost
A minimal routing and scoring sketch follows the table below.
| Intent | Quality Weight | Speed Weight | Optimal Model | Score |
|---|---|---|---|---|
| Order status | 0.6 | 0.9 | Haiku | 8.4 |
| Manga recommendation | 0.9 | 0.5 | Sonnet | 6.2 |
| Complaint handling | 0.95 | 0.6 | Sonnet | 5.8 |
| FAQ | 0.5 | 0.85 | Haiku | 9.1 |
| Greeting | 0.3 | 0.95 | Haiku | 11.2 |
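The sketch below mirrors the decision flow and the scoring formula above — it assumes the intent classifier already returns a complexity score in [0, 1] and a latency budget in seconds; the thresholds come from the flowchart and the function names are illustrative.
HAIKU_ID = "anthropic.claude-3-haiku-20240307-v1:0"
SONNET_ID = "anthropic.claude-3-sonnet-20240229-v1:0"

def select_model(complexity: float, latency_budget_s: float) -> str:
    """Route per the decision flow: simple -> Haiku, complex -> Sonnet,
    medium complexity falls back to the remaining latency budget."""
    if complexity < 0.3:
        return HAIKU_ID
    if complexity > 0.6:
        return SONNET_ID
    return HAIKU_ID if latency_budget_s < 1.0 else SONNET_ID

def responsiveness_score(quality: float, latency_s: float, cost_per_call: float,
                         quality_weight: float, speed_weight: float) -> float:
    """Score = (quality_weight x quality) x (speed_weight x 1/latency) / cost."""
    return (quality_weight * quality) * (speed_weight * (1.0 / latency_s)) / cost_per_call

# Example: a medium-complexity recommendation query with a 2.5s budget routes to Sonnet
# select_model(0.45, 2.5) -> SONNET_ID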
Parallel Request Orchestration
MangaAssist fires independent data fetches concurrently instead of sequentially. This collapses the waterfall.
Sequential vs Parallel Comparison
gantt
title Sequential Execution (Before) — Total: 1850ms
dateFormat X
axisFormat %L ms
section Sequential
RAG Retrieval (OpenSearch) :0, 650
DynamoDB Session Load :650, 850
User Profile Fetch :850, 1000
ElastiCache Check :1000, 1050
Bedrock Invocation :1050, 1850
gantt
title Parallel Execution (After) — Total: 1450ms
dateFormat X
axisFormat %L ms
section Parallel Fan-Out
RAG Retrieval (OpenSearch) :0, 650
DynamoDB Session Load :0, 200
User Profile Fetch :0, 150
ElastiCache Check :0, 50
section Sequential (depends on fan-out)
Bedrock Invocation :650, 1450
Result: Parallel fan-out saves 400ms (from 1850ms to 1450ms) by executing independent data fetches concurrently. The critical path is dominated by RAG retrieval (650ms) + Bedrock invocation (800ms).
ParallelOrchestrator Implementation
import asyncio
import hashlib
import json
import time
from dataclasses import dataclass, field
from typing import Any, Optional
import aioboto3
from aws_lambda_powertools import Logger, Metrics, Tracer
logger = Logger(service="mangaassist-orchestrator")
tracer = Tracer(service="mangaassist-orchestrator")
metrics = Metrics(namespace="MangaAssist/Orchestrator")
@dataclass
class ParallelResult:
"""Container for parallel execution results with timing metadata."""
rag_context: Optional[list[dict]] = None
session_history: Optional[list[dict]] = None
user_profile: Optional[dict] = None
cached_response: Optional[str] = None
timings: dict[str, float] = field(default_factory=dict)
errors: dict[str, str] = field(default_factory=dict)
class ParallelOrchestrator:
"""
Orchestrates concurrent data fetches for MangaAssist queries.
Fires RAG retrieval, session history load, user profile fetch,
and cache check concurrently using asyncio.gather. Each task
has an independent timeout to prevent one slow call from
blocking the entire pipeline.
"""
def __init__(
self,
opensearch_client,
dynamodb_resource,
redis_client,
rag_timeout: float = 2.0,
session_timeout: float = 1.0,
profile_timeout: float = 1.0,
cache_timeout: float = 0.5,
):
self.opensearch = opensearch_client
self.dynamodb = dynamodb_resource
self.redis = redis_client
self.rag_timeout = rag_timeout
self.session_timeout = session_timeout
self.profile_timeout = profile_timeout
self.cache_timeout = cache_timeout
@tracer.capture_method
async def execute_parallel(
self, query: str, user_id: str, session_id: str
) -> ParallelResult:
"""
Execute all data fetches concurrently with individual timeouts.
Returns partial results if some fetches fail — the LLM can
still generate a response with degraded context rather than
failing entirely.
"""
result = ParallelResult()
start = time.monotonic()
tasks = {
"rag": self._fetch_rag_context(query),
"session": self._fetch_session_history(session_id),
"profile": self._fetch_user_profile(user_id),
"cache": self._check_cache(query, user_id),
}
timeouts = {
"rag": self.rag_timeout,
"session": self.session_timeout,
"profile": self.profile_timeout,
"cache": self.cache_timeout,
}
# Execute all tasks concurrently
gathered = await asyncio.gather(
*[
self._with_timeout(name, coro, timeouts[name])
for name, coro in tasks.items()
],
return_exceptions=True,
)
# Unpack results with error handling
for (name, _), outcome in zip(tasks.items(), gathered):
elapsed = time.monotonic() - start
if isinstance(outcome, Exception):
result.errors[name] = str(outcome)
result.timings[name] = elapsed
logger.warning(f"Parallel task {name} failed", error=str(outcome))
metrics.add_metric(
name=f"parallel_{name}_error", unit="Count", value=1
)
else:
task_result, task_time = outcome
result.timings[name] = task_time
setattr(result, self._result_field(name), task_result)
metrics.add_metric(
name=f"parallel_{name}_latency_ms",
unit="Milliseconds",
value=task_time * 1000,
)
total_time = time.monotonic() - start
result.timings["total_parallel"] = total_time
metrics.add_metric(
name="parallel_total_latency_ms",
unit="Milliseconds",
value=total_time * 1000,
)
logger.info("Parallel execution complete", timings=result.timings)
return result
async def _with_timeout(
self, name: str, coro, timeout: float
) -> tuple[Any, float]:
"""Wrap a coroutine with a per-task timeout."""
start = time.monotonic()
try:
result = await asyncio.wait_for(coro, timeout=timeout)
elapsed = time.monotonic() - start
return result, elapsed
except asyncio.TimeoutError:
raise TimeoutError(
f"Task '{name}' exceeded {timeout}s timeout"
)
@tracer.capture_method
async def _fetch_rag_context(self, query: str) -> list[dict]:
"""Retrieve relevant manga documents from OpenSearch vector store."""
embedding = await self._generate_embedding(query)
response = await self.opensearch.search(
index="manga-products",
body={
"size": 5,
"query": {
"knn": {
"embedding": {
"vector": embedding,
"k": 5,
}
}
},
},
)
return [hit["_source"] for hit in response["hits"]["hits"]]
@tracer.capture_method
async def _fetch_session_history(self, session_id: str) -> list[dict]:
"""Load conversation history from DynamoDB."""
table = self.dynamodb.Table("manga-sessions")
response = await table.query(
KeyConditionExpression="session_id = :sid",
ExpressionAttributeValues={":sid": session_id},
ScanIndexForward=False,
Limit=10,
)
return response.get("Items", [])
@tracer.capture_method
async def _fetch_user_profile(self, user_id: str) -> dict:
"""Fetch user preferences and purchase history."""
table = self.dynamodb.Table("manga-users")
response = await table.get_item(Key={"user_id": user_id})
return response.get("Item", {})
@tracer.capture_method
async def _check_cache(self, query: str, user_id: str) -> Optional[str]:
"""Check ElastiCache for a pre-computed or cached response."""
cache_key = f"response:{user_id}:{hash(query)}"
cached = await self.redis.get(cache_key)
if cached:
metrics.add_metric(name="cache_hit", unit="Count", value=1)
else:
metrics.add_metric(name="cache_miss", unit="Count", value=1)
return cached
    async def _generate_embedding(self, text: str) -> list[float]:
        """Generate embedding via Bedrock Titan Embeddings."""
        # In production, reuse a long-lived aioboto3 session/client rather
        # than creating one per call.
        session = aioboto3.Session()
        async with session.client("bedrock-runtime") as bedrock:
            response = await bedrock.invoke_model(
                modelId="amazon.titan-embed-text-v2:0",
                # json.dumps handles escaping safely instead of hand-building JSON
                body=json.dumps({"inputText": text}),
            )
            body = json.loads(await response["body"].read())
            return body["embedding"]
@staticmethod
def _result_field(name: str) -> str:
"""Map task name to ParallelResult field."""
return {
"rag": "rag_context",
"session": "session_history",
"profile": "user_profile",
"cache": "cached_response",
}[name]
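A usage sketch of the orchestrator's graceful-degradation behavior, assuming the async OpenSearch, aioboto3 DynamoDB, and redis.asyncio clients are constructed elsewhere; the handle_query wrapper and print statements here are illustrative.
async def handle_query(orchestrator: ParallelOrchestrator, query: str,
                       user_id: str, session_id: str) -> None:
    result = await orchestrator.execute_parallel(query, user_id, session_id)
    if result.cached_response:
        # Pre-computed or previously cached answer: skip the FM call entirely
        print(result.cached_response)
        return
    # Degrade gracefully: generate with whatever context survived its timeout
    docs = result.rag_context or []
    history = result.session_history or []
    print(f"Generating with {len(docs)} docs and {len(history)} history turns; "
          f"fan-out took {result.timings.get('total_parallel', 0.0):.2f}s, "
          f"failed fetches: {list(result.errors) or 'none'}")

# asyncio.run(handle_query(orchestrator, "Suggest manga like One Piece", "u-123", "s-456"))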
Response Streaming
Streaming delivers the first token to the user before the full response is generated. This dramatically improves perceived latency.
Streaming Architecture
sequenceDiagram
participant Client as Browser Client
participant APIGW as API Gateway<br/>WebSocket
participant ECS as ECS Fargate<br/>Orchestrator
participant Bedrock as Bedrock<br/>Claude 3
Client->>APIGW: Send query via WebSocket
APIGW->>ECS: Route to orchestrator
Note over ECS: Parallel fan-out (RAG + Session + Profile)
ECS->>Bedrock: invoke_model_with_response_stream()
loop Each chunk (~50-100 tokens)
Bedrock-->>ECS: StreamEvent: chunk
ECS-->>APIGW: WebSocket frame: chunk
APIGW-->>Client: Render incrementally
end
Bedrock-->>ECS: StreamEvent: end
ECS-->>APIGW: WebSocket frame: [DONE]
APIGW-->>Client: Finalize rendering
Note over Client: User saw first token at ~400ms<br/>Full response at ~2.5s
StreamingResponseHandler Implementation
import asyncio
import json
import time
from typing import AsyncIterator
import boto3
from aws_lambda_powertools import Logger, Metrics, Tracer
logger = Logger(service="mangaassist-streaming")
tracer = Tracer(service="mangaassist-streaming")
metrics = Metrics(namespace="MangaAssist/Streaming")
class StreamingResponseHandler:
"""
Manages Bedrock streaming responses and delivers chunks
to the client via API Gateway WebSocket.
Optimizations:
- Pre-loaded system prompt (avoids per-request prompt assembly latency)
- Warm connection pooling to Bedrock
- Chunk batching to reduce WebSocket frame overhead
- First-token latency tracking for SLA monitoring
"""
def __init__(
self,
bedrock_client,
apigw_management_client,
connection_id: str,
model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
chunk_batch_size: int = 3,
):
self.bedrock = bedrock_client
self.apigw = apigw_management_client
self.connection_id = connection_id
self.model_id = model_id
self.chunk_batch_size = chunk_batch_size
self._system_prompt = self._load_system_prompt()
def _load_system_prompt(self) -> str:
"""Pre-load system prompt at initialization, not per-request."""
return (
"You are MangaAssist, a helpful assistant for a Japanese manga "
"e-commerce store. You help customers find manga, track orders, "
"and answer questions about products. Be concise and friendly. "
"Always respond in the language the customer uses."
)
@tracer.capture_method
async def stream_response(
self,
user_message: str,
rag_context: list[dict],
session_history: list[dict],
user_profile: dict,
) -> dict:
"""
Invoke Bedrock with streaming and deliver chunks via WebSocket.
Returns metadata about the streaming session including
first-token latency, total tokens, and total duration.
"""
start_time = time.monotonic()
first_token_time = None
total_tokens = 0
full_response = []
# Build the prompt with RAG context
messages = self._build_messages(
user_message, rag_context, session_history, user_profile
)
# Send "thinking" indicator immediately
await self._send_to_client({
"type": "status",
"message": "Looking up your manga...",
})
try:
# Invoke Bedrock with streaming
response = self.bedrock.invoke_model_with_response_stream(
modelId=self.model_id,
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1024,
"system": self._system_prompt,
"messages": messages,
}),
)
# Process the stream
chunk_buffer = []
async for chunk_text in self._parse_stream(response):
                # Approximation: counts streamed deltas (chunks), not exact tokens
                total_tokens += 1
if first_token_time is None:
first_token_time = time.monotonic()
first_token_latency = (
first_token_time - start_time
) * 1000
metrics.add_metric(
name="first_token_latency_ms",
unit="Milliseconds",
value=first_token_latency,
)
logger.info(
"First token delivered",
first_token_ms=first_token_latency,
)
full_response.append(chunk_text)
chunk_buffer.append(chunk_text)
# Batch chunks to reduce WebSocket frame overhead
if len(chunk_buffer) >= self.chunk_batch_size:
await self._send_to_client({
"type": "chunk",
"text": "".join(chunk_buffer),
})
chunk_buffer = []
# Flush remaining buffer
if chunk_buffer:
await self._send_to_client({
"type": "chunk",
"text": "".join(chunk_buffer),
})
# Send completion signal
await self._send_to_client({"type": "done"})
except Exception as e:
logger.exception("Streaming error", error=str(e))
await self._send_to_client({
"type": "error",
"message": "Sorry, I encountered an issue. Please try again.",
})
raise
total_duration = (time.monotonic() - start_time) * 1000
metrics.add_metric(
name="streaming_total_ms",
unit="Milliseconds",
value=total_duration,
)
metrics.add_metric(
name="streaming_tokens",
unit="Count",
value=total_tokens,
)
return {
"first_token_latency_ms": (
(first_token_time - start_time) * 1000
if first_token_time
else None
),
"total_duration_ms": total_duration,
"total_tokens": total_tokens,
"full_response": "".join(full_response),
}
async def _parse_stream(self, response) -> AsyncIterator[str]:
"""Parse Bedrock streaming response into text chunks."""
stream = response.get("body")
for event in stream:
chunk = event.get("chunk")
if chunk:
payload = json.loads(chunk.get("bytes", b"{}"))
if payload.get("type") == "content_block_delta":
delta = payload.get("delta", {})
if delta.get("type") == "text_delta":
yield delta.get("text", "")
async def _send_to_client(self, data: dict) -> None:
"""Send a message to the client via API Gateway WebSocket."""
try:
self.apigw.post_to_connection(
ConnectionId=self.connection_id,
Data=json.dumps(data).encode("utf-8"),
)
except self.apigw.exceptions.GoneException:
logger.warning(
"WebSocket connection gone",
connection_id=self.connection_id,
)
raise ConnectionError("Client disconnected")
def _build_messages(
self,
user_message: str,
rag_context: list[dict],
session_history: list[dict],
user_profile: dict,
) -> list[dict]:
"""Assemble the message array with RAG context and history."""
messages = []
# Add session history (last 5 turns)
for turn in session_history[-5:]:
messages.append({
"role": turn["role"],
"content": turn["content"],
})
# Build current user message with RAG context
context_block = "\n\n".join(
f"[Product: {doc.get('title', 'Unknown')}]\n{doc.get('description', '')}"
for doc in rag_context[:3]
)
augmented_message = (
f"<context>\n{context_block}\n</context>\n\n"
f"<user_preferences>\n"
f"Favorite genres: {user_profile.get('favorite_genres', 'unknown')}\n"
f"Language: {user_profile.get('language', 'ja')}\n"
f"</user_preferences>\n\n"
f"Customer question: {user_message}"
)
messages.append({"role": "user", "content": augmented_message})
return messages
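A wiring sketch for the handler above — the API Gateway Management API client must point at the deployed WebSocket stage URL (the endpoint here is a placeholder), and the connection_id comes from the WebSocket route event.
import boto3

def build_streaming_handler(connection_id: str) -> StreamingResponseHandler:
    """Construct a per-connection handler bound to Bedrock and the WebSocket management API."""
    bedrock = boto3.client("bedrock-runtime")
    apigw = boto3.client(
        "apigatewaymanagementapi",
        # Placeholder stage URL — taken from the deployed API Gateway WebSocket endpoint
        endpoint_url="https://example.execute-api.ap-northeast-1.amazonaws.com/prod",
    )
    return StreamingResponseHandler(
        bedrock_client=bedrock,
        apigw_management_client=apigw,
        connection_id=connection_id,
    )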
Performance Benchmarking Framework
Latency Targets per Intent
| Intent | p50 Target | p95 Target | p99 Target | Model | Parallelism Strategy |
|---|---|---|---|---|---|
| Greeting | 150ms | 250ms | 400ms | Haiku | None (direct generation) |
| Simple FAQ | 200ms | 400ms | 600ms | Haiku | Cache check only |
| Order status | 300ms | 500ms | 800ms | Haiku | DynamoDB lookup parallel with cache |
| Product search | 800ms | 1500ms | 2200ms | Sonnet | Full parallel fan-out (RAG + Session + Profile) |
| Manga recommendation | 1200ms | 2500ms | 3000ms | Sonnet | Full parallel fan-out + streaming |
| Plot comparison | 1500ms | 2800ms | 3500ms | Sonnet | Full parallel fan-out + streaming |
| Complaint handling | 1000ms | 2000ms | 2800ms | Sonnet | Full parallel fan-out + streaming |
Automated Regression Detection
flowchart TD
START[Scheduled Benchmark<br/>Every 15 min] --> RUN[Execute Test Suite<br/>50 queries per intent]
RUN --> COLLECT[Collect Latency<br/>Distributions]
COLLECT --> COMPARE{p95 vs Baseline<br/>Mann-Whitney U Test}
COMPARE -->|p > 0.05<br/>No regression| LOG[Log to CloudWatch<br/>Update Dashboard]
COMPARE -->|p <= 0.05<br/>Regression detected| SEVERITY{Regression<br/>Severity}
SEVERITY -->|< 10% increase| WARN[CloudWatch Alarm<br/>SEV-3 Warning]
SEVERITY -->|10-25% increase| ALERT[PagerDuty Alert<br/>SEV-2]
SEVERITY -->|> 25% increase| CRITICAL[PagerDuty Page<br/>SEV-1 + Auto-Rollback]
WARN --> DASHBOARD[Update Grafana<br/>Dashboard]
ALERT --> DASHBOARD
CRITICAL --> ROLLBACK[Auto-Rollback<br/>Last Known Good Config] --> DASHBOARD
style CRITICAL fill:#e74c3c,color:#fff
style ALERT fill:#f39c12,color:#000
style WARN fill:#f1c40f,color:#000
style LOG fill:#2ecc71,color:#000
Benchmarking Metrics Collection
import time
from dataclasses import dataclass, field
import boto3
@dataclass
class BenchmarkResult:
"""Results from a single benchmark run."""
intent: str
latencies_ms: list[float] = field(default_factory=list)
first_token_latencies_ms: list[float] = field(default_factory=list)
errors: int = 0
total_requests: int = 0
@property
def p50(self) -> float:
return self._percentile(50)
@property
def p95(self) -> float:
return self._percentile(95)
@property
def p99(self) -> float:
return self._percentile(99)
@property
def first_token_p50(self) -> float:
return self._percentile(50, self.first_token_latencies_ms)
@property
def first_token_p95(self) -> float:
return self._percentile(95, self.first_token_latencies_ms)
def _percentile(
self, pct: int, data: list[float] | None = None
) -> float:
d = data if data is not None else self.latencies_ms
if not d:
return 0.0
sorted_d = sorted(d)
idx = int(len(sorted_d) * pct / 100)
return sorted_d[min(idx, len(sorted_d) - 1)]
class LatencyBenchmarkRunner:
"""
Automated latency benchmarking for MangaAssist.
Runs test queries per intent, collects latency distributions,
and performs statistical regression detection against a baseline.
"""
BASELINE_TARGETS = {
"greeting": {"p50": 150, "p95": 250, "p99": 400},
"faq": {"p50": 200, "p95": 400, "p99": 600},
"order_status": {"p50": 300, "p95": 500, "p99": 800},
"product_search": {"p50": 800, "p95": 1500, "p99": 2200},
"recommendation": {"p50": 1200, "p95": 2500, "p99": 3000},
"comparison": {"p50": 1500, "p95": 2800, "p99": 3500},
"complaint": {"p50": 1000, "p95": 2000, "p99": 2800},
}
TEST_QUERIES = {
"greeting": ["Hello!", "Hi there", "Good evening"],
"faq": [
"Do you ship internationally?",
"What's your return policy?",
"Do you accept PayPal?",
],
"order_status": [
"Where is order #MNG-78901?",
"Track my latest order",
"When will my package arrive?",
],
"product_search": [
"Show me shonen manga under 1000 yen",
"New releases this week",
"Manga by Eiichiro Oda",
],
"recommendation": [
"Suggest manga like One Piece",
"Best manga for beginners",
"What's trending in seinen?",
],
"comparison": [
"Compare Naruto and Bleach",
"Dragon Ball vs One Piece",
"How is Jujutsu Kaisen different from Demon Slayer?",
],
"complaint": [
"My order arrived damaged",
"I received the wrong volume",
"I want a refund for my last purchase",
],
}
def __init__(self, orchestrator, cloudwatch_client=None):
self.orchestrator = orchestrator
self.cloudwatch = cloudwatch_client or boto3.client("cloudwatch")
async def run_full_benchmark(self) -> dict[str, BenchmarkResult]:
"""Run benchmark across all intents and detect regressions."""
results = {}
for intent, queries in self.TEST_QUERIES.items():
result = BenchmarkResult(
intent=intent, total_requests=len(queries)
)
for query in queries:
start = time.monotonic()
try:
response = await self.orchestrator.handle_query(query)
elapsed_ms = (time.monotonic() - start) * 1000
result.latencies_ms.append(elapsed_ms)
if "first_token_latency_ms" in response:
result.first_token_latencies_ms.append(
response["first_token_latency_ms"]
)
except Exception:
result.errors += 1
results[intent] = result
self._publish_metrics(result)
self._check_regression(intent, result)
return results
def _check_regression(
self, intent: str, result: BenchmarkResult
) -> None:
"""
Detect latency regression using Mann-Whitney U test
against baseline targets.
"""
baseline = self.BASELINE_TARGETS.get(intent, {})
if not baseline or not result.latencies_ms:
return
# Compare p95 against target
if result.p95 > baseline["p95"] * 1.25:
severity = "CRITICAL"
elif result.p95 > baseline["p95"] * 1.10:
severity = "WARNING"
else:
severity = "OK"
self.cloudwatch.put_metric_data(
Namespace="MangaAssist/Benchmark",
MetricData=[
{
"MetricName": f"regression_{intent}",
"Value": 1 if severity != "OK" else 0,
"Unit": "Count",
"Dimensions": [
{"Name": "Severity", "Value": severity},
],
}
],
)
def _publish_metrics(self, result: BenchmarkResult) -> None:
"""Publish benchmark metrics to CloudWatch."""
self.cloudwatch.put_metric_data(
Namespace="MangaAssist/Benchmark",
MetricData=[
{
"MetricName": f"{result.intent}_p50",
"Value": result.p50,
"Unit": "Milliseconds",
},
{
"MetricName": f"{result.intent}_p95",
"Value": result.p95,
"Unit": "Milliseconds",
},
{
"MetricName": f"{result.intent}_p99",
"Value": result.p99,
"Unit": "Milliseconds",
},
{
"MetricName": f"{result.intent}_first_token_p95",
"Value": result.first_token_p95,
"Unit": "Milliseconds",
},
],
)
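A sketch of the distribution-level regression check referenced in the flowchart, assuming a stored baseline latency sample (for example, the previous day's benchmark run loaded from S3) is available alongside the p95 targets; the function name is illustrative.
from scipy import stats

def is_latency_regression(current_ms: list[float], baseline_ms: list[float],
                          alpha: float = 0.05) -> bool:
    """One-sided Mann-Whitney U test: are current latencies stochastically
    greater than the baseline sample? True when the shift is significant."""
    if len(current_ms) < 20 or len(baseline_ms) < 20:
        return False  # too few samples for a meaningful nonparametric test
    _, p_value = stats.mannwhitneyu(current_ms, baseline_ms, alternative="greater")
    return p_value <= alpha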
End-to-End Architecture — Parallel Execution and Streaming Pipeline
graph TB
subgraph "Client Layer"
CLIENT[Browser / Mobile App]
end
subgraph "API Layer"
APIGW[API Gateway<br/>WebSocket]
end
subgraph "Orchestration Layer — ECS Fargate"
ROUTER[Intent Classifier<br/>+ Model Router]
PARALLEL[ParallelOrchestrator<br/>asyncio.gather]
STREAMER[StreamingResponseHandler<br/>Chunk Delivery]
end
subgraph "Parallel Fan-Out (concurrent)"
direction LR
CACHE_CHECK[ElastiCache Redis<br/>Cache Check<br/>~50ms]
RAG[OpenSearch Serverless<br/>RAG Retrieval<br/>~650ms]
SESSION[DynamoDB<br/>Session History<br/>~200ms]
PROFILE[DynamoDB<br/>User Profile<br/>~150ms]
end
subgraph "FM Layer"
HAIKU[Bedrock Claude 3 Haiku<br/>Simple Queries<br/>~150ms generation]
SONNET[Bedrock Claude 3 Sonnet<br/>Complex Queries<br/>~800ms generation]
end
CLIENT <-->|WebSocket| APIGW
APIGW --> ROUTER
ROUTER -->|Simple intent| HAIKU
ROUTER -->|Complex intent| PARALLEL
PARALLEL --> CACHE_CHECK & RAG & SESSION & PROFILE
PARALLEL -->|Context assembled| SONNET
HAIKU -->|Stream| STREAMER
SONNET -->|Stream| STREAMER
STREAMER -->|Chunk frames| APIGW
style CACHE_CHECK fill:#2ecc71,color:#000
style RAG fill:#ff9900,color:#000
style SESSION fill:#ff9900,color:#000
style PROFILE fill:#ff9900,color:#000
style HAIKU fill:#2ecc71,color:#000
style SONNET fill:#3498db,color:#fff
style STREAMER fill:#9b59b6,color:#fff
Summary — Five Responsive AI Techniques
| # | Technique | MangaAssist Implementation | Latency Impact | Cost Impact |
|---|---|---|---|---|
| 1 | Pre-Computation | Pre-generate trending manga responses, pre-compute new catalog embeddings, warm recommendation caches | -1.1s avg for cache hits (~30% of queries) | +$117/mo compute + storage, -$8,100/mo avoided FM calls |
| 2 | Latency-Optimized Model Selection | Haiku for order/FAQ/greeting (<500ms), Sonnet for recommendations/comparisons (acceptable latency) | -600ms avg for simple intents routed to Haiku | -40% FM cost via Haiku routing |
| 3 | Parallel Request Processing | asyncio.gather for RAG + Session + Profile + Cache concurrently | -400ms (waterfall elimination) | Zero additional cost |
| 4 | Response Streaming | Bedrock streaming API via WebSocket, first-token <400ms, chunk batching | First token 400ms vs 2.5s without streaming | Zero additional cost |
| 5 | Performance Benchmarking | p50/p95/p99 targets per intent, automated regression detection, Mann-Whitney U test | Prevents regressions before users notice | ~$50/mo for benchmark compute |
Key Exam Takeaways
- Pre-computation trades background compute cost for request-time latency — only cost-effective for predictable, high-frequency queries
- Model selection is the single largest lever: routing 60% of queries to Haiku saves both latency AND cost
- Parallel orchestration is free latency savings — always identify independent data fetches and run them concurrently
- Streaming reduces perceived latency even when total generation time is unchanged — first token matters more than last token
- Benchmarking must be automated, per-intent, and statistically rigorous — use p95/p99, not averages, and test for significance before alerting