
Circuit Breakers, Timeout Mechanisms, and Resource Boundaries

MangaAssist context: JP Manga store chatbot on AWS — Bedrock Claude 3 (Sonnet at $3/$15 per 1M tokens input/output, Haiku at $0.25/$1.25), OpenSearch Serverless (vector store), DynamoDB (sessions/products), ECS Fargate (orchestrator), API Gateway WebSocket, ElastiCache Redis. Target: useful answer in under 3 seconds, 1M messages/day scale.


Skill Mapping

Field Value
Certification AWS AI Practitioner (AIP-C01)
Domain 2 — Implementation and Integration
Task 2.1 — Implement agentic AI solutions and tool integrations
Skill 2.1.3 — Develop safeguarded AI workflows to ensure controlled FM behavior
Focus Areas Circuit breakers, Lambda timeouts, IAM resource boundaries, Step Functions stopping conditions, token budget guardrails

Mind Map — Safeguard Patterns for Controlled FM Behavior

mindmap
  root((Safeguarded AI Workflows))
    Circuit Breakers
      Closed State
        Normal operation
        Failure counter tracks errors
        Threshold triggers transition
      Open State
        All requests rejected immediately
        Fallback responses served
        Recovery timer countdown
      Half-Open State
        Limited probe requests allowed
        Success resets to closed
        Failure returns to open
      Configuration
        Failure threshold 5 errors
        Recovery timeout 30 seconds
        Half-open max probes 3
    Timeout Mechanisms
      Lambda Timeouts
        Hard limit 15 minutes max
        Soft timeout at 80 percent
        Graceful degradation path
      API Gateway Timeouts
        WebSocket idle 10 minutes
        Integration timeout 29 seconds
        Custom timeout per route
      Bedrock InvokeModel Timeout
        SDK read timeout 60 seconds
        Connection timeout 10 seconds
        Retry with exponential backoff
      Streaming Timeouts
        First byte timeout 5 seconds
        Inter-chunk timeout 2 seconds
        Total stream timeout 30 seconds
    IAM Resource Boundaries
      Least Privilege Policies
        Model-specific ARN restrictions
        Region-locked access
        Action-level granularity
      Service Control Policies
        Organization-wide FM limits
        Deny expensive models
        Enforce tagging requirements
      Resource Tags
        Cost center allocation
        Environment separation
        Team-level boundaries
    Step Functions Controls
      Stopping Conditions
        MaxConcurrency limits
        TimeoutSeconds per state
        HeartbeatSeconds monitoring
      Error Handling
        Retry with backoff
        Catch with fallback
        ResultPath for error context
      Cost Guards
        Execution time limits
        State transition caps
        Map state concurrency
    Token Budget Guardrails
      Input Token Limits
        Max prompt length 4096 tokens
        Context window management
        Truncation strategies
      Output Token Limits
        MaxTokens parameter
        Stop sequences
        Budget per conversation turn
      Cost Ceiling
        Daily spend limit
        Per-user token quota
        Alert at 80 percent threshold

1. Circuit Breaker Patterns for FM Calls

Why Circuit Breakers Matter for FM Services

Foundation Model APIs are external dependencies with variable latency and availability. Without circuit breakers, a degraded Bedrock endpoint can cascade failures through the entire MangaAssist stack: API Gateway queues requests, ECS tasks exhaust connections, DynamoDB session writes spike, and users see hanging WebSocket connections. A circuit breaker isolates the failure, serves cached or fallback responses, and allows the system to recover gracefully.

State Machine: Closed, Open, Half-Open

CLOSED STATE (Normal Operation)
  |
  | failure_count >= threshold (5 errors in 60s)
  v
OPEN STATE (Requests Blocked)
  |
  | recovery_timeout elapsed (30 seconds)
  v
HALF-OPEN STATE (Probe Mode)
  |
  |--- probe succeeds --> CLOSED STATE (reset counters)
  |--- probe fails -----> OPEN STATE (restart timer)

Closed State: Every FM request passes through. The breaker tracks consecutive failures. When MangaAssist sends a product recommendation query to Claude Sonnet and it succeeds, the failure counter resets. If Bedrock returns a ThrottlingException or ModelTimeoutException, the counter increments. Once failures hit the threshold (e.g., 5 within 60 seconds), the breaker trips to Open.

Open State: All FM requests are immediately rejected without contacting Bedrock. MangaAssist returns a cached response from ElastiCache Redis or a graceful fallback message. This prevents wasting compute and money on requests that will likely fail, and protects the 3-second latency target. After a recovery timeout (e.g., 30 seconds), the breaker transitions to Half-Open.

Half-Open State: A limited number of probe requests (e.g., 3) are forwarded to Bedrock. If they succeed, the breaker returns to Closed and normal traffic resumes. If any probe fails, the breaker returns to Open and the recovery timer restarts. This prevents flooding a recovering service.

MangaAssist-Specific Thresholds

Parameter Value Rationale
Failure threshold 5 errors Tolerates brief Bedrock blips without tripping
Failure window 60 seconds Distinguishes burst errors from sustained outage
Recovery timeout 30 seconds Bedrock throttling typically resolves in 15-20s
Half-open probes 3 requests Validates recovery without overwhelming endpoint
Probe timeout 5 seconds Quick validation — full 3s target + 2s buffer
Fallback TTL 300 seconds Cache popular manga responses for 5 minutes

Circuit Breaker for Multiple FM Models

MangaAssist uses both Sonnet (complex queries) and Haiku (simple lookups). Each model needs its own circuit breaker instance because they have independent availability:

Sonnet Circuit Breaker ── [CLOSED] ── Routes complex queries
Haiku Circuit Breaker  ── [OPEN]   ── Fallback to cached responses
                                       (Sonnet still operational)

When the Haiku breaker opens, MangaAssist can optionally promote simple queries to Sonnet (at higher cost) rather than serving only cached responses. This tiered fallback strategy maintains service quality.
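The tiered fallback can be sketched as a helper that walks each query class's preference list and falls through to the cache when every breaker is open. The names here (route_query, MODEL_TIERS, the "closed"/"open" state strings) are illustrative, not identifiers from the MangaAssist codebase:

```python
# Illustrative routing sketch: pick the first model whose breaker is closed.
MODEL_TIERS = {
    "simple":  ["haiku", "sonnet"],  # prefer cheap; promote if Haiku's breaker is open
    "complex": ["sonnet"],           # no cheaper substitute for complex queries
}

def route_query(query_class: str, breaker_states: dict) -> str:
    """Return the first model whose breaker is closed, or 'cache' if none."""
    for model in MODEL_TIERS.get(query_class, []):
        if breaker_states.get(model) == "closed":
            return model
    return "cache"  # every breaker open: serve cached/fallback responses
```

With the Haiku breaker open and the Sonnet breaker closed, a "simple" query routes to Sonnet, which is the promote-at-higher-cost path described above.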


2. Lambda Timeout Mechanisms and Graceful Degradation

Lambda Timeout Architecture

AWS Lambda enforces a hard timeout ceiling of 15 minutes. For MangaAssist's real-time chatbot, Lambda functions processing FM calls should have much tighter timeouts:

Lambda Function Timeout Rationale
manga-chat-orchestrator 30 seconds Full RAG pipeline: retrieve + generate
manga-bedrock-invoke 15 seconds Single Bedrock InvokeModel call
manga-embedding-generator 10 seconds Embedding for vector search
manga-session-manager 5 seconds DynamoDB read/write operations
manga-fallback-handler 3 seconds Serve cached response quickly

Soft Timeout vs Hard Timeout

Hard Timeout: Lambda kills the execution. The response is lost. The user sees an error or timeout message through the WebSocket.

Soft Timeout: Application-level timer fires before the Lambda hard timeout. The function catches the signal and returns a partial or fallback response. This is the preferred pattern for user-facing interactions.

Timeline for manga-bedrock-invoke (15s hard timeout):

0s          5s               12s    grace    15s
|-----------|----------------|--------------|
start   first byte      soft timeout    hard timeout
        expected        fires (80%):    Lambda kills
                        return partial  the execution
                        or fallback

The soft timeout fires at 80% of the hard timeout (12 seconds for a 15-second function). This leaves a 3-second grace period to write session state to DynamoDB, log the timeout event to CloudWatch, and return a fallback response through the WebSocket.
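One minimal way to implement the 80% rule is to derive the work budget from the Lambda context object (context.get_remaining_time_in_millis() is the standard Lambda runtime API) and run the FM call on a worker thread. The handler shape and the invoke_fm/fallback callables are illustrative assumptions:

```python
import concurrent.futures

GRACE_MS = 3_000  # reserved to write session state, log, and send the fallback

def work_budget_ms(remaining_ms: int, grace_ms: int = GRACE_MS) -> int:
    """Time available for the Bedrock call before the fallback path must run."""
    return max(0, remaining_ms - grace_ms)

# Module-level pool: reused across warm invocations. A timed-out call keeps
# running in its thread, but the handler has already returned the fallback.
_pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)

def handler(event, context, invoke_fm, fallback):
    budget_s = work_budget_ms(context.get_remaining_time_in_millis()) / 1000
    future = _pool.submit(invoke_fm, event)
    try:
        return future.result(timeout=budget_s)
    except concurrent.futures.TimeoutError:
        return fallback(event)  # soft timeout: degrade instead of being killed
```

For a fresh 15-second function, remaining time is roughly 15,000 ms, so the budget works out to about 12 seconds, matching the 80% rule above.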

Graceful Degradation Tiers

When timeouts occur, MangaAssist degrades service quality rather than failing completely:

Tier Trigger Response Quality Latency
Tier 0 Normal Full Sonnet response with recommendations < 3s
Tier 1 Sonnet slow Switch to Haiku mid-request 3-5s
Tier 2 All FM slow Serve cached similar response from Redis < 1s
Tier 3 Full timeout Static fallback with apology + retry button < 0.5s
Tier 4 Lambda crash API Gateway default response < 0.1s

3. IAM Policy Design for Least-Privilege FM Access

Principle of Least Privilege for Bedrock

Every MangaAssist component that touches Bedrock should have the minimum IAM permissions required. This prevents accidental or malicious use of expensive models, limits blast radius if credentials are compromised, and enforces cost boundaries at the infrastructure level.

Policy Layering Strategy

Organization SCP (Deny expensive models org-wide)
  └── Account-level policy (Allow only approved regions)
       └── Role-level policy (Allow specific models + actions)
            └── Session policy (Temporary further restrictions)
                 └── Resource tags (Cost allocation + environment)

IAM Policy: Bedrock Invoke — Production Chatbot Role

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowBedrockInvokeApprovedModels",
      "Effect": "Allow",
      "Action": [
        "bedrock:InvokeModel",
        "bedrock:InvokeModelWithResponseStream"
      ],
      "Resource": [
        "arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
        "arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-haiku-20240307-v1:0"
      ],
      "Condition": {
        "StringEquals": {
          "aws:RequestedRegion": "us-east-1"
        }
      }
    },
    {
      "Sid": "DenyExpensiveModels",
      "Effect": "Deny",
      "Action": "bedrock:InvokeModel",
      "Resource": [
        "arn:aws:bedrock:*::foundation-model/anthropic.claude-3-opus*",
        "arn:aws:bedrock:*::foundation-model/anthropic.claude-3.5-sonnet*"
      ]
    },
    {
      "Sid": "AllowBedrockGuardrails",
      "Effect": "Allow",
      "Action": [
        "bedrock:ApplyGuardrail"
      ],
      "Resource": "arn:aws:bedrock:us-east-1:123456789012:guardrail/manga-content-filter-*"
    },
    {
      "Sid": "AllowCloudWatchLogging",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:us-east-1:123456789012:log-group:/aws/lambda/manga-*"
    }
  ]
}

IAM Policy: Service Control Policy (Organization-Level)

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DenyBedrockOutsideApprovedRegions",
      "Effect": "Deny",
      "Action": "bedrock:*",
      "Resource": "*",
      "Condition": {
        "StringNotEquals": {
          "aws:RequestedRegion": [
            "us-east-1",
            "us-west-2"
          ]
        }
      }
    },
    {
      "Sid": "EnforceBedrockTagging",
      "Effect": "Deny",
      "Action": "bedrock:InvokeModel",
      "Resource": "*",
      "Condition": {
        "Null": {
          "aws:RequestTag/CostCenter": "true",
          "aws:RequestTag/Environment": "true"
        }
      }
    }
  ]
}

IAM Permission Boundaries for Development Teams

Role Allowed Models Allowed Actions Max Tokens Region Lock
manga-prod-chatbot Sonnet, Haiku Invoke, InvokeStream 4096 us-east-1
manga-dev-sandbox Haiku only Invoke 1024 us-east-1
manga-eval-pipeline Sonnet, Haiku Invoke, GetEvaluationJob 8192 us-east-1, us-west-2
manga-admin All approved Full Bedrock 8192 us-east-1, us-west-2
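Role boundaries like these can be verified before deployment with the IAM policy simulator (boto3's simulate_principal_policy is a real IAM API). A sketch, with the decision-parsing helper kept pure so it can be unit-tested offline; the function names themselves are illustrative:

```python
def is_denied(results: list, action: str) -> bool:
    """True if the simulator reports anything other than 'allowed' for action."""
    for r in results:
        if r["EvalActionName"] == action:
            return r["EvalDecision"] != "allowed"
    return True  # action not evaluated at all: treat as denied

def simulate_role(role_arn: str, model_arn: str) -> list:
    import boto3  # deferred so the parser above imports without the SDK
    iam = boto3.client("iam")
    resp = iam.simulate_principal_policy(
        PolicySourceArn=role_arn,
        ActionNames=["bedrock:InvokeModel"],
        ResourceArns=[model_arn],
    )
    return resp["EvaluationResults"]
```

Running this against manga-dev-sandbox and a Sonnet ARN should report a denial, catching boundary regressions before they reach production.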

4. Step Functions Stopping Conditions and Heartbeat Patterns

Why Stopping Conditions Matter

Step Functions state machines coordinate multi-step AI workflows in MangaAssist: retrieving context from OpenSearch, invoking Bedrock, validating output with guardrails, and writing results back. Without stopping conditions, a stuck state (e.g., Bedrock hanging) keeps the execution running indefinitely, accumulating costs at $0.025 per 1,000 state transitions.

TimeoutSeconds vs HeartbeatSeconds

TimeoutSeconds: The maximum duration a Task state will wait for completion. If the timeout expires, the state fails with a States.Timeout error that can be caught and handled.

HeartbeatSeconds: The interval within which a long-running task must send a heartbeat signal via the SendTaskHeartbeat API. If no heartbeat arrives within this window, the state fails with a States.Timeout error. This detects tasks that are alive but stuck. Heartbeats apply to activity tasks and callback-pattern (.waitForTaskToken) integrations, where the worker process calls SendTaskHeartbeat explicitly.

TimeoutSeconds = 60 (total budget)
HeartbeatSeconds = 15 (must check in every 15s)

Timeline:
0s      15s      30s      45s      60s
|-------|--------|--------|--------|
  HB1      HB2    (none)
  OK       OK     MISS → States.Timeout fires at 45s
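A heartbeat-sending worker typically beats at a fraction of HeartbeatSeconds so a single delayed beat does not trip the timeout. A sketch for a callback-pattern task, with the real boto3 Step Functions calls (send_task_heartbeat, send_task_success, send_task_failure) deferred behind an import so the interval helper stays testable; work is assumed to return the task's JSON output string:

```python
import threading

def heartbeat_interval_s(heartbeat_seconds: float, safety: float = 0.5) -> float:
    """Beat at half the HeartbeatSeconds window so one missed beat is survivable."""
    return heartbeat_seconds * safety

def run_with_heartbeat(task_token: str, work, heartbeat_seconds: float = 15):
    import boto3  # deferred so the helper above imports without the SDK
    sfn = boto3.client("stepfunctions")
    stop = threading.Event()

    def beat():
        # Event.wait returns False on timeout (send a beat), True once stopped.
        while not stop.wait(heartbeat_interval_s(heartbeat_seconds)):
            sfn.send_task_heartbeat(taskToken=task_token)

    threading.Thread(target=beat, daemon=True).start()
    try:
        sfn.send_task_success(taskToken=task_token, output=work())
    except Exception as e:
        sfn.send_task_failure(taskToken=task_token,
                              error=type(e).__name__, cause=str(e))
    finally:
        stop.set()
```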

Step Functions ASL: Safeguarded RAG Pipeline

{
  "Comment": "MangaAssist Safeguarded RAG Pipeline with Stopping Conditions",
  "StartAt": "ValidateInput",
  "TimeoutSeconds": 120,
  "States": {
    "ValidateInput": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-validate-input",
      "TimeoutSeconds": 5,
      "ResultPath": "$.validation",
      "Next": "CheckValidation",
      "Retry": [
        {
          "ErrorEquals": ["Lambda.ServiceException"],
          "IntervalSeconds": 1,
          "MaxAttempts": 2,
          "BackoffRate": 2.0
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "HandleValidationError",
          "ResultPath": "$.error"
        }
      ]
    },
    "CheckValidation": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.validation.isValid",
          "BooleanEquals": true,
          "Next": "RetrieveContext"
        }
      ],
      "Default": "ReturnValidationError"
    },
    "RetrieveContext": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-retrieve-context",
      "TimeoutSeconds": 10,
      "HeartbeatSeconds": 5,
      "ResultPath": "$.context",
      "Next": "InvokeFoundationModel",
      "Retry": [
        {
          "ErrorEquals": ["States.Timeout"],
          "IntervalSeconds": 2,
          "MaxAttempts": 1,
          "BackoffRate": 1.0
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.Timeout"],
          "Next": "ServeCachedResponse",
          "ResultPath": "$.error"
        },
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "HandleRetrievalError",
          "ResultPath": "$.error"
        }
      ]
    },
    "InvokeFoundationModel": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-bedrock-invoke",
      "TimeoutSeconds": 20,
      "HeartbeatSeconds": 8,
      "ResultPath": "$.fmResponse",
      "Next": "ApplyGuardrails",
      "Retry": [
        {
          "ErrorEquals": ["BedrockThrottling"],
          "IntervalSeconds": 3,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        },
        {
          "ErrorEquals": ["States.Timeout"],
          "IntervalSeconds": 5,
          "MaxAttempts": 1,
          "BackoffRate": 1.0
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.Timeout", "BedrockThrottling"],
          "Next": "FallbackToHaiku",
          "ResultPath": "$.error"
        },
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "HandleFMError",
          "ResultPath": "$.error"
        }
      ]
    },
    "FallbackToHaiku": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-bedrock-invoke-haiku",
      "TimeoutSeconds": 10,
      "HeartbeatSeconds": 5,
      "ResultPath": "$.fmResponse",
      "Next": "ApplyGuardrails",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "ServeCachedResponse",
          "ResultPath": "$.error"
        }
      ]
    },
    "ApplyGuardrails": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-apply-guardrails",
      "TimeoutSeconds": 5,
      "ResultPath": "$.guardrailResult",
      "Next": "CheckGuardrailResult",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "HandleGuardrailError",
          "ResultPath": "$.error"
        }
      ]
    },
    "CheckGuardrailResult": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.guardrailResult.passed",
          "BooleanEquals": true,
          "Next": "ReturnResponse"
        }
      ],
      "Default": "ReturnFilteredResponse"
    },
    "ReturnResponse": {
      "Type": "Succeed"
    },
    "ReturnFilteredResponse": {
      "Type": "Pass",
      "Result": {
        "message": "I can help with manga recommendations. Could you rephrase your question?",
        "filtered": true
      },
      "End": true
    },
    "ServeCachedResponse": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:manga-serve-cached",
      "TimeoutSeconds": 3,
      "End": true,
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "ReturnStaticFallback",
          "ResultPath": "$.error"
        }
      ]
    },
    "ReturnStaticFallback": {
      "Type": "Pass",
      "Result": {
        "message": "Our manga assistant is temporarily busy. Please try again in a moment.",
        "fallback": true
      },
      "End": true
    },
    "ReturnValidationError": {
      "Type": "Pass",
      "Result": {
        "message": "Please provide a valid manga-related question.",
        "error": "validation_failed"
      },
      "End": true
    },
    "HandleValidationError": { "Type": "Pass", "Result": {"error": "validation_error"}, "End": true },
    "HandleRetrievalError": { "Type": "Pass", "Result": {"error": "retrieval_error"}, "End": true },
    "HandleFMError": { "Type": "Pass", "Result": {"error": "fm_error"}, "End": true },
    "HandleGuardrailError": { "Type": "Pass", "Result": {"error": "guardrail_error"}, "End": true }
  }
}

Stopping Condition Summary Table

State TimeoutSeconds HeartbeatSeconds Retry Fallback Path
ValidateInput 5 2 attempts HandleValidationError
RetrieveContext 10 5 1 attempt ServeCachedResponse
InvokeFoundationModel 20 8 3 (throttle), 1 (timeout) FallbackToHaiku
FallbackToHaiku 10 5 ServeCachedResponse
ApplyGuardrails 5 HandleGuardrailError
ServeCachedResponse 3 ReturnStaticFallback

Execution-Level vs State-Level Timeouts

The state machine itself has "TimeoutSeconds": 120 — a global budget for the entire workflow. Individual states have their own budgets. This dual-level approach ensures that even if individual retries and fallbacks chain together, the total execution never exceeds 2 minutes.


5. Token Budget Guardrails as Safety Mechanisms

The Cost Risk Without Token Budgets

At MangaAssist's scale of 1M messages/day, uncontrolled token usage is a financial risk:

Scenario Input Tokens/Msg Output Tokens/Msg Daily Cost (Sonnet) Monthly Cost
Controlled (avg) 800 400 $8,400 $252,000
Uncontrolled (prompt injection) 4,000 4,096 $73,440 $2,203,200
With token budget (capped) 800 1,024 $17,760 $532,800

Token budgets prevent a single malicious or poorly constructed prompt from consuming excessive resources.
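Daily spend at this scale reduces to a one-line cost model (prices in dollars per million tokens; Sonnet defaults taken from the MangaAssist context):

```python
def daily_cost_usd(msgs_per_day: int, in_tok: int, out_tok: int,
                   in_price: float = 3.0, out_price: float = 15.0) -> float:
    """Daily FM spend: per-message tokens times messages/day, priced per 1M tokens."""
    return msgs_per_day * (in_tok * in_price + out_tok * out_price) / 1_000_000
```

At 1M messages/day with 800 input and 400 output tokens per message on Sonnet, this gives 1M × (800 × $3 + 400 × $15) / 1M = $8,400 per day.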

Token Budget Strategy

Per-Request Budget:
  Input:  max 2,048 tokens (truncate context if needed)
  Output: max 1,024 tokens (set MaxTokens parameter)

Per-Session Budget (conversation):
  Total:  max 20,000 tokens across all turns
  Action: After limit, require new session

Per-User Budget (daily):
  Total:  max 100,000 tokens per user per day
  Action: After 80%, warn user; at 100%, serve cached only

System Budget (hourly):
  Total:  max 50M tokens per hour across all users
  Action: At 80%, route new requests to Haiku; at 100%, cached only

Token Counting and Enforcement Points

Enforcement Point Mechanism Latency Impact
API Gateway Request size limit (256KB) < 1ms
Lambda pre-processing Client-side token estimate (tokenizer library) ~5ms
Bedrock InvokeModel max_tokens parameter None (API param)
Redis token counter Atomic increment per user/session ~1ms
CloudWatch alarm Hourly aggregation alert Async (no latency)
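The max_tokens enforcement point is just a field in the request body, so Bedrock stops generation server-side. A sketch using the Anthropic messages format on Bedrock (field names follow that format; verify them against current model documentation), with boto3 deferred so the body builder is testable offline:

```python
import json

def build_invoke_body(prompt: str, max_tokens: int = 1024) -> str:
    """Request body with the output-token cap baked in."""
    return json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,          # hard output cap, enforced by Bedrock
        "stop_sequences": ["\n\nHuman:"],  # belt-and-braces stop condition
        "messages": [{"role": "user", "content": prompt}],
    })

def invoke_capped(prompt: str, model_id: str, max_tokens: int = 1024) -> dict:
    import boto3  # deferred import: the body builder runs without the SDK
    client = boto3.client("bedrock-runtime")
    resp = client.invoke_model(modelId=model_id,
                               body=build_invoke_body(prompt, max_tokens))
    return json.loads(resp["body"].read())
```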

6. Production Python Code

CircuitBreaker Class

import time
import threading
import logging
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, Callable, Any, Dict
from functools import wraps

logger = logging.getLogger("manga_assist.circuit_breaker")


class CircuitState(Enum):
    """Circuit breaker states for FM call protection."""
    CLOSED = "closed"        # Normal operation — requests flow through
    OPEN = "open"            # Failure detected — requests blocked
    HALF_OPEN = "half_open"  # Recovery probe — limited requests allowed


@dataclass
class CircuitBreakerConfig:
    """Configuration for a circuit breaker instance."""
    failure_threshold: int = 5          # Errors before tripping
    failure_window_seconds: float = 60  # Window to count failures
    recovery_timeout_seconds: float = 30  # Time in OPEN before HALF_OPEN
    half_open_max_probes: int = 3       # Probes allowed in HALF_OPEN
    probe_timeout_seconds: float = 5    # Timeout for probe requests
    model_id: str = ""                  # Which FM model this protects
    name: str = "default"               # Human-readable breaker name


class CircuitBreaker:
    """
    Circuit breaker for Bedrock FM calls in MangaAssist.

    Protects the system from cascading failures when Bedrock
    endpoints are degraded. Tracks failures per model and
    provides fast-fail with fallback responses.

    Usage:
        breaker = CircuitBreaker(CircuitBreakerConfig(
            failure_threshold=5,
            recovery_timeout_seconds=30,
            name="sonnet-breaker",
            model_id="anthropic.claude-3-sonnet-20240229-v1:0"
        ))

        try:
            response = breaker.call(bedrock_invoke, prompt=user_query)
        except CircuitOpenError:
            response = serve_cached_fallback(user_query)
    """

    def __init__(self, config: CircuitBreakerConfig):
        self.config = config
        self._state = CircuitState.CLOSED
        self._failure_timestamps: list[float] = []
        self._last_failure_time: float = 0
        self._open_since: float = 0
        self._half_open_probes: int = 0
        self._half_open_successes: int = 0
        self._lock = threading.Lock()
        self._metrics = {
            "total_calls": 0,
            "total_failures": 0,
            "total_rejections": 0,
            "total_fallbacks": 0,
            "state_transitions": [],
        }
        logger.info(
            f"CircuitBreaker '{config.name}' initialized for model "
            f"'{config.model_id}' — threshold={config.failure_threshold}, "
            f"recovery={config.recovery_timeout_seconds}s"
        )

    @property
    def state(self) -> CircuitState:
        """Current breaker state, auto-transitioning OPEN -> HALF_OPEN if recovery elapsed."""
        with self._lock:
            if self._state == CircuitState.OPEN:
                elapsed = time.time() - self._open_since
                if elapsed >= self.config.recovery_timeout_seconds:
                    self._transition_to(CircuitState.HALF_OPEN)
            return self._state

    def _transition_to(self, new_state: CircuitState) -> None:
        """Internal state transition with logging and metrics."""
        old_state = self._state
        self._state = new_state
        transition = {
            "from": old_state.value,
            "to": new_state.value,
            "timestamp": time.time(),
        }
        self._metrics["state_transitions"].append(transition)
        logger.warning(
            f"CircuitBreaker '{self.config.name}': "
            f"{old_state.value} -> {new_state.value}"
        )
        if new_state == CircuitState.HALF_OPEN:
            self._half_open_probes = 0
            self._half_open_successes = 0
        elif new_state == CircuitState.CLOSED:
            self._failure_timestamps.clear()

    def _count_recent_failures(self) -> int:
        """Count failures within the configured window."""
        cutoff = time.time() - self.config.failure_window_seconds
        self._failure_timestamps = [
            t for t in self._failure_timestamps if t > cutoff
        ]
        return len(self._failure_timestamps)

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """
        Execute a function through the circuit breaker.

        Args:
            func: The function to call (e.g., bedrock_invoke)
            *args, **kwargs: Arguments to pass to the function

        Returns:
            The function's return value if successful

        Raises:
            CircuitOpenError: If the breaker is OPEN and blocking requests
        """
        current_state = self.state  # Triggers auto-transition check
        self._metrics["total_calls"] += 1

        if current_state == CircuitState.OPEN:
            self._metrics["total_rejections"] += 1
            logger.info(
                f"CircuitBreaker '{self.config.name}': "
                f"OPEN — rejecting request (recovery in "
                f"{self._time_until_half_open():.1f}s)"
            )
            raise CircuitOpenError(
                breaker_name=self.config.name,
                model_id=self.config.model_id,
                time_until_recovery=self._time_until_half_open(),
            )

        if current_state == CircuitState.HALF_OPEN:
            with self._lock:
                if self._half_open_probes >= self.config.half_open_max_probes:
                    raise CircuitOpenError(
                        breaker_name=self.config.name,
                        model_id=self.config.model_id,
                        time_until_recovery=0,
                    )
                self._half_open_probes += 1

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure(e)
            raise

    def _on_success(self) -> None:
        """Handle successful call — potentially close the breaker."""
        with self._lock:
            if self._state == CircuitState.HALF_OPEN:
                self._half_open_successes += 1
                if self._half_open_successes >= self.config.half_open_max_probes:
                    self._transition_to(CircuitState.CLOSED)
                    logger.info(
                        f"CircuitBreaker '{self.config.name}': "
                        f"Recovery confirmed — back to CLOSED"
                    )

    def _on_failure(self, error: Exception) -> None:
        """Handle failed call — potentially open the breaker."""
        with self._lock:
            now = time.time()
            self._failure_timestamps.append(now)
            self._last_failure_time = now
            self._metrics["total_failures"] += 1

            if self._state == CircuitState.HALF_OPEN:
                self._transition_to(CircuitState.OPEN)
                self._open_since = now
                logger.warning(
                    f"CircuitBreaker '{self.config.name}': "
                    f"Probe failed — returning to OPEN"
                )
            elif self._state == CircuitState.CLOSED:
                recent = self._count_recent_failures()
                if recent >= self.config.failure_threshold:
                    self._transition_to(CircuitState.OPEN)
                    self._open_since = now
                    logger.error(
                        f"CircuitBreaker '{self.config.name}': "
                        f"Threshold reached ({recent} failures) — OPEN"
                    )

    def _time_until_half_open(self) -> float:
        """Seconds remaining until OPEN -> HALF_OPEN transition."""
        if self._state != CircuitState.OPEN:
            return 0
        elapsed = time.time() - self._open_since
        return max(0, self.config.recovery_timeout_seconds - elapsed)

    def get_metrics(self) -> Dict[str, Any]:
        """Return current metrics for monitoring."""
        return {
            **self._metrics,
            "current_state": self._state.value,
            "breaker_name": self.config.name,
            "model_id": self.config.model_id,
            "recent_failures": self._count_recent_failures(),
        }

    def force_open(self) -> None:
        """Manually trip the breaker (e.g., from admin console)."""
        with self._lock:
            self._transition_to(CircuitState.OPEN)
            self._open_since = time.time()

    def force_close(self) -> None:
        """Manually reset the breaker (e.g., after confirming recovery)."""
        with self._lock:
            self._transition_to(CircuitState.CLOSED)


class CircuitOpenError(Exception):
    """Raised when a circuit breaker is OPEN and blocking requests."""
    def __init__(self, breaker_name: str, model_id: str, time_until_recovery: float):
        self.breaker_name = breaker_name
        self.model_id = model_id
        self.time_until_recovery = time_until_recovery
        super().__init__(
            f"Circuit breaker '{breaker_name}' is OPEN for model '{model_id}'. "
            f"Recovery in {time_until_recovery:.1f}s"
        )

TimeoutManager Class

import signal
import asyncio
import time
import logging
from dataclasses import dataclass
from typing import Optional, Callable, Any, Dict
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError

logger = logging.getLogger("manga_assist.timeout_manager")


@dataclass
class TimeoutConfig:
    """Timeout configuration for a Lambda function or service call."""
    hard_timeout_seconds: float      # Lambda/system hard limit
    soft_timeout_pct: float = 0.80   # Soft timeout as fraction of hard
    grace_period_seconds: float = 2  # Time to write state + return fallback
    name: str = "default"

    @property
    def soft_timeout_seconds(self) -> float:
        return self.hard_timeout_seconds * self.soft_timeout_pct

    @property
    def effective_work_seconds(self) -> float:
        return self.soft_timeout_seconds - self.grace_period_seconds


class TimeoutManager:
    """
    Manages soft and hard timeouts for MangaAssist Lambda functions.

    Provides graceful degradation by firing a soft timeout before the
    Lambda hard limit, giving the function time to return a fallback
    response rather than being killed mid-execution.

    Usage:
        tm = TimeoutManager(TimeoutConfig(
            hard_timeout_seconds=15,
            soft_timeout_pct=0.80,
            name="bedrock-invoke"
        ))

        result = tm.execute_with_timeout(
            func=invoke_bedrock,
            fallback_func=serve_cached_response,
            prompt=user_query
        )
    """

    def __init__(self, config: TimeoutConfig):
        self.config = config
        self._executor = ThreadPoolExecutor(max_workers=2)
        self._metrics = {
            "total_calls": 0,
            "soft_timeouts": 0,
            "hard_timeouts": 0,
            "successful_calls": 0,
            "fallback_served": 0,
            "avg_duration_ms": 0,
        }
        self._total_duration_ms: float = 0
        logger.info(
            f"TimeoutManager '{config.name}' initialized — "
            f"hard={config.hard_timeout_seconds}s, "
            f"soft={config.soft_timeout_seconds:.1f}s, "
            f"effective_work={config.effective_work_seconds:.1f}s"
        )

    def execute_with_timeout(
        self,
        func: Callable,
        fallback_func: Optional[Callable] = None,
        *args,
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Execute a function with soft timeout and fallback.

        Returns:
            Dict with 'result', 'source' ('primary'|'fallback'|'error'),
            and 'duration_ms'.
        """
        self._metrics["total_calls"] += 1
        start_time = time.time()

        try:
            future = self._executor.submit(func, *args, **kwargs)
            result = future.result(timeout=self.config.soft_timeout_seconds)
            duration_ms = (time.time() - start_time) * 1000
            self._record_success(duration_ms)
            return {
                "result": result,
                "source": "primary",
                "duration_ms": round(duration_ms, 2),
                "timeout_budget_remaining_ms": round(
                    (self.config.hard_timeout_seconds * 1000) - duration_ms, 2
                ),
            }
        except FuturesTimeoutError:
            self._metrics["soft_timeouts"] += 1
            elapsed = time.time() - start_time
            logger.warning(
                f"TimeoutManager '{self.config.name}': Soft timeout at "
                f"{elapsed:.2f}s — invoking fallback"
            )
            if fallback_func:
                return self._execute_fallback(fallback_func, start_time, *args, **kwargs)
            return {
                "result": None,
                "source": "error",
                "error": "soft_timeout",
                "duration_ms": round(elapsed * 1000, 2),
            }
        except Exception as e:
            elapsed = time.time() - start_time
            logger.error(
                f"TimeoutManager '{self.config.name}': Error after "
                f"{elapsed:.2f}s — {type(e).__name__}: {e}"
            )
            if fallback_func:
                return self._execute_fallback(fallback_func, start_time, *args, **kwargs)
            return {
                "result": None,
                "source": "error",
                "error": str(e),
                "duration_ms": round(elapsed * 1000, 2),
            }

    def _execute_fallback(
        self,
        fallback_func: Callable,
        start_time: float,
        *args,
        **kwargs,
    ) -> Dict[str, Any]:
        """Execute the fallback function within the grace period."""
        try:
            grace_deadline = self.config.grace_period_seconds
            future = self._executor.submit(fallback_func, *args, **kwargs)
            result = future.result(timeout=grace_deadline)
            duration_ms = (time.time() - start_time) * 1000
            self._metrics["fallback_served"] += 1
            return {
                "result": result,
                "source": "fallback",
                "duration_ms": round(duration_ms, 2),
            }
        except Exception as e:
            duration_ms = (time.time() - start_time) * 1000
            logger.error(
                f"TimeoutManager '{self.config.name}': Fallback also failed — {e}"
            )
            return {
                "result": {"message": "Service temporarily unavailable. Please retry."},
                "source": "error",
                "error": f"fallback_failed: {e}",
                "duration_ms": round(duration_ms, 2),
            }

    def _record_success(self, duration_ms: float) -> None:
        self._metrics["successful_calls"] += 1
        self._total_duration_ms += duration_ms
        self._metrics["avg_duration_ms"] = round(
            self._total_duration_ms / self._metrics["successful_calls"], 2
        )

    def get_metrics(self) -> Dict[str, Any]:
        return {**self._metrics, "name": self.config.name}


class StreamingTimeoutManager:
    """
    Timeout manager for Bedrock streaming responses.

    Tracks three timeout types:
    - First byte timeout: max time to receive first chunk
    - Inter-chunk timeout: max gap between consecutive chunks
    - Total stream timeout: max time for complete response
    """

    def __init__(
        self,
        first_byte_timeout: float = 5.0,
        inter_chunk_timeout: float = 2.0,
        total_timeout: float = 30.0,
        name: str = "streaming",
    ):
        self.first_byte_timeout = first_byte_timeout
        self.inter_chunk_timeout = inter_chunk_timeout
        self.total_timeout = total_timeout
        self.name = name

    async def consume_stream(self, stream_iterator) -> Dict[str, Any]:
        """Consume a Bedrock streaming response with timeout enforcement."""
        chunks = []
        start_time = time.time()
        first_chunk_received = False

        # Local import so this method reads self-contained; in production,
        # move this alongside the module-level imports.
        import asyncio

        try:
            iterator = stream_iterator.__aiter__()
            while True:
                # asyncio.wait_for cancels a hung __anext__() call, so the
                # first-byte and inter-chunk timeouts fire even when the
                # stream stalls (a plain `async for` would wait forever).
                wait_timeout = (
                    self.inter_chunk_timeout
                    if first_chunk_received
                    else self.first_byte_timeout
                )
                try:
                    chunk = await asyncio.wait_for(
                        iterator.__anext__(), timeout=wait_timeout
                    )
                except StopAsyncIteration:
                    break
                except asyncio.TimeoutError:
                    label = "Inter-chunk" if first_chunk_received else "First byte"
                    raise TimeoutError(f"{label} timeout: {wait_timeout}s exceeded")

                first_chunk_received = True
                total_elapsed = time.time() - start_time

                if total_elapsed > self.total_timeout:
                    logger.warning(
                        f"StreamingTimeoutManager '{self.name}': "
                        f"Total timeout reached at {total_elapsed:.2f}s — "
                        f"returning partial ({len(chunks)} chunks)"
                    )
                    break

                chunks.append(chunk)

            total_elapsed = time.time() - start_time
            return {
                "chunks": chunks,
                "complete": total_elapsed <= self.total_timeout,
                "total_chunks": len(chunks),
                "duration_ms": round(total_elapsed * 1000, 2),
            }
        except TimeoutError as e:
            return {
                "chunks": chunks,
                "complete": False,
                "error": str(e),
                "total_chunks": len(chunks),
                "duration_ms": round((time.time() - start_time) * 1000, 2),
            }
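A minimal, self-contained illustration of the per-chunk timeout idea (names and timings here are illustrative, not part of the MangaAssist codebase): wrapping each `__anext__()` call in `asyncio.wait_for` bounds how long the consumer waits for a chunk, which a plain `async for` cannot do on a stalled stream.

```python
import asyncio

async def consume_with_chunk_timeouts(stream, first_byte=5.0, inter_chunk=2.0):
    """Collect chunks, bounding the wait for the first chunk and for each
    gap between chunks. asyncio.wait_for cancels a hung __anext__() call,
    so a stalled stream raises TimeoutError instead of blocking forever."""
    chunks = []
    iterator = stream.__aiter__()
    timeout = first_byte            # first chunk gets the first-byte budget
    while True:
        try:
            chunk = await asyncio.wait_for(iterator.__anext__(), timeout=timeout)
        except StopAsyncIteration:  # stream finished normally
            break
        chunks.append(chunk)
        timeout = inter_chunk       # later chunks use the inter-chunk budget
    return chunks

async def fake_stream():
    # Stand-in for a Bedrock streaming response: three quick chunks.
    for i in range(3):
        await asyncio.sleep(0.01)
        yield f"chunk-{i}"

chunks = asyncio.run(consume_with_chunk_timeouts(fake_stream(), 1.0, 1.0))
```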

SafeguardOrchestrator Class

import json
import time
import logging
import hashlib
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List

import boto3
import redis

# CircuitBreaker, CircuitBreakerConfig, CircuitOpenError, TimeoutManager,
# and TimeoutConfig come from the earlier sections of this guide.

logger = logging.getLogger("manga_assist.safeguard_orchestrator")


@dataclass
class TokenBudget:
    """Token budget limits for various scopes."""
    max_input_tokens: int = 2048
    max_output_tokens: int = 1024
    max_session_tokens: int = 20_000
    max_user_daily_tokens: int = 100_000
    max_system_hourly_tokens: int = 50_000_000
    warn_threshold_pct: float = 0.80


@dataclass
class SafeguardConfig:
    """Complete safeguard configuration for MangaAssist."""
    token_budget: TokenBudget = field(default_factory=TokenBudget)
    primary_model: str = "anthropic.claude-3-sonnet-20240229-v1:0"
    fallback_model: str = "anthropic.claude-3-haiku-20240307-v1:0"
    redis_host: str = "manga-cache.abc123.ng.0001.use1.cache.amazonaws.com"
    redis_port: int = 6379
    cache_ttl_seconds: int = 300
    region: str = "us-east-1"


class SafeguardOrchestrator:
    """
    Orchestrates all safeguard mechanisms for MangaAssist FM calls.

    Combines circuit breakers, timeout management, token budgets,
    and IAM-aware invocation into a single entry point. Every FM
    request flows through this orchestrator.

    Flow:
      1. Check token budget (user, session, system)
      2. Check circuit breaker state
      3. Truncate input if over token limit
      4. Invoke FM with timeout management
      5. Record token usage
      6. Return result or fallback

    Usage:
        orchestrator = SafeguardOrchestrator(config)
        result = orchestrator.process_request(
            user_id="user-123",
            session_id="sess-456",
            prompt="Recommend manga like One Piece",
            context_docs=retrieved_docs,
        )
    """

    def __init__(self, config: SafeguardConfig):
        self.config = config
        self._bedrock = boto3.client("bedrock-runtime", region_name=config.region)
        self._redis = redis.Redis(
            host=config.redis_host,
            port=config.redis_port,
            decode_responses=True,
        )

        # Circuit breakers — one per model
        self._breakers = {
            config.primary_model: CircuitBreaker(CircuitBreakerConfig(
                failure_threshold=5,
                recovery_timeout_seconds=30,
                name="sonnet-breaker",
                model_id=config.primary_model,
            )),
            config.fallback_model: CircuitBreaker(CircuitBreakerConfig(
                failure_threshold=8,
                recovery_timeout_seconds=20,
                name="haiku-breaker",
                model_id=config.fallback_model,
            )),
        }

        # Timeout manager for FM calls
        self._timeout_mgr = TimeoutManager(TimeoutConfig(
            hard_timeout_seconds=15,
            soft_timeout_pct=0.80,
            name="fm-invoke",
        ))

        logger.info("SafeguardOrchestrator initialized with full safeguard stack")

    def process_request(
        self,
        user_id: str,
        session_id: str,
        prompt: str,
        context_docs: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """
        Process a user request through all safeguard layers.

        Returns:
            Dict with 'response', 'model_used', 'tokens_used',
            'safeguard_actions', and 'duration_ms'.
        """
        start_time = time.time()
        safeguard_actions = []

        # --- Step 1: Token budget check ---
        budget_status = self._check_token_budget(user_id, session_id)
        if budget_status["blocked"]:
            safeguard_actions.append(f"token_budget_exceeded: {budget_status['reason']}")
            return self._serve_budget_exceeded_response(
                user_id, budget_status, safeguard_actions, start_time
            )
        if budget_status.get("warning"):
            safeguard_actions.append(f"token_budget_warning: {budget_status['warning']}")

        # --- Step 2: Prepare input with token truncation ---
        prepared_prompt, truncated = self._prepare_prompt(prompt, context_docs)
        if truncated:
            safeguard_actions.append("input_truncated_to_budget")

        # --- Step 3: Try primary model through circuit breaker ---
        model_used = self.config.primary_model
        breaker = self._breakers[model_used]
        # breaker.call runs inside the timeout manager's worker thread, so a
        # CircuitOpenError surfaces through execute_with_timeout's own error
        # handling and triggers the fallback lambda. The lambda unwraps
        # _try_fallback_model's envelope so the payload is not double-wrapped.
        result = self._timeout_mgr.execute_with_timeout(
            func=lambda: breaker.call(
                self._invoke_bedrock, model_used, prepared_prompt
            ),
            fallback_func=lambda: (
                self._try_fallback_model(prepared_prompt) or {}
            ).get("result"),
        )
        if result.get("source") == "fallback":
            safeguard_actions.append("fell_back_to_secondary_model")
            model_used = self.config.fallback_model

        # --- Step 4: Process result ---
        if result.get("source") == "error" or not result.get("result"):
            safeguard_actions.append("all_models_failed")
            return self._serve_cached_response(prompt, safeguard_actions, start_time)

        response_text = result.get("result", {}).get("text", "")
        tokens_used = result.get("result", {}).get("usage", {})

        # --- Step 5: Record token usage ---
        self._record_token_usage(user_id, session_id, tokens_used)

        duration_ms = (time.time() - start_time) * 1000
        return {
            "response": response_text,
            "model_used": model_used,
            "tokens_used": tokens_used,
            "safeguard_actions": safeguard_actions,
            "source": result.get("source", "primary"),
            "duration_ms": round(duration_ms, 2),
        }

    def _check_token_budget(self, user_id: str, session_id: str) -> Dict[str, Any]:
        """Check all token budget scopes and return status."""
        budget = self.config.token_budget

        # Check user daily budget
        user_key = f"tokens:user:{user_id}:daily"
        user_tokens = int(self._redis.get(user_key) or 0)
        if user_tokens >= budget.max_user_daily_tokens:
            return {"blocked": True, "reason": "user_daily_limit_exceeded",
                    "used": user_tokens, "limit": budget.max_user_daily_tokens}

        # Check session budget
        session_key = f"tokens:session:{session_id}"
        session_tokens = int(self._redis.get(session_key) or 0)
        if session_tokens >= budget.max_session_tokens:
            return {"blocked": True, "reason": "session_limit_exceeded",
                    "used": session_tokens, "limit": budget.max_session_tokens}

        # Check system hourly budget
        system_key = f"tokens:system:hourly:{int(time.time() // 3600)}"
        system_tokens = int(self._redis.get(system_key) or 0)
        if system_tokens >= budget.max_system_hourly_tokens:
            return {"blocked": True, "reason": "system_hourly_limit_exceeded",
                    "used": system_tokens, "limit": budget.max_system_hourly_tokens}

        # Check for warnings
        warning = None
        user_pct = user_tokens / budget.max_user_daily_tokens
        if user_pct >= budget.warn_threshold_pct:
            warning = f"user_daily_at_{int(user_pct * 100)}%"

        return {"blocked": False, "warning": warning}

    def _prepare_prompt(
        self, prompt: str, context_docs: Optional[List[str]]
    ) -> tuple:
        """Prepare and optionally truncate the prompt to fit the token budget.

        Returns (prompt_text, was_truncated).
        """
        max_tokens = self.config.token_budget.max_input_tokens
        full_prompt = prompt
        if context_docs:
            context_text = "\n\n".join(context_docs)
            full_prompt = (
                f"Context:\n{context_text}\n\n"
                f"User question: {prompt}"
            )

        # Rough heuristics: ~1.3 tokens per English word, ~3.5 characters per
        # token. Swap in a real tokenizer count for production accuracy.
        estimated_tokens = len(full_prompt.split()) * 1.3
        if estimated_tokens > max_tokens:
            char_limit = int(max_tokens * 3.5)
            full_prompt = full_prompt[:char_limit] + "\n[Context truncated for safety]"
            return full_prompt, True
        return full_prompt, False

    def _invoke_bedrock(self, model_id: str, prompt: str) -> Dict[str, Any]:
        """Invoke Bedrock with the given model and prompt."""
        body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": self.config.token_budget.max_output_tokens,
            "messages": [{"role": "user", "content": prompt}],
        })
        response = self._bedrock.invoke_model(
            modelId=model_id,
            contentType="application/json",
            accept="application/json",
            body=body,
        )
        result = json.loads(response["body"].read())
        return {
            "text": result["content"][0]["text"],
            "usage": result.get("usage", {}),
        }

    def _try_fallback_model(self, prompt: str) -> Optional[Dict[str, Any]]:
        """Attempt the fallback model through its own circuit breaker."""
        fallback = self.config.fallback_model
        try:
            breaker = self._breakers[fallback]
            result = breaker.call(self._invoke_bedrock, fallback, prompt)
            return {"result": result, "source": "fallback"}
        except Exception as e:
            # CircuitOpenError subclasses Exception, so a single clause covers
            # both an open breaker and an invocation failure.
            logger.error(f"Fallback model also failed: {e}")
            return None

    def _serve_cached_response(
        self, prompt: str, actions: List[str], start_time: float
    ) -> Dict[str, Any]:
        """Serve a cached response from Redis as last resort."""
        cache_key = f"cache:response:{hashlib.md5(prompt.encode()).hexdigest()[:16]}"
        cached = self._redis.get(cache_key)
        duration_ms = (time.time() - start_time) * 1000
        actions.append("served_cached_response")
        if cached:
            return {
                "response": cached,
                "model_used": "cache",
                "tokens_used": {},
                "safeguard_actions": actions,
                "source": "cache",
                "duration_ms": round(duration_ms, 2),
            }
        actions.append("no_cache_available")
        return {
            "response": "Our manga assistant is temporarily busy. Please try again shortly.",
            "model_used": "static_fallback",
            "tokens_used": {},
            "safeguard_actions": actions,
            "source": "static_fallback",
            "duration_ms": round(duration_ms, 2),
        }

    def _serve_budget_exceeded_response(
        self, user_id: str, budget_status: Dict, actions: List[str], start_time: float
    ) -> Dict[str, Any]:
        duration_ms = (time.time() - start_time) * 1000
        return {
            "response": (
                "You have reached your usage limit for this period. "
                "Please try again later or start a new session."
            ),
            "model_used": "none",
            "tokens_used": {},
            "safeguard_actions": actions,
            "source": "budget_exceeded",
            "budget_status": budget_status,
            "duration_ms": round(duration_ms, 2),
        }

    def _record_token_usage(
        self, user_id: str, session_id: str, usage: Dict[str, int]
    ) -> None:
        """Record token usage across all budget scopes in Redis."""
        total = usage.get("input_tokens", 0) + usage.get("output_tokens", 0)
        if total == 0:
            return

        pipe = self._redis.pipeline()
        # User daily budget. Note: refreshing the TTL on every increment gives
        # a rolling 24-hour window; for a fixed calendar-day budget, put the
        # date in the key (e.g. tokens:user:{user_id}:{YYYYMMDD}) instead.
        user_key = f"tokens:user:{user_id}:daily"
        pipe.incrby(user_key, total)
        pipe.expire(user_key, 86400)  # 24 hours

        # Session budget
        session_key = f"tokens:session:{session_id}"
        pipe.incrby(session_key, total)
        pipe.expire(session_key, 3600)  # 1 hour

        # System hourly budget
        system_key = f"tokens:system:hourly:{int(time.time() // 3600)}"
        pipe.incrby(system_key, total)
        pipe.expire(system_key, 7200)  # 2 hours (buffer)

        pipe.execute()

    def get_system_health(self) -> Dict[str, Any]:
        """Return health status of all safeguard components."""
        return {
            "circuit_breakers": {
                name: breaker.get_metrics()
                for name, breaker in self._breakers.items()
            },
            "timeout_manager": self._timeout_mgr.get_metrics(),
            "timestamp": time.time(),
        }

IAMPolicyGenerator Class

import json
import logging
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional

logger = logging.getLogger("manga_assist.iam_policy_generator")


@dataclass
class IAMPolicyRequest:
    """Request parameters for generating a scoped IAM policy."""
    role_name: str
    environment: str                     # "production", "staging", "development"
    allowed_models: List[str]            # Model IDs allowed
    allowed_actions: List[str]           # Bedrock actions allowed
    allowed_regions: List[str]           # AWS regions allowed
    max_tokens: Optional[int] = None     # MaxTokens condition
    account_id: str = "123456789012"
    require_tags: bool = True
    tags: Dict[str, str] = field(default_factory=lambda: {
        "Project": "MangaAssist",
        "CostCenter": "ai-chatbot",
    })


class IAMPolicyGenerator:
    """
    Generates least-privilege IAM policies for MangaAssist Bedrock access.

    Creates role policies, permission boundaries, and SCPs that enforce
    model-level, action-level, and region-level restrictions on FM access.

    Usage:
        generator = IAMPolicyGenerator()
        policy = generator.generate_role_policy(IAMPolicyRequest(
            role_name="manga-prod-chatbot",
            environment="production",
            allowed_models=[
                "anthropic.claude-3-sonnet-20240229-v1:0",
                "anthropic.claude-3-haiku-20240307-v1:0",
            ],
            allowed_actions=["bedrock:InvokeModel", "bedrock:InvokeModelWithResponseStream"],
            allowed_regions=["us-east-1"],
            max_tokens=4096,
        ))
    """

    # Models considered expensive and blocked by default
    EXPENSIVE_MODELS = [
        "anthropic.claude-3-opus*",
        "anthropic.claude-3-5-sonnet*",  # note: hyphens, not dots, in the model ID
        "amazon.titan-text-premier*",
        "meta.llama3-1-405b*",
    ]

    def generate_role_policy(self, request: IAMPolicyRequest) -> Dict[str, Any]:
        """Generate a complete IAM role policy document."""
        statements = []

        # Allow statement for approved models
        statements.append(self._allow_approved_models(request))

        # Deny statement for expensive models
        statements.append(self._deny_expensive_models(request))

        # Allow guardrail access
        statements.append(self._allow_guardrails(request))

        # Allow CloudWatch logging
        statements.append(self._allow_logging(request))

        # Allow DynamoDB session and ElastiCache access in production only
        if request.environment == "production":
            statements.append(self._allow_dynamodb_sessions(request))
            statements.append(self._allow_elasticache(request))

        policy = {
            "Version": "2012-10-17",
            "Statement": statements,
        }

        logger.info(
            f"Generated IAM policy for role '{request.role_name}' — "
            f"{len(statements)} statements, "
            f"{len(request.allowed_models)} models, "
            f"{len(request.allowed_regions)} regions"
        )
        return policy

    def _allow_approved_models(self, request: IAMPolicyRequest) -> Dict[str, Any]:
        """Build Allow statement for approved Bedrock models."""
        resources = []
        for model_id in request.allowed_models:
            for region in request.allowed_regions:
                resources.append(
                    f"arn:aws:bedrock:{region}::foundation-model/{model_id}"
                )

        statement = {
            "Sid": f"AllowBedrockInvoke{request.environment.title()}",
            "Effect": "Allow",
            "Action": request.allowed_actions,
            "Resource": resources,
            "Condition": {
                "StringEquals": {
                    "aws:RequestedRegion": request.allowed_regions,
                },
            },
        }

        if request.max_tokens:
            # Caution: confirm bedrock:MaxTokens against the current Bedrock
            # service condition keys before relying on it; not every inference
            # parameter is enforceable as an IAM condition.
            statement["Condition"]["NumericLessThanEquals"] = {
                "bedrock:MaxTokens": str(request.max_tokens)
            }

        return statement

    def _deny_expensive_models(self, request: IAMPolicyRequest) -> Dict[str, Any]:
        """Build Deny statement for expensive models."""
        resources = [
            f"arn:aws:bedrock:*::foundation-model/{model}"
            for model in self.EXPENSIVE_MODELS
        ]
        return {
            "Sid": "DenyExpensiveModels",
            "Effect": "Deny",
            "Action": "bedrock:InvokeModel",
            "Resource": resources,
        }

    def _allow_guardrails(self, request: IAMPolicyRequest) -> Dict[str, Any]:
        """Allow access to Bedrock Guardrails."""
        return {
            "Sid": "AllowBedrockGuardrails",
            "Effect": "Allow",
            "Action": ["bedrock:ApplyGuardrail"],
            "Resource": (
                f"arn:aws:bedrock:{request.allowed_regions[0]}:"
                f"{request.account_id}:guardrail/manga-*"
            ),
        }

    def _allow_logging(self, request: IAMPolicyRequest) -> Dict[str, Any]:
        """Allow CloudWatch log access for Lambda functions."""
        return {
            "Sid": "AllowCloudWatchLogs",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
            ],
            "Resource": (
                f"arn:aws:logs:{request.allowed_regions[0]}:"
                f"{request.account_id}:log-group:/aws/lambda/manga-*"
            ),
        }

    def _allow_dynamodb_sessions(self, request: IAMPolicyRequest) -> Dict[str, Any]:
        """Allow DynamoDB access for session management."""
        return {
            "Sid": "AllowDynamoDBSessions",
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem",
                "dynamodb:Query",
            ],
            "Resource": (
                f"arn:aws:dynamodb:{request.allowed_regions[0]}:"
                f"{request.account_id}:table/manga-sessions*"
            ),
        }

    def _allow_elasticache(self, request: IAMPolicyRequest) -> Dict[str, Any]:
        """Allow ElastiCache access for caching responses."""
        return {
            "Sid": "AllowElastiCacheAccess",
            "Effect": "Allow",
            "Action": [
                "elasticache:Connect",
            ],
            "Resource": (
                f"arn:aws:elasticache:{request.allowed_regions[0]}:"
                f"{request.account_id}:replicationgroup:manga-cache-*"
            ),
        }

    def generate_permission_boundary(
        self, allowed_regions: List[str]
    ) -> Dict[str, Any]:
        """Generate a permission boundary that caps maximum possible permissions."""
        return {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "BoundaryAllowBedrock",
                    "Effect": "Allow",
                    "Action": [
                        "bedrock:InvokeModel",
                        "bedrock:InvokeModelWithResponseStream",
                        "bedrock:ApplyGuardrail",
                    ],
                    "Resource": "*",
                    "Condition": {
                        "StringEquals": {
                            "aws:RequestedRegion": allowed_regions,
                        }
                    },
                },
                {
                    "Sid": "BoundaryAllowSupporting",
                    "Effect": "Allow",
                    "Action": [
                        "logs:*",
                        "dynamodb:GetItem",
                        "dynamodb:PutItem",
                        "dynamodb:UpdateItem",
                        "dynamodb:Query",
                        "elasticache:Connect",
                    ],
                    "Resource": "*",
                },
                {
                    "Sid": "BoundaryDenyAll",
                    "Effect": "Deny",
                    "Action": [
                        "iam:*",
                        "organizations:*",
                        "account:*",
                    ],
                    "Resource": "*",
                },
            ],
        }

    def generate_scp(self, allowed_regions: List[str]) -> Dict[str, Any]:
        """Generate an Organization-level Service Control Policy."""
        return {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "DenyBedrockOutsideRegions",
                    "Effect": "Deny",
                    "Action": "bedrock:*",
                    "Resource": "*",
                    "Condition": {
                        "StringNotEquals": {
                            "aws:RequestedRegion": allowed_regions,
                        }
                    },
                },
                {
                    "Sid": "DenyBedrockWithoutTags",
                    "Effect": "Deny",
                    "Action": "bedrock:InvokeModel",
                    "Resource": "*",
                    # InvokeModel creates no taggable resource, so a
                    # RequestTag condition would never match; gate on the
                    # calling principal's tag instead.
                    "Condition": {
                        "Null": {
                            "aws:PrincipalTag/CostCenter": "true",
                        }
                    },
                },
            ],
        }

    def policy_to_json(self, policy: Dict[str, Any], indent: int = 2) -> str:
        """Serialize a policy to formatted JSON."""
        return json.dumps(policy, indent=indent, default=str)

7. Comparison Tables

Circuit Breaker vs Retry vs Timeout — When to Use Each

| Pattern | Purpose | Protects Against | MangaAssist Use Case |
|---|---|---|---|
| Circuit Breaker | Stop calling a failing service | Cascading failures, resource exhaustion | Bedrock endpoint degradation |
| Retry with Backoff | Recover from transient errors | Throttling, network blips | ThrottlingException from Bedrock |
| Timeout | Bound execution time | Hanging requests, slow responses | Lambda + Bedrock latency control |
| Token Budget | Cap resource consumption | Cost overruns, abuse | Per-user and system-wide spend caps |
| IAM Boundary | Restrict access scope | Unauthorized model use, credential leak | Least-privilege FM access |
| Stopping Condition | Limit workflow execution | Runaway state machines, infinite loops | Step Functions timeout/heartbeat |
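Of the rows above, Retry with Backoff is the only pattern not implemented elsewhere in this section. A minimal sketch follows; the function and error names are illustrative, and in production the `retryable` set would be narrowed to botocore's throttling errors rather than all exceptions.

```python
import random
import time

def retry_with_backoff(func, max_attempts=4, base_delay=0.5, max_delay=8.0,
                       retryable=(Exception,)):
    """Retry func on transient errors with capped exponential backoff and
    full jitter. Narrow `retryable` to throttling errors in production."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retryable:
            if attempt == max_attempts:
                raise
            # Full jitter: sleep a random amount up to the capped backoff.
            delay = min(max_delay, base_delay * (2 ** (attempt - 1)))
            time.sleep(random.uniform(0, delay))

# Example: a call that simulates two throttles before succeeding.
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("ThrottlingException (simulated)")
    return "ok"

result = retry_with_backoff(flaky, base_delay=0.01)
```

Full jitter keeps a fleet of retrying clients from synchronizing their retries into thundering herds against a recovering Bedrock endpoint.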

Safeguard Implementation Complexity vs Risk Reduction

| Safeguard | Implementation Effort | Risk Reduced | Cost to Implement | Monthly Savings |
|---|---|---|---|---|
| Circuit Breaker | Medium (2-3 days) | Cascading failure | ~$2K dev time | $10K-50K avoided downtime |
| Lambda Timeout | Low (0.5 day) | Hanging functions | ~$500 dev time | $1K-5K wasted compute |
| IAM Policies | Low (1 day) | Unauthorized access | ~$1K dev time | $5K-100K credential abuse |
| Step Functions Stops | Medium (1-2 days) | Runaway workflows | ~$1.5K dev time | $2K-20K execution costs |
| Token Budgets | Medium (2 days) | Cost overruns | ~$2K dev time | $50K-200K prompt abuse |
| Full Safeguard Stack | High (1-2 weeks) | All of the above | ~$10K dev time | $70K-375K combined |

Timeout Values Across the MangaAssist Stack

| Component | Timeout | Configurable Via | Impact if Too Low | Impact if Too High |
|---|---|---|---|---|
| API Gateway WebSocket | 10 min idle | Stage settings | User reconnects frequently | Zombie connections accumulate |
| API Gateway integration | 29 sec | Integration settings | Complex queries fail | Users wait too long |
| Lambda hard timeout | 15 sec | Function config | Sonnet responses cut off | Costs increase, latency rises |
| Lambda soft timeout | 12 sec | Application code | Less time for fallback | Grace period too short |
| Bedrock SDK read | 60 sec | boto3 config | Long outputs fail | Holds connection too long |
| Bedrock SDK connect | 10 sec | boto3 config | Intermittent failures | Delayed error detection |
| Circuit breaker recovery | 30 sec | Application code | Probes too early | Extended outage for users |
| Redis cache TTL | 300 sec | Application code | Cache misses increase | Stale responses served |
| Step Functions state | 5-20 sec | ASL definition | Legitimate work killed | Dead states linger |
| Step Functions execution | 120 sec | ASL definition | Complex RAG fails | Runaway cost risk |

8. Cost and Risk Analysis

Cost of NOT Having Safeguards (Monthly at 1M msgs/day)

| Failure Scenario | Without Safeguard | With Safeguard | Monthly Savings |
|---|---|---|---|
| Bedrock outage cascades to all services | 4h downtime = $120K revenue loss | 30s failover to cache | ~$118K |
| Prompt injection generates max tokens | $1.9M token costs | $234K (capped) | ~$1.7M |
| Runaway Step Functions execution | $15K/day execution costs | Auto-terminates at 120s | ~$14.5K |
| Compromised credentials invoke Opus | $450K/day model costs | IAM denies access | ~$450K |
| Lambda functions hang indefinitely | $8K/day compute waste | 15s timeout kills | ~$7.5K |
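Scenarios like these can be sanity-checked with a small helper using the per-1M-token prices from the architecture note (Sonnet $3 input / $15 output); the function name is illustrative:

```python
def bedrock_token_cost(input_tokens, output_tokens,
                       input_price_per_m, output_price_per_m):
    """Dollar cost of Bedrock usage given per-1M-token prices."""
    return (input_tokens / 1_000_000) * input_price_per_m + \
           (output_tokens / 1_000_000) * output_price_per_m

# Worst case for one month at 1M messages/day if every message hit the
# default budget caps (2,048 input + 1,024 output tokens) on Sonnet:
messages = 1_000_000 * 30
monthly_cap = bedrock_token_cost(messages * 2048, messages * 1024, 3.0, 15.0)
# Typical spend is far lower, since most messages use a fraction of the cap.
```

This worst-case figure comes to about $645K/month, which is what the token budget caps bound; actual table figures depend on the specific abuse scenario modeled.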

Safeguard Monitoring Dashboard Metrics

| Metric | CloudWatch Alarm Threshold | Action |
|---|---|---|
| Circuit breaker OPEN events | > 3 in 5 minutes | Page on-call engineer |
| Soft timeout rate | > 10% of requests | Scale Bedrock provisioned throughput |
| Token budget warnings | > 100 users/hour at 80% | Investigate potential abuse |
| IAM deny events | > 0 in production | Security incident review |
| Step Functions timeout rate | > 5% of executions | Review state machine configuration |
| Fallback response rate | > 20% of responses | Investigate Bedrock health |
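The first row could be wired up as a CloudWatch alarm. In the sketch below the namespace, metric, and alarm names are assumptions (the application must publish the metric via `put_metric_data`), and the returned kwargs would be passed to boto3's `cloudwatch.put_metric_alarm`:

```python
def breaker_open_alarm_params(sns_topic_arn):
    """Build put_metric_alarm kwargs for 'circuit breaker OPEN events
    > 3 in 5 minutes -> page on-call' using a custom metric."""
    return {
        "AlarmName": "manga-assist-breaker-open",     # assumed alarm name
        "Namespace": "MangaAssist/Safeguards",        # assumed custom namespace
        "MetricName": "CircuitBreakerOpenEvents",     # assumed metric name
        "Statistic": "Sum",
        "Period": 300,                                # 5-minute window
        "EvaluationPeriods": 1,
        "Threshold": 3,
        "ComparisonOperator": "GreaterThanThreshold", # strictly > 3
        "TreatMissingData": "notBreaching",           # quiet when no opens
        "AlarmActions": [sns_topic_arn],              # pages on-call via SNS
    }

params = breaker_open_alarm_params(
    "arn:aws:sns:us-east-1:123456789012:oncall"
)
# boto3.client("cloudwatch").put_metric_alarm(**params)
```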

Key Takeaways

  1. Circuit breakers are non-negotiable for FM calls — Bedrock is an external dependency with variable availability. The closed/open/half-open pattern isolates failures and maintains MangaAssist's 3-second latency target by fast-failing to cached responses during outages.

  2. Soft timeouts beat hard timeouts for user experience — Setting application-level timeouts at 80% of the Lambda hard limit provides a grace period to write state, log context, and return a fallback response instead of letting Lambda kill the execution mid-stream.

  3. IAM policies enforce cost boundaries at infrastructure level — Least-privilege policies that restrict specific model ARNs, regions, and max tokens prevent both accidental expensive model usage and credential compromise scenarios that could cost hundreds of thousands monthly.

  4. Step Functions need both TimeoutSeconds and HeartbeatSeconds — TimeoutSeconds caps total state duration while HeartbeatSeconds detects tasks that are alive but stuck. Together they prevent runaway execution costs at $0.025 per 1,000 state transitions.
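The pairing in takeaway 4 can be sketched as an ASL fragment built in Python; the state names, resource ARN, and timeout values are placeholders, and note that `HeartbeatSeconds` only takes effect for activity tasks and `.waitForTaskToken` integrations that send `SendTaskHeartbeat`:

```python
import json

# Task state with both stopping conditions: TimeoutSeconds caps total state
# duration, HeartbeatSeconds fails a task whose worker stops sending
# heartbeats (alive-but-stuck detection).
invoke_fm_state = {
    "Type": "Task",
    "Resource": "arn:aws:states:::lambda:invoke",  # placeholder integration
    "TimeoutSeconds": 20,
    "HeartbeatSeconds": 5,
    "Catch": [{
        "ErrorEquals": ["States.ALL"],             # States.ALL must stand alone
        "Next": "ServeCachedResponse",             # placeholder fallback state
    }],
    "Next": "RecordUsage",                         # placeholder next state
}

state_machine = {
    "StartAt": "InvokeFM",
    "TimeoutSeconds": 120,  # whole-execution cap against runaway workflows
    "States": {"InvokeFM": invoke_fm_state},
}

asl_json = json.dumps(state_machine, indent=2)
```

A timed-out state raises `States.Timeout`, which the `Catch` above routes to the cached-response fallback instead of failing the whole execution.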

  5. Token budgets operate at multiple scopes — Per-request, per-session, per-user-daily, and system-hourly budgets create layered defense against cost overruns. Redis atomic counters enforce these in real-time with sub-millisecond overhead.

  6. The full safeguard stack costs ~$10K to implement but prevents $70K-375K monthly in potential losses — Circuit breakers, timeouts, IAM policies, stopping conditions, and token budgets together provide defense-in-depth that protects both system reliability and financial health at MangaAssist's 1M messages/day scale.