PO-08: End-to-End Latency Optimization
User Story
As a product owner, I want to guarantee end-to-end response latency of under 2 seconds at p95 across the entire chatbot pipeline, so that users experience a fast, reliable conversation that meets the product SLA regardless of traffic conditions.
Acceptance Criteria
- End-to-end p95 latency from user message to first visible token is under 2 seconds.
- Latency budget is allocated per service and monitored independently.
- Degraded mode serves a response within 3 seconds even when a non-critical service fails.
- Distributed tracing produces a full request waterfall for any request exceeding the 2s SLO.
- Weekly latency reports show p50, p95, p99 trends and budget compliance per service.
- SLO burn-rate alerts fire before users are impacted.
High-Level Design
End-to-End Latency Budget
gantt
title Latency Budget Allocation (p95 targets)
dateFormat x
axisFormat %Lms
section Edge
CloudFront + ALB :0, 50
section Orchestrator
Routing + Fan-Out :50, 100
section Phase 1 (Parallel)
Intent Classification :100, 135
Conversation Memory :100, 110
Cache Lookup :100, 102
section Phase 2 (Sequential)
RAG Retrieval :135, 335
Context Assembly :335, 355
section Phase 3 (LLM)
Bedrock First-Token :355, 755
Streaming Delivery :755, 1800
section Client
Client Render :1800, 1850
Total budget: 1,850ms (150ms headroom to the 2,000ms SLO)
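Budgets along the critical path are additive, with Phase 1 counted once at its slowest member since its lookups run in parallel. A quick arithmetic check, with values copied from the chart above:

# Sanity check of the p95 budget allocation; values mirror the gantt above.
BUDGET_MS = {
    "edge": 50,
    "orchestrator": 50,
    "phase1_parallel": 35,    # max(intent=35, memory=10, cache=2), wall clock
    "rag_retrieval": 200,
    "context_assembly": 20,
    "llm_first_token": 400,
    "streaming_delivery": 1045,
    "client_render": 50,
}
SLO_MS = 2_000

total = sum(BUDGET_MS.values())
assert total == 1_850
assert SLO_MS - total == 150  # headroom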
Pipeline Architecture
graph TD
subgraph "Critical Path (latency-sensitive)"
A[Edge: 50ms] --> B[Orchestrator: 50ms]
B --> C[Phase 1: Parallel Lookups<br>35ms wall clock]
C --> D[Phase 2: RAG + Assembly<br>220ms]
D --> E[Phase 3: LLM First-Token<br>400ms]
E --> F[Streaming Delivery<br>~1,045ms]
end
subgraph "Non-Critical (async)"
G[Analytics Write]
H[Conversation Save]
I[Audit Log]
end
B --> G
F --> H
F --> I
style A fill:#2d8,stroke:#333
style E fill:#f96,stroke:#333
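The non-critical writes stay off the critical path by being scheduled, never awaited. A minimal fire-and-forget sketch, assuming asyncio; the analytics/memory/audit service names are illustrative:

import asyncio
import logging

logger = logging.getLogger(__name__)

def _log_failure(task: asyncio.Task) -> None:
    # Non-critical failures are logged, never propagated to the user path.
    if not task.cancelled() and task.exception() is not None:
        logger.warning("%s failed: %s", task.get_name(), task.exception())

def fire_and_forget(coro, name: str) -> None:
    """Schedule a non-critical write without blocking the response."""
    task = asyncio.create_task(coro, name=name)
    task.add_done_callback(_log_failure)

# After streaming completes (service names are stand-ins):
# fire_and_forget(analytics.write(event), "analytics_write")
# fire_and_forget(memory.save_turn(session_id, turn), "conversation_save")
# fire_and_forget(audit.log(request_meta), "audit_log")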
Low-Level Design
1. Distributed Tracing Pipeline
Use AWS X-Ray with custom subsegments to trace every hop in the request path; a minimal SDK sketch follows the sequence diagram.
sequenceDiagram
participant Client
participant Edge as CloudFront/ALB
participant Orch as Orchestrator
participant Intent as Intent Classifier
participant Memory as Conv. Memory
participant RAG as RAG Pipeline
participant LLM as Bedrock
Note over Client,LLM: X-Ray Trace ID propagated through all hops
Client->>Edge: Message (X-Amzn-Trace-Id)
Edge->>Orch: Forward
par Phase 1 (parallel)
Orch->>Intent: classify() [subsegment]
Orch->>Memory: get_history() [subsegment]
end
Orch->>RAG: retrieve() [subsegment]
Orch->>LLM: invoke_model_with_response_stream() [subsegment]
LLM-->>Client: Streaming tokens
Note over Orch: Total trace = sum of subsegments<br>Waterfall visible in X-Ray console
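A minimal sketch of the SDK side, assuming the aws_xray_sdk HTTP middleware has already opened the parent segment for the request; rag_service is a stand-in:

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.async_context import AsyncContext

# Async usage needs the async context; the framework middleware is assumed
# to open the parent segment per request.
xray_recorder.configure(context=AsyncContext(), service="chatbot-orchestrator")

# capture_async opens a named subsegment around each call; the name is
# what appears as a row in the X-Ray waterfall.
@xray_recorder.capture_async("rag_retrieval")
async def traced_retrieve(query: str):
    subsegment = xray_recorder.current_subsegment()
    if subsegment:
        # Annotations are indexed, so traces can be filtered by them
        subsegment.put_annotation("query_length", len(query))
    return await rag_service.retrieve(query)  # rag_service is illustrative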
Code Example: Traced Request Pipeline
import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import Any
logger = logging.getLogger(__name__)
@dataclass
class SpanRecord:
name: str
start_ms: float
end_ms: float = 0.0
status: str = "ok"
metadata: dict = field(default_factory=dict)
@property
def duration_ms(self) -> float:
return self.end_ms - self.start_ms
@dataclass
class RequestTrace:
trace_id: str
session_id: str
spans: list[SpanRecord] = field(default_factory=list)
total_ms: float = 0.0
slo_budget_ms: float = 2000.0
@property
def slo_compliant(self) -> bool:
return self.total_ms <= self.slo_budget_ms
def summary(self) -> dict:
return {
"trace_id": self.trace_id,
"total_ms": round(self.total_ms, 1),
"slo_compliant": self.slo_compliant,
"slowest_span": max(
self.spans, key=lambda s: s.duration_ms
).name if self.spans else None,
"spans": [
{
"name": s.name,
"duration_ms": round(s.duration_ms, 1),
"status": s.status,
}
for s in self.spans
],
}
class TracedPipeline:
"""End-to-end pipeline with per-stage latency tracking."""
def __init__(
self,
intent_service,
memory_service,
cache_service,
rag_service,
llm_service,
latency_budgets: dict[str, float] | None = None,
):
self.intent_service = intent_service
self.memory_service = memory_service
self.cache_service = cache_service
self.rag_service = rag_service
self.llm_service = llm_service
self.budgets = latency_budgets or {
"edge": 50,
"orchestrator": 50,
"intent": 35,
"memory": 10,
"cache": 2,
"rag": 200,
"context_assembly": 20,
"llm_first_token": 400,
}
async def execute(
self, trace_id: str, session_id: str, user_message: str
) -> RequestTrace:
"""Execute the full pipeline with tracing."""
trace = RequestTrace(
trace_id=trace_id, session_id=session_id
)
pipeline_start = time.monotonic()
# --- Phase 1: Parallel lookups ---
phase1_results = await self._run_parallel(
trace,
{
"intent": self.intent_service.classify(user_message),
"memory": self.memory_service.get_recent(
session_id, limit=10
),
"cache": self.cache_service.get_response(user_message),
},
)
intent_result = phase1_results.get("intent")
history = phase1_results.get("memory", [])
cache_hit = phase1_results.get("cache")
        # Short-circuit on a cache hit; the cached payload would be served
        # directly (this sketch returns only the trace)
        if cache_hit:
            trace.total_ms = (time.monotonic() - pipeline_start) * 1000
            return trace
# --- Phase 2: RAG retrieval ---
rag_results = await self._run_traced(
trace,
"rag",
self.rag_service.retrieve(
user_message,
intent=intent_result.get("intent") if intent_result else None,
),
)
# --- Phase 2b: Context assembly ---
context = await self._run_traced(
trace,
"context_assembly",
self._assemble_context(
user_message, history, rag_results, intent_result
),
)
        # --- Phase 3: LLM first-token ---
        # Await the first token; its latency is captured by the span, and
        # streaming to the client continues after this resolves.
        await self._run_traced(
            trace,
            "llm_first_token",
            self.llm_service.invoke_streaming(context),
        )
trace.total_ms = (time.monotonic() - pipeline_start) * 1000
# Log budget violations
for span in trace.spans:
budget = self.budgets.get(span.name)
if budget and span.duration_ms > budget:
logger.warning(
f"[{trace_id}] Budget violation: {span.name} "
f"took {span.duration_ms:.1f}ms "
f"(budget: {budget}ms)"
)
return trace
async def _run_traced(
self, trace: RequestTrace, name: str, coro
) -> Any:
"""Execute a coroutine and record a span."""
start = time.monotonic()
span = SpanRecord(name=name, start_ms=start * 1000)
try:
result = await coro
span.status = "ok"
return result
except Exception as e:
span.status = "error"
span.metadata["error"] = str(e)
raise
finally:
span.end_ms = time.monotonic() * 1000
trace.spans.append(span)
async def _run_parallel(
self, trace: RequestTrace, tasks: dict[str, Any]
) -> dict[str, Any]:
"""Run multiple coroutines in parallel with tracing."""
results = {}
async def traced_task(name, coro):
result = await self._run_traced(trace, name, coro)
results[name] = result
await asyncio.gather(
*(traced_task(n, c) for n, c in tasks.items()),
return_exceptions=True,
)
return results
async def _assemble_context(
self,
user_message: str,
history: list,
rag_results: list | None,
intent: dict | None,
) -> dict:
"""Build the LLM context from assembled components."""
context_parts = []
if history:
formatted_history = "\n".join(
f"{msg['role']}: {msg['content']}"
for msg in history[-5:]
)
context_parts.append(f"<history>\n{formatted_history}\n</history>")
if rag_results:
formatted_docs = "\n---\n".join(
doc.get("content", "") for doc in rag_results[:3]
)
context_parts.append(
f"<knowledge>\n{formatted_docs}\n</knowledge>"
)
return {
"system_prompt": self._get_system_prompt(intent),
"context": "\n\n".join(context_parts),
"user_message": user_message,
}
def _get_system_prompt(self, intent: dict | None) -> str:
intent_name = intent.get("intent", "general") if intent else "general"
return (
f"You are MangaAssist, an AI shopping assistant for manga and anime products. "
f"Current intent: {intent_name}. Be helpful and concise."
)
2. SLO Burn-Rate Alert System
Use multi-window burn-rate alerts to detect sustained SLO violations before the error budget is exhausted; the threshold arithmetic is worked through after the diagram.
graph TD
subgraph "SLO: p95 < 2s"
A[Monthly Budget: 0.1% bad<br>= ~4,320 bad requests/month<br>at ~100 req/min]
end
subgraph "Fast Burn (1h window)"
B[Burn rate > 14.4x] --> C[Page on-call<br>Budget depleted in ~2 days]
end
subgraph "Slow Burn (6h window)"
D[Burn rate > 6x] --> E[Ticket alert<br>Budget depleted in ~5 days]
end
subgraph "Trend (24h window)"
F[Burn rate > 3x] --> G[Dashboard warning<br>Investigate next sprint]
end
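Burn rate is the observed violation rate divided by the budgeted rate, and a sustained burn of N× empties a 30-day budget in 30/N days, which is where the windows above get their numbers:

# Worked thresholds for a 0.1% monthly (30-day) error budget.
BUDGET = 0.001  # fraction of requests allowed to miss the SLO

def burn_rate(error_rate: float) -> float:
    return error_rate / BUDGET

def days_to_depletion(rate: float) -> float:
    return 30 / rate

assert round(days_to_depletion(14.4), 1) == 2.1   # fast burn -> page
assert days_to_depletion(6.0) == 5.0              # slow burn -> ticket
assert days_to_depletion(3.0) == 10.0             # trend -> dashboard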
Code Example: SLO Monitor
import time
from collections import deque
from dataclasses import dataclass
@dataclass
class LatencySample:
timestamp: float
duration_ms: float
compliant: bool # True if within SLO
class SLOMonitor:
"""Monitors SLO compliance with multi-window burn-rate alerts."""
def __init__(
self,
slo_target_ms: float = 2000.0,
slo_percentile: float = 0.95,
monthly_error_budget_pct: float = 0.1,
):
self.slo_target_ms = slo_target_ms
self.slo_percentile = slo_percentile
self.monthly_error_budget_pct = monthly_error_budget_pct
# Sliding windows
self._samples_1h: deque[LatencySample] = deque()
self._samples_6h: deque[LatencySample] = deque()
self._samples_24h: deque[LatencySample] = deque()
# Alert thresholds (burn rate multipliers)
self.fast_burn_threshold = 14.4 # 1h window
self.slow_burn_threshold = 6.0 # 6h window
self.trend_threshold = 3.0 # 24h window
def record(self, duration_ms: float) -> dict | None:
"""Record a latency sample and check for alerts."""
now = time.time()
compliant = duration_ms <= self.slo_target_ms
sample = LatencySample(
timestamp=now,
duration_ms=duration_ms,
compliant=compliant,
)
# Add to all windows
self._samples_1h.append(sample)
self._samples_6h.append(sample)
self._samples_24h.append(sample)
# Trim windows
self._trim_window(self._samples_1h, now, 3600)
self._trim_window(self._samples_6h, now, 21600)
self._trim_window(self._samples_24h, now, 86400)
# Check burn rates
return self._check_alerts()
def _trim_window(
self, window: deque, now: float, max_age_s: float
) -> None:
cutoff = now - max_age_s
while window and window[0].timestamp < cutoff:
window.popleft()
def _error_rate(self, window: deque) -> float:
if len(window) == 0:
return 0.0
violations = sum(1 for s in window if not s.compliant)
return violations / len(window)
def _burn_rate(self, window: deque) -> float:
error_rate = self._error_rate(window)
budget = self.monthly_error_budget_pct / 100
if budget == 0:
return 0.0
return error_rate / budget
def _check_alerts(self) -> dict | None:
burn_1h = self._burn_rate(self._samples_1h)
burn_6h = self._burn_rate(self._samples_6h)
burn_24h = self._burn_rate(self._samples_24h)
if burn_1h > self.fast_burn_threshold:
return {
"severity": "critical",
"alert": "fast_burn",
"burn_rate": round(burn_1h, 2),
"window": "1h",
"message": (
f"SLO burn rate {burn_1h:.1f}x in 1h window. "
f"Budget will be depleted in ~5 days."
),
}
if burn_6h > self.slow_burn_threshold:
return {
"severity": "warning",
"alert": "slow_burn",
"burn_rate": round(burn_6h, 2),
"window": "6h",
"message": (
f"SLO burn rate {burn_6h:.1f}x in 6h window. "
f"Budget will be depleted in ~10 days."
),
}
if burn_24h > self.trend_threshold:
return {
"severity": "info",
"alert": "trend",
"burn_rate": round(burn_24h, 2),
"window": "24h",
"message": (
f"SLO burn rate {burn_24h:.1f}x in 24h window. "
f"Investigate in next sprint."
),
}
return None
def get_status(self) -> dict:
"""Current SLO compliance status."""
return {
"slo_target_ms": self.slo_target_ms,
"windows": {
"1h": {
"samples": len(self._samples_1h),
"error_rate_pct": round(
self._error_rate(self._samples_1h) * 100, 3
),
"burn_rate": round(
self._burn_rate(self._samples_1h), 2
),
},
"6h": {
"samples": len(self._samples_6h),
"error_rate_pct": round(
self._error_rate(self._samples_6h) * 100, 3
),
"burn_rate": round(
self._burn_rate(self._samples_6h), 2
),
},
"24h": {
"samples": len(self._samples_24h),
"error_rate_pct": round(
self._error_rate(self._samples_24h) * 100, 3
),
"burn_rate": round(
self._burn_rate(self._samples_24h), 2
),
},
},
}
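Wiring the monitor to the traced pipeline is one call per request; a sketch reusing TracedPipeline and RequestTrace from section 1, where page_oncall stands in for the real paging integration:

monitor = SLOMonitor(slo_target_ms=2000.0)

async def handle_with_slo(
    pipeline: TracedPipeline, trace_id: str, session_id: str, text: str
) -> RequestTrace:
    trace = await pipeline.execute(trace_id, session_id, text)
    alert = monitor.record(trace.total_ms)
    if alert and alert["severity"] == "critical":
        await page_oncall(alert)  # stand-in for the SNS/PagerDuty hook
    return trace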
3. Degraded Mode Handler
When a non-critical service fails, serve a reduced-quality response rather than timing out.
flowchart TD
A[Request Arrives] --> B{Cache Hit?}
B -->|Yes| C[Return cached response<br>~2ms]
B -->|No| D{Intent Classifier<br>Available?}
D -->|Yes| E[Full Pipeline]
D -->|No| F[Fallback: keyword-based<br>intent detection]
E --> G{RAG Available?}
G -->|Yes| H[Full context + RAG]
G -->|No| I[LLM with history only<br>No knowledge augmentation]
H --> J{LLM Available?}
I --> J
F --> J
J -->|Yes| K[Generate response]
J -->|No| L["Return canned response<br>'I'm having trouble, try again'"]
K --> M[Return to user]
L --> M
style C fill:#2d8,stroke:#333
style F fill:#fa0,stroke:#333
style I fill:#fa0,stroke:#333
style L fill:#f66,stroke:#333
Code Example: Degraded Mode Handler
import logging
from dataclasses import dataclass
from enum import Enum
logger = logging.getLogger(__name__)
class DegradationLevel(Enum):
FULL = "full" # All services healthy
NO_RAG = "no_rag" # RAG unavailable
NO_INTENT = "no_intent" # Intent classifier unavailable
NO_RAG_NO_INTENT = "minimal" # Only LLM + history
CANNED = "canned" # LLM unavailable
@dataclass
class ServiceHealth:
intent_available: bool = True
rag_available: bool = True
llm_available: bool = True
cache_available: bool = True
memory_available: bool = True
@property
def degradation_level(self) -> DegradationLevel:
if not self.llm_available:
return DegradationLevel.CANNED
if not self.intent_available and not self.rag_available:
return DegradationLevel.NO_RAG_NO_INTENT
if not self.rag_available:
return DegradationLevel.NO_RAG
if not self.intent_available:
return DegradationLevel.NO_INTENT
return DegradationLevel.FULL
CANNED_RESPONSES = {
"default": (
"I'm sorry, I'm having trouble processing your request right now. "
"Please try again in a moment, or browse our manga catalog directly."
),
"order_status": (
"I'm unable to check order details right now. "
"Please visit your account page for order status, "
"or try again in a moment."
),
"product_search": (
"I'm having trouble searching right now. "
"You can browse our catalog at the top of the page."
),
}
class DegradedModeHandler:
"""Handles degraded responses when services are unavailable."""
def __init__(
self,
intent_service,
memory_service,
rag_service,
llm_service,
circuit_breakers: dict,
):
self.intent_service = intent_service
self.memory_service = memory_service
self.rag_service = rag_service
self.llm_service = llm_service
self.circuit_breakers = circuit_breakers
    def assess_health(self) -> ServiceHealth:
        """Check circuit breaker states to determine service health."""

        def _is_available(name: str) -> bool:
            breaker = self.circuit_breakers.get(name)
            # No breaker configured means the service is assumed healthy
            return breaker is None or breaker.state.value != "open"

        return ServiceHealth(
            intent_available=_is_available("intent"),
            rag_available=_is_available("rag"),
            llm_available=_is_available("llm"),
        )
async def handle(
self,
session_id: str,
user_message: str,
health: ServiceHealth,
) -> dict:
"""Route to the appropriate degraded pipeline."""
level = health.degradation_level
logger.info(
f"Degradation level: {level.value} for session {session_id}"
)
if level == DegradationLevel.CANNED:
return self._canned_response(user_message)
        # Get conversation history (best effort)
        history = []
        if health.memory_available:
            try:
                history = await self.memory_service.get_recent(
                    session_id, limit=5
                )
            except Exception:
                logger.debug("History fetch failed; continuing without it")
        # Get intent (fall back to keywords if the classifier fails)
        if health.intent_available:
            try:
                intent = await self.intent_service.classify(user_message)
            except Exception:
                intent = self._keyword_intent_fallback(user_message)
        else:
            intent = self._keyword_intent_fallback(user_message)
        # Get RAG context (best effort; skip augmentation on failure)
        rag_context = None
        if health.rag_available and intent:
            try:
                rag_context = await self.rag_service.retrieve(
                    user_message,
                    intent=intent.get("intent"),
                )
            except Exception:
                rag_context = None
# Build degraded context for LLM
context = self._build_degraded_context(
user_message, history, rag_context, intent, level
)
response = await self.llm_service.invoke(context)
return {
"response": response,
"degradation_level": level.value,
"services_unavailable": [
s for s, available in {
"intent": health.intent_available,
"rag": health.rag_available,
}.items() if not available
],
}
def _keyword_intent_fallback(self, message: str) -> dict:
"""Simple keyword-based intent detection when ML classifier is down."""
message_lower = message.lower()
keyword_map = {
"order": "order_status",
"tracking": "order_status",
"shipped": "order_status",
"recommend": "product_recommendation",
"suggest": "product_recommendation",
"similar": "product_recommendation",
"price": "product_search",
"how much": "product_search",
"search": "product_search",
"return": "return_policy",
"refund": "return_policy",
}
for keyword, intent_name in keyword_map.items():
if keyword in message_lower:
return {
"intent": intent_name,
"confidence": 0.5,
"source": "keyword_fallback",
}
return {
"intent": "general",
"confidence": 0.3,
"source": "keyword_fallback",
}
def _build_degraded_context(
self,
user_message: str,
history: list,
rag_context: list | None,
intent: dict | None,
level: DegradationLevel,
) -> dict:
system_parts = [
"You are MangaAssist, an AI shopping assistant.",
]
if level != DegradationLevel.FULL:
system_parts.append(
"Note: Some knowledge sources are temporarily unavailable. "
"Answer based on available context only. "
"If unsure, suggest the user browse the catalog directly."
)
context_parts = []
if history:
context_parts.append(
"\n".join(
f"{m['role']}: {m['content']}" for m in history[-3:]
)
)
if rag_context:
context_parts.append(
"\n---\n".join(
doc.get("content", "") for doc in rag_context[:2]
)
)
return {
"system_prompt": " ".join(system_parts),
"context": "\n\n".join(context_parts),
"user_message": user_message,
}
def _canned_response(self, user_message: str) -> dict:
"""Return a static response when LLM is unavailable."""
fallback_intent = self._keyword_intent_fallback(user_message)
intent_name = fallback_intent.get("intent", "default")
response_text = CANNED_RESPONSES.get(
intent_name, CANNED_RESPONSES["default"]
)
return {
"response": response_text,
"degradation_level": DegradationLevel.CANNED.value,
"services_unavailable": ["llm"],
}
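The 3-second degraded-mode bound from the acceptance criteria can be enforced with an outer deadline; a sketch using asyncio.wait_for around the handler above:

import asyncio

DEGRADED_DEADLINE_S = 3.0  # acceptance criterion: degraded response within 3s

async def respond(
    handler: DegradedModeHandler, session_id: str, message: str
) -> dict:
    health = handler.assess_health()
    try:
        return await asyncio.wait_for(
            handler.handle(session_id, message, health),
            timeout=DEGRADED_DEADLINE_S,
        )
    except asyncio.TimeoutError:
        # Even the degraded path blew its deadline: last-resort canned reply
        # (reaches into the handler's private helper for brevity)
        return handler._canned_response(message)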
Metrics and Monitoring
| Metric | Target | Alarm Threshold |
|---|---|---|
| e2e.latency_ms (p95) | < 2,000ms | > 2,000ms |
| e2e.latency_ms (p50) | < 1,200ms | > 1,500ms |
| e2e.first_token_ms (p95) | < 800ms | > 1,000ms |
| slo.burn_rate_1h | < 1.0 | > 14.4 (page) |
| slo.burn_rate_6h | < 1.0 | > 6.0 (warn) |
| slo.error_budget_remaining_pct | > 50% | < 25% |
| degraded.invocations | 0 in steady state | > 10/min |
| degraded.canned_responses | 0 in steady state | > 5/min |
| trace.budget_violations | Monitor per span | Any span > 2× budget |
Latency Dashboard Layout
graph TD
subgraph "Row 1: Overall SLO"
A[SLO Compliance %<br>Last 30 days]
B[Error Budget<br>Remaining %]
C[Burn Rate<br>1h / 6h / 24h]
end
subgraph "Row 2: Per-Service Latency"
D[Intent p95]
E[RAG p95]
F[LLM First-Token p95]
G[WebSocket Delivery p95]
end
subgraph "Row 3: Budget Allocation"
H[Latency Waterfall<br>p95 by service]
I[Budget Violations<br>per service / hour]
end
subgraph "Row 4: Degradation"
J[Degraded Responses<br>by level]
K[Circuit Breaker<br>State Timeline]
end
SLO Summary
pie title Latency Budget Distribution (p95 = 2,000ms)
"Edge (CloudFront + ALB)" : 50
"Orchestrator" : 50
"Phase 1 Parallel (wall clock)" : 35
"RAG Retrieval" : 200
"Context Assembly" : 20
"LLM First-Token" : 400
"Streaming Delivery" : 1045
"Client Render" : 50
"Headroom" : 150