
Scenarios and Runbooks — Accessible AI Interfaces

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.

Skill Mapping

  Field                  Value
  Domain                 2 — Implementation & Integration
  Task                   2.5 — Application Integration Patterns
  Skill                  2.5.2 — Accessible AI Interfaces
  Focus                  Amplify WebSocket failing on mobile, OpenAPI spec drift, Prompt Flow timeout, stale cached response, unexpected prompt chains
  MangaAssist Relevance  Production incidents in the frontend, API contract, and prompt flow layers

Mind Map

mindmap
  root((Accessible AI Scenarios & Runbooks))
    Scenario 1
      Amplify WebSocket Failing on Mobile
        iOS Safari WebSocket bugs
        Carrier NAT timeout
        Service worker interference
    Scenario 2
      OpenAPI Spec Drift
        Spec vs implementation mismatch
        Missing new fields
        Breaking partner SDKs
    Scenario 3
      Prompt Flow Timeout
        Complex flow chain
        Lambda cold start in flow
        Bedrock queue depth
    Scenario 4
      Stale Cached Response
        Redis serving old data
        CDN caching API responses
        Cache invalidation failure
    Scenario 5
      Unexpected Prompt Chains
        Recursive flow invocation
        Condition node infinite loop
        Token budget explosion

Scenario 1: Amplify WebSocket Failing on Mobile

Problem

MangaAssist mobile users on Japanese carriers (NTT Docomo, au, SoftBank) report intermittent chat failures. The chat widget shows "接続中..." (Connecting...) indefinitely, then falls back to "接続できませんでした" (Connection failed). The issue affects roughly 15% of mobile sessions, primarily on iOS Safari. Desktop Chrome and Firefox work fine.

Detection

graph TB
    subgraph Signals["Detection Signals"]
        ANALYTICS[Amplify Analytics<br/>Mobile connection success: 85%<br/>Desktop connection success: 99.5%]
        CW_WS[API Gateway WebSocket<br/>ConnectCount stable<br/>DisconnectCount spike on mobile]
        SUPPORT[Support Tickets<br/>iOS users: 接続できない]
        RUM[Real User Monitoring<br/>iOS Safari TTFB: 8.2s avg<br/>Chrome mobile: 1.1s avg]
    end

    subgraph RootCause["Root Cause Analysis"]
        RC1["iOS Safari WebSocket<br/>idle timeout: 60s vs 10min"]
        RC2["Carrier NAT tables<br/>drop idle TCP after 30s"]
        RC3["Service worker<br/>intercepting WS upgrade"]
        RC4["Mixed content<br/>wss:// blocked by CSP"]
    end

    ANALYTICS --> RC1
    CW_WS --> RC2
    SUPPORT --> RC3
    RUM --> RC4

    style ANALYTICS fill:#dc3545,color:#fff
    style RC2 fill:#ffc107,color:#000

Root Cause

Three compounding issues specific to mobile networks in Japan:

  1. Japanese carrier NAT timeout — NTT Docomo and au aggressively recycle NAT table entries, dropping idle TCP connections after 30 seconds (vs. typical 5-minute timeout on WiFi)
  2. iOS Safari WebSocket handling — Safari on iOS has stricter WebSocket idle detection and terminates connections that do not receive data frames within 60 seconds
  3. Service worker interference — The Amplify PWA service worker was intercepting the WebSocket upgrade request, causing it to fail silently on some iOS versions
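The two idle limits above (30-second carrier NAT recycling, 60-second iOS Safari idle detection) pin down how often a keepalive must fire. A back-of-envelope helper, illustrative rather than part of the incident tooling, makes the arithmetic explicit:

```python
def safe_ping_interval(idle_timeouts_s: list[float], margin: float = 0.66) -> float:
    """Keepalive must beat the tightest idle timeout, with headroom for
    network jitter and delayed timer firing on suspended mobile tabs."""
    return min(idle_timeouts_s) * margin

# 30s carrier NAT vs 60s iOS Safari -> ping roughly every 20 seconds
interval = safe_ping_interval([30, 60])
```

The ~20-second result is why the configuration that follows pings every 20 seconds rather than relying on defaults.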

Resolution

"""
MangaAssist Mobile WebSocket Fix
Addresses carrier NAT timeout, iOS Safari quirks, and service worker interference.
"""

import json
import time
import logging
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass
class MobileWebSocketConfig:
    """Optimized WebSocket config for Japanese mobile carriers."""
    # Ping interval shorter than carrier NAT timeout (30s)
    ping_interval_seconds: int = 20
    # Pong timeout — if no pong in 5s, assume connection dead
    pong_timeout_seconds: int = 5
    # Reconnection settings
    reconnect_base_delay: float = 0.5
    reconnect_max_delay: float = 10.0
    max_reconnect_attempts: int = 10
    # iOS Safari specific
    force_close_on_background: bool = True
    reconnect_on_foreground: bool = True
    # Disable service worker for WebSocket paths
    service_worker_ws_bypass: bool = True


class MobileOptimizedWebSocket:
    """
    WebSocket client optimized for Japanese mobile carriers.
    Addresses NAT timeout, iOS Safari, and service worker issues.
    """

    def __init__(self, config: MobileWebSocketConfig):
        self.config = config
        self._last_pong = 0
        self._ping_task = None
        self._visibility_listener = None

    def get_connection_params(self) -> dict:
        """
        Generate WebSocket connection parameters optimized for mobile.
        """
        return {
            # Use binary frames for ping/pong (more reliable on mobile)
            "ping_type": "application",  # Application-level ping, not WebSocket frame
            "ping_interval": self.config.ping_interval_seconds,
            "protocols": ["manga-assist-v2"],
            "headers": {
                "X-Client-Type": "mobile",
                "X-Carrier-Hint": "jp-mobile",
            },
        }

    def create_ping_handler(self) -> dict:
        """
        Application-level ping handler.
        Uses JSON messages rather than WebSocket ping frames
        because some Japanese carrier proxies strip WS control frames.
        """
        return {
            "type": "ping_config",
            "interval_ms": self.config.ping_interval_seconds * 1000,
            "timeout_ms": self.config.pong_timeout_seconds * 1000,
            "message": json.dumps({"action": "ping", "ts": int(time.time())}),
            "expected_response": "pong",
            "on_timeout": "reconnect",
        }

    def get_service_worker_config(self) -> dict:
        """
        Service worker configuration that bypasses WebSocket URLs.
        Prevents the SW from intercepting WS upgrade requests.
        """
        return {
            "navigationPreload": True,
            "bypassPatterns": [
                # Never intercept WebSocket connections
                r"wss?://.*manga-assist\.example\.com.*",
                # Never intercept API Gateway Management API callbacks
                r"https://.*execute-api.*amazonaws\.com.*",
            ],
            "cachingStrategy": {
                "/": "network-first",
                "/static/*": "cache-first",
                "/api/*": "network-only",
            },
        }

    def get_visibility_handler_config(self) -> dict:
        """
        Page visibility change handler for iOS Safari.
        iOS aggressively suspends background tabs, killing WS connections.
        We proactively close and reconnect on visibility change.
        """
        return {
            "on_hidden": {
                "action": "close_gracefully" if self.config.force_close_on_background else "keep_alive",
                "save_state": True,
                "reason": "Proactive close to avoid zombie connections",
            },
            "on_visible": {
                "action": "reconnect" if self.config.reconnect_on_foreground else "check_connection",
                "delay_ms": 100,
                "restore_state": True,
            },
        }


def generate_lambda_authorizer_mobile_aware() -> str:
    """
    Lambda authorizer that recognizes mobile clients
    and adjusts connection TTL accordingly.
    """
    return '''
import json

def handler(event, context):
    """
    Mobile-aware WebSocket authorizer.
    Sets shorter connection idle timeout for mobile clients.
    """
    # Header names are not case-normalized in $connect events, so
    # lowercase them before lookup
    headers = {k.lower(): v for k, v in (event.get("headers") or {}).items()}
    client_type = headers.get("x-client-type", "desktop")
    query = event.get("queryStringParameters", {}) or {}
    token = query.get("token", "")

    # Validate token (simplified)
    if not token:
        return generate_policy("anonymous", "Deny", event)

    # Set context for downstream handlers
    context_data = {
        "clientType": client_type,
        "idleTimeoutMs": "30000" if client_type == "mobile" else "600000",
        "pingRequired": "true" if client_type == "mobile" else "false",
    }

    policy = generate_policy("user123", "Allow", event)
    policy["context"] = context_data
    return policy


def generate_policy(principal_id, effect, event):
    return {
        "principalId": principal_id,
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [{
                "Action": "execute-api:Invoke",
                "Effect": effect,
                "Resource": event["methodArn"],
            }],
        },
    }
'''
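MobileWebSocketConfig declares reconnect_base_delay, reconnect_max_delay, and max_reconnect_attempts, but the schedule itself is elided above. One common realization is capped exponential backoff with full jitter; this is a sketch under that assumption, not necessarily the production implementation:

```python
import random

def reconnect_delay(attempt: int, base: float = 0.5, cap: float = 10.0) -> float:
    """Capped exponential backoff with full jitter.

    attempt 0 draws from [0, 0.5s], attempt 1 from [0, 1.0s], and so on,
    capped at 10s, so ten reconnect attempts spread out instead of
    stampeding the WebSocket endpoint after a carrier-wide drop.
    """
    return random.uniform(0, min(cap, base * (2 ** attempt)))
```

The jitter matters on mobile: without it, every client that lost its NAT entry at the same moment reconnects at the same moment.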

Prevention

  • Application-level pings every 20 seconds — Shorter than the 30-second carrier NAT timeout
  • Service worker bypass for all WebSocket URLs — prevents SW from intercepting upgrade requests
  • Proactive close/reconnect on page visibility change — avoids zombie connections when iOS suspends the tab
  • Mobile-specific CloudWatch dashboard with carrier-level success rate breakdowns
  • Real User Monitoring (RUM) via CloudWatch RUM to track actual connection success rates by device/carrier

Scenario 2: OpenAPI Spec Drift

Problem

A partner integrating with MangaAssist's REST API reports that their auto-generated TypeScript SDK is throwing type errors. The /chat response now includes a metadata.quotaRemaining field that was added by the backend team but never reflected in the OpenAPI spec. The partner's strict TypeScript types reject the response as invalid.

Detection

graph TB
    subgraph Signals["Detection Signals"]
        PARTNER[Partner Bug Report<br/>TypeScript SDK type error]
        SCHEMATHESIS[Contract Test<br/>Schemathesis found<br/>undocumented fields]
        DIFF[API Diff Check<br/>Response body mismatch]
        REVIEW[Code Review Gap<br/>Backend PR merged<br/>without spec update]
    end

    subgraph Drift["Drift Analysis"]
        D1["New field: metadata.quotaRemaining<br/>Added in backend code"]
        D2["OpenAPI spec: not updated<br/>Schema missing field"]
        D3["SDK generator: stale types<br/>Partner gets TypeScript errors"]
        D4["API Gateway: no validation<br/>Pass-through mode"]
    end

    PARTNER --> D3
    SCHEMATHESIS --> D1
    DIFF --> D2
    REVIEW --> D4

    style PARTNER fill:#dc3545,color:#fff
    style REVIEW fill:#ffc107,color:#000

Root Cause

  1. No spec-code synchronization check in the CI pipeline — backend changes could be merged without updating the OpenAPI spec
  2. API Gateway in pass-through mode — no response validation against the spec
  3. No automated spec-drift detection — Schemathesis contract tests were only run manually

Resolution

"""
MangaAssist OpenAPI Spec Drift Prevention
Automated pipeline to detect and prevent API spec drift.
"""

import json
import subprocess
import sys
import logging
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)


class SpecDriftDetector:
    """
    Detects drift between OpenAPI spec and actual API responses.
    Runs as a CI step and a periodic production check.
    """

    def __init__(self, spec_path: str, api_base_url: str):
        self.spec_path = spec_path
        self.api_base_url = api_base_url

    def run_ci_checks(self) -> dict:
        """
        CI pipeline checks to prevent spec drift.
        Returns dict with check results.
        """
        results = {}

        # Check 1: Validate spec is valid OpenAPI
        results["spec_valid"] = self._validate_spec()

        # Check 2: Check for breaking changes vs previous version
        results["breaking_changes"] = self._check_breaking_changes()

        # Check 3: Verify all API routes in code have spec entries
        results["route_coverage"] = self._check_route_coverage()

        # Check 4: Run contract tests against staging
        results["contract_tests"] = self._run_contract_tests()

        results["all_passed"] = all(
            r.get("passed", False) for r in results.values()
        )

        return results

    def _validate_spec(self) -> dict:
        """Validate OpenAPI spec with spectral."""
        result = subprocess.run(
            [
                "npx", "@stoplight/spectral-cli", "lint",
                self.spec_path,
                "--ruleset", ".spectral.yml",
            ],
            capture_output=True, text=True,
        )
        return {
            "passed": result.returncode == 0,
            "output": result.stdout,
            "errors": result.stderr,
        }

    def _check_breaking_changes(self) -> dict:
        """Check for breaking changes using oasdiff."""
        previous = self.spec_path.replace(".yaml", ".previous.yaml")
        if not Path(previous).exists():
            return {"passed": True, "message": "No previous spec to compare"}

        result = subprocess.run(
            [
                "oasdiff", "breaking",
                "--base", previous,
                "--revision", self.spec_path,
                "--format", "json",
                "--fail-on", "ERR",
            ],
            capture_output=True, text=True,
        )
        return {
            "passed": result.returncode == 0,
            "output": result.stdout,
        }

    def _check_route_coverage(self) -> dict:
        """
        Verify all Flask/FastAPI routes have corresponding OpenAPI paths.
        Scans source code for route decorators and cross-references spec.
        """
        import yaml

        with open(self.spec_path) as f:
            spec = yaml.safe_load(f)

        spec_paths = set(spec.get("paths", {}).keys())

        # Scan source for route definitions (FastAPI pattern)
        import re

        source_routes = set()
        for py_file in Path("src").rglob("*.py"):
            content = py_file.read_text()
            for match in re.finditer(
                r'@(?:app|router)\.(get|post|put|delete)\(\s*["\']([^"\']+)',
                content,
            ):
                source_routes.add(match.group(2))

        undocumented = source_routes - spec_paths
        orphaned = spec_paths - source_routes

        return {
            "passed": len(undocumented) == 0,
            "undocumented_routes": list(undocumented),
            "orphaned_spec_paths": list(orphaned),
        }

    def _run_contract_tests(self) -> dict:
        """Run Schemathesis contract tests against staging."""
        result = subprocess.run(
            [
                "schemathesis", "run",
                self.spec_path,
                "--base-url", self.api_base_url,
                "--checks", "all",
                "--max-examples", "20",
                "--hypothesis-suppress-health-check", "too_slow",
            ],
            capture_output=True, text=True, timeout=180,
        )
        return {
            "passed": result.returncode == 0,
            "output": result.stdout,
        }


def create_git_hook_script() -> str:
    """
    Git pre-commit hook that blocks commits changing API routes
    without corresponding OpenAPI spec updates.
    """
    return """#!/bin/bash
# Pre-commit hook: Check for API spec drift

SPEC_FILE="api/manga-assist-api.yaml"
API_FILES=$(git diff --cached --name-only | grep -E "^src/.*routes.*\\.py$")

if [ -n "$API_FILES" ]; then
    SPEC_CHANGED=$(git diff --cached --name-only | grep "$SPEC_FILE")

    if [ -z "$SPEC_CHANGED" ]; then
        echo "ERROR: API route files changed but OpenAPI spec was not updated."
        echo ""
        echo "Changed route files:"
        echo "$API_FILES"
        echo ""
        echo "Please update $SPEC_FILE to match your API changes."
        echo "Run 'npm run spec:validate' to check the spec."
        exit 1
    fi
fi

exit 0
"""

Prevention

  • Pre-commit hook blocks route file changes without corresponding spec updates
  • CI pipeline runs Spectral validation, oasdiff breaking-change detection, route coverage, and Schemathesis contract tests
  • Response-schema validation in the orchestrator (API Gateway validates requests only, so undocumented response fields must be caught in application code or contract tests)
  • Weekly automated spec-drift scan against production API
  • Partner SDK auto-rebuild triggered by spec changes, with breaking-change notifications
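The drift in this scenario is detectable with nothing more than a recursive diff of a live response against the documented schema's `properties`. A minimal stdlib sketch; the schema and payload shapes here are illustrative, not taken from the real spec:

```python
def find_undocumented_fields(payload: dict, schema: dict, prefix: str = "") -> list[str]:
    """Return dotted paths present in the payload but absent from the
    OpenAPI schema's `properties`, recursing into nested objects."""
    undocumented = []
    props = schema.get("properties", {})
    for key, value in payload.items():
        path = f"{prefix}{key}"
        if key not in props:
            undocumented.append(path)
        elif isinstance(value, dict):
            undocumented += find_undocumented_fields(value, props[key], path + ".")
    return undocumented

# The exact drift from this scenario: a field added in code, missing from the spec
schema = {"properties": {"text": {}, "metadata": {"properties": {"sessionId": {}}}}}
payload = {"text": "...", "metadata": {"sessionId": "abc", "quotaRemaining": 42}}
find_undocumented_fields(payload, schema)  # ["metadata.quotaRemaining"]
```

Running this against sampled production responses is what the weekly drift scan amounts to.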

Scenario 3: Prompt Flow Timeout

Problem

The MangaAssist recommendation Prompt Flow starts timing out for queries involving the "Isekai" genre. The flow has a two-node chain (classification then recommendation), and the total execution time exceeds the 30-second Bedrock Agent timeout. Users see: "申し訳ございません、推薦の生成に失敗しました" (Sorry, recommendation generation failed).

Detection

graph TB
    subgraph Signals["Detection Signals"]
        FLOW_METRIC[Prompt Flow Metrics<br/>Isekai queries: 95% timeout<br/>Other genres: 2% timeout]
        BEDROCK_LATENCY[Bedrock Latency<br/>Sonnet P95: 18s<br/>Normal P95: 6s]
        RAG_SIZE[RAG Context Size<br/>Isekai: 4,200 tokens avg<br/>Other: 1,800 tokens avg]
        FLOW_LOGS[Flow Execution Logs<br/>ClassifyIntent: 3s<br/>GenerateRec: timeout at 27s]
    end

    subgraph RootCause["Root Cause Chain"]
        R1["Isekai genre recently added<br/>2,000+ new titles indexed"]
        R2["RAG returns larger context<br/>4,200 tokens vs 1,800 avg"]
        R3["Sonnet prompt is longer<br/>More tokens = slower generation"]
        R4["Two-node chain:<br/>3s classify + 27s recommend = 30s timeout"]
    end

    FLOW_METRIC --> R1
    RAG_SIZE --> R2
    BEDROCK_LATENCY --> R3
    FLOW_LOGS --> R4

    style FLOW_METRIC fill:#dc3545,color:#fff
    style R4 fill:#ffc107,color:#000

Root Cause

  1. New Isekai genre explosion — 2,000+ titles recently indexed, making RAG retrieval return much larger context (4,200 tokens vs. 1,800 average)
  2. Two-node serial chain — Classification (3s) + Recommendation (up to 27s) approaches the 30s timeout
  3. No per-genre token limits on RAG results — The RAG retrieval returned all matching results without limiting by genre
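The failure is at bottom a budget problem: 3 seconds of classification plus up to 27 seconds of generation lands exactly on the 30-second hard timeout. The remedy is to carve the hard limit into explicit stage budgets with a buffer. A minimal consistency check, with constants mirroring those used in this scenario:

```python
HARD_LIMIT_S = 30        # Bedrock Agent hard timeout
SOFT_LIMIT_S = 25        # flow budget: hard limit minus a 5s safety buffer
CLASSIFY_BUDGET_S = 4
RECOMMEND_BUDGET_S = 20

def budgets_fit() -> bool:
    """Stage budgets must fit inside the soft limit, which itself
    leaves a safety buffer under the platform's hard timeout."""
    return (CLASSIFY_BUDGET_S + RECOMMEND_BUDGET_S <= SOFT_LIMIT_S
            and SOFT_LIMIT_S < HARD_LIMIT_S)
```

With 4 + 20 = 24 seconds of stage budget under a 25-second soft limit, the flow degrades (cache or Haiku fallback) instead of timing out.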

Resolution

"""
MangaAssist Prompt Flow Timeout Fix
RAG result limiting, flow optimization, and timeout-aware architecture.
"""

import json
import time
import logging
from typing import Optional

logger = logging.getLogger(__name__)


class OptimizedFlowInvoker:
    """
    Prompt Flow invoker with timeout awareness and RAG optimization.
    """

    # Per-genre RAG token limits
    RAG_TOKEN_LIMITS = {
        "isekai": 1500,    # Large catalog — limit strictly
        "shonen": 2000,
        "shoujo": 2000,
        "seinen": 2000,
        "josei": 1800,
        "horror": 1500,
        "sports": 1500,
        "default": 1800,
    }

    FLOW_TIMEOUT = 25  # seconds (5s buffer before 30s hard limit)
    CLASSIFY_BUDGET = 4  # seconds for classification
    RECOMMEND_BUDGET = 20  # seconds for recommendation

    def __init__(self, bedrock_agent_client, redis_client):
        self.bedrock = bedrock_agent_client
        self.redis = redis_client

    def invoke_with_timeout_awareness(
        self,
        user_message: str,
        session_context: str,
        rag_results: list[str],
        genre_hint: Optional[str] = None,
    ) -> dict:
        """
        Invoke the recommendation flow with timeout-aware optimizations.
        """
        start = time.time()

        # Step 1: If we already know the genre, skip classification
        if genre_hint:
            genre = genre_hint
            intent = "recommendation"
            classify_time = 0
        else:
            classify_start = time.time()
            classification = self._classify_with_timeout(
                user_message, session_context
            )
            classify_time = time.time() - classify_start
            genre = classification.get("genre", "unknown")
            intent = classification.get("intent", "general")

        if intent != "recommendation":
            return {"intent": intent, "genre": genre}

        # Step 2: Limit RAG results based on genre
        token_limit = self.RAG_TOKEN_LIMITS.get(
            genre, self.RAG_TOKEN_LIMITS["default"]
        )
        trimmed_rag = self._trim_rag_results(rag_results, token_limit)

        # Step 3: Calculate remaining time budget
        elapsed = time.time() - start
        remaining = self.FLOW_TIMEOUT - elapsed

        if remaining < 5:
            # Not enough time — use cached recommendation or Haiku fallback
            logger.warning(
                f"Insufficient time for recommendation: {remaining:.1f}s remaining"
            )
            return self._get_cached_or_fallback(user_message, genre)

        # Step 4: Invoke recommendation with adjusted timeout
        recommendation = self._generate_recommendation(
            user_message=user_message,
            genre=genre,
            rag_context="\n".join(trimmed_rag),
            session_context=session_context,
            timeout=remaining - 2,  # 2s buffer
        )

        total_time = time.time() - start
        logger.info(
            f"Flow completed: genre={genre}, classify={classify_time:.1f}s, "
            f"total={total_time:.1f}s"
        )

        return recommendation

    def _classify_with_timeout(
        self, message: str, context: str
    ) -> dict:
        """Classify intent with strict timeout."""
        try:
            # Use Haiku via a direct Bedrock call instead of Prompt Flow
            # for speed; the botocore read timeout enforces the
            # classification budget.
            import boto3
            from botocore.config import Config
            bedrock_runtime = boto3.client(
                "bedrock-runtime",
                region_name="ap-northeast-1",
                config=Config(
                    read_timeout=self.CLASSIFY_BUDGET,
                    retries={"max_attempts": 1},
                ),
            )

            body = json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 150,
                "messages": [{"role": "user", "content": [{"type": "text", "text": (
                    f"Classify this manga query. Respond with JSON only.\n"
                    f"Message: {message}\n"
                    f'Output: {{"intent": "recommendation|faq|general", '
                    f'"genre": "shonen|shoujo|seinen|josei|isekai|horror|sports|unknown"}}'
                )}]}],
                "temperature": 0.1,
            })

            response = bedrock_runtime.invoke_model(
                modelId="anthropic.claude-3-haiku-20240307-v1:0",
                body=body,
            )

            result = json.loads(response["body"].read())
            text = result["content"][0]["text"]
            return json.loads(text)

        except Exception as e:
            logger.error(f"Classification failed: {e}")
            return {"intent": "general", "genre": "unknown"}

    def _trim_rag_results(
        self, results: list[str], max_tokens: int
    ) -> list[str]:
        """Trim RAG results to fit within token budget."""
        trimmed = []
        total_tokens = 0

        for result in results:
            # Estimate tokens (JP text: ~1 token per 1.5 chars)
            est_tokens = len(result) // 2
            if total_tokens + est_tokens > max_tokens:
                break
            trimmed.append(result)
            total_tokens += est_tokens

        if len(trimmed) < len(results):
            logger.info(
                f"RAG trimmed: {len(results)} -> {len(trimmed)} results, "
                f"~{total_tokens} tokens (limit: {max_tokens})"
            )

        return trimmed

    def _generate_recommendation(
        self, user_message: str, genre: str, rag_context: str,
        session_context: str, timeout: float,
    ) -> dict:
        """Generate recommendation with timeout."""
        # Elided: invokes Bedrock with a client-side read timeout set to
        # the remaining budget
        raise NotImplementedError

    def _get_cached_or_fallback(self, message: str, genre: str) -> dict:
        """Return cached recommendation or quick Haiku fallback."""
        # Check cache for similar recent queries. Use a stable digest:
        # built-in hash() is salted per process, so its keys would never
        # match across containers.
        import hashlib
        digest = hashlib.sha256(message.encode()).hexdigest()[:12]
        cache_key = f"rec_cache:{genre}:{digest}"
        cached = self.redis.get(cache_key)
        if cached:
            return {"text": cached, "cached": True}

        # Quick Haiku fallback
        return {
            "text": f"「{genre}」ジャンルのおすすめ作品をお探しですね。"
                    "少々お待ちください。詳しいおすすめは次のメッセージでお伝えします。",
            "fallback": True,
        }

Prevention

  • Per-genre RAG token limits — cap context size per genre to prevent prompt bloat
  • Time-budget-aware flow execution — track elapsed time and adapt strategy (skip classification if genre is known, use Haiku fallback if time is short)
  • Direct Bedrock call for classification instead of Prompt Flow — saves ~2 seconds of flow orchestration overhead
  • Cached recommendations for popular queries in Redis with 5-minute TTL
  • Genre-specific monitoring dashboard — CloudWatch alarm when any genre's P95 latency exceeds 15 seconds

Scenario 4: Stale Cached Response

Problem

Users report receiving outdated manga availability information. A title that went out of stock 2 hours ago still shows as "在庫あり" (In stock) in the chatbot's responses. Investigation reveals that Redis is serving cached Bedrock responses that reference stale product data, and the CloudFront CDN is also caching API responses due to a misconfigured Cache-Control header.

Detection

graph TB
    subgraph Symptoms["User Symptoms"]
        S1["Chat says 在庫あり<br/>but product page shows 在庫なし"]
        S2["Prices in chat response<br/>differ from catalog"]
        S3["New releases not appearing<br/>in recommendations"]
    end

    subgraph CacheStack["Cache Layer Analysis"]
        CDN[CloudFront CDN<br/>Cache-Control: max-age=3600<br/>on /chat endpoint ERROR]
        REDIS_RESP[Redis Response Cache<br/>TTL: 1 hour for chat responses<br/>Not invalidated on product change]
        REDIS_RAG[Redis RAG Cache<br/>OpenSearch results cached 30min<br/>Not invalidated on reindex]
        DYNAMO_CACHE[DynamoDB DAX<br/>Item cache TTL: 5 min<br/>Query cache TTL: 1 min]
    end

    S1 --> CDN
    S1 --> REDIS_RESP
    S2 --> REDIS_RAG
    S3 --> REDIS_RAG

    style CDN fill:#dc3545,color:#fff
    style REDIS_RESP fill:#ffc107,color:#000

Root Cause

  1. CloudFront caching API responses — A misconfigured Cache-Control header (max-age=3600 instead of no-cache) on the chat endpoint caused CloudFront to cache dynamic responses for 1 hour
  2. Redis response cache not inventory-aware — Chat responses were cached with a flat 1-hour TTL, regardless of whether they referenced product inventory
  3. RAG cache not invalidated on product updates — OpenSearch query results were cached for 30 minutes and not invalidated when products were updated or delisted
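Why the window reached roughly two hours: independent cache layers compound, because CloudFront can keep serving a response for its full TTL even when that response was itself built from a Redis entry cached almost a full TTL earlier. A quick worst-case check:

```python
def worst_case_staleness_s(layer_ttls_s: list[int]) -> int:
    """Worst-case age of data served to a user through stacked caches:
    each layer can hold the previous layer's oldest output for its
    entire TTL, so the ages add up."""
    return sum(layer_ttls_s)

# CloudFront max-age=3600 stacked on a 1-hour Redis response cache:
worst_case_staleness_s([3600, 3600])  # 7200 s, i.e. the ~2 hours users reported
```

This is why fixing only one layer (Redis or CDN) would not have closed the incident.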

Resolution

"""
MangaAssist Cache Invalidation Strategy
Multi-layer cache management with event-driven invalidation.
"""

import json
import time
import hashlib
import logging
from typing import Optional

import boto3

logger = logging.getLogger(__name__)


class CacheManager:
    """
    Multi-layer cache manager with event-driven invalidation.
    Handles Redis response cache, RAG cache, and CloudFront.
    """

    def __init__(self, redis_client, cloudfront_client=None):
        self.redis = redis_client
        self.cloudfront = cloudfront_client or boto3.client("cloudfront")
        self.distribution_id = "E1234567890ABC"

    def cache_chat_response(
        self,
        session_id: str,
        message: str,
        response_text: str,
        referenced_products: list[str],
        ttl: int = 300,  # 5 minutes, not 1 hour
    ) -> None:
        """
        Cache a chat response with product-aware metadata.
        Short TTL (5 min) and product dependency tracking.
        """
        cache_key = self._make_cache_key(session_id, message)

        cache_entry = {
            "text": response_text,
            "referenced_products": referenced_products,
            "cached_at": int(time.time()),
        }

        # Store response with short TTL
        self.redis.setex(
            f"chat_cache:{cache_key}",
            ttl,
            json.dumps(cache_entry, ensure_ascii=False),
        )

        # Track which cache entries reference which products
        # This enables targeted invalidation when products change
        for product_id in referenced_products:
            self.redis.sadd(f"product_cache_refs:{product_id}", cache_key)
            self.redis.expire(f"product_cache_refs:{product_id}", ttl + 60)

    def get_cached_response(
        self, session_id: str, message: str
    ) -> Optional[str]:
        """Get cached response if still valid."""
        cache_key = self._make_cache_key(session_id, message)
        cached = self.redis.get(f"chat_cache:{cache_key}")

        if not cached:
            return None

        entry = json.loads(cached)

        # Verify referenced products haven't been invalidated
        for product_id in entry.get("referenced_products", []):
            if self.redis.get(f"product_invalidated:{product_id}"):
                # Product changed — cache is stale
                self.redis.delete(f"chat_cache:{cache_key}")
                logger.info(
                    f"Cache invalidated: product {product_id} changed"
                )
                return None

        return entry["text"]

    def invalidate_product_caches(self, product_id: str) -> int:
        """
        Invalidate all caches referencing a specific product.
        Called by the product update event handler.
        """
        # Mark product as recently changed
        self.redis.setex(
            f"product_invalidated:{product_id}", 600, "1"
        )

        # Find and delete all chat cache entries referencing this product
        cache_refs = self.redis.smembers(f"product_cache_refs:{product_id}")
        deleted = 0
        for cache_key in cache_refs:
            if self.redis.delete(f"chat_cache:{cache_key}"):
                deleted += 1

        self.redis.delete(f"product_cache_refs:{product_id}")

        # Also invalidate RAG cache for this product
        self.redis.delete(f"rag_cache:product:{product_id}")

        logger.info(
            f"Product {product_id} cache invalidation: "
            f"{deleted} chat caches, 1 RAG cache"
        )
        return deleted

    def invalidate_cloudfront_paths(self, paths: list[str]) -> str:
        """Create CloudFront invalidation for specific paths."""
        response = self.cloudfront.create_invalidation(
            DistributionId=self.distribution_id,
            InvalidationBatch={
                "Paths": {
                    "Quantity": len(paths),
                    "Items": paths,
                },
                "CallerReference": f"manga-{int(time.time())}",
            },
        )
        invalidation_id = response["Invalidation"]["Id"]
        logger.info(f"CloudFront invalidation created: {invalidation_id}")
        return invalidation_id

    def _make_cache_key(self, session_id: str, message: str) -> str:
        """Generate deterministic cache key."""
        return hashlib.sha256(
            f"{session_id}:{message}".encode()
        ).hexdigest()[:16]


def handle_product_update_event(event: dict, context) -> dict:
    """
    DynamoDB Stream handler for product table updates.
    Triggers cache invalidation when products change.
    """
    import redis
    r = redis.Redis(
        host="manga-assist-cache.xxxxx.apne1.cache.amazonaws.com",
        port=6379, decode_responses=True,
    )
    cache_mgr = CacheManager(r)

    invalidated = 0

    for record in event.get("Records", []):
        if record["eventName"] in ("MODIFY", "REMOVE"):
            product_id = record["dynamodb"]["Keys"]["productId"]["S"]
            invalidated += cache_mgr.invalidate_product_caches(product_id)

    # A DynamoDB Stream handler does not return an HTTP response; the
    # Cache-Control: no-store header is enforced on the chat API itself.
    return {"invalidated": invalidated}


def fix_cloudfront_cache_policy() -> dict:
    """
    Fix CloudFront cache behavior for the chat API.
    Chat API responses must NEVER be cached at the CDN layer.
    """
    cloudfront = boto3.client("cloudfront")

    # Create a no-cache policy for API endpoints
    policy = cloudfront.create_cache_policy(
        CachePolicyConfig={
            "Name": "MangaAssist-API-NoCache",
            "DefaultTTL": 0,
            "MaxTTL": 0,
            "MinTTL": 0,
            "ParametersInCacheKeyAndForwardedToOrigin": {
                "EnableAcceptEncodingGzip": True,
                "EnableAcceptEncodingBrotli": True,
                "HeadersConfig": {"HeaderBehavior": "none"},
                "CookiesConfig": {"CookieBehavior": "none"},
                "QueryStringsConfig": {"QueryStringBehavior": "none"},
            },
        }
    )

    # The policy must still be attached to the chat-API cache behavior via
    # get_distribution_config / update_distribution (an ETag-guarded
    # read-modify-write, omitted here).
    return {"policy_id": policy["CachePolicy"]["Id"]}

Prevention

  • CloudFront cache policy with TTL=0 for all API endpoints — dynamic responses must never be CDN-cached
  • Product-aware cache keys — chat responses track which products they reference for targeted invalidation
  • DynamoDB Streams trigger cache invalidation Lambda on product updates
  • Short default TTL (5 minutes, not 1 hour) for all chat response caches
  • Cache-Control: no-store header enforced on all chat API responses
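The product-aware key scheme from the bullets above can be sketched without Redis. This in-memory stand-in (class name and product IDs are hypothetical; the production path uses the Redis-backed CacheManager shown earlier) demonstrates the 300-second TTL and the per-product index that makes targeted invalidation possible:

```python
import hashlib
import time


class ProductAwareCache:
    """In-memory sketch: chat responses expire after 300 seconds, and each
    cache key is indexed by the product IDs the response references, so a
    product update invalidates exactly the affected entries."""

    TTL_SECONDS = 300  # 5 minutes, not 1 hour

    def __init__(self):
        self._store = {}          # key -> (expires_at, response)
        self._product_index = {}  # product_id -> set of cache keys

    def put(self, session_id, message, response, product_ids):
        # Same deterministic key derivation as CacheManager._make_cache_key
        key = hashlib.sha256(f"{session_id}:{message}".encode()).hexdigest()[:16]
        self._store[key] = (time.time() + self.TTL_SECONDS, response)
        for pid in product_ids:
            self._product_index.setdefault(pid, set()).add(key)
        return key

    def get(self, key):
        entry = self._store.get(key)
        if entry is None or entry[0] < time.time():
            return None  # missing, or past its TTL
        return entry[1]

    def invalidate_product(self, product_id):
        # Drop every cached response that mentioned this product
        keys = self._product_index.pop(product_id, set())
        for k in keys:
            self._store.pop(k, None)
        return len(keys)
```

In production, handle_product_update_event plays the role of invalidate_product, driven by the DynamoDB Stream on the product table.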

Scenario 5: Unexpected Prompt Chains

Problem

The MangaAssist Prompt Flow for complex recommendations starts producing extremely long, expensive responses. A condition node bug causes the flow to loop through the recommendation node three times instead of once, and because each pass re-feeds the accumulated context, token usage and cost grow far faster than 3x. One user's query generates $0.45 in Bedrock costs (normally ~$0.015) and takes 45 seconds.
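A back-of-envelope sketch shows why the loop compounds instead of merely tripling cost. The token counts below are assumptions for illustration (Sonnet at $3/$15 per 1M input/output tokens, per the pricing above); exact figures depend on the real node prompts and retrieval context:

```python
SONNET_IN, SONNET_OUT = 3.00, 15.00  # USD per 1M tokens

def pass_cost(input_tokens: int, output_tokens: int) -> float:
    return input_tokens * SONNET_IN / 1e6 + output_tokens * SONNET_OUT / 1e6

# Normal single pass: ~3,000 tokens of context in, ~400 out
normal = pass_cost(3_000, 400)

# Looped flow: three passes, each re-feeding the accumulated context plus
# the prior passes' (assumed) ~4,000-token outputs
looped = sum(pass_cost(3_000 * (i + 1) + 4_000 * i, 4_000) for i in range(3))

print(f"normal ${normal:.3f}, looped ${looped:.2f}")
```

With these assumed numbers the looped flow lands around $0.27, an ~18x blowup; the remaining gap to the observed $0.45 would come from larger real prompts and retrieval context per pass.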

Detection

graph TB
    subgraph Signals["Detection Signals"]
        TOKEN_SPIKE[Token Usage Metric<br/>Single request: 12,000 output tokens<br/>Normal: 400 output tokens]
        COST_ALERT[Cost Alert<br/>$0.45 per request<br/>Normal: $0.015]
        LATENCY[Latency Spike<br/>45 seconds<br/>Normal: 3 seconds]
        FLOW_TRACE[Flow Execution Trace<br/>GenerateRec node invoked 3x]
    end

    subgraph Bug["Root Cause: Condition Bug"]
        COND[Condition Node<br/>$.genre != 'unknown']
        LOOP["Always true after first pass<br/>Genre is set to 'isekai'<br/>Loop: Recommend → Classify → Recommend"]
        EDGE["Edge configuration<br/>Missing exit condition<br/>Default routes back to start"]
    end

    TOKEN_SPIKE --> COND
    COST_ALERT --> LOOP
    FLOW_TRACE --> EDGE

    style TOKEN_SPIKE fill:#dc3545,color:#fff
    style LOOP fill:#ffc107,color:#000

Root Cause

  1. Condition node default branch pointed back to the classification node instead of the output node
  2. No loop detection in the flow execution — Bedrock Prompt Flows allow cycles if configured
  3. No per-flow token budget — the flow could consume unlimited tokens across multiple node invocations

Resolution

"""
MangaAssist Prompt Flow Safety Guards
Prevents runaway prompt chains, loops, and token budget explosions.
"""

import json
import time
import logging
from dataclasses import dataclass
from typing import Optional

import boto3

logger = logging.getLogger(__name__)


@dataclass
class FlowSafetyConfig:
    """Safety limits for Prompt Flow execution."""
    max_node_invocations: int = 5
    max_total_tokens: int = 5000
    max_execution_time_seconds: float = 20.0
    max_cost_per_flow_usd: float = 0.10
    enable_loop_detection: bool = True


class SafeFlowInvoker:
    """
    Invoke Prompt Flows with safety guards against loops and runaway costs.
    """

    def __init__(
        self,
        bedrock_agent_client,
        redis_client,
        config: Optional[FlowSafetyConfig] = None,
    ):
        self.bedrock = bedrock_agent_client
        self.redis = redis_client
        self.config = config or FlowSafetyConfig()

    def invoke_flow_safe(
        self,
        flow_id: str,
        alias_id: str,
        inputs: list[dict],
        request_id: str,
    ) -> dict:
        """
        Invoke a Prompt Flow with safety guards.
        Monitors token usage, execution time, and node invocation count.
        """
        start = time.time()
        node_invocations = 0
        total_tokens = 0
        total_cost = 0.0
        visited_nodes = []

        try:
            response = self.bedrock.invoke_flow(
                flowIdentifier=flow_id,
                flowAliasIdentifier=alias_id,
                inputs=inputs,
            )

            result_parts = []

            for event in response.get("responseStream", []):
                # Time guard
                elapsed = time.time() - start
                if elapsed > self.config.max_execution_time_seconds:
                    logger.error(
                        f"Flow timeout: {elapsed:.1f}s > "
                        f"{self.config.max_execution_time_seconds}s"
                    )
                    self._record_safety_violation(
                        request_id, "timeout", elapsed
                    )
                    break

                if "flowTraceEvent" in event:
                    trace = event["flowTraceEvent"]
                    node_name = trace.get("nodeName", "unknown")
                    visited_nodes.append(node_name)
                    node_invocations += 1

                    # Node invocation guard
                    if node_invocations > self.config.max_node_invocations:
                        logger.error(
                            f"Max node invocations exceeded: "
                            f"{node_invocations} > {self.config.max_node_invocations}. "
                            f"Visited: {visited_nodes}"
                        )
                        self._record_safety_violation(
                            request_id, "max_nodes", node_invocations
                        )
                        break

                    # Loop detection
                    if self.config.enable_loop_detection:
                        if self._detect_loop(visited_nodes):
                            logger.error(
                                f"Loop detected in flow: {visited_nodes}"
                            )
                            self._record_safety_violation(
                                request_id, "loop", visited_nodes
                            )
                            break

                    # Track token usage and estimated cost from trace events
                    usage = trace.get("usage", {})
                    tokens = usage.get("output_tokens", 0)
                    total_tokens += tokens
                    # Worst-case estimate: Sonnet output pricing, $15 / 1M tokens
                    total_cost += tokens * 15.0 / 1_000_000

                    if total_tokens > self.config.max_total_tokens:
                        logger.error(
                            f"Token budget exceeded: {total_tokens} > "
                            f"{self.config.max_total_tokens}"
                        )
                        self._record_safety_violation(
                            request_id, "token_budget", total_tokens
                        )
                        break

                    # Cost guard backing FlowSafetyConfig.max_cost_per_flow_usd
                    if total_cost > self.config.max_cost_per_flow_usd:
                        logger.error(
                            f"Cost budget exceeded: ${total_cost:.4f} > "
                            f"${self.config.max_cost_per_flow_usd:.2f}"
                        )
                        self._record_safety_violation(
                            request_id, "cost_budget", round(total_cost, 4)
                        )
                        break

                elif "flowOutputEvent" in event:
                    output = event["flowOutputEvent"]
                    content = output.get("content", {})
                    if "document" in content:
                        result_parts.append(str(content["document"]))

                elif "flowCompletionEvent" in event:
                    pass  # Normal completion

            elapsed = time.time() - start

            return {
                "text": "".join(result_parts),
                "metrics": {
                    "node_invocations": node_invocations,
                    "total_tokens": total_tokens,
                    "execution_time_seconds": round(elapsed, 2),
                    "visited_nodes": visited_nodes,
                },
            }

        except Exception as e:
            logger.error(f"Flow invocation failed: {e}")
            raise

    def _detect_loop(self, visited: list[str]) -> bool:
        """
        Detect loops in node visitation pattern.
        A loop is detected if a sequence of 2+ nodes repeats.
        """
        if len(visited) < 4:
            return False

        # Check for repeated pairs (A→B→A→B)
        for pattern_len in range(2, len(visited) // 2 + 1):
            pattern = visited[-pattern_len:]
            preceding = visited[-(pattern_len * 2):-pattern_len]
            if pattern == preceding:
                return True

        # Check for single-node repetition (A→A→A)
        if len(visited) >= 3:
            last_three = visited[-3:]
            if len(set(last_three)) == 1:
                return True

        return False

    def _record_safety_violation(
        self, request_id: str, violation_type: str, details
    ) -> None:
        """Record safety violation for monitoring and alerting."""
        violation = {
            "request_id": request_id,
            "type": violation_type,
            "details": str(details),
            "timestamp": int(time.time()),
        }

        # Store in Redis for dashboard
        self.redis.lpush(
            "flow_safety_violations",
            json.dumps(violation),
        )
        self.redis.ltrim("flow_safety_violations", 0, 999)

        # Increment violation counter for alerting
        self.redis.incr(f"safety_violations:{violation_type}")
        self.redis.expire(f"safety_violations:{violation_type}", 3600)


def validate_flow_definition(flow_def: dict) -> list[str]:
    """
    Static analysis of a flow definition to detect potential issues.
    Run before deploying a new flow version.
    """
    issues = []
    nodes = {n["name"]: n for n in flow_def.get("nodes", [])}
    connections = flow_def.get("connections", [])

    # Build adjacency list
    adj = {}
    for conn in connections:
        src = conn["source"]
        tgt = conn["target"]
        adj.setdefault(src, []).append(tgt)

    # Check 1: Detect cycles using DFS
    def has_cycle(node, visited, rec_stack):
        visited.add(node)
        rec_stack.add(node)
        for neighbor in adj.get(node, []):
            if neighbor not in visited:
                if has_cycle(neighbor, visited, rec_stack):
                    return True
            elif neighbor in rec_stack:
                return True
        rec_stack.discard(node)
        return False

    visited = set()
    for node_name in nodes:
        if node_name not in visited:
            if has_cycle(node_name, visited, set()):
                issues.append(
                    f"CRITICAL: Cycle detected in flow graph involving node '{node_name}'"
                )

    # Check 2: Verify all condition branches have targets
    for node in flow_def.get("nodes", []):
        if node.get("type") == "Condition":
            outputs = {o["name"] for o in node.get("outputs", [])}
            connected = {
                conn["sourceOutput"]
                for conn in connections
                if conn["source"] == node["name"]
            }
            missing = outputs - connected
            if missing:
                issues.append(
                    f"WARNING: Condition node '{node['name']}' has unconnected "
                    f"branches: {missing}"
                )

    # Check 3: Verify every Output node has at least one incoming connection
    # (necessary for reachability, though not a full path check from the input)
    output_nodes = [
        n["name"] for n in flow_def.get("nodes", []) if n.get("type") == "Output"
    ]
    for out in output_nodes:
        incoming = [c for c in connections if c["target"] == out]
        if not incoming:
            issues.append(
                f"ERROR: Output node '{out}' has no incoming connections"
            )

    return issues
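As a sanity check on Check 1, the cycle-detecting DFS can be exercised standalone against the buggy flow shape from this incident. This is a condensed, self-contained version of the validator's check; the node names are hypothetical:

```python
# Flow graph with the incident's bug: the condition node's default branch
# routes back to the classification node instead of the output node.
buggy = {
    "FlowInput": ["ClassifyGenre"],
    "ClassifyGenre": ["GenreCondition"],
    "GenreCondition": ["GenerateRec"],
    "GenerateRec": ["ClassifyGenre"],  # bug: should target FlowOutput
}

def find_cycle(adj: dict) -> bool:
    """DFS with a recursion stack, as in validate_flow_definition."""
    visited, stack = set(), set()

    def dfs(node):
        visited.add(node)
        stack.add(node)
        for nxt in adj.get(node, []):
            if nxt not in visited and dfs(nxt):
                return True
            if nxt in stack:  # back edge: cycle
                return True
        stack.discard(node)
        return False

    return any(dfs(n) for n in list(adj) if n not in visited)

print(find_cycle(buggy))  # True: the validator would flag this as CRITICAL
```

Repointing GenerateRec at the output node makes the same check pass, which is exactly the diff the CI gate should have demanded.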

Prevention

  • Static analysis of flow definitions before deployment — detect cycles, unconnected branches, and unreachable outputs
  • Runtime loop detection — pattern matching on node visitation sequence (A-B-A-B detection)
  • Per-flow token budget — hard limit of 5,000 tokens across all nodes in a single invocation
  • Max node invocation count — stop execution after 5 node visits
  • Safety violation dashboard with CloudWatch alarm on violation rate > 0
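The runtime side of the bullets above can also be exercised standalone. This condensed copy of the SafeFlowInvoker._detect_loop logic shows the A-B-A-B pattern match firing on the incident's visitation sequence:

```python
def detect_loop(visited: list) -> bool:
    """Condensed _detect_loop: flag a repeating tail sequence (A-B-A-B)
    or a single node repeated three times in a row (needs 4+ visits)."""
    if len(visited) < 4:
        return False
    # Repeated pattern of length 2+ at the tail of the sequence
    for n in range(2, len(visited) // 2 + 1):
        if visited[-n:] == visited[-2 * n:-n]:
            return True
    # Same node three times in a row
    return len(set(visited[-3:])) == 1

print(detect_loop(["ClassifyGenre", "GenerateRec", "ClassifyGenre", "GenerateRec"]))  # True
print(detect_loop(["FlowInput", "ClassifyGenre", "GenerateRec", "FlowOutput"]))       # False
```

The guard fires on the fourth node visit, so at most one wasted loop iteration runs before the flow is cut off.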

Key Takeaways

| # | Takeaway | MangaAssist Application |
|---|----------|-------------------------|
| 1 | Mobile WebSocket needs carrier-aware keepalive — Japanese carrier NAT tables time out at 30 seconds; application-level pings at 20-second intervals are required. | MangaAssist mobile users on Docomo, au, and SoftBank maintain stable connections through aggressive keepalive. |
| 2 | Spec drift is a process failure, not a code failure — pre-commit hooks and CI checks that block route changes without spec updates prevent drift at the source. | Partners auto-regenerate SDKs on spec changes; breaking changes require manual API team approval before merge. |
| 3 | Prompt Flow timeouts need time-budget awareness — track elapsed time across flow nodes and adapt strategy (skip steps, use fallback models) as the budget shrinks. | Isekai genre queries skip the classification node when genre is known, saving 3 seconds of the 25-second budget. |
| 4 | Dynamic API responses must never be CDN-cached — CloudFront must honor Cache-Control: no-store on chat endpoints; only static assets should be cached. | A single misconfigured header caused 2 hours of stale inventory data in chat responses for all users. |
| 5 | Prompt Flows need static analysis + runtime guards — cycle detection before deployment plus loop detection and token budgets during execution prevent runaway costs. | The flow validator runs in CI; the runtime guard has a hard stop at 5 node invocations and 5,000 tokens per flow execution. |