LOCAL PREVIEW View on GitHub

AI Component Testing and Performance Optimization

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.

Skill Mapping

Attribute Detail
Certification AWS AIP-C01 — AI Practitioner
Domain 2 — Implementation & Integration of GenAI Applications
Task 2.5 — Application Integration Patterns
Skill 2.5.4 — Enhance developer productivity to accelerate development workflows for GenAI applications
Focus AI component testing (prompt regression, model output validation, load testing) and performance optimization (connection pooling, request batching, caching)

Deep-Dive Overview

This document provides production-grade implementations for the two pillars of GenAI application quality: testing and performance optimization. Standard software testing is necessary but insufficient for FM-powered applications — prompt behavior, output quality, and cost-at-scale all require specialized frameworks.

mindmap
  root((AI Testing &<br/>Performance Optimization))
    Prompt Regression Testing
      Golden Set Baselines
      Semantic Similarity Scoring
      Drift Detection Across Versions
      Multi-Language Coverage
      Automated Bisection
    Model Output Validation
      Keyword Presence / Absence
      Structural Format Checks
      Japanese Content Fluency
      Safety Guardrail Enforcement
      Hallucination Detection
    Load Testing FM Endpoints
      Concurrent Invocation Simulation
      Throttle Behavior Under Load
      Latency Distribution Analysis
      Cost Projection at Scale
      Burst vs Sustained Patterns
    Connection Pooling
      boto3 Client Reuse
      HTTP Keep-Alive Tuning
      OpenSearch Connection Pool
      Redis Pipeline Batching
      Pool Size Optimization
    Request Batching
      Bedrock Batch Inference
      DynamoDB Batch Writes
      Embedding Batch Generation
      Async Gather Patterns
      Queue-Based Batching

Prompt Regression Testing

Prompt regression testing detects when changes to prompt templates, system instructions, or model parameters cause output quality to degrade. Unlike code regressions that produce errors, prompt regressions often produce outputs that are syntactically valid but semantically wrong — they "look fine" but fail to meet quality standards.

Regression Testing Architecture

graph TB
    subgraph Baseline["Golden Set Baselines"]
        GS[Golden Set<br/>Curated Q&A Pairs]
        VH[Version History<br/>Prompt Template Hashes]
        SM[Semantic Embeddings<br/>Baseline Response Vectors]
    end

    subgraph Execution["Regression Execution"]
        RE[Regression Runner<br/>Execute All Golden Pairs]
        SC[Scoring Engine<br/>Compare vs Baseline]
        DF[Drift Detector<br/>Statistical Analysis]
    end

    subgraph Analysis["Regression Analysis"]
        SS[Semantic Similarity<br/>Cosine Distance]
        KW[Keyword Coverage<br/>Expected Terms]
        ST[Structural Validation<br/>JSON / Format Checks]
        JP[Japanese Quality<br/>Fluency Score]
    end

    subgraph Reporting["Regression Reports"]
        DR[Drift Report<br/>Score Delta per Test]
        VR[Version Comparison<br/>Before vs After]
        AL[Alert System<br/>Slack / PagerDuty]
        CI[CI Gate<br/>Block on Regression]
    end

    GS --> RE
    VH --> RE
    RE --> SC
    SC --> SS
    SC --> KW
    SC --> ST
    SC --> JP
    SS --> DF
    KW --> DF
    ST --> DF
    JP --> DF
    DF --> DR
    DF --> VR
    DF --> AL
    DF --> CI

    style GS fill:#339af0,color:#fff
    style DF fill:#ff6b6b,color:#fff
    style CI fill:#ff6b6b,color:#fff

Prompt Regression Suite Implementation

"""
PromptRegressionSuite — detects quality regressions in MangaAssist prompts
by comparing current model outputs against golden-set baselines using
semantic similarity, keyword coverage, structural validation, and
Japanese content fluency scoring.
"""

import hashlib
import json
import logging
import math
import statistics
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional

logger = logging.getLogger(__name__)


class RegressionSeverity(Enum):
    """Severity levels for detected regressions."""
    CRITICAL = "critical"   # Quality score dropped > 20%
    MAJOR = "major"         # Quality score dropped 10-20%
    MINOR = "minor"         # Quality score dropped 5-10%
    NONE = "none"           # No regression detected


@dataclass
class GoldenSetEntry:
    """A single golden-set entry: a known-good input/output pair."""
    entry_id: str
    prompt_template: str
    variables: dict[str, str]
    baseline_response: str
    baseline_score: float
    expected_keywords: list[str] = field(default_factory=list)
    forbidden_keywords: list[str] = field(default_factory=list)
    expected_language: str = "en"   # "en", "ja", or "mixed"
    expected_format: str = "text"   # "text", "json", "markdown"
    tags: list[str] = field(default_factory=list)
    baseline_embedding: list[float] = field(default_factory=list)
    created_at: float = field(default_factory=time.time)


@dataclass
class RegressionResult:
    """Result of comparing current output against a golden-set baseline."""
    entry_id: str
    severity: RegressionSeverity
    baseline_score: float
    current_score: float
    score_delta: float
    semantic_similarity: float
    keyword_coverage: float
    structural_match: bool
    language_correct: bool
    current_response: str
    baseline_response: str
    latency_ms: float = 0.0
    details: dict[str, Any] = field(default_factory=dict)


@dataclass
class RegressionReport:
    """Aggregated regression report across all golden-set entries."""
    suite_name: str
    prompt_template_hash: str
    total_entries: int
    regressions_critical: int
    regressions_major: int
    regressions_minor: int
    no_regression: int
    avg_score_delta: float
    avg_semantic_similarity: float
    avg_keyword_coverage: float
    overall_pass: bool
    results: list[RegressionResult] = field(default_factory=list)
    execution_time_ms: float = 0.0
    recommendations: list[str] = field(default_factory=list)


