AI-Assisted Development Patterns for MangaAssist

MangaAssist context: a Japanese manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M input/output tokens, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: a useful answer in under 3 seconds at 1M messages/day.

Skill Mapping

| Attribute | Detail |
| --- | --- |
| Domain | 2 — Implementation & Integration of GenAI Applications |
| Task | 2.5 — Application Integration Patterns |
| Skill | 2.5.4 — Developer Productivity |
| Focus | Test fixtures for FM responses, mock Bedrock client, benchmark suite |
| MangaAssist Scope | Deterministic testing of non-deterministic AI, performance benchmarks, development velocity |

Mind Map

mindmap
  root((AI-Assisted<br/>Development Patterns))
    Test Fixtures
      FM Response Fixtures
        Golden Response Sets
        Edge Case Responses
        Japanese Content Fixtures
        Error Response Fixtures
      Fixture Management
        Version-Controlled Fixtures
        Auto-Generated from Prod
        Parameterized Templates
        Snapshot Testing
    Mock Bedrock Client
      Request Interception
        Model ID Routing
        Token Counting
        Latency Simulation
      Response Generation
        Deterministic Outputs
        Configurable Failures
        Streaming Simulation
        Cost Tracking
      Validation
        Schema Validation
        Token Limit Enforcement
        Content Safety Checks
    Benchmark Suite
      Latency Benchmarks
        Cold Start Timing
        Warm Path Timing
        End-to-End Chain
      Throughput Benchmarks
        Concurrent Requests
        Batch Processing
        Rate Limiting
      Cost Benchmarks
        Token Usage per Query
        Model Routing Savings
        Cache Hit Impact
      Regression Detection
        Statistical Comparison
        Trend Analysis
        Alert Thresholds

Test Fixture Architecture

graph TB
    subgraph FixtureStore["Fixture Store"]
        GR[Golden Responses<br/>Known-Good Outputs]
        EC[Edge Cases<br/>Unicode / Empty / Overflow]
        ER[Error Responses<br/>Throttle / Timeout / 500]
        JP[Japanese Fixtures<br/>Manga-Specific Content]
    end

    subgraph FixtureLoader["Fixture Loader"]
        FL[File Loader<br/>JSON / YAML]
        PL[Parameterized Loader<br/>Template + Variables]
        SL[Snapshot Loader<br/>Recorded Prod Responses]
    end

    subgraph TestTargets["Test Targets"]
        UT[Unit Tests<br/>Single Function]
        IT[Integration Tests<br/>Service Chain]
        CT[Contract Tests<br/>API Schema]
        PT[Property Tests<br/>Invariants]
    end

    GR --> FL
    EC --> FL
    ER --> FL
    JP --> PL
    FL --> UT
    PL --> IT
    SL --> CT
    FL --> PT

    style GR fill:#51cf66,color:#fff
    style EC fill:#ffd43b,color:#333
    style ER fill:#ff6b6b,color:#fff

FM Response Fixtures

"""
Test fixtures for Foundation Model responses in MangaAssist.
Provides deterministic, version-controlled response data for testing.
"""

import json
import hashlib
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any


@dataclass
class FMResponseFixture:
    """A captured or constructed FM response for testing."""
    fixture_id: str
    model_id: str
    prompt_hash: str
    response_text: str
    usage: dict[str, int]
    latency_ms: float
    stop_reason: str = "end_turn"
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_bedrock_format(self) -> dict[str, Any]:
        """Convert to the format returned by Bedrock invoke_model."""
        return {
            "content": [{"type": "text", "text": self.response_text}],
            "usage": self.usage,
            "stop_reason": self.stop_reason,
            "model": self.model_id,
        }


class FixtureLibrary:
    """Manages a library of FM response fixtures for MangaAssist testing."""

    def __init__(self, fixture_dir: str = "tests/fixtures/fm_responses"):
        self.fixture_dir = Path(fixture_dir)
        self.fixture_dir.mkdir(parents=True, exist_ok=True)
        self._cache: dict[str, FMResponseFixture] = {}

    def _prompt_hash(self, prompt: str) -> str:
        """Generate a stable hash for a prompt string."""
        return hashlib.sha256(prompt.encode("utf-8")).hexdigest()[:16]

    # --- Golden response fixtures for common MangaAssist queries ---

    MANGA_RECOMMENDATION = FMResponseFixture(
        fixture_id="manga_recommendation_001",
        model_id="anthropic.claude-3-sonnet-20240229-v1:0",
        prompt_hash="a1b2c3d4e5f6g7h8",
        response_text=(
            "Based on your interest in action manga, I recommend these titles:\n\n"
            "1. **鬼滅の刃 (Demon Slayer)** by 吾峠呼世晴 — A tale of Tanjiro's quest "
            "to save his sister. 23 volumes, completed.\n\n"
            "2. **呪術廻戦 (Jujutsu Kaisen)** by 芥見下々 — Modern sorcery battles "
            "with compelling characters. Currently at volume 25.\n\n"
            "3. **チェンソーマン (Chainsaw Man)** by 藤本タツキ — Dark, fast-paced action "
            "with unique art style. Part 2 ongoing.\n\n"
            "All three are available in our store. Would you like pricing or "
            "availability information for any of these?"
        ),
        usage={"input_tokens": 245, "output_tokens": 187},
        latency_ms=1250.0,
    )

    PRODUCT_LOOKUP = FMResponseFixture(
        fixture_id="product_lookup_001",
        model_id="anthropic.claude-3-haiku-20240307-v1:0",
        prompt_hash="b2c3d4e5f6g7h8i9",
        response_text=(
            "Here are the details for 進撃の巨人 (Attack on Titan) Volume 1:\n\n"
            "- **Author**: 諫山創 (Isayama Hajime)\n"
            "- **Publisher**: 講談社 (Kodansha)\n"
            "- **Price**: ¥528 (tax included)\n"
            "- **ISBN**: 978-4-06-384234-6\n"
            "- **Status**: In stock\n"
            "- **Format**: Tankōbon (単行本)\n\n"
            "Would you like to add this to your cart?"
        ),
        usage={"input_tokens": 120, "output_tokens": 98},
        latency_ms=450.0,
    )

    ORDER_STATUS = FMResponseFixture(
        fixture_id="order_status_001",
        model_id="anthropic.claude-3-haiku-20240307-v1:0",
        prompt_hash="c3d4e5f6g7h8i9j0",
        response_text=(
            "Your order #MNG-2024-78432 is currently being processed:\n\n"
            "- **Status**: Shipped\n"
            "- **Items**: 3 manga volumes\n"
            "- **Tracking**: JP-1234567890\n"
            "- **Estimated delivery**: 2-3 business days\n\n"
            "You can track your package at the Japan Post website."
        ),
        usage={"input_tokens": 95, "output_tokens": 72},
        latency_ms=380.0,
    )

    JAPANESE_GREETING = FMResponseFixture(
        fixture_id="japanese_greeting_001",
        model_id="anthropic.claude-3-haiku-20240307-v1:0",
        prompt_hash="d4e5f6g7h8i9j0k1",
        response_text=(
            "こんにちは!マンガアシストへようこそ。"
            "どのようなマンガをお探しですか?"
            "ジャンル、作者名、または作品名でお探しいただけます。"
        ),
        usage={"input_tokens": 50, "output_tokens": 45},
        latency_ms=280.0,
    )

    # --- Error response fixtures ---

    THROTTLED_RESPONSE = FMResponseFixture(
        fixture_id="error_throttle_001",
        model_id="anthropic.claude-3-sonnet-20240229-v1:0",
        prompt_hash="error_throttle",
        response_text="",
        usage={"input_tokens": 0, "output_tokens": 0},
        latency_ms=0.0,
        stop_reason="error",
        metadata={"error_code": "ThrottlingException", "retryable": True},
    )

    TIMEOUT_RESPONSE = FMResponseFixture(
        fixture_id="error_timeout_001",
        model_id="anthropic.claude-3-sonnet-20240229-v1:0",
        prompt_hash="error_timeout",
        response_text="",
        usage={"input_tokens": 245, "output_tokens": 0},
        latency_ms=30000.0,
        stop_reason="error",
        metadata={"error_code": "ModelTimeoutException", "retryable": True},
    )

    CONTENT_FILTER_RESPONSE = FMResponseFixture(
        fixture_id="error_content_filter_001",
        model_id="anthropic.claude-3-sonnet-20240229-v1:0",
        prompt_hash="error_content_filter",
        response_text="I cannot help with that request.",
        usage={"input_tokens": 120, "output_tokens": 8},
        latency_ms=200.0,
        stop_reason="content_filtered",
        metadata={"filter_type": "safety", "category": "harmful_content"},
    )

    def save_fixture(self, fixture: FMResponseFixture) -> Path:
        """Persist a fixture to disk for version control."""
        filepath = self.fixture_dir / f"{fixture.fixture_id}.json"
        data = {
            "fixture_id": fixture.fixture_id,
            "model_id": fixture.model_id,
            "prompt_hash": fixture.prompt_hash,
            "response_text": fixture.response_text,
            "usage": fixture.usage,
            "latency_ms": fixture.latency_ms,
            "stop_reason": fixture.stop_reason,
            "metadata": fixture.metadata,
        }
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        return filepath

    def load_fixture(self, fixture_id: str) -> FMResponseFixture | None:
        """Load a fixture from disk."""
        if fixture_id in self._cache:
            return self._cache[fixture_id]
        filepath = self.fixture_dir / f"{fixture_id}.json"
        if not filepath.exists():
            return None
        with open(filepath, encoding="utf-8") as f:
            data = json.load(f)
        fixture = FMResponseFixture(**data)
        self._cache[fixture_id] = fixture
        return fixture

    def list_fixtures(self) -> list[str]:
        """List all available fixture IDs."""
        return [
            p.stem for p in self.fixture_dir.glob("*.json")
        ]
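
The fixtures above are most useful when exercised directly in unit tests. The sketch below shows how a test module might consume them with pytest; the test function names and the import path are illustrative assumptions, not part of MangaAssist.

"""
Example tests consuming FM response fixtures (illustrative sketch).
Assumes pytest and that FixtureLibrary is importable from the module above.
"""

from fm_response_fixtures import FixtureLibrary  # hypothetical module path


def test_golden_fixture_matches_bedrock_shape():
    fixture = FixtureLibrary.MANGA_RECOMMENDATION
    body = fixture.to_bedrock_format()
    # The orchestrator reads content[0]["text"]; the golden fixture exposes it.
    assert body["content"][0]["text"].startswith("Based on your interest")
    assert body["usage"]["output_tokens"] == 187
    assert body["stop_reason"] == "end_turn"


def test_fixture_round_trip(tmp_path):
    library = FixtureLibrary(fixture_dir=str(tmp_path))
    path = library.save_fixture(FixtureLibrary.JAPANESE_GREETING)
    assert path.exists()
    loaded = library.load_fixture("japanese_greeting_001")
    # Japanese text must survive the save/load cycle (ensure_ascii=False).
    assert loaded is not None
    assert "マンガアシスト" in loaded.response_text


def test_throttle_fixture_is_retryable():
    fixture = FixtureLibrary.THROTTLED_RESPONSE
    assert fixture.metadata["error_code"] == "ThrottlingException"
    assert fixture.metadata["retryable"] is True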

Mock Bedrock Client

graph TB
    subgraph MockClient["Mock Bedrock Client"]
        RI[Request Interceptor]
        RR[Response Router]
        LS[Latency Simulator]
        TC[Token Counter]
        CT[Cost Tracker]
    end

    subgraph ResponseStrategies["Response Strategies"]
        FX[Fixture-Based<br/>Return saved responses]
        DT[Deterministic<br/>Hash-based selection]
        RN[Random<br/>Controlled randomness]
        ER[Error Injection<br/>Fault simulation]
    end

    subgraph Validation["Request Validation"]
        SV[Schema Validator<br/>Anthropic message format]
        TL[Token Limit Check<br/>Max tokens enforcement]
        ML[Model Validator<br/>Supported model IDs]
    end

    RI --> SV
    RI --> TL
    RI --> ML
    SV -->|Valid| RR
    RR --> FX
    RR --> DT
    RR --> RN
    RR --> ER
    FX --> LS
    DT --> LS
    LS --> TC
    TC --> CT

    style MockClient fill:#232f3e,color:#ff9900

Mock Bedrock Client Implementation

"""
Mock Bedrock client for deterministic testing of MangaAssist.
Simulates invoke_model, invoke_model_with_response_stream, and error conditions.
"""

import asyncio
import hashlib
import json
import logging
import random
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, AsyncIterator

logger = logging.getLogger(__name__)


@dataclass
class MockModelConfig:
    """Configuration for a mocked Bedrock model."""
    model_id: str
    avg_latency_ms: float
    latency_jitter_ms: float = 50.0
    tokens_per_second: float = 80.0
    max_tokens: int = 4096
    error_rate: float = 0.0
    throttle_rate: float = 0.0


MOCK_MODELS = {
    "anthropic.claude-3-sonnet-20240229-v1:0": MockModelConfig(
        model_id="anthropic.claude-3-sonnet-20240229-v1:0",
        avg_latency_ms=1200.0,
        latency_jitter_ms=300.0,
        tokens_per_second=60.0,
        max_tokens=4096,
    ),
    "anthropic.claude-3-haiku-20240307-v1:0": MockModelConfig(
        model_id="anthropic.claude-3-haiku-20240307-v1:0",
        avg_latency_ms=350.0,
        latency_jitter_ms=100.0,
        tokens_per_second=120.0,
        max_tokens=4096,
    ),
}


class MockBedrockClient:
    """
    A mock AWS Bedrock runtime client for testing MangaAssist.

    Provides deterministic responses, latency simulation, error injection,
    and usage tracking — all without calling the real Bedrock service.
    """

    def __init__(
        self,
        fixtures: dict[str, Any] | None = None,
        default_model: str = "anthropic.claude-3-haiku-20240307-v1:0",
        seed: int = 42,
    ):
        self.fixtures = fixtures or {}
        self.default_model = default_model
        self.rng = random.Random(seed)
        self.call_log: list[dict[str, Any]] = []
        self.token_usage: defaultdict[str, dict[str, int]] = defaultdict(
            lambda: {"input": 0, "output": 0}
        )
        self.total_cost_usd: float = 0.0
        self._forced_errors: list[dict[str, Any]] = []
        self._response_overrides: dict[str, str] = {}

    def force_error(
        self,
        error_code: str,
        error_message: str = "Simulated error",
        count: int = 1,
    ) -> None:
        """Queue a forced error for the next N invocations."""
        for _ in range(count):
            self._forced_errors.append({
                "code": error_code,
                "message": error_message,
            })

    def set_response_override(self, prompt_hash: str, response: str) -> None:
        """Set a specific response for a given prompt hash."""
        self._response_overrides[prompt_hash] = response

    def _hash_prompt(self, messages: list[dict]) -> str:
        """Create a deterministic hash of the conversation messages."""
        content = json.dumps(messages, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def _estimate_tokens(self, text: str) -> int:
        """Crude token estimate (~4 characters per token). Japanese text
        tokenizes denser than this, but exact counts are not needed in a mock."""
        return max(1, len(text) // 4)

    def _compute_cost(
        self, model_id: str, input_tokens: int, output_tokens: int
    ) -> float:
        """Compute cost in USD based on MangaAssist model pricing."""
        pricing = {
            "anthropic.claude-3-sonnet-20240229-v1:0": (3.0, 15.0),
            "anthropic.claude-3-haiku-20240307-v1:0": (0.25, 1.25),
        }
        input_rate, output_rate = pricing.get(model_id, (3.0, 15.0))
        return (
            (input_tokens / 1_000_000) * input_rate
            + (output_tokens / 1_000_000) * output_rate
        )

    def _generate_response(
        self, model_id: str, messages: list[dict], max_tokens: int
    ) -> str:
        """Generate a deterministic mock response."""
        prompt_hash = self._hash_prompt(messages)

        # Check for overrides
        if prompt_hash in self._response_overrides:
            return self._response_overrides[prompt_hash]

        # Check for fixture matches
        if prompt_hash in self.fixtures:
            return self.fixtures[prompt_hash]

        # Default: echo-based deterministic response
        last_message = messages[-1].get("content", "") if messages else ""
        if isinstance(last_message, list):
            last_message = " ".join(
                block.get("text", "") for block in last_message
                if isinstance(block, dict)
            )

        # Label the mock output with the model tier (e.g. "sonnet", "haiku") so
        # tests can verify which model the router selected.
        name_parts = model_id.split(".")[-1].split("-")
        tier = name_parts[2] if len(name_parts) > 2 else name_parts[0]
        return f"[Mock {tier}] Response to: {last_message[:100]}..."

    async def invoke(
        self,
        model_id: str | None = None,
        prompt: str = "",
        max_tokens: int = 1024,
        messages: list[dict] | None = None,
        system: str | None = None,
        temperature: float = 0.7,
    ) -> dict[str, Any]:
        """
        Mock invoke_model that simulates Bedrock behavior.

        Returns a dict with 'text', 'usage', 'latency_ms', and 'cost_usd'.
        """
        model_id = model_id or self.default_model
        config = MOCK_MODELS.get(model_id, MOCK_MODELS[self.default_model])

        # Check for forced errors
        if self._forced_errors:
            error = self._forced_errors.pop(0)
            raise Exception(
                f"[{error['code']}] {error['message']}"
            )

        # Check random error injection
        if self.rng.random() < config.error_rate:
            raise Exception("[ModelError] Random error injection triggered")

        if self.rng.random() < config.throttle_rate:
            raise Exception("[ThrottlingException] Rate limit exceeded")

        # Build messages if only prompt provided
        if messages is None:
            messages = [{"role": "user", "content": prompt}]

        # Generate response
        response_text = self._generate_response(model_id, messages, max_tokens)

        # Compute tokens
        input_text = json.dumps(messages)
        if system:
            input_text += system
        input_tokens = self._estimate_tokens(input_text)
        output_tokens = self._estimate_tokens(response_text)

        # Simulate latency
        latency = config.avg_latency_ms + self.rng.gauss(0, config.latency_jitter_ms)
        latency = max(50.0, latency)
        await asyncio.sleep(latency / 1000.0)

        # Track usage
        self.token_usage[model_id]["input"] += input_tokens
        self.token_usage[model_id]["output"] += output_tokens
        cost = self._compute_cost(model_id, input_tokens, output_tokens)
        self.total_cost_usd += cost

        # Log the call
        call_record = {
            "model_id": model_id,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "latency_ms": round(latency, 2),
            "cost_usd": round(cost, 8),
            "timestamp": time.time(),
        }
        self.call_log.append(call_record)

        return {
            "text": response_text,
            "usage": {
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
            },
            "latency_ms": round(latency, 2),
            "cost_usd": round(cost, 8),
            "model_id": model_id,
            "stop_reason": "end_turn",
        }

    async def invoke_stream(
        self,
        model_id: str | None = None,
        prompt: str = "",
        max_tokens: int = 1024,
        messages: list[dict] | None = None,
    ) -> AsyncIterator[dict[str, Any]]:
        """Mock streaming response that yields chunks."""
        model_id = model_id or self.default_model
        config = MOCK_MODELS.get(model_id, MOCK_MODELS[self.default_model])

        if messages is None:
            messages = [{"role": "user", "content": prompt}]

        response_text = self._generate_response(model_id, messages, max_tokens)

        # Simulate streaming by yielding whitespace-delimited chunks. Japanese
        # text without spaces arrives as a single chunk, which is acceptable
        # for mock purposes.
        words = response_text.split()
        chunk_size = max(1, len(words) // 10)

        for i in range(0, len(words), chunk_size):
            chunk = " ".join(words[i : i + chunk_size])
            await asyncio.sleep(50 / 1000.0)  # 50ms between chunks
            yield {
                "type": "content_block_delta",
                "delta": {"type": "text_delta", "text": chunk + " "},
            }

        yield {
            "type": "message_stop",
            "amazon-bedrock-invocationMetrics": {
                "inputTokenCount": self._estimate_tokens(json.dumps(messages)),
                "outputTokenCount": self._estimate_tokens(response_text),
            },
        }

    def get_usage_report(self) -> dict[str, Any]:
        """Get a summary of all mock invocations."""
        return {
            "total_calls": len(self.call_log),
            "total_cost_usd": round(self.total_cost_usd, 6),
            "token_usage": dict(self.token_usage),
            "avg_latency_ms": (
                round(
                    sum(c["latency_ms"] for c in self.call_log) / len(self.call_log), 2
                )
                if self.call_log
                else 0
            ),
            "calls_by_model": {
                model: sum(1 for c in self.call_log if c["model_id"] == model)
                for model in set(c["model_id"] for c in self.call_log)
            },
        }

    def reset(self) -> None:
        """Reset all tracking state."""
        self.call_log.clear()
        self.token_usage.clear()
        self.total_cost_usd = 0.0
        self._forced_errors.clear()
        self._response_overrides.clear()
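
A quick usage sketch of the mock client: one successful invocation, one injected throttle, and a usage report. The import path is a hypothetical assumption; in a real test suite this would usually sit behind pytest-asyncio, but a plain asyncio script keeps the example self-contained.

"""
Usage sketch for MockBedrockClient (illustrative; not MangaAssist production code).
"""

import asyncio

from mock_bedrock import MockBedrockClient  # hypothetical module path


async def main() -> None:
    client = MockBedrockClient(seed=7)

    # Normal invocation: deterministic text, simulated latency, tracked cost.
    result = await client.invoke(
        model_id="anthropic.claude-3-haiku-20240307-v1:0",
        prompt="進撃の巨人の1巻はありますか?",
        max_tokens=256,
    )
    assert result["stop_reason"] == "end_turn"
    assert result["usage"]["input_tokens"] > 0

    # Error injection: the next call raises a simulated throttle, which is
    # exactly what the orchestrator's retry/backoff logic should handle.
    client.force_error("ThrottlingException", "Simulated throttle")
    try:
        await client.invoke(prompt="retry me")
    except Exception as exc:
        assert "ThrottlingException" in str(exc)

    # Cumulative usage and cost are available for assertions or reporting.
    report = client.get_usage_report()
    print(report["total_calls"], report["total_cost_usd"])


if __name__ == "__main__":
    asyncio.run(main())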

Benchmark Suite

graph TB
    subgraph BenchmarkTypes["Benchmark Categories"]
        LB[Latency Benchmarks<br/>Response timing]
        TB[Throughput Benchmarks<br/>Concurrent load]
        CB[Cost Benchmarks<br/>Token economics]
        RB[Regression Benchmarks<br/>Historical comparison]
    end

    subgraph Execution["Benchmark Execution"]
        WU[Warm-Up Phase<br/>10 requests]
        ME[Measurement Phase<br/>100+ requests]
        CD[Cool-Down Phase<br/>Flush metrics]
    end

    subgraph Analysis["Statistical Analysis"]
        DS[Descriptive Stats<br/>Mean / Median / StdDev]
        PC[Percentiles<br/>P50 / P95 / P99]
        CI[Confidence Intervals<br/>95% CI]
        TR[Trend Detection<br/>Regression alerts]
    end

    subgraph Output["Benchmark Output"]
        JR[JSON Report]
        HR[HTML Dashboard]
        GA[GitHub Actions Annotation]
        SL[Slack Notification]
    end

    LB --> WU
    TB --> WU
    CB --> WU
    RB --> WU
    WU --> ME
    ME --> CD
    CD --> DS
    CD --> PC
    DS --> CI
    PC --> TR
    CI --> JR
    TR --> GA
    JR --> HR
    GA --> SL

    style ME fill:#339af0,color:#fff
    style TR fill:#ff6b6b,color:#fff

Benchmark Suite Implementation

"""
Benchmark suite for MangaAssist chatbot performance testing.
Measures latency, throughput, cost, and detects regressions.
"""

import asyncio
import json
import logging
import statistics
import time
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any

logger = logging.getLogger(__name__)


class BenchmarkStatus(Enum):
    PASS = "pass"
    REGRESSION = "regression"
    IMPROVEMENT = "improvement"
    BASELINE = "baseline"


@dataclass
class BenchmarkResult:
    """Result of a single benchmark run."""
    name: str
    samples: int
    mean_ms: float
    median_ms: float
    stddev_ms: float
    p50_ms: float
    p95_ms: float
    p99_ms: float
    min_ms: float
    max_ms: float
    throughput_rps: float = 0.0
    total_tokens: int = 0
    total_cost_usd: float = 0.0
    status: BenchmarkStatus = BenchmarkStatus.BASELINE
    regression_pct: float = 0.0


@dataclass
class BenchmarkConfig:
    """Configuration for a benchmark run."""
    name: str
    warmup_iterations: int = 10
    measurement_iterations: int = 100
    concurrent_workers: int = 1
    timeout_seconds: float = 30.0
    model_id: str = "anthropic.claude-3-haiku-20240307-v1:0"
    regression_threshold_pct: float = 10.0


class LatencyBenchmark:
    """Measures response latency for MangaAssist operations."""

    def __init__(self, bedrock_client: Any, config: BenchmarkConfig):
        self.client = bedrock_client
        self.config = config
        self.latencies: list[float] = []

    async def _single_request(self, prompt: str) -> dict[str, Any]:
        """Execute a single benchmarked request."""
        start = time.monotonic()
        result = await self.client.invoke(
            model_id=self.config.model_id,
            prompt=prompt,
            max_tokens=512,
        )
        elapsed_ms = (time.monotonic() - start) * 1000
        return {"latency_ms": elapsed_ms, **result}

    async def run(self, prompts: list[str]) -> BenchmarkResult:
        """Run the latency benchmark with warmup and measurement phases."""
        # Warm-up phase
        logger.info("Warmup: %d iterations", self.config.warmup_iterations)
        for i in range(self.config.warmup_iterations):
            prompt = prompts[i % len(prompts)]
            await self._single_request(prompt)

        # Measurement phase
        logger.info("Measuring: %d iterations", self.config.measurement_iterations)
        self.latencies.clear()
        total_tokens = 0
        total_cost = 0.0

        for i in range(self.config.measurement_iterations):
            prompt = prompts[i % len(prompts)]
            result = await self._single_request(prompt)
            self.latencies.append(result["latency_ms"])
            usage = result.get("usage", {})
            total_tokens += usage.get("input_tokens", 0) + usage.get("output_tokens", 0)
            total_cost += result.get("cost_usd", 0)

        sorted_latencies = sorted(self.latencies)
        total_time_s = sum(self.latencies) / 1000.0

        return BenchmarkResult(
            name=self.config.name,
            samples=len(self.latencies),
            mean_ms=round(statistics.mean(self.latencies), 2),
            median_ms=round(statistics.median(self.latencies), 2),
            stddev_ms=round(statistics.stdev(self.latencies), 2) if len(self.latencies) > 1 else 0,
            p50_ms=round(sorted_latencies[len(sorted_latencies) // 2], 2),
            p95_ms=round(sorted_latencies[int(len(sorted_latencies) * 0.95)], 2),
            p99_ms=round(sorted_latencies[int(len(sorted_latencies) * 0.99)], 2),
            min_ms=round(min(self.latencies), 2),
            max_ms=round(max(self.latencies), 2),
            throughput_rps=round(len(self.latencies) / total_time_s, 2) if total_time_s > 0 else 0,
            total_tokens=total_tokens,
            total_cost_usd=round(total_cost, 6),
        )


class ThroughputBenchmark:
    """Measures concurrent request throughput for MangaAssist."""

    def __init__(self, bedrock_client: Any, config: BenchmarkConfig):
        self.client = bedrock_client
        self.config = config

    async def _worker(
        self, worker_id: int, prompts: list[str], results: list[float]
    ) -> None:
        """A single concurrent worker executing requests."""
        iterations = self.config.measurement_iterations // self.config.concurrent_workers
        for i in range(iterations):
            prompt = prompts[(worker_id * iterations + i) % len(prompts)]
            start = time.monotonic()
            await self.client.invoke(
                model_id=self.config.model_id,
                prompt=prompt,
                max_tokens=512,
            )
            results.append((time.monotonic() - start) * 1000)

    async def run(self, prompts: list[str]) -> BenchmarkResult:
        """Run throughput benchmark with concurrent workers."""
        all_latencies: list[float] = []

        wall_start = time.monotonic()
        tasks = [
            self._worker(i, prompts, all_latencies)
            for i in range(self.config.concurrent_workers)
        ]
        await asyncio.gather(*tasks)
        wall_time_s = time.monotonic() - wall_start

        sorted_lat = sorted(all_latencies)
        return BenchmarkResult(
            name=f"{self.config.name}_concurrent_{self.config.concurrent_workers}",
            samples=len(all_latencies),
            mean_ms=round(statistics.mean(all_latencies), 2),
            median_ms=round(statistics.median(all_latencies), 2),
            stddev_ms=round(statistics.stdev(all_latencies), 2) if len(all_latencies) > 1 else 0,
            p50_ms=round(sorted_lat[len(sorted_lat) // 2], 2),
            p95_ms=round(sorted_lat[int(len(sorted_lat) * 0.95)], 2),
            p99_ms=round(sorted_lat[int(len(sorted_lat) * 0.99)], 2),
            min_ms=round(min(all_latencies), 2),
            max_ms=round(max(all_latencies), 2),
            throughput_rps=round(len(all_latencies) / wall_time_s, 2),
        )


class RegressionDetector:
    """Detects performance regressions by comparing against historical baselines."""

    def __init__(
        self,
        baseline_path: str = "benchmarks/baselines",
        threshold_pct: float = 10.0,
    ):
        self.baseline_path = Path(baseline_path)
        self.baseline_path.mkdir(parents=True, exist_ok=True)
        self.threshold_pct = threshold_pct

    def save_baseline(self, result: BenchmarkResult) -> None:
        """Save a benchmark result as the new baseline."""
        filepath = self.baseline_path / f"{result.name}.json"
        data = {
            "name": result.name,
            "p95_ms": result.p95_ms,
            "mean_ms": result.mean_ms,
            "throughput_rps": result.throughput_rps,
            "timestamp": time.time(),
        }
        with open(filepath, "w") as f:
            json.dump(data, f, indent=2)

    def load_baseline(self, name: str) -> dict[str, Any] | None:
        """Load a historical baseline for comparison."""
        filepath = self.baseline_path / f"{name}.json"
        if not filepath.exists():
            return None
        with open(filepath) as f:
            return json.load(f)

    def compare(self, result: BenchmarkResult) -> BenchmarkResult:
        """Compare a result against its baseline and set status."""
        baseline = self.load_baseline(result.name)
        if not baseline:
            result.status = BenchmarkStatus.BASELINE
            return result

        baseline_p95 = baseline["p95_ms"]
        change_pct = ((result.p95_ms - baseline_p95) / baseline_p95) * 100

        result.regression_pct = round(change_pct, 2)

        if change_pct > self.threshold_pct:
            result.status = BenchmarkStatus.REGRESSION
            logger.warning(
                "REGRESSION: %s P95 increased by %.1f%% (%s -> %s ms)",
                result.name, change_pct, baseline_p95, result.p95_ms,
            )
        elif change_pct < -self.threshold_pct:
            result.status = BenchmarkStatus.IMPROVEMENT
            logger.info(
                "IMPROVEMENT: %s P95 decreased by %.1f%%",
                result.name, abs(change_pct),
            )
        else:
            result.status = BenchmarkStatus.PASS

        return result


class BenchmarkSuite:
    """Orchestrates the full benchmark suite for MangaAssist."""

    # Standard prompts for benchmarking MangaAssist
    STANDARD_PROMPTS = [
        "おすすめのアクションマンガを教えてください",
        "進撃の巨人の1巻はありますか?",
        "注文番号MNG-2024-78432の配送状況を教えてください",
        "What are the top selling manga this week?",
        "鬼滅の刃の全巻セットの価格は?",
        "Do you have any manga by 手塚治虫?",
        "新刊コミックの入荷予定を教えてください",
        "Can you recommend manga similar to One Piece?",
        "漫画のギフトラッピングはできますか?",
        "What genres of manga do you carry?",
    ]

    def __init__(self, bedrock_client: Any):
        self.client = bedrock_client
        self.detector = RegressionDetector()
        self.results: list[BenchmarkResult] = []

    async def run_all(self) -> dict[str, Any]:
        """Run the complete benchmark suite."""
        # Latency benchmarks
        for model_name, model_id in [
            ("sonnet", "anthropic.claude-3-sonnet-20240229-v1:0"),
            ("haiku", "anthropic.claude-3-haiku-20240307-v1:0"),
        ]:
            config = BenchmarkConfig(
                name=f"latency_{model_name}",
                warmup_iterations=5,
                measurement_iterations=50,
                model_id=model_id,
            )
            bench = LatencyBenchmark(self.client, config)
            result = await bench.run(self.STANDARD_PROMPTS)
            result = self.detector.compare(result)
            self.results.append(result)

        # Throughput benchmark
        config = BenchmarkConfig(
            name="throughput_haiku",
            warmup_iterations=5,
            measurement_iterations=100,
            concurrent_workers=10,
            model_id="anthropic.claude-3-haiku-20240307-v1:0",
        )
        bench = ThroughputBenchmark(self.client, config)
        result = await bench.run(self.STANDARD_PROMPTS)
        result = self.detector.compare(result)
        self.results.append(result)

        # Build summary
        regressions = [r for r in self.results if r.status == BenchmarkStatus.REGRESSION]

        return {
            "total_benchmarks": len(self.results),
            "regressions": len(regressions),
            "results": [
                {
                    "name": r.name,
                    "status": r.status.value,
                    "p95_ms": r.p95_ms,
                    "mean_ms": r.mean_ms,
                    "throughput_rps": r.throughput_rps,
                    "regression_pct": r.regression_pct,
                    "total_cost_usd": r.total_cost_usd,
                }
                for r in self.results
            ],
            "gate_passed": len(regressions) == 0,
        }
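
Wiring the suite to the mock client gives a fast, deterministic CI gate. The sketch below is illustrative; the module paths and the exit-code convention are assumptions rather than MangaAssist specifics.

"""
CI entry point sketch: run the benchmark suite against the mock client.
"""

import asyncio
import json

from benchmark_suite import BenchmarkSuite      # hypothetical module paths
from mock_bedrock import MockBedrockClient


async def main() -> int:
    suite = BenchmarkSuite(MockBedrockClient(seed=42))
    summary = await suite.run_all()
    print(json.dumps(summary, indent=2))
    # A non-zero exit fails the CI job when any P95 regression is detected.
    return 0 if summary["gate_passed"] else 1


if __name__ == "__main__":
    raise SystemExit(asyncio.run(main()))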

Cost Modeling for MangaAssist

graph LR
    subgraph QueryTypes["Query Types"]
        SQ[Simple Query<br/>Greeting / FAQ]
        PL[Product Lookup<br/>ISBN / Title]
        RC[Recommendation<br/>Personalized]
        OS[Order Status<br/>Tracking]
        CX[Complex Query<br/>Multi-Turn]
    end

    subgraph ModelRouting["Model Routing"]
        HK[Haiku<br/>$0.25 / $1.25]
        SN[Sonnet<br/>$3.00 / $15.00]
    end

    subgraph CostImpact["Daily Cost @ 1M msgs"]
        HC[Haiku Simple<br/>~$150/day]
        SC[Sonnet Complex<br/>~$4,500/day]
        MC[Mixed Routing<br/>~$900/day]
    end

    SQ --> HK
    PL --> HK
    OS --> HK
    RC --> SN
    CX --> SN
    HK --> HC
    SN --> SC
    HK --> MC
    SN --> MC

    style HK fill:#51cf66,color:#fff
    style SN fill:#339af0,color:#fff
    style MC fill:#ffd43b,color:#333

Cost Analysis Utility

"""
Cost analysis and model routing optimizer for MangaAssist.
Estimates daily costs at 1M messages/day based on query mix.
"""

from dataclasses import dataclass
from typing import Any


@dataclass
class QueryProfile:
    """Profile for a type of user query in MangaAssist."""
    name: str
    share_pct: float         # Percentage of total traffic
    avg_input_tokens: int
    avg_output_tokens: int
    preferred_model: str     # "haiku" or "sonnet"
    cache_hit_rate: float    # 0.0 to 1.0


MANGA_QUERY_MIX = [
    QueryProfile("greeting_faq", 20.0, 80, 60, "haiku", 0.90),
    QueryProfile("product_lookup", 30.0, 150, 120, "haiku", 0.40),
    QueryProfile("recommendation", 20.0, 300, 250, "sonnet", 0.10),
    QueryProfile("order_status", 15.0, 100, 80, "haiku", 0.05),
    QueryProfile("complex_multi_turn", 15.0, 500, 400, "sonnet", 0.05),
]

MODEL_COSTS_PER_M = {
    "haiku": {"input": 0.25, "output": 1.25},
    "sonnet": {"input": 3.00, "output": 15.00},
}


def estimate_daily_cost(
    daily_messages: int = 1_000_000,
    query_mix: list[QueryProfile] | None = None,
) -> dict[str, Any]:
    """Estimate daily cost breakdown for MangaAssist at scale."""
    query_mix = query_mix or MANGA_QUERY_MIX
    breakdown = []
    total_cost = 0.0
    total_cached = 0

    for profile in query_mix:
        daily_queries = int(daily_messages * profile.share_pct / 100)
        cached = int(daily_queries * profile.cache_hit_rate)
        uncached = daily_queries - cached
        total_cached += cached

        pricing = MODEL_COSTS_PER_M[profile.preferred_model]
        input_cost = (uncached * profile.avg_input_tokens / 1_000_000) * pricing["input"]
        output_cost = (uncached * profile.avg_output_tokens / 1_000_000) * pricing["output"]
        query_cost = input_cost + output_cost
        total_cost += query_cost

        breakdown.append({
            "query_type": profile.name,
            "model": profile.preferred_model,
            "daily_queries": daily_queries,
            "cached": cached,
            "uncached": uncached,
            "daily_cost_usd": round(query_cost, 2),
            "cost_per_query_usd": round(query_cost / max(uncached, 1), 6),
        })

    return {
        "daily_messages": daily_messages,
        "total_daily_cost_usd": round(total_cost, 2),
        "monthly_cost_usd": round(total_cost * 30, 2),
        "cache_savings_pct": round(total_cached / daily_messages * 100, 1),
        "avg_cost_per_message_usd": round(total_cost / daily_messages, 6),
        "breakdown": breakdown,
    }
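
A short usage sketch comparing the mixed-routing estimate above against an all-Sonnet baseline; dataclasses.replace swaps only the routing decision, and the printed figures depend entirely on the assumed query mix and cache rates.

"""
Usage sketch: quantify the model-routing lever with estimate_daily_cost.
Assumes MANGA_QUERY_MIX and estimate_daily_cost from the module above are in scope.
"""

from dataclasses import replace

# Baseline: every query routed to Sonnet, same traffic and cache behavior.
all_sonnet_mix = [replace(p, preferred_model="sonnet") for p in MANGA_QUERY_MIX]

mixed = estimate_daily_cost()
all_sonnet = estimate_daily_cost(query_mix=all_sonnet_mix)

savings = all_sonnet["total_daily_cost_usd"] - mixed["total_daily_cost_usd"]
print(f"Mixed routing: ${mixed['total_daily_cost_usd']:,.2f}/day")
print(f"All-Sonnet:    ${all_sonnet['total_daily_cost_usd']:,.2f}/day")
print(
    f"Routing saves: ${savings:,.2f}/day "
    f"({savings / all_sonnet['total_daily_cost_usd'] * 100:.0f}%)"
)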

Key Takeaways

| # | Takeaway | MangaAssist Application |
| --- | --- | --- |
| 1 | Test fixtures make non-deterministic FM testing deterministic: golden responses, error scenarios, and Japanese content fixtures give repeatable test runs. | Every MangaAssist prompt category (recommendation, product lookup, order status) has a golden fixture with known token counts and quality scores. |
| 2 | Mock Bedrock clients must simulate latency, token counting, and cost, not just return canned text. | The mock client tracks cumulative cost using real Sonnet/Haiku pricing so developers see cost impact during testing. |
| 3 | Streaming mocks are essential for testing the WebSocket delivery path; chunk-by-chunk yielding simulates real Bedrock streaming. | API Gateway WebSocket tests use the mock stream iterator to verify chunk assembly and partial-response rendering. |
| 4 | Benchmark suites need a warm-up phase to keep cold-start noise out of measurements. | Ten warm-up requests before 100 measured requests ensure ECS Fargate container pools are active. |
| 5 | Regression detection compares percentiles against a saved baseline (P95 change > 10%) rather than fixed absolute thresholds. | A 12% P95 increase after a prompt template change triggers a warning annotation on the GitHub PR. |
| 6 | Cost modeling at 1M messages/day shows that model routing is the largest lever: sending simple queries to Haiku saves 80%+ versus all-Sonnet. | Routing simple queries to Haiku ($0.25/$1.25 per 1M tokens) instead of Sonnet ($3/$15) cuts the modeled daily spend from roughly $4,500 to $900, saving about $3,600/day. |
| 7 | Cache hit rates vary dramatically by query type: FAQs cache at 90% while recommendations cache at 10%. | ElastiCache Redis hit rates are tracked per query category, and the cost model quantifies their impact on daily spend. |