
PO-01: LLM Response Latency Optimization

User Story

As a product manager for MangaAssist, I want to minimize the time between a user sending a message and seeing the first token of the LLM response, so that the chatbot feels responsive and conversational, keeping users engaged rather than abandoning the chat.

Acceptance Criteria

  • First-token latency (time to first byte from Bedrock) is under 400ms at p95.
  • Full response streaming completes within 1.5 seconds for standard queries (< 500 output tokens).
  • Prompt token count stays under 2,000 tokens for 80% of requests through compression.
  • Warm-pool provisioned throughput maintains latency under load spikes.
  • Speculative pre-generation reduces perceived latency for predictable follow-ups by 30%.

High-Level Design

The LLM Latency Problem

LLM inference is the single largest contributor to response latency. A typical Bedrock Claude 3.5 Sonnet call involves:

| Phase | Typical Latency | What Happens |
| --- | --- | --- |
| Network to Bedrock | 5-15ms | HTTPS request from VPC to Bedrock endpoint |
| Input tokenization | 10-30ms | Convert prompt text to tokens |
| Prefill (prompt processing) | 100-400ms | Process all input tokens in parallel |
| First token decode | 20-50ms | Generate first output token |
| Streaming decode | 30-50ms/token | Generate subsequent tokens autoregressively |
| Total first token | ~150-500ms | Dominated by prefill time |

Key insight: Prefill time scales linearly with input token count. Reducing prompt size directly reduces first-token latency.
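
To make that relationship concrete, the sketch below estimates first-token latency from prompt size. The constants are illustrative assumptions derived from the table above, not measured Bedrock figures.

# Back-of-envelope first-token latency estimate. The constants below are
# illustrative assumptions, not measured values.
PREFILL_MS_PER_TOKEN = 0.2  # midpoint of the ~0.1-0.3 ms/token range cited below
FIXED_OVERHEAD_MS = 50      # network + tokenization + first decode step


def estimate_first_token_ms(prompt_tokens: int) -> float:
    """Estimate time to first token for a given prompt size."""
    return FIXED_OVERHEAD_MS + prompt_tokens * PREFILL_MS_PER_TOKEN


print(estimate_first_token_ms(1650))  # ~380 ms for an uncompressed ~1650-token prompt
print(estimate_first_token_ms(900))   # ~230 ms after compression to ~900 tokens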

Optimization Strategy Overview

graph TD
    A[User Message] --> B[Prompt Compression]
    B --> C[Streaming Response]

    subgraph "Before LLM Call"
        B --> B1[Context Window<br>Optimization]
        B --> B2[Dynamic Prompt<br>Assembly]
        B --> B3[Conversation<br>Summarization]
    end

    subgraph "During LLM Call"
        C --> C1[Bedrock Streaming<br>API]
        C --> C2[Provisioned<br>Throughput]
        C --> C3[Token-Level<br>Streaming to Client]
    end

    subgraph "Ahead of LLM Call"
        D[Speculative<br>Pre-generation] --> D1[Predict Next<br>Intent]
        D1 --> D2[Pre-fetch<br>Context]
        D2 --> D3[Pre-warm<br>Prompt]
    end

    style B1 fill:#2d8,stroke:#333
    style C1 fill:#2d8,stroke:#333
    style D1 fill:#fd2,stroke:#333

Latency Reduction Targets

| Technique | Latency Reduction | Applies To |
| --- | --- | --- |
| Prompt compression (smaller context) | 30-50% prefill time | All LLM calls |
| Streaming (first token delivery) | 60-80% perceived wait | All LLM calls |
| Provisioned throughput | Eliminates cold start variance | Peak traffic hours |
| Speculative pre-generation | 30-50% for follow-up queries | Multi-turn conversations |
| Parallel context assembly | 100-200ms off critical path | All LLM calls |

Low-Level Design

1. Prompt Compression Pipeline

The prompt is the largest controllable factor in prefill latency. Every unnecessary token adds ~0.1-0.3ms of prefill time.

graph LR
    subgraph "Raw Context"
        A[System Prompt<br>~300 tokens]
        B[RAG Chunks<br>~600 tokens]
        C[Conversation History<br>~500 tokens]
        D[Page Context<br>~200 tokens]
        E[User Message<br>~50 tokens]
    end

    subgraph "Compression"
        A --> F[Static Template<br>Minification]
        B --> G[Chunk Pruning<br>+ Truncation]
        C --> H[Turn Summarization<br>+ Recency Window]
        D --> I[Context Filtering<br>by Intent]
    end

    subgraph "Compressed Prompt"
        F --> J[~200 tokens]
        G --> K[~350 tokens]
        H --> L[~200 tokens]
        I --> M[~100 tokens]
        E --> N[~50 tokens]
    end

    J --> O[Total: ~900 tokens<br>vs 1650 raw]
    K --> O
    L --> O
    M --> O
    N --> O

Code Example: Prompt Compressor

from dataclasses import dataclass
from typing import Optional


@dataclass
class PromptBudget:
    """Token budget allocation for each prompt section."""
    system: int = 200
    rag_chunks: int = 350
    conversation: int = 200
    page_context: int = 100
    user_message: int = 100
    total_limit: int = 2000


@dataclass
class CompressedPrompt:
    system_prompt: str
    rag_context: str
    conversation_context: str
    page_context: str
    user_message: str
    total_tokens: int
    compression_ratio: float


class PromptCompressor:
    """Compresses prompt components to fit within a token budget."""

    def __init__(self, tokenizer, budget: Optional[PromptBudget] = None):
        self.tokenizer = tokenizer
        self.budget = budget or PromptBudget()

    def compress(
        self,
        system_prompt: str,
        rag_chunks: list[dict],
        conversation_turns: list[dict],
        page_context: dict,
        user_message: str,
        intent: str,
    ) -> CompressedPrompt:
        raw_total = self._count_tokens(
            system_prompt
            + str(rag_chunks)
            + str(conversation_turns)
            + str(page_context)
            + user_message
        )

        compressed_system = self._compress_system_prompt(system_prompt)
        compressed_rag = self._compress_rag_chunks(rag_chunks, intent)
        compressed_conv = self._compress_conversation(conversation_turns)
        compressed_page = self._compress_page_context(page_context, intent)

        total = (
            self._count_tokens(compressed_system)
            + self._count_tokens(compressed_rag)
            + self._count_tokens(compressed_conv)
            + self._count_tokens(compressed_page)
            + self._count_tokens(user_message)
        )

        return CompressedPrompt(
            system_prompt=compressed_system,
            rag_context=compressed_rag,
            conversation_context=compressed_conv,
            page_context=compressed_page,
            user_message=user_message,
            total_tokens=total,
            compression_ratio=total / raw_total if raw_total > 0 else 1.0,
        )

    def _compress_system_prompt(self, prompt: str) -> str:
        """Remove redundant whitespace and optional instructions based on context."""
        lines = [line.strip() for line in prompt.strip().splitlines() if line.strip()]
        return "\n".join(lines)

    def _compress_rag_chunks(self, chunks: list[dict], intent: str) -> str:
        """Select and truncate RAG chunks to fit budget."""
        if not chunks:
            return ""

        # Sort by relevance score descending
        sorted_chunks = sorted(chunks, key=lambda c: c.get("score", 0), reverse=True)

        compressed_parts = []
        remaining_tokens = self.budget.rag_chunks

        for chunk in sorted_chunks:
            content = chunk.get("content", "")
            chunk_tokens = self._count_tokens(content)

            if chunk_tokens <= remaining_tokens:
                compressed_parts.append(content)
                remaining_tokens -= chunk_tokens
            elif remaining_tokens > 50:
                # Truncate the chunk to fit
                truncated = self._truncate_to_tokens(content, remaining_tokens)
                compressed_parts.append(truncated)
                break
            else:
                break

        return "\n---\n".join(compressed_parts)

    def _compress_conversation(self, turns: list[dict]) -> str:
        """Keep recent turns verbatim, summarize older turns."""
        if not turns:
            return ""

        # Keep last 4 turns (2 user + 2 assistant) verbatim
        recent_turns = turns[-4:]
        older_turns = turns[:-4]

        parts = []

        # Summarize older turns if they exist
        if older_turns:
            # Use pre-computed summaries from DynamoDB SUMMARY items
            summary = older_turns[0].get("summary")
            if summary:
                parts.append(f"[Earlier: {summary}]")
            else:
                # Fallback: compress by keeping only user messages
                for turn in older_turns:
                    if turn.get("role") == "user":
                        parts.append(f"User asked about: {turn['content'][:80]}")

        # Add recent turns verbatim
        for turn in recent_turns:
            role = turn.get("role", "user")
            content = turn.get("content", "")
            parts.append(f"{role}: {content}")

        result = "\n".join(parts)
        return self._truncate_to_tokens(result, self.budget.conversation)

    def _compress_page_context(self, page_context: dict, intent: str) -> str:
        """Include only intent-relevant page context fields."""
        if not page_context:
            return ""

        # Intent-based field selection
        relevant_fields = {
            "product_question": ["current_asin", "store_section"],
            "recommendation": ["browsing_history", "store_section"],
            "product_discovery": ["store_section", "browsing_history"],
            "order_tracking": [],  # Page context irrelevant
            "return_request": ["current_asin"],
            "faq": ["store_section"],
            "promotion": ["store_section", "cart_asins"],
            "checkout_help": ["cart_asins"],
        }

        fields_to_include = relevant_fields.get(intent, list(page_context.keys()))
        filtered = {k: v for k, v in page_context.items() if k in fields_to_include and v}
        if not filtered:
            return ""

        return str(filtered)

    def _count_tokens(self, text: str) -> int:
        """Count tokens using the configured tokenizer."""
        return len(self.tokenizer.encode(text))

    def _truncate_to_tokens(self, text: str, max_tokens: int) -> str:
        """Truncate text to fit within a token limit."""
        tokens = self.tokenizer.encode(text)
        if len(tokens) <= max_tokens:
            return text
        return self.tokenizer.decode(tokens[:max_tokens])
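
A minimal usage sketch of the compressor. It assumes tiktoken's cl100k_base encoding as a stand-in tokenizer (an approximation; Claude's tokenizer differs), and SYSTEM_PROMPT is a placeholder for the real system prompt.

import tiktoken  # assumed stand-in tokenizer; any object with encode()/decode() works

tokenizer = tiktoken.get_encoding("cl100k_base")
compressor = PromptCompressor(tokenizer, PromptBudget(total_limit=2000))

compressed = compressor.compress(
    system_prompt=SYSTEM_PROMPT,  # placeholder for the real system prompt
    rag_chunks=[{"content": "One Piece is a long-running shonen series...", "score": 0.92}],
    conversation_turns=[{"role": "user", "content": "Show me horror manga"}],
    page_context={"store_section": "manga-home"},
    user_message="Tell me more about the first one",
    intent="product_question",
)
print(compressed.total_tokens, round(compressed.compression_ratio, 2))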

2. Streaming Response Pipeline

Streaming delivers tokens to the user as they are generated, reducing perceived latency from full generation time to first-token time.

sequenceDiagram
    participant Client
    participant WebSocket as WebSocket Handler
    participant Orchestrator
    participant Bedrock

    Client->>WebSocket: Send message
    WebSocket->>Orchestrator: Forward request
    Note over Orchestrator: Context assembly (parallel)
    Orchestrator->>Bedrock: InvokeModelWithResponseStream

    loop Token by token
        Bedrock-->>Orchestrator: StreamChunk {token}
        Orchestrator-->>WebSocket: Delta event {token}
        WebSocket-->>Client: Display token
    end

    Bedrock-->>Orchestrator: StreamComplete
    Orchestrator->>Orchestrator: Run guardrails on full response
    Orchestrator-->>WebSocket: Completed event + metadata
    WebSocket-->>Client: Final response + products

Code Example: Bedrock Streaming Client

import asyncio
import json
import time
from dataclasses import dataclass
from typing import AsyncIterator

import boto3


@dataclass
class StreamChunk:
    token: str
    index: int
    latency_ms: float
    is_final: bool = False


@dataclass
class StreamMetrics:
    first_token_ms: float
    total_ms: float
    tokens_generated: int
    tokens_per_second: float


class BedrockStreamingClient:
    """Low-latency streaming client for Bedrock Claude models."""

    def __init__(
        self,
        model_id: str = "anthropic.claude-3-5-sonnet-20241022-v2:0",
        region: str = "us-east-1",
        max_tokens: int = 512,
    ):
        self.bedrock = boto3.client("bedrock-runtime", region_name=region)
        self.model_id = model_id
        self.max_tokens = max_tokens

    async def stream_response(
        self,
        prompt: str,
        system_prompt: str,
        temperature: float = 0.3,
    ) -> AsyncIterator[StreamChunk]:
        """Stream tokens from Bedrock with latency tracking."""
        start_time = time.monotonic()
        first_token_time = None
        token_index = 0

        body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": self.max_tokens,
            "temperature": temperature,
            "system": system_prompt,
            "messages": [{"role": "user", "content": prompt}],
        })

        # Use invoke_model_with_response_stream for token-level streaming
        response = await asyncio.to_thread(
            self.bedrock.invoke_model_with_response_stream,
            modelId=self.model_id,
            body=body,
            contentType="application/json",
        )

        stream = response.get("body")
        if stream is None:
            return

        # Note: boto3's EventStream is iterated synchronously; in production this
        # loop can be offloaded (e.g., via a worker thread) so waiting for the
        # next chunk does not block the event loop.
        for event in stream:
            chunk = event.get("chunk")
            if chunk is None:
                continue

            data = json.loads(chunk["bytes"].decode("utf-8"))
            delta = data.get("delta", {})

            if delta.get("type") == "text_delta":
                now = time.monotonic()
                if first_token_time is None:
                    first_token_time = now

                yield StreamChunk(
                    token=delta["text"],
                    index=token_index,
                    latency_ms=(now - start_time) * 1000,
                )
                token_index += 1

            # Handle stop reason
            if data.get("type") == "message_delta":
                stop_reason = data.get("delta", {}).get("stop_reason")
                if stop_reason:
                    elapsed = (time.monotonic() - start_time) * 1000
                    yield StreamChunk(
                        token="",
                        index=token_index,
                        latency_ms=elapsed,
                        is_final=True,
                    )

    def get_metrics(
        self, first_token_ms: float, total_ms: float, token_count: int
    ) -> StreamMetrics:
        return StreamMetrics(
            first_token_ms=first_token_ms,
            total_ms=total_ms,
            tokens_generated=token_count,
            tokens_per_second=token_count / (total_ms / 1000) if total_ms > 0 else 0,
        )
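
A consumption sketch showing how a WebSocket handler might forward deltas as they arrive and derive StreamMetrics afterwards. send_to_client is a hypothetical async callable, not part of the design above.

async def relay_response(prompt: str, system_prompt: str, send_to_client) -> StreamMetrics:
    """Forward streamed tokens to the client and compute latency metrics.

    send_to_client is a hypothetical async callable that pushes a WebSocket frame.
    """
    client = BedrockStreamingClient()
    first_token_ms = None
    total_ms = 0.0
    token_count = 0

    async for chunk in client.stream_response(prompt, system_prompt):
        if chunk.is_final:
            total_ms = chunk.latency_ms
            break
        if first_token_ms is None:
            first_token_ms = chunk.latency_ms
        token_count += 1
        await send_to_client({"type": "delta", "token": chunk.token})

    return client.get_metrics(first_token_ms or 0.0, total_ms, token_count)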

3. Provisioned Throughput for Consistent Latency

On-demand Bedrock inference can have variable latency due to cold starts and shared capacity. Provisioned throughput reserves dedicated capacity.

graph TD
    subgraph "Traffic Pattern"
        A[Peak Hours<br>9AM - 11PM JST] --> B[Provisioned Throughput<br>Reserved model units]
        C[Off-Peak Hours<br>11PM - 9AM JST] --> D[On-Demand<br>Shared capacity]
    end

    subgraph "Provisioned Throughput Benefits"
        B --> E[Consistent p95 latency]
        B --> F[No cold start penalty]
        B --> G[Guaranteed capacity<br>during traffic spikes]
    end

    subgraph "Scheduling"
        H[CloudWatch Alarm<br>Traffic > threshold] --> I[Scale up provisioned units]
        J[Scheduled Rule<br>Cron: 9AM JST daily] --> I
        K[Scheduled Rule<br>Cron: 11PM JST daily] --> L[Scale down to on-demand]
    end

    style B fill:#2d8,stroke:#333
    style D fill:#fd2,stroke:#333

Code Example: Provisioned Throughput Manager

import boto3
from datetime import datetime, timezone, timedelta


JST = timezone(timedelta(hours=9))


class ProvisionedThroughputManager:
    """Manages Bedrock provisioned throughput based on traffic patterns."""

    def __init__(
        self,
        model_id: str = "anthropic.claude-3-5-sonnet-20241022-v2:0",
        region: str = "us-east-1",
    ):
        self.bedrock = boto3.client("bedrock", region_name=region)
        self.model_id = model_id
        self._provisioned_arn: str | None = None

    def is_peak_hours(self) -> bool:
        """Check if current time is within peak hours (9AM-11PM JST)."""
        now_jst = datetime.now(JST)
        return 9 <= now_jst.hour < 23

    def ensure_provisioned_throughput(
        self,
        model_units: int = 1,
        commitment: str | None = None,
    ) -> str:
        """Create or return the existing provisioned throughput ARN.

        commitmentDuration is omitted for no-commitment capacity; pass
        "OneMonth" or "SixMonths" to reserve committed capacity.
        """
        if self._provisioned_arn:
            return self._provisioned_arn

        params = {
            "modelUnits": model_units,
            "provisionedModelName": f"mangaassist-{self.model_id.split('.')[-1][:20]}",
            "modelId": self.model_id,
        }
        if commitment:
            params["commitmentDuration"] = commitment

        response = self.bedrock.create_provisioned_model_throughput(**params)
        self._provisioned_arn = response["provisionedModelArn"]
        return self._provisioned_arn

    def get_model_id_for_current_load(self) -> str:
        """Return provisioned ARN during peak, on-demand model ID otherwise."""
        if self.is_peak_hours() and self._provisioned_arn:
            return self._provisioned_arn
        return self.model_id

    def release_provisioned_throughput(self) -> None:
        """Release provisioned throughput during off-peak to save costs."""
        if self._provisioned_arn:
            self.bedrock.delete_provisioned_model_throughput(
                provisionedModelId=self._provisioned_arn
            )
            self._provisioned_arn = None
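
A sketch of how the scheduled rules in the diagram above could drive the manager from a Lambda handler; the "action" field in the EventBridge payload is an assumed convention. In practice the provisioned ARN should also be looked up or stored externally, since the in-memory _provisioned_arn does not survive across Lambda instances.

# Scheduled Lambda entry point for the 9AM / 11PM JST rules shown above.
# The "action" field in the EventBridge payload is an assumed convention.
manager = ProvisionedThroughputManager()


def lambda_handler(event, context):
    action = event.get("action")
    if action == "scale_up":
        # Reserve dedicated capacity ahead of peak hours.
        return {"model_id": manager.ensure_provisioned_throughput(model_units=1)}
    if action == "scale_down":
        # Drop back to on-demand capacity overnight.
        manager.release_provisioned_throughput()
        return {"model_id": manager.model_id}
    # Default: report which model identifier callers should use right now.
    return {"model_id": manager.get_model_id_for_current_load()}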

4. Speculative Pre-Generation

For multi-turn conversations, the orchestrator can predict the likely next intent and pre-fetch context before the user sends their next message.

sequenceDiagram
    participant User
    participant Orchestrator
    participant PrefetchWorker
    participant Cache
    participant Services

    User->>Orchestrator: "Show me horror manga"
    Orchestrator-->>User: [Streaming response with 3 recommendations]

    Note over Orchestrator,PrefetchWorker: Predict likely follow-ups
    Orchestrator->>PrefetchWorker: Pre-fetch for predicted intents

    par Pre-fetch product details
        PrefetchWorker->>Services: GET /products/{asin1}
        Services-->>PrefetchWorker: Product details
        PrefetchWorker->>Cache: Cache product:{asin1}
    and Pre-fetch reviews
        PrefetchWorker->>Services: GET /reviews/{asin1}
        Services-->>PrefetchWorker: Review summary
        PrefetchWorker->>Cache: Cache review:{asin1}
    and Pre-compute recommendations
        PrefetchWorker->>Services: GET /reco?seed={asin1}
        Services-->>PrefetchWorker: Similar items
        PrefetchWorker->>Cache: Cache reco:{user}:{asin1}
    end

    User->>Orchestrator: "Tell me more about the first one"
    Note over Orchestrator: Cache hit! Context already loaded
    Orchestrator-->>User: [Fast response - context pre-fetched]

Code Example: Speculative Pre-fetcher

import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class PredictedFollowUp:
    intent: str
    probability: float
    prefetch_keys: list[str]
    context_params: dict


# Transition probabilities based on historical data
INTENT_TRANSITIONS = {
    "product_discovery": [
        PredictedFollowUp("product_question", 0.45, ["product_details", "reviews"], {}),
        PredictedFollowUp("recommendation", 0.30, ["similar_items"], {}),
        PredictedFollowUp("checkout_help", 0.10, ["cart_context"], {}),
    ],
    "recommendation": [
        PredictedFollowUp("product_question", 0.50, ["product_details", "reviews"], {}),
        PredictedFollowUp("product_discovery", 0.20, ["trending_items"], {}),
        PredictedFollowUp("checkout_help", 0.15, ["cart_context"], {}),
    ],
    "product_question": [
        PredictedFollowUp("checkout_help", 0.30, ["cart_context", "promotions"], {}),
        PredictedFollowUp("recommendation", 0.25, ["similar_items"], {}),
        PredictedFollowUp("product_question", 0.20, ["product_details"], {}),
    ],
}

PREFETCH_PROBABILITY_THRESHOLD = 0.25


class SpeculativePrefetcher:
    """Pre-fetches context for predicted follow-up intents."""

    def __init__(self, cache_client, product_service, reco_service, promo_service):
        self.cache = cache_client
        self.product_service = product_service
        self.reco_service = reco_service
        self.promo_service = promo_service

    async def prefetch_for_predicted_intents(
        self,
        current_intent: str,
        mentioned_asins: list[str],
        user_id: Optional[str],
    ) -> None:
        """Fire-and-forget pre-fetch for likely follow-up intents."""
        predictions = INTENT_TRANSITIONS.get(current_intent, [])
        high_probability = [
            p for p in predictions if p.probability >= PREFETCH_PROBABILITY_THRESHOLD
        ]

        tasks = []
        for prediction in high_probability:
            for key_type in prediction.prefetch_keys:
                tasks.extend(
                    self._create_prefetch_tasks(key_type, mentioned_asins, user_id)
                )

        if tasks:
            try:
                # Run all pre-fetches concurrently with a timeout; a slow or
                # failed pre-fetch must never surface to the caller.
                await asyncio.wait_for(
                    asyncio.gather(*tasks, return_exceptions=True),
                    timeout=2.0,
                )
            except asyncio.TimeoutError:
                pass

    def _create_prefetch_tasks(
        self, key_type: str, asins: list[str], user_id: Optional[str]
    ) -> list[asyncio.Task]:
        """Create async tasks for a specific prefetch type."""
        tasks = []

        if key_type == "product_details":
            for asin in asins[:3]:  # Limit to top 3 ASINs
                tasks.append(asyncio.create_task(self._prefetch_product(asin)))

        elif key_type == "reviews":
            for asin in asins[:3]:
                tasks.append(asyncio.create_task(self._prefetch_reviews(asin)))

        elif key_type == "similar_items" and asins:
            tasks.append(
                asyncio.create_task(
                    self._prefetch_recommendations(asins[0], user_id)
                )
            )

        elif key_type == "promotions":
            tasks.append(asyncio.create_task(self._prefetch_promotions()))

        return tasks

    async def _prefetch_product(self, asin: str) -> None:
        """Pre-fetch and cache product details."""
        cached = await self.cache.get(f"product:{asin}")
        if cached:
            return
        product = await self.product_service.get_product(asin)
        if product:
            await self.cache.set(f"product:{asin}", product, ttl=300)

    async def _prefetch_reviews(self, asin: str) -> None:
        """Pre-fetch and cache review summary."""
        cached = await self.cache.get(f"review:{asin}")
        if cached:
            return
        reviews = await self.product_service.get_review_summary(asin)
        if reviews:
            await self.cache.set(f"review:{asin}", reviews, ttl=3600)

    async def _prefetch_recommendations(
        self, seed_asin: str, user_id: Optional[str]
    ) -> None:
        """Pre-fetch and cache recommendations."""
        cache_key = f"reco:{user_id or 'anon'}:{seed_asin}"
        cached = await self.cache.get(cache_key)
        if cached:
            return
        recs = await self.reco_service.get_recommendations(seed_asin, user_id)
        if recs:
            await self.cache.set(cache_key, recs, ttl=900)

    async def _prefetch_promotions(self) -> None:
        """Pre-fetch and cache current promotions."""
        cached = await self.cache.get("promo:manga-home")
        if cached:
            return
        promos = await self.promo_service.get_active_promotions("manga-home")
        if promos:
            await self.cache.set("promo:manga-home", promos, ttl=900)
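
A sketch of how the orchestrator might schedule pre-fetching once the streamed response has completed, keeping it entirely off the critical path; the service clients passed to SpeculativePrefetcher are assumed to be constructed elsewhere.

# Keep references to background tasks so they are not garbage-collected mid-flight.
_background_tasks: set[asyncio.Task] = set()


def schedule_prefetch(
    prefetcher: SpeculativePrefetcher,
    current_intent: str,
    mentioned_asins: list[str],
    user_id: Optional[str],
) -> None:
    """Kick off speculative pre-fetching without blocking the response path."""
    task = asyncio.create_task(
        prefetcher.prefetch_for_predicted_intents(current_intent, mentioned_asins, user_id)
    )
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)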

5. Parallel Context Assembly

Instead of sequentially loading memory, running RAG, and fetching service data, the orchestrator runs all independent context-gathering operations in parallel.

graph LR
    subgraph "Sequential (Before)"
        A1[Load Memory<br>50ms] --> A2[Classify Intent<br>50ms]
        A2 --> A3[RAG Retrieval<br>200ms]
        A3 --> A4[Service Call<br>100ms]
        A4 --> A5[Build Prompt<br>10ms]
    end

    subgraph "Parallel (After)"
        B1[Load Memory] --> B2[Classify Intent]
        B2 --> B3a[RAG Retrieval]
        B2 --> B3b[Service Call]
        B2 --> B3c[Cache Check]
        B3a --> B4[Build Prompt]
        B3b --> B4
        B3c --> B4
    end

    style A1 fill:#f66,stroke:#333
    style A2 fill:#f66,stroke:#333
    style A3 fill:#f66,stroke:#333
    style A4 fill:#f66,stroke:#333

    style B3a fill:#2d8,stroke:#333
    style B3b fill:#2d8,stroke:#333
    style B3c fill:#2d8,stroke:#333

Code Example: Parallel Context Assembler

import asyncio
import time
from dataclasses import dataclass, field


@dataclass
class AssembledContext:
    """All context needed for prompt construction, assembled in parallel."""
    memory_turns: list[dict] = field(default_factory=list)
    rag_chunks: list[dict] = field(default_factory=list)
    service_data: dict = field(default_factory=dict)
    cached_data: dict = field(default_factory=dict)
    assembly_time_ms: float = 0.0
    errors: list[str] = field(default_factory=list)


class ParallelContextAssembler:
    """Assembles all prompt context in parallel to minimize latency."""

    def __init__(self, memory_client, rag_client, service_router, cache_client):
        self.memory = memory_client
        self.rag = rag_client
        self.services = service_router
        self.cache = cache_client

    async def assemble(
        self,
        session_id: str,
        intent: str,
        user_message: str,
        entities: dict,
    ) -> AssembledContext:
        start = time.monotonic()
        context = AssembledContext()

        # Fan out all independent operations
        results = await asyncio.gather(
            self._load_memory(session_id),
            self._retrieve_rag_chunks(user_message, intent),
            self._fetch_service_data(intent, entities),
            self._check_cache(intent, entities),
            return_exceptions=True,
        )

        # Collect results, tolerating partial failures
        if not isinstance(results[0], BaseException):
            context.memory_turns = results[0]
        else:
            context.errors.append(f"memory: {results[0]}")

        if not isinstance(results[1], BaseException):
            context.rag_chunks = results[1]
        else:
            context.errors.append(f"rag: {results[1]}")

        if not isinstance(results[2], BaseException):
            context.service_data = results[2]
        else:
            context.errors.append(f"service: {results[2]}")

        if not isinstance(results[3], BaseException):
            context.cached_data = results[3]
        else:
            context.errors.append(f"cache: {results[3]}")

        context.assembly_time_ms = (time.monotonic() - start) * 1000
        return context

    async def _load_memory(self, session_id: str) -> list[dict]:
        """Load conversation turns from DynamoDB."""
        return await self.memory.get_recent_turns(session_id, limit=10)

    async def _retrieve_rag_chunks(self, query: str, intent: str) -> list[dict]:
        """Retrieve relevant chunks from OpenSearch."""
        return await self.rag.retrieve(query=query, intent_filter=intent, top_k=5)

    async def _fetch_service_data(self, intent: str, entities: dict) -> dict:
        """Fetch data from the appropriate service based on intent."""
        return await self.services.fetch(intent=intent, entities=entities)

    async def _check_cache(self, intent: str, entities: dict) -> dict:
        """Check cache for pre-fetched data, fetching all keys concurrently."""
        cache_keys = self._resolve_cache_keys(intent, entities)
        if not cache_keys:
            return {}
        values = await asyncio.gather(*(self.cache.get(key) for key in cache_keys))
        return {key: value for key, value in zip(cache_keys, values) if value is not None}

    def _resolve_cache_keys(self, intent: str, entities: dict) -> list[str]:
        """Determine which cache keys to check based on intent."""
        keys = []
        asin = entities.get("asin") or entities.get("current_asin")
        if asin:
            keys.append(f"product:{asin}")
            keys.append(f"review:{asin}")
        if intent in ("promotion", "product_discovery"):
            keys.append("promo:manga-home")
        return keys
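
Putting the pieces together, a hedged end-to-end sketch of the critical path: assemble context in parallel, compress the prompt, then stream. The compressor, streaming_client, the service clients, build_prompt(), and SYSTEM_PROMPT are assumed to be constructed elsewhere.

async def answer(
    session_id: str,
    intent: str,
    user_message: str,
    entities: dict,
    page_context: dict,
):
    """Critical-path sketch: parallel assembly -> compression -> streaming."""
    assembler = ParallelContextAssembler(memory_client, rag_client, service_router, cache_client)
    context = await assembler.assemble(session_id, intent, user_message, entities)

    compressed = compressor.compress(
        system_prompt=SYSTEM_PROMPT,
        rag_chunks=context.rag_chunks,
        conversation_turns=context.memory_turns,
        page_context=page_context,
        user_message=user_message,
        intent=intent,
    )

    # build_prompt() is an assumed helper that joins the compressed sections.
    async for chunk in streaming_client.stream_response(
        prompt=build_prompt(compressed),
        system_prompt=compressed.system_prompt,
    ):
        yield chunk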

Metrics and Monitoring

Key Latency Metrics

| Metric | Target | Alarm Threshold | Resolution |
| --- | --- | --- | --- |
| bedrock.first_token_latency_ms | p95 < 400ms | p95 > 500ms for 5 min | Switch to provisioned throughput |
| bedrock.total_latency_ms | p95 < 1500ms | p95 > 2000ms for 5 min | Reduce max_tokens, compress prompt |
| prompt.total_tokens | p50 < 1200 | p95 > 2500 | Review compression pipeline |
| prompt.compression_ratio | < 0.65 | > 0.85 | Tune summarization window |
| context.assembly_time_ms | p95 < 200ms | p95 > 350ms | Check downstream service latency |
| prefetch.hit_rate | > 30% | < 15% | Retrain transition probabilities |
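
One way to publish these metrics from the orchestrator is a direct put_metric_data call after each response completes, as sketched below; the MangaAssist/LLM namespace is an assumption, and the metric names mirror the table above.

import boto3

cloudwatch = boto3.client("cloudwatch")


def emit_latency_metrics(metrics: StreamMetrics, prompt: CompressedPrompt, assembly_ms: float) -> None:
    """Publish the latency metrics from the table above (namespace is an assumption)."""
    cloudwatch.put_metric_data(
        Namespace="MangaAssist/LLM",
        MetricData=[
            {"MetricName": "bedrock.first_token_latency_ms", "Value": metrics.first_token_ms, "Unit": "Milliseconds"},
            {"MetricName": "bedrock.total_latency_ms", "Value": metrics.total_ms, "Unit": "Milliseconds"},
            {"MetricName": "prompt.total_tokens", "Value": float(prompt.total_tokens), "Unit": "Count"},
            {"MetricName": "prompt.compression_ratio", "Value": prompt.compression_ratio, "Unit": "None"},
            {"MetricName": "context.assembly_time_ms", "Value": assembly_ms, "Unit": "Milliseconds"},
        ],
    )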

Monitoring Dashboard

graph TD
    subgraph "CloudWatch Metrics"
        A[First Token Latency<br>p50 / p95 / p99]
        B[Total Generation Time<br>by model tier]
        C[Prompt Token Count<br>distribution]
        D[Compression Ratio<br>by intent]
        E[Prefetch Hit Rate<br>by intent transition]
    end

    subgraph "Alarms"
        A --> F{p95 > 500ms?}
        F -->|Yes| G[Page on-call +<br>auto-enable provisioned]
        C --> H{p95 > 2500 tokens?}
        H -->|Yes| I[Alert: prompt bloat]
    end