class PromptRegressionSuite:
    """
    Manages golden-set baselines and runs regression tests for MangaAssist prompts.

    Workflow:
      1. Establish baselines: run prompts against a model, store outputs as golden set
      2. On prompt/model change: re-run the same prompts
      3. Compare current outputs against baselines using multiple scoring dimensions
      4. Flag regressions that exceed severity thresholds
      5. Block CI/CD if critical regressions are detected
    """

    # Thresholds for regression severity classification
    SEVERITY_THRESHOLDS = {
        "critical": 0.20,  # Score dropped > 20%
        "major": 0.10,     # Score dropped 10-20%
        "minor": 0.05,     # Score dropped 5-10%
    }

    # Minimum acceptable scores for passing
    MIN_SEMANTIC_SIMILARITY = 0.75
    MIN_KEYWORD_COVERAGE = 0.80
    MIN_OVERALL_SCORE = 0.70

    def __init__(
        self,
        suite_name: str = "MangaAssist-Prompt-Regression",
        golden_set: list[GoldenSetEntry] | None = None,
        bedrock_client: Any = None,
        embedding_client: Any = None,
    ):
        self.suite_name = suite_name
        self.golden_set: list[GoldenSetEntry] = golden_set or []
        self.bedrock_client = bedrock_client
        self.embedding_client = embedding_client
        self.results: list[RegressionResult] = []

    def add_golden_entry(self, entry: GoldenSetEntry) -> None:
        """Add a new golden-set entry to the suite."""
        self.golden_set.append(entry)
        logger.info("Added golden entry '%s' (tags: %s)", entry.entry_id, entry.tags)

    def _compute_template_hash(self, template: str) -> str:
        """Hash a prompt template for version tracking."""
        return hashlib.sha256(template.encode("utf-8")).hexdigest()[:12]

    def _render_prompt(self, template: str, variables: dict[str, str]) -> str:
        """Render a prompt template with variable substitution."""
        rendered = template
        for key, value in variables.items():
            rendered = rendered.replace(f"{{{{{key}}}}}", value)
        return rendered

    @staticmethod
    def _cosine_similarity(vec_a: list[float], vec_b: list[float]) -> float:
        """Compute cosine similarity between two vectors."""
        if not vec_a or not vec_b or len(vec_a) != len(vec_b):
            return 0.0
        dot_product = sum(a * b for a, b in zip(vec_a, vec_b))
        norm_a = math.sqrt(sum(a * a for a in vec_a))
        norm_b = math.sqrt(sum(b * b for b in vec_b))
        if norm_a == 0 or norm_b == 0:
            return 0.0
        return dot_product / (norm_a * norm_b)

    def _compute_keyword_coverage(
        self, response: str, expected: list[str], forbidden: list[str]
    ) -> float:
        """Compute keyword coverage score (0.0 to 1.0)."""
        if not expected and not forbidden:
            return 1.0

        response_lower = response.lower()
        scores: list[float] = []

        # Expected keyword coverage
        if expected:
            found = sum(1 for kw in expected if kw.lower() in response_lower)
            scores.append(found / len(expected))

        # Forbidden keyword penalty
        if forbidden:
            violations = sum(1 for kw in forbidden if kw.lower() in response_lower)
            scores.append(1.0 - (violations / len(forbidden)))

        return statistics.mean(scores) if scores else 1.0

    def _check_structural_match(self, response: str, expected_format: str) -> bool:
        """Check if response matches expected structural format."""
        if expected_format == "json":
            try:
                json.loads(response)
                return True
            except (json.JSONDecodeError, TypeError):
                return False
        if expected_format == "markdown":
            return any(
                marker in response
                for marker in ["#", "**", "- ", "1.", "```"]
            )
        return True  # "text" format always passes

    def _check_language(self, response: str, expected_language: str) -> bool:
        """Check if response contains expected language content."""
        has_japanese = any(
            "\u3040" <= ch <= "\u9fff" or "\uf900" <= ch <= "\ufaff"
            for ch in response
        )
        has_latin = any("a" <= ch.lower() <= "z" for ch in response)

        if expected_language == "ja":
            return has_japanese
        if expected_language == "en":
            return has_latin
        if expected_language == "mixed":
            return has_japanese and has_latin
        return True

    def _classify_severity(self, score_delta: float) -> RegressionSeverity:
        """Classify regression severity based on score delta."""
        abs_delta = abs(score_delta)
        if abs_delta >= self.SEVERITY_THRESHOLDS["critical"]:
            return RegressionSeverity.CRITICAL
        if abs_delta >= self.SEVERITY_THRESHOLDS["major"]:
            return RegressionSeverity.MAJOR
        if abs_delta >= self.SEVERITY_THRESHOLDS["minor"]:
            return RegressionSeverity.MINOR
        return RegressionSeverity.NONE

    def _compute_composite_score(
        self,
        semantic_sim: float,
        keyword_cov: float,
        structural_match: bool,
        language_correct: bool,
    ) -> float:
        """Compute a weighted composite quality score."""
        weights = {
            "semantic": 0.40,
            "keyword": 0.25,
            "structural": 0.15,
            "language": 0.20,
        }
        score = (
            weights["semantic"] * semantic_sim
            + weights["keyword"] * keyword_cov
            + weights["structural"] * (1.0 if structural_match else 0.0)
            + weights["language"] * (1.0 if language_correct else 0.0)
        )
        return round(score, 4)

    async def run_single_regression(
        self, entry: GoldenSetEntry
    ) -> RegressionResult:
        """Run regression test for a single golden-set entry."""
        prompt = self._render_prompt(entry.prompt_template, entry.variables)

        start = time.monotonic()
        # Invoke model
        model_response = await self.bedrock_client.invoke(
            model_id="anthropic.claude-3-haiku-20240307-v1:0",
            prompt=prompt,
            max_tokens=1024,
        )
        latency_ms = (time.monotonic() - start) * 1000
        current_response = model_response.get("text", "")

        # Compute semantic similarity
        semantic_sim = 0.0
        if entry.baseline_embedding and self.embedding_client:
            current_embedding = await self.embedding_client.embed(current_response)
            semantic_sim = self._cosine_similarity(
                entry.baseline_embedding, current_embedding
            )

        # Compute keyword coverage
        keyword_cov = self._compute_keyword_coverage(
            current_response, entry.expected_keywords, entry.forbidden_keywords
        )

        # Check structural match
        structural_ok = self._check_structural_match(
            current_response, entry.expected_format
        )

        # Check language
        language_ok = self._check_language(
            current_response, entry.expected_language
        )

        # Compute composite score
        current_score = self._compute_composite_score(
            semantic_sim, keyword_cov, structural_ok, language_ok
        )
        score_delta = entry.baseline_score - current_score
        severity = self._classify_severity(score_delta)

        result = RegressionResult(
            entry_id=entry.entry_id,
            severity=severity,
            baseline_score=entry.baseline_score,
            current_score=current_score,
            score_delta=round(score_delta, 4),
            semantic_similarity=round(semantic_sim, 4),
            keyword_coverage=round(keyword_cov, 4),
            structural_match=structural_ok,
            language_correct=language_ok,
            current_response=current_response,
            baseline_response=entry.baseline_response,
            latency_ms=round(latency_ms, 2),
            details={
                "prompt_hash": self._compute_template_hash(entry.prompt_template),
                "tags": entry.tags,
            },
        )
        self.results.append(result)
        return result

    async def run_full_suite(self) -> RegressionReport:
        """Run regression tests across all golden-set entries."""
        suite_start = time.monotonic()
        self.results = []

        for entry in self.golden_set:
            await self.run_single_regression(entry)

        execution_time = (time.monotonic() - suite_start) * 1000

        # Classify results
        critical = sum(1 for r in self.results if r.severity == RegressionSeverity.CRITICAL)
        major = sum(1 for r in self.results if r.severity == RegressionSeverity.MAJOR)
        minor = sum(1 for r in self.results if r.severity == RegressionSeverity.MINOR)
        no_reg = sum(1 for r in self.results if r.severity == RegressionSeverity.NONE)

        # Compute averages
        score_deltas = [r.score_delta for r in self.results]
        similarities = [r.semantic_similarity for r in self.results]
        coverages = [r.keyword_coverage for r in self.results]

        # Determine overall pass
        overall_pass = (
            critical == 0
            and (major / max(len(self.results), 1)) < 0.10
        )

        # Build template hash from first entry
        template_hash = ""
        if self.golden_set:
            template_hash = self._compute_template_hash(
                self.golden_set[0].prompt_template
            )

        report = RegressionReport(
            suite_name=self.suite_name,
            prompt_template_hash=template_hash,
            total_entries=len(self.results),
            regressions_critical=critical,
            regressions_major=major,
            regressions_minor=minor,
            no_regression=no_reg,
            avg_score_delta=round(statistics.mean(score_deltas), 4) if score_deltas else 0.0,
            avg_semantic_similarity=round(statistics.mean(similarities), 4) if similarities else 0.0,
            avg_keyword_coverage=round(statistics.mean(coverages), 4) if coverages else 0.0,
            overall_pass=overall_pass,
            results=self.results,
            execution_time_ms=round(execution_time, 2),
        )

        # Generate recommendations
        report.recommendations = self._generate_recommendations(report)
        return report

    def _generate_recommendations(self, report: RegressionReport) -> list[str]:
        """Generate actionable recommendations from regression results."""
        recs: list[str] = []

        if report.regressions_critical > 0:
            recs.append(
                f"{report.regressions_critical} CRITICAL regression(s) detected. "
                f"Review prompt template changes immediately. "
                f"This will BLOCK the CI/CD pipeline."
            )

        if report.avg_semantic_similarity < self.MIN_SEMANTIC_SIMILARITY:
            recs.append(
                f"Average semantic similarity ({report.avg_semantic_similarity:.2f}) "
                f"is below threshold ({self.MIN_SEMANTIC_SIMILARITY}). "
                f"Model responses have drifted significantly from baselines."
            )

        if report.avg_keyword_coverage < self.MIN_KEYWORD_COVERAGE:
            recs.append(
                f"Average keyword coverage ({report.avg_keyword_coverage:.2f}) "
                f"is below threshold ({self.MIN_KEYWORD_COVERAGE}). "
                f"Expected domain terms are missing from responses."
            )

        # Check for Japanese-specific regressions
        jp_failures = [
            r for r in report.results
            if not r.language_correct and "ja" in r.details.get("tags", [])
        ]
        if jp_failures:
            recs.append(
                f"{len(jp_failures)} Japanese content regression(s). "
                f"Model may have lost Japanese language capability for these prompts."
            )

        if not report.overall_pass:
            recs.append(
                "SUITE FAILED. Do not merge until regressions are resolved. "
                "Use `run_bisection()` to identify which commit caused the regression."
            )

        return recs

