
02: FM Integration Troubleshooting

AIP-C01 Mapping

Task 5.2 → Skill 5.2.2: Diagnose and resolve FM integration issues to identify and fix API integration problems specific to GenAI services (error logging, request validation, response analysis).


User Story

As a backend engineer on the MangaAssist team, I want to systematically diagnose and resolve Foundation Model API integration failures, so that the chatbot maintains reliable communication with Amazon Bedrock, handles transient and systemic failures gracefully, and provides degraded but useful responses when the FM is unavailable.


Acceptance Criteria

  • Every Bedrock API call is logged with correlation ID, request size, response latency, and status code
  • Request validation catches payload issues (missing fields, oversized input, invalid parameters) before the API call
  • Response analysis detects and handles malformed JSON, incomplete streaming, and unexpected content
  • Circuit breaker prevents cascading failures when Bedrock is degraded or throttled
  • Retry strategy uses exponential backoff with jitter; max 2 retries for non-idempotent calls
  • Error rate > 5% triggers automatic fallback to template-based responses for known intents
  • Mean time to detect (MTTD) for FM integration failures < 2 minutes via CloudWatch alarms

High-Level Design

FM Integration Failure Taxonomy

graph TD
    A[FM Integration<br>Failure] --> B[Request Failures]
    A --> C[Response Failures]
    A --> D[Streaming Failures]
    A --> E[Systemic Failures]

    B --> B1[Payload validation error<br>400 Bad Request]
    B --> B2[Token limit exceeded<br>413 / validation error]
    B --> B3[Model not found<br>404]
    B --> B4[Missing permissions<br>403]

    C --> C1[Malformed JSON output]
    C --> C2[Incomplete response<br>cutoff mid-sentence]
    C --> C3[Hallucinated structure<br>wrong schema]
    C --> C4[Empty response body]

    D --> D1[Stream timeout<br>no chunks received]
    D --> D2[Stream interrupted<br>partial delivery]
    D --> D3[WebSocket drop<br>during streaming]
    D --> D4[Chunk ordering error]

    E --> E1[Throttling<br>429 Too Many Requests]
    E --> E2[Service outage<br>500/503]
    E --> E3[Model deprecation<br>endpoint removed]
    E --> E4[Region failover<br>needed]

Error Handling Architecture

flowchart TD
    A[Chat Request] --> B[Request Validator]
    B -->|Invalid| C[Return 400 with<br>specific error]
    B -->|Valid| D{Circuit Breaker<br>State?}

    D -->|OPEN| E[Return cached/template<br>response immediately]
    D -->|HALF_OPEN| F[Allow single probe<br>request through]
    D -->|CLOSED| G[Call Bedrock API]

    G --> H{Response<br>received?}
    H -->|Timeout| I{Retry count<br>< max?}
    H -->|Error 429| J[Backoff + Retry<br>with jitter]
    H -->|Error 5xx| I
    H -->|Success| K[Response Analyzer]

    I -->|Yes| G
    I -->|No| L[Record failure<br>in circuit breaker]

    J --> G

    K --> M{Response<br>valid?}
    M -->|Malformed JSON| N[Attempt repair<br>+ re-validate]
    M -->|Incomplete| O[Log + return<br>partial if usable]
    M -->|Valid| P[Return to<br>Orchestrator]

    L --> E
    N -->|Repaired| P
    N -->|Unrepairable| E

    F --> H

Bedrock Error Code Reference

| HTTP Status | Error Code | Cause | MangaAssist Impact | Resolution |
|-------------|------------|-------|--------------------|------------|
| 400 | ValidationException | Malformed request body, invalid model parameters | Single request fails | Fix payload; log + alert if recurring |
| 408 | ModelTimeoutException | Input too large or model overloaded | Response delayed or lost | Reduce input size; retry once |
| 403 | AccessDeniedException | IAM role lacks bedrock:InvokeModel | All requests fail | Fix IAM policy; critical alert |
| 404 | ResourceNotFoundException | Model ID wrong or deprecated | All requests fail | Update model ID; critical alert |
| 429 | ThrottlingException | Exceeded provisioned throughput or account limit | Requests queued or dropped | Backoff + jitter; request quota increase |
| 500 | InternalServerException | Bedrock service error | Intermittent failures | Retry with backoff; open circuit if persistent |
| 503 | ServiceUnavailableException | Bedrock capacity issue | All requests fail | Circuit breaker opens; template fallback |
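The Resolution column implies a binary retry decision per error code. A minimal sketch of that classification (the helper name and set are illustrative, not part of the MangaAssist codebase):

```python
# Hedged sketch: encode the retry decision implied by the table above.
RETRYABLE_CODES = {
    "ThrottlingException",
    "InternalServerException",
    "ServiceUnavailableException",
    "ModelTimeoutException",
}

def is_retryable(error_code: str) -> bool:
    """Retry transient errors only; treat unknown codes as permanent
    so a new failure mode cannot trigger a retry storm."""
    return error_code in RETRYABLE_CODES
```

Keeping the classification as data (a set) rather than branching logic makes it easy to audit against the table when a new error code appears.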

Low-Level Design

1. Structured Error Logging

Every Bedrock interaction gets a standardized log entry. This is the single most important troubleshooting investment because it makes every failure diagnosable after the fact.

import json
import time
import uuid
import logging
from dataclasses import dataclass, field, asdict
from typing import Optional

logger = logging.getLogger("mangaassist.fm_integration")


@dataclass
class BedrockCallLog:
    """Structured log entry for every Bedrock API call."""
    correlation_id: str
    session_id: str
    intent: str
    model_id: str
    request_timestamp: float
    response_timestamp: Optional[float] = None

    # Request metrics
    input_tokens: int = 0
    max_output_tokens: int = 0
    temperature: float = 0.0
    prompt_sections: dict = field(default_factory=dict)  # section -> token count

    # Response metrics
    output_tokens: int = 0
    latency_ms: float = 0.0
    time_to_first_token_ms: Optional[float] = None

    # Status
    status: str = "pending"  # pending, success, error, timeout, throttled
    error_code: Optional[str] = None
    error_message: Optional[str] = None
    retry_count: int = 0

    # Quality signals
    response_valid_json: Optional[bool] = None
    response_has_products: Optional[bool] = None
    guardrail_blocked: bool = False

    def to_log_dict(self) -> dict:
        """Produce a structured log dictionary safe for CloudWatch Logs."""
        d = asdict(self)
        d["log_type"] = "bedrock_call"
        d["latency_ms"] = round(self.latency_ms, 2)
        if self.time_to_first_token_ms is not None:
            d["time_to_first_token_ms"] = round(self.time_to_first_token_ms, 2)
        return d

    def finalize(self, status: str, output_tokens: int = 0, error_code: Optional[str] = None, error_message: Optional[str] = None):
        self.response_timestamp = time.time()
        self.latency_ms = (self.response_timestamp - self.request_timestamp) * 1000
        self.status = status
        self.output_tokens = output_tokens
        self.error_code = error_code
        self.error_message = error_message
        logger.info(json.dumps(self.to_log_dict()))
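For reference, a standalone sketch of what one finished entry looks like on the wire: the emitted line is plain JSON, which is what makes the Logs Insights queries later in this file possible (field values here are made up):

```python
import json
import logging
import sys
import uuid

logging.basicConfig(stream=sys.stdout, format="%(message)s", level=logging.INFO)
logger = logging.getLogger("mangaassist.fm_integration")

# One JSON object per call; every key becomes a queryable field in
# CloudWatch Logs Insights (illustrative values).
entry = {
    "log_type": "bedrock_call",
    "correlation_id": str(uuid.uuid4()),
    "intent": "product_search",
    "status": "success",
    "latency_ms": round(842.317, 2),
    "retry_count": 0,
}
logger.info(json.dumps(entry))

# Because the line is valid JSON, it round-trips losslessly:
parsed = json.loads(json.dumps(entry))
```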

2. Request Validator

Catches bad requests before they hit Bedrock. Every rejected request is cheaper, faster, and more diagnosable than a Bedrock 400 error.

from dataclasses import dataclass
from typing import Optional


@dataclass
class ValidationResult:
    valid: bool
    error_code: Optional[str] = None
    error_message: Optional[str] = None
    field: Optional[str] = None


class BedrockRequestValidator:
    """Validates Bedrock invoke_model payloads before submission.

    Catches:
    - Missing required fields
    - Token counts exceeding model limits
    - Invalid parameter values (temperature, top_p, etc.)
    - Empty or whitespace-only prompts
    """

    MODEL_LIMITS = {
        "anthropic.claude-3-5-sonnet-20241022-v2:0": {
            "max_input_tokens": 200_000,
            "max_output_tokens": 8_192,
            "temperature_range": (0.0, 1.0),
            "top_p_range": (0.0, 1.0),
        },
        "anthropic.claude-3-haiku-20240307-v1:0": {
            "max_input_tokens": 200_000,
            "max_output_tokens": 4_096,
            "temperature_range": (0.0, 1.0),
            "top_p_range": (0.0, 1.0),
        },
    }

    # Practical budget — not the model limit, but what we allow to keep cost/latency in check
    PRACTICAL_INPUT_LIMIT = 5_000
    PRACTICAL_OUTPUT_LIMIT = 1_500

    def validate(self, model_id: str, messages: list, params: dict) -> ValidationResult:
        """Validate a Bedrock Messages API request."""

        # 1. Model exists and is supported
        if model_id not in self.MODEL_LIMITS:
            return ValidationResult(
                valid=False,
                error_code="INVALID_MODEL",
                error_message=f"Model '{model_id}' is not configured. Available: {list(self.MODEL_LIMITS.keys())}",
                field="model_id",
            )

        limits = self.MODEL_LIMITS[model_id]

        # 2. Messages list is not empty
        if not messages:
            return ValidationResult(
                valid=False,
                error_code="EMPTY_MESSAGES",
                error_message="Messages list is empty",
                field="messages",
            )

        # 3. Last message is from user
        if messages[-1].get("role") != "user":
            return ValidationResult(
                valid=False,
                error_code="INVALID_TURN_ORDER",
                error_message="Last message must be from 'user' role",
                field="messages[-1].role",
            )

        # 4. No empty content
        for i, msg in enumerate(messages):
            content = msg.get("content", "")
            if isinstance(content, str) and not content.strip():
                return ValidationResult(
                    valid=False,
                    error_code="EMPTY_CONTENT",
                    error_message=f"Message at index {i} has empty content",
                    field=f"messages[{i}].content",
                )

        # 5. Token count within practical limits
        total_input_tokens = self._estimate_tokens(messages)
        if total_input_tokens > self.PRACTICAL_INPUT_LIMIT:
            return ValidationResult(
                valid=False,
                error_code="INPUT_TOO_LARGE",
                error_message=(
                    f"Estimated input tokens ({total_input_tokens}) exceed practical limit "
                    f"({self.PRACTICAL_INPUT_LIMIT}). Compress prompt before submission."
                ),
                field="messages",
            )

        # 6. Hard model limit check
        if total_input_tokens > limits["max_input_tokens"]:
            return ValidationResult(
                valid=False,
                error_code="INPUT_EXCEEDS_MODEL_LIMIT",
                error_message=f"Input tokens ({total_input_tokens}) exceed model limit ({limits['max_input_tokens']})",
                field="messages",
            )

        # 7. Parameter validation
        temperature = params.get("temperature", 0.0)
        min_t, max_t = limits["temperature_range"]
        if not (min_t <= temperature <= max_t):
            return ValidationResult(
                valid=False,
                error_code="INVALID_TEMPERATURE",
                error_message=f"Temperature {temperature} outside range [{min_t}, {max_t}]",
                field="temperature",
            )

        max_tokens = params.get("max_tokens", 1000)
        if max_tokens > limits["max_output_tokens"]:
            return ValidationResult(
                valid=False,
                error_code="MAX_TOKENS_EXCEEDED",
                error_message=f"max_tokens ({max_tokens}) exceeds model limit ({limits['max_output_tokens']})",
                field="max_tokens",
            )

        return ValidationResult(valid=True)

    def _estimate_tokens(self, messages: list) -> int:
        total = 0
        for msg in messages:
            content = msg.get("content", "")
            if isinstance(content, str):
                total += len(content) // 4
            elif isinstance(content, list):
                for block in content:
                    if isinstance(block, dict) and "text" in block:
                        total += len(block["text"]) // 4
        return total
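The _estimate_tokens heuristic above assumes roughly 4 characters per token. That ratio is an approximation for English-like text, not a real tokenizer; in isolation it behaves like this:

```python
def estimate_tokens(messages: list) -> int:
    """Same ~4 chars/token heuristic as the validator; a real tokenizer
    (e.g. the model's own) would give different counts."""
    total = 0
    for msg in messages:
        content = msg.get("content", "")
        if isinstance(content, str):
            total += len(content) // 4
    return total

messages = [
    {"role": "user", "content": "Recommend a manga similar to Fullmetal Alchemist"},
]
```

Because the estimate is deliberately rough, the PRACTICAL_INPUT_LIMIT should be set with comfortable headroom below the true model limit.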

3. Bedrock Client with Retry and Circuit Breaker

import time
import random
import json
import logging
import uuid

import boto3
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum

logger = logging.getLogger("mangaassist.fm_integration")


class CircuitState(Enum):
    CLOSED = "closed"       # Normal operation
    OPEN = "open"           # Failing — reject requests immediately
    HALF_OPEN = "half_open" # Testing — allow one probe request


@dataclass
class CircuitBreaker:
    """Circuit breaker for Bedrock API calls.

    Opens after `failure_threshold` consecutive failures.
    Half-opens after `recovery_timeout_seconds`.
    Closes after a successful probe in HALF_OPEN state.
    """
    failure_threshold: int = 5
    recovery_timeout_seconds: float = 30.0

    state: CircuitState = CircuitState.CLOSED
    failure_count: int = 0
    last_failure_time: float = 0.0
    last_state_change: float = field(default_factory=time.time)

    def record_success(self):
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self._transition(CircuitState.CLOSED)
            logger.info("Circuit breaker CLOSED — Bedrock recovered")

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.state == CircuitState.HALF_OPEN:
            self._transition(CircuitState.OPEN)
            logger.warning("Circuit breaker OPEN (probe failed) — Bedrock still degraded")
        elif self.failure_count >= self.failure_threshold:
            self._transition(CircuitState.OPEN)
            logger.warning(
                "Circuit breaker OPEN — %d consecutive failures",
                self.failure_count,
            )

    def allow_request(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time >= self.recovery_timeout_seconds:
                self._transition(CircuitState.HALF_OPEN)
                logger.info("Circuit breaker HALF_OPEN — allowing probe request")
                return True
            return False
        if self.state == CircuitState.HALF_OPEN:
            return True  # Allow the single probe
        return False

    def _transition(self, new_state: CircuitState):
        old_state = self.state
        self.state = new_state
        self.last_state_change = time.time()
        logger.info(
            "Circuit breaker transition: %s -> %s",
            old_state.value, new_state.value,
        )


class BedrockError(Exception):
    """Structured exception for Bedrock API failures."""
    def __init__(self, error_code: str, message: str, retryable: bool, status_code: int = 0):
        super().__init__(message)
        self.error_code = error_code
        self.retryable = retryable
        self.status_code = status_code


@dataclass
class BedrockClientWrapper:
    """Production Bedrock client with validation, retry, circuit breaking, and structured logging."""

    model_id: str = "anthropic.claude-3-5-sonnet-20241022-v2:0"
    region: str = "ap-northeast-1"
    max_retries: int = 2
    base_backoff_seconds: float = 0.5
    max_backoff_seconds: float = 8.0

    def __post_init__(self):
        self.client = boto3.client("bedrock-runtime", region_name=self.region)
        self.validator = BedrockRequestValidator()
        self.circuit_breaker = CircuitBreaker()
        self._call_log: Optional[BedrockCallLog] = None

    def invoke(
        self,
        messages: list,
        system_prompt: str,
        session_id: str,
        intent: str,
        temperature: float = 0.0,
        max_tokens: int = 1000,
    ) -> dict:
        """Synchronous Bedrock invocation with full error handling pipeline."""

        correlation_id = str(uuid.uuid4())
        params = {"temperature": temperature, "max_tokens": max_tokens}

        # Initialize structured log
        self._call_log = BedrockCallLog(
            correlation_id=correlation_id,
            session_id=session_id,
            intent=intent,
            model_id=self.model_id,
            request_timestamp=time.time(),
            input_tokens=self.validator._estimate_tokens(messages),
            max_output_tokens=max_tokens,
            temperature=temperature,
        )

        # Step 1: Validate request
        validation = self.validator.validate(self.model_id, messages, params)
        if not validation.valid:
            self._call_log.finalize(
                status="validation_error",
                error_code=validation.error_code,
                error_message=validation.error_message,
            )
            raise BedrockError(
                error_code=validation.error_code,
                message=validation.error_message,
                retryable=False,
            )

        # Step 2: Check circuit breaker
        if not self.circuit_breaker.allow_request():
            self._call_log.finalize(
                status="circuit_open",
                error_code="CIRCUIT_OPEN",
                error_message="Circuit breaker is open — Bedrock degraded",
            )
            raise BedrockError(
                error_code="CIRCUIT_OPEN",
                message="Bedrock circuit breaker is open. Use template fallback.",
                retryable=False,
            )

        # Step 3: Call Bedrock with retry
        last_error = None
        for attempt in range(1 + self.max_retries):
            try:
                response = self._invoke_bedrock(messages, system_prompt, params)
                self.circuit_breaker.record_success()

                # Step 4: Analyze response
                analyzed = self._analyze_response(response)

                # Record quality signals before finalize() emits the log entry
                self._call_log.response_valid_json = analyzed.get("valid_json", False)
                self._call_log.response_has_products = analyzed.get("has_products", False)
                self._call_log.retry_count = attempt
                self._call_log.finalize(
                    status="success",
                    output_tokens=analyzed.get("output_tokens", 0),
                )

                return analyzed

            except self.client.exceptions.ThrottlingException as e:
                last_error = e
                self._call_log.retry_count = attempt
                if attempt < self.max_retries:
                    backoff = self._calculate_backoff(attempt)
                    logger.warning(
                        "Bedrock throttled (attempt %d/%d), backing off %.1fs",
                        attempt + 1, 1 + self.max_retries, backoff,
                        extra={"correlation_id": correlation_id, "backoff_seconds": backoff},
                    )
                    time.sleep(backoff)

            except (
                self.client.exceptions.InternalServerException,
                self.client.exceptions.ServiceUnavailableException,
            ) as e:
                last_error = e
                self.circuit_breaker.record_failure()
                if attempt < self.max_retries:
                    backoff = self._calculate_backoff(attempt)
                    logger.warning(
                        "Bedrock server error (attempt %d/%d), retrying in %.1fs",
                        attempt + 1, 1 + self.max_retries, backoff,
                        extra={"correlation_id": correlation_id},
                    )
                    time.sleep(backoff)

            except Exception as e:
                # Non-retryable error
                self.circuit_breaker.record_failure()
                self._call_log.finalize(
                    status="error",
                    error_code=type(e).__name__,
                    error_message=str(e)[:500],
                )
                raise BedrockError(
                    error_code=type(e).__name__,
                    message=str(e),
                    retryable=False,
                )

        # All retries exhausted
        self.circuit_breaker.record_failure()
        self._call_log.finalize(
            status="retries_exhausted",
            error_code=type(last_error).__name__,
            error_message=str(last_error)[:500],
        )
        raise BedrockError(
            error_code="RETRIES_EXHAUSTED",
            message=f"Bedrock call failed after {1 + self.max_retries} attempts: {last_error}",
            retryable=False,
        )

    def _invoke_bedrock(self, messages: list, system_prompt: str, params: dict) -> dict:
        """Raw Bedrock API call."""
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": params["max_tokens"],
            "temperature": params["temperature"],
            "system": system_prompt,
            "messages": messages,
        }

        response = self.client.invoke_model(
            modelId=self.model_id,
            contentType="application/json",
            accept="application/json",
            body=json.dumps(body),
        )

        return json.loads(response["body"].read())

    def _calculate_backoff(self, attempt: int) -> float:
        """Exponential backoff with full jitter."""
        base = self.base_backoff_seconds * (2 ** attempt)
        capped = min(base, self.max_backoff_seconds)
        return random.uniform(0, capped)

    def _analyze_response(self, response: dict) -> dict:
        """Analyze Bedrock response for quality signals."""
        result = {
            "raw_response": response,
            "output_tokens": response.get("usage", {}).get("output_tokens", 0),
            "stop_reason": response.get("stop_reason", "unknown"),
            "valid_json": False,
            "has_products": False,
            "text": "",
        }

        # Extract text content
        content_blocks = response.get("content", [])
        text_parts = []
        for block in content_blocks:
            if block.get("type") == "text":
                text_parts.append(block["text"])

        result["text"] = "\n".join(text_parts)

        # Check if response contains valid JSON (for structured outputs).
        # Import re unconditionally: binding it only inside the except branch
        # would leave the name unbound on the success path below.
        import re

        try:
            # Try to parse the entire text as JSON
            json.loads(result["text"])
            result["valid_json"] = True
        except (json.JSONDecodeError, TypeError):
            # Try to find JSON within the text (common with markdown fences)
            json_match = re.search(r'```json\s*(.*?)\s*```', result["text"], re.DOTALL)
            if json_match:
                try:
                    json.loads(json_match.group(1))
                    result["valid_json"] = True
                except (json.JSONDecodeError, TypeError):
                    pass

        # Check for product data (Amazon ASIN pattern)
        result["has_products"] = bool(re.search(r'B0[A-Z0-9]{8}', result["text"]))

        # Check for incomplete response
        if response.get("stop_reason") == "max_tokens":
            logger.warning(
                "Response truncated at max_tokens — consider increasing output budget",
                extra={"output_tokens": result["output_tokens"]},
            )
            result["was_truncated"] = True

        return result
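A standalone sketch of the _calculate_backoff schedule above: the window cap grows exponentially from base_backoff_seconds until max_backoff_seconds, and the actual sleep is a uniform draw from [0, cap] (full jitter):

```python
import random

BASE_BACKOFF = 0.5  # matches base_backoff_seconds above
MAX_BACKOFF = 8.0   # matches max_backoff_seconds above

def backoff_window(attempt: int) -> float:
    """Upper bound of the full-jitter window for a given attempt."""
    return min(BASE_BACKOFF * (2 ** attempt), MAX_BACKOFF)

# Windows: 0.5s, 1.0s, 2.0s, 4.0s, then capped at 8.0s.
windows = [backoff_window(a) for a in range(6)]

# The sleep is random but always falls inside the window.
sleep = random.uniform(0, backoff_window(2))
```

Full jitter (uniform over the whole window, rather than a fixed exponential delay) is what desynchronizes clients after a mass throttling event and prevents retry storms.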

4. Response Analyzer — Repair and Validation

import json
import re
import logging
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger("mangaassist.fm_integration")


@dataclass
class ResponseAnalysis:
    is_valid: bool
    response_text: str
    parsed_json: Optional[dict] = None
    repair_applied: bool = False
    repair_type: Optional[str] = None
    issues: Optional[list] = None

    def __post_init__(self):
        if self.issues is None:
            self.issues = []


class ResponseAnalyzer:
    """Validates and repairs FM responses.

    Common issues in production:
    1. JSON with trailing commas (Claude sometimes does this)
    2. JSON wrapped in markdown code fences
    3. Incomplete JSON due to output token limit
    4. Mixed text + JSON when structured output was expected
    5. Missing required fields in structured response
    """

    REQUIRED_RESPONSE_FIELDS = ["response_text"]
    OPTIONAL_RESPONSE_FIELDS = ["products", "actions", "follow_up_suggestions"]

    def analyze(self, raw_text: str, expected_format: str = "text") -> ResponseAnalysis:
        """Analyze and optionally repair the FM response."""
        if not raw_text or not raw_text.strip():
            return ResponseAnalysis(
                is_valid=False,
                response_text="",
                issues=["Empty response from FM"],
            )

        if expected_format == "json":
            return self._analyze_json_response(raw_text)
        elif expected_format == "structured":
            return self._analyze_structured_response(raw_text)
        else:
            return ResponseAnalysis(is_valid=True, response_text=raw_text)

    def _analyze_json_response(self, raw_text: str) -> ResponseAnalysis:
        """Try to extract valid JSON from the response."""
        issues = []

        # Attempt 1: Direct parse
        try:
            parsed = json.loads(raw_text)
            return ResponseAnalysis(
                is_valid=True, response_text=raw_text, parsed_json=parsed,
            )
        except json.JSONDecodeError:
            issues.append("Direct JSON parse failed")

        # Attempt 2: Extract from markdown code fence
        json_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', raw_text)
        if json_match:
            try:
                parsed = json.loads(json_match.group(1))
                return ResponseAnalysis(
                    is_valid=True,
                    response_text=json_match.group(1),
                    parsed_json=parsed,
                    repair_applied=True,
                    repair_type="extracted_from_code_fence",
                    issues=issues,
                )
            except json.JSONDecodeError:
                issues.append("JSON in code fence also invalid")

        # Attempt 3: Remove trailing commas (common FM mistake)
        cleaned = re.sub(r',\s*([}\]])', r'\1', raw_text)
        try:
            parsed = json.loads(cleaned)
            return ResponseAnalysis(
                is_valid=True,
                response_text=cleaned,
                parsed_json=parsed,
                repair_applied=True,
                repair_type="removed_trailing_commas",
                issues=issues,
            )
        except json.JSONDecodeError:
            issues.append("Trailing comma removal did not fix JSON")

        # Attempt 4: Try to close unclosed braces/brackets (truncated response)
        repaired = self._try_close_json(raw_text)
        if repaired:
            try:
                parsed = json.loads(repaired)
                logger.warning(
                    "Repaired truncated JSON by closing brackets",
                    extra={"repair_type": "closed_brackets"},
                )
                return ResponseAnalysis(
                    is_valid=True,
                    response_text=repaired,
                    parsed_json=parsed,
                    repair_applied=True,
                    repair_type="closed_truncated_json",
                    issues=issues + ["WARNING: JSON was truncated and auto-closed"],
                )
            except json.JSONDecodeError:
                issues.append("Bracket closure repair failed")

        # All repair attempts failed
        logger.error(
            "Unrepairable JSON response",
            extra={"issues": issues, "raw_text_preview": raw_text[:200]},
        )
        return ResponseAnalysis(
            is_valid=False,
            response_text=raw_text,
            issues=issues + ["All JSON repair attempts failed"],
        )

    def _analyze_structured_response(self, raw_text: str) -> ResponseAnalysis:
        """Validate structured response against expected MangaAssist schema."""
        analysis = self._analyze_json_response(raw_text)

        if not analysis.is_valid or analysis.parsed_json is None:
            return analysis

        # Check required fields
        missing_required = [
            f for f in self.REQUIRED_RESPONSE_FIELDS
            if f not in analysis.parsed_json
        ]
        if missing_required:
            analysis.issues.append(f"Missing required fields: {missing_required}")
            analysis.is_valid = False

        # Validate product ASINs if present
        products = analysis.parsed_json.get("products", [])
        for i, product in enumerate(products):
            asin = product.get("asin", "")
            if asin and not re.match(r'^B0[A-Z0-9]{8}$', asin):
                analysis.issues.append(f"Invalid ASIN format in product[{i}]: {asin}")

        return analysis

    def _try_close_json(self, text: str) -> Optional[str]:
        """Try to close truncated JSON by counting open/close brackets."""
        open_braces = text.count('{') - text.count('}')
        open_brackets = text.count('[') - text.count(']')

        if open_braces <= 0 and open_brackets <= 0:
            return None  # Not a bracket issue

        # Remove any trailing partial key-value pair
        text = re.sub(r',\s*"[^"]*$', '', text)  # Remove trailing partial key
        text = re.sub(r',\s*$', '', text)          # Remove trailing comma

        # Close brackets
        text += ']' * open_brackets + '}' * open_braces

        return text
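The first three rungs of the repair ladder can be exercised on their own. A condensed sketch using the same regexes as ResponseAnalyzer (the sample payloads are made up):

```python
import json
import re

# Backtick fence marker built programmatically to avoid clashing with
# this document's own code formatting.
TICKS = "`" * 3
FENCE_RE = re.compile(TICKS + r'(?:json)?\s*([\s\S]*?)\s*' + TICKS)

def repair_json(raw: str):
    """Direct parse, then code-fence extraction, then trailing-comma
    removal; returns the parsed object or None. Same ladder as
    ResponseAnalyzer above, minus the bracket-closing step."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        pass

    fence = FENCE_RE.search(raw)
    if fence:
        try:
            return json.loads(fence.group(1))
        except json.JSONDecodeError:
            pass

    cleaned = re.sub(r',\s*([}\]])', r'\1', raw)
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        return None

fenced = TICKS + 'json\n{"response_text": "Try Vinland Saga"}\n' + TICKS
trailing = '{"products": ["B0ABCDEFGH",]}'
```

Each rung is cheap and deterministic, which is why attempting repair before falling back to a template response is almost always worth it.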

5. MangaAssist Scenarios

Scenario A: Prime Day Throttling Cascade

Context: Prime Day traffic increases MangaAssist usage 5x. Bedrock starts returning 429 ThrottlingException for 15% of requests.

Symptom: Users see "Sorry, I'm having trouble responding right now" messages. Response latency P95 jumps from 2.5s to 12s due to retries.

Detection: CloudWatch alarm fires on BedrockThrottleRate > 5%. Structured logs show status: "throttled" with retry_count: 2 before failure.

Resolution:

  1. Circuit breaker opens after 5 consecutive throttle failures → template responses served instantly
  2. Backoff with jitter prevents retry storms from making throttling worse
  3. Ops team requests a Bedrock provisioned throughput increase for the event

Prevention:

  • Pre-provision Bedrock throughput before known traffic events
  • Configure a lower-cost model fallback (Claude Haiku) for simple intents during peak

Scenario B: Model Version Deprecation

Context: Model ID anthropic.claude-3-sonnet-20240229-v1:0 is deprecated. The MangaAssist staging environment starts failing with 404 ResourceNotFoundException.

Detection: Structured logs show error_code: "ResourceNotFoundException" for 100% of staging requests, and the BedrockErrorRate alarm fires within minutes.

Resolution: Update model_id configuration to anthropic.claude-3-5-sonnet-20241022-v2:0. Run prompt regression tests (see file 03) to verify output quality with the new model version.

Prevention: Subscribe to AWS Bedrock model lifecycle notifications. Add model ID to externalized configuration (not hardcoded).
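The "externalized configuration" fix can be as small as an environment lookup with a documented default. A sketch (the environment variable name is illustrative):

```python
import os

# Hypothetical env var; the default documents the currently supported model.
DEFAULT_MODEL_ID = "anthropic.claude-3-5-sonnet-20241022-v2:0"

def resolve_model_id() -> str:
    """Read the model ID from configuration so a deprecation becomes a
    config change plus regression tests, not a code deploy."""
    return os.environ.get("BEDROCK_MODEL_ID", DEFAULT_MODEL_ID)
```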

Scenario C: Streaming Response Interruption

Context: WebSocket connection drops mid-stream for ~3% of users on mobile networks. The user sees a partial response that cuts off mid-sentence.

Detection: CloudWatch metric StreamInterruptionRate (emitted by WebSocket handler) > 2%. Structured logs show stop_reason: "connection_closed" instead of "end_turn".

Resolution:

  1. Implement response buffering in the orchestrator — buffer the full response before streaming to the WebSocket
  2. Add a GET /chat/message/{response_id} fallback endpoint so clients can fetch the complete response after reconnection
  3. Client-side retry: on WebSocket reconnect, fetch the buffered response for any in-flight response IDs


6. CloudWatch Dashboard and Alerts

Key Metrics

| Metric | Alarm Threshold | Action |
|--------|-----------------|--------|
| BedrockLatencyP95 | > 3,000ms | Warn: check model throughput |
| BedrockErrorRate | > 5% for 2 min | Page: circuit breaker likely open |
| BedrockThrottleRate | > 2% for 5 min | Warn: request throughput increase |
| CircuitBreakerState | OPEN for > 1 min | Page: all requests on template fallback |
| ResponseRepairRate | > 10% | Warn: FM output quality degraded |
| ValidationRejectionRate | > 1% | Warn: upstream sending bad payloads |
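The BedrockErrorRate threshold corresponds to the >5% fallback criterion in the acceptance criteria. A minimal in-process sketch of a sliding-window error-rate check (class name and window size are illustrative; in production this decision is driven by the CloudWatch alarm, not application code):

```python
from collections import deque

class ErrorRateTracker:
    """Track success/failure over a sliding window of recent calls and
    flag when the error rate crosses the fallback threshold."""

    def __init__(self, window: int = 100, threshold: float = 0.05):
        self.outcomes = deque(maxlen=window)  # True = success, False = failure
        self.threshold = threshold

    def record(self, success: bool) -> None:
        self.outcomes.append(success)

    def error_rate(self) -> float:
        if not self.outcomes:
            return 0.0
        failures = sum(1 for ok in self.outcomes if not ok)
        return failures / len(self.outcomes)

    def should_fallback(self) -> bool:
        return self.error_rate() > self.threshold

tracker = ErrorRateTracker(window=100, threshold=0.05)
for _ in range(94):
    tracker.record(True)
for _ in range(6):
    tracker.record(False)
```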

CloudWatch Logs Insights Queries

Bedrock error breakdown by type:

fields @timestamp, error_code, intent, latency_ms
| filter log_type = "bedrock_call" and status != "success"
| stats count(*) as error_count by error_code, intent
| sort error_count desc

Retry effectiveness — how often do retries succeed?

fields @timestamp, correlation_id, retry_count, status
| filter log_type = "bedrock_call" and retry_count > 0
| stats count(*) as total,
        sum(strcontains(status, "success")) as retry_successes
| fields total, retry_successes, retry_successes * 100.0 / total as retry_success_pct

Circuit breaker state changes:

fields @timestamp, @message
| filter @message like /Circuit breaker transition/
| sort @timestamp desc
| limit 50

Latency percentiles by intent:

fields @timestamp, intent, latency_ms
| filter log_type = "bedrock_call" and status = "success"
| stats avg(latency_ms) as avg_ms,
        pct(latency_ms, 50) as p50,
        pct(latency_ms, 95) as p95,
        pct(latency_ms, 99) as p99
  by intent
| sort p95 desc


Intuition Gained

What Mental Model You Build

FM integration troubleshooting teaches you to read API failures like a story. Every failure has a character (the error code), a plot (the sequence of events leading to it), and a moral (the systemic fix that prevents recurrence).

You develop three core instincts:

1. The Transient vs. Systemic Instinct: A single 429 is traffic noise. Five consecutive 429s is a capacity problem. A 404 is never transient — it means something changed permanently. You learn to classify failures instantly and respond appropriately: retry transient errors, escalate systemic ones, and never retry permanent failures.

2. The Graceful Degradation Instinct: You stop thinking in terms of "works" vs. "broken" and start thinking in degradation levels. The chatbot has five modes: (1) full FM response, (2) FM response with repaired JSON, (3) FM response from backup model (Haiku), (4) template response for known intent, (5) apologetic message with link to human agent. You design systems with all five modes built in.

3. The Instrumentation-First Instinct: You learn that the cost of adding a structured log entry is near-zero, but the cost of debugging without one is hours. Every Bedrock call gets a correlation ID, input token count, output token count, latency, status, and retry count — because the one field you skip is the one you need at 2 AM during an incident.

How This Intuition Guides Future Decisions

  • When integrating a new FM provider: You immediately ask "What are the error codes? What's the retry policy? What's the rate limit?" before writing any prompt logic. You build the circuit breaker and fallback path before the happy path.
  • When evaluating FM reliability: You look beyond uptime SLAs. You ask about throttling behavior under load, error response consistency, and streaming reliability. A service with 99.9% uptime but unpredictable throttling is harder to integrate than one with 99.5% uptime and clean error codes.
  • When designing multi-model architectures: You know that model tiering (Sonnet for complex, Haiku for simple) is not just a cost optimization — it is a reliability pattern. If Sonnet is throttled, Haiku might not be. If one model version is deprecated, the others keep running.
  • When debugging user-reported issues: You trace the correlation ID through logs before looking at the prompt or model output. Eight times out of ten, the issue is in the integration layer (timeout, truncation, throttling), not in the model itself.