
Responsive AI Architecture for GenAI Applications

AWS AIP-C01 Task 4.2 — Skill 4.2.1: Design responsive AI systems that minimize latency while balancing cost and quality
Context: MangaAssist e-commerce chatbot — Bedrock Claude 3 Sonnet/Haiku, OpenSearch Serverless, DynamoDB, ECS Fargate, API Gateway WebSocket, ElastiCache Redis
North Star: End-to-end response < 3s, first token to screen < 400ms


Skill Mapping

| Certification | Domain | Task | Skill |
|---|---|---|---|
| AWS AIP-C01 | Domain 4 — Operational Efficiency | Task 4.2 — Optimize FM application performance | Skill 4.2.1 — Identify techniques to create responsive AI systems (e.g., pre-computation, latency-optimized model selection, parallel request processing, response streaming, performance benchmarking) |

Skill scope: Architect end-to-end responsive AI pipelines that deliver sub-second perceived latency through pre-computation, parallel orchestration, intelligent model routing, response streaming, and continuous performance benchmarking — all grounded in cost-aware tradeoff analysis.


Mind Map — Responsive AI Dimensions

mindmap
  root((Responsive<br/>AI Systems))
    Pre-Computation
      Trending Manga Responses
        Pre-generate for Top 100 Titles
        Nightly Batch with Haiku
        Cache in ElastiCache Redis
      Catalog Embedding Warm-Up
        New Release Embeddings
        Pre-index in OpenSearch
        Background Vectorization
      Recommendation Pre-Build
        Popular Genre Results
        Personalized Top-N per Segment
        Scheduled Recomputation
    Latency-Optimized Models
      Haiku for Time-Sensitive
        Order Status < 500ms
        Simple FAQ < 300ms
        Greeting / Chitchat < 200ms
      Sonnet for Deep Queries
        Manga Recommendations
        Plot Comparison
        Complex Product Q&A
      Dynamic Model Selection
        Intent Classification
        Complexity Scoring
        Latency Budget Routing
    Parallel Request Processing
      Concurrent Data Fetch
        RAG Retrieval
        DynamoDB Session Load
        User Profile Fetch
      asyncio.gather Pattern
        Fan-Out with Timeout
        Partial Result Handling
        Graceful Degradation
      Dependency Graph
        Independent vs Sequential
        Critical Path Analysis
        Waterfall Elimination
    Response Streaming
      Bedrock Streaming API
        invoke_model_with_response_stream
        Chunk-by-Chunk Delivery
        Server-Sent Events
      WebSocket Delivery
        API Gateway WebSocket
        Binary Frame Optimization
        Connection Keep-Alive
      Progressive Rendering
        First Token Priority
        Incremental Display
        Skeleton UI Patterns
    Performance Benchmarking
      Latency Targets per Intent
        p50 / p95 / p99 Definitions
        SLA per Query Type
        Budget Allocation
      Automated Profiling
        Continuous Latency Tests
        Statistical Significance
        Regression Detection
      Cost-Latency Tradeoff
        Quality x Speed / Cost
        Pareto Frontier Analysis
        Budget-Constrained Optimization

Pre-Computation Strategies

Pre-computation shifts latency from request time to background processing. For MangaAssist, this means generating answers before customers ask.

What to Pre-Compute

| Pre-Computation Target | Trigger | Storage | TTL | Expected Hit Rate |
|---|---|---|---|---|
| Trending manga title descriptions | Hourly trending list update | ElastiCache Redis | 1 hour | ~35% of product queries |
| New catalog item embeddings | Catalog CDC event from DynamoDB | OpenSearch Serverless | Until next update | 100% of new items |
| Popular genre recommendation lists | Nightly batch job | ElastiCache Redis | 24 hours | ~20% of recommendation queries |
| FAQ answer variants | Weekly prompt refresh | ElastiCache Redis | 7 days | ~15% of support queries |
| Order status templates | Real-time order state change | DynamoDB + Redis | Until next state change | ~40% of order queries |

Pre-Computation Pipeline

graph LR
    subgraph "Event Sources"
        TREND[Trending API<br/>Hourly]
        CATALOG[DynamoDB Streams<br/>CDC Events]
        BATCH[EventBridge<br/>Nightly Schedule]
    end

    subgraph "Pre-Computation Engine"
        LAMBDA_TREND[Lambda: Generate<br/>Trending Responses]
        LAMBDA_EMBED[Lambda: Compute<br/>Embeddings]
        LAMBDA_REC[Lambda: Build<br/>Recommendations]
    end

    subgraph "Pre-Computed Storage"
        REDIS[ElastiCache Redis<br/>Response Cache]
        OPENSEARCH[OpenSearch<br/>Embedding Index]
    end

    TREND --> LAMBDA_TREND --> REDIS
    CATALOG --> LAMBDA_EMBED --> OPENSEARCH
    BATCH --> LAMBDA_REC --> REDIS

    style REDIS fill:#2ecc71,color:#000
    style OPENSEARCH fill:#ff9900,color:#000
    style LAMBDA_TREND fill:#ff9900,color:#000
    style LAMBDA_EMBED fill:#ff9900,color:#000
    style LAMBDA_REC fill:#ff9900,color:#000
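
The trending-response branch of this pipeline can be implemented as a small scheduled Lambda. The sketch below is illustrative rather than the project's actual function: the Haiku model ID is real, but the Redis key scheme, the REDIS_HOST environment variable, and the shape of the trending payload are assumptions. It generates one canned answer per trending title with Haiku and writes it to ElastiCache with the 1-hour TTL from the table above.

import json
import os

import boto3
import redis  # redis-py; the ElastiCache endpoint is assumed to be in REDIS_HOST

bedrock = boto3.client("bedrock-runtime")
cache = redis.Redis(host=os.environ["REDIS_HOST"], port=6379, decode_responses=True)

HAIKU_MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"
TTL_SECONDS = 3600  # matches the 1-hour TTL for trending responses


def precompute_trending_responses(trending_titles: list[dict]) -> int:
    """Generate one canned product answer per trending title with Haiku
    and cache it so request-time traffic can skip the FM call entirely."""
    written = 0
    for title in trending_titles:
        prompt = (
            f"Write a concise, friendly product summary for the manga "
            f"'{title['name']}' ({title['genre']}) for an e-commerce chatbot."
        )
        response = bedrock.invoke_model(
            modelId=HAIKU_MODEL_ID,
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 300,
                "messages": [{"role": "user", "content": prompt}],
            }),
        )
        text = json.loads(response["body"].read())["content"][0]["text"]
        # Key scheme is a placeholder; the request path must use the same one.
        cache.setex(f"precomputed:trending:{title['id']}", TTL_SECONDS, text)
        written += 1
    return written


def handler(event, context):
    """EventBridge-triggered entry point (hourly schedule). The trending
    titles are assumed to arrive in the event payload."""
    titles = event.get("trending_titles", [])
    return {"precomputed": precompute_trending_responses(titles)}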

Pre-Computation Cost Analysis

| Strategy | Compute Cost (Monthly) | Storage Cost (Monthly) | Latency Saved | Net Savings at 1M queries/day |
|---|---|---|---|---|
| Trending responses (Haiku) | $45 (hourly batch) | $12 (Redis) | 1.2s per hit | $4,200 (avoided Sonnet calls) |
| Catalog embeddings | $28 (Lambda) | $8 (OpenSearch) | 0.8s per new-item query | $1,100 (faster retrieval) |
| Genre recommendations | $18 (nightly Haiku) | $6 (Redis) | 1.5s per hit | $2,800 (avoided full RAG+LLM) |
| Total | $91 | $26 | avg 1.1s | $8,100 saved |

Latency-Optimized Model Selection

Not every query deserves the same model. MangaAssist routes queries to the fastest model that meets quality requirements.

Intent-to-Model Routing Matrix

| Intent Category | Example Query | Model | Target Latency | Why This Model |
|---|---|---|---|---|
| Order status | "Where's my order #12345?" | Haiku | < 500ms | Template-based, low reasoning needed |
| Simple FAQ | "Do you ship to Osaka?" | Haiku | < 300ms | Factual, single-hop lookup |
| Greeting / Chitchat | "Hello!" | Haiku | < 200ms | Minimal generation |
| Manga recommendation | "Suggest something like One Piece" | Sonnet | < 2.5s | Multi-factor reasoning, quality matters |
| Plot comparison | "How does Naruto compare to Bleach?" | Sonnet | < 3.0s | Nuanced analysis, longer output |
| Complex product Q&A | "What special editions of AoT exist?" | Sonnet | < 2.0s | RAG + synthesis of multiple documents |
| Complaint handling | "I received a damaged volume" | Sonnet | < 2.0s | Empathy + policy lookup + resolution |

Model Selection Decision Flow

flowchart TD
    START[Incoming Query] --> CLASSIFY[Intent Classifier<br/>Haiku - 50ms]
    CLASSIFY --> CHECK_COMPLEXITY{Complexity<br/>Score}

    CHECK_COMPLEXITY -->|Score < 0.3<br/>Simple| HAIKU_PATH[Route to Haiku]
    CHECK_COMPLEXITY -->|Score 0.3-0.6<br/>Medium| CHECK_LATENCY{Latency<br/>Budget?}
    CHECK_COMPLEXITY -->|Score > 0.6<br/>Complex| SONNET_PATH[Route to Sonnet]

    CHECK_LATENCY -->|Budget < 1s| HAIKU_PATH
    CHECK_LATENCY -->|Budget >= 1s| SONNET_PATH

    HAIKU_PATH --> HAIKU[Claude 3 Haiku<br/>~150ms generation]
    SONNET_PATH --> SONNET[Claude 3 Sonnet<br/>~800ms generation]

    HAIKU --> QUALITY_CHECK{Quality<br/>Gate}
    QUALITY_CHECK -->|Pass| RESPOND[Stream Response]
    QUALITY_CHECK -->|Fail: Low confidence| SONNET

    SONNET --> RESPOND

    style HAIKU fill:#2ecc71,color:#000
    style SONNET fill:#3498db,color:#fff
    style CLASSIFY fill:#e74c3c,color:#fff
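
A minimal sketch of the routing logic in the flow above. The score_complexity heuristic is a stand-in for the Haiku intent classifier (which would add ~50ms), and the latency budgets are taken from the routing matrix; treat this as an illustration of the thresholds, not a production classifier.

from dataclasses import dataclass

HAIKU_MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"
SONNET_MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0"

# Per-intent latency budgets in seconds, from the routing matrix above.
LATENCY_BUDGETS = {
    "greeting": 0.2,
    "faq": 0.3,
    "order_status": 0.5,
    "product_qa": 2.0,
    "complaint": 2.0,
    "recommendation": 2.5,
    "comparison": 3.0,
}


@dataclass
class RoutingDecision:
    model_id: str
    reason: str


def score_complexity(query: str) -> float:
    """Placeholder complexity score in [0, 1]; production would call a cheap
    Haiku classification prompt instead of keyword heuristics."""
    markers = ("compare", "recommend", "like", "difference", "why", "best")
    score = 0.2 + 0.15 * sum(marker in query.lower() for marker in markers)
    return min(score, 1.0)


def route(query: str, intent: str) -> RoutingDecision:
    """Simple queries go to Haiku, complex ones to Sonnet; medium complexity
    falls back to the intent's latency budget."""
    complexity = score_complexity(query)
    budget = LATENCY_BUDGETS.get(intent, 2.0)

    if complexity < 0.3:
        return RoutingDecision(HAIKU_MODEL_ID, "simple query")
    if complexity > 0.6:
        return RoutingDecision(SONNET_MODEL_ID, "complex query")
    if budget < 1.0:
        return RoutingDecision(HAIKU_MODEL_ID, f"budget {budget}s too tight for Sonnet")
    return RoutingDecision(SONNET_MODEL_ID, f"budget {budget}s permits deeper reasoning")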

Latency-Cost Tradeoff Framework

The optimization function balances three dimensions:

Score = (Quality_weight x Quality) x (Speed_weight x 1/Latency) / Cost

| Intent | Quality Weight | Speed Weight | Optimal Model | Score |
|---|---|---|---|---|
| Order status | 0.6 | 0.9 | Haiku | 8.4 |
| Manga recommendation | 0.9 | 0.5 | Sonnet | 6.2 |
| Complaint handling | 0.95 | 0.6 | Sonnet | 5.8 |
| FAQ | 0.5 | 0.85 | Haiku | 9.1 |
| Greeting | 0.3 | 0.95 | Haiku | 11.2 |
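
The scoring function is a direct translation of the formula. The quality, latency, and cost figures below are illustrative assumptions rather than the inputs behind the table, so the absolute scores differ; the point is the shape of the tradeoff, where a high speed weight and low per-query cost pull the optimum toward Haiku.

def routing_score(
    quality: float,          # expected answer quality for this intent, 0-1
    latency_s: float,        # expected end-to-end latency in seconds
    cost_per_query: float,   # blended token cost in USD (assumed figures)
    quality_weight: float,
    speed_weight: float,
) -> float:
    """Score = (Quality_weight x Quality) x (Speed_weight x 1/Latency) / Cost."""
    return (quality_weight * quality) * (speed_weight / latency_s) / cost_per_query


# Illustrative comparison for the order-status intent (weights from the table).
haiku = routing_score(quality=0.80, latency_s=0.4, cost_per_query=0.0004,
                      quality_weight=0.6, speed_weight=0.9)
sonnet = routing_score(quality=0.95, latency_s=1.5, cost_per_query=0.0060,
                       quality_weight=0.6, speed_weight=0.9)
print(f"Haiku score {haiku:.0f} vs Sonnet score {sonnet:.0f}")  # Haiku wins here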

Parallel Request Orchestration

MangaAssist fires independent data fetches concurrently instead of sequentially. This collapses the waterfall.

Sequential vs Parallel Comparison

gantt
    title Sequential Execution (Before) — Total: 1850ms
    dateFormat X
    axisFormat %L ms

    section Sequential
    RAG Retrieval (OpenSearch)    :0, 650
    DynamoDB Session Load         :650, 850
    User Profile Fetch            :850, 1000
    ElastiCache Check             :1000, 1050
    Bedrock Invocation            :1050, 1850

gantt
    title Parallel Execution (After) — Total: 1450ms
    dateFormat X
    axisFormat %L ms

    section Parallel Fan-Out
    RAG Retrieval (OpenSearch)    :0, 650
    DynamoDB Session Load         :0, 200
    User Profile Fetch            :0, 150
    ElastiCache Check             :0, 50

    section Sequential (depends on fan-out)
    Bedrock Invocation            :650, 1450

Result: Parallel fan-out saves 400ms (from 1850ms to 1450ms) by executing independent data fetches concurrently. The critical path is dominated by RAG retrieval (650ms) + Bedrock invocation (800ms).

ParallelOrchestrator Implementation

import asyncio
import hashlib
import json
import time
from dataclasses import dataclass, field
from typing import Any, Optional

import aioboto3
from aws_lambda_powertools import Logger, Metrics, Tracer

logger = Logger(service="mangaassist-orchestrator")
tracer = Tracer(service="mangaassist-orchestrator")
metrics = Metrics(namespace="MangaAssist/Orchestrator")


@dataclass
class ParallelResult:
    """Container for parallel execution results with timing metadata."""
    rag_context: Optional[list[dict]] = None
    session_history: Optional[list[dict]] = None
    user_profile: Optional[dict] = None
    cached_response: Optional[str] = None
    timings: dict[str, float] = field(default_factory=dict)
    errors: dict[str, str] = field(default_factory=dict)


class ParallelOrchestrator:
    """
    Orchestrates concurrent data fetches for MangaAssist queries.

    Fires RAG retrieval, session history load, user profile fetch,
    and cache check concurrently using asyncio.gather. Each task
    has an independent timeout to prevent one slow call from
    blocking the entire pipeline.
    """

    def __init__(
        self,
        opensearch_client,
        dynamodb_resource,
        redis_client,
        rag_timeout: float = 2.0,
        session_timeout: float = 1.0,
        profile_timeout: float = 1.0,
        cache_timeout: float = 0.5,
    ):
        self.opensearch = opensearch_client
        self.dynamodb = dynamodb_resource
        self.redis = redis_client
        self.rag_timeout = rag_timeout
        self.session_timeout = session_timeout
        self.profile_timeout = profile_timeout
        self.cache_timeout = cache_timeout

    @tracer.capture_method
    async def execute_parallel(
        self, query: str, user_id: str, session_id: str
    ) -> ParallelResult:
        """
        Execute all data fetches concurrently with individual timeouts.

        Returns partial results if some fetches fail — the LLM can
        still generate a response with degraded context rather than
        failing entirely.
        """
        result = ParallelResult()
        start = time.monotonic()

        tasks = {
            "rag": self._fetch_rag_context(query),
            "session": self._fetch_session_history(session_id),
            "profile": self._fetch_user_profile(user_id),
            "cache": self._check_cache(query, user_id),
        }

        timeouts = {
            "rag": self.rag_timeout,
            "session": self.session_timeout,
            "profile": self.profile_timeout,
            "cache": self.cache_timeout,
        }

        # Execute all tasks concurrently
        gathered = await asyncio.gather(
            *[
                self._with_timeout(name, coro, timeouts[name])
                for name, coro in tasks.items()
            ],
            return_exceptions=True,
        )

        # Unpack results with error handling
        for (name, _), outcome in zip(tasks.items(), gathered):
            elapsed = time.monotonic() - start
            if isinstance(outcome, Exception):
                result.errors[name] = str(outcome)
                result.timings[name] = elapsed
                logger.warning(f"Parallel task {name} failed", error=str(outcome))
                metrics.add_metric(
                    name=f"parallel_{name}_error", unit="Count", value=1
                )
            else:
                task_result, task_time = outcome
                result.timings[name] = task_time
                setattr(result, self._result_field(name), task_result)
                metrics.add_metric(
                    name=f"parallel_{name}_latency_ms",
                    unit="Milliseconds",
                    value=task_time * 1000,
                )

        total_time = time.monotonic() - start
        result.timings["total_parallel"] = total_time
        metrics.add_metric(
            name="parallel_total_latency_ms",
            unit="Milliseconds",
            value=total_time * 1000,
        )

        logger.info("Parallel execution complete", timings=result.timings)
        return result

    async def _with_timeout(
        self, name: str, coro, timeout: float
    ) -> tuple[Any, float]:
        """Wrap a coroutine with a per-task timeout."""
        start = time.monotonic()
        try:
            result = await asyncio.wait_for(coro, timeout=timeout)
            elapsed = time.monotonic() - start
            return result, elapsed
        except asyncio.TimeoutError:
            raise TimeoutError(
                f"Task '{name}' exceeded {timeout}s timeout"
            )

    @tracer.capture_method
    async def _fetch_rag_context(self, query: str) -> list[dict]:
        """Retrieve relevant manga documents from OpenSearch vector store."""
        embedding = await self._generate_embedding(query)
        response = await self.opensearch.search(
            index="manga-products",
            body={
                "size": 5,
                "query": {
                    "knn": {
                        "embedding": {
                            "vector": embedding,
                            "k": 5,
                        }
                    }
                },
            },
        )
        return [hit["_source"] for hit in response["hits"]["hits"]]

    @tracer.capture_method
    async def _fetch_session_history(self, session_id: str) -> list[dict]:
        """Load conversation history from DynamoDB."""
        table = self.dynamodb.Table("manga-sessions")
        response = await table.query(
            KeyConditionExpression="session_id = :sid",
            ExpressionAttributeValues={":sid": session_id},
            ScanIndexForward=False,
            Limit=10,
        )
        return response.get("Items", [])

    @tracer.capture_method
    async def _fetch_user_profile(self, user_id: str) -> dict:
        """Fetch user preferences and purchase history."""
        table = self.dynamodb.Table("manga-users")
        response = await table.get_item(Key={"user_id": user_id})
        return response.get("Item", {})

    @tracer.capture_method
    async def _check_cache(self, query: str, user_id: str) -> Optional[str]:
        """Check ElastiCache for a pre-computed or cached response."""
        cache_key = f"response:{user_id}:{hash(query)}"
        cached = await self.redis.get(cache_key)
        if cached:
            metrics.add_metric(name="cache_hit", unit="Count", value=1)
        else:
            metrics.add_metric(name="cache_miss", unit="Count", value=1)
        return cached

    async def _generate_embedding(self, text: str) -> list[float]:
        """Generate embedding via Bedrock Titan Embeddings."""
        # In production, reuse a long-lived aioboto3 session rather than
        # creating one per call.
        session = aioboto3.Session()
        async with session.client("bedrock-runtime") as bedrock:
            response = await bedrock.invoke_model(
                modelId="amazon.titan-embed-text-v2:0",
                body=json.dumps({"inputText": text}),
            )
            body = json.loads(await response["body"].read())
            return body["embedding"]

    @staticmethod
    def _result_field(name: str) -> str:
        """Map task name to ParallelResult field."""
        return {
            "rag": "rag_context",
            "session": "session_history",
            "profile": "user_profile",
            "cache": "cached_response",
        }[name]
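
A usage sketch for ParallelOrchestrator. It assumes async client libraries (opensearch-py's AsyncOpenSearch, an aioboto3 DynamoDB resource, and redis.asyncio), and the endpoints are placeholders; the point is the call pattern, where a cache hit short-circuits generation and partial results still flow downstream.

import aioboto3
import redis.asyncio as aioredis
from opensearchpy import AsyncOpenSearch  # async client from opensearch-py


async def handle_request(query: str, user_id: str, session_id: str) -> dict:
    session = aioboto3.Session()
    opensearch = AsyncOpenSearch(hosts=["https://example-collection.aoss.amazonaws.com"])
    redis_client = aioredis.Redis(host="example.cache.amazonaws.com", port=6379)

    async with session.resource("dynamodb") as dynamodb:
        orchestrator = ParallelOrchestrator(opensearch, dynamodb, redis_client)
        result = await orchestrator.execute_parallel(query, user_id, session_id)

    # A pre-computed or cached response bypasses the FM call entirely.
    if result.cached_response:
        return {"answer": result.cached_response, "source": "cache"}

    # Otherwise hand the (possibly partial) context to the streaming handler.
    return {
        "rag_docs": len(result.rag_context or []),
        "history_turns": len(result.session_history or []),
        "degraded": bool(result.errors),
        "timings": result.timings,
    }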

Response Streaming

Streaming delivers the first token to the user before the full response is generated. This dramatically improves perceived latency.

Streaming Architecture

sequenceDiagram
    participant Client as Browser Client
    participant APIGW as API Gateway<br/>WebSocket
    participant ECS as ECS Fargate<br/>Orchestrator
    participant Bedrock as Bedrock<br/>Claude 3

    Client->>APIGW: Send query via WebSocket
    APIGW->>ECS: Route to orchestrator

    Note over ECS: Parallel fan-out (RAG + Session + Profile)

    ECS->>Bedrock: invoke_model_with_response_stream()

    loop Each chunk (~50-100 tokens)
        Bedrock-->>ECS: StreamEvent: chunk
        ECS-->>APIGW: WebSocket frame: chunk
        APIGW-->>Client: Render incrementally
    end

    Bedrock-->>ECS: StreamEvent: end
    ECS-->>APIGW: WebSocket frame: [DONE]
    APIGW-->>Client: Finalize rendering

    Note over Client: User saw first token at ~400ms<br/>Full response at ~2.5s

StreamingResponseHandler Implementation

import asyncio
import json
import time
from typing import AsyncIterator

import boto3
from aws_lambda_powertools import Logger, Metrics, Tracer

logger = Logger(service="mangaassist-streaming")
tracer = Tracer(service="mangaassist-streaming")
metrics = Metrics(namespace="MangaAssist/Streaming")


class StreamingResponseHandler:
    """
    Manages Bedrock streaming responses and delivers chunks
    to the client via API Gateway WebSocket.

    Optimizations:
    - Pre-loaded system prompt (avoids per-request prompt assembly latency)
    - Warm connection pooling to Bedrock
    - Chunk batching to reduce WebSocket frame overhead
    - First-token latency tracking for SLA monitoring
    """

    def __init__(
        self,
        bedrock_client,
        apigw_management_client,
        connection_id: str,
        model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
        chunk_batch_size: int = 3,
    ):
        self.bedrock = bedrock_client
        self.apigw = apigw_management_client
        self.connection_id = connection_id
        self.model_id = model_id
        self.chunk_batch_size = chunk_batch_size
        self._system_prompt = self._load_system_prompt()

    def _load_system_prompt(self) -> str:
        """Pre-load system prompt at initialization, not per-request."""
        return (
            "You are MangaAssist, a helpful assistant for a Japanese manga "
            "e-commerce store. You help customers find manga, track orders, "
            "and answer questions about products. Be concise and friendly. "
            "Always respond in the language the customer uses."
        )

    @tracer.capture_method
    async def stream_response(
        self,
        user_message: str,
        rag_context: list[dict],
        session_history: list[dict],
        user_profile: dict,
    ) -> dict:
        """
        Invoke Bedrock with streaming and deliver chunks via WebSocket.

        Returns metadata about the streaming session including
        first-token latency, total tokens, and total duration.
        """
        start_time = time.monotonic()
        first_token_time = None
        total_tokens = 0
        full_response = []

        # Build the prompt with RAG context
        messages = self._build_messages(
            user_message, rag_context, session_history, user_profile
        )

        # Send "thinking" indicator immediately
        await self._send_to_client({
            "type": "status",
            "message": "Looking up your manga...",
        })

        try:
            # Invoke Bedrock with streaming
            response = self.bedrock.invoke_model_with_response_stream(
                modelId=self.model_id,
                body=json.dumps({
                    "anthropic_version": "bedrock-2023-05-31",
                    "max_tokens": 1024,
                    "system": self._system_prompt,
                    "messages": messages,
                }),
            )

            # Process the stream
            chunk_buffer = []
            async for chunk_text in self._parse_stream(response):
                total_tokens += 1

                if first_token_time is None:
                    first_token_time = time.monotonic()
                    first_token_latency = (
                        first_token_time - start_time
                    ) * 1000
                    metrics.add_metric(
                        name="first_token_latency_ms",
                        unit="Milliseconds",
                        value=first_token_latency,
                    )
                    logger.info(
                        "First token delivered",
                        first_token_ms=first_token_latency,
                    )

                full_response.append(chunk_text)
                chunk_buffer.append(chunk_text)

                # Batch chunks to reduce WebSocket frame overhead
                if len(chunk_buffer) >= self.chunk_batch_size:
                    await self._send_to_client({
                        "type": "chunk",
                        "text": "".join(chunk_buffer),
                    })
                    chunk_buffer = []

            # Flush remaining buffer
            if chunk_buffer:
                await self._send_to_client({
                    "type": "chunk",
                    "text": "".join(chunk_buffer),
                })

            # Send completion signal
            await self._send_to_client({"type": "done"})

        except Exception as e:
            logger.exception("Streaming error", error=str(e))
            await self._send_to_client({
                "type": "error",
                "message": "Sorry, I encountered an issue. Please try again.",
            })
            raise

        total_duration = (time.monotonic() - start_time) * 1000
        metrics.add_metric(
            name="streaming_total_ms",
            unit="Milliseconds",
            value=total_duration,
        )
        metrics.add_metric(
            name="streaming_tokens",
            unit="Count",
            value=total_tokens,
        )

        return {
            "first_token_latency_ms": (
                (first_token_time - start_time) * 1000
                if first_token_time
                else None
            ),
            "total_duration_ms": total_duration,
            "total_tokens": total_tokens,
            "full_response": "".join(full_response),
        }

    async def _parse_stream(self, response) -> AsyncIterator[str]:
        """Parse Bedrock streaming response into text chunks."""
        stream = response.get("body")
        for event in stream:
            chunk = event.get("chunk")
            if chunk:
                payload = json.loads(chunk.get("bytes", b"{}"))
                if payload.get("type") == "content_block_delta":
                    delta = payload.get("delta", {})
                    if delta.get("type") == "text_delta":
                        yield delta.get("text", "")

    async def _send_to_client(self, data: dict) -> None:
        """Send a message to the client via API Gateway WebSocket."""
        try:
            self.apigw.post_to_connection(
                ConnectionId=self.connection_id,
                Data=json.dumps(data).encode("utf-8"),
            )
        except self.apigw.exceptions.GoneException:
            logger.warning(
                "WebSocket connection gone",
                connection_id=self.connection_id,
            )
            raise ConnectionError("Client disconnected")

    def _build_messages(
        self,
        user_message: str,
        rag_context: list[dict],
        session_history: list[dict],
        user_profile: dict,
    ) -> list[dict]:
        """Assemble the message array with RAG context and history."""
        messages = []

        # Add session history (last 5 turns)
        for turn in session_history[-5:]:
            messages.append({
                "role": turn["role"],
                "content": turn["content"],
            })

        # Build current user message with RAG context
        context_block = "\n\n".join(
            f"[Product: {doc.get('title', 'Unknown')}]\n{doc.get('description', '')}"
            for doc in rag_context[:3]
        )

        augmented_message = (
            f"<context>\n{context_block}\n</context>\n\n"
            f"<user_preferences>\n"
            f"Favorite genres: {user_profile.get('favorite_genres', 'unknown')}\n"
            f"Language: {user_profile.get('language', 'ja')}\n"
            f"</user_preferences>\n\n"
            f"Customer question: {user_message}"
        )

        messages.append({"role": "user", "content": augmented_message})
        return messages
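
A wiring sketch for the WebSocket "sendMessage" route, assuming the ParallelResult produced by the orchestrator above and a placeholder API Gateway management endpoint; it shows how the connection-scoped handler is constructed per message.

import boto3


async def on_websocket_message(
    connection_id: str, query: str, parallel: ParallelResult
) -> dict:
    """Handle one WebSocket message: build the handler and stream the answer."""
    bedrock = boto3.client("bedrock-runtime")
    # The endpoint URL comes from the deployed WebSocket API stage (placeholder).
    apigw = boto3.client(
        "apigatewaymanagementapi",
        endpoint_url="https://example.execute-api.ap-northeast-1.amazonaws.com/prod",
    )

    handler = StreamingResponseHandler(
        bedrock_client=bedrock,
        apigw_management_client=apigw,
        connection_id=connection_id,
    )
    return await handler.stream_response(
        user_message=query,
        rag_context=parallel.rag_context or [],
        session_history=parallel.session_history or [],
        user_profile=parallel.user_profile or {},
    )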

Performance Benchmarking Framework

Latency Targets per Intent

| Intent | p50 Target | p95 Target | p99 Target | Model | Parallelism Strategy |
|---|---|---|---|---|---|
| Greeting | 150ms | 250ms | 400ms | Haiku | None (direct generation) |
| Simple FAQ | 200ms | 400ms | 600ms | Haiku | Cache check only |
| Order status | 300ms | 500ms | 800ms | Haiku | DynamoDB lookup parallel with cache |
| Product search | 800ms | 1500ms | 2200ms | Sonnet | Full parallel fan-out (RAG + Session + Profile) |
| Manga recommendation | 1200ms | 2500ms | 3000ms | Sonnet | Full parallel fan-out + streaming |
| Plot comparison | 1500ms | 2800ms | 3500ms | Sonnet | Full parallel fan-out + streaming |
| Complaint handling | 1000ms | 2000ms | 2800ms | Sonnet | Full parallel fan-out + streaming |

Automated Regression Detection

flowchart TD
    START[Scheduled Benchmark<br/>Every 15 min] --> RUN[Execute Test Suite<br/>50 queries per intent]
    RUN --> COLLECT[Collect Latency<br/>Distributions]
    COLLECT --> COMPARE{p95 vs Baseline<br/>Mann-Whitney U Test}

    COMPARE -->|p > 0.05<br/>No regression| LOG[Log to CloudWatch<br/>Update Dashboard]
    COMPARE -->|p <= 0.05<br/>Regression detected| SEVERITY{Regression<br/>Severity}

    SEVERITY -->|< 10% increase| WARN[CloudWatch Alarm<br/>SEV-3 Warning]
    SEVERITY -->|10-25% increase| ALERT[PagerDuty Alert<br/>SEV-2]
    SEVERITY -->|> 25% increase| CRITICAL[PagerDuty Page<br/>SEV-1 + Auto-Rollback]

    WARN --> DASHBOARD[Update Grafana<br/>Dashboard]
    ALERT --> DASHBOARD
    CRITICAL --> ROLLBACK[Auto-Rollback<br/>Last Known Good Config] --> DASHBOARD

    style CRITICAL fill:#e74c3c,color:#fff
    style ALERT fill:#f39c12,color:#000
    style WARN fill:#f1c40f,color:#000
    style LOG fill:#2ecc71,color:#000

Benchmarking Metrics Collection

import asyncio
import time
from dataclasses import dataclass, field

import boto3


@dataclass
class BenchmarkResult:
    """Results from a single benchmark run."""
    intent: str
    latencies_ms: list[float] = field(default_factory=list)
    first_token_latencies_ms: list[float] = field(default_factory=list)
    errors: int = 0
    total_requests: int = 0

    @property
    def p50(self) -> float:
        return self._percentile(50)

    @property
    def p95(self) -> float:
        return self._percentile(95)

    @property
    def p99(self) -> float:
        return self._percentile(99)

    @property
    def first_token_p50(self) -> float:
        return self._percentile(50, self.first_token_latencies_ms)

    @property
    def first_token_p95(self) -> float:
        return self._percentile(95, self.first_token_latencies_ms)

    def _percentile(
        self, pct: int, data: list[float] | None = None
    ) -> float:
        d = data if data is not None else self.latencies_ms
        if not d:
            return 0.0
        sorted_d = sorted(d)
        idx = int(len(sorted_d) * pct / 100)
        return sorted_d[min(idx, len(sorted_d) - 1)]


class LatencyBenchmarkRunner:
    """
    Automated latency benchmarking for MangaAssist.

    Runs test queries per intent, collects latency distributions,
    and flags regressions when p95 exceeds the baseline targets.
    """

    BASELINE_TARGETS = {
        "greeting": {"p50": 150, "p95": 250, "p99": 400},
        "faq": {"p50": 200, "p95": 400, "p99": 600},
        "order_status": {"p50": 300, "p95": 500, "p99": 800},
        "product_search": {"p50": 800, "p95": 1500, "p99": 2200},
        "recommendation": {"p50": 1200, "p95": 2500, "p99": 3000},
        "comparison": {"p50": 1500, "p95": 2800, "p99": 3500},
        "complaint": {"p50": 1000, "p95": 2000, "p99": 2800},
    }

    TEST_QUERIES = {
        "greeting": ["Hello!", "Hi there", "Good evening"],
        "faq": [
            "Do you ship internationally?",
            "What's your return policy?",
            "Do you accept PayPal?",
        ],
        "order_status": [
            "Where is order #MNG-78901?",
            "Track my latest order",
            "When will my package arrive?",
        ],
        "product_search": [
            "Show me shonen manga under 1000 yen",
            "New releases this week",
            "Manga by Eiichiro Oda",
        ],
        "recommendation": [
            "Suggest manga like One Piece",
            "Best manga for beginners",
            "What's trending in seinen?",
        ],
        "comparison": [
            "Compare Naruto and Bleach",
            "Dragon Ball vs One Piece",
            "How is Jujutsu Kaisen different from Demon Slayer?",
        ],
        "complaint": [
            "My order arrived damaged",
            "I received the wrong volume",
            "I want a refund for my last purchase",
        ],
    }

    def __init__(self, orchestrator, cloudwatch_client=None):
        self.orchestrator = orchestrator
        self.cloudwatch = cloudwatch_client or boto3.client("cloudwatch")

    async def run_full_benchmark(self) -> dict[str, BenchmarkResult]:
        """Run benchmark across all intents and detect regressions."""
        results = {}

        for intent, queries in self.TEST_QUERIES.items():
            result = BenchmarkResult(
                intent=intent, total_requests=len(queries)
            )

            for query in queries:
                start = time.monotonic()
                try:
                    response = await self.orchestrator.handle_query(query)
                    elapsed_ms = (time.monotonic() - start) * 1000
                    result.latencies_ms.append(elapsed_ms)

                    if "first_token_latency_ms" in response:
                        result.first_token_latencies_ms.append(
                            response["first_token_latency_ms"]
                        )
                except Exception:
                    result.errors += 1

            results[intent] = result
            self._publish_metrics(result)
            self._check_regression(intent, result)

        return results

    def _check_regression(
        self, intent: str, result: BenchmarkResult
    ) -> None:
        """
        Detect latency regression using Mann-Whitney U test
        against baseline targets.
        """
        baseline = self.BASELINE_TARGETS.get(intent, {})
        if not baseline or not result.latencies_ms:
            return

        # Compare p95 against target
        if result.p95 > baseline["p95"] * 1.25:
            severity = "CRITICAL"
        elif result.p95 > baseline["p95"] * 1.10:
            severity = "WARNING"
        else:
            severity = "OK"

        self.cloudwatch.put_metric_data(
            Namespace="MangaAssist/Benchmark",
            MetricData=[
                {
                    "MetricName": f"regression_{intent}",
                    "Value": 1 if severity != "OK" else 0,
                    "Unit": "Count",
                    "Dimensions": [
                        {"Name": "Severity", "Value": severity},
                    ],
                }
            ],
        )

    def _publish_metrics(self, result: BenchmarkResult) -> None:
        """Publish benchmark metrics to CloudWatch."""
        self.cloudwatch.put_metric_data(
            Namespace="MangaAssist/Benchmark",
            MetricData=[
                {
                    "MetricName": f"{result.intent}_p50",
                    "Value": result.p50,
                    "Unit": "Milliseconds",
                },
                {
                    "MetricName": f"{result.intent}_p95",
                    "Value": result.p95,
                    "Unit": "Milliseconds",
                },
                {
                    "MetricName": f"{result.intent}_p99",
                    "Value": result.p99,
                    "Unit": "Milliseconds",
                },
                {
                    "MetricName": f"{result.intent}_first_token_p95",
                    "Value": result.first_token_p95,
                    "Unit": "Milliseconds",
                },
            ],
        )
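
The regression flowchart above calls for a Mann-Whitney U test, while the runner only compares p95 against fixed targets. A minimal sketch of the distribution-based check, assuming a stored baseline latency sample (for example, the previous day's measurements) is available to compare against:

from scipy import stats


def is_latency_regression(
    baseline_latencies_ms: list[float],
    current_latencies_ms: list[float],
    alpha: float = 0.05,
) -> bool:
    """Return True if current latencies are statistically worse than baseline.

    One-sided Mann-Whitney U test: the alternative hypothesis is that the
    current latencies tend to be larger than the baseline sample.
    """
    if len(baseline_latencies_ms) < 20 or len(current_latencies_ms) < 20:
        # Too few samples for a meaningful test; fall back to threshold checks.
        return False
    _, p_value = stats.mannwhitneyu(
        current_latencies_ms,
        baseline_latencies_ms,
        alternative="greater",
    )
    return p_value <= alpha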

End-to-End Architecture — Parallel Execution and Streaming Pipeline

graph TB
    subgraph "Client Layer"
        CLIENT[Browser / Mobile App]
    end

    subgraph "API Layer"
        APIGW[API Gateway<br/>WebSocket]
    end

    subgraph "Orchestration Layer — ECS Fargate"
        ROUTER[Intent Classifier<br/>+ Model Router]
        PARALLEL[ParallelOrchestrator<br/>asyncio.gather]
        STREAMER[StreamingResponseHandler<br/>Chunk Delivery]
    end

    subgraph "Parallel Fan-Out (concurrent)"
        direction LR
        CACHE_CHECK[ElastiCache Redis<br/>Cache Check<br/>~50ms]
        RAG[OpenSearch Serverless<br/>RAG Retrieval<br/>~650ms]
        SESSION[DynamoDB<br/>Session History<br/>~200ms]
        PROFILE[DynamoDB<br/>User Profile<br/>~150ms]
    end

    subgraph "FM Layer"
        HAIKU[Bedrock Claude 3 Haiku<br/>Simple Queries<br/>~150ms generation]
        SONNET[Bedrock Claude 3 Sonnet<br/>Complex Queries<br/>~800ms generation]
    end

    CLIENT <-->|WebSocket| APIGW
    APIGW --> ROUTER
    ROUTER -->|Simple intent| HAIKU
    ROUTER -->|Complex intent| PARALLEL
    PARALLEL --> CACHE_CHECK & RAG & SESSION & PROFILE
    PARALLEL -->|Context assembled| SONNET
    HAIKU -->|Stream| STREAMER
    SONNET -->|Stream| STREAMER
    STREAMER -->|Chunk frames| APIGW

    style CACHE_CHECK fill:#2ecc71,color:#000
    style RAG fill:#ff9900,color:#000
    style SESSION fill:#ff9900,color:#000
    style PROFILE fill:#ff9900,color:#000
    style HAIKU fill:#2ecc71,color:#000
    style SONNET fill:#3498db,color:#fff
    style STREAMER fill:#9b59b6,color:#fff

Summary — Five Responsive AI Techniques

| # | Technique | MangaAssist Implementation | Latency Impact | Cost Impact |
|---|---|---|---|---|
| 1 | Pre-Computation | Pre-generate trending manga responses, pre-compute new catalog embeddings, warm recommendation caches | -1.1s avg for cache hits (~30% of queries) | +$117/mo compute, -$8,100/mo avoided FM calls |
| 2 | Latency-Optimized Model Selection | Haiku for order/FAQ/greeting (<500ms), Sonnet for recommendations/comparisons (acceptable latency) | -600ms avg for simple intents routed to Haiku | -40% FM cost via Haiku routing |
| 3 | Parallel Request Processing | asyncio.gather for RAG + Session + Profile + Cache concurrently | -400ms (waterfall elimination) | Zero additional cost |
| 4 | Response Streaming | Bedrock streaming API via WebSocket, first-token <400ms, chunk batching | First token 400ms vs 2.5s without streaming | Zero additional cost |
| 5 | Performance Benchmarking | p50/p95/p99 targets per intent, automated regression detection, Mann-Whitney U test | Prevents regressions before users notice | ~$50/mo for benchmark compute |

Key Exam Takeaways

  1. Pre-computation trades background compute cost for request-time latency — only cost-effective for predictable, high-frequency queries
  2. Model selection is the single largest lever: routing 60% of queries to Haiku saves both latency AND cost
  3. Parallel orchestration is free latency savings — always identify independent data fetches and run them concurrently
  4. Streaming reduces perceived latency even when total generation time is unchanged — first token matters more than last token
  5. Benchmarking must be automated, per-intent, and statistically rigorous — use p95/p99, not averages, and test for significance before alerting