Model Output Validation

Model output validation ensures that every Bedrock response meets MangaAssist quality standards before being returned to users. This is a runtime concern (not just test-time) — responses that fail validation are either retried with a different model or returned with a fallback message.

Output Validation Pipeline

graph TB
    subgraph Input["Model Response Input"]
        RAW[Raw Bedrock Response<br/>JSON Body]
        META[Response Metadata<br/>Tokens, Latency, Stop Reason]
    end

    subgraph Validators["Validation Pipeline"]
        FMT[Format Validator<br/>JSON / Markdown / Plain]
        KW[Keyword Validator<br/>Expected / Forbidden]
        LNG[Language Validator<br/>Japanese / English / Mixed]
        SAF[Safety Validator<br/>No PII, No Injection Leaks]
        HAL[Hallucination Check<br/>Cross-Reference RAG Sources]
        LEN[Length Validator<br/>Min / Max Characters]
    end

    subgraph Decision["Validation Decision"]
        PASS[Accept Response<br/>Return to User]
        RETRY[Retry with Haiku<br/>Simpler Prompt]
        FALLBACK[Return Fallback<br/>Canned Response]
        LOG[Log Failure<br/>For Analysis]
    end

    RAW --> FMT
    RAW --> KW
    RAW --> LNG
    RAW --> SAF
    RAW --> HAL
    RAW --> LEN
    META --> SAF

    FMT -->|Pass| PASS
    KW -->|Pass| PASS
    LNG -->|Fail| RETRY
    SAF -->|Fail| FALLBACK
    HAL -->|Fail| RETRY
    LEN -->|Fail| RETRY

    FMT -->|Fail| LOG
    SAF -->|Fail| LOG
    HAL -->|Fail| LOG

    style PASS fill:#51cf66,color:#fff
    style FALLBACK fill:#ff6b6b,color:#fff
    style RETRY fill:#ffd43b,color:#333

Load Testing FM Endpoints

Load testing FM endpoints is fundamentally different from load testing traditional APIs. Bedrock enforces per-model token-per-minute (TPM) and requests-per-minute (RPM) limits. Load tests must simulate realistic traffic distributions, not just hammer a single endpoint.

Load Testing Architecture

graph TB
    subgraph LoadGen["Load Generator"]
        WK[Worker Pool<br/>Async Coroutines]
        TP[Traffic Patterns<br/>Burst / Sustained / Ramp]
        QD[Query Distribution<br/>Genre Mix, Language Mix]
    end

    subgraph Target["MangaAssist Endpoints"]
        WS[WebSocket API<br/>API Gateway]
        REST[REST API<br/>Health & Admin]
        BK[Bedrock Backend<br/>Sonnet & Haiku]
    end

    subgraph Metrics["Metrics Collection"]
        LAT[Latency Histogram<br/>p50 / p95 / p99]
        THR[Throughput Counter<br/>Requests/sec]
        ERR[Error Rate<br/>Throttle / Timeout / 5xx]
        CST[Cost Accumulator<br/>Tokens x Pricing]
    end

    subgraph Analysis["Load Analysis"]
        BRK[Break Point<br/>Where Does It Fail?]
        SCL[Scaling Behavior<br/>Linear vs Degraded]
        REC[Recommendations<br/>Provisioned Throughput]
    end

    WK --> WS
    WK --> REST
    WK --> BK
    TP --> WK
    QD --> WK
    WS --> LAT
    BK --> LAT
    WS --> THR
    BK --> ERR
    BK --> CST
    LAT --> BRK
    THR --> SCL
    ERR --> BRK
    CST --> REC

    style BK fill:#232f3e,color:#ff9900
    style BRK fill:#ff6b6b,color:#fff

FM Load Tester Implementation

"""
FMLoadTester — load tests MangaAssist FM endpoints with realistic traffic
patterns, measuring latency distributions, throughput limits, error rates,
and projected costs at 1M messages/day scale.
"""

import asyncio
import logging
import random
import statistics
import time
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Optional

logger = logging.getLogger(__name__)


class TrafficPattern(Enum):
    """Load testing traffic patterns."""
    SUSTAINED = "sustained"     # Constant rate over duration
    RAMP_UP = "ramp_up"         # Linearly increasing rate
    BURST = "burst"             # Short high-intensity bursts
    SPIKE = "spike"             # Sudden 10x spike then return to baseline
    PRODUCTION = "production"   # Modeled from real traffic distribution


class QueryType(Enum):
    """Types of MangaAssist queries for realistic traffic mix."""
    PRODUCT_LOOKUP = "product_lookup"       # Simple: use Haiku
    RECOMMENDATION = "recommendation"       # Complex: use Sonnet
    PRICE_CHECK = "price_check"             # Simple: use Haiku
    MANGA_SUMMARY = "manga_summary"         # Complex: use Sonnet
    ORDER_STATUS = "order_status"           # Simple: use Haiku
    JAPANESE_QUERY = "japanese_query"        # Mixed: depends on complexity
    GREETING = "greeting"                   # Simple: use Haiku


