PO-01: LLM Response Latency Optimization
User Story
As a product manager for MangaAssist, I want to minimize the time between a user sending a message and seeing the first token of the LLM response, so that the chatbot feels responsive and conversational, keeping users engaged rather than abandoning the chat.
Acceptance Criteria
- First-token latency (time to first byte from Bedrock) is under 400ms at p95.
- Full response streaming completes within 1.5 seconds for standard queries (< 500 output tokens).
- Prompt token count stays under 2,000 tokens for 80% of requests through compression.
- Warm-pool provisioned throughput maintains latency under load spikes.
- Speculative pre-generation reduces perceived latency for predictable follow-ups by 30%.
High-Level Design
The LLM Latency Problem
LLM inference is the single largest contributor to response latency. A typical Bedrock Claude 3.5 Sonnet call involves:
| Phase | Typical Latency | What Happens |
|---|---|---|
| Network to Bedrock | 5-15ms | HTTPS request from VPC to Bedrock endpoint |
| Input tokenization | 10-30ms | Convert prompt text to tokens |
| Prefill (prompt processing) | 100-400ms | Process all input tokens in parallel |
| First token decode | 20-50ms | Generate first output token |
| Streaming decode | 30-50ms/token | Generate subsequent tokens autoregressively |
| Total first token | ~150-500ms | Dominated by prefill time |
Key insight: Prefill time scales roughly linearly with input token count. Reducing prompt size directly reduces first-token latency.
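As a back-of-envelope illustration of that scaling, the minimal sketch below estimates first-token latency as a linear function of prompt size. The per-token cost and fixed overhead are assumed midpoints of the ranges in the table above, not measured values.
# Assumed figures: ~0.2ms prefill per input token plus ~60ms fixed overhead
# (network + tokenization + first decode step). Illustrative only.
PREFILL_MS_PER_TOKEN = 0.2
FIXED_OVERHEAD_MS = 60.0

def estimate_first_token_ms(prompt_tokens: int) -> float:
    """Rough first-token latency estimate; linear in prompt size."""
    return FIXED_OVERHEAD_MS + prompt_tokens * PREFILL_MS_PER_TOKEN

print(estimate_first_token_ms(1650))  # ~390ms for a raw 1,650-token prompt
print(estimate_first_token_ms(900))   # ~240ms after compression to ~900 tokens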
Optimization Strategy Overview
graph TD
A[User Message] --> B[Prompt Compression]
B --> C[Streaming Response]
subgraph "Before LLM Call"
B --> B1[Context Window<br>Optimization]
B --> B2[Dynamic Prompt<br>Assembly]
B --> B3[Conversation<br>Summarization]
end
subgraph "During LLM Call"
C --> C1[Bedrock Streaming<br>API]
C --> C2[Provisioned<br>Throughput]
C --> C3[Token-Level<br>Streaming to Client]
end
subgraph "Ahead of LLM Call"
D[Speculative<br>Pre-generation] --> D1[Predict Next<br>Intent]
D1 --> D2[Pre-fetch<br>Context]
D2 --> D3[Pre-warm<br>Prompt]
end
style B1 fill:#2d8,stroke:#333
style C1 fill:#2d8,stroke:#333
style D1 fill:#fd2,stroke:#333
Latency Reduction Targets
| Technique | Latency Reduction | Applies To |
|---|---|---|
| Prompt compression (smaller context) | 30-50% prefill time | All LLM calls |
| Streaming (first token delivery) | 60-80% perceived wait | All LLM calls |
| Provisioned throughput | Eliminates cold start variance | Peak traffic hours |
| Speculative pre-generation | 30-50% for follow-up queries | Multi-turn conversations |
| Parallel context assembly | 100-200ms off critical path | All LLM calls |
Low-Level Design
1. Prompt Compression Pipeline
The prompt is the largest controllable factor in prefill latency. Every unnecessary token adds ~0.1-0.3ms of prefill time.
graph LR
subgraph "Raw Context"
A[System Prompt<br>~300 tokens]
B[RAG Chunks<br>~600 tokens]
C[Conversation History<br>~500 tokens]
D[Page Context<br>~200 tokens]
E[User Message<br>~50 tokens]
end
subgraph "Compression"
A --> F[Static Template<br>Minification]
B --> G[Chunk Pruning<br>+ Truncation]
C --> H[Turn Summarization<br>+ Recency Window]
D --> I[Context Filtering<br>by Intent]
end
subgraph "Compressed Prompt"
F --> J[~200 tokens]
G --> K[~350 tokens]
H --> L[~200 tokens]
I --> M[~100 tokens]
E --> N[~50 tokens]
end
J --> O[Total: ~900 tokens<br>vs 1650 raw]
K --> O
L --> O
M --> O
N --> O
Code Example: Prompt Compressor
from dataclasses import dataclass
from typing import Optional
@dataclass
class PromptBudget:
"""Token budget allocation for each prompt section."""
system: int = 200
rag_chunks: int = 350
conversation: int = 200
page_context: int = 100
user_message: int = 100
total_limit: int = 2000
@dataclass
class CompressedPrompt:
system_prompt: str
rag_context: str
conversation_context: str
page_context: str
user_message: str
total_tokens: int
compression_ratio: float
class PromptCompressor:
"""Compresses prompt components to fit within a token budget."""
def __init__(self, tokenizer, budget: Optional[PromptBudget] = None):
self.tokenizer = tokenizer
self.budget = budget or PromptBudget()
def compress(
self,
system_prompt: str,
rag_chunks: list[dict],
conversation_turns: list[dict],
page_context: dict,
user_message: str,
intent: str,
) -> CompressedPrompt:
raw_total = self._count_tokens(
system_prompt
+ str(rag_chunks)
+ str(conversation_turns)
+ str(page_context)
+ user_message
)
compressed_system = self._compress_system_prompt(system_prompt)
compressed_rag = self._compress_rag_chunks(rag_chunks, intent)
compressed_conv = self._compress_conversation(conversation_turns)
compressed_page = self._compress_page_context(page_context, intent)
total = (
self._count_tokens(compressed_system)
+ self._count_tokens(compressed_rag)
+ self._count_tokens(compressed_conv)
+ self._count_tokens(compressed_page)
+ self._count_tokens(user_message)
)
return CompressedPrompt(
system_prompt=compressed_system,
rag_context=compressed_rag,
conversation_context=compressed_conv,
page_context=compressed_page,
user_message=user_message,
total_tokens=total,
compression_ratio=total / raw_total if raw_total > 0 else 1.0,
)
def _compress_system_prompt(self, prompt: str) -> str:
"""Remove redundant whitespace and optional instructions based on context."""
lines = [line.strip() for line in prompt.strip().splitlines() if line.strip()]
return "\n".join(lines)
def _compress_rag_chunks(self, chunks: list[dict], intent: str) -> str:
"""Select and truncate RAG chunks to fit budget."""
if not chunks:
return ""
# Sort by relevance score descending
sorted_chunks = sorted(chunks, key=lambda c: c.get("score", 0), reverse=True)
compressed_parts = []
remaining_tokens = self.budget.rag_chunks
for chunk in sorted_chunks:
content = chunk.get("content", "")
chunk_tokens = self._count_tokens(content)
if chunk_tokens <= remaining_tokens:
compressed_parts.append(content)
remaining_tokens -= chunk_tokens
elif remaining_tokens > 50:
# Truncate the chunk to fit
truncated = self._truncate_to_tokens(content, remaining_tokens)
compressed_parts.append(truncated)
break
else:
break
return "\n---\n".join(compressed_parts)
def _compress_conversation(self, turns: list[dict]) -> str:
"""Keep recent turns verbatim, summarize older turns."""
if not turns:
return ""
# Keep last 4 turns (2 user + 2 assistant) verbatim
recent_turns = turns[-4:]
older_turns = turns[:-4]
parts = []
# Summarize older turns if they exist
if older_turns:
# Use pre-computed summaries from DynamoDB SUMMARY items
summary = older_turns[0].get("summary")
if summary:
parts.append(f"[Earlier: {summary}]")
else:
# Fallback: compress by keeping only user messages
for turn in older_turns:
if turn.get("role") == "user":
parts.append(f"User asked about: {turn['content'][:80]}")
# Add recent turns verbatim
for turn in recent_turns:
role = turn.get("role", "user")
content = turn.get("content", "")
parts.append(f"{role}: {content}")
result = "\n".join(parts)
return self._truncate_to_tokens(result, self.budget.conversation)
def _compress_page_context(self, page_context: dict, intent: str) -> str:
"""Include only intent-relevant page context fields."""
if not page_context:
return ""
# Intent-based field selection
relevant_fields = {
"product_question": ["current_asin", "store_section"],
"recommendation": ["browsing_history", "store_section"],
"product_discovery": ["store_section", "browsing_history"],
"order_tracking": [], # Page context irrelevant
"return_request": ["current_asin"],
"faq": ["store_section"],
"promotion": ["store_section", "cart_asins"],
"checkout_help": ["cart_asins"],
}
fields_to_include = relevant_fields.get(intent, list(page_context.keys()))
filtered = {k: v for k, v in page_context.items() if k in fields_to_include and v}
if not filtered:
return ""
return str(filtered)
def _count_tokens(self, text: str) -> int:
"""Count tokens using the configured tokenizer."""
return len(self.tokenizer.encode(text))
def _truncate_to_tokens(self, text: str, max_tokens: int) -> str:
"""Truncate text to fit within a token limit."""
tokens = self.tokenizer.encode(text)
if len(tokens) <= max_tokens:
return text
return self.tokenizer.decode(tokens[:max_tokens])
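A usage sketch for the compressor, assuming a tiktoken encoding as a stand-in tokenizer (any object exposing encode()/decode() works; Claude's actual tokenizer differs) and a hypothetical SYSTEM_PROMPT constant:
import tiktoken  # stand-in tokenizer; assumed dependency for this sketch

compressor = PromptCompressor(tokenizer=tiktoken.get_encoding("cl100k_base"))
compressed = compressor.compress(
    system_prompt=SYSTEM_PROMPT,  # hypothetical module-level constant
    rag_chunks=[{"content": "Berserk is a dark-fantasy manga...", "score": 0.92}],
    conversation_turns=[
        {"role": "user", "content": "Any new horror manga?"},
        {"role": "assistant", "content": "Here are three picks..."},
    ],
    page_context={"store_section": "horror", "current_asin": "B0EXAMPLE1"},
    user_message="Tell me more about the first one",
    intent="product_question",
)
print(compressed.total_tokens, f"compression={compressed.compression_ratio:.2f}")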
2. Streaming Response Pipeline
Streaming delivers tokens to the user as they are generated, reducing perceived latency from total generation time to first-token time.
sequenceDiagram
participant Client
participant WebSocket as WebSocket Handler
participant Orchestrator
participant Bedrock
Client->>WebSocket: Send message
WebSocket->>Orchestrator: Forward request
Note over Orchestrator: Context assembly (parallel)
Orchestrator->>Bedrock: InvokeModelWithResponseStream
loop Token by token
Bedrock-->>Orchestrator: StreamChunk {token}
Orchestrator-->>WebSocket: Delta event {token}
WebSocket-->>Client: Display token
end
Bedrock-->>Orchestrator: StreamComplete
Orchestrator->>Orchestrator: Run guardrails on full response
Orchestrator-->>WebSocket: Completed event + metadata
WebSocket-->>Client: Final response + products
Code Example: Bedrock Streaming Client
import asyncio
import json
import time
from dataclasses import dataclass
from typing import AsyncIterator
import boto3
@dataclass
class StreamChunk:
token: str
index: int
latency_ms: float
is_final: bool = False
@dataclass
class StreamMetrics:
first_token_ms: float
total_ms: float
tokens_generated: int
tokens_per_second: float
class BedrockStreamingClient:
"""Low-latency streaming client for Bedrock Claude models."""
def __init__(
self,
model_id: str = "anthropic.claude-3-5-sonnet-20241022-v2:0",
region: str = "us-east-1",
max_tokens: int = 512,
):
self.bedrock = boto3.client("bedrock-runtime", region_name=region)
self.model_id = model_id
self.max_tokens = max_tokens
async def stream_response(
self,
prompt: str,
system_prompt: str,
temperature: float = 0.3,
) -> AsyncIterator[StreamChunk]:
"""Stream tokens from Bedrock with latency tracking."""
start_time = time.monotonic()
first_token_time = None
token_index = 0
body = json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": self.max_tokens,
"temperature": temperature,
"system": system_prompt,
"messages": [{"role": "user", "content": prompt}],
})
# Use invoke_model_with_response_stream for token-level streaming
response = await asyncio.to_thread(
self.bedrock.invoke_model_with_response_stream,
modelId=self.model_id,
body=body,
contentType="application/json",
)
stream = response.get("body")
if stream is None:
return
        # NOTE: botocore's EventStream iteration is synchronous; under high
        # concurrency, run this loop in a worker thread so it does not block
        # the event loop.
        for event in stream:
chunk = event.get("chunk")
if chunk is None:
continue
data = json.loads(chunk["bytes"].decode("utf-8"))
delta = data.get("delta", {})
if delta.get("type") == "text_delta":
now = time.monotonic()
if first_token_time is None:
first_token_time = now
yield StreamChunk(
token=delta["text"],
index=token_index,
latency_ms=(now - start_time) * 1000,
)
token_index += 1
# Handle stop reason
if data.get("type") == "message_delta":
stop_reason = data.get("delta", {}).get("stop_reason")
if stop_reason:
elapsed = (time.monotonic() - start_time) * 1000
yield StreamChunk(
token="",
index=token_index,
latency_ms=elapsed,
is_final=True,
)
def get_metrics(
self, first_token_ms: float, total_ms: float, token_count: int
) -> StreamMetrics:
return StreamMetrics(
first_token_ms=first_token_ms,
total_ms=total_ms,
tokens_generated=token_count,
tokens_per_second=token_count / (total_ms / 1000) if total_ms > 0 else 0,
)
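A consumption sketch showing how a WebSocket handler might drain the stream and derive StreamMetrics from the chunk timestamps; send_to_client is a hypothetical function standing in for the WebSocket delta event from the sequence diagram above.
async def relay_stream(
    client: BedrockStreamingClient, prompt: str, system: str
) -> StreamMetrics:
    """Forward tokens to the user as they arrive, then report latency metrics."""
    first_token_ms = 0.0
    total_ms = 0.0
    count = 0
    async for chunk in client.stream_response(prompt, system):
        if chunk.is_final:
            total_ms = chunk.latency_ms
            break
        if count == 0:
            first_token_ms = chunk.latency_ms
        count += 1
        await send_to_client(chunk.token)  # hypothetical WebSocket send
    return client.get_metrics(first_token_ms, total_ms, count)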
3. Provisioned Throughput for Consistent Latency
On-demand Bedrock inference can have variable latency due to cold starts and shared capacity. Provisioned throughput reserves dedicated capacity.
graph TD
subgraph "Traffic Pattern"
A[Peak Hours<br>9AM - 11PM JST] --> B[Provisioned Throughput<br>Reserved model units]
C[Off-Peak Hours<br>11PM - 9AM JST] --> D[On-Demand<br>Shared capacity]
end
subgraph "Provisioned Throughput Benefits"
B --> E[Consistent p95 latency]
B --> F[No cold start penalty]
B --> G[Guaranteed capacity<br>during traffic spikes]
end
subgraph "Scheduling"
H[CloudWatch Alarm<br>Traffic > threshold] --> I[Scale up provisioned units]
J[Scheduled Rule<br>Cron: 9AM JST daily] --> I
K[Scheduled Rule<br>Cron: 11PM JST daily] --> L[Scale down to on-demand]
end
style B fill:#2d8,stroke:#333
style D fill:#fd2,stroke:#333
Code Example: Provisioned Throughput Manager
import boto3
from datetime import datetime, timezone, timedelta
JST = timezone(timedelta(hours=9))
class ProvisionedThroughputManager:
"""Manages Bedrock provisioned throughput based on traffic patterns."""
def __init__(
self,
model_id: str = "anthropic.claude-3-5-sonnet-20241022-v2:0",
region: str = "us-east-1",
):
self.bedrock = boto3.client("bedrock", region_name=region)
self.model_id = model_id
self._provisioned_arn: str | None = None
def is_peak_hours(self) -> bool:
"""Check if current time is within peak hours (9AM-11PM JST)."""
now_jst = datetime.now(JST)
return 9 <= now_jst.hour < 23
    def ensure_provisioned_throughput(
        self,
        model_units: int = 1,
        commitment: str | None = None,
    ) -> str:
        """Create or return existing provisioned throughput ARN.

        `commitment` may be "OneMonth" or "SixMonths". For no-commitment
        (hourly-billed) capacity, leave it as None: the Bedrock API requires
        the commitmentDuration parameter to be omitted entirely.
        """
        if self._provisioned_arn:
            return self._provisioned_arn
        kwargs: dict = {
            "modelUnits": model_units,
            "provisionedModelName": f"mangaassist-{self.model_id.split('.')[-1][:20]}",
            "modelId": self.model_id,
        }
        if commitment:
            kwargs["commitmentDuration"] = commitment
        response = self.bedrock.create_provisioned_model_throughput(**kwargs)
        self._provisioned_arn = response["provisionedModelArn"]
        return self._provisioned_arn
def get_model_id_for_current_load(self) -> str:
"""Return provisioned ARN during peak, on-demand model ID otherwise."""
if self.is_peak_hours() and self._provisioned_arn:
return self._provisioned_arn
return self.model_id
def release_provisioned_throughput(self) -> None:
"""Release provisioned throughput during off-peak to save costs."""
if self._provisioned_arn:
self.bedrock.delete_provisioned_model_throughput(
provisionedModelId=self._provisioned_arn
)
self._provisioned_arn = None
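A sketch of the scheduled Lambda implied by the diagram above, assuming two EventBridge rules (9AM and 11PM JST crons) invoke it with {"action": "scale_up"} or {"action": "scale_down"}. In practice the provisioned ARN should be persisted (e.g. in SSM Parameter Store) rather than cached on a warm Lambda container, since containers are recycled between invocations.
manager = ProvisionedThroughputManager()

def lambda_handler(event: dict, context) -> dict:
    # Assumed payload shape: {"action": "scale_up" | "scale_down"}.
    action = event.get("action")
    if action == "scale_up":
        arn = manager.ensure_provisioned_throughput(model_units=1)
        return {"status": "provisioned", "arn": arn}
    if action == "scale_down":
        manager.release_provisioned_throughput()
        return {"status": "on-demand"}
    return {"status": "noop"}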
4. Speculative Pre-Generation
For multi-turn conversations, the orchestrator can predict the likely next intent and pre-fetch context before the user sends their next message.
sequenceDiagram
participant User
participant Orchestrator
participant PrefetchWorker
participant Cache
participant Services
User->>Orchestrator: "Show me horror manga"
Orchestrator-->>User: [Streaming response with 3 recommendations]
Note over Orchestrator,PrefetchWorker: Predict likely follow-ups
Orchestrator->>PrefetchWorker: Pre-fetch for predicted intents
par Pre-fetch product details
PrefetchWorker->>Services: GET /products/{asin1}
Services-->>PrefetchWorker: Product details
PrefetchWorker->>Cache: Cache product:{asin1}
and Pre-fetch reviews
PrefetchWorker->>Services: GET /reviews/{asin1}
Services-->>PrefetchWorker: Review summary
PrefetchWorker->>Cache: Cache review:{asin1}
and Pre-compute recommendations
PrefetchWorker->>Services: GET /reco?seed={asin1}
Services-->>PrefetchWorker: Similar items
PrefetchWorker->>Cache: Cache reco:{user}:{asin1}
end
User->>Orchestrator: "Tell me more about the first one"
Note over Orchestrator: Cache hit! Context already loaded
Orchestrator-->>User: [Fast response - context pre-fetched]
Code Example: Speculative Pre-fetcher
import asyncio
from dataclasses import dataclass
from typing import Optional
@dataclass
class PredictedFollowUp:
intent: str
probability: float
prefetch_keys: list[str]
context_params: dict
# Transition probabilities based on historical data
INTENT_TRANSITIONS = {
"product_discovery": [
PredictedFollowUp("product_question", 0.45, ["product_details", "reviews"], {}),
PredictedFollowUp("recommendation", 0.30, ["similar_items"], {}),
PredictedFollowUp("checkout_help", 0.10, ["cart_context"], {}),
],
"recommendation": [
PredictedFollowUp("product_question", 0.50, ["product_details", "reviews"], {}),
PredictedFollowUp("product_discovery", 0.20, ["trending_items"], {}),
PredictedFollowUp("checkout_help", 0.15, ["cart_context"], {}),
],
"product_question": [
PredictedFollowUp("checkout_help", 0.30, ["cart_context", "promotions"], {}),
PredictedFollowUp("recommendation", 0.25, ["similar_items"], {}),
PredictedFollowUp("product_question", 0.20, ["product_details"], {}),
],
}
PREFETCH_PROBABILITY_THRESHOLD = 0.25
class SpeculativePrefetcher:
"""Pre-fetches context for predicted follow-up intents."""
def __init__(self, cache_client, product_service, reco_service, promo_service):
self.cache = cache_client
self.product_service = product_service
self.reco_service = reco_service
self.promo_service = promo_service
async def prefetch_for_predicted_intents(
self,
current_intent: str,
mentioned_asins: list[str],
user_id: Optional[str],
) -> None:
"""Fire-and-forget pre-fetch for likely follow-up intents."""
predictions = INTENT_TRANSITIONS.get(current_intent, [])
high_probability = [
p for p in predictions if p.probability >= PREFETCH_PROBABILITY_THRESHOLD
]
tasks = []
for prediction in high_probability:
for key_type in prediction.prefetch_keys:
tasks.extend(
self._create_prefetch_tasks(key_type, mentioned_asins, user_id)
)
        if tasks:
            # Run all pre-fetches concurrently with a timeout; suppress the
            # timeout so speculative work never surfaces as a user-facing error.
            try:
                await asyncio.wait_for(
                    asyncio.gather(*tasks, return_exceptions=True),
                    timeout=2.0,
                )
            except asyncio.TimeoutError:
                pass
def _create_prefetch_tasks(
self, key_type: str, asins: list[str], user_id: Optional[str]
) -> list[asyncio.Task]:
"""Create async tasks for a specific prefetch type."""
tasks = []
if key_type == "product_details":
for asin in asins[:3]: # Limit to top 3 ASINs
tasks.append(asyncio.create_task(self._prefetch_product(asin)))
elif key_type == "reviews":
for asin in asins[:3]:
tasks.append(asyncio.create_task(self._prefetch_reviews(asin)))
elif key_type == "similar_items" and asins:
tasks.append(
asyncio.create_task(
self._prefetch_recommendations(asins[0], user_id)
)
)
elif key_type == "promotions":
tasks.append(asyncio.create_task(self._prefetch_promotions()))
return tasks
async def _prefetch_product(self, asin: str) -> None:
"""Pre-fetch and cache product details."""
cached = await self.cache.get(f"product:{asin}")
if cached:
return
product = await self.product_service.get_product(asin)
if product:
await self.cache.set(f"product:{asin}", product, ttl=300)
async def _prefetch_reviews(self, asin: str) -> None:
"""Pre-fetch and cache review summary."""
cached = await self.cache.get(f"review:{asin}")
if cached:
return
reviews = await self.product_service.get_review_summary(asin)
if reviews:
await self.cache.set(f"review:{asin}", reviews, ttl=3600)
async def _prefetch_recommendations(
self, seed_asin: str, user_id: Optional[str]
) -> None:
"""Pre-fetch and cache recommendations."""
cache_key = f"reco:{user_id or 'anon'}:{seed_asin}"
cached = await self.cache.get(cache_key)
if cached:
return
recs = await self.reco_service.get_recommendations(seed_asin, user_id)
if recs:
await self.cache.set(cache_key, recs, ttl=900)
async def _prefetch_promotions(self) -> None:
"""Pre-fetch and cache current promotions."""
cached = await self.cache.get("promo:manga-home")
if cached:
return
promos = await self.promo_service.get_active_promotions("manga-home")
if promos:
await self.cache.set("promo:manga-home", promos, ttl=900)
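A wiring sketch for triggering the prefetcher once a response finishes streaming, without blocking the user-facing path. The task-set bookkeeping keeps un-awaited tasks from being garbage collected mid-flight.
import asyncio

_background_tasks: set[asyncio.Task] = set()

def schedule_prefetch(
    prefetcher: SpeculativePrefetcher,
    intent: str,
    asins: list[str],
    user_id: str | None,
) -> None:
    """Fire-and-forget: the user's reply is never delayed by speculative work."""
    task = asyncio.create_task(
        prefetcher.prefetch_for_predicted_intents(intent, asins, user_id)
    )
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)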
5. Parallel Context Assembly
Instead of sequentially loading memory, running RAG, and fetching service data, the orchestrator runs all independent context-gathering operations in parallel.
graph LR
subgraph "Sequential (Before)"
A1[Load Memory<br>50ms] --> A2[Classify Intent<br>50ms]
A2 --> A3[RAG Retrieval<br>200ms]
A3 --> A4[Service Call<br>100ms]
A4 --> A5[Build Prompt<br>10ms]
end
subgraph "Parallel (After)"
B1[Load Memory] --> B2[Classify Intent]
B2 --> B3a[RAG Retrieval]
B2 --> B3b[Service Call]
B2 --> B3c[Cache Check]
B3a --> B4[Build Prompt]
B3b --> B4
B3c --> B4
end
style A1 fill:#f66,stroke:#333
style A2 fill:#f66,stroke:#333
style A3 fill:#f66,stroke:#333
style A4 fill:#f66,stroke:#333
style B3a fill:#2d8,stroke:#333
style B3b fill:#2d8,stroke:#333
style B3c fill:#2d8,stroke:#333
Code Example: Parallel Context Assembler
import asyncio
import time
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class AssembledContext:
"""All context needed for prompt construction, assembled in parallel."""
memory_turns: list[dict] = field(default_factory=list)
rag_chunks: list[dict] = field(default_factory=list)
service_data: dict = field(default_factory=dict)
cached_data: dict = field(default_factory=dict)
assembly_time_ms: float = 0.0
errors: list[str] = field(default_factory=list)
class ParallelContextAssembler:
"""Assembles all prompt context in parallel to minimize latency."""
def __init__(self, memory_client, rag_client, service_router, cache_client):
self.memory = memory_client
self.rag = rag_client
self.services = service_router
self.cache = cache_client
async def assemble(
self,
session_id: str,
intent: str,
user_message: str,
entities: dict,
) -> AssembledContext:
start = time.monotonic()
context = AssembledContext()
# Fan out all independent operations
results = await asyncio.gather(
self._load_memory(session_id),
self._retrieve_rag_chunks(user_message, intent),
self._fetch_service_data(intent, entities),
self._check_cache(intent, entities),
return_exceptions=True,
)
# Collect results, tolerating partial failures
if not isinstance(results[0], BaseException):
context.memory_turns = results[0]
else:
context.errors.append(f"memory: {results[0]}")
if not isinstance(results[1], BaseException):
context.rag_chunks = results[1]
else:
context.errors.append(f"rag: {results[1]}")
if not isinstance(results[2], BaseException):
context.service_data = results[2]
else:
context.errors.append(f"service: {results[2]}")
if not isinstance(results[3], BaseException):
context.cached_data = results[3]
else:
context.errors.append(f"cache: {results[3]}")
context.assembly_time_ms = (time.monotonic() - start) * 1000
return context
async def _load_memory(self, session_id: str) -> list[dict]:
"""Load conversation turns from DynamoDB."""
return await self.memory.get_recent_turns(session_id, limit=10)
async def _retrieve_rag_chunks(self, query: str, intent: str) -> list[dict]:
"""Retrieve relevant chunks from OpenSearch."""
return await self.rag.retrieve(query=query, intent_filter=intent, top_k=5)
async def _fetch_service_data(self, intent: str, entities: dict) -> dict:
"""Fetch data from the appropriate service based on intent."""
return await self.services.fetch(intent=intent, entities=entities)
async def _check_cache(self, intent: str, entities: dict) -> dict:
"""Check cache for pre-fetched data."""
cache_keys = self._resolve_cache_keys(intent, entities)
results = {}
for key in cache_keys:
value = await self.cache.get(key)
if value is not None:
results[key] = value
return results
def _resolve_cache_keys(self, intent: str, entities: dict) -> list[str]:
"""Determine which cache keys to check based on intent."""
keys = []
asin = entities.get("asin") or entities.get("current_asin")
if asin:
keys.append(f"product:{asin}")
keys.append(f"review:{asin}")
if intent in ("promotion", "product_discovery"):
keys.append("promo:manga-home")
return keys
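Pulling the pieces together, a minimal end-to-end sketch: assemble context in parallel, compress it into budget, then stream. Construction of the collaborators and the SYSTEM_PROMPT constant are assumed, and the scheme for joining compressed sections into one user prompt is illustrative, not prescribed.
async def answer(
    session_id: str,
    user_message: str,
    intent: str,
    entities: dict,
    assembler: ParallelContextAssembler,
    compressor: PromptCompressor,
    llm: BedrockStreamingClient,
):
    """Parallel assembly -> compression -> streaming, end to end."""
    ctx = await assembler.assemble(session_id, intent, user_message, entities)
    prompt = compressor.compress(
        system_prompt=SYSTEM_PROMPT,  # hypothetical constant
        rag_chunks=ctx.rag_chunks,
        conversation_turns=ctx.memory_turns,
        page_context={**ctx.service_data, **ctx.cached_data},
        user_message=user_message,
        intent=intent,
    )
    # Join the compressed sections into a single user prompt (illustrative).
    user_prompt = "\n\n".join(
        part for part in (
            prompt.rag_context,
            prompt.conversation_context,
            prompt.page_context,
            prompt.user_message,
        ) if part
    )
    async for chunk in llm.stream_response(user_prompt, prompt.system_prompt):
        yield chunk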
Metrics and Monitoring
Key Latency Metrics
| Metric | Target | Alarm Threshold | Resolution |
|---|---|---|---|
| bedrock.first_token_latency_ms | p95 < 400ms | p95 > 500ms for 5 min | Switch to provisioned throughput |
| bedrock.total_latency_ms | p95 < 1500ms | p95 > 2000ms for 5 min | Reduce max_tokens, compress prompt |
| prompt.total_tokens | p50 < 1200 | p95 > 2500 | Review compression pipeline |
| prompt.compression_ratio | < 0.65 | > 0.85 | Tune summarization window |
| context.assembly_time_ms | p95 < 200ms | p95 > 350ms | Check downstream service latency |
| prefetch.hit_rate | > 30% | < 15% | Retrain transition probabilities |
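For the alarms in the table to fire, the orchestrator must publish these metrics per request; a sketch using CloudWatch put_metric_data (the namespace and dimension names are assumptions):
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")

def publish_latency_metrics(
    metrics: StreamMetrics,
    prompt_tokens: int,
    compression_ratio: float,
    intent: str,
) -> None:
    """Emit the per-request latency metrics tracked in the table above."""
    cloudwatch.put_metric_data(
        Namespace="MangaAssist/LLM",  # assumed namespace
        MetricData=[
            {"MetricName": "bedrock.first_token_latency_ms",
             "Value": metrics.first_token_ms, "Unit": "Milliseconds",
             "Dimensions": [{"Name": "Intent", "Value": intent}]},
            {"MetricName": "bedrock.total_latency_ms",
             "Value": metrics.total_ms, "Unit": "Milliseconds"},
            {"MetricName": "prompt.total_tokens",
             "Value": float(prompt_tokens), "Unit": "Count"},
            {"MetricName": "prompt.compression_ratio",
             "Value": compression_ratio, "Unit": "None"},
        ],
    )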
Monitoring Dashboard
graph TD
subgraph "CloudWatch Metrics"
A[First Token Latency<br>p50 / p95 / p99]
B[Total Generation Time<br>by model tier]
C[Prompt Token Count<br>distribution]
D[Compression Ratio<br>by intent]
E[Prefetch Hit Rate<br>by intent transition]
end
subgraph "Alarms"
A --> F{p95 > 500ms?}
F -->|Yes| G[Page on-call +<br>auto-enable provisioned]
C --> H{p95 > 2500 tokens?}
H -->|Yes| I[Alert: prompt bloat]
end