
Scenarios and Runbooks — FM API Interfaces

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.

Skill Mapping

Domain: 2 — Implementation & Integration
Task: 2.5 — Application Integration Patterns
Skill: 2.5.1 — FM API Interfaces
Focus: 29s API Gateway timeout, token quota exhausted, retry storm, session corruption, API version mismatch
MangaAssist Relevance: Production incident scenarios for the WebSocket + Bedrock streaming pipeline

Mind Map

mindmap
  root((FM API Scenarios & Runbooks))
    Scenario 1
      29s API Gateway Timeout
        REST endpoint timeout
        Long RAG retrieval
        Cold start cascade
    Scenario 2
      Token Quota Exhausted
        Burst traffic spike
        Prompt injection attack
        Missing budget enforcement
    Scenario 3
      Retry Storm
        Bedrock throttling cascade
        No backoff on client
        Circuit breaker failure
    Scenario 4
      Session Corruption
        Race condition on write
        Stale Redis cache
        DynamoDB conditional failure
    Scenario 5
      API Version Mismatch
        Bedrock API change
        Client SDK drift
        Response format break

Scenario 1: 29-Second API Gateway Timeout

Problem

During peak manga release hours (Wednesday 00:00 JST, when new chapters drop), the REST /chat endpoint starts returning 504 Gateway Timeout errors. Users submitting complex recommendation queries via the REST fallback API (used when WebSocket connections fail) receive no response. CloudWatch shows API Gateway integration latency exceeding 29 seconds.

Detection

graph TB
    subgraph Monitoring["Detection Chain"]
        CW_ALARM[CloudWatch Alarm<br/>IntegrationLatency P95 > 20s]
        API_METRIC[API Gateway Metrics<br/>5XX Error Rate > 5%]
        X_RAY[X-Ray Trace<br/>Shows 29.0s segments]
        USER_REPORT["User Reports<br/>チャットが応答しない (chat does not respond)"]
    end

    subgraph Timeline["Incident Timeline"]
        T0["T+0: Release hour traffic spike<br/>5x normal request rate"]
        T1["T+2min: OpenSearch cold queries<br/>P95 latency 8s → 18s"]
        T2["T+5min: Bedrock queue depth<br/>adds 10-15s"]
        T3["T+7min: Combined latency 28s+<br/>API Gateway 504s begin"]
        T4["T+10min: CW alarm fires<br/>PagerDuty notification"]
    end

    CW_ALARM --> T4
    API_METRIC --> T3
    X_RAY --> T2
    USER_REPORT --> T3

    T0 --> T1 --> T2 --> T3 --> T4

    style T3 fill:#dc3545,color:#fff
    style T4 fill:#ffc107,color:#000

Root Cause

The 29-second integration timeout is the default hard limit on API Gateway REST APIs (AWS now lets you raise it for Regional REST APIs via a service-quota increase, at the cost of account-level throttling capacity, but MangaAssist runs at the default). The incident chain:

  1. Wednesday release spike: 5x traffic at 00:00 JST when new manga chapters are published
  2. OpenSearch cold queries: Vector similarity searches against newly-indexed content miss the cache, taking 8-18 seconds instead of the normal 2-3 seconds
  3. Bedrock queuing: The Sonnet model experiences queuing under the traffic burst, adding 10-15 seconds
  4. Combined latency breaches 29 seconds: RAG (18s) + Bedrock (15s) = 33s, exceeding the hard limit

Resolution

"""
MangaAssist REST Timeout Prevention
Multi-pronged approach to stay under the 29-second API Gateway limit.
"""

import asyncio
import time
import logging
from typing import Optional

import boto3

logger = logging.getLogger(__name__)

# --- Fix 1: Async parallel execution for RAG + Bedrock prep ---

async def handle_chat_with_parallel_rag(
    message: str, session_id: str, timeout: float = 25.0
) -> dict:
    """
    Handle chat request with parallelized operations to stay under 29s.
    Key insight: RAG retrieval and session loading happen in parallel,
    not sequentially.
    """
    start = time.time()

    # Execute independent operations in parallel
    session_task = asyncio.create_task(load_session(session_id))
    rag_task = asyncio.create_task(retrieve_rag_context(message, timeout=8.0))
    classification_task = asyncio.create_task(classify_query(message))

    # Wait for all with a combined timeout
    try:
        session, rag_results, complexity = await asyncio.wait_for(
            asyncio.gather(session_task, rag_task, classification_task),
            timeout=10.0,  # Hard cap on parallel phase
        )
    except asyncio.TimeoutError:
        # Proceed without RAG context if retrieval is slow.
        # wait_for cancels the gather, which cancels the child tasks, so
        # only read a result from a task that actually finished.
        logger.warning("Parallel phase timeout — proceeding without RAG")
        session = None
        if session_task.done() and not session_task.cancelled():
            session = session_task.result()
        rag_results = []
        complexity = "simple"

    # Bedrock call with remaining time budget
    elapsed = time.time() - start
    bedrock_timeout = min(timeout - elapsed, 15.0)  # timeout=25s keeps a 4s buffer under 29s

    if bedrock_timeout < 3.0:
        # Not enough time for Bedrock — return cached/default response
        return _fallback_response(message, session_id)

    # Route to faster model if time is tight
    model = "haiku" if bedrock_timeout < 8.0 or complexity == "simple" else "sonnet"

    response = await invoke_bedrock_with_timeout(
        message=message,
        rag_context=rag_results,
        session=session,
        model=model,
        timeout=bedrock_timeout,
    )

    return response


# --- Fix 2: Pre-warm OpenSearch indexes before release time ---

def prewarm_opensearch_for_release(release_titles: list[str]) -> None:
    """
    Pre-warm OpenSearch vector cache 30 minutes before known release times.
    Called by EventBridge scheduled rule at 23:30 JST every Wednesday.
    """
    import boto3
    from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
    from sentence_transformers import SentenceTransformer

    # OpenSearch Serverless only accepts SigV4-signed requests (service "aoss")
    credentials = boto3.Session().get_credentials()
    client = OpenSearch(
        hosts=[{"host": "manga-vectors.ap-northeast-1.aoss.amazonaws.com", "port": 443}],
        http_auth=AWSV4SignerAuth(credentials, "ap-northeast-1", "aoss"),
        use_ssl=True,
        connection_class=RequestsHttpConnection,
    )
    model = SentenceTransformer("intfloat/multilingual-e5-large")

    # Generate queries users are likely to ask about new releases
    prewarm_queries = []
    for title in release_titles:
        prewarm_queries.extend([
            f"{title} 新刊",
            f"{title} おすすめ",
            f"{title} みたいなマンガ",
            f"{title} 似ている作品",
        ])

    for query in prewarm_queries:
        embedding = model.encode(query).tolist()
        client.search(
            index="manga-vectors",
            body={
                "query": {
                    "knn": {
                        "embedding": {
                            "vector": embedding,
                            "k": 5,
                        }
                    }
                },
                "size": 5,
            },
        )
        logger.info(f"Pre-warmed: {query}")


# --- Fix 3: Migrate REST to WebSocket for long requests ---

REST_TIMEOUT_THRESHOLD = 20  # seconds

def should_upgrade_to_websocket(estimated_latency: float) -> bool:
    """
    Recommend WebSocket upgrade when estimated latency approaches REST limit.
    Returns True if the client should reconnect via WebSocket.
    """
    return estimated_latency > REST_TIMEOUT_THRESHOLD


# Helper stubs for the parallel execution example
async def load_session(session_id: str) -> dict:
    """Load session from Redis/DynamoDB."""
    pass

async def retrieve_rag_context(message: str, timeout: float) -> list[str]:
    """Retrieve from OpenSearch with timeout."""
    pass

async def classify_query(message: str) -> str:
    """Classify query complexity."""
    pass

async def invoke_bedrock_with_timeout(
    message: str, rag_context: list, session: dict,
    model: str, timeout: float
) -> dict:
    """Invoke Bedrock with strict timeout."""
    pass

def _fallback_response(message: str, session_id: str) -> dict:
    """Return a graceful fallback when time budget is exhausted."""
    return {
        "text": "申し訳ございません、現在アクセスが集中しております。"
                "少々お待ちいただくか、もう一度お試しください。",
        "fallback": True,
        "sessionId": session_id,
    }

Prevention

  • Pre-warm OpenSearch indexes 30 minutes before known traffic spikes (Wednesday 23:30 JST via EventBridge rule)
  • Parallelize independent operations — session load, RAG retrieval, and query classification execute concurrently
  • Dynamic model downgrade — switch to Haiku when remaining time budget is tight
  • Migrate REST callers to WebSocket — WebSocket API has no per-message timeout, only 10-minute idle timeout
  • CloudWatch alarm on P95 integration latency > 20s to trigger proactive scaling
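The alarm in the last bullet can be sketched with boto3. A minimal sketch: the API name and SNS topic ARN are illustrative placeholders, and the helper only builds the kwargs dict for `put_metric_alarm`, so nothing is created until you pass it to a real CloudWatch client.

```python
def build_latency_alarm_params(api_name: str, sns_topic_arn: str) -> dict:
    """
    Build kwargs for boto3.client("cloudwatch").put_metric_alarm(**params).
    IntegrationLatency is reported in milliseconds, so 20s = 20,000.
    """
    return {
        "AlarmName": f"{api_name}-integration-latency-p95",
        "Namespace": "AWS/ApiGateway",
        "MetricName": "IntegrationLatency",
        "Dimensions": [{"Name": "ApiName", "Value": api_name}],
        "ExtendedStatistic": "p95",   # percentile stats use ExtendedStatistic
        "Period": 60,
        "EvaluationPeriods": 3,       # 3 consecutive minutes over threshold
        "Threshold": 20_000.0,
        "ComparisonOperator": "GreaterThanThreshold",
        "TreatMissingData": "notBreaching",
        "AlarmActions": [sns_topic_arn],
    }

# The matching EventBridge pre-warm schedule: 23:30 JST Wednesday is
# 14:30 UTC the same day, i.e. cron(30 14 ? * WED *)
```

The `notBreaching` treatment avoids flapping overnight when the REST fallback sees little traffic.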

Scenario 2: Token Quota Exhausted

Problem

A MangaAssist power user (or a bot exploiting the API) exhausts their daily token quota within 2 hours, then receives cryptic 429 errors for the rest of the day. Customer support receives complaints: "チャットが使えなくなった" (the chat stopped working). The daily cost dashboard shows one user consuming $47 in a single day — 10x the expected per-user limit.

Detection

graph TB
    subgraph Signals["Detection Signals"]
        REDIS[Redis Quota Counter<br/>user:quota:2024-01-15<br/>input_tokens: 4,200,000]
        CW_COST[CloudWatch Cost Metric<br/>Single user > $10/day]
        RATE[Request Rate<br/>500 req/min from one user]
        SUPPORT["Support Ticket<br/>チャットが使えない (chat unusable)"]
    end

    subgraph Analysis["Root Cause Analysis"]
        A1[Automated script<br/>hitting API in loop]
        A2[Prompt injection<br/>requesting max tokens]
        A3[No output token cap<br/>on admin API key]
    end

    REDIS --> A1
    CW_COST --> A2
    RATE --> A3
    SUPPORT --> A1

    style CW_COST fill:#dc3545,color:#fff
    style RATE fill:#ffc107,color:#000

Root Cause

Three contributing factors:

  1. No per-user rate limiting — The API only had global rate limits, not per-user
  2. Missing output token cap — Admin API keys bypassed the max_tokens parameter, allowing 4096-token responses
  3. No progressive warnings — Users hit a hard wall at 100% quota with no warning at 80% or 90%

Resolution

"""
MangaAssist Quota Management with Progressive Warnings and Rate Limiting
"""

import time
import logging
from dataclasses import dataclass
from typing import Optional, Tuple

logger = logging.getLogger(__name__)


@dataclass
class QuotaTier:
    """Configurable quota tier."""
    name: str
    daily_input_tokens: int
    daily_output_tokens: int
    daily_cost_limit_usd: float
    max_requests_per_minute: int
    max_tokens_per_request: int

    # Warning thresholds (percentage of daily limit)
    warn_at_80: bool = True
    warn_at_90: bool = True
    hard_limit_at_100: bool = True


# Default tiers
QUOTA_TIERS = {
    "free": QuotaTier(
        name="free",
        daily_input_tokens=100_000,
        daily_output_tokens=50_000,
        daily_cost_limit_usd=1.00,
        max_requests_per_minute=10,
        max_tokens_per_request=512,
    ),
    "standard": QuotaTier(
        name="standard",
        daily_input_tokens=500_000,
        daily_output_tokens=200_000,
        daily_cost_limit_usd=5.00,
        max_requests_per_minute=30,
        max_tokens_per_request=1024,
    ),
    "premium": QuotaTier(
        name="premium",
        daily_input_tokens=2_000_000,
        daily_output_tokens=1_000_000,
        daily_cost_limit_usd=20.00,
        max_requests_per_minute=60,
        max_tokens_per_request=2048,
    ),
}


class QuotaManager:
    """
    Per-user quota management with progressive warnings and rate limiting.
    """

    def __init__(self, redis_client):
        # Assumes a redis-py client created with decode_responses=True,
        # so hgetall()/get() return str keys and values, not bytes.
        self.redis = redis_client

    def check_and_enforce(
        self, user_id: str, tier_name: str, estimated_tokens: int
    ) -> Tuple[bool, Optional[str], dict]:
        """
        Check quota and rate limit.

        Returns:
            (allowed, warning_message, quota_status)
        """
        tier = QUOTA_TIERS.get(tier_name, QUOTA_TIERS["free"])

        # Check per-minute rate limit
        rate_ok, rate_remaining = self._check_rate_limit(user_id, tier)
        if not rate_ok:
            return False, None, {
                "blocked_reason": "rate_limit",
                "retry_after_seconds": 60,
                "message_ja": "リクエストが多すぎます。1分後にお試しください。",
            }

        # Check daily quota
        today = time.strftime("%Y-%m-%d")
        quota_key = f"quota:{user_id}:{today}"
        usage = self.redis.hgetall(quota_key)

        used_input = int(usage.get("input_tokens", 0))
        used_output = int(usage.get("output_tokens", 0))
        used_cost = float(usage.get("cost_usd", 0.0))

        input_pct = (used_input / tier.daily_input_tokens) * 100
        output_pct = (used_output / tier.daily_output_tokens) * 100
        cost_pct = (used_cost / tier.daily_cost_limit_usd) * 100
        max_pct = max(input_pct, output_pct, cost_pct)

        status = {
            "input_used": used_input,
            "input_limit": tier.daily_input_tokens,
            "input_pct": round(input_pct, 1),
            "output_used": used_output,
            "output_limit": tier.daily_output_tokens,
            "output_pct": round(output_pct, 1),
            "cost_used_usd": round(used_cost, 4),
            "cost_limit_usd": tier.daily_cost_limit_usd,
            "rate_remaining": rate_remaining,
        }

        # Hard limit
        if max_pct >= 100:
            return False, None, {
                **status,
                "blocked_reason": "daily_quota",
                "message_ja": "本日のご利用上限に達しました。明日またお越しください。",
                "message_en": "Daily quota reached. Please come back tomorrow.",
            }

        # Progressive warnings
        warning = None
        if tier.warn_at_90 and max_pct >= 90:
            warning = (
                f"本日の利用量が90%に達しました。残り約{int(tier.daily_input_tokens - used_input):,}トークンです。"
            )
        elif tier.warn_at_80 and max_pct >= 80:
            warning = (
                f"本日の利用量が80%を超えました。ご利用はあと{int(tier.daily_input_tokens - used_input):,}トークンまでです。"
            )

        return True, warning, status

    def _check_rate_limit(
        self, user_id: str, tier: QuotaTier
    ) -> Tuple[bool, int]:
        """
        Fixed one-minute window rate limit per user (approximates a
        sliding window at lower cost).
        Returns (allowed, remaining_requests).
        """
        window_key = f"rate:{user_id}:{int(time.time()) // 60}"
        pipe = self.redis.pipeline()
        pipe.incr(window_key)
        pipe.expire(window_key, 120)  # Keep for 2 minutes
        results = pipe.execute()

        current_count = results[0]
        remaining = max(0, tier.max_requests_per_minute - current_count)

        return current_count <= tier.max_requests_per_minute, remaining

    def record_usage(
        self,
        user_id: str,
        input_tokens: int,
        output_tokens: int,
        cost_usd: float,
    ) -> None:
        """Record token usage after a request completes."""
        today = time.strftime("%Y-%m-%d")
        quota_key = f"quota:{user_id}:{today}"

        pipe = self.redis.pipeline()
        pipe.hincrby(quota_key, "input_tokens", input_tokens)
        pipe.hincrby(quota_key, "output_tokens", output_tokens)
        pipe.hincrbyfloat(quota_key, "cost_usd", cost_usd)
        pipe.hincrby(quota_key, "request_count", 1)
        pipe.expire(quota_key, 172800)  # 48h expiry (covers timezone edge)
        pipe.execute()

        # Check if we should alert ops
        used_cost = float(self.redis.hget(quota_key, "cost_usd") or 0)
        if used_cost > 10.0:
            logger.warning(
                f"High-cost user detected: {user_id} spent ${used_cost:.2f} today"
            )
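record_usage takes cost_usd as an input. Using the per-1M-token prices from the context line (Sonnet $3 input / $15 output, Haiku $0.25 / $1.25), a small helper can derive that figure; the lowercase model keys here are illustrative.

```python
# Per-1M-token prices (USD) from the MangaAssist context line.
PRICES = {
    "sonnet": {"input": 3.00, "output": 15.00},
    "haiku": {"input": 0.25, "output": 1.25},
}


def estimate_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Compute the per-request cost to pass to QuotaManager.record_usage()."""
    price = PRICES[model]
    return (input_tokens * price["input"] + output_tokens * price["output"]) / 1_000_000
```

For example, a free-tier day fully spent on Haiku (100k in / 50k out) costs under a dime, while the $47 incident above maps to millions of Sonnet tokens.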

Prevention

  • Per-user rate limiting via Redis fixed-window counters (one-minute windows, 10-60 req/min by tier)
  • Progressive quota warnings at 80% and 90% thresholds sent as metadata in responses
  • Hard output token cap on all API keys, including admin (max 2048 tokens)
  • Anomaly detection alarm when single-user cost exceeds $10/day
  • Graceful quota exhaustion messages in Japanese with clear "try again tomorrow" guidance
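The hard output cap only works if it is applied server-side when the Bedrock request body is built, with no admin bypass. A sketch of that clamp, assuming the Anthropic messages request format and the tier caps from QUOTA_TIERS above; unknown tiers fall back to the most conservative cap.

```python
TIER_OUTPUT_CAPS = {"free": 512, "standard": 1024, "premium": 2048}
ABSOLUTE_OUTPUT_CAP = 2048  # applies to every API key, including admin


def build_invoke_body(prompt: str, tier: str, requested_max_tokens: int) -> dict:
    """Build a Bedrock Anthropic-messages body with max_tokens clamped."""
    cap = min(TIER_OUTPUT_CAPS.get(tier, 512), ABSOLUTE_OUTPUT_CAP)
    return {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": min(requested_max_tokens, cap),
        "messages": [{"role": "user", "content": prompt}],
    }
```

Because the clamp lives where the request body is constructed, no caller-supplied parameter (admin or otherwise) can exceed it.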

Scenario 3: Retry Storm

Problem

Amazon Bedrock experiences a 2-minute regional throttling event on Claude 3 Sonnet. All 50 of MangaAssist's ECS Fargate tasks begin retrying simultaneously with identical exponential backoff timing. The retry storm amplifies the original throttling load 3x, extends the outage from 2 minutes to 15 minutes, and generates 450,000 failed requests.

Detection

graph TB
    subgraph Metrics["Metric Cascade"]
        THROTTLE[Bedrock ThrottlingException<br/>rate: 200/sec → 600/sec]
        RETRY_COUNT[Retry Counter<br/>0 → 150,000 in 2 min]
        CPU[ECS CPU Utilization<br/>30% → 85%]
        QUEUE[Request Queue Depth<br/>0 → 50,000]
        TIMEOUT[Client Timeout Rate<br/>0.1% → 45%]
    end

    subgraph Timeline["Storm Timeline"]
        T0["T+0: Bedrock throttling begins<br/>ThrottlingException 200/sec"]
        T1["T+30s: All 50 tasks retry simultaneously<br/>synchronized at 1s backoff"]
        T2["T+60s: Second retry wave<br/>2s backoff, still synchronized"]
        T3["T+120s: Bedrock recovers, but<br/>retry queue overwhelms again"]
        T4["T+900s: Queue finally drains<br/>normal operation resumes"]
    end

    THROTTLE --> T0
    RETRY_COUNT --> T1
    CPU --> T2
    QUEUE --> T3
    TIMEOUT --> T4

    T0 --> T1 --> T2 --> T3 --> T4

    style T1 fill:#dc3545,color:#fff
    style T2 fill:#dc3545,color:#fff
    style T3 fill:#ffc107,color:#000

Root Cause

The retry implementation had three critical flaws:

  1. No jitter on backoff delays — All 50 tasks calculated identical retry times (1s, 2s, 4s, 8s), creating synchronized retry waves
  2. No circuit breaker — Even after 100+ failures, each task kept retrying individually
  3. Client-side retries compounded server-side retries — The WebSocket client had its own retry logic, doubling the request volume
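Flaw 1 is easy to demonstrate: without jitter every task computes an identical delay schedule, while full jitter spreads retries across the window. A quick sketch simulating 50 tasks on their second retry:

```python
import random


def deterministic_backoff(attempt: int, base: float = 1.0) -> float:
    """What the broken clients did: every task gets the same delay."""
    return base * (2 ** attempt)


def full_jitter_backoff(attempt: int, base: float = 0.5, cap: float = 16.0) -> float:
    """Full jitter: uniform over (0, min(cap, base * 2^attempt))."""
    return random.uniform(0.0, min(cap, base * (2 ** attempt)))


# 50 "tasks" all retrying after attempt 2:
sync_delays = {deterministic_backoff(2) for _ in range(50)}       # one value
jitter_delays = {round(full_jitter_backoff(2), 6) for _ in range(50)}
```

The deterministic set collapses to a single value (a synchronized wave); the jittered set spreads the same 50 retries over the whole 0-2s window.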

Resolution

"""
MangaAssist Anti-Retry-Storm Implementation
Coordinated retry with jitter, circuit breaker, and client backpressure.
"""

import time
import random
import logging
from typing import Optional

logger = logging.getLogger(__name__)


class CoordinatedRetryManager:
    """
    Retry manager that prevents retry storms through:
    1. Full jitter on backoff delays
    2. Global circuit breaker (shared via Redis)
    3. Client backpressure signaling
    4. Retry budget per time window
    """

    def __init__(self, redis_client):
        # Assumes decode_responses=True on the redis-py client, so the
        # circuit-state string comparisons below work as written.
        self.redis = redis_client

    def should_retry(
        self, model_id: str, attempt: int, max_attempts: int = 3
    ) -> tuple[bool, float]:
        """
        Determine if a retry should be attempted.
        Returns (should_retry, delay_seconds).

        Uses GLOBAL retry budget to prevent storm across all tasks.
        """
        if attempt >= max_attempts:
            return False, 0.0

        # Check global circuit breaker
        circuit_state = self._get_global_circuit(model_id)
        if circuit_state == "open":
            logger.warning(f"Global circuit OPEN for {model_id} — no retry")
            return False, 0.0

        # Check global retry budget (max 100 retries per 10-second window)
        window = int(time.time()) // 10
        budget_key = f"retry_budget:{model_id}:{window}"
        current = self.redis.incr(budget_key)
        self.redis.expire(budget_key, 30)

        if current > 100:  # Global retry budget exhausted
            logger.warning(
                f"Global retry budget exhausted for {model_id} "
                f"({current} retries in window)"
            )
            return False, 0.0

        # Full jitter: delay = uniform(0.1, min(0.5 * 2^attempt, 16.0))
        # The random spread ensures no two tasks retry at the same instant
        max_delay = min(0.5 * (2 ** attempt), 16.0)
        delay = random.uniform(0.1, max_delay)

        return True, delay

    def record_failure(self, model_id: str) -> None:
        """Record a failure and potentially open circuit."""
        window = int(time.time()) // 30  # 30-second failure window
        fail_key = f"failures:{model_id}:{window}"

        pipe = self.redis.pipeline()
        pipe.incr(fail_key)
        pipe.expire(fail_key, 60)
        results = pipe.execute()

        failure_count = results[0]
        if failure_count >= 50:  # 50 failures in 30s = open circuit
            self._open_global_circuit(model_id)

    def record_success(self, model_id: str) -> None:
        """Record success — may close circuit if in half-open."""
        circuit_key = f"circuit_global:{model_id}"
        state = self.redis.hget(circuit_key, "state")

        if state == "half_open":
            successes = self.redis.hincrby(circuit_key, "half_open_success", 1)
            if successes >= 3:
                self.redis.delete(circuit_key)
                logger.info(f"Global circuit CLOSED for {model_id}")

    def _get_global_circuit(self, model_id: str) -> str:
        """Get global circuit breaker state."""
        circuit_key = f"circuit_global:{model_id}"
        data = self.redis.hgetall(circuit_key)

        if not data:
            return "closed"

        state = data.get("state", "closed")
        opened_at = float(data.get("opened_at", 0))

        if state == "open" and time.time() - opened_at > 30:
            # Transition to half-open after 30 seconds
            self.redis.hset(circuit_key, "state", "half_open")
            return "half_open"

        return state

    def _open_global_circuit(self, model_id: str) -> None:
        """Open the global circuit breaker."""
        circuit_key = f"circuit_global:{model_id}"
        self.redis.hset(circuit_key, mapping={
            "state": "open",
            "opened_at": str(time.time()),
        })
        self.redis.expire(circuit_key, 120)
        logger.critical(
            f"GLOBAL CIRCUIT OPEN for {model_id} — all retries halted"
        )

    def get_client_backpressure_signal(self, model_id: str) -> dict:
        """
        Generate backpressure signal for WebSocket clients.
        Tells clients to stop retrying and wait.
        """
        circuit_state = self._get_global_circuit(model_id)

        if circuit_state == "open":
            return {
                "type": "backpressure",
                "action": "pause",
                "retryAfter": 30,
                "message_ja": "サーバーが混雑しています。30秒後に自動的に再接続します。",
            }
        elif circuit_state == "half_open":
            return {
                "type": "backpressure",
                "action": "slow",
                "retryAfter": 5,
                "message_ja": "接続を回復中です。しばらくお待ちください。",
            }

        return {"type": "backpressure", "action": "none"}

Prevention

  • Full jitter on backoff — random(0, base * 2^attempt) ensures zero synchronization across tasks
  • Global circuit breaker in Redis — shared across all 50 ECS tasks, not per-task
  • Global retry budget — max 100 retries per model per 10-second window across the fleet
  • Client backpressure signals — WebSocket sends pause messages telling clients to stop retrying for 30 seconds
  • Separate Bedrock throughput provisioning for peak hours (Wednesday 00:00 JST)
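On the client side, honoring the backpressure signal is a matter of mapping its action field to a wait before the next send. A sketch of that mapping, matching the fields emitted by get_client_backpressure_signal above:

```python
def backpressure_wait_seconds(signal: dict) -> float:
    """How long the WebSocket client should pause before its next retry."""
    action = signal.get("action", "none")
    if action == "pause":
        return float(signal.get("retryAfter", 30))
    if action == "slow":
        return float(signal.get("retryAfter", 5))
    return 0.0  # "none": proceed normally
```

Crucially, the client's own retry loop must sleep for this value instead of applying its independent backoff, or the client-side retries compound the server-side ones again.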

Scenario 4: Session Corruption

Problem

Users report garbled conversations: the assistant responds to questions they never asked, or previous conversation context is missing. Investigation reveals that when a user switches from mobile to desktop, both devices briefly hold active WebSocket connections to the same session. Concurrent writes to DynamoDB create a last-writer-wins race condition, and the Redis cache serves stale data.

Detection

graph TB
    subgraph Symptoms["User-Reported Symptoms"]
        S1["Assistant answers wrong question"]
        S2["Conversation history missing turns"]
        S3["Duplicate messages in history"]
        S4["Session suddenly resets"]
    end

    subgraph Investigation["Investigation Signals"]
        DDB_WRITES[DynamoDB Write Metrics<br/>ConditionalCheckFailedRequests > 0]
        REDIS_STALE[Redis TTL Check<br/>Cache age > DDB lastModified]
        CONN_COUNT[Connection Count<br/>2+ active connections per session]
        XRAY_RACE[X-Ray Traces<br/>Overlapping PutItem calls]
    end

    S1 --> REDIS_STALE
    S2 --> DDB_WRITES
    S3 --> CONN_COUNT
    S4 --> XRAY_RACE

    style S1 fill:#dc3545,color:#fff
    style CONN_COUNT fill:#ffc107,color:#000

Root Cause

  1. No optimistic concurrency on DynamoDB session writes — PutItem overwrites without version checking
  2. Redis cache not invalidated on DynamoDB updates from a different connection
  3. No single-writer enforcement — multiple connections writing to the same session simultaneously

Resolution

"""
MangaAssist Session Corruption Fix
Optimistic concurrency control + single-writer enforcement.
"""

import time
import json
import logging
from typing import Optional

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)

dynamodb = boto3.resource("dynamodb")
sessions_table = dynamodb.Table("MangaAssist-Sessions")


class ConcurrentSessionManager:
    """
    Session manager with optimistic concurrency control.
    Uses DynamoDB version numbers and Redis cache invalidation.
    """

    def __init__(self, redis_client):
        # Assumes decode_responses=True on the redis-py client; otherwise
        # the lock-holder comparisons below compare str against bytes.
        self.redis = redis_client

    def acquire_write_lock(
        self, session_id: str, connection_id: str, ttl: int = 10
    ) -> bool:
        """
        Acquire exclusive write lock for a session.
        Only one connection can write at a time.
        Uses Redis SET NX (set-if-not-exists).
        """
        lock_key = f"session_lock:{session_id}"
        acquired = self.redis.set(
            lock_key, connection_id, nx=True, ex=ttl
        )

        if not acquired:
            # Check if WE already hold the lock
            current_holder = self.redis.get(lock_key)
            if current_holder == connection_id:
                self.redis.expire(lock_key, ttl)
                return True

            logger.warning(
                f"Session {session_id} write lock held by {current_holder}, "
                f"requested by {connection_id}"
            )
            return False

        return True

    def release_write_lock(self, session_id: str, connection_id: str) -> None:
        """Release write lock only if we hold it."""
        # Note: check-then-delete is not atomic; under heavy contention a
        # Lua script (compare value and DEL in one call) is the safe form.
        lock_key = f"session_lock:{session_id}"
        current = self.redis.get(lock_key)
        if current == connection_id:
            self.redis.delete(lock_key)

    def update_session_safe(
        self,
        session_id: str,
        connection_id: str,
        conversation_history: list[dict],
        expected_version: int,
    ) -> tuple[bool, int]:
        """
        Update session with optimistic concurrency control.

        Uses DynamoDB ConditionExpression to ensure version matches.
        Returns (success, new_version).
        """
        # Try to acquire write lock
        if not self.acquire_write_lock(session_id, connection_id):
            return False, expected_version

        try:
            new_version = expected_version + 1

            sessions_table.update_item(
                Key={"sessionId": session_id},
                UpdateExpression=(
                    "SET conversationHistory = :history, "
                    "lastActivity = :ts, "
                    "activeConnectionId = :connId, "
                    "#ver = :new_ver"
                ),
                ConditionExpression="#ver = :expected_ver",
                ExpressionAttributeNames={"#ver": "version"},
                ExpressionAttributeValues={
                    ":history": json.dumps(conversation_history, ensure_ascii=False),
                    ":ts": int(time.time()),
                    ":connId": connection_id,
                    ":new_ver": new_version,
                    ":expected_ver": expected_version,
                },
            )

            # Invalidate Redis cache so other connections fetch fresh data
            self._invalidate_cache(session_id)

            # Update cache with new data
            self._update_cache(
                session_id, conversation_history, new_version
            )

            return True, new_version

        except ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                logger.warning(
                    f"Version conflict on session {session_id}: "
                    f"expected version {expected_version}"
                )
                # Reload from DynamoDB to get current version
                return False, self._get_current_version(session_id)
            raise
        finally:
            self.release_write_lock(session_id, connection_id)

    def handle_device_switch(
        self, session_id: str, old_connection_id: str, new_connection_id: str
    ) -> dict:
        """
        Handle graceful device switch.
        Notifies old connection and transfers session ownership.
        """
        # Force-release lock from old connection
        lock_key = f"session_lock:{session_id}"
        current_holder = self.redis.get(lock_key)
        if current_holder == old_connection_id:
            self.redis.delete(lock_key)

        # Update active connection in DynamoDB
        try:
            sessions_table.update_item(
                Key={"sessionId": session_id},
                UpdateExpression=(
                    "SET activeConnectionId = :new_conn, "
                    "lastDeviceSwitch = :ts"
                ),
                ExpressionAttributeValues={
                    ":new_conn": new_connection_id,
                    ":ts": int(time.time()),
                },
            )
        except ClientError as e:
            logger.error(f"Device switch failed: {e}")
            raise

        # Invalidate cache to force fresh load
        self._invalidate_cache(session_id)

        return {
            "status": "transferred",
            "sessionId": session_id,
            "newConnectionId": new_connection_id,
        }

    def _invalidate_cache(self, session_id: str) -> None:
        """Remove stale cache entry."""
        self.redis.delete(f"session:{session_id}")

    def _update_cache(
        self, session_id: str, history: list[dict], version: int
    ) -> None:
        """Update Redis cache with versioned data."""
        data = {
            "conversationHistory": history,
            "version": version,
            "cachedAt": time.time(),
        }
        self.redis.setex(
            f"session:{session_id}",
            1800,
            json.dumps(data, ensure_ascii=False),
        )

    def _get_current_version(self, session_id: str) -> int:
        """Get current version from DynamoDB."""
        response = sessions_table.get_item(
            Key={"sessionId": session_id},
            ProjectionExpression="#ver",
            ExpressionAttributeNames={"#ver": "version"},
        )
        return int(response.get("Item", {}).get("version", 0))

Prevention

  • Optimistic concurrency with DynamoDB ConditionExpression on a version counter
  • Distributed write locks via Redis SET NX to enforce single-writer per session
  • Cache invalidation on every write — delete Redis cache before updating, then repopulate
  • Graceful device switch protocol — old connection receives a "session transferred" message and stops writing
  • Connection count monitoring — CloudWatch alarm when a session has 2+ concurrent connections
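From the caller's perspective, a version conflict means "reload and reapply", never "resend the same write". A sketch of that loop, where load_fn and update_fn are hypothetical stand-ins for a fresh session read and for update_session_safe:

```python
def write_with_occ_retry(load_fn, update_fn, max_attempts: int = 3):
    """
    Optimistic-concurrency write loop: read (history, version), attempt the
    conditional write, and on a version conflict re-read and reapply the
    change against the fresh state rather than blindly retrying.
    """
    for _ in range(max_attempts):
        history, version = load_fn()
        ok, new_version = update_fn(history, version)
        if ok:
            return new_version
    raise RuntimeError("session write contention: giving up after retries")
```

Re-reading before each attempt is what preserves the other writer's turns; resending the stale history is exactly the last-writer-wins corruption this scenario describes.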

Scenario 5: API Version Mismatch

Problem

After a Bedrock service update, Claude 3 Sonnet responses start including a new usage field structure in the message_delta event. The MangaAssist stream parser expects usage.output_tokens as an integer but receives a nested object usage.output_tokens.total. All token counting breaks, quotas are not enforced, and the cost dashboard shows zero usage while actual costs accumulate.

Detection

graph TB
    subgraph Symptoms["Visible Symptoms"]
        S1["CloudWatch: token_count metric = 0<br/>for all requests"]
        S2["Quota system: no users blocked<br/>despite heavy traffic"]
        S3["Cost dashboard: $0.00 daily<br/>but AWS bill shows $2,400"]
        S4["Logs: KeyError 'output_tokens'<br/>in stream parser"]
    end

    subgraph Root["Root Cause Chain"]
        R1["Bedrock API update<br/>Changed usage response format"]
        R2["Stream parser expects int<br/>Gets nested object"]
        R3["Exception caught silently<br/>Defaults to 0 tokens"]
        R4["All downstream tracking<br/>reports zero usage"]
    end

    S1 --> R3
    S2 --> R4
    S3 --> R4
    S4 --> R2

    R1 --> R2 --> R3 --> R4

    style S3 fill:#dc3545,color:#fff
    style R1 fill:#ffc107,color:#000

Root Cause

  1. Brittle response parsing — The stream parser accessed chunk["usage"]["output_tokens"] directly without defensive type checking
  2. Silent exception handling — except Exception: pass in the token counting code masked the KeyError
  3. No integration test against actual Bedrock responses — unit tests used mocked fixtures that did not reflect the API change
  4. No schema validation on Bedrock response events
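The chain above can be reproduced in a few lines (a hypothetical minimal repro, not the actual MangaAssist parser). The old format returns an int; the new format returns a dict, the int assumption raises, and the blanket except turns every request into a permanent zero:

```python
def count_tokens_brittle(chunk: dict) -> int:
    """Pre-incident logic: assumes usage.output_tokens is always an int."""
    try:
        tokens = chunk["usage"]["output_tokens"]
        return tokens + 0  # raises when the int assumption breaks
    except Exception:
        return 0  # the silent fallback that hid the incident


# Old format: parsed correctly.
assert count_tokens_brittle({"usage": {"output_tokens": 342}}) == 342
# New nested format: the error is swallowed and every request counts as 0.
assert count_tokens_brittle({"usage": {"output_tokens": {"total": 342}}}) == 0
```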

Resolution

"""
MangaAssist Resilient Bedrock Response Parser
Handles API version changes gracefully with defensive parsing.
"""

import json
import logging
from typing import Optional, Any

logger = logging.getLogger(__name__)


class BedrockResponseParser:
    """
    Resilient parser for Bedrock streaming response events.
    Handles schema changes gracefully with fallback extraction.
    """

    # Known response format versions
    KNOWN_FORMATS = {"bedrock-2023-05-31", "bedrock-2024-01-01"}

    def extract_usage(self, chunk_data: dict) -> dict:
        """
        Extract token usage from a message_delta or message_start event.
        Handles multiple API response formats defensively.
        """
        usage = chunk_data.get("usage", {})
        if not usage:
            # Try alternate locations
            usage = chunk_data.get("message", {}).get("usage", {})

        input_tokens = self._extract_token_count(usage, "input_tokens")
        output_tokens = self._extract_token_count(usage, "output_tokens")

        # Validate: tokens should be non-negative integers
        if input_tokens < 0 or output_tokens < 0:
            logger.error(
                f"Invalid token counts: input={input_tokens}, output={output_tokens}"
            )
            input_tokens = max(0, input_tokens)
            output_tokens = max(0, output_tokens)

        # Anomaly detection: flag suspiciously high or zero counts
        if output_tokens == 0 and chunk_data.get("type") == "message_delta":
            logger.warning(
                "Zero output tokens in message_delta — possible schema change. "
                f"Raw usage: {json.dumps(usage)}"
            )

        return {
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "raw_usage": usage,  # Preserve raw data for debugging
        }

    def _extract_token_count(self, usage: dict, field: str) -> int:
        """
        Extract token count from usage dict, handling multiple formats.

        Supported formats:
        - {"output_tokens": 342}                    (original)
        - {"output_tokens": {"total": 342}}         (nested)
        - {"output_tokens": {"value": 342}}         (alternate nested)
        - {"outputTokens": 342}                     (camelCase variant)
        """
        value = usage.get(field)

        if value is None:
            # Try camelCase variant
            camel = self._to_camel_case(field)
            value = usage.get(camel)

        if value is None:
            return 0

        # Direct integer
        if isinstance(value, (int, float)):
            return int(value)

        # Nested object with "total" or "value"
        if isinstance(value, dict):
            for key in ("total", "value", "count"):
                if key in value:
                    return int(value[key])
            # Unknown nested format — log and return 0
            logger.error(
                f"Unknown nested format for {field}: {json.dumps(value)}"
            )
            return 0

        # String that might be numeric
        if isinstance(value, str):
            try:
                return int(value)
            except ValueError:
                logger.error(f"Non-numeric string for {field}: {value}")
                return 0

        logger.error(f"Unexpected type for {field}: {type(value).__name__}")
        return 0

    def _to_camel_case(self, snake_str: str) -> str:
        """Convert snake_case to camelCase."""
        parts = snake_str.split("_")
        return parts[0] + "".join(p.capitalize() for p in parts[1:])

    def validate_stream_event(self, event_data: dict) -> bool:
        """
        Validate a stream event against expected schema.
        Returns True if the event is recognized, False if unknown.
        """
        known_types = {
            "message_start", "content_block_start", "content_block_delta",
            "content_block_stop", "message_delta", "message_stop",
        }

        event_type = event_data.get("type")
        if event_type not in known_types:
            logger.warning(
                f"Unknown stream event type: {event_type}. "
                f"Keys: {list(event_data.keys())}"
            )
            return False

        return True

    def parse_stream_event(self, raw_bytes: bytes) -> Optional[dict]:
        """
        Parse a raw stream event with error handling.
        Returns parsed dict or None if unparseable.
        """
        try:
            data = json.loads(raw_bytes)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse stream event: {e}")
            return None

        # Validate for its logging side effect only — unknown event types are
        # still returned, since the caller may know how to handle them.
        self.validate_stream_event(data)

        return data


def setup_integration_health_check(redis_client) -> None:
    """
    Periodic health check that validates Bedrock response format.
    Run every 5 minutes via ECS task scheduler.
    """
    import boto3
    from botocore.config import Config

    bedrock = boto3.client(
        "bedrock-runtime",
        config=Config(region_name="ap-northeast-1"),
    )

    test_body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 10,
        "messages": [{"role": "user", "content": [{"type": "text", "text": "Hi"}]}],
    })

    try:
        response = bedrock.invoke_model_with_response_stream(
            modelId="anthropic.claude-3-haiku-20240307-v1:0",
            contentType="application/json",
            accept="application/json",
            body=test_body,
        )

        parser = BedrockResponseParser()
        found_usage = False

        for event in response["body"]:
            chunk = event.get("chunk")
            if not chunk:
                continue
            data = json.loads(chunk["bytes"])

            if data.get("type") == "message_delta":
                usage = parser.extract_usage(data)
                if usage["output_tokens"] > 0:
                    found_usage = True
                else:
                    # Alert: possible schema change
                    logger.critical(
                        "HEALTH CHECK FAILED: message_delta returned 0 output tokens. "
                        f"Raw: {json.dumps(data)}"
                    )
                    redis_client.set("bedrock_schema_alert", "true", ex=3600)

        if found_usage:
            redis_client.set("bedrock_health", "ok", ex=600)
            logger.info("Bedrock integration health check: OK")
        else:
            redis_client.set("bedrock_health", "degraded", ex=600)
            logger.warning("Bedrock integration health check: DEGRADED")

    except Exception as e:
        redis_client.set("bedrock_health", "error", ex=600)
        logger.error(f"Bedrock health check failed: {e}")
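As a self-contained illustration of the defensive extraction, here is a condensed standalone version of _extract_token_count (mirroring the logic above, trimmed of logging) run against the formats from the incident:

```python
def extract_token_count(usage: dict, field: str) -> int:
    """Condensed sketch of the parser's multi-format token extraction."""
    value = usage.get(field)
    if value is None:
        # camelCase fallback: output_tokens -> outputTokens
        parts = field.split("_")
        value = usage.get(parts[0] + "".join(p.capitalize() for p in parts[1:]))
    if value is None:
        return 0
    if isinstance(value, (int, float)):
        return int(value)  # original flat format
    if isinstance(value, dict):
        for key in ("total", "value", "count"):
            if key in value:
                return int(value[key])  # nested variants
        return 0
    if isinstance(value, str):
        try:
            return int(value)  # numeric string
        except ValueError:
            return 0
    return 0


# All three formats from the incident resolve to the same count:
assert extract_token_count({"output_tokens": 342}, "output_tokens") == 342
assert extract_token_count({"output_tokens": {"total": 342}}, "output_tokens") == 342
assert extract_token_count({"outputTokens": 342}, "output_tokens") == 342
```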

Prevention

  • Defensive parsing — Never assume field types; check for int, dict, and string variants
  • Never silently swallow exceptions in token counting — log at ERROR level and alert
  • Integration health check every 5 minutes that validates actual Bedrock response format
  • Schema change alerts — CloudWatch alarm when the health check detects unexpected formats
  • Raw response logging (sampled) — Store 1% of raw Bedrock responses in S3 for forensic analysis
  • Pin the anthropic_version field in every Bedrock request body to maintain API compatibility
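The sampled raw-response logging can use a deterministic hash of the request ID, so the same request is always in or out of the sample regardless of which ECS task handles it. A sketch — should_sample and the S3 key layout are illustrative, not the production scheme:

```python
import hashlib


def should_sample(request_id: str, rate_percent: float = 1.0) -> bool:
    """Deterministic sampler: hash the request ID into a bucket in [0, 10000)
    and keep requests whose bucket falls below the rate threshold.
    Same ID -> same decision, so sampled logs are reproducible."""
    digest = hashlib.sha256(request_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:2], "big") % 10000  # 0..9999
    return bucket < rate_percent * 100  # 1.0% keeps buckets 0..99


# Sampled requests would have their raw Bedrock response body written to S3
# under a key like <date>/<request_id>.json (hypothetical layout).
```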

Key Takeaways

| # | Takeaway | MangaAssist Application |
|---|----------|-------------------------|
| 1 | The 29-second REST timeout is architectural — design around it with WebSocket for streaming and parallel operations to compress latency within the limit. | WebSocket carries all user-facing chat; REST is only for health checks and admin APIs. |
| 2 | Token quotas need progressive warnings — hard walls with no warning create terrible UX. Warn at 80% and 90% so users can self-regulate. | Japanese-language quota warnings are sent as metadata in chat responses: 「本日の利用量が90%に達しました」 ("Today's usage has reached 90%"). |
| 3 | Retry storms are worse than the original failure — synchronized retries across a fleet amplify throttling. Full jitter plus global retry budgets are essential. | A global Redis-based circuit breaker and retry budget keep MangaAssist's 50 ECS tasks from creating synchronized retry waves. |
| 4 | Session writes need optimistic concurrency — multi-device users create race conditions that corrupt conversation history. DynamoDB conditional writes plus version counters fix this. | The device switch protocol notifies old connections and transfers write ownership before the new device starts writing. |
| 5 | Never trust API response schemas to be stable — parse defensively, validate types, and run integration health checks against live APIs every few minutes. | The Bedrock response parser handles int, nested-dict, and string token counts; the health check runs every 5 minutes. |