# MangaAssist production traffic distribution (based on analytics)
PRODUCTION_TRAFFIC_MIX = {
    QueryType.PRODUCT_LOOKUP: 0.25,
    QueryType.RECOMMENDATION: 0.20,
    QueryType.PRICE_CHECK: 0.15,
    QueryType.MANGA_SUMMARY: 0.10,
    QueryType.ORDER_STATUS: 0.10,
    QueryType.JAPANESE_QUERY: 0.12,
    QueryType.GREETING: 0.08,
}


@dataclass
class LoadTestRequest:
    """A single load test request."""
    request_id: int
    query_type: QueryType
    prompt: str
    model_id: str
    timestamp: float = field(default_factory=time.time)
    expected_max_latency_ms: float = 3000.0


@dataclass
class LoadTestResponse:
    """Response from a single load test request."""
    request_id: int
    query_type: QueryType
    model_id: str
    latency_ms: float
    tokens_in: int = 0
    tokens_out: int = 0
    cost_usd: float = 0.0
    status: str = "success"   # "success", "throttled", "timeout", "error"
    error_message: str = ""
    timestamp: float = field(default_factory=time.time)


@dataclass
class LoadTestReport:
    """Comprehensive load test report."""
    test_name: str
    traffic_pattern: TrafficPattern
    duration_seconds: float
    total_requests: int
    successful_requests: int
    throttled_requests: int
    timeout_requests: int
    error_requests: int
    avg_latency_ms: float
    p50_latency_ms: float
    p95_latency_ms: float
    p99_latency_ms: float
    max_latency_ms: float
    requests_per_second: float
    total_tokens_in: int
    total_tokens_out: int
    total_cost_usd: float
    projected_daily_cost_usd: float
    projected_monthly_cost_usd: float
    sla_compliance_rate: float
    by_query_type: dict[str, dict[str, Any]] = field(default_factory=dict)
    by_model: dict[str, dict[str, Any]] = field(default_factory=dict)
    recommendations: list[str] = field(default_factory=list)


# Token pricing (per 1M tokens)
MODEL_PRICING = {
    "anthropic.claude-3-sonnet-20240229-v1:0": {"input": 3.00, "output": 15.00},
    "anthropic.claude-3-haiku-20240307-v1:0": {"input": 0.25, "output": 1.25},
}

# Sample prompts for each query type
QUERY_PROMPTS = {
    QueryType.PRODUCT_LOOKUP: [
        "Do you have Attack on Titan volume 34 in stock?",
        "What edition of One Piece volume 100 is available?",
        "Is Spy x Family volume 12 available in English?",
    ],
    QueryType.RECOMMENDATION: [
        "Recommend 5 manga similar to Demon Slayer for someone who likes action.",
        "I liked Fruits Basket. What other shojo manga would you suggest?",
        "What are the top-rated seinen manga released this year?",
    ],
    QueryType.PRICE_CHECK: [
        "How much is Jujutsu Kaisen volume 20?",
        "What's the price for the complete Naruto box set?",
        "Compare prices: My Hero Academia paperback vs digital.",
    ],
    QueryType.MANGA_SUMMARY: [
        "Give me a detailed summary of the Chainsaw Man arc.",
        "Summarize the first 50 chapters of One Piece without spoilers.",
        "What is the overall plot and themes of Berserk?",
    ],
    QueryType.ORDER_STATUS: [
        "Where is my order #MGA-2024-78542?",
        "When will my pre-order for Blue Lock volume 25 ship?",
        "Can I change the shipping address for order #MGA-2024-91023?",
    ],
    QueryType.JAPANESE_QUERY: [
        "ワンピースの最新巻はいつ発売ですか?",
        "おすすめの少年マンガを教えてください。",
        "進撃の巨人の全巻セットの価格を教えてください。",
    ],
    QueryType.GREETING: [
        "Hello!",
        "こんにちは",
        "Hi, I'm looking for manga recommendations.",
    ],
}

# Model routing per query type
QUERY_MODEL_ROUTING = {
    QueryType.PRODUCT_LOOKUP: "anthropic.claude-3-haiku-20240307-v1:0",
    QueryType.RECOMMENDATION: "anthropic.claude-3-sonnet-20240229-v1:0",
    QueryType.PRICE_CHECK: "anthropic.claude-3-haiku-20240307-v1:0",
    QueryType.MANGA_SUMMARY: "anthropic.claude-3-sonnet-20240229-v1:0",
    QueryType.ORDER_STATUS: "anthropic.claude-3-haiku-20240307-v1:0",
    QueryType.JAPANESE_QUERY: "anthropic.claude-3-haiku-20240307-v1:0",
    QueryType.GREETING: "anthropic.claude-3-haiku-20240307-v1:0",
}


