
PO-08: End-to-End Latency Optimization

User Story

As a product owner, I want to guarantee end-to-end response latency of under 2 seconds at p95 across the entire chatbot pipeline, so that users experience a fast, reliable conversation that meets the product SLA regardless of traffic conditions.

Acceptance Criteria

  • End-to-end p95 latency from user message to completed response is under 2 seconds, with the first visible token in under 800ms.
  • Latency budget is allocated per service and monitored independently.
  • Degraded mode serves a response within 3 seconds even when a non-critical service fails.
  • Distributed tracing produces a full request waterfall for any request exceeding the 2s SLO.
  • Weekly latency reports show p50, p95, p99 trends and budget compliance per service.
  • SLO burn-rate alerts fire before the monthly error budget is exhausted.

High-Level Design

End-to-End Latency Budget

gantt
    title Latency Budget Allocation (p95 targets)
    dateFormat X
    axisFormat %Lms

    section Edge
    CloudFront + ALB          :0, 50

    section Orchestrator
    Routing + Fan-Out         :50, 100

    section Phase 1 (Parallel)
    Intent Classification     :100, 135
    Conversation Memory       :100, 110
    Cache Lookup              :100, 102

    section Phase 2 (Sequential)
    RAG Retrieval             :135, 335
    Context Assembly          :335, 355

    section Phase 3 (LLM)
    Bedrock First-Token       :355, 755
    Streaming Delivery        :755, 1800

    section Client
    Client Render             :1800, 1850

Total budget: 1,850ms (150ms headroom to the 2,000ms SLO)
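
The allocation can be kept honest in code. A minimal sketch (names illustrative) that asserts the per-service budgets plus headroom add up to the 2,000ms SLO; Phase 1 is counted at its 35ms wall clock because the memory and cache lookups overlap with intent classification:

LATENCY_BUDGET_MS = {
    "edge": 50,               # CloudFront + ALB
    "orchestrator": 50,       # routing + fan-out
    "phase1_parallel": 35,    # intent 35 / memory 10 / cache 2, overlapping
    "rag": 200,
    "context_assembly": 20,
    "llm_first_token": 400,
    "streaming": 1045,
    "client_render": 50,
}
SLO_MS = 2000

headroom_ms = SLO_MS - sum(LATENCY_BUDGET_MS.values())
assert headroom_ms == 150, f"budget drifted: {headroom_ms}ms headroom"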

Pipeline Architecture

graph TD
    subgraph "Critical Path (latency-sensitive)"
        A[Edge: 50ms] --> B[Orchestrator: 50ms]
        B --> C[Phase 1: Parallel Lookups<br>35ms wall clock]
        C --> D[Phase 2: RAG + Assembly<br>220ms]
        D --> E[Phase 3: LLM First-Token<br>400ms]
        E --> F[Streaming Delivery<br>~1,000ms]
    end

    subgraph "Non-Critical (async)"
        G[Analytics Write]
        H[Conversation Save]
        I[Audit Log]
    end

    B --> G
    F --> H
    F --> I

    style A fill:#2d8,stroke:#333
    style E fill:#f96,stroke:#333

Low-Level Design

1. Distributed Tracing Pipeline

Use AWS X-Ray with custom subsegments to trace every hop in the request path.

sequenceDiagram
    participant Client
    participant Edge as CloudFront/ALB
    participant Orch as Orchestrator
    participant Intent as Intent Classifier
    participant Memory as Conv. Memory
    participant RAG as RAG Pipeline
    participant LLM as Bedrock

    Note over Client,LLM: X-Ray Trace ID propagated through all hops

    Client->>Edge: Message (X-Amzn-Trace-Id)
    Edge->>Orch: Forward

    par Phase 1 (parallel)
        Orch->>Intent: classify()  [subsegment]
    and
        Orch->>Memory: get_history()  [subsegment]
    end

    Orch->>RAG: retrieve()  [subsegment]
    Orch->>LLM: invoke_model_with_response_stream()  [subsegment]
    LLM-->>Client: Streaming tokens

    Note over Orch: Full waterfall visible in X-Ray console<br>(parallel Phase 1 spans overlap, so wall clock < sum of spans)
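
Before the full pipeline below, a minimal sketch of the X-Ray instrumentation itself, assuming the aws_xray_sdk Python package with its framework middleware configured (so the incoming X-Amzn-Trace-Id header is honored); rag_service is a hypothetical client and the subsegment name mirrors the span names used below:

from aws_xray_sdk.core import xray_recorder

def traced_rag_retrieve(rag_service, query: str) -> list:
    # in_subsegment opens a child span under the current segment;
    # it closes automatically and records any raised exception.
    with xray_recorder.in_subsegment("rag") as subsegment:
        results = rag_service.retrieve(query)
        # Annotations are indexed, so traces can be filtered by them
        # in the X-Ray console (e.g. result_count = 0).
        subsegment.put_annotation("result_count", len(results))
        return results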

Code Example: Traced Request Pipeline

import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import Any

logger = logging.getLogger(__name__)


@dataclass
class SpanRecord:
    name: str
    start_ms: float
    end_ms: float = 0.0
    status: str = "ok"
    metadata: dict = field(default_factory=dict)

    @property
    def duration_ms(self) -> float:
        return self.end_ms - self.start_ms


@dataclass
class RequestTrace:
    trace_id: str
    session_id: str
    spans: list[SpanRecord] = field(default_factory=list)
    total_ms: float = 0.0
    slo_budget_ms: float = 2000.0

    @property
    def slo_compliant(self) -> bool:
        return self.total_ms <= self.slo_budget_ms

    def summary(self) -> dict:
        return {
            "trace_id": self.trace_id,
            "total_ms": round(self.total_ms, 1),
            "slo_compliant": self.slo_compliant,
            "slowest_span": max(
                self.spans, key=lambda s: s.duration_ms
            ).name if self.spans else None,
            "spans": [
                {
                    "name": s.name,
                    "duration_ms": round(s.duration_ms, 1),
                    "status": s.status,
                }
                for s in self.spans
            ],
        }


class TracedPipeline:
    """End-to-end pipeline with per-stage latency tracking."""

    def __init__(
        self,
        intent_service,
        memory_service,
        cache_service,
        rag_service,
        llm_service,
        latency_budgets: dict[str, float] | None = None,
    ):
        self.intent_service = intent_service
        self.memory_service = memory_service
        self.cache_service = cache_service
        self.rag_service = rag_service
        self.llm_service = llm_service
        self.budgets = latency_budgets or {
            "edge": 50,
            "orchestrator": 50,
            "intent": 35,
            "memory": 10,
            "cache": 2,
            "rag": 200,
            "context_assembly": 20,
            "llm_first_token": 400,
        }

    async def execute(
        self, trace_id: str, session_id: str, user_message: str
    ) -> RequestTrace:
        """Execute the full pipeline with tracing."""
        trace = RequestTrace(
            trace_id=trace_id, session_id=session_id
        )
        pipeline_start = time.monotonic()

        # --- Phase 1: Parallel lookups ---
        phase1_results = await self._run_parallel(
            trace,
            {
                "intent": self.intent_service.classify(user_message),
                "memory": self.memory_service.get_recent(
                    session_id, limit=10
                ),
                "cache": self.cache_service.get_response(user_message),
            },
        )

        intent_result = phase1_results.get("intent")
        history = phase1_results.get("memory", [])
        cache_hit = phase1_results.get("cache")

        # Short-circuit on cache hit; the calling handler would return
        # the cached payload alongside this trace
        if cache_hit:
            trace.total_ms = (time.monotonic() - pipeline_start) * 1000
            return trace

        # --- Phase 2: RAG retrieval ---
        rag_results = await self._run_traced(
            trace,
            "rag",
            self.rag_service.retrieve(
                user_message,
                intent=intent_result.get("intent") if intent_result else None,
            ),
        )

        # --- Phase 2b: Context assembly ---
        context = await self._run_traced(
            trace,
            "context_assembly",
            self._assemble_context(
                user_message, history, rag_results, intent_result
            ),
        )

        # --- Phase 3: LLM first-token ---
        # invoke_streaming is expected to resolve once the first token
        # arrives; token delivery continues outside this traced call.
        await self._run_traced(
            trace,
            "llm_first_token",
            self.llm_service.invoke_streaming(context),
        )

        trace.total_ms = (time.monotonic() - pipeline_start) * 1000

        # Log budget violations
        for span in trace.spans:
            budget = self.budgets.get(span.name)
            if budget and span.duration_ms > budget:
                logger.warning(
                    f"[{trace_id}] Budget violation: {span.name} "
                    f"took {span.duration_ms:.1f}ms "
                    f"(budget: {budget}ms)"
                )

        return trace

    async def _run_traced(
        self, trace: RequestTrace, name: str, coro
    ) -> Any:
        """Execute a coroutine and record a span."""
        start = time.monotonic()
        span = SpanRecord(name=name, start_ms=start * 1000)
        try:
            result = await coro
            span.status = "ok"
            return result
        except Exception as e:
            span.status = "error"
            span.metadata["error"] = str(e)
            raise
        finally:
            span.end_ms = time.monotonic() * 1000
            trace.spans.append(span)

    async def _run_parallel(
        self, trace: RequestTrace, tasks: dict[str, Any]
    ) -> dict[str, Any]:
        """Run multiple coroutines in parallel with tracing."""
        results = {}

        async def traced_task(name, coro):
            result = await self._run_traced(trace, name, coro)
            results[name] = result

        await asyncio.gather(
            *(traced_task(n, c) for n, c in tasks.items()),
            return_exceptions=True,
        )
        return results

    async def _assemble_context(
        self,
        user_message: str,
        history: list,
        rag_results: list | None,
        intent: dict | None,
    ) -> dict:
        """Build the LLM context from assembled components."""
        context_parts = []

        if history:
            formatted_history = "\n".join(
                f"{msg['role']}: {msg['content']}"
                for msg in history[-5:]
            )
            context_parts.append(f"<history>\n{formatted_history}\n</history>")

        if rag_results:
            formatted_docs = "\n---\n".join(
                doc.get("content", "") for doc in rag_results[:3]
            )
            context_parts.append(
                f"<knowledge>\n{formatted_docs}\n</knowledge>"
            )

        return {
            "system_prompt": self._get_system_prompt(intent),
            "context": "\n\n".join(context_parts),
            "user_message": user_message,
        }

    def _get_system_prompt(self, intent: dict | None) -> str:
        intent_name = intent.get("intent", "general") if intent else "general"
        return (
            f"You are MangaAssist, an AI shopping assistant for manga and anime products. "
            f"Current intent: {intent_name}. Be helpful and concise."
        )
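
A quick usage sketch with stub services (hypothetical stand-ins for the real clients), showing an end-to-end run and the resulting trace summary:

import asyncio


class _StubService:
    """Hypothetical async stub that returns a fixed result."""

    def __init__(self, result, delay_s: float = 0.01):
        self._result = result
        self._delay_s = delay_s

    async def _call(self, *args, **kwargs):
        await asyncio.sleep(self._delay_s)
        return self._result

    # All pipeline entry points share the same stub behavior.
    classify = get_recent = get_response = retrieve = invoke_streaming = _call


async def main():
    pipeline = TracedPipeline(
        intent_service=_StubService({"intent": "product_search"}),
        memory_service=_StubService([]),
        cache_service=_StubService(None),   # cache miss -> full pipeline
        rag_service=_StubService([{"content": "One Piece, vol. 1"}]),
        llm_service=_StubService("first token"),
    )
    trace = await pipeline.execute("trace-123", "sess-1", "recommend a manga")
    print(trace.summary())


asyncio.run(main())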

2. SLO Burn-Rate Alert System

Use multi-window burn-rate alerts to detect sustained SLO violations before the monthly error budget is exhausted.

graph TD
    subgraph "SLO: p95 < 2s"
        A[Monthly budget: 0.1% of requests may breach the SLO<br>~4,320 bad requests at ~4.3M requests/month]
    end

    subgraph "Fast Burn (1h window)"
        B[Burn rate > 14.4x] --> C[Page on-call<br>Budget depleted in ~2 days]
    end

    subgraph "Slow Burn (6h window)"
        D[Burn rate > 6x] --> E[Ticket alert<br>Budget depleted in ~5 days]
    end

    subgraph "Trend (24h window)"
        F[Burn rate > 3x] --> G[Dashboard warning<br>Investigate next sprint]
    end
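
The thresholds follow from simple arithmetic: burn rate is the observed bad-request fraction divided by the budgeted fraction, and a 30-day budget consumed at N× depletes in 30/N days. A worked sketch:

# Burn-rate arithmetic for a 0.1% monthly error budget.
BUDGET_FRACTION = 0.001  # 0.1% of requests may breach the SLO


def burn_rate(observed_error_rate: float) -> float:
    return observed_error_rate / BUDGET_FRACTION


def days_to_depletion(rate: float, budget_days: float = 30.0) -> float:
    return budget_days / rate


print(burn_rate(0.0144))            # 1.44% bad -> 14.4x burn
print(days_to_depletion(14.4))      # ~2.1 days  (fast burn: page)
print(days_to_depletion(6.0))       # 5.0 days   (slow burn: ticket)
print(days_to_depletion(3.0))       # 10.0 days  (trend: dashboard)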

Code Example: SLO Monitor

import time
from dataclasses import dataclass
from collections import deque


@dataclass
class LatencySample:
    timestamp: float
    duration_ms: float
    compliant: bool  # True if within SLO


class SLOMonitor:
    """Monitors SLO compliance with multi-window burn-rate alerts."""

    def __init__(
        self,
        slo_target_ms: float = 2000.0,
        slo_percentile: float = 0.95,
        monthly_error_budget_pct: float = 0.1,
    ):
        self.slo_target_ms = slo_target_ms
        self.slo_percentile = slo_percentile
        self.monthly_error_budget_pct = monthly_error_budget_pct

        # Sliding windows
        self._samples_1h: deque[LatencySample] = deque()
        self._samples_6h: deque[LatencySample] = deque()
        self._samples_24h: deque[LatencySample] = deque()

        # Alert thresholds (burn rate multipliers)
        self.fast_burn_threshold = 14.4  # 1h window
        self.slow_burn_threshold = 6.0   # 6h window
        self.trend_threshold = 3.0       # 24h window

    def record(self, duration_ms: float) -> dict | None:
        """Record a latency sample and check for alerts."""
        now = time.time()
        compliant = duration_ms <= self.slo_target_ms
        sample = LatencySample(
            timestamp=now,
            duration_ms=duration_ms,
            compliant=compliant,
        )

        # Add to all windows
        self._samples_1h.append(sample)
        self._samples_6h.append(sample)
        self._samples_24h.append(sample)

        # Trim windows
        self._trim_window(self._samples_1h, now, 3600)
        self._trim_window(self._samples_6h, now, 21600)
        self._trim_window(self._samples_24h, now, 86400)

        # Check burn rates
        return self._check_alerts()

    def _trim_window(
        self, window: deque, now: float, max_age_s: float
    ) -> None:
        cutoff = now - max_age_s
        while window and window[0].timestamp < cutoff:
            window.popleft()

    def _error_rate(self, window: deque) -> float:
        if len(window) == 0:
            return 0.0
        violations = sum(1 for s in window if not s.compliant)
        return violations / len(window)

    def _burn_rate(self, window: deque) -> float:
        error_rate = self._error_rate(window)
        budget = self.monthly_error_budget_pct / 100
        if budget == 0:
            return 0.0
        return error_rate / budget

    def _check_alerts(self) -> dict | None:
        burn_1h = self._burn_rate(self._samples_1h)
        burn_6h = self._burn_rate(self._samples_6h)
        burn_24h = self._burn_rate(self._samples_24h)

        if burn_1h > self.fast_burn_threshold:
            return {
                "severity": "critical",
                "alert": "fast_burn",
                "burn_rate": round(burn_1h, 2),
                "window": "1h",
                "message": (
                    f"SLO burn rate {burn_1h:.1f}x in 1h window. "
                    f"Budget will be depleted in ~5 days."
                ),
            }

        if burn_6h > self.slow_burn_threshold:
            return {
                "severity": "warning",
                "alert": "slow_burn",
                "burn_rate": round(burn_6h, 2),
                "window": "6h",
                "message": (
                    f"SLO burn rate {burn_6h:.1f}x in 6h window. "
                    f"Budget will be depleted in ~10 days."
                ),
            }

        if burn_24h > self.trend_threshold:
            return {
                "severity": "info",
                "alert": "trend",
                "burn_rate": round(burn_24h, 2),
                "window": "24h",
                "message": (
                    f"SLO burn rate {burn_24h:.1f}x in 24h window. "
                    f"Investigate in next sprint."
                ),
            }

        return None

    def get_status(self) -> dict:
        """Current SLO compliance status."""
        return {
            "slo_target_ms": self.slo_target_ms,
            "windows": {
                "1h": {
                    "samples": len(self._samples_1h),
                    "error_rate_pct": round(
                        self._error_rate(self._samples_1h) * 100, 3
                    ),
                    "burn_rate": round(
                        self._burn_rate(self._samples_1h), 2
                    ),
                },
                "6h": {
                    "samples": len(self._samples_6h),
                    "error_rate_pct": round(
                        self._error_rate(self._samples_6h) * 100, 3
                    ),
                    "burn_rate": round(
                        self._burn_rate(self._samples_6h), 2
                    ),
                },
                "24h": {
                    "samples": len(self._samples_24h),
                    "error_rate_pct": round(
                        self._error_rate(self._samples_24h) * 100, 3
                    ),
                    "burn_rate": round(
                        self._burn_rate(self._samples_24h), 2
                    ),
                },
            },
        }
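
Usage is a single record() call per completed request, typically from the traced pipeline's exit path; the latency values here are illustrative:

monitor = SLOMonitor(slo_target_ms=2000.0)

for duration_ms in (850.0, 1200.0, 2450.0, 950.0):
    alert = monitor.record(duration_ms)
    if alert:
        # Route by severity: page, ticket, or dashboard annotation.
        # With this few samples the burn rate is extreme; a real
        # deployment would also gate on a minimum sample count.
        print(alert["severity"], alert["message"])

print(monitor.get_status())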

3. Degraded Mode Handler

When a non-critical service fails, serve a reduced-quality response rather than timing out.

flowchart TD
    A[Request Arrives] --> B{Cache Hit?}
    B -->|Yes| C[Return cached response<br>~2ms]
    B -->|No| D{Intent Classifier<br>Available?}

    D -->|Yes| E[Full Pipeline]
    D -->|No| F[Fallback: keyword-based<br>intent detection]

    E --> G{RAG Available?}
    G -->|Yes| H[Full context + RAG]
    G -->|No| I[LLM with history only<br>No knowledge augmentation]

    H --> J{LLM Available?}
    I --> J
    F --> J

    J -->|Yes| K[Generate response]
    J -->|No| L["Return canned response<br>I'm having trouble, try again"]

    K --> M[Return to user]
    L --> M

    style C fill:#2d8,stroke:#333
    style F fill:#fa0,stroke:#333
    style I fill:#fa0,stroke:#333
    style L fill:#f66,stroke:#333

Code Example: Degraded Mode Handler

import logging
from dataclasses import dataclass
from enum import Enum

logger = logging.getLogger(__name__)


class DegradationLevel(Enum):
    FULL = "full"                  # All services healthy
    NO_RAG = "no_rag"              # RAG unavailable
    NO_INTENT = "no_intent"        # Intent classifier unavailable
    NO_RAG_NO_INTENT = "minimal"   # Only LLM + history
    CANNED = "canned"              # LLM unavailable


@dataclass
class ServiceHealth:
    intent_available: bool = True
    rag_available: bool = True
    llm_available: bool = True
    cache_available: bool = True
    memory_available: bool = True

    @property
    def degradation_level(self) -> DegradationLevel:
        if not self.llm_available:
            return DegradationLevel.CANNED
        if not self.intent_available and not self.rag_available:
            return DegradationLevel.NO_RAG_NO_INTENT
        if not self.rag_available:
            return DegradationLevel.NO_RAG
        if not self.intent_available:
            return DegradationLevel.NO_INTENT
        return DegradationLevel.FULL


CANNED_RESPONSES = {
    "default": (
        "I'm sorry, I'm having trouble processing your request right now. "
        "Please try again in a moment, or browse our manga catalog directly."
    ),
    "order_status": (
        "I'm unable to check order details right now. "
        "Please visit your account page for order status, "
        "or try again in a moment."
    ),
    "product_search": (
        "I'm having trouble searching right now. "
        "You can browse our catalog at the top of the page."
    ),
}


class DegradedModeHandler:
    """Handles degraded responses when services are unavailable."""

    def __init__(
        self,
        intent_service,
        memory_service,
        rag_service,
        llm_service,
        circuit_breakers: dict,
    ):
        self.intent_service = intent_service
        self.memory_service = memory_service
        self.rag_service = rag_service
        self.llm_service = llm_service
        self.circuit_breakers = circuit_breakers

    def assess_health(self) -> ServiceHealth:
        """Check circuit breaker states to determine service health."""

        def is_closed(name: str) -> bool:
            breaker = self.circuit_breakers.get(name)
            # A service with no registered breaker is treated as healthy;
            # only an open breaker marks it unavailable.
            return breaker is None or breaker.state.value != "open"

        return ServiceHealth(
            intent_available=is_closed("intent"),
            rag_available=is_closed("rag"),
            llm_available=is_closed("llm"),
        )

    async def handle(
        self,
        session_id: str,
        user_message: str,
        health: ServiceHealth,
    ) -> dict:
        """Route to the appropriate degraded pipeline."""
        level = health.degradation_level
        logger.info(
            f"Degradation level: {level.value} for session {session_id}"
        )

        if level == DegradationLevel.CANNED:
            return self._canned_response(user_message)

        # Get conversation history (best effort)
        history = []
        if health.memory_available:
            try:
                history = await self.memory_service.get_recent(
                    session_id, limit=5
                )
            except Exception:
                logger.warning("Memory lookup failed; continuing without history")

        # Get intent (ML classifier if available, keyword fallback otherwise)
        if health.intent_available:
            try:
                intent = await self.intent_service.classify(user_message)
            except Exception:
                intent = self._keyword_intent_fallback(user_message)
        else:
            intent = self._keyword_intent_fallback(user_message)

        # Get RAG context (if available)
        rag_context = None
        if health.rag_available and intent:
            rag_context = await self.rag_service.retrieve(
                user_message,
                intent=intent.get("intent"),
            )

        # Build degraded context for LLM
        context = self._build_degraded_context(
            user_message, history, rag_context, intent, level
        )

        response = await self.llm_service.invoke(context)
        return {
            "response": response,
            "degradation_level": level.value,
            "services_unavailable": [
                s for s, available in {
                    "intent": health.intent_available,
                    "rag": health.rag_available,
                }.items() if not available
            ],
        }

    def _keyword_intent_fallback(self, message: str) -> dict:
        """Simple keyword-based intent detection when ML classifier is down."""
        message_lower = message.lower()
        keyword_map = {
            "order": "order_status",
            "tracking": "order_status",
            "shipped": "order_status",
            "recommend": "product_recommendation",
            "suggest": "product_recommendation",
            "similar": "product_recommendation",
            "price": "product_search",
            "how much": "product_search",
            "search": "product_search",
            "return": "return_policy",
            "refund": "return_policy",
        }
        for keyword, intent_name in keyword_map.items():
            if keyword in message_lower:
                return {
                    "intent": intent_name,
                    "confidence": 0.5,
                    "source": "keyword_fallback",
                }
        return {
            "intent": "general",
            "confidence": 0.3,
            "source": "keyword_fallback",
        }

    def _build_degraded_context(
        self,
        user_message: str,
        history: list,
        rag_context: list | None,
        intent: dict | None,
        level: DegradationLevel,
    ) -> dict:
        system_parts = [
            "You are MangaAssist, an AI shopping assistant.",
        ]
        if level != DegradationLevel.FULL:
            system_parts.append(
                "Note: Some knowledge sources are temporarily unavailable. "
                "Answer based on available context only. "
                "If unsure, suggest the user browse the catalog directly."
            )

        context_parts = []
        if history:
            context_parts.append(
                "\n".join(
                    f"{m['role']}: {m['content']}" for m in history[-3:]
                )
            )
        if rag_context:
            context_parts.append(
                "\n---\n".join(
                    doc.get("content", "") for doc in rag_context[:2]
                )
            )

        return {
            "system_prompt": " ".join(system_parts),
            "context": "\n\n".join(context_parts),
            "user_message": user_message,
        }

    def _canned_response(self, user_message: str) -> dict:
        """Return a static response when LLM is unavailable."""
        fallback_intent = self._keyword_intent_fallback(user_message)
        intent_name = fallback_intent.get("intent", "default")
        response_text = CANNED_RESPONSES.get(
            intent_name, CANNED_RESPONSES["default"]
        )
        return {
            "response": response_text,
            "degradation_level": DegradationLevel.CANNED.value,
            "services_unavailable": ["llm"],
        }
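
assess_health() assumes each breaker exposes state.value in {"closed", "open", "half_open"}. A minimal sketch of a compatible breaker, illustrative rather than the production implementation:

import time
from enum import Enum


class BreakerState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    """Opens after N consecutive failures, half-opens after a cooldown."""

    def __init__(self, failure_threshold: int = 5, reset_timeout_s: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout_s = reset_timeout_s
        self._failures = 0
        self._opened_at = 0.0
        self.state = BreakerState.CLOSED

    def allow_request(self) -> bool:
        if self.state is BreakerState.OPEN:
            if time.monotonic() - self._opened_at >= self.reset_timeout_s:
                # Let one probe request through to test recovery.
                self.state = BreakerState.HALF_OPEN
                return True
            return False
        return True

    def record_success(self) -> None:
        self._failures = 0
        self.state = BreakerState.CLOSED

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self.state = BreakerState.OPEN
            self._opened_at = time.monotonic()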

Metrics and Monitoring

Metric                            Target               Alarm Threshold
e2e.latency_ms (p95)              < 2,000ms            > 2,000ms
e2e.latency_ms (p50)              < 1,200ms            > 1,500ms
e2e.first_token_ms (p95)          < 800ms              > 1,000ms
slo.burn_rate_1h                  < 1.0                > 14.4 (page)
slo.burn_rate_6h                  < 1.0                > 6.0 (warn)
slo.error_budget_remaining_pct    > 50%                < 25%
degraded.invocations              0 in steady state    > 10/min
degraded.canned_responses         0 in steady state    > 5/min
trace.budget_violations           monitor per span     any span > 2× budget
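
A sketch of how these metrics could be published as CloudWatch custom metrics with boto3; the namespace and metric names follow the table above and are conventions of this design, not an existing API contract:

import boto3

cloudwatch = boto3.client("cloudwatch")


def publish_latency(duration_ms: float, service: str) -> None:
    # One datapoint per request; CloudWatch computes p50/p95/p99 from
    # the raw values. High-volume services should batch datapoints
    # to cut PutMetricData cost.
    cloudwatch.put_metric_data(
        Namespace="Chatbot/Latency",
        MetricData=[
            {
                "MetricName": "e2e.latency_ms",
                "Dimensions": [{"Name": "Service", "Value": service}],
                "Value": duration_ms,
                "Unit": "Milliseconds",
            }
        ],
    )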

Latency Dashboard Layout

graph TD
    subgraph "Row 1: Overall SLO"
        A[SLO Compliance %<br>Last 30 days]
        B[Error Budget<br>Remaining %]
        C[Burn Rate<br>1h / 6h / 24h]
    end

    subgraph "Row 2: Per-Service Latency"
        D[Intent p95]
        E[RAG p95]
        F[LLM First-Token p95]
        G[WebSocket Delivery p95]
    end

    subgraph "Row 3: Budget Allocation"
        H[Latency Waterfall<br>p95 by service]
        I[Budget Violations<br>per service / hour]
    end

    subgraph "Row 4: Degradation"
        J[Degraded Responses<br>by level]
        K[Circuit Breaker<br>State Timeline]
    end

SLO Summary

pie title Latency Budget Distribution (p95 = 2,000ms)
    "Edge (CloudFront + ALB)" : 50
    "Orchestrator" : 50
    "Phase 1 Lookups (parallel wall clock)" : 35
    "RAG Retrieval" : 200
    "Context Assembly" : 20
    "LLM First-Token" : 400
    "Streaming Delivery" : 1045
    "Client Render" : 50
    "Headroom" : 150

Memory (10ms) and cache (2ms) lookups run inside the 35ms Phase 1 parallel window, so they are not counted as separate slices.