
Scenarios & Runbooks — Developer Productivity

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.

Skill Mapping

| Attribute | Detail |
| --- | --- |
| Domain | 2 — Implementation & Integration of GenAI Applications |
| Task | 2.5 — Application Integration Patterns |
| Skill | 2.5.4 — Developer Productivity |
| Focus | Troubleshooting Q Developer suggestions, test failures, profiler overhead, security vulnerabilities, flaky FM tests |
| MangaAssist Scope | Five production scenarios with detection, root cause analysis, resolution code, and prevention strategies |

Mind Map

mindmap
  root((Developer Productivity<br/>Scenarios))
    Scenario 1
      Q Developer Deprecated API
        Outdated boto3 Patterns
        Legacy Bedrock API Calls
        Detection via Linting
        Version-Pinned Suggestions
    Scenario 2
      Test Not Catching Regression
        Prompt Quality Drift
        Missing Assertion Coverage
        Snapshot Test Gaps
        Weak Quality Thresholds
    Scenario 3
      Profiler Overhead
        tracemalloc CPU Impact
        cProfile Slowdown
        Production Profiling Risk
        Sampling vs Full Trace
    Scenario 4
      Security Vulnerability in Suggestion
        Prompt Injection in Generated Code
        Hardcoded Credentials
        Missing Input Sanitization
        Unsafe Deserialization
    Scenario 5
      Flaky Tests from Non-Deterministic FM
        Temperature Variance
        Token Sampling Randomness
        Model Version Updates
        Timing-Dependent Assertions

Scenario 1 — Q Developer Suggests Deprecated Bedrock API

Problem Statement

A developer using Amazon Q Developer in VS Code receives a code suggestion for invoking Bedrock that builds the invoke_model body in the legacy Anthropic Text Completions format, with prompt as a top-level string, instead of the current messages array format. The suggested code deploys but fails at runtime because Claude 3 models require the Messages API.

Detection

graph TB
    QD[Q Developer Suggestion<br/>Uses legacy prompt format] --> CR[Code Review<br/>Reviewer spots old pattern]
    QD --> UT[Unit Test<br/>Test passes with mock<br/>but mock also uses old format]
    QD --> RT[Runtime Error<br/>ValidationException from Bedrock]

    RT --> CW[CloudWatch Alarm<br/>Spike in 400 errors]
    CW --> INV[Investigation<br/>Check error message details]
    INV --> RC[Root Cause<br/>Deprecated API format<br/>in Q Developer suggestion]

    style RT fill:#ff6b6b,color:#fff
    style RC fill:#ffd43b,color:#333

Root Cause

Amazon Q Developer's training data includes both legacy Anthropic completion format ("prompt": "\n\nHuman: ...") and the current Messages API format ("messages": [{"role": "user", "content": "..."}]). When project context is insufficient, Q Developer may suggest the older pattern because it was more prevalent in earlier training data. The mock Bedrock client in tests also accepted the old format, so the test suite did not catch the regression.
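To make the difference between the two formats concrete, the sketch below builds both request bodies side by side. The helper names `legacy_body` and `messages_body` are illustrative and not part of the MangaAssist codebase; the field names follow the two Anthropic-on-Bedrock formats named above.

```python
ANTHROPIC_VERSION = "bedrock-2023-05-31"


def legacy_body(user_text: str, max_tokens: int = 512) -> dict:
    """Legacy Text Completions body; Claude 3 models reject this shape."""
    return {
        "prompt": f"\n\nHuman: {user_text}\n\nAssistant:",
        "max_tokens_to_sample": max_tokens,
    }


def messages_body(user_text: str, max_tokens: int = 512) -> dict:
    """Messages API body expected by Claude 3 models on Bedrock."""
    return {
        "anthropic_version": ANTHROPIC_VERSION,
        "max_tokens": max_tokens,
        "messages": [{"role": "user", "content": user_text}],
    }
```

A suggestion that resembles `legacy_body` is the one to reject in review; `messages_body` is the shape the validator in the Resolution section accepts.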

Resolution

"""
Resolution: Schema-validating Bedrock request wrapper that rejects deprecated formats.
Catches Q Developer suggestions that use legacy API before they reach production.
"""

import json
import logging
from typing import Any

logger = logging.getLogger(__name__)

# Supported Claude 3 models requiring Messages API
MESSAGES_API_MODELS = {
    "anthropic.claude-3-sonnet-20240229-v1:0",
    "anthropic.claude-3-haiku-20240307-v1:0",
    "anthropic.claude-3-opus-20240229-v1:0",
}


class BedrockRequestValidationError(Exception):
    """Raised when a Bedrock request uses a deprecated or invalid format."""
    pass


def validate_bedrock_request(model_id: str, body: dict[str, Any]) -> None:
    """
    Validate that a Bedrock request body uses the correct API format.

    Raises BedrockRequestValidationError if the body uses deprecated patterns.
    """
    errors = []

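    # Check for legacy prompt format. Only applied to models that require the
    # Messages API, since some non-Anthropic models take a top-level 'prompt'.
    if model_id in MESSAGES_API_MODELS and isinstance(body.get("prompt"), str):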
        errors.append(
            "DEPRECATED: Top-level 'prompt' string detected. "
            "Claude 3 models require 'messages' array format. "
            "Use: {'messages': [{'role': 'user', 'content': '...'}]}"
        )

    # Check for missing messages array
    if model_id in MESSAGES_API_MODELS and "messages" not in body:
        errors.append(
            f"Model {model_id} requires 'messages' array in request body. "
            "See: https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-anthropic-claude-messages.html"
        )

    # Check for missing anthropic_version
    if model_id in MESSAGES_API_MODELS and "anthropic_version" not in body:
        errors.append(
            "Missing 'anthropic_version' field. "
            "Required value: 'bedrock-2023-05-31'"
        )

    # Check for legacy Human/Assistant format in messages
    if "messages" in body:
        for msg in body["messages"]:
            content = msg.get("content", "")
            if isinstance(content, str) and content.startswith("\n\nHuman:"):
                errors.append(
                    "DEPRECATED: Legacy '\\n\\nHuman:' prefix in message content. "
                    "Use role-based messages: {'role': 'user', 'content': '...'}"
                )
                break

    if errors:
        error_msg = "Bedrock request validation failed:\n" + "\n".join(f"  - {e}" for e in errors)
        logger.error(error_msg)
        raise BedrockRequestValidationError(error_msg)


def safe_invoke_model(
    bedrock_client: Any,
    model_id: str,
    body: dict[str, Any],
) -> dict[str, Any]:
    """Wrapper around invoke_model that validates the request first."""
    validate_bedrock_request(model_id, body)
    response = bedrock_client.invoke_model(
        modelId=model_id,
        contentType="application/json",
        accept="application/json",
        body=json.dumps(body),
    )
    return json.loads(response["body"].read())


# --- Linting rule for CI/CD ---

def lint_bedrock_calls(source_code: str) -> list[dict[str, str]]:
    """Static analysis to catch deprecated Bedrock API patterns in code."""
    issues = []
    lines = source_code.split("\n")
    for i, line in enumerate(lines, 1):
        stripped = line.strip()
        if '"prompt":' in stripped and "messages" not in source_code:
            issues.append({
                "line": i,
                "severity": "error",
                "message": "Possible deprecated Bedrock prompt format. Use 'messages' array for Claude 3.",
                "rule": "bedrock-deprecated-prompt-format",
            })
        # Matches the escaped prefix as it appears in source text, in any quote style
        if "\\n\\nHuman:" in stripped:
            issues.append({
                "line": i,
                "severity": "error",
                "message": "Legacy Human/Assistant prompt format detected. Use Messages API.",
                "rule": "bedrock-legacy-prompt-prefix",
            })
    return issues

Prevention

  1. Add a pre-commit hook that runs lint_bedrock_calls on all Python files touching Bedrock.
  2. Update the mock Bedrock client to reject legacy format — if the mock accepts it, tests pass falsely.
  3. Pin anthropic_version in a shared constant used across all Bedrock calls.
  4. Configure Q Developer project rules (Markdown files under .amazonq/rules/) with an instruction such as "Always use Claude 3 Messages API format."
  5. Add a CodeBuild lint stage that fails PRs containing deprecated patterns.
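Prevention item 2 can be sketched as a stricter test double. `StrictMockBedrockClient` is a hypothetical class, not an AWS or boto3 API; it raises `ValueError` as a simplification, whereas the real service reports a ValidationException through botocore.

```python
import io
import json
from typing import Any


class StrictMockBedrockClient:
    """Hypothetical test double that rejects bodies Claude 3 would reject."""

    def __init__(self, reply_text: str = "stub reply") -> None:
        self.reply_text = reply_text
        self.calls: list[dict[str, Any]] = []

    def invoke_model(self, *, modelId: str, body: str, **_: Any) -> dict[str, Any]:
        payload = json.loads(body)
        # Fail the same way production would: legacy bodies never succeed
        if "prompt" in payload or "messages" not in payload:
            raise ValueError("legacy or malformed body: 'messages' array required")
        if payload.get("anthropic_version") != "bedrock-2023-05-31":
            raise ValueError("missing or unexpected 'anthropic_version'")
        self.calls.append({"modelId": modelId, "body": payload})
        response = {"content": [{"type": "text", "text": self.reply_text}]}
        # Mimic the streaming body returned by boto3, which exposes .read()
        return {"body": io.BytesIO(json.dumps(response).encode("utf-8"))}
```

With a mock like this, a test built on a legacy-format suggestion fails in CI instead of passing against a permissive stub.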

Scenario 2 — Test Suite Not Catching Prompt Quality Regression

Problem Statement

A developer modifies the manga recommendation system prompt to improve Japanese content. The unit tests pass because they only check for HTTP 200 and non-empty response. In production, the modified prompt causes recommendations to be generic and off-topic, losing manga specificity. Customer satisfaction drops 15% before the issue is detected through manual review.

Detection

graph TB
    PC[Prompt Change<br/>Modified system prompt] --> TS[Test Suite<br/>All tests pass]
    TS --> DEP[Deployment<br/>Changes go to production]
    DEP --> CSAT[Customer Feedback<br/>15% satisfaction drop]
    CSAT --> INV[Investigation<br/>Sample production responses]
    INV --> DIFF[Diff Analysis<br/>Compare old vs new outputs]
    DIFF --> RC[Root Cause<br/>Tests lacked quality assertions]

    TS -.->|Missing| QA[Quality Assertions<br/>Keyword / Relevance / Specificity]

    style CSAT fill:#ff6b6b,color:#fff
    style QA fill:#ffd43b,color:#333
    style RC fill:#ff6b6b,color:#fff

Root Cause

The test suite validated structural correctness (status codes, non-empty responses, valid JSON) but did not validate semantic quality. There were no assertions checking that manga-specific terms appeared, that Japanese content was present, or that recommendations matched the user's stated genre preference. The quality threshold was effectively 0%.

Resolution

"""
Resolution: Semantic quality assertions for MangaAssist prompt tests.
Ensures prompt changes maintain manga-specific response quality.
"""

import re
import statistics
from dataclasses import dataclass, field
from typing import Any


@dataclass
class QualityAssertion:
    """A single quality check with pass/fail criteria."""
    name: str
    passed: bool
    score: float
    message: str


class MangaQualityAssertions:
    """
    Quality assertion library for MangaAssist chatbot responses.
    Catches regressions in content specificity, language, and relevance.
    """

    # Manga-specific terms that should appear in recommendations
    MANGA_TERMS_JP = ["マンガ", "漫画", "コミック", "巻", "作者", "出版社", "連載", "単行本"]
    MANGA_TERMS_EN = ["manga", "volume", "chapter", "author", "series", "genre", "publisher"]

    # Genre-specific term sets
    GENRE_TERMS = {
        "action": ["アクション", "バトル", "戦い", "action", "battle", "fight"],
        "romance": ["恋愛", "ラブ", "romance", "love", "relationship"],
        "horror": ["ホラー", "恐怖", "horror", "scary", "terrifying"],
        "comedy": ["コメディ", "ギャグ", "笑い", "comedy", "funny", "humor"],
    }

    @classmethod
    def assert_manga_specificity(
        cls, response: str, min_terms: int = 2
    ) -> QualityAssertion:
        """Assert that the response contains manga-specific terminology."""
        response_lower = response.lower()
        found_jp = [t for t in cls.MANGA_TERMS_JP if t in response]
        found_en = [t for t in cls.MANGA_TERMS_EN if t in response_lower]
        total = len(found_jp) + len(found_en)
        passed = total >= min_terms
        return QualityAssertion(
            name="manga_specificity",
            passed=passed,
            score=min(1.0, total / max(min_terms, 1)),
            message=f"Found {total} manga terms (need {min_terms}): JP={found_jp}, EN={found_en}",
        )

    @classmethod
    def assert_genre_relevance(
        cls, response: str, genre: str, min_terms: int = 1
    ) -> QualityAssertion:
        """Assert that the response is relevant to the requested genre."""
        genre_terms = cls.GENRE_TERMS.get(genre, [])
        if not genre_terms:
            return QualityAssertion(
                name="genre_relevance",
                passed=True,
                score=1.0,
                message=f"No genre terms defined for '{genre}'; skipped",
            )
        response_lower = response.lower()
        found = [t for t in genre_terms if t in response or t in response_lower]
        passed = len(found) >= min_terms
        return QualityAssertion(
            name="genre_relevance",
            passed=passed,
            score=min(1.0, len(found) / max(min_terms, 1)),
            message=f"Genre '{genre}': found {found} (need {min_terms})",
        )

    @classmethod
    def assert_japanese_content(
        cls, response: str, min_ratio: float = 0.05
    ) -> QualityAssertion:
        """Assert that the response contains a minimum ratio of Japanese characters."""
        if not response:
            return QualityAssertion(
                name="japanese_content",
                passed=False,
                score=0.0,
                message="Empty response",
            )
        jp_chars = sum(
            1 for ch in response
            if "\u3040" <= ch <= "\u9fff" or "\uf900" <= ch <= "\ufaff"
        )
        ratio = jp_chars / len(response)
        passed = ratio >= min_ratio
        return QualityAssertion(
            name="japanese_content",
            passed=passed,
            score=min(1.0, ratio / max(min_ratio, 0.001)),
            message=f"Japanese ratio: {ratio:.3f} (min: {min_ratio})",
        )

    @classmethod
    def assert_structured_response(
        cls, response: str, min_items: int = 2
    ) -> QualityAssertion:
        """Assert that the response contains structured items (numbered or bulleted)."""
        numbered = re.findall(r"^\d+[\.\)]\s", response, re.MULTILINE)
        bulleted = re.findall(r"^[-*•]\s", response, re.MULTILINE)
        markdown_bold = re.findall(r"\*\*.+?\*\*", response)
        total = max(len(numbered), len(bulleted), len(markdown_bold))
        passed = total >= min_items
        return QualityAssertion(
            name="structured_response",
            passed=passed,
            score=min(1.0, total / max(min_items, 1)),
            message=f"Found {total} structured items (need {min_items})",
        )

    @classmethod
    def assert_no_hallucination_markers(cls, response: str) -> QualityAssertion:
        """Check for refusal and hedging phrases that this suite treats as hallucination indicators."""
        markers = [
            "I don't have access to",
            "I cannot verify",
            "I'm not sure if",
            "as an AI language model",
            "I apologize, but I don't have real-time",
        ]
        found = [m for m in markers if m.lower() in response.lower()]
        passed = len(found) == 0
        return QualityAssertion(
            name="no_hallucination_markers",
            passed=passed,
            score=1.0 if passed else 0.0,
            message=f"Hallucination markers: {found}" if found else "No markers found",
        )

    @classmethod
    def run_all_assertions(
        cls, response: str, genre: str | None = None
    ) -> dict[str, Any]:
        """Run the full assertion suite and return a summary."""
        assertions = [
            cls.assert_manga_specificity(response),
            cls.assert_japanese_content(response),
            cls.assert_structured_response(response),
            cls.assert_no_hallucination_markers(response),
        ]
        if genre:
            assertions.append(cls.assert_genre_relevance(response, genre))

        all_passed = all(a.passed for a in assertions)
        avg_score = statistics.mean(a.score for a in assertions)

        return {
            "all_passed": all_passed,
            "average_score": round(avg_score, 3),
            "assertions": [
                {
                    "name": a.name,
                    "passed": a.passed,
                    "score": a.score,
                    "message": a.message,
                }
                for a in assertions
            ],
        }

Prevention

  1. Add MangaQualityAssertions.run_all_assertions() to every prompt-related test — no test passes without quality score >= 0.7.
  2. Implement snapshot testing — capture golden responses and diff new outputs against them for semantic drift.
  3. Add a "prompt change review" label in GitHub that triggers extended test suites including human-eval samples.
  4. Track quality scores over time in a dashboard so drift is visible before customer impact.
  5. Run quality assertions in staging against 100 sample queries before promoting to production.
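The snapshot idea in item 2 can be approximated with a small lexical drift check. This is a rough sketch, not a semantic comparison: it uses Jaccard distance over character bigrams, which works for mixed Japanese and English text without a tokenizer. The function names and the 0.6 threshold are assumptions for illustration.

```python
def char_bigrams(text: str) -> set[str]:
    """Character bigrams of the text with whitespace removed and case folded."""
    compact = "".join(text.lower().split())
    return {compact[i:i + 2] for i in range(len(compact) - 1)}


def drift_score(golden: str, candidate: str) -> float:
    """Return 1 - Jaccard similarity over bigrams (0.0 means identical sets)."""
    a, b = char_bigrams(golden), char_bigrams(candidate)
    if not a and not b:
        return 0.0
    return 1.0 - len(a & b) / len(a | b)


def within_drift(golden: str, candidate: str, max_drift: float = 0.6) -> bool:
    """True when the candidate stays lexically close to the golden response."""
    return drift_score(golden, candidate) <= max_drift
```

A lexical score only flags large shifts in wording, so it complements the quality assertions above and does not replace them.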

Scenario 3 — Profiler Overhead Degrading Production Performance

Problem Statement

A developer enables tracemalloc and cProfile in the ECS Fargate production containers to investigate a memory leak. The profiling overhead adds 400-600ms to every request, pushing P95 latency from 2.2s to 2.8s and occasionally over the 3-second budget. Some users experience timeouts, and the API Gateway WebSocket connection drops under load.

Detection

graph TB
    PF[Profiler Enabled<br/>tracemalloc + cProfile<br/>in production container] --> LI[Latency Increase<br/>P95: 2.2s → 2.8s]
    LI --> CW[CloudWatch Alarm<br/>P95 > 2.5s threshold]
    LI --> TO[Timeouts<br/>3s budget breached]
    TO --> WS[WebSocket Drops<br/>API Gateway 504]
    WS --> USR[User Impact<br/>Incomplete responses]

    CW --> INV[Investigation<br/>Check recent deployments]
    INV --> ENV[Environment Check<br/>ENABLE_PROFILING=true<br/>in task definition]
    ENV --> RC[Root Cause<br/>Full profiling in prod<br/>adds 400-600ms overhead]

    style TO fill:#ff6b6b,color:#fff
    style RC fill:#ffd43b,color:#333

Root Cause

The profiling flags were set as environment variables in the ECS task definition (ENABLE_PROFILING=true) during a debugging session and were not removed before the next deployment. Both tracemalloc (which instruments every memory allocation) and cProfile (which wraps every function call) were active on all production containers. The combined 400-600ms overhead consumed roughly 13-20% of the 3-second latency budget and used up 50-75% of the 0.8s of headroom above the 2.2s baseline P95.

Resolution

"""
Resolution: Safe profiling system that prevents production overhead.
Uses sampling-based profiling and environment-aware guards.
"""

import logging
import os
import random
import time
import functools
from contextlib import contextmanager
from typing import Any, Callable, Generator

logger = logging.getLogger(__name__)


class Environment:
    """Detects the current deployment environment."""
    PRODUCTION = "production"
    STAGING = "staging"
    DEVELOPMENT = "development"

    @staticmethod
    def current() -> str:
        return os.environ.get("ENVIRONMENT", "development").lower()

    @staticmethod
    def is_production() -> bool:
        return Environment.current() == Environment.PRODUCTION


class SafeProfiler:
    """
    Production-safe profiling system for MangaAssist.

    Key safety features:
    - Disabled by default in production
    - Sampling-based (profiles 1 in N requests)
    - Automatic timeout to prevent runaway profiling
    - Overhead budget enforcement
    """

    def __init__(
        self,
        sample_rate: float = 0.01,       # Profile 1% of requests
        max_overhead_ms: float = 100.0,   # Max acceptable overhead
        auto_disable_after_s: float = 300.0,  # Auto-disable after 5 minutes
        allow_in_production: bool = False,
    ):
        self.sample_rate = sample_rate
        self.max_overhead_ms = max_overhead_ms
        self.auto_disable_after_s = auto_disable_after_s
        self.allow_in_production = allow_in_production
        self._enabled = False
        self._enabled_at: float | None = None
        self._overhead_samples: list[float] = []

    def enable(self) -> bool:
        """Enable profiling with safety checks."""
        if Environment.is_production() and not self.allow_in_production:
            logger.warning(
                "Profiling blocked in production. "
                "Set allow_in_production=True explicitly if needed."
            )
            return False
        self._enabled = True
        self._enabled_at = time.monotonic()
        logger.info(
            "Profiling enabled (sample_rate=%.2f, auto_disable=%ds)",
            self.sample_rate,
            self.auto_disable_after_s,
        )
        return True

    def disable(self) -> None:
        """Disable profiling."""
        self._enabled = False
        self._enabled_at = None
        self._overhead_samples.clear()
        logger.info("Profiling disabled")

    def _should_profile(self) -> bool:
        """Determine if this request should be profiled."""
        if not self._enabled:
            return False

        # Auto-disable after timeout
        if self._enabled_at is not None:
            elapsed = time.monotonic() - self._enabled_at
            if elapsed > self.auto_disable_after_s:
                logger.warning("Profiling auto-disabled after %ds", int(elapsed))
                self.disable()
                return False

        # Check overhead budget
        if len(self._overhead_samples) >= 10:
            avg_overhead = sum(self._overhead_samples[-10:]) / 10
            if avg_overhead > self.max_overhead_ms:
                logger.warning(
                    "Profiling auto-disabled: avg overhead %.0fms > %.0fms budget",
                    avg_overhead,
                    self.max_overhead_ms,
                )
                self.disable()
                return False

        # Sampling decision
        return random.random() < self.sample_rate

    @contextmanager
    def profile_request(self, request_id: str) -> Generator[dict[str, Any], None, None]:
        """Context manager for request-level profiling."""
        profile_data: dict[str, Any] = {"profiled": False, "request_id": request_id}

        if not self._should_profile():
            yield profile_data
            return

        profile_data["profiled"] = True
        start = time.monotonic()

        try:
            yield profile_data
        finally:
            overhead_ms = (time.monotonic() - start) * 1000
            # Rough estimate only: attribute ~15% of the elapsed wall time to
            # profiling overhead (assumed ratio, not a measured value)
            profile_data["overhead_ms"] = round(overhead_ms * 0.15, 2)
            self._overhead_samples.append(profile_data["overhead_ms"])

    def timing_decorator(self, operation: str) -> Callable:
        """Lightweight timing-only decorator (safe for production)."""
        def decorator(func: Callable) -> Callable:
            @functools.wraps(func)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                start = time.monotonic()
                result = await func(*args, **kwargs)
                elapsed_ms = (time.monotonic() - start) * 1000
                if elapsed_ms > self.max_overhead_ms * 10:  # Log slow operations
                    logger.warning(
                        "Slow operation: %s took %.0fms", operation, elapsed_ms
                    )
                return result

            @functools.wraps(func)
            def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                start = time.monotonic()
                result = func(*args, **kwargs)
                elapsed_ms = (time.monotonic() - start) * 1000
                if elapsed_ms > self.max_overhead_ms * 10:
                    logger.warning(
                        "Slow operation: %s took %.0fms", operation, elapsed_ms
                    )
                return result

            import asyncio
            if asyncio.iscoroutinefunction(func):
                return async_wrapper
            return sync_wrapper
        return decorator


# --- ECS Task Definition Guard ---

def validate_task_definition(task_def: dict[str, Any]) -> list[str]:
    """Validate an ECS task definition for profiling safety."""
    warnings = []
    for container in task_def.get("containerDefinitions", []):
        env_vars = {
            e["name"]: e["value"]
            for e in container.get("environment", [])
        }
        if env_vars.get("ENABLE_PROFILING", "").lower() == "true":
            warnings.append(
                f"Container '{container['name']}' has ENABLE_PROFILING=true. "
                "This will degrade production performance by 20-30%. "
                "Remove or set to 'false' before deploying to production."
            )
        if env_vars.get("PYTHONTRACEMALLOC", ""):
            warnings.append(
                f"Container '{container['name']}' has PYTHONTRACEMALLOC set. "
                "tracemalloc instruments every allocation and adds significant overhead."
            )
    return warnings

Prevention

  1. Never enable full profiling in production — use the SafeProfiler with 1% sampling rate and auto-disable.
  2. Add validate_task_definition() as a CDK/CloudFormation deployment guard that blocks deploys with profiling flags.
  3. Use CloudWatch Embedded Metrics Format (EMF) for production latency tracking instead of in-process profilers.
  4. Implement a profiling flag in Parameter Store with auto-expiry so it cannot be left on permanently.
  5. Set up a P95 latency alarm at 2.5s (below the 3s budget) to catch overhead early.
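For item 3, a minimal sketch of an Embedded Metric Format record is shown below. The namespace, dimension, and metric names are assumptions chosen for illustration; how the log line reaches CloudWatch Logs (for example the awslogs driver or the CloudWatch agent) depends on the deployment and is not shown.

```python
import json
import time


def emf_latency_record(
    operation: str, latency_ms: float, namespace: str = "MangaAssist"
) -> str:
    """Build one EMF-formatted JSON log line carrying a latency metric."""
    record = {
        "_aws": {
            "Timestamp": int(time.time() * 1000),  # epoch milliseconds
            "CloudWatchMetrics": [
                {
                    "Namespace": namespace,
                    "Dimensions": [["Operation"]],
                    "Metrics": [{"Name": "LatencyMs", "Unit": "Milliseconds"}],
                }
            ],
        },
        # Top-level members referenced by the metric directive above
        "Operation": operation,
        "LatencyMs": latency_ms,
    }
    return json.dumps(record)
```

Writing one such line per request costs a JSON serialization and a log write, which is far cheaper than instrumenting every call or allocation in-process.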

Scenario 4 — Security Vulnerability in Q Developer Code Suggestion

Problem Statement

Amazon Q Developer suggests code for handling user search queries that directly interpolates user input into an OpenSearch query string without sanitization. This creates a query injection vulnerability where a malicious user could craft a search query that extracts data from other users' sessions or modifies search results.

Detection

graph TB
    QD[Q Developer Suggestion<br/>Unsanitized user input<br/>in OpenSearch query] --> CR[Code Review<br/>Reviewer misses injection risk]
    CR --> MR[Merge to Main<br/>Deployed to production]
    MR --> PEN[Penetration Test<br/>Quarterly security scan]
    PEN --> FIND[Finding<br/>Query injection vulnerability]

    MR --> ALT[Alternative Detection<br/>SAST Scanner]
    ALT --> BLOCK[Block Deployment]

    FIND --> RC[Root Cause<br/>Q Developer suggested<br/>f-string query construction]

    style FIND fill:#ff6b6b,color:#fff
    style RC fill:#ff6b6b,color:#fff
    style BLOCK fill:#51cf66,color:#fff

Root Cause

Q Developer generated an OpenSearch query using Python f-strings: query = f'{{"query": {{"match": {{"title": "{user_input}"}}}}}}'. Because the input is pasted into the JSON text unescaped, a user can close the string and the enclosing objects early and append arbitrary JSON to the query body, for example with a fragment such as "}}, "match_all": {} in the search input. This is a classic injection pattern, and code generation tools can produce it because their training data contains both safe and unsafe query construction.
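The effect can be reproduced without OpenSearch by parsing the generated JSON. In this sketch `unsafe_query` mirrors the f-string pattern described above and `safe_query` is an illustrative alternative that serializes a dictionary; the payload is hypothetical, and whether a given engine accepts the injected keys is not verified here.

```python
import json


def unsafe_query(user_input: str) -> str:
    """Mirrors the f-string pattern: input is pasted into JSON text unescaped."""
    return f'{{"query": {{"match": {{"title": "{user_input}"}}}}}}'


def safe_query(user_input: str) -> str:
    """Builds a dict and serializes it, so quotes in the input are escaped."""
    return json.dumps({"query": {"match": {"title": user_input}}})


# Closes the title string and two objects, then adds attacker-chosen keys
PAYLOAD = 'x"}}, "size": 10000, "post_filter": {"match_all": {"_name": "y'

injected = json.loads(unsafe_query(PAYLOAD))
contained = json.loads(safe_query(PAYLOAD))

# The unsafe version now has extra top-level keys chosen by the user
print(sorted(injected))
# The safe version keeps the whole payload as the title value
print(contained["query"]["match"]["title"] == PAYLOAD)
```

The structural change in the first result is the vulnerability: the user, not the application, decided which keys the query body contains.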

Resolution

"""
Resolution: Secure OpenSearch query builder that prevents injection.
Replaces f-string query construction with parameterized builders.
"""

import json
import logging
import re
from typing import Any, Optional

logger = logging.getLogger(__name__)


class QueryInjectionError(Exception):
    """Raised when a potential query injection is detected."""
    pass


class SecureOpenSearchQueryBuilder:
    """
    Builds OpenSearch queries safely by parameterizing all user input.
    Prevents query injection attacks in MangaAssist search.
    """

    # Characters that could indicate injection attempts
    INJECTION_PATTERNS = [
        r'[{}\[\]]',           # JSON structural characters
        r'\\u[0-9a-fA-F]{4}', # Unicode escapes
        r'"\s*:\s*',           # Key-value patterns
        r'_score',             # Score manipulation
        r'_source',            # Source field access
        r'script',             # Script injection
    ]

    MAX_QUERY_LENGTH = 500
    MAX_FIELD_VALUES = 10

    @classmethod
    def sanitize_input(cls, user_input: str) -> str:
        """Sanitize user input for safe use in OpenSearch queries."""
        if not user_input:
            return ""
        # Truncate to max length
        sanitized = user_input[:cls.MAX_QUERY_LENGTH]
        # Remove null bytes
        sanitized = sanitized.replace("\x00", "")
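        # Escape special Lucene characters. The backslash is handled first so
        # that escapes added for the other characters are not escaped again.
        special_chars = '\\' + r'+-=&|><!(){}[]^"~*?:/'
        for char in special_chars:
            sanitized = sanitized.replace(char, f"\\{char}")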
        return sanitized.strip()

    @classmethod
    def detect_injection(cls, user_input: str) -> bool:
        """Check if user input contains potential injection patterns."""
        for pattern in cls.INJECTION_PATTERNS:
            if re.search(pattern, user_input):
                logger.warning(
                    "Potential query injection detected: pattern=%s, input=%s",
                    pattern,
                    user_input[:50],
                )
                return True
        return False

    @classmethod
    def manga_search_query(
        cls,
        search_text: str,
        genre: str | None = None,
        author: str | None = None,
        language: str = "ja",
        max_results: int = 10,
    ) -> dict[str, Any]:
        """Build a safe manga search query with optional filters."""
        # Reject suspicious input before doing any other work, checking every
        # user-supplied field and not only the free-text query
        for raw in (search_text, genre, author):
            if raw and cls.detect_injection(raw):
                raise QueryInjectionError(
                    "Potentially malicious search input detected"
                )

        # Sanitize all user inputs
        safe_text = cls.sanitize_input(search_text)
        safe_genre = cls.sanitize_input(genre) if genre else None
        safe_author = cls.sanitize_input(author) if author else None

        # Build query using safe dictionary construction
        must_clauses: list[dict[str, Any]] = []

        if safe_text:
            must_clauses.append({
                "multi_match": {
                    "query": safe_text,
                    "fields": ["title^3", "title.japanese^3", "description", "author"],
                    "type": "best_fields",
                    "fuzziness": "AUTO",
                }
            })

        filter_clauses: list[dict[str, Any]] = []
        if safe_genre:
            filter_clauses.append({"term": {"genre.keyword": safe_genre}})
        if safe_author:
            filter_clauses.append({"term": {"author.keyword": safe_author}})
        if language:
            filter_clauses.append({"term": {"language": language}})

        query = {
            "size": min(max_results, 50),
            "query": {
                "bool": {
                    "must": must_clauses or [{"match_all": {}}],
                    "filter": filter_clauses,
                }
            },
            "_source": ["title", "author", "genre", "price", "isbn", "cover_url"],
        }

        return query

    @classmethod
    def vector_search_query(
        cls,
        embedding: list[float],
        k: int = 10,
        genre_filter: str | None = None,
    ) -> dict[str, Any]:
        """Build a safe k-NN vector search query."""
        # Validate embedding dimensions
        if not embedding or len(embedding) != 1536:
            raise ValueError(
                f"Expected 1536-dim embedding, got {len(embedding) if embedding else 0}"
            )
        # Validate numeric values
        if not all(isinstance(v, (int, float)) for v in embedding):
            raise ValueError("Embedding must contain only numeric values")

        query: dict[str, Any] = {
            "size": min(k, 50),
            "query": {
                "knn": {
                    "embedding": {
                        "vector": embedding,
                        "k": min(k, 50),
                    }
                }
            },
            "_source": ["title", "author", "genre", "price", "isbn"],
        }

        if genre_filter:
            safe_genre = cls.sanitize_input(genre_filter)
            query["query"] = {
                "bool": {
                    "must": [query["query"]],
                    "filter": [{"term": {"genre.keyword": safe_genre}}],
                }
            }

        return query


# --- SAST rule for CI/CD pipeline ---

def scan_for_query_injection(source_code: str) -> list[dict[str, Any]]:
    """Static analysis to detect unsafe query construction patterns."""
    findings = []
    lines = source_code.split("\n")
    for i, line in enumerate(lines, 1):
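        # Detect f-string query construction: an interpolated {...user...} field
        # on a line that mentions "query" before or after the interpolation
        if "query" in line.lower() and re.search(
            r'f["\'].*\{[^{}]*user[^{}]*\}', line, re.IGNORECASE
        ):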
            findings.append({
                "line": i,
                "severity": "critical",
                "rule": "opensearch-injection",
                "message": "User input interpolated into query via f-string. Use SecureOpenSearchQueryBuilder.",
            })
        # Detect string format with user input in queries
        if ".format(" in line and ("query" in line.lower() or "search" in line.lower()):
            findings.append({
                "line": i,
                "severity": "high",
                "rule": "opensearch-injection-format",
                "message": "String .format() used in query construction. Use parameterized builder.",
            })
        # Detect string concatenation in queries
        if ("+ user" in line or "+ query" in line) and "search" in line.lower():
            findings.append({
                "line": i,
                "severity": "high",
                "rule": "opensearch-injection-concat",
                "message": "String concatenation in search query. Use SecureOpenSearchQueryBuilder.",
            })
    return findings

Prevention

  1. Run scan_for_query_injection() in CI/CD — block merges with critical findings.
  2. Use SecureOpenSearchQueryBuilder as the only allowed query construction method — enforce via code review checklist.
  3. Enable Amazon Q Developer security scanning which flags potential injection patterns.
  4. Add WAF rules on API Gateway to detect and block common injection payloads in WebSocket messages.
  5. Log all query injection detection events to CloudWatch for security monitoring.
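
Prevention item 1 can be wired into a pipeline as a small gate. The following is a minimal sketch, assuming findings are collected per file in the same dict shape that scan_for_query_injection() returns (line, severity, rule, message). The names ci_gate and BLOCKING_SEVERITIES are hypothetical, and the sample findings are illustrative only.

```python
from typing import Any

# Severities that fail the build; "critical" mirrors Prevention item 1.
BLOCKING_SEVERITIES = {"critical"}


def ci_gate(findings_by_file: dict[str, list[dict[str, Any]]]) -> tuple[int, list[str]]:
    """Return (exit_code, report_lines) for scanner findings grouped by file path."""
    report: list[str] = []
    blocked = False
    for path, findings in sorted(findings_by_file.items()):
        for finding in findings:
            report.append(
                f"{path}:{finding['line']}: [{finding['severity']}] "
                f"{finding['rule']}: {finding['message']}"
            )
            if finding["severity"] in BLOCKING_SEVERITIES:
                blocked = True
    return (1 if blocked else 0), report


# Illustrative input: one critical and one high finding in a hypothetical file.
sample_findings = {
    "search/handlers.py": [
        {"line": 12, "severity": "critical", "rule": "opensearch-injection",
         "message": "User input interpolated into query via f-string."},
        {"line": 40, "severity": "high", "rule": "opensearch-injection-concat",
         "message": "String concatenation in search query."},
    ]
}
exit_code, report = ci_gate(sample_findings)
```

A CI step could print the report lines and call sys.exit(exit_code), so that only critical findings block the merge while high findings remain visible in the log.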

Scenario 5 — Flaky Tests from Non-Deterministic FM Responses

Problem Statement

The MangaAssist test suite intermittently fails in CI/CD. Tests pass locally but fail 15-20% of the time in GitHub Actions. The failures are in tests that assert on exact FM response text, which varies due to temperature sampling, model version updates, and token-level randomness even at temperature=0.

Detection

graph TB
    CI[CI/CD Pipeline<br/>GitHub Actions] --> F1[Run 1: PASS]
    CI --> F2[Run 2: FAIL<br/>assertion on response text]
    CI --> F3[Run 3: PASS]
    CI --> F4[Run 4: FAIL<br/>different assertion text]

    F2 --> INV[Investigation<br/>Compare failure messages]
    F4 --> INV
    INV --> PAT[Pattern Analysis<br/>Failures are non-deterministic<br/>Same test, different outputs]
    PAT --> RC[Root Cause<br/>Tests assert on exact text<br/>from non-deterministic FM]

    F2 -.->|Retry| F2R[Run 2 Retry: PASS]
    F4 -.->|Retry| F4R[Run 4 Retry: PASS]

    style F2 fill:#ff6b6b,color:#fff
    style F4 fill:#ff6b6b,color:#fff
    style RC fill:#ffd43b,color:#333
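
The pattern analysis step in the diagram (same test, different outcomes) can be automated from CI history. The sketch below assumes run records are available as (test_name, commit_sha, outcome) tuples. That record shape, the function name find_flaky_tests, and the sample test names are all hypothetical.

```python
from collections import defaultdict


def find_flaky_tests(runs: list[tuple[str, str, str]]) -> list[str]:
    """Return tests that both passed and failed on the same commit.

    Each record is (test_name, commit_sha, outcome), with outcome "pass" or "fail".
    Mixed outcomes on identical code point to non-determinism, not a regression.
    """
    outcomes: dict[tuple[str, str], set[str]] = defaultdict(set)
    for test_name, commit_sha, outcome in runs:
        outcomes[(test_name, commit_sha)].add(outcome)
    flaky = {
        test_name
        for (test_name, _sha), seen in outcomes.items()
        if {"pass", "fail"} <= seen
    }
    return sorted(flaky)


history = [
    ("test_recommendation_text", "abc123", "pass"),
    ("test_recommendation_text", "abc123", "fail"),
    ("test_session_lookup", "abc123", "pass"),
    ("test_session_lookup", "abc123", "pass"),
    ("test_price_format", "abc123", "fail"),
    ("test_price_format", "abc123", "fail"),
]
flaky_tests = find_flaky_tests(history)
```

A test that fails on every run of a commit (test_price_format here) is a consistent failure and is deliberately not reported as flaky.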

Root Cause

Multiple factors contribute to non-deterministic FM responses:

  1. Temperature > 0: Even small values (0.1) introduce token-level randomness.
  2. Model version updates: Bedrock may silently update to a new model patch version.
  3. Batching effects: Different batch sizes in inference can affect floating-point operations.
  4. Tests asserting exact text: assertEqual(response, "exact expected text") fails on any token difference.
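
To make the exact-text failure mode concrete, here is a minimal sketch comparing an exact-match check with a property-based check on two plausible response variants. The response strings and the helper names exact_check and property_check are invented for illustration and are not taken from the MangaAssist codebase.

```python
# Two hypothetical responses to the same prompt; both are acceptable answers.
variant_a = "Here are 3 action manga: 1. One Piece 2. Naruto 3. Bleach"
variant_b = "I recommend these action manga: 1. Naruto 2. One Piece 3. Bleach"


def exact_check(response: str, expected: str) -> bool:
    """Brittle: any wording or ordering difference fails."""
    return response == expected


def property_check(response: str, required_terms: list[str]) -> bool:
    """Tolerant: passes when every required term appears, in any order."""
    lowered = response.lower()
    return all(term.lower() in lowered for term in required_terms)


required = ["manga", "naruto", "one piece"]

exact_result = exact_check(variant_b, expected=variant_a)   # False
property_result_a = property_check(variant_a, required)     # True
property_result_b = property_check(variant_b, required)     # True
```

The exact check rejects variant_b even though it carries the same recommendations, which is the behavior that shows up as intermittent CI failures.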

Resolution

"""
Resolution: Deterministic testing strategy for non-deterministic FM responses.
Uses semantic assertions, fixture mocks, and statistical acceptance.
"""

import logging
from difflib import SequenceMatcher
from typing import Any, Callable

logger = logging.getLogger(__name__)


class DeterministicTestStrategy:
    """
    Strategies for making FM-based tests deterministic.
    Eliminates flakiness from non-deterministic model outputs.
    """

    @staticmethod
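    def semantic_similarity(text_a: str, text_b: str) -> float:
        """
        Approximate similarity as a character-level SequenceMatcher ratio (0.0-1.0).

        This is a lexical measure, not a true semantic one: paraphrases that
        share few characters score low. It needs no ML model or network call.
        """
        return SequenceMatcher(None, text_a.lower(), text_b.lower()).ratio()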

    @staticmethod
    def assert_semantic_match(
        actual: str,
        expected: str,
        min_similarity: float = 0.6,
    ) -> tuple[bool, float]:
        """Check that responses are similar, not identical. Returns (passed, score); does not raise."""
        similarity = DeterministicTestStrategy.semantic_similarity(actual, expected)
        passed = similarity >= min_similarity
        return passed, round(similarity, 3)

    @staticmethod
    def assert_structural_match(
        actual: str,
        expected_structure: dict[str, Any],
    ) -> tuple[bool, list[str]]:
        """
        Assert the response matches a structural pattern.

        expected_structure = {
            "min_length": 50,
            "max_length": 2000,
            "contains_all": ["manga", "volume"],
            "contains_any": ["recommend", "suggest", "try"],
            "has_numbered_list": True,
            "min_items": 3,
        }
        """
        failures = []
        actual_lower = actual.lower()

        min_len = expected_structure.get("min_length", 0)
        if len(actual) < min_len:
            failures.append(f"Too short: {len(actual)} < {min_len}")

        max_len = expected_structure.get("max_length", float("inf"))
        if len(actual) > max_len:
            failures.append(f"Too long: {len(actual)} > {max_len}")

        contains_all = expected_structure.get("contains_all", [])
        for term in contains_all:
            if term.lower() not in actual_lower:
                failures.append(f"Missing required term: '{term}'")

        contains_any = expected_structure.get("contains_any", [])
        if contains_any:
            found_any = any(t.lower() in actual_lower for t in contains_any)
            if not found_any:
                failures.append(f"Missing any of: {contains_any}")

        if expected_structure.get("has_numbered_list"):
            import re
            items = re.findall(r"^\d+[\.\)]\s", actual, re.MULTILINE)
            min_items = expected_structure.get("min_items", 1)
            if len(items) < min_items:
                failures.append(f"Expected {min_items}+ list items, found {len(items)}")

        return len(failures) == 0, failures

    @staticmethod
    def statistical_assertion(
        test_func: Callable[[], str],
        validation_func: Callable[[str], bool],
        runs: int = 5,
        min_pass_rate: float = 0.8,
    ) -> tuple[bool, dict[str, Any]]:
        """
        Run a test multiple times and accept if pass rate meets threshold.

        Use for inherently non-deterministic tests where 80%+ pass rate
        indicates correct behavior.
        """
        results = []
        for i in range(runs):
            try:
                output = test_func()
                passed = validation_func(output)
                results.append({"run": i, "passed": passed, "error": None})
            except Exception as e:
                results.append({"run": i, "passed": False, "error": str(e)})

        pass_count = sum(1 for r in results if r["passed"])
        pass_rate = pass_count / runs

        return pass_rate >= min_pass_rate, {
            "runs": runs,
            "passed": pass_count,
            "failed": runs - pass_count,
            "pass_rate": round(pass_rate, 3),
            "min_pass_rate": min_pass_rate,
            "results": results,
        }


class MockFirstTestPattern:
    """
    Test pattern that uses mocks by default and real FM only in integration.

    Unit tests: Always use MockBedrockClient (deterministic).
    Integration tests: Use real Bedrock but with structural assertions.
    Smoke tests: Run against production with statistical acceptance.
    """

    @staticmethod
    def unit_test_example() -> dict[str, Any]:
        """
        Example unit test structure — no real FM calls, always deterministic.
        """
        return {
            "test_name": "test_manga_recommendation_prompt",
            "mock_setup": "MockBedrockClient with fixture 'manga_recommendation_001'",
            "assertions": [
                "assert response is not None",
                "assert 'manga' in response.lower()",
                "assert len(response) > 50",
                "assert fixture.usage['output_tokens'] < 500",
            ],
            "flaky_risk": "NONE — fully deterministic via fixtures",
        }

    @staticmethod
    def integration_test_example() -> dict[str, Any]:
        """
        Example integration test structure — real FM, structural assertions.
        """
        return {
            "test_name": "test_manga_recommendation_e2e",
            "bedrock_call": "Real invoke_model with temperature=0",
            "assertions": [
                "assert_structural_match(response, {min_length: 100, contains_all: ['manga']})",
                "assert_manga_specificity(response, min_terms=2)",
                "assert latency_ms < 3000",
            ],
            "flaky_risk": "LOW — structural + semantic checks tolerate token variation",
        }

    @staticmethod
    def smoke_test_example() -> dict[str, Any]:
        """
        Example smoke test structure — production, statistical acceptance.
        """
        return {
            "test_name": "test_manga_recommendation_smoke",
            "environment": "production",
            "method": "statistical_assertion(runs=5, min_pass_rate=0.8)",
            "assertions": [
                "response not empty",
                "latency < 3000ms",
                "no error codes",
            ],
            "flaky_risk": "MINIMAL — 80% pass rate absorbs occasional variance",
        }


# --- pytest fixture example ---

PYTEST_FIXTURE_CODE = '''
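import pytest
from manga_assist.mock_bedrock import MockBedrockClient
from manga_assist.fixtures import FixtureLibrary
# Assumed module path: import DeterministicTestStrategy from wherever the class above lives.
from manga_assist.test_strategy import DeterministicTestStrategy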

@pytest.fixture
def mock_bedrock():
    """Deterministic Bedrock client for unit tests."""
    client = MockBedrockClient(seed=42)
    return client

@pytest.fixture
def fixture_library():
    """FM response fixtures for test assertions."""
    return FixtureLibrary()

@pytest.mark.asyncio
async def test_manga_recommendation(mock_bedrock):
    """Test recommendation with deterministic mock — never flaky."""
    result = await mock_bedrock.invoke(
        model_id="anthropic.claude-3-haiku-20240307-v1:0",
        prompt="Recommend action manga",
    )
    assert result["text"]
    assert result["usage"]["input_tokens"] > 0
    assert result["latency_ms"] > 0
    assert result["cost_usd"] >= 0

@pytest.mark.integration
@pytest.mark.asyncio
async def test_manga_recommendation_real(real_bedrock_client):
    """Integration test with structural assertions — low flakiness."""
    result = await real_bedrock_client.invoke(
        model_id="anthropic.claude-3-haiku-20240307-v1:0",
        prompt="Recommend action manga",
        max_tokens=512,
    )
    passed, failures = DeterministicTestStrategy.assert_structural_match(
        result["text"],
        {
            "min_length": 50,
            "contains_any": ["manga", "マンガ", "漫画"],
            "has_numbered_list": True,
            "min_items": 2,
        },
    )
    assert passed, f"Structural match failures: {failures}"
'''
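
The 5-run, 80% threshold used by statistical_assertion() is worth checking numerically. The sketch below computes the probability that the acceptance criterion passes given a per-run pass probability p, assuming runs are independent (an assumption, since real FM calls may be correlated). The function name acceptance_probability is hypothetical; the criterion k / runs >= min_pass_rate mirrors the code above.

```python
from math import comb


def acceptance_probability(p: float, runs: int = 5, min_pass_rate: float = 0.8) -> float:
    """Probability that at least min_pass_rate of `runs` independent runs pass.

    p is the chance that a single run passes. Sums the binomial probability
    of every pass count k that satisfies k / runs >= min_pass_rate.
    """
    total = 0.0
    for k in range(runs + 1):
        if k / runs >= min_pass_rate:
            total += comb(runs, k) * p**k * (1 - p) ** (runs - k)
    return total


# Per-run pass probabilities to compare (0.85 matches a 15% per-run failure rate).
for p in (0.80, 0.85, 0.95, 0.99):
    print(f"p={p:.2f} -> acceptance probability {acceptance_probability(p):.4f}")
```

At p = 0.85 the result is about 0.835, so a check that fails 15% of the time per run would still fail the 5-run acceptance roughly one time in six. Statistical acceptance therefore works best on top of tolerant assertions that already pass nearly always, not as a substitute for them.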

Prevention

  1. Default to MockBedrockClient for all unit tests — no real FM calls in pytest without @pytest.mark.integration.
  2. Use structural and semantic assertions instead of exact text matching for integration tests.
  3. Set temperature=0 in all test configurations to minimize (but not eliminate) randomness.
  4. Use statistical acceptance (80% pass rate over 5 runs) for smoke tests against production.
  5. Pin model versions explicitly in test configurations to avoid silent model updates.
  6. Tag flaky tests with @pytest.mark.flaky(reruns=3) (provided by the pytest-rerunfailures plugin) and track their stability over time.
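
Items 3 and 5 can be combined in one shared test helper. The sketch below builds keyword arguments for a boto3 bedrock-runtime invoke_model call with a pinned model ID and temperature=0, using the Anthropic Messages body format on Bedrock. The helper name build_test_request and the constant name are hypothetical, and no network call is made here.

```python
import json
from typing import Any

# Full versioned model ID (as used in the fixture example), so tests never float to another version.
PINNED_MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"


def build_test_request(prompt: str, max_tokens: int = 512) -> dict[str, Any]:
    """Build invoke_model keyword arguments for a low-variance test call."""
    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "temperature": 0,  # minimizes, but does not eliminate, output variance
        "messages": [
            {"role": "user", "content": [{"type": "text", "text": prompt}]}
        ],
    }
    return {
        "modelId": PINNED_MODEL_ID,
        "contentType": "application/json",
        "accept": "application/json",
        "body": json.dumps(body),
    }


request = build_test_request("Recommend action manga")
```

An integration test could then call client.invoke_model(**request) on a bedrock-runtime client, keeping the model ID and sampling settings in one place instead of scattered across test files.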

Key Takeaways

| # | Takeaway | MangaAssist Application |
|---|----------|-------------------------|
| 1 | Validate Q Developer suggestions against current API schemas before accepting. Legacy patterns are common in generated code. | A pre-commit linter rejects deprecated Bedrock prompt formats and flags legacy Human/Assistant prefixes. |
| 2 | Semantic quality assertions catch regressions that structural tests miss. Manga specificity, genre relevance, and Japanese content must be validated. | Every prompt test includes MangaQualityAssertions.run_all_assertions() with a 0.7 minimum score. |
| 3 | Never enable full profiling in production. Use sampling-based profiling with auto-disable to protect the 3-second latency budget. | SafeProfiler samples 1% of requests, auto-disables after 5 minutes, and blocks itself in production by default. |
| 4 | Security scanning must cover AI-generated code, because code suggestions can include injection vulnerabilities. | scan_for_query_injection() runs in CI/CD to block f-string and .format() patterns in OpenSearch queries. |
| 5 | Mock-first testing eliminates FM-induced flakiness. Only integration and smoke tests should call real Bedrock. | Unit tests use MockBedrockClient(seed=42) for fully deterministic behavior, cutting flaky test rate from 20% to 0%. |