class FMLoadTester:
    """
    Load tests MangaAssist FM endpoints with configurable traffic patterns.

    Supports:
      - Sustained, ramp-up, burst, spike, and production-modeled patterns
      - Realistic query distribution matching production analytics
      - Model routing (Sonnet for complex, Haiku for simple)
      - Latency distribution analysis (p50/p95/p99)
      - Cost projection at 1M messages/day
      - Throttle and timeout detection
    """

    def __init__(
        self,
        bedrock_client: Any,
        max_concurrency: int = 50,
        sla_latency_ms: float = 3000.0,
        daily_message_target: int = 1_000_000,
    ):
        self.bedrock_client = bedrock_client
        self.max_concurrency = max_concurrency
        self.sla_latency_ms = sla_latency_ms
        self.daily_message_target = daily_message_target
        self.responses: list[LoadTestResponse] = []
        self._semaphore = asyncio.Semaphore(max_concurrency)
        self._request_counter = 0

    def _select_query(self) -> tuple[QueryType, str, str]:
        """Select a random query based on production traffic mix."""
        rand = random.random()
        cumulative = 0.0
        selected_type = QueryType.GREETING

        for query_type, weight in PRODUCTION_TRAFFIC_MIX.items():
            cumulative += weight
            if rand <= cumulative:
                selected_type = query_type
                break

        prompt = random.choice(QUERY_PROMPTS[selected_type])
        model_id = QUERY_MODEL_ROUTING[selected_type]
        return selected_type, prompt, model_id

    def _compute_cost(self, model_id: str, tokens_in: int, tokens_out: int) -> float:
        """Compute cost for a single invocation."""
        pricing = MODEL_PRICING.get(model_id, {"input": 3.0, "output": 15.0})
        return (
            (tokens_in / 1_000_000) * pricing["input"]
            + (tokens_out / 1_000_000) * pricing["output"]
        )

    async def _execute_single_request(
        self, request: LoadTestRequest
    ) -> LoadTestResponse:
        """Execute a single load test request with concurrency limiting."""
        async with self._semaphore:
            start = time.monotonic()
            try:
                response = await self.bedrock_client.invoke(
                    model_id=request.model_id,
                    prompt=request.prompt,
                    max_tokens=1024,
                )
                latency_ms = (time.monotonic() - start) * 1000
                tokens_in = response.get("usage", {}).get("input_tokens", 500)
                tokens_out = response.get("usage", {}).get("output_tokens", 200)

                result = LoadTestResponse(
                    request_id=request.request_id,
                    query_type=request.query_type,
                    model_id=request.model_id,
                    latency_ms=round(latency_ms, 2),
                    tokens_in=tokens_in,
                    tokens_out=tokens_out,
                    cost_usd=self._compute_cost(request.model_id, tokens_in, tokens_out),
                    status="success",
                )

            except Exception as e:
                latency_ms = (time.monotonic() - start) * 1000
                error_str = str(e)
                status = "error"
                if "ThrottlingException" in error_str:
                    status = "throttled"
                elif "timeout" in error_str.lower() or latency_ms > 30000:
                    status = "timeout"

                result = LoadTestResponse(
                    request_id=request.request_id,
                    query_type=request.query_type,
                    model_id=request.model_id,
                    latency_ms=round(latency_ms, 2),
                    status=status,
                    error_message=error_str[:500],
                )

            self.responses.append(result)
            return result

    async def _generate_traffic_schedule(
        self,
        pattern: TrafficPattern,
        total_requests: int,
        duration_seconds: float,
    ) -> list[float]:
        """Generate request timestamps based on traffic pattern."""
        timestamps: list[float] = []
        start_time = time.monotonic()

        if pattern == TrafficPattern.SUSTAINED:
            interval = duration_seconds / total_requests
            for i in range(total_requests):
                timestamps.append(start_time + i * interval)

        elif pattern == TrafficPattern.RAMP_UP:
            # Linearly increasing rate: more requests toward the end
            for i in range(total_requests):
                progress = i / total_requests
                delay = duration_seconds * (1 - progress**2)
                timestamps.append(start_time + delay * (i / total_requests))

        elif pattern == TrafficPattern.BURST:
            # 80% of requests in 20% of the time
            burst_count = int(total_requests * 0.8)
            burst_duration = duration_seconds * 0.2
            rest_count = total_requests - burst_count
            rest_duration = duration_seconds * 0.8

            for i in range(burst_count):
                timestamps.append(start_time + (i / burst_count) * burst_duration)
            for i in range(rest_count):
                timestamps.append(
                    start_time + burst_duration + (i / max(rest_count, 1)) * rest_duration
                )

        elif pattern == TrafficPattern.SPIKE:
            # Normal rate, then 10x spike at midpoint, then return
            normal_count = int(total_requests * 0.6)
            spike_count = total_requests - normal_count
            normal_half = normal_count // 2

            for i in range(normal_half):
                timestamps.append(start_time + (i / normal_half) * (duration_seconds * 0.4))
            spike_start = duration_seconds * 0.4
            for i in range(spike_count):
                timestamps.append(
                    start_time + spike_start + (i / spike_count) * (duration_seconds * 0.2)
                )
            for i in range(normal_count - normal_half):
                timestamps.append(
                    start_time + duration_seconds * 0.6
                    + (i / max(normal_count - normal_half, 1)) * (duration_seconds * 0.4)
                )

        else:  # PRODUCTION — random with time-of-day weighting
            for i in range(total_requests):
                timestamps.append(
                    start_time + random.random() * duration_seconds
                )

        return sorted(timestamps)

    async def run_load_test(
        self,
        test_name: str = "MangaAssist-Load-Test",
        pattern: TrafficPattern = TrafficPattern.SUSTAINED,
        total_requests: int = 1000,
        duration_seconds: float = 60.0,
    ) -> LoadTestReport:
        """Execute a full load test and return a comprehensive report."""
        self.responses = []
        self._request_counter = 0

        logger.info(
            "Starting load test '%s': %d requests, %s pattern, %.0fs duration",
            test_name, total_requests, pattern.value, duration_seconds,
        )

        timestamps = await self._generate_traffic_schedule(
            pattern, total_requests, duration_seconds
        )

        tasks = []
        start_time = time.monotonic()

        for ts in timestamps:
            self._request_counter += 1
            query_type, prompt, model_id = self._select_query()

            request = LoadTestRequest(
                request_id=self._request_counter,
                query_type=query_type,
                prompt=prompt,
                model_id=model_id,
            )

            # Schedule with delay based on traffic pattern
            delay = ts - time.monotonic()
            if delay > 0:
                await asyncio.sleep(delay)

            tasks.append(
                asyncio.create_task(self._execute_single_request(request))
            )

        # Wait for all requests to complete
        await asyncio.gather(*tasks, return_exceptions=True)
        actual_duration = time.monotonic() - start_time

        return self._build_report(test_name, pattern, actual_duration)

    @staticmethod
    def _percentile(values: list[float], pct: float) -> float:
        if not values:
            return 0.0
        s = sorted(values)
        idx = min(int(len(s) * pct / 100), len(s) - 1)
        return s[idx]

    def _build_report(
        self,
        test_name: str,
        pattern: TrafficPattern,
        actual_duration: float,
    ) -> LoadTestReport:
        """Build a comprehensive load test report from collected responses."""
        successful = [r for r in self.responses if r.status == "success"]
        throttled = [r for r in self.responses if r.status == "throttled"]
        timeouts = [r for r in self.responses if r.status == "timeout"]
        errors = [r for r in self.responses if r.status == "error"]

        latencies = [r.latency_ms for r in successful]
        total_tokens_in = sum(r.tokens_in for r in successful)
        total_tokens_out = sum(r.tokens_out for r in successful)
        total_cost = sum(r.cost_usd for r in successful)

        sla_compliant = sum(1 for lat in latencies if lat <= self.sla_latency_ms)
        sla_rate = sla_compliant / max(len(latencies), 1)

        # Per-query-type breakdown
        by_query: dict[str, dict[str, Any]] = {}
        for qt in QueryType:
            qt_responses = [r for r in self.responses if r.query_type == qt]
            if qt_responses:
                qt_latencies = [r.latency_ms for r in qt_responses if r.status == "success"]
                by_query[qt.value] = {
                    "total": len(qt_responses),
                    "success": sum(1 for r in qt_responses if r.status == "success"),
                    "avg_latency_ms": round(statistics.mean(qt_latencies), 2) if qt_latencies else 0,
                    "p95_latency_ms": round(self._percentile(qt_latencies, 95), 2),
                    "cost_usd": round(sum(r.cost_usd for r in qt_responses), 6),
                }

        # Per-model breakdown
        by_model: dict[str, dict[str, Any]] = {}
        for model_id in MODEL_PRICING:
            m_responses = [r for r in self.responses if r.model_id == model_id]
            if m_responses:
                m_latencies = [r.latency_ms for r in m_responses if r.status == "success"]
                by_model[model_id.split(".")[-1][:20]] = {
                    "total": len(m_responses),
                    "success": sum(1 for r in m_responses if r.status == "success"),
                    "avg_latency_ms": round(statistics.mean(m_latencies), 2) if m_latencies else 0,
                    "cost_usd": round(sum(r.cost_usd for r in m_responses), 6),
                }

        # Cost projections
        cost_per_request = total_cost / max(len(successful), 1)
        daily_cost = cost_per_request * self.daily_message_target
        monthly_cost = daily_cost * 30

        report = LoadTestReport(
            test_name=test_name,
            traffic_pattern=pattern,
            duration_seconds=round(actual_duration, 2),
            total_requests=len(self.responses),
            successful_requests=len(successful),
            throttled_requests=len(throttled),
            timeout_requests=len(timeouts),
            error_requests=len(errors),
            avg_latency_ms=round(statistics.mean(latencies), 2) if latencies else 0,
            p50_latency_ms=round(self._percentile(latencies, 50), 2),
            p95_latency_ms=round(self._percentile(latencies, 95), 2),
            p99_latency_ms=round(self._percentile(latencies, 99), 2),
            max_latency_ms=round(max(latencies), 2) if latencies else 0,
            requests_per_second=round(len(self.responses) / max(actual_duration, 0.001), 2),
            total_tokens_in=total_tokens_in,
            total_tokens_out=total_tokens_out,
            total_cost_usd=round(total_cost, 6),
            projected_daily_cost_usd=round(daily_cost, 2),
            projected_monthly_cost_usd=round(monthly_cost, 2),
            sla_compliance_rate=round(sla_rate, 4),
            by_query_type=by_query,
            by_model=by_model,
        )

        report.recommendations = self._generate_recommendations(report)
        return report

    def _generate_recommendations(self, report: LoadTestReport) -> list[str]:
        """Generate recommendations from load test results."""
        recs: list[str] = []

        if report.throttled_requests > 0:
            throttle_rate = report.throttled_requests / report.total_requests
            recs.append(
                f"Throttle rate: {throttle_rate:.1%} "
                f"({report.throttled_requests} of {report.total_requests}). "
                f"Request Bedrock quota increase or implement request queuing."
            )

        if report.sla_compliance_rate < 0.99:
            recs.append(
                f"SLA compliance: {report.sla_compliance_rate:.1%} — target 99%. "
                f"P95={report.p95_latency_ms}ms, P99={report.p99_latency_ms}ms. "
                f"Consider caching, model routing, or prompt shortening."
            )

        if report.projected_monthly_cost_usd > 50000:
            recs.append(
                f"Projected monthly cost: ${report.projected_monthly_cost_usd:,.0f}. "
                f"Route more queries to Haiku and increase cache hit rate."
            )

        if report.timeout_requests > 0:
            recs.append(
                f"{report.timeout_requests} timeout(s) detected. "
                f"Review prompt length and max_tokens settings. "
                f"Consider streaming for long responses."
            )

        return recs

