
Scenarios and Runbooks — Flexible Model Interaction

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Dimension | Value |
|---|---|
| Certification | AWS Certified Generative AI Developer - Professional (AIP-C01) |
| Task | 2.4: Select and implement FM API integration patterns |
| Skill | 2.4.1: Flexible Model Interaction |
| This File | 03: Scenarios and Runbooks (sync timeout, SQS backlog, JP text validation, Lambda throttling, async callback failure) |

Skill Scope

This file provides five production incident scenarios that exercise the synchronous/asynchronous Bedrock integration patterns, API Gateway validation, and SQS processing pipeline. Each scenario includes the problem statement, a detection flow diagram, root cause analysis, resolution code, and preventive measures.


Scenario 1: Synchronous Timeout — 504 Gateway Timeout

Problem

During evening peak hours (19:00-22:00 JST) when manga readers are most active, approximately 15% of chat requests return HTTP 504 Gateway Timeout. Users see a blank response after waiting the full 29 seconds. The issue correlates with Sonnet model invocations for complex queries (multi-turn conversations with 8+ turns of context).

Detection

flowchart TD
    A[CloudWatch Alarm:<br/>5XX rate > 5%] --> B[Check API Gateway<br/>Latency metrics]
    B --> C{IntegrationLatency<br/>> 29000ms?}
    C -->|Yes| D[Backend timeout<br/>confirmed]
    C -->|No| E[Check ECS task<br/>health]

    D --> F[Check Bedrock<br/>InvocationLatency metric]
    F --> G{Bedrock latency<br/>> 25000ms?}
    G -->|Yes| H[Model inference<br/>is the bottleneck]
    G -->|No| I[Check DynamoDB/<br/>Redis latency]

    H --> J[Check input<br/>token count]
    J --> K{Input tokens<br/>> 8000?}
    K -->|Yes| L[Root Cause:<br/>Oversized context window]
    K -->|No| M[Root Cause:<br/>Bedrock service latency]

    style L fill:#ff6b6b
    style M fill:#ffa07a

Root Cause

Long conversation histories accumulate in DynamoDB sessions. When the orchestrator loads all turns and includes them in the prompt, the input token count exceeds 8,000 tokens. Claude 3 Sonnet's inference time scales with input length — at 8K+ input tokens with a 4K max output, the model needs 25-30 seconds, exceeding the API Gateway 29-second timeout.

Resolution

"""
Fix: Context window management with token budgeting.
Truncates conversation history to fit within a token budget
while preserving the most recent and most relevant turns.
"""
import re
import logging
from typing import List, Dict

logger = logging.getLogger(__name__)

# Budget: 25s timeout means ~6000 input tokens max for Sonnet
MAX_INPUT_TOKENS = 6000
SYSTEM_PROMPT_TOKENS = 200  # Approximate system prompt size
SAFETY_MARGIN = 500


def estimate_tokens(text: str) -> int:
    """Estimate token count — Japanese chars ~1.5 tokens each."""
    jp_chars = len(re.findall(r"[\u3000-\u9fff]", text))
    other = len(text) - jp_chars
    return int(jp_chars * 1.5 + other * 0.3)


def truncate_conversation_history(
    history: List[Dict],
    current_message: str,
    max_tokens: int = MAX_INPUT_TOKENS,
) -> List[Dict]:
    """
    Truncate conversation history to fit token budget.

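    Strategy:
    1. Always keep the first turn (establishes topic)
    2. Always keep the last 3 turns (recent context)
    3. Fill the remaining budget with middle turns, newest first
    4. Drop the older middle turns that do not fit
    (Summarizing dropped turns is a separate step and is not done here.)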
    """
    budget = max_tokens - SYSTEM_PROMPT_TOKENS - SAFETY_MARGIN
    current_tokens = estimate_tokens(current_message)
    budget -= current_tokens

    if not history:
        return []

    # Always keep first and last 3 turns
    if len(history) <= 4:
        return history

    first_turn = [history[0]]
    last_turns = history[-3:]
    middle_turns = history[1:-3]

    # Calculate token usage of required turns
    required_tokens = sum(estimate_tokens(t["content"]) for t in first_turn + last_turns)

    if required_tokens > budget:
        # Even required turns exceed budget — keep only last 2
        logger.warning(
            "Context severely over budget | required=%d | budget=%d",
            required_tokens, budget,
        )
        return history[-2:]

    remaining_budget = budget - required_tokens
    kept_middle = []

    # Add middle turns from most recent to oldest
    for turn in reversed(middle_turns):
        turn_tokens = estimate_tokens(turn["content"])
        if remaining_budget >= turn_tokens:
            kept_middle.insert(0, turn)
            remaining_budget -= turn_tokens
        else:
            break

    result = first_turn + kept_middle + last_turns
    total_tokens = sum(estimate_tokens(t["content"]) for t in result) + current_tokens

    logger.info(
        "Context truncated | original=%d turns | kept=%d turns | tokens=%d/%d",
        len(history), len(result), total_tokens, max_tokens,
    )
    return result

Prevention

| Measure | Implementation |
|---|---|
| Token budget enforcement | Always compute estimated tokens before calling Bedrock; reject or truncate if over 6000 |
| Conversation summarization | After 5 turns, use Haiku to summarize older context into a single compact turn |
| Adaptive timeout | Set Bedrock read_timeout proportional to estimated input tokens: min(25, 10 + tokens/1000) seconds |
| Session TTL | DynamoDB TTL expires sessions after 30 minutes of inactivity, preventing unbounded growth |
| CloudWatch alarm | Alert when p95 InvocationLatency exceeds 20 seconds, so it can be investigated before it reaches 29s |
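
The adaptive timeout rule above can be sketched as a small helper. This is a minimal sketch, not the orchestrator's actual code: `adaptive_read_timeout` is a hypothetical name, and the constants are taken directly from the formula min(25, 10 + tokens/1000). Wiring the result into the Bedrock client (for example through botocore's `Config(read_timeout=...)`) is left out so the block runs without AWS dependencies.

```python
def adaptive_read_timeout(estimated_input_tokens: int,
                          base_seconds: float = 10.0,
                          cap_seconds: float = 25.0) -> float:
    """Return a read timeout in seconds: min(cap, base + tokens / 1000)."""
    if estimated_input_tokens < 0:
        raise ValueError("estimated_input_tokens must be non-negative")
    return min(cap_seconds, base_seconds + estimated_input_tokens / 1000)


# Short prompt, prompt at the 6000-token budget, and an oversized prompt
for tokens in (500, 6000, 20000):
    print(tokens, adaptive_read_timeout(tokens))
```

At the 6,000-token budget the helper returns 16 seconds, and the 25-second cap keeps every call below the 29-second API Gateway limit.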

Scenario 2: SQS Backlog Growing — Catalog Enrichment Queue Depth Alarm

Problem

The manga-enrichment-queue.fifo queue depth grows steadily from 0 to 50,000+ messages over 6 hours. The Lambda consumer is running but not keeping pace: the admin dashboard shows enrichment jobs submitted at 500/minute, but only about 150/minute are completing. New manga catalog uploads are stacking up without synopses or tags.

Detection

flowchart TD
    A[CloudWatch Alarm:<br/>ApproximateNumberOfMessages > 10000] --> B[Check SQS Metrics]
    B --> C[NumberOfMessagesSent:<br/>500/min]
    B --> D[NumberOfMessagesReceived:<br/>150/min]
    B --> E[ApproximateAgeOfOldest:<br/>4 hours]

    D --> F[Check Lambda<br/>Concurrency]
    F --> G{ConcurrentExecutions<br/>= ReservedConcurrency?}
    G -->|Yes = 5| H[Lambda at<br/>concurrency cap]
    G -->|No| I[Check Lambda<br/>Duration]

    H --> J[Check Lambda<br/>batch processing time]
    J --> K{Average Duration<br/>> 15s per batch?}
    K -->|Yes| L[Root Cause:<br/>Bedrock latency per item<br/>× batch size, processed<br/>sequentially]

    style L fill:#ff6b6b

Root Cause

The Lambda consumer has ReservedConcurrency=5 and BatchSize=10. Each enrichment job calls Bedrock Haiku sequentially, taking ~2 seconds per call. With 10 items per batch processed sequentially: 10 items x 2s = 20s per batch. At 5 concurrent Lambdas, throughput is: 5 x 10 / 20s = 2.5 messages/second = 150/minute. But the producer submits 500/minute — a 3.3x deficit.
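
The throughput arithmetic above can be checked with a few lines of Python. This is a minimal sketch under the same simplifying assumption as the text, namely that each Lambda instance processes one batch at a time and batch duration is constant; the function names are hypothetical.

```python
import math


def throughput_per_minute(concurrency: int, batch_size: int, batch_seconds: float) -> float:
    """Messages per minute when each invocation handles one batch at a time."""
    return concurrency * batch_size / batch_seconds * 60


def required_concurrency(messages_per_minute: float, batch_size: int, batch_seconds: float) -> int:
    """Smallest concurrency whose throughput meets the producer rate."""
    per_invocation_rate = batch_size / batch_seconds * 60  # msgs/min for one Lambda
    return math.ceil(messages_per_minute / per_invocation_rate)


current = throughput_per_minute(concurrency=5, batch_size=10, batch_seconds=20)
print(current)                             # 150.0 messages/minute
print(round(500 / current, 1))             # 3.3x deficit against 500/minute
print(required_concurrency(500, 10, 20))   # 17 Lambdas needed if batches stay sequential
```

With sequential 20-second batches the consumer would need 17 concurrent instances to keep up, which is why shortening the batch duration is the more effective lever.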

Resolution

"""
Fix: Parallel Bedrock invocation within Lambda batch + concurrency increase.
Uses asyncio to process batch items concurrently instead of sequentially.
"""
import json
import time
import asyncio
import logging
from typing import Dict, Any, List
from concurrent.futures import ThreadPoolExecutor

import boto3

logger = logging.getLogger(__name__)

bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")
dynamodb = boto3.resource("dynamodb").Table("manga-catalog")

# Thread pool for concurrent Bedrock calls within a single Lambda invocation
_executor = ThreadPoolExecutor(max_workers=10)


def _invoke_bedrock_sync(manga: Dict, enrichment_type: str) -> Dict:
    """Single synchronous Bedrock invocation (runs in thread pool)."""
    prompt = f"Generate a {enrichment_type} for manga: {manga.get('title', '')} ({manga.get('titleJp', '')})"

    response = bedrock.invoke_model(
        modelId="anthropic.claude-3-haiku-20240307-v1:0",
        contentType="application/json",
        accept="application/json",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,
            "temperature": 0.5,
            "messages": [{"role": "user", "content": prompt}],
        }),
    )
    result = json.loads(response["body"].read())
    return {
        "manga_id": manga["id"],
        "enrichment_type": enrichment_type,
        "data": result["content"][0]["text"],
    }


async def process_batch_parallel(records: List[Dict]) -> List[str]:
    """
    Process all batch records in parallel using thread pool.

    Before fix:  10 items x 2s sequential = 20s
    After fix:   10 items x 2s parallel  =  ~3s (with 10 threads)
    """
    # Inside a coroutine, get_running_loop() is the supported way to reach the loop
    loop = asyncio.get_running_loop()
    failed_ids = []
    tasks = []

    for record in records:
        body = json.loads(record["body"])
        manga = body["manga"]
        enrichment_type = body.get("enrichmentType", "synopsis")

        task = loop.run_in_executor(
            _executor,
            _invoke_bedrock_sync,
            manga,
            enrichment_type,
        )
        tasks.append((record["messageId"], manga["id"], task))

    for message_id, manga_id, task in tasks:
        try:
            result = await task
            # Store in DynamoDB
            dynamodb.update_item(
                Key={"mangaId": result["manga_id"]},
                UpdateExpression="SET enrichment.#t = :d",
                ExpressionAttributeNames={"#t": result["enrichment_type"]},
                ExpressionAttributeValues={":d": result["data"]},
            )
        except Exception as exc:
            logger.error("Enrichment failed | manga=%s | error=%s", manga_id, str(exc))
            failed_ids.append(message_id)

    return failed_ids


def lambda_handler(event: Dict[str, Any], context) -> Dict:
    """
    Optimized Lambda handler with parallel processing.

    Throughput improvement:
    - Before: 5 concurrency x 10 batch / 20s = 150 msg/min
    - After:  10 concurrency x 10 batch / 3s  = 2000 msg/min
    """
    records = event["Records"]
    logger.info("Processing batch | size=%d", len(records))

    # Run parallel processing; asyncio.run creates the event loop and closes it,
    # even if the coroutine raises
    failed_ids = asyncio.run(process_batch_parallel(records))

    return {
        "batchItemFailures": [{"itemIdentifier": mid} for mid in failed_ids]
    }

Prevention

| Measure | Implementation |
|---|---|
| Right-size concurrency | Set Lambda reserved concurrency based on: production_rate / (batch_size / avg_batch_duration) |
| Parallel within batch | Always process Bedrock calls concurrently within a batch, never sequentially |
| Auto-scaling alarm | CloudWatch alarm when queue age > 15 minutes triggers an automated concurrency increase. Application Auto Scaling manages provisioned concurrency only, so raising reserved concurrency or the event source mapping's MaximumConcurrency needs custom automation (for example, an alarm-triggered function) |
| Backpressure on producer | Rate-limit the admin API to prevent submitting faster than the consumer can process |
| Queue depth dashboard | Real-time dashboard showing messages in flight, age of oldest, and consumer throughput |
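
One way to implement the producer backpressure measure is a token bucket in front of the job submission endpoint. The sketch below is illustrative only: `TokenBucket` and its parameters are hypothetical, and it keeps state in process memory, so an admin API running on several instances would need shared state (for example in Redis) instead.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow up to `rate_per_minute` submissions per minute, with a small burst."""

    def __init__(self, rate_per_minute: float, burst: int,
                 clock: Callable[[], float] = time.monotonic):
        self.rate_per_second = rate_per_minute / 60
        self.capacity = burst
        self.tokens = float(burst)
        self.clock = clock  # Injectable so tests can control time
        self.last_refill = clock()

    def try_acquire(self) -> bool:
        """Return True if a job may be submitted now, False if it should be rejected."""
        now = self.clock()
        elapsed = now - self.last_refill
        # Refill in proportion to elapsed time, never beyond the burst capacity
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate_per_second)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


# Rate set below the consumer's measured throughput (assumed value)
bucket = TokenBucket(rate_per_minute=1500, burst=50)
accepted = sum(bucket.try_acquire() for _ in range(200))
print(f"accepted {accepted} of 200 immediate submissions")
```

Rejected submissions can be answered with HTTP 429 so the admin client retries later instead of growing the queue.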

Scenario 3: Japanese Text Validation Rejection — Valid Messages Blocked

Problem

Japanese-speaking users report that some messages are rejected with a 400 Bad Request error from API Gateway. The rejected messages contain valid Japanese text — specifically, messages with emoji (commonly used in Japanese digital communication) and special kanji characters. Approximately 8% of Japanese messages are being incorrectly rejected.

Detection

flowchart TD
    A[User reports:<br/>メッセージが送れません] --> B[Check API GW<br/>4XX metrics]
    B --> C{400 rate > normal<br/>baseline?}
    C -->|Yes, 8% vs 1%| D[Pull API GW<br/>execution logs]

    D --> E[Examine rejected<br/>request bodies]
    E --> F{Pattern in<br/>rejected messages?}
    F -->|Emoji present| G[Check JSON Schema<br/>maxLength validation]
    F -->|Special kanji| H[Check encoding<br/>handling]

    G --> I[maxLength counts<br/>UTF-16 surrogate pairs<br/>as 2 chars]
    H --> J[Some validators<br/>reject chars > BMP]

    I --> K[Root Cause:<br/>API GW JSON Schema<br/>counts string length<br/>in UTF-16 code units,<br/>not characters]

    style K fill:#ff6b6b

Root Cause

API Gateway's JSON Schema validation applies the maxLength keyword by counting UTF-16 code units rather than user-visible characters. (The JSON Schema specification defines string length in Unicode characters, but validators built on UTF-16 string types commonly count code units instead.) Emoji such as 🎉 and supplementary kanji such as 𠮷 lie outside the Basic Multilingual Plane, so each is encoded as a UTF-16 surrogate pair (2 code units). A message with 3,950 visible characters, 100 of them emoji, therefore counts as 4,050 in the validator and is rejected by the maxLength: 4000 limit even though it is within the intended 4,000-character limit.
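
The gap between the different length units is easy to reproduce locally. The snippet below is a small illustration using only the Python standard library; `length_units` is a hypothetical helper, and the UTF-16 count is what a validator that measures code units would see.

```python
def length_units(text: str) -> dict:
    """Return the length of `text` measured in three different units."""
    return {
        "code_points": len(text),                           # Python str length
        "utf16_units": len(text.encode("utf-16-le")) // 2,  # surrogate pairs count twice
        "utf8_bytes": len(text.encode("utf-8")),
    }


samples = {
    "hiragana a": "\u3042",
    "party popper emoji": "\U0001F389",
    "supplementary kanji": "\U00020BB7",
    "family emoji (ZWJ sequence)": "\U0001F468\u200d\U0001F469\u200d\U0001F467\u200d\U0001F466",
}
for name, sample in samples.items():
    print(name, length_units(sample))

# 3,950 visible characters, 100 of them emoji
mixed = "\u3042" * 3850 + "\U0001F389" * 100
print(length_units(mixed)["code_points"], length_units(mixed)["utf16_units"])  # 3950 4050
```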

Resolution

"""
Fix: Two-layer validation — relaxed API GW schema + accurate ECS validation.

1. Increase API GW maxLength to 8000 (2x buffer for surrogate pairs)
2. Add grapheme-level validation in ECS that counts user-perceived characters,
   with a UTF-8 byte cap as a secondary safety limit
"""
import json
import logging
import unicodedata

logger = logging.getLogger(__name__)

# Updated API Gateway JSON Schema (deployed via CDK/CloudFormation)
UPDATED_API_GW_SCHEMA = {
    "$schema": "http://json-schema.org/draft-04/schema#",
    "title": "MangaAssistChatRequest",
    "type": "object",
    "required": ["action", "data"],
    "properties": {
        "action": {
            "type": "string",
            "enum": ["sendMessage", "getHistory", "clearSession"],
        },
        "data": {
            "type": "object",
            "required": ["message"],
            "properties": {
                "message": {
                    "type": "string",
                    "minLength": 1,
                    # Increased from 4000 to 8000 to account for
                    # UTF-16 surrogate pairs in emoji/supplementary kanji
                    "maxLength": 8000,
                },
            },
        },
    },
}


def validate_message_length_accurate(message: str, max_chars: int = 4000) -> tuple:
    """
    Accurately validate message length counting Unicode grapheme clusters.
    This handles emoji, combining characters, and supplementary kanji correctly.

    A single visible character like 👨‍👩‍👧‍👦 (family emoji) is:
    - 1 grapheme cluster (what users see)
    - 7 Unicode code points
    - 11 UTF-16 code units
    - 25 UTF-8 bytes

    We count grapheme clusters — what the user perceives as characters.
    """
    # Count grapheme clusters (visual characters)
    grapheme_count = count_grapheme_clusters(message)

    if grapheme_count > max_chars:
        return False, f"Message too long: {grapheme_count} characters (max {max_chars})"

    # Also enforce byte limit for backend safety
    byte_len = len(message.encode("utf-8"))
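    # Budget of 4 UTF-8 bytes per character (one supplementary code point).
    # Multi-code-point graphemes such as ZWJ emoji use more and can hit this cap first.
    max_bytes = max_chars * 4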
    if byte_len > max_bytes:
        return False, f"Message too large: {byte_len} bytes (max {max_bytes})"

    return True, None


def count_grapheme_clusters(text: str) -> int:
    """
    Count user-perceived characters (grapheme clusters).
    Simple approximation that handles most JP text correctly.
    """
    count = 0
    i = 0
    while i < len(text):
        count += 1
        i += 1
        # Skip combining marks and variation selectors (category M) and
        # modifier symbols such as emoji skin tones (category Sk)
        while i < len(text) and unicodedata.category(text[i]).startswith(("M", "Sk")):
            i += 1
        # Skip zero-width joiners (for compound emoji)
        while i < len(text) and text[i] == "\u200d":
            i += 1
            if i < len(text):
                i += 1  # Skip the joined character
                while i < len(text) and unicodedata.category(text[i]).startswith("M"):
                    i += 1
    return count


# Test cases for JP text with emoji
def test_validation():
    """Verify validation handles Japanese text correctly."""
    test_cases = [
        ("こんにちは", True),                           # Basic hiragana
        ("新刊マンガ🎉おすすめ!", True),              # With emoji
        ("𠮷田さんのマンガ", True),                     # Supplementary kanji
        ("A" * 4001, False),                            # Over limit
        ("あ" * 4000, True),                            # Exactly at limit
        ("🎉" * 2001, True),                            # 4002 UTF-16 units but only 2001 graphemes
        ("🎉" * 4001, False),                           # Emoji over the grapheme limit
    ]
    for text, expected_valid in test_cases:
        is_valid, reason = validate_message_length_accurate(text)
        status = "PASS" if is_valid == expected_valid else "FAIL"
        print(f"{status}: len={len(text)} graphemes={count_grapheme_clusters(text)} valid={is_valid}")

Prevention

| Measure | Implementation |
|---|---|
| API GW schema as coarse filter | Set maxLength to 2x the actual limit to catch only extreme cases |
| Accurate server-side validation | Count grapheme clusters in ECS, not UTF-16 code units |
| Test with real JP data | Include emoji, supplementary kanji, and combining characters in validation test suite |
| Error message localization | Return Japanese error messages for JP users: メッセージが長すぎます (最大4000文字) |
| Monitoring | Track 400 rate by preferredLanguage dimension to detect language-specific issues |

Scenario 4: Lambda Throttling on Enrichment Consumer

Problem

The catalog enrichment Lambda consumer starts returning TooManyRequestsException errors. The SQS event source mapping stops invoking new Lambda instances. The enrichment queue depth spikes from 1,000 to 80,000 in 30 minutes. The issue starts at 10:00 JST when a scheduled EventBridge rule triggers bulk enrichment for the weekly new releases (2,000 manga).

Detection

flowchart TD
    A[CloudWatch Alarm:<br/>Lambda Throttles > 0] --> B[Check Lambda<br/>ConcurrentExecutions]
    B --> C{At account<br/>concurrency limit?}
    C -->|Yes, 1000/1000| D[Account-level<br/>throttling]
    C -->|No| E{At reserved<br/>concurrency limit?}
    E -->|Yes, 5/5| F[Function-level<br/>throttling]

    D --> G[Check what else<br/>is consuming concurrency]
    G --> H[Other Lambda functions<br/>using 950 concurrent]
    H --> I[Root Cause:<br/>Shared account concurrency<br/>exhausted by unrelated<br/>image processing Lambda]

    F --> J[Root Cause:<br/>Reserved concurrency<br/>too low for burst]

    style I fill:#ff6b6b
    style J fill:#ffa07a

Root Cause

The AWS account has the default limit of 1,000 concurrent Lambda executions. An unrelated image processing pipeline in the same account is consuming 950 of them, so the unreserved pool is effectively exhausted and every function that draws from it is throttled. The enrichment Lambda's reserved concurrency of 5 is still honored, but reserved concurrency is a ceiling as well as a floor: the function cannot scale past 5 to absorb the 10:00 burst, and it cannot borrow from the unreserved pool. Throttled invocations return their messages to the queue after the visibility timeout, which disrupts the SQS event source mapping's scaling and lets the backlog grow.

Resolution

"""
Fix: Isolate Lambda concurrency + implement SQS-based backpressure.

1. Request account concurrency limit increase to 3000
2. Set reserved concurrency for ALL Lambda functions
3. Add SQS-based backpressure to prevent overwhelming Bedrock
"""
import json
import logging
from typing import Dict, Any

import boto3

logger = logging.getLogger(__name__)


class LambdaConcurrencyManager:
    """Manage Lambda concurrency settings for MangaAssist functions."""

    def __init__(self, region: str = "us-east-1"):
        self.lambda_client = boto3.client("lambda", region_name=region)

    def configure_concurrency(self):
        """
        Set reserved concurrency for all MangaAssist Lambda functions.
        This isolates our functions from other account workloads.

        Account limit: 3000 (after increase request)
        MangaAssist allocation:
          - enrichment-consumer:     25 (handles burst of 2000 items)
          - moderation-consumer:     10
          - websocket-handler:       50 (real-time chat)
          - deferred-processor:      15
        Total reserved: 100
        Remaining unreserved: 2900 (for other account workloads)
        """
        allocations = {
            "manga-enrichment-consumer": 25,
            "manga-moderation-consumer": 10,
            "manga-websocket-handler": 50,
            "manga-deferred-processor": 15,
        }

        for function_name, concurrency in allocations.items():
            try:
                self.lambda_client.put_function_concurrency(
                    FunctionName=function_name,
                    ReservedConcurrentExecutions=concurrency,
                )
                logger.info(
                    "Set concurrency | function=%s | reserved=%d",
                    function_name, concurrency,
                )
            except Exception as exc:
                logger.error(
                    "Failed to set concurrency | function=%s | error=%s",
                    function_name, str(exc),
                )

    def configure_event_source_mapping(self):
        """
        Update SQS event source mapping with proper scaling config.

        ScalingConfig.MaximumConcurrency limits how many concurrent
        Lambda instances the SQS poller will invoke, preventing
        thundering herd when backlog clears.
        """
        # Event source mappings are managed through the Lambda API,
        # so the existing Lambda client is reused here
        lambda_client = self.lambda_client

        # List event source mappings for our function
        response = lambda_client.list_event_source_mappings(
            FunctionName="manga-enrichment-consumer",
        )

        for mapping in response["EventSourceMappings"]:
            lambda_client.update_event_source_mapping(
                UUID=mapping["UUID"],
                BatchSize=10,
                MaximumBatchingWindowInSeconds=30,
                FunctionResponseTypes=["ReportBatchItemFailures"],
                ScalingConfig={
                    "MaximumConcurrency": 20,  # Cap concurrent invocations from this queue
                },
            )
            logger.info("Updated event source mapping | uuid=%s", mapping["UUID"])

Prevention

| Measure | Implementation |
|---|---|
| Reserved concurrency for ALL functions | Prevents any single function from consuming the entire account limit |
| Account limit increase | Request 3000+ from AWS Support before production launch |
| SQS ScalingConfig.MaximumConcurrency | Cap Lambda scaling from SQS to prevent thundering herd |
| Separate accounts | Run unrelated workloads (image processing) in a different AWS account |
| Concurrency alarm | Alert when any function's concurrent executions exceed 80% of its reserved allocation |
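
Before applying a reservation plan, it helps to check that it fits the account limit and to derive the alarm thresholds from it. The following is a minimal sketch with hypothetical helper names; it relies on the fact that Lambda keeps at least 100 concurrent executions unreserved in every account, and it uses the allocation from the resolution above.

```python
from typing import Dict

MIN_UNRESERVED = 100  # Lambda requires this much unreserved concurrency per account


def check_allocation(account_limit: int, reserved: Dict[str, int]) -> int:
    """Return the unreserved pool size, or raise if the plan over-commits the account."""
    unreserved = account_limit - sum(reserved.values())
    if unreserved < MIN_UNRESERVED:
        raise ValueError(
            f"Plan leaves {unreserved} unreserved executions; at least {MIN_UNRESERVED} required"
        )
    return unreserved


def alarm_threshold(reserved_concurrency: int, percent: int = 80) -> int:
    """Concurrent executions at which the utilization alarm should fire."""
    return max(1, reserved_concurrency * percent // 100)


allocations = {
    "manga-enrichment-consumer": 25,
    "manga-moderation-consumer": 10,
    "manga-websocket-handler": 50,
    "manga-deferred-processor": 15,
}
print(check_allocation(3000, allocations))  # 2900 left for other workloads
for name, reserved in allocations.items():
    print(name, alarm_threshold(reserved))
```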

Scenario 5: Async Callback Failure — Enrichment Results Never Delivered

Problem

The admin dashboard shows enrichment jobs as "in progress" indefinitely. The SQS queue processes messages successfully (queue depth stays near zero), but the catalog database never receives the enrichment results. The Lambda consumer logs show successful Bedrock invocations but the DynamoDB writes silently fail. No errors appear in CloudWatch Logs.

Detection

flowchart TD
    A[Admin report:<br/>Enrichment stuck] --> B[Check SQS metrics]
    B --> C[Queue depth: 0<br/>Messages processed: OK]

    C --> D[Check Lambda<br/>CloudWatch Logs]
    D --> E[Bedrock calls:<br/>All successful]

    E --> F[Check DynamoDB<br/>writes]
    F --> G{ConsumedWriteCapacity<br/>normal?}
    G -->|Very low| H[Writes not<br/>reaching DDB]

    H --> I[Check Lambda<br/>IAM role]
    I --> J{Has dynamodb:UpdateItem<br/>on catalog table?}
    J -->|No| K[Root Cause:<br/>IAM policy missing<br/>after table rename]

    K --> L[Table renamed from<br/>manga-products to<br/>manga-catalog but<br/>Lambda role still<br/>references old name]

    style L fill:#ff6b6b

Root Cause

The DynamoDB table was renamed from manga-products to manga-catalog during a recent infrastructure update. The Lambda function's code was updated to reference the new table name, but the IAM execution role's policy still grants dynamodb:UpdateItem only on arn:aws:dynamodb:*:*:table/manga-products. The boto3 DynamoDB client raises a ClientError with code AccessDeniedException, but the Lambda's broad except Exception clause catches and swallows it, so the record is never reported as a batch failure and SQS deletes the message as if it had succeeded. The error is logged, but only at DEBUG level, which is filtered out in production.
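
To see how the queue can drain to zero while no results are written, compare what a handler returns when it swallows a write error with what it returns when it surfaces the error. This is a simplified sketch, not the production handler: `handle_batch` and `denied_store` are hypothetical, and a `PermissionError` stands in for the DynamoDB access-denied error so the block runs without AWS access.

```python
from typing import Callable, Dict, List


def handle_batch(records: List[Dict], store: Callable[[Dict], None],
                 swallow_errors: bool) -> Dict:
    """Process SQS-style records and build a partial-batch response."""
    failures = []
    for record in records:
        try:
            store(record)
        except Exception:
            if swallow_errors:
                continue  # Anti-pattern: the record is treated as processed
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


def denied_store(record: Dict) -> None:
    """Stand-in for a DynamoDB write rejected by IAM."""
    raise PermissionError("AccessDeniedException: not authorized on manga-catalog")


records = [{"messageId": "m-1"}, {"messageId": "m-2"}]
print(handle_batch(records, denied_store, swallow_errors=True))
# {'batchItemFailures': []}
print(handle_batch(records, denied_store, swallow_errors=False))
# {'batchItemFailures': [{'itemIdentifier': 'm-1'}, {'itemIdentifier': 'm-2'}]}
```

An empty batchItemFailures list tells Lambda that the whole batch succeeded, so SQS deletes the messages. Reporting the failed IDs keeps them in the queue, where retries and a dead-letter queue make the problem visible.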

Resolution

"""
Fix: Correct IAM policy + improve error handling to surface DDB failures.
"""
import json
import logging
from typing import Dict, Any

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)

# Fix 1: IAM policy update (via CDK/CloudFormation)
CORRECTED_IAM_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:GetItem",
            ],
            "Resource": [
                # Use the CORRECT table name
                "arn:aws:dynamodb:us-east-1:123456789012:table/manga-catalog",
                "arn:aws:dynamodb:us-east-1:123456789012:table/manga-catalog/index/*",
            ],
        },
        {
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel",
            ],
            "Resource": [
                "arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-*",
            ],
        },
    ],
}


def store_enrichment_result(manga_id: str, enrichment_type: str, data: str) -> bool:
    """
    Store enrichment result with explicit error handling.

    Fix 2: Never swallow DynamoDB errors silently.
    Distinguish between retryable and non-retryable errors.
    """
    dynamodb = boto3.resource("dynamodb").Table("manga-catalog")

    try:
        dynamodb.update_item(
            Key={"mangaId": manga_id},
            UpdateExpression="SET enrichment.#t = :d, enrichment.lastUpdated = :ts",
            ExpressionAttributeNames={"#t": enrichment_type},
            ExpressionAttributeValues={
                ":d": data,
                ":ts": int(__import__("time").time()),
            },
        )
        logger.info("Enrichment stored | manga=%s | type=%s", manga_id, enrichment_type)
        return True

    except ClientError as exc:
        error_code = exc.response["Error"]["Code"]

        if error_code == "AccessDeniedException":
            # CRITICAL: This means IAM is misconfigured — raise loudly
            logger.critical(
                "DynamoDB ACCESS DENIED | manga=%s | table=manga-catalog | "
                "Check Lambda execution role IAM policy!",
                manga_id,
            )
            raise  # Do NOT swallow — let it surface as a batch failure

        elif error_code in ("ProvisionedThroughputExceededException", "ThrottlingException"):
            # Retryable — let SQS retry this message
            logger.warning(
                "DynamoDB throttled | manga=%s | will retry via SQS",
                manga_id,
            )
            raise

        elif error_code == "ResourceNotFoundException":
            logger.critical(
                "DynamoDB table not found! Expected: manga-catalog | "
                "This indicates a table rename or deletion.",
            )
            raise

        else:
            logger.error(
                "DynamoDB unexpected error | manga=%s | code=%s | msg=%s",
                manga_id, error_code, exc.response["Error"]["Message"],
            )
            raise

    except Exception as exc:
        # Catch-all with EXPLICIT logging at ERROR level
        logger.error(
            "Unexpected error storing enrichment | manga=%s | type=%s | error=%s",
            manga_id, enrichment_type, str(exc),
            exc_info=True,  # Include stack trace
        )
        raise  # Always re-raise — never swallow


def lambda_handler(event: Dict[str, Any], context) -> Dict:
    """Improved handler with proper error classification."""
    failed_items = []
    # Create the client once per invocation rather than once per record
    bedrock = boto3.client("bedrock-runtime")

    for record in event["Records"]:
        try:
            body = json.loads(record["body"])
            manga = body["manga"]
            enrichment_type = body.get("enrichmentType", "synopsis")

            # Invoke Bedrock (existing code)
            response = bedrock.invoke_model(
                modelId="anthropic.claude-3-haiku-20240307-v1:0",
                contentType="application/json",
                accept="application/json",
                body=json.dumps({
                    "anthropic_version": "bedrock-2023-05-31",
                    "max_tokens": 1024,
                    "messages": [{"role": "user", "content": f"Enrich: {manga['title']}"}],
                }),
            )
            result = json.loads(response["body"].read())

            # Store with improved error handling
            store_enrichment_result(
                manga_id=manga["id"],
                enrichment_type=enrichment_type,
                data=result["content"][0]["text"],
            )

        except Exception as exc:
            logger.error(
                "Record failed | msgId=%s | error=%s",
                record["messageId"], str(exc),
                exc_info=True,
            )
            failed_items.append({"itemIdentifier": record["messageId"]})

    return {"batchItemFailures": failed_items}

Prevention

| Measure | Implementation |
|---|---|
| IAM policy references via CDK/CloudFormation | Use table.grant_read_write_data(lambda_fn); CDK derives the ARNs from the table construct, so they follow a table rename |
| Never swallow exceptions silently | Use logger.error(..., exc_info=True) and always re-raise DynamoDB errors |
| Integration test on deploy | Post-deploy Lambda runs a test write to DynamoDB and alerts if it fails |
| DynamoDB write metrics | CloudWatch alarm on ConsumedWriteCapacityUnits = 0 when enrichment queue has messages |
| Structured logging | Use JSON structured logs so AccessDeniedException errors surface in CloudWatch Insights queries |
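
The structured logging measure can be approximated with the standard library alone. The formatter below is a minimal sketch; the field names `manga_id` and `error_code` are assumptions chosen for this example, and a production setup might use an existing JSON logging library instead.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Values passed via `extra=` become attributes on the record
        for key in ("manga_id", "error_code"):
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload, ensure_ascii=False)


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("enrichment")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.error(
    "DynamoDB write failed",
    extra={"manga_id": "m-42", "error_code": "AccessDeniedException"},
)
```

Because each line is valid JSON, CloudWatch Logs Insights can filter on the `error_code` field directly rather than relying on free-text matching.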

Key Takeaways

| # | Takeaway |
|---|---|
| 1 | Token budgets prevent timeouts: estimate input tokens before calling Bedrock and truncate conversation history to stay within the time budget. |
| 2 | Parallel batch processing transforms Lambda throughput: processing 10 Bedrock calls concurrently instead of sequentially improves throughput by ~7x. |
| 3 | UTF-16 surrogate pairs cause silent validation failures for Japanese emoji: set API Gateway maxLength to 2x and validate grapheme clusters server-side. |
| 4 | Reserved concurrency isolation prevents unrelated Lambda functions from starving your consumer. |
| 5 | Never swallow exceptions: an "except Exception: pass" block is the root cause of "silent failure" incidents. Always log at ERROR with exc_info=True and re-raise. |