
Scenarios and Runbooks — Safeguarded AI Workflows

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

| Dimension | Detail |
| --- | --- |
| Certification | AWS AIF-C01 — AI Practitioner |
| Domain | 2 — Development and Implementation of GenAI Solutions |
| Task | 2.1 — Develop agentic AI solutions using AWS services |
| Skill | 2.1.3 — Define safeguards within AI workflows (for example, stopping conditions, timeout mechanisms, IAM boundaries, circuit breakers, Guardrails) |
| This File | Five production scenarios with detection flowcharts, root cause analysis, resolution code, and prevention strategies for safeguard failures |

Skill Scope Statement

This file presents five failure scenarios for the safeguard systems in MangaAssist: circuit breaker misbehavior, Lambda timeout edge cases, IAM policy misconfiguration, Step Functions infinite loops, and Bedrock Guardrail false positives. Each scenario includes the problem, a mermaid detection flowchart, root cause analysis, Python resolution code, and prevention measures.


Mind Map — Safeguard Failure Modes

mindmap
  root((Safeguard<br/>Failure Modes))
    Circuit Breaker Issues
      Stuck Open After Throttling
      Premature Opening
      State Corruption
    Lambda Timeout
      Partial Response on Timeout
      Cold Start Exceeding Budget
      Orphaned Bedrock Call
    IAM Boundaries
      Too Restrictive Blocking Model
      Missing Condition Key
      Cross-Account Access Denied
    Step Functions
      Infinite Loop via Choice State
      Heartbeat Timeout Mismatch
      Retry Multiplication
    Guardrails
      False Positive Blocking Manga Content
      Grounding Score Too Strict
      PII Detection Over-Triggering

Scenario Overview

| # | Scenario | Severity | Blast Radius | Typical Detection Time |
| --- | --- | --- | --- | --- |
| 1 | Circuit breaker stuck open after Bedrock throttling resolves | P1 — Critical | All requests get fallback (no AI responses) | 1-5 minutes via fallback rate alarm |
| 2 | Lambda timeout returns partial response to user | P2 — High | Single user sees truncated/broken response | Immediate for user; minutes for monitoring |
| 3 | IAM policy too restrictive — blocks Bedrock model access after deployment | P1 — Critical | All AI features broken | Immediate on first request |
| 4 | Step Functions infinite loop via misconfigured Choice state | P1 — Critical | Runaway cost ($3-5/hour per stuck execution) | 60 seconds (workflow timeout) |
| 5 | Guardrail false positive blocks legitimate manga content queries | P3 — Medium | Users cannot discuss certain manga topics | Post-hoc via user complaints |

Scenario 1: Circuit Breaker Stuck Open After Throttling Resolves

Problem

During a Bedrock API traffic spike, the bedrock_sonnet circuit breaker opens after 5 ThrottlingException errors in 60 seconds. The throttling resolves after 2 minutes when Bedrock scales up, but the circuit remains OPEN: the Redis TTL for the circuit state is 24 hours, and the first half-open probe fails due to a coincidental network blip. The circuit waits another 60 seconds and re-probes after the blip has cleared, but a bug in the recovery logic resets recovery_stage to 0 on every half-open failure, so each recovery attempt depends on a single, fragile probe.

Detection

flowchart TD
    A["CloudWatch Alarm:<br/>fallback_response_rate > 80%<br/>for > 5 minutes"] --> B{"Check circuit<br/>breaker state"}
    B -->|"State = OPEN"| C["Circuit Stuck Open<br/>Confirmed"]
    B -->|"State = CLOSED"| D["Different issue —<br/>check Bedrock directly"]
    C --> E{"Is Bedrock actually<br/>available? (manual probe)"}
    E -->|"Yes — Bedrock responds<br/>normally"| F["ROOT CAUSE:<br/>Circuit stuck open despite<br/>service recovery"]
    E -->|"No — Bedrock still<br/>failing"| G["Correct behavior —<br/>wait for Bedrock recovery"]
    F --> H{"Check half-open<br/>probe history"}
    H -->|"Probes attempted but<br/>coincidentally failed"| I["ROOT CAUSE:<br/>Fragile single-probe recovery<br/>fails on any transient error"]
    I --> J["Execute Runbook 1"]

Root Cause

The half-open state allows only 1 probe request at recovery stage 0. If this single probe fails for any reason (transient network issue, brief latency spike, DNS hiccup), the circuit immediately reverts to OPEN and starts the 60-second countdown again. With a single-probe stage, the probability of false failure is high (~5% per probe). Over multiple cycles, the circuit can stay open for 10+ minutes despite the downstream service being healthy.
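The fragility claim can be quantified. The sketch below assumes each probe independently has a ~5% chance of a transient false failure; requiring 3 probes and tolerating 1 failure drops the false-trip probability per half-open attempt from 5% to well under 1%:

```python
from math import comb

def stage_failure_prob(p_single: float, probes: int, tolerated: int) -> float:
    """Probability a half-open stage falsely fails: more than
    `tolerated` out of `probes` independent probes fail."""
    return sum(
        comb(probes, k) * p_single**k * (1 - p_single) ** (probes - k)
        for k in range(tolerated + 1, probes + 1)
    )

old = stage_failure_prob(0.05, probes=1, tolerated=0)  # 0.05 (5%)
new = stage_failure_prob(0.05, probes=3, tolerated=1)  # 0.00725 (~0.7%)
```

With repeated 60-second OPEN cycles, a 5% per-cycle false-failure rate compounds into the multi-minute stuck-open behavior described above, while ~0.7% makes repeated false trips unlikely.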

Resolution

"""
Runbook 1: Fix stuck-open circuit breaker with improved recovery logic.
Two fixes: (1) multi-probe minimum even at stage 0, (2) manual override.
"""

import time
import json
import logging
import redis

logger = logging.getLogger("manga_cb_stuck_open")


class CircuitBreakerRecoveryFix:
    """
    Fixes the fragile single-probe recovery problem.

    Changes:
    1. Minimum 3 probes even at stage 0 (was 1)
    2. Tolerate 1 failure per stage (was 0)
    3. Add manual override capability for on-call engineers
    4. Add health check endpoint that bypasses the circuit
    """

    REDIS_PREFIX = "manga:cb:"

    def __init__(self, redis_client: redis.Redis):
        self._redis = redis_client

    def force_close_circuit(self, service_name: str, reason: str) -> dict:
        """
        Manual override: force a circuit breaker to CLOSED state.
        Used by on-call engineers when the circuit is stuck open
        but the service is confirmed healthy.
        """
        key = f"{self.REDIS_PREFIX}{service_name}"

        old_state = self._redis.hgetall(key)
        old_state_str = {
            k.decode(): v.decode()
            for k, v in old_state.items()
        } if old_state else {}

        # Force to closed
        new_state = {
            "state": "CLOSED",
            "failure_count": "0",
            "success_count": "0",
            "total_requests": "0",
            "last_failure": "0",
            "last_state_change": str(time.time()),
            "recovery_stage": "0",
            "probes_in_flight": "0",
        }

        self._redis.hset(key, mapping=new_state)
        self._redis.expire(key, 86400)

        # Log the override with audit trail
        audit_key = f"{self.REDIS_PREFIX}audit:{service_name}:{int(time.time())}"
        self._redis.setex(audit_key, 86400 * 7, json.dumps({
            "action": "FORCE_CLOSE",
            "reason": reason,
            "previous_state": old_state_str,
            "new_state": new_state,
            "timestamp": time.time(),
        }))

        logger.warning(
            "Circuit [%s] FORCE CLOSED by operator: %s (was %s)",
            service_name,
            reason,
            old_state_str.get("state", "UNKNOWN"),
        )

        return {
            "service": service_name,
            "action": "FORCE_CLOSE",
            "previous_state": old_state_str.get("state", "UNKNOWN"),
            "new_state": "CLOSED",
        }

    def health_check_bypass(
        self, service_name: str, check_fn
    ) -> dict:
        """
        Run a health check that bypasses the circuit breaker.
        If the service is healthy, force the circuit closed.
        """
        try:
            start = time.monotonic()
            result = check_fn()
            latency_ms = (time.monotonic() - start) * 1000

            if result.get("healthy", False):
                self.force_close_circuit(
                    service_name,
                    f"Health check passed (latency={latency_ms:.0f}ms)",
                )
                return {
                    "healthy": True,
                    "action": "CIRCUIT_FORCE_CLOSED",
                    "latency_ms": latency_ms,
                }
            else:
                return {
                    "healthy": False,
                    "action": "NONE",
                    "reason": result.get("reason", "Health check failed"),
                }

        except Exception as e:
            return {
                "healthy": False,
                "action": "NONE",
                "reason": str(e),
            }


def create_improved_recovery_config():
    """
    Improved recovery stages that are more resilient to transient failures.

    Old: [1, 3, 10] with 0 failure tolerance
    New: [3, 5, 15] with 1 failure tolerance per stage
    """
    return {
        "progressive_recovery_stages": [3, 5, 15],
        "failures_tolerated_per_stage": 1,
        "probe_timeout_seconds": 10,
        "stage_evaluation_window_seconds": 30,
    }

Prevention

  • Minimum 3 probes at stage 0 — a single probe is too fragile. Requiring 3 probes and tolerating 1 failure makes recovery robust against transient issues.
  • Add a health check endpoint that bypasses the circuit breaker and tests the downstream service directly. Run it every 30 seconds when a circuit is OPEN.
  • Provide a manual override for on-call engineers with an audit trail. Sometimes the fastest resolution is human intervention.
  • Set a maximum OPEN duration — if a circuit has been open for 10 minutes, automatically transition to half-open regardless of probe results.
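The maximum-OPEN-duration rule from the last bullet can be sketched as a small check run by the 30-second health check loop. This is a sketch assuming the decoded Redis hash layout used in Runbook 1 (string-valued "state" and "last_state_change" fields):

```python
import time

MAX_OPEN_SECONDS = 600  # auto-transition to HALF_OPEN after 10 minutes

def should_force_half_open(state: dict, now=None) -> bool:
    """Return True when an OPEN circuit has exceeded the maximum OPEN
    duration and should transition to HALF_OPEN regardless of probes.

    `state` is the decoded circuit-breaker hash from Redis.
    """
    if state.get("state") != "OPEN":
        return False
    if now is None:
        now = time.time()
    opened_at = float(state.get("last_state_change", now))
    return (now - opened_at) >= MAX_OPEN_SECONDS
```

The `now` parameter exists only to make the check testable; in production the caller omits it.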

Scenario 2: Lambda Timeout Returns Partial Response

Problem

The Bedrock invocation Lambda has a 30-second timeout. A complex recommendation query causes Claude 3 Sonnet to generate a long response. The model finishes generating after 28 seconds, but the Lambda runs out of time during the response saving phase (DynamoDB write). The Lambda returns a partial result: the user sees a truncated answer that cuts off mid-sentence, and the session state is not updated, causing context loss on the next turn.

Detection

flowchart TD
    A["User reports:<br/>'Response cut off mid-sentence'"] --> B["Check Lambda<br/>CloudWatch logs"]
    B --> C{"Lambda execution<br/>duration close to timeout?"}
    C -->|"Duration > 28s<br/>(timeout = 30s)"| D["Lambda Timeout<br/>During Post-Processing"]
    C -->|"Duration < 25s"| E["Different issue —<br/>check response formatting"]
    D --> F{"Check if session<br/>state was saved"}
    F -->|"No DynamoDB write<br/>in logs"| G["ROOT CAUSE:<br/>Lambda timed out after model<br/>response but before session save"]
    G --> H["Execute Runbook 2"]

Root Cause

The Lambda handler has four phases: (1) load context, (2) build prompt, (3) invoke Bedrock, (4) save results. The Bedrock invocation in Phase 3 takes 28 seconds for a complex query, leaving only 2 seconds for Phase 4 (DynamoDB write). If the DynamoDB write takes more than 2 seconds (e.g., due to throttling), the Lambda times out. The Step Functions state receives a States.Timeout error, but the Bedrock response (which was already generated and received) is lost.

Resolution

"""
Runbook 2: Lambda timeout-safe response handling with pre-save checkpointing.
Ensures the model response is saved BEFORE any post-processing.
"""

import json
import time
import logging
import boto3

logger = logging.getLogger("manga_timeout_safe")

dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
response_table = dynamodb.Table("manga_response_checkpoint")


class TimeoutSafeResponseHandler:
    """
    Saves the model response to a checkpoint immediately after receiving it,
    BEFORE any post-processing. If the Lambda times out during post-processing,
    the response is still retrievable from the checkpoint table.
    """

    def __init__(self, context):
        self._context = context
        self._checkpoint_id = None

    def checkpoint_response(
        self, session_id: str, response_text: str, usage: dict
    ) -> str:
        """
        Save the model response to a checkpoint table immediately.
        This is the first thing we do after receiving the Bedrock response.
        Fast write: ~50ms to DynamoDB.
        """
        checkpoint_id = f"{session_id}:{int(time.time() * 1000)}"

        response_table.put_item(Item={
            "checkpoint_id": checkpoint_id,
            "session_id": session_id,
            "response_text": response_text,
            "usage": json.dumps(usage),
            "saved_at": int(time.time()),
            "post_processed": False,
            "ttl": int(time.time()) + 3600,  # 1-hour retention
        })

        self._checkpoint_id = checkpoint_id
        logger.info(
            "Response checkpointed: %s (remaining: %dms)",
            checkpoint_id,
            self._context.get_remaining_time_in_millis(),
        )
        return checkpoint_id

    def mark_post_processed(self) -> None:
        """Mark the checkpoint as fully post-processed."""
        if self._checkpoint_id:
            response_table.update_item(
                Key={"checkpoint_id": self._checkpoint_id},
                UpdateExpression="SET post_processed = :pp",
                ExpressionAttributeValues={":pp": True},
            )

    @staticmethod
    def recover_from_checkpoint(session_id: str) -> dict:
        """
        Recover the last un-post-processed response for a session.
        Called by a Step Functions recovery state after a timeout.
        """
        response = response_table.query(
            IndexName="session_id-saved_at-index",
            KeyConditionExpression="session_id = :sid",
            ExpressionAttributeValues={":sid": session_id},
            ScanIndexForward=False,  # Most recent first
            Limit=1,
        )

        items = response.get("Items", [])
        if not items:
            return {"recovered": False}

        checkpoint = items[0]
        return {
            "recovered": True,
            "response_text": checkpoint["response_text"],
            "checkpoint_id": checkpoint["checkpoint_id"],
            "post_processed": checkpoint.get("post_processed", False),
        }


def lambda_handler_improved(event: dict, context) -> dict:
    """
    Improved Lambda handler with checkpoint-first pattern.
    """
    handler = TimeoutSafeResponseHandler(context)
    session_id = event.get("sessionId", "")

    # Phase 1-3: Load context, build prompt, invoke Bedrock (as before)
    response_text = _invoke_bedrock(event)
    usage = {"input_tokens": 500, "output_tokens": 300}

    # CRITICAL: Checkpoint the response IMMEDIATELY
    checkpoint_id = handler.checkpoint_response(
        session_id, response_text, usage
    )

    # Phase 4: Post-processing (may timeout — that is OK now)
    remaining_ms = context.get_remaining_time_in_millis()
    if remaining_ms > 3000:
        # Enough time for full post-processing
        _save_session_state(session_id, response_text)
        _update_metrics(usage)
        handler.mark_post_processed()
    else:
        # Not enough time — skip post-processing; it will be retried
        logger.warning(
            "Insufficient time for post-processing (%dms) — "
            "checkpoint %s will be recovered",
            remaining_ms, checkpoint_id,
        )

    return {
        "statusCode": 200,
        "answer": response_text,
        "checkpoint_id": checkpoint_id,
    }


def _invoke_bedrock(event: dict) -> str:
    """Placeholder for Bedrock invocation."""
    return "Model response text"


def _save_session_state(session_id: str, response: str) -> None:
    """Placeholder for session state save."""
    pass


def _update_metrics(usage: dict) -> None:
    """Placeholder for metrics update."""
    pass

Prevention

  • Checkpoint model responses immediately — save to a DynamoDB checkpoint table (~50ms) before any post-processing.
  • Reserve 3-5 seconds for post-processing — if context.get_remaining_time_in_millis() < 5000 after the Bedrock call, skip post-processing and let a recovery step handle it.
  • Add a Step Functions recovery state that checks the checkpoint table after a Lambda timeout and retries only the post-processing.
  • Use streaming for long responses — stream the Bedrock response directly to the WebSocket so the user sees it as it generates, even if the Lambda times out on post-processing.
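The recovery-state bullet can be sketched as an Amazon States Language fragment, written here as a Python dict. The state names ("InvokeBedrockLambda", "RecoverFromCheckpoint", "FormatResponse") and function names are hypothetical, not the project's actual resources:

```python
# Catch routes Lambda timeouts to a recovery state that reads the checkpoint
# table and retries only the post-processing, never the Bedrock call itself.
TIMEOUT_RECOVERY_STATES = {
    "InvokeBedrockLambda": {
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke",
        "Parameters": {"FunctionName": "manga-bedrock-invoke"},
        "TimeoutSeconds": 35,
        "Catch": [{
            # Task-level timeouts surface as States.Timeout; function-level
            # Lambda timeouts surface as Lambda.Unknown, so catch both.
            "ErrorEquals": ["States.Timeout", "Lambda.Unknown"],
            "ResultPath": "$.timeoutError",
            "Next": "RecoverFromCheckpoint",
        }],
        "Next": "FormatResponse",
    },
    "RecoverFromCheckpoint": {
        # Would call TimeoutSafeResponseHandler.recover_from_checkpoint
        # for the session, then finish the skipped post-processing.
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke",
        "Parameters": {
            "FunctionName": "manga-checkpoint-recovery",
            "Payload": {"sessionId.$": "$.sessionId"},
        },
        "Next": "FormatResponse",
    },
}
```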

Scenario 3: IAM Too Restrictive — Blocks Model Access

Problem

A deployment updates the Lambda execution role's IAM policy. The new policy restricts Bedrock access to specific model ARNs but uses the us-east-1 region in the ARN while the Lambda runs in ap-northeast-1. Every InvokeModel call returns AccessDeniedException. The chatbot is completely non-functional for all users.

Detection

flowchart TD
    A["CloudWatch Alarm:<br/>Bedrock InvokeModel errors = 100%"] --> B{"Check error type"}
    B -->|"AccessDeniedException"| C["IAM Permission<br/>Issue Confirmed"]
    B -->|"ThrottlingException"| D["Rate limiting —<br/>different runbook"]
    C --> E{"When did this start?"}
    E -->|"Correlates with<br/>deployment timestamp"| F["ROOT CAUSE:<br/>IAM policy change in<br/>latest deployment"]
    E -->|"No recent deployment"| G["Check for SCP change<br/>or permission boundary update"]
    F --> H{"Inspect the<br/>IAM policy diff"}
    H --> I["Region mismatch:<br/>Policy says us-east-1<br/>Lambda runs in ap-northeast-1"]
    I --> J["Execute Runbook 3"]

Root Cause

The IAM policy restricts bedrock:InvokeModel to specific model ARNs:

arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-sonnet-*

But the Lambda runs in ap-northeast-1 and calls the Tokyo endpoint. The correct ARN is:

arn:aws:bedrock:ap-northeast-1::foundation-model/anthropic.claude-3-sonnet-*

Resolution

"""
Runbook 3: IAM policy validation and automated region-aware policy generation.
Prevents region mismatch in Bedrock model ARNs.
"""

import json
import logging
import boto3

logger = logging.getLogger("manga_iam_validator")

iam_client = boto3.client("iam")


class IAMPolicyValidator:
    """
    Validates IAM policies for MangaAssist Lambda roles.
    Catches common mistakes like region mismatches before deployment.
    """

    EXPECTED_REGIONS = ["ap-northeast-1", "us-east-1"]  # Primary + DR

    REQUIRED_BEDROCK_MODELS = [
        "anthropic.claude-3-sonnet-20240229-v1:0",
        "anthropic.claude-3-haiku-20240307-v1:0",
    ]

    def validate_bedrock_access(
        self, policy_document: dict, expected_region: str
    ) -> list[str]:
        """
        Validate that the IAM policy grants Bedrock access in the correct region.
        Returns a list of issues found (empty = valid).
        """
        issues = []

        for statement in policy_document.get("Statement", []):
            if statement.get("Effect") != "Allow":
                continue

            actions = statement.get("Action", [])
            if isinstance(actions, str):
                actions = [actions]

            bedrock_actions = [a for a in actions if a.startswith("bedrock:")]
            if not bedrock_actions:
                continue

            resources = statement.get("Resource", [])
            if isinstance(resources, str):
                resources = [resources]

            for resource in resources:
                if resource == "*":
                    issues.append(
                        "WARN: Wildcard resource for Bedrock — should restrict to specific models"
                    )
                    continue

                if "foundation-model/" in resource:
                    # Check region in ARN
                    arn_parts = resource.split(":")
                    if len(arn_parts) >= 4:
                        arn_region = arn_parts[3]
                        if arn_region != expected_region:
                            issues.append(
                                f"ERROR: Region mismatch — policy ARN has "
                                f"'{arn_region}' but Lambda runs in "
                                f"'{expected_region}'. Fix the ARN to: "
                                f"arn:aws:bedrock:{expected_region}::foundation-model/..."
                            )

            # Check that required models are covered
            for model in self.REQUIRED_BEDROCK_MODELS:
                model_covered = any(
                    model in r or "*" in r for r in resources
                )
                if not model_covered:
                    issues.append(
                        f"WARN: Model '{model}' not explicitly covered in policy"
                    )

        return issues

    def generate_correct_policy(self, region: str, account_id: str) -> dict:
        """
        Generate a correct Bedrock access policy for the given region.
        """
        model_arns = [
            f"arn:aws:bedrock:{region}::foundation-model/{model}"
            for model in self.REQUIRED_BEDROCK_MODELS
        ]

        return {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "AllowBedrockInvoke",
                    "Effect": "Allow",
                    "Action": [
                        "bedrock:InvokeModel",
                        "bedrock:InvokeModelWithResponseStream",
                    ],
                    "Resource": model_arns,
                    "Condition": {
                        "StringEquals": {
                            "aws:RequestedRegion": region,
                        },
                    },
                },
                {
                    "Sid": "AllowGuardrails",
                    "Effect": "Allow",
                    "Action": ["bedrock:ApplyGuardrail"],
                    "Resource": [
                        f"arn:aws:bedrock:{region}:{account_id}:guardrail/*"
                    ],
                },
            ],
        }


def pre_deployment_iam_check(
    role_name: str, expected_region: str
) -> dict:
    """
    Run as a pre-deployment check in the CI/CD pipeline.
    Validates the IAM policy before deploying Lambda changes.
    """
    validator = IAMPolicyValidator()

    # Get current role policies
    response = iam_client.list_attached_role_policies(RoleName=role_name)
    all_issues = []

    for policy in response.get("AttachedPolicies", []):
        policy_arn = policy["PolicyArn"]
        policy_version = iam_client.get_policy(
            PolicyArn=policy_arn
        )["Policy"]["DefaultVersionId"]

        policy_doc = iam_client.get_policy_version(
            PolicyArn=policy_arn,
            VersionId=policy_version,
        )["PolicyVersion"]["Document"]

        issues = validator.validate_bedrock_access(policy_doc, expected_region)
        if issues:
            all_issues.extend(issues)

    return {
        "role": role_name,
        "issues": all_issues,
        "valid": len([i for i in all_issues if i.startswith("ERROR")]) == 0,
    }

Prevention

  • Run IAM policy validation in the CI/CD pipeline before every deployment. Block deployment if any ERROR-level issues are found.
  • Use parameterized IAM policies with ${AWS::Region} substitution in CloudFormation/CDK templates instead of hardcoded region strings.
  • Add a canary test that makes a lightweight Bedrock call (Haiku with 10 max tokens) immediately after deployment. If it fails, auto-rollback.
  • Set up a CloudWatch alarm on bedrock:InvokeModel errors with 0% threshold — any access denied error should trigger immediately.
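The canary bullet can be sketched as a post-deployment pipeline step using the Bedrock Converse API. The rollback wiring, region default, and function names here are assumptions for illustration:

```python
def build_canary_request() -> dict:
    # Cheapest possible probe: Haiku capped at 10 output tokens
    # ($0.25/$1.25 per 1M tokens, per the pricing in the context above).
    return {
        "modelId": "anthropic.claude-3-haiku-20240307-v1:0",
        "messages": [{"role": "user", "content": [{"text": "ping"}]}],
        "inferenceConfig": {"maxTokens": 10},
    }

def run_post_deploy_canary(region: str = "ap-northeast-1") -> dict:
    import boto3  # imported lazily so the request builder is testable offline
    from botocore.exceptions import ClientError

    client = boto3.client("bedrock-runtime", region_name=region)
    try:
        client.converse(**build_canary_request())
        return {"ok": True, "rollback": False}
    except ClientError as exc:
        code = exc.response["Error"]["Code"]
        # AccessDeniedException immediately after a deployment means the
        # IAM change broke model access: signal the pipeline to roll back.
        return {"ok": False, "error": code,
                "rollback": code == "AccessDeniedException"}
```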

Scenario 4: Step Functions Infinite Loop

Problem

A Step Functions state machine has a Choice state that routes back to a previous state when a condition is not met (an agentic loop). Due to a bug, the condition variable is never set to the expected value, creating an infinite loop. The Express Workflow's 60-second timeout prevents truly infinite execution, but during those 60 seconds, the workflow makes ~20 Lambda invocations, burning tokens and compute. If multiple users trigger this simultaneously, Lambda concurrency and Bedrock quota are consumed by stuck workflows.
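The $3-5/hour figure from the scenario table can be sanity-checked against the Sonnet pricing in the context. The per-iteration token counts below (~500 input, ~200 output per loop iteration) are assumptions for illustration only:

```python
# Sonnet pricing from the context: $3 / $15 per 1M input/output tokens.
INPUT_PRICE_PER_TOKEN = 3 / 1_000_000
OUTPUT_PRICE_PER_TOKEN = 15 / 1_000_000

def stuck_execution_cost(iterations: int = 20,
                         input_tokens: int = 500,
                         output_tokens: int = 200) -> float:
    """Token cost of one stuck 60-second execution (Lambda cost excluded)."""
    per_iteration = (input_tokens * INPUT_PRICE_PER_TOKEN
                     + output_tokens * OUTPUT_PRICE_PER_TOKEN)
    return iterations * per_iteration

per_execution = stuck_execution_cost()  # $0.09 per stuck execution
per_hour = per_execution * 60           # ~$5.40/hour if a client retries
                                        # back-to-back, the same order of
                                        # magnitude as the table's estimate
```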

Detection

flowchart TD
    A["CloudWatch Alarm:<br/>StepFunctions ExecutionDuration<br/>> 55s (p95)"] --> B{"Check execution<br/>event history"}
    B -->|"Same state visited > 10 times"| C["Infinite Loop<br/>Confirmed"]
    B -->|"Normal progression"| D["Slow execution —<br/>different issue"]
    C --> E{"Check the Choice state<br/>condition variable"}
    E -->|"Variable never reaches<br/>expected value"| F["ROOT CAUSE:<br/>Condition variable not<br/>updated by Lambda handler"]
    E -->|"Variable oscillates"| G["ROOT CAUSE:<br/>Lambda handler alternates<br/>the variable value"]
    F --> H["Execute Runbook 4"]
    G --> H

Root Cause

The Lambda handler for UpdateReasoningContext is supposed to set $.reasoning_context.current_iteration to an incrementing value. However, a bug causes it to always return the iteration value from the input (not input + 1). The Choice state checks current_iteration >= 5 and always sees the initial value (0), so it always routes back to the loop start.

Resolution

"""
Runbook 4: Step Functions loop guard with external iteration tracking
and automatic execution termination.
"""

import time
import json
import logging
import boto3

logger = logging.getLogger("manga_loop_guard")

sfn_client = boto3.client("stepfunctions", region_name="us-east-1")
cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")


class StepFunctionsLoopGuard:
    """
    External loop guard for Step Functions workflows.
    Tracks state visit counts in Redis and terminates executions
    that exceed the maximum iteration count.
    """

    MAX_STATE_VISITS = 10  # No single state should be visited more than 10 times
    REDIS_PREFIX = "manga:sfn:guard:"

    def __init__(self, redis_client):
        self._redis = redis_client

    def record_state_visit(
        self, execution_id: str, state_name: str
    ) -> dict:
        """
        Record a state visit. Called by a Lambda at the start of each state.
        Returns whether the execution should continue.
        """
        key = f"{self.REDIS_PREFIX}{execution_id}"
        field = f"state:{state_name}"

        visit_count = self._redis.hincrby(key, field, 1)
        self._redis.expire(key, 300)  # 5-minute TTL

        total_visits = sum(
            int(v) for v in self._redis.hvals(key) if v
        )

        if visit_count > self.MAX_STATE_VISITS:
            logger.error(
                "Loop detected: execution=%s state=%s visits=%d",
                execution_id, state_name, visit_count,
            )
            return {
                "continue": False,
                "reason": f"State '{state_name}' visited {visit_count} times (max: {self.MAX_STATE_VISITS})",
                "total_visits": total_visits,
            }

        return {"continue": True, "visit_count": visit_count}

    def terminate_stuck_execution(
        self, execution_arn: str, reason: str
    ) -> None:
        """Terminate a stuck Step Functions execution."""
        try:
            sfn_client.stop_execution(
                executionArn=execution_arn,
                error="LoopGuardTermination",
                cause=reason,
            )
            logger.warning(
                "Terminated stuck execution: %s%s",
                execution_arn, reason,
            )
        except Exception as e:
            logger.error(
                "Failed to terminate execution %s: %s",
                execution_arn, str(e),
            )


def create_loop_safe_increment_handler():
    """
    Fixed Lambda handler that properly increments the iteration counter.
    Includes a guard check that prevents the loop from running away.
    """
    def handler(event, context):
        ctx = event.get("reasoning_context", {})

        # BUG FIX: Properly increment the counter
        current = int(ctx.get("current_iteration", 0))
        ctx["current_iteration"] = current + 1  # Was: current (no increment)

        # Guard check: raise error if iteration is unreasonably high
        if ctx["current_iteration"] > 20:
            raise Exception(
                f"Safety guard: iteration count {ctx['current_iteration']} "
                f"exceeds maximum. Possible infinite loop."
            )

        return ctx

    return handler

Prevention

  • Add a loop guard Lambda at the start of every loop body that tracks state visit counts externally (in Redis) and terminates the execution if a state is visited more than N times.
  • Unit test the iteration counter — the specific bug (returning input value instead of input+1) is a simple regression test.
  • Add a CloudWatch alarm on ExecutionDuration > 50s for Express Workflows — these should complete in under 5 seconds normally.
  • Use the Map state with MaxConcurrency instead of manual Choice-based loops where possible — Map state has built-in iteration limits.
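The regression from the second bullet can be pinned with a unit test that simulates the Choice state's exit condition. This is a sketch; `make_handler` stands in for the UpdateReasoningContext Lambda handler, with `increment=False` reproducing the original bug:

```python
def make_handler(increment: bool):
    # increment=False returns the input value unchanged (the bug);
    # increment=True is the fixed behavior from Runbook 4.
    def handler(event, context):
        ctx = dict(event.get("reasoning_context", {}))
        current = int(ctx.get("current_iteration", 0))
        ctx["current_iteration"] = current + 1 if increment else current
        return ctx
    return handler

def iterations_to_exit(handler, limit: int = 5, max_loops: int = 50):
    """Simulate the Choice state: loop until current_iteration >= limit.
    Returns the number of loops taken, or None if it never exits."""
    ctx = {"current_iteration": 0}
    for loop in range(1, max_loops + 1):
        ctx = handler({"reasoning_context": ctx}, None)
        if ctx["current_iteration"] >= limit:
            return loop
    return None  # would loop forever: the infinite-loop regression

assert iterations_to_exit(make_handler(increment=True)) == 5
assert iterations_to_exit(make_handler(increment=False)) is None
```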

Scenario 5: Guardrail False Positive Blocking Manga Content

Problem

The Bedrock Guardrail (ID: manga-content-guardrail) is configured with content filters for violence and sexual content. A user asks "What volumes of Attack on Titan have the most graphic battle scenes?" The guardrail blocks this query with a GUARDRAIL_INTERVENED response, categorizing it as violence-related content. The user sees the generic redirect message instead of a legitimate product question answer.

Detection

flowchart TD
    A["Support ticket:<br/>'Chatbot won't answer my<br/>question about manga'"] --> B["Check session trace<br/>for guardrail block"]
    B -->|"GUARDRAIL_INTERVENED<br/>in trace"| C["Guardrail Block<br/>Confirmed"]
    C --> D{"Review the blocked<br/>content"}
    D -->|"Content is a legitimate<br/>manga product query"| E["False Positive<br/>Confirmed"]
    D -->|"Content violates policy"| F["Correct block —<br/>no action needed"]
    E --> G{"Check which filter<br/>triggered"}
    G -->|"Violence filter with<br/>'graphic' keyword"| H["ROOT CAUSE:<br/>Guardrail violence filter<br/>triggered by manga-appropriate<br/>vocabulary"]
    H --> I["Execute Runbook 5"]

Root Cause

The guardrail's violence content filter is set to LOW threshold, which catches any mention of violence-related words. In the manga domain, words like "graphic," "battle," "fight," "blood," and "death" are normal product vocabulary (e.g., "graphic novel," "battle scenes," "fight choreography"). The guardrail does not have domain context and treats these as violence-related content.

Resolution

"""
Runbook 5: Guardrail false positive management with domain-specific
word lists and pre-screening bypass for product queries.
"""

import json
import re
import logging
import boto3

logger = logging.getLogger("manga_guardrail_fp")

bedrock = boto3.client("bedrock", region_name="us-east-1")


class GuardrailFalsePositiveManager:
    """
    Manages guardrail false positives for manga-domain queries.

    Strategy:
    1. Pre-screen queries for manga-domain vocabulary
    2. If the query is product-related, apply guardrail with elevated thresholds
    3. Track false positive patterns for guardrail config updates
    """

    # Manga-domain words that trigger guardrail violence filters but are
    # legitimate product vocabulary
    MANGA_DOMAIN_VOCABULARY = {
        "graphic",       # "graphic novel" not "graphic violence"
        "battle",        # "battle manga" / "battle scenes"
        "fight",         # "fight choreography"
        "death",         # "Death Note" (manga title)
        "attack",        # "Attack on Titan" (manga title)
        "kill",          # "kill la kill" (anime/manga title)
        "blood",         # "Blood+" (manga title) / common genre element
        "demon",         # "Demon Slayer" (manga title)
        "chainsaw",      # "Chainsaw Man" (manga title)
        "assassination", # "Assassination Classroom" (manga title)
        "hell",          # "Hellsing" (manga title)
        "gun",           # "Gundam" (franchise)
        "sword",         # common manga element
        "dark",          # "darker" tone preferences
        "violent",       # "violent manga" as genre search
    }

    # Patterns that indicate a product query vs. harmful content
    PRODUCT_QUERY_PATTERNS = [
        re.compile(r"volume[s]?\s+\d+", re.IGNORECASE),
        re.compile(r"(which|what|best|favorite)\s+manga", re.IGNORECASE),
        re.compile(r"(recommend|suggest|similar)\s+", re.IGNORECASE),
        re.compile(r"(price|cost|buy|order|stock)", re.IGNORECASE),
        re.compile(r"(genre|shounen|seinen|shojo)", re.IGNORECASE),
        re.compile(r"(chapter|arc|series|author)", re.IGNORECASE),
    ]

    def is_likely_product_query(self, query: str) -> bool:
        """
        Determine if a query is about manga products (vs. harmful content).
        If it matches product patterns and uses domain vocabulary,
        it is likely a false positive candidate.
        """
        # Match on word boundaries so "hell" does not match "hello" and
        # "kill" does not match "skilled".
        query_lower = query.lower()
        has_domain_word = any(
            re.search(rf"\b{re.escape(word)}\b", query_lower)
            for word in self.MANGA_DOMAIN_VOCABULARY
        )
        has_product_pattern = any(
            pattern.search(query) for pattern in self.PRODUCT_QUERY_PATTERNS
        )
        return has_domain_word and has_product_pattern

    def apply_guardrail_with_context(
        self,
        content: str,
        source: str = "INPUT",
        is_product_query: bool = False,
    ) -> dict:
        """
        Apply Bedrock Guardrail with domain-aware context.
        For product queries, use a higher threshold guardrail version.
        """
        # Use different guardrail versions based on query type
        if is_product_query:
            guardrail_id = "manga-content-guardrail"
            guardrail_version = "2"  # Version 2: Higher violence threshold for manga domain
        else:
            guardrail_id = "manga-content-guardrail"
            guardrail_version = "1"  # Version 1: Standard thresholds

        bedrock_runtime = boto3.client("bedrock-runtime", region_name="us-east-1")

        try:
            response = bedrock_runtime.apply_guardrail(
                guardrailIdentifier=guardrail_id,
                guardrailVersion=guardrail_version,
                source=source,
                content=[{"text": {"text": content}}],
            )

            action = response.get("action", "NONE")

            if action == "GUARDRAIL_INTERVENED" and is_product_query:
                # Log as potential false positive
                logger.info(
                    "Guardrail blocked product query (potential FP): '%s...'",
                    content[:100],
                )
                self._log_false_positive(content, response)

            return {
                "action": action,
                "guardrail_version": guardrail_version,
                "is_product_query": is_product_query,
                "response": response,
            }

        except Exception as e:
            # Fail open: if the guardrail call itself errors, let the request
            # through rather than blocking all traffic on a safeguard outage.
            logger.error("Guardrail call failed: %s", e)
            return {"action": "NONE", "error": str(e)}

    def _log_false_positive(self, content: str, response: dict) -> None:
        """Log a potential false positive for review and guardrail tuning."""
        dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
        table = dynamodb.Table("manga_guardrail_fp_log")

        import time
        table.put_item(Item={
            "log_id": f"fp-{int(time.time() * 1000)}",
            "content_preview": content[:500],
            "guardrail_response": json.dumps(response, default=str)[:2000],
            "timestamp": int(time.time()),
            "reviewed": False,
            "ttl": int(time.time()) + 86400 * 30,  # 30-day retention
        })


def update_guardrail_for_manga_domain():
    """
    Build the Guardrail Version 2 config with manga-domain-appropriate
    filter strengths (applied via the bedrock control-plane
    update_guardrail / create_guardrail_version APIs).

    Changes from Version 1:
    - Violence filter: HIGH → MEDIUM (tolerates manga battle vocabulary)
    - Add contextual grounding check (blocks ungrounded or irrelevant answers)
    - Keep manga titles out of the word filter blocklist
    """
    return {
        "guardrailId": "manga-content-guardrail",
        "version": "2",
        "contentPolicyConfig": {
            "filtersConfig": [
                {
                    "type": "VIOLENCE",
                    "inputStrength": "MEDIUM",   # Was LOW
                    "outputStrength": "MEDIUM",  # Was LOW
                },
                {
                    "type": "HATE",
                    "inputStrength": "LOW",
                    "outputStrength": "LOW",
                },
                {
                    "type": "SEXUAL",
                    "inputStrength": "LOW",
                    "outputStrength": "LOW",
                },
                {
                    "type": "INSULTS",
                    "inputStrength": "HIGH",
                    "outputStrength": "HIGH",
                },
            ],
        },
        "wordPolicyConfig": {
            "managedWordListsConfig": [
                {"type": "PROFANITY"},
            ],
            # Do NOT add manga titles to the word filter blocklist
        },
        "contextualGroundingPolicyConfig": {
            "filtersConfig": [
                {
                    "type": "GROUNDING",
                    "threshold": 0.7,
                },
                {
                    "type": "RELEVANCE",
                    "threshold": 0.6,
                },
            ],
        },
    }
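Once deployed, the pre-screen decides which guardrail version each query is checked against before any Bedrock call is made. Below is a minimal standalone sketch of that routing decision; `choose_guardrail_version` and the trimmed vocabulary and pattern lists are illustrative, not the full production lists above:

```python
import re

# Trimmed illustrative subsets of the production lists
MANGA_DOMAIN_VOCABULARY = {"graphic", "battle", "fight", "death", "attack"}

PRODUCT_QUERY_PATTERNS = [
    re.compile(r"volumes?\s+\d+", re.IGNORECASE),
    re.compile(r"(which|what|best|favorite)\s+manga", re.IGNORECASE),
    re.compile(r"(recommend|suggest|similar)\s+", re.IGNORECASE),
]

def choose_guardrail_version(query: str) -> str:
    """Return "2" (relaxed manga-domain version) for likely product
    queries, "1" (strict default) for everything else."""
    q = query.lower()
    has_domain_word = any(
        re.search(rf"\b{re.escape(w)}\b", q) for w in MANGA_DOMAIN_VOCABULARY
    )
    has_product_pattern = any(p.search(query) for p in PRODUCT_QUERY_PATTERNS)
    return "2" if has_domain_word and has_product_pattern else "1"

print(choose_guardrail_version("Which manga has the best battle scenes?"))  # 2
print(choose_guardrail_version("How do I hurt someone?"))                   # 1
```

Note that a domain word alone is not enough — without a product pattern the query still goes to the strict version, so the relaxed path cannot be reached just by mentioning a manga title.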

Prevention

  • Create a manga-domain-specific guardrail version with a relaxed violence filter strength (MEDIUM instead of HIGH) for product queries.
  • Pre-screen queries for manga-domain vocabulary before applying the guardrail. Route product queries to the domain-specific guardrail version.
  • Maintain a false positive log — review blocked queries weekly and tune the guardrail configuration based on patterns.
  • Use contextual grounding as the primary safety check instead of keyword-based content filters. Grounding verifies that the response is supported by the retrieved catalog data, which is more appropriate for a product catalog chatbot.
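A grounding check evaluates the model's answer against the retrieved product data rather than scanning for keywords. The sketch below assembles such a request using the bedrock-runtime `ApplyGuardrail` content-qualifier shape (`grounding_source`, `query`, `guard_content`); `build_grounding_request` is a hypothetical helper, not part of the MangaAssist codebase:

```python
def build_grounding_request(catalog_text: str, user_query: str,
                            model_answer: str) -> dict:
    """Assemble an ApplyGuardrail payload that asks the contextual
    grounding filter to verify the answer against catalog data."""
    return {
        "guardrailIdentifier": "manga-content-guardrail",
        "guardrailVersion": "2",
        "source": "OUTPUT",
        "content": [
            # Retrieved product data the answer must be grounded in
            {"text": {"text": catalog_text, "qualifiers": ["grounding_source"]}},
            # The user's question, used for the relevance check
            {"text": {"text": user_query, "qualifiers": ["query"]}},
            # The model response being evaluated
            {"text": {"text": model_answer, "qualifiers": ["guard_content"]}},
        ],
    }

req = build_grounding_request(
    "One Piece Vol. 1 is in stock for $9.99.",
    "Is One Piece volume 1 available?",
    "Yes, One Piece Vol. 1 is in stock at $9.99.",
)
# Then: bedrock_runtime.apply_guardrail(**req)
```

If the GROUNDING score for the answer falls below the configured 0.7 threshold, the guardrail intervenes regardless of which keywords appear — an ungrounded claim about stock or price is blocked even when its vocabulary is harmless.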

Cross-Scenario Decision Tree

flowchart TD
    START["Safeguard System Alert"] --> Q1{"What is the<br/>symptom?"}
    Q1 -->|"High fallback rate<br/>with OPEN circuit"| S1["Scenario 1:<br/>Stuck Circuit Breaker"]
    Q1 -->|"Truncated user response"| S2["Scenario 2:<br/>Lambda Timeout"]
    Q1 -->|"AccessDeniedException<br/>on all requests"| S3["Scenario 3:<br/>IAM Misconfiguration"]
    Q1 -->|"Workflow exceeding<br/>60s regularly"| S4["Scenario 4:<br/>Infinite Loop"]
    Q1 -->|"Users report legitimate<br/>queries blocked"| S5["Scenario 5:<br/>Guardrail False Positive"]
    S1 --> R1["Health check bypass<br/>→ Force close circuit<br/>→ Fix probe logic"]
    S2 --> R2["Checkpoint response before post-processing<br/>→ Recovery state in Step Functions"]
    S3 --> R3["Fix region in ARN<br/>→ Add CI/CD IAM validation<br/>→ Canary test post-deploy"]
    S4 --> R4["Fix increment handler<br/>→ External loop guard<br/>→ Alarm on duration > 50s"]
    S5 --> R5["Create domain-specific guardrail version<br/>→ Pre-screen product queries<br/>→ FP review pipeline"]

Runbook Summary Table

| Scenario | Detection Signal | Immediate Action | Long-Term Fix | Owner |
|---|---|---|---|---|
| 1. Stuck Circuit Breaker | Fallback rate > 80% for > 5 min | Force-close circuit via manual override | Multi-probe recovery (min 3); health check endpoint; max open duration | Platform Team |
| 2. Lambda Timeout Partial Response | User sees truncated response; Lambda duration > 28s | Recover response from checkpoint table | Checkpoint-first pattern; streaming responses; reserve post-processing budget | Backend Team |
| 3. IAM Region Mismatch | 100% AccessDeniedException after deployment | Rollback deployment; fix ARN region | CI/CD IAM validation; parameterized regions; post-deploy canary test | DevOps Team |
| 4. Step Functions Infinite Loop | ExecutionDuration > 55s at p95 | Terminate stuck executions; fix increment bug | External loop guard in Redis; unit test iteration logic; Map state over Choice loops | Backend Team |
| 5. Guardrail False Positive | User complaints about blocked manga queries | Route product queries to domain-specific guardrail version | Manga-domain vocabulary exemptions; FP log review pipeline; contextual grounding over keyword filters | AI Safety Team |

Key Takeaways

  1. Circuit breakers can be worse than the problem they solve — a stuck-open circuit blocks all traffic even when the downstream service is healthy. Always have a manual override and a health check bypass.

  2. Lambda timeouts must be designed, not hoped for — save the most valuable output (model response) immediately after receiving it, before any post-processing. The checkpoint-first pattern ensures no work is lost.

  3. IAM policy bugs are silent until deployment — they do not show up in unit tests or integration tests unless you explicitly test against real AWS APIs. A CI/CD IAM validation step catches region mismatches, missing permissions, and overly restrictive policies.

  4. Step Functions loops need external guards — the Choice-based loop pattern is common but fragile. A single bug in the counter update creates a 60-second runaway. External loop guards (Redis counters) provide defense-in-depth.

  5. Guardrail configurations are domain-specific — the same violence filter threshold that works for a general chatbot will produce false positives for a manga store chatbot. Create domain-specific guardrail versions with appropriate thresholds and use pre-screening to route queries correctly.
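The external loop guard from takeaway 4 can be sketched as follows. In production the counter would live in ElastiCache Redis (atomic INCR plus EXPIRE); a plain dict stands in here so the sketch runs anywhere, and `LoopGuard`, `LoopGuardTripped`, and `max_iterations` are illustrative names, not from the MangaAssist codebase:

```python
class LoopGuardTripped(Exception):
    """Raised when a workflow exceeds its iteration budget."""

class LoopGuard:
    def __init__(self, max_iterations: int = 10):
        self.max_iterations = max_iterations
        self._counters: dict[str, int] = {}  # stand-in for Redis INCR

    def check(self, execution_id: str) -> int:
        """Increment and validate the iteration counter for one execution.
        Called at the top of every loop iteration, outside the state
        machine, so a broken Choice-state counter cannot run away."""
        count = self._counters.get(execution_id, 0) + 1
        self._counters[execution_id] = count
        if count > self.max_iterations:
            raise LoopGuardTripped(
                f"{execution_id} exceeded {self.max_iterations} iterations"
            )
        return count

guard = LoopGuard(max_iterations=3)
for _ in range(3):
    guard.check("exec-123")      # iterations 1-3 pass
try:
    guard.check("exec-123")      # 4th iteration trips the guard
except LoopGuardTripped as e:
    print(e)
```

Because the counter is incremented by the caller rather than inside the state machine's own payload, a bug in the workflow's increment handler cannot disable the guard — the two counting mechanisms fail independently.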


Previous file: 02-circuit-breaker-timeout-patterns.md Back to overview: 01-safeguard-architecture.md