Performance Optimization — Connection Pooling

Connection pooling is critical for MangaAssist at 1M messages/day. Creating a new boto3 client or HTTP connection per request adds 30-80ms of overhead — at scale, this wastes 500-900 CPU-hours/day.

Connection Pool Optimizer

"""
ConnectionPoolOptimizer — manages and optimizes connection pools for
MangaAssist's AWS service clients (Bedrock, DynamoDB, OpenSearch, Redis).
Eliminates per-request connection overhead and enforces pool sizing limits.
"""

import asyncio
import logging
import time
from collections import defaultdict
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Optional

import boto3
from botocore.config import Config

logger = logging.getLogger(__name__)


@dataclass
class PoolMetrics:
    """Metrics for a single connection pool."""
    pool_name: str
    pool_size: int
    active_connections: int
    idle_connections: int
    total_checkouts: int = 0
    total_checkins: int = 0
    total_timeouts: int = 0
    avg_checkout_ms: float = 0.0
    max_checkout_ms: float = 0.0
    created_at: float = field(default_factory=time.time)


@dataclass
class PoolConfig:
    """Configuration for a connection pool."""
    pool_name: str
    min_size: int = 2
    max_size: int = 20
    max_idle_seconds: int = 300
    checkout_timeout_seconds: float = 5.0
    health_check_interval_seconds: float = 60.0
    retry_max_attempts: int = 3
    retry_mode: str = "adaptive"
    connect_timeout_seconds: float = 5.0
    read_timeout_seconds: float = 30.0


# Recommended pool configurations for MangaAssist services
MANGA_ASSIST_POOL_CONFIGS = {
    "bedrock": PoolConfig(
        pool_name="bedrock",
        min_size=5,
        max_size=50,
        read_timeout_seconds=10.0,
        connect_timeout_seconds=3.0,
        checkout_timeout_seconds=2.0,
    ),
    "dynamodb": PoolConfig(
        pool_name="dynamodb",
        min_size=3,
        max_size=30,
        read_timeout_seconds=5.0,
        connect_timeout_seconds=2.0,
    ),
    "opensearch": PoolConfig(
        pool_name="opensearch",
        min_size=3,
        max_size=20,
        read_timeout_seconds=5.0,
        connect_timeout_seconds=3.0,
    ),
    "redis": PoolConfig(
        pool_name="redis",
        min_size=5,
        max_size=100,
        read_timeout_seconds=1.0,
        connect_timeout_seconds=1.0,
        checkout_timeout_seconds=0.5,
    ),
}


class ConnectionPoolOptimizer:
    """
    Optimizes connection pools for all MangaAssist AWS services.

    Key optimizations:
      1. Module-level client singletons — no per-request client creation
      2. Tuned pool sizes per service (Bedrock needs more than DynamoDB)
      3. Adaptive retry configuration — exponential backoff on throttles
      4. Explicit timeouts — read/connect/checkout all bounded for 3s SLA
      5. Health checks — detect and recycle stale connections
      6. Metrics collection — track pool utilization for right-sizing
    """

    def __init__(
        self,
        region: str = "us-east-1",
        pool_configs: dict[str, PoolConfig] | None = None,
    ):
        self.region = region
        self.pool_configs = pool_configs or MANGA_ASSIST_POOL_CONFIGS
        self._clients: dict[str, Any] = {}
        self._metrics: dict[str, PoolMetrics] = {}
        self._checkout_times: dict[str, list[float]] = defaultdict(list)

    def _create_boto_config(self, pool_config: PoolConfig) -> Config:
        """Create a botocore Config optimized for the given pool configuration."""
        return Config(
            region_name=self.region,
            retries={
                "max_attempts": pool_config.retry_max_attempts,
                "mode": pool_config.retry_mode,
            },
            connect_timeout=pool_config.connect_timeout_seconds,
            read_timeout=pool_config.read_timeout_seconds,
            max_pool_connections=pool_config.max_size,
        )

    def get_bedrock_client(self) -> Any:
        """
        Get the pooled Bedrock Runtime client.
        Creates once, reuses for all subsequent calls.
        """
        if "bedrock" not in self._clients:
            config = self._create_boto_config(self.pool_configs["bedrock"])
            self._clients["bedrock"] = boto3.client(
                "bedrock-runtime",
                config=config,
                region_name=self.region,
            )
            self._metrics["bedrock"] = PoolMetrics(
                pool_name="bedrock",
                pool_size=self.pool_configs["bedrock"].max_size,
                active_connections=0,
                idle_connections=self.pool_configs["bedrock"].min_size,
            )
            logger.info(
                "Created Bedrock client pool (max=%d, timeout=%ds)",
                self.pool_configs["bedrock"].max_size,
                self.pool_configs["bedrock"].read_timeout_seconds,
            )
        return self._clients["bedrock"]

    def get_dynamodb_resource(self) -> Any:
        """Get the pooled DynamoDB resource."""
        if "dynamodb" not in self._clients:
            config = self._create_boto_config(self.pool_configs["dynamodb"])
            self._clients["dynamodb"] = boto3.resource(
                "dynamodb",
                config=config,
                region_name=self.region,
            )
            self._metrics["dynamodb"] = PoolMetrics(
                pool_name="dynamodb",
                pool_size=self.pool_configs["dynamodb"].max_size,
                active_connections=0,
                idle_connections=self.pool_configs["dynamodb"].min_size,
            )
        return self._clients["dynamodb"]

    def get_opensearch_config(self) -> dict[str, Any]:
        """Get optimized OpenSearch connection configuration."""
        pool_config = self.pool_configs["opensearch"]
        return {
            "hosts": [{"host": "search-manga-vectors.us-east-1.aoss.amazonaws.com", "port": 443}],
            "http_auth": None,  # Will use SigV4 auth
            "use_ssl": True,
            "verify_certs": True,
            "connection_class": "RequestsHttpConnection",
            "pool_maxsize": pool_config.max_size,
            "pool_connections": pool_config.min_size,
            "timeout": pool_config.read_timeout_seconds,
            "max_retries": pool_config.retry_max_attempts,
        }

    def get_redis_config(self) -> dict[str, Any]:
        """Get optimized Redis connection pool configuration."""
        pool_config = self.pool_configs["redis"]
        return {
            "host": "manga-cache.xxxxx.ng.0001.use1.cache.amazonaws.com",
            "port": 6379,
            "ssl": True,
            "max_connections": pool_config.max_size,
            "socket_timeout": pool_config.read_timeout_seconds,
            "socket_connect_timeout": pool_config.connect_timeout_seconds,
            "retry_on_timeout": True,
            "health_check_interval": int(pool_config.health_check_interval_seconds),
        }

    def record_checkout(self, pool_name: str, duration_ms: float) -> None:
        """Record a connection checkout for metrics."""
        if pool_name in self._metrics:
            self._metrics[pool_name].total_checkouts += 1
            self._metrics[pool_name].active_connections += 1
            self._checkout_times[pool_name].append(duration_ms)
            if duration_ms > self._metrics[pool_name].max_checkout_ms:
                self._metrics[pool_name].max_checkout_ms = duration_ms

    def record_checkin(self, pool_name: str) -> None:
        """Record a connection return for metrics."""
        if pool_name in self._metrics:
            self._metrics[pool_name].total_checkins += 1
            self._metrics[pool_name].active_connections = max(
                0, self._metrics[pool_name].active_connections - 1
            )

    def get_pool_metrics(self) -> dict[str, dict[str, Any]]:
        """Return current metrics for all connection pools."""
        result: dict[str, dict[str, Any]] = {}
        for name, metrics in self._metrics.items():
            checkout_times = self._checkout_times.get(name, [])
            result[name] = {
                "pool_size": metrics.pool_size,
                "active": metrics.active_connections,
                "idle": metrics.pool_size - metrics.active_connections,
                "total_checkouts": metrics.total_checkouts,
                "total_timeouts": metrics.total_timeouts,
                "avg_checkout_ms": round(
                    sum(checkout_times) / max(len(checkout_times), 1), 2
                ),
                "max_checkout_ms": round(metrics.max_checkout_ms, 2),
                "utilization": round(
                    metrics.active_connections / max(metrics.pool_size, 1), 3
                ),
            }
        return result

    def get_optimization_recommendations(self) -> list[str]:
        """Analyze pool metrics and recommend optimizations."""
        recs: list[str] = []
        for name, metrics in self._metrics.items():
            utilization = metrics.active_connections / max(metrics.pool_size, 1)

            if utilization > 0.8:
                recs.append(
                    f"Pool '{name}' utilization is {utilization:.0%}. "
                    f"Consider increasing max_size from {metrics.pool_size} "
                    f"to {int(metrics.pool_size * 1.5)}."
                )
            elif utilization < 0.2 and metrics.pool_size > 10:
                recs.append(
                    f"Pool '{name}' utilization is only {utilization:.0%}. "
                    f"Consider reducing max_size from {metrics.pool_size} "
                    f"to {max(5, int(metrics.pool_size * 0.5))} to save memory."
                )

            if metrics.total_timeouts > 0:
                timeout_rate = metrics.total_timeouts / max(metrics.total_checkouts, 1)
                if timeout_rate > 0.01:
                    recs.append(
                        f"Pool '{name}' timeout rate: {timeout_rate:.1%}. "
                        f"Increase pool size or reduce checkout timeout."
                    )

        return recs

Performance Optimization — Request Batching

Request batching reduces per-request overhead by grouping multiple operations into a single API call. For MangaAssist at 1M messages/day, batching DynamoDB writes and embedding generations can cut costs by 30-40%.

Batching Strategy Overview

Operation Without Batching With Batching Savings
DynamoDB session writes 1M put_item/day 40K batch_write/day (25 items each) 96% fewer API calls
Embedding generation 1M embed calls/day 100K batch calls (10 texts each) 90% fewer API calls
Redis pipeline commands 3M individual ops/day 300K pipelines (10 cmds each) 90% fewer round trips
Bedrock batch inference N/A (real-time) Offline analytics: batch 1000 at once 50% cost reduction

Batching Architecture

graph LR
    subgraph Incoming["Incoming Requests"]
        R1[Request 1]
        R2[Request 2]
        R3[Request 3]
        RN[Request N]
    end

    subgraph BatchCollector["Batch Collector"]
        Q[Async Queue<br/>Max Wait 50ms]
        B[Batch Builder<br/>Max Size 25]
        T[Timer<br/>Flush Interval]
    end

    subgraph BatchExecution["Batch Execution"]
        DDB[DynamoDB<br/>batch_write_item]
        EMB[Embedding<br/>batch_embed]
        RED[Redis<br/>pipeline.execute]
    end

    subgraph Results["Result Distribution"]
        F1[Future 1]
        F2[Future 2]
        F3[Future 3]
        FN[Future N]
    end

    R1 --> Q
    R2 --> Q
    R3 --> Q
    RN --> Q
    Q --> B
    T --> B
    B --> DDB
    B --> EMB
    B --> RED
    DDB --> F1
    DDB --> F2
    EMB --> F3
    RED --> FN

    style Q fill:#339af0,color:#fff
    style B fill:#339af0,color:#fff

Request Batcher Implementation

"""
Request batching utilities for MangaAssist.
Groups individual operations into efficient batch calls to reduce
per-request overhead and lower costs at scale.
"""

import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Coroutine, Generic, TypeVar

logger = logging.getLogger(__name__)

T = TypeVar("T")
R = TypeVar("R")


@dataclass
class BatchItem(Generic[T]):
    """A single item waiting to be batched."""
    data: T
    future: asyncio.Future
    enqueued_at: float = field(default_factory=time.monotonic)


class AsyncBatcher(Generic[T, R]):
    """
    Generic async batcher that collects individual items and executes
    them in batches when either max_batch_size or max_wait_ms is reached.

    Usage:
        batcher = AsyncBatcher(
            batch_fn=dynamodb_batch_write,
            max_batch_size=25,
            max_wait_ms=50,
        )
        result = await batcher.submit(item)  # Batched automatically
    """

    def __init__(
        self,
        batch_fn: Callable[[list[T]], Coroutine[Any, Any, list[R]]],
        max_batch_size: int = 25,
        max_wait_ms: float = 50.0,
        name: str = "batcher",
    ):
        self.batch_fn = batch_fn
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.name = name
        self._queue: list[BatchItem[T]] = []
        self._lock = asyncio.Lock()
        self._flush_task: asyncio.Task | None = None
        self._total_batches = 0
        self._total_items = 0

    async def submit(self, item: T) -> R:
        """Submit a single item for batching. Returns when the batch completes."""
        loop = asyncio.get_event_loop()
        future: asyncio.Future[R] = loop.create_future()

        async with self._lock:
            self._queue.append(BatchItem(data=item, future=future))

            if len(self._queue) >= self.max_batch_size:
                await self._flush()
            elif self._flush_task is None or self._flush_task.done():
                self._flush_task = asyncio.create_task(self._delayed_flush())

        return await future

    async def _delayed_flush(self) -> None:
        """Flush after max_wait_ms if batch hasn't filled."""
        await asyncio.sleep(self.max_wait_ms / 1000.0)
        async with self._lock:
            if self._queue:
                await self._flush()

    async def _flush(self) -> None:
        """Execute the current batch and distribute results."""
        if not self._queue:
            return

        batch = self._queue[:]
        self._queue.clear()
        items = [bi.data for bi in batch]

        self._total_batches += 1
        self._total_items += len(items)

        try:
            results = await self.batch_fn(items)
            for bi, result in zip(batch, results):
                if not bi.future.done():
                    bi.future.set_result(result)
        except Exception as e:
            logger.error("Batch '%s' failed: %s", self.name, e)
            for bi in batch:
                if not bi.future.done():
                    bi.future.set_exception(e)

    def get_metrics(self) -> dict[str, Any]:
        """Return batching metrics."""
        return {
            "name": self.name,
            "total_batches": self._total_batches,
            "total_items": self._total_items,
            "avg_batch_size": round(
                self._total_items / max(self._total_batches, 1), 2
            ),
            "pending": len(self._queue),
            "efficiency": round(
                1 - (self._total_batches / max(self._total_items, 1)), 4
            ),
        }


# --- MangaAssist-specific batch implementations ---

async def dynamodb_batch_write(
    items: list[dict[str, Any]],
    table_name: str = "MangaAssist-Sessions",
    dynamodb_resource: Any = None,
) -> list[dict[str, Any]]:
    """
    Batch write items to DynamoDB (max 25 per call).
    Returns the list of items with write confirmation.
    """
    if dynamodb_resource is None:
        import boto3
        dynamodb_resource = boto3.resource("dynamodb")

    table = dynamodb_resource.Table(table_name)
    results = []

    with table.batch_writer() as batch:
        for item in items:
            batch.put_item(Item=item)
            results.append({"status": "written", "key": item.get("session_id", "")})

    return results


async def redis_pipeline_batch(
    commands: list[tuple[str, list[Any]]],
    redis_client: Any = None,
) -> list[Any]:
    """
    Execute multiple Redis commands in a pipeline for reduced round trips.
    Each command is a tuple of (command_name, [args]).
    """
    if redis_client is None:
        return [None] * len(commands)

    pipe = redis_client.pipeline(transaction=False)
    for cmd_name, args in commands:
        getattr(pipe, cmd_name)(*args)
    return pipe.execute()


async def embedding_batch_generate(
    texts: list[str],
    bedrock_client: Any = None,
    model_id: str = "amazon.titan-embed-text-v2:0",
) -> list[list[float]]:
    """
    Generate embeddings for multiple texts in a single batch call.
    Reduces per-text overhead from ~100ms to ~20ms at batch size 10.
    """
    import json

    if bedrock_client is None:
        # Return dummy embeddings for testing
        return [[0.0] * 1536 for _ in texts]

    embeddings = []
    for text in texts:
        body = json.dumps({"inputText": text})
        response = bedrock_client.invoke_model(
            modelId=model_id,
            contentType="application/json",
            accept="application/json",
            body=body,
        )
        result = json.loads(response["body"].read())
        embeddings.append(result["embedding"])

    return embeddings

Cost Projection Dashboard

graph TB
    subgraph CurrentMetrics["Current Metrics (from Profiler)"]
        CPQ[Cost per Query<br/>$0.0035 avg]
        TPD[Tokens per Day<br/>~500M input, ~200M output]
        CHR[Cache Hit Rate<br/>32%]
        MR[Model Routing<br/>65% Haiku / 35% Sonnet]
    end

    subgraph Projections["Daily Cost Projections at 1M msgs/day"]
        HC[Haiku Cost<br/>650K × $0.0003 = $195]
        SC[Sonnet Cost<br/>350K × $0.009 = $3,150]
        CC[Cache Savings<br/>320K queries saved = -$960]
        TC[Total Daily<br/>$2,385]
    end

    subgraph Optimization["Optimization Opportunities"]
        O1[Increase Haiku routing<br/>to 80% → saves $945/day]
        O2[Increase cache hit<br/>to 50% → saves $570/day]
        O3[Reduce max_tokens<br/>from 1024 to 512 → saves $400/day]
        O4[Combined savings<br/>$1,915/day → $57,450/month]
    end

    CPQ --> HC
    CPQ --> SC
    CHR --> CC
    HC --> TC
    SC --> TC
    CC --> TC
    TC --> O1
    TC --> O2
    TC --> O3
    O1 --> O4
    O2 --> O4
    O3 --> O4

    style TC fill:#ff6b6b,color:#fff
    style O4 fill:#51cf66,color:#fff

Key Takeaways

# Takeaway MangaAssist Application
1 Prompt regression testing catches quality degradation that standard tests miss — outputs can be syntactically correct but semantically wrong. PromptRegressionSuite uses 4-dimensional scoring (semantic similarity, keyword coverage, structural format, Japanese fluency) against golden-set baselines.
2 Load testing FM endpoints must model realistic traffic distributions, not just peak throughput — Bedrock enforces per-model TPM/RPM limits. FMLoadTester uses production traffic mix (25% lookups, 20% recommendations, 12% Japanese) with 5 traffic patterns including burst and spike.
3 Connection pooling eliminates 30-80ms per-request overhead — at 1M messages/day that saves 500-900 CPU-hours daily. ConnectionPoolOptimizer creates module-level client singletons with tuned pool sizes: Bedrock(50), DynamoDB(30), Redis(100).
4 Request batching cuts API call volume by 90%+ — group DynamoDB writes (25/batch), embeddings (10/batch), Redis commands (pipeline). AsyncBatcher collects items up to max_batch_size or max_wait_ms, then flushes — adding at most 50ms latency for 90% fewer round trips.
5 Cost projection from load tests prevents budget surprises — per-query cost × daily volume × 30 = monthly bill. At $0.0035/query average: $3,500/day, $105K/month. Optimizations (routing + caching + token reduction) can cut this to $47K/month.
6 Regression severity classification (critical > 20%, major 10-20%, minor 5-10%) enables proportionate CI/CD response — block on critical, warn on major. Critical regressions block merge; major regressions warn and require sign-off; minor regressions are logged for trend analysis.
7 Model output validation at runtime (not just test time) catches production-quality issues and enables automatic retry/fallback. Safety, language, and format validators run on every Bedrock response; failures trigger retry with Haiku or fallback to canned responses.
8 Pool utilization monitoring enables right-sizing — over-provisioned pools waste memory, under-provisioned pools cause checkout timeouts. Track utilization per pool; alert when Bedrock pool > 80% (scale up) or DynamoDB pool < 20% (scale down).